mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
Decimal: Read/Write to parquet.
This adds support for the FIXED_LENGTH_BYTE_ARRAY parquet type and encoding for decimals. Change-Id: I9d5780feb4530989b568ec8d168cbdc32b7039bd Reviewed-on: http://gerrit.ent.cloudera.com:8080/1727 Reviewed-by: Nong Li <nong@cloudera.com> Tested-by: jenkins Reviewed-on: http://gerrit.ent.cloudera.com:8080/2432
This commit is contained in:
@@ -232,11 +232,16 @@ class HdfsParquetScanner::ColumnReader : public HdfsParquetScanner::BaseColumnRe
|
||||
ColumnReader(HdfsParquetScanner* parent, const SlotDescriptor* desc, int file_idx)
|
||||
: BaseColumnReader(parent, desc, file_idx) {
|
||||
DCHECK_NE(desc->type().type, TYPE_BOOLEAN);
|
||||
if (desc->type().type == TYPE_DECIMAL) {
|
||||
fixed_len_size_ = ParquetPlainEncoder::DecimalSize(desc->type());
|
||||
} else {
|
||||
fixed_len_size_ = -1;
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void CreateDictionaryDecoder(uint8_t* values, int size) {
|
||||
dict_decoder_.reset(new DictDecoder<T>(values, size));
|
||||
dict_decoder_.reset(new DictDecoder<T>(values, size, fixed_len_size_));
|
||||
dict_decoder_base_ = dict_decoder_.get();
|
||||
}
|
||||
|
||||
@@ -259,7 +264,8 @@ class HdfsParquetScanner::ColumnReader : public HdfsParquetScanner::BaseColumnRe
|
||||
result = dict_decoder_->GetValue(reinterpret_cast<T*>(slot));
|
||||
} else {
|
||||
DCHECK(page_encoding == parquet::Encoding::PLAIN);
|
||||
data_ += ParquetPlainEncoder::Decode<T>(data_, reinterpret_cast<T*>(slot));
|
||||
data_ += ParquetPlainEncoder::Decode<T>(data_, fixed_len_size_,
|
||||
reinterpret_cast<T*>(slot));
|
||||
if (parent_->scan_node_->requires_compaction()) {
|
||||
CopySlot(reinterpret_cast<T*>(slot), pool);
|
||||
}
|
||||
@@ -274,6 +280,10 @@ class HdfsParquetScanner::ColumnReader : public HdfsParquetScanner::BaseColumnRe
|
||||
}
|
||||
|
||||
scoped_ptr<DictDecoder<T> > dict_decoder_;
|
||||
|
||||
// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY. Unused
|
||||
// otherwise.
|
||||
int fixed_len_size_;
|
||||
};
|
||||
|
||||
template<>
|
||||
@@ -380,6 +390,19 @@ HdfsParquetScanner::BaseColumnReader* HdfsParquetScanner::CreateReader(
|
||||
case TYPE_STRING:
|
||||
reader = new ColumnReader<StringValue>(this, desc, file_idx);
|
||||
break;
|
||||
case TYPE_DECIMAL:
|
||||
switch (desc->type().GetByteSize()) {
|
||||
case 4:
|
||||
reader = new ColumnReader<Decimal4Value>(this, desc, file_idx);
|
||||
break;
|
||||
case 8:
|
||||
reader = new ColumnReader<Decimal8Value>(this, desc, file_idx);
|
||||
break;
|
||||
case 16:
|
||||
reader = new ColumnReader<Decimal16Value>(this, desc, file_idx);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
DCHECK(false);
|
||||
}
|
||||
@@ -1019,8 +1042,9 @@ Status HdfsParquetScanner::ValidateColumn(const SlotDescriptor* slot_desc, int c
|
||||
for (int i = 0; i < encodings.size(); ++i) {
|
||||
if (!IsEncodingSupported(encodings[i])) {
|
||||
stringstream ss;
|
||||
ss << "File " << stream_->filename() << " uses an unsupported encoding: "
|
||||
<< PrintEncoding(encodings[i]) << " for column " << schema_element.name;
|
||||
ss << "File '" << stream_->filename() << "' uses an unsupported encoding: "
|
||||
<< PrintEncoding(encodings[i]) << " for column '" << schema_element.name
|
||||
<< "'.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
}
|
||||
@@ -1030,8 +1054,9 @@ Status HdfsParquetScanner::ValidateColumn(const SlotDescriptor* slot_desc, int c
|
||||
file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
|
||||
file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
|
||||
stringstream ss;
|
||||
ss << "File " << stream_->filename() << " uses an unsupported compression: "
|
||||
<< file_data.meta_data.codec << " for column " << schema_element.name;
|
||||
ss << "File '" << stream_->filename() << "' uses an unsupported compression: "
|
||||
<< file_data.meta_data.codec << " for column '" << schema_element.name
|
||||
<< "'.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
@@ -1039,8 +1064,8 @@ Status HdfsParquetScanner::ValidateColumn(const SlotDescriptor* slot_desc, int c
|
||||
parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
|
||||
if (type != file_data.meta_data.type) {
|
||||
stringstream ss;
|
||||
ss << "File " << stream_->filename() << " has an incompatible type with the"
|
||||
<< " table schema for column " << schema_element.name << ". Expected type: "
|
||||
ss << "File '" << stream_->filename() << "' has an incompatible type with the"
|
||||
<< " table schema for column '" << schema_element.name << "'. Expected type: "
|
||||
<< type << ". Actual type: " << file_data.meta_data.type;
|
||||
return Status(ss.str());
|
||||
}
|
||||
@@ -1049,8 +1074,8 @@ Status HdfsParquetScanner::ValidateColumn(const SlotDescriptor* slot_desc, int c
|
||||
const vector<string> schema_path = file_data.meta_data.path_in_schema;
|
||||
if (schema_path.size() != 1) {
|
||||
stringstream ss;
|
||||
ss << "File " << stream_->filename() << " contains a nested schema for column "
|
||||
<< schema_element.name << ". This is currently not supported.";
|
||||
ss << "File '" << stream_->filename() << "' contains a nested schema for column '"
|
||||
<< schema_element.name << "'. This is currently not supported.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
@@ -1058,11 +1083,66 @@ Status HdfsParquetScanner::ValidateColumn(const SlotDescriptor* slot_desc, int c
|
||||
if (schema_element.repetition_type != parquet::FieldRepetitionType::OPTIONAL &&
|
||||
schema_element.repetition_type != parquet::FieldRepetitionType::REQUIRED) {
|
||||
stringstream ss;
|
||||
ss << "File " << stream_->filename() << " column " << schema_element.name
|
||||
<< " contains an unsupported column repetition type: "
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' contains an unsupported column repetition type: "
|
||||
<< schema_element.repetition_type;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
// Check the decimal scale in the file matches the metastore scale and precision.
|
||||
if (schema_element.__isset.scale || schema_element.__isset.precision) {
|
||||
if (slot_desc->type().type != TYPE_DECIMAL) {
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< " has scale and precision set but the table metadata column type is not "
|
||||
<< " DECIMAL.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (!schema_element.__isset.scale) {
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' does not have the scale set.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
if (!schema_element.__isset.precision) {
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' does not have the precision set.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (schema_element.scale != slot_desc->type().scale) {
|
||||
// TODO: we could allow a mismatch and do a conversion at this step.
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' has a scale that does not match the table metadata scale."
|
||||
<< " File metadata scale: " << schema_element.scale
|
||||
<< " Table metadata scale: " << slot_desc->type().scale;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (schema_element.precision != slot_desc->type().precision) {
|
||||
// TODO: we could allow a mismatch and do a conversion at this step.
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' has a precision that does not match the table metadata precision."
|
||||
<< " File metadata precision: " << schema_element.precision
|
||||
<< " Table metadata precision: " << slot_desc->type().precision;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
if (!schema_element.__isset.type_length) {
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' does not have type_length set.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
} else if (slot_desc->type().type == TYPE_DECIMAL) {
|
||||
stringstream ss;
|
||||
ss << "File '" << stream_->filename() << "' column '" << schema_element.name
|
||||
<< "' should contain decimal data but the scale and precision are not specified.";
|
||||
return Status(ss.str());
|
||||
}
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
#include "common/version.h"
|
||||
#include "exprs/expr.h"
|
||||
#include "runtime/decimal-value.h"
|
||||
#include "runtime/raw-value.h"
|
||||
#include "runtime/row-batch.h"
|
||||
#include "runtime/runtime-state.h"
|
||||
@@ -236,6 +237,7 @@ class HdfsParquetTableWriter::ColumnWriter :
|
||||
const THdfsCompression::type& codec) : BaseColumnWriter(parent, expr, codec),
|
||||
num_values_since_dict_size_check_(0) {
|
||||
DCHECK_NE(expr->type().type, TYPE_BOOLEAN);
|
||||
encoded_value_size_ = ParquetPlainEncoder::ByteSize(expr->type());
|
||||
}
|
||||
|
||||
virtual void Reset() {
|
||||
@@ -243,7 +245,8 @@ class HdfsParquetTableWriter::ColumnWriter :
|
||||
// Default to dictionary encoding. If the cardinality ends up being too high,
|
||||
// it will fall back to plain.
|
||||
current_encoding_ = Encoding::PLAIN_DICTIONARY;
|
||||
dict_encoder_.reset(new DictEncoder<T>(parent_->per_file_mem_pool_.get()));
|
||||
dict_encoder_.reset(
|
||||
new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_));
|
||||
dict_encoder_base_ = dict_encoder_.get();
|
||||
}
|
||||
|
||||
@@ -267,12 +270,14 @@ class HdfsParquetTableWriter::ColumnWriter :
|
||||
}
|
||||
} else if (current_encoding_ == Encoding::PLAIN) {
|
||||
T* v = reinterpret_cast<T*>(value);
|
||||
int encoded_len = ParquetPlainEncoder::ByteSize<T>(*v);
|
||||
int encoded_len = encoded_value_size_ < 0 ?
|
||||
ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_;
|
||||
if (current_page_->header.uncompressed_page_size + encoded_len > DATA_PAGE_SIZE) {
|
||||
return false;
|
||||
}
|
||||
uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
|
||||
int written_len = ParquetPlainEncoder::Encode(dst_ptr, *v);
|
||||
int written_len =
|
||||
ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v);
|
||||
DCHECK_EQ(encoded_len, written_len);
|
||||
*bytes_added += written_len;
|
||||
current_page_->header.uncompressed_page_size += encoded_len;
|
||||
@@ -298,6 +303,9 @@ class HdfsParquetTableWriter::ColumnWriter :
|
||||
|
||||
// The number of values added since we last checked the dictionary.
|
||||
int num_values_since_dict_size_check_;
|
||||
|
||||
// Size of each encoded value. -1 if the size is type is variable-length.
|
||||
int encoded_value_size_;
|
||||
};
|
||||
|
||||
// Bools are encoded a bit differently so subclass it explicitly.
|
||||
@@ -646,6 +654,21 @@ Status HdfsParquetTableWriter::Init() {
|
||||
case TYPE_STRING:
|
||||
writer = new ColumnWriter<StringValue>(this, output_exprs_[i], codec);
|
||||
break;
|
||||
case TYPE_DECIMAL:
|
||||
switch (output_exprs_[i]->type().GetByteSize()) {
|
||||
case 4:
|
||||
writer = new ColumnWriter<Decimal4Value>(this, output_exprs_[i], codec);
|
||||
break;
|
||||
case 8:
|
||||
writer = new ColumnWriter<Decimal8Value>(this, output_exprs_[i], codec);
|
||||
break;
|
||||
case 16:
|
||||
writer = new ColumnWriter<Decimal16Value>(this, output_exprs_[i], codec);
|
||||
break;
|
||||
default:
|
||||
DCHECK(false);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
DCHECK(false);
|
||||
}
|
||||
@@ -669,6 +692,16 @@ Status HdfsParquetTableWriter::CreateSchema() {
|
||||
node.name = table_desc_->col_names()[i + num_clustering_cols];
|
||||
node.__set_type(IMPALA_TO_PARQUET_TYPES[output_exprs_[i]->type().type]);
|
||||
node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
|
||||
if (output_exprs_[i]->type().type == TYPE_DECIMAL) {
|
||||
// This column is type decimal. Update the file metadata to include the
|
||||
// additional fields:
|
||||
// 1) type_length: the number of bytes used per decimal value in the data
|
||||
// 2) precision/scale
|
||||
node.__set_type_length(
|
||||
ParquetPlainEncoder::DecimalSize(output_exprs_[i]->type()));
|
||||
node.__set_scale(output_exprs_[i]->type().scale);
|
||||
node.__set_precision(output_exprs_[i]->type().precision);
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK;
|
||||
|
||||
@@ -18,7 +18,9 @@
|
||||
|
||||
#include "gen-cpp/Descriptors_types.h"
|
||||
#include "gen-cpp/parquet_types.h"
|
||||
#include "runtime/decimal-value.h"
|
||||
#include "runtime/string-value.h"
|
||||
#include "util/bit-util.h"
|
||||
|
||||
// This file contains common elements between the parquet Writer and Scanner.
|
||||
namespace impala {
|
||||
@@ -41,7 +43,11 @@ const parquet::Type::type IMPALA_TO_PARQUET_TYPES[] = {
|
||||
parquet::Type::FLOAT,
|
||||
parquet::Type::DOUBLE,
|
||||
parquet::Type::INT96, // Timestamp
|
||||
parquet::Type::BYTE_ARRAY,
|
||||
parquet::Type::BYTE_ARRAY, // String
|
||||
parquet::Type::BYTE_ARRAY, // Date, NYI
|
||||
parquet::Type::BYTE_ARRAY, // DateTime, NYI
|
||||
parquet::Type::BYTE_ARRAY, // Binary NYI
|
||||
parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
|
||||
};
|
||||
|
||||
// Mapping of Parquet codec enums to Impala enums
|
||||
@@ -65,32 +71,140 @@ const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
|
||||
|
||||
// The plain encoding does not maintain any state so all these functions
|
||||
// are static helpers.
|
||||
// TODO: we are using templates to provide a generic interface (over the
|
||||
// types) to avoid performance penalities. This makes the code more complex
|
||||
// and should be removed when we have codegen support to inline virtual
|
||||
// calls.
|
||||
class ParquetPlainEncoder {
|
||||
public:
|
||||
// Returns the byte size of 'v'.
|
||||
template<typename T>
|
||||
static int ByteSize(const T& v) { return sizeof(T); }
|
||||
|
||||
// Returns the encoded size of values of type t. Returns -1 if it is variable
|
||||
// length. This can be different than the slot size of the types.
|
||||
static int ByteSize(const ColumnType& t) {
|
||||
switch (t.type) {
|
||||
case TYPE_STRING:
|
||||
return -1;
|
||||
case TYPE_TINYINT:
|
||||
case TYPE_SMALLINT:
|
||||
case TYPE_INT:
|
||||
case TYPE_FLOAT:
|
||||
return 4;
|
||||
case TYPE_BIGINT:
|
||||
case TYPE_DOUBLE:
|
||||
return 8;
|
||||
case TYPE_TIMESTAMP:
|
||||
return 12;
|
||||
case TYPE_DECIMAL:
|
||||
return DecimalSize(t);
|
||||
|
||||
case TYPE_NULL:
|
||||
case TYPE_BOOLEAN: // These types are not plain encoded.
|
||||
default:
|
||||
DCHECK(false);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// The minimum byte size to store decimals of with precision t.precision.
|
||||
static int DecimalSize(const ColumnType& t) {
|
||||
DCHECK(t.type == TYPE_DECIMAL);
|
||||
// Numbers in the comment is the max positive value that can be represented
|
||||
// with those number of bits (max negative is -(X + 1)).
|
||||
// TODO: use closed form for this?
|
||||
switch (t.precision) {
|
||||
case 1: case 2:
|
||||
return 1; // 127
|
||||
case 3: case 4:
|
||||
return 2; // 32,767
|
||||
case 5: case 6:
|
||||
return 3; // 8,388,607
|
||||
case 7: case 8: case 9:
|
||||
return 4; // 2,147,483,427
|
||||
case 10: case 11:
|
||||
return 5; // 549,755,813,887
|
||||
case 12: case 13: case 14:
|
||||
return 6; // 140,737,488,355,327
|
||||
case 15: case 16:
|
||||
return 7; // 36,028,797,018,963,967
|
||||
case 17: case 18:
|
||||
return 8; // 9,223,372,036,854,775,807
|
||||
case 19: case 20: case 21:
|
||||
return 9; // 2,361,183,241,434,822,606,847
|
||||
case 22: case 23:
|
||||
return 10; // 604,462,909,807,314,587,353,087
|
||||
case 24: case 25: case 26:
|
||||
return 11; // 154,742,504,910,672,534,362,390,527
|
||||
case 27: case 28:
|
||||
return 12; // 39,614,081,257,132,168,796,771,975,167
|
||||
case 29: case 30: case 31:
|
||||
return 13; // 10,141,204,801,825,835,211,973,625,643,007
|
||||
case 32: case 33:
|
||||
return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
|
||||
case 34: case 35:
|
||||
return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
|
||||
case 36: case 37: case 38:
|
||||
return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
|
||||
default:
|
||||
DCHECK(false);
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Encodes t into buffer. Returns the number of bytes added. buffer must
|
||||
// be preallocated and big enough. Buffer need not be aligned.
|
||||
// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and
|
||||
// is the number of bytes the plain encoder should use.
|
||||
template<typename T>
|
||||
static int Encode(uint8_t* buffer, const T& t) {
|
||||
static int Encode(uint8_t* buffer, int fixed_len_size, const T& t) {
|
||||
memcpy(buffer, &t, ByteSize(t));
|
||||
return ByteSize(t);
|
||||
}
|
||||
|
||||
// Decodes t from buffer. Returns the number of bytes read. Buffer need
|
||||
// not be aligned.
|
||||
// For types that are stored as FIXED_LEN_BYTE_ARRAY, fixed_len_size is the size
|
||||
// of the object. Otherwise, it is unused.
|
||||
template<typename T>
|
||||
static int Decode(uint8_t* buffer, T* v) {
|
||||
static int Decode(uint8_t* buffer, int fixed_len_size, T* v) {
|
||||
memcpy(v, buffer, ByteSize(*v));
|
||||
return ByteSize(*v);
|
||||
}
|
||||
|
||||
// Encode 't', which must be in the machine endian, to FIXED_LEN_BYTE_ARRAY
|
||||
// of 'fixed_len_size'. The result is encoded as big endian.
|
||||
template <typename T>
|
||||
static int EncodeToFixedLenByteArray(uint8_t* buffer, int fixed_len_size, const T& t);
|
||||
|
||||
// Decodes into v assuming buffer is encoded using FIXED_LEN_BYTE_ARRAY of
|
||||
// 'fixed_len_size'. The bytes in buffer must be big endian and the result stored in
|
||||
// v is the machine endian format.
|
||||
template<typename T>
|
||||
static int DecodeFromFixedLenByteArray(uint8_t* buffer, int fixed_len_size, T* v);
|
||||
};
|
||||
|
||||
// Disable for bools. Plain encoding is not used for booleans.
|
||||
template<> int ParquetPlainEncoder::ByteSize(const bool& b);
|
||||
template<> int ParquetPlainEncoder::Encode(uint8_t*, const bool&);
|
||||
template<> int ParquetPlainEncoder::Decode(uint8_t*, bool* v);
|
||||
template<> int ParquetPlainEncoder::Encode(uint8_t*, int fixed_len_size, const bool&);
|
||||
template<> int ParquetPlainEncoder::Decode(uint8_t*, int fixed_len_size, bool* v);
|
||||
|
||||
// Not used for decimals since the plain encoding encodes them using
|
||||
// FIXED_LEN_BYTE_ARRAY.
|
||||
template<> inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
|
||||
DCHECK(false);
|
||||
return -1;
|
||||
}
|
||||
template<> inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
|
||||
DCHECK(false);
|
||||
return -1;
|
||||
}
|
||||
template<> inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
|
||||
DCHECK(false);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
|
||||
template<>
|
||||
@@ -98,31 +212,6 @@ inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return sizeof(int32_
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return sizeof(int32_t); }
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int8_t* v) {
|
||||
*v = *buffer;
|
||||
return ByteSize(*v);
|
||||
}
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int16_t* v) {
|
||||
memcpy(v, buffer, sizeof(int16_t));
|
||||
return ByteSize(*v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(uint8_t* buffer, const int8_t& v) {
|
||||
int32_t val = v;
|
||||
memcpy(buffer, &val, sizeof(int32_t));
|
||||
return ByteSize(v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(uint8_t* buffer, const int16_t& v) {
|
||||
int32_t val = v;
|
||||
memcpy(buffer, &val, sizeof(int32_t));
|
||||
return ByteSize(v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::ByteSize(const StringValue& v) {
|
||||
return sizeof(int32_t) + v.len;
|
||||
@@ -134,19 +223,137 @@ inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(uint8_t* buffer, const StringValue& v) {
|
||||
inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int fixed_len_size, int8_t* v) {
|
||||
*v = *buffer;
|
||||
return ByteSize(*v);
|
||||
}
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int fixed_len_size, int16_t* v) {
|
||||
memcpy(v, buffer, sizeof(int16_t));
|
||||
return ByteSize(*v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const int8_t& v) {
|
||||
int32_t val = v;
|
||||
memcpy(buffer, &val, sizeof(int32_t));
|
||||
return ByteSize(v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const int16_t& v) {
|
||||
int32_t val = v;
|
||||
memcpy(buffer, &val, sizeof(int32_t));
|
||||
return ByteSize(v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const StringValue& v) {
|
||||
memcpy(buffer, &v.len, sizeof(int32_t));
|
||||
memcpy(buffer + sizeof(int32_t), v.ptr, v.len);
|
||||
return ByteSize(v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(uint8_t* buffer, StringValue* v) {
|
||||
inline int ParquetPlainEncoder::Decode(
|
||||
uint8_t* buffer, int fixed_len_size, StringValue* v) {
|
||||
memcpy(&v->len, buffer, sizeof(int32_t));
|
||||
v->ptr = reinterpret_cast<char*>(buffer) + sizeof(int32_t);
|
||||
return ByteSize(*v);
|
||||
}
|
||||
|
||||
// Write decimals as big endian (byte comparable) to benefit from common prefixes.
|
||||
// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. This means
|
||||
// that the value in the in-memory format has leading zeros or negative 1's.
|
||||
// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
|
||||
// will have 3 bytes of leading zeros, we will only store the interesting byte.
|
||||
template<typename T>
|
||||
inline int ParquetPlainEncoder::EncodeToFixedLenByteArray(
|
||||
uint8_t* buffer, int fixed_len_size, const T& v) {
|
||||
DCHECK_GT(fixed_len_size, 0);
|
||||
DCHECK_LE(fixed_len_size, sizeof(T));
|
||||
const int8_t* skipped_bytes_start = NULL;
|
||||
#if __BYTE_ORDER == __LITTLE_ENDIAN
|
||||
BitUtil::ByteSwap(buffer, &v, fixed_len_size);
|
||||
skipped_bytes_start = reinterpret_cast<const int8_t*>(&v) + fixed_len_size;
|
||||
#else
|
||||
memcpy(buffer, &v + sizeof(T) - fixed_len_size, fixed_len_size);
|
||||
skipped_bytes_start = reinterpret_cast<const int8_t*>(&v);
|
||||
#endif
|
||||
|
||||
#ifndef NDEBUG
|
||||
// On debug, verify that the skipped bytes are what we expect.
|
||||
for (int i = 0; i < sizeof(T) - fixed_len_size; ++i) {
|
||||
DCHECK_EQ(skipped_bytes_start[i], v.value() < 0 ? -1 : 0);
|
||||
}
|
||||
#endif
|
||||
return fixed_len_size;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
inline int ParquetPlainEncoder::DecodeFromFixedLenByteArray(
|
||||
uint8_t* buffer, int fixed_len_size, T* v) {
|
||||
DCHECK_GT(fixed_len_size, 0);
|
||||
DCHECK_LE(fixed_len_size, sizeof(T));
|
||||
*v = 0;
|
||||
// We need to sign extend val. For example, if the original value was
|
||||
// -1, the original bytes were -1,-1,-1,-1. If we only wrote out 1 byte, after
|
||||
// the encode step above, val would contain (-1, 0, 0, 0). We need to sign
|
||||
// extend the remaining 3 bytes to get the original value.
|
||||
// We do this by filling in the most significant bytes and (arithmetic) bit
|
||||
// shifting down.
|
||||
int bytes_to_fill = sizeof(T) - fixed_len_size;
|
||||
#if __BYTE_ORDER == __LITTLE_ENDIAN
|
||||
BitUtil::ByteSwap(reinterpret_cast<int8_t*>(v) + bytes_to_fill, buffer, fixed_len_size);
|
||||
#else
|
||||
memcpy(v, buffer, fixed_len_size);
|
||||
#endif
|
||||
*v >>= (bytes_to_fill * 8);
|
||||
return fixed_len_size;
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const Decimal4Value& v) {
|
||||
return EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const Decimal8Value& v) {
|
||||
return EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Encode(
|
||||
uint8_t* buffer, int fixed_len_size, const Decimal16Value& v) {
|
||||
return EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(
|
||||
uint8_t* buffer, int fixed_len_size, Decimal4Value* v) {
|
||||
return DecodeFromFixedLenByteArray(
|
||||
buffer, fixed_len_size, reinterpret_cast<int32_t*>(v));
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(
|
||||
uint8_t* buffer, int fixed_len_size, Decimal8Value* v) {
|
||||
return DecodeFromFixedLenByteArray(
|
||||
buffer, fixed_len_size, reinterpret_cast<int64_t*>(v));
|
||||
}
|
||||
|
||||
template<>
|
||||
inline int ParquetPlainEncoder::Decode(
|
||||
uint8_t* buffer, int fixed_len_size, Decimal16Value* v) {
|
||||
return DecodeFromFixedLenByteArray(
|
||||
buffer, fixed_len_size, reinterpret_cast<int128_t*>(v));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include <limits.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "exec/parquet-common.h"
|
||||
#include "runtime/decimal-value.h"
|
||||
#include "runtime/string-value.inline.h"
|
||||
#include "runtime/timestamp-value.h"
|
||||
|
||||
@@ -28,12 +29,11 @@ namespace impala {
|
||||
template <typename T>
|
||||
void TestType(const T& v, int expected_byte_size) {
|
||||
uint8_t buffer[expected_byte_size];
|
||||
EXPECT_EQ(ParquetPlainEncoder::ByteSize(v), expected_byte_size);
|
||||
int encoded_size = ParquetPlainEncoder::Encode(buffer, v);
|
||||
int encoded_size = ParquetPlainEncoder::Encode(buffer, expected_byte_size, v);
|
||||
EXPECT_EQ(encoded_size, expected_byte_size);
|
||||
|
||||
T result;
|
||||
int decoded_size = ParquetPlainEncoder::Decode(buffer, &result);
|
||||
int decoded_size = ParquetPlainEncoder::Decode(buffer, expected_byte_size, &result);
|
||||
EXPECT_EQ(decoded_size, expected_byte_size);
|
||||
EXPECT_EQ(result, v);
|
||||
}
|
||||
@@ -56,6 +56,69 @@ TEST(PlainEncoding, Basic) {
|
||||
TestType(d, sizeof(double));
|
||||
TestType(sv, sizeof(int32_t) + sv.len);
|
||||
TestType(tv, 12);
|
||||
|
||||
TestType(Decimal4Value(1234), sizeof(Decimal4Value));
|
||||
TestType(Decimal4Value(-1234), sizeof(Decimal4Value));
|
||||
|
||||
TestType(Decimal8Value(1234), sizeof(Decimal8Value));
|
||||
TestType(Decimal8Value(-1234), sizeof(Decimal8Value));
|
||||
TestType(Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
|
||||
TestType(Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
|
||||
|
||||
TestType(Decimal16Value(1234), 16);
|
||||
TestType(Decimal16Value(-1234), 16);
|
||||
TestType(Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
|
||||
TestType(Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
|
||||
TestType(Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
|
||||
TestType(Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
|
||||
|
||||
// two digit values can be encoded with any byte size.
|
||||
for (int i = 1; i <=16; ++i) {
|
||||
if (i <= 4) {
|
||||
TestType(Decimal4Value(i), i);
|
||||
TestType(Decimal4Value(-i), i);
|
||||
}
|
||||
if (i <= 8) {
|
||||
TestType(Decimal8Value(i), i);
|
||||
TestType(Decimal8Value(-i), i);
|
||||
}
|
||||
TestType(Decimal16Value(i), i);
|
||||
TestType(Decimal16Value(-i), i);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(PlainEncoding, DecimalBigEndian) {
|
||||
// Test Basic can pass if we make the same error in encode and decode.
|
||||
// Verify the bytes are actually big endian.
|
||||
uint8_t buffer[] = {
|
||||
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
|
||||
};
|
||||
|
||||
// Manually generate this to avoid potential bugs in BitUtil
|
||||
uint8_t buffer_swapped[] = {
|
||||
15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
|
||||
};
|
||||
uint8_t result_buffer[16];
|
||||
|
||||
Decimal4Value d4;
|
||||
Decimal8Value d8;
|
||||
Decimal16Value d16;
|
||||
|
||||
memcpy(&d4, buffer, sizeof(d4));
|
||||
memcpy(&d8, buffer, sizeof(d8));
|
||||
memcpy(&d16, buffer, sizeof(d16));
|
||||
|
||||
int size = ParquetPlainEncoder::Encode(result_buffer, sizeof(d4), d4);
|
||||
DCHECK_EQ(size, sizeof(d4));
|
||||
DCHECK_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d4), sizeof(d4)), 0);
|
||||
|
||||
size = ParquetPlainEncoder::Encode(result_buffer, sizeof(d8), d8);
|
||||
DCHECK_EQ(size, sizeof(d8));
|
||||
DCHECK_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d8), sizeof(d8)), 0);
|
||||
|
||||
size = ParquetPlainEncoder::Encode(result_buffer, sizeof(d16), d16);
|
||||
DCHECK_EQ(size, sizeof(d16));
|
||||
DCHECK_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d16), sizeof(d16)), 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "common/logging.h"
|
||||
#include "runtime/multi-precision.h"
|
||||
#include "util/decimal-util.h"
|
||||
#include "util/hash-util.h"
|
||||
|
||||
namespace impala {
|
||||
|
||||
@@ -211,6 +212,7 @@ class DecimalValue {
|
||||
// Returns the underlying storage. For a particular storage size, there is
|
||||
// only one representation for any decimal and the storage is directly comparable.
|
||||
const T& value() const { return value_; }
|
||||
T& value() { return value_; }
|
||||
|
||||
// Returns the value of the decimal before the decimal point.
|
||||
const T whole_part(const ColumnType& t) const {
|
||||
@@ -228,6 +230,10 @@ class DecimalValue {
|
||||
return static_cast<double>(value_) / powf(10.0, type.scale);
|
||||
}
|
||||
|
||||
inline uint32_t Hash(int seed = 0) const {
|
||||
return HashUtil::Hash(&value_, sizeof(value_), seed);
|
||||
}
|
||||
|
||||
std::string ToString(const ColumnType& type) const;
|
||||
|
||||
private:
|
||||
@@ -292,6 +298,17 @@ inline std::ostream& operator<<(std::ostream& os, const Decimal16Value& d) {
|
||||
return os << d.value();
|
||||
}
|
||||
|
||||
// This function must be called 'hash_value' to be picked up by boost.
|
||||
inline std::size_t hash_value(const Decimal4Value& v) {
|
||||
return v.Hash();
|
||||
}
|
||||
inline std::size_t hash_value(const Decimal8Value& v) {
|
||||
return v.Hash();
|
||||
}
|
||||
inline std::size_t hash_value(const Decimal16Value& v) {
|
||||
return v.Hash();
|
||||
}
|
||||
|
||||
// For comparisons, we need the intermediate to be at the next precision
|
||||
// to avoid overflows.
|
||||
// TODO: is there a more efficient way to do this?
|
||||
|
||||
@@ -104,40 +104,67 @@ class BitUtil {
|
||||
return static_cast<uint16_t>(ByteSwap(static_cast<int16_t>(value)));
|
||||
}
|
||||
|
||||
// Write the swapped bytes into dst. len must be 1, 2, 4 or 8.
|
||||
static inline void ByteSwap(void* dst, void* src, int len) {
|
||||
// Write the swapped bytes into dst. Src and st cannot overlap.
|
||||
static inline void ByteSwap(void* dst, const void* src, int len) {
|
||||
switch (len) {
|
||||
case 1:
|
||||
*reinterpret_cast<int8_t*>(dst) = *reinterpret_cast<int8_t*>(src);
|
||||
break;
|
||||
*reinterpret_cast<int8_t*>(dst) = *reinterpret_cast<const int8_t*>(src);
|
||||
return;
|
||||
case 2:
|
||||
*reinterpret_cast<int16_t*>(dst) = ByteSwap(*reinterpret_cast<int16_t*>(src));
|
||||
break;
|
||||
*reinterpret_cast<int16_t*>(dst) =
|
||||
ByteSwap(*reinterpret_cast<const int16_t*>(src));
|
||||
return;
|
||||
case 4:
|
||||
*reinterpret_cast<int32_t*>(dst) = ByteSwap(*reinterpret_cast<int32_t*>(src));
|
||||
break;
|
||||
*reinterpret_cast<int32_t*>(dst) =
|
||||
ByteSwap(*reinterpret_cast<const int32_t*>(src));
|
||||
return;
|
||||
case 8:
|
||||
*reinterpret_cast<int64_t*>(dst) = ByteSwap(*reinterpret_cast<int64_t*>(src));
|
||||
break;
|
||||
default: DCHECK(false);
|
||||
*reinterpret_cast<int64_t*>(dst) =
|
||||
ByteSwap(*reinterpret_cast<const int64_t*>(src));
|
||||
return;
|
||||
default: break;
|
||||
}
|
||||
|
||||
uint8_t* d = reinterpret_cast<uint8_t*>(dst);
|
||||
const uint8_t* s = reinterpret_cast<const uint8_t*>(src);
|
||||
for (int i = 0; i < len; ++i) {
|
||||
d[i] = s[len - i - 1];
|
||||
}
|
||||
}
|
||||
|
||||
// Converts to big endian format (if not already in big endian) from the
|
||||
// machine's native endian format.
|
||||
#if __BYTE_ORDER == __LITTLE_ENDIAN
|
||||
// Converts to big endian format (if not already in big endian).
|
||||
static inline int64_t BigEndian(int64_t value) { return ByteSwap(value); }
|
||||
static inline uint64_t BigEndian(uint64_t value) { return ByteSwap(value); }
|
||||
static inline int32_t BigEndian(int32_t value) { return ByteSwap(value); }
|
||||
static inline uint32_t BigEndian(uint32_t value) { return ByteSwap(value); }
|
||||
static inline int16_t BigEndian(int16_t value) { return ByteSwap(value); }
|
||||
static inline uint16_t BigEndian(uint16_t value) { return ByteSwap(value); }
|
||||
static inline int64_t ToBigEndian(int64_t value) { return ByteSwap(value); }
|
||||
static inline uint64_t ToBigEndian(uint64_t value) { return ByteSwap(value); }
|
||||
static inline int32_t ToBigEndian(int32_t value) { return ByteSwap(value); }
|
||||
static inline uint32_t ToBigEndian(uint32_t value) { return ByteSwap(value); }
|
||||
static inline int16_t ToBigEndian(int16_t value) { return ByteSwap(value); }
|
||||
static inline uint16_t ToBigEndian(uint16_t value) { return ByteSwap(value); }
|
||||
#else
|
||||
static inline int64_t BigEndian(int64_t val) { return val; }
|
||||
static inline uint64_t BigEndian(uint64_t val) { return val; }
|
||||
static inline int32_t BigEndian(int32_t val) { return val; }
|
||||
static inline uint32_t BigEndian(uint32_t val) { return val; }
|
||||
static inline int16_t BigEndian(int16_t val) { return val; }
|
||||
static inline uint16_t BigEndian(uint16_t val) { return val; }
|
||||
static inline int64_t ToBigEndian(int64_t val) { return val; }
|
||||
static inline uint64_t ToBigEndian(uint64_t val) { return val; }
|
||||
static inline int32_t ToBigEndian(int32_t val) { return val; }
|
||||
static inline uint32_t ToBigEndian(uint32_t val) { return val; }
|
||||
static inline int16_t ToBigEndian(int16_t val) { return val; }
|
||||
static inline uint16_t ToBigEndian(uint16_t val) { return val; }
|
||||
#endif
|
||||
|
||||
// Converts from big endian format to the machine's native endian format.
|
||||
#if __BYTE_ORDER == __LITTLE_ENDIAN
|
||||
static inline int64_t FromBigEndian(int64_t value) { return ByteSwap(value); }
|
||||
static inline uint64_t FromBigEndian(uint64_t value) { return ByteSwap(value); }
|
||||
static inline int32_t FromBigEndian(int32_t value) { return ByteSwap(value); }
|
||||
static inline uint32_t FromBigEndian(uint32_t value) { return ByteSwap(value); }
|
||||
static inline int16_t FromBigEndian(int16_t value) { return ByteSwap(value); }
|
||||
static inline uint16_t FromBigEndian(uint16_t value) { return ByteSwap(value); }
|
||||
#else
|
||||
static inline int64_t FromBigEndian(int64_t val) { return val; }
|
||||
static inline uint64_t FromBigEndian(uint64_t val) { return val; }
|
||||
static inline int32_t FromBigEndian(int32_t val) { return val; }
|
||||
static inline uint32_t FromBigEndian(uint32_t val) { return val; }
|
||||
static inline int16_t FromBigEndian(int16_t val) { return val; }
|
||||
static inline uint16_t FromBigEndian(uint16_t val) { return val; }
|
||||
#endif
|
||||
|
||||
};
|
||||
|
||||
@@ -42,6 +42,9 @@ namespace impala {
|
||||
// abstracts over the actual dictionary type.
|
||||
// Note: it does not provide a virtual Put(). Users are expected to know the subclass
|
||||
// type when using Put().
|
||||
// TODO: once we can easily remove virtual calls with codegen, this interface can
|
||||
// rely less on templating and be easier to follow. The type should be passed in
|
||||
// as an argument rather than template argument.
|
||||
class DictEncoderBase {
|
||||
public:
|
||||
virtual ~DictEncoderBase() {
|
||||
@@ -98,7 +101,9 @@ class DictEncoderBase {
|
||||
template<typename T>
|
||||
class DictEncoder : public DictEncoderBase {
|
||||
public:
|
||||
DictEncoder(MemPool* pool) : DictEncoderBase(pool) { }
|
||||
DictEncoder(MemPool* pool, int encoded_value_size) :
|
||||
DictEncoderBase(pool), encoded_value_size_(encoded_value_size) {
|
||||
}
|
||||
|
||||
// Encode value. Returns the number of bytes added to the dictionary page length (will
|
||||
// be 0 if this value is already in the dictionary). Note that this does not actually
|
||||
@@ -119,6 +124,9 @@ class DictEncoder : public DictEncoderBase {
|
||||
// write dictionary page.
|
||||
std::vector<T> dict_;
|
||||
|
||||
// Size of each encoded dictionary value. -1 for variable-length types.
|
||||
int encoded_value_size_;
|
||||
|
||||
// Adds value to dict_ and updates dict_encoded_size_. Returns the
|
||||
// number of bytes added to dict_encoded_size_.
|
||||
// *index is the output parameter and is the index into the dict_ for 'value'
|
||||
@@ -153,7 +161,9 @@ class DictDecoder : public DictDecoderBase {
|
||||
// of dict_buffer.
|
||||
// For string data, the decoder returns StringValues with data directly from
|
||||
// dict_buffer (i.e. no copies).
|
||||
DictDecoder(uint8_t* dict_buffer, int dict_len);
|
||||
// fixed_len_size is the size that must be passed to decode fixed-length
|
||||
// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
|
||||
DictDecoder(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
|
||||
|
||||
virtual int num_entries() const { return dict_.size(); }
|
||||
|
||||
@@ -182,12 +192,12 @@ inline int DictEncoder<T>::Put(const T& value) {
|
||||
|
||||
template<typename T>
|
||||
inline int DictEncoder<T>::AddToDict(const T& value, int* index) {
|
||||
DCHECK_GT(encoded_value_size_, 0);
|
||||
*index = dict_index_.size();
|
||||
dict_index_[value] = *index;
|
||||
dict_.push_back(value);
|
||||
int bytes_added = ParquetPlainEncoder::ByteSize(value);
|
||||
dict_encoded_size_ += bytes_added;
|
||||
return bytes_added;
|
||||
dict_encoded_size_ += encoded_value_size_;
|
||||
return encoded_value_size_;
|
||||
}
|
||||
|
||||
template<>
|
||||
@@ -217,7 +227,7 @@ inline bool DictDecoder<T>::GetValue(T* value) {
|
||||
template<typename T>
|
||||
inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
|
||||
BOOST_FOREACH(const T& value, dict_) {
|
||||
buffer += ParquetPlainEncoder::Encode(buffer, value);
|
||||
buffer += ParquetPlainEncoder::Encode(buffer, encoded_value_size_, value);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,11 +246,13 @@ inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
inline DictDecoder<T>::DictDecoder(uint8_t* dict_buffer, int dict_len) {
|
||||
inline DictDecoder<T>::DictDecoder(uint8_t* dict_buffer, int dict_len,
|
||||
int fixed_len_size) {
|
||||
uint8_t* end = dict_buffer + dict_len;
|
||||
while (dict_buffer < end) {
|
||||
T value;
|
||||
dict_buffer += ParquetPlainEncoder::Decode(dict_buffer, &value);
|
||||
dict_buffer +=
|
||||
ParquetPlainEncoder::Decode(dict_buffer, fixed_len_size, &value);
|
||||
dict_.push_back(value);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,12 +30,12 @@ using namespace std;
|
||||
namespace impala {
|
||||
|
||||
template<typename T>
|
||||
void ValidateDict(const vector<T>& values) {
|
||||
void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
|
||||
set<T> values_set(values.begin(), values.end());
|
||||
|
||||
MemTracker tracker;
|
||||
MemPool pool(&tracker);
|
||||
DictEncoder<T> encoder(&pool);
|
||||
DictEncoder<T> encoder(&pool, fixed_buffer_byte_size);
|
||||
BOOST_FOREACH(T i, values) {
|
||||
encoder.Put(i);
|
||||
}
|
||||
@@ -50,7 +50,8 @@ void ValidateDict(const vector<T>& values) {
|
||||
EXPECT_GT(data_len, 0);
|
||||
encoder.ClearIndices();
|
||||
|
||||
DictDecoder<T> decoder(dict_buffer, encoder.dict_encoded_size());
|
||||
DictDecoder<T> decoder(
|
||||
dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size);
|
||||
decoder.SetData(data_buffer, data_len);
|
||||
BOOST_FOREACH(T i, values) {
|
||||
T j;
|
||||
@@ -79,7 +80,7 @@ TEST(DictTest, TestStrings) {
|
||||
values.push_back(sv3);
|
||||
values.push_back(sv4);
|
||||
|
||||
ValidateDict(values);
|
||||
ValidateDict(values, -1);
|
||||
}
|
||||
|
||||
TEST(DictTest, TestTimestamps) {
|
||||
@@ -95,35 +96,48 @@ TEST(DictTest, TestTimestamps) {
|
||||
values.push_back(tv1);
|
||||
values.push_back(tv1);
|
||||
|
||||
ValidateDict(values);
|
||||
ValidateDict(values, ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TIMESTAMP)));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void TestNumbers(int max_value, int repeat) {
|
||||
void IncrementValue(T* t) { ++(*t); }
|
||||
|
||||
template <> void IncrementValue(Decimal4Value* t) { ++(t->value()); }
|
||||
template <> void IncrementValue(Decimal8Value* t) { ++(t->value()); }
|
||||
template <> void IncrementValue(Decimal16Value* t) { ++(t->value()); }
|
||||
|
||||
template<typename T>
|
||||
void TestNumbers(int max_value, int repeat, int value_byte_size) {
|
||||
vector<T> values;
|
||||
for (T val = 0; val < max_value; ++val) {
|
||||
for (T val = 0; val < max_value; IncrementValue(&val)) {
|
||||
for (int i = 0; i < repeat; ++i) {
|
||||
values.push_back(val);
|
||||
}
|
||||
}
|
||||
ValidateDict(values);
|
||||
ValidateDict(values, value_byte_size);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void TestNumbers() {
|
||||
TestNumbers<T>(100, 1);
|
||||
TestNumbers<T>(1, 100);
|
||||
TestNumbers<T>(1, 1);
|
||||
TestNumbers<T>(1, 2);
|
||||
void TestNumbers(int value_byte_size) {
|
||||
TestNumbers<T>(100, 1, value_byte_size);
|
||||
TestNumbers<T>(1, 100, value_byte_size);
|
||||
TestNumbers<T>(1, 1, value_byte_size);
|
||||
TestNumbers<T>(1, 2, value_byte_size);
|
||||
}
|
||||
|
||||
TEST(DictTest, TestNumbers) {
|
||||
TestNumbers<int8_t>();
|
||||
TestNumbers<int16_t>();
|
||||
TestNumbers<int32_t>();
|
||||
TestNumbers<int64_t>();
|
||||
TestNumbers<float>();
|
||||
TestNumbers<double>();
|
||||
TestNumbers<int8_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TINYINT)));
|
||||
TestNumbers<int16_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_SMALLINT)));
|
||||
TestNumbers<int32_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_INT)));
|
||||
TestNumbers<int64_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_BIGINT)));
|
||||
TestNumbers<float>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_FLOAT)));
|
||||
TestNumbers<double>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_DOUBLE)));
|
||||
|
||||
for (int i = 1; i <=16; ++i) {
|
||||
if (i <= 4) TestNumbers<Decimal4Value>(i);
|
||||
if (i <= 8) TestNumbers<Decimal8Value>(i);
|
||||
TestNumbers<Decimal16Value>(i);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ inline bool KeyNormalizer::WriteNullBit(uint8_t null_bit, uint8_t* value, uint8_
|
||||
|
||||
template <typename ValueType>
|
||||
inline void KeyNormalizer::StoreFinalValue(ValueType value, void* dst, bool is_asc) {
|
||||
if (sizeof(ValueType) > 1) value = BitUtil::BigEndian(value);
|
||||
if (sizeof(ValueType) > 1) value = BitUtil::ToBigEndian(value);
|
||||
if (!is_asc) value = ~value;
|
||||
memcpy(dst, &value, sizeof(ValueType));
|
||||
}
|
||||
|
||||
@@ -54,12 +54,12 @@ enum ConvertedType {
|
||||
/** a key/value pair is converted into a group of two fields */
|
||||
MAP_KEY_VALUE = 2;
|
||||
|
||||
/** a list is converted into an optional field containing a repeated field for its
|
||||
/** a list is converted into an optional field containing a repeated field for its
|
||||
* values */
|
||||
LIST = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Representation of Schemas
|
||||
*/
|
||||
enum FieldRepetitionType {
|
||||
@@ -108,6 +108,13 @@ struct SchemaElement {
|
||||
* Used to record the original type to help with cross conversion.
|
||||
*/
|
||||
6: optional ConvertedType converted_type;
|
||||
|
||||
/** Used when this column contains decimal data, in which case the data
|
||||
* is encoded without the decimal point (e.g. 1.23 is encoded as 123).
|
||||
* The scale is stored as part of the metadata on where the decimal point is.
|
||||
*/
|
||||
7: optional i32 scale
|
||||
8: optional i32 precision
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -122,7 +129,7 @@ enum Encoding {
|
||||
* INT64 - 8 bytes per value. Stored as little-endian.
|
||||
* FLOAT - 4 bytes per value. IEEE. Stored as little-endian.
|
||||
* DOUBLE - 8 bytes per value. IEEE. Stored as little-endian.
|
||||
* BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
|
||||
* BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
|
||||
* FIXED_LEN_BYTE_ARRAY - Just the bytes.
|
||||
*/
|
||||
PLAIN = 0;
|
||||
@@ -130,8 +137,8 @@ enum Encoding {
|
||||
/** Group VarInt encoding for INT32/INT64. */
|
||||
GROUP_VAR_INT = 1;
|
||||
|
||||
/** Dictionary encoding. The values in the dictionary are encoded in the
|
||||
* plain type.
|
||||
/** Dictionary encoding. The values in the dictionary are encoded in the
|
||||
* plain type.
|
||||
*/
|
||||
PLAIN_DICTIONARY = 2;
|
||||
|
||||
@@ -145,7 +152,7 @@ enum Encoding {
|
||||
}
|
||||
|
||||
/**
|
||||
* Supported compression algorithms.
|
||||
* Supported compression algorithms.
|
||||
*/
|
||||
enum CompressionCodec {
|
||||
UNCOMPRESSED = 0;
|
||||
@@ -198,7 +205,7 @@ struct PageHeader {
|
||||
3: required i32 compressed_page_size
|
||||
|
||||
/** 32bit crc for the data below. This allows for disabling checksumming in HDFS
|
||||
* if only a few pages needs to be read
|
||||
* if only a few pages needs to be read
|
||||
**/
|
||||
4: optional i32 crc
|
||||
|
||||
@@ -208,7 +215,7 @@ struct PageHeader {
|
||||
7: optional DictionaryPageHeader dictionary_page_header;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Wrapper struct to store key values
|
||||
*/
|
||||
struct KeyValue {
|
||||
@@ -223,7 +230,7 @@ struct ColumnMetaData {
|
||||
/** Type of this column **/
|
||||
1: required Type type
|
||||
|
||||
/** Set of all encodings used for this column. The purpose is to validate
|
||||
/** Set of all encodings used for this column. The purpose is to validate
|
||||
* whether we can decode those pages. **/
|
||||
2: required list<Encoding> encodings
|
||||
|
||||
@@ -256,7 +263,7 @@ struct ColumnMetaData {
|
||||
}
|
||||
|
||||
struct ColumnChunk {
|
||||
/** File where column data is stored. If not set, assumed to be same file as
|
||||
/** File where column data is stored. If not set, assumed to be same file as
|
||||
* metadata. This path is relative to the current file.
|
||||
**/
|
||||
1: optional string file_path
|
||||
@@ -266,11 +273,11 @@ struct ColumnChunk {
|
||||
|
||||
/** Column metadata for this chunk. This is the same content as what is at
|
||||
* file_path/file_offset. Having it here has it replicated in the file
|
||||
* metadata.
|
||||
* metadata.
|
||||
**/
|
||||
3: optional ColumnMetaData meta_data
|
||||
}
|
||||
|
||||
|
||||
struct RowGroup {
|
||||
1: required list<ColumnChunk> columns
|
||||
|
||||
@@ -306,7 +313,7 @@ struct FileMetaData {
|
||||
5: optional list<KeyValue> key_value_metadata
|
||||
|
||||
/** String for application that wrote this file. This should be in the format
|
||||
* <Application> version <App Version> (build <App Build Hash>).
|
||||
* <Application> version <App Version> (build <App Build Hash>).
|
||||
* e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55)
|
||||
**/
|
||||
6: optional string created_by
|
||||
|
||||
4
testdata/bin/create-load-data.sh
vendored
4
testdata/bin/create-load-data.sh
vendored
@@ -139,10 +139,6 @@ hadoop fs -rm /test-warehouse/alltypes_text_lzo/year=2009/month=1/000013_0.lzo.i
|
||||
hadoop fs -put -f ${IMPALA_HOME}/testdata/tinytable_seq_snap/tinytable_seq_snap_header_only \
|
||||
/test-warehouse/tinytable_seq_snap
|
||||
|
||||
# Populate the decimal text tables.
|
||||
hadoop fs -put -f ${IMPALA_HOME}/testdata/data/decimal_tbl.txt /test-warehouse/decimal_tbl
|
||||
hadoop fs -put -f ${IMPALA_HOME}/testdata/data/decimal-tiny.txt /test-warehouse/decimal_tiny
|
||||
|
||||
# Create special table for testing Avro schema resolution
|
||||
# (see testdata/avro_schema_resolution/README)
|
||||
pushd ${IMPALA_HOME}/testdata/avro_schema_resolution
|
||||
|
||||
@@ -1232,9 +1232,11 @@ d5 DECIMAL(10, 5)
|
||||
---- ROW_FORMAT
|
||||
delimited fields terminated by ','
|
||||
---- LOAD
|
||||
-- TODO: Beeline on CDH4 doesn't understand decimal with precision and scale so can't
|
||||
-- do the data load. This is done in testdata/bin/create_load_data for now.
|
||||
-- Fix this by allowing an "hdfs put" as the load command.
|
||||
`hadoop fs -mkdir -p /test-warehouse/decimal_tbl && hadoop fs -put -f \
|
||||
${IMPALA_HOME}/testdata/data/decimal_tbl.txt /test-warehouse/decimal_tbl/
|
||||
---- DEPENDENT_LOAD
|
||||
INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
|
||||
select * from functional.{table_name};
|
||||
====
|
||||
---- DATASET
|
||||
functional
|
||||
@@ -1247,9 +1249,11 @@ c3 DECIMAL(1,1)
|
||||
---- ROW_FORMAT
|
||||
delimited fields terminated by ','
|
||||
---- LOAD
|
||||
-- TODO: Beeline on CDH4 doesn't understand decimal with precision and scale so can't
|
||||
-- do the data load. This is done in testdata/bin/create_load_data for now.
|
||||
-- Fix this by allowing an "hdfs put" as the load command.
|
||||
`hadoop fs -mkdir -p /test-warehouse/decimal_tiny && hadoop fs -put -f \
|
||||
${IMPALA_HOME}/testdata/data/decimal-tiny.txt /test-warehouse/decimal_tiny/
|
||||
---- DEPENDENT_LOAD
|
||||
INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
|
||||
select * from functional.{table_name};
|
||||
====
|
||||
---- DATASET
|
||||
functional
|
||||
|
||||
@@ -89,9 +89,11 @@ table_name:widerow, constraint:exclude, table_format:hbase/none/none
|
||||
table_name:nullformat_custom, constraint:exclude, table_format:hbase/none/none
|
||||
table_name:unsupported_types, constraint:exclude, table_format:hbase/none/none
|
||||
|
||||
# Decimal is only supported for text for now.
|
||||
# On CDH4, decimal can only be tested on formats Impala can write to (text and parquet)
|
||||
table_name:decimal_tbl, constraint:restrict_to, table_format:text/none/none
|
||||
table_name:decimal_tiny, constraint:restrict_to, table_format:text/none/none
|
||||
table_name:decimal_tbl, constraint:restrict_to, table_format:parquet/none/none
|
||||
table_name:decimal_tiny, constraint:restrict_to, table_format:parquet/none/none
|
||||
|
||||
# hbase doesn't seem to like very wide tables
|
||||
table_name:widetable_250_cols, constraint:exclude, table_format:hbase/none/none
|
||||
|
||||
|
@@ -21,12 +21,12 @@ class TestDecimalQueries(ImpalaTestSuite):
|
||||
cls.TestMatrix.add_dimension(
|
||||
TestDimension('batch_size', *TestDecimalQueries.BATCH_SIZES))
|
||||
|
||||
# TODO: add parquet when that is supported.
|
||||
# On CDH4, hive does not support decimal so we can't run these tests against
|
||||
# the other file formats. Enable them on C5.
|
||||
cls.TestMatrix.add_constraint(lambda v:\
|
||||
v.get_value('table_format').file_format == 'text' and
|
||||
v.get_value('table_format').compression_codec == 'none')
|
||||
(v.get_value('table_format').file_format == 'text' and
|
||||
v.get_value('table_format').compression_codec == 'none') or
|
||||
v.get_value('table_format').file_format == 'parquet')
|
||||
|
||||
def test_queries(self, vector):
|
||||
new_vector = copy(vector)
|
||||
|
||||
Reference in New Issue
Block a user