IMPALA-14261: Take 'impala.computeStatsSnapshotIds' into account when deciding between Puffin and HMS stats

Since IMPALA-13609, Impala writes snapshot information for each column
on COMPUTE STATS for Iceberg tables (see there for why it is useful),
but this information has so far been ignored.

After this change, snapshot information is used when deciding which of
HMS and Puffin NDV stats should be used (i.e. which is more recent).

This change also modifies the
IcebergUtil.ComputeStatsSnapshotPropertyConverter class: previously
Iceberg fieldIds were stored as Long, but now they are stored as
Integer, in accordance with the Iceberg spec.

Documentation:
 - updated the docs about Puffin stats in docs/topics/impala_iceberg.xml
Testing:
 - modified existing tests to fit the new decision mechanism

Change-Id: I95a5b152dd504e94dea368a107d412e33f67930c
Reviewed-on: http://gerrit.cloudera.org:8080/23251
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Daniel Becker <daniel.becker@cloudera.com>
This commit is contained in:
Daniel Becker
2025-07-22 16:08:17 +02:00
committed by Daniel Becker
parent a68f716458
commit 19c12e0e06
7 changed files with 70 additions and 63 deletions

View File

@@ -896,10 +896,11 @@ ORDER BY made_current_at;
come from different snapshots.
</p>
<p>
In case there are both HMS and Puffin stats for a column, the more recent one will
be used - for HMS stats we use the 'impala.lastComputeStatsTime' table property, and
for Puffin stats we use the snapshot timestamp to determine which one is more
recent.
In case there are both HMS and Puffin NDV stats for a column, the more recent one
will be used. For HMS stats we use the 'impala.computeStatsSnapshotIds' table
property which stores, for each column, the snapshot for which HMS stats were
calculated. We compare this with the snapshot of the Puffin stats to decide which
is more recent.
</p>
<p>
Reading Puffin stats is disabled by default; set the "--enable_reading_puffin_stats"

View File

@@ -574,12 +574,11 @@ public class IcebergTable extends Table implements FeIcebergTable {
if (!BackendConfig.INSTANCE.enableReadingPuffinStats()) return;
if (!isPuffinStatsReadingEnabledForTable()) return;
long hmsStatsTimestampMs = getLastComputeStatsTimeMs();
Set<Integer> fieldIdsWithHmsStats = collectFieldIdsWithNdvStats();
Map<Integer, Long> fieldIdsWithHmsStats = getComputeStatsSnapshotMap(msTable_);
Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinNdvs =
PuffinStatsLoader.loadPuffinStats(icebergApiTable_, getFullName(),
hmsStatsTimestampMs, fieldIdsWithHmsStats);
fieldIdsWithHmsStats);
for (Map.Entry<Integer, PuffinStatsLoader.PuffinStatsRecord> entry
: puffinNdvs.entrySet()) {
int fieldId = entry.getKey();
@@ -600,15 +599,10 @@ public class IcebergTable extends Table implements FeIcebergTable {
// mode: in local catalog mode, the catalog sends the stats in HMS objects, so
// NDVs for unsupported types would be lost.
if (ColumnStats.supportsNdv(colType)) {
// Only use the value from Puffin if it is more recent than the HMS stat value
// or if the latter doesn't exist.
if (!col.getStats().hasNumDistinctValues()
|| snapshot.timestampMillis() >= hmsStatsTimestampMs) {
col.getStats().setNumDistinctValues(ndv);
}
}
}
}
if (!puffinNdvs.isEmpty()) {
catalogTimeline.markEvent("Loaded Puffin stats");
@@ -892,18 +886,23 @@ public class IcebergTable extends Table implements FeIcebergTable {
public void updateComputeStatsIcebergSnapshotsProperty(
org.apache.hadoop.hive.metastore.api.Table msTbl,
TAlterTableUpdateStatsParams params) {
String snapshotIds = msTbl.getParameters().get(
IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS);
TreeMap<Integer, Long> computeStatsMap = getComputeStatsSnapshotMap(msTbl);
TreeMap<Long, Long> computeStatsMap =
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(snapshotIds);
updateComputeStatsIcebergSnapshotMap(computeStatsMap, params);
String property =
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(computeStatsMap);
msTbl.putToParameters(IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, property);
}
private void updateComputeStatsIcebergSnapshotMap(Map<Long, Long> map,
private TreeMap<Integer, Long> getComputeStatsSnapshotMap(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
String snapshotIds = msTbl.getParameters().get(
IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS);
return IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(snapshotIds);
}
private void updateComputeStatsIcebergSnapshotMap(Map<Integer, Long> map,
TAlterTableUpdateStatsParams params) {
// This will be -1 if there is no snapshot yet.
Preconditions.checkState(params.isSetSnapshot_id());
@@ -912,7 +911,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
// Insert/update columns for which we have computed stats.
if (params.isSetColumn_stats()) {
for (String colName : params.column_stats.keySet()) {
long fieldId = getIcebergApiTable().schema().findField(colName).fieldId();
int fieldId = getIcebergApiTable().schema().findField(colName).fieldId();
map.put(fieldId, currentSnapshotId);
}
}

View File

@@ -66,10 +66,9 @@ public class PuffinStatsLoader {
private final Table iceApiTable_;
private final String tblName_;
// The timestamp of the HMS stats and columns that have HMS stats. Puffin NDVs will only
// be loaded if they are more recent than existing HMS stats.
private final long hmsStatsTimestampMs_;
private final Set<Integer> fieldIdsWithHmsStats_;
// Map of fields that have HMS stats - the values are the snapshot IDs. Puffin NDVs will
// only be loaded if they are more recent than existing HMS stats.
private final Map<Integer, Long> fieldIdsWithHmsStats_;
// The blobs to read from Puffin files. Initialised in 'initBlobsToRead()'. The keys of
// the inner map are fieldIds and its values are snapshotIds - together they identify
@@ -94,11 +93,10 @@ public class PuffinStatsLoader {
}
}
private PuffinStatsLoader(Table iceApiTable, String tblName, long hmsStatsTimestampMs,
Set<Integer> fieldIdsWithHmsStats) {
private PuffinStatsLoader(Table iceApiTable, String tblName,
Map<Integer, Long> fieldIdsWithHmsStats) {
iceApiTable_ = iceApiTable;
tblName_ = tblName;
hmsStatsTimestampMs_ = hmsStatsTimestampMs;
fieldIdsWithHmsStats_ = fieldIdsWithHmsStats;
}
@@ -107,15 +105,16 @@ public class PuffinStatsLoader {
* column, the most recent available NDV value is chosen.
*
* Stats for columns in 'fieldIdsWithHmsStats' are only loaded if they belong to a
* snapshot that is more recent than 'hmsStatsTimestampMs'.
* snapshot that is more recent than the corresponding snapshot id in the map, i.e. if
* the Puffin NDV is more recent than the HMS one.
*
* If it is detected that there are multiple blobs for a given fieldId-snapshotId pair,
* a warning log is issued, but no attempt is made to detect all such cases.
*/
public static Map<Integer, PuffinStatsRecord> loadPuffinStats(Table iceApiTable,
String tblName, long hmsStatsTimestampMs, Set<Integer> fieldIdsWithHmsStats) {
String tblName, Map<Integer, Long> fieldIdsWithHmsStats) {
PuffinStatsLoader loader = new PuffinStatsLoader(iceApiTable, tblName,
hmsStatsTimestampMs, fieldIdsWithHmsStats);
fieldIdsWithHmsStats);
return loader.loadPuffinStatsImpl();
}
@@ -194,8 +193,9 @@ public class PuffinStatsLoader {
// Returns true if there are HMS stats for the column referenced by 'fieldId' that are
// at least as recent as the snapshot referenced by 'snapshotId'
private boolean hmsHasMoreRecentStats(int fieldId, long snapshotId) {
long snapshotTs = iceApiTable_.snapshot(snapshotId).timestampMillis();
return hmsStatsTimestampMs_ >= snapshotTs && fieldIdsWithHmsStats_.contains(fieldId);
Long hmsSnapshot = fieldIdsWithHmsStats_.get(fieldId);
if (hmsSnapshot == null) return false;
return isMoreRecentSnapshot(hmsSnapshot, snapshotId);
}
// Checks the metadata of 'statsFile' and loads NDV values where available.

View File

@@ -325,7 +325,7 @@ public class IcebergMetaProvider implements MetaProvider {
org.apache.iceberg.Table iceTbl = tblImpl.iceApiTbl_;
Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinStats =
PuffinStatsLoader.loadPuffinStats(iceTbl, tblImpl.fullName(),
-1, Collections.emptySet());
Collections.emptyMap());
List<ColumnStatisticsObj> res = new ArrayList<>();
for (String colName : colNames) {

View File

@@ -1380,9 +1380,18 @@ public class IcebergUtil {
return "";
}
/**
* This is a helper class for converting the value of the
* 'impala.computeStatsSnapshotIds' table property to a Map representation and back.
*
* In the case of Iceberg tables, for each column with HMS stats, this property stores
* the snapshot id for which stats have been computed. It is a comma-separated list of
* values of the form "fieldIdRangeStart[-fieldIdRangeEndIncl]:snapshotId". The fieldId
* part may be a single value or a contiguous, inclusive range.
*/
public static class ComputeStatsSnapshotPropertyConverter {
public static TreeMap<Long, Long> stringToMap(String snapshotIds) {
TreeMap<Long, Long> res = new TreeMap<Long, Long>();
public static TreeMap<Integer, Long> stringToMap(String snapshotIds) {
TreeMap<Integer, Long> res = new TreeMap<Integer, Long>();
if (snapshotIds == null) return res;
String[] columns = snapshotIds.split(",");
@@ -1399,11 +1408,11 @@ public class IcebergUtil {
String[] colRange = colStr.split("-");
if (colRange.length == 1) {
res.put(Long.parseLong(colRange[0]), snapshotId);
res.put(Integer.parseInt(colRange[0]), snapshotId);
} else if (colRange.length == 2) {
long rangeStart = Long.parseLong(colRange[0]);
long rangeEnd = Long.parseLong(colRange[1]);
for (long colId = rangeStart; colId <= rangeEnd; ++colId) {
int rangeStart = Integer.parseInt(colRange[0]);
int rangeEnd = Integer.parseInt(colRange[1]);
for (int colId = rangeStart; colId <= rangeEnd; ++colId) {
res.put(colId, snapshotId);
}
} else {
@@ -1417,11 +1426,11 @@ public class IcebergUtil {
return res;
}
public static String mapToString(TreeMap<Long, Long> colAndSnapshotIds) {
public static String mapToString(TreeMap<Integer, Long> colAndSnapshotIds) {
ConversionState state = new ConversionState();
for (Map.Entry<Long, Long> entry : colAndSnapshotIds.entrySet()) {
long col = entry.getKey();
for (Map.Entry<Integer, Long> entry : colAndSnapshotIds.entrySet()) {
int col = entry.getKey();
long snapshotId = entry.getValue();
if (state.canContinueRange(col, snapshotId)) {
@@ -1436,7 +1445,7 @@ public class IcebergUtil {
return state.getResult();
}
private static TreeMap<Long, Long> logAndReturnEmptyMap(String snapshotIdsStr) {
private static TreeMap<Integer, Long> logAndReturnEmptyMap(String snapshotIdsStr) {
LOG.warn(String.format(
"Invalid value for table property '%s': \"%s\". Ignoring it.",
IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, snapshotIdsStr));
@@ -1445,13 +1454,13 @@ public class IcebergUtil {
private static class ConversionState {
// Intentionally not using -1 as that is the snapshot ID of empty tables.
private static final long INVALID = -10;
private long colRangeStart_ = INVALID;
private long lastCol_ = INVALID;
private static final int INVALID = -10;
private int colRangeStart_ = INVALID;
private int lastCol_ = INVALID;
private long lastSnapshotId_ = INVALID;
private final StringBuilder sb_ = new StringBuilder();
private boolean canContinueRange(long col, long snapshotId) {
private boolean canContinueRange(int col, long snapshotId) {
return lastCol_ != INVALID && lastSnapshotId_ != INVALID
&& lastSnapshotId_ == snapshotId && (lastCol_ + 1 == col);
}
@@ -1468,7 +1477,7 @@ public class IcebergUtil {
}
}
private void initNewRange(long col, long snapshotId) {
private void initNewRange(int col, long snapshotId) {
if (colRangeStart_ != INVALID) sb_.append(",");
colRangeStart_ = col;
lastCol_ = col;

View File

@@ -477,18 +477,18 @@ public class IcebergUtilTest {
// Fill a disjunct range
String disjunctRanges = "1-4:1234,6-8:1234,10:2345";
TreeMap<Long, Long> disjunctRangesMap =
TreeMap<Integer, Long> disjunctRangesMap =
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(disjunctRanges);
disjunctRangesMap.put(5L, 1234L);
disjunctRangesMap.put(5, 1234L);
String completedRange = "1-8:1234,10:2345";
assertEquals(completedRange,
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(disjunctRangesMap));
// Split a range
String rangeToSplit = "1-8:1234,10:2345";
TreeMap<Long, Long> rangeToSplitMap =
TreeMap<Integer, Long> rangeToSplitMap =
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(rangeToSplit);
rangeToSplitMap.put(4L, 2345L);
rangeToSplitMap.put(4, 2345L);
String splitRanges = "1-3:1234,4:2345,5-8:1234,10:2345";
assertEquals(splitRanges,
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(rangeToSplitMap));
@@ -514,7 +514,7 @@ public class IcebergUtilTest {
}
private String computeStatsPropertyRoundTrip(String property) {
TreeMap<Long, Long> map =
TreeMap<Integer, Long> map =
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(property);
return IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(map);
}

View File

@@ -117,12 +117,9 @@ class TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
self._change_metadata_json_file(tbl_info, "not_all_blobs_current.metadata.json")
# Get the previous snapshot's id, then set HMS stats for some columns and record the
# snapshot they were computed for in the 'impala.computeStatsSnapshotIds' property.
latest_snapshot_timestamp = self._get_latest_snapshot_timestamp(
tbl_info.full_tbl_name)
timestamp_before_snapshot = latest_snapshot_timestamp - 1
prev_snapshot_id = self._get_snapshot_ids(tbl_info.full_tbl_name)[1]
# Set HMS stats for the first and third column, for the previous snapshot.
# There are Puffin stats from the latest snapshot for the first two columns, and older
# Puffin stats for the next two columns - the HMS stats are more recent than these
# older Puffin stats.
@@ -131,8 +128,9 @@ class TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
tbl_info.full_tbl_name),
"alter table {} set column stats bigint_col('numDVs'='300')".format(
tbl_info.full_tbl_name),
"alter table {} set tblproperties('impala.lastComputeStatsTime'='{}')".format(
tbl_info.full_tbl_name, timestamp_before_snapshot)
"alter table {} set tblproperties('impala.computeStatsSnapshotIds'= \
'1:{prev_snapshot_id},3:{prev_snapshot_id}')".format(tbl_info.full_tbl_name,
prev_snapshot_id=prev_snapshot_id)
]
for stmt in stmts:
self.execute_query(stmt)
@@ -175,12 +173,12 @@ class TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
return self.TblInfo(tbl_name, vector, tbl_loc, tbl_properties)
def _get_latest_snapshot_timestamp(self, tbl_name):
query_template = "select unix_timestamp(max(committed_at)) \
latest_snapshot from {}.snapshots"
def _get_snapshot_ids(self, tbl_name):
""" Returns the list of the table's snapshot ids, starting with the current one."""
query_template = "select snapshot_id from {}.snapshots order by committed_at desc"
query = query_template.format(tbl_name)
query_res = self.execute_query(query)
return int(query_res.data[0])
return query_res.data
def _copy_files_to_puffin_tbl(self, tbl_name, tbl_loc, uuid):
version_info = sys.version_info