Mirror of https://github.com/getredash/redash.git
Merge pull request #967 from lloydw/master
Add: extend ElasticSearch query_runner to support aggregations
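For context on what the new code enables: a Redash ElasticSearch query is a JSON document, and with this change it may carry an "aggs" block plus an optional "result_fields" list that the runner pops off before sending the rest to ElasticSearch. A sketch of such a query, written as the Python dict the runner sees after json.loads (the index and field names are invented for illustration):

    # Hypothetical query body; "index" and "result_fields" are consumed by the
    # runner (see the query_dict.pop calls below), the rest goes to ElasticSearch.
    query_dict = {
        "index": "orders",                        # hypothetical index name
        "size": 0,                                # no raw hits, buckets only
        "aggs": {
            "status": {                           # one bucket per status value
                "terms": {"field": "status"},
                "aggs": {"total": {"sum": {"field": "amount"}}},
            },
        },
        "result_fields": ["status", "total"],     # optional column filter
    }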
@@ -105,8 +105,6 @@ class BaseElasticSearch(BaseQueryRunner):
         r = requests.get(url, auth=self.auth)
         mappings_data = r.json()
 
-        logger.debug(mappings_data)
-
         for index_name in mappings_data:
             index_mappings = mappings_data[index_name]
             for m in index_mappings.get("mappings", {}):
@@ -133,6 +131,49 @@ class BaseElasticSearch(BaseQueryRunner):
                     "type" : mappings.get(column_name, "string")})
                 result_columns_index[friendly_name] = result_columns[-1]
 
+        def get_row(rows, row):
+            if row is None:
+                row = {}
+                rows.append(row)
+            return row
+
+        def collect_value(mappings, row, key, value, type):
+            if result_fields and key not in result_fields_index:
+                return
+
+            mappings[key] = type
+            add_column_if_needed(mappings, key, key, result_columns, result_columns_index)
+            row[key] = value
+
+        def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index):
+
+            if isinstance(data, dict):
+
+                for key, value in data.iteritems():
+                    val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index)
+                    if val:
+                        row = get_row(rows, row)
+                        collect_value(mappings, row, key, val, 'long')
+
+                for data_key in ['value', 'doc_count']:
+                    if data_key not in data:
+                        continue
+                    if 'key' in data and len(data.keys()) == 2:
+                        key_is_string = 'key_as_string' in data
+                        collect_value(mappings, row, data['key'] if not key_is_string else data['key_as_string'], data[data_key], 'long' if not key_is_string else 'string')
+                    else:
+                        return data[data_key]
+
+            elif isinstance(data, list):
+
+                for value in data:
+                    result_row = get_row(rows, row)
+                    collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index)
+                    if 'key' in value:
+                        collect_value(mappings, result_row, parent_key, value['key'], 'string')
+
+            return None
+
         result_columns_index = {c["name"] : c for c in result_columns}
 
         result_fields_index = {}
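To see what the recursion above does, here is a toy re-trace of the flattening logic, trimmed of the mappings/column bookkeeping (Python 3 syntax, whereas the diff targets Python 2, hence iteritems; all names here are illustrative, not from redash):

    def get_row(rows, row):
        # Reuse the current row, or start a fresh one for a new bucket.
        if row is None:
            row = {}
            rows.append(row)
        return row

    def collect(parent_key, data, row, rows):
        if isinstance(data, dict):
            for key, value in data.items():
                # 'buckets' is a structural wrapper, so keep the aggregation name.
                val = collect(parent_key if key == 'buckets' else key, value, row, rows)
                if val:
                    row = get_row(rows, row)
                    row[key] = val
            for data_key in ('value', 'doc_count'):
                if data_key not in data:
                    continue
                if 'key' in data and len(data) == 2:
                    # Simple bucket: the bucket's own key names the column.
                    row = get_row(rows, row)
                    row[data['key']] = data[data_key]
                else:
                    return data[data_key]
        elif isinstance(data, list):
            for value in data:
                result_row = get_row(rows, row)
                collect(parent_key, value, result_row, rows)
                if 'key' in value:
                    result_row[parent_key] = value['key']
        return None

    rows = []
    aggs = {"status": {"buckets": [{"key": "open", "doc_count": 3},
                                   {"key": "closed", "doc_count": 7}]}}
    for key, data in aggs.items():
        collect(key, data, None, rows)
    print(rows)  # -> [{'open': 3, 'status': 'open'}, {'closed': 7, 'status': 'closed'}]

Note the quirk in this first version: a simple bucket's key value ('open') becomes a column of its own, while the list branch also records that key under the aggregation's name ('status').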
@@ -140,14 +181,35 @@ class BaseElasticSearch(BaseQueryRunner):
         for r in result_fields:
             result_fields_index[r] = None
 
+        if 'error' in raw_result:
+
+            error = raw_result['error']
+            if len(error) > 10240:
+                error = error[:10240] + '... continues'
+
+            raise Exception(error)
+
+        elif 'aggregations' in raw_result:
+
+            if result_fields:
+                for field in result_fields:
+                    add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
+
+            for key, data in raw_result["aggregations"].iteritems():
+                collect_aggregations(mappings, result_rows, key, data, None, result_columns, result_columns_index)
+
+            logger.debug("result_rows", str(result_rows))
+            logger.debug("result_columns", str(result_columns))
+
+        elif 'hits' in raw_result and 'hits' in raw_result['hits']:
+
+            if result_fields:
+                for field in result_fields:
+                    add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
+
             for h in raw_result["hits"]["hits"]:
                 row = {}
 
-                for field, column in ELASTICSEARCH_BUILTIN_FIELDS_MAPPING.iteritems():
-                    if field in h:
-                        add_column_if_needed(mappings, field, column, result_columns, result_columns_index)
-                        row[column] = h[field]
-
                 column_name = "_source" if "_source" in h else "fields"
                 for column in h[column_name]:
                     if result_fields and column not in result_fields_index:
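The branch order above matters: ElasticSearch returns a "hits" section even for pure aggregation queries, so 'aggregations' has to be checked before 'hits' or aggregation results would be parsed as ordinary documents. A trimmed sketch of the response shape this code dispatches on (all values invented, continuing the hypothetical query above):

    # Hypothetical raw_result; note "hits" is present alongside "aggregations",
    # which is why the elif order in _parse_results matters.
    raw_result = {
        "took": 3,
        "hits": {"total": 10, "hits": []},   # empty because the query used size 0
        "aggregations": {
            "status": {
                "buckets": [
                    {"key": "open", "doc_count": 3, "total": {"value": 42.0}},
                    {"key": "closed", "doc_count": 7, "total": {"value": 99.5}},
                ],
            },
        },
    }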
@@ -158,9 +220,10 @@ class BaseElasticSearch(BaseQueryRunner):
                     value = h[column_name][column]
                     row[column] = value[0] if isinstance(value, list) and len(value) == 1 else value
 
-
-            if row and len(row) > 0:
                 result_rows.append(row)
+        else:
+
+            raise Exception("Redash failed to parse the results it got from ElasticSearch.")
 
 
 class Kibana(BaseElasticSearch):
@@ -273,6 +336,7 @@ class ElasticSearch(BaseElasticSearch):
         query_dict = json.loads(query)
 
         index_name = query_dict.pop("index", "")
+        result_fields = query_dict.pop("result_fields", None)
 
         if not self.server_url:
             error = "Missing configuration key 'server'"
@@ -283,8 +347,6 @@ class ElasticSearch(BaseElasticSearch):
 
         mappings = self._get_mappings(mapping_url)
 
-        logger.debug(json.dumps(mappings, indent=4))
-
         params = {"source": json.dumps(query_dict)}
         logger.debug("Using URL: %s", url)
         logger.debug("Using params : %s", params)
@@ -293,7 +355,7 @@ class ElasticSearch(BaseElasticSearch):
 
         result_columns = []
         result_rows = []
-        self._parse_results(mappings, None, r.json(), result_columns, result_rows)
+        self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows)
 
         json_data = json.dumps({
             "columns" : result_columns,
@@ -310,3 +372,4 @@ class ElasticSearch(BaseElasticSearch):
 
 register(Kibana)
 register(ElasticSearch)
+