diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index d7b9bc622..9f27b90eb 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -28,8 +28,10 @@ add_library(Exec STATIC catalog-op-executor.cc cross-join-node.cc data-sink.cc + data-source-scan-node.cc delimited-text-parser.cc exec-node.cc + external-data-source-executor.cc exchange-node.cc hash-join-node.cc hash-join-node-ir.cc diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc new file mode 100644 index 000000000..c2ec47605 --- /dev/null +++ b/be/src/exec/data-source-scan-node.cc @@ -0,0 +1,352 @@ +// Copyright 2014 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/data-source-scan-node.h" + +#include +#include +#include + +#include "exec/parquet-common.h" +#include "exec/read-write-util.h" +#include "exprs/expr.h" +#include "runtime/mem-pool.h" +#include "runtime/runtime-state.h" +#include "runtime/row-batch.h" +#include "runtime/string-value.h" +#include "runtime/tuple-row.h" +#include "util/jni-util.h" +#include "util/periodic-counter-updater.h" +#include "util/runtime-profile.h" + +using namespace std; +using namespace strings; +using namespace impala::extdatasource; + +DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on " + "external data sources."); + +namespace impala { + +// $0 = num expected cols, $1 = actual num columns +const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. " + "Expected $0 but received $1. This likely indicates a problem with the data source " + "library."; +const string ERROR_MISMATCHED_COL_SIZES = "Data source returned columns containing " + "different numbers of rows. This likely indicates a problem with the data source " + "library."; +// $0 = column type (e.g. INT) +const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent column data. " + "Expected value of type $0 based on column metadata. This likely indicates a " + "problem with the data source library."; +const string ERROR_INVALID_TIMESTAMP = "Data source returned invalid timestamp data. " + "This likely indicates a problem with the data source library."; +const string ERROR_INVALID_DECIMAL = "Data source returned invalid decimal data. 
" + "This likely indicates a problem with the data source library."; + +// Size of an encoded TIMESTAMP +const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t); + +DataSourceScanNode::DataSourceScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + data_src_node_(tnode.data_source_node), + tuple_idx_(0), + num_rows_(0), + next_row_idx_(0) { +} + +DataSourceScanNode::~DataSourceScanNode() { +} + +Status DataSourceScanNode::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Prepare(state)); + tuple_desc_ = state->desc_tbl().GetTupleDescriptor(data_src_node_.tuple_id); + DCHECK(tuple_desc_ != NULL); + + data_source_executor_.reset(new ExternalDataSourceExecutor()); + RETURN_IF_ERROR(data_source_executor_->Init(data_src_node_.data_source.hdfs_location, + data_src_node_.data_source.class_name, data_src_node_.data_source.api_version)); + + // Initialize materialized_slots_ and cols_next_val_idx_. + BOOST_FOREACH(SlotDescriptor* slot, tuple_desc_->slots()) { + if (!slot->is_materialized()) continue; + materialized_slots_.push_back(slot); + cols_next_val_idx_.push_back(0); + } + return Status::OK; +} + +Status DataSourceScanNode::Open(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecNode::Open(state)); + RETURN_IF_CANCELLED(state); + + // Prepare the schema for TOpenParams.row_schema + vector cols; + BOOST_FOREACH(const SlotDescriptor* slot, materialized_slots_) { + extdatasource::TColumnDesc col; + int col_idx = slot->col_pos(); + col.__set_name(tuple_desc_->table_desc()->col_names()[col_idx]); + col.__set_type(slot->type().ToThrift()); + cols.push_back(col); + } + extdatasource::TTableSchema row_schema; + row_schema.__set_cols(cols); + + TOpenParams params; + params.__set_query_id(state->query_id()); + params.__set_table_name(tuple_desc_->table_desc()->name()); + params.__set_init_string(data_src_node_.init_string); + params.__set_authenticated_user_name(state->connected_user()); + params.__set_row_schema(row_schema); + params.__set_batch_size(FLAGS_data_source_batch_size); + params.__set_predicates(data_src_node_.accepted_predicates); + TOpenResult result; + RETURN_IF_ERROR(data_source_executor_->Open(params, &result)); + RETURN_IF_ERROR(Status(result.status)); + scan_handle_ = result.scan_handle; + return GetNextInputBatch(); +} + +Status DataSourceScanNode::ValidateRowBatchSize() { + if (!input_batch_->__isset.rows) return Status::OK; + const vector& cols = input_batch_->rows.cols; + if (materialized_slots_.size() != cols.size()) { + return Status(Substitute(ERROR_NUM_COLUMNS, materialized_slots_.size(), cols.size())); + } + + num_rows_ = -1; + // If num_rows was set, use that, otherwise we set it to be the number of rows in + // the first TColumnData and then ensure the number of rows in other columns are + // consistent. 
+  if (input_batch_->rows.__isset.num_rows) num_rows_ = input_batch_->rows.num_rows;
+  for (int i = 0; i < materialized_slots_.size(); ++i) {
+    const TColumnData& col_data = cols[i];
+    if (num_rows_ < 0) num_rows_ = col_data.is_null.size();
+    if (num_rows_ != col_data.is_null.size()) return Status(ERROR_MISMATCHED_COL_SIZES);
+  }
+  return Status::OK;
+}
+
+Status DataSourceScanNode::GetNextInputBatch() {
+  input_batch_.reset(new TGetNextResult());
+  next_row_idx_ = 0;
+  // Reset all the indexes into the column value arrays to 0
+  memset(&cols_next_val_idx_[0], 0, sizeof(int) * cols_next_val_idx_.size());
+  TGetNextParams params;
+  params.__set_scan_handle(scan_handle_);
+  RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
+  RETURN_IF_ERROR(Status(input_batch_->status));
+  return ValidateRowBatchSize();
+}
+
+// Sets the decimal value in the slot. Inline method to avoid nested switch statements.
+inline Status SetDecimalVal(const ColumnType& type, char* bytes, int len,
+    void* slot) {
+  uint8_t* buffer = reinterpret_cast<uint8_t*>(bytes);
+  switch (type.GetByteSize()) {
+    case 4: {
+      Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
+      if (len > sizeof(Decimal4Value)) return Status(ERROR_INVALID_DECIMAL);
+      // TODO: Move Decode() to a more generic utils class (here and below)
+      ParquetPlainEncoder::Decode(buffer, len, val);
+      break;
+    }
+    case 8: {
+      Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
+      if (len > sizeof(Decimal8Value)) return Status(ERROR_INVALID_DECIMAL);
+      ParquetPlainEncoder::Decode(buffer, len, val);
+      break;
+    }
+    case 16: {
+      Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
+      if (len > sizeof(Decimal16Value)) return Status(ERROR_INVALID_DECIMAL);
+      ParquetPlainEncoder::Decode(buffer, len, val);
+      break;
+    }
+    default: DCHECK(false);
+  }
+  return Status::OK;
+}
+
+Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool) {
+  const vector<TColumnData>& cols = input_batch_->rows.cols;
+  tuple_->Init(tuple_desc_->byte_size());
+
+  for (int i = 0; i < materialized_slots_.size(); ++i) {
+    const SlotDescriptor* slot_desc = materialized_slots_[i];
+    void* slot = tuple_->GetSlot(slot_desc->tuple_offset());
+    const TColumnData& col = cols[i];
+
+    if (col.is_null[next_row_idx_]) {
+      tuple_->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+
+    // Get and increment the index into the values array (e.g. int_vals) for this col.
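+    // Note that this per-column index only advances past non-null values, so it can
+    // lag behind next_row_idx_ when earlier rows were NULL for this column.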
+    int val_idx = cols_next_val_idx_[i]++;
+    switch (slot_desc->type().type) {
+      case TYPE_STRING: {
+        if (val_idx >= col.string_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "STRING"));
+        }
+        const string& val = col.string_vals[val_idx];
+        size_t val_size = val.size();
+        char* buffer = reinterpret_cast<char*>(tuple_pool->Allocate(val_size));
+        memcpy(buffer, val.data(), val_size);
+        reinterpret_cast<StringValue*>(slot)->ptr = buffer;
+        reinterpret_cast<StringValue*>(slot)->len = val_size;
+        break;
+      }
+      case TYPE_TINYINT:
+        if (val_idx >= col.byte_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "TINYINT"));
+        }
+        *reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx];
+        break;
+      case TYPE_SMALLINT:
+        if (val_idx >= col.short_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "SMALLINT"));
+        }
+        *reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx];
+        break;
+      case TYPE_INT:
+        if (val_idx >= col.int_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "INT"));
+        }
+        *reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx];
+        break;
+      case TYPE_BIGINT:
+        if (val_idx >= col.long_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "BIGINT"));
+        }
+        *reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
+        break;
+      case TYPE_DOUBLE:
+        if (val_idx >= col.double_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
+        }
+        *reinterpret_cast<double*>(slot) = col.double_vals[val_idx];
+        break;
+      case TYPE_FLOAT:
+        if (val_idx >= col.double_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "FLOAT"));
+        }
+        *reinterpret_cast<float*>(slot) = col.double_vals[val_idx];
+        break;
+      case TYPE_BOOLEAN:
+        if (val_idx >= col.bool_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN"));
+        }
+        *reinterpret_cast<bool*>(slot) = col.bool_vals[val_idx];
+        break;
+      case TYPE_TIMESTAMP: {
+        if (val_idx >= col.binary_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "TIMESTAMP"));
+        }
+        const string& val = col.binary_vals[val_idx];
+        if (val.size() != TIMESTAMP_SIZE) return Status(ERROR_INVALID_TIMESTAMP);
+        const uint8_t* bytes = reinterpret_cast<const uint8_t*>(val.data());
+        *reinterpret_cast<TimestampValue*>(slot) = TimestampValue(
+            ReadWriteUtil::GetInt<int64_t>(bytes),
+            ReadWriteUtil::GetInt<int32_t>(bytes + sizeof(int64_t)));
+        break;
+      }
+      case TYPE_DECIMAL: {
+        if (val_idx >= col.binary_vals.size()) {
+          return Status(Substitute(ERROR_INVALID_COL_DATA, "DECIMAL"));
+        }
+        const string& val = col.binary_vals[val_idx];
+        RETURN_IF_ERROR(SetDecimalVal(slot_desc->type(), const_cast<char*>(val.data()),
+            val.size(), slot));
+        break;
+      }
+      default:
+        DCHECK(false);
+    }
+  }
+  return Status::OK;
+}
+
+Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  if (ReachedLimit()) {
+    *eos = true;
+    return Status::OK;
+  }
+  *eos = false;
+
+  // create new tuple buffer for row_batch
+  MemPool* tuple_pool = row_batch->tuple_data_pool();
+  int tuple_buffer_size = row_batch->capacity() * tuple_desc_->byte_size();
+  void* tuple_buffer = tuple_pool->Allocate(tuple_buffer_size);
+  tuple_ = reinterpret_cast<Tuple*>(tuple_buffer);
+  Expr** conjuncts = &conjuncts_[0];
+  int num_conjuncts = conjuncts_.size();
+
+  while (true) {
+    {
+      SCOPED_TIMER(materialize_tuple_timer());
+      // copy rows until we hit the limit/capacity or until we exhaust input_batch_
+      while (!ReachedLimit() && !row_batch->AtCapacity() && InputBatchHasNext() &&
tuple_pool->total_allocated_bytes() < RowBatch::AT_CAPACITY_MEM_USAGE) { + RETURN_IF_ERROR(MaterializeNextRow(tuple_pool)); + int row_idx = row_batch->AddRow(); + TupleRow* tuple_row = row_batch->GetRow(row_idx); + tuple_row->SetTuple(tuple_idx_, tuple_); + + if (ExecNode::EvalConjuncts(conjuncts, num_conjuncts, tuple_row)) { + row_batch->CommitLastRow(); + char* new_tuple = reinterpret_cast(tuple_); + new_tuple += tuple_desc_->byte_size(); + tuple_ = reinterpret_cast(new_tuple); + ++num_rows_returned_; + } + ++next_row_idx_; + } + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + + if (ReachedLimit() || row_batch->AtCapacity() || input_batch_->eos) { + *eos = ReachedLimit() || input_batch_->eos; + return Status::OK; + } + } + + // Need more rows + DCHECK(!InputBatchHasNext()); + RETURN_IF_ERROR(GetNextInputBatch()); + } +} + +void DataSourceScanNode::Close(RuntimeState* state) { + if (is_closed()) return; + SCOPED_TIMER(runtime_profile_->total_time_counter()); + PeriodicCounterUpdater::StopRateCounter(total_throughput_counter()); + PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_); + input_batch_.reset(); + TCloseParams params; + TCloseResult result; + Status status = data_source_executor_->Close(params, &result); + state->LogError(status); // logs the error if status != OK + ExecNode::Close(state); +} + +void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const { + string indent(indentation_level * 2, ' '); + *out << indent << "DataSourceScanNode(tupleid=" << data_src_node_.tuple_id << ")"; +} + +} // namespace impala diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h new file mode 100644 index 000000000..dec7c881e --- /dev/null +++ b/be/src/exec/data-source-scan-node.h @@ -0,0 +1,120 @@ +// Copyright 2014 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#ifndef IMPALA_EXEC_DATA_SOURCE_SCAN_NODE_H_ +#define IMPALA_EXEC_DATA_SOURCE_SCAN_NODE_H_ + +#include +#include + +#include "exec/scan-node.h" +#include "exec/external-data-source-executor.h" +#include "runtime/descriptors.h" +#include "runtime/mem-pool.h" + +#include "gen-cpp/ExternalDataSource_types.h" + +namespace impala { + +class Tuple; + +// Scan node for external data sources. The external data source jar is loaded +// in Prepare() (via an ExternalDataSourceExecutor), and then the data source +// is called to receive row batches when necessary. This node converts the +// rows stored in a thrift structure to RowBatches. The external data source is +// closed in Close(). +class DataSourceScanNode : public ScanNode { + public: + DataSourceScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + ~DataSourceScanNode(); + + // Load the data source library and create the ExternalDataSourceExecutor. + virtual Status Prepare(RuntimeState* state); + + // Open the data source and initialize the first row batch. 
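+  // Calls open() on the data source and then fetches the first batch so that
+  // GetNext() has rows to materialize.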
+  virtual Status Open(RuntimeState* state);
+
+  // Fill the next row batch, calls GetNext() on the external scanner.
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+
+  // Close the scanner, and report errors.
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  // Write debug string of this into out.
+  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+
+ private:
+  // Used to call the external data source.
+  boost::scoped_ptr<ExternalDataSourceExecutor> data_source_executor_;
+
+  // Thrift structure describing the data source scan node.
+  const TDataSourceScanNode data_src_node_;
+
+  // Descriptor of tuples read
+  const TupleDescriptor* tuple_desc_;
+
+  // Tuple index in tuple row.
+  int tuple_idx_;
+
+  // Current tuple.
+  Tuple* tuple_;
+
+  // Vector containing slot descriptors for all materialized slots. These
+  // descriptors are sorted in order of increasing col_pos.
+  // TODO: Refactor to base class. HdfsScanNode has this and other nodes could use it.
+  std::vector<SlotDescriptor*> materialized_slots_;
+
+  // The opaque handle returned by the data source for the scan.
+  std::string scan_handle_;
+
+  // The current result from calling GetNext() on the data source. Contains the
+  // thrift representation of the rows.
+  boost::scoped_ptr<extdatasource::TGetNextResult> input_batch_;
+
+  // The number of rows in input_batch_->rows. The data source should have set
+  // TRowBatch.num_rows, but we compute it just in case it did not.
+  int num_rows_;
+
+  // The index of the next row in input_batch_,
+  // i.e. the index into TColumnData.is_null.
+  size_t next_row_idx_;
+
+  // The indexes of the next non-null value in the row batch, per column. Should always
+  // contain materialized_slots_.size() integers. All values are reset to 0 when getting
+  // the next row batch.
+  std::vector<int> cols_next_val_idx_;
+
+  // Materializes the next row (next_row_idx_) into tuple_.
+  Status MaterializeNextRow(MemPool* mem_pool);
+
+  // Gets the next batch from the data source, stored in input_batch_.
+  Status GetNextInputBatch();
+
+  // Validate that input_batch_ contains the correct number of columns and that the
+  // columns contain the same number of rows.
+  Status ValidateRowBatchSize();
+
+  // True if input_batch_ has more rows.
+  bool InputBatchHasNext() {
+    if (!input_batch_->__isset.rows) return false;
+    return next_row_idx_ < num_rows_;
+  }
+};
+
+}
+
+#endif
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 5cda783de..ec85cbccc 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -27,6 +27,7 @@
 #include "exec/hash-join-node.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hbase-scan-node.h"
+#include "exec/data-source-scan-node.h"
 #include "exec/exchange-node.h"
 #include "exec/merge-node.h"
 #include "exec/cross-join-node.h"
@@ -239,6 +240,9 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
     case TPlanNodeType::HBASE_SCAN_NODE:
       *node = pool->Add(new HBaseScanNode(pool, tnode, descs));
       break;
+    case TPlanNodeType::DATA_SOURCE_NODE:
+      *node = pool->Add(new DataSourceScanNode(pool, tnode, descs));
+      break;
     case TPlanNodeType::AGGREGATION_NODE:
       *node = pool->Add(new AggregationNode(pool, tnode, descs));
       break;
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
new file mode 100644
index 000000000..c5a6e5b61
--- /dev/null
+++ b/be/src/exec/external-data-source-executor.cc
@@ -0,0 +1,117 @@
+// Copyright 2014 Cloudera Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/external-data-source-executor.h" + +#include +#include + +#include "common/logging.h" +#include "rpc/thrift-util.h" +#include "runtime/lib-cache.h" +#include "util/jni-util.h" +#include "util/parse-util.h" + +using namespace std; +using namespace impala; +using namespace impala::extdatasource; + +ExternalDataSourceExecutor::~ExternalDataSourceExecutor() { + DCHECK(!is_initialized_); +} + +Status ExternalDataSourceExecutor::Init(const string& jar_path, + const string& class_name, const string& api_version) { + DCHECK(!is_initialized_); + string local_jar_path; + RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( + jar_path, LibCache::TYPE_JAR, &local_jar_path)); + + // TODO: Make finding the class and methods static, i.e. only loaded once + JniMethodDescriptor methods[] = { + {"", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", &ctor_}, + {"open", "([B)[B", &open_id_}, + {"getNext", "([B)[B", &get_next_id_}, + {"close", "([B)[B", &close_id_}}; + + JNIEnv* jni_env = getJNIEnv(); + external_data_source_executor_class_ = + jni_env->FindClass("com/cloudera/impala/extdatasource/ExternalDataSourceExecutor"); + RETURN_ERROR_IF_EXC(jni_env); + uint32_t num_methods = sizeof(methods) / sizeof(methods[0]); + for (int i = 0; i < num_methods; ++i) { + RETURN_IF_ERROR(JniUtil::LoadJniMethod(jni_env, external_data_source_executor_class_, + &(methods[i]))); + } + + jstring jar_path_jstr = jni_env->NewStringUTF(local_jar_path.c_str()); + RETURN_ERROR_IF_EXC(jni_env); + jstring class_name_jstr = jni_env->NewStringUTF(class_name.c_str()); + RETURN_ERROR_IF_EXC(jni_env); + jstring api_version_jstr = jni_env->NewStringUTF(api_version.c_str()); + RETURN_ERROR_IF_EXC(jni_env); + + jobject external_data_source_executor = jni_env->NewObject( + external_data_source_executor_class_, ctor_, jar_path_jstr, class_name_jstr, + api_version_jstr); + RETURN_ERROR_IF_EXC(jni_env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, external_data_source_executor, + &external_data_source_executor_)); + RETURN_ERROR_IF_EXC(jni_env); + is_initialized_ = true; + return Status::OK; +} + +// JniUtil::CallJniMethod() does not compile when the template parameters are in +// another namespace. The issue seems to be that SerializeThriftMsg/DeserializeThriftMsg +// are not being generated for these types. 
+// TODO: Understand what's happening, remove, and use JniUtil::CallJniMethod
+template <typename T, typename R>
+Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg,
+    R* response) {
+  JNIEnv* jni_env = getJNIEnv();
+  jbyteArray request_bytes;
+  JniLocalFrame jni_frame;
+  RETURN_IF_ERROR(jni_frame.push(jni_env));
+  RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
+  jbyteArray result_bytes = static_cast<jbyteArray>(
+      jni_env->CallObjectMethod(obj, method, request_bytes));
+  RETURN_ERROR_IF_EXC(jni_env);
+  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
+  return Status::OK;
+}
+
+Status ExternalDataSourceExecutor::Open(const TOpenParams& params, TOpenResult* result) {
+  DCHECK(is_initialized_);
+  return CallJniMethod(external_data_source_executor_, open_id_, params, result);
+}
+
+Status ExternalDataSourceExecutor::GetNext(const TGetNextParams& params,
+    TGetNextResult* result) {
+  DCHECK(is_initialized_);
+  return CallJniMethod(external_data_source_executor_, get_next_id_, params, result);
+}
+
+Status ExternalDataSourceExecutor::Close(const TCloseParams& params,
+    TCloseResult* result) {
+  DCHECK(is_initialized_);
+  Status status = CallJniMethod(external_data_source_executor_, close_id_, params,
+      result);
+  JNIEnv* env = getJNIEnv();
+  env->DeleteGlobalRef(external_data_source_executor_);
+  status.AddError(JniUtil::GetJniExceptionMsg(env));  // no-op if Status == OK
+  is_initialized_ = false;
+  return status;
+}
+
diff --git a/be/src/exec/external-data-source-executor.h b/be/src/exec/external-data-source-executor.h
new file mode 100644
index 000000000..3e8ce52d9
--- /dev/null
+++ b/be/src/exec/external-data-source-executor.h
@@ -0,0 +1,70 @@
+// Copyright 2014 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_EXTERNAL_DATA_SOURCE_EXECUTOR_H
+#define IMPALA_EXEC_EXTERNAL_DATA_SOURCE_EXECUTOR_H
+
+#include <jni.h>
+#include <string>
+
+#include "common/status.h"
+
+#include "gen-cpp/ExternalDataSource_types.h"
+
+namespace impala {
+
+// Wraps the Java class ExternalDataSourceExecutor to call a data source.
+// There is an explicit Init() method (rather than initializing in the c'tor) so
+// that the initialization can return an error status if an error occurs.
+class ExternalDataSourceExecutor {
+ public:
+  ExternalDataSourceExecutor() : is_initialized_(false) { }
+  virtual ~ExternalDataSourceExecutor();
+
+  // Initialize the data source library. jar_path is the HDFS location of the jar
+  // containing the ExternalDataSource implementation specified by class_name. The
+  // class must implement the specified api_version.
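+  // Returns an error if the jar cannot be copied locally or if the executor class or
+  // its methods cannot be loaded.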
+ Status Init(const std::string& jar_path, const std::string& class_name, + const std::string& api_version); + + // Calls ExternalDataSource.open() + Status Open(const impala::extdatasource::TOpenParams& params, + impala::extdatasource::TOpenResult* result); + + // Calls ExternalDataSource.getNext() + Status GetNext(const impala::extdatasource::TGetNextParams& params, + impala::extdatasource::TGetNextResult* result); + + // Calls ExternalDataSource.close() and deletes the reference to the + // external_data_source_executor_. After calling Close(), this should no + // longer be used. + Status Close(const impala::extdatasource::TCloseParams& params, + impala::extdatasource::TCloseResult* result); + + private: + bool is_initialized_; // Set true in Init() to ensure the class is initialized. + + // Descriptor of Java ExternalDataSourceExecutor class, used to create a new instance. + jclass external_data_source_executor_class_; + // Instance of com.cloudera.impala.extdatasource.ExternalDataSourceExecutor + jobject external_data_source_executor_; + jmethodID ctor_; + jmethodID open_id_; // ExternalDataSourceExecutor.open() + jmethodID get_next_id_; // ExternalDataSourceExecutor.getNext() + jmethodID close_id_; // ExternalDataSourceExecutor.close() +}; + +} + +#endif diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 81c55d49f..47d9d8dac 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -353,6 +353,7 @@ class HdfsScanNode : public ScanNode { // Vector containing slot descriptors for all materialized non-partition key // slots. These descriptors are sorted in order of increasing col_pos + // TODO: Put this (with associated fields and logic) on ScanNode or ExecNode std::vector materialized_slots_; // Vector containing slot descriptors for all materialized partition key slots diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index 8f8cb660d..b9c419261 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -132,6 +132,12 @@ string HdfsPartitionDescriptor::DebugString() const { return out.str(); } +string DataSourceTableDescriptor::DebugString() const { + stringstream out; + out << "DataSourceTable(" << TableDescriptor::DebugString() << ")"; + return out.str(); +} + HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) : TableDescriptor(tdesc), @@ -323,6 +329,9 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::HBASE_TABLE: desc = pool->Add(new HBaseTableDescriptor(tdesc)); break; + case TTableType::DATA_SOURCE_TABLE: + desc = pool->Add(new DataSourceTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index ff11f6c1f..2ce5fb295 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -272,6 +272,13 @@ class HBaseTableDescriptor : public TableDescriptor { std::vector cols_; }; +// Descriptor for a DataSourceTable +class DataSourceTableDescriptor : public TableDescriptor { + public: + DataSourceTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) { } + virtual std::string DebugString() const; +}; + class TupleDescriptor { public: int byte_size() const { return byte_size_; } diff --git a/be/src/statestore/simple-scheduler.cc b/be/src/statestore/simple-scheduler.cc index bb4425d9b..a877faecd 100644 --- a/be/src/statestore/simple-scheduler.cc +++ 
b/be/src/statestore/simple-scheduler.cc @@ -663,6 +663,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request vector scan_node_types; scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE); scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE); + scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE); unordered_set unique_hosts; // compute hosts of producer fragment before those of consumer fragment(s), diff --git a/ext-data-source/api/src/main/java/com/cloudera/impala/extdatasource/util/SerializationUtils.java b/ext-data-source/api/src/main/java/com/cloudera/impala/extdatasource/util/SerializationUtils.java new file mode 100644 index 000000000..dc2e45600 --- /dev/null +++ b/ext-data-source/api/src/main/java/com/cloudera/impala/extdatasource/util/SerializationUtils.java @@ -0,0 +1,45 @@ +// Copyright 2014 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.extdatasource.util; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Timestamp; + +/** + * Utility methods for serialization by an ExternalDataSource. + */ +public class SerializationUtils { + + /** + * Encodes a DECIMAL value. + */ + public static ByteBuffer encodeDecimal(BigDecimal decimal) { + if (decimal == null) throw new NullPointerException("decimal cannot be null."); + return ByteBuffer.wrap(decimal.unscaledValue().toByteArray()); + } + + /** + * Encodes a TIMESTAMP value. 
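+   * The buffer contains the timestamp's seconds since the epoch as an 8-byte long
+   * followed by the nanosecond component as a 4-byte int, which is the layout the
+   * backend expects when decoding TIMESTAMP values.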
+ */ + public static ByteBuffer encodeTimestamp(Timestamp timestamp) { + if (timestamp == null) throw new NullPointerException("timestamp cannot be null."); + ByteBuffer buffer = ByteBuffer.allocate(8 + 4); + buffer.putLong(timestamp.getTime() / 1000); + buffer.putInt(timestamp.getNanos()); + buffer.rewind(); + return buffer; + } +} diff --git a/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java b/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java index e9f1f4b2c..5c6f52d1a 100644 --- a/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java +++ b/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java @@ -14,15 +14,11 @@ package com.cloudera.impala.extdatasource; +import java.math.BigDecimal; import java.math.BigInteger; -import java.nio.ByteBuffer; import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; - import com.cloudera.impala.extdatasource.thrift.TCloseParams; import com.cloudera.impala.extdatasource.thrift.TCloseResult; import com.cloudera.impala.extdatasource.thrift.TColumnDesc; @@ -34,12 +30,12 @@ import com.cloudera.impala.extdatasource.thrift.TPrepareParams; import com.cloudera.impala.extdatasource.thrift.TPrepareResult; import com.cloudera.impala.extdatasource.thrift.TRowBatch; import com.cloudera.impala.extdatasource.thrift.TTableSchema; +import com.cloudera.impala.extdatasource.util.SerializationUtils; import com.cloudera.impala.extdatasource.v1.ExternalDataSource; import com.cloudera.impala.thrift.TColumnData; import com.cloudera.impala.thrift.TColumnType; import com.cloudera.impala.thrift.TStatus; import com.cloudera.impala.thrift.TStatusCode; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -186,7 +182,7 @@ public class AllTypesDataSource implements ExternalDataSource { case TIMESTAMP: colData.addToIs_null(false); colData.addToBinary_vals( - ByteBuffer.wrap(new TimestampWritable(new Timestamp(currRow_)).getBytes())); + SerializationUtils.encodeTimestamp(new Timestamp(currRow_))); break; case DECIMAL: colData.addToIs_null(false); @@ -194,7 +190,7 @@ public class AllTypesDataSource implements ExternalDataSource { BigInteger val = maxUnscaled.subtract(BigInteger.valueOf(currRow_ + 1)); val = val.mod(maxUnscaled); if (currRow_ % 2 == 0) val = val.negate(); - colData.addToBinary_vals(ByteBuffer.wrap(val.toByteArray())); + colData.addToBinary_vals(SerializationUtils.encodeDecimal(new BigDecimal(val))); break; case BINARY: case CHAR: diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test new file mode 100644 index 000000000..71fcac5ac --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test @@ -0,0 +1,61 @@ +==== +---- QUERY +# Gets all types including a row with a NULL value. The predicate pushed to +# the data source is not actually used, but the second predicate is +# evaluated by Impala. 
+select smallint_col, tinyint_col, int_col, bigint_col, + float_col, bool_col, double_col, string_col, timestamp_col +from alltypes_datasource +where float_col != 0 and + int_col >= 1990 limit 5 +---- RESULTS +90,0,1990,19900,2189,true,1990,'NULL',1970-01-01 00:00:01.990000000 +91,1,1991,19910,2190.10009765625,false,1991,'1991',1970-01-01 00:00:01.991000000 +92,2,1992,19920,2191.199951171875,true,1992,'1992',1970-01-01 00:00:01.992000000 +93,3,1993,19930,2192.300048828125,false,1993,'1993',1970-01-01 00:00:01.993000000 +94,4,1994,19940,2193.39990234375,true,1994,'1994',1970-01-01 00:00:01.994000000 +---- TYPES +SMALLINT, TINYINT, INT, BIGINT, FLOAT, BOOLEAN, DOUBLE, STRING, TIMESTAMP +==== +---- QUERY +# Project a subset of the columns +select bigint_col, timestamp_col, double_col +from alltypes_datasource +where double_col != 0 and int_col >= 1990 limit 3 +---- RESULTS +19900,1970-01-01 00:00:01.990000000,1990 +19910,1970-01-01 00:00:01.991000000,1991 +19920,1970-01-01 00:00:01.992000000,1992 +---- TYPES +BIGINT, TIMESTAMP, DOUBLE +==== +---- QUERY +# count(*) with a predicate evaluated by Impala +select count(*) from alltypes_datasource +where float_col = 0 and + string_col is not NULL +---- RESULTS +4000 +---- TYPES +BIGINT +==== +---- QUERY +# count(*) with no predicates has no materialized slots +select count(*) from alltypes_datasource +---- RESULTS +5000 +---- TYPES +BIGINT +==== +---- QUERY +# Test decimal values. The test data source returns very large and very small values. +select * from decimal_datasource limit 5 +---- RESULTS +-999999999,-9999999999,-9999999999.9999999999,-9.9999999999999999999999999999999999999,-99999.99999 +999999998,9999999998,9999999999.9999999998,9.9999999999999999999999999999999999998,99999.99998 +-999999997,-9999999997,-9999999999.9999999997,-9.9999999999999999999999999999999999997,-99999.99997 +999999996,9999999996,9999999999.9999999996,9.9999999999999999999999999999999999996,99999.99996 +-999999995,-9999999995,-9999999999.9999999995,-9.9999999999999999999999999999999999995,-99999.99995 +---- TYPES +DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL +==== \ No newline at end of file diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py index 198fdbabe..e58399552 100755 --- a/tests/query_test/test_queries.py +++ b/tests/query_test/test_queries.py @@ -46,6 +46,12 @@ class TestQueries(ImpalaTestSuite): vector.get_value('exec_option')['num_nodes'] = 1 self.run_test_case('QueryTest/distinct-estimate', vector) + def test_data_source_tables(self, vector): + # Only need to test on a single format + if vector.get_value('table_format').file_format != 'text': + pytest.skip() + self.run_test_case('QueryTest/data-source-tables', vector) + def test_scan_range(self, vector): self.run_test_case('QueryTest/hdfs-partitions', vector)