mirror of
https://github.com/getredash/redash.git
synced 2025-12-20 01:47:39 -05:00
Compare commits
1 Commits
25.06.0-de
...
user-and-o
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2dce31dd32 |
@@ -61,3 +61,14 @@ def database_key_definitions(default):
|
|||||||
# Since you can define custom primary key types using `database_key_definitions`, you may want to load certain extensions when creating the database.
|
# Since you can define custom primary key types using `database_key_definitions`, you may want to load certain extensions when creating the database.
|
||||||
# To do so, simply add the name of the extension you'd like to load to this list.
|
# To do so, simply add the name of the extension you'd like to load to this list.
|
||||||
database_extensions = []
|
database_extensions = []
|
||||||
|
|
||||||
|
|
||||||
|
# If you'd like to limit the amount of concurrent query executions made by a certain org or user,
|
||||||
|
# implement this method by returning a boolean which would indicate if the limit has reached.
|
||||||
|
# If you return `True`, the query execution would move to a waiting list and would only be executed
|
||||||
|
# when a spot clears up for it within the defined capacity.
|
||||||
|
# `entity` is either "user" or "org".
|
||||||
|
# `executions` is the number of currently running query execution jobs for the specific user/org.
|
||||||
|
# `meta` is the query execution job's meta attribute.
|
||||||
|
def capacity_reached_for(entity, executions, meta):
|
||||||
|
return False
|
||||||
@@ -16,6 +16,7 @@ from .queries import (
|
|||||||
from .alerts import check_alerts_for_query
|
from .alerts import check_alerts_for_query
|
||||||
from .failure_report import send_aggregated_errors
|
from .failure_report import send_aggregated_errors
|
||||||
from .worker import Worker, Queue, Job
|
from .worker import Worker, Queue, Job
|
||||||
|
from .capacity import cleanup_waiting_lists
|
||||||
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
|
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
|
||||||
|
|
||||||
from redash import rq_redis_connection
|
from redash import rq_redis_connection
|
||||||
|
|||||||
135
redash/tasks/capacity.py
Normal file
135
redash/tasks/capacity.py
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
import re
|
||||||
|
import itertools
|
||||||
|
import logging
|
||||||
|
from rq import Queue, Worker
|
||||||
|
from rq.job import Job
|
||||||
|
from redash import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_waiting_lists():
|
||||||
|
"""
|
||||||
|
When a job is enqueued/dequeued to/from a CapacityQueue and it exceeds the org/user capacity, it is entered into a waiting list.
|
||||||
|
Later on, when a CapacityWorker finishes work on a job and a slot for a job on the waiting list becomes available, the worker will trigger the corresponding job
|
||||||
|
on the waiting list and re-queue it back to the original queue.
|
||||||
|
|
||||||
|
However, if a (non-horse) worker dies in the middle of execution, it will not trigger the next item on the waiting list. If there is any other
|
||||||
|
job for that org or user queued or executing, they will trigger those jobs eventually, but if no other jobs are queued or executing, the jobs
|
||||||
|
on the waiting list may never execute.
|
||||||
|
|
||||||
|
This periodic task looks at all waiting lists and sees if there are no triggers for any of them. In case no triggers are found, we can assume that
|
||||||
|
their worker died and re-enqueue them back into their original queues.
|
||||||
|
|
||||||
|
If a waiting list is empty, it can be deleted.
|
||||||
|
"""
|
||||||
|
queues = set(Queue.all())
|
||||||
|
waiting_lists = set([q for q in queues if q.name.endswith(":waiting")])
|
||||||
|
wip = itertools.chain(
|
||||||
|
*[
|
||||||
|
queue.started_job_registry.get_job_ids()
|
||||||
|
for queue in (queues - waiting_lists)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
for waiting_list in waiting_lists:
|
||||||
|
trigger = next(
|
||||||
|
(j for j in wip if waiting_list.name.split(":origin")[0] in j), None
|
||||||
|
)
|
||||||
|
|
||||||
|
if trigger is None:
|
||||||
|
if waiting_list.is_empty():
|
||||||
|
logger.warning(
|
||||||
|
f"Waiting list {waiting_list.name} is empty and will be deleted."
|
||||||
|
)
|
||||||
|
waiting_list.delete()
|
||||||
|
else:
|
||||||
|
origin_name = re.findall(r"origin:(.*?):", waiting_list.name)[0]
|
||||||
|
logger.warning(
|
||||||
|
f"Waiting list {waiting_list.name} has no executing job to trigger it. Returning all jobs from the waiting list back to their original queue ({origin_name})."
|
||||||
|
)
|
||||||
|
origin = CapacityQueue(origin_name)
|
||||||
|
|
||||||
|
while waiting_list.count > 0:
|
||||||
|
job_id = waiting_list.pop_job_id()
|
||||||
|
job = Job.fetch(job_id)
|
||||||
|
origin.enqueue_job(job, at_front=True)
|
||||||
|
|
||||||
|
|
||||||
|
entity_key = lambda entity, job: f"{entity}:{job.meta[f'{entity}_id']}"
|
||||||
|
|
||||||
|
waiting_list_key = (
|
||||||
|
lambda entity, job, origin_name: f"{entity_key(entity, job)}:origin:{origin_name}:waiting"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CapacityQueue(Queue):
|
||||||
|
def find_waiting_list(self, job_ids, entity, job):
|
||||||
|
executions = sum(map(lambda job_id: entity_key(entity, job) in job_id, job_ids))
|
||||||
|
if settings.dynamic_settings.capacity_reached_for(entity, executions, job.meta):
|
||||||
|
waiting_list = waiting_list_key(entity, job, self.name)
|
||||||
|
logger.warning(
|
||||||
|
f"Moving job {job.id} to the {entity}'s waiting list ({waiting_list}) since {entity_key(entity, job)} is currently executing {executions} jobs and has reached the {entity} capacity."
|
||||||
|
)
|
||||||
|
return waiting_list
|
||||||
|
|
||||||
|
def enter_waiting_list(self, job, pipeline=None):
|
||||||
|
if job.meta.get("is_query_execution", False):
|
||||||
|
job_ids = self.started_job_registry.get_job_ids()
|
||||||
|
|
||||||
|
waiting_list = self.find_waiting_list(
|
||||||
|
job_ids, "user", job
|
||||||
|
) or self.find_waiting_list(job_ids, "org", job)
|
||||||
|
|
||||||
|
if waiting_list:
|
||||||
|
return Queue(waiting_list).enqueue_job(job, pipeline=pipeline)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def dequeue_any(cls, *args, **kwargs):
|
||||||
|
result = super(CapacityQueue, cls).dequeue_any(*args, **kwargs)
|
||||||
|
if result is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
job, queue = result
|
||||||
|
|
||||||
|
if queue.enter_waiting_list(job):
|
||||||
|
return cls.dequeue_any(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
return job, queue
|
||||||
|
|
||||||
|
def enqueue_job(self, job, pipeline=None, at_front=False):
|
||||||
|
return self.enter_waiting_list(job, pipeline) or super().enqueue_job(
|
||||||
|
job, pipeline=pipeline, at_front=at_front
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CapacityWorker(Worker):
|
||||||
|
queue_class = CapacityQueue
|
||||||
|
|
||||||
|
def _process_waiting_lists(self, queue, job):
|
||||||
|
if job.meta.get("is_query_execution", False):
|
||||||
|
waiting_lists = [
|
||||||
|
Queue(waiting_list_key("user", job, queue.name)),
|
||||||
|
Queue(waiting_list_key("org", job, queue.name)),
|
||||||
|
]
|
||||||
|
|
||||||
|
result = Queue.dequeue_any(waiting_lists, None, job_class=self.job_class)
|
||||||
|
|
||||||
|
if result is not None:
|
||||||
|
waiting_job, _ = result
|
||||||
|
logger.warning(
|
||||||
|
f"Moving job {waiting_job.id} from waiting list ({waiting_job.origin}) back to the original queue ({queue.name}) since an execution slot opened up for it."
|
||||||
|
)
|
||||||
|
queue.enqueue_job(waiting_job)
|
||||||
|
|
||||||
|
def handle_job_success(self, job, queue, started_job_registry):
|
||||||
|
try:
|
||||||
|
super().handle_job_success(job, queue, started_job_registry)
|
||||||
|
finally:
|
||||||
|
self._process_waiting_lists(queue, job)
|
||||||
|
|
||||||
|
def handle_job_failure(self, job, queue, started_job_registry=None, exc_string=""):
|
||||||
|
try:
|
||||||
|
super().handle_job_failure(job, queue, started_job_registry, exc_string)
|
||||||
|
finally:
|
||||||
|
self._process_waiting_lists(queue, job)
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
import redis
|
import redis
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
from rq import get_current_job
|
from rq import get_current_job
|
||||||
from rq.job import JobStatus
|
from rq.job import JobStatus
|
||||||
@@ -86,12 +87,14 @@ def enqueue_query(
|
|||||||
|
|
||||||
queue = Queue(queue_name)
|
queue = Queue(queue_name)
|
||||||
enqueue_kwargs = {
|
enqueue_kwargs = {
|
||||||
|
"job_id": f"org:{data_source.org_id}:user:{user_id}:id:{uuid4()}",
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"scheduled_query_id": scheduled_query_id,
|
"scheduled_query_id": scheduled_query_id,
|
||||||
"is_api_key": is_api_key,
|
"is_api_key": is_api_key,
|
||||||
"job_timeout": time_limit,
|
"job_timeout": time_limit,
|
||||||
"failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
|
"failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
|
||||||
"meta": {
|
"meta": {
|
||||||
|
"is_query_execution": True,
|
||||||
"data_source_id": data_source.id,
|
"data_source_id": data_source.id,
|
||||||
"org_id": data_source.org_id,
|
"org_id": data_source.org_id,
|
||||||
"scheduled": scheduled_query_id is not None,
|
"scheduled": scheduled_query_id is not None,
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ from redash.tasks import (
|
|||||||
version_check,
|
version_check,
|
||||||
send_aggregated_errors,
|
send_aggregated_errors,
|
||||||
Queue,
|
Queue,
|
||||||
|
cleanup_waiting_lists,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -81,6 +82,7 @@ def periodic_job_definitions():
|
|||||||
"func": send_aggregated_errors,
|
"func": send_aggregated_errors,
|
||||||
"interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
|
"interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
|
||||||
},
|
},
|
||||||
|
{"func": cleanup_waiting_lists, "interval": timedelta(minutes=1)},
|
||||||
]
|
]
|
||||||
|
|
||||||
if settings.VERSION_CHECK:
|
if settings.VERSION_CHECK:
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import errno
|
|||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
from redash.tasks.capacity import CapacityQueue, CapacityWorker
|
||||||
from redash import statsd_client
|
from redash import statsd_client
|
||||||
from rq import Queue as BaseQueue, get_current_job
|
from rq import Queue as BaseQueue, get_current_job
|
||||||
from rq.worker import HerokuWorker # HerokuWorker implements graceful shutdown on SIGTERM
|
from rq.worker import HerokuWorker # HerokuWorker implements graceful shutdown on SIGTERM
|
||||||
@@ -37,7 +38,7 @@ class CancellableQueue(BaseQueue):
|
|||||||
job_class = CancellableJob
|
job_class = CancellableJob
|
||||||
|
|
||||||
|
|
||||||
class RedashQueue(StatsdRecordingQueue, CancellableQueue):
|
class RedashQueue(StatsdRecordingQueue, CancellableQueue, CapacityQueue):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -165,7 +166,7 @@ class HardLimitingWorker(HerokuWorker):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
|
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker, CapacityWorker):
|
||||||
queue_class = RedashQueue
|
queue_class = RedashQueue
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
184
tests/tasks/test_capacity.py
Normal file
184
tests/tasks/test_capacity.py
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
from mock import MagicMock, patch, call
|
||||||
|
from tests import BaseTestCase
|
||||||
|
|
||||||
|
from rq import push_connection, pop_connection, Queue
|
||||||
|
from rq.job import JobStatus
|
||||||
|
from redash import rq_redis_connection
|
||||||
|
from redash.tasks.capacity import CapacityWorker, CapacityQueue
|
||||||
|
from redash.tasks.worker import Job
|
||||||
|
|
||||||
|
|
||||||
|
def say_hello():
|
||||||
|
return "Hello!"
|
||||||
|
|
||||||
|
|
||||||
|
def create_job(job_id, **meta):
|
||||||
|
meta["is_query_execution"] = True
|
||||||
|
return Job.create(
|
||||||
|
say_hello,
|
||||||
|
id=f"org:{meta['org_id']}:user:{meta['user_id']}:id:{job_id}",
|
||||||
|
meta=meta,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCapacityQueue(BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
push_connection(rq_redis_connection)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
pop_connection()
|
||||||
|
rq_redis_connection.flushdb()
|
||||||
|
|
||||||
|
@patch("redash.settings.dynamic_settings.capacity_reached_for", return_value=True)
|
||||||
|
def test_redirects_to_user_waiting_list_if_over_capacity(self, _):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
|
||||||
|
job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
self.assertEqual(job_1.origin, "user:John:origin:default:waiting")
|
||||||
|
self.assertEqual(job_2.origin, "user:John:origin:default:waiting")
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"redash.settings.dynamic_settings.capacity_reached_for",
|
||||||
|
side_effect=[False, True, False, True],
|
||||||
|
)
|
||||||
|
def test_redirects_to_org_waiting_list_if_over_capacity(self, _):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
|
||||||
|
job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="Mark"))
|
||||||
|
|
||||||
|
self.assertEqual(job_1.origin, "org:Acme:origin:default:waiting")
|
||||||
|
self.assertEqual(job_2.origin, "org:Acme:origin:default:waiting")
|
||||||
|
|
||||||
|
|
||||||
|
class TestCapacityWorker(BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
push_connection(rq_redis_connection)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
pop_connection()
|
||||||
|
rq_redis_connection.flushdb()
|
||||||
|
|
||||||
|
@patch("redash.settings.dynamic_settings.capacity_reached_for", return_value=True)
|
||||||
|
def test_always_handles_non_query_execution_jobs(self, _):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job = queue.enqueue(say_hello)
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
worker.work(burst=True)
|
||||||
|
|
||||||
|
self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)
|
||||||
|
|
||||||
|
def test_handles_job_if_within_capacity(self):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
worker.work(burst=True)
|
||||||
|
|
||||||
|
self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)
|
||||||
|
|
||||||
|
def test_doesnt_handle_job_if_over_user_capacity(self):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
|
||||||
|
job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
with patch(
|
||||||
|
"redash.settings.dynamic_settings.capacity_reached_for",
|
||||||
|
side_effect=[False, False, True],
|
||||||
|
):
|
||||||
|
worker.work(burst=True)
|
||||||
|
|
||||||
|
job_1.refresh()
|
||||||
|
self.assertEqual(job_1.get_status(), JobStatus.FINISHED)
|
||||||
|
|
||||||
|
job_2.refresh()
|
||||||
|
self.assertEqual(job_2.get_status(), JobStatus.QUEUED)
|
||||||
|
self.assertEqual(job_2.origin, "user:John:origin:default:waiting")
|
||||||
|
|
||||||
|
def test_doesnt_handle_job_if_over_org_capacity(self):
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
|
||||||
|
job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
with patch(
|
||||||
|
"redash.settings.dynamic_settings.capacity_reached_for",
|
||||||
|
side_effect=[False, False, False, True],
|
||||||
|
):
|
||||||
|
worker.work(burst=True)
|
||||||
|
|
||||||
|
job_1.refresh()
|
||||||
|
self.assertEqual(job_1.get_status(), JobStatus.FINISHED)
|
||||||
|
|
||||||
|
job_2.refresh()
|
||||||
|
self.assertEqual(job_2.get_status(), JobStatus.QUEUED)
|
||||||
|
self.assertEqual(job_2.origin, "org:Acme:origin:default:waiting")
|
||||||
|
|
||||||
|
def test_isolates_capacity_between_original_queues(self):
|
||||||
|
queries_queue = CapacityQueue("queries")
|
||||||
|
adhoc_query = queries_queue.enqueue_job(
|
||||||
|
create_job(1, org_id="Acme", user_id="John")
|
||||||
|
)
|
||||||
|
|
||||||
|
scheduled_queries_queue = CapacityQueue("scheduled_queries")
|
||||||
|
scheduled_query = scheduled_queries_queue.enqueue_job(
|
||||||
|
create_job(2, org_id="Acme", user_id="John")
|
||||||
|
)
|
||||||
|
|
||||||
|
worker = CapacityWorker([queries_queue, scheduled_queries_queue])
|
||||||
|
with patch(
|
||||||
|
"redash.settings.dynamic_settings.capacity_reached_for", return_value=True
|
||||||
|
):
|
||||||
|
worker.work(burst=True)
|
||||||
|
|
||||||
|
adhoc_query.refresh()
|
||||||
|
self.assertEqual(adhoc_query.get_status(), JobStatus.QUEUED)
|
||||||
|
self.assertEqual(adhoc_query.origin, "user:John:origin:queries:waiting")
|
||||||
|
|
||||||
|
scheduled_query.refresh()
|
||||||
|
self.assertEqual(scheduled_query.get_status(), JobStatus.QUEUED)
|
||||||
|
self.assertEqual(
|
||||||
|
scheduled_query.origin, "user:John:origin:scheduled_queries:waiting"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_handles_waiting_user_jobs_when_user_slot_opens_up(self):
|
||||||
|
user_waiting_list = Queue("user:John:origin:default:waiting")
|
||||||
|
user_waiting_job = user_waiting_list.enqueue_job(
|
||||||
|
create_job(1, org_id="Acme", user_id="John")
|
||||||
|
)
|
||||||
|
|
||||||
|
org_waiting_list = Queue("org:Acme")
|
||||||
|
org_waiting_job = org_waiting_list.enqueue_job(
|
||||||
|
create_job(2, org_id="Acme", user_id="Mark", original_queue="default")
|
||||||
|
)
|
||||||
|
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job = queue.enqueue_job(create_job(3, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
worker.work(max_jobs=2)
|
||||||
|
|
||||||
|
user_waiting_job.refresh()
|
||||||
|
self.assertEqual(user_waiting_job.get_status(), JobStatus.FINISHED)
|
||||||
|
self.assertEqual(user_waiting_job.origin, "default")
|
||||||
|
|
||||||
|
org_waiting_job.refresh()
|
||||||
|
self.assertEqual(org_waiting_job.get_status(), JobStatus.QUEUED)
|
||||||
|
|
||||||
|
def test_handles_waiting_org_jobs_when_org_job_opens_up(self):
|
||||||
|
org_waiting_list = Queue("org:Acme:origin:default:waiting")
|
||||||
|
org_waiting_job = org_waiting_list.enqueue_job(
|
||||||
|
create_job(1, org_id="Acme", user_id="Mark")
|
||||||
|
)
|
||||||
|
|
||||||
|
queue = CapacityQueue()
|
||||||
|
job = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))
|
||||||
|
|
||||||
|
worker = CapacityWorker([queue])
|
||||||
|
worker.work(max_jobs=2)
|
||||||
|
|
||||||
|
org_waiting_job.refresh()
|
||||||
|
self.assertEqual(org_waiting_job.get_status(), JobStatus.FINISHED)
|
||||||
|
self.assertEqual(org_waiting_job.origin, "default")
|
||||||
Reference in New Issue
Block a user