IMPALA-8450: Add support for zstd in parquet

The Makefile was updated to include zstd in the ${IMPALA_HOME}/toolchain
directory. Other changes were made to make the zstd headers and
libraries accessible.

Classes ZstandardCompressor and ZstandardDecompressor were added to
provide interfaces for calling the ZSTD_compress/ZSTD_decompress
functions. Zstd supports compression levels (clevel) from 1 to
ZSTD_maxCLevel(). Zstd also supports negative clevels, but since
negative values represent uncompressed data they are not supported
here. The default clevel is ZSTD_CLEVEL_DEFAULT.
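
For reference, a minimal round-trip sketch using the underlying zstd C
API (illustrative only, not code from this patch; error handling is
simplified):

  #include <zstd.h>

  #include <string>
  #include <vector>

  // Compress then decompress a buffer with an explicit clevel.
  std::string RoundTrip(const std::string& input, int clevel) {
    // Worst-case compressed size for a single-shot compression.
    size_t max_len = ZSTD_compressBound(input.size());
    std::vector<char> compressed(max_len);
    size_t clen = ZSTD_compress(compressed.data(), max_len, input.data(),
        input.size(), clevel);
    if (ZSTD_isError(clen)) return "";  // real code would surface the error
    std::vector<char> output(input.size());
    size_t dlen = ZSTD_decompress(output.data(), output.size(),
        compressed.data(), clen);
    if (ZSTD_isError(dlen)) return "";
    return std::string(output.data(), dlen);
  }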

HdfsParquetTableWriter was updated to support the ZSTD codec. The new
codec can be set using the existing query option as follows:
  set COMPRESSION_CODEC=ZSTD:<clevel>;
  set COMPRESSION_CODEC=ZSTD; // uses ZSTD_CLEVEL_DEFAULT
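
Per the validation added in query-options.cc below, levels outside
[1, ZSTD_maxCLevel()] are rejected when the option is set, e.g.:
  set COMPRESSION_CODEC=ZSTD:3; // compresses with clevel 3
  set COMPRESSION_CODEC=ZSTD:0; // error: valid values are in [1, ZSTD_maxCLevel()]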

Testing:
  - Added a unit test in the DecompressorTest class that runs with both
    ZSTD_CLEVEL_DEFAULT and a random clevel. The test decompresses
    compressed input data and validates the result. It also tests the
    expected behavior when an over- or undersized buffer is passed for
    decompression.
  - Added unit tests for valid/invalid values for COMPRESSION_CODEC.
  - Added an e2e test in test_insert_parquet.py that writes and reads
    (null/non-null) data into/from a table (with columns of different
    data types) using multiple codecs. Other existing e2e tests were
    updated to also use the parquet/zstd table format.
  - Manual interoperability tests were run between Impala and Hive.

Change-Id: Id2c0e26e6f7fb2dc4024309d733983ba5197beb7
Reviewed-on: http://gerrit.cloudera.org:8080/13507
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Abhishek
Date: 2019-06-04 00:22:33 -07:00
Committed by: Impala Public Jenkins
Parent: e573b5502d
Commit: 51e8175c62
30 changed files with 497 additions and 98 deletions


@@ -106,6 +106,7 @@ set_dep_root(LIBUNWIND)
set_dep_root(LLVM)
set(LLVM_DEBUG_ROOT $ENV{IMPALA_TOOLCHAIN}/llvm-$ENV{IMPALA_LLVM_DEBUG_VERSION})
set_dep_root(LZ4)
set_dep_root(ZSTD)
set_dep_root(OPENLDAP)
set_dep_root(PROTOBUF)
set_dep_root(RE2)
@@ -284,6 +285,10 @@ IMPALA_ADD_THIRDPARTY_LIB(snappy ${SNAPPY_INCLUDE_DIR} ${SNAPPY_STATIC_LIB} "")
find_package(Lz4 REQUIRED)
IMPALA_ADD_THIRDPARTY_LIB(lz4 ${LZ4_INCLUDE_DIR} ${LZ4_STATIC_LIB} "")
# find zstd lib
find_package(Zstd REQUIRED)
IMPALA_ADD_THIRDPARTY_LIB(zstd ${ZSTD_INCLUDE_DIR} ${ZSTD_STATIC_LIB} "")
# find re2 headers and libs
find_package(Re2 REQUIRED)
IMPALA_ADD_THIRDPARTY_LIB(re2 ${RE2_INCLUDE_DIR} ${RE2_STATIC_LIB} "")


@@ -480,6 +480,7 @@ endif ()
set (IMPALA_DEPENDENCIES
snappy
lz4
zstd
re2
${Boost_LIBRARIES}
${LLVM_MODULE_LIBS}


@@ -342,8 +342,8 @@ Status TPrivilegeFromObjectName(const string& object_name, TPrivilege* privilege
Status CompressCatalogObject(const uint8_t* src, uint32_t size, string* dst) {
scoped_ptr<Codec> compressor;
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, THdfsCompression::LZ4,
&compressor));
Codec::CodecInfo codec_info(THdfsCompression::LZ4);
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_info, &compressor));
int64_t compressed_data_len = compressor->MaxOutputLen(size);
int64_t output_buffer_len = compressed_data_len + sizeof(uint32_t);
dst->resize(static_cast<size_t>(output_buffer_len));


@@ -476,7 +476,7 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
}
if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
state->query_options().__isset.compression_codec &&
state->query_options().compression_codec != THdfsCompression::NONE) {
state->query_options().compression_codec.codec != THdfsCompression::NONE) {
stringstream error_msg;
error_msg << "Writing to compressed text table is not supported. ";
return Status(error_msg.str());


@@ -94,10 +94,10 @@ class HdfsParquetTableWriter::BaseColumnWriter {
public:
// expr - the expression to generate output values for this column.
BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval,
const THdfsCompression::type& codec)
const Codec::CodecInfo& codec_info)
: parent_(parent),
expr_eval_(expr_eval),
codec_(codec),
codec_info_(codec_info),
page_size_(DEFAULT_DATA_PAGE_SIZE),
current_page_(nullptr),
num_values_(0),
@@ -122,7 +122,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// Called after the constructor to initialize the column writer.
Status Init() WARN_UNUSED_RESULT {
Reset();
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_info_, &compressor_));
return Status::OK();
}
@@ -203,7 +203,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
parquet::CompressionCodec::type GetParquetCodec() const {
return ConvertImpalaToParquetCodec(codec_);
return ConvertImpalaToParquetCodec(codec_info_.format_);
}
protected:
@@ -316,7 +316,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
HdfsParquetTableWriter* parent_;
ScalarExprEvaluator* expr_eval_;
THdfsCompression::type codec_;
Codec::CodecInfo codec_info_;
// Compression codec for this column. If nullptr, this column will not be
// compressed.
@@ -401,8 +401,8 @@ class HdfsParquetTableWriter::ColumnWriter :
public HdfsParquetTableWriter::BaseColumnWriter {
public:
ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const THdfsCompression::type& codec)
: BaseColumnWriter(parent, eval, codec),
const Codec::CodecInfo& codec_info)
: BaseColumnWriter(parent, eval, codec_info),
num_values_since_dict_size_check_(0),
plain_encoded_value_size_(
ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
@@ -521,8 +521,8 @@ class HdfsParquetTableWriter::BoolColumnWriter :
public HdfsParquetTableWriter::BaseColumnWriter {
public:
BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const THdfsCompression::type& codec)
: BaseColumnWriter(parent, eval, codec),
const Codec::CodecInfo& codec_info)
: BaseColumnWriter(parent, eval, codec_info),
page_stats_(parent_->reusable_col_mem_pool_.get(), -1),
row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) {
DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN);
@@ -576,11 +576,11 @@ class HdfsParquetTableWriter::BoolColumnWriter :
class HdfsParquetTableWriter::Int64TimestampColumnWriterBase :
public HdfsParquetTableWriter::ColumnWriter<int64_t> {
public:
Int64TimestampColumnWriterBase(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
: HdfsParquetTableWriter::ColumnWriter<int64_t>(parent, eval, codec) {
int64_t dummy;
plain_encoded_value_size_ = ParquetPlainEncoder::ByteSize(dummy);
Int64TimestampColumnWriterBase(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const Codec::CodecInfo& codec_info)
: HdfsParquetTableWriter::ColumnWriter<int64_t>(parent, eval, codec_info) {
int64_t dummy;
plain_encoded_value_size_ = ParquetPlainEncoder::ByteSize(dummy);
}
protected:
@@ -603,9 +603,9 @@ private:
class HdfsParquetTableWriter::Int64MilliTimestampColumnWriter :
public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
public:
Int64MilliTimestampColumnWriter(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
Int64MilliTimestampColumnWriter(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
protected:
virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -617,9 +617,9 @@ protected:
class HdfsParquetTableWriter::Int64MicroTimestampColumnWriter :
public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
public:
Int64MicroTimestampColumnWriter(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
Int64MicroTimestampColumnWriter(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
protected:
virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -632,9 +632,9 @@ protected:
class HdfsParquetTableWriter::Int64NanoTimestampColumnWriter :
public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
public:
Int64NanoTimestampColumnWriter(HdfsParquetTableWriter* parent,
ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
Int64NanoTimestampColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
const Codec::CodecInfo& codec_info)
: HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
protected:
virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -956,19 +956,25 @@ Status HdfsParquetTableWriter::Init() {
// Default to snappy compressed
THdfsCompression::type codec = THdfsCompression::SNAPPY;
// Compression level only supported for zstd.
int clevel = ZSTD_CLEVEL_DEFAULT;
const TQueryOptions& query_options = state_->query_options();
if (query_options.__isset.compression_codec) {
codec = query_options.compression_codec;
codec = query_options.compression_codec.codec;
clevel = query_options.compression_codec.compression_level;
}
if (!(codec == THdfsCompression::NONE ||
codec == THdfsCompression::GZIP ||
codec == THdfsCompression::SNAPPY)) {
codec == THdfsCompression::SNAPPY ||
codec == THdfsCompression::ZSTD)) {
stringstream ss;
ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
return Status(ss.str());
}
VLOG_FILE << "Using compression codec: " << codec;
if (codec == THdfsCompression::ZSTD) {
VLOG_FILE << "Using compression level: " << clevel;
}
if (query_options.__isset.parquet_page_row_count_limit) {
page_row_count_limit_ = query_options.parquet_page_row_count_limit;
@@ -986,6 +992,8 @@ Status HdfsParquetTableWriter::Init() {
PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols));
}
Codec::CodecInfo codec_info(codec, clevel);
columns_.resize(num_cols);
// Initialize each column structure.
for (int i = 0; i < columns_.size(); ++i) {
@@ -993,43 +1001,43 @@ Status HdfsParquetTableWriter::Init() {
const ColumnType& type = output_expr_evals_[i]->root().type();
switch (type.type) {
case TYPE_BOOLEAN:
writer = new BoolColumnWriter(this, output_expr_evals_[i], codec);
writer = new BoolColumnWriter(this, output_expr_evals_[i], codec_info);
break;
case TYPE_TINYINT:
writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_SMALLINT:
writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_INT:
writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_BIGINT:
writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_FLOAT:
writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_DOUBLE:
writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_TIMESTAMP:
switch (state_->query_options().parquet_timestamp_type) {
case TParquetTimestampType::INT96_NANOS:
writer =
new ColumnWriter<TimestampValue>(this, output_expr_evals_[i], codec);
new ColumnWriter<TimestampValue>(this, output_expr_evals_[i], codec_info);
break;
case TParquetTimestampType::INT64_MILLIS:
writer =
new Int64MilliTimestampColumnWriter(this, output_expr_evals_[i], codec);
writer = new Int64MilliTimestampColumnWriter(
this, output_expr_evals_[i], codec_info);
break;
case TParquetTimestampType::INT64_MICROS:
writer =
new Int64MicroTimestampColumnWriter(this, output_expr_evals_[i], codec);
writer = new Int64MicroTimestampColumnWriter(
this, output_expr_evals_[i], codec_info);
break;
case TParquetTimestampType::INT64_NANOS:
writer =
new Int64NanoTimestampColumnWriter(this, output_expr_evals_[i], codec);
writer = new Int64NanoTimestampColumnWriter(
this, output_expr_evals_[i], codec_info);
break;
default:
DCHECK(false);
@@ -1038,28 +1046,28 @@ Status HdfsParquetTableWriter::Init() {
case TYPE_VARCHAR:
case TYPE_STRING:
case TYPE_CHAR:
writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec_info);
break;
case TYPE_DECIMAL:
switch (output_expr_evals_[i]->root().type().GetByteSize()) {
case 4:
writer = new ColumnWriter<Decimal4Value>(
this, output_expr_evals_[i], codec);
writer =
new ColumnWriter<Decimal4Value>(this, output_expr_evals_[i], codec_info);
break;
case 8:
writer = new ColumnWriter<Decimal8Value>(
this, output_expr_evals_[i], codec);
writer =
new ColumnWriter<Decimal8Value>(this, output_expr_evals_[i], codec_info);
break;
case 16:
writer = new ColumnWriter<Decimal16Value>(
this, output_expr_evals_[i], codec);
writer =
new ColumnWriter<Decimal16Value>(this, output_expr_evals_[i], codec_info);
break;
default:
DCHECK(false);
}
break;
case TYPE_DATE:
writer = new ColumnWriter<DateValue>(this, output_expr_evals_[i], codec);
writer = new ColumnWriter<DateValue>(this, output_expr_evals_[i], codec_info);
break;
default:
DCHECK(false);


@@ -24,8 +24,10 @@ const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
THdfsCompression::NONE,
THdfsCompression::SNAPPY,
THdfsCompression::GZIP,
THdfsCompression::LZO
};
THdfsCompression::LZO,
THdfsCompression::BROTLI,
THdfsCompression::LZ4,
THdfsCompression::ZSTD};
const int PARQUET_TO_IMPALA_CODEC_SIZE =
sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);
@@ -33,13 +35,18 @@ const int PARQUET_TO_IMPALA_CODEC_SIZE =
/// Mapping of Impala codec enums to Parquet enums
const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
parquet::CompressionCodec::UNCOMPRESSED,
parquet::CompressionCodec::SNAPPY, // DEFAULT
parquet::CompressionCodec::GZIP, // GZIP
parquet::CompressionCodec::GZIP, // DEFLATE
parquet::CompressionCodec::SNAPPY, // DEFAULT
parquet::CompressionCodec::GZIP, // GZIP
parquet::CompressionCodec::GZIP, // DEFLATE
// Placeholder for BZIP2 which isn't a valid parquet codec.
parquet::CompressionCodec::SNAPPY, // BZIP2
parquet::CompressionCodec::SNAPPY,
parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED
parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED
parquet::CompressionCodec::LZO,
};
parquet::CompressionCodec::LZ4,
parquet::CompressionCodec::GZIP, // ZLIB
parquet::CompressionCodec::ZSTD,
parquet::CompressionCodec::BROTLI};
const int IMPALA_TO_PARQUET_CODEC_SIZE =
sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);


@@ -283,7 +283,8 @@ Status ParquetMetadataUtils::ValidateRowGroupColumn(
const auto codec = Ubsan::EnumToInt(&col_chunk_metadata.codec);
if (codec != parquet::CompressionCodec::UNCOMPRESSED &&
codec != parquet::CompressionCodec::SNAPPY &&
codec != parquet::CompressionCodec::GZIP) {
codec != parquet::CompressionCodec::GZIP &&
codec != parquet::CompressionCodec::ZSTD) {
return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
"'$2'.", filename, codec, schema_element.name));
}


@@ -44,7 +44,7 @@ namespace impala {
// Generates num strings between min_len and max_len.
// Outputs the uncompressed/compressed/sorted_compressed sizes.
void TestCompression(int num, int min_len, int max_len, THdfsCompression::type codec) {
void TestCompression(int num, int min_len, int max_len, THdfsCompression::type format) {
vector<string> strings;
uint8_t* buffer = (uint8_t*)malloc(max_len * num);
int offset = 0;
@@ -69,7 +69,8 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c
}
scoped_ptr<Codec> compressor;
Status status = Codec::CreateCompressor(NULL, false, codec, &compressor);
Codec::CodecInfo codec_info(format);
Status status = Codec::CreateCompressor(NULL, false, codec_info, &compressor);
DCHECK(status.ok());
int64_t compressed_len = compressor->MaxOutputLen(offset);
@@ -83,7 +84,7 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c
&sorted_compressed_len, &sorted_compressed_buffer));
cout << "NumStrings=" << num << " MinLen=" << min_len << " MaxLen=" << max_len
<< " Codec=" << codec << endl;
<< " Codec=" << codec_info.format_ << endl;
cout << " Uncompressed len: " << offset << endl;
cout << " Compressed len: " << compressed_len << endl;
cout << " Sorted Compressed len: " << sorted_compressed_len << endl;


@@ -125,13 +125,27 @@ Status ChildQuery::ExecAndFetch() {
return status;
}
template <typename T>
void PrintQueryOptionValue (const T& option, stringstream& val) {
val << option;
}
void PrintQueryOptionValue(const impala::TCompressionCodec& compression_codec,
stringstream& val) {
if (compression_codec.codec != THdfsCompression::ZSTD) {
val << compression_codec.codec;
} else {
val << compression_codec.codec << ":" << compression_codec.compression_level;
}
}
void ChildQuery::SetQueryOptions(const TQueryOptions& parent_options,
TExecuteStatementReq* exec_stmt_req) {
map<string, string> conf;
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
if (parent_options.__isset.NAME) {\
stringstream val;\
val << parent_options.NAME;\
PrintQueryOptionValue(parent_options.NAME, val);\
conf[#ENUM] = val.str();\
}
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)


@@ -17,9 +17,11 @@
#include "service/query-options.h"
#include <zstd.h>
#include <boost/preprocessor/seq/for_each.hpp>
#include <boost/preprocessor/tuple/to_seq.hpp>
#include "gutil/strings/substitute.h"
#include "runtime/runtime-filter.h"
#include "testutil/gtest-util.h"
#include "util/mem-info.h"
@@ -27,6 +29,7 @@
using namespace boost;
using namespace impala;
using namespace std;
using namespace strings;
constexpr int32_t I32_MAX = numeric_limits<int32_t>::max();
constexpr int64_t I64_MAX = numeric_limits<int64_t>::max();
@@ -209,9 +212,6 @@ TEST(QueryOptions, SetEnumOptions) {
TParquetFallbackSchemaResolution, (POSITION, NAME)), true);
TestEnumCase(options, CASE(parquet_array_resolution, TParquetArrayResolution,
(THREE_LEVEL, TWO_LEVEL, TWO_LEVEL_THEN_THREE_LEVEL)), true);
TestEnumCase(options, CASE(compression_codec, THdfsCompression,
(NONE, DEFAULT, GZIP, DEFLATE, BZIP2, SNAPPY, SNAPPY_BLOCKED, LZO, LZ4, ZLIB,
ZSTD)), true);
TestEnumCase(options, CASE(default_file_format, THdfsFileFormat,
(TEXT, RC_FILE, SEQUENCE_FILE, AVRO, PARQUET, KUDU, ORC)), true);
TestEnumCase(options, CASE(runtime_filter_mode, TRuntimeFilterMode,
@@ -227,7 +227,7 @@ TEST(QueryOptions, SetEnumOptions) {
TEST(QueryOptions, SetIntOptions) {
TQueryOptions options;
// List of pairs of Key and its valid range
pair<OptionDef<int32_t>, Range<int32_t>> case_set[] {
pair<OptionDef<int32_t>, Range<int32_t>> case_set[]{
{MAKE_OPTIONDEF(runtime_filter_wait_time_ms), {0, I32_MAX}},
{MAKE_OPTIONDEF(mt_dop), {0, 64}},
{MAKE_OPTIONDEF(disable_codegen_rows_threshold), {0, I32_MAX}},
@@ -462,4 +462,51 @@ TEST(QueryOptions, ResetToDefaultViaEmptyString) {
}
}
TEST(QueryOptions, CompressionCodec) {
#define ENTRY(_, prefix, entry) (prefix::entry),
#define ENTRIES(prefix, name) BOOST_PP_SEQ_FOR_EACH(ENTRY, prefix, name)
#define CASE(enumtype, enums) {ENTRIES(enumtype, BOOST_PP_TUPLE_TO_SEQ(enums))}
TQueryOptions options;
vector<THdfsCompression::type> codecs = CASE(THdfsCompression, (NONE, DEFAULT, GZIP,
DEFLATE, BZIP2, SNAPPY, SNAPPY_BLOCKED, LZO, LZ4, ZLIB, ZSTD, BROTLI));
// Test valid values for compression_codec.
for (auto& codec : codecs) {
EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0",codec), &options,
nullptr).ok());
// Test that compression level is only supported for ZSTD.
if (codec != THdfsCompression::ZSTD) {
EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("$0:1",codec),
&options, nullptr).ok());
}
else {
EXPECT_TRUE(SetQueryOption("compression_codec",
Substitute("zstd:$0",ZSTD_CLEVEL_DEFAULT), &options, nullptr).ok());
}
}
// Test invalid values for compression_codec.
EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("$0", codecs.back() + 1),
&options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec", "foo", &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec", "1%", &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec", "-1", &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec", ":", &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec", ":1", &options, nullptr).ok());
// Test compression levels for ZSTD.
const int zstd_min_clevel = 1;
const int zstd_max_clevel = ZSTD_maxCLevel();
for (int i = zstd_min_clevel; i <= zstd_max_clevel; i++)
{
EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("ZSTD:$0",i), &options,
nullptr).ok());
}
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("ZSTD:$0", zstd_min_clevel - 1), &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("ZSTD:$0", zstd_max_clevel + 1), &options, nullptr).ok());
#undef CASE
#undef ENTRIES
#undef ENTRY
}
IMPALA_TEST_MAIN();


@@ -25,10 +25,11 @@
#include "exprs/timezone_db.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include <zstd.h>
#include <sstream>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
#include <gutil/strings/strip.h>
#include <gutil/strings/substitute.h>
#include "common/names.h"
@@ -89,6 +90,15 @@ const string& PrintQueryOptionValue(const std::string& option) {
return option;
}
const string PrintQueryOptionValue(const impala::TCompressionCodec& compression_codec) {
if (compression_codec.codec != THdfsCompression::ZSTD) {
return Substitute("$0", PrintThriftEnum(compression_codec.codec));
} else {
return Substitute("$0:$1", PrintThriftEnum(compression_codec.codec),
compression_codec.compression_level);
}
}
void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
map<string, string>* configuration) {
#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
@@ -128,6 +138,11 @@ static TQueryOptions DefaultQueryOptions() {
return defaults;
}
inline bool operator!=(const TCompressionCodec& a,
const TCompressionCodec& b) {
return (a.codec != b.codec || a.compression_level != b.compression_level);
}
string impala::DebugQueryOptions(const TQueryOptions& query_options) {
const static TQueryOptions defaults = DefaultQueryOptions();
int i = 0;
@@ -264,10 +279,42 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_debug_action(value.c_str());
break;
case TImpalaQueryOptions::COMPRESSION_CODEC: {
// Acceptable values are:
// - zstd:compression_level
// - codec
vector<string> tokens;
split(tokens, value, is_any_of(":"), token_compress_on);
if (tokens.size() > 2) return Status("Invalid compression codec value");
string& codec_name = tokens[0];
trim(codec_name);
int compression_level = ZSTD_CLEVEL_DEFAULT;
THdfsCompression::type enum_type;
RETURN_IF_ERROR(GetThriftEnum(value, "compression codec",
RETURN_IF_ERROR(GetThriftEnum(codec_name, "compression codec",
_THdfsCompression_VALUES_TO_NAMES, &enum_type));
query_options->__set_compression_codec(enum_type);
if (tokens.size() == 2) {
if (enum_type != THdfsCompression::ZSTD) {
return Status("Compression level only supported for ZSTD");
}
StringParser::ParseResult status;
string& clevel = tokens[1];
trim(clevel);
compression_level = StringParser::StringToInt<int>(
clevel.c_str(), static_cast<int>(clevel.size()), &status);
if (status != StringParser::PARSE_SUCCESS || compression_level < 1
|| compression_level > ZSTD_maxCLevel()) {
return Status(Substitute("Invalid ZSTD compression level '$0'."
" Valid values are in [1,$1]", clevel, ZSTD_maxCLevel()));
}
}
TCompressionCodec compression_codec;
compression_codec.__set_codec(enum_type);
if (enum_type == THdfsCompression::ZSTD) {
compression_codec.__set_compression_level(compression_level);
}
query_options->__set_compression_codec(compression_codec);
break;
}
case TImpalaQueryOptions::HBASE_CACHING:


@@ -34,6 +34,9 @@ const char* const Codec::DEFAULT_COMPRESSION =
const char* const Codec::GZIP_COMPRESSION = "org.apache.hadoop.io.compress.GzipCodec";
const char* const Codec::BZIP2_COMPRESSION = "org.apache.hadoop.io.compress.BZip2Codec";
const char* const Codec::SNAPPY_COMPRESSION = "org.apache.hadoop.io.compress.SnappyCodec";
const char* const Codec::LZ4_COMPRESSION = "org.apache.hadoop.io.compress.Lz4Codec";
const char* const Codec::ZSTD_COMPRESSION =
"org.apache.hadoop.io.compress.ZStandardCodec";
const char* const Codec::UNKNOWN_CODEC_ERROR =
"This compression codec is currently unsupported: ";
const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. "
@@ -43,7 +46,9 @@ const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE},
{DEFAULT_COMPRESSION, THdfsCompression::DEFAULT},
{GZIP_COMPRESSION, THdfsCompression::GZIP},
{BZIP2_COMPRESSION, THdfsCompression::BZIP2},
{SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED}};
{SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED},
{LZ4_COMPRESSION, THdfsCompression::LZ4},
{ZSTD_COMPRESSION, THdfsCompression::ZSTD}};
string Codec::GetCodecName(THdfsCompression::type type) {
for (const CodecMap::value_type& codec: g_CatalogObjects_constants.COMPRESSION_MAP) {
@@ -71,13 +76,15 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const string& code
return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec));
}
RETURN_IF_ERROR(
CreateCompressor(mem_pool, reuse, type->second, compressor));
CodecInfo codec_info(
type->second, (type->second == THdfsCompression::ZSTD) ? ZSTD_CLEVEL_DEFAULT : 0);
RETURN_IF_ERROR(CreateCompressor(mem_pool, reuse, codec_info, compressor));
return Status::OK();
}
Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse,
THdfsCompression::type format, scoped_ptr<Codec>* compressor) {
Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& codec_info,
scoped_ptr<Codec>* compressor) {
THdfsCompression::type format = codec_info.format_;
switch (format) {
case THdfsCompression::NONE:
compressor->reset(nullptr);
@@ -103,6 +110,10 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse,
case THdfsCompression::LZ4:
compressor->reset(new Lz4Compressor(mem_pool, reuse));
break;
case THdfsCompression::ZSTD:
compressor->reset(new ZstandardCompressor(mem_pool, reuse,
codec_info.compression_level_));
break;
default: {
if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
return Status(Substitute("Unsupported codec: $0", format));
@@ -149,6 +160,9 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
case THdfsCompression::LZ4:
decompressor->reset(new Lz4Decompressor(mem_pool, reuse));
break;
case THdfsCompression::ZSTD:
decompressor->reset(new ZstandardDecompressor(mem_pool, reuse));
break;
default: {
if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
return Status(Substitute("Unsupported codec: $0", format));


@@ -44,6 +44,8 @@ class Codec {
static const char* const GZIP_COMPRESSION;
static const char* const BZIP2_COMPRESSION;
static const char* const SNAPPY_COMPRESSION;
static const char* const LZ4_COMPRESSION;
static const char* const ZSTD_COMPRESSION;
static const char* const UNKNOWN_CODEC_ERROR;
// Output buffer size for streaming compressed file.
@@ -53,6 +55,16 @@ class Codec {
typedef std::map<const std::string, const THdfsCompression::type> CodecMap;
static const CodecMap CODEC_MAP;
struct CodecInfo {
public:
CodecInfo(THdfsCompression::type format, int compression_level = 0)
: format_(format), compression_level_(compression_level) {}
THdfsCompression::type format_;
// Currently only ZSTD uses compression level.
int compression_level_;
};
/// Create a decompressor.
/// Input:
/// mem_pool: the memory pool used to store the decompressed data.
@@ -79,7 +91,7 @@ class Codec {
/// Output:
/// compressor: scoped pointer to the compressor class to use.
static Status CreateCompressor(MemPool* mem_pool, bool reuse,
THdfsCompression::type format,
const CodecInfo& codec_info,
boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT;
/// Alternate factory method: takes a codec string and populates a scoped pointer.
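
A hypothetical caller sketch of this factory signature (mirroring the
call sites changed elsewhere in this patch):

  // Describe a ZSTD codec at clevel 3; codecs without levels pass 0.
  Codec::CodecInfo codec_info(THdfsCompression::ZSTD, 3);
  boost::scoped_ptr<Codec> compressor;
  RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_info, &compressor));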


@@ -22,8 +22,10 @@
#include <boost/crc.hpp>
#include <gutil/strings/substitute.h>
#undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
#include <snappy.h>
#include <lz4.h>
#include <snappy.h>
#include <zstd.h>
#include <zstd_errors.h>
#include "exec/read-write-util.h"
#include "runtime/mem-pool.h"
@@ -318,3 +320,23 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt
reinterpret_cast<char*>(*output), input_length, *output_length);
return Status::OK();
}
ZstandardCompressor::ZstandardCompressor(MemPool* mem_pool, bool reuse_buffer, int clevel)
: Codec(mem_pool, reuse_buffer), clevel_(clevel) {}
int64_t ZstandardCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
return ZSTD_compressBound(input_len);
}
Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length, uint8_t** output) {
DCHECK_GE(input_length, 0);
DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
if (input_length == 0) return Status::OK();
*output_length = ZSTD_compress(*output, *output_length, input, input_length, clevel_);
if (ZSTD_isError(*output_length)) {
return Status(TErrorCode::ZSTD_ERROR, "ZSTD_compress",
ZSTD_getErrorString(ZSTD_getErrorCode(*output_length)));
}
return Status::OK();
}
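
A hedged sketch of how a caller drives this compressor, based on the
contract above (*output_length carries the buffer capacity in and the
compressed size out; the MemPool allocation helper is assumed):

  int64_t max_len = compressor->MaxOutputLen(input_len); // ZSTD_compressBound
  uint8_t* out = mem_pool->Allocate(max_len); // assumed MemPool helper
  int64_t out_len = max_len; // capacity in, compressed size out
  RETURN_IF_ERROR(compressor->ProcessBlock(true /* output_preallocated */,
      input_len, input, &out_len, &out));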


@@ -21,6 +21,7 @@
/// We need zlib.h here to declare stream_ below.
#include <zlib.h>
#include <zstd.h>
#include "util/codec.h"
@@ -131,5 +132,22 @@ class Lz4Compressor : public Codec {
virtual std::string file_extension() const override { return "lz4"; }
};
/// ZStandard compression codec.
class ZstandardCompressor : public Codec {
public:
ZstandardCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
int clevel = ZSTD_CLEVEL_DEFAULT);
virtual ~ZstandardCompressor() {}
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "zstd"; }
private:
int clevel_;
};
}
#endif


@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <zstd.h>
#include <iostream>
#include "gen-cpp/Descriptors_types.h"
@@ -24,12 +25,16 @@
#include "runtime/mem-tracker.h"
#include "runtime/mem-pool.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/decompress.h"
#include "util/compress.h"
#include "util/ubsan.h"
#include "common/names.h"
using std::mt19937;
using std::uniform_int_distribution;
namespace impala {
// Fixture for testing class Decompressor
@@ -58,15 +63,16 @@ class DecompressorTest : public ::testing::Test {
mem_pool_.FreeAll();
}
void RunTest(THdfsCompression::type format) {
void RunTest(THdfsCompression::type format, int clevel = 0) {
scoped_ptr<Codec> compressor;
scoped_ptr<Codec> decompressor;
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, format, &compressor));
Codec::CodecInfo codec_info(format, clevel);
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
// LZ4 is not implemented to work without an allocated output
if(format == THdfsCompression::LZ4) {
// LZ4 & ZSTD are not implemented to work without an allocated output
if (format == THdfsCompression::LZ4 || format == THdfsCompression::ZSTD) {
CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
sizeof(input_), input_);
CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
@@ -97,7 +103,9 @@ class DecompressorTest : public ::testing::Test {
void RunTestStreaming(THdfsCompression::type format) {
scoped_ptr<Codec> compressor;
scoped_ptr<Codec> decompressor;
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, format, &compressor));
Codec::CodecInfo codec_info(format);
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
CompressAndStreamingDecompress(compressor.get(), decompressor.get(),
@@ -331,7 +339,8 @@ class DecompressorTest : public ::testing::Test {
*compressed_len = 0;
scoped_ptr<Codec> compressor;
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, format, &compressor));
Codec::CodecInfo codec_info(format);
EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
// Make sure we don't completely fill the buffer, leave at least RAW_INPUT_SIZE
// bytes free in compressed buffer for junk data testing (Test case 3).
@@ -412,8 +421,8 @@ TEST_F(DecompressorTest, Impala1506) {
MemTracker trax;
MemPool pool(&trax);
scoped_ptr<Codec> compressor;
EXPECT_OK(
Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor));
Codec::CodecInfo codec_info(impala::THdfsCompression::GZIP);
EXPECT_OK(Codec::CreateCompressor(&pool, true, codec_info, &compressor));
int64_t input_len = 3;
const uint8_t input[3] = {1, 2, 3};
@@ -457,8 +466,8 @@ TEST_F(DecompressorTest, LZ4Huge) {
for (int i = 0 ; i < payload_len; ++i) payload[i] = rand();
scoped_ptr<Codec> compressor;
EXPECT_OK(Codec::CreateCompressor(nullptr, true, impala::THdfsCompression::LZ4,
&compressor));
Codec::CodecInfo codec_info(impala::THdfsCompression::LZ4);
EXPECT_OK(Codec::CreateCompressor(nullptr, true, codec_info, &compressor));
// The returned max_size is 0 because the payload is too big.
int64_t max_size = compressor->MaxOutputLen(payload_len);
@@ -472,6 +481,14 @@ TEST_F(DecompressorTest, LZ4Huge) {
&compressed_len, &compressed_ptr), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
}
TEST_F(DecompressorTest, ZSTD) {
RunTest(THdfsCompression::ZSTD, ZSTD_CLEVEL_DEFAULT);
mt19937 rng;
RandTestUtil::SeedRng("ZSTD_COMPRESSION_LEVEL_SEED", &rng);
// zstd supports compression levels from 1 up to ZSTD_maxCLevel()
const int clevel = uniform_int_distribution<int>(1, ZSTD_maxCLevel())(rng);
RunTest(THdfsCompression::ZSTD, clevel);
}
}
int main(int argc, char **argv) {


@@ -21,8 +21,10 @@
#include <zlib.h>
#include <bzlib.h>
#undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
#include <snappy.h>
#include <lz4.h>
#include <snappy.h>
#include <zstd.h>
#include <zstd_errors.h>
#include "common/logging.h"
#include "exec/read-write-util.h"
@@ -603,3 +605,24 @@ Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_len
*output_length = ret;
return Status::OK();
}
ZstandardDecompressor::ZstandardDecompressor(MemPool* mem_pool, bool reuse_buffer)
: Codec(mem_pool, reuse_buffer) {}
int64_t ZstandardDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
return -1;
}
Status ZstandardDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length, uint8_t** output) {
DCHECK(output_preallocated) << "Output was not allocated for Zstd Codec";
if (*output_length == 0) return Status::OK();
size_t ret = ZSTD_decompress(*output, *output_length, input, input_length);
if (ZSTD_isError(ret)) {
*output_length = 0;
return Status(TErrorCode::ZSTD_ERROR, "ZSTD_decompress",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
*output_length = ret;
return Status::OK();
}
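
Because MaxOutputLen() returns -1 here, the decompressed size is not
derived from the stream; callers must size the output buffer from
out-of-band metadata. A sketch, assuming the uncompressed size is known
(as the Parquet reader knows it from the page header):

  int64_t out_len = uncompressed_page_size; // known from file metadata
  uint8_t* out = mem_pool->Allocate(out_len); // assumed MemPool helper
  RETURN_IF_ERROR(decompressor->ProcessBlock(true /* output_preallocated */,
      compressed_len, compressed_buf, &out_len, &out));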


@@ -132,5 +132,17 @@ class SnappyBlockDecompressor : public Codec {
virtual std::string file_extension() const override { return "snappy"; }
};
class ZstandardDecompressor : public Codec {
public:
ZstandardDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~ZstandardDecompressor() {}
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "zstd"; }
};
}
#endif


@@ -860,8 +860,8 @@ Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
// Compress the serialized thrift string. This uses string keys and is very
// easy to compress.
scoped_ptr<Codec> compressor;
RETURN_IF_ERROR(
Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor));
Codec::CodecInfo codec_info(THdfsCompression::DEFAULT);
RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, codec_info, &compressor));
const auto close_compressor =
MakeScopeExitTrigger([&compressor]() { compressor->Close(); });


@@ -541,7 +541,7 @@ if __name__ == "__main__":
"avro", "binutils", "boost", "breakpad", "bzip2", "cctz", "cmake", "crcutil",
"flatbuffers", "gcc", "gdb", "gflags", "glog", "gperftools", "gtest", "libev",
"libunwind", "lz4", "openldap", "openssl", "orc", "protobuf",
"rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"])
"rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib", "zstd"])
packages.insert(0, Package("llvm", "5.0.1-asserts-p1"))
packages.insert(0, Package("thrift", os.environ.get("IMPALA_THRIFT11_VERSION")))
bootstrap(toolchain_root, packages)


@@ -118,6 +118,8 @@ export IMPALA_LLVM_DEBUG_VERSION=5.0.1-asserts-p1
unset IMPALA_LLVM_DEBUG_URL
export IMPALA_LZ4_VERSION=1.7.5
unset IMPALA_LZ4_URL
export IMPALA_ZSTD_VERSION=1.4.0
unset IMPALA_ZSTD_URL
export IMPALA_OPENLDAP_VERSION=2.4.47
unset IMPALA_OPENLDAP_URL
export IMPALA_OPENSSL_VERSION=1.0.2l


@@ -0,0 +1,66 @@
##############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
##############################################################################
# - Find ZSTD (zstd.h, libzstd.a, libzstd.so, and libzstd.so.1)
# ZSTD_ROOT hints the location
#
# This module defines
# ZSTD_INCLUDE_DIR, directory containing headers
# ZSTD_LIBS, directory containing zstd libraries
# ZSTD_STATIC_LIB, path to libzstd.a
set(ZSTD_SEARCH_HEADER_PATHS ${ZSTD_ROOT}/include)
set(ZSTD_SEARCH_LIB_PATH ${ZSTD_ROOT}/lib)
find_path(ZSTD_INCLUDE_DIR
NAMES zstd.h
PATHS ${ZSTD_SEARCH_HEADER_PATHS}
NO_DEFAULT_PATH
DOC "Path to ZSTD headers"
)
find_library(ZSTD_LIBS NAMES zstd
PATHS ${ZSTD_SEARCH_LIB_PATH}
NO_DEFAULT_PATH
DOC "Path to ZSTD library"
)
find_library(ZSTD_STATIC_LIB NAMES libzstd.a
PATHS ${ZSTD_SEARCH_LIB_PATH}
NO_DEFAULT_PATH
DOC "Path to ZSTD static library"
)
if (NOT ZSTD_LIBS OR NOT ZSTD_STATIC_LIB)
message(FATAL_ERROR "Zstd includes and libraries NOT found. "
"Looked for headers in ${ZSTD_SEARCH_HEADER_PATHS}, "
"and for libs in ${ZSTD_SEARCH_LIB_PATH}")
set(ZSTD_FOUND FALSE)
else()
set(ZSTD_FOUND TRUE)
endif ()
mark_as_advanced(
ZSTD_INCLUDE_DIR
ZSTD_LIBS
ZSTD_STATIC_LIB
)


@@ -78,6 +78,7 @@ enum THdfsCompression {
LZ4 = 8
ZLIB = 9
ZSTD = 10
BROTLI = 11
}
enum TColumnEncoding {
@@ -109,6 +110,13 @@ enum TAccessLevel {
WRITE_ONLY = 3
}
struct TCompressionCodec {
// Compression codec
1: required THdfsCompression codec
// Compression level
2: optional i32 compression_level
}
// Mapping from names defined by Avro to values in the THdfsCompression enum.
const map<string, THdfsCompression> COMPRESSION_MAP = {
"": THdfsCompression.NONE,


@@ -111,7 +111,7 @@ struct TQueryOptions {
7: optional i32 num_scanner_threads = 0
11: optional string debug_action = ""
12: optional i64 mem_limit = 0
14: optional CatalogObjects.THdfsCompression compression_codec
14: optional CatalogObjects.TCompressionCodec compression_codec
15: optional i32 hbase_caching = 0
16: optional bool hbase_cache_blocks = 0
17: optional i64 parquet_file_size = 0


@@ -414,6 +414,8 @@ error_codes = (
("UNAUTHORIZED_SESSION_USER", 136,
"The user authorized on the connection '$0' does not match the session username '$1'"),
("ZSTD_ERROR", 137, "$0 failed with error: $1"),
)
import sys


@@ -0,0 +1,42 @@
====
---- QUERY
create table t1_default (c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 boolean, c6 float,
c7 real, c8 double, c9 decimal(20,15), c10 timestamp, c11 char(10),
c13 varchar(20), c14 string) stored as parquet;
====
---- QUERY
create table t1_zstd_gzip (c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 boolean, c6 float,
c7 real, c8 double, c9 decimal(20,15), c10 timestamp, c11 char(10),
c13 varchar(20), c14 string) stored as parquet;
====
---- QUERY
insert into t1_default select tinyint_col, smallint_col, id, bigint_col, bool_col,
float_col, double_col, double_col, cast(float_col as decimal(20,15)),
timestamp_col, cast(year as char(10)), cast(double_col as varchar(10)),
string_col from functional.alltypes;
====
---- QUERY
insert into t1_default(c3) values (8000),(9000);
====
---- QUERY
set COMPRESSION_CODEC=ZSTD;
insert into t1_zstd_gzip select tinyint_col, smallint_col, id, bigint_col, bool_col,
float_col, double_col, double_col, cast(float_col as decimal(20,15)),
timestamp_col, cast(year as char(10)), cast(double_col as varchar(10)), string_col
from functional.alltypes where id < 4000;
====
---- QUERY
set COMPRESSION_CODEC=GZIP;
insert into t1_zstd_gzip(c3) values (9000);
====
---- QUERY
set COMPRESSION_CODEC=GZIP;
insert into t1_zstd_gzip select tinyint_col, smallint_col, id, bigint_col, bool_col,
float_col, double_col, double_col, cast(float_col as decimal(20,15)),
timestamp_col, cast(year as char(10)), cast(double_col as varchar(10)), string_col
from functional.alltypes where id >= 4000;
====
---- QUERY
set COMPRESSION_CODEC=ZSTD;
insert into t1_zstd_gzip(c3) values(8000);
====


@@ -94,7 +94,7 @@ Invalid query option: foo
---- QUERY
set parquet_compression_codec=bar
---- CATCH
Invalid compression codec: 'bar'. Valid values are NONE(0), DEFAULT(1), GZIP(2), DEFLATE(3), BZIP2(4), SNAPPY(5), SNAPPY_BLOCKED(6), LZO(7), LZ4(8), ZLIB(9), ZSTD(10).
Invalid compression codec: 'bar'. Valid values are NONE(0), DEFAULT(1), GZIP(2), DEFLATE(3), BZIP2(4), SNAPPY(5), SNAPPY_BLOCKED(6), LZO(7), LZ4(8), ZLIB(9), ZSTD(10), BROTLI(11).
====
---- QUERY
set explain_level=bar
@@ -275,4 +275,4 @@ set RM_INITIAL_MEM = "foo";
set SCAN_NODE_CODEGEN_THRESHOLD = "foo";
set max_io_buffers="foo";
---- RESULTS
====
====


@@ -32,7 +32,7 @@ class TableFormatInfo(object):
KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase']
if os.environ['KUDU_IS_SUPPORTED'] == 'true':
KNOWN_FILE_FORMATS.append('kudu')
KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo']
KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo', 'zstd']
KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']
def __init__(self, **kwargs):


@@ -35,7 +35,7 @@ from tests.common.test_result_verifier import (
from tests.common.test_vector import ImpalaTestDimension
from tests.verifiers.metric_verifier import MetricVerifier
PARQUET_CODECS = ['none', 'snappy', 'gzip']
PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd']
class TestInsertQueries(ImpalaTestSuite):
@classmethod


@@ -31,12 +31,13 @@ from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (SkipIfEC, SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfABFS,
SkipIfADLS)
from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_result_verifier import verify_query_result_is_equal
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import get_fs_path
from tests.util.get_parquet_metadata import (decode_stats_value,
get_parquet_metadata_from_hdfs_folder)
PARQUET_CODECS = ['none', 'snappy', 'gzip']
PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd']
class RoundFloat():
@@ -125,6 +126,35 @@ class TestInsertParquetQueries(ImpalaTestSuite):
self.run_test_case('insert_parquet', vector, unique_database, multiple_impalad=True)
class TestParquetQueriesMultiCodecs(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestParquetQueriesMultiCodecs, cls).add_test_dimensions()
# Fix the exec_option vector to have a single value.
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
sync_ddl=[1]))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
@UniqueDatabase.parametrize(sync_ddl=True)
def test_insert_parquet_multi_codecs(self, vector, unique_database):
# Tests that parquet files are written/read correctly when using multiple codecs
self.run_test_case('QueryTest/insert_parquet_multi_codecs', vector, unique_database,
multiple_impalad=True)
base_table = "{0}.{1}".format(unique_database, "t1_default")
test_table = "{0}.{1}".format(unique_database, "t1_zstd_gzip")
# select all rows and compare the data in base_table and test_table
base_result = self.execute_query("select * from {0} order by c3".format(base_table))
test_result = self.execute_query("select * from {0} order by c3".format(test_table))
verify_query_result_is_equal(test_result.data, base_result.data)
class TestInsertParquetInvalidCodec(ImpalaTestSuite):
@classmethod