IMPALA-11877: (part 2) Add support for DELETE statements for PARTITIONED Iceberg tables

This patch adds support for the DELETE operation for partitioned
Iceberg tables. It does so by writing position delete files
(merge-on-read strategy). The delete files contain the file path
and file position of the deleted records. The delete files must
reside in the same partition as the data files they are referring
to.

To execute the DELETE statement for a given table 'tbl', we are
basically doing an INSERT to the virtual DELETE table
'tbl-POSITION-DELETE':

from:
 DELETE FROM ice_t WHERE id = 42;

to:
 INSERT INTO ice_t-POSITION-DELETE
 SELECT INPUT__FILE__NAME, FILE__POSITION
 FROM ice_t
 WHERE id = 42;

The above was true for unpartitioned Iceberg tables.

If the table is partitioned, we need to shuffle the rows around
executors based on the partitions they belong, then sort the rows
based on the partitions (also based on 'file_path' and 'pos'), so
writers can work on partitions sequentially.

To do this, we need to select the partition columns as well from the
table. But in case of partition-evolution there are different sets
of partition columns in each partition spec of the table. To overcome
this, this patchset introduces two additional virtual columns:

* PARTITION__SPEC__ID
* ICEBERG__PARTITION__SERIALIZED

PARTITION__SPEC__ID is an INT column that contains the Iceberg spec_id
for each row. ICEBERG__PARTITION__SERIALIZED is a BINARY column that
contains all partition values base64-encoded and dot-separated. E.g.:

select PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED, * FROM ice_t
+---------------------+--------------------------------+---+---+
| partition__spec__id | iceberg__partition__serialized | i | j |
+---------------------+--------------------------------+---+---+
| 0                   | Mg==                           | 2 | 2 |
| 0                   | Mg==                           | 2 | 2 |
+---------------------+--------------------------------+---+---+

So for the INSERT we are shuffling the rows between executors based on
HASH(partition__spec__id, iceberg__partition__serialized) then each
writer fragment sorts the rows based on (partition__spec__id,
iceberg__partition__serialized, file_path, pos) before writing them out
to delete files. The IcebergDeleteSink has been smarten up in a way that
it creates a new delete file whenever it sees a row with a new
(partition__spec__id, iceberg__partition__serialized).

Some refactorings were also involved during implementing this patch set.
A lot of common code between IcebergDeleteSink and HdfsTableSink has
been moved to the common base class TableSinkBase. In the Frontend this
patch set also moves some common code of InsertStmt and ModifyStmt to a
new common base class DmlStatementBase.

Testing:
  * planner tests
  * e2e tests (including interop with Hive)
  * Did manual stress test with a TPCDS_3000.store_sales
  ** Table had 8 Billion rows, partitioned by column (ss_sold_date_sk)
  ** Deleted 800 Million rows using 10 Impala hosts
  ** Operation was successful, finished under a minute
  ** Created minimum number of delete files, i.e. one per partition

Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
Reviewed-on: http://gerrit.cloudera.org:8080/20078
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Zoltan Borok-Nagy
2023-05-03 17:33:09 +02:00
committed by Impala Public Jenkins
parent a98c783d45
commit ce4202a70b
39 changed files with 1755 additions and 391 deletions

View File

@@ -82,14 +82,17 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
const FbFileMetadata* file_metadata = file_desc_->file_metadata;
const FbIcebergMetadata* ice_metadata = file_metadata->iceberg_metadata();
auto transforms = ice_metadata->partition_keys();
if (transforms == nullptr) return;
const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
if (*template_tuple == nullptr) {
*template_tuple = Tuple::Create(tuple_desc->byte_size(), mem_pool);
}
for (const SlotDescriptor* slot_desc : scan_node_->tuple_desc()->slots()) {
if (slot_desc->IsVirtual()) continue;
if (slot_desc->IsVirtual()) {
AddVirtualIcebergColumn(mem_pool, *template_tuple, *ice_metadata, slot_desc);
continue;
}
if (transforms == nullptr) continue;
const SchemaPath& path = slot_desc->col_path();
if (path.size() != 1) continue;
const ColumnDescriptor& col_desc =
@@ -105,7 +108,7 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
const AuxColumnType& aux_type =
scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
if (!text_converter.WriteSlot(slot_desc, &aux_type, *template_tuple,
transform->transform_value()->c_str(),
(const char*)transform->transform_value()->data(),
transform->transform_value()->size(),
true, false,
mem_pool)) {
@@ -114,7 +117,7 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
"column '$0' in file '$1'. Partition string is '$2' "
"NULL Partition key value is '$3'",
col_desc.name(), file_desc_->filename,
transform->transform_value()->c_str(),
transform->transform_value()->data(),
scan_node_->hdfs_table()->null_partition_key_value()));
// Dates are stored as INTs in the partition data in Iceberg, so let's try
// to parse them as INTs.
@@ -122,7 +125,7 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
int32_t* slot = (*template_tuple)->GetIntSlot(slot_desc->tuple_offset());
StringParser::ParseResult parse_result;
*slot = StringParser::StringToInt<int32_t>(
transform->transform_value()->c_str(),
(const char*)transform->transform_value()->data(),
transform->transform_value()->size(),
&parse_result);
if (parse_result == StringParser::ParseResult::PARSE_SUCCESS) {
@@ -141,6 +144,45 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu
}
}
void FileMetadataUtils::AddVirtualIcebergColumn(MemPool* mem_pool, Tuple* template_tuple,
const org::apache::impala::fb::FbIcebergMetadata& ice_metadata,
const SlotDescriptor* slot_desc) {
DCHECK(slot_desc->IsVirtual());
if (slot_desc->virtual_column_type() == TVirtualColumnType::PARTITION_SPEC_ID) {
int32_t* slot = template_tuple->GetIntSlot(slot_desc->tuple_offset());
*slot = ice_metadata.spec_id();
} else if (slot_desc->virtual_column_type() ==
TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) {
auto transforms = ice_metadata.partition_keys();
if (transforms == nullptr) return;
string partitions;
for (int i = 0; i < transforms->size(); ++i) {
using namespace org::apache::impala::fb;
auto transform = transforms->Get(i);
if (transform->transform_type() ==
FbIcebergTransformType::FbIcebergTransformType_VOID) {
continue;
}
stringstream part_ss;
Base64Encode((const char*)transform->transform_value()->data(),
transform->transform_value()->size(), &part_ss);
string part_value(part_ss.str());
if (partitions.empty()) {
partitions = part_value;
} else {
partitions += '.' + part_value;
}
}
StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset());
int len = partitions.length();
char* partition_serialized_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
Ubsan::MemCpy(partition_serialized_copy, partitions.c_str(), len);
slot->ptr = partition_serialized_copy;
slot->len = len;
template_tuple->SetNotNull(slot_desc->null_indicator_offset());
}
}
bool FileMetadataUtils::IsValuePartitionCol(const SlotDescriptor* slot_desc) {
DCHECK(file_desc_ != nullptr);
if (slot_desc->col_pos() < scan_node_->num_partition_keys() &&

View File

@@ -23,6 +23,10 @@
#include <map>
#include <memory>
namespace org::apache::impala::fb {
struct FbIcebergMetadata;
}
namespace impala {
struct HdfsFileDesc;
@@ -68,6 +72,11 @@ private:
void AddIcebergColumns(MemPool* mem_pool, Tuple** template_tuple,
std::map<const SlotId, const SlotDescriptor*>* slot_descs_written);
/// Writes Iceberg-related virtual column values to the template tuple.
void AddVirtualIcebergColumn(MemPool* mem_pool, Tuple* template_tuple,
const org::apache::impala::fb::FbIcebergMetadata& ice_metadata,
const SlotDescriptor* slot_desc);
HdfsScanNodeBase* const scan_node_;
// Members below are set in Open()

View File

@@ -1109,6 +1109,11 @@ bool HdfsScanPlanNode::HasVirtualColumnInTemplateTuple() const {
// We return false at the end of the function if there are no virtual
// columns in the template tuple.
continue;
} else if (sd->virtual_column_type() == TVirtualColumnType::PARTITION_SPEC_ID) {
return true;
} else if (sd->virtual_column_type() ==
TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) {
return true;
} else {
// Adding DCHECK here so we don't forget to update this when adding new virtual
// column.

View File

@@ -71,23 +71,15 @@ DataSink* HdfsTableSinkConfig::CreateSink(RuntimeState* state) const {
new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
}
void HdfsTableSinkConfig::Close() {
ScalarExpr::Close(partition_key_exprs_);
DataSinkConfig::Close();
}
HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
const THdfsTableSink& hdfs_sink, RuntimeState* state)
: TableSinkBase(sink_id, sink_config, "HdfsTableSink", state),
prototype_partition_(nullptr),
skip_header_line_count_(
hdfs_sink.__isset.skip_header_line_count ? hdfs_sink.skip_header_line_count : 0),
overwrite_(hdfs_sink.overwrite),
input_is_clustered_(hdfs_sink.input_is_clustered),
sort_columns_(hdfs_sink.sort_columns),
sorting_order_((TSortingOrder::type)hdfs_sink.sorting_order),
current_clustered_partition_(nullptr),
partition_key_exprs_(sink_config.partition_key_exprs_),
is_result_sink_(hdfs_sink.is_result_sink) {
if (hdfs_sink.__isset.write_id) {
write_id_ = hdfs_sink.write_id;
@@ -112,9 +104,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
&partition_key_expr_evals_));
// Resolve table id and set input tuple descriptor.
table_desc_ = static_cast<const HdfsTableDescriptor*>(
@@ -129,13 +118,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(),
PrintId(state->query_id(), "_"));
// Prepare partition key exprs and gather dynamic partition key exprs.
for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) {
// Remember non-constant partition key exprs for building hash table of Hdfs files.
if (!partition_key_expr_evals_[i]->root().is_constant()) {
dynamic_partition_key_expr_evals_.push_back(partition_key_expr_evals_[i]);
}
}
// Sanity check.
if (!IsIceberg()) {
DCHECK_LE(partition_key_expr_evals_.size(), table_desc_->num_cols())
@@ -212,10 +194,7 @@ void HdfsTableSink::BuildPartitionDescMap() {
Status HdfsTableSink::Open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(TableSinkBase::Open(state));
DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size());
RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state));
if (!IsIceberg()) BuildPartitionDescMap();
prototype_partition_ = CHECK_NOTNULL(table_desc_->prototype_partition_descriptor());
return Status::OK();
}
@@ -282,25 +261,11 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
return Status::OK();
}
string HdfsTableSink::GetPartitionName(int i) {
if (IsIceberg()) {
DCHECK_LT(i, partition_key_expr_evals_.size());
return table_desc_->IcebergNonVoidPartitionNames()[i];
} else {
DCHECK_LT(i, table_desc_->num_clustering_cols());
return table_desc_->col_descs()[i].name();
}
}
void HdfsTableSink::ConstructPartitionNames(
void HdfsTableSink::ConstructPartitionInfo(
const TupleRow* row,
string* url_encoded_partition_name,
vector<string>* raw_partition_names,
string* external_partition_name) {
DCHECK(url_encoded_partition_name != nullptr);
DCHECK(external_partition_name != nullptr);
DCHECK(raw_partition_names != nullptr);
DCHECK(raw_partition_names->empty());
OutputPartition* output_partition) {
DCHECK(output_partition != nullptr);
DCHECK(output_partition->raw_partition_names.empty());
stringstream url_encoded_partition_name_ss;
stringstream external_partition_name_ss;
@@ -321,19 +286,7 @@ void HdfsTableSink::ConstructPartitionNames(
raw_partition_key_value_ss << value_str;
// Directory names containing partition-key values need to be UrlEncoded, in
// particular to avoid problems when '/' is part of the key value (which might
// occur, for example, with date strings). Hive will URL decode the value
// transparently when Impala's frontend asks the metastore for partition key values,
// which makes it particularly important that we use the same encoding as Hive. It's
// also not necessary to encode the values when writing partition metadata. You can
// check this with 'show partitions <tbl>' in Hive, followed by a select from a
// decoded partition key value.
string encoded_str;
UrlEncode(value_str, &encoded_str, true);
string part_key_value = (encoded_str.empty() ?
table_desc_->null_partition_key_value() : encoded_str);
// If the string is empty, map it to nullptr (mimicking Hive's behaviour)
string part_key_value = UrlEncodePartitionValue(value_str);
encoded_partition_key_value_ss << part_key_value;
}
if (i < partition_key_expr_evals_.size() - 1) encoded_partition_key_value_ss << "/";
@@ -343,100 +296,15 @@ void HdfsTableSink::ConstructPartitionNames(
external_partition_name_ss << encoded_partition_key_value_ss.str();
}
raw_partition_names->push_back(raw_partition_key_value_ss.str());
output_partition->raw_partition_names.push_back(raw_partition_key_value_ss.str());
}
*url_encoded_partition_name = url_encoded_partition_name_ss.str();
*external_partition_name = external_partition_name_ss.str();
}
Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
OutputPartition* output_partition, bool empty_partition) {
// Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
// etc.
string external_partition_name;
ConstructPartitionNames(row, &output_partition->partition_name,
&output_partition->raw_partition_names, &external_partition_name);
BuildHdfsFileNames(partition_descriptor, output_partition, external_partition_name);
if (ShouldSkipStaging(state, output_partition)) {
// We will be writing to the final file if we're skipping staging, so get a connection
// to its filesystem.
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
output_partition->final_hdfs_file_name_prefix,
&output_partition->hdfs_connection));
} else {
// Else get a connection to the filesystem of the tmp file.
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
output_partition->tmp_hdfs_file_name_prefix, &output_partition->hdfs_connection));
output_partition->partition_name = url_encoded_partition_name_ss.str();
output_partition->external_partition_name = external_partition_name_ss.str();
if (IsIceberg()) {
// Use default partition spec id.
output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
}
output_partition->partition_descriptor = &partition_descriptor;
if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
stringstream error_msg;
map<int, const char*>::const_iterator i =
_THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
error_msg << "Writing to table format " << i->second << " is not supported.";
return Status(error_msg.str());
}
if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
state->query_options().__isset.compression_codec &&
state->query_options().compression_codec.codec != THdfsCompression::NONE) {
stringstream error_msg;
error_msg << "Writing to compressed text table is not supported. ";
return Status(error_msg.str());
}
// It is incorrect to initialize a writer if there are no rows to feed it. The writer
// could incorrectly create an empty file or empty partition.
// However, for transactional tables we should create a new empty base directory in
// case of INSERT OVERWRITEs.
if (empty_partition && (!overwrite_ || !IsTransactional())) return Status::OK();
switch (partition_descriptor.file_format()) {
case THdfsFileFormat::TEXT:
output_partition->writer.reset(
new HdfsTextTableWriter(
this, state, output_partition, &partition_descriptor, table_desc_));
break;
case THdfsFileFormat::PARQUET:
output_partition->writer.reset(
new HdfsParquetTableWriter(
this, state, output_partition, &partition_descriptor, table_desc_));
break;
default:
stringstream error_msg;
map<int, const char*>::const_iterator i =
_THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
if (i != _THdfsFileFormat_VALUES_TO_NAMES.end()) {
error_msg << "Cannot write to table with format " << i->second << ". "
<< "Impala only supports writing to TEXT and PARQUET.";
} else {
error_msg << "Cannot write to table. Impala only supports writing to TEXT"
<< " and PARQUET tables. (Unknown file format: "
<< partition_descriptor.file_format() << ")";
}
return Status(error_msg.str());
}
RETURN_IF_ERROR(output_partition->writer->Init());
COUNTER_ADD(partitions_created_counter_, 1);
return CreateNewTmpFile(state, output_partition);
}
void HdfsTableSink::GetHashTblKey(const TupleRow* row,
const vector<ScalarExprEvaluator*>& evals, string* key) {
stringstream hash_table_key;
for (int i = 0; i < evals.size(); ++i) {
RawValue::PrintValueAsBytes(
evals[i]->GetValue(row), evals[i]->root().type(), &hash_table_key);
// Additionally append "/" to avoid accidental key collisions.
hash_table_key << "/";
}
*key = hash_table_key.str();
}
inline const HdfsPartitionDescriptor* HdfsTableSink::GetPartitionDescriptor(
@@ -561,7 +429,6 @@ void HdfsTableSink::Close(RuntimeState* state) {
if (!close_status.ok()) state->LogError(close_status.msg());
}
partition_keys_to_output_partitions_.clear();
ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
TableSinkBase::Close(state);
closed_ = true;
}

View File

@@ -30,17 +30,12 @@ namespace impala {
class Expr;
class TupleDescriptor;
class TupleRow;
class RuntimeState;
class MemTracker;
class HdfsTableSinkConfig : public DataSinkConfig {
class HdfsTableSinkConfig : public TableSinkBaseConfig {
public:
DataSink* CreateSink(RuntimeState* state) const override;
void Close() override;
/// Expressions for computing the target partitions to which a row is written.
std::vector<ScalarExpr*> partition_key_exprs_;
~HdfsTableSinkConfig() override {}
@@ -132,32 +127,18 @@ class HdfsTableSink : public TableSinkBase {
/// the PARTITION clause of the INSERT statement.
void BuildPartitionDescMap();
/// Initialises the filenames of a given output partition, and opens the temporary file.
/// The partition key is derived from 'row'. If the partition will not have any rows
/// added to it, empty_partition must be true.
Status InitOutputPartition(RuntimeState* state,
const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT;
/// Constructs the partition name using 'partition_key_expr_evals_'.
/// 'url_encoded_partition_name' is the full partition name in URL encoded form. E.g.:
/// it's "a=12%2F31%2F11/b=10" if we have 2 partition columns "a" and "b", and "a" has
/// Fille 'output_partition' using 'partition_key_expr_evals_'.
/// 'output_partition->partition_name' will contain the full partition name in URL
/// encoded form. E.g.:
/// It's "a=12%2F31%2F11/b=10" if we have 2 partition columns "a" and "b", and "a" has
/// the value of "12/31/11" and "b" has the value of 10. Since this is URL encoded,
/// can be used for paths.
/// 'raw_partition_name' is a vector of partition key-values in a non-encoded format.
/// 'output_partition->raw_partition_names' is a vector of partition key-values in a
/// non-encoded format.
/// Staying with the above example this would hold ["a=12/31/11", "b=10"].
/// 'external_partition_name' is a subset of 'url_encoded_partition_name'.
void ConstructPartitionNames(
void ConstructPartitionInfo(
const TupleRow* row,
string* url_encoded_partition_name,
std::vector<std::string>* raw_partition_names,
string* external_partition_name);
/// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
/// evaluated against 'row'. The generated string is much shorter than the full Hdfs
/// file name.
void GetHashTblKey(const TupleRow* row,
const std::vector<ScalarExprEvaluator*>& evals, std::string* key);
OutputPartition* output_partition) override;
/// Returns partition descriptor object for the given key.
const HdfsPartitionDescriptor* GetPartitionDescriptor(const std::string& key);
@@ -173,16 +154,9 @@ class HdfsTableSink : public TableSinkBase {
/// files. The input must be ordered by the partition key expressions.
Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
/// Returns the ith partition name of the table.
std::string GetPartitionName(int i);
/// Returns TRUE for Hive ACID tables.
bool IsHiveAcid() const override { return write_id_ != -1; }
/// The partition descriptor used when creating new partitions from this sink.
/// Currently we don't support multi-format sinks.
const HdfsPartitionDescriptor* prototype_partition_;
/// The 'skip.header.line.count' property of the target Hdfs table. We will insert this
/// many empty lines at the beginning of new text files, which will be skipped by the
/// scanners while reading from the files.
@@ -207,14 +181,6 @@ class HdfsTableSink : public TableSinkBase {
// Represents the sorting order used in SORT BY queries.
const TSortingOrder::type sorting_order_;
/// Stores the current partition during clustered inserts across subsequent row batches.
/// Only set if 'input_is_clustered_' is true.
PartitionPair* current_clustered_partition_;
/// Stores the current partition key during clustered inserts across subsequent row
/// batches. Only set if 'input_is_clustered_' is true.
std::string current_clustered_partition_key_;
/// The directory in which to write intermediate results. Set to
/// <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare()
std::string staging_dir_;
@@ -235,14 +201,6 @@ class HdfsTableSink : public TableSinkBase {
/// OutputPartition in the map to simplify the code.
PartitionMap partition_keys_to_output_partitions_;
/// Expressions for computing the target partitions to which a row is written.
const std::vector<ScalarExpr*>& partition_key_exprs_;
std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;
/// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
/// Used for generating the string key of hash_tbl_.
std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_;
/// Map from row key (i.e. concatenated non-constant partition keys) to
/// partition descriptor. We don't own the HdfsPartitionDescriptors, they
/// belong to the table descriptor. The key is generated by GetHashTblKey()

View File

@@ -36,7 +36,7 @@
namespace impala {
HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
HdfsTextTableWriter::HdfsTextTableWriter(TableSinkBase* parent,
RuntimeState* state, OutputPartition* output,
const HdfsPartitionDescriptor* partition,
const HdfsTableDescriptor* table_desc)

View File

@@ -42,7 +42,7 @@ class TupleRow;
/// as delimited text into Hdfs files.
class HdfsTextTableWriter : public HdfsTableWriter {
public:
HdfsTextTableWriter(HdfsTableSink* parent,
HdfsTextTableWriter(TableSinkBase* parent,
RuntimeState* state, OutputPartition* output,
const HdfsPartitionDescriptor* partition,
const HdfsTableDescriptor* table_desc);

View File

@@ -17,14 +17,19 @@
#include "exec/iceberg-delete-sink.h"
#include <boost/algorithm/string.hpp>
#include "common/object-pool.h"
#include "exec/parquet/hdfs-parquet-table-writer.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "kudu/util/url-coding.h"
#include "runtime/descriptors.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/coding-util.h"
#include "util/debug-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
@@ -35,12 +40,6 @@
namespace impala {
IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id,
const IcebergDeleteSinkConfig& sink_config, const TIcebergDeleteSink& ice_del_sink,
RuntimeState* state) : TableSinkBase(sink_id, sink_config,
"IcebergDeleteSink", state) {
}
DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
TDataSinkId sink_id = state->fragment().idx;
return state->obj_pool()->Add(
@@ -48,17 +47,23 @@ DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
this->tsink_->table_sink.iceberg_delete_sink, state));
}
void IcebergDeleteSinkConfig::Close() {
DataSinkConfig::Close();
}
Status IcebergDeleteSinkConfig::Init(
const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
DCHECK(tsink_->__isset.table_sink);
DCHECK(tsink_->table_sink.__isset.iceberg_delete_sink);
RETURN_IF_ERROR(
ScalarExpr::Create(tsink_->table_sink.iceberg_delete_sink.partition_key_exprs,
*input_row_desc_, state, &partition_key_exprs_));
return Status::OK();
}
IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id,
const IcebergDeleteSinkConfig& sink_config, const TIcebergDeleteSink& ice_del_sink,
RuntimeState* state) :
TableSinkBase(sink_id, sink_config, "IcebergDeleteSink", state) {
}
Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
@@ -82,59 +87,127 @@ Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tr
Status IcebergDeleteSink::Open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(TableSinkBase::Open(state));
prototype_partition_ = CHECK_NOTNULL(table_desc_->prototype_partition_descriptor());
output_partition_ = make_pair(make_unique<OutputPartition>(), std::vector<int32_t>());
state->dml_exec_state()->AddPartition(
output_partition_.first->partition_name, prototype_partition_->id(),
&table_desc_->hdfs_base_dir(), nullptr);
DCHECK_EQ(partition_key_expr_evals_.size(), dynamic_partition_key_expr_evals_.size());
return Status::OK();
}
Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
SCOPED_TIMER(profile()->total_time_counter());
expr_results_pool_->Clear();
RETURN_IF_ERROR(state->CheckQueryState());
// We don't do any work for an empty batch.
if (batch->num_rows() == 0) return Status::OK();
if (output_partition_.first->writer == nullptr) {
RETURN_IF_ERROR(InitOutputPartition(state));
// If there are no partition keys then just pass the whole batch to one partition.
if (dynamic_partition_key_expr_evals_.empty()) {
if (current_partition_.first == nullptr) {
RETURN_IF_ERROR(SetCurrentPartition(state, nullptr, ROOT_PARTITION_KEY));
}
RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
} else {
RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
}
RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &output_partition_));
return Status();
return Status::OK();
}
Status IcebergDeleteSink::InitOutputPartition(RuntimeState* state) {
// Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
// etc.
stringstream partition_name_ss;
inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
const TupleRow* row, const string& key) {
DCHECK(row != nullptr || key == ROOT_PARTITION_KEY);
PartitionMap::iterator existing_partition;
if (current_partition_.first != nullptr &&
key == current_clustered_partition_key_) {
return Status::OK();
}
// partition_name_ss now holds the unique descriptor for this partition,
output_partition_.first->partition_name = partition_name_ss.str();
BuildHdfsFileNames(*prototype_partition_, output_partition_.first.get(), "");
current_partition_.first.reset(new OutputPartition());
current_partition_.second.clear();
Status status = InitOutputPartition(state, *prototype_partition_, row,
current_partition_.first.get(), false);
if (!status.ok()) {
// We failed to create the output partition successfully. Clean it up now.
if (current_partition_.first->writer != nullptr) {
current_partition_.first->writer->Close();
}
return status;
}
// We will be writing to the final file if we're skipping staging, so get a connection
// to its filesystem.
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
output_partition_.first->final_hdfs_file_name_prefix,
&output_partition_.first->hdfs_connection));
// Save the partition name so that the coordinator can create the partition
// directory structure if needed.
state->dml_exec_state()->AddPartition(
current_partition_.first->partition_name, prototype_partition_->id(),
&table_desc_->hdfs_base_dir(),
nullptr);
return Status::OK();
}
output_partition_.first->partition_descriptor = prototype_partition_;
Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) {
DCHECK_GT(batch->num_rows(), 0);
DCHECK_EQ(partition_key_expr_evals_.size(), 2);
DCHECK(!dynamic_partition_key_expr_evals_.empty());
output_partition_.first->writer.reset(
new HdfsParquetTableWriter(
this, state, output_partition_.first.get(), prototype_partition_, table_desc_));
RETURN_IF_ERROR(output_partition_.first->writer->Init());
COUNTER_ADD(partitions_created_counter_, 1);
return CreateNewTmpFile(state, output_partition_.first.get());
// Initialize the clustered partition and key.
if (current_partition_.first == nullptr) {
TupleRow* current_row = batch->GetRow(0);
GetHashTblKey(current_row, dynamic_partition_key_expr_evals_,
&current_clustered_partition_key_);
RETURN_IF_ERROR(SetCurrentPartition(state, current_row,
current_clustered_partition_key_));
}
// Compare the last row of the batch to the last current partition key. If they match,
// then all the rows in the batch have the same key and can be written as a whole.
string last_row_key;
GetHashTblKey(batch->GetRow(batch->num_rows() - 1),
dynamic_partition_key_expr_evals_, &last_row_key);
if (last_row_key == current_clustered_partition_key_) {
DCHECK(current_partition_.second.empty());
RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
return Status::OK();
}
// Not all rows in this batch match the previously written partition key, so we process
// them individually.
for (int i = 0; i < batch->num_rows(); ++i) {
TupleRow* current_row = batch->GetRow(i);
string key;
GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);
if (current_clustered_partition_key_ != key) {
DCHECK(current_partition_.first->writer != nullptr);
// Done with previous partition - write rows and close.
if (!current_partition_.second.empty()) {
RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
current_partition_.second.clear();
}
RETURN_IF_ERROR(FinalizePartitionFile(state,
current_partition_.first.get()));
if (current_partition_.first->writer.get() != nullptr) {
current_partition_.first->writer->Close();
}
RETURN_IF_ERROR(SetCurrentPartition(state, current_row, key));
current_clustered_partition_key_ = std::move(key);
}
#ifdef DEBUG
string debug_row_key;
GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &debug_row_key);
DCHECK_EQ(current_clustered_partition_key_, debug_row_key);
#endif
DCHECK(current_partition_.first->writer != nullptr);
current_partition_.second.push_back(i);
}
// Write final set of rows to the partition but keep its file open.
RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
return Status::OK();
}
Status IcebergDeleteSink::FlushFinal(RuntimeState* state) {
DCHECK(!closed_);
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition_.first.get()));
if (current_partition_.first != nullptr) {
RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.first.get()));
}
return Status::OK();
}
@@ -142,23 +215,116 @@ void IcebergDeleteSink::Close(RuntimeState* state) {
if (closed_) return;
SCOPED_TIMER(profile()->total_time_counter());
if (output_partition_.first->writer != nullptr) {
output_partition_.first->writer->Close();
if (current_partition_.first != nullptr) {
if (current_partition_.first->writer != nullptr) {
current_partition_.first->writer->Close();
}
Status close_status = ClosePartitionFile(state, current_partition_.first.get());
if (!close_status.ok()) state->LogError(close_status.msg());
}
Status close_status = ClosePartitionFile(state, output_partition_.first.get());
if (!close_status.ok()) state->LogError(close_status.msg());
output_partition_.first.reset();
current_partition_.first.reset();
TableSinkBase::Close(state);
closed_ = true;
}
void IcebergDeleteSink::ConstructPartitionInfo(
const TupleRow* row,
OutputPartition* output_partition) {
DCHECK(output_partition != nullptr);
DCHECK(output_partition->raw_partition_names.empty());
if (partition_key_expr_evals_.empty()) {
output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
return;
}
DCHECK_EQ(partition_key_expr_evals_.size(), 2);
ScalarExprEvaluator* spec_id_eval = partition_key_expr_evals_[0];
ScalarExprEvaluator* partitions_eval = partition_key_expr_evals_[1];
int spec_id = spec_id_eval->GetIntVal(row).val;
output_partition->iceberg_spec_id = spec_id;
StringVal partitions_strings_val = partitions_eval->GetStringVal(row);
string partition_values_str((const char*)partitions_strings_val.ptr,
partitions_strings_val.len);
vector<string> non_void_partition_names;
const vector<string>* non_void_partition_names_ptr = nullptr;
if (LIKELY(spec_id == table_desc_->IcebergSpecId())) {
// If 'spec_id' is the default spec id, then point 'non_void_partition_names_ptr'
// to the already existing vector 'table_desc_->IcebergNonVoidPartitionNames()'.
non_void_partition_names_ptr = &table_desc_->IcebergNonVoidPartitionNames();
} else {
// Otherwise collect the non-void partition names belonging to 'spec_id' in
// 'non_void_partition_names' and point 'non_void_partition_names_ptr' to it.
const TIcebergPartitionSpec& partition_spec =
table_desc_->IcebergPartitionSpecs()[spec_id];
for (const TIcebergPartitionField& spec_field : partition_spec.partition_fields) {
if (spec_field.transform.transform_type != TIcebergPartitionTransformType::VOID) {
non_void_partition_names.push_back(spec_field.field_name);
}
}
non_void_partition_names_ptr = &non_void_partition_names;
}
DCHECK(non_void_partition_names_ptr != nullptr);
if (non_void_partition_names_ptr->empty()) {
DCHECK(partition_values_str.empty());
return;
}
vector<string> partition_values_encoded;
boost::split(partition_values_encoded, partition_values_str, boost::is_any_of("."));
vector<string> partition_values_decoded;
partition_values_decoded.reserve(partition_values_encoded.size());
for (const string& encoded_part_val : partition_values_encoded) {
string decoded_val;
bool success = kudu::Base64Decode(encoded_part_val, &decoded_val);
// We encoded it, we must succeed decoding it.
DCHECK(success);
partition_values_decoded.push_back(std::move(decoded_val));
}
DCHECK_EQ(partition_values_decoded.size(), non_void_partition_names_ptr->size());
stringstream url_encoded_partition_name_ss;
stringstream external_partition_name_ss;
for (int i = 0; i < partition_values_decoded.size(); ++i) {
stringstream raw_partition_key_value_ss;
stringstream encoded_partition_key_value_ss;
raw_partition_key_value_ss << (*non_void_partition_names_ptr)[i] << "=";
encoded_partition_key_value_ss << (*non_void_partition_names_ptr)[i] << "=";
string& value_str = partition_values_decoded[i];
raw_partition_key_value_ss << value_str;
string part_key_value = UrlEncodePartitionValue(value_str);
encoded_partition_key_value_ss << part_key_value;
if (i < partition_key_expr_evals_.size() - 1) encoded_partition_key_value_ss << "/";
url_encoded_partition_name_ss << encoded_partition_key_value_ss.str();
output_partition->raw_partition_names.push_back(raw_partition_key_value_ss.str());
}
output_partition->partition_name = url_encoded_partition_name_ss.str();
output_partition->external_partition_name = external_partition_name_ss.str();
}
string IcebergDeleteSink::DebugString() const {
stringstream out;
out << "IcebergDeleteSink("
<< " table_desc=" << table_desc_->DebugString()
<< " output_exprs=" << ScalarExpr::DebugString(output_exprs_)
<< ")";
<< " output_exprs=" << ScalarExpr::DebugString(output_exprs_);
if (!partition_key_exprs_.empty()) {
out << " partition_key_exprs=" << ScalarExpr::DebugString(partition_key_exprs_);
}
out << ")";
return out.str();
}

View File

@@ -30,13 +30,9 @@ class TupleRow;
class RuntimeState;
class MemTracker;
class IcebergDeleteSinkConfig : public DataSinkConfig {
class IcebergDeleteSinkConfig : public TableSinkBaseConfig {
public:
DataSink* CreateSink(RuntimeState* state) const override;
void Close() override;
/// Expressions for computing the target partitions to which a row is written.
std::vector<ScalarExpr*> partition_key_exprs_;
~IcebergDeleteSinkConfig() override {}
@@ -73,15 +69,28 @@ class IcebergDeleteSink : public TableSinkBase {
std::string DebugString() const override;
private:
/// Initialises the filenames of a given output partition, and opens the temporary file.
Status InitOutputPartition(RuntimeState* state) WARN_UNUSED_RESULT;
/// Fills output_partition's partition_name, raw_partition_names and
/// external_partition_name based on the row's columns. In case of partitioned
/// tables 'row' must contain the Iceberg virtual columns PARTITION__SPEC__ID and
/// ICEBERG__PARTITION__SERIALIZED. Every information needed for 'output_partition' can
/// be retrieved from these fields and from the 'table_desc_'.
void ConstructPartitionInfo(
const TupleRow* row,
OutputPartition* output_partition) override;
/// For now we only allow non-partitioned Iceberg tables.
PartitionPair output_partition_;
/// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs
/// files. The input must be ordered by the partition key expressions.
Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
/// The partition descriptor used when creating new partitions from this sink.
/// Currently we don't support multi-format sinks.
const HdfsPartitionDescriptor* prototype_partition_;
/// Sets and initializes the 'current_partition_' based on key. For unpartitioned tables
/// it is only invoked once to initialize the only output partition.
/// For partitioned tables the rows are clustered based on partition data, i.e. when the
/// key changes we initialize a new output partition.
Status SetCurrentPartition(RuntimeState* state, const TupleRow* row,
const std::string& key) WARN_UNUSED_RESULT;
/// The sink writes partitions one-by-one.
PartitionPair current_partition_;
};
}

View File

@@ -67,10 +67,16 @@ struct OutputPartition {
/// value in this member is URL encoded for the sake of e.g. data file name creation.
std::string partition_name;
/// Used when an external Frontend specifies the staging directory and how partitions
/// should be created. See IMPALA-10553 for details.
std::string external_partition_name;
/// This is a split of the 'partition_name' variable by '/'. Note, the partition keys
/// and values in this variable are not URL encoded.
std::vector<std::string> raw_partition_names;
int32_t iceberg_spec_id = -1;
/// Connection to hdfs.
hdfsFS hdfs_connection = nullptr;

View File

@@ -17,8 +17,16 @@
#include "exec/table-sink-base.h"
#include "exec/hdfs-text-table-writer.h"
#include "exec/output-partition.h"
#include "exec/parquet/hdfs-parquet-table-writer.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/coding-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
@@ -29,6 +37,11 @@
namespace impala {
void TableSinkBaseConfig::Close() {
ScalarExpr::Close(partition_key_exprs_);
DataSinkConfig::Close();
}
Status TableSinkBase::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
@@ -38,9 +51,35 @@ Status TableSinkBase::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
encode_timer_ = ADD_TIMER(profile(), "EncodeTimer");
hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer");
compress_timer_ = ADD_TIMER(profile(), "CompressTimer");
RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
&partition_key_expr_evals_));
// Prepare partition key exprs and gather dynamic partition key exprs.
for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) {
// Remember non-constant partition key exprs for building hash table of Hdfs files.
if (!partition_key_expr_evals_[i]->root().is_constant()) {
dynamic_partition_key_expr_evals_.push_back(partition_key_expr_evals_[i]);
}
}
return Status::OK();
}
Status TableSinkBase::Open(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::Open(state));
DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size());
RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state));
prototype_partition_ = CHECK_NOTNULL(table_desc_->prototype_partition_descriptor());
return Status::OK();
}
void TableSinkBase::Close(RuntimeState* state) {
ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
DataSink::Close(state);
}
Status TableSinkBase::ClosePartitionFile(
RuntimeState* state, OutputPartition* partition) {
if (partition->tmp_hdfs_file == nullptr) return Status::OK();
@@ -56,9 +95,25 @@ Status TableSinkBase::ClosePartitionFile(
return Status::OK();
}
string TableSinkBase::GetPartitionName(int i) {
if (IsIceberg()) {
DCHECK_LT(i, partition_key_expr_evals_.size());
return table_desc_->IcebergNonVoidPartitionNames()[i];
} else {
DCHECK_LT(i, table_desc_->num_clustering_cols());
return table_desc_->col_descs()[i].name();
}
}
string TableSinkBase::UrlEncodePartitionValue(const string& raw_str) {
string encoded_str;
UrlEncode(raw_str, &encoded_str, true);
return encoded_str.empty() ? table_desc_->null_partition_key_value() : encoded_str;
}
void TableSinkBase::BuildHdfsFileNames(
const HdfsPartitionDescriptor& partition_descriptor,
OutputPartition* output_partition, const string &external_partition_name) {
OutputPartition* output_partition) {
// Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix.
// Path: <hdfs_base_dir>/<partition_values>/<unique_id_str>
@@ -90,7 +145,7 @@ void TableSinkBase::BuildHdfsFileNames(
// We are trusting that the external frontend implementation has done appropriate
// authorization checks on the external output directory.
output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
external_output_dir_, external_partition_name);
external_output_dir_, output_partition->external_partition_name);
} else if (partition_descriptor.location().empty()) {
output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
table_desc_->hdfs_base_dir(), output_partition->partition_name);
@@ -136,6 +191,82 @@ void TableSinkBase::BuildHdfsFileNames(
output_partition->num_files = 0;
}
Status TableSinkBase::InitOutputPartition(RuntimeState* state,
const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
OutputPartition* output_partition, bool empty_partition) {
// Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
// etc.
ConstructPartitionInfo(row, output_partition);
BuildHdfsFileNames(partition_descriptor, output_partition);
if (ShouldSkipStaging(state, output_partition)) {
// We will be writing to the final file if we're skipping staging, so get a connection
// to its filesystem.
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
output_partition->final_hdfs_file_name_prefix,
&output_partition->hdfs_connection));
} else {
// Else get a connection to the filesystem of the tmp file.
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
output_partition->tmp_hdfs_file_name_prefix, &output_partition->hdfs_connection));
}
output_partition->partition_descriptor = &partition_descriptor;
if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
stringstream error_msg;
map<int, const char*>::const_iterator i =
_THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
error_msg << "Writing to table format " << i->second << " is not supported.";
return Status(error_msg.str());
}
if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
state->query_options().__isset.compression_codec &&
state->query_options().compression_codec.codec != THdfsCompression::NONE) {
stringstream error_msg;
error_msg << "Writing to compressed text table is not supported. ";
return Status(error_msg.str());
}
// It is incorrect to initialize a writer if there are no rows to feed it. The writer
// could incorrectly create an empty file or empty partition.
// However, for transactional tables we should create a new empty base directory in
// case of INSERT OVERWRITEs.
if (empty_partition && (!is_overwrite() || !IsTransactional())) return Status::OK();
switch (partition_descriptor.file_format()) {
case THdfsFileFormat::TEXT:
output_partition->writer.reset(
new HdfsTextTableWriter(
this, state, output_partition, &partition_descriptor, table_desc_));
break;
case THdfsFileFormat::ICEBERG:
case THdfsFileFormat::PARQUET:
output_partition->writer.reset(
new HdfsParquetTableWriter(
this, state, output_partition, &partition_descriptor, table_desc_));
break;
default:
stringstream error_msg;
map<int, const char*>::const_iterator i =
_THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
if (i != _THdfsFileFormat_VALUES_TO_NAMES.end()) {
error_msg << "Cannot write to table with format " << i->second << ". "
<< "Impala only supports writing to TEXT and PARQUET.";
} else {
error_msg << "Cannot write to table. Impala only supports writing to TEXT"
<< " and PARQUET tables. (Unknown file format: "
<< partition_descriptor.file_format() << ")";
}
return Status(error_msg.str());
}
RETURN_IF_ERROR(output_partition->writer->Init());
COUNTER_ADD(partitions_created_counter_, 1);
return CreateNewTmpFile(state, output_partition);
}
Status TableSinkBase::CreateNewTmpFile(RuntimeState* state,
OutputPartition* output_partition) {
SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
@@ -263,6 +394,18 @@ Status TableSinkBase::WriteRowsToPartition(
return Status::OK();
}
void TableSinkBase::GetHashTblKey(const TupleRow* row,
const vector<ScalarExprEvaluator*>& evals, string* key) {
stringstream hash_table_key;
for (int i = 0; i < evals.size(); ++i) {
RawValue::PrintValueAsBytes(
evals[i]->GetValue(row), evals[i]->root().type(), &hash_table_key);
// Additionally append "/" to avoid accidental key collisions.
hash_table_key << "/";
}
*key = hash_table_key.str();
}
bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) {
if (IsTransactional() || HasExternalOutputDir() || is_result_sink()) return true;
// We skip staging if we are writing query results

View File

@@ -23,12 +23,25 @@
namespace impala {
class TupleRow;
class TableSinkBaseConfig : public DataSinkConfig {
public:
void Close() override;
/// Expressions for computing the target partitions to which a row is written.
std::vector<ScalarExpr*> partition_key_exprs_;
~TableSinkBaseConfig() override {}
};
class TableSinkBase : public DataSink {
public:
TableSinkBase(TDataSinkId sink_id, const DataSinkConfig& sink_config,
TableSinkBase(TDataSinkId sink_id, const TableSinkBaseConfig& sink_config,
const std::string& name, RuntimeState* state) :
DataSink(sink_id, sink_config, name, state),
table_id_(sink_config.tsink_->table_sink.target_table_id) {}
table_id_(sink_config.tsink_->table_sink.target_table_id),
partition_key_exprs_(sink_config.partition_key_exprs_) {}
virtual bool is_overwrite() const { return false; }
virtual bool is_result_sink() const { return false; }
@@ -45,7 +58,9 @@ public:
return dummy;
}
Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
Status Open(RuntimeState* state) override;
void Close(RuntimeState* state) override;
RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; }
@@ -71,12 +86,36 @@ protected:
virtual bool IsHiveAcid() const { return false; }
virtual void ConstructPartitionInfo(
const TupleRow* row,
OutputPartition* output_partition) = 0;
/// Initialises the filenames of a given output partition, and opens the temporary file.
/// The partition key is derived from 'row'. If the partition will not have any rows
/// added to it, empty_partition must be true.
Status InitOutputPartition(RuntimeState* state,
const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT;
/// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
/// The Hdfs directory is created from the target table's base Hdfs dir,
/// the partition_key_names_ and the evaluated partition_key_exprs_.
/// The Hdfs file name is the unique_id_str_.
void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
OutputPartition* output, const std::string &external_partition_path);
OutputPartition* output);
/// Returns the ith partition name of the table.
std::string GetPartitionName(int i);
// Directory names containing partition-key values need to be UrlEncoded, in
// particular to avoid problems when '/' is part of the key value (which might
// occur, for example, with date strings). Hive will URL decode the value
// transparently when Impala's frontend asks the metastore for partition key values,
// which makes it particularly important that we use the same encoding as Hive. It's
// also not necessary to encode the values when writing partition metadata. You can
// check this with 'show partitions <tbl>' in Hive, followed by a select from a
// decoded partition key value.
std::string UrlEncodePartitionValue(const std::string& raw_str);
/// Add a temporary file to an output partition. Files are created in a
/// temporary directory and then moved to the real partition directory by the
@@ -115,6 +154,12 @@ protected:
/// Returns TRUE if an external output directory was provided.
bool HasExternalOutputDir() { return !external_output_dir_.empty(); }
/// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
/// evaluated against 'row'. The generated string is much shorter than the full Hdfs
/// file name.
void GetHashTblKey(const TupleRow* row,
const std::vector<ScalarExprEvaluator*>& evals, std::string* key);
/// Table id resolved in Prepare() to set tuple_desc_;
TableId table_id_;
@@ -125,6 +170,26 @@ protected:
/// Descriptor of target table. Set in Prepare().
const HdfsTableDescriptor* table_desc_ = nullptr;
/// The partition descriptor used when creating new partitions from this sink.
/// Currently we don't support multi-format sinks.
const HdfsPartitionDescriptor* prototype_partition_;
/// Expressions for computing the target partitions to which a row is written.
const std::vector<ScalarExpr*>& partition_key_exprs_;
std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;
/// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
/// Used for generating the string key of hash_tbl_.
std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_;
/// Stores the current partition during clustered inserts across subsequent row batches.
/// Only set if 'input_is_clustered_' is true.
PartitionPair* current_clustered_partition_ = nullptr;
/// Stores the current partition key during clustered inserts across subsequent row
/// batches. Only set if 'input_is_clustered_' is true.
std::string current_clustered_partition_key_;
/// The directory in which an external FE expects results to be written to.
std::string external_output_dir_;

View File

@@ -257,9 +257,10 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
if (tdesc.__isset.icebergTable) {
is_iceberg_ = true;
iceberg_table_location_ = tdesc.icebergTable.table_location;
const TIcebergPartitionSpec& spec = tdesc.icebergTable.partition_spec[
tdesc.icebergTable.default_partition_spec_id];
DCHECK_EQ(spec.spec_id, tdesc.icebergTable.default_partition_spec_id);
iceberg_spec_id_ = tdesc.icebergTable.default_partition_spec_id;
iceberg_partition_specs_ = tdesc.icebergTable.partition_spec;
const TIcebergPartitionSpec& spec = iceberg_partition_specs_[iceberg_spec_id_];
DCHECK_EQ(spec.spec_id, iceberg_spec_id_);
for (const TIcebergPartitionField& spec_field : spec.partition_fields) {
if (spec_field.transform.transform_type == TIcebergPartitionTransformType::VOID) {
continue;

View File

@@ -437,6 +437,9 @@ class HdfsTableDescriptor : public TableDescriptor {
bool IsIcebergTable() const { return is_iceberg_; }
const std::string& IcebergTableLocation() const { return iceberg_table_location_; }
const std::vector<TIcebergPartitionSpec>& IcebergPartitionSpecs() const {
return iceberg_partition_specs_;
}
const std::vector<std::string>& IcebergNonVoidPartitionNames() const {
return iceberg_non_void_partition_names_;
}
@@ -454,6 +457,10 @@ class HdfsTableDescriptor : public TableDescriptor {
return iceberg_parquet_dict_page_size_;
}
int32_t IcebergSpecId() const {
return iceberg_spec_id_;
}
virtual std::string DebugString() const;
protected:
@@ -469,11 +476,13 @@ class HdfsTableDescriptor : public TableDescriptor {
TValidWriteIdList valid_write_id_list_;
bool is_iceberg_ = false;
std::string iceberg_table_location_;
std::vector<TIcebergPartitionSpec> iceberg_partition_specs_;
std::vector<std::string> iceberg_non_void_partition_names_;
TCompressionCodec iceberg_parquet_compression_codec_;
int64_t iceberg_parquet_row_group_size_;
int64_t iceberg_parquet_plain_page_size_;
int64_t iceberg_parquet_dict_page_size_;
int32_t iceberg_spec_id_;
};
class HBaseTableDescriptor : public TableDescriptor {

View File

@@ -521,6 +521,7 @@ string createIcebergDataFileString(
FbIcebergDataFileFormat::FbIcebergDataFileFormat_PARQUET,
num_rows,
file_size,
partition.iceberg_spec_id,
fbb.CreateString(partition.partition_name),
fbb.CreateVector(raw_partition_fields),
fbb.CreateVector(ice_col_stats_vec));

View File

@@ -40,13 +40,14 @@ enum FbIcebergTransformType : byte {
table FbIcebergPartitionTransformValue {
transform_type: FbIcebergTransformType;
transform_param: int;
transform_value: string;
transform_value: [ubyte];
source_id: int;
}
table FbIcebergMetadata {
file_format : FbIcebergDataFileFormat;
record_count : long;
spec_id : ushort;
partition_keys : [FbIcebergPartitionTransformValue];
}
@@ -64,6 +65,7 @@ table FbIcebergDataFile {
format: FbIcebergDataFileFormat = PARQUET;
record_count: long = 0;
file_size_in_bytes: long = 0;
spec_id: ushort;
partition_path: string;
raw_partition_fields: [string];
per_column_stats: [FbIcebergColumnStats];

View File

@@ -77,7 +77,9 @@ enum THdfsFileFormat {
enum TVirtualColumnType {
NONE,
INPUT_FILE_NAME,
FILE_POSITION
FILE_POSITION,
PARTITION_SPEC_ID,
ICEBERG_PARTITION_SERIALIZED
}
// TODO: Since compression is also enabled for Kudu columns, we should

View File

@@ -103,7 +103,12 @@ struct THdfsTableSink {
11: optional map<string, i64> parquet_bloom_filter_col_info;
}
// Structure to encapsulate specific options that are passed down to the
// IcebergDeleteSink.
struct TIcebergDeleteSink {
// Partition expressions of this sink. In case of Iceberg DELETEs these are the
// partition spec id and the serialized partition data.
1: required list<Exprs.TExpr> partition_key_exprs
}
// Structure to encapsulate specific options that are passed down to the KuduTableSink

View File

@@ -52,19 +52,23 @@ public class DeleteStmt extends ModifyStmt {
new ArrayList<>(), other.wherePredicate_.clone());
}
@Override
public DataSink createDataSink(List<Expr> resultExprs) {
public DataSink createDataSink() {
// analyze() must have been called before.
Preconditions.checkState(table_ != null);
TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false,
partitionKeyExprs_, resultExprs_, referencedColumns_, false, false,
new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
getKuduTransactionToken(),
0);
maxTableSinks_);
Preconditions.checkState(!referencedColumns_.isEmpty());
return tableSink;
}
public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
}
@Override
public DeleteStmt clone() {
return new DeleteStmt(this);

View File

@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import com.google.common.base.Preconditions;
import java.util.List;
public abstract class DmlStatementBase extends StatementBase {
// Target table of the DML statement.
protected FeTable table_;
// Set in analyze(). Set the limit on the maximum number of table sink instances.
// A value of 0 means no limit.
protected int maxTableSinks_ = 0;
// Serialized metadata of transaction object which is set by the Frontend if the
// target table is Kudu table and Kudu's transaction is enabled.
protected java.nio.ByteBuffer kuduTxnToken_ = null;
protected DmlStatementBase() {}
protected DmlStatementBase(DmlStatementBase other) {
super(other);
table_ = other.table_;
maxTableSinks_ = other.maxTableSinks_;
kuduTxnToken_ = org.apache.thrift.TBaseHelper.copyBinary(other.kuduTxnToken_);
}
@Override
public void reset() {
super.reset();
table_ = null;
kuduTxnToken_ = null;
}
public FeTable getTargetTable() { return table_; }
public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = maxTableSinks; }
public boolean hasShuffleHint() { return false; }
public boolean hasNoShuffleHint() { return false; }
public boolean hasClusteredHint() { return false; }
public boolean hasNoClusteredHint() { return false; }
abstract public List<Expr> getPartitionKeyExprs();
abstract public List<Expr> getSortExprs();
/**
* Return bytes of Kudu transaction token.
*/
public java.nio.ByteBuffer getKuduTransactionToken() {
return kuduTxnToken_;
}
/**
* Set Kudu transaction token.
*/
public void setKuduTransactionToken(byte[] kuduTxnToken) {
Preconditions.checkState(table_ instanceof FeKuduTable);
Preconditions.checkNotNull(kuduTxnToken);
kuduTxnToken_ = java.nio.ByteBuffer.wrap(kuduTxnToken.clone());
}
}

View File

@@ -58,6 +58,8 @@ public class IcebergPartitionSpec extends StmtNode {
return icebergPartitionFields_ != null && (!icebergPartitionFields_.isEmpty());
}
public int getSpecId() { return specId_; }
public int getIcebergPartitionFieldsSize() {
if (!hasPartitionFields()) return 0;
return getIcebergPartitionFields().size();

View File

@@ -67,7 +67,7 @@ import org.apache.impala.util.IcebergUtil;
* Representation of a single insert or upsert statement, including the select statement
* whose results are to be inserted.
*/
public class InsertStmt extends StatementBase {
public class InsertStmt extends DmlStatementBase {
// Determines the location of optional hints. The "Start" option is motivated by
// Oracle's hint placement at the start of the statement and the "End" option places
// the hint right before the query (if specified).
@@ -132,13 +132,6 @@ public class InsertStmt extends StatementBase {
// analysis.
private QueryStmt queryStmt_;
// Set in analyze(). Contains metadata of target table to determine type of sink.
private FeTable table_;
// Set in analyze(). Set the limit on the maximum number of table sink instances.
// A value of 0 means no limit.
private int maxTableSinks_ = 0;
// Set in analyze(). Exprs correspond to the partitionKeyValues, if specified, or to
// the partition columns for Kudu tables.
private List<Expr> partitionKeyExprs_ = new ArrayList<>();
@@ -205,10 +198,6 @@ public class InsertStmt extends StatementBase {
// Set by the Frontend if the target table is transactional.
private long writeId_ = -1;
// Serialized metadata of transaction object which is set by the Frontend if the
// target table is Kudu table and Kudu's transaction is enabled.
private java.nio.ByteBuffer kuduTxnToken_ = null;
// END: Members that need to be reset()
/////////////////////////////////////////
@@ -264,10 +253,8 @@ public class InsertStmt extends StatementBase {
queryStmt_ = other.queryStmt_ != null ? other.queryStmt_.clone() : null;
needsGeneratedQueryStatement_ = other.needsGeneratedQueryStatement_;
columnPermutation_ = other.columnPermutation_;
table_ = other.table_;
isUpsert_ = other.isUpsert_;
writeId_ = other.writeId_;
kuduTxnToken_ = org.apache.thrift.TBaseHelper.copyBinary(other.kuduTxnToken_);
}
@Override
@@ -276,7 +263,6 @@ public class InsertStmt extends StatementBase {
if (withClause_ != null) withClause_.reset();
targetTableName_ = originalTableName_;
queryStmt_.reset();
table_ = null;
partitionKeyExprs_.clear();
partitionColPos_.clear();
hasShuffleHint_ = false;
@@ -290,7 +276,6 @@ public class InsertStmt extends StatementBase {
mentionedColumns_.clear();
primaryKeyExprs_.clear();
writeId_ = -1;
kuduTxnToken_ = null;
}
@Override
@@ -1209,15 +1194,9 @@ public class InsertStmt extends StatementBase {
public List<PlanHint> getPlanHints() { return planHints_; }
public TableName getTargetTableName() { return targetTableName_; }
public FeTable getTargetTable() { return table_; }
public void setTargetTable(FeTable table) { this.table_ = table; }
public boolean isTargetTableKuduTable() { return (table_ instanceof FeKuduTable); }
public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = maxTableSinks; }
public void setWriteId(long writeId) { this.writeId_ = writeId; }
public void setKuduTransactionToken(byte[] kuduTxnToken) {
Preconditions.checkNotNull(kuduTxnToken);
kuduTxnToken_ = java.nio.ByteBuffer.wrap(kuduTxnToken.clone());
}
public boolean isOverwrite() { return overwrite_; }
public TSortingOrder getSortingOrder() { return sortingOrder_; }
@@ -1225,18 +1204,21 @@ public class InsertStmt extends StatementBase {
* Only valid after analysis
*/
public QueryStmt getQueryStmt() { return queryStmt_; }
@Override
public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
public List<Integer> getPartitionColPos() { return partitionColPos_; }
@Override
public boolean hasShuffleHint() { return hasShuffleHint_; }
@Override
public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
@Override
public boolean hasClusteredHint() { return hasClusteredHint_; }
@Override
public boolean hasNoClusteredHint() { return hasNoClusteredHint_; }
public List<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
@Override
public List<Expr> getSortExprs() { return sortExprs_; }
public long getWriteId() { return writeId_; }
public byte[] getKuduTransactionToken() {
return kuduTxnToken_ == null ? null : kuduTxnToken_.array();
}
// Clustering is enabled by default. If the table has a 'sort.columns' property and the
// query has a 'noclustered' hint, we issue a warning during analysis and ignore the

View File

@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
import static java.lang.String.format;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,10 +33,10 @@ import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.IcebergPositionDeleteTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.Pair;
import org.apache.impala.planner.DataSink;
import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TIcebergFileFormat;
@@ -58,11 +57,8 @@ import com.google.common.base.Preconditions;
* match the right-hand side of the assignments in addition to projecting the key columns
* of the underlying table. During query execution, the plan that
* is generated from this SelectStmt produces all rows that need to be modified.
*
* Currently, only Kudu tables can be modified.
*/
public abstract class ModifyStmt extends StatementBase {
public abstract class ModifyStmt extends DmlStatementBase {
// List of explicitly mentioned assignment expressions in the UPDATE's SET clause
protected final List<Pair<SlotRef, Expr>> assignments_;
@@ -77,26 +73,41 @@ public abstract class ModifyStmt extends StatementBase {
protected FromClause fromClause_;
/////////////////////////////////////////
// START: Members that are set in first run of analyze().
// Exprs correspond to the partitionKeyValues, if specified, or to the partition columns
// for tables.
protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
// For every column of the target table that is referenced in the optional
// 'sort.columns' table property, this list will contain the corresponding result expr
// from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
// list is empty, no additional sorting by non-partitioning columns will be performed.
// The column list must not contain partition columns and must be empty for non-Hdfs
// tables.
protected List<Expr> sortExprs_ = new ArrayList<>();
// Output expressions that produce the final results to write to the target table. May
// include casts. Set in first run of analyze().
//
// In case of DELETE statements it contains the columns that identify the deleted rows.
protected List<Expr> resultExprs_ = new ArrayList<>();
// Result of the analysis of the internal SelectStmt that produces the rows that
// will be modified.
protected SelectStmt sourceStmt_;
// Target table.
protected FeTable table_;
// Implementation of the modify statement. Depends on the target table type.
private ModifyImpl modifyImpl_;
// Serialized metadata of transaction object which is set by the Frontend if the
// target table is Kudu table and Kudu's transaction is enabled.
java.nio.ByteBuffer kuduTxnToken_;
// END: Members that need to be reset()
/////////////////////////////////////////
// Position mapping of output expressions of the sourceStmt_ to column indices in the
// target table. The i'th position in this list maps to the referencedColumns_[i]'th
// position in the target table. Set in createSourceStmt() during analysis.
protected List<Integer> referencedColumns_;
protected List<Integer> referencedColumns_ = new ArrayList<>();
// END: Members that are set in first run of analyze
/////////////////////////////////////////
// SQL string of the ModifyStmt. Set in analyze().
protected String sqlString_;
@@ -195,10 +206,14 @@ public abstract class ModifyStmt extends StatementBase {
super.reset();
fromClause_.reset();
if (sourceStmt_ != null) sourceStmt_.reset();
table_ = null;
modifyImpl_ = null;
}
@Override
public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
@Override
public List<Expr> getSortExprs() { return sortExprs_; }
@Override
public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
return sourceStmt_.resolveTableMask(analyzer);
@@ -217,8 +232,7 @@ public abstract class ModifyStmt extends StatementBase {
throws AnalysisException {
// Builds the select list and column position mapping for the target table.
ArrayList<SelectListItem> selectList = new ArrayList<>();
referencedColumns_ = new ArrayList<>();
buildAndValidateAssignmentExprs(analyzer, selectList, referencedColumns_);
buildAndValidateAssignmentExprs(analyzer, selectList);
// Analyze the generated select statement.
sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, wherePredicate_,
@@ -241,7 +255,7 @@ public abstract class ModifyStmt extends StatementBase {
* are always prepended to the list of expression representing the assignments.
*/
private void buildAndValidateAssignmentExprs(Analyzer analyzer,
List<SelectListItem> selectList, List<Integer> referencedColumns)
List<SelectListItem> selectList)
throws AnalysisException {
// The order of the referenced columns equals the order of the result expressions
Set<SlotId> uniqueSlots = new HashSet<>();
@@ -254,7 +268,7 @@ public abstract class ModifyStmt extends StatementBase {
colIndexMap.put(cols.get(i).getName(), i);
}
modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns, uniqueSlots,
modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
keySlots, colIndexMap);
// Assignments are only used in the context of updates.
@@ -304,7 +318,7 @@ public abstract class ModifyStmt extends StatementBase {
c, rhsExpr, analyzer.isDecimalV2(), null /*widestTypeSrcExpr*/);
uniqueSlots.add(lhsSlotRef.getSlotId());
selectList.add(new SelectListItem(rhsExpr, null));
referencedColumns.add(colIndexMap.get(c.getName()));
referencedColumns_.add(colIndexMap.get(c.getName()));
}
}
@@ -326,7 +340,6 @@ public abstract class ModifyStmt extends StatementBase {
}
public QueryStmt getQueryStmt() { return sourceStmt_; }
public abstract DataSink createDataSink(List<Expr> resultExprs);
/**
* Return true if the target table is Kudu table.
@@ -334,28 +347,26 @@ public abstract class ModifyStmt extends StatementBase {
*/
public boolean isTargetTableKuduTable() { return (table_ instanceof FeKuduTable); }
/**
* Return target table.
*/
public FeTable getTargetTable() { return table_; }
/**
* Set Kudu transaction token.
*/
public void setKuduTransactionToken(byte[] kuduTxnToken) {
Preconditions.checkState(table_ instanceof FeKuduTable);
Preconditions.checkNotNull(kuduTxnToken);
kuduTxnToken_ = java.nio.ByteBuffer.wrap(kuduTxnToken.clone());
}
/**
* Return bytes of Kudu transaction token.
*/
public java.nio.ByteBuffer getKuduTransactionToken() {
return kuduTxnToken_;
}
private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
throws AnalysisException {
SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
keySlots, colIndexMap, colName);
resultExprs_.add(ref);
if (isSortingColumn) sortExprs_.add(ref);
}
private void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
keySlots, colIndexMap, colName);
partitionKeyExprs_.add(ref);
sortExprs_.add(ref);
}
private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), colName);
@@ -365,6 +376,7 @@ public abstract class ModifyStmt extends StatementBase {
uniqueSlots.add(ref.getSlotId());
keySlots.add(ref.getSlotId());
referencedColumns.add(colIndexMap.get(colName));
return ref;
}
@Override
@@ -408,7 +420,7 @@ public abstract class ModifyStmt extends StatementBase {
// Add the key columns as slot refs
for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
colIndexMap, k);
colIndexMap, k, false);
}
}
}
@@ -426,6 +438,7 @@ public abstract class ModifyStmt extends StatementBase {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
if (ModifyStmt.this instanceof UpdateStmt) {
throw new AnalysisException("UPDATE is not supported for Iceberg table " +
originalTargetTable_.getFullName());
@@ -443,11 +456,6 @@ public abstract class ModifyStmt extends StatementBase {
"Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
}
if (originalTargetTable_.isPartitioned()) {
throw new AnalysisException("Cannot execute DELETE/UPDATE statement on " +
"partitioned Iceberg table: " + originalTargetTable_.getFullName());
}
if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
throw new AnalysisException("Impala can only write delete files in PARQUET, " +
"but the given table uses a different file format: " +
@@ -471,15 +479,22 @@ public abstract class ModifyStmt extends StatementBase {
List<Integer> referencedColumns,
Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
throws AnalysisException {
if (originalTargetTable_.isPartitioned()) {
String[] partitionCols;
partitionCols = new String[] {"PARTITION__SPEC__ID",
"ICEBERG__PARTITION__SERIALIZED"};
for (String k : partitionCols) {
addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
keySlots, colIndexMap, k);
}
}
String[] deleteCols;
Preconditions.checkState(!icePosDelTable_.isPartitioned());
deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
// Add the key columns as slot refs
for (String k : deleteCols) {
addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
colIndexMap, k);
colIndexMap, k, true);
}
}
}
}

View File

@@ -61,7 +61,6 @@ public class UpdateStmt extends ModifyStmt {
/**
* Return an instance of a KuduTableSink specialized as an Update operation.
*/
@Override
public DataSink createDataSink(List<Expr> resultExprs) {
// analyze() must have been called before.
Preconditions.checkState(table_ != null);

View File

@@ -177,6 +177,13 @@ public interface FeIcebergTable extends FeFsTable {
*/
int getDefaultPartitionSpecId();
default IcebergPartitionSpec getPartitionSpec(int specId) {
for (IcebergPartitionSpec spec : getPartitionSpecs()) {
if (spec.getSpecId() == specId) return spec;
}
return null;
}
default int getFormatVersion() {
return ((BaseTable)getIcebergApiTable()).operations().current().formatVersion();
}

View File

@@ -443,6 +443,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
private void addVirtualColumns() {
addVirtualColumn(VirtualColumn.INPUT_FILE_NAME);
addVirtualColumn(VirtualColumn.FILE_POSITION);
addVirtualColumn(VirtualColumn.PARTITION_SPEC_ID);
addVirtualColumn(VirtualColumn.ICEBERG_PARTITION_SERIALIZED);
}
@Override
@@ -522,6 +524,13 @@ public class IcebergTable extends Table implements FeIcebergTable {
Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new HashMap<>();
TGetPartialCatalogObjectResponse resp =
getHdfsTable().getPartialInfo(req, missingPartialInfos);
if (resp.table_info != null) {
// Clear HdfsTable virtual columns and add IcebergTable virtual columns.
resp.table_info.unsetVirtual_columns();
for (VirtualColumn vCol : getVirtualColumns()) {
resp.table_info.addToVirtual_columns(vCol.toThrift());
}
}
if (req.table_info_selector.want_iceberg_table) {
resp.table_info.setIceberg_table(Utils.getTIcebergTable(this));
if (!resp.table_info.isSetNetwork_addresses()) {

View File

@@ -33,15 +33,25 @@ import com.google.common.base.Preconditions;
public class VirtualColumn extends Column {
private final TVirtualColumnType virtualColType_;
// Virtual columns of FileSystem-based tables.
public static VirtualColumn INPUT_FILE_NAME = new VirtualColumn("INPUT__FILE__NAME",
Type.STRING, TVirtualColumnType.INPUT_FILE_NAME);
public static VirtualColumn FILE_POSITION = new VirtualColumn("FILE__POSITION",
Type.BIGINT, TVirtualColumnType.FILE_POSITION);
/// Iceberg-related virtual columns.
public static VirtualColumn PARTITION_SPEC_ID = new VirtualColumn("PARTITION__SPEC__ID",
Type.INT, TVirtualColumnType.PARTITION_SPEC_ID);
public static VirtualColumn ICEBERG_PARTITION_SERIALIZED = new
VirtualColumn("ICEBERG__PARTITION__SERIALIZED", Type.BINARY,
TVirtualColumnType.ICEBERG_PARTITION_SERIALIZED);
public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) {
switch (virtColType) {
case INPUT_FILE_NAME: return INPUT_FILE_NAME;
case FILE_POSITION: return FILE_POSITION;
case PARTITION_SPEC_ID: return PARTITION_SPEC_ID;
case ICEBERG_PARTITION_SERIALIZED: return ICEBERG_PARTITION_SERIALIZED;
default: break;
}
return null;

View File

@@ -22,11 +22,14 @@ import java.util.List;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.DeleteStmt;
import org.apache.impala.analysis.DmlStatementBase;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.common.ImpalaException;
@@ -176,6 +179,13 @@ public class DistributedPlanner {
return exprs.isEmpty() ? 1 : Expr.getNumDistinctValues(exprs);
}
public PlanFragment createInsertFragment(
PlanFragment inputFragment, DmlStatementBase dmlStmt, Analyzer analyzer,
List<PlanFragment> fragments)
throws ImpalaException {
return createDmlFragment(inputFragment, dmlStmt, analyzer, fragments);
}
/**
* Decides whether to repartition the output of 'inputFragment' before feeding
* its data into the table sink of the given 'insertStmt'. The decision obeys
@@ -188,17 +198,17 @@ public class DistributedPlanner {
* query option is used. If this functions ends up creating a new fragment, appends
* that to 'fragments'.
*/
public PlanFragment createInsertFragment(
PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
public PlanFragment createDmlFragment(
PlanFragment inputFragment, DmlStatementBase dmlStmt, Analyzer analyzer,
List<PlanFragment> fragments)
throws ImpalaException {
boolean isComputeCost = analyzer.getQueryOptions().isCompute_processing_cost();
boolean enforce_hdfs_writer_limit = insertStmt.getTargetTable() instanceof FeFsTable
boolean enforce_hdfs_writer_limit = dmlStmt.getTargetTable() instanceof FeFsTable
&& (analyzer.getQueryOptions().getMax_fs_writers() > 0 || isComputeCost);
if (insertStmt.hasNoShuffleHint() && !enforce_hdfs_writer_limit) return inputFragment;
if (dmlStmt.hasNoShuffleHint() && !enforce_hdfs_writer_limit) return inputFragment;
List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
List<Expr> partitionExprs = Lists.newArrayList(dmlStmt.getPartitionKeyExprs());
// Ignore constants for the sake of partitioning.
Expr.removeConstants(partitionExprs);
@@ -208,7 +218,7 @@ public class DistributedPlanner {
if (!partitionExprs.isEmpty()
&& analyzer.setsHaveValueTransfer(inputPartition.getPartitionExprs(),
partitionExprs, true)
&& !(insertStmt.getTargetTable() instanceof FeKuduTable)
&& !(dmlStmt.getTargetTable() instanceof FeKuduTable)
&& !enforce_hdfs_writer_limit) {
return inputFragment;
}
@@ -259,7 +269,7 @@ public class DistributedPlanner {
maxHdfsWriters = costBasedMaxWriter;
}
Preconditions.checkState(maxHdfsWriters > 0);
insertStmt.setMaxTableSinks(maxHdfsWriters);
dmlStmt.setMaxTableSinks(maxHdfsWriters);
// At this point, parallelism of writer fragment is fixed and will not be adjusted
// by costing phase.
@@ -283,8 +293,8 @@ public class DistributedPlanner {
}
// Make a cost-based decision only if no user hint was supplied.
if (!insertStmt.hasShuffleHint()) {
if (insertStmt.getTargetTable() instanceof FeKuduTable) {
if (!dmlStmt.hasShuffleHint()) {
if (dmlStmt.getTargetTable() instanceof FeKuduTable) {
// If the table is unpartitioned or all of the partition exprs are constants,
// don't insert the exchange.
// TODO: make a more sophisticated decision here for partitioned tables and when
@@ -347,9 +357,10 @@ public class DistributedPlanner {
} else {
partition = DataPartition.UNPARTITIONED;
}
} else if (insertStmt.getTargetTable() instanceof FeKuduTable) {
} else if (dmlStmt instanceof InsertStmt &&
dmlStmt.getTargetTable() instanceof FeKuduTable) {
partition = DataPartition.kuduPartitioned(
KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
KuduUtil.createPartitionExpr((InsertStmt)dmlStmt, ctx_.getRootAnalyzer()));
} else {
partition = DataPartition.hashPartitioned(partitionExprs);
}

View File

@@ -44,9 +44,13 @@ public class IcebergDeleteSink extends TableSink {
// A value of 0 means no limit.
private int maxHdfsSinks_;
public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> outputExprs,
int maxTableSinks) {
// Exprs for computing the output partition(s).
protected final List<Expr> partitionKeyExprs_;
public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs,
List<Expr> outputExprs, int maxTableSinks) {
super(targetTable, Op.DELETE, outputExprs);
partitionKeyExprs_ = partitionKeyExprs;
maxHdfsSinks_ = maxTableSinks;
}
@@ -95,7 +99,11 @@ public class IcebergDeleteSink extends TableSink {
targetTable_.getFullName()));
if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(detailPrefix + "output exprs: ")
.append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
.append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
if (!partitionKeyExprs_.isEmpty()) {
output.append(detailPrefix + "partition keys: ")
.append(Expr.getExplainString(partitionKeyExprs_, explainLevel) + "\n");
}
}
}
@@ -106,7 +114,8 @@ public class IcebergDeleteSink extends TableSink {
@Override
protected void toThriftImpl(TDataSink tsink) {
TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(
Expr.treesToThrift(partitionKeyExprs_));
TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
TTableSinkType.HDFS, sinkOp_.toThrift());
tTableSink.iceberg_delete_sink = icebergDeleteSink;
@@ -121,6 +130,7 @@ public class IcebergDeleteSink extends TableSink {
@Override
public void collectExprs(List<Expr> exprs) {
exprs.addAll(partitionKeyExprs_);
exprs.addAll(outputExprs_);
}

View File

@@ -161,7 +161,7 @@ public class Planner {
insertStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
if (!ctx_.isSingleNodeExec()) {
// repartition on partition keys
rootFragment = distributedPlanner.createInsertFragment(
rootFragment = distributedPlanner.createDmlFragment(
rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
}
// Add optional sort node to the plan, based on clustered/noclustered plan hint.
@@ -180,12 +180,14 @@ public class Planner {
// Set up delete sink for root fragment
DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();
if (deleteStmt.getTargetTable() instanceof FeIcebergTable) {
if (!ctx_.isSingleNodeExec()) {
// repartition on partition keys
rootFragment = distributedPlanner.createDmlFragment(
rootFragment, deleteStmt, ctx_.getRootAnalyzer(), fragments);
}
createPreDeleteSort(deleteStmt, rootFragment, ctx_.getRootAnalyzer());
SortNode sortNode = (SortNode)rootFragment.getPlanRoot();
resultExprs = Expr.substituteList(resultExprs,
sortNode.getSortInfo().getOutputSmap(), ctx_.getRootAnalyzer(), true);
}
rootFragment.setSink(deleteStmt.createDataSink(resultExprs));
rootFragment.setSink(deleteStmt.createDataSink());
} else if (ctx_.isQuery()) {
rootFragment.setSink(
ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs));
@@ -909,7 +911,7 @@ public class Planner {
Analyzer analyzer) throws ImpalaException {
List<Expr> orderingExprs = new ArrayList<>();
orderingExprs.addAll(deleteStmt.getResultExprs());
orderingExprs.addAll(deleteStmt.getSortExprs());
// Build sortinfo to sort by the ordering exprs.
List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
@@ -918,6 +920,7 @@ public class Planner {
TSortingOrder.LEXICAL);
sortInfo.createSortTupleInfo(deleteStmt.getResultExprs(), analyzer);
sortInfo.getSortTupleDescriptor().materializeSlots();
deleteStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);
PlanNode node = SortNode.createTotalSortNode(
ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);

View File

@@ -134,7 +134,8 @@ public abstract class TableSink extends DataSink {
return new HdfsTableSink(table, partitionKeyExprs,outputExprs, overwrite,
inputIsClustered, sortProperties, writeId, maxTableSinks, isResultSink);
} else if (sinkAction == Op.DELETE) {
return new IcebergDeleteSink((FeIcebergTable)table, outputExprs, maxTableSinks);
return new IcebergDeleteSink((FeIcebergTable)table, partitionKeyExprs,
outputExprs, maxTableSinks);
} else {
// We don't support any other sink actions yet.
Preconditions.checkState(false);

View File

@@ -45,6 +45,7 @@ import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.TableLoadingException;
@@ -345,7 +346,9 @@ public class IcebergCatalogOpExecutor {
for (ByteBuffer buf : deleteFilesFb) {
FbIcebergDataFile deleteFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
PartitionSpec partSpec = nativeIcebergTable.specs().get(deleteFile.specId());
IcebergPartitionSpec impPartSpec = feIcebergTable.getPartitionSpec(
deleteFile.specId());
Metrics metrics = buildDataFileMetrics(deleteFile);
FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(partSpec)
.ofPositionDeletes()
@@ -354,6 +357,9 @@ public class IcebergCatalogOpExecutor {
.withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(deleteFile.format()))
.withRecordCount(deleteFile.recordCount())
.withFileSizeInBytes(deleteFile.fileSizeInBytes());
IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
partSpec.partitionType(), impPartSpec, deleteFile);
if (partitionData != null) builder.withPartition(partitionData);
rowDelta.addDeletes(builder.build());
}
try {
@@ -381,8 +387,12 @@ public class IcebergCatalogOpExecutor {
}
for (ByteBuffer buf : dataFilesFb) {
FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
Preconditions.checkState(dataFile.specId() == icebergOp.getSpec_id());
int specId = icebergOp.getSpec_id();
PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
PartitionSpec partSpec = nativeIcebergTable.specs().get(specId);
IcebergPartitionSpec impPartSpec =
feIcebergTable.getPartitionSpec(specId);
Metrics metrics = buildDataFileMetrics(dataFile);
DataFiles.Builder builder =
DataFiles.builder(partSpec)
@@ -392,8 +402,7 @@ public class IcebergCatalogOpExecutor {
.withRecordCount(dataFile.recordCount())
.withFileSizeInBytes(dataFile.fileSizeInBytes());
IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile(
partSpec.partitionType(),
feIcebergTable.getDefaultPartitionSpec(), dataFile);
partSpec.partitionType(), impPartSpec, dataFile);
if (partitionData != null) builder.withPartition(partitionData);
batchWrite.addFile(builder.build());
}

View File

@@ -935,6 +935,7 @@ public class IcebergUtil {
if (fileFormat != -1) {
FbIcebergMetadata.addFileFormat(fbb, fileFormat);
}
FbIcebergMetadata.addSpecId(fbb, cf.specId());
FbIcebergMetadata.addRecordCount(fbb, cf.recordCount());
if (partKeysOffset != -1) {
FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset);

View File

@@ -193,3 +193,270 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
predicates: FILE__POSITION = id
row-size=28B cardinality=1
====
delete from iceberg_v2_partitioned_position_deletes where id = 20;
---- PLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
03:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| row-size=40B cardinality=2
|
|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: id = 20
row-size=40B cardinality=2
---- DISTRIBUTEDPLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
06:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
| row-size=40B cardinality=2
|
|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
| |
| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: id = 20
row-size=40B cardinality=2
====
delete from iceberg_v2_partitioned_position_deletes where action = 'click';
---- PLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
03:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=6
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| row-size=36B cardinality=6
|
|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=1 size=3.15KB
| row-size=204B cardinality=3
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=1 size=1.15KB
skipped Iceberg predicates: action = 'click'
row-size=36B cardinality=6
---- DISTRIBUTEDPLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
04:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=6
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
| row-size=36B cardinality=6
|
|--03:EXCHANGE [BROADCAST]
| |
| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=1 size=3.15KB
| row-size=204B cardinality=3
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=1 size=1.15KB
skipped Iceberg predicates: action = 'click'
row-size=36B cardinality=6
====
delete from iceberg_v2_partitioned_position_deletes where user like 'A%';
---- PLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
03:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| row-size=48B cardinality=2
|
|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: `user` LIKE 'A%'
row-size=48B cardinality=2
---- DISTRIBUTEDPLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
06:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
| row-size=48B cardinality=2
|
|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
| |
| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: `user` LIKE 'A%'
row-size=48B cardinality=2
====
delete from iceberg_v2_partitioned_position_deletes
where id = (select max(id) from iceberg_v2_delete_positional);
---- PLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
08:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=20
|
07:HASH JOIN [LEFT SEMI JOIN]
| hash predicates: id = max(id)
| runtime filters: RF000 <- max(id)
| row-size=40B cardinality=20
|
|--06:AGGREGATE [FINALIZE]
| | output: max(id)
| | row-size=8B cardinality=1
| |
| 05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| | row-size=28B cardinality=3
| |
| |--04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
| | HDFS partitions=1/1 files=1 size=1.54KB
| | row-size=182B cardinality=1
| |
| 03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
| HDFS partitions=1/1 files=1 size=662B
| row-size=28B cardinality=3
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| row-size=40B cardinality=20
|
|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
runtime filters: RF000 -> id
row-size=40B cardinality=20
---- DISTRIBUTEDPLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
15:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=20
|
14:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
|
07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
| hash predicates: id = max(id)
| runtime filters: RF000 <- max(id)
| row-size=40B cardinality=20
|
|--13:EXCHANGE [BROADCAST]
| |
| 12:AGGREGATE [FINALIZE]
| | output: max:merge(id)
| | row-size=8B cardinality=1
| |
| 11:EXCHANGE [UNPARTITIONED]
| |
| 06:AGGREGATE
| | output: max(id)
| | row-size=8B cardinality=1
| |
| 05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
| | row-size=28B cardinality=3
| |
| |--10:EXCHANGE [BROADCAST]
| | |
| | 04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
| | HDFS partitions=1/1 files=1 size=1.54KB
| | row-size=182B cardinality=1
| |
| 03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
| HDFS partitions=1/1 files=1 size=662B
| row-size=28B cardinality=3
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
| row-size=40B cardinality=20
|
|--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
| |
| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
08:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
runtime filters: RF000 -> id
row-size=40B cardinality=20
====
DELETE FROM iceberg_v2_partitioned_position_deletes WHERE FILE__POSITION = id
---- PLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
03:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
| row-size=40B cardinality=2
|
|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: FILE__POSITION = id
row-size=40B cardinality=2
---- DISTRIBUTEDPLAN
DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
|
06:SORT
| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
| row-size=36B cardinality=2
|
05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
|
02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
| row-size=40B cardinality=2
|
|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
| |
| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
| HDFS partitions=1/1 files=3 size=9.47KB
| row-size=204B cardinality=10
|
03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
|
00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
HDFS partitions=1/1 files=3 size=3.48KB
predicates: FILE__POSITION = id
row-size=40B cardinality=2
====

View File

@@ -0,0 +1,409 @@
====
---- QUERY
CREATE TABLE id_part (i int, s string)
PARTITIONED BY SPEC (i)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
====
---- QUERY
# Delete from empty table is no-op.
DELETE FROM id_part where i = 1;
SELECT * FROM id_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
SHOW FILES IN id_part;
---- RESULTS
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
INSERT INTO id_part VALUES(1, 'one'), (2, 'two'), (3, 'three');
DELETE FROM id_part WHERE i = 2;
SELECT * FROM id_part;
---- RESULTS
1,'one'
3,'three'
---- TYPES
INT,STRING
====
---- QUERY
INSERT INTO id_part VALUES(4, 'four'), (5, 'five'), (6, 'six');
DELETE FROM id_part WHERE i % 2 = 0;
SELECT * FROM id_part;
---- RESULTS
1,'one'
3,'three'
5,'five'
---- TYPES
INT,STRING
====
---- QUERY
SHOW FILES IN id_part;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=1/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=3/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=5/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/(?!delete-).*.parq','.*B','','NONE'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
DELETE FROM id_part WHERE strleft(s, 1) = 'o';
SELECT * FROM id_part;
---- RESULTS
5,'five'
3,'three'
---- TYPES
INT,STRING
====
---- QUERY
DELETE FROM id_part WHERE i > 0;
SELECT * FROM id_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
INSERT INTO id_part VALUES (null, 'null');
SELECT * FROM id_part;
---- RESULTS
NULL,'null'
---- TYPES
INT,STRING
====
---- QUERY
DELETE FROM id_part where s = 'null';
SELECT * FROM id_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
SHOW FILES IN id_part;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=1/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=1/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=3/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=3/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=5/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=5/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=__HIVE_DEFAULT_PARTITION__/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=__HIVE_DEFAULT_PARTITION__/(?!delete-).*.parq','.*B','','NONE'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
CREATE TABLE trunc_part (i int, s string)
PARTITIONED BY SPEC (truncate(1, s))
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
====
---- QUERY
# Delete from empty table is no-op.
DELETE FROM trunc_part where i = 1;
SELECT * FROM trunc_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
INSERT INTO trunc_part VALUES(1, 'one'), (2, 'two'), (3, 'three');
DELETE FROM trunc_part WHERE s like 't%';
SELECT * FROM trunc_part;
---- RESULTS
1,'one'
---- TYPES
INT,STRING
====
---- QUERY
INSERT INTO trunc_part VALUES(4, 'four'), (5, 'five'), (6, 'six');
DELETE FROM trunc_part WHERE i % 2 = 0;
SELECT * FROM trunc_part;
---- RESULTS
1,'one'
5,'five'
---- TYPES
INT,STRING
====
---- QUERY
DELETE FROM trunc_part WHERE i > 0;
SELECT * FROM trunc_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
INSERT INTO trunc_part VALUES (0, null);
SELECT * FROM trunc_part;
---- RESULTS
0,'NULL'
---- TYPES
INT,STRING
====
---- QUERY
DELETE FROM trunc_part where s is null;
SELECT * FROM trunc_part;
---- RESULTS
---- TYPES
INT,STRING
====
---- QUERY
SHOW FILES IN trunc_part;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=f/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=o/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=o/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=s/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=s/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=t/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=t/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=t/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=__HIVE_DEFAULT_PARTITION__/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/trunc_part/data/s_trunc=__HIVE_DEFAULT_PARTITION__/(?!delete-).*.parq','.*B','','NONE'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
CREATE TABLE multi_part (i int, s string, f double)
PARTITIONED BY SPEC (bucket(3, i), truncate(1, s))
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
====
---- QUERY
# Delete from empty table is no-op.
DELETE FROM multi_part where i = 1;
SELECT * FROM multi_part;
---- RESULTS
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
INSERT INTO multi_part VALUES(1, 'one', 1.1), (2, 'two', 2.2), (3, 'three', 3.33);
DELETE FROM multi_part
WHERE i != (select min(i) from multi_part) and
i != (select max(i) from multi_part);
SELECT * FROM multi_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
INSERT INTO multi_part VALUES(4, 'four', 4.4), (5, 'five', 5.5), (6, 'six', 6.6);
DELETE FROM multi_part WHERE i % 2 = 0;
SELECT * FROM multi_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
5,'five',5.5
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
DELETE FROM multi_part WHERE i > 0;
SELECT * FROM multi_part;
---- RESULTS
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
INSERT INTO multi_part VALUES (null, 'null',0.0);
SELECT * FROM multi_part;
---- RESULTS
NULL,'null',0.0
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
DELETE FROM multi_part where s = 'null';
SELECT * FROM multi_part;
---- RESULTS
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
SHOW FILES IN multi_part;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=0/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=0/s_trunc=f/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=0/s_trunc=t/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=0/s_trunc=t/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=0/s_trunc=t/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=f/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=o/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=o/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=s/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=2/s_trunc=s/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=__HIVE_DEFAULT_PARTITION__/s_trunc=n/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/multi_part/data/i_bucket=__HIVE_DEFAULT_PARTITION__/s_trunc=n/(?!delete-).*.parq','.*B','','NONE'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
CREATE TABLE evolve_part (i int, s string, f double)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
====
---- QUERY
INSERT INTO evolve_part VALUES(1, 'one', 1.1), (2, 'two', 2.2), (3, 'three', 3.33);
DELETE FROM evolve_part WHERE i = 2;
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
ALTER TABLE evolve_part SET PARTITION SPEC (i);
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
INSERT INTO evolve_part VALUES (10, 'ten', 10.10), (20, 'twenty', 20.20);
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
20,'twenty',20.20
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
DELETE FROM evolve_part WHERE i = 20;
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
ALTER TABLE evolve_part SET PARTITION SPEC (truncate(1, s));
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
INSERT INTO evolve_part VALUES (30, 'thirty', 30.30), (40, 'forty', 40.40), (50, 'fifty', 50.50);
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
30,'thirty',30.30
40,'forty',40.40
50,'fifty',50.50
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
DELETE FROM evolve_part WHERE i = 50;
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
30,'thirty',30.30
40,'forty',40.40
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
ALTER TABLE evolve_part SET PARTITION SPEC (void(s));
SELECT * FROM evolve_part;
---- RESULTS
1,'one',1.1
3,'three',3.33
10,'ten',10.10
30,'thirty',30.30
40,'forty',40.40
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
DELETE FROM evolve_part WHERE length(s) < 4;
SELECT * FROM evolve_part;
---- RESULTS
3,'three',3.33
30,'thirty',30.3
40,'forty',40.4
---- TYPES
INT,STRING,DOUBLE
====
---- QUERY
SHOW FILES IN evolve_part;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/i=10/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/i=10/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/i=20/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/i=20/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/s_trunc=f/delete-.*parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/s_trunc=f/(?!delete-).*.parq','.*B','','NONE'
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/evolve_part/data/s_trunc=t/(?!delete-).*.parq','.*B','','NONE'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
CREATE TABLE ice_store_sales PARTITIONED BY SPEC (ss_sold_date_sk)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2')
AS SELECT * FROM tpcds_parquet.store_sales;
SELECT count(*) FROM ice_store_sales;
---- RESULTS
2880404
---- TYPES
BIGINT
====
---- QUERY
SELECT count(*) FROM ice_store_sales where ss_customer_sk % 10 = 0;
---- RESULTS
278906
---- TYPES
BIGINT
====
---- QUERY
DELETE FROM ice_store_sales where ss_customer_sk % 10 = 0;
SELECT count(*) FROM ice_store_sales;
---- RESULTS
2601498
---- TYPES
BIGINT
====
---- QUERY
SELECT * FROM ice_store_sales where ss_customer_sk % 10 = 0;
---- RESULTS
---- TYPES
INT, BIGINT, INT, INT, INT, INT, INT, INT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, INT
====

View File

@@ -727,11 +727,3 @@ delete from ice_delete where i = 1;
---- CATCH
AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete
====
---- QUERY
# Cannot delete from partitioned Iceberg table.
alter table ice_delete set tblproperties ('write.delete.mode'='merge-on-read');
alter table ice_delete set partition spec (bucket(5, i));
delete from ice_delete;
---- CATCH
AnalysisException: Cannot execute DELETE/UPDATE statement on partitioned Iceberg table: $DATABASE.ice_delete
====

View File

@@ -0,0 +1,211 @@
====
---- QUERY
select partition__spec__id, iceberg__partition__serialized, * from iceberg_alltypes_part;
---- RESULTS
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh',1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh',2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
---- TYPES
INT, BINARY, INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
====
---- QUERY
select *, base64decode(split_part(p, '.', 1)), base64decode(split_part(p, '.', 2)), base64decode(split_part(p, '.', 3)), base64decode(split_part(p, '.', 4)),
base64decode(split_part(p, '.', 5)), base64decode(split_part(p, '.', 6)), base64decode(split_part(p, '.', 7)), base64decode(split_part(p, '.', 8))
from (select partition__spec__id, cast(iceberg__partition__serialized as STRING) p from iceberg_alltypes_part) v;
---- RESULTS
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh','true','1','11','1.1','2.222','123.321','19045','impala'
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh','true','1','11','1.1','2.222','123.321','19045','impala'
---- TYPES
INT, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
====
---- QUERY
select *, base64decode(split_part(p, '.', 1)), base64decode(split_part(p, '.', 2)), base64decode(split_part(p, '.', 3)), base64decode(split_part(p, '.', 4)),
base64decode(split_part(p, '.', 5)), base64decode(split_part(p, '.', 6)), base64decode(split_part(p, '.', 7)), base64decode(split_part(p, '.', 8))
from (select partition__spec__id, cast(iceberg__partition__serialized as STRING) p from iceberg_alltypes_part_orc) v;
---- RESULTS
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh','true','1','11','1.1','2.222','123.321','19045','impala'
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh','true','1','11','1.1','2.222','123.321','19045','impala'
---- TYPES
INT, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
====
---- QUERY
select partition__spec__id, iceberg__partition__serialized, * from iceberg_alltypes_part where i = 2;
---- RESULTS
0,'dHJ1ZQ==.MQ==.MTE=.MS4x.Mi4yMjI=.MTIzLjMyMQ==.MTkwNDU=.aW1wYWxh',2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
---- TYPES
INT, BINARY, INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
====
---- QUERY
set TIMEZONE='Europe/Budapest';
select partition__spec__id, iceberg__partition__serialized, * from iceberg_non_partitioned where id < 5;
---- RESULTS
0,'',4,'Alex','view',2020-01-01 09:00:00
0,'',1,'Alex','view',2020-01-01 09:00:00
0,'',3,'Alan','click',2020-01-01 10:00:00
0,'',2,'Lisa','download',2020-01-01 11:00:00
---- TYPES
INT, BINARY, INT, STRING, STRING, TIMESTAMP
====
---- QUERY
set TIMEZONE='Europe/Budapest';
select partition__spec__id, iceberg__partition__serialized,
base64decode(split_part(cast(iceberg__partition__serialized as string), '.', 2)) p, *
from iceberg_partitioned;
---- RESULTS
0,'NDM4Mjk3.Y2xpY2s=','click',10,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=','click',12,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk2.dmlldw==','view',20,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',7,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',5,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',2,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk2.dmlldw==','view',11,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk3.Y2xpY2s=','click',3,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=','click',9,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk2.dmlldw==','view',17,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk3.Y2xpY2s=','click',13,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk2.dmlldw==','view',1,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==','view',4,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==','view',15,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',16,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',14,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk2.dmlldw==','view',6,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk3.Y2xpY2s=','click',18,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=','download',8,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk2.dmlldw==','view',19,'Alex','view',2020-01-01 09:00:00
---- TYPES
INT, BINARY, STRING, INT, STRING, STRING, TIMESTAMP
====
---- QUERY
select partition__spec__id, iceberg__partition__serialized,
base64decode(split_part(cast(iceberg__partition__serialized as string), '.', 1)), *
from iceberg_partitioned_orc_external
where base64decode(split_part(cast(iceberg__partition__serialized as string), '.', 1)) in ('click', 'download');
---- RESULTS
0,'Y2xpY2s=','click',10,'Alan','click'
0,'Y2xpY2s=','click',12,'Alan','click'
0,'ZG93bmxvYWQ=','download',7,'Lisa','download'
0,'ZG93bmxvYWQ=','download',5,'Lisa','download'
0,'ZG93bmxvYWQ=','download',2,'Lisa','download'
0,'Y2xpY2s=','click',3,'Alan','click'
0,'Y2xpY2s=','click',9,'Alan','click'
0,'Y2xpY2s=','click',13,'Alan','click'
0,'ZG93bmxvYWQ=','download',16,'Lisa','download'
0,'ZG93bmxvYWQ=','download',14,'Lisa','download'
0,'Y2xpY2s=','click',18,'Alan','click'
0,'ZG93bmxvYWQ=','download',8,'Lisa','download'
---- TYPES
INT, BINARY, STRING, INT, STRING, STRING
====
---- QUERY
set TIMEZONE='Europe/Budapest';
select partition__spec__id, iceberg__partition__serialized, * from iceberg_partitioned
where partition__spec__id = 0;
---- RESULTS
0,'NDM4Mjk4.ZG93bmxvYWQ=',5,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk2.dmlldw==',20,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==',11,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==',19,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==',1,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk3.Y2xpY2s=',9,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=',14,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk2.dmlldw==',17,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk3.Y2xpY2s=',10,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=',16,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk3.Y2xpY2s=',12,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk2.dmlldw==',4,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=',8,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk3.Y2xpY2s=',3,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',13,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk2.dmlldw==',15,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk2.dmlldw==',6,'Alex','view',2020-01-01 09:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=',2,'Lisa','download',2020-01-01 11:00:00
0,'NDM4Mjk3.Y2xpY2s=',18,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk4.ZG93bmxvYWQ=',7,'Lisa','download',2020-01-01 11:00:00
---- TYPES
INT, BINARY, INT, STRING, STRING, TIMESTAMP
====
---- QUERY
set TIMEZONE='Europe/Budapest';
select partition__spec__id, iceberg__partition__serialized, * from iceberg_partitioned
where iceberg__partition__serialized = cast('NDM4Mjk3.Y2xpY2s=' as BINARY);
---- RESULTS
0,'NDM4Mjk3.Y2xpY2s=',12,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',3,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',13,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',18,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',9,'Alan','click',2020-01-01 10:00:00
0,'NDM4Mjk3.Y2xpY2s=',10,'Alan','click',2020-01-01 10:00:00
---- TYPES
INT, BINARY, INT, STRING, STRING, TIMESTAMP
====
---- QUERY
select *, base64decode(split_part(p, '.', 1)), base64decode(split_part(p, '.', 2)),
base64decode(split_part(p, '.', 3))
from (select partition__spec__id, cast(iceberg__partition__serialized as string) p, count(*)
from iceberg_partition_evolution
where year=2009 and month=1 group by 1,2) v
---- RESULTS
0,'MjAwOQ==.MDEyMg==',10,'2009','0122',''
0,'MjAwOQ==.MDEwMQ==',10,'2009','0101',''
1,'MjAwOQ==.MDEzMQ==.MQ==',10,'2009','0131','1'
0,'MjAwOQ==.MDExMw==',10,'2009','0113',''
0,'MjAwOQ==.MDEzMQ==',10,'2009','0131',''
1,'MjAwOQ==.MDEwNg==.MQ==',10,'2009','0106','1'
1,'MjAwOQ==.MDEyMA==.MQ==',10,'2009','0120','1'
1,'MjAwOQ==.MDEwMg==.MQ==',10,'2009','0102','1'
1,'MjAwOQ==.MDEwNQ==.MQ==',10,'2009','0105','1'
1,'MjAwOQ==.MDEwNA==.MQ==',10,'2009','0104','1'
1,'MjAwOQ==.MDEyMw==.MQ==',10,'2009','0123','1'
0,'MjAwOQ==.MDEyNQ==',10,'2009','0125',''
0,'MjAwOQ==.MDEyNA==',10,'2009','0124',''
1,'MjAwOQ==.MDExNA==.MQ==',10,'2009','0114','1'
0,'MjAwOQ==.MDExOA==',10,'2009','0118',''
0,'MjAwOQ==.MDEyOQ==',10,'2009','0129',''
0,'MjAwOQ==.MDEwMg==',10,'2009','0102',''
0,'MjAwOQ==.MDExMA==',10,'2009','0110',''
0,'MjAwOQ==.MDEyMQ==',10,'2009','0121',''
1,'MjAwOQ==.MDEzMA==.MQ==',10,'2009','0130','1'
1,'MjAwOQ==.MDEwOQ==.MQ==',10,'2009','0109','1'
1,'MjAwOQ==.MDEwOA==.MQ==',10,'2009','0108','1'
0,'MjAwOQ==.MDEyOA==',10,'2009','0128',''
0,'MjAwOQ==.MDExOQ==',10,'2009','0119',''
0,'MjAwOQ==.MDEwNg==',10,'2009','0106',''
0,'MjAwOQ==.MDEwNA==',10,'2009','0104',''
0,'MjAwOQ==.MDExNg==',10,'2009','0116',''
0,'MjAwOQ==.MDExNA==',10,'2009','0114',''
1,'MjAwOQ==.MDEyNQ==.MQ==',10,'2009','0125','1'
1,'MjAwOQ==.MDEyMQ==.MQ==',10,'2009','0121','1'
1,'MjAwOQ==.MDEyNg==.MQ==',10,'2009','0126','1'
1,'MjAwOQ==.MDEyMg==.MQ==',10,'2009','0122','1'
1,'MjAwOQ==.MDEwMQ==.MQ==',10,'2009','0101','1'
1,'MjAwOQ==.MDExMw==.MQ==',10,'2009','0113','1'
0,'MjAwOQ==.MDEwNw==',10,'2009','0107',''
0,'MjAwOQ==.MDExNQ==',10,'2009','0115',''
1,'MjAwOQ==.MDExNQ==.MQ==',10,'2009','0115','1'
0,'MjAwOQ==.MDEwOA==',10,'2009','0108',''
1,'MjAwOQ==.MDExOQ==.MQ==',10,'2009','0119','1'
1,'MjAwOQ==.MDExOA==.MQ==',10,'2009','0118','1'
0,'MjAwOQ==.MDEzMA==',10,'2009','0130',''
0,'MjAwOQ==.MDExMg==',10,'2009','0112',''
0,'MjAwOQ==.MDEyMw==',10,'2009','0123',''
0,'MjAwOQ==.MDEyMA==',10,'2009','0120',''
0,'MjAwOQ==.MDEwMw==',10,'2009','0103',''
0,'MjAwOQ==.MDExMQ==',10,'2009','0111',''
0,'MjAwOQ==.MDEwOQ==',10,'2009','0109',''
1,'MjAwOQ==.MDEyOA==.MQ==',10,'2009','0128','1'
1,'MjAwOQ==.MDEyOQ==.MQ==',10,'2009','0129','1'
1,'MjAwOQ==.MDEwNw==.MQ==',10,'2009','0107','1'
1,'MjAwOQ==.MDEyNA==.MQ==',10,'2009','0124','1'
1,'MjAwOQ==.MDEwMw==.MQ==',10,'2009','0103','1'
1,'MjAwOQ==.MDEyNw==.MQ==',10,'2009','0127','1'
0,'MjAwOQ==.MDEyNw==',10,'2009','0127',''
0,'MjAwOQ==.MDEyNg==',10,'2009','0126',''
1,'MjAwOQ==.MDExMg==.MQ==',10,'2009','0112','1'
1,'MjAwOQ==.MDExNg==.MQ==',10,'2009','0116','1'
1,'MjAwOQ==.MDExNw==.MQ==',10,'2009','0117','1'
1,'MjAwOQ==.MDExMQ==.MQ==',10,'2009','0111','1'
0,'MjAwOQ==.MDEwNQ==',10,'2009','0105',''
1,'MjAwOQ==.MDExMA==.MQ==',10,'2009','0110','1'
0,'MjAwOQ==.MDExNw==',10,'2009','0117',''
---- TYPES
INT, STRING, BIGINT, STRING, STRING, STRING
====

View File

@@ -1149,6 +1149,35 @@ class TestIcebergV2Table(IcebergTestSuite):
self.run_test_case('QueryTest/iceberg-delete', vector,
unique_database)
def test_delete_partitioned(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-delete-partitioned', vector,
unique_database)
if IS_HDFS:
self._partitioned_hive_tests(unique_database)
def _partitioned_hive_tests(self, db):
# Hive needs table property 'format-version' explicitly set
for tbl in ["id_part", "trunc_part", "multi_part", "evolve_part", "ice_store_sales"]:
self.run_stmt_in_hive(
"ALTER TABLE {}.{} SET TBLPROPERTIES('format-version'='2')".format(db, tbl))
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "id_part"))
assert hive_output == "id_part.i,id_part.s\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "trunc_part"))
assert hive_output == "trunc_part.i,trunc_part.s\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "multi_part"))
assert hive_output == "multi_part.i,multi_part.s,multi_part.f\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "evolve_part"))
assert hive_output == \
"evolve_part.i,evolve_part.s,evolve_part.f\n3,three,3.33\n" \
"30,thirty,30.3\n40,forty,40.4\n"
hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}.{}".format(
db, "ice_store_sales"))
assert hive_output == "_c0\n2601498\n"
@SkipIfFS.hive
def test_delete_hive_read(self, vector, unique_database):
ice_delete = unique_database + ".ice_delete"

View File

@@ -169,6 +169,28 @@ class TestScannersVirtualColumns(ImpalaTestSuite):
self.run_test_case('QueryTest/mixing-virtual-columns', vector, unique_database)
class TestIcebergVirtualColumns(ImpalaTestSuite):
BATCH_SIZES = [0, 1, 16]
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestIcebergVirtualColumns, cls).add_test_dimensions()
if cls.exploration_strategy() == 'core':
cls.ImpalaTestMatrix.add_dimension(cls.create_table_info_dimension('pairwise'))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('batch_size', *TestScannersAllTableFormats.BATCH_SIZES))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def test_partition_columns(self, vector):
"""Tests partition-level Iceberg-only virtual columns."""
self.run_test_case('QueryTest/iceberg-virtual-partition-columns', vector)
# Test all the scanners with a simple limit clause. The limit clause triggers
# cancellation in the scanner code paths.
class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):