mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
Compare commits
7 Commits
25.08.0-de
...
redis-lock
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e0e128244 | ||
|
|
34e932556b | ||
|
|
a50ea05b19 | ||
|
|
5cfa6bc217 | ||
|
|
06c9a2b21a | ||
|
|
f841b217e8 | ||
|
|
af496fe5e3 |
@@ -15,6 +15,7 @@ from redash.tasks.alerts import check_alerts_for_query
|
||||
from redash.tasks.failure_report import track_failure
|
||||
from redash.tasks.worker import Job, Queue
|
||||
from redash.utils import gen_query_hash, utcnow
|
||||
from redash.utils.locks import acquire_lock, release_lock
|
||||
from redash.worker import get_job_logger
|
||||
|
||||
logger = get_job_logger(__name__)
|
||||
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
|
||||
logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
|
||||
try_count = 0
|
||||
job = None
|
||||
job_lock_id = _job_lock_id(query_hash, data_source.id)
|
||||
|
||||
while try_count < 5:
|
||||
try_count += 1
|
||||
identifier = acquire_lock(job_lock_id)
|
||||
if identifier is None:
|
||||
continue
|
||||
|
||||
pipe = redis_connection.pipeline()
|
||||
try:
|
||||
pipe.watch(_job_lock_id(query_hash, data_source.id))
|
||||
job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
|
||||
pipe.watch(job_lock_id)
|
||||
job_id = pipe.get(job_lock_id)
|
||||
if job_id:
|
||||
logger.info("[%s] Found existing job: %s", query_hash, job_id)
|
||||
job_complete = None
|
||||
@@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
|
||||
|
||||
if lock_is_irrelevant:
|
||||
logger.info("[%s] %s, removing lock", query_hash, message)
|
||||
redis_connection.delete(_job_lock_id(query_hash, data_source.id))
|
||||
redis_connection.delete(job_lock_id)
|
||||
job = None
|
||||
|
||||
if not job:
|
||||
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
|
||||
except redis.WatchError:
|
||||
continue
|
||||
finally:
|
||||
release_lock(job_lock_id, identifier)
|
||||
pipe.reset()
|
||||
|
||||
if not job:
|
||||
|
||||
61
redash/utils/locks.py
Normal file
61
redash/utils/locks.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from redis import WatchError
|
||||
|
||||
from redash import redis_connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
|
||||
identifier = str(uuid.uuid4())
|
||||
lock_name = f"lock:{name}"
|
||||
end = time.time() + acquire_timeout
|
||||
|
||||
base_delay = 0.001
|
||||
max_delay = 0.05
|
||||
|
||||
while time.time() < end:
|
||||
if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
|
||||
logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
|
||||
return identifier
|
||||
|
||||
delay = base_delay + random.uniform(0, base_delay)
|
||||
time.sleep(min(delay, max_delay))
|
||||
base_delay = min(base_delay * 2, max_delay)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def release_lock(name, identifier):
|
||||
lock_name = f"lock:{name}"
|
||||
logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
|
||||
with redis_connection.pipeline() as pipe:
|
||||
while True:
|
||||
try:
|
||||
pipe.watch(lock_name)
|
||||
if pipe.get(lock_name) == identifier:
|
||||
pipe.multi()
|
||||
pipe.delete(lock_name)
|
||||
pipe.execute()
|
||||
logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
|
||||
return True
|
||||
pipe.unwatch()
|
||||
logger.warning(
|
||||
"Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
|
||||
)
|
||||
break
|
||||
except WatchError:
|
||||
logger.warning(
|
||||
"WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
|
||||
lock_name,
|
||||
identifier,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Error releasing lock: %s", str(e))
|
||||
break
|
||||
|
||||
return False
|
||||
Reference in New Issue
Block a user