From 399b184bbcf5a1fb06b5afbebf9062e69d02beed Mon Sep 17 00:00:00 2001 From: Thomas Tauber-Marshall Date: Tue, 16 May 2017 09:37:03 -0700 Subject: [PATCH] IMPALA-5167: Reduce the number of Kudu clients created (FE) Creating Kudu clients is very expensive as each will fetch metadata from the Kudu master, so we should minimize the number of Kudu clients that get created. This patch stores a map from Kudu master addressed to Kudu clients in KuduUtil to be used across the FE and catalog. Another patch has already addressed the BE. Future work will consider providing a way to invalidate the stored Kudu clients in case something goes wrong (IMPALA-5685) This relies on two changes on the Kudu side: one that clears non-covered range entries from the client's cache on table open (d07ecd6ded01201c912d2e336611a6a941f48d98), and one that automatically refreshes auth tokens when they expire (603c1578c78c0377ffafdd9c427ebfd8a206bda3). This patch disables some tests that no longer work as they relied on Kudu metadata loading operations timing out, but since we're reusing clients the metadata is already loaded when the test is run. Testing: - Ran a stress test on a 10 node cluster: scan of a small Kudu table, 1000 concurrent queries, load on the Kudu master was reduced signficantly, from ~50% cpu to ~5%. (with the BE changes included) - Ran the Kudu e2e tests. - Manually ran a test with concurrent INSERTs and 'ALTER TABLE ADD PARTITION' (which is affected by the Kudu side change mentiond above) and verified correctness. Change-Id: I9b0b346f37ee43f7f0eefe34a093eddbbdcf2a5e Reviewed-on: http://gerrit.cloudera.org:8080/6898 Reviewed-by: Thomas Tauber-Marshall Tested-by: Impala Public Jenkins --- .../org/apache/impala/catalog/KuduTable.java | 9 ++++-- .../apache/impala/planner/KuduScanNode.java | 3 +- .../impala/service/KuduCatalogOpExecutor.java | 18 +++++++---- .../java/org/apache/impala/util/KuduUtil.java | 32 ++++++++++++------- .../QueryTest/kudu-timeouts-catalogd.test | 12 ++++--- .../QueryTest/kudu-timeouts-impalad.test | 12 ++++--- 6 files changed, 55 insertions(+), 31 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index a79ffe0c5..cb94503ac 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -204,7 +204,8 @@ public class KuduTable extends Table { setTableStats(msTable_); // Connect to Kudu to retrieve table metadata - try (KuduClient kuduClient = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { kuduTable = kuduClient.openTable(kuduTableName_); } catch (KuduException e) { throw new TableLoadingException(String.format( @@ -389,7 +390,8 @@ public class KuduTable extends Table { resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); - try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); List tablets = kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); @@ -432,7 +434,8 @@ public class KuduTable extends Table { // Build column header String header = "RANGE (" + Joiner.on(',').join(getRangePartitioningColNames()) + ")"; resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift())); - try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); // The Kudu table API will return the partitions in sorted order by value. List partitions = kuduTable.getFormattedRangePartitions( diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 46871297c..57403e40b 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -111,7 +111,8 @@ public class KuduScanNode extends ScanNode { public void init(Analyzer analyzer) throws ImpalaRuntimeException { conjuncts_ = orderConjunctsByCost(conjuncts_); - try (KuduClient client = KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts()); + try { org.apache.kudu.client.KuduTable rpcTable = client.openTable(kuduTable_.getKuduTableName()); validateSchema(rpcTable); diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 9984239c0..cbbfccf8c 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -76,7 +76,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). if (kudu.tableExists(kuduTableName)) { @@ -213,7 +214,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Dropping table '%s' from master '%s'", tableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { Preconditions.checkState(!Strings.isNullOrEmpty(tableName)); // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). @@ -244,7 +246,8 @@ public class KuduCatalogOpExecutor { LOG.trace(String.format("Loading schema of table '%s' from master '%s'", kuduTableName, masterHosts)); } - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { if (!kudu.tableExists(kuduTableName)) { throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " + "'%s'", kuduTableName)); @@ -286,7 +289,8 @@ public class KuduCatalogOpExecutor { Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts)); String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME); Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName)); - try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { + KuduClient kudu = KuduUtil.getKuduClient(masterHosts); + try { kudu.tableExists(kuduTableName); } catch (Exception e) { // TODO: This is misleading when there are other errors, e.g. timeouts. @@ -305,7 +309,8 @@ public class KuduCatalogOpExecutor { alterTableOptions.renameTable(newName); String errMsg = String.format("Error renaming Kudu table " + "%s to %s", tbl.getKuduTableName(), newName); - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts()); + try { client.alterTable(tbl.getKuduTableName(), alterTableOptions); if (!client.isAlterTableDone(newName)) { throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); @@ -475,7 +480,8 @@ public class KuduCatalogOpExecutor { */ public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, String errMsg) throws ImpalaRuntimeException { - try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) { + KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts()); + try { client.alterTable(tbl.getKuduTableName(), ato); if (!client.isAlterTableDone(tbl.getKuduTableName())) { throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out"); diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index be98cf699..4df8005fe 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -20,6 +20,7 @@ package org.apache.impala.util; import static java.lang.String.format; import java.util.List; +import java.util.Map; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.DescriptorTable; @@ -55,6 +56,7 @@ import org.apache.kudu.client.RangePartitionBound; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class KuduUtil { @@ -66,19 +68,27 @@ public class KuduUtil { // be sufficient for the Frontend/Catalog use, and has been tested in stress tests. private static int KUDU_CLIENT_WORKER_THREAD_COUNT = 5; + // Maps lists of master addresses to KuduClients, for sharing clients across the FE. + private static Map kuduClients_ = Maps.newHashMap(); + /** - * Creates a KuduClient with the specified Kudu master addresses (as a comma-separated - * list of host:port pairs). The 'admin operation timeout' and the 'operation timeout' - * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is - * used for operations like creating/deleting tables. The 'operation timeout' is used - * when fetching tablet metadata. + * Gets a KuduClient for the specified Kudu master addresses (as a comma-separated + * list of host:port pairs). It will look up and share an existing KuduClient, if + * possible, or it will create a new one to return. + * The 'admin operation timeout' and the 'operation timeout' are set to + * BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is used for + * operations like creating/deleting tables. The 'operation timeout' is used when + * fetching tablet metadata. */ - public static KuduClient createKuduClient(String kuduMasters) { - KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); - b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); - b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); - b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); - return b.build(); + public static KuduClient getKuduClient(String kuduMasters) { + if (!kuduClients_.containsKey(kuduMasters)) { + KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); + b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); + b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); + b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); + kuduClients_.put(kuduMasters, b.build()); + } + return kuduClients_.get(kuduMasters); } /** diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test index d811cfd41..63af18dee 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test @@ -1,3 +1,10 @@ +# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685) +#==== +#---- QUERY +#show create table functional_kudu.alltypestiny +#---- CATCH +#Error opening Kudu table 'impala::functional_kudu.alltypestiny' +#==== ==== ---- QUERY # TODO: improve error messages (here and below) when KUDU-1734 is resolved @@ -6,11 +13,6 @@ describe functional_kudu.alltypes Error opening Kudu table 'impala::functional_kudu.alltypes' ==== ---- QUERY -show create table functional_kudu.alltypes ----- CATCH -Error opening Kudu table 'impala::functional_kudu.alltypes' -==== ----- QUERY create table test_kudu (x int primary key) partition by hash(x) partitions 3 stored as kudu ---- CATCH diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test index ba3341e06..cde4df5df 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test @@ -1,3 +1,10 @@ +# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685) +#==== +#---- QUERY +#show table stats functional_kudu.alltypestiny +#---- CATCH +#Error accessing Kudu for table stats +#==== ==== ---- QUERY # Expected timeout while planning the scan node. @@ -6,8 +13,3 @@ select * from functional_kudu.alltypes ---- CATCH Unable to initialize the Kudu scan node ==== ----- QUERY -show table stats functional_kudu.alltypes ----- CATCH -Error accessing Kudu for table stats -====