Compare commits

...

7 Commits

Author           SHA1        Message                                           Date
guyu             9e0e128244  fix                                               2025-03-10 18:10:27 +08:00
Peter Lee        34e932556b  Merge branch 'master' into redis-lock             2025-03-08 11:25:44 +08:00
Peter Lee        a50ea05b19  Merge branch 'master' into redis-lock             2025-02-08 12:18:16 +08:00
Arik Fraimovich  5cfa6bc217  Update ci.yml to match latest master              2025-01-31 10:29:54 +02:00
guyu             06c9a2b21a  fix                                               2025-01-24 11:48:29 +08:00
guyu             f841b217e8  format                                            2025-01-24 11:46:17 +08:00
guyu             af496fe5e3  for same query_text refresh just execution once   2025-01-24 11:41:18 +08:00
2 changed files with 70 additions and 3 deletions


@@ -15,6 +15,7 @@ from redash.tasks.alerts import check_alerts_for_query
 from redash.tasks.failure_report import track_failure
 from redash.tasks.worker import Job, Queue
 from redash.utils import gen_query_hash, utcnow
+from redash.utils.locks import acquire_lock, release_lock
 from redash.worker import get_job_logger

 logger = get_job_logger(__name__)
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
+    job_lock_id = _job_lock_id(query_hash, data_source.id)

     while try_count < 5:
         try_count += 1

+        identifier = acquire_lock(job_lock_id)
+        if identifier is None:
+            continue
         pipe = redis_connection.pipeline()
         try:
-            pipe.watch(_job_lock_id(query_hash, data_source.id))
-            job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
+            pipe.watch(job_lock_id)
+            job_id = pipe.get(job_lock_id)
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
                 job_complete = None
@@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
                 if lock_is_irrelevant:
                     logger.info("[%s] %s, removing lock", query_hash, message)
-                    redis_connection.delete(_job_lock_id(query_hash, data_source.id))
+                    redis_connection.delete(job_lock_id)
                     job = None

             if not job:
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
         except redis.WatchError:
             continue
         finally:
+            release_lock(job_lock_id, identifier)
             pipe.reset()

     if not job:
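
Taken together, the hunks above bracket the existing WATCH/pipeline logic in enqueue_query with the new lock. A condensed, non-verbatim sketch of the resulting retry loop (the job inspection and job creation branches are unchanged from master and elided here):

    job_lock_id = _job_lock_id(query_hash, data_source.id)

    while try_count < 5:
        try_count += 1

        identifier = acquire_lock(job_lock_id)    # only one enqueuer per query hash proceeds
        if identifier is None:                     # lock not acquired within acquire_timeout
            continue                               # retry rather than race the current holder

        pipe = redis_connection.pipeline()
        try:
            pipe.watch(job_lock_id)
            job_id = pipe.get(job_lock_id)         # reuse an already-enqueued job if present
            # ... existing job inspection / job creation logic, unchanged ...
        except redis.WatchError:
            continue
        finally:
            release_lock(job_lock_id, identifier)  # always hand the lock back before retrying
            pipe.reset()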

redash/utils/locks.py  (new file, 61 lines)

@@ -0,0 +1,61 @@
import logging
import random
import time
import uuid

from redis import WatchError

from redash import redis_connection

logger = logging.getLogger(__name__)


def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
    identifier = str(uuid.uuid4())
    lock_name = f"lock:{name}"
    end = time.time() + acquire_timeout

    base_delay = 0.001
    max_delay = 0.05

    while time.time() < end:
        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
            return identifier

        delay = base_delay + random.uniform(0, base_delay)
        time.sleep(min(delay, max_delay))
        base_delay = min(base_delay * 2, max_delay)

    return None


def release_lock(name, identifier):
    lock_name = f"lock:{name}"
    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)

    with redis_connection.pipeline() as pipe:
        while True:
            try:
                pipe.watch(lock_name)
                if pipe.get(lock_name) == identifier:
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    logger.info(
                        "Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier
                    )
                    return True

                pipe.unwatch()
                logger.warning(
                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
                )
                break
            except WatchError:
                logger.warning(
                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
                    lock_name,
                    identifier,
                )
            except Exception as e:
                logger.error("Error releasing lock: %s", str(e))
                break

    return False
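
For illustration, a minimal usage sketch of the two helpers around an arbitrary critical section. The function name, resource key, and do_refresh call below are made up for this example; the real caller in the first file passes _job_lock_id(query_hash, data_source.id) as the name:

from redash.utils.locks import acquire_lock, release_lock

def refresh_query_once(resource_key):
    # Hypothetical caller; resource_key identifies the work being serialized.
    identifier = acquire_lock(resource_key)      # waits up to acquire_timeout=10s
    if identifier is None:
        return False                             # another worker holds the lock; skip

    try:
        do_refresh(resource_key)                 # placeholder for the protected work
        return True
    finally:
        # Pass the same identifier back: release_lock only deletes the key while it
        # still holds this identifier, so a lock that expired (lock_timeout) and was
        # re-acquired by another worker is left alone.
        release_lock(resource_key, identifier)

The identifier check in release_lock is what makes expiry safe: deleting the key unconditionally could drop a lock that a second worker has since acquired, which is exactly the race the WATCH/compare/delete sequence guards against.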