Mirror of https://github.com/getredash/redash.git, synced 2025-12-20 09:57:35 -05:00
Compare commits: master...redis-lock (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 9e0e128244 | |
| | 34e932556b | |
| | a50ea05b19 | |
| | 5cfa6bc217 | |
| | 06c9a2b21a | |
| | f841b217e8 | |
| | af496fe5e3 | |
redash/tasks/queries/execution.py

```diff
@@ -15,6 +15,7 @@ from redash.tasks.alerts import check_alerts_for_query
 from redash.tasks.failure_report import track_failure
 from redash.tasks.worker import Job, Queue
 from redash.utils import gen_query_hash, utcnow
+from redash.utils.locks import acquire_lock, release_lock
 from redash.worker import get_job_logger
 
 logger = get_job_logger(__name__)
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
+    job_lock_id = _job_lock_id(query_hash, data_source.id)
 
     while try_count < 5:
         try_count += 1
+        identifier = acquire_lock(job_lock_id)
+        if identifier is None:
+            continue
 
         pipe = redis_connection.pipeline()
         try:
-            pipe.watch(_job_lock_id(query_hash, data_source.id))
-            job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
+            pipe.watch(job_lock_id)
+            job_id = pipe.get(job_lock_id)
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
                 job_complete = None
@@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
 
                 if lock_is_irrelevant:
                     logger.info("[%s] %s, removing lock", query_hash, message)
-                    redis_connection.delete(_job_lock_id(query_hash, data_source.id))
+                    redis_connection.delete(job_lock_id)
                     job = None
 
             if not job:
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
         except redis.WatchError:
             continue
         finally:
+            release_lock(job_lock_id, identifier)
             pipe.reset()
 
     if not job:
```
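Net effect of the four hunks above: each pass of the five-attempt retry loop in `enqueue_query` now runs only while holding a per-job Redis mutex, acquired up front and handed back in the `finally` block next to `pipe.reset()`. A minimal sketch of the resulting control flow, with a hypothetical `do_enqueue()` callable standing in for the elided WATCH/MULTI body:

```python
from redash.utils.locks import acquire_lock, release_lock


def enqueue_with_mutex(job_lock_id, do_enqueue, attempts=5):
    # Sketch of the control flow this diff introduces; `do_enqueue` is a
    # hypothetical stand-in for enqueue_query's WATCH/MULTI body.
    for _ in range(attempts):
        identifier = acquire_lock(job_lock_id)
        if identifier is None:
            continue  # another worker holds the mutex; burn an attempt and retry
        try:
            return do_enqueue()
        finally:
            # Mirrors the new `finally: release_lock(...)` beside pipe.reset():
            # the mutex is returned even when the body raises (e.g. WatchError).
            release_lock(job_lock_id, identifier)
    return None  # all attempts exhausted without enqueueing
```

Failing to win the mutex consumes one of the five attempts, just as a `redis.WatchError` did before, so a heavily contended query can still give up without enqueueing.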
redash/utils/locks.py (new file, 61 lines)

```python
import logging
import random
import time
import uuid

from redis import WatchError

from redash import redis_connection

logger = logging.getLogger(__name__)


def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
    identifier = str(uuid.uuid4())
    lock_name = f"lock:{name}"
    end = time.time() + acquire_timeout

    base_delay = 0.001
    max_delay = 0.05

    while time.time() < end:
        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
            return identifier

        delay = base_delay + random.uniform(0, base_delay)
        time.sleep(min(delay, max_delay))
        base_delay = min(base_delay * 2, max_delay)

    return None


def release_lock(name, identifier):
    lock_name = f"lock:{name}"
    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
    with redis_connection.pipeline() as pipe:
        while True:
            try:
                pipe.watch(lock_name)
                if pipe.get(lock_name) == identifier:
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                    return True
                pipe.unwatch()
                logger.warning(
                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
                )
                break
            except WatchError:
                logger.warning(
                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
                    lock_name,
                    identifier,
                )
            except Exception as e:
                logger.error("Error releasing lock: %s", str(e))
                break

    return False
```
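`acquire_lock` is the standard single-node Redis mutex recipe: `SET name value NX EX timeout` with a UUID identifier per claimant, plus jittered exponential backoff (1 ms doubling to a 50 ms cap) while waiting, up to `acquire_timeout`. One thing worth confirming in review: `release_lock` compares the `GET` result against the `str` identifier, and redis-py returns `bytes` unless the connection is created with `decode_responses=True`; with a raw bytes client the ownership check would never match and the lock would only disappear via its TTL. A minimal usage sketch (the lock name and `do_work()` are made up for illustration):

```python
from redash.utils.locks import acquire_lock, release_lock

# Hypothetical lock name; in enqueue_query the name is the job-lock id
# derived from the query hash and data source id.
identifier = acquire_lock("demo-lock", acquire_timeout=10, lock_timeout=5)
if identifier is None:
    # ~10 s of jittered 1 ms -> 50 ms backoff without ever winning SET NX.
    raise RuntimeError("lock:demo-lock is busy")
try:
    do_work()  # hypothetical critical section; at most one holder at a time
finally:
    # Only the matching identifier deletes the key; a non-owner gets a
    # warning and False, and the key still expires after lock_timeout=5 s.
    release_lock("demo-lock", identifier)
```

The 5 s `lock_timeout` doubles as crash protection: if a worker dies mid-section the key expires on its own, at the cost that a holder running longer than 5 s can silently lose the lock to another claimant, which is presumably acceptable for the short WATCH/MULTI section in `enqueue_query`.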