IMPALA-13370: Read Puffin stats from metadata.json property if available

When Trino writes Puffin stats for a column, it includes the NDV as a
property (with key "ndv") in the "statistics" section of the
metadata.json file, in addition to the Theta sketch in the Puffin file.
When we are only reading the stats and not writing/updating them, it is
enough to read this property if it is present.

After this change, Impala only opens and reads a Puffin stats file if it
contains stats for at least one column for which the "ndv" property is
not set in the metadata.json file.

Testing:
 - added a test in test_iceberg_with_puffin.py that verifies that the
   Puffin stats file is not read if the the metadata.json file contains
   the NDV property. It uses the newly added stats file with corrupt
   datasketches: 'metadata_ndv_ok_sketches_corrupt.stats'.

Change-Id: I5e92056ce97c4849742db6309562af3b575f647b
Reviewed-on: http://gerrit.cloudera.org:8080/21959
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Daniel Becker
2024-10-02 16:56:33 +02:00
committed by Impala Public Jenkins
parent 0ca42fafec
commit e5919f13f9
7 changed files with 673 additions and 49 deletions

View File

@@ -27,11 +27,12 @@ import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Scanner;
@@ -146,14 +147,18 @@ public class PuffinDataGenerator {
generator.writeFileContainsInvalidFieldId();
generator.writeStatForUnsupportedType();
generator.writeFileWithInvalidAndCorruptSketches();
generator.writeFileMetadataNdvOkFileCorrupt();
generator.writeFileMultipleFieldIds();
}
public PuffinDataGenerator(String metadataJsonTemplatePath, String localOutputDir)
throws java.io.FileNotFoundException, JsonProcessingException {
localOutputDir_ = localOutputDir;
String metadataJsonStr = new Scanner(new File(metadataJsonTemplatePath))
.useDelimiter("\\Z").next();
String metadataJsonStr;
try (Scanner scanner = new Scanner(new File(metadataJsonTemplatePath))) {
metadataJsonStr = scanner.useDelimiter("\\Z").next();
}
snapshotId_ = getSnapshotIdFromMetadataJson(metadataJsonStr);
@@ -398,13 +403,7 @@ public class PuffinDataGenerator {
SEQUENCE_NUMBER, sketches.get(1)));
blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 3, 3));
// Corrupt sketch.
byte[] bytes = {0, 0};
ByteBuffer corruptSketch = ByteBuffer.wrap(bytes);
blobs.add(new Blob(SKETCH_TYPE, Arrays.asList(4), snapshotId_, SEQUENCE_NUMBER,
corruptSketch));
blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 4, 4, false));
blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 5, 5));
FileData fileData = new FileData(
@@ -414,6 +413,37 @@ public class PuffinDataGenerator {
writeFilesForScenario(puffinFiles, "invalidAndCorruptSketches.metadata.json");
}
private void writeFileMetadataNdvOkFileCorrupt() throws IOException {
// The sketches in the Puffin file are corrupt but it shouldn't cause an error since
// we don't actually read it because we read the NDV value from the metadata.json
// file.
List<Blob> blobs = new ArrayList<>();
blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 1, 1, true));
blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 2, 2, true));
FileData corruptFile = new FileData(
"metadata_ndv_ok_sketches_corrupt.stats", snapshotId_, blobs, true);
List<FileData> puffinFiles = new ArrayList<>();
puffinFiles.add(corruptFile);
writeFilesForScenario(puffinFiles,
"metadata_ndv_ok_stats_file_corrupt.metadata.json");
}
private void writeFileMultipleFieldIds() throws IOException {
List<Blob> blobs = new ArrayList<>();
List<Integer> fieldIds = Arrays.asList(1, 2);
blobs.add(createBlobMultipleFieldIds(snapshotId_, SEQUENCE_NUMBER, fieldIds, 1,
true));
FileData file = new FileData(
"multiple_field_ids.stats", snapshotId_, blobs, true);
List<FileData> puffinFiles = new ArrayList<>();
puffinFiles.add(file);
writeFilesForScenario(puffinFiles, "multiple_field_ids.metadata.json");
}
private static ByteBuffer createSketchWithNdv(int ndv) {
UpdateSketch sketch = UpdateSketch.builder().build();
for (int i = 0; i < ndv; i++) sketch.update(i);
@@ -430,8 +460,39 @@ public class PuffinDataGenerator {
private static Blob createBlob(long snapshotId, long sequenceNumber,
int fieldId, int ndv) {
return new Blob(SKETCH_TYPE, Arrays.asList(fieldId), snapshotId, sequenceNumber,
sketches.get(ndv-1));
return createBlob(snapshotId, sequenceNumber, fieldId, ndv, false);
}
private static Blob createBlob(long snapshotId, long sequenceNumber,
int fieldId, int ndv, boolean addNdvProperty) {
return createBlobMultipleFieldIds(snapshotId, sequenceNumber, Arrays.asList(fieldId),
ndv, addNdvProperty);
}
private static Blob createBlobCorruptSketch(long snapshotId, long sequenceNumber,
int fieldId, int ndv, boolean addNdvProperty) {
// Corrupt sketch.
byte[] bytes = {0, 0};
ByteBuffer corruptSketch = ByteBuffer.wrap(bytes);
return createBlobWithProperties(snapshotId, sequenceNumber, Arrays.asList(fieldId),
ndv, corruptSketch, addNdvProperty);
}
private static Blob createBlobMultipleFieldIds(long snapshotId, long sequenceNumber,
List<Integer> fieldIds, int ndv, boolean addNdvProperty) {
return createBlobWithProperties(snapshotId, sequenceNumber, fieldIds, ndv,
sketches.get(ndv-1), addNdvProperty);
}
private static Blob createBlobWithProperties(long snapshotId, long sequenceNumber,
List<Integer> fieldIds, int ndv, ByteBuffer datasketch, boolean addNdvProperty) {
Map<String, String> properties = new HashMap<>();
if (addNdvProperty) {
properties.put("ndv", Integer.toString(ndv));
}
return new Blob(SKETCH_TYPE, fieldIds, snapshotId, sequenceNumber,
datasketch, null, properties);
}
private void writeFilesForScenario(List<FileData> puffinFiles, String statsJsonFile)
@@ -498,6 +559,15 @@ public class PuffinDataGenerator {
for (int fieldId : blob.inputFields()) fieldsList.add(fieldId);
blobNode.set("fields", fieldsList);
// Put properties
if (!blob.properties().isEmpty()) {
ObjectNode properties = mapper_.createObjectNode();
for (Map.Entry<String, String> entry : blob.properties().entrySet()) {
properties.put(entry.getKey(), entry.getValue());
}
blobNode.set("properties", properties);
}
return blobNode;
}