Fix: don't remove locks for queries with task status of PENDING.

It's possible the Celery metadata object was expired, but the task
is still running (which will result in PENDING status when querying
the AsyncResult object).
This commit is contained in:
Arik Fraimovich
2017-05-18 09:25:44 +03:00
parent 52084c322f
commit 76470b9f09

View File

@@ -1,14 +1,17 @@
import json
import time
import logging
import signal
import time
import redis
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client, settings, utils
from redash import models, redis_connection, settings, statsd_client, utils
from redash.query_runner import InterruptException
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import InterruptException
from .alerts import check_alerts_for_query
logger = get_task_logger(__name__)
@@ -265,7 +268,7 @@ def refresh_queries():
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logging.info("Disabled refresh queries.")
elif query.data_source.paused:
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
@@ -299,14 +302,6 @@ def cleanup_tasks():
for tracker in in_progress:
result = AsyncResult(tracker.task_id)
# If the AsyncResult status is PENDING it means there is no celery task object for this tracker, and we can
# mark it as "dead":
if result.status == 'PENDING':
logging.info("In progress tracker for %s is no longer enqueued, cancelling (task: %s).",
tracker.query_hash, tracker.task_id)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='cancelled')
if result.ready():
logging.info("in progress tracker %s finished", tracker.query_hash)
_unlock(tracker.query_hash, tracker.data_source_id)