diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index a9beb29a8..fc26a5352 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -167,6 +167,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() { return table_->NewInsert(); } else if (sink_action_ == TSinkAction::UPDATE) { return table_->NewUpdate(); + } else if (sink_action_ == TSinkAction::UPSERT) { + return table_->NewUpsert(); } else { DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: " << sink_action_; @@ -190,17 +192,24 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { unique_ptr write(NewWriteOp()); for (int j = 0; j < output_expr_ctxs_.size(); ++j) { + // For INSERT, output_expr_ctxs_ will contain all columns of the table in order. + // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op + // applies to, i.e. columns explicitly mentioned in the query, and + // referenced_columns is then used to map to actual column positions. int col = kudu_table_sink_.referenced_columns.empty() ? j : kudu_table_sink_.referenced_columns[j]; void* value = output_expr_ctxs_[j]->GetValue(current_row); - // If the value is NULL and no explicit column references are provided, the column - // should be ignored, else it's explicitly set to NULL. + // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT. + // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL. if (value == NULL) { - if (!kudu_table_sink_.referenced_columns.empty()) { + if (sink_action_ == TSinkAction::UPDATE || sink_action_ == TSinkAction::UPSERT) { + DCHECK(!kudu_table_sink_.referenced_columns.empty()); KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col), "Could not add Kudu WriteOp."); + } else { + DCHECK(kudu_table_sink_.referenced_columns.empty()); } continue; } diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 83c63b7b9..26f74cf0d 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -33,6 +33,7 @@ enum TDataSinkType { enum TSinkAction { INSERT, UPDATE, + UPSERT, DELETE } diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index 836bc2da2..c0865c462 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -262,8 +262,8 @@ terminal KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE, - KW_UPDATE_FN, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, - KW_WITH; + KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, + KW_WHERE, KW_WITH; terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; @@ -361,7 +361,7 @@ nonterminal ArrayList opt_plan_hints; nonterminal TypeDef type_def; nonterminal Type type; nonterminal Expr sign_chain_expr; -nonterminal InsertStmt insert_stmt; +nonterminal InsertStmt insert_stmt, upsert_stmt; nonterminal UpdateStmt update_stmt; nonterminal DeleteStmt delete_stmt; nonterminal ArrayList> update_set_expr_list; @@ -521,6 +521,8 @@ stmt ::= {: RESULT = insert; :} | update_stmt:update {: RESULT = update; :} + | upsert_stmt:upsert + {: RESULT = upsert; :} | delete_stmt:delete {: RESULT = delete; :} | use_stmt:use @@ -659,6 +661,11 @@ explain_stmt ::= update.setIsExplain(); RESULT = update; :} + | KW_EXPLAIN upsert_stmt:upsert + {: + upsert.setIsExplain(); + RESULT = upsert; + :} | KW_EXPLAIN delete_stmt:delete {: delete.setIsExplain(); @@ -673,19 +680,29 @@ insert_stmt ::= opt_with_clause:w KW_INSERT KW_OVERWRITE opt_ignore:ignore opt_kw_table table_name:table LPAREN opt_ident_list:col_perm RPAREN partition_clause:list opt_plan_hints:hints opt_query_stmt:query - {: RESULT = new InsertStmt(w, table, true, list, hints, query, col_perm, ignore); :} + {: + RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, col_perm, + ignore); + :} | opt_with_clause:w KW_INSERT KW_OVERWRITE opt_ignore:ignore opt_kw_table table_name:table partition_clause:list opt_plan_hints:hints query_stmt:query - {: RESULT = new InsertStmt(w, table, true, list, hints, query, null, ignore); :} + {: + RESULT = InsertStmt.createInsert(w, table, true, list, hints, query, null, ignore); + :} | opt_with_clause:w KW_INSERT opt_ignore:ignore KW_INTO opt_kw_table table_name:table LPAREN opt_ident_list:col_perm RPAREN partition_clause:list opt_plan_hints:hints opt_query_stmt:query - {: RESULT = new InsertStmt(w, table, false, list, hints, query, col_perm, ignore); :} + {: + RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, col_perm, + ignore); + :} | opt_with_clause:w KW_INSERT opt_ignore:ignore KW_INTO opt_kw_table table_name:table partition_clause:list opt_plan_hints:hints query_stmt:query - {: RESULT = new InsertStmt(w, table, false, list, hints, query, null, ignore); :} + {: + RESULT = InsertStmt.createInsert(w, table, false, list, hints, query, null, ignore); + :} ; // Update statements have an optional WHERE and optional FROM clause. @@ -717,6 +734,17 @@ update_set_expr_list ::= :} ; +// Upsert statements have an optional column permutation clause. If the column permutation +// is present, the query statement clause is optional as well. +upsert_stmt ::= + opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table + LPAREN opt_ident_list:col_perm RPAREN opt_plan_hints:hints opt_query_stmt:query + {: RESULT = InsertStmt.createUpsert(w, table, hints, query, col_perm); :} + | opt_with_clause:w KW_UPSERT KW_INTO opt_kw_table table_name:table + opt_plan_hints:hints query_stmt:query + {: RESULT = InsertStmt.createUpsert(w, table, hints, query, null); :} + ; + // A DELETE statement comes in two main representations, the DELETE keyword with a path // specification as the target table with an optional FROM keyword or the DELETE // keyword followed by a table alias or reference and a full FROM clause. In all cases @@ -3217,6 +3245,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_UPDATE_FN:r {: RESULT = r.toString(); :} + | KW_UPSERT:r + {: RESULT = r.toString(); :} | KW_USE:r {: RESULT = r.toString(); :} | KW_USING:r diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java index 2f5f1661a..2eb399f61 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java @@ -80,8 +80,8 @@ public class CreateTableAsSelectStmt extends StatementBase { pkvs.add(new PartitionKeyValue(key, null)); } } - insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs, - null, queryStmt, null, false); + insertStmt_ = InsertStmt.createInsert( + null, createStmt.getTblName(), false, pkvs, null, queryStmt, null, false); } public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); } diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index 5e457a364..da360b199 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -47,7 +47,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** - * Representation of a single insert statement, including the select statement + * Representation of a single insert or upsert statement, including the select statement * whose results are to be inserted. */ public class InsertStmt extends StatementBase { @@ -70,20 +70,22 @@ public class InsertStmt extends StatementBase { // auto-generate one (for insert into tbl()) during analysis. private final boolean needsGeneratedQueryStatement_; - // The column permutation is specified by writing INSERT INTO tbl(col3, col1, col2...) + // The column permutation is specified by writing: + // (INSERT|UPSERT) INTO tbl(col3, col1, col2...) // // It is a mapping from select-list expr index to (non-partition) output column. If // null, will be set to the default permutation of all non-partition columns in Hive - // order. + // order or all columns for Kudu tables. // // A column is said to be 'mentioned' if it occurs either in the column permutation, or // the PARTITION clause. If columnPermutation is null, all non-partition columns are // considered mentioned. // - // Between them, the columnPermutation and the set of partitionKeyValues must mention to + // Between them, the columnPermutation and the set of partitionKeyValues must mention // every partition column in the target table exactly once. Other columns, if not - // explicitly mentioned, will be assigned NULL values. Partition columns are not - // defaulted to NULL by design, and are not just for NULL-valued partition slots. + // explicitly mentioned, will be assigned NULL values for INSERTs and left unassigned + // for UPSERTs. Partition columns are not defaulted to NULL by design, and are not just + // for NULL-valued partition slots. // // Dynamic partition keys may occur in either the permutation or the PARTITION // clause. Partition columns with static values may only be mentioned in the PARTITION @@ -123,11 +125,20 @@ public class InsertStmt extends StatementBase { private boolean hasClusteredHint_ = false; // Output expressions that produce the final results to write to the target table. May - // include casts, and NullLiterals where an output column isn't explicitly mentioned. - // Set in prepareExpressions(). The i'th expr produces the i'th column of the target + // include casts. Set in prepareExpressions(). + // If this is an INSERT, will contain one Expr for all non-partition columns of the + // target table with NullLiterals where an output column isn't explicitly mentioned. + // The i'th expr produces the i'th column of the target table. + // If this is an UPSERT, will contain one Expr per column mentioned in the query and + // mentionedUpsertColumns_ is used to map between the Exprs and columns in the target // table. private ArrayList resultExprs_ = Lists.newArrayList(); + // Position mapping of exprs in resultExprs_ to columns in the target table - + // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the target table. + // Only used for UPSERT, set in prepareExpressions(). + private final List mentionedUpsertColumns_ = Lists.newArrayList(); + // Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for // non-Kudu tables. private ArrayList primaryKeyExprs_ = Lists.newArrayList(); @@ -138,9 +149,29 @@ public class InsertStmt extends StatementBase { // For tables with primary keys, indicates if duplicate key errors are ignored. private final boolean ignoreDuplicates_; - public InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite, + // True iff this is an UPSERT operation. Only supported for Kudu tables. + private final boolean isUpsert_; + + public static InsertStmt createInsert(WithClause withClause, TableName targetTable, + boolean overwrite, List partitionKeyValues, + List planHints, QueryStmt queryStmt, List columnPermutation, + boolean ignoreDuplicates) { + return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues, + planHints, queryStmt, columnPermutation, ignoreDuplicates, false); + } + + public static InsertStmt createUpsert(WithClause withClause, TableName targetTable, + List planHints, QueryStmt queryStmt, List columnPermutation) { + return new InsertStmt(withClause, targetTable, false, null, planHints, queryStmt, + columnPermutation, false, true); + } + + protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite, List partitionKeyValues, List planHints, - QueryStmt queryStmt, List columnPermutation, boolean ignoreDuplicates) { + QueryStmt queryStmt, List columnPermutation, boolean ignoreDuplicates, + boolean isUpsert) { + Preconditions.checkState(!isUpsert || (!overwrite && !ignoreDuplicates && + partitionKeyValues == null)); withClause_ = withClause; targetTableName_ = targetTable; originalTableName_ = targetTableName_; @@ -152,6 +183,7 @@ public class InsertStmt extends StatementBase { columnPermutation_ = columnPermutation; table_ = null; ignoreDuplicates_ = ignoreDuplicates; + isUpsert_ = isUpsert; } /** @@ -170,6 +202,7 @@ public class InsertStmt extends StatementBase { columnPermutation_ = other.columnPermutation_; table_ = other.table_; ignoreDuplicates_ = other.ignoreDuplicates_; + isUpsert_ = other.isUpsert_; } @Override @@ -184,6 +217,7 @@ public class InsertStmt extends StatementBase { hasNoShuffleHint_ = false; hasClusteredHint_ = false; resultExprs_.clear(); + mentionedUpsertColumns_.clear(); primaryKeyExprs_.clear(); } @@ -223,7 +257,7 @@ public class InsertStmt extends StatementBase { // Set target table and perform table-type specific analysis and auth checking. // Also checks if the target table is missing. - setTargetTable(analyzer); + analyzeTargetTable(analyzer); // Abort analysis if there are any missing tables beyond this point. if (!analyzer.getMissingTbls().isEmpty()) { @@ -251,7 +285,8 @@ public class InsertStmt extends StatementBase { // Finally, prepareExpressions analyzes the expressions themselves, and confirms that // they are type-compatible with the target columns. Where columns are not mentioned // (and by this point, we know that missing columns are not partition columns), - // prepareExpressions assigns them a NULL literal expressions. + // prepareExpressions assigns them a NULL literal expressions, unless this is an + // UPSERT, in which case we don't want to overwrite unmentioned columns with NULL. // An null permutation clause is the same as listing all non-partition columns in // order. @@ -332,13 +367,12 @@ public class InsertStmt extends StatementBase { /** * Sets table_ based on targetTableName_ and performs table-type specific analysis: - * - Partition clause is invalid for unpartitioned Hdfs tables and HBase tables - * - Overwrite is invalid for HBase tables - * - Check INSERT privileges as well as write access to Hdfs paths - * - Cannot insert into a view + * - Cannot (in|up)sert into a view + * - Cannot (in|up)sert into a table with unsupported column types + * - Analysis specific to insert and upsert operations * Adds table_ to the analyzer's descriptor table if analysis succeeds. */ - private void setTargetTable(Analyzer analyzer) throws AnalysisException { + private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { // If the table has not yet been set, load it from the Catalog. This allows for // callers to set a table to analyze that may not actually be created in the Catalog. // One example use case is CREATE TABLE AS SELECT which must run analysis on the @@ -356,21 +390,42 @@ public class InsertStmt extends StatementBase { .allOf(Privilege.INSERT).toRequest()); } - // We do not support inserting into views. + // We do not support (in|up)serting into views. if (table_ instanceof View) { throw new AnalysisException( - String.format("Impala does not support inserting into views: %s", - table_.getFullName())); + String.format("Impala does not support %sing into views: %s", getOpName(), + table_.getFullName())); } + // We do not support (in|up)serting into tables with unsupported column types. for (Column c: table_.getColumns()) { if (!c.getType().isSupported()) { - throw new AnalysisException(String.format("Unable to INSERT into target table " + + throw new AnalysisException(String.format("Unable to %s into target table " + "(%s) because the column '%s' has an unsupported type '%s'.", - targetTableName_, c.getName(), c.getType().toSql())); + getOpName(), targetTableName_, c.getName(), c.getType().toSql())); } } + // Perform operation-specific analysis. + if (isUpsert_) { + if (!(table_ instanceof KuduTable)) { + throw new AnalysisException("UPSERT is only supported for Kudu tables"); + } + } else { + analyzeTableForInsert(analyzer); + } + + // Add target table to descriptor table. + analyzer.getDescTbl().setTargetTable(table_); + } + + /** + * Performs INSERT-specific table analysis: + * - Partition clause is invalid for unpartitioned or HBase tables + * - Check INSERT privileges as well as write access to Hdfs paths + * - Overwrite is invalid for HBase and Kudu tables + */ + private void analyzeTableForInsert(Analyzer analyzer) throws AnalysisException { boolean isHBaseTable = (table_ instanceof HBaseTable); int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols(); @@ -433,48 +488,28 @@ public class InsertStmt extends StatementBase { if (isHBaseTable && overwrite_) { throw new AnalysisException("HBase doesn't have a way to perform INSERT OVERWRITE"); } - - // Add target table to descriptor table. - analyzer.getDescTbl().setTargetTable(table_); } /** - * Checks that the column permutation + select list + static partition exprs + - * dynamic partition exprs collectively cover exactly all columns in the target table - * (not more of fewer). + * Checks that the column permutation + select list + static partition exprs + dynamic + * partition exprs collectively cover exactly all required columns in the target table, + * depending on the table type. */ private void checkColumnCoverage(ArrayList selectExprTargetColumns, Set mentionedColumnNames, int numSelectListExprs, int numStaticPartitionExprs) throws AnalysisException { - boolean isHBaseTable = (table_ instanceof HBaseTable); - int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols(); - // Check that all columns are mentioned by the permutation and partition clauses + // Check that all required cols are mentioned by the permutation and partition clauses if (selectExprTargetColumns.size() + numStaticPartitionExprs != table_.getColumns().size()) { // We've already ruled out too many columns in the permutation and partition clauses // by checking that there are no duplicates and that every column mentioned actually - // exists. So all columns aren't mentioned in the query. If the unmentioned columns - // include partition columns, this is an error. - List missingColumnNames = Lists.newArrayList(); - for (Column column: table_.getColumns()) { - if (!mentionedColumnNames.contains(column.getName())) { - // HBase tables have a single row-key column which is always in position 0. It - // must be mentioned, since it is invalid to set it to NULL (which would - // otherwise happen by default). - if (isHBaseTable && column.getPosition() == 0) { - throw new AnalysisException("Row-key column '" + column.getName() + - "' must be explicitly mentioned in column permutation."); - } - if (column.getPosition() < numClusteringCols) { - missingColumnNames.add(column.getName()); - } - } - } - - if (!missingColumnNames.isEmpty()) { - throw new AnalysisException( - "Not enough partition columns mentioned in query. Missing columns are: " + - Joiner.on(", ").join(missingColumnNames)); + // exists. So all columns aren't mentioned in the query. + if (table_ instanceof KuduTable) { + checkRequiredKuduColumns(mentionedColumnNames); + } else if (table_ instanceof HBaseTable) { + checkRequiredHBaseColumns(mentionedColumnNames); + } else if (table_.getNumClusteringCols() > 0) { + checkRequiredPartitionedColumns(mentionedColumnNames); } } @@ -506,6 +541,66 @@ public class InsertStmt extends StatementBase { } } + /** + * For a Kudu table, checks that all key columns are mentioned. + */ + private void checkRequiredKuduColumns(Set mentionedColumnNames) + throws AnalysisException { + Preconditions.checkState(table_ instanceof KuduTable); + List keyColumns = ((KuduTable) table_).getPrimaryKeyColumnNames(); + List missingKeyColumnNames = Lists.newArrayList(); + for (Column column : table_.getColumns()) { + if (!mentionedColumnNames.contains(column.getName()) + && keyColumns.contains(column.getName())) { + missingKeyColumnNames.add(column.getName()); + } + } + + if (!missingKeyColumnNames.isEmpty()) { + throw new AnalysisException(String.format( + "All primary key columns must be specified for %sing into Kudu tables. " + + "Missing columns are: %s", getOpName(), + Joiner.on(", ").join(missingKeyColumnNames))); + } + } + + /** + * For an HBase table, checks that the row-key column is mentioned. + * HBase tables have a single row-key column which is always in position 0. It + * must be mentioned, since it is invalid to set it to NULL (which would + * otherwise happen by default). + */ + private void checkRequiredHBaseColumns(Set mentionedColumnNames) + throws AnalysisException { + Preconditions.checkState(table_ instanceof HBaseTable); + Column column = table_.getColumns().get(0); + if (!mentionedColumnNames.contains(column.getName())) { + throw new AnalysisException("Row-key column '" + column.getName() + + "' must be explicitly mentioned in column permutation."); + } + } + + /** + * For partitioned tables, checks that all partition columns are mentioned. + */ + private void checkRequiredPartitionedColumns(Set mentionedColumnNames) + throws AnalysisException { + int numClusteringCols = table_.getNumClusteringCols(); + List missingPartitionColumnNames = Lists.newArrayList(); + for (Column column : table_.getColumns()) { + if (!mentionedColumnNames.contains(column.getName()) + && column.getPosition() < numClusteringCols) { + missingPartitionColumnNames.add(column.getName()); + } + } + + if (!missingPartitionColumnNames.isEmpty()) { + throw new AnalysisException( + "Not enough partition columns mentioned in query. Missing columns are: " + + Joiner.on(", ").join(missingPartitionColumnNames)); + } + } + /** * Performs four final parts of the analysis: * 1. Checks type compatibility between all expressions and their targets @@ -515,7 +610,7 @@ public class InsertStmt extends StatementBase { * * 3. Populates resultExprs_ with type-compatible expressions, in Hive column order, * for all expressions in the select-list. Unmentioned columns are assigned NULL literal - * expressions. + * expressions, unless this is an UPSERT. * * 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_. * @@ -581,21 +676,24 @@ public class InsertStmt extends StatementBase { } // Finally, 'undo' the permutation so that the selectListExprs are in Hive column - // order, and add NULL expressions to all missing columns. - for (Column tblColumn: table_.getColumnsInHiveOrder()) { + // order, and add NULL expressions to all missing columns, unless this is an UPSERT. + ArrayList columns = table_.getColumnsInHiveOrder(); + for (int col = 0; col < columns.size(); ++col) { + Column tblColumn = columns.get(col); boolean matchFound = false; for (int i = 0; i < selectListExprs.size(); ++i) { if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) { resultExprs_.add(selectListExprs.get(i)); + if (isUpsert_) mentionedUpsertColumns_.add(col); matchFound = true; break; } } // If no match is found, either the column is a clustering column with a static // value, or it was unmentioned and therefore should have a NULL select-list - // expression. + // expression if this is an INSERT. if (!matchFound) { - if (tblColumn.getPosition() >= numClusteringCols) { + if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) { // Unmentioned non-clustering columns get NULL literals with the appropriate // target type because Parquet cannot handle NULL_TYPE (IMPALA-617). resultExprs_.add(NullLiteral.create(tblColumn.getType())); @@ -666,6 +764,8 @@ public class InsertStmt extends StatementBase { queryStmt_.rewriteExprs(rewriter); } + private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; } + public List getPlanHints() { return planHints_; } public TableName getTargetTableName() { return targetTableName_; } public Table getTargetTable() { return table_; } @@ -687,8 +787,9 @@ public class InsertStmt extends StatementBase { public DataSink createDataSink() { // analyze() must have been called before. Preconditions.checkState(table_ != null); - return TableSink.create(table_, TableSink.Op.INSERT, partitionKeyExprs_, - ImmutableList.of(), overwrite_, ignoreDuplicates_); + Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty()); + return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT, + partitionKeyExprs_, mentionedUpsertColumns_, overwrite_, ignoreDuplicates_); } /** @@ -708,7 +809,7 @@ public class InsertStmt extends StatementBase { if (withClause_ != null) strBuilder.append(withClause_.toSql() + " "); - strBuilder.append("INSERT "); + strBuilder.append(getOpName() + " "); if (overwrite_) { strBuilder.append("OVERWRITE "); } else { diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java index 3d98acae7..0a99af433 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java @@ -58,14 +58,16 @@ public class KuduTableSink extends TableSink { StringBuilder output = new StringBuilder(); output.append(prefix + sinkOp_.toExplainString()); output.append(" KUDU [" + targetTable_.getFullName() + "]\n"); - output.append(detailPrefix); - if (sinkOp_ == Op.INSERT) { - output.append("check unique keys: "); - } else { - output.append("check keys exist: "); + if (sinkOp_ != Op.UPSERT) { + output.append(detailPrefix); + if (sinkOp_ == Op.INSERT) { + output.append("check unique keys: "); + } else { + output.append("check keys exist: "); + } + output.append(ignoreNotFoundOrDuplicate_); + output.append("\n"); } - output.append(ignoreNotFoundOrDuplicate_); - output.append("\n"); if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes())); output.append(PrintUtils.printMemCost(" ", perHostMemCost_)); diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index 788bb506f..102b28095 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -51,6 +51,13 @@ public abstract class TableSink extends DataSink { @Override public TSinkAction toThrift() { return TSinkAction.UPDATE; } }, + UPSERT { + @Override + public String toExplainString() { return "UPSERT INTO"; } + + @Override + public TSinkAction toThrift() { return TSinkAction.UPSERT; } + }, DELETE { @Override public String toExplainString() { return "DELETE FROM"; } @@ -105,6 +112,8 @@ public abstract class TableSink extends DataSink { Preconditions.checkState(overwrite == false); // Partition clauses don't make sense for Kudu inserts. Preconditions.checkState(partitionKeyExprs.isEmpty()); + // UPSERT is incompatible with ignoreDuplicates. + Preconditions.checkState(sinkAction != Op.UPSERT || !ignoreDuplicates); return new KuduTableSink(table, sinkAction, referencedColumns, ignoreDuplicates); } else { throw new UnsupportedOperationException( diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index def0be2ab..099ceda9d 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -228,6 +228,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("union", new Integer(SqlParserSymbols.KW_UNION)); keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE)); keywordMap.put("update_fn", new Integer(SqlParserSymbols.KW_UPDATE_FN)); + keywordMap.put("upsert", new Integer(SqlParserSymbols.KW_UPSERT)); keywordMap.put("use", new Integer(SqlParserSymbols.KW_USE)); keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING)); keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES)); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index cbd2d07f8..a6760de58 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -30,6 +30,8 @@ import org.apache.impala.common.AnalysisException; import org.junit.Assert; import org.junit.Test; +import org.apache.impala.common.RuntimeEnv; +import org.apache.impala.testutil.TestUtils; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -2462,6 +2464,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "(1, true, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a', cast(0 as timestamp), 2009, 10)," + "(2, false, 2, 2, NULL, 2, 2.0, 2.0, 'b', 'b', cast(0 as timestamp), 2009, 2)," + "(3, true, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c', cast(0 as timestamp), 2009, 3))"); + // Test multiple aliases. Values() is like union, the column labels are 'x' and 'y'. AnalyzesOk("values((1 as x, 'a' as y), (2 as k, 'b' as j))"); // Test order by, offset and limit. @@ -2820,7 +2823,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest { // Cannot insert into a view. AnalysisError("insert into functional.alltypes_view partition(year, month) " + "select * from functional.alltypes", - "Impala does not support inserting into views: functional.alltypes_view"); + "Impala does not support INSERTing into views: functional.alltypes_view"); // Cannot load into a view. AnalysisError("load data inpath '/test-warehouse/tpch.lineitem/lineitem.tbl' " + "into table functional.alltypes_view", @@ -2978,6 +2981,13 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "'b.int_array_col' correlated with an outer block as well as an " + "uncorrelated one 'functional.alltypestiny':\n" + "SELECT item FROM b.int_array_col, functional.alltypestiny"); + + if (RuntimeEnv.INSTANCE.isKuduSupported()) { + // Key columns missing from permutation + AnalysisError("insert into functional_kudu.testtbl(zip) values(1)", + "All primary key columns must be specified for INSERTing into Kudu tables. " + + "Missing columns are: id"); + } } /** diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java index fb2d63a72..329be5c85 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java @@ -1163,6 +1163,14 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest { "select * from functional.alltypestiny where id = (select 1) " + "union select * from functional.alltypestiny where id = (select 2)"); + // UPSERT + AnalyzesOk("upsert into functional_kudu.testtbl select * from " + + "functional_kudu.testtbl where id in (select id from functional_kudu.testtbl " + + "where zip = 0)"); + AnalyzesOk("upsert into functional_kudu.testtbl select * from " + + "functional_kudu.testtbl union select bigint_col, string_col, int_col from " + + "functional.alltypes"); + // CTAS with correlated subqueries AnalyzesOk("create table functional.test_tbl as select * from " + "functional.alltypes t where t.id in (select id from functional.alltypesagg " + diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java new file mode 100644 index 000000000..118b322be --- /dev/null +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import org.junit.Test; + +import org.apache.impala.testutil.TestUtils; + +public class AnalyzeUpsertStmtTest extends AnalyzerTest { + @Test + public void TestUpsert() { + TestUtils.assumeKuduIsSupported(); + // VALUES clause + AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1)"); + AnalyzesOk("upsert into table functional_kudu.testtbl(id) values(1)"); + AnalyzesOk("upsert into table functional_kudu.testtbl values(1, 'a', 1), " + + "(2, 'b', 2), (3, 'c', 3)"); + // SELECT clause + AnalyzesOk("upsert into functional_kudu.testtbl select bigint_col, string_col, " + + "int_col from functional.alltypes"); + // Permutation lists + AnalyzesOk("upsert into table functional_kudu.testtbl(id) select bigint_col " + + "from functional.alltypes"); + AnalyzesOk("upsert into table functional_kudu.testtbl(id, name) select bigint_col, " + + "string_col from functional.alltypes"); + AnalyzesOk("upsert into table functional_kudu.testtbl(name, zip, id) select " + + "string_col, int_col, bigint_col from functional.alltypes"); + // WITH clause + AnalyzesOk("with t1 as (select 1, 'a', 2) upsert into functional_kudu.testtbl " + + "select * from t1"); + AnalyzesOk("with t1 as (select * from functional.alltypes) upsert into " + + "functional_kudu.testtbl select bigint_col, string_col, int_col from t1"); + // WITH belonging to the select clause + AnalyzesOk("upsert into functional_kudu.testtbl with t1 as (select * from " + + "functional.alltypes) select bigint_col, string_col, int_col from t1"); + AnalyzesOk("upsert into functional_kudu.testtbl(id) with t1 as (select * from " + + "functional.alltypes) select bigint_col from t1"); + // Multiple WITH clauses + AnalyzesOk("with t1 as (select * from functional.alltypestiny) " + + "upsert into functional_kudu.testtbl with t2 as (select * from " + + "functional.alltypessmall) select bigint_col, string_col, int_col from t1"); + // Correlated inline view + AnalyzesOk("upsert into table functional_kudu.testtbl " + + "select a.id, string_col, b.month " + + "from functional.alltypes a, functional.allcomplextypes b, " + + "(select item from b.int_array_col) v1 " + + "where a.id = b.id"); + // Hint + AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * from " + + "functional_kudu.testtbl"); + // Incorrect hint, results in warning + AnalyzesOk("upsert into table functional_kudu.testtbl [badhint] select * from " + + "functional_kudu.testtbl", "INSERT hint not recognized: badhint"); + + // Key columns missing from permutation + AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)", + "All primary key columns must be specified for UPSERTing into Kudu tables. " + + "Missing columns are: id"); + // SELECT clause with wrong number of columns + AnalysisError("upsert into functional_kudu.testtbl select * from functional.alltypes", + "Target table 'functional_kudu.testtbl' has fewer columns (3) than the SELECT " + + "/ VALUES clause returns (13)"); + // VALUES clause with wrong number of columns + AnalysisError("upsert into functional_kudu.testtbl values(1)", "Target table " + + "'functional_kudu.testtbl' has more columns (3) than the SELECT / VALUES " + + "clause returns (1)"); + // Permutation with wrong number of columns + AnalysisError("upsert into functional_kudu.testtbl(id, name, zip) values(1)", + "Column permutation mentions more columns (3) than the SELECT / VALUES " + + "clause returns (1)"); + // Type mismatch + AnalysisError("upsert into functional_kudu.testtbl values(1, 1, 1)", + "Target table 'functional_kudu.testtbl' is incompatible with source " + + "expressions.\nExpression '1' (type: TINYINT) is not compatible with column " + + "'name' (type: STRING)"); + // Permutation with type mismatch + AnalysisError("upsert into functional_kudu.testtbl(zip, id, name) " + + "values('a', 'a', 'a')", "Target table 'functional_kudu.testtbl' is " + + "incompatible with source expressions.\nExpression ''a'' (type: STRING) is not " + + "compatible with column 'zip' (type: INT)"); + // Permutation with invalid column name + AnalysisError("upsert into functional_kudu.testtbl (id, name, invalid) values " + + "(1, 'a', 1)", "Unknown column 'invalid' in column permutation"); + // Permutation with repeated column + AnalysisError("upsert into functional_kudu.testtbl (id, name, zip, id) values " + + "(1, 'a', 1, 1)", "Duplicate column 'id' in column permutation"); + // UPSERT into non-Kudu table + AnalysisError("upsert into functional.alltypes select * from functional.alltypes", + "UPSERT is only supported for Kudu tables"); + // Unknown target DB + AnalysisError("upsert into UNKNOWNDB.testtbl select * " + + "from functional.alltypesnopart", "Database does not exist: UNKNOWNDB"); + // WITH-clause tables cannot be upserted into + AnalysisError("with t1 as (select 'a' x) upsert into t1 values('b' x)", + "Table does not exist: default.t1"); + // Cannot upsert into a view + AnalysisError("upsert into functional.alltypes_view select * from " + + "functional.alltypes", + "Impala does not support UPSERTing into views: functional.alltypes_view"); + // Upsert with uncorrelated inline view + AnalysisError("upsert into table functional_kudu.testtbl " + + "select a.id, a.string_col, b.month " + + "from functional.alltypes a, functional.allcomplextypes b, " + + "(select item from b.int_array_col, functional.alltypestiny) v1 " + + "where a.id = b.id", + "Nested query is illegal because it contains a table reference " + + "'b.int_array_col' correlated with an outer block as well as an " + + "uncorrelated one 'functional.alltypestiny':\n" + + "SELECT item FROM b.int_array_col, functional.alltypestiny"); + // Illegal complex-typed expr + AnalysisError("upsert into functional_kudu.testtbl " + + "select int_struct_col from functional.allcomplextypes", + "Expr 'int_struct_col' in select list returns a " + + "complex type 'STRUCT'.\n" + + "Only scalar types are allowed in the select list."); + } +} diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java index 993f4896e..b9d1595d7 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java @@ -428,6 +428,10 @@ public class AnalyzerTest extends FrontendTestBase { "select id from (select id+2 from functional_hbase.alltypessmall) a", "Could not resolve column/field reference: 'id'"); + // Analysis error from explain upsert + AnalysisError("explain upsert into table functional.alltypes select * from " + + "functional.alltypes", "UPSERT is only supported for Kudu tables"); + // Positive test for explain query AnalyzesOk("explain select * from functional.AllTypes"); @@ -437,6 +441,10 @@ public class AnalyzerTest extends FrontendTestBase { "select id, bool_col, tinyint_col, smallint_col, int_col, int_col, " + "float_col, float_col, date_string_col, string_col, timestamp_col " + "from functional.alltypes"); + + // Positive test for explain upsert + AnalyzesOk("explain upsert into table functional_kudu.testtbl select * from " + + "functional_kudu.testtbl"); } @Test diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index c163d7089..092e14600 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -393,6 +393,12 @@ public class ParserTest { "insert overwrite t(a, b) partition(x, y) %sfoo,bar,baz%s select * from t", prefix, suffix), "foo", "bar", "baz"); + // Test upsert hints. + ParsesOk(String.format("upsert into t %sshuffle%s select * from t", prefix, + suffix)); + ParsesOk(String.format("upsert into t (x, y) %sshuffle%s select * from t", prefix, + suffix)); + // Test TableRef hints. TestTableHints(String.format( "select * from functional.alltypes %sschedule_disk_local%s", prefix, suffix), @@ -761,6 +767,11 @@ public class ParserTest { ParsesOk("insert overwrite table t select a from test union select a from test " + "union select a from test union select a from test"); + // Union in upsert query. + ParsesOk("upsert into table t select a from test union select a from test"); + ParsesOk("upsert into table t select a from test union select a from test " + + "union select a from test union select a from test"); + // No complete select statement on lhs. ParserError("a from test union select a from test"); // No complete select statement on rhs. @@ -777,12 +788,14 @@ public class ParserTest { ParsesOk("select * from (values(1, 'a', abc, 1.0, *)) as t"); ParsesOk("values(1, 'a', abc, 1.0, *) union all values(1, 'a', abc, 1.0, *)"); ParsesOk("insert into t values(1, 'a', abc, 1.0, *)"); + ParsesOk("upsert into t values(1, 'a', abc, 1.0, *)"); // Values stmt with multiple rows. ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi)"); ParsesOk("select * from (values(1, abc), ('x', cde), (2), (efg, fgh, ghi)) as t"); ParsesOk("values(1, abc), ('x', cde), (2), (efg, fgh, ghi) " + "union all values(1, abc), ('x', cde), (2), (efg, fgh, ghi)"); ParsesOk("insert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)"); + ParsesOk("upsert into t values(1, abc), ('x', cde), (2), (efg, fgh, ghi)"); // Test additional parenthesis. ParsesOk("(values(1, abc), ('x', cde), (2), (efg, fgh, ghi))"); ParsesOk("values((1, abc), ('x', cde), (2), (efg, fgh, ghi))"); @@ -838,6 +851,14 @@ public class ParserTest { // With clause before insert statement. ParsesOk("with t as (select 1) insert into x select * from t"); ParsesOk("with t(x) as (select 1) insert into x select * from t"); + // With clause in query statement of upsert statement. + ParsesOk("upsert into x with t as (select * from tab) select * from t"); + ParsesOk("upsert into x with t(x, y) as (select * from tab) select * from t"); + ParsesOk("upsert into x with t as (values(1, 2, 3)) select * from t"); + ParsesOk("upsert into x with t(x, y) as (values(1, 2, 3)) select * from t"); + // With clause before upsert statement. + ParsesOk("with t as (select 1) upsert into x select * from t"); + ParsesOk("with t(x) as (select 1) upsert into x select * from t"); // Test quoted identifier or string literal as table alias. ParsesOk("with `t1` as (select 1 a), 't2' as (select 2 a), \"t3\" as (select 3 a)" + @@ -854,6 +875,10 @@ public class ParserTest { ParsesOk("with t as (select 1) insert into x with t as (select 2) select * from t"); ParsesOk("with t(c1) as (select 1) " + "insert into x with t(c2) as (select 2) select * from t"); + // Multiple with clauses. One before the upsert and one inside the query statement. + ParsesOk("with t as (select 1) upsert into x with t as (select 2) select * from t"); + ParsesOk("with t(c1) as (select 1) " + + "upsert into x with t(c2) as (select 2) select * from t"); // Empty with clause. ParserError("with t as () select 1"); @@ -873,6 +898,9 @@ public class ParserTest { // Insert in with clause is not valid. ParserError("with t as (insert into x select * from tab) select * from t"); ParserError("with t(c1) as (insert into x select * from tab) select * from t"); + // Upsert in with clause is not valid. + ParserError("with t as (upsert into x select * from tab) select * from t"); + ParserError("with t(c1) as (upsert into x select * from tab) select * from t"); // Union operands need to be parenthesized to have their own with clause. ParserError("select * from t union all with t as (select 2) select * from t"); } @@ -966,7 +994,7 @@ public class ParserTest { ParsesOk("select a from `abc\u007fabc`"); // Quoted identifiers can contain keywords. - ParsesOk("select `select`, `insert` from `table` where `where` = 10"); + ParsesOk("select `select`, `insert`, `upsert` from `table` where `where` = 10"); // Quoted identifiers cannot contain "`" ParserError("select a from `abcde`abcde`"); @@ -1609,6 +1637,47 @@ public class ParserTest { ParsesOk("insert ignore into table t values (1,2,3)"); } + @Test + public void TestUpsert() { + for (String optTbl: new String[] {"", "table"}) { + // SELECT clause + ParsesOk(String.format("upsert into %s t select a from src", optTbl)); + // VALUES clause + ParsesOk(String.format("upsert into %s t values (1, 2, 3)", optTbl)); + // Permutation + ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3)", optTbl)); + // Permutation with mismatched select list (should parse fine) + ParsesOk(String.format("upsert into %s t (a,b,c) values(1,2,3,4,5,6)", optTbl)); + // Empty permutation list + ParsesOk(String.format("upsert into %s t () select 1 from a", optTbl)); + // Permutation with optional query statement + ParsesOk(String.format("upsert into %s t () ", optTbl)); + // WITH clause + ParsesOk(String.format("with x as (select a from src where b > 5) upsert into %s " + + "t select * from x", optTbl)); + + // Missing query statement + ParserError(String.format("upsert into %s t", optTbl)); + // Missing 'into'. + ParserError(String.format("upsert %s t select a from src where b > 5", optTbl)); + // Missing target table identifier. + ParserError(String.format("upsert into %s select a from src where b > 5", optTbl)); + // No comma in permutation list + ParserError(String.format("upsert %s t(a b c) select 1 from a", optTbl)); + // Can't use strings as identifiers in permutation list + ParserError(String.format("upsert %s t('a') select 1 from a", optTbl)); + // Expressions not allowed in permutation list + ParserError(String.format("upsert %s t(a=1, b) select 1 from a", optTbl)); + // Upsert doesn't support ignore. + ParserError(String.format("upsert ignore into %s t select a from src", optTbl)); + // Upsert doesn't support partition clauses. + ParserError(String.format( + "upsert into %s t partition (pk1=10) select a from src", optTbl)); + // Upsert doesn't support overwrite. + ParserError(String.format("upsert overwrite %s t select 1 from src", optTbl)); + } + } + @Test public void TestUpdate() { ParsesOk("update t set x = 3 where a < b"); @@ -2512,6 +2581,7 @@ public class ParserTest { ParserError("CREATE VIEW Foo.Bar (x) AS"); // Invalid view definitions. A view definition must be a query statement. ParserError("CREATE VIEW Foo.Bar (x) AS INSERT INTO t select * from t"); + ParserError("CREATE VIEW Foo.Bar (x) AS UPSERT INTO t select * from t"); ParserError("CREATE VIEW Foo.Bar (x) AS CREATE TABLE Wrong (i int)"); ParserError("CREATE VIEW Foo.Bar (x) AS ALTER TABLE Foo COLUMNS (i int, s string)"); ParserError("CREATE VIEW Foo.Bar (x) AS CREATE VIEW Foo.Bar AS SELECT 1"); @@ -2541,6 +2611,7 @@ public class ParserTest { ParserError("ALTER VIEW Foo.Bar AS"); // Invalid view definitions. A view definition must be a query statement. ParserError("ALTER VIEW Foo.Bar AS INSERT INTO t select * from t"); + ParserError("ALTER VIEW Foo.Bar AS UPSERT INTO t select * from t"); ParserError("ALTER VIEW Foo.Bar AS CREATE TABLE Wrong (i int)"); ParserError("ALTER VIEW Foo.Bar AS ALTER TABLE Foo COLUMNS (i int, s string)"); ParserError("ALTER VIEW Foo.Bar AS CREATE VIEW Foo.Bar AS SELECT 1, 2, 3"); @@ -2571,8 +2642,9 @@ public class ParserTest { ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS WITH"); ParserError("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUET AS"); - // INSERT statements are not allowed + // INSERT/UPSERT statements are not allowed ParserError("CREATE TABLE Foo AS INSERT INTO Foo SELECT 1"); + ParserError("CREATE TABLE Foo AS UPSERT INTO Foo SELECT 1"); // Column and partition definitions not allowed ParserError("CREATE TABLE Foo(i int) AS SELECT 1"); @@ -2840,7 +2912,7 @@ public class ParserTest { "Encountered: IDENTIFIER\n" + "Expected: ALTER, COMPUTE, CREATE, DELETE, DESCRIBE, DROP, EXPLAIN, GRANT, " + "INSERT, INVALIDATE, LOAD, REFRESH, REVOKE, SELECT, SET, SHOW, TRUNCATE, " + - "UPDATE, USE, VALUES, WITH\n"); + "UPDATE, UPSERT, USE, VALUES, WITH\n"); // missing select list ParserError("select from t", @@ -2981,6 +3053,7 @@ public class ParserTest { public void TestExplain() { ParsesOk("explain select a from tbl"); ParsesOk("explain insert into tbl select a, b, c, d from tbl"); + ParsesOk("explain upsert into tbl select a, b, c, d from tbl"); ParserError("explain"); // cannot EXPLAIN an explain stmt ParserError("explain explain select a from tbl"); diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index 371e81166..b5cf44694 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -655,6 +655,8 @@ public class ToSqlTest extends FrontendTestBase { "values(1, true, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', cast (0 as timestamp))", "INSERT INTO TABLE functional.alltypessmall PARTITION (year=2009, month=4) " + "VALUES(1, TRUE, 1, 1, 10, 10, 10.0, 10.0, 'a', 'a', CAST(0 AS TIMESTAMP))"); + testToSql("upsert into table functional_kudu.testtbl values(1, 'a', 1)", + "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)"); } /** @@ -903,6 +905,11 @@ public class ToSqlTest extends FrontendTestBase { "WITH t1 AS (SELECT * FROM functional.alltypes) " + "INSERT INTO TABLE functional.alltypes PARTITION (year, month) " + "SELECT * FROM t1"); + // WITH clause in upsert stmt. + testToSql("with t1 as (select * from functional.alltypes) upsert into " + + "functional_kudu.testtbl select bigint_col, string_col, int_col from t1", + "WITH t1 AS (SELECT * FROM functional.alltypes) UPSERT INTO TABLE " + + "functional_kudu.testtbl SELECT bigint_col, string_col, int_col FROM t1"); // Test joins in WITH-clause view. testToSql("with t as (select a.* from functional.alltypes a, " + "functional.alltypes b where a.id = b.id) select * from t", @@ -1015,6 +1022,29 @@ public class ToSqlTest extends FrontendTestBase { "PARTITION (year=2009, month) VALUES(1, 12)"); } + @Test + public void upsertTest() { + // VALUES clause + testToSql("upsert into functional_kudu.testtbl values (1, 'a', 1)", + "UPSERT INTO TABLE functional_kudu.testtbl VALUES(1, 'a', 1)"); + + // SELECT clause + testToSql("upsert into functional_kudu.testtbl select bigint_col, string_col, " + + "int_col from functional.alltypes", "UPSERT INTO TABLE functional_kudu.testtbl " + + "SELECT bigint_col, string_col, int_col FROM functional.alltypes"); + + // WITH clause + testToSql("with x as (select bigint_col, string_col, int_col from " + + "functional.alltypes) upsert into table functional_kudu.testtbl select * from x", + "WITH x AS (SELECT bigint_col, string_col, int_col FROM functional.alltypes) " + + "UPSERT INTO TABLE functional_kudu.testtbl SELECT * FROM x"); + + // Permutation + testToSql("upsert into table functional_kudu.testtbl (zip, id, name) values " + + "(1, 1, 'a')", "UPSERT INTO TABLE functional_kudu.testtbl(zip, id, name) " + + "VALUES(1, 1, 'a')"); + } + @Test public void testAnalyticExprs() { testToSql( diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 6250969d0..91035d175 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -253,6 +253,12 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("kudu"); } + @Test + public void testKuduUpsert() { + Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported()); + runPlannerTestFile("kudu-upsert"); + } + @Test public void testKuduUpdate() { Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported()); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test new file mode 100644 index 000000000..b106b02f6 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test @@ -0,0 +1,92 @@ +# simple upsert with select +upsert into table functional_kudu.testtbl +select bigint_col, string_col, int_col from functional.alltypes +where year=2009 and month=05 +---- PLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +00:SCAN HDFS [functional.alltypes] + partitions=1/24 files=1 size=20.36KB +==== +# simple upsert with values clause +upsert into table functional_kudu.testtbl +values (1, 'a', 1), (2, 'b', 2) +---- PLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +00:UNION + constant-operands=2 +==== +# upsert with 'with' clause and limit +with x as (select string_col, count(*) from functional.alltypes group by string_col) +upsert into table functional_kudu.testtbl +select a.bigint_col, a.string_col, a.int_col from functional.alltypes a, x +where x.string_col = a.string_col +---- PLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +03:HASH JOIN [INNER JOIN] +| hash predicates: a.string_col = string_col +| runtime filters: RF000 <- string_col +| +|--02:AGGREGATE [FINALIZE] +| | group by: string_col +| | +| 01:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> a.string_col +---- DISTRIBUTEDPLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +03:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.string_col = string_col +| runtime filters: RF000 <- string_col +| +|--06:EXCHANGE [BROADCAST] +| | +| 05:AGGREGATE [FINALIZE] +| | group by: string_col +| | +| 04:EXCHANGE [HASH(string_col)] +| | +| 02:AGGREGATE [STREAMING] +| | group by: string_col +| | +| 01:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> a.string_col +==== +# upsert with inline view +upsert into functional_kudu.testtbl +select v.id, v.string_col, v.cnt from ( + select id, string_col, cast(count(*) as int) cnt from + functional.alltypes + group by 1, 2) v +where cnt < 10 +---- PLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| group by: id, string_col +| having: CAST(count(*) AS INT) < 10 +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +upsert into functional_kudu.testtbl /*+ clustered */ +select * from functional_kudu.testtbl +---- PLAN +UPSERT INTO KUDU [functional_kudu.testtbl] +| +01:SORT +| order by: id DESC NULLS LAST +| +00:SCAN KUDU [functional_kudu.testtbl] +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test index 278290802..949a25c3c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test @@ -261,6 +261,75 @@ delete ignore a from tdata a, tdata b where a.id = 666 row_regex: .*NumModifiedRows: 1.* ==== ---- QUERY +select * from tdata +---- RESULTS +40,'he',0,43,'e',false +120,'she',0,99,'f',true +320,'',2,932,'',false +1,'unknown',1,43,'aaaaaaaaaaaaaaaaaaaa',false +2,'david',1,43,'b',false +3,'todd',1,43,'c',true +---- TYPES +INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN +==== +---- QUERY +upsert into table tdata values (40, 'they', 1, 43, cast('e' as VARCHAR(20)), false), +(1, NULL, 1, 0, cast('a' as VARCHAR(20)), true) +---- RESULTS +==== +---- QUERY +select * from tdata +---- RESULTS +40,'they',1,43,'e',false +120,'she',0,99,'f',true +320,'',2,932,'',false +1,'NULL',1,0,'a',true +2,'david',1,43,'b',false +3,'todd',1,43,'c',true +---- TYPES +INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN +==== +---- QUERY +upsert into table tdata (id, valf) values (2, NULL), (120, 20), (0, 0) +---- RESULTS +==== +---- QUERY +select * from tdata +---- RESULTS +40,'they',1,43,'e',false +120,'she',20,99,'f',true +320,'',2,932,'',false +1,'NULL',1,0,'a',true +2,'david',NULL,43,'b',false +3,'todd',1,43,'c',true +0,'NULL',0,NULL,'NULL',NULL +---- TYPES +INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN +==== +---- QUERY +upsert into table tdata (valb, name, id) +select false as valb, 'he' as name, id from tdata where id < 2 +---- RESULTS +==== +---- QUERY +select * from tdata +---- RESULTS +40,'they',1,43,'e',false +120,'she',20,99,'f',true +320,'',2,932,'',false +1,'he',1,0,'a',false +2,'david',NULL,43,'b',false +3,'todd',1,43,'c',true +0,'he',0,NULL,'NULL',false +---- TYPES +INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN +==== +---- QUERY +upsert into table tdata (id, name) values (null, '') +---- CATCH +Could not add Kudu WriteOp.: Invalid argument: column not nullable: id[int32 NOT NULL] +==== +---- QUERY # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct # if the Kudu columns are of different types. create table impala_3454 (key_1 tinyint, key_2 bigint, PRIMARY KEY (key_1, key_2))