mirror of https://github.com/getredash/redash.git
Merge pull request #514 from alexanderlz/master
Feature: Support Hive as datasource
README.md
@@ -11,7 +11,7 @@ Prior to **_re:dash_**, we tried to use traditional BI suites and discovered a s
 **_re:dash_** was built to allow fast and easy access to billions of records, that we process and collect using Amazon Redshift ("petabyte scale data warehouse" that "speaks" PostgreSQL).
 
 Today **_re:dash_** has support for querying multiple databases, including: Redshift, Google BigQuery, PostgreSQL, MySQL, Graphite,
-Presto, Google Spreadsheets, Cloudera Impala and custom scripts.
+Presto, Google Spreadsheets, Cloudera Impala, Hive and custom scripts.
 
 **_re:dash_** consists of two parts:
redash/query_runner/__init__.py
@@ -70,6 +70,13 @@ class BaseQueryRunner(object):
     def get_schema(self):
         return []
 
+    def _run_query_internal(self, query):
+        results, error = self.run_query(query)
+
+        if error is not None:
+            raise Exception("Failed running query [%s]." % query)
+        return json.loads(results)['rows']
+
     @classmethod
     def to_dict(cls):
         return {
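The helper added here is what lets subclasses run introspection queries and get parsed rows back instead of raw JSON. A minimal sketch of the pattern, using a hypothetical FakeRunner with a canned result (not part of this commit):

import json

class FakeRunner(object):
    """Hypothetical runner showing how _run_query_internal is consumed."""

    def run_query(self, query):
        # A real runner executes `query` against its backend; here we return
        # a canned redash-style JSON payload and no error.
        return json.dumps({'rows': [{'database_name': 'default'}]}), None

    def _run_query_internal(self, query):
        # Same body as the helper added above.
        results, error = self.run_query(query)
        if error is not None:
            raise Exception("Failed running query [%s]." % query)
        return json.loads(results)['rows']

print(FakeRunner()._run_query_internal("show schemas"))
# -> [{'database_name': 'default'}]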
redash/query_runner/hive_ds.py (new file, 135 lines)
@@ -0,0 +1,135 @@
import json
import logging
import sys

from redash.query_runner import *
from redash.utils import JSONEncoder

logger = logging.getLogger(__name__)

try:
    from pyhive import hive
    enabled = True
except ImportError, e:
    logger.exception(e)
    logger.warning("Missing dependencies. Please install pyhive.")
    logger.warning("You can use pip: pip install pyhive")
    enabled = False

COLUMN_NAME = 0
COLUMN_TYPE = 1

types_map = {
    'BIGINT': TYPE_INTEGER,
    'TINYINT': TYPE_INTEGER,
    'SMALLINT': TYPE_INTEGER,
    'INT': TYPE_INTEGER,
    'DOUBLE': TYPE_FLOAT,
    'DECIMAL': TYPE_FLOAT,
    'FLOAT': TYPE_FLOAT,
    'REAL': TYPE_FLOAT,
    'BOOLEAN': TYPE_BOOLEAN,
    'TIMESTAMP': TYPE_DATETIME,
    'DATE': TYPE_DATETIME,
    'CHAR': TYPE_STRING,
    'STRING': TYPE_STRING,
    'VARCHAR': TYPE_STRING
}


class Hive(BaseQueryRunner):
    @classmethod
    def configuration_schema(cls):
        return {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string"
                },
                "port": {
                    "type": "number"
                },
                "database": {
                    "type": "string"
                },
                "username": {
                    "type": "string"
                }
            },
            "required": ["host"]
        }

    @classmethod
    def annotate_query(cls):
        return False

    @classmethod
    def type(cls):
        return "hive"

    def __init__(self, configuration_json):
        super(Hive, self).__init__(configuration_json)

    def get_schema(self):
        try:
            schemas_query = "show schemas"

            tables_query = "show tables in %s"

            columns_query = "show columns in %s"

            schema = {}
            for schema_name in filter(lambda a: len(a) > 0, map(lambda a: str(a['database_name']), self._run_query_internal(schemas_query))):
                for table_name in filter(lambda a: len(a) > 0, map(lambda a: str(a['tab_name']), self._run_query_internal(tables_query % schema_name))):
                    columns = filter(lambda a: len(a) > 0, map(lambda a: str(a['field']), self._run_query_internal(columns_query % table_name)))

                    if schema_name != 'default':
                        table_name = '{}.{}'.format(schema_name, table_name)

                    schema[table_name] = {'name': table_name, 'columns': columns}
        except Exception, e:
            raise sys.exc_info()[1], None, sys.exc_info()[2]
        return schema.values()

    def run_query(self, query):
        connection = None
        try:
            connection = hive.connect(**self.configuration)

            cursor = connection.cursor()

            cursor.execute(query)

            column_names = []
            columns = []

            for column in cursor.description:
                column_name = column[COLUMN_NAME]
                column_names.append(column_name)

                columns.append({
                    'name': column_name,
                    'friendly_name': column_name,
                    'type': types_map.get(column[COLUMN_TYPE], None)
                })

            rows = [dict(zip(column_names, row)) for row in cursor]

            data = {'columns': columns, 'rows': rows}
            json_data = json.dumps(data, cls=JSONEncoder)
            error = None
            cursor.close()
        except KeyboardInterrupt:
            connection.cancel()
            error = "Query cancelled by user."
            json_data = None
        except Exception as e:
            logging.exception(e)
            raise sys.exc_info()[1], None, sys.exc_info()[2]
        finally:
            connection.close()

        return json_data, error

register(Hive)
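Since run_query connects with hive.connect(**self.configuration), the configuration keys declared in configuration_schema (host, port, database, username) map one-to-one onto pyhive's connect parameters. A quick way to sanity-check a HiveServer2 endpoint outside of redash; the host, port, and credentials below are placeholders, not values from this commit:

from pyhive import hive

# Placeholder connection details; point these at a real HiveServer2 instance.
connection = hive.connect(host='hive.example.com', port=10000,
                          database='default', username='redash')
cursor = connection.cursor()
cursor.execute('show schemas')

# cursor.description is what Hive.run_query reads: one tuple per result
# column, with the name at index 0 and the type at index 1.
print(cursor.description)
print(cursor.fetchall())
connection.close()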
redash/query_runner/impala_ds.py
@@ -79,13 +79,6 @@ class Impala(BaseQueryRunner):
     def __init__(self, configuration_json):
         super(Impala, self).__init__(configuration_json)
 
-    def _run_query_internal(self, query):
-        results, error = self.run_query(query)
-
-        if error is not None:
-            raise Exception("Failed getting schema.")
-        return json.loads(results)['rows']
-
     def get_schema(self):
         try:
             schemas_query = "show schemas;"
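The method removed here is the same helper that now lives on BaseQueryRunner (first hunk), so Impala inherits it rather than carrying its own copy, and the new Hive runner relies on the same shared implementation in get_schema(). A minimal sketch of the resulting hierarchy, with bodies elided:

class BaseQueryRunner(object):
    def _run_query_internal(self, query):
        ...  # shared implementation, shown in the first hunk

class Impala(BaseQueryRunner):
    pass  # no longer defines its own _run_query_internal

class Hive(BaseQueryRunner):
    pass  # uses the inherited helper for schema introspection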
redash/settings.py
@@ -126,6 +126,7 @@ QUERY_RUNNERS = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS",
     'redash.query_runner.influx_db',
     'redash.query_runner.elasticsearch',
     'redash.query_runner.presto',
+    'redash.query_runner.hive_ds',
     'redash.query_runner.impala_ds',
 ])))
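Only the built-in default list gains the new entry, so deployments that set REDASH_ENABLED_QUERY_RUNNERS themselves must add the module explicitly. A sketch, using module names taken from the list above (the override must be in place before redash.settings is imported):

import os

# Hypothetical override: enable a trimmed-down runner set that
# includes the new Hive module.
os.environ["REDASH_ENABLED_QUERY_RUNNERS"] = ",".join([
    'redash.query_runner.presto',
    'redash.query_runner.hive_ds',
    'redash.query_runner.impala_ds',
])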