IMPALA-7644: Hide Parquet page index writing with feature flag
This commit adds the command-line flag enable_parquet_page_index_writing to the Impala daemon that toggles Impala's ability to write the Parquet page index. By default the flag is false, i.e. Impala doesn't write the page index. This flag is only temporary; we plan to remove it once Impala is able to read the page index and has better testing around it.

Because of this change I had to move test_parquet_page_index.py to the custom_cluster test suite, since I need to set this command-line flag in order to test the functionality. I also merged most of the test cases because we don't want to restart the cluster too many times.

I removed 'num_data_pages_' from BaseColumnWriter since it was rather confusing and didn't provide any measurable performance improvement.

This commit fixes the ASAN error produced by the first IMPALA-7644 commit, which was reverted later.

Change-Id: Ib4a9098a2085a385351477c715ae245d83bf1c72
Reviewed-on: http://gerrit.cloudera.org:8080/11694
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 275124e874
commit de7f09d726
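As context for the diff below, here is a minimal sketch (not part of the commit) of how a test can exercise the new flag: a custom cluster test passes --enable_parquet_page_index_writing_debug_only to the impalad daemons through CustomClusterTestSuite.with_args, the same pattern test_parquet_page_index.py uses further down in this diff. The class and test names in the sketch are hypothetical and only illustrate the decorator usage.

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite


class TestPageIndexFlagSketch(CustomClusterTestSuite):
  """Hypothetical illustration only; the real tests live in test_parquet_page_index.py."""

  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
  def test_page_index_written(self, vector, unique_database):
    # With the flag set on the started daemons, Parquet files written by CTAS/INSERT
    # also carry a column index and an offset index after the row group data.
    self.execute_query("create table {0}.tbl stored as parquet as "
                       "select * from functional.alltypes".format(unique_database))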
@@ -239,6 +239,12 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
     "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
     "old GC generation to be almost full.");
 
+DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true, Impala "
+    "will write the Parquet page index. It is not advised to use it in a production "
+    "environment, only for testing and development. This flag is meant to be temporary. "
+    "We plan to remove this flag once Impala is able to read the page index and has "
+    "better test coverage around it.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
@@ -83,6 +83,8 @@ using namespace apache::thrift;
 // the columns and run that function over row batches.
 // TODO: we need to pass in the compression from the FE/metadata
 
+DECLARE_bool(enable_parquet_page_index_writing_debug_only);
+
 namespace impala {
 
 // Base class for column writers. This contains most of the logic except for
@@ -163,12 +165,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Any data for previous row groups must be reset (e.g. dictionaries).
   // Subclasses must call this if they override this function.
   virtual void Reset() {
-    num_data_pages_ = 0;
-    current_page_ = nullptr;
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = parquet::Encoding::PLAIN;
     next_page_encoding_ = parquet::Encoding::PLAIN;
+    pages_.clear();
+    current_page_ = nullptr;
     column_encodings_.clear();
     dict_encoding_stats_.clear();
     data_encoding_stats_.clear();
@@ -205,6 +207,58 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
+  Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
+    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
+      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
+          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
+    }
+    page_index_memory_consumption_ += new_memory_allocation;
+    return Status::OK();
+  }
+
+  Status ReserveOffsetIndex(int64_t capacity) {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    RETURN_IF_ERROR(
+        AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
+    offset_index_.page_locations.reserve(capacity);
+    return Status::OK();
+  }
+
+  void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
+    offset_index_.page_locations.push_back(location);
+  }
+
+  Status AddPageStatsToColumnIndex() {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    parquet::Statistics page_stats;
+    page_stats_base_->EncodeToThrift(&page_stats);
+    // If pages_stats contains min_value and max_value, then append them to min_values_
+    // and max_values_ and also mark the page as not null. In case min and max values are
+    // not set, push empty strings to maintain the consistency of the index and mark the
+    // page as null. Always push the null_count.
+    string min_val;
+    string max_val;
+    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
+      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &min_val);
+      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &max_val);
+      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
+      column_index_.null_pages.push_back(false);
+    } else {
+      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
+      column_index_.null_pages.push_back(true);
+      DCHECK_EQ(page_stats.null_count, num_values_);
+    }
+    RETURN_IF_ERROR(
+        AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
+    column_index_.min_values.emplace_back(std::move(min_val));
+    column_index_.max_values.emplace_back(std::move(max_val));
+    column_index_.null_counts.push_back(page_stats.null_count);
+    return Status::OK();
+  }
+
   // Encodes value into the current page output buffer and updates the column statistics
   // aggregates. Returns true if the value was appended successfully to the current page.
   // Returns false if the value was not appended to the current page and the caller can
@@ -254,12 +308,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // compressed.
   scoped_ptr<Codec> compressor_;
 
-  vector<DataPage> pages_;
-
-  // Number of pages in 'pages_' that are used. 'pages_' is reused between flushes
-  // so this number can be less than pages_.size()
-  int num_data_pages_;
-
   // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
   // when pages are not big enough. This only happens when there are enough unique values
   // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
@@ -267,6 +315,10 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // TODO: Consider removing and only creating a single large page as necessary.
   int64_t page_size_;
 
+  // Pages belong to this column chunk. We need to keep them in memory in order to write
+  // them together.
+  vector<DataPage> pages_;
+
   // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
 
@@ -645,11 +697,10 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_data_page = *file_pos;
   int64_t current_row_group_index = 0;
-  offset_index_.page_locations.resize(num_data_pages_);
+  RETURN_IF_ERROR(ReserveOffsetIndex(pages_.size()));
 
   // Write data pages
-  for (int i = 0; i < num_data_pages_; ++i) {
-    DataPage& page = pages_[i];
+  for (const DataPage& page : pages_) {
     parquet::PageLocation location;
 
     if (page.header.data_page_header.num_values == 0) {
@@ -657,7 +708,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       location.offset = -1;
       location.compressed_page_size = 0;
       location.first_row_index = -1;
-      offset_index_.page_locations[i] = location;
+      AddLocationToOffsetIndex(location);
       continue;
     }
 
@@ -677,7 +728,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
     // also includes the size of the page header.
     location.compressed_page_size = page.header.compressed_page_size + len;
-    offset_index_.page_locations[i] = location;
+    AddLocationToOffsetIndex(location);
 
     // Write the page data
     RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
@@ -754,37 +805,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   }
 
   DCHECK(page_stats_base_ != nullptr);
-  parquet::Statistics page_stats;
-  page_stats_base_->EncodeToThrift(&page_stats);
-  {
-    // If pages_stats contains min_value and max_value, then append them to min_values_
-    // and max_values_ and also mark the page as not null. In case min and max values are
-    // not set, push empty strings to maintain the consistency of the index and mark the
-    // page as null. Always push the null_count.
-    string min_val;
-    string max_val;
-    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
-      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &min_val);
-      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &max_val);
-      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
-      column_index_.null_pages.push_back(false);
-    } else {
-      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
-      column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
-    }
-    int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
-    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
-      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
-          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
-    }
-    page_index_memory_consumption_ += new_memory_allocation;
-    column_index_.min_values.emplace_back(std::move(min_val));
-    column_index_.max_values.emplace_back(std::move(max_val));
-    column_index_.null_counts.push_back(page_stats.null_count);
-  }
+  RETURN_IF_ERROR(AddPageStatsToColumnIndex());
 
   // Update row group statistics from page statistics.
   DCHECK(row_group_stats_base_ != nullptr);
@@ -805,25 +826,17 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
-  if (num_data_pages_ < pages_.size()) {
-    // Reuse an existing page
-    current_page_ = &pages_[num_data_pages_++];
-    current_page_->header.data_page_header.num_values = 0;
-    current_page_->header.compressed_page_size = 0;
-    current_page_->header.uncompressed_page_size = 0;
-  } else {
-    pages_.push_back(DataPage());
-    current_page_ = &pages_[num_data_pages_++];
+  pages_.push_back(DataPage());
+  current_page_ = &pages_.back();
 
-    parquet::DataPageHeader header;
-    header.num_values = 0;
-    // The code that populates the column chunk metadata's encodings field
-    // relies on these specific values for the definition/repetition level
-    // encodings.
-    header.definition_level_encoding = parquet::Encoding::RLE;
-    header.repetition_level_encoding = parquet::Encoding::RLE;
-    current_page_->header.__set_data_page_header(header);
-  }
+  parquet::DataPageHeader header;
+  header.num_values = 0;
+  // The code that populates the column chunk metadata's encodings field
+  // relies on these specific values for the definition/repetition level
+  // encodings.
+  header.definition_level_encoding = parquet::Encoding::RLE;
+  header.repetition_level_encoding = parquet::Encoding::RLE;
+  current_page_->header.__set_data_page_header(header);
   current_encoding_ = next_page_encoding_;
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
@@ -1137,6 +1150,7 @@ Status HdfsParquetTableWriter::Finalize() {
 
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WritePageIndex());
+  for (auto& column : columns_) column->Reset();
   RETURN_IF_ERROR(WriteFileFooter());
   stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1249,6 +1263,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 }
 
 Status HdfsParquetTableWriter::WritePageIndex() {
+  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+
   // Currently Impala only write Parquet files with a single row group. The current
   // page index logic depends on this behavior as it only keeps one row group's
   // statistics in memory.
@@ -1284,8 +1300,6 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     row_group->columns[i].__set_offset_index_length(len);
     file_pos_ += len;
   }
-  // Reset column writers.
-  for (auto& column : columns_) column->Reset();
   return Status::OK();
 }
 
@@ -33,17 +33,17 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
-'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
+'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
-'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
+'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
 '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
-'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
-'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
+'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
+'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
 '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
@@ -23,7 +23,7 @@ from collections import namedtuple
 from subprocess import check_call
 from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
 
-from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import get_fs_path
 from tests.util.get_parquet_metadata import (
@@ -36,9 +36,14 @@ PAGE_INDEX_MAX_STRING_LENGTH = 64
 
 
 @SkipIfLocal.parquet_file_size
-class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
+class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
   """Since PARQUET-922 page statistics can be written before the footer.
   The tests in this class checks if Impala writes the page indices correctly.
+  It is temporarily a custom cluster test suite because we need to set the
+  enable_parquet_page_index_writing command-line flag for the Impala daemon
+  in order to make it write the page index.
+  TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by
+  default, this test suite should be moved back to query tests.
   """
   @classmethod
   def get_workload(cls):
@@ -46,7 +51,7 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
+    super(CustomClusterTestSuite, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format == 'parquet')
 
@@ -269,62 +274,54 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     self.execute_query("drop table if exists {0}".format(qualified_table_name))
     vector.get_value('exec_option')['num_nodes'] = 1
-    query = ("create table {0} (str string) stored as parquet").format(qualified_table_name)
+    query = ("create table {0} (str string) stored as parquet").format(
+        qualified_table_name)
     self.execute_query(query, vector.get_value('exec_option'))
     self.execute_query("insert into {0} values {1}".format(qualified_table_name,
         values_sql), vector.get_value('exec_option'))
     return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
         table_name))
 
-  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values.
-    """
+  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
+  def test_ctas_tables(self, vector, unique_database, tmpdir):
+    """Test different Parquet files created via CTAS statements."""
+
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values.
     self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
         tmpdir)
 
-  def test_write_index_decimals(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values, using decimal types.
-    """
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values, using decimal types.
     self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
         tmpdir)
 
-  def test_write_index_chars(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values, using char types.
-    """
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values, using char types.
     self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
         tmpdir)
 
-  def test_write_index_null(self, vector, unique_database, tmpdir):
-    """Test that we don't write min/max values in the index for null columns.
-    Ensure null_count is set for columns with null values.
-    """
+    # Test that we don't write min/max values in the index for null columns.
+    # Ensure null_count is set for columns with null values.
     self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
         tmpdir)
 
-  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
-    """Test that when a ColumnChunk is written across multiple pages, the index is
-    valid.
-    """
+    # Test that when a ColumnChunk is written across multiple pages, the index is
+    # valid.
     self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
         tmpdir)
     self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
        tmpdir)
 
-  def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
-    """Test that when the schema has a sorting column, the index is valid."""
+    # Test that when the schema has a sorting column, the index is valid.
     self._ctas_table_and_verify_index(vector, unique_database,
         "functional_parquet.zipcode_incomes", tmpdir, "id")
 
-  def test_write_index_wide_table(self, vector, unique_database, tmpdir):
-    """Test table with wide row."""
+    # Test table with wide row.
     self._ctas_table_and_verify_index(vector, unique_database,
         "functional_parquet.widerow", tmpdir)
 
-  def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir):
-    """Test tables with wide rows and many columns."""
+    # Test tables with wide rows and many columns.
    self._ctas_table_and_verify_index(vector, unique_database,
        "functional_parquet.widetable_250_cols", tmpdir)
    self._ctas_table_and_verify_index(vector, unique_database,
@@ -332,6 +329,7 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
     self._ctas_table_and_verify_index(vector, unique_database,
         "functional_parquet.widetable_1000_cols", tmpdir)
 
+  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
   def test_max_string_values(self, vector, unique_database, tmpdir):
     """Test string values that are all 0xFFs or end with 0xFFs."""
 
@@ -351,7 +349,8 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
     # should not write page statistics.
     too_long_tbl = "too_long_tbl"
     too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+        too_long_tbl, "(rpad('', {0}, chr(255)))".format(
+            PAGE_INDEX_MAX_STRING_LENGTH + 1))
     row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
         tmpdir.join(too_long_tbl))
     column = row_group_indexes[0][0]