diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 8c978b490..0f8f8fde1 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -288,11 +288,15 @@ public class KuduCatalogOpExecutor { Preconditions.checkState(!Strings.isNullOrEmpty(newName)); AlterTableOptions alterTableOptions = new AlterTableOptions(); alterTableOptions.renameTable(newName); + String errMsg = String.format("Error renaming Kudu table " + + "%s to %s", tbl.getKuduTableName(), newName); try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { client.alterTable(tbl.getKuduTableName(), alterTableOptions); + if (!client.isAlterTableDone(newName)) { + throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); + } } catch (KuduException e) { - throw new ImpalaRuntimeException(String.format("Error renaming Kudu table " + - "%s to %s", tbl.getName(), newName), e); + throw new ImpalaRuntimeException(errMsg, e); } } @@ -316,15 +320,13 @@ public class KuduCatalogOpExecutor { alterTableOptions.dropRangePartition(lowerBound.first, upperBound.first, lowerBound.second, upperBound.second); } - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { - client.alterTable(tbl.getKuduTableName(), alterTableOptions); - } catch (KuduException e) { - if (!params.isIgnore_errors()) { - throw new ImpalaRuntimeException(String.format("Error %s range partition in " + - "table %s", - (type == TRangePartitionOperationType.ADD ? "adding" : "dropping"), - tbl.getName()), e); - } + String errMsg = String.format("Error %s range partition in " + + "table %s", (type == TRangePartitionOperationType.ADD ? "adding" : "dropping"), + tbl.getName()); + try { + alterKuduTable(tbl, alterTableOptions, errMsg); + } catch (ImpalaRuntimeException e) { + if (!params.isIgnore_errors()) throw e; } } @@ -394,12 +396,8 @@ public class KuduCatalogOpExecutor { } } } - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { - client.alterTable(tbl.getKuduTableName(), alterTableOptions); - } catch (KuduException e) { - throw new ImpalaRuntimeException("Error adding columns to Kudu table " + - tbl.getKuduTableName(), e); - } + String errMsg = "Error adding columns to Kudu table " + tbl.getName(); + alterKuduTable(tbl, alterTableOptions, errMsg); } /** @@ -410,12 +408,9 @@ public class KuduCatalogOpExecutor { Preconditions.checkState(!Strings.isNullOrEmpty(colName)); AlterTableOptions alterTableOptions = new AlterTableOptions(); alterTableOptions.dropColumn(colName); - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { - client.alterTable(tbl.getKuduTableName(), alterTableOptions); - } catch (KuduException e) { - throw new ImpalaRuntimeException(String.format("Error dropping column %s from " + - "Kudu table %s", colName, tbl.getName()), e); - } + String errMsg = String.format("Error dropping column %s from " + + "Kudu table %s", colName, tbl.getName()); + alterKuduTable(tbl, alterTableOptions, errMsg); } /** @@ -427,11 +422,25 @@ public class KuduCatalogOpExecutor { Preconditions.checkNotNull(newCol); AlterTableOptions alterTableOptions = new AlterTableOptions(); alterTableOptions.renameColumn(oldName, newCol.getColumnName()); + String errMsg = String.format("Error renaming column %s to %s " + + "for Kudu table %s", oldName, newCol.getColumnName(), tbl.getName()); + alterKuduTable(tbl, alterTableOptions, errMsg); + } + + /** + * Alters a Kudu table based on the specified AlterTableOptions params. Blocks until + * the alter table operation is finished or until the operation timeout is reached. + * Throws an ImpalaRuntimeException if the operation cannot be completed successfully. + */ + public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, String errMsg) + throws ImpalaRuntimeException { try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { - client.alterTable(tbl.getKuduTableName(), alterTableOptions); + client.alterTable(tbl.getKuduTableName(), ato); + if (!client.isAlterTableDone(tbl.getKuduTableName())) { + throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); + } } catch (KuduException e) { - throw new ImpalaRuntimeException(String.format("Error renaming column %s to %s " + - "for Kudu table %s", oldName, newCol.getColumnName(), tbl.getName()), e); + throw new ImpalaRuntimeException(errMsg, e); } } } diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test index 505e9fefd..116a7e943 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test @@ -50,7 +50,6 @@ BIGINT # Create a table with range distribution create table tbl_to_alter (id int primary key, name string null, vali bigint not null) distribute by range (id) (partition 1 < values <= 10) stored as kudu - tblproperties('kudu.table_name'='tbl_to_alter') ---- RESULTS ==== ---- QUERY @@ -296,7 +295,7 @@ alter table tbl_to_alter set tblproperties('kudu.table_name'='kudu_tbl_to_alter' ---- QUERY # Create a new table and try to rename to an existing kudu table create table copy_of_tbl (a int primary key) distribute by hash (a) into 3 buckets - stored as kudu; + stored as kudu tblproperties('kudu.table_name'='copy_of_tbl'); alter table copy_of_tbl set tblproperties('kudu.table_name'='kudu_tbl_to_alter') ---- CATCH ImpalaRuntimeException: Error renaming Kudu table copy_of_tbl to kudu_tbl_to_alter