Nuke Celery (#4521)

* have workers enforce hard limits on non-responsive work horses

* move the differences from Worker into helper methods to make the specialization clearer

* move HardLimitingWorker to redash/tasks

* move schedule.py to /tasks

* explain the motivation for HardLimitingWorker

* pleasing CodeClimate

* pleasing CodeClimate

* port query execution to RQ

* get rid of argsrepr

* avoid star imports

* allow queries to be cancelled in RQ

* return QueryExecutionErrors as job results

* fix TestTaskEnqueue and QueryExecutorTests

* remove Celery monitoring

* get rid of QueryTask and use RQ jobs directly (with a job serializer)

* Revert "remove Celery monitoring"

This reverts commit 37a74ea403.

* reduce occurrences of the word 'task'

* use Worker, Queue and Job instead of spreading names that share behavior details

* remove locks for failed jobs as well

* did I not commit that colon? oh my

* push the Redis connection onto RQ's connection stack on every request to avoid verbose connection setup (sketched below, after the commit metadata)

* use a connection context for tests

* remove Celery monitoring

* 👋 Celery

* remove Celery from Cypress

* black it up

* some more black

* return all started/queued job ids (for future monitoring)

* Restyled by prettier (#4522)

* remove celery.py

* remove some frontend residuals that reappeared after a merge

Co-authored-by: restyled-io[bot] <32688539+restyled-io[bot]@users.noreply.github.com>
Omer Lachish
2020-01-12 22:36:48 +02:00
committed by GitHub
parent 13c3531956
commit aa17681af2
22 changed files with 46 additions and 592 deletions
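
One of the items above pushes the Redis connection onto RQ's connection stack for every request. A minimal sketch of that idea, assuming a Redis URL and a bare Flask app rather than Redash's actual app factory:

from flask import Flask
from redis import Redis
from rq import pop_connection, push_connection

app = Flask(__name__)
rq_redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL


@app.before_request
def push_rq_connection():
    # Make this connection the implicit one for Queue()/Job.fetch() calls
    # made while handling the request.
    push_connection(rq_redis_connection)


@app.teardown_request
def pop_rq_connection(exc=None):
    # Pop it again so the connection stack does not grow across requests.
    pop_connection()

With the connection pushed per request, handlers can call Job.fetch(job_id) or Queue("queries") without threading a connection argument through every call.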

View File

@@ -31,25 +31,12 @@ services:
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
celery_worker:
build: ../
command: celery_worker
depends_on:
- server
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "queries,scheduled_queries"
WORKERS_COUNT: 2
cypress:
build:
context: ../
dockerfile: .circleci/Dockerfile.cypress
depends_on:
- server
- celery_worker
- worker
- scheduler
environment:

View File

@@ -1,15 +1,6 @@
#!/bin/bash
set -e
celery_worker() {
WORKERS_COUNT=${WORKERS_COUNT:-2}
QUEUES=${QUEUES:-queries,scheduled_queries}
WORKER_EXTRA_OPTIONS=${WORKER_EXTRA_OPTIONS:-}
echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..."
exec /usr/local/bin/celery worker --app=redash.worker -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair $WORKER_EXTRA_OPTIONS
}
scheduler() {
echo "Starting RQ scheduler..."
@@ -37,15 +28,6 @@ dev_worker() {
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES
}
dev_celery_worker() {
WORKERS_COUNT=${WORKERS_COUNT:-2}
QUEUES=${QUEUES:-queries,scheduled_queries}
echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..."
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- /usr/local/bin/celery worker --app=redash.worker -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair
}
server() {
# Recycle gunicorn workers every n-th request. See http://docs.gunicorn.org/en/stable/settings.html#max-requests for more details.
MAX_REQUESTS=${MAX_REQUESTS:-1000}
@@ -57,10 +39,6 @@ create_db() {
exec /app/manage.py database create_tables
}
celery_healthcheck() {
exec /usr/local/bin/celery inspect ping --app=redash.worker -d celery@$HOSTNAME
}
rq_healthcheck() {
exec /app/manage.py rq healthcheck
}
@@ -72,13 +50,10 @@ help() {
echo ""
echo "server -- start Redash server (with gunicorn)"
echo "celery_worker -- start Celery worker"
echo "dev_celery_worker -- start Celery worker process which picks up code changes and reloads"
echo "worker -- start a single RQ worker"
echo "dev_worker -- start a single RQ worker with code reloading"
echo "scheduler -- start an rq-scheduler instance"
echo "dev_scheduler -- start an rq-scheduler instance with code reloading"
echo "celery_healthcheck -- runs a Celery healthcheck. Useful for Docker's HEALTHCHECK mechanism."
echo "rq_healthcheck -- runs a RQ healthcheck that verifies that all local workers are active. Useful for Docker's HEALTHCHECK mechanism."
echo ""
echo "shell -- open shell"
@@ -117,14 +92,6 @@ case "$1" in
shift
dev_scheduler
;;
celery_worker)
shift
celery_worker
;;
dev_celery_worker)
shift
dev_celery_worker
;;
dev_worker)
shift
dev_worker
@@ -133,10 +100,6 @@ case "$1" in
shift
rq_healthcheck
;;
celery_healthcheck)
shift
celery_healthcheck
;;
dev_server)
export FLASK_DEBUG=1
exec /app/manage.py runserver --debugger --reload -h 0.0.0.0
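
The entrypoint now defers to ./manage.py rq worker $QUEUES. A self-contained sketch of roughly what that command boils down to, with assumed queue names and Redis URL (the real command goes through Redash's Flask CLI):

import sys

from redis import Redis
from rq import Connection, Queue, Worker

# Assumed defaults; the real entrypoint reads QUEUES from the environment.
DEFAULT_QUEUES = ["queries", "scheduled_queries", "periodic", "schemas"]
redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL

if __name__ == "__main__":
    queue_names = sys.argv[1:] or DEFAULT_QUEUES
    with Connection(redis_connection):
        # Inside the Connection context the queues and the worker share the
        # same Redis connection implicitly.
        worker = Worker([Queue(name) for name in queue_names])
        worker.work()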

View File

@@ -1,89 +0,0 @@
import { map } from "lodash";
import React from "react";
import PropTypes from "prop-types";
import Table from "antd/lib/table";
import Card from "antd/lib/card";
import Spin from "antd/lib/spin";
import Badge from "antd/lib/badge";
import { Columns } from "@/components/items-list/components/ItemsTable";
// CounterCard
export function CounterCard({ title, value, loading }) {
return (
<Spin spinning={loading}>
<Card>
{title}
<div className="f-20">{value}</div>
</Card>
</Spin>
);
}
CounterCard.propTypes = {
title: PropTypes.string.isRequired,
value: PropTypes.oneOfType([PropTypes.number, PropTypes.string]),
loading: PropTypes.bool.isRequired,
};
CounterCard.defaultProps = {
value: "",
};
// Tables
const commonColumns = [
{ title: "Worker Name", dataIndex: "worker" },
{ title: "PID", dataIndex: "worker_pid" },
{ title: "Queue", dataIndex: "queue" },
Columns.custom(
value => {
if (value === "active") {
return (
<span>
<Badge status="processing" /> Active
</span>
);
}
return (
<span>
<Badge status="warning" /> {value}
</span>
);
},
{
title: "State",
dataIndex: "state",
}
),
Columns.timeAgo({ title: "Start Time", dataIndex: "start_time" }),
];
const queryColumns = commonColumns.concat([
Columns.timeAgo({ title: "Enqueue Time", dataIndex: "enqueue_time" }),
{ title: "Query ID", dataIndex: "query_id" },
{ title: "Org ID", dataIndex: "org_id" },
{ title: "Data Source ID", dataIndex: "data_source_id" },
{ title: "User ID", dataIndex: "user_id" },
{ title: "Scheduled", dataIndex: "scheduled" },
]);
const queuesColumns = map(["Name", "Active", "Reserved", "Waiting"], c => ({ title: c, dataIndex: c.toLowerCase() }));
const TablePropTypes = {
loading: PropTypes.bool.isRequired,
items: PropTypes.arrayOf(PropTypes.object).isRequired,
};
export function QueuesTable({ loading, items }) {
return <Table loading={loading} columns={queuesColumns} rowKey="name" dataSource={items} />;
}
QueuesTable.propTypes = TablePropTypes;
export function QueriesTable({ loading, items }) {
return <Table loading={loading} columns={queryColumns} rowKey="task_id" dataSource={items} />;
}
QueriesTable.propTypes = TablePropTypes;

View File

@@ -15,9 +15,6 @@ export default function Layout({ activeTab, children }) {
<Tabs.TabPane key="system_status" tab={<a href="admin/status">System Status</a>}>
{activeTab === "system_status" ? children : null}
</Tabs.TabPane>
<Tabs.TabPane key="tasks" tab={<a href="admin/queries/tasks">Celery Status</a>}>
{activeTab === "tasks" ? children : null}
</Tabs.TabPane>
<Tabs.TabPane key="jobs" tab={<a href="admin/queries/jobs">RQ Status</a>}>
{activeTab === "jobs" ? children : null}
</Tabs.TabPane>

View File

@@ -3,9 +3,34 @@ import React from "react";
import PropTypes from "prop-types";
import Badge from "antd/lib/badge";
import Card from "antd/lib/card";
import Spin from "antd/lib/spin";
import Table from "antd/lib/table";
import { Columns } from "@/components/items-list/components/ItemsTable";
// CounterCard
export function CounterCard({ title, value, loading }) {
return (
<Spin spinning={loading}>
<Card>
{title}
<div className="f-20">{value}</div>
</Card>
</Spin>
);
}
CounterCard.propTypes = {
title: PropTypes.string.isRequired,
value: PropTypes.oneOfType([PropTypes.number, PropTypes.string]),
loading: PropTypes.bool.isRequired,
};
CounterCard.defaultProps = {
value: "",
};
// Tables
const otherJobsColumns = [

View File

@@ -6,8 +6,7 @@ import Alert from "antd/lib/alert";
import Tabs from "antd/lib/tabs";
import * as Grid from "antd/lib/grid";
import Layout from "@/components/admin/Layout";
import { CounterCard } from "@/components/admin/CeleryStatus";
import { WorkersTable, QueuesTable, OtherJobsTable } from "@/components/admin/RQStatus";
import { CounterCard, WorkersTable, QueuesTable, OtherJobsTable } from "@/components/admin/RQStatus";
import { $http, $location, $rootScope } from "@/services/ng";
import recordEvent from "@/services/recordEvent";

View File

@@ -1,136 +0,0 @@
import { values, each } from "lodash";
import moment from "moment";
import React from "react";
import { react2angular } from "react2angular";
import Alert from "antd/lib/alert";
import Tabs from "antd/lib/tabs";
import * as Grid from "antd/lib/grid";
import Layout from "@/components/admin/Layout";
import { CounterCard, QueuesTable, QueriesTable } from "@/components/admin/CeleryStatus";
import { $http } from "@/services/ng";
import recordEvent from "@/services/recordEvent";
import { routesToAngularRoutes } from "@/lib/utils";
// Converting name coming from API to the one the UI expects.
// TODO: update the UI components to use `waiting_in_queue` instead of `waiting`.
function stateName(state) {
if (state === "waiting_in_queue") {
return "waiting";
}
return state;
}
class Tasks extends React.Component {
state = {
isLoading: true,
error: null,
queues: [],
queries: [],
counters: { active: 0, reserved: 0, waiting: 0 },
};
componentDidMount() {
recordEvent("view", "page", "admin/tasks");
$http
.get("/api/admin/queries/tasks")
.then(({ data }) => this.processTasks(data.tasks))
.catch(error => this.handleError(error));
}
componentWillUnmount() {
// Ignore data after component unmounted
this.processTasks = () => {};
this.handleError = () => {};
}
processTasks = tasks => {
const queues = {};
const queries = [];
const counters = { active: 0, reserved: 0, waiting: 0 };
each(tasks, task => {
task.state = stateName(task.state);
queues[task.queue] = queues[task.queue] || { name: task.queue, active: 0, reserved: 0, waiting: 0 };
queues[task.queue][task.state] += 1;
if (task.enqueue_time) {
task.enqueue_time = moment(task.enqueue_time * 1000.0);
}
if (task.start_time) {
task.start_time = moment(task.start_time * 1000.0);
}
counters[task.state] += 1;
if (task.task_name === "redash.tasks.execute_query") {
queries.push(task);
}
});
this.setState({ isLoading: false, queues: values(queues), queries, counters });
};
handleError = error => {
this.setState({ isLoading: false, error });
};
render() {
const { isLoading, error, queues, queries, counters } = this.state;
return (
<Layout activeTab="tasks">
<div className="p-15">
{error && <Alert type="error" message="Failed loading status. Please refresh." />}
{!error && (
<React.Fragment>
<Grid.Row gutter={15} className="m-b-15">
<Grid.Col span={8}>
<CounterCard title="Active Tasks" value={counters.active} loading={isLoading} />
</Grid.Col>
<Grid.Col span={8}>
<CounterCard title="Reserved Tasks" value={counters.reserved} loading={isLoading} />
</Grid.Col>
<Grid.Col span={8}>
<CounterCard title="Waiting Tasks" value={counters.waiting} loading={isLoading} />
</Grid.Col>
</Grid.Row>
<Tabs defaultActiveKey="queues" animated={false}>
<Tabs.TabPane key="queues" tab="Queues">
<QueuesTable loading={isLoading} items={queues} />
</Tabs.TabPane>
<Tabs.TabPane key="queries" tab="Queries">
<QueriesTable loading={isLoading} items={queries} />
</Tabs.TabPane>
</Tabs>
</React.Fragment>
)}
</div>
</Layout>
);
}
}
export default function init(ngModule) {
ngModule.component("pageTasks", react2angular(Tasks));
return routesToAngularRoutes(
[
{
path: "/admin/queries/tasks",
title: "Celery Status",
key: "tasks",
},
],
{
template: "<page-tasks></page-tasks>",
}
);
}
init.init = true;

View File

@@ -50,25 +50,6 @@ services:
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
REDASH_MAIL_DEFAULT_SENDER: redash@example.com
REDASH_MAIL_SERVER: email
celery-worker:
build: .
command: dev_celery_worker
volumes:
- type: bind
source: .
target: /app
depends_on:
- server
- email
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "queries,scheduled_queries"
WORKERS_COUNT: 2
REDASH_MAIL_DEFAULT_SENDER: redash@example.com
REDASH_MAIL_SERVER: email
redis:
image: redis:3-alpine
restart: unless-stopped

View File

@@ -12,8 +12,8 @@ from supervisor_checks import check_runner
from supervisor_checks.check_modules import base
from redash import rq_redis_connection
from redash.tasks import Worker
from redash.tasks.schedule import (
from redash.tasks import (
Worker,
rq_scheduler,
schedule_periodic_jobs,
periodic_job_definitions,
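
The healthcheck now imports rq_scheduler, schedule_periodic_jobs and periodic_job_definitions from redash.tasks. A hedged sketch of what registering periodic jobs with rq-scheduler can look like; the queue name, Redis URL and job-definition shape are assumptions, not Redash's exact code:

from datetime import datetime, timedelta

from redis import Redis
from rq_scheduler import Scheduler

redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL
rq_scheduler = Scheduler(queue_name="periodic", connection=redis_connection, interval=5)


def refresh_queries():
    print("refreshing outdated queries...")  # placeholder job body


def schedule_periodic_jobs(definitions):
    # Drop anything previously scheduled, then register the current definitions.
    for scheduled_job in rq_scheduler.get_jobs():
        rq_scheduler.cancel(scheduled_job)
    for definition in definitions:
        interval_seconds = int(definition["interval"].total_seconds())
        rq_scheduler.schedule(
            scheduled_time=datetime.utcnow(),
            func=definition["func"],
            interval=interval_seconds,
            # Keep the result around long enough for the next run; rq-scheduler
            # stops repeating a job whose result has already expired.
            result_ttl=interval_seconds * 2,
        )


schedule_periodic_jobs([{"func": refresh_queries, "interval": timedelta(minutes=5)}])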

View File

@@ -8,7 +8,7 @@ from redash.handlers.base import json_response, record_event
from redash.permissions import require_super_admin
from redash.serializers import QuerySerializer
from redash.utils import json_loads
from redash.monitor import celery_tasks, rq_status
from redash.monitor import rq_status
@routes.route("/api/admin/queries/outdated", methods=["GET"])
@@ -29,7 +29,7 @@ def outdated_queries():
record_event(
current_org,
current_user._get_current_object(),
{"action": "list", "object_type": "outdated_queries"},
{"action": "list", "object_type": "outdated_queries",},
)
response = {
@@ -41,21 +41,6 @@ def outdated_queries():
return json_response(response)
@routes.route("/api/admin/queries/tasks", methods=["GET"])
@require_super_admin
@login_required
def queries_tasks():
record_event(
current_org,
current_user._get_current_object(),
{"action": "list", "object_type": "celery_tasks"},
)
response = {"tasks": celery_tasks()}
return json_response(response)
@routes.route("/api/admin/queries/rq_status", methods=["GET"])
@require_super_admin
@login_required
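
Only the RQ status endpoint survives this hunk. A rough sketch, not the actual redash.monitor implementation, of how such an endpoint can assemble its payload straight from RQ primitives (payload shape assumed):

from flask import Flask, jsonify
from redis import Redis
from rq import Queue, Worker

app = Flask(__name__)
rq_redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL


def rq_status():
    queues = Queue.all(connection=rq_redis_connection)
    workers = Worker.all(connection=rq_redis_connection)
    return {
        "queues": {q.name: {"size": len(q)} for q in queues},
        "workers": [w.name for w in workers],
    }


@app.route("/api/admin/queries/rq_status", methods=["GET"])
def queries_rq_status():
    # The real handler also requires a super admin and records an event.
    return jsonify(rq_status())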

View File

@@ -15,7 +15,6 @@ from redash.permissions import (
require_permission,
require_any_of_permission,
view_only,
view_only,
)
from redash.tasks import Job
from redash.tasks.queries import enqueue_query
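
This is where "use RQ jobs directly (with a job serializer)" lands: the handler imports Job from redash.tasks and reads job state straight from RQ. A hedged sketch of that kind of serializer, with assumed field names and Redis URL:

from redis import Redis
from rq.exceptions import NoSuchJobError
from rq.job import Job

rq_redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL


def serialize_job(job_id):
    try:
        job = Job.fetch(job_id, connection=rq_redis_connection)
    except NoSuchJobError:
        return None
    return {
        "id": job.id,
        "status": job.get_status(),  # queued / started / finished / failed
        "enqueued_at": job.enqueued_at,
        "result": job.result,  # e.g. a query result id, or an error, once finished
    }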

View File

@@ -1,61 +0,0 @@
import logging
import socket
import time
from redash import settings
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = settings.CELERY_INIT_TIMEOUT
from celery.signals import task_postrun, task_prerun
from redash import settings, statsd_client
from redash.utils import json_dumps
tasks_start_time = {}
@task_prerun.connect
def task_prerun_handler(signal, sender, task_id, task, args, kwargs, **kw):
try:
tasks_start_time[task_id] = time.time()
except Exception:
logging.exception("Exception during task_prerun handler:")
def metric_name(name, tags):
# TODO: use some of the tags in the metric name if tags are not supported
# TODO: support additional tag formats (this one is for InfluxDB)
if not settings.STATSD_USE_TAGS:
return name
tags_string = ",".join(["{}={}".format(k, v) for k, v in tags.items()])
return "{},{}".format(name, tags_string)
@task_postrun.connect
def task_postrun_handler(
signal, sender, task_id, task, args, kwargs, retval, state, **kw
):
try:
run_time = 1000 * (time.time() - tasks_start_time.pop(task_id))
state = (state or "unknown").lower()
tags = {"state": state, "hostname": socket.gethostname()}
if task.name == "redash.tasks.execute_query":
if isinstance(retval, Exception):
tags["state"] = "exception"
state = "exception"
tags["data_source_id"] = args[1]
normalized_task_name = task.name.replace("redash.tasks.", "").replace(".", "_")
metric = "celery.task_runtime.{}".format(normalized_task_name)
logging.debug(
"metric=%s", json_dumps({"metric": metric, "tags": tags, "value": run_time})
)
statsd_client.timing(metric_name(metric, tags), run_time)
statsd_client.incr(
metric_name("celery.task.{}.{}".format(normalized_task_name, state), tags)
)
except Exception:
logging.exception("Exception during task_postrun handler.")

View File

@@ -1,10 +1,10 @@
from __future__ import absolute_import
import itertools
from funcy import flatten
from sqlalchemy import union_all
from redash import redis_connection, rq_redis_connection, __version__, settings
from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget
from redash.utils import json_loads
from redash.worker import celery
from rq import Queue, Worker
from rq.job import Job
from rq.registry import StartedJobRegistry
@@ -29,25 +29,8 @@ def get_object_counts():
return status
def get_celery_queues():
queue_names = db.session.query(DataSource.queue_name).distinct()
scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct()
query = db.session.execute(union_all(queue_names, scheduled_queue_names))
return ["celery"] + [row[0] for row in query]
def get_queues_status():
return {
**{
queue: {"size": redis_connection.llen(queue)}
for queue in get_celery_queues()
},
**{
queue.name: {"size": len(queue)}
for queue in Queue.all()
},
}
return {queue.name: {"size": len(queue)} for queue in Queue.all()}
def get_db_sizes():
@@ -78,71 +61,13 @@ def get_status():
return status
def get_waiting_in_queue(queue_name):
jobs = []
for raw in redis_connection.lrange(queue_name, 0, -1):
job = json_loads(raw)
try:
args = json_loads(job["headers"]["argsrepr"])
if args.get("query_id") == "adhoc":
args["query_id"] = None
except ValueError:
args = {}
def rq_job_ids():
queues = Queue.all(connection=redis_connection)
job_row = {
"state": "waiting_in_queue",
"task_name": job["headers"]["task"],
"worker": None,
"worker_pid": None,
"start_time": None,
"task_id": job["headers"]["id"],
"queue": job["properties"]["delivery_info"]["routing_key"],
}
started_jobs = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
queued_jobs = [q.job_ids for q in queues]
job_row.update(args)
jobs.append(job_row)
return jobs
def parse_tasks(task_lists, state):
rows = []
for task in itertools.chain(*task_lists.values()):
task_row = {
"state": state,
"task_name": task["name"],
"worker": task["hostname"],
"queue": task["delivery_info"]["routing_key"],
"task_id": task["id"],
"worker_pid": task["worker_pid"],
"start_time": task["time_start"],
}
if task["name"] == "redash.tasks.execute_query":
try:
args = json_loads(task["args"])
except ValueError:
args = {}
if args.get("query_id") == "adhoc":
args["query_id"] = None
task_row.update(args)
rows.append(task_row)
return rows
def celery_tasks():
tasks = parse_tasks(celery.control.inspect().active(), "active")
tasks += parse_tasks(celery.control.inspect().reserved(), "reserved")
for queue_name in get_celery_queues():
tasks += get_waiting_in_queue(queue_name)
return tasks
return flatten(started_jobs + queued_jobs)
def fetch_jobs(queue, job_ids):
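
The new rq_job_ids helper above collects the ids of both running and waiting jobs across all queues (the "return all started/queued job ids (for future monitoring)" item). A standalone sketch of the same idea, using itertools instead of funcy's flatten and an assumed Redis URL:

from itertools import chain

from redis import Redis
from rq import Queue
from rq.registry import StartedJobRegistry

redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL


def rq_job_ids():
    queues = Queue.all(connection=redis_connection)
    # Jobs currently being executed by a worker:
    started = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
    # Jobs still waiting in their queues:
    queued = [q.job_ids for q in queues]
    return list(chain.from_iterable(started + queued))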

View File

@@ -14,6 +14,7 @@ from redash.permissions import has_access, view_only
from redash.utils import json_loads
from redash.models.parameterized_query import ParameterizedQuery
from .query_result import (
serialize_query_result,
serialize_query_result_to_dsv,

View File

@@ -14,7 +14,7 @@ from .helpers import (
)
from .organization import DATE_FORMAT, TIME_FORMAT # noqa
# _REDIS_URL is the unchanged REDIS_URL we get from env vars, to be used later with Celery
# _REDIS_URL is the unchanged REDIS_URL we get from env vars, to be used later with RQ
_REDIS_URL = os.environ.get(
"REDASH_REDIS_URL", os.environ.get("REDIS_URL", "redis://localhost:6379/0")
)
@@ -41,42 +41,6 @@ SQLALCHEMY_ECHO = False
RQ_REDIS_URL = os.environ.get("RQ_REDIS_URL", _REDIS_URL)
# Celery related settings
CELERY_BROKER = os.environ.get("REDASH_CELERY_BROKER", _REDIS_URL)
CELERY_RESULT_BACKEND = os.environ.get(
"REDASH_CELERY_RESULT_BACKEND",
os.environ.get("REDASH_CELERY_BACKEND", CELERY_BROKER),
)
CELERY_RESULT_EXPIRES = int(
os.environ.get(
"REDASH_CELERY_RESULT_EXPIRES",
os.environ.get("REDASH_CELERY_TASK_RESULT_EXPIRES", 3600 * 4),
)
)
CELERY_INIT_TIMEOUT = int(os.environ.get("REDASH_CELERY_INIT_TIMEOUT", 10))
CELERY_BROKER_USE_SSL = CELERY_BROKER.startswith("rediss")
CELERY_SSL_CONFIG = (
{
"ssl_cert_reqs": int(
os.environ.get("REDASH_CELERY_BROKER_SSL_CERT_REQS", ssl.CERT_OPTIONAL)
),
"ssl_ca_certs": os.environ.get("REDASH_CELERY_BROKER_SSL_CA_CERTS"),
"ssl_certfile": os.environ.get("REDASH_CELERY_BROKER_SSL_CERTFILE"),
"ssl_keyfile": os.environ.get("REDASH_CELERY_BROKER_SSL_KEYFILE"),
}
if CELERY_BROKER_USE_SSL
else None
)
CELERY_WORKER_PREFETCH_MULTIPLIER = int(
os.environ.get("REDASH_CELERY_WORKER_PREFETCH_MULTIPLIER", 1)
)
CELERY_ACCEPT_CONTENT = os.environ.get("REDASH_CELERY_ACCEPT_CONTENT", "json").split(
","
)
CELERY_TASK_SERIALIZER = os.environ.get("REDASH_CELERY_TASK_SERIALIZER", "json")
CELERY_RESULT_SERIALIZER = os.environ.get("REDASH_CELERY_RESULT_SERIALIZER", "json")
# The following enables periodic job (every 5 minutes) of removing unused query results.
QUERY_RESULTS_CLEANUP_ENABLED = parse_boolean(
os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_ENABLED", "true")
@@ -263,26 +227,6 @@ LOG_FORMAT = os.environ.get(
"REDASH_LOG_FORMAT",
LOG_PREFIX + "[%(asctime)s][PID:%(process)d][%(levelname)s][%(name)s] %(message)s",
)
CELERYD_WORKER_LOG_FORMAT = os.environ.get(
"REDASH_CELERYD_WORKER_LOG_FORMAT",
os.environ.get(
"REDASH_CELERYD_LOG_FORMAT",
LOG_PREFIX
+ "[%(asctime)s][PID:%(process)d][%(levelname)s][%(processName)s] %(message)s",
),
)
CELERYD_WORKER_TASK_LOG_FORMAT = os.environ.get(
"REDASH_CELERYD_WORKER_TASK_LOG_FORMAT",
os.environ.get(
"REDASH_CELERYD_TASK_LOG_FORMAT",
(
LOG_PREFIX
+ "[%(asctime)s][PID:%(process)d][%(levelname)s][%(processName)s] "
"task_name=%(task_name)s "
"task_id=%(task_id)s %(message)s"
),
),
)
RQ_WORKER_JOB_LOG_FORMAT = os.environ.get(
"REDASH_RQ_WORKER_JOB_LOG_FORMAT",
(

View File

@@ -1,6 +1,6 @@
from flask import current_app
import datetime
from redash.worker import celery, job, get_job_logger
from redash.worker import job, get_job_logger
from redash import models, utils

View File

@@ -14,7 +14,7 @@ from redash.tasks.worker import Queue, Job
from redash.tasks.alerts import check_alerts_for_query
from redash.tasks.failure_report import track_failure
from redash.utils import gen_query_hash, json_dumps, utcnow
from redash.worker import celery, get_job_logger
from redash.worker import get_job_logger
logger = get_job_logger(__name__)
TIMEOUT_MESSAGE = "Query exceeded Redash query execution time limit."
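
Per the "allow queries to be cancelled in RQ" item, the execution module now works against RQ jobs. A minimal, hedged sketch of the queued-job case; the helper name and Redis URL are assumptions, and stopping a job a worker has already started additionally needs the worker's cooperation:

from redis import Redis
from rq.exceptions import NoSuchJobError
from rq.job import Job

rq_redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL


def cancel_query_job(job_id):
    try:
        job = Job.fetch(job_id, connection=rq_redis_connection)
    except NoSuchJobError:
        return False
    # Removes the job from its queue so no worker will pick it up; it does not
    # interrupt a job that is already running.
    job.cancel()
    return True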

View File

@@ -1,7 +1,7 @@
import logging
import time
from celery.exceptions import SoftTimeLimitExceeded
from rq.timeouts import JobTimeoutException
from redash import models, redis_connection, settings, statsd_client
from redash.models.parameterized_query import (
InvalidParameterError,
@@ -148,7 +148,7 @@ def refresh_schema(data_source_id):
time.time() - start_time,
)
statsd_client.incr("refresh_schema.success")
except SoftTimeLimitExceeded:
except JobTimeoutException:
logger.info(
u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
ds.id,
@@ -202,3 +202,4 @@ def refresh_schemas():
u"task=refresh_schemas state=finish total_runtime=%.2f",
time.time() - global_start_time,
)
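
RQ enforces a job's timeout by raising rq.timeouts.JobTimeoutException inside the job, which is why it replaces Celery's SoftTimeLimitExceeded above. A small sketch of the handling pattern; load_schema is a hypothetical stand-in for the actual schema refresh:

import logging
import time

from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def load_schema(data_source_id):
    # Hypothetical stand-in for the data source's schema refresh.
    time.sleep(0.1)


def refresh_schema(data_source_id):
    start_time = time.time()
    try:
        load_schema(data_source_id)
        logger.info(
            "task=refresh_schema state=finished ds_id=%s runtime=%.2f",
            data_source_id,
            time.time() - start_time,
        )
    except JobTimeoutException:
        logger.info(
            "task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
            data_source_id,
            time.time() - start_time,
        )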

View File

@@ -1,6 +1,5 @@
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.rq import RqIntegration
@@ -29,7 +28,6 @@ def init():
send_default_pii=True,
integrations=[
FlaskIntegration(),
CeleryIntegration(),
SqlalchemyIntegration(),
RedisIntegration(),
RqIntegration(),

View File

@@ -4,10 +4,6 @@ from functools import partial
from flask import current_app
import logging
from celery import Celery
from celery.signals import worker_process_init
from celery.utils.log import get_logger
from rq import get_current_job
from rq.decorators import job as rq_job
@@ -18,9 +14,6 @@ from redash import (
redis_connection,
rq_redis_connection,
)
from redash.metrics import celery as celery_metrics # noqa
logger = get_logger(__name__)
job = partial(rq_job, connection=rq_redis_connection)
@@ -46,59 +39,3 @@ def get_job_logger(name):
logger.propagate = False
return logger
celery = Celery(
"redash",
broker=settings.CELERY_BROKER,
broker_use_ssl=settings.CELERY_SSL_CONFIG,
redis_backend_use_ssl=settings.CELERY_SSL_CONFIG,
include="redash.tasks",
)
celery.conf.update(
result_backend=settings.CELERY_RESULT_BACKEND,
timezone="UTC",
result_expires=settings.CELERY_RESULT_EXPIRES,
worker_log_format=settings.CELERYD_WORKER_LOG_FORMAT,
worker_task_log_format=settings.CELERYD_WORKER_TASK_LOG_FORMAT,
worker_prefetch_multiplier=settings.CELERY_WORKER_PREFETCH_MULTIPLIER,
accept_content=settings.CELERY_ACCEPT_CONTENT,
task_serializer=settings.CELERY_TASK_SERIALIZER,
result_serializer=settings.CELERY_RESULT_SERIALIZER,
)
# Create a new Task base class, that pushes a new Flask app context to allow DB connections if needed.
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with current_app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
@worker_process_init.connect
def init_celery_flask_app(**kwargs):
"""Create the Flask app after forking a new worker.
This is to make sure no resources are shared between processes.
"""
app = create_app()
app.app_context().push()
@celery.on_after_configure.connect
def add_periodic_tasks(sender, **kwargs):
"""Load all periodic tasks from extensions and add them to Celery."""
# Populate the redash.extensions.periodic_tasks dictionary
extensions.load_periodic_tasks(logger)
for params in extensions.periodic_tasks.values():
# Add it to Celery's periodic task registry, too.
sender.add_periodic_task(**params)
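
What remains of redash/worker.py on the task side is job = partial(rq_job, connection=rq_redis_connection), so task modules only name a queue (and, optionally, a timeout). A hedged usage sketch with an assumed queue name, timeout and Redis URL:

from functools import partial

from redis import Redis
from rq.decorators import job as rq_job

rq_redis_connection = Redis.from_url("redis://localhost:6379/0")  # assumed URL
job = partial(rq_job, connection=rq_redis_connection)


@job("schemas", timeout=300)  # queue name and timeout are illustrative
def refresh_schema(data_source_id):
    print("refreshing schema for data source", data_source_id)


# refresh_schema.delay(42) enqueues onto the "schemas" queue over the shared connection.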

View File

@@ -37,8 +37,6 @@ statsd==3.3.0
gunicorn==19.9.0
rq==1.1.0
rq-scheduler==0.9.1
celery==4.3.0
kombu==4.6.3
jsonschema==3.1.1
RestrictedPython==5.0
pysaml2==4.8.0

View File

@@ -7,8 +7,8 @@ from contextlib import contextmanager
os.environ["REDASH_REDIS_URL"] = os.environ.get(
"REDASH_REDIS_URL", "redis://localhost:6379/0"
).replace("/0", "/5")
# Use different url for Celery to avoid DB being cleaned up:
os.environ["REDASH_CELERY_BROKER"] = os.environ.get(
# Use different url for RQ to avoid DB being cleaned up:
os.environ["RQ_REDIS_URL"] = os.environ.get(
"REDASH_REDIS_URL", "redis://localhost:6379/0"
).replace("/5", "/6")