Merge pull request #1488 from getredash/snowflake

[Data Sources] Add: Snowflake query runner
This commit is contained in:
Arik Fraimovich
2016-12-25 12:39:24 +02:00
committed by GitHub
3 changed files with 116 additions and 1 deletions

View File

@@ -0,0 +1,113 @@
from __future__ import absolute_import
import json
try:
import snowflake.connector
enabled = True
except ImportError:
enabled = False
from redash.query_runner import BaseQueryRunner, register
from redash.query_runner import TYPE_STRING, TYPE_DATE, TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN
from redash.utils import json_dumps
TYPES_MAP = {
0: TYPE_INTEGER,
1: TYPE_FLOAT,
2: TYPE_STRING,
3: TYPE_DATE,
4: TYPE_DATETIME,
5: TYPE_STRING,
6: TYPE_DATETIME,
13: TYPE_BOOLEAN
}
class Snowflake(BaseQueryRunner):
noop_query = "SELECT 1"
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"account": {
"type": "string"
},
"user": {
"type": "string"
},
"password": {
"type": "string"
},
"warehouse": {
"type": "string"
},
"database": {
"type": "string"
}
},
"required": ["user", "password", "account", "database", "warehouse"],
"secret": ["password"]
}
@classmethod
def enabled(cls):
return enabled
def run_query(self, query, user):
connection = snowflake.connector.connect(
user=self.configuration['user'],
password=self.configuration['password'],
account=self.configuration['account'],
)
cursor = connection.cursor()
try:
cursor.execute("USE WAREHOUSE {}".format(self.configuration['warehouse']))
cursor.execute("USE {}".format(self.configuration['database']))
cursor.execute(query)
columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]
data = {'columns': columns, 'rows': rows}
error = None
json_data = json_dumps(data)
finally:
cursor.close()
connection.close()
return json_data, error
def get_schema(self, get_stats=False):
query = """
SELECT col.table_schema,
col.table_name,
col.column_name
FROM {database}.information_schema.columns col
WHERE col.table_schema <> 'INFORMATION_SCHEMA'
""".format(database=self.configuration['database'])
results, error = self.run_query(query, None)
if error is not None:
raise Exception("Failed getting schema.")
schema = {}
results = json.loads(results)
for row in results['rows']:
table_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])
if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}
schema[table_name]['columns'].append(row['COLUMN_NAME'])
return schema.values()
register(Snowflake)

View File

@@ -184,7 +184,8 @@ default_query_runners = [
'redash.query_runner.dynamodb_sql',
'redash.query_runner.mssql',
'redash.query_runner.jql',
'redash.query_runner.google_analytics'
'redash.query_runner.google_analytics',
'redash.query_runner.snowflake'
]
enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))

View File

@@ -17,5 +17,6 @@ sasl>=0.1.3
thrift>=0.8.0
thrift_sasl>=0.1.0
cassandra-driver==3.1.1
snowflake_connector_python==1.3.7
# certifi is needed to support MongoDB and SSL:
certifi