No more using connection pool in DataManager, as it used accross processes

This commit is contained in:
Arik Fraimovich
2013-12-06 20:56:08 +02:00
parent 7b85e78636
commit bce60758e9

View File

@@ -6,7 +6,6 @@ from contextlib import contextmanager
import json
import logging
import psycopg2
import psycopg2.pool
import qr
import redis
import time
@@ -29,8 +28,7 @@ class Manager(object):
def __init__(self, redis_connection, db_connection_string, db_max_connections):
self.redis_connection = redis_connection
self.workers = []
self.db_connection_pool = psycopg2.pool.ThreadedConnectionPool(1, db_max_connections,
db_connection_string)
self.db_connection_string = db_connection_string
self.queue = qr.PriorityQueue("jobs", **self.redis_connection.connection_pool.connection_kwargs)
self.max_retries = 5
self.status = {
@@ -102,8 +100,6 @@ class Manager(object):
return job
def refresh_queries(self):
logging.info("Refreshing queries...")
sql = """SELECT queries.query, queries.ttl, retrieved_at
FROM (SELECT query, min(ttl) as ttl FROM queries WHERE ttl > 0 GROUP by query) queries
JOIN (SELECT query, max(retrieved_at) as retrieved_at
@@ -115,6 +111,7 @@ class Manager(object):
self.status['last_refresh_at'] = time.time()
self._save_status()
logging.info("Refreshing queries...")
queries = self.run_query(sql)
for query, ttl, retrieved_at in queries:
self.add_job(query, worker.Job.LOW_PRIORITY)
@@ -172,7 +169,7 @@ class Manager(object):
@contextmanager
def db_transaction(self):
connection = self.db_connection_pool.getconn()
connection = psycopg2.connect(self.db_connection_string)
cursor = connection.cursor()
try:
yield cursor
@@ -182,7 +179,7 @@ class Manager(object):
else:
connection.commit()
finally:
self.db_connection_pool.putconn(connection)
connection.close()
def _save_status(self):
self.redis_connection.hmset('manager:status', self.status)