refreshing snowflake schema w/o waking cluster (#4285)

* refreshing snowflake schema w/o waking cluster

Also added a new internal method that executes a query without
selecting a warehouse.
The schema refresh now uses 'SHOW COLUMNS' to fetch the database schema
instead of executing a SELECT query against information_schema;
SHOW COLUMNS does not require a warehouse to run.

* modularising snowflake code to avoid repetitions

fixing internal function syntax and avoiding
code repetition

* removing user object in snowflake schema query
This commit is contained in:
Monica Gangwar
2019-12-02 14:18:30 +05:30
committed by Arik Fraimovich
parent 36ab8eae89
commit 4d6c30ef13

View File

@@ -67,7 +67,7 @@ class Snowflake(BaseQueryRunner):
return TYPE_FLOAT
return t
def run_query(self, query, user):
def _get_connection(self):
region = self.configuration.get('region')
# for us-west we don't need to pass a region (and if we do, it fails to connect)
@@ -81,6 +81,19 @@ class Snowflake(BaseQueryRunner):
region=region
)
return connection
def _parse_results(self, cursor):
columns = self.fetch_columns(
[(i[0], self.determine_type(i[1], i[5])) for i in cursor.description])
rows = [dict(zip((column['name'] for column in columns), row))
for row in cursor]
data = {'columns': columns, 'rows': rows}
return data
def run_query(self, query, user):
connection = self._get_connection()
cursor = connection.cursor()
try:
@@ -90,12 +103,7 @@ class Snowflake(BaseQueryRunner):
cursor.execute(query)
columns = self.fetch_columns(
[(i[0], self.determine_type(i[1], i[5])) for i in cursor.description])
rows = [dict(zip((column['name'] for column in columns), row))
for row in cursor]
data = {'columns': columns, 'rows': rows}
data = self._parse_results(cursor)
error = None
json_data = json_dumps(data)
finally:
@@ -104,30 +112,42 @@ class Snowflake(BaseQueryRunner):
return json_data, error
def _run_query_without_warehouse(self, query):
connection = self._get_connection()
cursor = connection.cursor()
try:
cursor.execute("USE {}".format(self.configuration['database']))
cursor.execute(query)
data = self._parse_results(cursor)
error = None
finally:
cursor.close()
connection.close()
return data, error
def get_schema(self, get_stats=False):
    """Return the database schema as a list of ``{'name', 'columns'}`` dicts.

    The schema is read with ``SHOW COLUMNS IN DATABASE <database>`` through
    ``self._run_query_without_warehouse``, so refreshing it does not select
    (and therefore does not wake) a warehouse. Only rows whose ``kind`` is
    ``'COLUMN'`` are used; they are grouped by ``'<schema_name>.<table_name>'``.

    ``get_stats`` is accepted for interface compatibility and is not used.

    Raises:
        Exception: if the query helper reports an error.
    """
    query = """
    SHOW COLUMNS IN DATABASE {database}
    """.format(database=self.configuration['database'])

    # The helper already returns parsed data (a dict), not a JSON string,
    # so no json_loads step is needed here.
    results, error = self._run_query_without_warehouse(query)

    if error is not None:
        raise Exception("Failed getting schema.")

    schema = {}
    for row in results['rows']:
        if row['kind'] != 'COLUMN':
            continue

        table_name = '{}.{}'.format(row['schema_name'], row['table_name'])
        if table_name not in schema:
            schema[table_name] = {'name': table_name, 'columns': []}

        schema[table_name]['columns'].append(row['column_name'])

    return list(schema.values())