IMPALA-5741: Initial support for reading tiny RDBMS tables

This patch uses Impala's "external data source" mechanism to implement
a data source for querying RDBMS tables through JDBC.
It has some limitations due to the restrictions of the "external data
source" framework:
  - It is not distributed, i.e. the scan fragment is unpartitioned and
    queries are executed on the coordinator.
  - Queries which read the following data types from external JDBC
    tables are not supported:
    BINARY, CHAR, DATETIME, and COMPLEX.
  - Only binary predicates with the operators =, !=, <=, >=, <, >
    are pushed down to the RDBMS (see the example after this list).
  - The following data types are not supported in pushed-down
    predicates: DECIMAL, TIMESTAMP, DATE, and BINARY.
  - External tables with complex-typed columns are not supported.
  - Support is limited to the following databases:
    MySQL, Postgres, Oracle, MSSQL, H2, DB2, and JETHRO_DATA.
  - Catalog V2 is not supported (IMPALA-7131).
  - DataSource objects are not persistent (IMPALA-12375).
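For example (illustrative query; alltypes_jdbc_datasource is one of the
tables created by the test scripts in the steps below), the conjunct
"int_col > 10" can be pushed down to the remote database, while
"abs(bigint_col) < 100" is not a plain binary predicate and is therefore
evaluated by Impala:

SELECT id, int_col
FROM alltypes_jdbc_datasource
WHERE int_col > 10 AND abs(bigint_col) < 100;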

Additional fixes are planned on top of this patch.

Source files under jdbc/conf, jdbc/dao and jdbc/exception are
replicated from Hive JDBC Storage Handler.

To query RDBMS tables, follow the steps below (note that any existing
data source tables will be rebuilt):
1. Make sure the Impala cluster has been started.

2. Copy the jar files of JDBC drivers and the data source library into
HDFS.
${IMPALA_HOME}/testdata/bin/copy-ext-data-sources.sh

3. Create an `alltypes` table in the Postgres database.
${IMPALA_HOME}/testdata/bin/load-ext-data-sources.sh

4. Create data source tables (alltypes_jdbc_datasource and
alltypes_jdbc_datasource_2).
${IMPALA_HOME}/bin/impala-shell.sh -f \
  ${IMPALA_HOME}/testdata/bin/create-ext-data-source-table.sql

5. Now queries can be run against the data source tables created in the
previous step; there is no need to restart the Impala cluster. A sketch
of the DDL used in step 4 appears below.
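
As a minimal sketch (the data source name, HDFS paths, column list and
credentials below are illustrative; the actual DDL is in
create-ext-data-source-table.sql), step 4 amounts to registering the data
source jar and creating a table backed by it:

CREATE DATA SOURCE jdbc_test
LOCATION 'hdfs:///user/impala/ext-data-source/impala-data-source-jdbc.jar'
CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
API_VERSION 'V1';

CREATE TABLE alltypes_jdbc_datasource (
  id INT, bool_col BOOLEAN, int_col INT, bigint_col BIGINT, string_col STRING)
PRODUCED BY DATA SOURCE jdbc_test(
'{"database.type":"POSTGRES", "jdbc.url":"jdbc:postgresql://127.0.0.1:5432/functional", "jdbc.driver":"org.postgresql.Driver", "driver.url":"hdfs:///user/impala/ext-data-source/postgresql-42.5.1.jar", "dbcp.username":"hiveuser", "dbcp.password":"password", "table":"alltypes"}');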

Testing:
 - Added unit tests for Postgres and ran them with the JDBC driver
   postgresql-42.5.1.jar.
 - Ran manual unit tests for MySQL with the JDBC driver
   mysql-connector-j-8.1.0.jar.
 - Ran core tests successfully.

Change-Id: I8244e978c7717c6f1452f66f1630b6441392e7d2
Reviewed-on: http://gerrit.cloudera.org:8080/17842
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Fucun Chu
Date: 2021-08-31 09:50:44 +08:00
Committed by: Impala Public Jenkins
Parent: 8b2598cd70
Commit: c2bd30a1b3
33 changed files with 2469 additions and 67 deletions


@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.impala</groupId>
<artifactId>impala-data-source</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<artifactId>impala-data-source-jdbc</artifactId>
<name>Apache Impala External Data Source JDBC Library</name>
<description>JDBC External Data Source</description>
<packaging>jar</packaging>
<url>.</url>
<properties>
<commons-dbcp2.version>2.9.0</commons-dbcp2.version>
<h2database.version>1.3.166</h2database.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.impala</groupId>
<artifactId>impala-data-source-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.impala</groupId>
<artifactId>impala-frontend</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>${commons-dbcp2.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2database.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,338 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessor;
import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessorFactory;
import org.apache.impala.extdatasource.jdbc.dao.JdbcRecordIterator;
import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
import org.apache.impala.extdatasource.jdbc.util.QueryConditionUtil;
import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TCloseParams;
import org.apache.impala.extdatasource.thrift.TCloseResult;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.extdatasource.thrift.TGetNextParams;
import org.apache.impala.extdatasource.thrift.TGetNextResult;
import org.apache.impala.extdatasource.thrift.TOpenParams;
import org.apache.impala.extdatasource.thrift.TOpenResult;
import org.apache.impala.extdatasource.thrift.TPrepareParams;
import org.apache.impala.extdatasource.thrift.TPrepareResult;
import org.apache.impala.extdatasource.thrift.TRowBatch;
import org.apache.impala.extdatasource.thrift.TTableSchema;
import org.apache.impala.extdatasource.v1.ExternalDataSource;
import org.apache.impala.thrift.TColumnData;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* The Jdbc data source returns the data of the underlying database configured by
* initString.
*/
public class JdbcDataSource implements ExternalDataSource {
private final static Logger LOG = LoggerFactory.getLogger(JdbcDataSource.class);
/**
* @see org.apache.impala.extdatasource.ExternalDataSourceExecutor
*/
private final static String CACHE_CLASS_PREFIX = "CACHE_CLASS::";
private static final TStatus STATUS_OK =
new TStatus(TErrorCode.OK, Lists.newArrayList());
private boolean eos_;
private int batchSize_;
private TTableSchema schema_;
private DataSourceState state_;
// Handle to identify JdbcDataSource object.
// It's assigned with random UUID value in open() API, and returned to the caller.
// In getNext() and close() APIs, compare this value with the handle value passed in
// input parameters to make sure the object is valid.
private String scanHandle_;
// Set to true if initString started with "CACHE_CLASS::".
// It is passed to DatabaseAccessor::close() to indicate if dataSourceCache should be
// cleaned when DatabaseAccessor object is closed.
private boolean cacheClass_ = false;
// Properties of external jdbc table, converted from initString which is specified in
// create table statement.
// Supported properties are defined in JdbcStorageConfig.
private Configuration tableConfig_;
private DatabaseAccessor dbAccessor_ = null;
// iterator_ is used when schema_.getColsSize() does not equal 0.
private JdbcRecordIterator iterator_ = null;
// currRow_ and totalNumberOfRecords_ are used when schema_.getColsSize() equals 0.
private int currRow_;
private long totalNumberOfRecords_ = 0;
// Enumerates the states of the data source, which indicates which ExternalDataSource
// API has been called. The states are checked in each API to make sure that the APIs
// are called in right order, e.g. state transitions must be in the below order:
// CREATED -> OPENED -> CLOSED.
// Note that the ExternalDataSourceExecutors of frontend and backend will create
// separate JdbcDataSource objects so that the state of JdbcDataSource which is set
// by frontend will not be transferred to backend. The prepare() is called by frontend,
// open(), getNext() and close() are called by backend. We don't need to change state
// in prepare() since the state will not be transferred to other APIs, and the input
// state for open() must be 'CREATED'.
private enum DataSourceState {
// The object is created.
CREATED,
// The open() API is called.
OPENED,
// The close() API is called.
CLOSED
}
public JdbcDataSource() {
eos_ = false;
currRow_ = 0;
state_ = DataSourceState.CREATED;
}
@Override
public TPrepareResult prepare(TPrepareParams params) {
Preconditions.checkState(state_ == DataSourceState.CREATED);
if (!convertInitStringToConfiguration(params.getInit_string())) {
return new TPrepareResult(
new TStatus(TErrorCode.INTERNAL_ERROR,
Lists.newArrayList("Invalid init_string value")));
}
List<Integer> acceptedPredicates = acceptedPredicates(params.getPredicates());
return new TPrepareResult(STATUS_OK)
.setAccepted_conjuncts(acceptedPredicates);
}
@Override
public TOpenResult open(TOpenParams params) {
Preconditions.checkState(state_ == DataSourceState.CREATED);
state_ = DataSourceState.OPENED;
batchSize_ = params.getBatch_size();
schema_ = params.getRow_schema();
// 1. Check init string again because the call in prepare() was from
// the frontend and used a different instance of this JdbcDataSource class.
if (!convertInitStringToConfiguration(params.getInit_string())) {
return new TOpenResult(
new TStatus(TErrorCode.INTERNAL_ERROR,
Lists.newArrayList("Invalid init_string value")));
}
// 2. Build the query and execute it
try {
dbAccessor_ = DatabaseAccessorFactory.getAccessor(tableConfig_);
buildQueryAndExecute(params);
} catch (JdbcDatabaseAccessException e) {
return new TOpenResult(
new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(e.getMessage())));
}
scanHandle_ = UUID.randomUUID().toString();
return new TOpenResult(STATUS_OK).setScan_handle(scanHandle_);
}
@Override
public TGetNextResult getNext(TGetNextParams params) {
Preconditions.checkState(state_ == DataSourceState.OPENED);
Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
if (eos_) return new TGetNextResult(STATUS_OK).setEos(eos_);
List<TColumnData> cols = Lists.newArrayList();
long numRows = 0;
if (schema_.getColsSize() != 0) {
if (iterator_ == null) {
return new TGetNextResult(
new TStatus(TErrorCode.INTERNAL_ERROR,
Lists.newArrayList("Iterator of JDBC resultset is null")));
}
for (int i = 0; i < schema_.getColsSize(); ++i) {
cols.add(new TColumnData().setIs_null(Lists.newArrayList()));
}
boolean hasNext = true;
try {
while (numRows < batchSize_ && (hasNext = iterator_.hasNext())) {
iterator_.next(schema_.getCols(), cols);
++numRows;
}
} catch (Exception e) {
hasNext = false;
}
if (!hasNext) eos_ = true;
} else { // for count(*)
if (currRow_ + batchSize_ <= totalNumberOfRecords_) {
numRows = batchSize_;
} else {
numRows = totalNumberOfRecords_ - currRow_;
}
currRow_ += numRows;
if (currRow_ == totalNumberOfRecords_) eos_ = true;
}
return new TGetNextResult(STATUS_OK).setEos(eos_)
.setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));
}
@Override
public TCloseResult close(TCloseParams params) {
Preconditions.checkState(state_ == DataSourceState.OPENED);
Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
try {
if (iterator_ != null) iterator_.close();
if (dbAccessor_ != null) dbAccessor_.close(!cacheClass_);
state_ = DataSourceState.CLOSED;
return new TCloseResult(STATUS_OK);
} catch (Exception e) {
return new TCloseResult(
new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(e.getMessage())));
}
}
protected boolean convertInitStringToConfiguration(String initString) {
Preconditions.checkState(initString != null);
if (tableConfig_ == null) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {
};
if (initString.startsWith(CACHE_CLASS_PREFIX)) {
initString = initString.substring(CACHE_CLASS_PREFIX.length());
cacheClass_ = true;
}
Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
} catch (JsonProcessingException e) {
String errorMessage = String
.format("Invalid JSON from initString_ '%s'", initString);
LOG.error(errorMessage, e);
return false;
}
}
return true;
}
private List<Integer> acceptedPredicates(List<List<TBinaryPredicate>> predicates) {
// Return the indexes of accepted predicates.
List<Integer> acceptedPredicates = Lists.newArrayList();
if (predicates == null || predicates.isEmpty()) {
return acceptedPredicates;
}
for (int i = 0; i < predicates.size(); ++i) {
boolean accepted = true;
for (TBinaryPredicate predicate : predicates.get(i)) {
// Don't support 'IS DISTINCT FROM' and 'IS NOT DISTINCT FROM' operators now.
if (predicate.getOp() == TComparisonOp.DISTINCT_FROM
|| predicate.getOp() == TComparisonOp.NOT_DISTINCT) {
accepted = false;
break;
}
}
if (accepted) acceptedPredicates.add(i);
}
return acceptedPredicates;
}
private void buildQueryAndExecute(TOpenParams params)
throws JdbcDatabaseAccessException {
Map<String, String> columnMapping = getColumnMapping(tableConfig_
.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName()));
// Build query statement
StringBuilder sb = new StringBuilder("SELECT ");
String project;
// If cols size equals to 0, it is 'select count(*) from tbl' statement.
if (schema_.getColsSize() == 0) {
project = "*";
} else {
project =
schema_.getCols().stream().map(
TColumnDesc::getName).map(
name -> columnMapping.getOrDefault(name, name))
.collect(Collectors.joining(", "));
}
sb.append(project);
sb.append(" FROM ");
// Make jdbc table name to be quoted with double quotes if columnMapping is not empty
String jdbcTableName = tableConfig_.get(JdbcStorageConfig.TABLE.getPropertyName());
if (!columnMapping.isEmpty() && jdbcTableName.charAt(0) != '\"') {
StringBuilder sb2 = new StringBuilder("\"");
sb2.append(jdbcTableName);
sb2.append("\"");
jdbcTableName = sb2.toString();
}
sb.append(jdbcTableName);
String condition = QueryConditionUtil
.buildCondition(params.getPredicates(), columnMapping);
if (StringUtils.isNotBlank(condition)) {
sb.append(" WHERE ").append(condition);
}
// Execute query and get iterator
tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
if (schema_.getColsSize() != 0) {
int limit = -1;
if (params.isSetLimit()) limit = (int) params.getLimit();
iterator_ = dbAccessor_.getRecordIterator(tableConfig_, limit, 0);
} else {
totalNumberOfRecords_ = dbAccessor_.getTotalNumberOfRecords(tableConfig_);
}
}
/*
* Returns the mapping from Impala column names to external database column names,
* or an empty map if it is not set.
*/
private Map<String, String> getColumnMapping(String columnMapping) {
if ((columnMapping == null) || (columnMapping.trim().isEmpty())) {
return Maps.newHashMap();
}
Map<String, String> columnMap = Maps.newHashMap();
String[] mappingPairs = columnMapping.split(",");
for (String mapPair : mappingPairs) {
String[] columns = mapPair.split("=");
// Make jdbc column name to be quoted with double quotes
String jdbcColumnName = columns[1].trim();
if (!jdbcColumnName.isEmpty() && jdbcColumnName.charAt(0) != '\"') {
StringBuilder sb = new StringBuilder("\"");
sb.append(jdbcColumnName);
sb.append("\"");
jdbcColumnName = sb.toString();
}
columnMap.put(columns[0].trim(), jdbcColumnName);
}
return columnMap;
}
}


@@ -0,0 +1,60 @@
This JDBC External Data Source library is implemented with the "External Data Source"
mechanism to query JDBC tables from the Impala server.
The following two source files contain the Impala-specific logic:
JdbcDataSource.java
util/QueryConditionUtil.java
The other source files, which add support for accessing external database tables through
JDBC drivers, are replicated from the Hive JDBC Storage Handler with some modifications:
(https://github.com/apache/hive/tree/master/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc)
conf/DatabaseType.java
remove dbType for HIVE, DERBY, and METASTORE.
conf/JdbcStorageConfig.java
don't use org.apache.hadoop.hive.conf.Constants
conf/JdbcStorageConfigManager.java
add functions: convertMapToConfiguration(), getQueryToExecute(),
getOrigQueryToExecute()
remove functions: copyConfigurationToJob(), countNonNull(),
getPasswordFromProperties(), copySecretsToJob(),
convertPropertiesToConfiguration(), resolveMetadata(), getMetastoreDatabaseType(),
getMetastoreConnectionURL(), getMetastoreDriver(), getMetastoreJdbcUser(),
getMetastoreJdbcPasswd().
modify functions: checkRequiredPropertiesAreDefined()
dao/DB2DatabaseAccessor.java
remove function constructQuery()
dao/DatabaseAccessor.java
remove the following APIs:
getColumnNames(), getColumnTypes(), getRecordWriter(), getBounds(),
needColumnQuote().
remove following parameters for API getRecordIterator():
'partitionColumn', 'lowerBound', and 'upperBound'.
dao/DatabaseAccessorFactory.java
remove dbType for HIVE, DERBY, and METASTORE.
dao/GenericJdbcDatabaseAccessor.java
add member variable: dataSourceCache
remove member variable typeInfoTranslator
remove functions: getColumnMetadata(), getColumnMetadata(), getColumnNames()
getColumnTypes(), getColNamesFromRS(), getColTypesFromRS(),
getMetaDataQuery(), getRecordWriter(), constructQuery(),
addBoundaryToQuery(), removeDbcpPrefix(), getFromProperties(), getBounds(),
quote(), needColumnQuote(), getQualifiedTableName(), selectAllFromTable(),
finalize().
remove following parameters for API getRecordIterator():
'partitionColumn', 'lowerBound', and 'upperBound'.
Modify functions: close().
dao/JdbcRecordIterator.java
remove member variable accessor, remove parameter 'accessor' from ctor,
remove functions: remove()
modify functions: next(), close()
dao/JethroDatabaseAccessor.java
remove getMetaDataQuery()
dao/MsSqlDatabaseAccessor.java
dao/MySqlDatabaseAccessor.java
remove function needColumnQuote()
dao/OracleDatabaseAccessor.java
remove function constructQuery()
dao/PostgresDatabaseAccessor.java
exception/JdbcDatabaseAccessException.java
renamed from exception/HiveJdbcDatabaseAccessException.java


@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.conf;
public enum DatabaseType {
MYSQL,
H2,
DB2,
ORACLE,
POSTGRES,
MSSQL,
JETHRO_DATA
}


@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.conf;
public enum JdbcStorageConfig {
// Table properties specified in the create table statement.
// The database from which the external table comes, such as MySQL, ORACLE, POSTGRES,
// and MSSQL, etc.
DATABASE_TYPE("database.type", true),
// JDBC connection string, including the database type, IP address, port number, and
// database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional".
JDBC_URL("jdbc.url", true),
// Class name of JDBC driver. For example, "org.postgresql.Driver"
JDBC_DRIVER_CLASS("jdbc.driver", true),
// URL from which the jar file of the JDBC driver used to access the external
// database is downloaded.
JDBC_DRIVER_URL("driver.url", true),
// Username for accessing the external database.
DBCP_USERNAME("dbcp.username", false),
// Password of the user.
DBCP_PASSWORD("dbcp.password", false),
// Number of rows to fetch in a batch.
JDBC_FETCH_SIZE("jdbc.fetch.size", false),
// SQL query which specifies how to get data from the external database.
// The user needs to specify either "table" or "query" in the create table statement.
QUERY("query", false),
// Name of the table in the external database that the Impala table is mapped to.
TABLE("table", true),
// Mapping of column names between external table and Impala.
COLUMN_MAPPING("column.mapping", false);
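// For example (hypothetical column names), a column.mapping value of
//   "id=ID, string_col=varchar_col"
// maps the Impala columns 'id' and 'string_col' to the external database columns
// "ID" and "varchar_col"; columns that are not listed keep their Impala names.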
private final String propertyName;
private boolean required = false;
JdbcStorageConfig(String propertyName, boolean required) {
this.propertyName = propertyName;
this.required = required;
}
public String getPropertyName() {
return propertyName;
}
public boolean isRequired() {
return required;
}
}


@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.conf;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main configuration handler class
*/
public class JdbcStorageConfigManager {
private static final Logger LOG = LoggerFactory
.getLogger(JdbcStorageConfigManager.class);
public static Configuration convertMapToConfiguration(Map<String, String> props) {
checkRequiredPropertiesAreDefined(props);
Configuration conf = new Configuration();
for (Entry<String, String> entry : props.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
return conf;
}
private static void checkRequiredPropertiesAreDefined(Map<String, String> props) {
try {
String dbTypeName = props.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName());
DatabaseType.valueOf(dbTypeName);
} catch (Exception e) {
throw new IllegalArgumentException("Unknown database type.", e);
}
// Check the required parameters
for (JdbcStorageConfig config : JdbcStorageConfig.values()) {
if (config.isRequired() && !props.containsKey(config.getPropertyName())) {
throw new IllegalArgumentException(String.format("Required config '%s' was not "
+ "present!", config.getPropertyName()));
}
}
}
public static String getConfigValue(JdbcStorageConfig key, Configuration config) {
return config.get(key.getPropertyName());
}
public static String getOrigQueryToExecute(Configuration config) {
String query;
String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
if (tableName != null) {
// We generate query as 'select * from tbl'
query = "select * from " + tableName;
} else {
query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
}
return query;
}
public static String getQueryToExecute(Configuration config) {
String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
if (query != null) {
// Query has been defined, return it
return query;
}
// We generate query as 'select * from tbl'
String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
query = "select * from " + tableName;
return query;
}
private static boolean isEmptyString(String value) {
return ((value == null) || (value.trim().isEmpty()));
}
}


@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* DB2 specific data accessor. DB2 JDBC drivers work similarly to Postgres, so the current
* implementation of DB2DatabaseAccessor is the same as PostgresDatabaseAccessor.
*/
public class DB2DatabaseAccessor extends GenericJdbcDatabaseAccessor {
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
if (limit == -1) {
return sql;
}
return sql + " LIMIT " + limit + " OFFSET " + offset;
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
if (limit == -1) {
return sql;
}
return sql + " LIMIT " + limit;
}
}


@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
public interface DatabaseAccessor {
int getTotalNumberOfRecords(Configuration conf)
throws JdbcDatabaseAccessException;
JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset)
throws JdbcDatabaseAccessException;
void close(boolean cleanCache);
}


@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.extdatasource.jdbc.conf.DatabaseType;
import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
/**
* Factory for creating the correct DatabaseAccessor class for the job
*/
public class DatabaseAccessorFactory {
private DatabaseAccessorFactory() {
}
public static DatabaseAccessor getAccessor(DatabaseType dbType) {
DatabaseAccessor accessor;
switch (dbType) {
case MYSQL:
accessor = new MySqlDatabaseAccessor();
break;
case JETHRO_DATA:
accessor = new JethroDatabaseAccessor();
break;
case POSTGRES:
accessor = new PostgresDatabaseAccessor();
break;
case ORACLE:
accessor = new OracleDatabaseAccessor();
break;
case MSSQL:
accessor = new MsSqlDatabaseAccessor();
break;
case DB2:
accessor = new DB2DatabaseAccessor();
break;
default:
accessor = new GenericJdbcDatabaseAccessor();
break;
}
return accessor;
}
public static DatabaseAccessor getAccessor(Configuration conf) {
DatabaseType dbType = DatabaseType.valueOf(
conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()).toUpperCase());
return getAccessor(dbType);
}
}


@@ -0,0 +1,299 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TCacheJarResult;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
/**
* A data accessor that should in theory work with all JDBC compliant database drivers.
*/
public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
protected static final Logger LOG = LoggerFactory
.getLogger(GenericJdbcDatabaseAccessor.class);
protected static final String DBCP_CONFIG_PREFIX = "dbcp";
protected static final int DEFAULT_FETCH_SIZE = 1000;
protected static final int CACHE_EXPIRE_TIMEOUT_S = 1800;
protected static final int CACHE_SIZE = 100;
protected DataSource dbcpDataSource = null;
// Cache datasource for sharing
public static Cache<String, DataSource> dataSourceCache = CacheBuilder
.newBuilder()
.removalListener((RemovalListener<String, DataSource>) notification -> {
DataSource ds = notification.getValue();
if (ds instanceof BasicDataSource) {
BasicDataSource dbcpDs = (BasicDataSource) ds;
try {
dbcpDs.close();
LOG.info("Close datasource for '{}'.", notification.getKey());
} catch (SQLException e) {
LOG.warn("Caught exception during datasource cleanup.", e);
}
}
})
.expireAfterAccess(CACHE_EXPIRE_TIMEOUT_S, TimeUnit.SECONDS)
.maximumSize(CACHE_SIZE)
.build();
@Override
public int getTotalNumberOfRecords(Configuration conf)
throws JdbcDatabaseAccessException {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
initializeDatabaseSource(conf);
String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
// TODO: If a target database cannot flatten this view query, try to text
// replace the generated "select *".
String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable";
LOG.info("Query to execute is [{}]", countQuery);
conn = dbcpDataSource.getConnection();
ps = conn.prepareStatement(countQuery);
rs = ps.executeQuery();
if (rs.next()) {
return rs.getInt(1);
} else {
LOG.warn("The count query '{}' did not return any results.", countQuery);
throw new JdbcDatabaseAccessException(
"Count query did not return any results.");
}
} catch (JdbcDatabaseAccessException he) {
throw he;
} catch (Exception e) {
LOG.error("Caught exception while trying to get the number of records", e);
throw new JdbcDatabaseAccessException(e);
} finally {
cleanupResources(conn, ps, rs);
}
}
@Override
public JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset)
throws JdbcDatabaseAccessException {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
initializeDatabaseSource(conf);
String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
String partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
LOG.info("Query to execute is [{}]", partitionQuery);
conn = dbcpDataSource.getConnection();
ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(getFetchSize(conf));
rs = ps.executeQuery();
return new JdbcRecordIterator(conn, ps, rs, conf);
} catch (Exception e) {
LOG.error("Caught exception while trying to execute query", e);
cleanupResources(conn, ps, rs);
throw new JdbcDatabaseAccessException(
"Caught exception while trying to execute query:" + e.getMessage(), e);
}
}
@Override
public void close(boolean cleanCache) {
dbcpDataSource = null;
if (cleanCache && dataSourceCache != null) {
dataSourceCache.invalidateAll();
dataSourceCache = null;
}
}
/**
* Uses generic JDBC escape functions to add a limit and offset clause to a query
* string.
*
* @param sql the query to modify
* @param limit the maximum number of rows to return, or -1 for no limit
* @param offset the number of rows to skip
* @return the query with the limit/offset clause appended
*/
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else if (limit != -1) {
return sql + " {LIMIT " + limit + " OFFSET " + offset + "}";
} else {
return sql + " {OFFSET " + offset + "}";
}
}
/*
* Uses generic JDBC escape functions to add a limit clause to a query string
*/
protected String addLimitToQuery(String sql, int limit) {
if (limit == -1) {
return sql;
}
return sql + " {LIMIT " + limit + "}";
}
protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
try {
if (rs != null) {
rs.close();
}
} catch (SQLException e) {
LOG.warn("Caught exception during resultset cleanup.", e);
}
try {
if (ps != null) {
ps.close();
}
} catch (SQLException e) {
LOG.warn("Caught exception during statement cleanup.", e);
}
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
LOG.warn("Caught exception during connection cleanup.", e);
}
}
protected void initializeDatabaseSource(Configuration conf)
throws ExecutionException {
if (dbcpDataSource == null) {
synchronized (this) {
if (dbcpDataSource == null) {
Properties props = getConnectionPoolProperties(conf);
String jdbcUrl = props.getProperty("url");
String username = props.getProperty("username", "-");
String cacheMapKey = String.format("%s.%s", jdbcUrl, username);
dbcpDataSource = dataSourceCache.get(cacheMapKey,
() -> {
LOG.info("Datasource for '{}' was not cached. "
+ "Loading now.", cacheMapKey);
BasicDataSource basicDataSource =
BasicDataSourceFactory.createDataSource(props);
// Put jdbc driver to cache
String driverUrl = props.getProperty("driverUrl");
TCacheJarResult cacheResult = FeSupport.CacheJar(driverUrl);
TStatus cacheJarStatus = cacheResult.getStatus();
if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
throw new JdbcDatabaseAccessException(String.format(
"Unable to cache jdbc driver jar at location '%s'. " +
"Check that the file exists and is readable. Message: %s",
driverUrl, cacheJarStatus.getError_msgs()));
}
String driverLocalPath = cacheResult.getLocal_path();
// Create class loader for jdbc driver and set it for the
// BasicDataSource object so that the driver class could be loaded
// from jar file without searching classpath.
URL driverJarUrl = new File(driverLocalPath).toURI().toURL();
URLClassLoader driverLoader =
URLClassLoader.newInstance( new URL[] { driverJarUrl },
getClass().getClassLoader());
basicDataSource.setDriverClassLoader(driverLoader);
return basicDataSource;
});
}
}
}
}
protected Properties getConnectionPoolProperties(Configuration conf) {
// Create the default properties object
Properties dbProperties = getDefaultDBCPProperties();
// user properties
Map<String, String> userProperties = conf.getValByRegex(DBCP_CONFIG_PREFIX + "\\.*");
if ((userProperties != null) && (!userProperties.isEmpty())) {
for (Entry<String, String> entry : userProperties.entrySet()) {
dbProperties.put(entry.getKey().replaceFirst(DBCP_CONFIG_PREFIX + "\\.", ""),
entry.getValue());
}
}
// essential properties
dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
dbProperties.put("driverClassName",
conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
dbProperties.put("driverUrl",
conf.get(JdbcStorageConfig.JDBC_DRIVER_URL.getPropertyName()));
dbProperties.put("type", "javax.sql.DataSource");
return dbProperties;
}
protected Properties getDefaultDBCPProperties() {
Properties props = new Properties();
// Don't set 'initialSize', otherwise the driver class will be loaded in
// BasicDataSourceFactory.createDataSource() before the class loader is set
// by calling BasicDataSource.setDriverClassLoader.
// props.put("initialSize", "1");
props.put("maxActive", "3");
props.put("maxIdle", "0");
props.put("maxWait", "10000");
props.put("timeBetweenEvictionRunsMillis", "30000");
return props;
}
protected int getFetchSize(Configuration conf) {
return conf
.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
}
}


@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.util.SerializationUtils;
import org.apache.impala.thrift.TColumnData;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TScalarType;
import org.apache.impala.thrift.TTypeNodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* An iterator that allows iterating through a SQL resultset. Includes methods to clear up
* resources.
*/
public class JdbcRecordIterator {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordIterator.class);
private final Connection conn;
private final PreparedStatement ps;
private final ResultSet rs;
private final List<String> jdbcColumnNames;
public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs,
Configuration conf) throws JdbcDatabaseAccessException {
this.conn = conn;
this.ps = ps;
this.rs = rs;
try {
ResultSetMetaData metadata = rs.getMetaData();
int numColumns = metadata.getColumnCount();
List<String> columnNames = new ArrayList<>(numColumns);
List<Integer> jdbcColumnTypes = new ArrayList<>(numColumns);
for (int i = 0; i < numColumns; i++) {
columnNames.add(metadata.getColumnName(i + 1));
jdbcColumnTypes.add(metadata.getColumnType(i + 1));
}
jdbcColumnNames = columnNames;
} catch (Exception e) {
LOGGER.error("Error while trying to get column names.", e);
throw new JdbcDatabaseAccessException(
"Error while trying to get column names: " + e.getMessage(), e);
}
LOGGER.debug("Iterator ColumnNames = {}", jdbcColumnNames);
}
public boolean hasNext() throws JdbcDatabaseAccessException {
try {
return rs.next();
} catch (Exception e) {
LOGGER.warn("hasNext() threw exception", e);
throw new JdbcDatabaseAccessException(
"Error while retrieving next batch of rows: " + e.getMessage(), e);
}
}
public void next(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
Preconditions.checkState(colDescs.size() == colDatas.size());
for (int i = 0; i < colDescs.size(); ++i) {
TColumnType type = colDescs.get(i).getType();
TColumnData colData = colDatas.get(i);
if (type.types.get(0).getType() != TTypeNodeType.SCALAR) {
// Unsupported non-scalar type.
throw new UnsupportedOperationException("Unsupported column type: " +
type.types.get(0).getType());
}
Preconditions.checkState(type.getTypesSize() == 1);
TScalarType scalarType = type.types.get(0).scalar_type;
try {
Object value = rs.getObject(i + 1);
if (value == null) {
colData.addToIs_null(true);
continue;
}
switch (scalarType.type) {
case TINYINT:
colData.addToByte_vals(rs.getByte(i + 1));
break;
case SMALLINT:
colData.addToShort_vals(rs.getShort(i + 1));
break;
case INT:
colData.addToInt_vals(rs.getInt(i + 1));
break;
case DATE:
LocalDate localDate = Instant.ofEpochMilli(rs.getDate(i + 1).getTime())
.atZone(ZoneId.systemDefault())
.toLocalDate();
colData.addToInt_vals((int) localDate.toEpochDay());
break;
case BIGINT:
colData.addToLong_vals(rs.getLong(i + 1));
break;
case DOUBLE:
colData.addToDouble_vals(rs.getDouble(i + 1));
break;
case FLOAT:
colData.addToDouble_vals(rs.getFloat(i + 1));
break;
case STRING:
colData.addToString_vals(rs.getString(i + 1));
break;
case BOOLEAN:
colData.addToBool_vals(rs.getBoolean(i + 1));
break;
case TIMESTAMP:
// Use UTC time zone instead of system default time zone
colData.addToBinary_vals(
SerializationUtils.encodeTimestamp(rs.getTimestamp(i + 1,
Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)))));
break;
case DECIMAL:
BigDecimal val = rs.getBigDecimal(i + 1);
colData.addToBinary_vals(
SerializationUtils.encodeDecimal(new BigDecimal(val.byteValue())));
break;
case BINARY:
case CHAR:
case DATETIME:
case INVALID_TYPE:
case NULL_TYPE:
default:
// Unsupported.
throw new UnsupportedOperationException("Unsupported column type: " +
scalarType.getType());
}
colData.addToIs_null(false);
} catch (SQLException throwables) {
colData.addToIs_null(true);
}
}
}
/**
* Release all DB resources
*/
public void close() throws JdbcDatabaseAccessException {
try {
rs.close();
ps.close();
conn.close();
} catch (Exception e) {
LOGGER.warn("Caught exception while trying to close database objects", e);
throw new JdbcDatabaseAccessException(
"Error while releasing database resources: " + e.getMessage(), e);
}
}
}


@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* JethroData specific data accessor. This is needed because JethroData JDBC drivers do
* not support generic LIMIT and OFFSET escape functions, and have a special
* optimization for getting the query metadata using limit 0.
*/
public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
return sql + " LIMIT " + offset + "," + limit;
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
return "Select * from (" + sql + ") as \"tmp\" limit " + limit;
}
}


@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* MSSQL specific data accessor. This is needed because MSSQL JDBC drivers do not support
* generic LIMIT and OFFSET escape functions
*/
public class MsSqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
if (limit == -1) {
return sql;
}
// Order by is not necessary, but MS SQL requires it in order to use FETCH
return sql + " ORDER BY 1 OFFSET " + offset + " ROWS FETCH NEXT " + limit
+ " ROWS ONLY";
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
if (limit == -1) {
return sql;
}
return sql + " {LIMIT " + limit + "}";
}
}


@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* MySQL specific data accessor. This is needed because MySQL JDBC drivers do not support
* generic LIMIT and OFFSET escape functions
*/
public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
if (limit != -1) {
return sql + " LIMIT " + offset + "," + limit;
} else {
return sql;
}
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
if (limit != -1) {
return sql + " LIMIT " + limit;
} else {
return sql;
}
}
}


@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* Oracle specific data accessor. This is needed because Oracle JDBC drivers do not
* support generic LIMIT and OFFSET escape functions
*/
public class OracleDatabaseAccessor extends GenericJdbcDatabaseAccessor {
// Random column name to reduce the chance of conflict
static final String ROW_NUM_COLUMN_NAME = "dummy_rownum_col_rn1938392";
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
if (limit == -1) {
return sql;
}
// A simple ROWNUM > offset and ROWNUM <= (offset + limit) won't work, it will
// return nothing
return "SELECT * FROM (SELECT t.*, ROWNUM AS " + ROW_NUM_COLUMN_NAME + " FROM ("
+ sql + ") t) WHERE "
+ ROW_NUM_COLUMN_NAME + " >" + offset + " AND " + ROW_NUM_COLUMN_NAME + " <="
+ (offset + limit);
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
if (limit == -1) {
return sql;
}
return "SELECT * FROM (" + sql + ") WHERE ROWNUM <= " + limit;
}
}


@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.dao;
/**
* Postgres specific data accessor. Postgres JDBC drivers do not support generic LIMIT and
* OFFSET escape functions
*/
public class PostgresDatabaseAccessor extends GenericJdbcDatabaseAccessor {
@Override
protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
if (offset == 0) {
return addLimitToQuery(sql, limit);
} else {
if (limit == -1) {
return sql;
}
return sql + " LIMIT " + limit + " OFFSET " + offset;
}
}
@Override
protected String addLimitToQuery(String sql, int limit) {
if (limit == -1) {
return sql;
}
return sql + " LIMIT " + limit;
}
}


@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.exception;
public class JdbcDatabaseAccessException extends Exception {
private static final long serialVersionUID = -4106595742876276803L;
public JdbcDatabaseAccessException() {
super();
}
public JdbcDatabaseAccessException(String message, Throwable cause) {
super(message, cause);
}
public JdbcDatabaseAccessException(String message) {
super(message);
}
public JdbcDatabaseAccessException(Throwable cause) {
super(cause);
}
}


@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc.util;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.thrift.TColumnValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * Translates the Impala query predicates into a condition that can be run on the
 * underlying database.
*/
public class QueryConditionUtil {
private final static Logger LOG = LoggerFactory.getLogger(QueryConditionUtil.class);
public static String buildCondition(List<List<TBinaryPredicate>> predicates,
Map<String, String> columnMapping) {
List<String> condition = Lists.newArrayList();
for (List<TBinaryPredicate> tBinaryPredicates : predicates) {
StringJoiner joiner = new StringJoiner(" OR ", "(", ")");
for (TBinaryPredicate predicate : tBinaryPredicates) {
String name = predicate.getCol().getName();
name = columnMapping.getOrDefault(name, name);
String op = converse(predicate.getOp());
String value = getTColumnValueAsString(predicate.getValue());
joiner.add(String.format("%s %s %s", name, op, value));
}
condition.add(joiner.toString());
}
return Joiner.on(" AND ").join(condition);
}
/**
* Return the value of a defined field as a string. If the "value" is null or the type
* is not supported, an exception is thrown.
*
* @see org.apache.impala.planner.DataSourceScanNode#literalToColumnValue
*/
public static String getTColumnValueAsString(TColumnValue value) {
Preconditions.checkState(value != null);
StringBuilder sb = new StringBuilder();
if (value.isSetBool_val()) {
sb.append(value.bool_val);
} else if (value.isSetByte_val()) {
sb.append(value.byte_val);
} else if (value.isSetShort_val()) {
sb.append(value.short_val);
} else if (value.isSetInt_val()) {
sb.append(value.int_val);
} else if (value.isSetLong_val()) {
sb.append(value.long_val);
} else if (value.isSetDouble_val()) {
sb.append(value.double_val);
} else if (value.isSetString_val()) {
sb.append(String.format("'%s'", value.string_val));
} else {
      // TODO: Support data types of DECIMAL, TIMESTAMP, DATE, and BINARY for predicates.
      // Keep in sync with DataSourceScanNode.literalToColumnValue().
throw new IllegalArgumentException("Unsupported data type.");
}
return sb.toString();
}
  /**
   * Maps a TComparisonOp to the SQL operator string of the matching
   * BinaryPredicate.Operator (e.g. LE becomes "<="), or returns null if no operator matches.
   *
   * @see BinaryPredicate.Operator
   */
public static String converse(TComparisonOp op) {
for (Operator operator : BinaryPredicate.Operator.values()) {
if (operator.getThriftOp() == op) {
return operator.toString();
}
}
return null;
}
}
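
For illustration only (not part of the patch): a sketch of how a single accepted conjunct
(id <= 3) is rendered by buildCondition(). QueryConditionUtilExample is a hypothetical
name; the column type is omitted because buildCondition() only reads the column name,
operator and value, and the "id" -> "strategy_id" mapping mirrors the unit test below.

package org.apache.impala.extdatasource.jdbc.util;

import java.util.List;
import java.util.Map;

import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.thrift.TColumnValue;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

public class QueryConditionUtilExample {
  public static void main(String[] args) {
    TColumnValue value = new TColumnValue();
    value.setInt_val(3);
    TBinaryPredicate predicate = new TBinaryPredicate()
        .setCol(new TColumnDesc().setName("id"))
        .setOp(TComparisonOp.LE)
        .setValue(value);
    // One conjunct consisting of a single binary predicate: id <= 3.
    List<TBinaryPredicate> conjunct = Lists.newArrayList(predicate);
    List<List<TBinaryPredicate>> predicates = Lists.newArrayList();
    predicates.add(conjunct);
    // The Impala column "id" maps to the remote column "strategy_id".
    Map<String, String> columnMapping = ImmutableMap.of("id", "strategy_id");
    // Expected to print: (strategy_id <= 3)
    System.out.println(QueryConditionUtil.buildCondition(predicates, columnMapping));
  }
}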


@@ -0,0 +1,253 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.extdatasource.jdbc;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TCloseParams;
import org.apache.impala.extdatasource.thrift.TCloseResult;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.extdatasource.thrift.TGetNextParams;
import org.apache.impala.extdatasource.thrift.TGetNextResult;
import org.apache.impala.extdatasource.thrift.TOpenParams;
import org.apache.impala.extdatasource.thrift.TOpenResult;
import org.apache.impala.extdatasource.thrift.TPrepareParams;
import org.apache.impala.extdatasource.thrift.TPrepareResult;
import org.apache.impala.extdatasource.thrift.TRowBatch;
import org.apache.impala.extdatasource.thrift.TTableSchema;
import org.apache.impala.thrift.TColumnData;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPrimitiveType;
import org.apache.impala.thrift.TScalarType;
import org.apache.impala.thrift.TTypeNode;
import org.apache.impala.thrift.TTypeNodeType;
import org.apache.impala.thrift.TUniqueId;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class JdbcDataSourceTest {
private static final Logger LOG = LoggerFactory.getLogger(JdbcDataSourceTest.class);
private static String initString_ = "CACHE_CLASS::{\"database.type\":\"H2\", "
+ "\"jdbc.url\":\"jdbc:h2:mem:test;MODE=MySQL;INIT=runscript from "
+ "'classpath:test_script.sql'\", "
+ "\"jdbc.driver\":\"org.h2.Driver\", "
+ "\"table\":\"test_strategy\","
+ "\"column.mapping\":\"id=strategy_id\"}";
// Share data between tests
private static JdbcDataSource jdbcDataSource_ = new JdbcDataSource();
private static String scanHandle_;
private static TTableSchema schema_;
private static List<List<TBinaryPredicate>> predicates_ = Lists.newArrayList();
private static List<List<TBinaryPredicate>> acceptedPredicates_ = Lists.newArrayList();
private static long expectReturnRows_ = 5L;
@Test
public void test01Init() {
String colName = "id";
TComparisonOp op = TComparisonOp.LE;
TTypeNode typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
TScalarType scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.INT);
typeNode.setScalar_type(scalarType);
TColumnType colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
TColumnDesc col = new TColumnDesc().setName(colName).setType(colType);
TColumnValue value = new TColumnValue();
value.setInt_val(3);
TBinaryPredicate idPredicate = new TBinaryPredicate().setCol(col).setOp(op)
.setValue(value);
    // Predicate pushed down to the data source: id <= 3, which matches 3 of the 5 rows.
predicates_.add(Lists.newArrayList(idPredicate));
expectReturnRows_ = 3L;
LOG.info("setup predicates:{}, expectReturnRows: {}", predicates_, expectReturnRows_);
boolean ret = jdbcDataSource_.convertInitStringToConfiguration(initString_);
Assert.assertTrue(ret);
}
@Test
public void test02Prepare() {
TPrepareParams params = new TPrepareParams();
params.setTable_name("test_strategy");
params.setInit_string(initString_);
params.setPredicates(Lists.newArrayList());
params.setPredicates(predicates_);
TPrepareResult resp = jdbcDataSource_.prepare(params);
Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
if (resp.isSetAccepted_conjuncts()) {
acceptedPredicates_ = Lists.newArrayList();
// @see org.apache.impala.planner.DataSourceScanNode#removeAcceptedConjuncts
List<Integer> acceptedPredicatesIdx = resp.getAccepted_conjuncts();
// Because conjuncts_ is modified in place using positional indexes from
// conjunctsIdx, we remove the accepted predicates in reverse order.
for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
int acceptedPredIdx = acceptedPredicatesIdx.get(i);
acceptedPredicates_.add(predicates_.remove(acceptedPredIdx));
}
// Returns a view of the list in the original order as we will print these
// in the explain string and it's convenient to have predicates printed
// in the same order that they're specified.
acceptedPredicates_ = Lists.reverse(acceptedPredicates_);
}
if (resp.isSetNum_rows_estimate()) {
long estimate = resp.getNum_rows_estimate();
Assert.assertEquals(5, estimate);
}
}
@Test
public void test03Open() {
TOpenParams params = new TOpenParams();
TUniqueId unique_id = new TUniqueId();
unique_id.hi = 0xfeedbeeff00d7777L;
unique_id.lo = 0x2020202020202020L;
String str = "feedbeeff00d7777:2020202020202020";
params.setQuery_id(unique_id);
params.setTable_name("test_strategy");
params.setInit_string(initString_);
schema_ = initSchema();
params.setRow_schema(schema_);
params.setBatch_size(5);
params.setPredicates(acceptedPredicates_);
TOpenResult resp = jdbcDataSource_.open(params);
Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
scanHandle_ = resp.getScan_handle();
Assert.assertTrue(StringUtils.isNoneBlank(scanHandle_));
}
@Test
public void test04GetNext() {
TGetNextParams params = new TGetNextParams();
params.setScan_handle(scanHandle_);
boolean eos;
long totalNumRows = 0;
do {
TGetNextResult resp = jdbcDataSource_.getNext(params);
Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
eos = resp.isEos();
TRowBatch rowBatch = resp.getRows();
long numRows = rowBatch.getNum_rows();
totalNumRows += numRows;
List<TColumnData> cols = rowBatch.getCols();
Assert.assertEquals(schema_.getColsSize(), cols.size());
} while (!eos);
Assert.assertEquals(expectReturnRows_, totalNumRows);
}
@Test
public void test05Close() {
TCloseParams params = new TCloseParams();
params.setScan_handle(scanHandle_);
TCloseResult resp = jdbcDataSource_.close(params);
Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
}
private static TTableSchema initSchema() {
    // Remote table columns: strategy_id int, name string, referrer string, landing string,
    // priority int, implementation string, last_modified timestamp.
    // Only a subset is used here; "id" is mapped to "strategy_id" via "column.mapping".
TTableSchema schema_ = new TTableSchema();
TColumnDesc col = new TColumnDesc();
col.setName("id");
TTypeNode typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
TScalarType scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.INT);
typeNode.setScalar_type(scalarType);
TColumnType colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
col.setType(colType);
schema_.addToCols(col);
col = new TColumnDesc();
col.setName("name");
typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.STRING);
typeNode.setScalar_type(scalarType);
colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
col.setType(colType);
schema_.addToCols(col);
col = new TColumnDesc();
col.setName("priority");
typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.INT);
typeNode.setScalar_type(scalarType);
colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
col.setType(colType);
schema_.addToCols(col);
col = new TColumnDesc();
col.setName("implementation");
typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.STRING);
typeNode.setScalar_type(scalarType);
colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
col.setType(colType);
schema_.addToCols(col);
col = new TColumnDesc();
col.setName("last_modified");
typeNode = new TTypeNode();
typeNode.setType(TTypeNodeType.SCALAR);
scalarType = new TScalarType();
scalarType.setType(TPrimitiveType.TIMESTAMP);
typeNode.setScalar_type(scalarType);
colType = new TColumnType();
colType.setTypes(Lists.newArrayList(typeNode));
col.setType(colType);
schema_.addToCols(col);
return schema_;
}
public static void printData(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
for (int i = 0; i < colDatas.size(); ++i) {
TColumnDesc colDesc = colDescs.get(i);
TColumnData colData = colDatas.get(i);
System.out.println("idx: " + i);
System.out.println(" Name: " + colDesc);
System.out.println(" Data: " + colData);
}
}
}


@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define some default values that can be overridden by system properties
# Don't use hadoop.root.logger because Hadoop's config scripts override it
impala.hadoop.root.logger=INFO,console
# Set the root logger from the system property "impala.hadoop.root.logger".
log4j.rootLogger=${impala.hadoop.root.logger}
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n


@@ -0,0 +1,47 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
DROP TABLE IF EXISTS test_strategy;
CREATE TABLE IF NOT EXISTS test_strategy (
strategy_id int(11) NOT NULL,
name varchar(50) NOT NULL,
referrer varchar(1024) DEFAULT NULL,
landing varchar(1024) DEFAULT NULL,
priority int(11) DEFAULT NULL,
implementation varchar(512) DEFAULT NULL,
last_modified timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (strategy_id)
);
INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
last_modified)
VALUES (1, 'S1', 'aaa', 'abc', 1000, NULL, '2012-05-08 15:01:15');
INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
last_modified)
VALUES (2, 'S2', 'bbb', 'def', 990, NULL, '2012-05-08 15:01:15');
INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
last_modified)
VALUES (3, 'S3', 'ccc', 'ghi', 1000, NULL, '2012-05-08 15:01:15');
INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
last_modified)
VALUES (4, 'S4', 'ddd', 'jkl', 980, NULL, '2012-05-08 15:01:15');
INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
last_modified)
VALUES (5, 'S5', 'eee', NULL, NULL, NULL, '2012-05-08 15:01:15');
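
For illustration only (not part of the patch): a minimal JDBC sketch that opens the same
in-memory H2 database the unit test above configures (MySQL compatibility mode,
initialized from this script) and counts the rows inserted here. TestScriptSmokeCheck is
a hypothetical name, and the sketch assumes the H2 driver and test_script.sql are on the
classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TestScriptSmokeCheck {
  public static void main(String[] args) throws Exception {
    String url = "jdbc:h2:mem:test;MODE=MySQL;"
        + "INIT=runscript from 'classpath:test_script.sql'";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM test_strategy")) {
      rs.next();
      // Expected: 5, matching the number of rows inserted by this script.
      System.out.println("rows = " + rs.getInt(1));
    }
  }
}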


@@ -33,5 +33,6 @@
<module>api</module>
<module>sample</module>
<module>test</module>
<module>jdbc</module>
</modules>
</project>