Mirror of https://github.com/apache/impala.git (synced 2025-12-19 18:12:08 -05:00)
IMPALA-13361: Add INSERT * and UPDATE SET * syntax for MERGE statement
This change adds the INSERT * and UPDATE SET * language elements for the
WHEN NOT MATCHED and WHEN MATCHED clauses. INSERT * enumerates all source
expressions from the source table/subquery and analyzes the clause like a
regular WHEN NOT MATCHED THEN INSERT case. UPDATE SET * creates an assignment
for each target table column by enumerating the target columns and assigning
source expressions by index. If the target column count and the source
expression count mismatch, or the types mismatch, both clauses report
analysis errors.

Tests:
- parser tests added
- analyzer tests added
- E2E tests added

Change-Id: I31cb771f2355ba4acb0f3b9f570ec44fdececdf3
Reviewed-on: http://gerrit.cloudera.org:8080/22051
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
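As a quick illustration, a minimal sketch of the two new clauses; the tables
t(id, name) and s(id, name) are hypothetical and not part of this change:

  -- Positional expansion: t's columns are paired with s's expressions by index.
  merge into t using s on t.id = s.id
  when matched then update set *        -- roughly: update set id = s.id, name = s.name
  when not matched then insert *;       -- roughly: insert (id, name) values (s.id, s.name)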
Committed by: Impala Public Jenkins
Parent: 01b8b45252
Commit: d23ba87d46
@@ -1101,13 +1101,18 @@ merge_update_and_delete ::=

merge_update ::=
KW_UPDATE KW_SET update_set_expr_list:values
{: RESULT = new MergeUpdate(values); :};
{: RESULT = new MergeUpdate(values); :}
| KW_UPDATE KW_SET STAR
{: RESULT = new MergeUpdateStar(); :};

merge_insert ::=
KW_INSERT LPAREN opt_ident_list:columns RPAREN KW_VALUES LPAREN select_list:operands RPAREN
{: RESULT = new MergeInsert(columns, operands); :}
| KW_INSERT KW_VALUES LPAREN select_list:operands RPAREN
{: RESULT = new MergeInsert(Collections.emptyList(), operands); :};
{: RESULT = new MergeInsert(Collections.emptyList(), operands); :}
| KW_INSERT STAR
{: RESULT = new MergeInsertStar(); :};


merge_delete ::= KW_DELETE
{: RESULT = new MergeDelete(); :};
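For orientation, the new STAR productions above accept forms like the following
(t and s are hypothetical tables); the parser builds a MergeUpdateStar or
MergeInsertStar node for them instead of the regular MergeUpdate / MergeInsert:

  merge into t using s on t.id = s.id
  when matched then update set *        -- KW_UPDATE KW_SET STAR
  when not matched then insert *;       -- KW_INSERT STAR (no column list, no VALUES)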
@@ -94,8 +94,6 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
// * Updating partition column value with a non-constant expression
// Therefore we are throwing an exception here because we cannot guarantee
// that the result will be valid.
// TODO(IMPALA-12531): Mention the MERGE statement in the error message,
// as the MERGE statement should be able to do the duplicate checking.
if (IcebergUtil.isPartitionColumn(c,
originalTargetTable_.getDefaultPartitionSpec()) &&
(modifyStmt_.fromClause_ != null && modifyStmt_.fromClause_.size() > 1) &&
@@ -103,7 +101,8 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
throw new AnalysisException(
String.format("Cannot UPDATE partitioning column '%s' via UPDATE FROM " +
"statement with multiple table refs, and when right-hand side '%s' is " +
"non-constant. ", lhsSlotRef.toSql(), rhsExpr.toSql()));
"non-constant. Use a MERGE statement instead.", lhsSlotRef.toSql(),
rhsExpr.toSql()));
}
DmlStatementBase.checkLhsOnlyAppearsOnce(colToExprs, c, lhsSlotRef, rhsExpr);
colToExprs.put(c.getPosition(), rhsExpr);
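A rough sketch of the situation this error message now points away from, assuming
a hypothetical Iceberg table tgt partitioned on part_col and a hypothetical table
src; the UPDATE FROM form with a non-constant right-hand side for a partition
column is rejected, and the MERGE form is the suggested rewrite:

  -- Rejected: multiple table refs and a non-constant value for a partition column.
  update tgt set tgt.part_col = src.part_col from tgt, src where tgt.id = src.id;
  -- Suggested instead: MERGE, which can do the duplicate checking.
  merge into tgt using src on tgt.id = src.id
  when matched then update set part_col = src.part_col;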
@@ -41,6 +41,7 @@ public abstract class MergeCase extends StatementBase {
protected TableName targetTableName_;
protected List<Column> targetTableColumns_;
protected TableRef targetTableRef_;
protected TableRef sourceTableRef_;
protected TMergeMatchType matchType_;

protected MergeCase() {
@@ -51,10 +52,11 @@ public abstract class MergeCase extends StatementBase {

protected MergeCase(List<Expr> resultExprs, List<Expr> filterExprs,
TableName targetTableName, List<Column> targetTableColumns,
TableRef targetTableRef, TMergeMatchType matchType) {
TableRef targetTableRef, TMergeMatchType matchType, TableRef sourceTableRef) {
targetTableName_ = targetTableName;
targetTableColumns_ = targetTableColumns;
targetTableRef_ = targetTableRef;
sourceTableRef_ = sourceTableRef;
resultExprs_ = resultExprs;
filterExprs_ = filterExprs;
matchType_ = matchType;
@@ -69,6 +71,16 @@ public abstract class MergeCase extends StatementBase {
targetTableName_ = parent.getTargetTable().getTableName();
targetTableColumns_ = parent.getTargetTable().getColumns();
targetTableRef_ = parent.getTargetTableRef();
sourceTableRef_ = parent.getSourceTableRef();
}

protected List<String> getSourceColumnLabels() {
if (sourceTableRef_ instanceof InlineViewRef) {
InlineViewRef source = (InlineViewRef) sourceTableRef_;
return source.getColLabels();
} else {
return sourceTableRef_.getTable().getColumnNames();
}
}

public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
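To make the label lookup in getSourceColumnLabels() concrete, a hedged example of
where the source column labels come from; tgt and src(id, name) are hypothetical
tables chosen for illustration only:

  -- Base table source: labels are the table's column names, here (id, name).
  merge into tgt using src s on tgt.id = s.id when matched then update set *;
  -- Inline view source: labels are the view's output labels, here (k, v).
  merge into tgt using (select id as k, upper(name) as v from src) s
  on tgt.id = s.k when matched then update set *;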
@@ -39,9 +39,9 @@ public class MergeDelete extends MergeCase {

protected MergeDelete(List<Expr> resultExprs, List<Expr> filterExprs,
TableName targetTableName, List<Column> targetTableColumns,
TableRef targetTableRef, TMergeMatchType matchType) {
TableRef targetTableRef, TMergeMatchType matchType, TableRef sourceTableRef) {
super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef,
matchType);
matchType, sourceTableRef);
}

@Override
@@ -68,6 +68,7 @@ public class MergeDelete extends MergeCase {
@Override
public MergeDelete clone() {
return new MergeDelete(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()),
targetTableName_, targetTableColumns_, targetTableRef_, matchType_);
targetTableName_, targetTableColumns_, targetTableRef_, matchType_,
sourceTableRef_);
}
}

@@ -40,8 +40,8 @@ import org.apache.impala.thrift.TMergeMatchType;
*/
public class MergeInsert extends MergeCase {
// Stores the column names targeted by the merge insert case.
private final List<String> columnPermutation_;
private final SelectList selectList_;
protected List<String> columnPermutation_;
protected final SelectList selectList_;

public MergeInsert(List<String> columnPermutation, SelectList selectList) {
columnPermutation_ = columnPermutation;
@@ -50,10 +50,10 @@ public class MergeInsert extends MergeCase {

protected MergeInsert(List<Expr> resultExprs, List<Expr> filterExprs,
TableName targetTableName, List<Column> targetTableColumns, TableRef targetTableRef,
TMergeMatchType matchType, List<String> columnPermutation,
SelectList selectList) {
List<String> columnPermutation, SelectList selectList, TMergeMatchType matchType,
TableRef sourceTableRef) {
super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef,
matchType);
matchType, sourceTableRef);
columnPermutation_ = columnPermutation;
selectList_ = selectList;
}
@@ -88,9 +88,9 @@ public class MergeInsert extends MergeCase {
builder.append("VALUES ");
StringJoiner selectItemJoiner = new StringJoiner(", ");
for (SelectListItem item : selectList_.getItems()) {
selectItemJoiner.add(item.toSql(options));
selectItemJoiner.add(item.getExpr().toSql(options));
}
builder.append(String.format("(%s) ", selectItemJoiner));
builder.append(String.format("(%s)", selectItemJoiner));
return builder.toString();
}

@@ -100,9 +100,8 @@ public class MergeInsert extends MergeCase {
@Override
public MergeInsert clone() {
return new MergeInsert(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()),
targetTableName_, targetTableColumns_, targetTableRef_, matchType_,
columnPermutation_,
selectList_);
targetTableName_, targetTableColumns_, targetTableRef_, columnPermutation_,
selectList_, matchType_, sourceTableRef_);
}

/**
@@ -175,16 +174,24 @@ public class MergeInsert extends MergeCase {

if (columnPermutationSize > selectListSize) {
throw new AnalysisException(
String.format("%s more columns (%d) than the VALUES clause returns (%d)",
target, columnPermutationSize, selectListSize));
String.format(moreColumnsMessageTemplate(),
target, columnPermutationSize, selectListSize, toSql()));
}
if (columnPermutationSize < selectListSize) {
throw new AnalysisException(
(String.format("%s fewer columns (%d) than the VALUES clause returns (%d)",
target, columnPermutationSize, selectListSize)));
(String.format(fewerColumnsMessageTemplate(),
target, columnPermutationSize, selectListSize, toSql())));
}
}

protected String moreColumnsMessageTemplate() {
return "%s more columns (%d) than the VALUES clause returns (%d): %s";
}

protected String fewerColumnsMessageTemplate() {
return "%s fewer columns (%d) than the VALUES clause returns (%d): %s";
}

private void duplicateColumnCheck() throws AnalysisException {
Set<String> columnNames = new HashSet<>();
Set<String> duplicatedColumnNames = new HashSet<>();

@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.impala.catalog.Column;
import org.apache.impala.common.AnalysisException;

/**
 * Special INSERT clause for MERGE statements, it tries to match the source
 * table/subquery's result expression list against the columns of the target table.
 */
public class MergeInsertStar extends MergeInsert {

public MergeInsertStar() {
super(Collections.emptyList(), new SelectList());
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
Preconditions.checkNotNull(targetTableColumns_);
Preconditions.checkNotNull(sourceTableRef_);
List<String> sourceColumnLabels = getSourceColumnLabels();
String sourceAlias = sourceTableRef_.getUniqueAlias();

columnPermutation_ = targetTableColumns_.stream().map(Column::getName).collect(
Collectors.toList());

for (String column : sourceColumnLabels) {
SlotRef slotRef = DmlStatementBase.createSlotRef(analyzer, sourceAlias, column);
selectList_.getItems().add(new SelectListItem(slotRef, column));
}
super.analyze(analyzer);
}

@Override
protected String moreColumnsMessageTemplate() {
return "%s more columns (%d) than the source expression (%d): %s";
}

@Override
protected String fewerColumnsMessageTemplate() {
return "%s fewer columns (%d) than the source expression (%d): %s";
}
}
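In effect, MergeInsertStar rewrites the star form into a full column permutation over
the target columns plus one slot reference per source column. A sketch with
hypothetical tables t(id, name, amount) and s(id, name, amount):

  merge into t using s on t.id = s.id
  when not matched then insert *;
  -- is analyzed roughly as if it had been written as:
  merge into t using s on t.id = s.id
  when not matched then insert (id, name, amount) values (s.id, s.name, s.amount);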
@@ -176,4 +176,8 @@ public class MergeStmt extends DmlStatementBase {
return cases_.stream().allMatch(
mergeCase -> mergeCase.caseType().equals(TMergeCaseType.DELETE));
}

public TableRef getSourceTableRef() {
return sourceTableRef_;
}
}

@@ -32,7 +32,7 @@ import org.apache.impala.thrift.TMergeCaseType;
import org.apache.impala.thrift.TMergeMatchType;

public class MergeUpdate extends MergeCase {
private final List<Pair<SlotRef, Expr>> assignmentExprs_;
protected List<Pair<SlotRef, Expr>> assignmentExprs_;

public MergeUpdate(List<Pair<SlotRef, Expr>> assignmentExprs) {
assignmentExprs_ = assignmentExprs;
@@ -40,9 +40,10 @@ public class MergeUpdate extends MergeCase {

protected MergeUpdate(List<Expr> resultExprs, List<Expr> filterExprs,
TableName targetTableName, List<Column> targetTableColumns, TableRef targetTableRef,
TMergeMatchType matchType, List<Pair<SlotRef, Expr>> assignmentExprs) {
List<Pair<SlotRef, Expr>> assignmentExprs, TMergeMatchType matchType,
TableRef sourceTableRef) {
super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef,
matchType);
matchType, sourceTableRef);
assignmentExprs_ = assignmentExprs;
}

@@ -83,16 +84,16 @@ public class MergeUpdate extends MergeCase {
@Override
public String toSql(ToSqlOptions options) {
String parent = super.toSql(options);
StringBuilder builder = new StringBuilder(parent);
builder.append("UPDATE SET ");
return parent + "UPDATE SET " + listAssignments(options);
}

protected String listAssignments(ToSqlOptions options) {
StringJoiner assignmentJoiner = new StringJoiner(", ");
for (Pair<SlotRef, Expr> entry : assignmentExprs_) {
assignmentJoiner.add(String.format(
"%s = %s", entry.first.toSql(options), entry.second.toSql(options)));
}
builder.append(assignmentJoiner);

return builder.toString();
return assignmentJoiner.toString();
}

@Override
@@ -124,7 +125,7 @@ public class MergeUpdate extends MergeCase {
@Override
public MergeUpdate clone() {
return new MergeUpdate(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()),
targetTableName_, targetTableColumns_, targetTableRef_, matchType_,
assignmentExprs_);
targetTableName_, targetTableColumns_, targetTableRef_, assignmentExprs_,
matchType_, sourceTableRef_);
}
}

@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.Pair;

/**
 * Special UPDATE clause for MERGE statements, tries to match every source expression
 * brought by source table or source subquery against the target table's column list.
 */
public class MergeUpdateStar extends MergeUpdate {

public MergeUpdateStar() {
super(Collections.emptyList());
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
assignmentExprs_ = Lists.newArrayList();
Preconditions.checkNotNull(targetTableColumns_);
Preconditions.checkNotNull(sourceTableRef_);
List<String> sourceColumnLabels = getSourceColumnLabels();
String tableName = sourceTableRef_.getUniqueAlias();

checkSize(sourceColumnLabels);

for (int i = 0; i < targetTableColumns_.size(); i++) {
SlotRef sourceExpr = DmlStatementBase.createSlotRef(analyzer, tableName,
sourceColumnLabels.get(i));
SlotRef targetColumn = DmlStatementBase.createSlotRef(analyzer,
targetTableRef_.getUniqueAlias(),
targetTableColumns_.get(i).getName());
assignmentExprs_.add(Pair.create(targetColumn, sourceExpr));
}
super.analyze(analyzer);
}

private void checkSize(List<String> sourceColumnLabels) throws AnalysisException {
int sourceColSize = sourceColumnLabels.size();
int targetColSize = targetTableColumns_.size();
if (targetColSize < sourceColSize) {
throw new AnalysisException(String.format(
"Target table has fewer columns (%d) than the source expression (%d): %s",
targetColSize, sourceColSize, toSql()));
}
if (targetColSize > sourceColSize) {
throw new AnalysisException(String.format(
"Target table has more columns (%d) than the source expression (%d): %s",
targetColSize, sourceColSize, toSql()));
}
}

@Override
protected String listAssignments(ToSqlOptions options) {
List<String> sourceColumnLabels = getSourceColumnLabels();
int minAvailableItems = Math.min(sourceColumnLabels.size(),
targetTableColumns_.size());
StringJoiner assignmentJoiner = new StringJoiner(", ");
for (int i = 0; i < minAvailableItems; ++i) {
assignmentJoiner.add(String.format(
"%s = %s", targetTableColumns_.get(i).getName(), sourceColumnLabels.get(i)));
}
return assignmentJoiner.toString();
}
}
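In effect, MergeUpdateStar pairs the i-th target column with the i-th source column
label, so matching is purely positional, not by name. A sketch with hypothetical
tables t(id, name, amount) and s(k, n, a):

  merge into t using s on t.id = s.k
  when matched then update set *;
  -- is analyzed roughly as:
  merge into t using s on t.id = s.k
  when matched then update set id = s.k, name = s.n, amount = s.a;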
@@ -360,6 +360,24 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id "
+ "when not matched by source and target.user = 'something' then delete");
// INSERT *
AnalyzesOk("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id when not matched then insert *");
AnalyzesOk("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id when not matched by target then insert *");
// UPDATE SET *
AnalyzesOk("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id when matched then update set *");
AnalyzesOk("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id when not matched by source then update set *");

// Inline view as target
AnalysisError("merge into "
@@ -414,8 +432,9 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "(select * from functional_parquet.iceberg_non_partitioned) s "
+ "on t.id = s.id "
+ "when not matched then insert (id, user, action) values(t.user)",
"Column permutation mentions more columns (3) than "
+ "the VALUES clause returns (1)");
"Column permutation mentions more columns (3) than the VALUES clause"
+ " returns (1): WHEN NOT MATCHED BY TARGET THEN INSERT (id, user, action)"
+ " VALUES (t.`user`)");
// More columns in VALUES
AnalysisError("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes t "
@@ -424,8 +443,9 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "on t.id = s.id "
+ "when not matched then "
+ "insert (id) values(t.id, t.user, t.action)",
"Column permutation mentions fewer columns (1) than "
+ "the VALUES clause returns (3)");
"Column permutation mentions fewer columns (1) than the VALUES clause"
+ " returns (3): WHEN NOT MATCHED BY TARGET THEN INSERT (id) "
+ "VALUES (t.id, t.`user`, t.action)");
AnalysisError("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes t "
+ "using "
@@ -433,8 +453,9 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "on t.id = s.id "
+ "when not matched then insert (id, user, action) "
+ "values(t.id, t.action, t.user, t.user)",
"Column permutation mentions fewer columns (3) than "
+ "the VALUES clause returns (4)");
"Column permutation mentions fewer columns (3) than the VALUES clause"
+ " returns (4): WHEN NOT MATCHED BY TARGET THEN INSERT (id, user, action) "
+ "VALUES (t.id, t.action, t.`user`, t.`user`)");
// Unresolved reference in WHEN NOT MATCHED clause
AnalysisError("merge into "
+ "functional_parquet.iceberg_v2_partitioned_position_deletes t "
@@ -465,5 +486,47 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "on t.id = s.id "
+ "when matched and s.id > 2 then delete",
"Unable to rewrite MERGE query statement");
// UPDATE SET * with different column lists
AnalysisError("merge into functional_parquet.iceberg_partition_evolution t "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on t.id = s.id when matched and s.id > 2 then update set *",
"Target table has more columns (6) than the source expression (4): "
+ "WHEN MATCHED AND s.id > 2 THEN UPDATE SET id = id, int_col = user, "
+ "string_col = action, date_string_col = event_time");
AnalysisError("merge into functional_parquet.iceberg_timestamp_part t "
+ "using functional_parquet.iceberg_partition_evolution s "
+ "on t.i = s.id when matched and s.id > 2 then update set *",
"Target table has fewer columns (2) than the source expression (6): "
+ "WHEN MATCHED AND s.id > 2 THEN UPDATE SET i = id, ts = int_col");
// UPDATE SET * with different column types
AnalysisError("merge into functional_parquet.iceberg_partition_evolution t "
+ "using (select int_col, id, date_string_col, month, string_col, year from "
+ "functional_parquet.iceberg_partition_evolution) s "
+ "on t.id = s.id when matched and s.id > 2 then update set *",
"Target table 'functional_parquet.iceberg_partition_evolution' is incompatible"
+ " with source expressions.\nExpression 's.`month`' (type: INT) is not "
+ "compatible with column 'date_string_col' (type: STRING)");
// INSERT * with different column lists
AnalysisError("merge into functional_parquet.iceberg_partition_evolution t "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on t.id = s.id when not matched and s.id > 2 then insert *",
"Column permutation mentions more columns (6) than the source "
+ "expression (4): WHEN NOT MATCHED BY TARGET AND s.id > 2 "
+ "THEN INSERT (id, int_col, string_col, date_string_col, year, month) "
+ "VALUES (s.id, s.`user`, s.action, s.event_time)");
AnalysisError("merge into functional_parquet.iceberg_timestamp_part t "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on t.i = s.id when not matched and s.id > 2 then insert *",
"Column permutation mentions fewer columns (2) than the source "
+ "expression (4): WHEN NOT MATCHED BY TARGET AND s.id > 2 THEN INSERT "
+ "(i, ts) VALUES (s.id, s.`user`, s.action, s.event_time)");
// INSERT * with different column types
AnalysisError("merge into functional_parquet.iceberg_partition_evolution t "
+ "using (select int_col, id, date_string_col, month, string_col, year from "
+ "functional_parquet.iceberg_partition_evolution) s "
+ "on t.id = s.id when not matched then insert *",
"Target table 'functional_parquet.iceberg_partition_evolution' is incompatible"
+ " with source expressions.\nExpression 's.`month`' (type: INT) is not "
+ "compatible with column 'date_string_col' (type: STRING)");
}
}
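The error cases above also suggest the practical fallback: when the source and
target shapes do not line up, spell the columns out instead of using the star
forms. A hedged sketch with hypothetical tables t(id, name) and s(id, name, extra):

  merge into t using s on t.id = s.id
  when matched then update set name = s.name
  when not matched then insert (id, name) values (s.id, s.name);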
@@ -84,8 +84,9 @@ public class MergeInsertTest {
try {
mergeInsert.analyzeColumnPermutation();
} catch (AnalysisException e) {
assertEquals("Column permutation mentions fewer columns (4) than the"
+ " VALUES clause returns (5)",
assertEquals("Column permutation mentions fewer columns (4) than the VALUES "
+ "clause returns (5): WHEN MATCHED THEN INSERT (a, b, c, d) VALUES"
+ " (a, b, c, d, e)",
e.getMessage());
return;
}
@@ -104,8 +105,8 @@ public class MergeInsertTest {
mergeInsert.analyzeColumnPermutation();
} catch (AnalysisException e) {
assertEquals(
"Column permutation mentions more columns (5) than the VALUES "
+ "clause returns (4)",
"Column permutation mentions more columns (5) than the VALUES clause returns"
+ " (4): WHEN MATCHED THEN INSERT (a, b, c, d, e) VALUES (a, b, c, d)",
e.getMessage());
return;
}
@@ -122,7 +123,7 @@ public class MergeInsertTest {
MergeInsert mergeInsert = prepareMergeInsert(columnMap, columns, selectItems);
try {
List<Expr> exprs = mergeInsert.analyzeColumnPermutation();
exprs.size();
assertEquals(5, exprs.size());
} catch (AnalysisException e) {
fail();
}
@@ -158,8 +159,8 @@ public class MergeInsertTest {
mergeInsert.analyzeColumnPermutation();
} catch (AnalysisException e) {
assertEquals(
"Column permutation mentions fewer columns (0) than the VALUES "
+ "clause returns (5)",
"Column permutation mentions fewer columns (0) than the VALUES clause returns"
+ " (5): WHEN MATCHED THEN INSERT () VALUES (a, b, c, d, e)",
e.getMessage());
return;
}

@@ -1980,7 +1980,7 @@ public class ParserTest extends FrontendTestBase {
}

@Test
public void TestMerge(){
public void TestMerge() {
ParsesOk("merge into t using s on t.id = s.id "
+ "when matched then update set t.a = s.a");
ParsesOk("merge into target t using source s on t.id = s.id "
@@ -2021,7 +2021,14 @@ public class ParserTest extends FrontendTestBase {
+ "then update set b = 12");
ParsesOk("merge into t using s on t.id = s.id when not matched by source "
+ "and func(b) != func(a) then update set b = 12");

ParsesOk("merge into t using s on t.id = s.id when not matched "
+ "then insert *");
ParsesOk("merge into t using s on t.id = s.id when not matched "
+ "and i > 10 then insert *");
ParsesOk("merge into t using s on t.id = s.id when matched "
+ "then update set *");
ParsesOk("merge into t using s on t.id = s.id when matched "
+ "and i > 10 then update set *");

ParserError("merge into t using s on t.id = s.id "
+ "when matched and t.a > s.b then delete from");
@@ -2043,6 +2050,10 @@ public class ParserTest extends FrontendTestBase {
+ "when not matched by source then insert values (1, 2, 3)");
ParserError("merge into t using s on t.id = s.id "
+ "when not matched by source and a <> b then insert values (1, 2, 3)");
ParserError("merge into t using s on t.id = s.id when not matched "
+ "and i > 10 then update *");
ParserError("merge into t using s on t.id = s.id when matched "
+ "and i > 10 then insert *");

// Invalid column permutation
ParserError("merge into target t using (select * from source) s "

@@ -263,3 +263,41 @@ when not matched by source then update set string_col = "sixth", float_col = -68
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====
---- QUERY
# Merge into partitioned target table from the source table as
# an inline view using an update to reset the remaining rows of the target table based on source
merge into target_part target using (select * from source) source on target.id = source.id
when matched then update set *
when not matched by source then delete
---- DML_RESULTS: target_part
0,true,0,0,0,2009-01-01,'0'
1,false,1,1.100000023841858,10,2009-01-01,'1'
2,true,2,2.200000047683716,20,2009-01-01,'2'
3,false,3,3.299999952316284,30,2009-01-01,'3'
5,false,5,5.5,50,2009-01-01,'5'
6,true,6,6.599999904632568,60,2009-01-01,'6'
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====
---- QUERY
# Merge into partitioned target table from the source table as
# an inline view using an insert to add a new rows by shifting the ids
# and an update existing rows with shifted rows
merge into target_part target using
(select cast(id + 3 as int) id, false bool_col, int_col, float_col, decimal_col, date_col, string_col from source)
source on target.id = source.id
when not matched and source.id > 7 then insert *
when matched and source.id < 4 then update set *
---- DML_RESULTS: target_part
0,true,0,0,0,2009-01-01,'0'
1,false,1,1.100000023841858,10,2009-01-01,'1'
2,true,2,2.200000047683716,20,2009-01-01,'2'
3,false,0,0,0,2009-01-01,'0'
5,false,5,5.5,50,2009-01-01,'5'
6,true,6,6.599999904632568,60,2009-01-01,'6'
8,false,5,5.5,50,2009-01-01,'5'
9,false,6,6.599999904632568,60,2009-01-01,'6'
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====
New file (56 lines): testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-star.test
@@ -0,0 +1,56 @@
====
---- QUERY
# Table creation and initial data loading
create table target(id int, bool_col boolean, int_col int, float_col float,
decimal_col decimal(20,0), date_col date, string_col string)
stored as iceberg tblproperties("format-version"="2");

create table source(count int, registered boolean, numbers int, average_count float,
min_count decimal(20,0), insertion_date date, description string)
stored as iceberg tblproperties("format-version"="2");

insert into source select id, bool_col, int_col,
float_col, cast(bigint_col as decimal(20,0)), to_date(timestamp_col),
string_col from functional.alltypes order by id limit 7;

====
---- QUERY
# Merge into unpartitioned target table from the source table
# using when not matched insert * clause where the column names are different
merge into target using source on target.id = source.count
when not matched then insert *
---- DML_RESULTS: target
0,true,0,0.0,0,2009-01-01,'0'
1,false,1,1.10000002384,10,2009-01-01,'1'
2,true,2,2.20000004768,20,2009-01-01,'2'
3,false,3,3.29999995232,30,2009-01-01,'3'
4,true,4,4.40000009537,40,2009-01-01,'4'
5,false,5,5.5,50,2009-01-01,'5'
6,true,6,6.59999990463,60,2009-01-01,'6'
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
---- RUNTIME_PROFILE
NumModifiedRows: 7
NumDeletedRows: 0
====
---- QUERY
# Merge into unpartitioned target table from the source table in an inline view
# using when matched update set * clause where the column names are different
merge into target using
(select count, false registered, 123 fixed_numbers, 999.0, 78900000 , insertion_date, "something different" from source)source on target.id = source.count
when matched then update set *
---- DML_RESULTS: target
0,false,123,999,78900000,2009-01-01,'something different'
1,false,123,999,78900000,2009-01-01,'something different'
2,false,123,999,78900000,2009-01-01,'something different'
3,false,123,999,78900000,2009-01-01,'something different'
4,false,123,999,78900000,2009-01-01,'something different'
5,false,123,999,78900000,2009-01-01,'something different'
6,false,123,999,78900000,2009-01-01,'something different'
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
---- RUNTIME_PROFILE
NumModifiedRows: 7
NumDeletedRows: 7
====

@@ -122,4 +122,20 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY'
---- TYPES
STRING,STRING,STRING,STRING
====
---- QUERY
# Merge into unpartitioned target table from the source table
# using not matched and insert * clause
merge into target using source on target.id = source.id
when not matched then insert *
---- DML_RESULTS: target
0,true,222,0,0,2022-12-12,'0 case 2'
1,false,1,1.100000023841858,10,2009-01-01,'1'
2,true,222,2.200000047683716,20,2009-01-01,'2 case 2'
3,false,3,3.299999952316284,30,2009-01-01,'3'
4,true,222,4.400000095367432,40,2009-01-01,'4 case 2'
5,NULL,5,NULL,NULL,NULL,'NULL'
6,true,6,6.599999904632568,60,2009-01-01,'6'
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====

@@ -1998,6 +1998,9 @@ class TestIcebergV2Table(IcebergTestSuite):
def test_merge_long(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database)

def test_merge_star(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)

# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it