jcloud/jcloud/utils/test.py
2025-04-12 17:39:38 +08:00

47 lines
936 B
Python

"""Utility methods for writing tests"""
from typing import Callable
import jingrow
def foreground_enqueue_pg(
    pagetype: str,
    docname: str,
    method: str,
    queue="default",
    timeout=None,
    now=False,
    enqueue_after_commit=False,
    job_id=None,
    deduplicate=False,
    at_front: bool = False,
    **kwargs,
):
    """
    Call a document method immediately instead of queueing it.

    Meant to be monkey patched over enqueue_pg in tests.

    The document is loaded with ``jingrow.get_pg(pagetype, docname)``, the
    attribute named by ``method`` is looked up on it, and that attribute is
    called with ``**kwargs``.

    ``queue``, ``timeout``, ``now``, ``enqueue_after_commit``, ``job_id``,
    ``deduplicate`` and ``at_front`` are accepted but never read: naming them
    in the signature keeps them from being swept into ``kwargs`` and forwarded
    to the document method.

    The result of the document method is discarded; this function returns None.
    """
    document = jingrow.get_pg(pagetype, docname)
    bound_method = getattr(document, method)
    bound_method(**kwargs)
def foreground_enqueue(
    method: str | Callable,
    queue: str = "default",
    timeout: int | None = None,
    event=None,
    is_async: bool = True,
    job_name: str | None = None,
    now: bool = True,
    enqueue_after_commit: bool = False,
    *,
    on_success: Callable | None = None,
    on_failure: Callable | None = None,
    at_front: bool = False,
    job_id: str | None = None,
    deduplicate: bool = False,
    **kwargs,
):
    """
    Run enqueued method in foreground

    Use for monkey patching enqueue in tests

    ``method`` and ``**kwargs`` are handed straight to ``jingrow.call`` and
    its result is returned. Every other parameter (``queue``, ``timeout``,
    ``event``, ``is_async``, ``job_name``, ``now``, ``enqueue_after_commit``,
    ``on_success``, ``on_failure``, ``at_front``, ``job_id``, ``deduplicate``)
    is accepted but never read: naming them in the signature keeps them from
    being swept into ``kwargs`` and forwarded to ``method``.

    NOTE(review): ``on_success`` / ``on_failure`` are never invoked here —
    confirm no test relies on those callbacks firing.
    """
    return jingrow.call(method, **kwargs)