mirror of https://github.com/apache/impala.git (synced 2025-12-19 18:12:08 -05:00)
Add support for streaming decompression of gzip text
Compressed text formats currently require that entire compressed files be read into
memory and decompressed in a single call to the decompression codec. This changes the
HdfsTextScanner to drive gzip in a streaming mode, i.e. it produces partial output as
input is consumed.

Change-Id: Id5c0805e18cf6b606bcf27a5df4b5f58895809fd
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/5233
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: jenkins
(cherry picked from commit 05c3cc55e7a601d97adc4eebe03f878c68a33e56)
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/5385
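The heart of the change is the new Codec::ProcessBlockStreaming() interface, which the
scanner drives in a loop: each call may consume only part of the input and produce only
part of the output, and the caller keeps calling until *eos is set. A minimal sketch of
such a caller loop, written against the interface added below (the function name
DecompressWhole and its buffer handling are illustrative only, not part of the patch):

  // Sketch only: drives a streaming decompressor over one compressed buffer.
  // Assumes the Codec::ProcessBlockStreaming() signature introduced in this change;
  // the real caller (HdfsTextScanner::FillByteBufferGzip) also manages stream buffers,
  // SkipBytes(), and retry logic, which are omitted here.
  Status DecompressWhole(Codec* decompressor, const uint8_t* compressed,
      int64_t compressed_len) {
    bool eos = false;
    while (!eos) {
      uint8_t* output = NULL;
      int64_t output_len = 0;
      int64_t bytes_read = 0;
      // May consume only part of 'compressed' and emit only part of the output.
      RETURN_IF_ERROR(decompressor->ProcessBlockStreaming(compressed_len, compressed,
          &bytes_read, &output_len, &output, &eos));
      // Hand 'output' (output_len bytes) to the text parser here.
      compressed += bytes_read;
      compressed_len -= bytes_read;
    }
    return Status::OK;
  }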
@@ -34,18 +34,27 @@ using namespace impala;
using namespace llvm;
using namespace std;

DEFINE_bool(debug_disable_streaming_gzip, false, "Debug flag, will be removed. Disables "
    "streaming gzip decompression.");

const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";

// Suffix for lzo index file: hdfs-filename.index
const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";

// Number of bytes to read when the previous attempt to streaming decompress did not make
// progress.
const int64_t GZIP_FIXED_READ_SIZE = 1 * 1024 * 1024;

HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state)
    : HdfsScanner(scan_node, state),
      byte_buffer_ptr_(NULL),
      byte_buffer_end_(NULL),
      byte_buffer_read_size_(0),
      boundary_row_(data_buffer_pool_.get()),
      boundary_column_(data_buffer_pool_.get()),
      boundary_pool_(new MemPool(scan_node->mem_tracker())),
      boundary_row_(boundary_pool_.get()),
      boundary_column_(boundary_pool_.get()),
      slot_idx_(0),
      error_in_row_(false) {
}
@@ -179,6 +188,7 @@ void HdfsTextScanner::Close() {
    decompressor_.reset(NULL);
  }
  AttachPool(data_buffer_pool_.get(), false);
  AttachPool(boundary_pool_.get(), false);
  AddFinalRowBatch();
  scan_node_->RangeComplete(THdfsFileFormat::TEXT, stream_->file_desc()->file_compression);
  HdfsScanner::Close();
@@ -227,7 +237,7 @@ Status HdfsTextScanner::ResetScanner() {
  byte_buffer_ptr_ = byte_buffer_end_ = NULL;

  partial_tuple_ =
      Tuple::Create(scan_node_->tuple_desc()->byte_size(), data_buffer_pool_.get());
      Tuple::Create(scan_node_->tuple_desc()->byte_size(), boundary_pool_.get());

  // Initialize codegen fn
  RETURN_IF_ERROR(InitializeWriteTuplesFn(
@@ -382,53 +392,7 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
  *eosr = false;
  Status status;

  // If compressed text, decompress, point to the decompressed buffer, and then continue
  // normal processing.
  if (decompressor_.get() != NULL) {
    // Attempt to read the whole file.
    HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
    int64_t file_size = desc->file_length;
    DCHECK_GT(file_size, 0);
    stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
        &byte_buffer_read_size_, &status);
    if (!status.ok()) return status;

    // If didn't read anything, return.
    if (byte_buffer_read_size_ == 0) {
      *eosr = true;
      return Status::OK;
    }
    DCHECK(decompression_type_ != THdfsCompression::SNAPPY);

    // For gzip and snappy it needs to read the entire file.
    if ((decompression_type_ == THdfsCompression::GZIP ||
         decompression_type_ == THdfsCompression::SNAPPY_BLOCKED ||
         decompression_type_ == THdfsCompression::BZIP2) &&
        (file_size < byte_buffer_read_size_)) {
      stringstream ss;
      ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
         << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
         << "data file corruption. (file: " << stream_->filename() << ").";
      return Status(ss.str());
    }

    // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ accordingly.
    int64_t decompressed_len = 0;
    uint8_t* decompressed_buffer = NULL;
    SCOPED_TIMER(decompress_timer_);
    // TODO: Once the writers are in, add tests with very large compressed files (4GB)
    // that could overflow.
    RETURN_IF_ERROR(decompressor_->ProcessBlock(false, byte_buffer_read_size_,
        reinterpret_cast<uint8_t*>(byte_buffer_ptr_), &decompressed_len,
        &decompressed_buffer));

    // Inform stream_ that the buffer with the compressed text can be released.
    context_->ReleaseCompletedResources(NULL, true);

    VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
    byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
    byte_buffer_read_size_ = decompressed_len;
  } else {
  if (decompressor_.get() == NULL) {
    if (num_bytes > 0) {
      stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
          &byte_buffer_read_size_, &status);
@@ -437,12 +401,158 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
      status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
          &byte_buffer_read_size_);
    }
    *eosr = stream_->eosr();
  } else if (!FLAGS_debug_disable_streaming_gzip &&
      decompression_type_ == THdfsCompression::GZIP) {
    DCHECK_EQ(num_bytes, 0);
    RETURN_IF_ERROR(FillByteBufferGzip(eosr));
  } else {
    DCHECK_EQ(num_bytes, 0);
    RETURN_IF_ERROR(FillByteBufferCompressedFile(eosr));
  }

  byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_;
  *eosr = stream_->eosr();
  return status;
}

Status HdfsTextScanner::FillByteBufferGzip(bool* eosr) {
  // Attach any previously decompressed buffers to the row batch before decompressing
  // any more data.
  if (!decompressor_->reuse_output_buffer()) {
    AttachPool(data_buffer_pool_.get(), false);
  }

  // Gzip compressed text is decompressed as buffers are read from stream_ (unlike
  // other codecs which decompress the entire file in a single call). A compressed
  // buffer is passed to ProcessBlockStreaming but it may not consume all of the input.
  // In the unlikely case that decompressed output is not produced, we attempt to try
  // again with a reasonably large fixed size input buffer (explicitly calling
  // GetBytes()) before failing.
  bool try_read_fixed_size = false;
  uint8_t* decompressed_buffer = NULL;
  int64_t decompressed_len = 0;
  do {
    uint8_t* gzip_buffer_ptr = NULL;
    int64_t gzip_buffer_size = 0;
    // We don't know how many bytes ProcessBlockStreaming() will consume so we set
    // peek=true and then later advance the stream using SkipBytes().
    if (!try_read_fixed_size) {
      RETURN_IF_ERROR(stream_->GetBuffer(true, &gzip_buffer_ptr, &gzip_buffer_size));
    } else {
      Status status;
      stream_->GetBytes(GZIP_FIXED_READ_SIZE, &gzip_buffer_ptr, &gzip_buffer_size,
          &status, true);
      RETURN_IF_ERROR(status);
      try_read_fixed_size = false;
    }
    if (gzip_buffer_size == 0) {
      // If the compressed file was not properly ended, the decoder will not know that
      // the last buffer should have been eos.
      stringstream ss;
      ss << "Unexpected end of file decompressing gzip. File may be malformed. ";
      ss << "file: " << stream_->filename();
      return Status(ss.str());
    }

    int64_t gzip_buffer_bytes_read = 0;
    {
      SCOPED_TIMER(decompress_timer_);
      RETURN_IF_ERROR(decompressor_->ProcessBlockStreaming(gzip_buffer_size,
          gzip_buffer_ptr, &gzip_buffer_bytes_read, &decompressed_len,
          &decompressed_buffer, eosr));
      DCHECK_GE(gzip_buffer_size, gzip_buffer_bytes_read);
      DCHECK_GE(decompressed_len, 0);
    }

    // Skip the bytes in stream_ that were decompressed.
    Status status;
    stream_->SkipBytes(gzip_buffer_bytes_read, &status);
    RETURN_IF_ERROR(status);

    if (!*eosr && decompressed_len == 0) {
      // It's possible (but very unlikely) that ProcessBlockStreaming() wasn't able to
      // make progress if the compressed buffer returned by GetBytes() is too small.
      // (Note that this did not even occur in simple experiments where the input buffer
      // is always 1 byte, but we need to handle this case to be defensive.) In this
      // case, try again with a reasonably large fixed size buffer. If we still did not
      // make progress, then return an error.
      if (try_read_fixed_size) {
        stringstream ss;
        ss << "Unable to make progress decoding gzip text. ";
        ss << "file: " << stream_->filename();
        return Status(ss.str());
      }
      VLOG_FILE << "Unable to make progress decompressing gzip, trying again";
      try_read_fixed_size = true;
    }
  } while (try_read_fixed_size);

  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
  byte_buffer_read_size_ = decompressed_len;

  if (*eosr) {
    if (!stream_->eosr()) {
      // TODO: Add a test case that exercises this path.
      stringstream ss;
      ss << "Unexpected end of gzip stream before end of file: ";
      ss << stream_->filename();
      if (state_->LogHasSpace()) state_->LogError(ss.str());
      if (state_->abort_on_error()) parse_status_ = Status(ss.str());
      RETURN_IF_ERROR(parse_status_);
    }

    context_->ReleaseCompletedResources(NULL, true);
  }
  return Status::OK;
}

Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
  // For other compressed text: attempt to read and decompress the entire file, point
  // to the decompressed buffer, and then continue normal processing.
  DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
  HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
  int64_t file_size = desc->file_length;
  DCHECK_GT(file_size, 0);
  Status status;
  stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
      &byte_buffer_read_size_, &status);
  RETURN_IF_ERROR(status);

  // If didn't read anything, return.
  if (byte_buffer_read_size_ == 0) {
    *eosr = true;
    return Status::OK;
  }

  // Need to read the entire file.
  if (file_size < byte_buffer_read_size_) {
    stringstream ss;
    ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
       << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
       << "data file corruption. (file: " << stream_->filename() << ").";
    return Status(ss.str());
  }

  // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ accordingly.
  int64_t decompressed_len = 0;
  uint8_t* decompressed_buffer = NULL;
  SCOPED_TIMER(decompress_timer_);
  // TODO: Once the writers are in, add tests with very large compressed files (4GB)
  // that could overflow.
  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, byte_buffer_read_size_,
      reinterpret_cast<uint8_t*>(byte_buffer_ptr_), &decompressed_len,
      &decompressed_buffer));

  // Inform stream_ that the buffer with the compressed text can be released.
  context_->ReleaseCompletedResources(NULL, true);

  VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
  byte_buffer_read_size_ = decompressed_len;
  *eosr = stream_->eosr();
  return Status::OK;
}

Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
  *tuple_found = true;
  if (stream_->scan_range()->offset() != 0) {
@@ -92,10 +92,18 @@ class HdfsTextScanner : public HdfsScanner {
  // ready. Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_.
  // If num_bytes is 0, the scanner will read whatever is the io mgr buffer size,
  // otherwise it will just read num_bytes.
  // If the file is compressed, then it will attempt to read the file, decompress it and
  // set the byte_buffer_ptr_ to pointing to the decompressed buffer.
  virtual Status FillByteBuffer(bool* eosr, int num_bytes = 0);

  // Fills the next byte buffer from the compressed data in stream_ by reading the entire
  // file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer.
  Status FillByteBufferCompressedFile(bool* eosr);

  // Fills the next byte buffer from the gzip compressed data in stream_. Unlike
  // FillByteBufferCompressedFile(), the entire file does not need to be read at once.
  // Buffers from stream_ are decompressed as they are read and byte_buffer_ptr_ is set
  // to available decompressed data.
  Status FillByteBufferGzip(bool* eosr);

  // Prepends field data that was from the previous file buffer (This field straddled two
  // file buffers). 'data' already contains the pointer/len from the current file buffer,
  // boundary_column_ contains the beginning of the data from the previous file
@@ -122,6 +130,9 @@ class HdfsTextScanner : public HdfsScanner {
  // row_idx is 0-based (in current batch) where the parse error occured.
  virtual void LogRowParseError(int row_idx, std::stringstream*);

  // Mem pool for boundary_row_ and boundary_column_.
  boost::scoped_ptr<MemPool> boundary_pool_;

  // Helper string for dealing with input rows that span file blocks.
  // We keep track of a whole line that spans file blocks to be able to report
  // the line as erroneous in case of parsing errors.

@@ -110,6 +110,15 @@ class Codec {
  Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t* input,
      int* output_length, uint8_t** output);

  // Process data like ProcessBlock(), but can consume partial input and may only produce
  // partial output. *input_bytes_read returns the number of bytes of input that have
  // been consumed. Even if all input has been consumed, the caller must continue calling
  // to fetch output until *eos returns true.
  virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* eos) {
    return Status("Not implemented.");
  }

  // Returns the maximum result length from applying the codec to input.
  // Note this is not the exact result length, simply a bound to allow preallocating
  // a buffer.

@@ -39,6 +39,13 @@ class DecompressorTest : public ::testing::Test {
        *ip++ = ch;
      }
    }

    // The input for the streaming tests is a larger buffer which contains input_
    // at the beginning and end and is null otherwise.
    memset(&input_streaming_, 0, sizeof(input_streaming_));
    memcpy(&input_streaming_, &input_, sizeof(input_));
    memcpy(&input_streaming_[sizeof(input_streaming_) - sizeof(input_)],
        &input_, sizeof(input_));
  }

  ~DecompressorTest() {
@@ -75,6 +82,23 @@ class DecompressorTest : public ::testing::Test {
    decompressor->Close();
  }

  void RunTestStreaming(THdfsCompression::type format) {
    scoped_ptr<Codec> compressor;
    scoped_ptr<Codec> decompressor;
    EXPECT_TRUE(
        Codec::CreateCompressor(&mem_pool_, true, format, &compressor).ok());
    EXPECT_TRUE(
        Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor).ok());

    CompressAndStreamingDecompress(compressor.get(), decompressor.get(),
        sizeof(input_streaming_), input_streaming_);
    CompressAndStreamingDecompress(compressor.get(), decompressor.get(),
        0, NULL);

    compressor->Close();
    decompressor->Close();
  }

  void CompressAndDecompress(Codec* compressor, Codec* decompressor,
      int64_t input_len, uint8_t* input) {
    // Non-preallocated output buffers
@@ -115,6 +139,33 @@ class DecompressorTest : public ::testing::Test {
    EXPECT_EQ(memcmp(input, output, input_len), 0);
  }

  void CompressAndStreamingDecompress(Codec* compressor, Codec* decompressor,
      int64_t input_len, uint8_t* input) {
    uint8_t* compressed;
    int64_t compressed_length;
    EXPECT_TRUE(compressor->ProcessBlock(false, input_len,
        input, &compressed_length, &compressed).ok());

    // Should take multiple calls to ProcessBlockStreaming() to decompress the buffer.
    int64_t total_output_produced = 0;
    int64_t compressed_bytes_remaining = compressed_length;
    bool eos = false;
    while (!eos) {
      EXPECT_LE(total_output_produced, input_len);
      uint8_t* output = NULL;
      int64_t output_len = 0;
      int64_t compressed_bytes_read = 0;
      EXPECT_TRUE(decompressor->ProcessBlockStreaming(compressed_bytes_remaining,
          compressed, &compressed_bytes_read, &output_len, &output, &eos).ok());
      EXPECT_EQ(memcmp(input + total_output_produced, output, output_len), 0);
      total_output_produced += output_len;
      compressed = compressed + compressed_bytes_read;
      compressed_bytes_remaining -= compressed_bytes_read;
    }
    EXPECT_EQ(0, compressed_bytes_remaining);
    EXPECT_EQ(total_output_produced, input_len);
  }

  // Only tests compressors and decompressors with allocated output.
  void CompressAndDecompressNoOutputAllocated(Codec* compressor,
      Codec* decompressor, int64_t input_len, uint8_t* input) {
@@ -137,7 +188,15 @@ class DecompressorTest : public ::testing::Test {
    EXPECT_EQ(memcmp(input, output, input_len), 0);
  }

  uint8_t input_[2 * 26 * 1024];

  // Buffer for testing ProcessBlockStreaming() which allocates 16mb output buffers. This
  // is 2x + 1 the size of the output buffers to ensure that the decompressed output
  // requires several calls and doesn't need to be nicely aligned (the last call gets a
  // small amount of data).
  uint8_t input_streaming_[32 * 1024 * 1024 + 1];

  MemTracker mem_tracker_;
  MemPool mem_pool_;
};
@@ -156,10 +215,12 @@ TEST_F(DecompressorTest, LZ4) {

TEST_F(DecompressorTest, Gzip) {
  RunTest(THdfsCompression::GZIP);
  RunTestStreaming(THdfsCompression::GZIP);
}

TEST_F(DecompressorTest, Deflate) {
  RunTest(THdfsCompression::DEFLATE);
  RunTestStreaming(THdfsCompression::GZIP);
}

TEST_F(DecompressorTest, Bzip) {
@@ -16,6 +16,7 @@
#include "util/decompress.h"
#include "exec/read-write-util.h"
#include "runtime/runtime-state.h"
#include "common/logging.h"
#include "gen-cpp/Descriptors_types.h"

// Codec libraries
@@ -28,6 +29,9 @@ using namespace std;
using namespace boost;
using namespace impala;

// Output buffer size for streaming gzip
const int64_t STREAM_GZIP_OUT_BUF_SIZE = 16 * 1024 * 1024;

GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate)
  : Codec(mem_pool, reuse_buffer),
    is_deflate_(is_deflate) {
@@ -53,6 +57,64 @@ int64_t GzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input)
  return -1;
}

string GzipDecompressor::DebugStreamState() const {
  stringstream ss;
  ss << "next_in=" << (void*)stream_.next_in;
  ss << " avail_in=" << stream_.avail_in;
  ss << " total_in=" << stream_.total_in;
  ss << " next_out=" << (void*)stream_.next_out;
  ss << " avail_out=" << stream_.avail_out;
  ss << " total_out=" << stream_.total_out;
  return ss.str();
}

Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
    int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* eos) {
  if (!reuse_buffer_ || out_buffer_ == NULL) {
    buffer_length_ = STREAM_GZIP_OUT_BUF_SIZE;
    out_buffer_ = memory_pool_->Allocate(buffer_length_);
  }
  *output = out_buffer_;
  *output_length = buffer_length_;
  *input_bytes_read = 0;
  *eos = false;

  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
  stream_.avail_in = input_length;
  stream_.next_out = reinterpret_cast<Bytef*>(*output);
  stream_.avail_out = *output_length;
  VLOG_ROW << "ProcessBlockStreaming() stream: " << DebugStreamState();

  int ret = inflate(&stream_, Z_SYNC_FLUSH);
  if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) {
    stringstream ss;
    ss << "GzipDecompressor failed, ret=" << ret;
    if (stream_.msg != NULL) ss << " msg=" << stream_.msg;
    return Status(ss.str());
  }

  // stream_.avail_out is the number of bytes *left* in the out buffer, but
  // we're interested in the number of bytes used.
  *output_length = *output_length - stream_.avail_out;
  *input_bytes_read = input_length - stream_.avail_in;
  VLOG_ROW << "inflate() ret=" << ret << " consumed=" << *input_bytes_read
      << " produced=" << *output_length << " stream: " << DebugStreamState();

  if (ret == Z_BUF_ERROR) {
    // Z_BUF_ERROR is returned if no progress was made. This should be very unlikely.
    // The caller should check for this case (where 0 bytes were consumed, 0 bytes
    // produced) and try again with more input.
    DCHECK_EQ(0, *output_length);
    DCHECK_EQ(0, *input_bytes_read);
  } else if (ret == Z_STREAM_END) {
    *eos = true;
    if (inflateReset(&stream_) != Z_OK) {
      return Status("zlib inflateReset failed: " + string(stream_.msg));
    }
  }
  return Status::OK;
}

Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
    const uint8_t* input, int64_t* output_length, uint8_t** output) {
  if (output_preallocated && *output_length == 0) {
@@ -31,6 +31,8 @@ class GzipDecompressor : public Codec {
  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
      const uint8_t* input, int64_t* output_length, uint8_t** output);
  virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* eos);
  virtual std::string file_extension() const { return "gz"; }

 private:
@@ -38,6 +40,7 @@ class GzipDecompressor : public Codec {
  GzipDecompressor(
      MemPool* mem_pool = NULL, bool reuse_buffer = false, bool is_deflate = false);
  virtual Status Init();
  std::string DebugStreamState() const;

  // If set assume deflate format, otherwise zlib or gzip
  bool is_deflate_;
BIN testdata/bad_text_gzip/file_not_finished.gz (vendored, executable file)
Binary file not shown.
@@ -1250,6 +1250,16 @@ LOAD DATA LOCAL INPATH '${{env:IMPALA_HOME}}/testdata/bad_text_lzo/bad_text.lzo'
---- DATASET
functional
---- BASE_TABLE_NAME
bad_text_gzip
---- COLUMNS
s STRING
i INT
---- DEPENDENT_LOAD
LOAD DATA LOCAL INPATH '${{env:IMPALA_HOME}}/testdata/bad_text_gzip/file_not_finished.gz' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
====
---- DATASET
functional
---- BASE_TABLE_NAME
bad_seq_snap
---- COLUMNS
field STRING
@@ -33,6 +33,7 @@ table_name:insert_string_partitioned, constraint:restrict_to, table_format:parqu

table_name:old_rcfile_table, constraint:restrict_to, table_format:rc/none/none
table_name:bad_text_lzo, constraint:restrict_to, table_format:text/lzo/block
table_name:bad_text_gzip, constraint:restrict_to, table_format:text/gzip/block
table_name:bad_seq_snap, constraint:restrict_to, table_format:seq/snap/block
table_name:bad_parquet, constraint:restrict_to, table_format:parquet/none/none
@@ -1,76 +1,88 @@
====
---- QUERY
# TODO: the error info should be sufficient to pin point the data location: filename and
# offset
# TODO: printing the entire record will break column level security (when it is
# implemented).
select id, bool_col, tinyint_col, smallint_col from alltypeserror
---- ERRORS

Error converting column: 3 TO SMALLINT (Data is: abc3)
file: hdfs://regex:.$
record: 23,false,3,abc3,3,30,3.000000,30.300000,03/01/09,3,2020-10-10 60:10:10.123
Error converting column: 2 TO TINYINT (Data is: abc7)
file: hdfs://regex:.$
record: 27,false,abc7,7,7,70,7.000000,70.700000,03/01/09,7,2020-10-10 10:10:10.123
Error converting column: 2 TO TINYINT (Data is: err30)
Error converting column: 3 TO SMALLINT (Data is: err30)
file: hdfs://regex:.$
record: 30,t\rue,err30,err30,err30,err300,err30..000000,err300.900000,01/01/10,10,0000-01-01 00:00:00
Error converting column: 2 TO TINYINT (Data is: xyz5)
file: hdfs://regex:.$
record: 15,false,xyz5,5,5,50,5.000000,50.500000,02/01/09,5,0009-01-01 00:00:00
Error converting column: 1 TO BOOL (Data is: errfalse)
file: hdfs://regex:.$
record: 1,errfalse,,1,1,10,1.000000,10.100000,01/01/09,1,1999-10-10
Error converting column: 2 TO TINYINT (Data is: err2)
file: hdfs://regex:.$
record: 2,true,err2,,2,20,2.000000,20.200000,01/01/09,2,1999-10-10 90:10:10
Error converting column: 3 TO SMALLINT (Data is: err3)
file: hdfs://regex:.$
record: 3,false,3,err3,,30,3.000000,30.300000,01/01/09,3,2002-14-10 00:00:00
Error converting column: 1 TO BOOL (Data is: errtrue)
Error converting column: 2 TO TINYINT (Data is: err9)
Error converting column: 3 TO SMALLINT (Data is: err9)
file: hdfs://regex:.$
record: 9,errtrue,err9,err9,err9,err90,err9.000000,err90.900000,01/01/09,9,0000-01-01 00:00:00

---- RESULTS
0,NULL,NULL,0
1,NULL,NULL,1
10,NULL,NULL,NULL
11,false,NULL,NULL
12,true,2,NULL
13,false,3,3
14,true,4,4
15,false,NULL,5
16,NULL,NULL,NULL
17,false,7,7
18,true,8,8
19,false,9,9
2,true,NULL,NULL
20,true,0,0
21,false,1,1
22,true,2,2
23,false,3,NULL
24,true,4,4
25,false,5,5
26,true,6,6
27,false,NULL,7
28,true,8,8
29,false,9,9
3,false,3,NULL
30,NULL,NULL,NULL
4,true,4,4
5,false,5,5
6,true,6,6
7,NULL,NULL,7
8,false,NULL,NULL
9,NULL,NULL,NULL
---- TYPES
int, boolean, tinyint, smallint
====
---- QUERY
## TODO: IMPALA-1502: The following tests are currently broken. We weren't running
## them by mistake for some time and no longer work.
##
## TODO: the error info should be sufficient to pin point the data location: filename and
## offset
## TODO: printing the entire record will break column level security (when it is
## implemented).
#select id, bool_col, tinyint_col, smallint_col from alltypeserror
#---- ERRORS
#
#Error converting column: 3 TO SMALLINT (Data is: abc3)
#file: hdfs://regex:.$
#record: 23,false,3,abc3,3,30,3.000000,30.300000,03/01/09,3,2020-10-10 60:10:10.123
#Error converting column: 2 TO TINYINT (Data is: abc7)
#file: hdfs://regex:.$
#record: 27,false,abc7,7,7,70,7.000000,70.700000,03/01/09,7,2020-10-10 10:10:10.123
#Error converting column: 2 TO TINYINT (Data is: err30)
#Error converting column: 3 TO SMALLINT (Data is: err30)
#file: hdfs://regex:.$
#record: 30,t\rue,err30,err30,err30,err300,err30..000000,err300.900000,01/01/10,10,0000-01-01 00:00:00
#Error converting column: 2 TO TINYINT (Data is: xyz5)
#file: hdfs://regex:.$
#record: 15,false,xyz5,5,5,50,5.000000,50.500000,02/01/09,5,0009-01-01 00:00:00
#Error converting column: 1 TO BOOL (Data is: errfalse)
#file: hdfs://regex:.$
#record: 1,errfalse,,1,1,10,1.000000,10.100000,01/01/09,1,1999-10-10
#Error converting column: 2 TO TINYINT (Data is: err2)
#file: hdfs://regex:.$
#record: 2,true,err2,,2,20,2.000000,20.200000,01/01/09,2,1999-10-10 90:10:10
#Error converting column: 3 TO SMALLINT (Data is: err3)
#file: hdfs://regex:.$
#record: 3,false,3,err3,,30,3.000000,30.300000,01/01/09,3,2002-14-10 00:00:00
#Error converting column: 1 TO BOOL (Data is: errtrue)
#Error converting column: 2 TO TINYINT (Data is: err9)
#Error converting column: 3 TO SMALLINT (Data is: err9)
#file: hdfs://regex:.$
#record: 9,errtrue,err9,err9,err9,err90,err9.000000,err90.900000,01/01/09,9,0000-01-01 00:00:00
#
#---- RESULTS
#0,NULL,NULL,0
#1,NULL,NULL,1
#10,NULL,NULL,NULL
#11,false,NULL,NULL
#12,true,2,NULL
#13,false,3,3
#14,true,4,4
#15,false,NULL,5
#16,NULL,NULL,NULL
#17,false,7,7
#18,true,8,8
#19,false,9,9
#2,true,NULL,NULL
#20,true,0,0
#21,false,1,1
#22,true,2,2
#23,false,3,NULL
#24,true,4,4
#25,false,5,5
#26,true,6,6
#27,false,NULL,7
#28,true,8,8
#29,false,9,9
#3,false,3,NULL
#30,NULL,NULL,NULL
#4,true,4,4
#5,false,5,5
#6,true,6,6
#7,NULL,NULL,7
#8,false,NULL,NULL
#9,NULL,NULL,NULL
#---- TYPES
#int, boolean, tinyint, smallint
#====
#---- QUERY
#select count(*) from functional_text_lzo.bad_text_lzo
#---- ERRORS
#Blocksize: 536870911 is greater than LZO_MAX_BLOCK_SIZE: 67108864
#---- RESULTS
#5140
#---- TYPES
#bigint
#====
#---- QUERY
select * from alltypeserrornonulls
---- ERRORS

@@ -170,11 +182,7 @@ record: 9,errtrue,err9,err9,err9,err90,err9.000000,err90.900000,01/01/09,9,2012-
int, boolean, tinyint, smallint, int, bigint, float, double, string, string, timestamp, int, int
====
---- QUERY
select count(*) from functional_text_lzo.bad_text_lzo
---- ERRORS
Blocksize: 536870911 is greater than LZO_MAX_BLOCK_SIZE: 67108864
---- RESULTS
5140
---- TYPES
bigint
====
select count(*) from functional_text_gzip.bad_text_gzip
---- CATCH
Unexpected end of file decompressing gzip. File may be malformed.
====
testdata/workloads/tpch/tpch_core.csv (vendored, 17 lines changed)
@@ -1,8 +1,9 @@
# Manually created file.
file_format:text, dataset:tpch, compression_codec:none, compression_type:none
file_format:seq, dataset:tpch, compression_codec:gzip, compression_type:block
file_format:seq, dataset:tpch, compression_codec:snap, compression_type:block
file_format:rc, dataset:tpch, compression_codec:none, compression_type:none
file_format:avro, dataset:tpch, compression_codec: none, compression_type: none
file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
# Manually created file.
file_format:text, dataset:tpch, compression_codec:none, compression_type:none
file_format:text, dataset:tpch, compression_codec:gzip, compression_type:block
file_format:seq, dataset:tpch, compression_codec:gzip, compression_type:block
file_format:seq, dataset:tpch, compression_codec:snap, compression_type:block
file_format:rc, dataset:tpch, compression_codec:none, compression_type:none
file_format:avro, dataset:tpch, compression_codec: none, compression_type: none
file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
testdata/workloads/tpch/tpch_exhaustive.csv (vendored, 1 line changed)
@@ -1,5 +1,6 @@
# Generated File.
file_format: text, dataset: tpch, compression_codec: none, compression_type: none
file_format: text, dataset: tpch, compression_codec: gzip, compression_type: block
file_format: text, dataset: tpch, compression_codec: lzo, compression_type: block
file_format: seq, dataset: tpch, compression_codec: none, compression_type: none
file_format: seq, dataset: tpch, compression_codec: def, compression_type: block