IMPALA-6433: Add read support for PageHeaderV2

Parquet v2 introduces several changes in Parquet files compared to v1:

1. file version = 2 instead of 1

c185faf0c4/src/main/thrift/parquet.thrift (L1016)
Before this patch, Impala rejected Parquet files with version != 1.

2. possible use of DataPageHeaderV2 instead of DataPageHeader

c185faf0c4/src/main/thrift/parquet.thrift (L561)

The main differences compared to V1 DataPageHeader:
a. rep/def levels are not compressed, so the compressed part contains
   only the actual encoded values
b. rep/def levels must be RLE encoded (Impala only supports RLE encoded
   levels even for V1 pages)
c. compression can be turned on/off per page (member is_compressed)
d. the number of nulls (member num_nulls) is required - in v1 it was
   only available in the optional statistics
e. the number of rows (member num_rows) is required, which can help
   with matching collection items to the top-level collection
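
To make 2.a-2.c concrete, a v2 data page body is laid out as
[rep levels][def levels][values], where only the values part may be
compressed (controlled per page by is_compressed). A minimal sketch of
locating the three sections; 'header' and 'body' are illustrative local
names, while the thrift field names are the real ones:

  // 'header' is the parquet::PageHeader of a DATA_PAGE_V2 page and 'body'
  // points to the start of the page body as read from the stream.
  const parquet::DataPageHeaderV2& v2 = header.data_page_header_v2;
  const uint8_t* rep_levels = body;  // RLE encoded, never compressed
  const uint8_t* def_levels = rep_levels + v2.repetition_levels_byte_length;
  const uint8_t* values = def_levels + v2.definition_levels_byte_length;
  // compressed_page_size in the outer header still counts the (uncompressed)
  // level bytes, so the size of the (possibly compressed) values part is:
  int values_size = header.compressed_page_size
      - v2.repetition_levels_byte_length - v2.definition_levels_byte_length;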

The patch adds support for understanding v2 data pages but does not
implement some potential optimizations:

a. would allow an optimization for queries that need only the nullness
of a column but not the actual value: since the values are not needed,
decompression of the page data could be skipped. This optimization is
not implemented - currently Impala materializes both the null bit and
the value for all columns, regardless of whether the value is actually
needed.

d. could also be used for optimizations / additional validity checks,
but it is not used currently.

e. could make skipping rows easier but is not used, as the existing
scanner has to be able to skip rows efficiently in v1 files as well,
so it cannot rely on num_rows.

3. possible use of new encodings (e.g. DELTA_BINARY_PACKED)

No new encodings are added - when an unsupported encoding is
encountered, Impala returns an error.

parquet-mr uses new encodings (DELTA_BINARY_PACKED, DELTA_BYTE_ARRAY)
for most types if the file version is 2, so with this patch Impala is
not yet able to read all v2 Parquet tables written by Hive.

4. Encoding PLAIN_DICTIONARY is deprecated and RLE_DICTIONARY is used
instead. The semantics of the two encodings are exactly the same.
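
Because the two encodings are interchangeable, the reader maps
RLE_DICTIONARY back to PLAIN_DICTIONARY and reuses the existing
dictionary decoding path (see ScalarColumnReader::InitDataDecoder()
in the diff below):

  // PLAIN_DICTIONARY is used internally to represent both dictionary encodings.
  if (page_encoding_ == parquet::Encoding::RLE_DICTIONARY) {
    page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
  }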

Additional changes:
Some responsibilities are moved from ParquetColumnReader to
ParquetColumnChunkReader:
- ParquetColumnChunkReader decodes rep/def level sizes to hide v1/v2
  differences (see 2.a.)
- ParquetColumnChunkReader skips empty data pages in
  ReadNextDataPageHeader
- the state machine of ParquetColumnChunkReader is simplified by
  separating data page header reading from reading the rest of the page
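
With this split, a column reader's per-page flow becomes roughly the
following (a simplified sketch using the names introduced by the patch;
'page_can_be_skipped' stands for whatever condition the caller uses,
e.g. late materialization):

  int num_values;
  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_values));
  if (num_values == 0) return HandleTooEarlyEos();  // no more data pages
  if (page_can_be_skipped) {
    // Only the header was read, so the page can be skipped without
    // decompressing its data.
    RETURN_IF_ERROR(col_chunk_reader_.SkipPageData());
  } else {
    ParquetColumnChunkReader::DataPageInfo page;
    RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
    // 'page' now holds the rep/def level pointers and the value buffer,
    // regardless of whether the file uses v1 or v2 data pages.
  }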

Testing:
- added 4 v2 Parquet test tables (written by Hive) to cover
  compressed / uncompressed and scalar/complex cases
- added EE and fuzz tests for the test tables above
- manually tested v2 Parquet files written by pyarrow
- ran core tests

Note that no test is added where some pages are compressed while
others are not. It would be tricky to create such files with existing
writers. The code should handle this case, and it is very unlikely
that such files will be encountered in practice.
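
For reference, the per-page decision mirrors the check in
ParquetColumnChunkReader::ReadDataPageData(): a page is decompressed
only if a decompressor exists for the column chunk and, for v2 pages,
the page's own is_compressed flag is set:

  bool is_compressed = decompressor_.get() != nullptr
      && (!is_v2 || header.data_page_header_v2.is_compressed);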

Change-Id: I282962a6e4611e2b662c04a81592af83ecaf08ca
Reviewed-on: http://gerrit.cloudera.org:8080/19793
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Csaba Ringhofer, 2023-04-17 21:35:30 +02:00
Committed by: Impala Public Jenkins
parent 3608ab25f1
commit 4261225f65
14 changed files with 566 additions and 187 deletions

View File

@@ -1280,7 +1280,7 @@ void HdfsParquetTableWriter::ConfigureForIceberg(int num_cols) {
Status HdfsParquetTableWriter::Init() {
// Initialize file metadata
file_metadata_.version = PARQUET_CURRENT_VERSION;
file_metadata_.version = PARQUET_WRITER_VERSION;
stringstream created_by;
created_by << "impala version " << GetDaemonBuildVersion()

View File

@@ -19,6 +19,7 @@
#include <string>
#include "exec/parquet/parquet-level-decoder.h"
#include "runtime/mem-pool.h"
#include "runtime/runtime-state.h"
#include "runtime/scoped-buffer.h"
@@ -44,13 +45,16 @@ static bool RequiresSkippedDictionaryHeaderCheck(
}
ParquetColumnChunkReader::ParquetColumnChunkReader(HdfsParquetScanner* parent,
string schema_name, int slot_id, ValueMemoryType value_mem_type)
string schema_name, int slot_id, ValueMemoryType value_mem_type,
bool has_rep_level, bool has_def_level)
: parent_(parent),
schema_name_(schema_name),
page_reader_(parent, schema_name),
slot_id_(slot_id),
data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())),
value_mem_type_(value_mem_type)
value_mem_type_(value_mem_type),
has_rep_level_(has_rep_level),
has_def_level_(has_def_level)
{
}
@@ -203,79 +207,185 @@ Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
return Status::OK();
}
Status ParquetColumnChunkReader::ReadNextDataPage(
bool* eos, uint8_t** data, int* data_size, bool read_data) {
// Read the next data page, skipping page types we don't care about. This method should
// be called after we know that the first page is not a dictionary page. Therefore, if
// we find a dictionary page, it is an error in the parquet file and we return a non-ok
// status (returned by page_reader_.ReadPageHeader()).
Status ParquetColumnChunkReader::ReadNextDataPageHeader(int* num_values) {
// Read the next data page header, skipping page types we don't care about and empty
// data pages. This method should be called after we know that the first page is not a
// dictionary page. Therefore, if we find a dictionary page, it is an error in
// the parquet file and we return a non-ok status (returned by
// page_reader_.ReadPageHeader()).
bool eos = false;
*num_values = 0;
bool next_data_page_found = false;
while (!next_data_page_found) {
RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
RETURN_IF_ERROR(page_reader_.ReadPageHeader(&eos));
const parquet::PageHeader current_page_header = CurrentPageHeader();
const parquet::PageHeader& header = CurrentPageHeader();
// If page_reader_.ReadPageHeader() > 0 and the page is a dictionary page then
// ReadPageHeader would have returned an error.
DCHECK(page_reader_.PageHeadersRead() > 0
|| !current_page_header.__isset.dictionary_page_header)
|| !header.__isset.dictionary_page_header)
<< "Should not call this method on the first page if it is a dictionary.";
if (*eos) return Status::OK();
if (eos) return Status::OK();
if (current_page_header.type == parquet::PageType::DATA_PAGE) {
next_data_page_found = true;
if (header.type == parquet::PageType::DATA_PAGE
|| header.type == parquet::PageType::DATA_PAGE_V2) {
bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
int tmp_num_values = is_v2 ? header.data_page_header_v2.num_values
: header.data_page_header.num_values;
if (tmp_num_values < 0) {
return Status(Substitute("Error reading data page in Parquet file '$0'. "
"Invalid number of values in metadata: $1", filename(), tmp_num_values));
} else if (tmp_num_values == 0) {
// Skip pages with 0 values.
VLOG_FILE << "Found empty page in " << filename();
RETURN_IF_ERROR(SkipPageData());
} else {
*num_values = tmp_num_values;
next_data_page_found = true;
}
} else {
// We can safely skip non-data pages
RETURN_IF_ERROR(SkipPageData());
}
}
if (read_data) {
return ReadDataPageData(data, data_size);
} else {
return Status::OK();
}
return Status::OK();
}
Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size) {
const parquet::PageHeader& current_page_header = CurrentPageHeader();
Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV1(
const parquet::DataPageHeader* header_v1, DataPageInfo* page_info,
uint8_t** data, int* data_size) {
page_info->rep_level_encoding = header_v1->repetition_level_encoding;
page_info->def_level_encoding = header_v1->definition_level_encoding;
int compressed_size = current_page_header.compressed_page_size;
int uncompressed_size = current_page_header.uncompressed_page_size;
int32_t rep_level_size = 0;
if (has_rep_level_) {
RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
filename(), page_info->rep_level_encoding));
RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
filename(), data, data_size, &rep_level_size));
}
page_info->rep_level_ptr = *data;
page_info->rep_level_size = rep_level_size;
*data += rep_level_size;
*data_size -= rep_level_size;
int32_t def_level_size = 0;
if (has_def_level_) {
RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
filename(), page_info->def_level_encoding));
RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
filename(), data, data_size, &def_level_size));
}
page_info->def_level_ptr = *data;
page_info->def_level_size = def_level_size;
*data += def_level_size;
*data_size -= def_level_size;
return Status::OK();
}
Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV2(
const parquet::DataPageHeaderV2* header_v2,
DataPageInfo* page_info, uint8_t* data, int max_size) {
int rep_level_size = header_v2->repetition_levels_byte_length;
int def_level_size = header_v2->definition_levels_byte_length;
if (rep_level_size < 0 || def_level_size < 0
|| rep_level_size + def_level_size > max_size) {
return Status(Substitute("Corrupt rep/def level sizes in v2 data page in file '$0'. "
"rep level size: $1 def level size: $2 max size: $3",
filename(), rep_level_size, def_level_size, max_size));
}
page_info->rep_level_size = rep_level_size;
page_info->def_level_size = def_level_size;
page_info->rep_level_ptr = data;
page_info->def_level_ptr = data + rep_level_size;
// v2 pages always use RLE for rep/def levels.
page_info->rep_level_encoding = parquet::Encoding::RLE;
page_info->def_level_encoding = parquet::Encoding::RLE;
return Status::OK();
}
Status ParquetColumnChunkReader::ReadDataPageData(DataPageInfo* page_info) {
DCHECK(page_info != nullptr);
page_info->is_valid = false;
const parquet::PageHeader& header = CurrentPageHeader();
bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
DCHECK(is_v2 || header.type == parquet::PageType::DATA_PAGE);
// In v2 pages if decompressor_ == nullptr it is still possible that is_compressed
// is true in the header (parquet-mr writes like this if compression=UNCOMPRESSED).
bool is_compressed = decompressor_.get() != nullptr
&& (!is_v2 || header.data_page_header_v2.is_compressed);
int orig_compressed_size = header.compressed_page_size;
int orig_uncompressed_size = header.uncompressed_page_size;
int compressed_size = orig_compressed_size;
int uncompressed_size = orig_uncompressed_size;
// Read compressed data.
uint8_t* compressed_data;
RETURN_IF_ERROR(page_reader_.ReadPageData(&compressed_data));
// If v2 data page, fill rep/def level info based on header. For v1 pages this will be
// done after decompression.
if (is_v2) {
RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV2(&header.data_page_header_v2,
page_info, compressed_data, orig_uncompressed_size));
// In v2 pages compressed_page_size also includes the uncompressed
// rep/def levels.
// https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/src/main/thrift/parquet.thrift#L578
int levels_size = page_info->rep_level_size + page_info->def_level_size;
compressed_size -= levels_size;
uncompressed_size -= levels_size;
compressed_data += levels_size;
}
const bool has_slot_desc = value_mem_type_ != ValueMemoryType::NO_SLOT_DESC;
*data_size = uncompressed_size;
if (decompressor_.get() != nullptr) {
int data_size = uncompressed_size;
uint8_t* data = nullptr;
if (is_compressed) {
SCOPED_TIMER(parent_->decompress_timer_);
uint8_t* decompressed_buffer;
RETURN_IF_ERROR(AllocateUncompressedDataPage(
uncompressed_size, "decompressed data", &decompressed_buffer));
int actual_uncompressed_size = uncompressed_size;
RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
compressed_size, compressed_data, &uncompressed_size,
compressed_size, compressed_data, &actual_uncompressed_size,
&decompressed_buffer));
// TODO: can't we call stream_->ReleaseCompletedResources(false); at this point?
VLOG_FILE << "Decompressed " << current_page_header.compressed_page_size
<< " to " << uncompressed_size;
if (current_page_header.uncompressed_page_size != uncompressed_size) {
// (we can't in v2 data page as the original buffer contains rep/def levels)
VLOG_FILE << "Decompressed " << compressed_size
<< " to " << actual_uncompressed_size;
if (uncompressed_size != actual_uncompressed_size) {
return Status(Substitute("Error decompressing data page in file '$0'. "
"Expected $1 uncompressed bytes but got $2", filename(),
current_page_header.uncompressed_page_size, uncompressed_size));
uncompressed_size, actual_uncompressed_size));
}
*data = decompressed_buffer;
data = decompressed_buffer;
if (has_slot_desc) {
parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, compressed_size);
parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
parent_->UpdateCompressedPageSizeCounter(compressed_size);
// Use original sizes (includes levels in v2) in the profile.
parent_->scan_node_->UpdateBytesRead(
slot_id_, orig_uncompressed_size, orig_compressed_size);
parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
parent_->UpdateCompressedPageSizeCounter(orig_compressed_size);
}
} else {
if (compressed_size != uncompressed_size) {
return Status(Substitute("Error reading data page in file '$0'. "
"Expected $1 bytes but got $2", filename(),
"Compressed size ($1) should be the same as uncompressed size ($2) "
"in pages without compression.", filename(),
compressed_size, uncompressed_size));
}
// TODO: could skip copying when the data page is dict encoded as strings
// will point to the dictionary instead of the data buffer (IMPALA-12137)
const bool copy_buffer = value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
if (copy_buffer) {
@@ -286,15 +396,31 @@ Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size
RETURN_IF_ERROR(AllocateUncompressedDataPage(
uncompressed_size, "uncompressed variable-length data", &buffer));
memcpy(buffer, compressed_data, uncompressed_size);
*data = buffer;
data = buffer;
} else {
*data = compressed_data;
data = compressed_data;
}
if (has_slot_desc) {
parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, 0);
parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
// Use original sizes (includes levels in v2) in the profile.
parent_->scan_node_->UpdateBytesRead(slot_id_, orig_uncompressed_size, 0);
parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
}
}
// The buffers to return are ready at this point.
// If v1 data page, fill rep/def level info by parsing the beginning of the data. For
// v2 pages this was done before decompression.
if (!is_v2) {
RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV1(&header.data_page_header, page_info,
&data, &data_size));
}
page_info->data_encoding = is_v2 ? header.data_page_header_v2.encoding
: header.data_page_header.encoding;
page_info->data_ptr = data;
page_info->data_size = data_size;
page_info->is_valid = true;
return Status::OK();
}

View File

@@ -29,7 +29,8 @@ class MemPool;
class ScopedBuffer;
/// A class to read data from Parquet pages. It handles the page headers, decompression
/// and the possible copying of the data buffers.
/// and the possible copying of the data buffers. It also hides the differences between
/// v1 and v2 data pages.
/// Before reading, InitColumnChunk(), set_io_reservation() and StartScan() must be called
/// in this order.
class ParquetColumnChunkReader {
@@ -50,23 +51,29 @@ class ParquetColumnChunkReader {
VAR_LEN_STR
};
// Class to hide differences between v1 and v2 data pages.
struct DataPageInfo {
parquet::Encoding::type rep_level_encoding;
parquet::Encoding::type def_level_encoding;
parquet::Encoding::type data_encoding;
uint8_t* rep_level_ptr = nullptr;
uint8_t* def_level_ptr = nullptr;
uint8_t* data_ptr = nullptr;
int rep_level_size = -1;
int def_level_size = -1;
int data_size = -1;
bool is_valid = false;
};
const char* filename() const { return parent_->filename(); }
const parquet::PageHeader& CurrentPageHeader() const {
return page_reader_.CurrentPageHeader();
}
io::ScanRange* scan_range() const { return page_reader_.scan_range(); }
parquet::PageType::type page_type() const { return CurrentPageHeader().type; }
ScannerContext::Stream* stream() const { return page_reader_.stream(); }
parquet::Encoding::type encoding() const {
return CurrentPageHeader().data_page_header.encoding;
}
/// Moved to implementation to be able to forward declare class in scoped_ptr.
ParquetColumnChunkReader(HdfsParquetScanner* parent, std::string schema_name,
int slot_id, ValueMemoryType value_mem_type);
int slot_id, ValueMemoryType value_mem_type,
bool has_rep_level, bool has_def_level);
~ParquetColumnChunkReader();
/// Resets the reader for each row group in the file and creates the scan
@@ -113,16 +120,16 @@ class ParquetColumnChunkReader {
ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
int64_t* data_size, int* num_entries);
/// Reads the next data page to '*data' and '*data_size', if 'read_data' is true.
/// Else reads page header only, following which client should either call
/// 'ReadDataPageData' or 'SkipPageData'.
/// Skips other types of pages (except for dictionary) until it finds a data page. If it
/// finds a dictionary page, returns an error as the dictionary page should be the first
/// page and this method should only be called if a data page is expected.
/// If the stream reaches the end before reading a complete page header, '*eos' is set
/// to true.
Status ReadNextDataPage(
bool* eos, uint8_t** data, int* data_size, bool read_data = true);
/// Reads the next non-empty data page's page header, following which the client
/// should either call 'ReadDataPageData' or 'SkipPageData'.
/// Skips other types of pages (except for dictionary) and empty data pages until it
/// finds a non-empty data page. If it finds a dictionary page, returns an error as
/// the dictionary page should be the first page and this method should only be called
/// if a data page is expected.
/// `num_values` is set to the number of values in the page (including nulls).
/// If the stream reaches the end before reading a complete page header for a non-empty
/// data page `num_values` is set to 0 to indicate EOS.
Status ReadNextDataPageHeader(int* num_values);
/// If the column type is a variable length string, transfers the remaining resources
/// backing tuples to 'mem_pool' and frees up other resources. Otherwise frees all
@@ -132,12 +139,11 @@ class ParquetColumnChunkReader {
/// Skips the data part of the page. The header must be already read.
Status SkipPageData();
/// Reads the data part of the next data page. Sets '*data' to point to the buffer and
/// '*data_size' to its size.
/// Reads the data part of the next data page and fills the members of `page_info`.
/// If the column type is a variable length string, the buffer is allocated from
/// data_page_pool_. Otherwise the returned buffer will be valid only until the next
/// function call that advances the buffer.
Status ReadDataPageData(uint8_t** data, int* data_size);
Status ReadDataPageData(DataPageInfo* page_info);
private:
HdfsParquetScanner* parent_;
@@ -160,6 +166,11 @@ class ParquetColumnChunkReader {
boost::scoped_ptr<Codec> decompressor_;
ValueMemoryType value_mem_type_;
const bool has_rep_level_;
const bool has_def_level_;
/// See TryReadDictionaryPage() for information about the parameters.
Status ReadDictionaryData(ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
int64_t* data_size, int* num_entries);
@@ -170,7 +181,17 @@ class ParquetColumnChunkReader {
Status AllocateUncompressedDataPage(
int64_t size, const char* err_ctx, uint8_t** buffer);
ValueMemoryType value_mem_type_;
const parquet::PageHeader& CurrentPageHeader() const {
return page_reader_.CurrentPageHeader();
}
// Fills rep/def level related members in page_info by parsing the start of the buffer.
Status ProcessRepDefLevelsInDataPageV1(const parquet::DataPageHeader* header_v1,
DataPageInfo* page_info, uint8_t** data, int* data_size);
// Fills rep/def level related members in page_info based on the header.
Status ProcessRepDefLevelsInDataPageV2(const parquet::DataPageHeaderV2* header_v2,
DataPageInfo* page_info, uint8_t* data, int max_size);
};
} // namespace impala

View File

@@ -184,7 +184,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
dict_decoder_init_ = false;
}
virtual Status InitDataPage(uint8_t* data, int size) override;
virtual Status InitDataDecoder(uint8_t* data, int size) override;
virtual bool SkipEncodedValuesInPage(int64_t num_values) override;
@@ -344,18 +344,23 @@ inline bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
// TODO: consider performing filter selectivity checks in this function.
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataDecoder(
uint8_t* data, int size) {
// Data can be empty if the column contains all NULLs
DCHECK_GE(size, 0);
DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
<< "Bool has specialized impl";
page_encoding_ = col_chunk_reader_.encoding();
if (!IsDictionaryEncoding(page_encoding_)
&& page_encoding_ != parquet::Encoding::PLAIN) {
return GetUnsupportedDecodingError();
}
// PLAIN_DICTIONARY is deprecated in Parquet V2. It means the same as RLE_DICTIONARY
// so internally PLAIN_DICTIONARY can be used to represent both encodings.
if (page_encoding_ == parquet::Encoding::RLE_DICTIONARY) {
page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
}
// If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
// not need to be initialized.
if (IsDictionaryEncoding(page_encoding_) && slot_desc_ != nullptr) {
@@ -379,11 +384,10 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
}
template <>
Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataPage(
Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataDecoder(
uint8_t* data, int size) {
// Data can be empty if the column contains all NULLs
DCHECK_GE(size, 0);
page_encoding_ = col_chunk_reader_.encoding();
/// Boolean decoding is delegated to 'bool_decoder_'.
if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
@@ -1156,34 +1160,20 @@ Status BaseScalarColumnReader::ReadDataPage() {
return Status::OK();
}
bool eos;
int data_size;
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size));
if (eos) return HandleTooEarlyEos();
data_end_ = data_ + data_size;
const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
int num_values = current_page_header.data_page_header.num_values;
if (num_values < 0) {
return Status(Substitute("Error reading data page in Parquet file '$0'. "
"Invalid number of values in metadata: $1", filename(), num_values));
}
num_buffered_values_ = num_values;
// Read the next header, return if not found.
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
if (num_buffered_values_ == 0) return HandleTooEarlyEos();
DCHECK_GT(num_buffered_values_, 0);
// Read the data in the data page.
ParquetColumnChunkReader::DataPageInfo page;
RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
DCHECK(page.is_valid);
num_values_read_ += num_buffered_values_;
/// TODO: Move the level decoder initialisation to ParquetPageReader to abstract away
/// the differences between Parquet header V1 and V2.
// Initialize the repetition level data
RETURN_IF_ERROR(rep_levels_.Init(filename(),
&current_page_header.data_page_header.repetition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
&data_size));
// Initialize the definition level data
RETURN_IF_ERROR(def_levels_.Init(filename(),
&current_page_header.data_page_header.definition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
&data_size));
// Data can be empty if the column contains all NULLs
RETURN_IF_ERROR(InitDataPage(data_, data_size));
RETURN_IF_ERROR(InitDataPageDecoders(page));
// Skip rows if needed.
RETURN_IF_ERROR(StartPageFiltering());
@@ -1208,48 +1198,49 @@ Status BaseScalarColumnReader::ReadNextDataPageHeader() {
return Status::OK();
}
bool eos;
int data_size;
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size,
false /*Read next data page's header only*/));
if (eos) return HandleTooEarlyEos();
const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
int num_values = current_page_header.data_page_header.num_values;
if (UNLIKELY(num_values < 0)) {
return Status(Substitute("Error reading data page in Parquet file '$0'. "
"Invalid number of values in metadata: $1",
filename(), num_values));
}
num_buffered_values_ = num_values;
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
if (num_buffered_values_ == 0) return HandleTooEarlyEos();
DCHECK_GT(num_buffered_values_, 0);
num_values_read_ += num_buffered_values_;
if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
return Status::OK();
}
Status BaseScalarColumnReader::ReadCurrentDataPage() {
int data_size;
RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&data_, &data_size));
data_end_ = data_ + data_size;
const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
/// TODO: Move the level decoder initialisation to ParquetPageReader to abstract away
/// the differences between Parquet header V1 and V2.
// Initialize the repetition level data
RETURN_IF_ERROR(rep_levels_.Init(filename(),
&current_page_header.data_page_header.repetition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
&data_size));
// Initialize the definition level data
RETURN_IF_ERROR(def_levels_.Init(filename(),
&current_page_header.data_page_header.definition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
&data_size));
// Data can be empty if the column contains all NULLs
RETURN_IF_ERROR(InitDataPage(data_, data_size));
ParquetColumnChunkReader::DataPageInfo page;
RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
DCHECK(page.is_valid);
RETURN_IF_ERROR(InitDataPageDecoders(page));
// Skip rows if needed.
RETURN_IF_ERROR(StartPageFiltering());
return Status::OK();
}
Status BaseScalarColumnReader::InitDataPageDecoders(
const ParquetColumnChunkReader::DataPageInfo& page) {
// Initialize the repetition level data
DCHECK(page.rep_level_encoding == Encoding::RLE || page.rep_level_size == 0 );
RETURN_IF_ERROR(rep_levels_.Init(filename(),
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(),
page.rep_level_ptr, page.rep_level_size));
// Initialize the definition level data
DCHECK(page.def_level_encoding == Encoding::RLE || page.def_level_size == 0 );
RETURN_IF_ERROR(def_levels_.Init(filename(),
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(),
page.def_level_ptr, page.def_level_size));
page_encoding_ = page.data_encoding;
data_ = page.data_ptr;
data_end_ = data_ + page.data_size;
// Data can be empty if the column contains all NULLs
RETURN_IF_ERROR(InitDataDecoder(page.data_ptr, page.data_size));
return Status::OK();
}
template <bool ADVANCE_REP_LEVEL>
bool BaseScalarColumnReader::NextLevels() {
if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
@@ -1661,24 +1652,18 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
if (!AdvanceNextPageHeader()) {
return false;
}
const parquet::PageHeader& current_page_header =
col_chunk_reader_.CurrentPageHeader();
int32_t current_page_values = current_page_header.data_page_header.num_values;
if (UNLIKELY(current_page_values <= 0)) {
return false;
}
DCHECK_GT(num_buffered_values_, 0);
// Keep advancing to next page header if rows to be skipped are more than number
// of values in the page. Note we will just be reading headers and skipping
// pages without decompressing them as we advance.
while (num_rows > current_page_values) {
while (num_rows > num_buffered_values_) {
COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
num_rows -= current_page_values;
current_row_ += current_page_values;
num_rows -= num_buffered_values_;
current_row_ += num_buffered_values_;
if (!col_chunk_reader_.SkipPageData().ok() || !AdvanceNextPageHeader()) {
return false;
}
current_page_values =
col_chunk_reader_.CurrentPageHeader().data_page_header.num_values;
DCHECK_GT(num_buffered_values_, 0);
}
// Read the data page (includes decompressing them if required).
Status page_read = ReadCurrentDataPage();

View File

@@ -301,7 +301,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
col_chunk_reader_(parent, node.element->name,
slot_desc != nullptr ? slot_desc->id() : -1, PageReaderValueMemoryType()) {
slot_desc != nullptr ? slot_desc->id() : -1, PageReaderValueMemoryType(),
max_rep_level() > 0,
max_def_level() > 0) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
@@ -407,7 +409,11 @@ class BaseScalarColumnReader : public ParquetColumnReader {
ParquetLevelDecoder rep_levels_{false};
/// Page encoding for values of the current data page. Cached here for perf. Set in
/// InitDataPage().
/// InitDataPageDecoders().
///
/// Parquet V2 deprecates PLAIN_DICTIONARY; RLE_DICTIONARY should be used instead.
/// In this member PLAIN_DICTIONARY is used both for pages with PLAIN_DICTIONARY and
/// RLE_DICTIONARY as the encodings mean the same.
parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
/// Num values remaining in the current data page
@@ -519,7 +525,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
/// decompressed data page. Decoders can initialize state from here. The caller must
/// validate the input such that 'size' is non-negative and that 'data' has at least
/// 'size' bytes remaining.
virtual Status InitDataPage(uint8_t* data, int size) = 0;
virtual Status InitDataDecoder(uint8_t* data, int size) = 0;
/// Initializes decoders for rep/def levels and data.
Status InitDataPageDecoders(const ParquetColumnChunkReader::DataPageInfo& page_info);
ParquetColumnChunkReader::ValueMemoryType PageReaderValueMemoryType() {
if (slot_desc_ == nullptr) {

View File

@@ -37,7 +37,8 @@
namespace impala {
const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
const uint32_t PARQUET_CURRENT_VERSION = 1;
const uint32_t PARQUET_MAX_SUPPORTED_VERSION = 2;
const uint32_t PARQUET_WRITER_VERSION = 1;
/// Struct that specifies an inclusive range of rows.
struct RowRange {

View File

@@ -33,10 +33,42 @@ const int16_t ParquetLevel::ROW_GROUP_END;
const int16_t ParquetLevel::INVALID_LEVEL;
const int16_t ParquetLevel::INVALID_POS;
Status ParquetLevelDecoder::Init(const string& filename, const Encoding::type* encoding,
MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size) {
DCHECK(*data != nullptr);
DCHECK_GE(*data_size, 0);
Status ParquetLevelDecoder::ValidateEncoding(const string& filename,
const Encoding::type encoding) {
if (Ubsan::EnumToInt(&encoding) > Encoding::MAX_ENUM_VALUE) {
stringstream ss;
ss << "Unsupported encoding: " << Ubsan::EnumToInt(&encoding);
return Status(ss.str());
}
switch (encoding) {
case Encoding::RLE:
return Status::OK();
case Encoding::BIT_PACKED:
return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
default: {
stringstream ss;
ss << "Unsupported encoding: " << encoding;
return Status(ss.str());
}
}
}
Status ParquetLevelDecoder::ParseRleByteSize(const string& filename,
uint8_t** data, int* total_data_size, int32_t* num_bytes) {
Status status;
if (!ReadWriteUtil::Read(data, total_data_size, num_bytes, &status)) {
return status;
}
if (*num_bytes < 0 || *num_bytes > *total_data_size) {
return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, *num_bytes);
}
return Status::OK();
}
Status ParquetLevelDecoder::Init(const string& filename, MemPool* cache_pool,
int cache_size, int max_level, uint8_t* data, int32_t num_bytes) {
DCHECK(data != nullptr);
DCHECK_GE(num_bytes, 0);
DCHECK_GT(cache_size, 0);
cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
max_level_ = max_level;
@@ -46,40 +78,9 @@ Status ParquetLevelDecoder::Init(const string& filename, const Encoding::type* e
// Return because there is no level data to read, e.g., required field.
if (max_level == 0) return Status::OK();
int32_t num_bytes = 0;
if (Ubsan::EnumToInt(encoding) > Encoding::MAX_ENUM_VALUE) {
stringstream ss;
ss << "Unsupported encoding: " << Ubsan::EnumToInt(encoding);
return Status(ss.str());
}
switch (*encoding) {
case Encoding::RLE: {
Status status;
if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
return status;
}
if (num_bytes < 0 || num_bytes > *data_size) {
return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
}
int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
rle_decoder_.Reset(*data, num_bytes, bit_width);
break;
}
case parquet::Encoding::BIT_PACKED:
return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
default: {
stringstream ss;
ss << "Unsupported encoding: " << *encoding;
return Status(ss.str());
}
}
if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but "
"only $2 bytes left in page",
filename, num_bytes, *data_size));
}
*data += num_bytes;
*data_size -= num_bytes;
int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
rle_decoder_.Reset(data, num_bytes, bit_width);
return Status::OK();
}

View File

@@ -49,11 +49,19 @@ class ParquetLevelDecoder {
: decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
/// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
/// encoding requires reading metadata from the page header. 'cache_size' will be
/// rounded up to a multiple of 32 internally.
Status Init(const std::string& filename, const parquet::Encoding::type* encoding,
MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
/// Initialize the LevelDecoder. Assumes that data is RLE encoded.
/// 'cache_size' will be rounded up to a multiple of 32 internally.
Status Init(const std::string& filename, MemPool* cache_pool, int cache_size,
int max_level, uint8_t* data, int32_t num_bytes);
/// Parses the number of bytes used for level encoding from the buffer and moves
/// 'data' forward.
static Status ParseRleByteSize(const string& filename,
uint8_t** data, int* total_data_size, int32_t* num_bytes);
// Validates that encoding is RLE.
static Status ValidateEncoding(const string& filename,
const parquet::Encoding::type encoding);
/// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
/// as batched methods.

View File

@@ -253,7 +253,7 @@ const std::vector<ParquetSchemaResolver::ArrayEncoding>
Status ParquetMetadataUtils::ValidateFileVersion(
const parquet::FileMetaData& file_metadata, const char* filename) {
if (file_metadata.version > PARQUET_CURRENT_VERSION) {
if (file_metadata.version > PARQUET_MAX_SUPPORTED_VERSION) {
stringstream ss;
ss << "File: " << filename << " is of an unsupported version. "
<< "file version: " << file_metadata.version;

View File

@@ -4068,3 +4068,89 @@ INSERT INTO TABLE {db_name}{db_suffix}.{table_name} VALUES (6);
transactional=true
transactional_properties=insert_only
====
---- DATASET
functional
---- BASE_TABLE_NAME
alltypesagg_parquet_v2_uncompressed
---- PARTITION_COLUMNS
year int
month int
day int
---- COLUMNS
id int
bool_col boolean
tinyint_col tinyint
smallint_col smallint
int_col int
bigint_col bigint
float_col float
double_col double
date_string_col string
string_col string
timestamp_col timestamp
---- DEPENDENT_LOAD_HIVE
INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from functional.alltypesagg;
---- TABLE_PROPERTIES
parquet.writer.version=v2
parquet.compression=UNCOMPRESSED
====
---- DATASET
functional
---- BASE_TABLE_NAME
alltypesagg_parquet_v2_snappy
---- PARTITION_COLUMNS
year int
month int
day int
---- COLUMNS
id int
bool_col boolean
tinyint_col tinyint
smallint_col smallint
int_col int
bigint_col bigint
float_col float
double_col double
date_string_col string
string_col string
timestamp_col timestamp
---- DEPENDENT_LOAD_HIVE
INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from functional.alltypesagg;
---- TABLE_PROPERTIES
parquet.writer.version=v2
parquet.compression=SNAPPY
====
---- DATASET
functional
---- BASE_TABLE_NAME
complextypestbl_parquet_v2_uncompressed
---- COLUMNS
id bigint
int_array array<int>
int_array_array array<array<int>>
int_map map<string, int>
int_map_array array<map<string, int>>
nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
---- DEPENDENT_LOAD_HIVE
INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from functional_parquet.complextypestbl;
---- TABLE_PROPERTIES
parquet.writer.version=v2
parquet.compression=UNCOMPRESSED
====
---- DATASET
functional
---- BASE_TABLE_NAME
complextypestbl_parquet_v2_snappy
---- COLUMNS
id bigint
int_array array<int>
int_array_array array<array<int>>
int_map map<string, int>
int_map_array array<map<string, int>>
nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
---- DEPENDENT_LOAD_HIVE
INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from functional_parquet.complextypestbl;
---- TABLE_PROPERTIES
parquet.writer.version=v2
parquet.compression=SNAPPY
====

View File

@@ -372,3 +372,8 @@ table_name:insert_only_major_and_minor_compacted, constraint:restrict_to, table_
# The table is used in large scale metadata test. File format doesn't matter so restrict to text only
table_name:widetable_2000_cols_partitioned, constraint:restrict_to, table_format:text/none/none
table_name:alltypesagg_parquet_v2_uncompressed, constraint:restrict_to, table_format:parquet/none/none
table_name:alltypesagg_parquet_v2_snappy, constraint:restrict_to, table_format:parquet/none/none
table_name:complextypestbl_parquet_v2_uncompressed, constraint:restrict_to, table_format:parquet/none/none
table_name:complextypestbl_parquet_v2_snappy, constraint:restrict_to, table_format:parquet/none/none

View File

@@ -0,0 +1,97 @@
====
---- QUERY
# Check if count(*) optimization works correctly.
select count(*) from alltypesagg_parquet_v2_uncompressed
---- RESULTS
11000
---- TYPES
BIGINT
====
---- QUERY
select count(*) from alltypesagg_parquet_v2_snappy
---- RESULTS
11000
---- TYPES
BIGINT
====
---- QUERY
# Check that definition levels are decoded correctly.
select count(double_col) from alltypesagg_parquet_v2_uncompressed
---- RESULTS
10980
---- TYPES
BIGINT
====
---- QUERY
select count(double_col) from alltypesagg_parquet_v2_snappy
---- RESULTS
10980
---- TYPES
BIGINT
====
---- QUERY
# Check that values are decoded correctly.
select distinct double_col from alltypesagg_parquet_v2_uncompressed
order by double_col limit 5
---- RESULTS
10.1
20.2
30.3
40.4
50.5
---- TYPES
double
====
---- QUERY
select distinct double_col from alltypesagg_parquet_v2_snappy
order by double_col limit 5
---- RESULTS
10.1
20.2
30.3
40.4
50.5
---- TYPES
double
====
---- QUERY
# Check that repetition levels are decoded correctly.
select int_array from complextypestbl_parquet_v2_uncompressed
---- RESULTS
'[1,2,3]'
'[null,1,2,null,3,null]'
'[]'
'NULL'
'NULL'
'NULL'
'NULL'
'[-1]'
---- TYPES
string
====
---- QUERY
select int_array from complextypestbl_parquet_v2_snappy
---- RESULTS
'[1,2,3]'
'[null,1,2,null,3,null]'
'[]'
'NULL'
'NULL'
'NULL'
'NULL'
'[-1]'
---- TYPES
string
====
---- QUERY
# Check that DELTA_BINARY_PACKED encoding returns an error message.
select count(id) from alltypesagg_parquet_v2_snappy
---- CATCH
unsupported encoding: DELTA_BINARY_PACKED for column 'id'
====
---- QUERY
# Check that DELTA_BYTE_ARRAY encoding returns an error message.
select count(string_col) from alltypesagg_parquet_v2_snappy
---- CATCH
unsupported encoding: DELTA_BYTE_ARRAY for column 'string_col'
====

View File

@@ -1909,3 +1909,18 @@ class TestBinaryType(ImpalaTestSuite):
def test_binary_type(self, vector):
self.run_test_case('QueryTest/binary-type', vector)
class TestParquetV2(ImpalaTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestParquetV2, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def test_parquet_v2(self, vector):
self.run_test_case('QueryTest/parquet-v2', vector)

View File

@@ -159,6 +159,28 @@ class TestScannersFuzzing(ImpalaTestSuite):
self.run_fuzz_test(vector, "functional_orc_def", src_table_name, unique_database,
fuzz_table_name, 10)
def test_fuzz_parquet_v2(self, vector, unique_database):
table_format = vector.get_value('table_format')
if table_format.file_format != 'parquet': pytest.skip()
tables = ["alltypesagg_parquet_v2_uncompressed", "alltypesagg_parquet_v2_snappy"]
for table_name in tables:
custom_queries = [
"select avg(float_col), avg(double_col), avg(timestamp_col)"
" from %s where bool_col;" % table_name
]
self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database,
table_name, 10, custom_queries)
tables = ["complextypestbl_parquet_v2_uncompressed",
"complextypestbl_parquet_v2_snappy"]
for table_name in tables:
custom_queries = [
"select int_array from %s;" % table_name
]
self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database,
table_name, 10, custom_queries)
# TODO: add test coverage for additional data types like char and varchar
def run_fuzz_test(self, vector, src_db, src_table, fuzz_db, fuzz_table, num_copies=1,
@@ -311,6 +333,9 @@ class TestScannersFuzzing(ImpalaTestSuite):
not suffix.startswith('base_') and
not suffix.startswith('delta_') and
not suffix.startswith('delete_delta_')):
# Null partitions are stored as __HIVE_DEFAULT_PARTITION__ but expected as null
# in ALTER TABLE ADD PARTITION.
suffix = suffix.replace("__HIVE_DEFAULT_PARTITION__", "null")
reversed_partitions.append(suffix)
return reversed(reversed_partitions)