Cleaner shutdown (#8)

This commit is contained in:
Arik Fraimovich
2014-04-22 14:53:08 +03:00
parent 74f9d85752
commit ac89584083
2 changed files with 25 additions and 11 deletions

View File

@@ -8,7 +8,7 @@ query language (for example: HiveQL).
import json
import sys
import select
import logging
import psycopg2
from redash.utils import JSONEncoder
@@ -20,15 +20,18 @@ def pg(connection_string):
def wait(conn):
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
try:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
except select.error:
raise psycopg2.OperationalError("select.error received")
def query_runner(query):
connection = psycopg2.connect(connection_string, async=True)
@@ -51,6 +54,10 @@ def pg(connection_string):
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
except (select.error, OSError, psycopg2.OperationalError) as e:
logging.exception(e)
error = "Query interrupted. Please retry."
json_data = None
except psycopg2.DatabaseError as e:
json_data = None
error = e.message

View File

@@ -297,7 +297,14 @@ class Worker(multiprocessing.Process):
self._process(job_id)
else:
logging.info("[%s] Waiting for pid: %d", self.name, self.child_pid)
_, status = os.waitpid(self.child_pid, 0)
try:
_, status = os.waitpid(self.child_pid, 0)
except OSError:
logging.info("[%s] OSError while waiting for child to finish", self.name)
# setting status to >0, so the job cleanup is triggered
status = 1
self._update_status('done_jobs_count')
job = Job.load(self.manager.redis_connection, job_id)