Enable schema browser for elasticsearch

Split _get_mappings into 2 functions.
Add _get_query_mappings which is used by existing logic
and parses the mappings json for query mappings.
Add get_schema which uses basic _get_mappings and recurses
through the mappings json to build a schema for the indices.
This commit is contained in:
Adam Griffiths
2016-12-21 15:22:04 +11:00
parent 9d579dc089
commit e0d6d3feeb

View File

@@ -102,26 +102,11 @@ class BaseElasticSearch(BaseQueryRunner):
def _get_mappings(self, url):
mappings = {}
error = None
try:
r = requests.get(url, auth=self.auth)
r.raise_for_status()
mappings_data = r.json()
for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if property_name not in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))
mappings = r.json()
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
@@ -133,6 +118,59 @@ class BaseElasticSearch(BaseQueryRunner):
return mappings, error
def _get_query_mappings(self, url):
mappings, error = self._get_mappings(url)
if error:
return mappings, error
for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if property_name not in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))
return mappings, error
def get_schema(self, *args, **kwargs):
def parse_doc(doc, path=None):
'''Recursively parse a doc type dictionary
'''
path = path or []
result = []
for field, description in doc['properties'].items():
if 'properties' in description:
result.extend(parse_doc(description, path + [field]))
else:
result.append('.'.join(path + [field]))
return result
schema = {}
url = "{0}/_mappings".format(self.server_url)
mappings, error = self._get_mappings(url)
if mappings:
# make a schema for each index
# the index contains a mappings dict with documents
# in a hierarchical format
for name, index in mappings.items():
columns = []
schema[name] = {'name': name}
for doc, items in index['mappings'].items():
columns.extend(parse_doc(items))
# remove duplicates
# sort alphabetically
schema[name]['columns'] = sorted(set(columns))
return schema.values()
def _parse_results(self, mappings, result_fields, raw_result, result_columns, result_rows):
def add_column_if_needed(mappings, column_name, friendly_name, result_columns, result_columns_index):
if friendly_name not in result_columns_index:
@@ -290,7 +328,7 @@ class Kibana(BaseElasticSearch):
url = "{0}/{1}/_search?".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)
mappings, error = self._get_mappings(mapping_url)
mappings, error = self._get_query_mappings(mapping_url)
if error:
return None, error
#logger.debug(json.dumps(mappings, indent=4))
@@ -369,7 +407,7 @@ class ElasticSearch(BaseElasticSearch):
url = "{0}/{1}/_search".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)
mappings, error = self._get_mappings(mapping_url)
mappings, error = self._get_query_mappings(mapping_url)
if error:
return None, error