IMPALA-13594: Read Puffin stats also from older snapshots

Before this change, Puffin stats were only read from the current
snapshot. Now we also consider older snapshots, and for each column we
choose the most recent available stats. Note that this means that the
stats for different columns may come from different snapshots.

In case there are both HMS and Puffin stats for a column, the more
recent one will be used - for HMS stats we use the
'impala.lastComputeStatsTime' table property, and for Puffin stats we
use the snapshot timestamp to determine which is more recent.

This commit also renames the startup flag 'disable_reading_puffin_stats'
to 'enable_reading_puffin_stats' and the table property
'impala.iceberg_disable_reading_puffin_stats' to
'impala.iceberg_read_puffin_stats' to make them more intuitive. The
default values are flipped to keep the same behaviour as before.

The documentation of Puffin reading is updated in
docs/topics/impala_iceberg.xml

Testing:
 - updated existing test cases and added new ones in
   test_iceberg_with_puffin.py
 - reorganised the tests in TestIcebergTableWithPuffinStats in
   test_iceberg_with_puffin.py: tests that modify table properties and
   other state that other tests rely on are now run separately to
   provide a clean environment for all tests.

Change-Id: Ia37abe8c9eab6d91946c8f6d3df5fb0889704a39
Reviewed-on: http://gerrit.cloudera.org:8080/22177
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Daniel Becker
2024-11-22 18:33:22 +01:00
committed by Impala Public Jenkins
parent 4a645105f9
commit c5b474d3f5
54 changed files with 2550 additions and 474 deletions

View File

@@ -72,7 +72,8 @@ import com.google.common.collect.ImmutableList;
* template). This should match the schema of the table for which we are generating
* metadata. The statistics in the generated Puffin files do not normally match the actual
* data in the table. The template metadata.json file can be taken from a newly created
* and truncated table (truncation is needed so that a snapshot exists).
* and truncated table (truncation is needed so that a snapshot exists). The table is
* truncated multiple times so that there are multiple snapshots.
*/
public class PuffinDataGenerator {
// The table for which we generate data can be created this way:
@@ -98,6 +99,7 @@ public class PuffinDataGenerator {
private final String localOutputDir_;
private final long snapshotId_;
private final List<Long> oldSnapshotIds_;
private final ObjectMapper mapper_;
private final JsonNode metadataJsonTemplate_;
@@ -130,7 +132,7 @@ public class PuffinDataGenerator {
public static void main(String[] args) throws FileNotFoundException, IOException {
final String metadataJsonTemplatePath =
"./testdata/ice_puffin/00001-2e1ade02-35ae-4a8f-a84f-784d1e0c0790.metadata.json";
"./testdata/ice_puffin/00003-442f9acd-964c-43d7-92b8-e0737a39719a.metadata.json";
final String localOutputDir = "./puffin_files/";
PuffinDataGenerator generator = new PuffinDataGenerator(
metadataJsonTemplatePath, localOutputDir);
@@ -149,6 +151,7 @@ public class PuffinDataGenerator {
generator.writeFileWithInvalidAndCorruptSketches();
generator.writeFileMetadataNdvOkFileCorrupt();
generator.writeFileMultipleFieldIds();
generator.writeSomeBlobsCurrentSomeNotInTwoFiles();
}
public PuffinDataGenerator(String metadataJsonTemplatePath, String localOutputDir)
@@ -167,6 +170,10 @@ public class PuffinDataGenerator {
mapper_ = new ObjectMapper();
metadataJsonTemplate_ = mapper_.readTree(metadataJsonStr);
List<Long> snapshotIds = getSnapshotIds();
snapshotIds.remove(snapshotId_);
oldSnapshotIds_ = snapshotIds;
}
private static long getSnapshotIdFromMetadataJson(String metadataJsonStr) {
@@ -186,6 +193,16 @@ public class PuffinDataGenerator {
return matcher.group(1);
}
private List<Long> getSnapshotIds() {
JsonNode snapshots = metadataJsonTemplate_.findValue("snapshots");
List<JsonNode> snapshotIds = snapshots.findValues("snapshot-id");
List<Long> res = new ArrayList<>();
for (JsonNode node : snapshotIds) {
res.add(node.asLong());
}
return res;
}
private String getPuffinFilePrefix() {
return TABLE_LOCATION_PLACEHOLDER + "/metadata/";
}
@@ -277,13 +294,13 @@ public class PuffinDataGenerator {
blobs1.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 2, 2));
List<Blob> blobs2 = new ArrayList<>();
long notCurrentSnapshotId = snapshotId_ - 1;
long notCurrentSnapshotId = oldSnapshotIds_.get(0);
blobs2.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 3, 3));
blobs2.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 4, 4));
List<FileData> puffinFiles = new ArrayList<>();
puffinFiles.add(new FileData(
"current_snapshot_id.stats", snapshotId_, blobs1, true));
"current_snapshot_id.stats", snapshotId_, blobs1, true));
puffinFiles.add(new FileData(
"not_current_snapshot_id.stats", notCurrentSnapshotId, blobs2, true));
writeFilesForScenario(puffinFiles, "one_file_current_one_not.metadata.json");
@@ -291,12 +308,12 @@ public class PuffinDataGenerator {
// Some blobs are for the current snapshot while some are not.
private void writeNotAllBlobsCurrent() throws IOException {
long notCurrentSnapshotId = snapshotId_ - 1;
long notCurrentSnapshotId = oldSnapshotIds_.get(0);
List<Blob> blobs = new ArrayList<>();
blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 1, 1));
blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 2, 2));
blobs.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 3, 3));
blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 4, 4));
blobs.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 4, 4));
List<FileData> puffinFiles = new ArrayList<>();
puffinFiles.add(new FileData(
@@ -304,6 +321,35 @@ public class PuffinDataGenerator {
writeFilesForScenario(puffinFiles, "not_all_blobs_current.metadata.json");
}
// Two Puffin files both contain ndv values for the current and an old snapshot for the
// same column. The reader should take the snapshot into account, otherwise it will
// produce an invalid result whichever file is read first, i.e. there is no "lucky"
// ordering.
private void writeSomeBlobsCurrentSomeNotInTwoFiles() throws IOException {
long notCurrentSnapshotId = oldSnapshotIds_.get(0);
List<Blob> blobs1 = new ArrayList<>();
blobs1.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 1, 1));
blobs1.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 2, 4));
List<Blob> blobs2 = new ArrayList<>();
blobs2.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 1, 5));
blobs2.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 2, 2));
// Even within a file, the reader has to take the snapshot into account.
blobs2.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 3, 8));
blobs2.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 3, 3));
blobs2.add(createBlob(notCurrentSnapshotId, SEQUENCE_NUMBER, 3, 6));
List<FileData> puffinFiles = new ArrayList<>();
puffinFiles.add(new FileData(
"some_blobs_current_some_not_in_2_files1.stats", snapshotId_, blobs1, true));
puffinFiles.add(new FileData("some_blobs_current_some_not_in_2_files2.stats",
notCurrentSnapshotId, blobs2, true));
writeFilesForScenario(puffinFiles,
"some_blobs_current_some_not_in_2_files.metadata.json");
}
// One of the Puffin files is missing. The other file(s) should be taken into account.
private void writeMissingFile() throws IOException {
List<Blob> blobs1 = new ArrayList<>();