From e11c5a772c1bee28cc628bdeaac2ea30be355995 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Fri, 25 Jan 2013 10:24:35 -0800
Subject: [PATCH] Fix snappy block decompressor and mem pool usage in rc file.

---
 be/src/exec/base-sequence-scanner.cc        |   7 +
 be/src/exec/base-sequence-scanner.h         |   4 +
 be/src/exec/hdfs-rcfile-scanner.cc          |  12 +-
 be/src/exec/hdfs-rcfile-scanner.h           |   2 +-
 be/src/exec/hdfs-sequence-scanner.cc        |   5 +-
 be/src/exec/hdfs-sequence-scanner.h         |   3 -
 be/src/util/codec.h                         |  14 +-
 be/src/util/compress.h                      |  12 +-
 be/src/util/decompress.cc                   | 163 ++++++++++++++------
 be/src/util/decompress.h                    |  11 +-
 tests/query_test/test_compressed_formats.py |   2 +
 11 files changed, 153 insertions(+), 82 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index f92f88227..cc4e3d9b3 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -61,6 +61,13 @@ BaseSequenceScanner::~BaseSequenceScanner() {
       data_buffer_pool_->peak_allocated_bytes());
 }
 
+Status BaseSequenceScanner::Prepare() {
+  RETURN_IF_ERROR(HdfsScanner::Prepare());
+  decompress_timer_ = ADD_COUNTER(
+      scan_node_->runtime_profile(), "DecompressionTime", TCounterType::CPU_TICKS);
+  return Status::OK;
+}
+
 Status BaseSequenceScanner::Close() {
   context_->AcquirePool(data_buffer_pool_.get());
   context_->Flush();
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 09ccd2203..44f2fb739 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -38,6 +38,7 @@ class BaseSequenceScanner : public HdfsScanner {
   // Issue the initial ranges for all sequence container files.
   static void IssueInitialRanges(HdfsScanNode*, const std::vector<HdfsFileDesc*>&);
 
+  virtual Status Prepare();
   virtual Status Close();
 
   virtual Status ProcessScanRange(ScanRangeContext* context);
@@ -137,6 +138,9 @@ class BaseSequenceScanner : public HdfsScanner {
   // Pool to allocate per data block memory. This should be used with the
   // decompressor and any other per data block allocations.
   boost::scoped_ptr<MemPool> data_buffer_pool_;
+
+  // Time spent decompressing bytes
+  RuntimeProfile::Counter* decompress_timer_;
 };
 
 }
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 7b24bf0d0..9ff22d3f6 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -58,7 +58,7 @@ HdfsRCFileScanner::~HdfsRCFileScanner() {
 }
 
 Status HdfsRCFileScanner::Prepare() {
-  RETURN_IF_ERROR(HdfsScanner::Prepare());
+  RETURN_IF_ERROR(BaseSequenceScanner::Prepare());
 
   text_converter_.reset(new TextConverter(0));
 
@@ -228,6 +228,12 @@ void HdfsRCFileScanner::ResetRowGroup() {
     columns_[i].current_field_len = 0;
     columns_[i].current_field_len_rep = 0;
   }
+
+  if (!context_->compact_data()) {
+    // We are done with this row group; pass along the non-compact external buffers.
+    context_->AcquirePool(data_buffer_pool_.get());
+    row_group_buffer_size_ = 0;
+  }
 }
 
 Status HdfsRCFileScanner::ReadRowGroup() {
@@ -236,7 +242,7 @@
   while (num_rows_ == 0) {
     RETURN_IF_ERROR(ReadRowGroupHeader());
     RETURN_IF_ERROR(ReadKeyBuffers());
-    if (has_noncompact_strings_ || row_group_buffer_size_ < row_group_length_) {
+    if (context_->compact_data() || row_group_buffer_size_ < row_group_length_) {
       // Allocate a new buffer for reading the row group. Row groups have a
       // fixed number of rows so take a guess at how big it will be based on
       // the previous row group size.
@@ -491,7 +497,7 @@ Status HdfsRCFileScanner::ProcessRange() {
             reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
         if (!text_converter_->WriteSlot(slot_desc, tuple,
-            col_start, field_len, !has_noncompact_strings_, false, pool)) {
+            col_start, field_len, context_->compact_data(), false, pool)) {
           ReportColumnParseError(slot_desc, col_start, field_len);
           error_in_row = true;
         }
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 3875ac97b..822f7a130 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -139,7 +139,7 @@
 //
 // rowgroup-key-length ::= Int
 //
-// -- Total compressed length in bytes of the rowgroup's key sections. 
+// -- Total compressed length in bytes of the rowgroup's key sections.
 //
 // rowgroup-compressed-key-length ::= Int
 //
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 93708b66b..ef27c6ace 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -92,16 +92,13 @@ Status HdfsSequenceScanner::InitNewRange() {
 }
 
 Status HdfsSequenceScanner::Prepare() {
-  RETURN_IF_ERROR(HdfsScanner::Prepare());
+  RETURN_IF_ERROR(BaseSequenceScanner::Prepare());
 
   // Allocate the scratch space for two pass parsing. The most fields we can go
   // through in one parse pass is the batch size (tuples) * the number of fields per tuple
   // TODO: This should probably be based on L2/L3 cache sizes (as should the batch size)
   record_locations_.resize(state_->batch_size());
   field_locations_.resize(state_->batch_size() * scan_node_->materialized_slots().size());
-
-  decompress_timer_ = ADD_COUNTER(
-      scan_node_->runtime_profile(), "DecompressionTime", TCounterType::CPU_TICKS);
   return Status::OK;
 }
 
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 6053df4d6..3da5b7c75 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -247,9 +247,6 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
 
   // Next record from block compressed data.
   uint8_t* next_record_in_compressed_block_;
-
-  // Time spent decompressing bytes
-  RuntimeProfile::Counter* decompress_timer_;
 };
 
 } // namespace impala
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 5c577895d..e8bca7509 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -84,14 +84,15 @@ class Codec {
 
   virtual ~Codec() {}
 
-  // Process a block of data. The operator will allocate the output buffer
-  // if output_length is passed as 0 and return the length in output_length.
-  // If it is non-zero the length must be the correct size to hold the transformed output.
+  // Process a block of data, either compressing or decompressing it.
+  // If *output_length is 0, the function will allocate the output buffer from its MemPool.
+  // If *output_length is non-zero, it should be the length of *output and must
+  // be exactly the size of the transformed output.
   // Inputs:
   //   input_length: length of the data to process
   //   input: data to process
   // In/Out:
-  //   output_length: Length of the output, if known, 0 otherwise. 
+  //   output_length: Length of the output, if known, 0 otherwise.
   // Output:
   //   output: Pointer to processed data
   virtual Status ProcessBlock(int input_length, uint8_t* input,
@@ -99,13 +100,14 @@ class Codec {
 
   // Return the name of a compression algorithm.
   static std::string GetCodecName(THdfsCompression::type);
-
- protected:
+  // Largest block we will compress/decompress: 2GB.
// We are dealing with compressed blocks that are never this big but we // want to guard against a corrupt file that has the block length as some // large number. static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1; + + protected: // Create a compression operator // Inputs: // mem_pool: memory pool to allocate the output buffer, this implies that the diff --git a/be/src/util/compress.h b/be/src/util/compress.h index 5cf546995..fad300f4c 100644 --- a/be/src/util/compress.h +++ b/be/src/util/compress.h @@ -33,7 +33,7 @@ class GzipCompressor : public Codec { GzipCompressor(MemPool* mem_pool, bool reuse_buffer, bool is_gzip); virtual ~GzipCompressor(); - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); @@ -51,7 +51,6 @@ class GzipCompressor : public Codec { // These are magic numbers from zlib.h. Not clear why they are not defined there. const static int WINDOW_BITS = 15; // Maximum window size const static int GZIP_CODEC = 16; // Output Gzip. - }; class BzipCompressor : public Codec { @@ -59,12 +58,11 @@ class BzipCompressor : public Codec { BzipCompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~BzipCompressor() { } - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); // Initialize the compressor. virtual Status Init() { return Status::OK; } - }; class SnappyBlockCompressor : public Codec { @@ -72,14 +70,13 @@ class SnappyBlockCompressor : public Codec { SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~SnappyBlockCompressor() { } - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); protected: // Snappy does not need initialization virtual Status Init() { return Status::OK; } - }; class SnappyCompressor : public Codec { @@ -87,14 +84,13 @@ class SnappyCompressor : public Codec { SnappyCompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~SnappyCompressor() { } - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); protected: // Snappy does not need initialization virtual Status Init() { return Status::OK; } - }; } diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc index e84ecc427..e7018ebba 100644 --- a/be/src/util/decompress.cc +++ b/be/src/util/decompress.cc @@ -27,7 +27,6 @@ using namespace std; using namespace boost; using namespace impala; -//TODO: Decompressors should log their errors. 
GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer)
   : Codec(mem_pool, reuse_buffer) {
   bzero(&stream_, sizeof(stream_));
@@ -87,7 +86,6 @@ Status GzipDecompressor::ProcessBlock(int input_length, uint8_t* input,
       }
       out_buffer_ = temp_memory_pool_.Allocate(buffer_length_);
       if (inflateReset(&stream_) != Z_OK) {
-        DCHECK(false);
         return Status("zlib inflateEnd failed: " + string(stream_.msg));
       }
       continue;
@@ -96,7 +94,6 @@ Status GzipDecompressor::ProcessBlock(int input_length, uint8_t* input,
     }
   }
   if (inflateReset(&stream_) != Z_OK) {
-    DCHECK(false);
     return Status("zlib inflateEnd failed: " + string(stream_.msg));
   }
 
@@ -204,57 +201,121 @@ SnappyBlockDecompressor::SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_b
   : Codec(mem_pool, reuse_buffer) {
 }
 
-Status SnappyBlockDecompressor::ProcessBlock(int input_length, uint8_t* input,
-    int* output_length, uint8_t** output) {
-  // Hadoop uses a block compression scheme on top of snappy. First there is
-  // an integer which is the size of the decompressed data followed by a
-  // sequence of compressed blocks each preceded with an integer size.
-  int32_t length = SerDeUtils::GetInt(input);
-  DCHECK(*output_length == 0 || length == *output_length);
+// Hadoop uses a block compression scheme on top of snappy. As per the Hadoop docs,
+// the input is split into blocks. Each block "contains the uncompressed length for
+// the block followed by one or more length-prefixed blocks of compressed data."
+// This is essentially blocks of blocks.
+// The outer block consists of:
+//   - 4-byte big endian uncompressed_size
+//   < inner blocks >
+//   ... repeated until input_len is consumed ...
+// The inner blocks have:
+//   - 4-byte big endian compressed_size
+//   < snappy compressed block >
+//   - 4-byte big endian compressed_size
+//   < snappy compressed block >
+//   ... repeated until uncompressed_size from the outer block is consumed ...
 
-  // If length is non-zero then the output has been allocated.
-  if (*output_length != 0) {
-    buffer_length_ = *output_length;
-    out_buffer_ = *output;
-  } else if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < length) {
-    buffer_length_ = length;
-    if (buffer_length_ > MAX_BLOCK_SIZE) {
-      return Status("Decompressor: block size is too big");
+// Utility function to decompress snappy block compressed data. If size_only is true,
+// this function does not decompress but only computes the output size and writes
+// the result to *output_len.
+// If size_only is false, output must be preallocated to *output_len bytes, and this
+// needs to be exactly the size of the decompressed output.
+// The size_only pass is O(1) per snappy block (it just reads a single length varint).
+static Status SnappyBlockDecompress(int input_len, uint8_t* input, bool size_only,
+    int* output_len, char* output) {
+
+  int uncompressed_total_len = 0;
+  while (input_len > 0) {
+    size_t uncompressed_block_len = SerDeUtils::GetInt(input);
+    input += sizeof(int32_t);
+    input_len -= sizeof(int32_t);
+
+    if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE || uncompressed_block_len == 0) {
+      if (uncompressed_total_len == 0) {
+        return Status("Decompressor: block size is too big. Data is likely corrupt.");
+      }
+      break;
+    }
+
+    if (!size_only) {
+      int remaining_output_size = *output_len - uncompressed_total_len;
+      DCHECK_GE(remaining_output_size, uncompressed_block_len);
+    }
+
+    while (uncompressed_block_len > 0) {
+      // Read the length of the next snappy compressed block.
+      size_t compressed_len = SerDeUtils::GetInt(input);
+      input += sizeof(int32_t);
+      input_len -= sizeof(int32_t);
+
+      if (compressed_len == 0 || compressed_len > input_len) {
+        if (uncompressed_total_len == 0) {
+          return Status(
+              "Decompressor: invalid compressed length. Data is likely corrupt.");
+        }
+        input_len = 0;
+        break;
+      }
+
+      // Read how big the output will be.
+      size_t uncompressed_len;
+      if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
+          input_len, &uncompressed_len)) {
+        if (uncompressed_total_len == 0) {
+          return Status("Snappy: GetUncompressedLength failed");
+        }
+        input_len = 0;
+        break;
+      }
+      DCHECK_GT(uncompressed_len, 0);
+
+      if (!size_only) {
+        // Decompress this snappy block.
+        if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
+            compressed_len, output)) {
+          return Status("Snappy: RawUncompress failed");
+        }
+        output += uncompressed_len;
+      }
+
+      input += compressed_len;
+      input_len -= compressed_len;
+      uncompressed_block_len -= uncompressed_len;
+      uncompressed_total_len += uncompressed_len;
+    }
-    out_buffer_ = memory_pool_->Allocate(buffer_length_);
   }
 
-  input += sizeof(length);
-  input_length -= sizeof(length);
-
-  uint8_t* outp = out_buffer_;
-  do {
-    // Read the length of the next block.
-    length = SerDeUtils::GetInt(input);
-
-    if (length == 0) break;
-
-    input += sizeof(length);
-    input_length -= sizeof(length);
-
-    // Read how big the output will be.
-    size_t uncompressed_length;
-    if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
-        input_length, &uncompressed_length)) {
-      return Status("Snappy: GetUncompressedLength failed");
-    }
-
-    DCHECK_GT(uncompressed_length, 0);
-    if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-        static_cast<size_t>(length), reinterpret_cast<char*>(outp))) {
-      return Status("Snappy: RawUncompress failed");
-    }
-    input += length;
-    input_length -= length;
-    outp += uncompressed_length;
-  } while (input_length > 0);
-
-  *output = out_buffer_;
-  if (*output_length == 0) *output_length = outp - out_buffer_;
+  if (size_only) {
+    *output_len = uncompressed_total_len;
+  } else if (*output_len != uncompressed_total_len) {
+    return Status("Snappy: Decompressed size is not correct.");
+  }
+  return Status::OK;
+}
+
+Status SnappyBlockDecompressor::ProcessBlock(int input_len, uint8_t* input,
+    int* output_len, uint8_t** output) {
+  if (*output_len == 0) {
+    // If we don't know the size beforehand, compute it.
+    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, NULL));
+    DCHECK_NE(*output_len, 0);
+
+    if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < *output_len) {
+      // Need to allocate a new buffer.
+      buffer_length_ = *output_len;
+      out_buffer_ = memory_pool_->Allocate(buffer_length_);
+    }
+    *output = out_buffer_;
+  }
+  DCHECK(*output != NULL);
+
+  if (*output_len > MAX_BLOCK_SIZE) {
+    // TODO: is this check really robust?
+    return Status("Decompressor: block size is too big. Data is likely corrupt.");
+  }
+
+  char* out_ptr = reinterpret_cast<char*>(*output);
+  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
   return Status::OK;
 }
 
diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h
index e62e07254..867daa554 100644
--- a/be/src/util/decompress.h
+++ b/be/src/util/decompress.h
@@ -30,7 +30,7 @@ class GzipDecompressor : public Codec {
   GzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
   virtual ~GzipDecompressor();
 
-  //Process a block of data.
+  // Process a block of data.
virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); @@ -51,13 +51,12 @@ class BzipDecompressor : public Codec { BzipDecompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~BzipDecompressor() { } - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); protected: // Bzip does not need initialization - virtual Status Init() { return Status::OK; } - + virtual Status Init() { return Status::OK; } }; class SnappyDecompressor : public Codec { @@ -65,7 +64,7 @@ class SnappyDecompressor : public Codec { SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~SnappyDecompressor() { } - //Process a block of data. + // Process a block of data. virtual Status ProcessBlock(int input_length, uint8_t* input, int* output_length, uint8_t** output); @@ -74,6 +73,7 @@ class SnappyDecompressor : public Codec { virtual Status Init() { return Status::OK; } }; + class SnappyBlockDecompressor : public Codec { public: SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer); @@ -86,7 +86,6 @@ class SnappyBlockDecompressor : public Codec { protected: // Snappy does not need initialization virtual Status Init() { return Status::OK; } - }; } diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py index 067045bd1..5e898ebfd 100644 --- a/tests/query_test/test_compressed_formats.py +++ b/tests/query_test/test_compressed_formats.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # Copyright (c) 2012 Cloudera, Inc. All rights reserved. +import pytest from os.path import join from subprocess import call from tests.common.test_vector import * @@ -33,6 +34,7 @@ class TestCompressedFormats(ImpalaTestSuite): cls.TestMatrix.add_dimension(\ TestDimension('compression_format', *compression_formats)) + @pytest.mark.execute_serially def test_compressed_formats(self, vector): file_format = vector.get_value('file_format') extension, suffix = vector.get_value('compression_format')
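-- 
A note on the scheme, with a standalone sketch: the new SnappyBlockDecompress walks Hadoop's snappy block framing twice, first a cheap size-only pass over the 4-byte big-endian length prefixes (snappy::GetUncompressedLength reads just the leading varint of each chunk), then a decompress pass into a buffer sized by the first pass. Passing a null output below plays the role of ProcessBlock's *output_length == 0 convention, where the codec computes the size and allocates. The sketch uses only the public snappy C++ API; HadoopSnappyDecode, ReadBE32, and the hand-built framing in main() are illustrative names for this example, not Impala or Hadoop code, and Status is collapsed to a bool.

// hadoop_snappy_sketch.cc: decode Hadoop block-framed snappy in two passes.
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

#include <snappy.h>

// Hadoop length prefixes are written by Java's DataOutputStream: big-endian.
static uint32_t ReadBE32(const uint8_t* p) {
  return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
         (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// If out is null, runs in size-only mode: computes *total_out_len without
// decompressing. Otherwise out must hold *total_out_len bytes (from pass 1).
// Returns false on malformed input.
static bool HadoopSnappyDecode(const uint8_t* in, size_t in_len,
                               size_t* total_out_len, char* out) {
  size_t total = 0;
  while (in_len >= 4) {
    // Outer block: total uncompressed size of the inner blocks that follow.
    size_t block_uncompressed = ReadBE32(in);
    in += 4;
    in_len -= 4;
    while (block_uncompressed > 0) {
      if (in_len < 4) return false;
      // Inner block: one snappy chunk, prefixed with its compressed size.
      size_t compressed = ReadBE32(in);
      in += 4;
      in_len -= 4;
      if (compressed == 0 || compressed > in_len) return false;
      size_t uncompressed;
      // Reads only the leading varint; this is what keeps pass 1 cheap.
      if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(in),
                                         compressed, &uncompressed)) {
        return false;
      }
      if (out != nullptr &&
          !snappy::RawUncompress(reinterpret_cast<const char*>(in), compressed,
                                 out + total)) {
        return false;
      }
      if (uncompressed > block_uncompressed) return false;
      in += compressed;
      in_len -= compressed;
      block_uncompressed -= uncompressed;
      total += uncompressed;
    }
  }
  *total_out_len = total;
  return true;
}

int main() {
  // Build a one-block Hadoop-framed payload by hand, then round-trip it.
  std::string raw = "hello hadoop snappy block format";
  std::string compressed;
  snappy::Compress(raw.data(), raw.size(), &compressed);

  std::vector<uint8_t> framed;
  auto put_be32 = [&framed](uint32_t v) {
    framed.push_back(v >> 24);
    framed.push_back((v >> 16) & 0xff);
    framed.push_back((v >> 8) & 0xff);
    framed.push_back(v & 0xff);
  };
  put_be32(raw.size());         // outer: uncompressed size
  put_be32(compressed.size());  // inner: compressed size
  framed.insert(framed.end(), compressed.begin(), compressed.end());

  size_t out_len = 0;
  // Pass 1: size only (null output).
  if (!HadoopSnappyDecode(framed.data(), framed.size(), &out_len, nullptr)) return 1;
  // Pass 2: decompress into an exactly-sized buffer.
  std::string out(out_len, '\0');
  if (!HadoopSnappyDecode(framed.data(), framed.size(), &out_len, &out[0])) return 1;
  printf("decoded %zu bytes: %s\n", out_len, out.c_str());
  return out == raw ? 0 : 1;
}

The two-pass design trades one extra walk over the length prefixes for an exactly-sized allocation, which is what lets the patched ProcessBlock reuse its buffer safely and reject truncated or corrupt streams before writing past the end.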