IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count

This changes HdfsTable.getFilesSample() to allocate its intermediate
sampling array based on the number of files in the selected
(post-pruning) partitions, rather than the total number of files in the
table. While the former behavior was correct (the total file count is of
course an upper bound on the pruned file count), it was an unnecessarily
large allocation, which has some downsides around garbage collection.

In addition, this is important for the LocalCatalog implementation of
table sampling, since we do not want to have to load all partition file
lists in order to compute a sample over a pruned subset of partitions.

The original code indicated that this was an optimization to avoid
looping over the partition list an extra time. However, typical
partition lists are relatively small even in the worst case (on the
order of 100,000 entries), and looping over 100,000 in-memory Java
objects is unlikely to be the bottleneck when planning any query. This
is especially true considering that we loop over that same list later
in the function anyway, so we probably are not saving page faults or
LLC cache misses either.

In testing this change I noticed that the existing test for TABLESAMPLE
didn't test TABLESAMPLE when applied in conjunction with a predicate.
I added a new test dimension that employs a predicate to prune some
partitions, ensuring that the code works in that case.
I also added coverage of the "100%" sampling parameter as a sanity check
that it returns the same results as a non-sampled query.

Change-Id: I0248d89bcd9dd4ff8b4b85fef282c19e3fe9bdd5
Reviewed-on: http://gerrit.cloudera.org:8080/10936
Reviewed-by: Philip Zeyliger <philip@cloudera.com>
Reviewed-by: Vuk Ercegovac <vercegovac@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Todd Lipcon
2018-07-12 17:07:09 -07:00
committed by Todd Lipcon
parent c845aab86e
commit effe4e6668
2 changed files with 29 additions and 13 deletions

View File

@@ -2158,16 +2158,20 @@ public class HdfsTable extends Table implements FeFsTable {
Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
Preconditions.checkState(minSampleBytes >= 0);
long totalNumFiles = 0;
for (FeFsPartition part : inputParts) {
totalNumFiles += part.getNumFileDescriptors();
}
// Conservative max size for Java arrays. The actual maximum varies
// from JVM version and sometimes between configurations.
final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
if (fileMetadataStats_.numFiles > JVM_MAX_ARRAY_SIZE) {
if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
throw new IllegalStateException(String.format(
"Too many files to generate a table sample. " +
"Table '%s' has %s files, but a maximum of %s files are supported.",
getTableName().toString(), fileMetadataStats_.numFiles, JVM_MAX_ARRAY_SIZE));
"Too many files to generate a table sample of table %s. " +
"Sample requested over %s files, but a maximum of %s files are supported.",
getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
}
int totalNumFiles = (int) fileMetadataStats_.numFiles;
// Ensure a consistent ordering of files for repeatable runs. The files within a
// partition are already ordered based on how they are loaded in the catalog.
@@ -2177,12 +2181,11 @@ public class HdfsTable extends Table implements FeFsTable {
// fileIdxs contains indexes into the file descriptor lists of all inputParts
// parts[i] contains the partition corresponding to fileIdxs[i]
// fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
// Use max size to avoid looping over inputParts for the exact size.
// The purpose of these arrays is to efficiently avoid selecting the same file
// multiple times during the sampling, regardless of the sample percent. We purposely
// avoid generating objects proportional to the number of files.
int[] fileIdxs = new int[totalNumFiles];
FeFsPartition[] parts = new FeFsPartition[totalNumFiles];
int[] fileIdxs = new int[(int)totalNumFiles];
FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
int idx = 0;
long totalBytes = 0;
for (FeFsPartition part: orderedParts) {
@@ -2194,6 +2197,9 @@ public class HdfsTable extends Table implements FeFsTable {
++idx;
}
}
if (idx != totalNumFiles) {
throw new AssertionError("partition file counts changed during iteration");
}
int numFilesRemaining = idx;
double fracPercentBytes = (double) percentBytes / 100;

View File

@@ -32,6 +32,7 @@ class TestTableSample(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestTableSample, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('repeatable', *[True, False]))
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('filtered', *[True, False]))
# Tablesample is only supported on HDFS tables.
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format != 'kudu' and
@@ -48,15 +49,21 @@ class TestTableSample(ImpalaTestSuite):
# 2. The results of queries without a repeatable clause could change due to
# changes in data loading that affect the number or size of files.
repeatable = vector.get_value('repeatable')
filtered = vector.get_value('filtered')
where_clause = ""
if filtered:
where_clause = "where month between 1 and 6"
ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
result = self.client.execute("select count(*) from alltypes")
result = self.client.execute("select count(*) from alltypes %s" % where_clause)
baseline_count = int(result.data[0])
prev_count = None
for perc in [5, 20, 50]:
for perc in [5, 20, 50, 100]:
rep_sql = ""
if repeatable: rep_sql = " repeatable(1)"
sql_stmt = "select count(*) from alltypes tablesample system(%s)%s" \
% (perc, rep_sql)
sql_stmt = "select count(*) from alltypes tablesample system(%s)%s %s" \
% (perc, rep_sql, where_clause)
handle = self.client.execute_async(sql_stmt)
# IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 sec before
# failing and logging the backtraces of all impalads.
@@ -76,7 +83,10 @@ class TestTableSample(ImpalaTestSuite):
result = self.client.fetch(sql_stmt, handle)
self.client.close_query(handle)
count = int(result.data[0])
assert count < baseline_count
if perc < 100:
assert count < baseline_count
else:
assert count == baseline_count
if prev_count and repeatable:
# May not necessarily be true for non-repeatable samples
assert count > prev_count