IMPALA-10319: Support arbitrary encodings on Text files

As proposed in the Jira, this implements decoding and encoding of text
buffers for Impala/Hive text tables. Given a table with the
'serialization.encoding' property set, Impala, similarly to Hive, should
be able to encode inserted data into the specified charset before
writing it to a text file. The opposite decoding operation is performed
when reading data buffers from text files. Both operations employ the
boost::locale::conv library.
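
For illustration only, a minimal sketch (not part of the patch) of the
boost::locale::conv calls the codec relies on; the charset name "GBK" and the
sample string are arbitrary examples:

#include <boost/locale/encoding.hpp>
#include <iostream>
#include <string>

int main() {
  const std::string encoding = "GBK";  // hypothetical target charset
  const std::string utf8 = "\xE5\xBC\xA0\xE4\xB8\x89";  // UTF-8 sample text
  // Encode UTF-8 text into the target charset; 'stop' throws on invalid input.
  std::string encoded = boost::locale::conv::from_utf<char>(
      utf8, encoding, boost::locale::conv::stop);
  // Decode it back from the target charset to UTF-8.
  std::string decoded = boost::locale::conv::to_utf<char>(
      encoded, encoding, boost::locale::conv::stop);
  std::cout << (decoded == utf8) << std::endl;  // prints 1
  return 0;
}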

Since Hive doesn't encode line delimiters, charsets that would store the
line delimiter differently from its ASCII encoding are not allowed.
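
In other words, the encoded form of '\n' must still be the single ASCII byte
0x0A. A hedged sketch of that check (again not part of the patch, and expressed
with Boost.Locale rather than the frontend's java.nio.charset based check):

#include <boost/locale/encoding.hpp>
#include <string>

// Returns true if '\n' encodes to the single ASCII byte 0x0A in 'encoding'.
bool LineDelimCompatibleWithAscii(const std::string& encoding) {
  std::string encoded = boost::locale::conv::from_utf<char>(
      std::string("\n"), encoding, boost::locale::conv::stop);
  return encoded == "\n";
}
// LineDelimCompatibleWithAscii("GBK")    -> true
// LineDelimCompatibleWithAscii("UTF-16") -> false: the delimiter no longer maps
// to a single 0x0A byte, so such charsets are rejected at analysis time.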

One difference from Hive is that Impala implements
'serialization.encoding' only as a per-partition serde property, to
avoid the confusion of allowing it both as a serde property and a table
property. (See related IMPALA-13748)

Note: Because the patch contains pre-created non-UTF-8 files, the
'gerrit-code-review-checks' job was run locally. (See IMPALA-14100)

Change-Id: I787cd01caa52a19d6645519a6cedabe0a5253a65
Reviewed-on: http://gerrit.cloudera.org:8080/22049
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Mihaly Szjatinya
2025-06-01 15:36:48 +02:00
committed by Impala Public Jenkins
parent f8a1f6046a
commit 4837cedc79
34 changed files with 1063 additions and 20 deletions

View File

@@ -313,8 +313,9 @@ class HdfsScanner {
/// The most recently used decompression type.
THdfsCompression::type decompression_type_ = THdfsCompression::NONE;
/// Pool to allocate per data block memory. This should be used with the
/// decompressor and any other per data block allocations.
/// Pool to allocate per data block memory. This should be used with the decompressor
/// and any other per data block allocations. In case of decoding, it contains the
/// resulting decoded data.
boost::scoped_ptr<MemPool> data_buffer_pool_;
/// Offsets of string slots in the result tuple that may need to be copied as part of

View File

@@ -28,6 +28,7 @@
#include "util/coding-util.h"
#include "util/hdfs-util.h"
#include "util/runtime-profile-counters.h"
#include "util/char-codec.h"
#include <hdfs.h>
#include <stdlib.h>
@@ -52,8 +53,16 @@ HdfsTextTableWriter::HdfsTextTableWriter(TableSinkBase* parent,
rowbatch_stringstream_.precision(RawValue::ASCII_PRECISION);
}
HdfsTextTableWriter::~HdfsTextTableWriter() {}
Status HdfsTextTableWriter::Init() {
parent_->mem_tracker()->Consume(flush_size_);
const auto& encoding = output_->partition_descriptor->encoding_value();
if (!encoding.empty() && encoding != "UTF-8") {
encoder_.reset(new CharCodec(nullptr, encoding));
}
return Status::OK();
}
@@ -147,7 +156,13 @@ Status HdfsTextTableWriter::InitNewFile() {
}
Status HdfsTextTableWriter::Flush() {
string rowbatch_string = rowbatch_stringstream_.str();
string rowbatch_string;
if (encoder_) {
RETURN_IF_ERROR(encoder_->EncodeBuffer(
rowbatch_stringstream_.str(), &rowbatch_string));
} else {
rowbatch_string = rowbatch_stringstream_.str();
}
rowbatch_stringstream_.str(string());
const uint8_t* data =
reinterpret_cast<const uint8_t*>(rowbatch_string.data());

View File

@@ -37,6 +37,7 @@ class RuntimeState;
class StringValue;
class TupleDescriptor;
class TupleRow;
class CharCodec;
/// The writer consumes all rows passed to it and writes the evaluated output_exprs_
/// as delimited text into Hdfs files.
@@ -47,7 +48,7 @@ class HdfsTextTableWriter : public HdfsTableWriter {
const HdfsPartitionDescriptor* partition,
const HdfsTableDescriptor* table_desc);
~HdfsTextTableWriter() { }
~HdfsTextTableWriter();
virtual Status Init();
virtual Status Finalize();
@@ -87,6 +88,9 @@ class HdfsTextTableWriter : public HdfsTableWriter {
/// Stringstream to buffer output. The stream is cleared between HDFS
/// Write calls to allow for the internal buffers to be reused.
std::stringstream rowbatch_stringstream_;
/// Encoder used when writing non-UTF-8 text files.
std::unique_ptr<CharCodec> encoder_;
};
}

View File

@@ -45,7 +45,7 @@ class TupleRow;
/// Columnar formats have multiple streams per context object.
/// This class handles stitching data split across IO buffers and providing
/// some basic parsing utilities.
/// This class it *not* thread safe. It is designed to have a single scanner thread
/// This class is *not* thread safe. It is designed to have a single scanner thread
/// reading from it.
//
/// Each scanner context maps to a single hdfs split. There are three threads that
@@ -192,7 +192,7 @@ class ScannerContext {
bool SkipBytes(int64_t length, Status* status) WARN_UNUSED_RESULT;
/// Read length bytes into the supplied buffer. The returned buffer is owned
/// by this object The memory is owned by and should not be modified. The contents
/// by this object. The memory is owned by and should not be modified. The contents
/// of the buffer are invalidated after subsequent calls to GetBytes()/ReadBytes().
/// Returns true on success, otherwise returns false and sets 'status' to
/// indicate the error.

View File

@@ -55,6 +55,7 @@
#include "util/error-util.h"
#include "util/runtime-profile-counters.h"
#include "util/stopwatch.h"
#include "util/char-codec.h"
#include "common/names.h"
@@ -249,6 +250,12 @@ Status HdfsTextScanner::InitNewRange() {
scan_node_->hdfs_table()->null_column_value(), true,
state_->strict_mode()));
const auto& encoding = hdfs_partition->encoding_value();
if (!encoding.empty() && encoding != "UTF-8") {
decoder_.reset(new CharCodec(data_buffer_pool_.get(), encoding,
hdfs_partition->line_delim(), scan_node_->tuple_desc()->string_slots().empty()));
}
RETURN_IF_ERROR(ResetScanner());
scan_state_ = SCAN_RANGE_INITIALIZED;
return Status::OK();
@@ -535,6 +542,12 @@ Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes)
*eosr = byte_buffer_read_size_ == 0 ? true : stream_->eosr();
}
if (decoder_.get() != nullptr) {
SCOPED_TIMER(decode_timer_);
RETURN_IF_ERROR(decoder_->DecodeBuffer(reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
&byte_buffer_read_size_, pool, *eosr, decompressor_.get() != nullptr, context_));
}
byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_;
return Status::OK();
}
@@ -658,6 +671,7 @@ Status HdfsTextScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(HdfsScanner::Open(context));
parse_delimiter_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DelimiterParseTime");
decode_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecodingTime");
// Allocate the scratch space for two pass parsing. The most fields we can go
// through in one parse pass is the batch size (tuples) * the number of fields per tuple
@@ -731,7 +745,8 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
if (num_tuples > 0) {
// Need to copy out strings if they may reference the original I/O buffer.
const bool copy_strings = !string_slot_offsets_.empty() &&
stream_->file_desc()->file_compression == THdfsCompression::NONE;
stream_->file_desc()->file_compression == THdfsCompression::NONE &&
decoder_.get() == nullptr;
int max_added_tuples = (scan_node_->limit() == -1) ?
num_tuples :
scan_node_->limit() - scan_node_->rows_returned_shared();

View File

@@ -29,6 +29,7 @@ template<bool>
class DelimitedTextParser;
class ScannerContext;
struct HdfsFileDesc;
class CharCodec;
/// HdfsScanner implementation that understands text-formatted records.
/// Uses SSE instructions, if available, for performance.
@@ -175,7 +176,8 @@ class HdfsTextScanner : public HdfsScanner {
/// decompression functions DecompressFileToBuffer()/DecompressStreamToBuffer().
/// If applicable, attaches decompression buffers from previous calls that might still
/// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
/// freed instead.
/// freed instead. In case of decoding, calls decoder_->DecodeBuffer(), which overwrites
/// byte_buffer_ptr_ with decoded data allocated from data_buffer_pool_.
///
/// Subclasses can override this function to implement different behaviour.
/// TODO: IMPALA-6146: rethink this interface - having subclasses modify member
@@ -223,7 +225,7 @@ class HdfsTextScanner : public HdfsScanner {
/// Mem pool for boundary_row_, boundary_column_, partial_tuple_ and any variable length
/// data that is pointed at by the partial tuple. Does not hold any tuple data
/// of returned batches, because the data is always deep-copied into the output batch.
boost::scoped_ptr<MemPool> boundary_pool_;
std::unique_ptr<MemPool> boundary_pool_;
/// Helper string for dealing with input rows that span file blocks. We keep track of
/// a whole line that spans file blocks to be able to report the line as erroneous in
@@ -239,7 +241,7 @@ class HdfsTextScanner : public HdfsScanner {
int slot_idx_;
/// Helper class for picking fields and rows from delimited text.
boost::scoped_ptr<DelimitedTextParser<true>> delimited_text_parser_;
std::unique_ptr<DelimitedTextParser<true>> delimited_text_parser_;
/// Return field locations from the Delimited Text Parser.
std::vector<FieldLocation> field_locations_;
@@ -265,6 +267,13 @@ class HdfsTextScanner : public HdfsScanner {
/// Time parsing text files
RuntimeProfile::Counter* parse_delimiter_timer_;
/// Decoder used when reading non-UTF-8 text files.
std::unique_ptr<CharCodec> decoder_;
/// Time spent decoding bytes
RuntimeProfile::Counter* decode_timer_;
};
}

View File

@@ -220,6 +220,7 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(
block_size_ = sd.blockSize;
file_format_ = sd.fileFormat;
json_binary_format_ = sd.jsonBinaryFormat;
encoding_value_ = sd.__isset.encodingValue ? sd.encodingValue : "";
DecompressLocation(thrift_table, thrift_partition, &location_);
}

View File

@@ -399,6 +399,7 @@ class HdfsPartitionDescriptor {
char escape_char() const { return escape_char_; }
THdfsFileFormat::type file_format() const { return file_format_; }
int block_size() const { return block_size_; }
const std::string& encoding_value() const { return encoding_value_; }
const std::string& location() const { return location_; }
int64_t id() const { return id_; }
TJsonBinaryFormat::type json_binary_format() const { return json_binary_format_; }
@@ -419,6 +420,7 @@ class HdfsPartitionDescriptor {
char collection_delim_;
char escape_char_;
int block_size_;
std::string encoding_value_;
// TODO: use the same representation as the Catalog does, in which common prefixes are
// stripped.
std::string location_;

View File

@@ -571,4 +571,23 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
}
return curr_consumption > max_consumption;
}
Status ScopedMemTracker::TryConsume(int64_t size) {
DCHECK(tracker_ != nullptr);
if (tracker_->TryConsume(size)) {
size_ += size;
} else {
Status status = tracker_->MemLimitExceeded(
nullptr, "Failed to allocate memory required by DecodeBuffer", size);
tracker_ = nullptr;
return status;
}
return Status::OK();
}
ScopedMemTracker::~ScopedMemTracker() {
if (tracker_ != nullptr && size_ > 0) {
tracker_->Release(size_);
}
}
}

View File

@@ -619,4 +619,17 @@ class MemTrackerAllocator : public Alloc {
typedef MemTrackerAllocator<char> CharMemTrackerAllocator;
typedef std::basic_string<char, std::char_traits<char>, CharMemTrackerAllocator>
TrackedString;
// Auxiliary class to track memory consumption for a scope of code.
class ScopedMemTracker {
public:
ScopedMemTracker(MemTracker* tracker) : tracker_(tracker), size_(0) {}
~ScopedMemTracker();
Status TryConsume(int64_t size);
private:
MemTracker* tracker_ = nullptr;
int64_t size_;
};
}

View File

@@ -510,7 +510,7 @@ class RowBatch {
/// The memory ownership depends on whether legacy joins and aggs are enabled.
///
/// Memory is malloc'd and owned by RowBatch and is freed upon its destruction. This is
/// more performant that allocating the pointers from 'tuple_data_pool_' especially
/// more performant than allocating the pointers from 'tuple_data_pool_' especially
/// with SubplanNodes in the ExecNode tree because the tuple pointers are not
/// transferred and do not have to be re-created in every Reset().
const int tuple_ptrs_size_;

View File

@@ -44,6 +44,7 @@ set(UTIL_SRCS
bit-packing.cc
bit-util.cc
bloom-filter.cc
char-codec.cc
cgroup-util.cc
coding-util.cc
codec.cc

194
be/src/util/char-codec.cc Normal file
View File

@@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/char-codec.h"
#include <boost/locale.hpp>
#include <string>
#include "exec/scanner-context.h"
#include "gutil/strings/substitute.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
using namespace impala;
using namespace strings;
CharCodec::CharCodec(MemPool* memory_pool, const std::string& encoding, char tuple_delim,
bool reuse_buffer)
: memory_pool_(memory_pool), encoding_(encoding), tuple_delim_(tuple_delim),
reuse_buffer_(reuse_buffer) {
}
const int CharCodec::MAX_SYMBOL = 4;
Status CharCodec::DecodeBuffer(uint8_t** buffer, int64_t* bytes_read, MemPool* pool,
bool eosr, bool decompress, ScannerContext* context) {
std::string result_prefix;
std::string result_core;
std::string result_suffix;
// We're about to create a new decoding buffer (if we can't reuse). Attach the
// memory from previous decoding rounds to 'pool'. In case of streaming decompression
// this is already done in DecompressStreamToBuffer().
if (!decompress && !reuse_buffer_) {
if (pool != nullptr) {
pool->AcquireData(memory_pool_, false);
} else {
memory_pool_->FreeAll();
}
out_buffer_ = nullptr;
}
uint8_t* buf_start = *buffer;
uint8_t* buf_end = buf_start + *bytes_read;
// Reserve (via the memory tracker) twice the input buffer size to handle the worst case
ScopedMemTracker scoped_mem_tracker(memory_pool_->mem_tracker());
RETURN_IF_ERROR(scoped_mem_tracker.TryConsume((*bytes_read) * 2));
RETURN_IF_ERROR(HandlePrefix(&buf_start, buf_end, &result_prefix));
RETURN_IF_ERROR(HandleCore(&buf_start, buf_end, &result_core));
RETURN_IF_ERROR(HandleSuffix(&buf_start, buf_end, &result_suffix));
if (eosr && !partial_symbol_.empty()) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR,
"End of stream reached with partial symbol.");
}
// In case of decompression, decompressed data can be freed up after decoding
if (decompress) {
memory_pool_->FreeAll();
} else if (eosr) {
context->ReleaseCompletedResources(false);
}
// Concat the results onto the output buffer
*bytes_read = result_prefix.size() + result_core.size() + result_suffix.size();
if (out_buffer_ == nullptr || buffer_length_ < *bytes_read) {
buffer_length_ = *bytes_read;
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(
"CharCodec::DecodeBuffer() failed to allocate $0 bytes.", *bytes_read);
return memory_pool_->mem_tracker()->MemLimitExceeded(nullptr, details, *bytes_read);
}
}
*buffer = out_buffer_;
memcpy(*buffer, result_prefix.data(), result_prefix.size());
memcpy(*buffer + result_prefix.size(), result_core.data(), result_core.size());
memcpy(*buffer + result_prefix.size() + result_core.size(),
result_suffix.data(), result_suffix.size());
return Status::OK();
}
Status CharCodec::HandlePrefix(uint8_t** buf_start, uint8_t* buf_end,
std::string* result_prefix) {
if (!partial_symbol_.empty()) {
std::vector<uint8_t> prefix;
prefix.reserve(MAX_SYMBOL);
prefix.assign(partial_symbol_.begin(), partial_symbol_.end());
bool success = false;
DCHECK_LT(partial_symbol_.size(), MAX_SYMBOL);
for (int i = 0; partial_symbol_.size() + i < MAX_SYMBOL && *buf_start + i < buf_end;
++i) {
prefix.push_back((*buf_start)[i]);
try {
*result_prefix =
boost::locale::conv::to_utf<char>(reinterpret_cast<char*>(prefix.data()),
reinterpret_cast<char*>(prefix.data()) + prefix.size(), encoding_,
boost::locale::conv::stop);
success = true;
*buf_start += i + 1;
break;
} catch (boost::locale::conv::conversion_error&) {
continue;
} catch (const std::exception& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
}
}
if (!success) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, "Unable to decode buffer");
}
partial_symbol_.clear();
}
return Status::OK();
}
Status CharCodec::HandleCore(uint8_t** buf_start, uint8_t* buf_end,
std::string* result_core) {
uint8_t* last_delim =
std::find_end(*buf_start, buf_end, &tuple_delim_, &tuple_delim_ + 1);
if (last_delim != buf_end) {
try {
*result_core = boost::locale::conv::to_utf<char>(
reinterpret_cast<char*>(*buf_start), reinterpret_cast<char*>(last_delim) + 1,
encoding_, boost::locale::conv::stop);
} catch (boost::locale::conv::conversion_error& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
} catch (const std::exception& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
}
*buf_start = last_delim + 1;
}
return Status::OK();
}
Status CharCodec::HandleSuffix(uint8_t** buf_start, uint8_t* buf_end,
std::string* result_suffix) {
if (*buf_start < buf_end) {
bool success = false;
uint8_t* end = buf_end;
while (buf_end - end < MAX_SYMBOL && end > *buf_start) {
try {
*result_suffix =
boost::locale::conv::to_utf<char>(reinterpret_cast<char*>(*buf_start),
reinterpret_cast<char*>(end), encoding_, boost::locale::conv::stop);
success = true;
break;
} catch (boost::locale::conv::conversion_error&) {
--end;
} catch (const std::exception& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
}
}
if (!success && end > *buf_start) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, "Unable to decode buffer");
}
if (end < buf_end) {
partial_symbol_.assign(end, buf_end);
}
}
return Status::OK();
}
Status CharCodec::EncodeBuffer(const std::string& str, std::string* result) {
try {
*result = boost::locale::conv::from_utf<char>(
str, encoding_, boost::locale::conv::stop);
} catch (boost::locale::conv::conversion_error& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
} catch (const std::exception& e) {
return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
}
return Status::OK();
}

84
be/src/util/char-codec.h Normal file
View File

@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
#include "common/status.h"
namespace impala {
class MemPool;
class MemTracker;
class ScannerContext;
/// Class for encoding and decoding character buffers between different encodings and
/// UTF-8. Employs the Boost.Locale library for the conversions.
class CharCodec {
public:
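/// Maximum number of bytes a single character may occupy in the source encoding;
/// used to bound the prefix/suffix handling of partial symbols in DecodeBuffer().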
static const int MAX_SYMBOL;
CharCodec(MemPool* memory_pool, const std::string& encoding, char tuple_delim = '\n',
bool reuse_buffer = false);
/// Decodes 'buffer' from 'encoding_' to UTF-8, handling partial symbols and delimiters.
///
/// The function processes the buffer in three parts:
/// 1. Prefix: attempts to complete 'partial_symbol_', stored from the previous
/// DecodeBuffer call, by appending the first bytes of the buffer one by one.
/// 2. Core: converts the main part of the buffer up to the last delimiter found.
/// 3. Suffix: if the buffer is split in the middle of a symbol, progressively
/// determines the incomplete part and stores it in 'partial_symbol_'.
Status DecodeBuffer(uint8_t** buffer, int64_t* bytes_read, MemPool* pool, bool eosr,
bool decompress, ScannerContext* context);
/// Encodes 'str' from UTF-8 into a given 'encoding_'. Since
/// HdfsTextTableWriter::Flush(), currently being the only client of this function,
/// always flushes the buffer at the end of the row, we don't need to handle partial
/// symbols here.
Status EncodeBuffer(const std::string& str, std::string* result);
private:
Status HandlePrefix(uint8_t** buf_start, uint8_t* buf_end, std::string* result_prefix);
Status HandleCore(uint8_t** buf_start, uint8_t* buf_end, std::string* result_core);
Status HandleSuffix(uint8_t** buf_start, uint8_t* buf_end, std::string* result_suffix);
/// Pool to allocate the buffer to hold transformed data.
MemPool* memory_pool_ = nullptr;
/// Name of the encoding of the input / output data.
std::string encoding_;
/// The following members are only used by DecodeBuffer:
/// Delimiter used to separate tuples.
const char tuple_delim_;
/// Buffer to hold the partial symbol that could not be decoded in the previous call to
/// DecodeBuffer.
std::vector<uint8_t> partial_symbol_;
/// Can we reuse the output buffer or do we need to allocate on each call?
bool reuse_buffer_;
/// Buffer to hold transformed data.
uint8_t* out_buffer_ = nullptr;
/// Length of the output buffer.
int64_t buffer_length_ = 0;
};
}

View File

@@ -137,6 +137,7 @@ testdata/AllTypesErrorNoNulls/*.txt
*.avsc
*.parq
*.parquet
testdata/charcodec/*
testdata/cluster/hive/*.diff
testdata/cluster/node_templates/cdh5/etc/hadoop/conf/*.xml.tmpl
testdata/cluster/node_templates/common/etc/kudu/*.conf.tmpl

View File

@@ -365,6 +365,7 @@ struct THdfsStorageDescriptor {
7: required THdfsFileFormat fileFormat
8: required i32 blockSize
9: optional TJsonBinaryFormat jsonBinaryFormat
10: optional string encodingValue
}
// Represents an HDFS partition

View File

@@ -500,7 +500,9 @@ error_codes = (
"cache entry ($0 bytes)"),
("TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED", 163, "Outstanding tuple cache writes "
"exceeded the limit ($0 bytes)")
"exceeded the limit ($0 bytes)"),
("CHARSET_CONVERSION_ERROR", 164, "Error during buffer conversion: $0")
)
import sys

View File

@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.mr.Catalogs;
@@ -30,14 +31,18 @@ import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.catalog.DataSourceTable;
import org.apache.impala.catalog.FeDataSourceTable;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsStorageDescriptor;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.local.LocalCatalogException;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TCompressionCodec;
@@ -50,12 +55,15 @@ import org.apache.impala.util.AvroSchemaParser;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* Represents an ALTER TABLE SET [PARTITION ('k1'='a', 'k2'='b'...)]
@@ -134,6 +142,9 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
// Analyze 'sort.columns' property.
analyzeSortColumns(getTargetTable(), tblProperties_);
// Analyze 'serialization.encoding' property
analyzeSerializationEncoding(tblProperties_);
}
private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
@@ -267,6 +278,90 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
}
}
/**
* Analyzes the 'serialization.encoding' property in 'tblProperties': the property is
* only allowed on TEXT-format HDFS tables, the encoding must be supported, and the
* line delimiter must remain ASCII-compatible in that encoding, since multi-byte
* line delimiters are not supported.
*/
public void analyzeSerializationEncoding(Map<String, String> tblProperties)
throws AnalysisException {
if (!tblProperties.containsKey(serdeConstants.SERIALIZATION_ENCODING)
|| !getOperation().equals("SET SERDEPROPERTIES")) {
return;
}
FeTable tbl = getTargetTable();
if (!(tbl instanceof FeFsTable)) {
throw new AnalysisException(
String.format("Property 'serialization.encoding' is only supported "
+ "on HDFS tables. Conflicting table: %s", tbl.getFullName()));
}
if (partitionSet_ != null) {
for (FeFsPartition partition: partitionSet_.getPartitions()) {
if (partition.getFileFormat() != HdfsFileFormat.TEXT) {
throw new AnalysisException(String.format("Property 'serialization.encoding' "
+ "is only supported on TEXT file format. "
+ "Conflicting partition/format: %s %s",
partition.getPartitionName(), partition.getFileFormat().name()));
}
}
} else {
StorageDescriptor sd = tbl.getMetaStoreTable().getSd();
HdfsFileFormat format = HdfsFileFormat.fromHdfsInputFormatClass(
sd.getInputFormat(), sd.getSerdeInfo().getSerializationLib());
if (format != HdfsFileFormat.TEXT) {
throw new AnalysisException(String.format("Property 'serialization.encoding' "
+ "is only supported on TEXT file format. Conflicting "
+ "table/format: %s %s",
tbl.getFullName(), format.name()));
}
}
String encoding = tblProperties.get(serdeConstants.SERIALIZATION_ENCODING);
if (!Charset.isSupported(encoding)) {
throw new AnalysisException(String.format("Unsupported encoding: %s.", encoding));
}
Charset charset = Charset.forName(encoding);
if (partitionSet_ != null) {
for (FeFsPartition partition : partitionSet_.getPartitions()) {
if (!isLineDelimiterSameAsAscii(
partition.getInputFormatDescriptor().getLineDelim(), charset)) {
throw new AnalysisException(String.format(
"Property 'serialization.encoding' only supports " +
"encodings in which line delimiter is compatible with ASCII. " +
"Conflicting partition: %s. " +
"Please refer to IMPALA-10319 for more info.",
partition.getPartitionName()));
}
}
} else {
StorageDescriptor sd = tbl.getMetaStoreTable().getSd();
HdfsStorageDescriptor hdfsSD;
try {
hdfsSD = HdfsStorageDescriptor.fromStorageDescriptor(tbl.getName(), sd);
} catch (HdfsStorageDescriptor.InvalidStorageDescriptorException e) {
throw new LocalCatalogException(String.format(
"Invalid input format descriptor for table %s", table_.getFullName()), e);
}
if (!isLineDelimiterSameAsAscii(hdfsSD.getLineDelim(), charset)) {
throw new AnalysisException(String.format(
"Property 'serialization.encoding' only supports " +
"encodings in which line delimiter is compatible with ASCII. " +
"Conflicting table: %1$s. " +
"Please refer to IMPALA-10319 for more info.",
table_.getFullName()));
}
}
}
public static boolean isLineDelimiterSameAsAscii(byte lineDelim, Charset charset) {
String lineDelimStr = new String(new byte[]{lineDelim}, charset);
byte[] newlineBytesInEncoding = lineDelimStr.getBytes(charset);
byte[] newlineBytesInAscii = lineDelimStr.getBytes(StandardCharsets.US_ASCII);
return java.util.Arrays.equals(newlineBytesInEncoding, newlineBytesInAscii);
}
/**
* Analyze the 'skip.header.line.count' property to make sure it is set to a valid
* value. It is looked up in 'tblProperties', which must not be null.

View File

@@ -17,18 +17,21 @@
package org.apache.impala.analysis;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mr.Catalogs;
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.catalog.DataSourceTable;
import org.apache.impala.catalog.HdfsStorageDescriptor;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.RowFormat;
@@ -321,6 +324,8 @@ public class CreateTableStmt extends StatementBase implements SingleTableStmt {
if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
computeLineageGraph(analyzer);
}
analyzeSerializationEncoding();
}
/**
@@ -931,4 +936,43 @@ public class CreateTableStmt extends StatementBase implements SingleTableStmt {
return isExternal() && !Boolean.parseBoolean(getTblProperties().get(
Table.TBL_PROP_EXTERNAL_TABLE_PURGE));
}
/**
* Analyzes the 'serialization.encoding' property in the serde properties: the property
* is only allowed on the TEXT file format, the encoding must be supported, and the
* line delimiter must remain ASCII-compatible in that encoding, since multi-byte
* line delimiters are not supported.
*/
public void analyzeSerializationEncoding()
throws AnalysisException {
if (!getSerdeProperties().containsKey(serdeConstants.SERIALIZATION_ENCODING)) {
return;
}
if (getFileFormat() != THdfsFileFormat.TEXT) {
throw new AnalysisException(String.format("Property 'serialization.encoding' is "
+ "only supported on TEXT file format. Conflicting "
+ "table/format: %1$s / %2$s",
tableDef_.getTblName().toString(), getFileFormat().name()));
}
String encoding = getSerdeProperties().get(serdeConstants.SERIALIZATION_ENCODING);
if (!Charset.isSupported(encoding)) {
throw new AnalysisException("Unsupported encoding: " + encoding + ".");
}
byte lineDelimiter = getRowFormat() == null ?
(byte) HdfsStorageDescriptor.DEFAULT_LINE_DELIM :
// assuming RowFormat analysis filtered out multi-byte line delimiters
(byte) getRowFormat().getLineDelimiter().charAt(0);
Charset charset = Charset.forName(encoding);
if (!AlterTableSetTblProperties.isLineDelimiterSameAsAscii(lineDelimiter, charset)) {
throw new AnalysisException(
String.format("Property 'serialization.encoding' only supports "
+ "encodings in which line delimiter is compatible with ASCII. "
+ "Conflicting table: %1$s. "
+ "Please refer to IMPALA-10319 for more info.",
tableDef_.getTblName().toString()));
}
}
}

View File

@@ -95,6 +95,7 @@ public class HdfsStorageDescriptor {
private final byte quoteChar_;
private final int blockSize_;
private final TJsonBinaryFormat jsonBinaryFormat_;
private String encodingValue_ = null;
/**
* Returns a map from delimiter key to a single delimiter character,
@@ -171,7 +172,8 @@ public class HdfsStorageDescriptor {
private HdfsStorageDescriptor(String tblName, HdfsFileFormat fileFormat, byte lineDelim,
byte fieldDelim, byte collectionDelim, byte mapKeyDelim, byte escapeChar,
byte quoteChar, int blockSize, TJsonBinaryFormat jsonBinaryFormat) {
byte quoteChar, int blockSize, TJsonBinaryFormat jsonBinaryFormat,
String encodingValue) {
this.fileFormat_ = fileFormat;
this.lineDelim_ = lineDelim;
this.fieldDelim_ = fieldDelim;
@@ -180,6 +182,7 @@ public class HdfsStorageDescriptor {
this.quoteChar_ = quoteChar;
this.blockSize_ = blockSize;
this.jsonBinaryFormat_ = jsonBinaryFormat;
this.encodingValue_ = encodingValue;
// You can set the escape character as a tuple or row delim. Empirically,
// this is ignored by hive.
@@ -247,6 +250,8 @@ public class HdfsStorageDescriptor {
jsonBinaryFormat = null;
}
String encodingValue = parameters.get(serdeConstants.SERIALIZATION_ENCODING);
try {
return INTERNER.intern(new HdfsStorageDescriptor(tblName,
HdfsFileFormat.fromJavaClassName(
@@ -257,7 +262,7 @@ public class HdfsStorageDescriptor {
delimMap.get(serdeConstants.MAPKEY_DELIM),
delimMap.get(serdeConstants.ESCAPE_CHAR),
delimMap.get(serdeConstants.QUOTE_CHAR),
blockSize, jsonBinaryFormat));
blockSize, jsonBinaryFormat, encodingValue));
} catch (IllegalArgumentException ex) {
// Thrown by fromJavaClassName
throw new InvalidStorageDescriptorException(ex);
@@ -270,19 +275,24 @@ public class HdfsStorageDescriptor {
HdfsFileFormat.fromThrift(tDesc.getFileFormat()), tDesc.lineDelim,
tDesc.fieldDelim, tDesc.collectionDelim, tDesc.mapKeyDelim, tDesc.escapeChar,
tDesc.quoteChar, tDesc.blockSize, tDesc.isSetJsonBinaryFormat() ?
tDesc.getJsonBinaryFormat() : null));
tDesc.getJsonBinaryFormat() : null, tDesc.encodingValue));
}
public THdfsStorageDescriptor toThrift() {
return new THdfsStorageDescriptor(lineDelim_, fieldDelim_, collectionDelim_,
mapKeyDelim_, escapeChar_, quoteChar_, fileFormat_.toThrift(), blockSize_)
.setJsonBinaryFormat(jsonBinaryFormat_);
THdfsStorageDescriptor tHdfsStorageDescriptor = new THdfsStorageDescriptor(
lineDelim_, fieldDelim_, collectionDelim_, mapKeyDelim_, escapeChar_, quoteChar_,
fileFormat_.toThrift(), blockSize_);
if (encodingValue_ != null) {
tHdfsStorageDescriptor.setEncodingValue(encodingValue_);
}
tHdfsStorageDescriptor.setJsonBinaryFormat(jsonBinaryFormat_);
return tHdfsStorageDescriptor;
}
public HdfsStorageDescriptor cloneWithChangedFileFormat(HdfsFileFormat newFormat) {
return INTERNER.intern(new HdfsStorageDescriptor(
"<unknown>", newFormat, lineDelim_, fieldDelim_, collectionDelim_, mapKeyDelim_,
escapeChar_, quoteChar_, blockSize_, jsonBinaryFormat_));
escapeChar_, quoteChar_, blockSize_, jsonBinaryFormat_, encodingValue_));
}
public byte getLineDelim() { return lineDelim_; }
@@ -293,11 +303,13 @@ public class HdfsStorageDescriptor {
public HdfsFileFormat getFileFormat() { return fileFormat_; }
public int getBlockSize() { return blockSize_; }
public TJsonBinaryFormat getJsonBinaryFormat() { return jsonBinaryFormat_; }
public String getEncodingValue() { return encodingValue_; }
@Override
public int hashCode() {
return Objects.hash(blockSize_, collectionDelim_, escapeChar_, fieldDelim_,
fileFormat_, lineDelim_, mapKeyDelim_, quoteChar_, jsonBinaryFormat_);
fileFormat_, lineDelim_, mapKeyDelim_, quoteChar_, jsonBinaryFormat_,
encodingValue_);
}
@Override
@@ -315,6 +327,7 @@ public class HdfsStorageDescriptor {
if (mapKeyDelim_ != other.mapKeyDelim_) return false;
if (quoteChar_ != other.quoteChar_) return false;
if (jsonBinaryFormat_ != other.jsonBinaryFormat_) return false;
if (encodingValue_ != other.encodingValue_) return false;
return true;
}
}

View File

@@ -1063,6 +1063,64 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Cannot ALTER TABLE SET on an HBase table.
AnalysisError("alter table functional_hbase.alltypes set tblproperties('a'='b')",
"ALTER TABLE SET not currently supported on HBase tables.");
// serialization.encoding
AnalyzesOk("alter table functional.alltypes set serdeproperties(" +
"'serialization.encoding'='GBK')");
AnalyzesOk("alter table functional.text_dollar_hash_pipe set serdeproperties(" +
"'serialization.encoding'='GBK')");
AnalyzesOk("alter table functional.alltypes partition(year=2010, month=12) " +
"set serdeproperties('serialization.encoding'='GBK')");
String tmpTableName =
QueryStringBuilder.createTmpTableName("functional", "tmp_table");
AnalyzesOk("create table " + tmpTableName + " (id int) with serdeproperties(" +
"'serialization.encoding'='GBK')");
String [] unsupportedFileFormatDbs =
{"functional_parquet", "functional_rc", "functional_avro"};
for (String format: unsupportedFileFormatDbs) {
AnalysisError("alter table " + format + ".alltypes set serdeproperties(" +
"'serialization.encoding'='GBK')", "Property 'serialization.encoding' is " +
"only supported on TEXT file format");
}
String[] unsupportedFileFormats = {
"parquet", "rcfile", "avro", "iceberg"};
for (String format: unsupportedFileFormats) {
AnalysisError("create table " + tmpTableName + " (id int) with serdeproperties(" +
"'serialization.encoding'='GBK') stored as " + format,
"Property 'serialization.encoding' is only supported on TEXT file format");
}
AnalysisError("alter table functional_kudu.alltypes set serdeproperties( " +
"'serialization.encoding'='GBK')", "Property 'serialization.encoding' is only " +
"supported on HDFS tables");
AnalysisError("alter table functional.alltypesmixedformat partition(year=2009) " +
"set serdeproperties('serialization.encoding'='GBK')", "Property " +
"'serialization.encoding' is only supported on TEXT file format");
AnalysisError("create table " + tmpTableName +
" (id int, primary key (id)) with serdeproperties(" +
"'serialization.encoding'='GBK') stored as kudu " +
"tblproperties('kudu.master_addresses'='localhost')",
"Property 'serialization.encoding' is only supported on TEXT file format");
AnalysisError("alter table functional.alltypes set serdeproperties(" +
"'serialization.encoding'='UTF-16')",
"Property 'serialization.encoding' only supports encodings in which line " +
"delimiter is compatible with ASCII.");
AnalysisError("alter table functional.alltypes partition(year=2010, month=12) " +
"set serdeproperties('serialization.encoding'='UTF-16')",
"Property 'serialization.encoding' only supports encodings in which line " +
"delimiter is compatible with ASCII.");
AnalysisError("alter table functional.alltypes set serdeproperties(" +
"'serialization.encoding'='NonexistentEncoding')",
"Unsupported encoding: NonexistentEncoding.");
AnalysisError("create table " + tmpTableName + " (id int) with serdeproperties(" +
"'serialization.encoding'='UTF-16')",
"Property 'serialization.encoding' only supports encodings in which line " +
"delimiter is compatible with ASCII.");
AnalysisError("create table " + tmpTableName + " (id int) with serdeproperties(" +
"'serialization.encoding'='NonexistentEncoding')",
"Unsupported encoding: NonexistentEncoding.");
}
@Test

3
testdata/charcodec/cp1251_names.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
(three lines of cp1251-encoded names; the raw bytes are not valid UTF-8 and render only as replacement characters)

View File

@@ -0,0 +1,3 @@
Алексей
Мария
Дмитрий

3
testdata/charcodec/gbk_names.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
(three lines of GBK-encoded names; the raw bytes are not valid UTF-8 and render only as replacement characters)

BIN
testdata/charcodec/gbk_names_error.txt vendored Normal file

Binary file not shown.

3
testdata/charcodec/gbk_names_utf8.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
张三
李四
王五

3
testdata/charcodec/koi8r_names.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
(three lines of KOI8-R-encoded names; the raw bytes are not valid UTF-8 and render only as replacement characters)

View File

@@ -0,0 +1,3 @@
Алексей
Мария
Дмитрий

3
testdata/charcodec/latin1_names.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
(three lines of Latin-1-encoded names, José, François, Göran; the non-ASCII bytes render as replacement characters)

View File

@@ -0,0 +1,3 @@
José
François
Göran

View File

@@ -0,0 +1,3 @@
(three lines of Shift_JIS-encoded names; the raw bytes are not valid UTF-8 and render only as replacement characters)

Binary file not shown.

View File

@@ -0,0 +1,3 @@
山田太郎
佐藤花子
鈴木次郎

View File

@@ -0,0 +1,441 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_vector import ImpalaTestDimension
from tests.common.skip import SkipIfFS
import codecs
import os
import pytest
import random
import tempfile
import shutil
import sys
if sys.version_info[0] >= 3:
unichr = chr # Python 3
_hiragana_range = [codepoint for codepoint in range(0x3040, 0x309F) if codepoint not in
# problematic symbols: unassigned, deprecated, etc:
set([0x3040, 0x3094, 0x3095, 0x3096, 0x3097, 0x3098, 0x3099, 0x309A, 0x309B, 0x309C])]
_cyrillic_range = [codepoint for codepoint in range(0x0410, 0x045F) if codepoint not in
# problematic symbols: unassigned, deprecated, etc:
set([0x0450, 0x0452, 0x0453, 0x0454, 0x0455, 0x0456, 0x0457, 0x0458,
0x0459, 0x045A, 0x045B, 0x045C, 0x045D, 0x045E])]
_charsets = {
'gbk': u''.join(unichr(i) for i in range(0x4E00, 0x9FA6)),
'latin1': u''.join(unichr(i) for i in range(0x20, 0x7F)),
'shift_jis': u''.join(unichr(i) for i in _hiragana_range),
'cp1251': u''.join(unichr(i) for i in range(0x0410, 0x044F)),
'koi8-r': u''.join(unichr(i) for i in _cyrillic_range)
}
def _generate_random_word(charset, min_length=1, max_length=20):
length = random.randint(min_length, max_length)
return u''.join(random.choice(charset) for _ in range(length))
def _compare_tables(selfobj, db, utf8_table, encoded_table, row_count):
# Compare count(*) of the encoded table with the utf8 table
count_utf8 = selfobj.client.execute("""select count(*) from {}.{}"""
.format(db, utf8_table))
count_encoded = selfobj.client.execute("""select count(*) from {}.{}"""
.format(db, encoded_table))
assert int(count_utf8.get_data()) == int(count_encoded.get_data()) == row_count
# Compare * of the encoded table with the utf8 table
result = selfobj.client.execute("""select * from {}.{} except select * from {}.{}
union all select * from {}.{} except select * from {}.{}"""
.format(db, utf8_table, db, encoded_table, db, encoded_table, db, utf8_table))
assert result.data == []
# Tests with auto-generated data
class TestCharCodecGen(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestCharCodecGen, cls).add_test_dimensions()
encodings = list(_charsets.keys())
# Only run the tests for single 'gbk' encoding in non-exhaustive mode.
if cls.exploration_strategy() != 'exhaustive':
encodings = [enc for enc in encodings if enc == 'gbk']
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
'charset', *encodings))
# There is no reason to run these tests using all dimensions.
# See IMPALA-14063 for Sequence file format support.
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'text'
and v.get_value('table_format').compression_codec == 'none')
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('exec_option')['disable_codegen'] is False)
# Basic Tests
####################################################################
def generate_text_files(self, encoding_name, charset, test_name,
num_lines=10000, words_per_line=5, num_files=1,
min_word_length=1, max_word_length=20):
lines_per_file = num_lines // num_files
file_paths = []
tmp_dir = tempfile.mkdtemp(dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
for file_index in range(num_files):
data_file_path = os.path.join(tmp_dir, "charcodec_{}_{}_utf8_{}.txt"
.format(encoding_name, test_name, file_index))
file_paths.append(data_file_path)
with codecs.open(data_file_path, 'w', encoding='utf-8') as file:
for _ in range(lines_per_file):
words = [_generate_random_word(charset, min_word_length, max_word_length)
for _ in range(words_per_line)]
line = u','.join(words)
file.write(line + u'\n')
return tmp_dir, file_paths, num_lines
def prepare_utf8_test_table(self, db, file_paths, encoding_name, vector):
encoding_name_tbl = encoding_name.replace('-', '')
tbl_name = "{}_gen_utf8".format(encoding_name_tbl)
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
.format(db, tbl_name))
for file_path in file_paths:
self.filesystem_client.copy_from_local(file_path,
self._get_table_location("{0}.{1}".format(db, tbl_name), vector))
# remove REFRESH when IMPALA-13749 is fixed
self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
return tbl_name
def prepare_encoded_test_table(self, db, utf8_table, encoding_name):
encoding_name_tbl = encoding_name.replace('-', '')
encoded_table = "{}_gen".format(encoding_name_tbl)
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
STORED AS TEXTFILE""".format(db, encoded_table))
self.execute_query("""ALTER TABLE {}.{}
SET SERDEPROPERTIES("serialization.encoding"="{}")"""
.format(db, encoded_table, encoding_name))
self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
self.execute_query("""INSERT OVERWRITE TABLE {}.{} SELECT * FROM {}.{}"""
.format(db, encoded_table, db, utf8_table))
return encoded_table
def test_enc_dec_gen(self, vector, unique_database):
"""Write encoded table with Impala and read it back."""
db = unique_database
encoding_name = vector.get_value('charset')
charset = _charsets[encoding_name]
tmp_dir, file_paths, row_count = self.generate_text_files(
encoding_name, charset, "gen")
utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, vector)
shutil.rmtree(tmp_dir)
encoded_table = self.prepare_encoded_test_table(db, utf8_table, encoding_name)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
def test_enc_dec_gen_long_words(self, vector, unique_database):
db = unique_database
encoding_name = vector.get_value('charset')
charset = _charsets[encoding_name]
tmp_dir, file_paths, row_count = self.generate_text_files(
encoding_name, charset, "gen", min_word_length=100, max_word_length=1000)
utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, vector)
shutil.rmtree(tmp_dir)
encoded_table = self.prepare_encoded_test_table(db, utf8_table, encoding_name)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
# Split-file tests
####################################################################
def test_enc_dec_gen_split(self, vector, unique_database):
"""Test table is split across multiple files."""
db = unique_database
encoding_name = vector.get_value('charset')
charset = _charsets[encoding_name]
tmp_dir, file_paths, row_count = self.generate_text_files(
encoding_name, charset, "split", num_lines=10000, words_per_line=5, num_files=5)
utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, vector)
shutil.rmtree(tmp_dir)
encoded_table = self.prepare_encoded_test_table(db, utf8_table, encoding_name)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
# Hive + Compression Tests
####################################################################
def prepare_encoded_test_table_compress(self, db, utf8_table, encoding_name, codec):
encoding_name_tbl = encoding_name.replace('-', '')
encoded_table = "{}_gen_{}".format(encoding_name_tbl, codec)
self.run_stmt_in_hive("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
STORED AS TEXTFILE""".format(db, encoded_table))
self.run_stmt_in_hive("""ALTER TABLE {}.{}
SET SERDEPROPERTIES("serialization.encoding"="{}")"""
.format(db, encoded_table, encoding_name))
self.run_stmt_in_hive("""set hive.exec.compress.output={};
set mapreduce.output.fileoutputformat.compress.codec=
org.apache.hadoop.io.compress.{}Codec;
INSERT OVERWRITE TABLE {}.{} SELECT * FROM {}.{}
""".format("false" if codec == "None" else "true",
codec, db, encoded_table, db, utf8_table))
return encoded_table
@SkipIfFS.hive
def test_enc_dec_gen_compress(self, vector, unique_database):
db = unique_database
encoding_name = vector.get_value('charset')
charset = _charsets[encoding_name]
tmp_dir, file_paths, row_count = self.generate_text_files(
encoding_name, charset, "compress", num_lines=10000)
utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, vector)
shutil.rmtree(tmp_dir)
# Snappy codec supports streaming, ZStandard does not
for codec in ["None", "Snappy", "ZStandard"]:
encoded_table = self.prepare_encoded_test_table_compress(db, utf8_table,
encoding_name, codec)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
# Partitions Tests
####################################################################
def prepare_utf8_test_table_partitions(self, db, file_paths, encoding_name, vector):
encoding_name_tbl = encoding_name.replace('-', '')
tbl_name = "{}_gen_utf8".format(encoding_name_tbl)
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
PARTITIONED BY (part STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
.format(db, tbl_name))
for i in range(len(file_paths)):
self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
.format(db, tbl_name, i))
part_url = os.path.join(
self._get_table_location("{0}.{1}".format(db, tbl_name), vector),
"part={}".format(i))
part_dir = part_url[part_url.index("/test-warehouse"):]
self.filesystem_client.make_dir(part_dir)
self.filesystem_client.copy_from_local(file_paths[i], part_dir)
self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
return tbl_name
def prepare_encoded_test_table_partitions(self, db, utf8_table, encoding_name,
file_paths):
encoding_name_tbl = encoding_name.replace('-', '')
encoded_table = "{}_gen".format(encoding_name_tbl)
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
PARTITIONED BY (part STRING)
STORED AS TEXTFILE""".format(db, encoded_table))
for i in range(len(file_paths)):
self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
.format(db, encoded_table, i))
self.execute_query("""ALTER TABLE {}.{} PARTITION (part='{}')
SET SERDEPROPERTIES("serialization.encoding"="{}")"""
.format(db, encoded_table, i, encoding_name))
self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
self.execute_query("""INSERT OVERWRITE TABLE {}.{} PARTITION (part='{}')
SELECT name1, name2, name3, name4, name5 FROM {}.{} WHERE part='{}'"""
.format(db, encoded_table, i, db, utf8_table, i))
return encoded_table
def test_enc_dec_gen_partitions(self, vector, unique_database):
db = unique_database
encoding_name = vector.get_value('charset')
charset = _charsets[encoding_name]
tmp_dir, file_paths, row_count = self.generate_text_files(
encoding_name, charset, "partitions", num_lines=10000, num_files=5)
utf8_table = self.prepare_utf8_test_table_partitions(
db, file_paths, encoding_name, vector)
shutil.rmtree(tmp_dir)
encoded_table = self.prepare_encoded_test_table_partitions(db,
utf8_table, encoding_name, file_paths)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
class TestCharCodecGenMixed(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestCharCodecGenMixed, cls).add_test_dimensions()
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'text'
and v.get_value('table_format').compression_codec == 'none')
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('exec_option')['disable_codegen'] is False)
# Mixed Partitions Tests
####################################################################
def generate_text_files_mixed(self, test_file, num_lines=10000, words_per_line=5,
num_files=1):
lines_per_file = num_lines // num_files
file_paths = []
encodings = []
tmp_dir = tempfile.mkdtemp(dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
for i in range(num_files):
encoding_name, charset = random.choice(list(_charsets.items()))
data_file_path = os.path.join(tmp_dir, "charcodec_{}_{}_utf8_{}.txt"
.format(encoding_name, test_file, i))
encodings.append(encoding_name)
file_paths.append(data_file_path)
with codecs.open(data_file_path, 'w', encoding='utf-8') as file:
for _ in range(lines_per_file):
words = [_generate_random_word(charset) for _ in range(words_per_line)]
line = u','.join(words)
file.write(line + u'\n')
return tmp_dir, file_paths, encodings, num_lines
# Partitioned table with different encodings.
def prepare_utf8_test_table_partitions_mixed(self, db, file_paths, vector):
tbl_name = "mixed_gen_utf8"
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
PARTITIONED BY (part STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
.format(db, tbl_name))
for i in range(len(file_paths)):
self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
.format(db, tbl_name, i))
part_url = os.path.join(
self._get_table_location("{0}.{1}".format(db, tbl_name), vector),
"part={}".format(i))
part_dir = part_url[part_url.index("/test-warehouse"):]
self.filesystem_client.make_dir(part_dir)
self.filesystem_client.copy_from_local(file_paths[i], part_dir)
self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
return tbl_name
def prepare_encoded_test_table_partitions_mixed(self, db, utf8_table, encodings):
encoded_table = "mixed_gen"
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
PARTITIONED BY (part STRING)
STORED AS TEXTFILE""".format(db, encoded_table))
for i in range(len(encodings)):
self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
.format(db, encoded_table, i))
self.execute_query("""ALTER TABLE {}.{} PARTITION (part='{}')
SET SERDEPROPERTIES("serialization.encoding"="{}")"""
.format(db, encoded_table, i, encodings[i]))
self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
self.execute_query("""INSERT OVERWRITE TABLE {}.{} PARTITION (part='{}')
SELECT name1, name2, name3, name4, name5 FROM {}.{} WHERE part='{}'"""
.format(db, encoded_table, i, db, utf8_table, i))
return encoded_table
def test_enc_dec_gen_partitions_mixed(self, unique_database, vector):
db = unique_database
tmp_dir, file_paths, encodings, row_count = self.generate_text_files_mixed(
"mixed", num_lines=10000, num_files=5)
utf8_table = self.prepare_utf8_test_table_partitions_mixed(db, file_paths, vector)
shutil.rmtree(tmp_dir)
encoded_table = self.prepare_encoded_test_table_partitions_mixed(db,
utf8_table, encodings)
_compare_tables(self, db, utf8_table, encoded_table, row_count)
class TestCharCodecPreCreated(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestCharCodecPreCreated, cls).add_test_dimensions()
encodings = list(_charsets.keys())
# Only run the tests for single 'gbk' encoding in non-exhaustive mode.
if cls.exploration_strategy() != 'exhaustive':
encodings = [enc for enc in encodings if enc == 'gbk']
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
'charset', *encodings))
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'text'
and v.get_value('table_format').compression_codec == 'none')
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('exec_option')['disable_codegen'] is False)
def prepare_test_table(self, vector, db, tbl_name, datafile, encoding=None):
tbl_name = tbl_name.replace('-', '')
datafile = datafile.replace('-', '')
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (name STRING)
STORED AS TEXTFILE""".format(db, tbl_name))
if encoding:
self.execute_query("""ALTER TABLE {}.{} SET
SERDEPROPERTIES("serialization.encoding"="{}")"""
.format(db, tbl_name, encoding))
data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata",
"charcodec", datafile)
self.filesystem_client.copy_from_local(data_file_path,
self._get_table_location("{0}.{1}".format(db, tbl_name), vector))
self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
return tbl_name
def test_precreated_files(self, vector, unique_database):
"""Read encoded precreated files."""
db = unique_database
enc = vector.get_value('charset')
# Without SERDEPROPERTIES("serialization.encoding") data is read incorrectly
utf8_table = self.prepare_test_table(
vector, db, enc + '_names_utf8', enc + '_names_utf8.txt', None)
encoded_table = self.prepare_test_table(
vector, db, enc + '_names_none', enc + '_names.txt', None)
with pytest.raises(AssertionError) as exc_info:
_compare_tables(self, db, utf8_table, encoded_table, 3)
assert " == []" in str(exc_info.value)
# With SERDEPROPERTIES("serialization.encoding") data is read correctly
encoded_table = self.prepare_test_table(
vector, db, enc + '_names', enc + '_names.txt', enc)
_compare_tables(self, db, utf8_table, encoded_table, 3)
def test_precreated_decoding_with_errors(self, vector, unique_database):
db = unique_database
enc = vector.get_value('charset')
# Skip for promiscuous encodings, which accept almost any byte sequence and so
# don't reliably raise conversion errors.
if enc not in ['gbk', 'shift_jis']: pytest.skip()
encoded_table = self.prepare_test_table(
vector, db, enc + '_names_error', enc + '_names_error.txt', enc)
err = self.execute_query_expect_failure(
self.client, """select * from {}.{}""".format(db, encoded_table))
assert "Error during buffer conversion: Conversion failed" in str(err)
def test_precreated_encoding_with_errors(self, vector, unique_database):
db = unique_database
enc = vector.get_value('charset')
# Skip for promiscuous encodings, which accept almost any byte sequence and so
# don't reliably raise conversion errors.
if enc not in ['gbk', 'shift_jis']: pytest.skip()
encoded_table = self.prepare_test_table(
vector, db, enc + '_names_error', enc + '_names_error.txt', enc)
err = self.execute_query_expect_failure(self.client, """insert overwrite {}.{}
select cast(binary_col as string) from functional.binary_tbl"""
.format(db, encoded_table))
assert "Error during buffer conversion: Conversion failed" in str(err)
@SkipIfFS.hive
def test_read_from_hive(self, unique_database, vector):
"""Write table with Impala and read it back with Hive."""
db = unique_database
enc = vector.get_value('charset')
utf8_table = self.prepare_test_table(
vector, db, enc + '_names_utf8', enc + '_names_utf8.txt', None)
encoded_table = self.prepare_test_table(
vector, db, enc + '_names', enc + '_names.txt', enc)
self.execute_query(
"""insert overwrite {}.{} select * from {}.{}"""
.format(db, encoded_table, db, utf8_table))
result_hive = self.run_stmt_in_hive(
"""select name from {}.{}""".format(db, encoded_table))
result_impala = self.client.execute(
"""select name from {}.{}""".format(db, utf8_table))
result_hive_list = result_hive.strip().split('\n')[1:]
assert result_hive_list == result_impala.data