From c850acb3b9ee7cdad4f63ba9dc2abb28af054d1a Mon Sep 17 00:00:00 2001 From: Lloyd Weehuizen Date: Fri, 1 Apr 2016 14:06:54 +1300 Subject: [PATCH 1/3] Extend ElasticSearch query_runner to support aggregations --- redash/query_runner/elasticsearch.py | 100 ++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/redash/query_runner/elasticsearch.py b/redash/query_runner/elasticsearch.py index 6640b2c32..59489de28 100644 --- a/redash/query_runner/elasticsearch.py +++ b/redash/query_runner/elasticsearch.py @@ -104,8 +104,6 @@ class BaseElasticSearch(BaseQueryRunner): r = requests.get(url, auth=self.auth) mappings_data = r.json() - logger.debug(mappings_data) - for index_name in mappings_data: index_mappings = mappings_data[index_name] for m in index_mappings.get("mappings", {}): @@ -132,6 +130,48 @@ class BaseElasticSearch(BaseQueryRunner): "type" : mappings.get(column_name, "string")}) result_columns_index[friendly_name] = result_columns[-1] + def getRow(rows, row): + if row == None: + row = {} + rows.append(row) + return row + + def collect_value(mappings, row, key, value, type): + if result_fields and key not in result_fields_index: + return + + mappings[key] = type + add_column_if_needed(mappings, key, key, result_columns, result_columns_index) + row[key] = value + + def collect_aggregations(mappings, rows, parentKey, data, row, result_columns, result_columns_index): + + if isinstance(data, dict): + + for key, value in data.iteritems(): + val = collect_aggregations(mappings, rows, parentKey if key == 'buckets' else key, value, row, result_columns, result_columns_index) + if val: + row = getRow(rows, row) + collect_value(mappings, row, key, val, 'long') + + for dataKey in ['value', 'doc_count']: + if not dataKey in data: + continue + if 'key' in data and len(data.keys()) == 2: + collect_value(mappings, row, data['key'] if not 'key_as_string' in data else data['key_as_string'], data[dataKey]) + else: + return data[dataKey] + + elif isinstance(data, list): + + for value in data: + resultRow = getRow(rows, row) + collect_aggregations(mappings, rows, parentKey, value, resultRow, result_columns, result_columns_index) + if 'key' in value: + collect_value(mappings, resultRow, parentKey, value['key'], 'string') + + return None + result_columns_index = {c["name"] : c for c in result_columns} result_fields_index = {} @@ -139,27 +179,49 @@ class BaseElasticSearch(BaseQueryRunner): for r in result_fields: result_fields_index[r] = None - for h in raw_result["hits"]["hits"]: - row = {} + if 'error' in raw_result: - for field, column in ELASTICSEARCH_BUILTIN_FIELDS_MAPPING.iteritems(): - if field in h: - add_column_if_needed(mappings, field, column, result_columns, result_columns_index) - row[column] = h[field] + error = raw_result['error'] + if len(error) > 10240: + error = error[:10240] + '... continues' - column_name = "_source" if "_source" in h else "fields" - for column in h[column_name]: - if result_fields and column not in result_fields_index: - continue + raise Exception(error) - add_column_if_needed(mappings, column, column, result_columns, result_columns_index) + elif 'aggregations' in raw_result: - value = h[column_name][column] - row[column] = value[0] if isinstance(value, list) and len(value) == 1 else value + if result_fields: + for field in result_fields: + add_column_if_needed(mappings, field, field, result_columns, result_columns_index) + for key, data in raw_result["aggregations"].iteritems(): + collect_aggregations(mappings, result_rows, key, data, None, result_columns, result_columns_index) + + logger.debug("result_rows", str(result_rows)) + logger.debug("result_columns", str(result_columns)) + + elif 'hits' in raw_result and 'hits' in raw_result['hits']: + + if result_fields: + for field in result_fields: + add_column_if_needed(mappings, field, field, result_columns, result_columns_index) + + for h in raw_result["hits"]["hits"]: + row = {} + + column_name = "_source" if "_source" in h else "fields" + for column in h[column_name]: + if result_fields and column not in result_fields_index: + continue + + add_column_if_needed(mappings, column, column, result_columns, result_columns_index) + + value = h[column_name][column] + row[column] = value[0] if isinstance(value, list) and len(value) == 1 else value - if row and len(row) > 0: result_rows.append(row) + else: + + raise Exception('Seems you\'ve hit an unsupported feature') class Kibana(BaseElasticSearch): @@ -272,6 +334,7 @@ class ElasticSearch(BaseElasticSearch): query_dict = json.loads(query) index_name = query_dict.pop("index", "") + result_fields = query_dict.pop("result_fields", None) if not self.server_url: error = "Missing configuration key 'server'" @@ -282,8 +345,6 @@ class ElasticSearch(BaseElasticSearch): mappings = self._get_mappings(mapping_url) - logger.debug(json.dumps(mappings, indent=4)) - params = {"source": json.dumps(query_dict)} logger.debug("Using URL: %s", url) logger.debug("Using params : %s", params) @@ -292,7 +353,7 @@ class ElasticSearch(BaseElasticSearch): result_columns = [] result_rows = [] - self._parse_results(mappings, None, r.json(), result_columns, result_rows) + self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows) json_data = json.dumps({ "columns" : result_columns, @@ -309,3 +370,4 @@ class ElasticSearch(BaseElasticSearch): register(Kibana) register(ElasticSearch) + From 203cf6e28b136229b598b8e0921bbae73770779c Mon Sep 17 00:00:00 2001 From: Lloyd Weehuizen Date: Mon, 2 May 2016 10:18:11 +1200 Subject: [PATCH 2/3] Style updates --- redash/query_runner/elasticsearch.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/redash/query_runner/elasticsearch.py b/redash/query_runner/elasticsearch.py index 59489de28..dd683e93e 100644 --- a/redash/query_runner/elasticsearch.py +++ b/redash/query_runner/elasticsearch.py @@ -130,8 +130,8 @@ class BaseElasticSearch(BaseQueryRunner): "type" : mappings.get(column_name, "string")}) result_columns_index[friendly_name] = result_columns[-1] - def getRow(rows, row): - if row == None: + def get_row(rows, row): + if row is None: row = {} rows.append(row) return row @@ -144,31 +144,31 @@ class BaseElasticSearch(BaseQueryRunner): add_column_if_needed(mappings, key, key, result_columns, result_columns_index) row[key] = value - def collect_aggregations(mappings, rows, parentKey, data, row, result_columns, result_columns_index): + def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index): if isinstance(data, dict): for key, value in data.iteritems(): - val = collect_aggregations(mappings, rows, parentKey if key == 'buckets' else key, value, row, result_columns, result_columns_index) + val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index) if val: - row = getRow(rows, row) + row = get_row(rows, row) collect_value(mappings, row, key, val, 'long') - for dataKey in ['value', 'doc_count']: - if not dataKey in data: + for data_key in ['value', 'doc_count']: + if data_key not in data: continue if 'key' in data and len(data.keys()) == 2: - collect_value(mappings, row, data['key'] if not 'key_as_string' in data else data['key_as_string'], data[dataKey]) + collect_value(mappings, row, data['key'] if not 'key_as_string' in data else data['key_as_string'], data[data_key]) else: - return data[dataKey] + return data[data_key] elif isinstance(data, list): for value in data: - resultRow = getRow(rows, row) - collect_aggregations(mappings, rows, parentKey, value, resultRow, result_columns, result_columns_index) + result_row = get_row(rows, row) + collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index) if 'key' in value: - collect_value(mappings, resultRow, parentKey, value['key'], 'string') + collect_value(mappings, result_row, parent_key, value['key'], 'string') return None @@ -221,7 +221,7 @@ class BaseElasticSearch(BaseQueryRunner): result_rows.append(row) else: - raise Exception('Seems you\'ve hit an unsupported feature') + raise Exception("Redash failed to parse the results it got from ElasticSearch.") class Kibana(BaseElasticSearch): From 3f208c03fd2e56246567f57a3c0715f23f55728f Mon Sep 17 00:00:00 2001 From: Lloyd Weehuizen Date: Fri, 27 May 2016 10:23:16 +1200 Subject: [PATCH 3/3] Add missing type parameter to collect_value call --- redash/query_runner/elasticsearch.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/redash/query_runner/elasticsearch.py b/redash/query_runner/elasticsearch.py index dd683e93e..ced0af275 100644 --- a/redash/query_runner/elasticsearch.py +++ b/redash/query_runner/elasticsearch.py @@ -158,7 +158,8 @@ class BaseElasticSearch(BaseQueryRunner): if data_key not in data: continue if 'key' in data and len(data.keys()) == 2: - collect_value(mappings, row, data['key'] if not 'key_as_string' in data else data['key_as_string'], data[data_key]) + key_is_string = 'key_as_string' in data + collect_value(mappings, row, data['key'] if not key_is_string else data['key_as_string'], data[data_key], 'long' if not key_is_string else 'string') else: return data[data_key]