add pagination and base_url to JSON query_runner (#6499)

This commit is contained in:
Ken Michalak
2023-10-18 08:08:37 -05:00
committed by GitHub
parent ac9f24a781
commit 7b03e60f9d
2 changed files with 196 additions and 17 deletions

View File

@@ -1,5 +1,6 @@
import datetime
import logging
from urllib.parse import urljoin
import yaml
from funcy import compact, project
@@ -61,7 +62,7 @@ def add_column(columns, column_name, column_type):
columns.append({"name": column_name, "friendly_name": column_name, "type": column_type})
def _apply_path_search(response, path):
def _apply_path_search(response, path, default=None):
if path is None:
return response
@@ -71,6 +72,8 @@ def _apply_path_search(response, path):
current_path = path_parts.pop()
if current_path in response:
response = response[current_path]
elif default is not None:
return default
else:
raise Exception("Couldn't find path {} in response.".format(path))
@@ -78,6 +81,8 @@ def _apply_path_search(response, path):
def _normalize_json(data, path):
if not data:
return None
data = _apply_path_search(data, path)
if isinstance(data, dict):
@@ -94,9 +99,7 @@ def _sort_columns_with_fields(columns, fields):
# TODO: merge the logic here with the one in MongoDB's queyr runner
def parse_json(data, path, fields):
data = _normalize_json(data, path)
def parse_json(data, fields):
rows = []
columns = []
@@ -130,17 +133,19 @@ def parse_json(data, path, fields):
class JSON(BaseHTTPQueryRunner):
requires_url = False
base_url_title = "Base URL"
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"base_url": {"type": "string", "title": cls.base_url_title},
"username": {"type": "string", "title": cls.username_title},
"password": {"type": "string", "title": cls.password_title},
},
"secret": ["password"],
"order": ["username", "password"],
"order": ["base_url", "username", "password"],
}
def __init__(self, configuration):
@@ -153,6 +158,16 @@ class JSON(BaseHTTPQueryRunner):
def run_query(self, query, user):
query = parse_query(query)
results, error = self._run_json_query(query)
if error is not None:
return None, error
data = json_dumps(results)
if data:
return data, None
return None, "Got empty response from '{}'.".format(query["url"])
def _run_json_query(self, query):
if not isinstance(query, dict):
raise QueryParseError("Query should be a YAML object describing the URL to query.")
@@ -165,13 +180,15 @@ class JSON(BaseHTTPQueryRunner):
fields = query.get("fields")
path = query.get("path")
if "pagination" in query:
pagination = RequestPagination.from_config(self.configuration, query["pagination"])
else:
pagination = None
if isinstance(request_options.get("auth", None), list):
request_options["auth"] = tuple(request_options["auth"])
elif self.configuration.get("username") or self.configuration.get("password"):
request_options["auth"] = (
self.configuration.get("username"),
self.configuration.get("password"),
)
request_options["auth"] = (self.configuration.get("username"), self.configuration.get("password"))
if method not in ("get", "post"):
raise QueryParseError("Only GET or POST methods are allowed.")
@@ -179,17 +196,91 @@ class JSON(BaseHTTPQueryRunner):
if fields and not isinstance(fields, list):
raise QueryParseError("'fields' needs to be a list.")
response, error = self.get_response(query["url"], http_method=method, **request_options)
results, error = self._get_all_results(query["url"], method, path, pagination, **request_options)
return parse_json(results, fields), error
if error is not None:
return None, error
def _get_all_results(self, url, method, result_path, pagination, **request_options):
"""Get all results from a paginated endpoint."""
base_url = self.configuration.get("base_url")
url = urljoin(base_url, url)
data = json_dumps(parse_json(response.json(), path, fields))
results = []
has_more = True
while has_more:
response, error = self._get_json_response(url, method, **request_options)
has_more = False
if data:
return data, None
else:
return None, "Got empty response from '{}'.".format(query["url"])
result = _normalize_json(response, result_path)
if result:
results.extend(result)
if pagination:
has_more, url, request_options = pagination.next(url, request_options, response)
return results, error
def _get_json_response(self, url, method, **request_options):
response, error = self.get_response(url, http_method=method, **request_options)
result = response.json() if error is None else {}
return result, error
class RequestPagination:
def next(self, url, request_options, response):
"""Checks the response for another page.
Returns:
has_more, next_url, next_request_options
"""
return False, None, request_options
@staticmethod
def from_config(configuration, pagination):
if not isinstance(pagination, dict) or not isinstance(pagination.get("type"), str):
raise QueryParseError("'pagination' should be an object with a `type` property")
if pagination["type"] == "url":
return UrlPagination(pagination)
elif pagination["type"] == "token":
return TokenPagination(pagination)
raise QueryParseError("Unknown 'pagination.type' {}".format(pagination["type"]))
class UrlPagination(RequestPagination):
def __init__(self, pagination):
self.path = pagination.get("path", "_links.next.href")
if not isinstance(self.path, str):
raise QueryParseError("'pagination.path' should be a string")
def next(self, url, request_options, response):
next_url = _apply_path_search(response, self.path, "")
if not next_url:
return False, None, request_options
next_url = urljoin(url, next_url)
return True, next_url, request_options
class TokenPagination(RequestPagination):
def __init__(self, pagination):
self.fields = pagination.get("fields", ["next_page_token", "page_token"])
if not isinstance(self.fields, list) or len(self.fields) != 2:
raise QueryParseError("'pagination.fields' should be a list of 2 field names")
def next(self, url, request_options, response):
next_token = _apply_path_search(response, self.fields[0], "")
if not next_token:
return False, None, request_options
params = request_options.get("params", {})
# prevent infinite loop that can happen if self.fields[1] is wrong
if next_token == params.get(self.fields[1]):
raise Exception("{} did not change; possible misconfiguration".format(self.fields[0]))
params[self.fields[1]] = next_token
request_options["params"] = params
return True, url, request_options
register(JSON)

View File

@@ -0,0 +1,88 @@
"""
Some test cases for JSON api runner
"""
from unittest import TestCase
from urllib.parse import urlencode, urljoin
from redash.query_runner.json_ds import JSON
def mock_api(url, method, **request_options):
if "params" in request_options:
qs = urlencode(request_options["params"])
url = urljoin(url, "?{}".format(qs))
data, error = None, None
if url == "http://localhost/basics":
data = [{"id": 1}, {"id": 2}]
elif url == "http://localhost/token-test":
data = {"next_page_token": "2", "records": [{"id": 1}, {"id": 2}]}
elif url == "http://localhost/token-test?page_token=2":
data = {"next_page_token": "3", "records": [{"id": 3}, {"id": 4}]}
elif url == "http://localhost/token-test?page_token=3":
data = {"records": [{"id": 5}]}
elif url == "http://localhost/hateoas":
data = {
"_embedded": {"records": [{"id": 10}, {"id": 11}]},
"_links": {
"first": {"href": "http://localhost/hateoas"},
"self": {"href": "http://localhost/hateoas"},
"next": {"href": "http://localhost/hateoas?page=2"},
"last": {"href": "http://localhost/hateoas?page=2"},
},
"page": {"size": 2, "totalElements": 3, "totalPages": 2},
}
elif url == "http://localhost/hateoas?page=2":
data = {
"_embedded": {"records": [{"id": 12}]},
"_links": {
"first": {"href": "http://localhost/hateoas"},
"self": {"href": "http://localhost/hateoas?page=2"},
"prev": {"href": "http://localhost/hateoas"},
"last": {"href": "http://localhost/hateoas?page=2"},
},
"page": {"size": 2, "totalElements": 3, "totalPages": 2},
}
else:
error = "404: {} not found".format(url)
return data, error
class TestJSON(TestCase):
def setUp(self):
self.runner = JSON({"base_url": "http://localhost/"})
self.runner._get_json_response = mock_api
def test_basics(self):
q = {"url": "basics"}
results, error = self.runner._run_json_query(q)
expected = [{"id": 1}, {"id": 2}]
self.assertEqual(results["rows"], expected)
def test_token_pagination(self):
q = {
"url": "token-test",
"pagination": {"type": "token", "fields": ["next_page_token", "page_token"]},
"path": "records",
}
results, error = self.runner._run_json_query(q)
self.assertIsNone(error)
expected = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}]
self.assertEqual(results["rows"], expected)
def test_url_pagination(self):
q = {
"url": "hateoas",
"pagination": {"type": "url", "path": "_links.next.href"},
"path": "_embedded.records",
"fields": ["id"],
}
results, error = self.runner._run_json_query(q)
self.assertIsNone(error)
expected = [{"id": 10}, {"id": 11}, {"id": 12}]
self.assertEqual(results["rows"], expected)