diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index a79ffe0c5..cb94503ac 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -204,7 +204,8 @@ public class KuduTable extends Table { setTableStats(msTable_); // Connect to Kudu to retrieve table metadata - try (KuduClient kuduClient = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { kuduTable = kuduClient.openTable(kuduTableName_); } catch (KuduException e) { throw new TableLoadingException(String.format( @@ -389,7 +390,8 @@ public class KuduTable extends Table { resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); - try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); List tablets = kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); @@ -432,7 +434,8 @@ public class KuduTable extends Table { // Build column header String header = "RANGE (" + Joiner.on(',').join(getRangePartitioningColNames()) + ")"; resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift())); - try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); // The Kudu table API will return the partitions in sorted order by value. List partitions = kuduTable.getFormattedRangePartitions( diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 46871297c..57403e40b 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -111,7 +111,8 @@ public class KuduScanNode extends ScanNode { public void init(Analyzer analyzer) throws ImpalaRuntimeException { conjuncts_ = orderConjunctsByCost(conjuncts_); - try (KuduClient client = KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable rpcTable = client.openTable(kuduTable_.getKuduTableName()); validateSchema(rpcTable); diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 9984239c0..cbbfccf8c 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -76,7 +76,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). if (kudu.tableExists(kuduTableName)) { @@ -213,7 +214,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Dropping table '%s' from master '%s'", tableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { Preconditions.checkState(!Strings.isNullOrEmpty(tableName)); // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). @@ -244,7 +246,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Loading schema of table '%s' from master '%s'", kuduTableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { if (!kudu.tableExists(kuduTableName)) { throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " + "'%s'", kuduTableName)); @@ -286,7 +289,8 @@ public class KuduCatalogOpExecutor { Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts)); String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME); Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName)); - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { kudu.tableExists(kuduTableName); } catch (Exception e) { // TODO: This is misleading when there are other errors, e.g. timeouts. @@ -305,7 +309,8 @@ public class KuduCatalogOpExecutor { alterTableOptions.renameTable(newName); String errMsg = String.format("Error renaming Kudu table " + "%s to %s", tbl.getKuduTableName(), newName); - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts()); + try { client.alterTable(tbl.getKuduTableName(), alterTableOptions); if (!client.isAlterTableDone(newName)) { throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); @@ -475,7 +480,8 @@ public class KuduCatalogOpExecutor { */ public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, String errMsg) throws ImpalaRuntimeException { - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts()); + try { client.alterTable(tbl.getKuduTableName(), ato); if (!client.isAlterTableDone(tbl.getKuduTableName())) { throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index be98cf699..4df8005fe 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -20,6 +20,7 @@ package org.apache.impala.util; import static java.lang.String.format; import java.util.List; +import java.util.Map; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.DescriptorTable; @@ -55,6 +56,7 @@ import org.apache.kudu.client.RangePartitionBound; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class KuduUtil { @@ -66,19 +68,27 @@ public class KuduUtil { // be sufficient for the Frontend/Catalog use, and has been tested in stress tests. private static int KUDU_CLIENT_WORKER_THREAD_COUNT = 5; + // Maps lists of master addresses to KuduClients, for sharing clients across the FE. + private static Map kuduClients_ = Maps.newHashMap(); + /** - * Creates a KuduClient with the specified Kudu master addresses (as a comma-separated - * list of host:port pairs). The 'admin operation timeout' and the 'operation timeout' - * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is - * used for operations like creating/deleting tables. The 'operation timeout' is used - * when fetching tablet metadata. + * Gets a KuduClient for the specified Kudu master addresses (as a comma-separated + * list of host:port pairs). It will look up and share an existing KuduClient, if + * possible, or it will create a new one to return. + * The 'admin operation timeout' and the 'operation timeout' are set to + * BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is used for + * operations like creating/deleting tables. The 'operation timeout' is used when + * fetching tablet metadata. */ - public static KuduClient createKuduClient(String kuduMasters) { - KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); - b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); - b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); - b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); - return b.build(); + public static KuduClient getKuduClient(String kuduMasters) { + if (!kuduClients_.containsKey(kuduMasters)) { + KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); + b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); + b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); + b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); + kuduClients_.put(kuduMasters, b.build()); + } + return kuduClients_.get(kuduMasters); } /** diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test index d811cfd41..63af18dee 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test @@ -1,3 +1,10 @@ +# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685) +#==== +#---- QUERY +#show create table functional_kudu.alltypestiny +#---- CATCH +#Error opening Kudu table 'impala::functional_kudu.alltypestiny' +#==== ==== ---- QUERY # TODO: improve error messages (here and below) when KUDU-1734 is resolved @@ -6,11 +13,6 @@ describe functional_kudu.alltypes Error opening Kudu table 'impala::functional_kudu.alltypes' ==== ---- QUERY -show create table functional_kudu.alltypes ----- CATCH -Error opening Kudu table 'impala::functional_kudu.alltypes' -==== ----- QUERY create table test_kudu (x int primary key) partition by hash(x) partitions 3 stored as kudu ---- CATCH diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test index ba3341e06..cde4df5df 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test @@ -1,3 +1,10 @@ +# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685) +#==== +#---- QUERY +#show table stats functional_kudu.alltypestiny +#---- CATCH +#Error accessing Kudu for table stats +#==== ==== ---- QUERY # Expected timeout while planning the scan node. @@ -6,8 +13,3 @@ select * from functional_kudu.alltypes ---- CATCH Unable to initialize the Kudu scan node ==== ----- QUERY -show table stats functional_kudu.alltypes ----- CATCH -Error accessing Kudu for table stats -====