mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
Fix RQ wrongly moving jobs to FailedJobRegistry (#7186)
Something changed in python-rq and the old code was behaving in a way that if a job ran for longer than 2 min it would be automatically set as failed, but it would continue running. This causes a problem in the UI because it is as if the job stopped, but it actually didn't
This commit is contained in:
@@ -6,7 +6,7 @@ import sys
|
||||
from rq import Queue as BaseQueue
|
||||
from rq.job import Job as BaseJob
|
||||
from rq.job import JobStatus
|
||||
from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty
|
||||
from rq.timeouts import HorseMonitorTimeoutException
|
||||
from rq.utils import utcnow
|
||||
from rq.worker import (
|
||||
HerokuWorker, # HerokuWorker implements graceful shutdown on SIGTERM
|
||||
@@ -113,30 +113,44 @@ class HardLimitingWorker(BaseWorker):
|
||||
)
|
||||
self.kill_horse()
|
||||
|
||||
def monitor_work_horse(self, job, queue):
|
||||
def monitor_work_horse(self, job: "Job", queue: "Queue"):
|
||||
"""The worker will monitor the work horse and make sure that it
|
||||
either executes successfully or the status of the job is set to
|
||||
failed
|
||||
|
||||
Args:
|
||||
job (Job): _description_
|
||||
queue (Queue): _description_
|
||||
"""
|
||||
self.monitor_started = utcnow()
|
||||
retpid = ret_val = rusage = None
|
||||
job.started_at = utcnow()
|
||||
while True:
|
||||
try:
|
||||
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
|
||||
retpid, ret_val = os.waitpid(self._horse_pid, 0)
|
||||
with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
|
||||
retpid, ret_val, rusage = self.wait_for_horse()
|
||||
break
|
||||
except HorseMonitorTimeoutException:
|
||||
# Horse has not exited yet and is still running.
|
||||
# Send a heartbeat to keep the worker alive.
|
||||
self.heartbeat(self.job_monitoring_interval + 5)
|
||||
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
|
||||
|
||||
job.refresh()
|
||||
# Kill the job from this side if something is really wrong (interpreter lock/etc).
|
||||
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60): # type: ignore
|
||||
self.heartbeat(self.job_monitoring_interval + 60)
|
||||
self.kill_horse()
|
||||
self.wait_for_horse()
|
||||
break
|
||||
|
||||
self.maintain_heartbeats(job)
|
||||
|
||||
if job.is_cancelled:
|
||||
self.stop_executing_job(job)
|
||||
|
||||
if self.soft_limit_exceeded(job):
|
||||
self.enforce_hard_limit(job)
|
||||
|
||||
except OSError as e:
|
||||
# In case we encountered an OSError due to EINTR (which is
|
||||
# caused by a SIGINT or SIGTERM signal during
|
||||
@@ -149,29 +163,32 @@ class HardLimitingWorker(BaseWorker):
|
||||
# Send a heartbeat to keep the worker alive.
|
||||
self.heartbeat()
|
||||
|
||||
self.set_current_job_working_time(0)
|
||||
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
|
||||
if ret_val == os.EX_OK: # The process exited normally.
|
||||
return
|
||||
|
||||
job_status = job.get_status()
|
||||
|
||||
if job_status is None: # Job completed and its ttl has expired
|
||||
return
|
||||
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
||||
elif self._stopped_job_id == job.id:
|
||||
# Work-horse killed deliberately
|
||||
self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
|
||||
if job.stopped_callback:
|
||||
job.execute_stopped_callback(self.death_penalty_class)
|
||||
self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
|
||||
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
||||
if not job.ended_at:
|
||||
job.ended_at = utcnow()
|
||||
|
||||
# Unhandled failure: move the job to the failed queue
|
||||
self.log.warning(
|
||||
(
|
||||
"Moving job to FailedJobRegistry "
|
||||
"(work-horse terminated unexpectedly; waitpid returned {})" # fmt: skip
|
||||
).format(ret_val)
|
||||
)
|
||||
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
|
||||
exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
|
||||
self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)
|
||||
|
||||
self.handle_job_failure(
|
||||
job,
|
||||
queue=queue,
|
||||
exc_string="Work-horse process was terminated unexpectedly "
|
||||
"(waitpid returned %s)" % ret_val, # fmt: skip
|
||||
)
|
||||
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
|
||||
self.handle_job_failure(job, queue=queue, exc_string=exc_string)
|
||||
|
||||
|
||||
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
|
||||
|
||||
Reference in New Issue
Block a user