IMPALA-12502: Support Impala to Impala federation

This patch adds support for reading Impala tables in another Impala cluster
through the JDBC external data source. It also adds a new profile counter,
NumExternalDataSourceGetNext, that records the total number of calls to
ExternalDataSource::GetNext(). Setting query options for the remote Impala
will be supported in a follow-up patch.

Testing:
- Added an end-to-end unit test that reads Impala tables from an Impala
  cluster through the JDBC external data source. Manually ran the unit test
  against Impala tables in an Impala cluster on a remote host by setting
  $INTERNAL_LISTEN_HOST in jdbc.url to the IP address of the remote host on
  which the Impala cluster is running.
- Added an LDAP test for reading a table through the JDBC external data
  source with LDAP authentication. Manually ran the unit test against Impala
  tables in a remote Impala cluster.
- Passed core tests.

Change-Id: I79ad3273932b658cb85c9c17cc834fa1b5fbd64f
Reviewed-on: http://gerrit.cloudera.org:8080/20731
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Wenzhe Zhou <wzhou@cloudera.com>
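In effect, the federation path boils down to the JDBC external data source
opening a standard JDBC connection against the remote cluster's HiveServer2
port and running the pushed-down query there. A minimal sketch of that
connection, using the driver class and URL format exercised by the tests in
this patch (the host name, credentials, and the LDAP AuthMech=3 setting are
placeholder assumptions, not part of the change):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ImpalaFederationSketch {
      public static void main(String[] args) throws Exception {
        // Driver class and URL format as used by the new tests; appending
        // ";AuthMech=3" mirrors how this patch folds 'jdbc.auth' into 'jdbc.url'.
        Class.forName("com.cloudera.impala.jdbc.Driver");
        String url = "jdbc:impala://remote-impala-host:21050/functional;AuthMech=3";
        // Placeholder LDAP credentials; the data source takes these from the
        // 'dbcp.username'/'dbcp.password' properties in its init string.
        try (Connection conn = DriverManager.getConnection(url, "testuser", "12345");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT string_col FROM alltypes WHERE id = 9")) {
          while (rs.next()) System.out.println(rs.getString(1));
        }
      }
    }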
@@ -49,6 +49,9 @@ DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on

namespace impala {

PROFILE_DEFINE_COUNTER(NumExternalDataSourceGetNext, DEBUG, TUnit::UNIT,
    "The total number of calls to ExternalDataSource::GetNext()");

// $0 = num expected cols, $1 = actual num columns
const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. "
    "Expected $0 but received $1. This likely indicates a problem with the data source "
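The DEBUG-level counter defined here is the one asserted on by the new
impala-ext-jdbc-tables.test file below, via row_regex matches such as
"NumExternalDataSourceGetNext: 1".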
@@ -93,6 +96,8 @@ Status DataSourceScanNode::Prepare(RuntimeState* state) {
      data_src_node_.init_string));

  cols_next_val_idx_.resize(tuple_desc_->slots().size(), 0);
  num_ext_data_source_get_next_ =
      PROFILE_NumExternalDataSourceGetNext.Instantiate(runtime_profile_);
  return Status::OK();
}

@@ -157,6 +162,7 @@ Status DataSourceScanNode::GetNextInputBatch() {
  Ubsan::MemSet(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size());
  TGetNextParams params;
  params.__set_scan_handle(scan_handle_);
  COUNTER_ADD(num_ext_data_source_get_next_, 1);
  RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
  RETURN_IF_ERROR(Status(input_batch_->status));
  RETURN_IF_ERROR(ValidateRowBatchSize());

@@ -98,6 +98,9 @@ class DataSourceScanNode : public ScanNode {
  /// the next row batch.
  std::vector<int> cols_next_val_idx_;

  /// The total number of calls to ExternalDataSource::GetNext().
  RuntimeProfile::Counter* num_ext_data_source_get_next_;

  /// Materializes the next row (next_row_idx_) into tuple. 'local_tz' is used as the
  /// local time-zone for materializing 'TYPE_TIMESTAMP' slots.
  Status MaterializeNextRow(const Timezone* local_tz, MemPool* mem_pool, Tuple* tuple);

@@ -207,6 +207,10 @@ if [[ $ARCH_NAME == 'aarch64' ]]; then
  export IMPALA_HADOOP_CLIENT_VERSION=3.3.6
  unset IMPALA_HADOOP_CLIENT_URL
fi

# Impala JDBC driver for testing.
export IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041

# Thrift related environment variables.
# IMPALA_THRIFT_POM_VERSION is used to populate IMPALA_THRIFT_JAVA_VERSION and
# thrift.version in java/pom.xml.

@@ -40,6 +40,8 @@ import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@CreateDS(name = "myDS",
    partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
@@ -51,6 +53,8 @@ import org.junit.Test;
 * ldap authentication is being used.
 */
public class LdapHS2Test {
  private static final Logger LOG = LoggerFactory.getLogger(LdapHS2Test.class);

  @ClassRule
  public static CreateLdapServerRule serverRule = new CreateLdapServerRule();

@@ -92,7 +96,9 @@ public class LdapHS2Test {
    verifySuccess(fetchResp.getStatus());
    List<TColumn> columns = fetchResp.getResults().getColumns();
    assertEquals(1, columns.size());
-   assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+   if (expectedResult != null) {
+     assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+   }

    return execResp.getOperationHandle();
  }
@@ -704,4 +710,149 @@ public class LdapHS2Test {
      assertEquals(e.getMessage(), "HTTP Response code: 401");
    }
  }

  /**
   * Tests LDAP authentication for reading Impala tables through the JDBC external
   * data source.
   */
  @Test
  public void testImpalaExtJdbcTables() throws Exception {
    setUp("");
    verifyMetrics(0, 0);
    THttpClient transport = new THttpClient("http://localhost:28000");
    Map<String, String> headers = new HashMap<String, String>();
    // Authenticate as 'Test1Ldap' with password '12345'.
    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
    transport.setCustomHeaders(headers);
    transport.open();
    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));

    // Open a session which will get username 'Test1Ldap'.
    TOpenSessionReq openReq = new TOpenSessionReq();
    TOpenSessionResp openResp = client.OpenSession(openReq);
    TSessionHandle session = openResp.getSessionHandle();
    // One successful authentication.
    verifyMetrics(1, 0);

    // Download the Impala JDBC driver.
    String downloadImpalaJdbcDriver = new File(System.getenv("IMPALA_HOME"),
        "testdata/bin/download-impala-jdbc-driver.sh").getPath();
    String[] cmd = { downloadImpalaJdbcDriver };
    RunShellCommand.Run(cmd, /*shouldSucceed*/ true, "", "");

    // Define queries.
    String fileSystemPrefix = System.getenv("FILESYSTEM_PREFIX");
    String internalListenHost = System.getenv("INTERNAL_LISTEN_HOST");

    String dropDSQuery = "DROP DATA SOURCE IF EXISTS impala_jdbc_test_ds";
    String createDSQuery = String.format("CREATE DATA SOURCE impala_jdbc_test_ds " +
        "LOCATION '%s/test-warehouse/data-sources/jdbc-data-source.jar' " +
        "CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' " +
        "API_VERSION 'V1'", fileSystemPrefix);
    String dropTableQuery = "DROP TABLE IF EXISTS %s";
    // Set the JDBC authentication mechanism to LDAP (AuthMech=3) with
    // username/password TEST_USER_1/TEST_PASSWORD_1.
    String createTableQuery = String.format("CREATE TABLE impala_jdbc_ext_test_table (" +
        "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
        "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
        "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
        "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
        "'{\"database.type\":\"IMPALA\", " +
        "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
        "\"jdbc.auth\":\"AuthMech=3\", " +
        "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
        "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
        "ImpalaJDBC42.jar\", " +
        "\"dbcp.username\":\"%s\", " +
        "\"dbcp.password\":\"%s\", " +
        "\"table\":\"alltypes\"}')",
        internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
    // Set the JDBC authentication mechanism to LDAP with a wrong password.
    String createTableWithWrongPassword =
        String.format("CREATE TABLE impala_jdbc_tbl_wrong_password (" +
            "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
            "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
            "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
            "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
            "'{\"database.type\":\"IMPALA\", " +
            "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
            "\"jdbc.auth\":\"AuthMech=3\", " +
            "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
            "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
            "ImpalaJDBC42.jar\", " +
            "\"dbcp.username\":\"%s\", " +
            "\"dbcp.password\":\"wrong-password\", " +
            "\"table\":\"alltypes\"}')",
            internalListenHost, fileSystemPrefix, TEST_USER_1);
    // Set the JDBC authentication mechanism to LDAP without AuthMech.
    String createTableWithoutAuthMech =
        String.format("CREATE TABLE impala_jdbc_tbl_without_auth_mech (" +
            "id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, " +
            "int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, " +
            "date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP) " +
            "PRODUCED BY DATA SOURCE impala_jdbc_test_ds(" +
            "'{\"database.type\":\"IMPALA\", " +
            "\"jdbc.url\":\"jdbc:impala://%s:21050/functional\", " +
            "\"jdbc.driver\":\"com.cloudera.impala.jdbc.Driver\", " +
            "\"driver.url\":\"%s/test-warehouse/data-sources/jdbc-drivers/" +
            "ImpalaJDBC42.jar\", " +
            "\"dbcp.username\":\"%s\", " +
            "\"dbcp.password\":\"%s\", " +
            "\"table\":\"alltypes\"}')",
            internalListenHost, fileSystemPrefix, TEST_USER_1, TEST_PASSWORD_1);
    String selectQuery = "select string_col from %s where id=9";

    // Run queries.
    //
    // Create data source and tables.
    execAndFetch(client, session, dropDSQuery, null);
    execAndFetch(client, session, createDSQuery, "Data source has been created.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_ext_test_table"), null);
    execAndFetch(client, session, createTableQuery, "Table has been created.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"), null);
    execAndFetch(client, session, createTableWithWrongPassword,
        "Table has been created.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"), null);
    execAndFetch(client, session, createTableWithoutAuthMech, "Table has been created.");

    // Successfully access the JDBC data source table with LDAP.
    execAndFetch(client, session,
        String.format(selectQuery, "impala_jdbc_ext_test_table"), "9");
    // Negative case for the JDBC data source table with a wrong password.
    String expectedError = "Error initialized or created transport for authentication";
    try {
      execAndFetch(client, session,
          String.format(selectQuery, "impala_jdbc_tbl_wrong_password"), "9");
      fail("Expected error: " + expectedError);
    } catch (Exception e) {
      assertTrue(e.getMessage().contains(expectedError));
    }
    // Negative case for the JDBC data source table without AuthMech.
    expectedError = "Communication link failure. Failed to connect to server";
    try {
      execAndFetch(client, session,
          String.format(selectQuery, "impala_jdbc_tbl_without_auth_mech"), "9");
      fail("Expected error: " + expectedError);
    } catch (Exception e) {
      assertTrue(String.format("Authentication failed with error: %s", e.getMessage()),
          e.getMessage().contains(expectedError));
    }

    // Drop data source and tables.
    execAndFetch(client, session, dropDSQuery, "Data source has been dropped.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_ext_test_table"),
        "Table has been dropped.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_tbl_wrong_password"),
        "Table has been dropped.");
    execAndFetch(client, session,
        String.format(dropTableQuery, "impala_jdbc_tbl_without_auth_mech"),
        "Table has been dropped.");

    // Two successful authentications for each execAndFetch().
    verifyMetrics(31, 0);
  }
}

@@ -229,6 +229,9 @@ public class JdbcDataSource implements ExternalDataSource {
        initString = initString.substring(CACHE_CLASS_PREFIX.length());
        cacheClass_ = true;
      }
      // Replace '\n' with single space character so that one property setting in
      // initString can be broken into multiple lines for better readability.
      initString = initString.replace('\n', ' ');
      Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
      tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
    } catch (JsonProcessingException e) {
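This newline replacement is what lets the multi-line init strings in the new
impala-ext-jdbc-tables.test file below parse as a single JSON document.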
@@ -294,6 +297,7 @@ public class JdbcDataSource implements ExternalDataSource {
    }
    // Execute query and get iterator
    tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
    LOG.trace("JDBC Query: " + sb.toString());

    if (schema_.getColsSize() != 0) {
      int limit = -1;

@@ -24,5 +24,6 @@ public enum DatabaseType {
  ORACLE,
  POSTGRES,
  MSSQL,
- JETHRO_DATA
+ JETHRO_DATA,
+ IMPALA
}

@@ -26,6 +26,8 @@ public enum JdbcStorageConfig {
  // JDBC connection string, including the database type, IP address, port number, and
  // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional".
  JDBC_URL("jdbc.url", true),
  // Authentication mechanism of the JDBC driver.
  JDBC_AUTH("jdbc.auth", false),
  // Class name of the JDBC driver. For example, "org.postgresql.Driver".
  JDBC_DRIVER_CLASS("jdbc.driver", true),
  // Driver URL for downloading the Jar file package that is used to access the external

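Note that JDBC_AUTH is registered with false while JDBC_URL and
JDBC_DRIVER_CLASS use true; following the pattern of the surrounding
properties, that flag appears to mark a property as required, so jdbc.auth
stays optional and existing unauthenticated configurations keep working.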
@@ -57,6 +57,10 @@ public class DatabaseAccessorFactory {
        accessor = new DB2DatabaseAccessor();
        break;

      case IMPALA:
        accessor = new ImpalaDatabaseAccessor();
        break;

      default:
        accessor = new GenericJdbcDatabaseAccessor();
        break;

@@ -48,6 +48,7 @@ import org.apache.impala.thrift.TStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
@@ -283,7 +284,12 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
    }

    // essential properties
-   dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+   String jdbcUrl = conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName());
+   String jdbcAuth = conf.get(JdbcStorageConfig.JDBC_AUTH.getPropertyName());
+   if (!Strings.isNullOrEmpty(jdbcAuth)) {
+     jdbcUrl += ";" + jdbcAuth;
+   }
+   dbProperties.put("url", jdbcUrl);
    dbProperties.put("driverClassName",
        conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
    dbProperties.put("driverUrl",

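For example, a table created with "jdbc.url":"jdbc:impala://host:21050/functional"
and "jdbc.auth":"AuthMech=3" now connects with the effective URL
jdbc:impala://host:21050/functional;AuthMech=3.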
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.extdatasource.jdbc.dao;

/**
 * Impala-specific data accessor. This is needed because the Impala JDBC driver
 * does not support the generic LIMIT and OFFSET escape functions.
 */
public class ImpalaDatabaseAccessor extends GenericJdbcDatabaseAccessor {

  @Override
  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
    if (offset == 0) {
      return addLimitToQuery(sql, limit);
    } else {
      if (limit != -1) {
        return sql + " LIMIT " + limit + " OFFSET " + offset;
      } else {
        return sql;
      }
    }
  }

  @Override
  protected String addLimitToQuery(String sql, int limit) {
    if (limit != -1) {
      return sql + " LIMIT " + limit;
    } else {
      return sql;
    }
  }
}

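To illustrate the rewriting these overrides perform, here is a self-contained
sketch that mirrors their logic (the demo class and its rewrite() helper are
illustrative only, not part of the patch):

    // Mirrors ImpalaDatabaseAccessor's LIMIT/OFFSET handling; limit == -1
    // means "no limit requested", in which case the SQL is left unchanged.
    public class LimitOffsetRewriteDemo {
      static String rewrite(String sql, int limit, int offset) {
        if (offset == 0) {
          return limit != -1 ? sql + " LIMIT " + limit : sql;
        }
        return limit != -1 ? sql + " LIMIT " + limit + " OFFSET " + offset : sql;
      }

      public static void main(String[] args) {
        // Prints: SELECT id FROM functional.alltypes LIMIT 5
        System.out.println(rewrite("SELECT id FROM functional.alltypes", 5, 0));
        // Prints: SELECT id FROM functional.alltypes LIMIT 5 OFFSET 10
        System.out.println(rewrite("SELECT id FROM functional.alltypes", 5, 10));
        // Prints: SELECT id FROM functional.alltypes (no limit, left unchanged)
        System.out.println(rewrite("SELECT id FROM functional.alltypes", -1, 10));
      }
    }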
testdata/bin/download-impala-jdbc-driver.sh (new executable file, 72 lines)
@@ -0,0 +1,72 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This script downloads the Impala JDBC driver and copies it to the Hadoop FS.

set -euo pipefail
. $IMPALA_HOME/bin/report_build_error.sh
setup_report_build_error

. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1

EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpala_JDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
INNER_SIMBA_DRIVER_ZIP_FILENAME=ClouderaImpalaJDBC${IMPALA_SIMBA_JDBC_DRIVER_VERSION}
DRIVER_JAR_VERSION=${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*}
SIMBA_DRIVER_JAR_FILENAME=ImpalaJDBC${DRIVER_JAR_VERSION}.jar

found=$(hadoop fs -find ${JDBC_DRIVERS_HDFS_PATH} -name ${SIMBA_DRIVER_JAR_FILENAME})
if [ ! -z "$found" ]; then
  echo "JDBC driver jar file already exists"
  exit 0
fi

hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH}
pushd /tmp

mkdir -p impala_jdbc_driver
cd impala_jdbc_driver

# Download the Impala JDBC driver.
wget "https://downloads.cloudera.com/connectors/${SIMBA_DRIVER_ZIP_FILENAME}.zip"

# Use Python modules to unzip the zip file since the 'unzip' command is not
# available in some testing environments.
cat > unzip.py <<__EOT__
import sys
from zipfile import PyZipFile
pzf = PyZipFile(sys.argv[1])
pzf.extractall()
__EOT__

# Extract the driver jar file from the zip file.
python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}.zip
python ./unzip.py ${SIMBA_DRIVER_ZIP_FILENAME}/${INNER_SIMBA_DRIVER_ZIP_FILENAME}.zip

# Copy the driver jar file to the Hadoop FS.
hadoop fs -put -f /tmp/impala_jdbc_driver/${SIMBA_DRIVER_JAR_FILENAME} \
  ${JDBC_DRIVERS_HDFS_PATH}/${SIMBA_DRIVER_JAR_FILENAME}

echo "Copied ${SIMBA_DRIVER_JAR_FILENAME} into HDFS ${JDBC_DRIVERS_HDFS_PATH}"

cd ..
rm -rf impala_jdbc_driver
popd

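For example, with IMPALA_SIMBA_JDBC_DRIVER_VERSION=42-2.6.32.1041, the
${IMPALA_SIMBA_JDBC_DRIVER_VERSION%-*} expansion strips the trailing
"-2.6.32.1041" suffix, so DRIVER_JAR_VERSION is 42 and the jar lands in HDFS
as ImpalaJDBC42.jar, matching the driver.url used by the tests.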
testdata/workloads/functional-query/queries/QueryTest/impala-ext-jdbc-tables.test (new file, 237 lines)
@@ -0,0 +1,237 @@
====
---- QUERY
# Create DataSource
DROP DATA SOURCE IF EXISTS TestJdbcDataSource;
CREATE DATA SOURCE TestJdbcDataSource
LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
API_VERSION 'V1';
---- RESULTS
'Data source has been created.'
====
---- QUERY
# Show created DataSource
SHOW DATA SOURCES LIKE 'testjdbcdatasource';
---- LABELS
NAME,LOCATION,CLASS NAME,API VERSION
---- RESULTS
'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
---- TYPES
STRING,STRING,STRING,STRING
====
---- QUERY
# Create external JDBC DataSource table
DROP TABLE IF EXISTS alltypes_jdbc_datasource;
CREATE TABLE alltypes_jdbc_datasource (
 id INT,
 bool_col BOOLEAN,
 tinyint_col TINYINT,
 smallint_col SMALLINT,
 int_col INT,
 bigint_col BIGINT,
 float_col FLOAT,
 double_col DOUBLE,
 date_string_col STRING,
 string_col STRING,
 timestamp_col TIMESTAMP)
PRODUCED BY DATA SOURCE TestJdbcDataSource(
'{"database.type":"IMPALA",
"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
"jdbc.auth":"AuthMech=0",
"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
"dbcp.username":"impala",
"dbcp.password":"cloudera",
"table":"alltypes"}');
---- RESULTS
'Table has been created.'
====
---- QUERY
# Create external JDBC DataSource table
DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
CREATE TABLE alltypes_jdbc_datasource_2 (
 id INT,
 bool_col BOOLEAN,
 tinyint_col TINYINT,
 smallint_col SMALLINT,
 int_col INT,
 bigint_col BIGINT,
 float_col FLOAT,
 double_col DOUBLE,
 date_string_col STRING,
 string_col STRING,
 timestamp_col TIMESTAMP)
PRODUCED BY DATA SOURCE TestJdbcDataSource(
'{"database.type":"IMPALA",
"jdbc.url":"jdbc:impala://$INTERNAL_LISTEN_HOST:21050/functional",
"jdbc.auth":"AuthMech=0",
"jdbc.driver":"com.cloudera.impala.jdbc.Driver",
"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/ImpalaJDBC42.jar",
"dbcp.username":"impala",
"dbcp.password":"cloudera",
"table":"alltypes"}');
---- RESULTS
'Table has been created.'
====
---- QUERY
# Test the jdbc DataSource
# count(*) with a predicate evaluated by Impala
# Binary predicates are pushed to the external jdbc DataSource.
select count(*) from alltypes_jdbc_datasource
where float_col = 0 and string_col is not NULL
---- RESULTS
730
---- TYPES
BIGINT
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 1 .*
row_regex: .*RowsRead: 730 .*
aggregation(SUM, RowsRead): 730
====
---- QUERY
# count(*) with no predicates has no materialized slots
select count(*) from alltypes_jdbc_datasource
---- RESULTS
7300
---- TYPES
BIGINT
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 1 .*
row_regex: .*RowsRead: 7.30K .*
aggregation(SUM, RowsRead): 7300
====
---- QUERY
# Gets all types including a row with a NULL value. The binary predicates are pushed to
# the DataSource, "order by" and "limit" are evaluated locally.
select *
from alltypes_jdbc_datasource
where id > 10 and int_col < 5 order by id limit 5 offset 0
---- RESULTS
11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000
12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000
13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000
14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000
20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
---- TYPES
INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 4 .*
row_regex: .*RowsRead: 3.64K .*
aggregation(SUM, RowsRead): 3644
====
---- QUERY
# Gets specified columns.
# The binary predicates are pushed to the DataSource, "order by" and "limit" are
# evaluated locally.
select id, bool_col, smallint_col, float_col, double_col, date_string_col
from alltypes_jdbc_datasource
where id > 10 and int_col < 5 order by id limit 5 offset 0
---- RESULTS
11,false,1,1.100000023841858,10.1,'01/02/09'
12,true,2,2.200000047683716,20.2,'01/02/09'
13,false,3,3.299999952316284,30.3,'01/02/09'
14,true,4,4.400000095367432,40.4,'01/02/09'
20,true,0,0,0,'01/03/09'
---- TYPES
INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 4 .*
row_regex: .*RowsRead: 3.64K .*
aggregation(SUM, RowsRead): 3644
====
---- QUERY
# Gets specified columns from external jdbc table with case sensitive column names
# and table name.
# The binary predicates are pushed to the DataSource, "order by" and "limit" are
# evaluated locally.
select id, bool_col, smallint_col, float_col, double_col, date_string_col
from alltypes_jdbc_datasource_2
where id > 10 and int_col < 5 order by id limit 5 offset 0
---- RESULTS
11,false,1,1.100000023841858,10.1,'01/02/09'
12,true,2,2.200000047683716,20.2,'01/02/09'
13,false,3,3.299999952316284,30.3,'01/02/09'
14,true,4,4.400000095367432,40.4,'01/02/09'
20,true,0,0,0,'01/03/09'
---- TYPES
INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 4 .*
row_regex: .*RowsRead: 3.64K .*
aggregation(SUM, RowsRead): 3644
====
---- QUERY
# Inner join with a non jdbc table
# The binary predicates are pushed to the DataSource, but no predicate defined for
# local table.
select a.id, b.int_col
from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id)
where a.id = 1
---- RESULTS
1,1
---- TYPES
INT, INT
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 1 .*
row_regex: .*RowsRead: 1 .*
aggregation(SUM, RowsRead): 7301
====
---- QUERY
# Inner join with another jdbc table
# The binary predicates are pushed to the two DataSource Nodes.
select a.id, b.int_col
from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id)
where a.id < 3 group by a.id, b.int_col
---- RESULTS
0,0
1,1
2,2
---- TYPES
INT, INT
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 1 .*
row_regex: .*RowsRead: 3 .*
aggregation(SUM, RowsRead): 6
====
---- QUERY
# Cross join
# The binary predicates are pushed to the two DataSource Nodes.
select a.id, b.id
from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
where (a.id < 3 and b.id < 3)
order by a.id, b.id limit 10
---- RESULTS
0,0
0,1
0,2
1,0
1,1
1,2
2,0
2,1
2,2
---- TYPES
INT, INT
---- RUNTIME_PROFILE
row_regex: .*NumExternalDataSourceGetNext: 1 .*
row_regex: .*RowsRead: 3 .*
aggregation(SUM, RowsRead): 6
====
---- QUERY
# Drop table
DROP TABLE alltypes_jdbc_datasource;
---- RESULTS
'Table has been dropped.'
====
---- QUERY
# Drop table
DROP TABLE alltypes_jdbc_datasource_2;
---- RESULTS
'Table has been dropped.'
====
---- QUERY
# Drop DataSource
DROP DATA SOURCE TestJdbcDataSource;
---- RESULTS
'Data source has been dropped.'
====

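The expected NumExternalDataSourceGetNext values are consistent with the
default data_source_batch_size of 1024 from the backend hunk at the top of
this change: for example, a scan returning 730 rows fits in a single batch
(counter 1), while the scans reading 3,644 rows need four batches (counter 4).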
@@ -109,3 +109,37 @@ class TestMySqlExtJdbcTables(CustomClusterTestSuite):
  def test_mysql_ext_jdbc_tables(self, vector, unique_database):
    """Run tests for external jdbc tables on MySQL"""
    self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database)


class TestImpalaExtJdbcTables(CustomClusterTestSuite):
  """Impala query tests for external jdbc tables in Impala cluster."""

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def _download_impala_jdbc_driver(cls):
    # Download the Impala JDBC driver and copy it to HDFS.
    script = os.path.join(
      os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh')
    run_cmd = [script]
    try:
      subprocess.check_call(run_cmd, close_fds=True)
    except subprocess.CalledProcessError:
      assert False, "Failed to download Impala JDBC driver"

  @classmethod
  def setup_class(cls):
    cls._download_impala_jdbc_driver()
    super(TestImpalaExtJdbcTables, cls).setup_class()

  @classmethod
  def teardown_class(cls):
    super(TestImpalaExtJdbcTables, cls).teardown_class()

  @pytest.mark.execute_serially
  def test_impala_ext_jdbc_tables(self, vector, unique_database):
    """Run tests for external jdbc tables in Impala cluster"""
    self.run_test_case(
      'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)