Mirror of https://github.com/getredash/redash.git, synced 2025-12-19 17:37:19 -05:00
ODBC Based Databricks Connector (#4814)
* ODBC-based Databricks connector.
* Install Databricks' ODBC driver in the Docker image.
* Add user agent string.
* Add Types enum to redash.query_runner to replace the separate constants.
* Databricks connector:
  1. Parse types.
  2. Send additional connection options.
  3. Correctly parse errors.
* Switch to the TYPE constants so the code works with Python 2.
* Add a note about the Databricks driver terms and conditions.
* Show a message about the Databricks driver terms and conditions.
* Handle cases when the query doesn't return any results.
* Update redash/query_runner/databricks.py
  Co-Authored-By: Jesse <jesse@whitehouse.dev>
* Use new Databricks logo.
* Fix connection string options.

Co-authored-by: Jesse <jesse@whitehouse.dev>
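At its core, the change swaps the pyhive/Thrift transport for a DSN-less ODBC connection through pyodbc. A minimal sketch of that idea, reusing the connection options introduced in this commit but with placeholder host, HTTP path, and access token values (not a complete or authoritative setup):

import pyodbc

# Everything except the option names is a placeholder.
connection_string = ";".join(
    [
        "Driver=Simba",                      # driver name registered in /etc/odbcinst.ini
        "HOST=example.cloud.databricks.com",
        "PORT=443",
        "HTTPPath=/sql/protocolv1/o/0/0000-000000-example",
        "UID=token",                         # Databricks expects the literal user "token"
        "PWD=dapi0000000000000000",          # personal access token
        "SSL=1",
        "THRIFTTRANSPORT=2",
        "SPARKSERVERTYPE=3",
        "AUTHMECH=3",
    ]
)

connection = pyodbc.connect(connection_string, autocommit=True)
cursor = connection.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())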
Dockerfile
@@ -38,8 +38,10 @@ RUN apt-get update && \
     libssl-dev \
     default-libmysqlclient-dev \
     freetds-dev \
-    libsasl2-dev && \
-  # MSSQL ODBC Driver:
+    libsasl2-dev \
+    unzip \
+    libsasl2-modules-gssapi-mit && \
+  # MSSQL ODBC Driver:
   curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
   curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
   apt-get update && \
@@ -47,6 +49,13 @@ RUN apt-get update && \
   apt-get clean && \
   rm -rf /var/lib/apt/lists/*

+ADD https://databricks.com/wp-content/uploads/2.6.10.1010-2/SimbaSparkODBC-2.6.10.1010-2-Debian-64bit.zip /tmp/simba_odbc.zip
+RUN unzip /tmp/simba_odbc.zip -d /tmp/ \
+  && dpkg -i /tmp/SimbaSparkODBC-2.6.10.1010-2-Debian-64bit/simbaspark_2.6.10.1010-2_amd64.deb \
+  && echo "[Simba]\nDriver = /opt/simba/spark/lib/64/libsparkodbc_sb64.so" >> /etc/odbcinst.ini \
+  && rm /tmp/simba_odbc.zip \
+  && rm -rf /tmp/SimbaSparkODBC*
+
 WORKDIR /app

 # Disalbe PIP Cache and Version Check
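The echo line above registers the unpacked Simba library under the driver name "Simba" in /etc/odbcinst.ini, which is the name the query runner later passes as Driver=Simba. A quick sanity check that can be run inside the built image (a sketch, assuming pyodbc is available there):

import pyodbc

# pyodbc reads driver names from odbcinst.ini; "Simba" should be listed
# once the dpkg/echo steps above have run.
assert "Simba" in pyodbc.drivers(), pyodbc.drivers()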
Binary file not shown (Databricks data source logo replaced; before 17 KiB, after 2.5 KiB).
@@ -116,6 +116,15 @@ class CreateSourceDialog extends React.Component {
         )}
       </div>
       <DynamicForm id="sourceForm" fields={fields} onSubmit={this.createSource} feedbackIcons hideSubmitButton />
+      {selectedType.type === "databricks" && (
+        <small>
+          By using the Databricks Data Source you agree to the Databricks JDBC/ODBC{" "}
+          <a href="https://databricks.com/spark/odbc-driver-download" target="_blank" rel="noopener noreferrer">
+            Driver Download Terms and Conditions
+          </a>
+          .
+        </small>
+      )}
     </div>
   );
 }
redash/query_runner/databricks.py

@@ -1,17 +1,43 @@
-import base64
-from .hive_ds import Hive
-from redash.query_runner import register
+import datetime
+from redash.query_runner import (
+    register,
+    BaseSQLQueryRunner,
+    TYPE_STRING,
+    TYPE_BOOLEAN,
+    TYPE_DATE,
+    TYPE_DATETIME,
+    TYPE_INTEGER,
+    TYPE_FLOAT,
+)
+from redash.utils import json_dumps
+from redash import __version__

 try:
-    from pyhive import hive
-    from thrift.transport import THttpClient
+    import pyodbc

     enabled = True
 except ImportError:
     enabled = False


-class Databricks(Hive):
+TYPES_MAP = {
+    str: TYPE_STRING,
+    bool: TYPE_BOOLEAN,
+    datetime.date: TYPE_DATE,
+    datetime.datetime: TYPE_DATETIME,
+    int: TYPE_INTEGER,
+    float: TYPE_FLOAT,
+}
+
+
+def _build_odbc_connection_string(**kwargs):
+    return ";".join([f"{k}={v}" for k, v in kwargs.items()])
+
+
+class Databricks(BaseSQLQueryRunner):
+    noop_query = "SELECT 1"
+    should_annotate_query = False
+
     @classmethod
     def type(cls):
         return "databricks"
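_build_odbc_connection_string simply joins keyword arguments into the semicolon-separated key=value format ODBC expects, and TYPES_MAP translates the Python types that pyodbc reports in cursor.description into Redash column types. A small illustration with made-up values, assuming the definitions from the hunk above are in scope and Python 3.7+ keyword-argument ordering:

import datetime

s = _build_odbc_connection_string(Driver="Simba", HOST="example.cloud.databricks.com", PORT="443")
assert s == "Driver=Simba;HOST=example.cloud.databricks.com;PORT=443"

# Unknown type codes fall back to TYPE_STRING via TYPES_MAP.get(..., TYPE_STRING).
assert TYPES_MAP[datetime.date] == TYPE_DATE
assert TYPES_MAP.get(bytes, TYPE_STRING) == TYPE_STRING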
@@ -26,67 +52,101 @@ class Databricks(Hive):
             "type": "object",
             "properties": {
                 "host": {"type": "string"},
-                "database": {"type": "string"},
                 "http_path": {"type": "string", "title": "HTTP Path"},
+                # We're using `http_password` here for legacy reasons
                 "http_password": {"type": "string", "title": "Access Token"},
+                "schemas": {"type": "string", "title": "Schemas to Load Metadata For"},
             },
-            "order": ["host", "http_path", "http_password", "database"],
+            "order": ["host", "http_path", "http_password"],
             "secret": ["http_password"],
-            "required": ["host", "database", "http_path", "http_password"],
+            "required": ["host", "http_path", "http_password"],
         }

-    def _get_connection(self):
-        host = self.configuration["host"]
+    def _get_cursor(self):
+        user_agent = "Redash/{} (Databricks)".format(__version__.split("-")[0])
+        connection_string = _build_odbc_connection_string(
+            Driver="Simba",
+            UID="token",
+            PORT="443",
+            SSL="1",
+            THRIFTTRANSPORT="2",
+            SPARKSERVERTYPE="3",
+            AUTHMECH=3,
+            # Use the query as is without rewriting:
+            UseNativeQuery="1",
+            # Automatically reconnect to the cluster if an error occurs
+            AutoReconnect="1",
+            # Minimum interval between consecutive polls for query execution status (1ms)
+            AsyncExecPollInterval="1",
+            UserAgentEntry=user_agent,
+            HOST=self.configuration["host"],
+            PWD=self.configuration["http_password"],
+            HTTPPath=self.configuration["http_path"],
+        )

-        # if path is set but is missing initial slash, append it
-        path = self.configuration.get("http_path", "")
-        if path and path[0] != "/":
-            path = "/" + path
+        connection = pyodbc.connect(connection_string, autocommit=True)
+        return connection.cursor()

-        http_uri = "https://{}{}".format(host, path)
+    def run_query(self, query, user):
+        try:
+            cursor = self._get_cursor()

-        transport = THttpClient.THttpClient(http_uri)
+            cursor.execute(query)

-        password = self.configuration.get("http_password", "")
-        auth = base64.b64encode(b"token:" + password.encode("ascii"))
-        transport.setCustomHeaders({"Authorization": "Basic " + auth.decode()})
+            if cursor.description is not None:
+                data = cursor.fetchall()
+                columns = self.fetch_columns(
+                    [
+                        (i[0], TYPES_MAP.get(i[1], TYPE_STRING))
+                        for i in cursor.description
+                    ]
+                )

-        connection = hive.connect(thrift_transport=transport)
-        return connection
+                rows = [
+                    dict(zip((column["name"] for column in columns), row))
+                    for row in data
+                ]
+
+                data = {"columns": columns, "rows": rows}
+                json_data = json_dumps(data)
+                error = None
+            else:
+                error = None
+                json_data = json_dumps(
+                    {
+                        "columns": [{"name": "result", "type": TYPE_STRING}],
+                        "rows": [{"result": "No data was returned."}],
+                    }
+                )
+
+            cursor.close()
+        except pyodbc.Error as e:
+            if len(e.args) > 1:
+                error = str(e.args[1])
+            else:
+                error = str(e)
+            json_data = None
+
+        return json_data, error
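run_query above hands Redash a JSON document with "columns" and "rows" keys: column types come from TYPES_MAP, and each row is a dictionary keyed by column name. A hypothetical example of the dict that gets serialized with json_dumps for a two-column, one-row result, with the TYPE_* constants from the import block in scope (fetch_columns may attach additional column metadata, so treat this as the general shape rather than the full field list):

example_payload = {
    "columns": [
        {"name": "id", "type": TYPE_INTEGER},
        {"name": "created_at", "type": TYPE_DATETIME},
    ],
    "rows": [{"id": 1, "created_at": "2020-04-01T00:00:00"}],
}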
     def _get_tables(self, schema):
-        schemas_query = "show schemas"
-        tables_query = "show tables in %s"
-        columns_query = "show columns in %s.%s"
+        cursor = self._get_cursor()

-        schemas = self._run_query_internal(schemas_query)
+        schemas = self.configuration.get(
+            "schemas", self.configuration.get("database", "")
+        ).split(",")

-        for schema_name in [
-            a for a in [str(a["databaseName"]) for a in schemas] if len(a) > 0
-        ]:
-            for table_name in [
-                a
-                for a in [
-                    str(a["tableName"])
-                    for a in self._run_query_internal(tables_query % schema_name)
-                ]
-                if len(a) > 0
-            ]:
-                columns = [
-                    a
-                    for a in [
-                        str(a["col_name"])
-                        for a in self._run_query_internal(
-                            columns_query % (schema_name, table_name)
-                        )
-                    ]
-                    if len(a) > 0
-                ]
+        for schema_name in schemas:
+            cursor.columns(schema=schema_name)

-                if schema_name != "default":
-                    table_name = "{}.{}".format(schema_name, table_name)
+            for column in cursor:
+                table_name = "{}.{}".format(column[1], column[2])

-                schema[table_name] = {"name": table_name, "columns": columns}
+                if table_name not in schema:
+                    schema[table_name] = {"name": table_name, "columns": []}
+
+                schema[table_name]["columns"].append(column[3])

         return list(schema.values())
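With this change _get_tables asks the ODBC driver for column metadata instead of running show schemas / show tables / show columns. Cursor.columns() yields rows in the ODBC SQLColumns layout, where index 1 is the schema name, index 2 the table name, and index 3 the column name, which is what the indexing above relies on. A rough, self-contained sketch of how such rows fold into the schema dict, using invented rows:

# Tuples mimic (table_cat, table_schem, table_name, column_name, ...) as
# returned by an ODBC SQLColumns call; the values are invented for illustration.
fake_rows = [
    (None, "default", "users", "id"),
    (None, "default", "users", "email"),
    (None, "marketing", "events", "ts"),
]

schema = {}
for column in fake_rows:
    table_name = "{}.{}".format(column[1], column[2])
    if table_name not in schema:
        schema[table_name] = {"name": table_name, "columns": []}
    schema[table_name]["columns"].append(column[3])

print(list(schema.values()))
# [{'name': 'default.users', 'columns': ['id', 'email']},
#  {'name': 'marketing.events', 'columns': ['ts']}]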