Compare commits

...

1 Commits

Author SHA1 Message Date
Omer Lachish
2dce31dd32 add per-org/user capacity 2021-02-15 23:39:57 +02:00
7 changed files with 340 additions and 3 deletions

View File

@@ -60,4 +60,15 @@ def database_key_definitions(default):
# Since you can define custom primary key types using `database_key_definitions`, you may want to load certain extensions when creating the database.
# To do so, simply add the name of the extension you'd like to load to this list.
database_extensions = []
database_extensions = []
# To limit the number of concurrent query executions made by a certain org or user,
# implement this method and return a boolean indicating whether the limit has been reached.
# When `True` is returned, the query execution moves to a waiting list and is only executed
# once a spot clears up for it within the defined capacity.
# `entity` is either "user" or "org".
# `executions` is the number of currently running query execution jobs for the specific user/org.
# `meta` is the query execution job's meta attribute.
def capacity_reached_for(entity, executions, meta):
    # Default implementation: the limit is never reported as reached.
    return False

View File

@@ -16,6 +16,7 @@ from .queries import (
from .alerts import check_alerts_for_query
from .failure_report import send_aggregated_errors
from .worker import Worker, Queue, Job
from .capacity import cleanup_waiting_lists
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
from redash import rq_redis_connection

135
redash/tasks/capacity.py Normal file
View File

@@ -0,0 +1,135 @@
import re
import itertools
import logging
from rq import Queue, Worker
from rq.job import Job
from redash import settings
logger = logging.getLogger(__name__)
def cleanup_waiting_lists():
    """
    Re-queue or delete waiting lists that no running job is left to trigger.

    When a job is enqueued/dequeued to/from a CapacityQueue and it exceeds the org/user
    capacity, it is entered into a waiting list. Later on, when a CapacityWorker finishes
    work on a job, it moves the next job of the matching waiting list back to the
    original queue.

    However, if a (non-horse) worker dies in the middle of execution, it will not trigger
    the next item on the waiting list. If no other job for that org or user is executing,
    the jobs on the waiting list may never execute.

    This periodic task looks at all waiting lists and checks whether any started job could
    still trigger them. When no trigger is found:

    * an empty waiting list is deleted;
    * a non-empty waiting list gets all of its jobs returned to the front of their
      original queue.
    """
    queues = set(Queue.all())
    waiting_lists = {q for q in queues if q.name.endswith(":waiting")}

    # Materialize the started job ids in a list instead of a lazy `itertools.chain`:
    # they are scanned once per waiting list, and a one-shot iterator would already be
    # (partly) consumed after the first waiting list, making the following waiting lists
    # look as if nothing could trigger them.
    # Each id gets a leading ":" so that whole colon-delimited segments can be matched.
    started_job_ids = [
        f":{job_id}"
        for queue in queues - waiting_lists
        for job_id in queue.started_job_registry.get_job_ids()
    ]

    for waiting_list in waiting_lists:
        # "<entity>:<id>" part of the waiting list name, wrapped in colons so that
        # e.g. "user:1" does not match a job that belongs to "user:12".
        owner = waiting_list.name.split(":origin")[0]
        owner_segment = f":{owner}:"
        if any(owner_segment in job_id for job_id in started_job_ids):
            # A running job of the same org/user exists; leave the list alone.
            continue

        if waiting_list.is_empty():
            logger.warning(
                f"Waiting list {waiting_list.name} is empty and will be deleted."
            )
            waiting_list.delete()
            continue

        origin_name = re.findall(r"origin:(.*?):", waiting_list.name)[0]
        logger.warning(
            f"Waiting list {waiting_list.name} has no executing job to trigger it. Returning all jobs from the waiting list back to their original queue ({origin_name})."
        )
        origin = CapacityQueue(origin_name)
        # NOTE(review): `origin.enqueue_job` may divert a job straight back into a waiting
        # list if `capacity_reached_for` reports the limit as reached — confirm this loop
        # cannot spin in that case.
        while waiting_list.count > 0:
            job_id = waiting_list.pop_job_id()
            if job_id is None:
                # The list was drained by someone else between the count and the pop.
                break
            job = Job.fetch(job_id)
            origin.enqueue_job(job, at_front=True)
def entity_key(entity, job):
    """Return "<entity>:<id>", where the id is read from `job.meta["<entity>_id"]`."""
    id_field = f"{entity}_id"
    return f"{entity}:{job.meta[id_field]}"


def waiting_list_key(entity, job, origin_name):
    """Return the waiting list name for the job's entity and the given origin queue name."""
    owner = entity_key(entity, job)
    return f"{owner}:origin:{origin_name}:waiting"
class CapacityQueue(Queue):
    """
    Queue that diverts query execution jobs to a per-user/per-org waiting list
    whenever `settings.dynamic_settings.capacity_reached_for` reports that the
    user's or org's capacity has been reached.
    """

    def find_waiting_list(self, job_ids, entity, job):
        """
        Return the name of the waiting list `job` should go to, or None.

        `job_ids` are the ids of the currently started jobs, `entity` is either
        "user" or "org". The number of ids that belong to the job's entity is passed
        to `capacity_reached_for` together with the job's meta.
        """
        owner = entity_key(entity, job)
        # Compare whole colon-delimited segments: a plain substring test would count
        # jobs of "user:12" as executions of "user:1".
        owner_segment = f":{owner}:"
        executions = sum(owner_segment in f":{job_id}" for job_id in job_ids)

        if not settings.dynamic_settings.capacity_reached_for(
            entity, executions, job.meta
        ):
            return None

        waiting_list = waiting_list_key(entity, job, self.name)
        logger.warning(
            f"Moving job {job.id} to the {entity}'s waiting list ({waiting_list}) since {owner} is currently executing {executions} jobs and has reached the {entity} capacity."
        )
        return waiting_list

    def enter_waiting_list(self, job, pipeline=None):
        """
        Enqueue `job` on a waiting list if its user or org is over capacity.

        Returns the enqueued job when it was diverted, None otherwise. Jobs whose meta
        does not flag them as query executions are never diverted.
        """
        if not job.meta.get("is_query_execution", False):
            return None

        job_ids = self.started_job_registry.get_job_ids()
        # The user's capacity is checked first; the org's only if the user is within limits.
        waiting_list = self.find_waiting_list(
            job_ids, "user", job
        ) or self.find_waiting_list(job_ids, "org", job)

        if waiting_list:
            return Queue(waiting_list).enqueue_job(job, pipeline=pipeline)
        return None

    @classmethod
    def dequeue_any(cls, *args, **kwargs):
        """
        Dequeue the next job that is within capacity.

        Jobs that turn out to be over capacity are moved to their waiting list and
        the next job is tried. Returns a `(job, queue)` tuple, or None when the
        underlying `dequeue_any` yields nothing.
        """
        # Loop instead of recursing so that a long run of over-capacity jobs cannot
        # exhaust the interpreter's recursion limit.
        while True:
            result = super(CapacityQueue, cls).dequeue_any(*args, **kwargs)
            if result is None:
                return None

            job, queue = result
            if not queue.enter_waiting_list(job):
                return job, queue

    def enqueue_job(self, job, pipeline=None, at_front=False):
        """Enqueue `job` on its waiting list if over capacity, otherwise on this queue."""
        return self.enter_waiting_list(job, pipeline) or super().enqueue_job(
            job, pipeline=pipeline, at_front=at_front
        )
class CapacityWorker(Worker):
    """
    Worker that, after finishing a query execution job (successfully or not), moves
    one job from the matching user/org waiting list back to the job's queue.
    """

    queue_class = CapacityQueue

    def _process_waiting_lists(self, queue, job):
        """Re-enqueue one waiting job for the finished job's user or org, if any."""
        if not job.meta.get("is_query_execution", False):
            return

        # The user's waiting list is listed (and therefore tried) before the org's.
        candidates = [
            Queue(waiting_list_key(entity, job, queue.name))
            for entity in ("user", "org")
        ]
        dequeued = Queue.dequeue_any(candidates, None, job_class=self.job_class)
        if dequeued is None:
            return

        waiting_job, _waiting_list = dequeued
        logger.warning(
            f"Moving job {waiting_job.id} from waiting list ({waiting_job.origin}) back to the original queue ({queue.name}) since an execution slot opened up for it."
        )
        queue.enqueue_job(waiting_job)

    def handle_job_success(self, job, queue, started_job_registry):
        """Run the regular success handling, then always process the waiting lists."""
        try:
            super().handle_job_success(job, queue, started_job_registry)
        finally:
            self._process_waiting_lists(queue, job)

    def handle_job_failure(self, job, queue, started_job_registry=None, exc_string=""):
        """Run the regular failure handling, then always process the waiting lists."""
        try:
            super().handle_job_failure(job, queue, started_job_registry, exc_string)
        finally:
            self._process_waiting_lists(queue, job)

View File

@@ -1,6 +1,7 @@
import signal
import time
import redis
from uuid import uuid4
from rq import get_current_job
from rq.job import JobStatus
@@ -86,12 +87,14 @@ def enqueue_query(
queue = Queue(queue_name)
enqueue_kwargs = {
"job_id": f"org:{data_source.org_id}:user:{user_id}:id:{uuid4()}",
"user_id": user_id,
"scheduled_query_id": scheduled_query_id,
"is_api_key": is_api_key,
"job_timeout": time_limit,
"failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
"meta": {
"is_query_execution": True,
"data_source_id": data_source.id,
"org_id": data_source.org_id,
"scheduled": scheduled_query_id is not None,

View File

@@ -18,6 +18,7 @@ from redash.tasks import (
version_check,
send_aggregated_errors,
Queue,
cleanup_waiting_lists,
)
logger = logging.getLogger(__name__)
@@ -81,6 +82,7 @@ def periodic_job_definitions():
"func": send_aggregated_errors,
"interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
},
{"func": cleanup_waiting_lists, "interval": timedelta(minutes=1)},
]
if settings.VERSION_CHECK:

View File

@@ -2,6 +2,7 @@ import errno
import os
import signal
import time
from redash.tasks.capacity import CapacityQueue, CapacityWorker
from redash import statsd_client
from rq import Queue as BaseQueue, get_current_job
from rq.worker import HerokuWorker # HerokuWorker implements graceful shutdown on SIGTERM
@@ -37,7 +38,7 @@ class CancellableQueue(BaseQueue):
job_class = CancellableJob
class RedashQueue(StatsdRecordingQueue, CancellableQueue):
class RedashQueue(StatsdRecordingQueue, CancellableQueue, CapacityQueue):
pass
@@ -165,7 +166,7 @@ class HardLimitingWorker(HerokuWorker):
)
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker, CapacityWorker):
queue_class = RedashQueue

View File

@@ -0,0 +1,184 @@
from mock import MagicMock, patch, call
from tests import BaseTestCase
from rq import push_connection, pop_connection, Queue
from rq.job import JobStatus
from redash import rq_redis_connection
from redash.tasks.capacity import CapacityWorker, CapacityQueue
from redash.tasks.worker import Job
def say_hello():
    """Trivial job payload used by the tests below."""
    greeting = "Hello!"
    return greeting
def create_job(job_id, **meta):
    """
    Build a query execution job running `say_hello`.

    `meta` must contain `org_id` and `user_id`; both are embedded in the job id as
    "org:<org_id>:user:<user_id>:id:<job_id>".
    """
    meta["is_query_execution"] = True
    full_id = f"org:{meta['org_id']}:user:{meta['user_id']}:id:{job_id}"
    return Job.create(say_hello, id=full_id, meta=meta)
class TestCapacityQueue(BaseTestCase):
    """Enqueue-time behaviour of CapacityQueue when capacity is reported as reached."""

    def setUp(self):
        push_connection(rq_redis_connection)

    def tearDown(self):
        pop_connection()
        rq_redis_connection.flushdb()

    @patch("redash.settings.dynamic_settings.capacity_reached_for", return_value=True)
    def test_redirects_to_user_waiting_list_if_over_capacity(self, _capacity_reached):
        queue = CapacityQueue()

        first = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
        second = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))

        for enqueued in (first, second):
            self.assertEqual(enqueued.origin, "user:John:origin:default:waiting")

    @patch(
        "redash.settings.dynamic_settings.capacity_reached_for",
        # Per enqueued job: the user check comes first (False), then the org check (True).
        side_effect=[False, True, False, True],
    )
    def test_redirects_to_org_waiting_list_if_over_capacity(self, _capacity_reached):
        queue = CapacityQueue()

        johns_job = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
        marks_job = queue.enqueue_job(create_job(2, org_id="Acme", user_id="Mark"))

        for enqueued in (johns_job, marks_job):
            self.assertEqual(enqueued.origin, "org:Acme:origin:default:waiting")
class TestCapacityWorker(BaseTestCase):
    """Dequeue-time and post-execution behaviour of CapacityWorker."""

    def setUp(self):
        push_connection(rq_redis_connection)

    def tearDown(self):
        pop_connection()
        rq_redis_connection.flushdb()

    @patch("redash.settings.dynamic_settings.capacity_reached_for", return_value=True)
    def test_always_handles_non_query_execution_jobs(self, _):
        queue = CapacityQueue()
        job = queue.enqueue(say_hello)

        worker = CapacityWorker([queue])
        worker.work(burst=True)

        self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)

    def test_handles_job_if_within_capacity(self):
        queue = CapacityQueue()
        job = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))

        worker = CapacityWorker([queue])
        worker.work(burst=True)

        self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)

    def test_doesnt_handle_job_if_over_user_capacity(self):
        queue = CapacityQueue()
        job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
        job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))

        worker = CapacityWorker([queue])

        # Dequeue of job_1: user check False, org check False. Dequeue of job_2: user check True.
        with patch(
            "redash.settings.dynamic_settings.capacity_reached_for",
            side_effect=[False, False, True],
        ):
            worker.work(burst=True)

        job_1.refresh()
        self.assertEqual(job_1.get_status(), JobStatus.FINISHED)

        job_2.refresh()
        self.assertEqual(job_2.get_status(), JobStatus.QUEUED)
        self.assertEqual(job_2.origin, "user:John:origin:default:waiting")

    def test_doesnt_handle_job_if_over_org_capacity(self):
        queue = CapacityQueue()
        job_1 = queue.enqueue_job(create_job(1, org_id="Acme", user_id="John"))
        job_2 = queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))

        worker = CapacityWorker([queue])

        # Dequeue of job_1: user False, org False. Dequeue of job_2: user False, org True.
        with patch(
            "redash.settings.dynamic_settings.capacity_reached_for",
            side_effect=[False, False, False, True],
        ):
            worker.work(burst=True)

        job_1.refresh()
        self.assertEqual(job_1.get_status(), JobStatus.FINISHED)

        job_2.refresh()
        self.assertEqual(job_2.get_status(), JobStatus.QUEUED)
        self.assertEqual(job_2.origin, "org:Acme:origin:default:waiting")

    def test_isolates_capacity_between_original_queues(self):
        queries_queue = CapacityQueue("queries")
        adhoc_query = queries_queue.enqueue_job(
            create_job(1, org_id="Acme", user_id="John")
        )

        scheduled_queries_queue = CapacityQueue("scheduled_queries")
        scheduled_query = scheduled_queries_queue.enqueue_job(
            create_job(2, org_id="Acme", user_id="John")
        )

        worker = CapacityWorker([queries_queue, scheduled_queries_queue])

        with patch(
            "redash.settings.dynamic_settings.capacity_reached_for", return_value=True
        ):
            worker.work(burst=True)

        adhoc_query.refresh()
        self.assertEqual(adhoc_query.get_status(), JobStatus.QUEUED)
        self.assertEqual(adhoc_query.origin, "user:John:origin:queries:waiting")

        scheduled_query.refresh()
        self.assertEqual(scheduled_query.get_status(), JobStatus.QUEUED)
        self.assertEqual(
            scheduled_query.origin, "user:John:origin:scheduled_queries:waiting"
        )

    def test_handles_waiting_user_jobs_when_user_slot_opens_up(self):
        user_waiting_list = Queue("user:John:origin:default:waiting")
        user_waiting_job = user_waiting_list.enqueue_job(
            create_job(1, org_id="Acme", user_id="John")
        )

        # Use the real org waiting list name so the worker actually sees this list;
        # otherwise the QUEUED assertion below would hold no matter what the worker does.
        org_waiting_list = Queue("org:Acme:origin:default:waiting")
        org_waiting_job = org_waiting_list.enqueue_job(
            create_job(2, org_id="Acme", user_id="Mark")
        )

        queue = CapacityQueue()
        queue.enqueue_job(create_job(3, org_id="Acme", user_id="John"))

        worker = CapacityWorker([queue])
        # Two executions: the job enqueued above, then the job picked from the user's waiting list.
        worker.work(max_jobs=2)

        user_waiting_job.refresh()
        self.assertEqual(user_waiting_job.get_status(), JobStatus.FINISHED)
        self.assertEqual(user_waiting_job.origin, "default")

        org_waiting_job.refresh()
        self.assertEqual(org_waiting_job.get_status(), JobStatus.QUEUED)

    def test_handles_waiting_org_jobs_when_org_job_opens_up(self):
        org_waiting_list = Queue("org:Acme:origin:default:waiting")
        org_waiting_job = org_waiting_list.enqueue_job(
            create_job(1, org_id="Acme", user_id="Mark")
        )

        queue = CapacityQueue()
        queue.enqueue_job(create_job(2, org_id="Acme", user_id="John"))

        worker = CapacityWorker([queue])
        worker.work(max_jobs=2)

        org_waiting_job.refresh()
        self.assertEqual(org_waiting_job.get_status(), JobStatus.FINISHED)
        self.assertEqual(org_waiting_job.origin, "default")