IMPALA-5842: Write page index in Parquet files

This commit builds on the previous work of
Pooja Nilangekar: https://gerrit.cloudera.org/#/c/7464/

This commit implements the write path of PARQUET-922:
"Add column indexes to parquet.thrift". As specified in the
parquet-format, Impala writes the page indexes just before
the footer. This allows much more efficient page filtering
than using the same information from the 'statistics' field
of DataPageHeader.
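
For illustration only (not part of this change), a reader can locate the
serialized page index through the new ColumnChunk fields and deserialize it
with the Thrift compact protocol. A minimal sketch, assuming Impala's
DeserializeThriftMsg() helper from rpc/thrift-util.h; the function name and
the caller-provided buffer are hypothetical:

#include "common/status.h"
#include "gen-cpp/parquet_types.h"
#include "rpc/thrift-util.h"

using namespace impala;

// Sketch: deserialize the ColumnIndex of one column chunk. The caller is
// assumed to have read 'len' == ColumnChunk::column_index_length bytes
// starting at ColumnChunk::column_index_offset into 'buf'.
Status ReadColumnIndex(const uint8_t* buf, uint32_t len,
    parquet::ColumnIndex* column_index) {
  // Parquet metadata, including the page index, is serialized with the
  // Thrift compact protocol, hence 'compact' == true.
  RETURN_IF_ERROR(DeserializeThriftMsg(buf, &len, true, column_index));
  return Status::OK();
}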

I updated Pooja's Python tests as well.

Change-Id: Icbacf7fe3b7672e3ce719261ecef445b16f8dec9
Reviewed-on: http://gerrit.cloudera.org:8080/9693
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Zoltan Borok-Nagy
Date: 2018-04-09 16:10:00 +02:00
Committed by: Impala Public Jenkins
Parent: 05e0db3a0e
Commit: ccf19f9f8f
14 changed files with 1002 additions and 113 deletions

View File

@@ -36,6 +36,7 @@
#include "util/debug-util.h"
#include "util/dict-encoding.h"
#include "util/hdfs-util.h"
#include "util/string-util.h"
#include "util/rle-encoding.h"
#include <sstream>
@@ -44,7 +45,6 @@
#include "common/names.h"
using namespace impala;
using namespace parquet;
using namespace apache::thrift;
// Managing file sizes: We need to estimate how big the files being buffered
@@ -102,7 +102,10 @@ class HdfsParquetTableWriter::BaseColumnWriter {
def_levels_(nullptr),
values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
page_stats_base_(nullptr),
row_group_stats_base_(nullptr) {
row_group_stats_base_(nullptr),
table_sink_mem_tracker_(parent_->parent_->mem_tracker()) {
static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value,
"'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink");
def_levels_ = parent_->state_->obj_pool()->Add(
new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
DEFAULT_DATA_PAGE_SIZE, 1));
@@ -145,7 +148,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// Encodes the row group statistics into a parquet::Statistics object and attaches it to
// 'meta_data'.
void EncodeRowGroupStats(ColumnMetaData* meta_data) {
void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) {
DCHECK(row_group_stats_base_ != nullptr);
if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
@@ -162,13 +165,21 @@ class HdfsParquetTableWriter::BaseColumnWriter {
current_page_ = nullptr;
num_values_ = 0;
total_compressed_byte_size_ = 0;
current_encoding_ = Encoding::PLAIN;
next_page_encoding_ = Encoding::PLAIN;
current_encoding_ = parquet::Encoding::PLAIN;
next_page_encoding_ = parquet::Encoding::PLAIN;
column_encodings_.clear();
dict_encoding_stats_.clear();
data_encoding_stats_.clear();
// Repetition/definition level encodings are constant. Incorporate them here.
column_encodings_.insert(Encoding::RLE);
column_encodings_.insert(parquet::Encoding::RLE);
offset_index_.page_locations.clear();
column_index_.null_pages.clear();
column_index_.min_values.clear();
column_index_.max_values.clear();
table_sink_mem_tracker_->Release(page_index_memory_consumption_);
page_index_memory_consumption_ = 0;
column_index_.null_counts.clear();
valid_column_index_ = true;
}
// Close this writer. This is only called after Flush() and no more rows will
@@ -176,6 +187,9 @@ class HdfsParquetTableWriter::BaseColumnWriter {
void Close() {
if (compressor_.get() != nullptr) compressor_->Close();
if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close();
// We must release the memory consumption of this column writer.
table_sink_mem_tracker_->Release(page_index_memory_consumption_);
page_index_memory_consumption_ = 0;
}
const ColumnType& type() const { return expr_eval_->root().type(); }
@@ -211,7 +225,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
struct DataPage {
// Page header. This is a union of all page types.
PageHeader header;
parquet::PageHeader header;
// Number of bytes needed to store definition levels.
int num_def_bytes;
@@ -259,21 +273,21 @@ class HdfsParquetTableWriter::BaseColumnWriter {
int64_t total_compressed_byte_size_;
int64_t total_uncompressed_byte_size_;
// Encoding of the current page.
Encoding::type current_encoding_;
parquet::Encoding::type current_encoding_;
// Encoding to use for the next page. By default, the same as 'current_encoding_'.
// Used by the column writer to switch encoding while writing a column, e.g. if the
// dictionary overflows.
Encoding::type next_page_encoding_;
parquet::Encoding::type next_page_encoding_;
// Set of all encodings used in the column chunk
unordered_set<Encoding::type> column_encodings_;
unordered_set<parquet::Encoding::type> column_encodings_;
// Map from the encoding to the number of pages in the column chunk with this encoding
// These are used to construct the PageEncodingStats, which provide information
// about encoding usage for each different page type. Currently, only dictionary
// and data pages are used.
unordered_map<Encoding::type, int> dict_encoding_stats_;
unordered_map<Encoding::type, int> data_encoding_stats_;
unordered_map<parquet::Encoding::type, int> dict_encoding_stats_;
unordered_map<parquet::Encoding::type, int> data_encoding_stats_;
// Created, owned, and set by the derived class.
DictEncoderBase* dict_encoder_base_;
@@ -292,6 +306,22 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// Pointers to statistics, created, owned, and set by the derived class.
ColumnStatsBase* page_stats_base_;
ColumnStatsBase* row_group_stats_base_;
// OffsetIndex stores the locations of the pages.
parquet::OffsetIndex offset_index_;
// ColumnIndex stores the statistics of the pages.
parquet::ColumnIndex column_index_;
// Pointer to the HdfsTableSink's MemTracker.
MemTracker* table_sink_mem_tracker_;
// Memory consumption of the min/max values in the page index.
int64_t page_index_memory_consumption_ = 0;
// Only write ColumnIndex when 'valid_column_index_' is true. We always need to write
// the OffsetIndex though.
bool valid_column_index_ = true;
};
// Per type column writer.
@@ -312,8 +342,8 @@ class HdfsParquetTableWriter::ColumnWriter :
BaseColumnWriter::Reset();
// Default to dictionary encoding. If the cardinality ends up being too high,
// it will fall back to plain.
current_encoding_ = Encoding::PLAIN_DICTIONARY;
next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
dict_encoder_.reset(
new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
parent_->parent_->mem_tracker()));
@@ -328,7 +358,7 @@ class HdfsParquetTableWriter::ColumnWriter :
protected:
virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
if (UNLIKELY(num_values_since_dict_size_check_ >=
DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
num_values_since_dict_size_check_ = 0;
@@ -339,11 +369,11 @@ class HdfsParquetTableWriter::ColumnWriter :
// If the dictionary contains the maximum number of values, switch to plain
// encoding for the next page. The current page is full and must be written out.
if (UNLIKELY(*bytes_needed < 0)) {
next_page_encoding_ = Encoding::PLAIN;
next_page_encoding_ = parquet::Encoding::PLAIN;
return false;
}
parent_->file_size_estimate_ += *bytes_needed;
} else if (current_encoding_ == Encoding::PLAIN) {
} else if (current_encoding_ == parquet::Encoding::PLAIN) {
T* v = CastValue(value);
*bytes_needed = plain_encoded_value_size_ < 0 ?
ParquetPlainEncoder::ByteSize<T>(*v) :
@@ -386,8 +416,7 @@ class HdfsParquetTableWriter::ColumnWriter :
// Temporary string value to hold CHAR(N)
StringValue temp_;
// Tracks statistics per page. These are not written out currently but are merged into
// the row group stats. TODO(IMPALA-5841): Write these to the page index.
// Tracks statistics per page. These are written out to the page index.
scoped_ptr<ColumnStats<T>> page_stats_;
// Tracks statistics per row group. This gets reset when starting a new row group.
@@ -424,7 +453,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
new BitWriter(values_buffer_, values_buffer_len_));
// Dictionary encoding doesn't make sense for bools and is not allowed by
// the format.
current_encoding_ = Encoding::PLAIN;
current_encoding_ = parquet::Encoding::PLAIN;
dict_encoder_base_ = nullptr;
page_stats_base_ = &page_stats_;
@@ -455,8 +484,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
// Used to encode bools as single bit values. This is reused across pages.
BitWriter* bool_values_;
// Tracks statistics per page. These are not written out currently but are merged into
// the row group stats. TODO(IMPALA-5841): Write these to the page index.
// Tracks statistics per page. These are written out to the page index.
ColumnStats<bool> page_stats_;
// Tracks statistics per row group. This gets reset when starting a new file.
@@ -559,13 +587,13 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
if (dict_encoder_base_ != nullptr) {
*first_dictionary_page = *file_pos;
// Write dictionary page header
DictionaryPageHeader dict_header;
parquet::DictionaryPageHeader dict_header;
dict_header.num_values = dict_encoder_base_->num_entries();
dict_header.encoding = Encoding::PLAIN_DICTIONARY;
dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
++dict_encoding_stats_[dict_header.encoding];
PageHeader header;
header.type = PageType::DICTIONARY_PAGE;
parquet::PageHeader header;
header.type = parquet::PageType::DICTIONARY_PAGE;
header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
header.__set_dictionary_page_header(dict_header);
@@ -608,15 +636,26 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
}
*first_data_page = *file_pos;
int64_t current_row_group_index = 0;
offset_index_.page_locations.resize(num_data_pages_);
// Write data pages
for (int i = 0; i < num_data_pages_; ++i) {
DataPage& page = pages_[i];
parquet::PageLocation location;
if (page.header.data_page_header.num_values == 0) {
// Skip empty pages
location.offset = -1;
location.compressed_page_size = 0;
location.first_row_index = -1;
offset_index_.page_locations[i] = location;
continue;
}
location.offset = *file_pos;
location.first_row_index = current_row_group_index;
// Write data page header
uint8_t* buffer = nullptr;
uint32_t len = 0;
@@ -625,9 +664,17 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
RETURN_IF_ERROR(parent_->Write(buffer, len));
*file_pos += len;
// Note that the naming is confusing here:
// parquet::PageHeader::compressed_page_size is the compressed page size in bytes, as
// its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
// also includes the size of the page header.
location.compressed_page_size = page.header.compressed_page_size + len;
offset_index_.page_locations[i] = location;
// Write the page data
RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
*file_pos += page.header.compressed_page_size;
current_row_group_index += page.header.data_page_header.num_values;
}
return Status::OK();
}
@@ -639,11 +686,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
// If the entire page was NULL, encode it as PLAIN since there is no
// data anyway. We don't output a useless dictionary page and it works
// around a parquet MR bug (see IMPALA-759 for more details).
if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN;
if (current_encoding_ == Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
PageHeader& header = current_page_->header;
parquet::PageHeader& header = current_page_->header;
header.data_page_header.encoding = current_encoding_;
// Accumulate encoding statistics
@@ -698,9 +745,41 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
max_compressed_size - header.compressed_page_size);
}
DCHECK(page_stats_base_ != nullptr);
parquet::Statistics page_stats;
page_stats_base_->EncodeToThrift(&page_stats);
{
// If page_stats contains min_value and max_value, append them to min_values_ and
// max_values_ and mark the page as not null. If the min and max values are not set,
// push empty strings to keep the index consistent and mark the page as null. The
// null_count is always pushed.
string min_val;
string max_val;
if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
&min_val);
Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
&max_val);
if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
column_index_.null_pages.push_back(false);
} else {
DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
column_index_.null_pages.push_back(true);
DCHECK_EQ(page_stats.null_count, num_values_);
}
int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
"Failed to allocate memory for Parquet page index.", new_memory_allocation);
}
page_index_memory_consumption_ += new_memory_allocation;
column_index_.min_values.emplace_back(std::move(min_val));
column_index_.max_values.emplace_back(std::move(max_val));
column_index_.null_counts.push_back(page_stats.null_count);
}
// Update row group statistics from page statistics.
DCHECK(row_group_stats_base_ != nullptr);
DCHECK(page_stats_base_ != nullptr);
row_group_stats_base_->Merge(*page_stats_base_);
// Add the size of the data page header
@@ -728,13 +807,13 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
pages_.push_back(DataPage());
current_page_ = &pages_[num_data_pages_++];
DataPageHeader header;
parquet::DataPageHeader header;
header.num_values = 0;
// The code that populates the column chunk metadata's encodings field
// relies on these specific values for the definition/repetition level
// encodings.
header.definition_level_encoding = Encoding::RLE;
header.repetition_level_encoding = Encoding::RLE;
header.definition_level_encoding = parquet::Encoding::RLE;
header.repetition_level_encoding = parquet::Encoding::RLE;
current_page_->header.__set_data_page_header(header);
}
current_encoding_ = next_page_encoding_;
@@ -861,14 +940,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
const ColumnType& type = output_expr_evals_[i]->root().type();
node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
node.__set_type(ConvertInternalToParquetType(type.type));
node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
if (type.type == TYPE_DECIMAL) {
// This column is type decimal. Update the file metadata to include the
// additional fields:
// 1) converted_type: indicate this is really a decimal column.
// 2) type_length: the number of bytes used per decimal value in the data
// 3) precision/scale
node.__set_converted_type(ConvertedType::DECIMAL);
node.__set_converted_type(parquet::ConvertedType::DECIMAL);
node.__set_type_length(
ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
node.__set_scale(output_expr_evals_[i]->root().type().scale);
@@ -876,15 +955,15 @@ Status HdfsParquetTableWriter::CreateSchema() {
} else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
(type.type == TYPE_STRING &&
state_->query_options().parquet_annotate_strings_utf8)) {
node.__set_converted_type(ConvertedType::UTF8);
node.__set_converted_type(parquet::ConvertedType::UTF8);
} else if (type.type == TYPE_TINYINT) {
node.__set_converted_type(ConvertedType::INT_8);
node.__set_converted_type(parquet::ConvertedType::INT_8);
} else if (type.type == TYPE_SMALLINT) {
node.__set_converted_type(ConvertedType::INT_16);
node.__set_converted_type(parquet::ConvertedType::INT_16);
} else if (type.type == TYPE_INT) {
node.__set_converted_type(ConvertedType::INT_32);
node.__set_converted_type(parquet::ConvertedType::INT_32);
} else if (type.type == TYPE_BIGINT) {
node.__set_converted_type(ConvertedType::INT_64);
node.__set_converted_type(parquet::ConvertedType::INT_64);
}
}
@@ -893,14 +972,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
Status HdfsParquetTableWriter::AddRowGroup() {
if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
file_metadata_.row_groups.push_back(RowGroup());
file_metadata_.row_groups.push_back(parquet::RowGroup());
current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
// Initialize new row group metadata.
int num_clustering_cols = table_desc_->num_clustering_cols();
current_row_group_->columns.resize(columns_.size());
for (int i = 0; i < columns_.size(); ++i) {
ColumnMetaData metadata;
parquet::ColumnMetaData metadata;
metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
metadata.path_in_schema.push_back(
table_desc_->col_descs()[i + num_clustering_cols].name());
@@ -1029,12 +1108,13 @@ Status HdfsParquetTableWriter::Finalize() {
file_metadata_.num_rows = row_count_;
// Set the ordering used to write parquet statistics for columns in the file.
ColumnOrder col_order = ColumnOrder();
col_order.__set_TYPE_ORDER(TypeDefinedOrder());
parquet::ColumnOrder col_order = parquet::ColumnOrder();
col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder());
file_metadata_.column_orders.assign(columns_.size(), col_order);
file_metadata_.__isset.column_orders = true;
RETURN_IF_ERROR(FlushCurrentRowGroup());
RETURN_IF_ERROR(WritePageIndex());
RETURN_IF_ERROR(WriteFileFooter());
stats_.__set_parquet_stats(parquet_insert_stats_);
COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1069,8 +1149,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset));
DCHECK_GT(data_page_offset, 0);
ColumnChunk& col_chunk = current_row_group_->columns[i];
ColumnMetaData& col_metadata = col_chunk.meta_data;
parquet::ColumnChunk& col_chunk = current_row_group_->columns[i];
parquet::ColumnMetaData& col_metadata = col_chunk.meta_data;
col_metadata.data_page_offset = data_page_offset;
if (dict_page_offset >= 0) {
col_metadata.__set_dictionary_page_offset(dict_page_offset);
@@ -1089,23 +1169,23 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
// Write encodings and encoding stats for this column
col_metadata.encodings.clear();
for (Encoding::type encoding : col_writer->column_encodings_) {
for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
col_metadata.encodings.push_back(encoding);
}
vector<PageEncodingStats> encoding_stats;
vector<parquet::PageEncodingStats> encoding_stats;
// Add dictionary page encoding stats
for (const auto& entry: col_writer->dict_encoding_stats_) {
PageEncodingStats dict_enc_stat;
dict_enc_stat.page_type = PageType::DICTIONARY_PAGE;
parquet::PageEncodingStats dict_enc_stat;
dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE;
dict_enc_stat.encoding = entry.first;
dict_enc_stat.count = entry.second;
encoding_stats.push_back(dict_enc_stat);
}
// Add data page encoding stats
for (const auto& entry: col_writer->data_encoding_stats_) {
PageEncodingStats data_enc_stat;
data_enc_stat.page_type = PageType::DATA_PAGE;
parquet::PageEncodingStats data_enc_stat;
data_enc_stat.page_type = parquet::PageType::DATA_PAGE;
data_enc_stat.encoding = entry.first;
data_enc_stat.count = entry.second;
encoding_stats.push_back(data_enc_stat);
@@ -1129,8 +1209,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
file_pos_ += len;
col_writer->Reset();
}
// Populate RowGroup::sorting_columns with all columns specified by the Frontend.
@@ -1148,6 +1226,47 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
return Status::OK();
}
Status HdfsParquetTableWriter::WritePageIndex() {
// Currently Impala only writes Parquet files with a single row group. The
// page index logic depends on this behavior, as it only keeps one row group's
// statistics in memory.
DCHECK_EQ(file_metadata_.row_groups.size(), 1);
parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]);
// Write out the column indexes.
for (int i = 0; i < columns_.size(); ++i) {
auto& column = *columns_[i];
if (!column.valid_column_index_) continue;
column.column_index_.__set_boundary_order(
column.row_group_stats_base_->GetBoundaryOrder());
// We always set null_counts.
column.column_index_.__isset.null_counts = true;
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.column_index_, &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
// Update the column_index_offset and column_index_length of the ColumnChunk
row_group->columns[i].__set_column_index_offset(file_pos_);
row_group->columns[i].__set_column_index_length(len);
file_pos_ += len;
}
// Write out the offset indexes.
for (int i = 0; i < columns_.size(); ++i) {
auto& column = *columns_[i];
uint8_t* buffer = nullptr;
uint32_t len = 0;
RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.offset_index_, &len, &buffer));
RETURN_IF_ERROR(Write(buffer, len));
// Update the offset_index_offset and offset_index_length of the ColumnChunk
row_group->columns[i].__set_offset_index_offset(file_pos_);
row_group->columns[i].__set_offset_index_length(len);
file_pos_ += len;
}
// Reset column writers.
for (auto& column : columns_) column->Reset();
return Status::OK();
}
Status HdfsParquetTableWriter::WriteFileFooter() {
// Write file_meta_data
uint32_t file_metadata_len = 0;

View File

@@ -103,6 +103,12 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
/// as 'parquet-mr'.
static const int MAX_COLUMN_STATS_SIZE = 4 * 1024;
/// In parquet::ColumnIndex we store the min and max values for each page.
/// However, we don't want to store very long strings, so we truncate them.
/// This constant must not be too small, since we don't want to truncate
/// non-string values.
static const int PAGE_INDEX_MAX_STRING_LENGTH = 64;
/// Per-column information state. This contains some metadata as well as the
/// data buffers.
class BaseColumnWriter;
@@ -120,10 +126,14 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
/// table_desc_ into the format in the file metadata
Status CreateSchema();
/// Write the file header information to the output file.
/// Writes the file header information to the output file.
Status WriteFileHeader();
/// Write the file metadata and footer.
/// Writes the column index and offset index of each page in the file.
/// It also resets the column writers.
Status WritePageIndex();
/// Writes the file metadata and footer.
Status WriteFileFooter();
/// Flushes the current row group to file. This will compute the final

View File

@@ -26,6 +26,8 @@
#include "runtime/timestamp-value.h"
#include "runtime/types.h"
#include "gen-cpp/parquet_types.h"
namespace impala {
/// This class, together with its derivatives, is used to update column statistics when
@@ -67,6 +69,11 @@ class ColumnStatsBase {
struct MinMaxTrait {
static decltype(auto) MinValue(const T& a, const T& b) { return std::min(a, b); }
static decltype(auto) MaxValue(const T& a, const T& b) { return std::max(a, b); }
static int Compare(const T& a, const T& b) {
if (a < b) return -1;
if (a > b) return 1;
return 0;
}
};
/// min and max functions for floating point types
@@ -74,6 +81,13 @@ class ColumnStatsBase {
struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
static decltype(auto) MinValue(const T& a, const T& b) { return std::fmin(a, b); }
static decltype(auto) MaxValue(const T& a, const T& b) { return std::fmax(a, b); }
static int Compare(const T& a, const T& b) {
// TODO: Should be aligned with PARQUET-1222, once resolved.
if (a == b) return 0;
if (std::isnan(a) && std::isnan(b)) return 0;
if (MaxValue(a, b) == a) return 1;
return -1;
}
};
ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
@@ -94,7 +108,8 @@ class ColumnStatsBase {
int64_t* null_count);
/// Merges this statistics object with values from 'other'. If other has not been
/// initialized, then this object will not be changed.
/// initialized, then this object will not be changed. It maintains internal state that
/// tracks whether the min/max values are ordered.
virtual void Merge(const ColumnStatsBase& other) = 0;
/// Copies the contents of this object's statistics values to internal buffers. Some
@@ -119,6 +134,18 @@ class ColumnStatsBase {
/// value is appended to the column or the statistics are merged.
void IncrementNullCount(int64_t count) { null_count_ += count; }
/// Returns the boundary order of the pages. That is, whether the lists of min/max
/// elements inside the ColumnIndex are ordered and if so, in which direction.
/// If both 'ascending_boundary_order_' and 'descending_boundary_order_' are true,
/// all elements are equal; we choose ascending order in this case.
/// If only one flag is true, we return the corresponding ordering; if both are
/// false, we return unordered.
parquet::BoundaryOrder::type GetBoundaryOrder() const {
if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
return parquet::BoundaryOrder::UNORDERED;
}
protected:
// Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
// 'buffer' is reset before making the copy.
@@ -130,6 +157,16 @@ class ColumnStatsBase {
// Number of null values since the last call to Reset().
int64_t null_count_;
// If true, min/max values are ascending.
// We assume the values are ascending, so start with true and only make it false when
// we find a descending value. If not all values are equal, then at least one of
// 'ascending_boundary_order_' and 'descending_boundary_order_' will be false.
bool ascending_boundary_order_ = true;
// If true, min/max values are descending.
// See description of 'ascending_boundary_order_'.
bool descending_boundary_order_ = true;
private:
/// Returns true if we support reading statistics stored in the fields 'min_value' and
/// 'max_value' in parquet::Statistics for the type 'col_type' and the column order
@@ -174,7 +211,9 @@ class ColumnStats : public ColumnStatsBase {
plain_encoded_value_size_(plain_encoded_value_size),
mem_pool_(mem_pool),
min_buffer_(mem_pool),
max_buffer_(mem_pool) {}
max_buffer_(mem_pool),
prev_page_min_buffer_(mem_pool),
prev_page_max_buffer_(mem_pool) {}
/// Updates the statistics based on the values min_value and max_value. If necessary,
/// initializes the statistics. It may keep a reference to either value until
@@ -216,12 +255,20 @@ class ColumnStats : public ColumnStatsBase {
// Maximum value since the last call to Reset().
T max_value_;
// Minimum value of the previous page. Needed to calculate the boundary order.
T prev_page_min_value_;
// Maximum value of the previous page. Needed to calculate the boundary order.
T prev_page_max_value_;
// Memory pool to allocate from when making copies of the statistics data.
MemPool* mem_pool_;
// Local buffers to copy statistics data into.
StringBuffer min_buffer_;
StringBuffer max_buffer_;
StringBuffer prev_page_min_buffer_;
StringBuffer prev_page_max_buffer_;
};
} // end ns impala

View File

@@ -28,6 +28,8 @@ namespace impala {
inline void ColumnStatsBase::Reset() {
has_min_max_values_ = false;
null_count_ = 0;
ascending_boundary_order_ = true;
descending_boundary_order_ = true;
}
template <typename T>
@@ -46,7 +48,25 @@ template <typename T>
inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_);
if (cs->has_min_max_values_) {
if (has_min_max_values_) {
if (ascending_boundary_order_) {
if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 ||
MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) {
ascending_boundary_order_ = false;
}
}
if (descending_boundary_order_) {
if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 ||
MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) {
descending_boundary_order_ = false;
}
}
}
Update(cs->min_value_, cs->max_value_);
prev_page_min_value_ = cs->min_value_;
prev_page_max_value_ = cs->max_value_;
}
IncrementNullCount(cs->null_count_);
}
@@ -176,12 +196,52 @@ inline void ColumnStats<StringValue>::Update(
}
}
template <>
inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
const ColumnStats<StringValue>* cs = static_cast<
const ColumnStats<StringValue>*>(&other);
if (cs->has_min_max_values_) {
if (has_min_max_values_) {
// Make sure that we copied the previous page's min/max values to their own buffer.
DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr),
static_cast<void*>(cs->min_value_.ptr));
DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr),
static_cast<void*>(cs->max_value_.ptr));
if (ascending_boundary_order_) {
if (prev_page_max_value_ > cs->max_value_ ||
prev_page_min_value_ > cs->min_value_) {
ascending_boundary_order_ = false;
}
}
if (descending_boundary_order_) {
if (prev_page_max_value_ < cs->max_value_ ||
prev_page_min_value_ < cs->min_value_) {
descending_boundary_order_ = false;
}
}
}
Update(cs->min_value_, cs->max_value_);
prev_page_min_value_ = cs->min_value_;
prev_page_max_value_ = cs->max_value_;
prev_page_min_buffer_.Clear();
prev_page_max_buffer_.Clear();
}
IncrementNullCount(cs->null_count_);
}
// StringValues need to be copied at the end of processing a row batch, since the batch
// memory will be released.
template <>
inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
if (prev_page_min_buffer_.IsEmpty()) {
RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, &prev_page_min_value_));
}
if (prev_page_max_buffer_.IsEmpty()) {
RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, &prev_page_max_value_));
}
return Status::OK();
}
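
To make the boundary-order bookkeeping above concrete, here is a small
standalone sketch (independent of ColumnStats; the function name is
hypothetical) that derives the BoundaryOrder from per-page min/max pairs the
same way the Merge() overloads do: start with both flags set and clear one
whenever a page's min or max moves against that direction.

#include <utility>
#include <vector>
#include "gen-cpp/parquet_types.h"

// Standalone illustration of the boundary-order tracking.
// Example traces (pairs are {page_min, page_max}):
//   {1,5} {2,6} {3,7}  -> ASCENDING  (descending flag gets cleared)
//   {9,9} {4,8} {1,2}  -> DESCENDING (ascending flag gets cleared)
//   {1,5} {0,6} {2,4}  -> UNORDERED  (both flags get cleared)
parquet::BoundaryOrder::type ComputeBoundaryOrder(
    const std::vector<std::pair<int, int>>& page_min_max) {
  bool ascending = true;
  bool descending = true;
  for (size_t i = 1; i < page_min_max.size(); ++i) {
    const std::pair<int, int>& prev = page_min_max[i - 1];
    const std::pair<int, int>& cur = page_min_max[i];
    if (prev.first > cur.first || prev.second > cur.second) ascending = false;
    if (prev.first < cur.first || prev.second < cur.second) descending = false;
  }
  // If every page has identical bounds, both flags stay true; like
  // GetBoundaryOrder(), prefer ASCENDING in that case.
  if (ascending) return parquet::BoundaryOrder::ASCENDING;
  if (descending) return parquet::BoundaryOrder::DESCENDING;
  return parquet::BoundaryOrder::UNORDERED;
}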

View File

@@ -74,6 +74,7 @@ add_library(Util
runtime-profile.cc
simple-logger.cc
string-parser.cc
string-util.cc
symbols-util.cc
static-asserts.cc
summary-util.cc
@@ -135,6 +136,7 @@ ADD_BE_TEST(redactor-unconfigured-test)
ADD_BE_TEST(rle-test)
ADD_BE_TEST(runtime-profile-test)
ADD_BE_TEST(string-parser-test)
ADD_BE_TEST(string-util-test)
ADD_BE_TEST(symbols-util-test)
ADD_BE_TEST(sys-info-test)
ADD_BE_TEST(thread-pool-test)

View File

@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "testutil/gtest-util.h"
#include "util/string-util.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "common/names.h"
namespace impala {
enum Truncation {
DOWN,
UP
};
void EvalTruncation(const string& original, const string& expected_result,
int32_t max_length, Truncation boundary) {
string result;
if (boundary == DOWN) {
ASSERT_OK(TruncateDown(original, max_length, &result));
} else {
ASSERT_OK(TruncateUp(original, max_length, &result));
}
EXPECT_EQ(expected_result, result);
}
TEST(TruncateDownTest, Basic) {
EvalTruncation("0123456789", "0123456789", 100, DOWN);
EvalTruncation("0123456789", "0123456789", 10, DOWN);
EvalTruncation("0123456789", "01234", 5, DOWN);
EvalTruncation("0123456789", "", 0, DOWN);
EvalTruncation("", "", 10, DOWN);
EvalTruncation(string("\0\0\0", 3), string("\0\0", 2), 2, DOWN);
EvalTruncation("asdfghjkl", "asdf", 4, DOWN);
char a[] = {'a', CHAR_MAX, CHAR_MIN, 'b', '\0'};
char b[] = {'a', CHAR_MAX, '\0'};
EvalTruncation(a, b, 2, DOWN);
}
TEST(TruncateUpTest, Basic) {
EvalTruncation("0123456789", "0123456789", 100, UP);
EvalTruncation("abcdefghij", "abcdefghij", 10, UP);
EvalTruncation("abcdefghij", "abcdefghj", 9, UP);
EvalTruncation("abcdefghij", "abcdf", 5, UP);
string max_string(100, 0xFF);
EvalTruncation(max_string, max_string, 100, UP);
string normal_plus_max = "abcdef" + max_string;
EvalTruncation(normal_plus_max, normal_plus_max, 200, UP);
EvalTruncation(normal_plus_max, "abcdeg", 10, UP);
string result;
Status s = TruncateUp(max_string, 10, &result);
EXPECT_EQ(s.GetDetail(), "TruncateUp() couldn't increase string.\n");
EvalTruncation("", "", 10, UP);
EvalTruncation(string("\0\0\0", 3), string("\0\001", 2), 2, UP);
EvalTruncation("asdfghjkl", "asdg", 4, UP);
char a[] = {0, (char)0x7F, (char)0xFF, 0};
char b[] = {0, (char)0x80, 0};
EvalTruncation(a, b, 2, UP);
}
}
IMPALA_TEST_MAIN();

View File

@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include "gutil/strings/substitute.h"
#include "util/string-util.h"
#include "common/names.h"
namespace impala {
Status TruncateDown(const string& str, int32_t max_length, string* result) {
DCHECK(result != nullptr);
*result = str.substr(0, std::min(static_cast<int32_t>(str.length()), max_length));
return Status::OK();
}
Status TruncateUp(const string& str, int32_t max_length, string* result) {
DCHECK(result != nullptr);
if (str.length() <= max_length) {
*result = str;
return Status::OK();
}
*result = str.substr(0, max_length);
int i = max_length - 1;
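// Skip trailing 0xFF bytes: incrementing them would only wrap around to 0, so they
// are dropped by the final resize() and the first smaller byte is increased instead.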
while (i > 0 && static_cast<int32_t>((*result)[i]) == -1) {
(*result)[i] += 1;
--i;
}
// We convert it to unsigned because signed overflow results in undefined behavior.
unsigned char uch = static_cast<unsigned char>((*result)[i]);
uch += 1;
(*result)[i] = uch;
if (i == 0 && (*result)[i] == 0) {
return Status("TruncateUp() couldn't increase string.");
}
result->resize(i + 1);
return Status::OK();
}
}

be/src/util/string-util.h (new file)
View File

@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UTIL_STRING_UTIL_H
#define IMPALA_UTIL_STRING_UTIL_H
#include <string>
#include "common/status.h"
namespace impala {
/// 'str' holds the minimum value of some string set. We need to truncate it
/// if it is longer than 'max_length'.
WARN_UNUSED_RESULT
Status TruncateDown(const std::string& str, int32_t max_length, std::string* result);
/// 'str' holds the maximum value of some string set. We want to truncate it
/// to only occupy 'max_length' bytes. We also want to guarantee that the truncated
/// value remains greater than all the strings in the original set, so we need
/// to increase it after truncation. E.g.: when 'max_length' == 3: AAAAAAA => AAB
/// Returns an error if it cannot increase the string value, i.e. all bytes are 0xFF.
WARN_UNUSED_RESULT
Status TruncateUp(const std::string& str, int32_t max_length, std::string* result);
}
#endif
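
A hypothetical usage sketch of the helpers above, in the spirit of how the
Parquet writer bounds page index entries (the example values come from the
comments above; error propagation uses RETURN_IF_ERROR from common/status.h):

#include <string>
#include "common/status.h"
#include "util/string-util.h"

// Sketch: producing a truncated lower and upper bound for the page index.
impala::Status TruncateExample() {
  std::string lower, upper;
  // A lower bound may simply be cut: "Blart Versenwald III" -> "Blart".
  RETURN_IF_ERROR(impala::TruncateDown("Blart Versenwald III", 5, &lower));
  // An upper bound must still compare >= all original values, so the last
  // kept byte is incremented after cutting: "AAAAAAA" -> "AAB".
  RETURN_IF_ERROR(impala::TruncateUp("AAAAAAA", 3, &upper));
  return impala::Status::OK();
}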

View File

@@ -362,6 +362,16 @@ enum PageType {
DATA_PAGE_V2 = 3;
}
/**
* Enum to annotate whether lists of min/max elements inside ColumnIndex
* are ordered and if so, in which direction.
*/
enum BoundaryOrder {
UNORDERED = 0;
ASCENDING = 1;
DESCENDING = 2;
}
/** Data page header */
struct DataPageHeader {
/** Number of values, including NULLs, in this data page. **/
@@ -551,6 +561,18 @@ struct ColumnChunk {
* metadata.
**/
3: optional ColumnMetaData meta_data
/** File offset of ColumnChunk's OffsetIndex **/
4: optional i64 offset_index_offset
/** Size of ColumnChunk's OffsetIndex, in bytes **/
5: optional i32 offset_index_length
/** File offset of ColumnChunk's ColumnIndex **/
6: optional i64 column_index_offset
/** Size of ColumnChunk's ColumnIndex, in bytes **/
7: optional i32 column_index_length
}
struct RowGroup {
@@ -588,6 +610,69 @@ union ColumnOrder {
1: TypeDefinedOrder TYPE_ORDER;
}
struct PageLocation {
/** Offset of the page in the file **/
1: required i64 offset
/**
* Size of the page, including header. Sum of compressed_page_size and header
* length
*/
2: required i32 compressed_page_size
/**
* Index within the RowGroup of the first row of the page; this means pages
* change on record boundaries (r = 0).
*/
3: required i64 first_row_index
}
struct OffsetIndex {
/**
* PageLocations, ordered by increasing PageLocation.offset. It is required
* that page_locations[i].first_row_index < page_locations[i+1].first_row_index.
*/
1: required list<PageLocation> page_locations
}
/**
* Description for ColumnIndex.
* Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i]
*/
struct ColumnIndex {
/**
* A list of Boolean values to determine the validity of the corresponding
* min and max values. If true, a page contains only null values, and writers
* have to set the corresponding entries in min_values and max_values to
* byte[0], so that all lists have the same length. If false, the
* corresponding entries in min_values and max_values must be valid.
*/
1: required list<bool> null_pages
/**
* Two lists containing lower and upper bounds for the values of each page.
* These may be the actual minimum and maximum values found on a page, but
* can also be (more compact) values that do not exist on a page. For
* example, instead of storing "Blart Versenwald III", a writer may set
* min_values[i]="B", max_values[i]="C". Such more compact values must still
* be valid values within the column's logical type. Readers must make sure
* that list entries are populated before using them by inspecting null_pages.
*/
2: required list<binary> min_values
3: required list<binary> max_values
/**
* Stores whether both min_values and max_values are ordered and if so, in
* which direction. This allows readers to perform binary searches in both
* lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
* if the lists are ordered.
*/
4: required BoundaryOrder boundary_order
/** A list containing the number of null values for each page **/
5: optional list<i64> null_counts
}
/**
* Description for file metadata
*/
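
As a concrete illustration of how these structures line up (all values are
invented for this sketch), the index of a hypothetical two-page INT32 column
chunk with values 1..100 on the first page and 101..200 on the second could be
populated like this with the Thrift-generated C++ types:

#include <string>
#include <vector>
#include "gen-cpp/parquet_types.h"

// Sketch: entry i of every ColumnIndex list refers to
// OffsetIndex.page_locations[i].
void BuildExampleIndex(parquet::ColumnIndex* ci, parquet::OffsetIndex* oi) {
  // Neither page is all-NULL, so both min/max entries are valid.
  ci->null_pages = {false, false};
  // Plain-encoded little-endian INT32 bounds: 1/100 and 101/200.
  ci->min_values = {std::string("\x01\x00\x00\x00", 4),
                    std::string("\x65\x00\x00\x00", 4)};
  ci->max_values = {std::string("\x64\x00\x00\x00", 4),
                    std::string("\xc8\x00\x00\x00", 4)};
  ci->__isset.null_counts = true;
  ci->null_counts = {0, 0};
  // Both the min and the max list increase page over page.
  ci->__set_boundary_order(parquet::BoundaryOrder::ASCENDING);

  parquet::PageLocation first, second;
  first.offset = 4;                  // right after the "PAR1" magic
  first.compressed_page_size = 450;  // page data plus page header
  first.first_row_index = 0;
  second.offset = 454;
  second.compressed_page_size = 460;
  second.first_row_index = 100;      // 100 rows on the first page
  oi->page_locations = {first, second};
}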

View File

@@ -66,6 +66,38 @@ ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=2)
ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=3)
SET FILEFORMAT RCFILE;
DROP TABLE IF EXISTS functional_parquet.chars_formats;
CREATE EXTERNAL TABLE functional_parquet.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS PARQUET
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_parquet';
DROP TABLE IF EXISTS functional_orc_def.chars_formats;
CREATE EXTERNAL TABLE functional_orc_def.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS ORC
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_orc_def';
DROP TABLE IF EXISTS functional.chars_formats;
CREATE EXTERNAL TABLE functional.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
ROW FORMAT delimited fields terminated by ',' escaped by '\\'
STORED AS TEXTFILE
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_text';
DROP TABLE IF EXISTS functional_avro_snap.chars_formats;
CREATE EXTERNAL TABLE functional_avro_snap.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS AVRO
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_avro_snap'
TBLPROPERTIES ('avro.schema.literal'='{"type":"record",
"name":"CharTypesTest","doc":"Schema generated by Kite",
"fields":[
{"name":"cs","type":["null","string"], "doc":"Type inferred"},
{"name":"cl","type":["null","string"], "doc":"Type inferred"},
{"name":"vc","type":["null","string"], "doc":"Type inferred"}
]}');
---- Unsupported Impala table types
USE functional;
CREATE VIEW IF NOT EXISTS hive_view AS SELECT 1 AS int_col FROM alltypes limit 1;
@@ -74,4 +106,5 @@ USE functional;
DROP INDEX IF EXISTS hive_index ON alltypes;
CREATE INDEX hive_index ON TABLE alltypes (int_col)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD IN TABLE hive_index_tbl
WITH DEFERRED REBUILD IN TABLE hive_index_tbl;

View File

@@ -33,17 +33,17 @@ show table stats alltypes
YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
---- RESULTS
'2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
'2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
'2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
'2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
'2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
'Total','',3650,3650,12,regex:.*B,'0B','','','',''
---- TYPES
STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING

View File

@@ -47,39 +47,6 @@ class TestCharFormats(ImpalaTestSuite):
def get_workload(cls):
return 'functional-query'
def setup_method(self, method):
self.__create_char_tables()
def __create_char_tables(self):
self.client.execute('''create external table if not exists
functional_parquet.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS PARQUET
LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_parquet")))
self.client.execute('''create external table if not exists
functional_orc_def.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS ORC
LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_orc_def")))
self.client.execute('''create external table if not exists
functional.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
ROW FORMAT delimited fields terminated by ',' escaped by '\\\\'
STORED AS TEXTFILE
LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_text")))
self.client.execute('''create external table if not exists
functional_avro_snap.chars_formats
(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
STORED AS AVRO
LOCATION "{0}"
TBLPROPERTIES ('avro.schema.literal'='{{"type":"record",
"name":"CharTypesTest","doc":"Schema generated by Kite",
"fields":[
{{"name":"cs","type":["null","string"], "doc":"Type inferred"}},
{{"name":"cl","type":["null","string"], "doc":"Type inferred"}},
{{"name":"vc","type":["null","string"],"doc":"Type inferred"}}]}}')
'''.format(get_fs_path("/test-warehouse/chars_formats_avro_snap")))
@classmethod
def add_test_dimensions(cls):
super(TestCharFormats, cls).add_test_dimensions()

View File

@@ -0,0 +1,365 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Targeted Impala insert tests
import os
from collections import namedtuple
from subprocess import check_call
from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.filesystem_utils import get_fs_path
from tests.util.get_parquet_metadata import (
decode_stats_value,
get_parquet_metadata,
read_serialized_object
)
PAGE_INDEX_MAX_STRING_LENGTH = 64
class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
"""Since PARQUET-922 page statistics can be written before the footer.
The tests in this class checks if Impala writes the page indices correctly.
"""
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def _get_row_group_from_file(self, parquet_file):
"""Returns namedtuples that contain the schema, stats, offset_index, column_index,
and page_headers for each column in the first row group in file 'parquet_file'. Fails
if the file contains multiple row groups.
"""
ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
'column_index', 'page_headers'])
file_meta_data = get_parquet_metadata(parquet_file)
assert len(file_meta_data.row_groups) == 1
# We only support flat schemas, the additional element is the root element.
schemas = file_meta_data.schema[1:]
row_group = file_meta_data.row_groups[0]
assert len(schemas) == len(row_group.columns)
row_group_index = []
with open(parquet_file) as file_handle:
for column, schema in zip(row_group.columns, schemas):
column_index_offset = column.column_index_offset
column_index_length = column.column_index_length
column_index = None
if column_index_offset and column_index_length:
column_index = read_serialized_object(ColumnIndex, file_handle,
column_index_offset, column_index_length)
column_meta_data = column.meta_data
stats = None
if column_meta_data:
stats = column_meta_data.statistics
offset_index_offset = column.offset_index_offset
offset_index_length = column.offset_index_length
offset_index = None
page_headers = []
if offset_index_offset and offset_index_length:
offset_index = read_serialized_object(OffsetIndex, file_handle,
offset_index_offset, offset_index_length)
for page_loc in offset_index.page_locations:
page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
page_loc.compressed_page_size)
page_headers.append(page_header)
column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
row_group_index.append(column_info)
return row_group_index
def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
"""Returns a list of column infos (containing the schema, stats, offset_index,
column_index, and page_headers) for the first row group in all parquet files in
'hdfs_path'.
"""
row_group_indexes = []
check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
for root, subdirs, files in os.walk(tmpdir.strpath):
for f in files:
parquet_file = os.path.join(root, str(f))
row_group_indexes.append(self._get_row_group_from_file(parquet_file))
return row_group_indexes
def _validate_page_locations(self, page_locations):
"""Validate that the page locations are in order."""
for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
assert previous_loc.offset < current_loc.offset
assert previous_loc.first_row_index < current_loc.first_row_index
def _validate_null_stats(self, index_size, column_info):
"""Validates the statistics stored in null_pages and null_counts."""
column_index = column_info.column_index
column_stats = column_info.stats
assert column_index.null_pages is not None
assert len(column_index.null_pages) == index_size
assert column_index.null_counts is not None
assert len(column_index.null_counts) == index_size
for page_is_null, null_count, page_header in zip(column_index.null_pages,
column_index.null_counts, column_info.page_headers):
assert page_header.type == PageType.DATA_PAGE
num_values = page_header.data_page_header.num_values
assert not page_is_null or null_count == num_values
if column_stats:
assert column_stats.null_count == sum(column_index.null_counts)
def _validate_min_max_values(self, index_size, column_info):
"""Validate min/max values of the pages in a column chunk."""
column_index = column_info.column_index
min_values = column_info.column_index.min_values
assert len(min_values) == index_size
max_values = column_info.column_index.max_values
assert len(max_values) == index_size
if not column_info.stats:
return
column_min_value_str = column_info.stats.min_value
column_max_value_str = column_info.stats.max_value
if column_min_value_str is None or column_max_value_str is None:
# If either is None, then both need to be None.
assert column_min_value_str is None and column_max_value_str is None
# No min and max value, all pages need to be null
for idx, null_page in enumerate(column_index.null_pages):
assert null_page, "Page {} of column {} is not null, \
but doesn't have min and max values!".format(idx, column_index.schema.name)
# Everything is None, no further checks needed.
return
column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
for null_page, page_min_str in zip(column_index.null_pages, min_values):
if not null_page:
page_min_value = decode_stats_value(column_info.schema, page_min_str)
# If type is str, page_min_value might have been truncated.
if isinstance(page_min_value, basestring):
assert page_min_value >= column_min_value[:len(page_min_value)]
else:
assert page_min_value >= column_min_value
column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
for null_page, page_max_str in zip(column_index.null_pages, max_values):
if not null_page:
page_max_value = decode_stats_value(column_info.schema, page_max_str)
# If type is str, page_max_value might have been truncated and incremented.
if (isinstance(page_max_value, basestring) and
len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
max_val_prefix = page_max_value.rstrip('\0')
assert max_val_prefix[:-1] <= column_max_value
else:
assert page_max_value <= column_max_value
def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
"""Check if the ordering of the values reflects the value of 'ordering'."""
def is_sorted(l, reverse=False):
if not reverse:
return all(a <= b for a, b in zip(l, l[1:]))
else:
return all(a >= b for a, b in zip(l, l[1:]))
# Filter out null pages and decode the actual min/max values.
actual_min_values = [decode_stats_value(schema, min_val)
for min_val, is_null in zip(min_values, null_pages)
if not is_null]
actual_max_values = [decode_stats_value(schema, max_val)
for max_val, is_null in zip(max_values, null_pages)
if not is_null]
# For ASCENDING and DESCENDING, both min and max values need to be sorted.
if ordering == BoundaryOrder.ASCENDING:
assert is_sorted(actual_min_values)
assert is_sorted(actual_max_values)
elif ordering == BoundaryOrder.DESCENDING:
assert is_sorted(actual_min_values, reverse=True)
assert is_sorted(actual_max_values, reverse=True)
else:
assert ordering == BoundaryOrder.UNORDERED
# For UNORDERED, min and max values cannot be both sorted.
assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
assert (not is_sorted(actual_min_values, reverse=True) or
not is_sorted(actual_max_values, reverse=True))
def _validate_boundary_order(self, column_info):
"""Validate that min/max values are really in the order specified by
boundary order.
"""
column_index = column_info.column_index
self._validate_ordering(column_index.boundary_order, column_info.schema,
column_index.null_pages, column_index.min_values, column_index.max_values)
def _validate_parquet_page_index(self, hdfs_path, tmpdir):
"""Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
index in that file is in the valid format.
"""
row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
for columns in row_group_indexes:
for column_info in columns:
try:
index_size = len(column_info.offset_index.page_locations)
assert index_size > 0
self._validate_page_locations(column_info.offset_index.page_locations)
self._validate_null_stats(index_size, column_info)
self._validate_min_max_values(index_size, column_info)
self._validate_boundary_order(column_info)
except AssertionError as e:
e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
raise
def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
tmpdir, sorting_column=None):
"""Copies 'source_table' into a parquet table and makes sure that the index
in the resulting parquet file is valid.
"""
table_name = "test_hdfs_parquet_table_writer"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
# Setting num_nodes = 1 ensures that the query is executed on the coordinator,
# resulting in a single parquet file being written.
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query("drop table if exists {0}".format(qualified_table_name))
if sorting_column is None:
query = ("create table {0} stored as parquet as select * from {1}").format(
qualified_table_name, source_table)
else:
query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
).format(qualified_table_name, sorting_column, source_table)
self.execute_query(query, vector.get_value('exec_option'))
self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
def _create_string_table_with_values(self, vector, unique_database, table_name,
values_sql):
"""Creates a parquet table that has a single string column, then invokes an insert
statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
It returns the HDFS path for the table.
"""
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
self.execute_query("drop table if exists {0}".format(qualified_table_name))
vector.get_value('exec_option')['num_nodes'] = 1
query = ("create table {0} (str string) stored as parquet").format(qualified_table_name)
self.execute_query(query, vector.get_value('exec_option'))
self.execute_query("insert into {0} values {1}".format(qualified_table_name,
values_sql), vector.get_value('exec_option'))
return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
def test_write_index_alltypes(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup indexes with the correct
values.
"""
self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
tmpdir)
def test_write_index_decimals(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup indexes with the correct
values, using decimal types.
"""
self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
tmpdir)
def test_write_index_chars(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup indexes with the correct
values, using char types.
"""
self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
tmpdir)
def test_write_index_null(self, vector, unique_database, tmpdir):
"""Test that we don't write min/max values in the index for null columns.
Ensure null_count is set for columns with null values.
"""
self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
tmpdir)
def test_write_index_multi_page(self, vector, unique_database, tmpdir):
"""Test that when a ColumnChunk is written across multiple pages, the index is
valid.
"""
self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
tmpdir)
self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
tmpdir)
def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
"""Test that when the schema has a sorting column, the index is valid."""
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet.zipcode_incomes", tmpdir, "id")
def test_write_index_wide_table(self, vector, unique_database, tmpdir):
"""Test table with wide row."""
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet.widerow", tmpdir)
def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir):
"""Test tables with wide rows and many columns."""
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet.widetable_250_cols", tmpdir)
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet.widetable_500_cols", tmpdir)
self._ctas_table_and_verify_index(vector, unique_database,
"functional_parquet.widetable_1000_cols", tmpdir)
def test_max_string_values(self, vector, unique_database, tmpdir):
"""Test string values that are all 0xFFs or end with 0xFFs."""
# The string value consists only of 0xFF bytes, but its length is less than
# PAGE_INDEX_MAX_STRING_LENGTH.
short_tbl = "short_tbl"
short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
# The string value consists only of 0xFF bytes and its length is exactly
# PAGE_INDEX_MAX_STRING_LENGTH.
fit_tbl = "fit_tbl"
fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
# All bytes are 0xFF and the string is longer than PAGE_INDEX_MAX_STRING_LENGTH, so the
# writer should not write page statistics, i.e. no column index is expected.
too_long_tbl = "too_long_tbl"
too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
tmpdir.join(too_long_tbl))
column = row_group_indexes[0][0]
assert column.column_index is None
# We always write the offset index
assert column.offset_index is not None
# Test a string value that starts with 'aaa', continues with 0xFF bytes, and is longer
# than PAGE_INDEX_MAX_STRING_LENGTH. The truncated and incremented max value should be
# 'aab'.
aaa_tbl = "aaa_tbl"
aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
tmpdir.join(aaa_tbl))
column = row_group_indexes[0][0]
assert len(column.column_index.max_values) == 1
max_value = column.column_index.max_values[0]
assert max_value == 'aab'

View File

@@ -20,13 +20,21 @@ import struct
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from parquet.ttypes import FileMetaData, Type
from parquet.ttypes import ColumnIndex, FileMetaData, OffsetIndex, PageHeader, Type
from thrift.protocol import TCompactProtocol
from thrift.transport import TTransport
PARQUET_VERSION_NUMBER = 'PAR1'
def create_protocol(serialized_object_buffer):
"""Creates a thrift protocol object from a memory buffer. The buffer should
contain a serialized thrift object.
"""
transport = TTransport.TMemoryBuffer(serialized_object_buffer)
return TCompactProtocol.TCompactProtocol(transport)
def julian_day_to_date(julian_day):
"""Converts a julian day number into a Gregorian date. The reference date is picked
arbitrarily and can be validated with an online converter like
@@ -71,7 +79,8 @@ def parse_double(s):
def decode_timestamp(s):
"""Reinterprets the string 's' as a 12-byte timestamp as written by Impala and decode it
into a datetime object."""
into a datetime object.
"""
# Impala writes timestamps as 12-byte values. The first 8 byte store a
# boost::posix_time::time_duration, which is the time within the current day in
# nanoseconds stored as int64. The last 4 bytes store a boost::gregorian::date,
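As a rough illustration of that 12-byte layout, the sketch below decodes such a value with the standard struct module, assuming little-endian byte order and using 2440588 as the Julian day number of the Unix epoch; it is independent of this module's actual decode_timestamp implementation and drops sub-microsecond precision.

import struct
from datetime import datetime, timedelta

def decode_impala_timestamp_sketch(raw12):
  # First 8 bytes: nanoseconds within the day, little-endian int64.
  # Last 4 bytes: the day itself as a little-endian int32 Julian day number.
  nanos, julian_day = struct.unpack('<qi', raw12)
  day = datetime(1970, 1, 1) + timedelta(days=julian_day - 2440588)
  return day + timedelta(microseconds=nanos // 1000)

# Zero nanoseconds on Julian day 2440588 is the Unix epoch.
assert decode_impala_timestamp_sketch(struct.pack('<qi', 0, 2440588)) == datetime(1970, 1, 1)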
@@ -99,7 +108,8 @@ def decode_decimal(schema, value):
def decode_stats_value(schema, value):
"""Decodes 'value' according to 'schema. It expects 'value' to be plain encoded. For
BOOLEAN values, only the least significant bit is parsed and returned. Binary arrays are
expected to be stored as such, without a preceding length."""
expected to be stored as such, without a preceding length.
"""
column_type = schema.type
if column_type == Type.BOOLEAN:
return parse_boolean(value)
@@ -123,9 +133,22 @@ def decode_stats_value(schema, value):
return None
def read_serialized_object(thrift_class, file, file_pos, length):
"""Reads an instance of class 'thrift_class' from an already opened file at the
given position.
"""
file.seek(file_pos)
serialized_thrift_object = file.read(length)
protocol = create_protocol(serialized_thrift_object)
thrift_object = thrift_class()
thrift_object.read(protocol)
return thrift_object
def get_parquet_metadata(filename):
"""Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a local
file path."""
file path.
"""
file_size = os.path.getsize(filename)
with open(filename) as f:
# Check file starts and ends with magic bytes
@@ -140,13 +163,8 @@ def get_parquet_metadata(filename):
f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4)
metadata_len = parse_int32(f.read(4))
# Read metadata
f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len)
serialized_metadata = f.read(metadata_len)
# Calculate metadata position in file
metadata_pos = file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len
# Deserialize metadata
transport = TTransport.TMemoryBuffer(serialized_metadata)
protocol = TCompactProtocol.TCompactProtocol(transport)
metadata = FileMetaData()
metadata.read(protocol)
return metadata
# Return deserialized FileMetaData object
return read_serialized_object(FileMetaData, f, metadata_pos, metadata_len)
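For context, here is a hypothetical sketch (not part of the test utilities) of how these helpers could be combined to read the page index written by this commit back out of a file. It assumes the ColumnChunk fields column_index_offset/column_index_length and offset_index_offset/offset_index_length introduced by PARQUET-922 are populated in the footer metadata; ColumnIndex and OffsetIndex come from the enlarged import at the top of this module.

def read_page_indexes_sketch(filename):
  """Returns a list of (ColumnIndex, OffsetIndex) pairs, one per column chunk."""
  metadata = get_parquet_metadata(filename)
  indexes = []
  with open(filename) as f:
    for row_group in metadata.row_groups:
      for col in row_group.columns:
        # The column index can be missing (e.g. for untruncatable string values),
        # while Impala always writes the offset index.
        column_index = None
        if col.column_index_offset is not None:
          column_index = read_serialized_object(
              ColumnIndex, f, col.column_index_offset, col.column_index_length)
        offset_index = read_serialized_object(
            OffsetIndex, f, col.offset_index_offset, col.offset_index_length)
        indexes.append((column_index, offset_index))
  return indexes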