diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index 03a1d261c..184d29f85 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -149,7 +149,16 @@ Status DataSourceScanNode::GetNextInputBatch() { params.__set_scan_handle(scan_handle_); RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get())); RETURN_IF_ERROR(Status(input_batch_->status)); - return ValidateRowBatchSize(); + RETURN_IF_ERROR(ValidateRowBatchSize()); + if (!InputBatchHasNext() && !input_batch_->eos) { + // The data source should have set eos, but if it didn't we should just log a + // warning and continue as if it had. + VLOG_QUERY << "Data source " << data_src_node_.data_source.name << " returned no " + << "rows but did not set 'eos'. No more rows will be fetched from the " + << "data source."; + input_batch_->eos = true; + } + return Status::OK; } // Sets the decimal value in the slot. Inline method to avoid nested switch statements. 
diff --git a/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java b/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java index 893222548..e9a4b1135 100644 --- a/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java +++ b/ext-data-source/test/src/main/java/com/cloudera/impala/extdatasource/AllTypesDataSource.java @@ -20,6 +20,7 @@ import java.sql.Timestamp; import java.util.List; import java.util.UUID; +import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate; import com.cloudera.impala.extdatasource.thrift.TCloseParams; import com.cloudera.impala.extdatasource.thrift.TCloseResult; import com.cloudera.impala.extdatasource.thrift.TColumnDesc; @@ -35,16 +36,19 @@ import com.cloudera.impala.extdatasource.util.SerializationUtils; import com.cloudera.impala.extdatasource.v1.ExternalDataSource; import com.cloudera.impala.thrift.TColumnData; import com.cloudera.impala.thrift.TColumnType; +import com.cloudera.impala.thrift.TPrimitiveType; import com.cloudera.impala.thrift.TStatus; import com.cloudera.impala.thrift.TStatusCode; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; /** - * Data source implementation for tests that: - * (a) Accepts every other offered conjunct for testing planning, though - * predicates are not actually evaluated. - * (b) Returns trivial data of all supported types for query tests. + * Data source implementation for tests that can: + * (a) Accept every other offered conjunct for testing planning (though predicates are + * not actually evaluated) and return trivial data of all supported types for + * query tests. + * (b) Validate the predicates offered by Impala. 
*/ public class AllTypesDataSource implements ExternalDataSource { // Total number of rows to return @@ -63,6 +67,7 @@ public class AllTypesDataSource implements ExternalDataSource { private TTableSchema schema_; private DataSourceState state_; private String scanHandle_; + private String validatePredicatesResult_; // Enumerates the states of the data source. private enum DataSourceState { @@ -85,12 +90,76 @@ public class AllTypesDataSource implements ExternalDataSource { public TPrepareResult prepare(TPrepareParams params) { Preconditions.checkState(state_ == DataSourceState.CREATED); List<Integer> accepted = Lists.newArrayList(); - for (int i = 0; i < params.getPredicatesSize(); ++i) { - if (i % 2 == 0) accepted.add(i); + int numRowsReturned = 0; + if (validatePredicates(params.getPredicates())) { + // Indicate all predicates are applied because we return a dummy row with the + // result later to validate the result in tests. Impala shouldn't try to apply + // predicates to that dummy row. + for (int i = 0; i < params.getPredicatesSize(); ++i) accepted.add(i); + numRowsReturned = 1; + } else { + // Default behavior is to accept every other predicate. They are not actually + // applied, but we want to validate that Impala applies the correct predicates. + for (int i = 0; i < params.getPredicatesSize(); ++i) { + if (i % 2 == 0) accepted.add(i); + } + numRowsReturned = NUM_ROWS_RETURNED; } return new TPrepareResult(STATUS_OK) .setAccepted_conjuncts(accepted) - .setNum_rows_estimate(NUM_ROWS_RETURNED); + .setNum_rows_estimate(numRowsReturned); + } + + /** + * If the predicate value (assuming STRING) starts with 'VALIDATE_PREDICATES##', + * we validate the TPrepareParams.predicates against predicates specified after the + * 'VALIDATE_PREDICATES##' and return true. The result of the validation is stored + * in validatePredicatesResult_. + * + * The expected predicates are specified in the form "{slot} {TComparisonOp} {val}", + * and conjunctive predicates are separated by '&&'. 
+ * + * For example, the predicates_spec validates the predicates in the following query: + * select * from table_name + * where predicates_spec = 'x LT 1 && y GT 2' and + * x < 1 and + * 2 > y; + * + * Current limitations: + * - Disjunctive predicates are not supported (e.g. "expr1 or expr2") + * - Only INT is supported + */ + private boolean validatePredicates(List<List<TBinaryPredicate>> predicates) { + if (predicates == null || predicates.isEmpty()) return false; + TBinaryPredicate firstPredicate = predicates.get(0).get(0); + if (!firstPredicate.getValue().isSetString_val()) return false; + String colVal = firstPredicate.getValue().getString_val(); + if (!colVal.toUpperCase().startsWith("VALIDATE_PREDICATES##")) return false; + + String[] colValParts = colVal.split("##"); + Preconditions.checkArgument(colValParts.length == 2); + String[] expectedPredicates = colValParts[1].split("&&"); + Preconditions.checkArgument(expectedPredicates.length == predicates.size() - 1); + + String result = "SUCCESS"; + for (int i = 1; i < predicates.size(); ++i) { + String[] predicateParts = expectedPredicates[i - 1].trim().split(" "); + Preconditions.checkArgument(predicateParts.length == 3); + TBinaryPredicate predicate = + Iterables.getOnlyElement(predicates.get(i)); + Preconditions.checkArgument(predicate.getValue().isSetInt_val()); + + String slotName = predicate.getCol().getName().toUpperCase(); + int intVal = predicate.getValue().getInt_val(); + if (!predicateParts[0].toUpperCase().equals(slotName) || + !predicateParts[1].toUpperCase().equals(predicate.getOp().name()) || + !predicateParts[2].equals(Integer.toString(intVal))) { + result = "Failed predicate, expected=" + expectedPredicates[i - 1].trim() + + " actual=" + predicate.toString(); + } + } + validatePredicatesResult_ = result; + return true; } /** @@ -102,15 +171,24 @@ public class AllTypesDataSource implements ExternalDataSource { state_ = DataSourceState.OPENED; batchSize_ = INITIAL_BATCH_SIZE; schema_ = params.getRow_schema(); + // 
Need to check validatePredicates again because the call in Prepare() was from + // the frontend and used a different instance of this data source class. + if (validatePredicates(params.getPredicates())) { + // If validating predicates, only one STRING column should be selected. + Preconditions.checkArgument(schema_.getColsSize() == 1); + TColumnDesc firstCol = schema_.getCols().get(0); + Preconditions.checkArgument(firstCol.getType().getType() == TPrimitiveType.STRING); + } scanHandle_ = UUID.randomUUID().toString(); return new TOpenResult(STATUS_OK).setScan_handle(scanHandle_); } /** - * Returns row batches with generated rows based on the row index. Called multiple - * times, so the current row is stored between calls. Each row batch is a different - * size (not necessarily the size specified by TOpenParams.batch_size to ensure - * that Impala can handle unexpected batch sizes. + * If validating predicates, returns a single row with the result of the validation. + * Otherwise returns row batches with generated rows based on the row index. Called + * multiple times, so the current row is stored between calls. Each row batch is a + * different size (not necessarily the size specified by TOpenParams.batch_size to + * ensure that Impala can handle unexpected batch sizes. 
*/ @Override public TGetNextResult getNext(TGetNextParams params) { @@ -118,6 +196,15 @@ Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_)); if (eos_) return new TGetNextResult(STATUS_OK).setEos(eos_); + if (validatePredicatesResult_ != null) { + TColumnData colData = new TColumnData(); + colData.setIs_null(Lists.newArrayList(false)); + colData.setString_vals(Lists.newArrayList(validatePredicatesResult_)); + eos_ = true; + return new TGetNextResult(STATUS_OK).setEos(eos_) + .setRows(new TRowBatch().setCols(Lists.newArrayList(colData)).setNum_rows(1)); + } + List<TColumnData> cols = Lists.newArrayList(); for (int i = 0; i < schema_.getColsSize(); ++i) { cols.add(new TColumnData().setIs_null(Lists.newArrayList())); diff --git a/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java b/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java index 6ebb76bba..a52f68457 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java @@ -61,6 +61,18 @@ public class BinaryPredicate extends Predicate { public String toString() { return description_; } public String getName() { return name_; } public TComparisonOp getThriftOp() { return thriftOp_; } + + public Operator converse() { + switch (this) { + case EQ: return EQ; + case NE: return NE; + case LE: return GE; + case GE: return LE; + case LT: return GT; + case GT: return LT; + default: throw new IllegalStateException("Invalid operator"); + } + } } public static void initBuiltins(Db db) { diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java index a9e48d40d..2a91bb670 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java @@ -56,6 +56,7 @@ 
import com.cloudera.impala.thrift.TStatusCode; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; /** @@ -191,10 +192,12 @@ public class DataSourceScanNode extends ScanNode { numRowsEstimate_ = prepareResult.getNum_rows_estimate(); acceptedPredicates_ = Lists.newArrayList(); - for (Integer acceptedIdx: prepareResult.getAccepted_conjuncts()) { + List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ? + prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of(); + for (Integer acceptedIdx: acceptedPredicatesIdx) { acceptedPredicates_.add(offeredPredicates.get(acceptedIdx)); } - removeAcceptedConjuncts(prepareResult.getAccepted_conjuncts(), conjunctsIdx); + removeAcceptedConjuncts(acceptedPredicatesIdx, conjunctsIdx); } /** @@ -216,14 +219,17 @@ public class DataSourceScanNode extends ScanNode { if (conjunct.getChildren().size() != 2) return false; SlotRef slotRef = null; LiteralExpr literalExpr = null; + TComparisonOp op = null; if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) && (conjunct.getChild(1) instanceof LiteralExpr)) { slotRef = (SlotRef) conjunct.getChild(0).unwrapSlotRef(true); literalExpr = (LiteralExpr) conjunct.getChild(1); + op = ((BinaryPredicate) conjunct).getOp().getThriftOp(); } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) && (conjunct.getChild(0) instanceof LiteralExpr)) { slotRef = (SlotRef) conjunct.getChild(1).unwrapSlotRef(true); literalExpr = (LiteralExpr) conjunct.getChild(0); + op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp(); } else { return false; } @@ -233,7 +239,6 @@ public class DataSourceScanNode extends ScanNode { TColumnDesc col = new TColumnDesc().setName(slotRef.getColumnName()) .setType(slotRef.getType().toThrift()); - TComparisonOp op = ((BinaryPredicate) conjunct).getOp().getThriftOp(); 
predicates.add(new TBinaryPredicate().setCol(col).setOp(op).setValue(val)); return true; } else if (conjunct instanceof CompoundPredicate) { diff --git a/testdata/bin/create-data-source-table.sql b/testdata/bin/create-data-source-table.sql index d6f432458..7942e4838 100644 --- a/testdata/bin/create-data-source-table.sql +++ b/testdata/bin/create-data-source-table.sql @@ -33,15 +33,10 @@ CREATE TABLE alltypes_datasource ( float_col FLOAT, double_col DOUBLE, timestamp_col TIMESTAMP, - string_col STRING) + string_col STRING, + dec_col1 DECIMAL(9,0), + dec_col2 DECIMAL(10,0), + dec_col3 DECIMAL(20,10), + dec_col4 DECIMAL(38,37), + dec_col5 DECIMAL(10,5)) PRODUCED BY DATASOURCE AllTypesDataSource("TestInitString"); - --- TODO: Remove table and move decimal cols into alltypes_datasource -DROP TABLE IF EXISTS decimal_datasource; -CREATE TABLE decimal_datasource ( - d1 DECIMAL(9,0), - d2 DECIMAL(10,0), - d3 DECIMAL(20,10), - d4 DECIMAL(38,37), - d5 DECIMAL(10,5)) -PRODUCED BY DATASOURCE AllTypesDataSource; diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test index 849d6385c..ff97ceb2c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test +++ b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test @@ -8,13 +8,13 @@ from alltypes_datasource where float_col != 0 and int_col >= 1990 limit 5 ---- RESULTS -1990,true,0,90,1990,19900,2189,1990,1970-01-01 00:00:01.990000000,'NULL' -1991,false,1,91,1991,19910,2190.10009765625,1991,1970-01-01 00:00:01.991000000,'1991' -1992,true,2,92,1992,19920,2191.199951171875,1992,1970-01-01 00:00:01.992000000,'1992' -1993,false,3,93,1993,19930,2192.300048828125,1993,1970-01-01 00:00:01.993000000,'1993' -1994,true,4,94,1994,19940,2193.39990234375,1994,1970-01-01 00:00:01.994000000,'1994' +1990,true,0,90,1990,4294967295,2189,1990,1970-01-01 
00:00:01.990000000,'NULL',-999998009,-9999998009,-9999999999.9999998009,-9.9999999999999999999999999999999998009,-99999.98009 +1991,false,1,91,1991,0,2190.10009765625,1991,1970-01-01 00:00:01.991000000,'1991',999998008,9999998008,9999999999.9999998008,9.9999999999999999999999999999999998008,99999.98008 +1992,true,2,92,1992,4294967295,2191.199951171875,1992,1970-01-01 00:00:01.992000000,'1992',-999998007,-9999998007,-9999999999.9999998007,-9.9999999999999999999999999999999998007,-99999.98007 +1993,false,3,93,1993,0,2192.300048828125,1993,1970-01-01 00:00:01.993000000,'1993',999998006,9999998006,9999999999.9999998006,9.9999999999999999999999999999999998006,99999.98006 +1994,true,4,94,1994,4294967295,2193.39990234375,1994,1970-01-01 00:00:01.994000000,'1994',-999998005,-9999998005,-9999999999.9999998005,-9.9999999999999999999999999999999998005,-99999.98005 ---- TYPES -INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, TIMESTAMP, STRING +INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, TIMESTAMP, STRING, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL ==== ---- QUERY # Project a subset of the columns @@ -47,14 +47,20 @@ select count(*) from alltypes_datasource BIGINT ==== ---- QUERY -# Test decimal values. The test data source returns very large and very small values. 
-select * from decimal_datasource limit 5 +select string_col from alltypes_datasource +where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && id EQ 1 && id NE 1' + and id < 1 and id > 1 and id <= 1 and id >= 1 and id = 1 and id != 1 ---- RESULTS --999999999,-9999999999,-9999999999.9999999999,-9.9999999999999999999999999999999999999,-99999.99999 -999999998,9999999998,9999999999.9999999998,9.9999999999999999999999999999999999998,99999.99998 --999999997,-9999999997,-9999999999.9999999997,-9.9999999999999999999999999999999999997,-99999.99997 -999999996,9999999996,9999999999.9999999996,9.9999999999999999999999999999999999996,99999.99996 --999999995,-9999999995,-9999999999.9999999995,-9.9999999999999999999999999999999999995,-99999.99995 +'SUCCESS' ---- TYPES -DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL +STRING +==== +---- QUERY +select string_col from alltypes_datasource +where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && id EQ 1 && id NE 1' + and 1 > id and 1 < id and 1 >= id and 1 <= id and 1 = id and 1 != id +---- RESULTS +'SUCCESS' +---- TYPES +STRING ==== \ No newline at end of file