IMPALA-12108: Add support for LZ4 high compression levels

LZ4 has a high compression mode that gets higher compression ratios
(at the cost of higher compression time) while maintaining the fast
decompression speed. This type of compression would be useful for
workloads that write data once and read it many times.

This adds support for specifying a compression level for the
LZ4 codec. Compression level 1 is the current fast API. Compression
levels between LZ4HC_CLEVEL_MIN (3) and LZ4HC_CLEVEL_MAX (12) use
the high compression API. This lines up with the behavior of the lz4
commandline.

TPC-H 42 scale comparison
Compression codec | Avg Time (s) | Geomean Time (s) | Lineitem Size (GB) | Compression time for lineitem (s)
------------------+--------------+------------------+--------------------+------------------------------
Snappy            | 2.75         | 2.08             | 8.76               |   7.436
LZ4 level 1       | 2.58         | 1.91             | 9.1                |   6.864
LZ4 level 3       | 2.58         | 1.93             | 7.9                |  43.918
LZ4 level 9       | 2.68         | 1.98             | 7.6                | 125.0
Zstd level 3      | 3.03         | 2.31             | 6.36               |  17.274
Zstd level 6      | 3.10         | 2.38             | 6.33               |  44.955

LZ4 level 3 is about 10% smaller in data size while being about as fast as
regular LZ4. It compresses at about the same speed as Zstd level 6.

Testing:
 - Ran perf-AB-test with lz4 high compression levels
 - Added test cases to decompress-test

Change-Id: Ie7470ce38b8710c870cacebc80bc02cf5d022791
Reviewed-on: http://gerrit.cloudera.org:8080/23254
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Joe McDonnell
2025-08-05 13:29:47 -07:00
committed by Impala Public Jenkins
parent 14ff597e2f
commit 7477107ca3
5 changed files with 94 additions and 13 deletions

View File

@@ -17,6 +17,7 @@
#include "service/query-options.h"
#include <lz4hc.h>
#include <zstd.h>
#include <zlib.h>
#include <boost/preprocessor/seq/for_each.hpp>
@@ -630,6 +631,8 @@ TEST(QueryOptions, CompressionCodec) {
case THdfsCompression::ZLIB:
case THdfsCompression::BZIP2:
case THdfsCompression::DEFLATE:
case THdfsCompression::LZ4:
case THdfsCompression::LZ4_BLOCKED:
EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:1", codec),
&options, nullptr).ok());
break;
@@ -700,6 +703,29 @@ TEST(QueryOptions, CompressionCodec) {
Substitute("BZIP2:$0", bzip_min_clevel - 1), &options, nullptr).ok());
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("BZIP2:$0", bzip_max_clevel + 1), &options, nullptr).ok());
// Test compression levels for LZ4 / LZ4_BLOCKED
for (const string& lz4_codec : {"lz4", "lz4_blocked"}) {
// LZ4's default behavior is considered level 1 (and is tested above).
// LZ4's high compression API is used for levels between LZ4HC_CLEVEL_MIN (3) and
// LZ4HC_CLEVEL_MAX (12)
for (int i = LZ4HC_CLEVEL_MIN; i <= LZ4HC_CLEVEL_MAX; i++) {
EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:$1", lz4_codec, i),
&options, nullptr).ok());
}
// LZ4 added a level 2 in 1.10.0. If using a version without that support, test that
// level 2 fails. (If using a version with that support, this is tested by the loop
// above.)
if (LZ4HC_CLEVEL_MIN > 2) {
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("$0:$1", lz4_codec, 2), &options, nullptr).ok());
}
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("$0:$1", lz4_codec, LZ4HC_CLEVEL_MAX + 1), &options, nullptr).ok());
// Zero is not a valid level
EXPECT_FALSE(SetQueryOption("compression_codec",
Substitute("$0:$1", lz4_codec, 0), &options, nullptr).ok());
}
#undef CASE
#undef ENTRIES
#undef ENTRY

View File

@@ -111,14 +111,16 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& c
compressor->reset(new SnappyCompressor(mem_pool, reuse));
break;
case THdfsCompression::LZ4:
compressor->reset(new Lz4Compressor(mem_pool, reuse));
compressor->reset(new Lz4Compressor(mem_pool, reuse,
codec_info.compression_level_));
break;
case THdfsCompression::ZSTD:
compressor->reset(new ZstandardCompressor(mem_pool, reuse,
codec_info.compression_level_));
break;
case THdfsCompression::LZ4_BLOCKED:
compressor->reset(new Lz4BlockCompressor(mem_pool, reuse));
compressor->reset(new Lz4BlockCompressor(mem_pool, reuse,
codec_info.compression_level_));
break;
default: {
if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
@@ -140,10 +142,13 @@ Status Codec::ValidateCompressionLevel(THdfsCompression::type format,
return ZstandardCompressor::ValidateCompressionLevel(compression_level);
case THdfsCompression::BZIP2:
return BzipCompressor::ValidateCompressionLevel(compression_level);
case THdfsCompression::LZ4:
case THdfsCompression::LZ4_BLOCKED:
return Lz4Compressor::ValidateCompressionLevel(compression_level);
default:
// Note: BZIP2 compression levels are supported for disk-spill
// Parquet or ORC does not support BZIP compression
return Status("Compression level only supported for ZSTD, ZLIB(GZIP, DEFLATE)"
return Status("Compression level only supported for ZSTD, ZLIB(GZIP, DEFLATE), LZ4,"
" and BZIP2. Note: BZIP2 is not supported by Parquet(i.e. to write tables)");
}
}

View File

@@ -26,6 +26,7 @@
#include <zlib.h>
#include <boost/crc.hpp>
#include <lz4.h>
#include <lz4hc.h>
#include <snappy.h>
#include <zconf.h>
#include <zstd.h>
@@ -341,8 +342,24 @@ uint32_t SnappyCompressor::ComputeChecksum(int64_t input_len, const uint8_t* inp
return ((chk >> 15) | (chk << 17)) + 0xa282ead8;
}
Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer)
: Codec(mem_pool, reuse_buffer) {
Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer,
std::optional<int> compression_level)
: Codec(mem_pool, reuse_buffer), compression_level_(compression_level.value_or(1)) {
}
Status Lz4Compressor::ValidateCompressionLevel(int compression_level) {
if (compression_level == 1 ||
(compression_level >= LZ4HC_CLEVEL_MIN &&
compression_level <= LZ4HC_CLEVEL_MAX)) {
return Status::OK();
}
return Status(Substitute("Invalid LZ4 compression level '$0'."
" Valid values are 1 or between [$1, $2] for high compression", compression_level,
LZ4HC_CLEVEL_MIN, LZ4HC_CLEVEL_MAX));
}
Status Lz4Compressor::Init() {
return ValidateCompressionLevel(compression_level_);
}
int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
@@ -357,8 +374,16 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt
if (MaxOutputLen(input_length, input) == 0) {
return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length);
}
*output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(*output), input_length, *output_length);
if (compression_level_ == 1) {
*output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(*output), input_length, *output_length);
} else {
DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN &&
compression_level_ <= LZ4HC_CLEVEL_MAX);
*output_length = LZ4_compress_HC(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(*output), input_length, *output_length,
compression_level_);
}
return Status::OK();
}
@@ -411,8 +436,9 @@ Status ZstandardCompressor::ValidateCompressionLevel(int compression_level) {
}
}
Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer)
: Codec(mem_pool, reuse_buffer) {
Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer,
std::optional<int> compression_level)
: Codec(mem_pool, reuse_buffer), compression_level_(compression_level.value_or(1)) {
}
int64_t Lz4BlockCompressor::MaxOutputLen(int64_t input_length, const uint8_t* input) {
@@ -444,8 +470,17 @@ Status Lz4BlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_
if (input_length > 0) {
uint8_t* sizep = outp;
outp += sizeof(int32_t);
const int64_t size = LZ4_compress_default(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output));
int64_t size = 0;
if (compression_level_ == 1) {
size = LZ4_compress_default(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output));
} else {
DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN &&
compression_level_ <= LZ4HC_CLEVEL_MAX);
size = LZ4_compress_HC(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output),
compression_level_);
}
if (size == 0) { return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED); }
ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
outp += size;

View File

@@ -138,15 +138,22 @@ class SnappyCompressor : public Codec {
/// allocated and will cause an error if asked to do so.
class Lz4Compressor : public Codec {
public:
Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
std::optional<int> compression_level = std::nullopt);
virtual ~Lz4Compressor() { }
virtual Status Init() override WARN_UNUSED_RESULT;
virtual int64_t MaxOutputLen(
int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "lz4"; }
static Status ValidateCompressionLevel(int compression_level);
private:
int compression_level_;
};
/// ZStandard compression codec.
@@ -175,7 +182,8 @@ class ZstandardCompressor : public Codec {
/// Hadoop's block compression scheme on top of LZ4.
class Lz4BlockCompressor : public Codec {
public:
Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
std::optional<int> compression_level = std::nullopt);
virtual ~Lz4BlockCompressor() { }
virtual int64_t MaxOutputLen(
@@ -184,5 +192,7 @@ class Lz4BlockCompressor : public Codec {
const uint8_t* input, int64_t* output_length,
uint8_t** output) override WARN_UNUSED_RESULT;
virtual std::string file_extension() const override { return "lz4"; }
private:
int compression_level_;
};
}

View File

@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include <lz4hc.h>
#include <stdio.h>
#include <stdlib.h>
#include <zstd.h>
@@ -405,6 +406,8 @@ TEST_F(DecompressorTest, Snappy) {
TEST_F(DecompressorTest, LZ4) {
RunTest(THdfsCompression::LZ4);
RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MIN);
RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MAX);
}
TEST_F(DecompressorTest, Gzip) {
@@ -608,6 +611,8 @@ TEST_F(DecompressorTest, LZ4HadoopCompat) {
TEST_F(DecompressorTest, LZ4Blocked) {
RunTest(THdfsCompression::LZ4_BLOCKED);
RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MIN);
RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MAX);
}
}