jcloud/jcloud/utils/jobs.py
2025-04-12 17:39:38 +08:00

94 lines
2.5 KiB
Python

from typing import Any, Generator, Optional
import signal
import jingrow
from jingrow.core.pagetype.rq_job.rq_job import fetch_job_ids
from jingrow.utils.background_jobs import get_queues, get_redis_conn
from redis import Redis
from rq.command import send_stop_job_command
from rq.job import Job, JobStatus, NoSuchJobError, get_current_job
def stop_background_job(job: Job):
try:
if job.get_status() == JobStatus.STARTED:
send_stop_job_command(job.connection, job.id)
elif job.get_status() in [JobStatus.QUEUED, JobStatus.SCHEDULED]:
job.cancel()
except Exception:
return
def get_background_jobs(
pagetype: str,
name: str,
status: list[str] | None = None,
connection: "Optional[Redis]" = None,
) -> Generator[Job, Any, None]:
"""
Returns background jobs for a `pg` created using the `run_pg_method`
Returned jobs are in the QUEUED, SCHEDULED or STARTED state.
"""
connection = connection or get_redis_conn()
status = status or ["queued", "scheduled", "started"]
for job_id in get_job_ids(status, connection):
try:
job = Job.fetch(job_id, connection=connection)
except NoSuchJobError:
continue
if not does_job_belong_to_pg(job, pagetype, name):
continue
yield job
def get_job_ids(
status: str | list[str],
connection: "Optional[Redis]" = None,
) -> Generator[str, Any, None]:
if isinstance(status, str):
status = [status]
connection = connection or get_redis_conn()
for q in get_queues(connection):
for s in status:
try:
job_ids = fetch_job_ids(q, s)
# ValueError thrown on macOS
# Message: signal only works in main thread of the main interpreter
except ValueError:
return
for jid in job_ids:
yield jid
def does_job_belong_to_pg(job: Job, pagetype: str, name: str) -> bool:
site = job.kwargs.get("site")
if site and site != jingrow.local.site:
return False
job_name = (
job.kwargs.get("job_type") or job.kwargs.get("job_name") or job.kwargs.get("method")
)
if job_name != "jingrow.utils.background_jobs.run_pg_method":
return False
kwargs = job.kwargs.get("kwargs", {})
if kwargs.get("pagetype") != pagetype:
return False
if kwargs.get("name") != name:
return False
return True
def has_job_timeout_exceeded() -> bool:
# RQ sets up an alarm signal and a signal handler that raises
# JobTimeoutException after the timeout amount
# getitimer returns the time left for this timer
# 0.0 means the timer is expired
return bool(get_current_job()) and (signal.getitimer(signal.ITIMER_REAL)[0] <= 0)