IMPALA-13074: Add sink node to Web UI's graphical plan for DDL/DML queries

From now on if a plan fragment's root data sink is a table sink, a
multi data sink or a merge sink, it will be included in the json
response and shown on the Web UI as parent of the plan fragment.

Testing
 * adapted and refined impala-http-handler-test
 * added new tests for related sink types
 * tested manually on WebUI with
   - CTAS statements
   - UPDATE statements on Iceberg tables
   - DELETE statements on Iceberg tables
   - MERGE statements on Iceberg tables

Change-Id: Ib2bd442f6499efde7406d87c2b1fd1b46a45381b
Reviewed-on: http://gerrit.cloudera.org:8080/22496
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Noemi Pap-Takacs <npaptakacs@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
This commit is contained in:
Daniel Vanko
2025-02-18 18:45:15 +01:00
committed by Zoltan Borok-Nagy
parent 8d56eea725
commit c446291ccf
3 changed files with 561 additions and 117 deletions

View File

@@ -85,9 +85,9 @@ class ImpalaHttpHandlerTest : public testing::Test {
if (id % 2 == 0) {
node_summary.__set_is_broadcast(true);
}
addStatsToSummary(id, &node_summary);
addStatsToSummary(id + 1, &node_summary);
if (id % 3 == 0) {
addStatsToSummary(id + 1, &node_summary);
addStatsToSummary(id + 2, &node_summary);
}
return node_summary;
}
@@ -95,12 +95,12 @@ class ImpalaHttpHandlerTest : public testing::Test {
// Should be kept synced with the logic in createTPlanNodeExecSummary.
static void checkPlanNodeStats(const Value& plan_node, TPlanNodeId id) {
ASSERT_TRUE(plan_node.IsObject());
int output_card = (id + 1) * 2;
int output_card = (id + 2) * 2;
if (id % 3 == 0) {
if (id % 2 == 0) {
output_card = (id + 2) * 2;
output_card = (id + 3) * 2;
} else {
output_card += (id + 2) * 2;
output_card += (id + 3) * 2;
}
}
EXPECT_EQ(plan_node["output_card"].GetInt(), output_card);
@@ -117,7 +117,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
EXPECT_FALSE(plan_node.HasMember("is_broadcast"));
}
int max_time = id % 3 == 0 ? (id + 2) * 2000 : (id + 1) * 2000;
int max_time = id % 3 == 0 ? (id + 3) * 2000 : (id + 2) * 2000;
EXPECT_EQ(plan_node["max_time_val"].GetInt(), max_time);
EXPECT_EQ(
@@ -144,6 +144,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
TDataSink sink;
TDataStreamSink stream_sink;
stream_sink.__set_dest_node_id(dest_node_id);
sink.__set_label("EXCHANGE SENDER");
sink.__set_type(TDataSinkType::DATA_STREAM_SINK);
sink.__set_stream_sink(stream_sink);
return sink;
@@ -152,6 +153,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
static TDataSink createPlanRootSink() {
TDataSink sink;
TPlanRootSink plan_root_sink;
sink.__set_label("PLAN-ROOT SINK");
sink.__set_type(TDataSinkType::PLAN_ROOT_SINK);
sink.__set_plan_root_sink(plan_root_sink);
return sink;
@@ -161,13 +163,44 @@ class ImpalaHttpHandlerTest : public testing::Test {
TDataSink sink;
TTableSink table_sink;
THdfsTableSink hdfs_table_sink;
table_sink.__set_action(TSinkAction::INSERT);
table_sink.__set_type(TTableSinkType::HDFS);
table_sink.__set_hdfs_table_sink(hdfs_table_sink);
sink.__set_label("HDFS WRITER");
sink.__set_type(TDataSinkType::TABLE_SINK);
sink.__set_table_sink(table_sink);
return sink;
}
// Builds a TDataSink describing an Iceberg buffered-delete table sink, i.e. a
// TABLE_SINK whose table sink carries a DELETE action and a TIcebergDeleteSink
// payload. Used by the UPDATE/MERGE test plans as a child of a multi/merge sink.
static TDataSink createIcebergDeleteTableSink() {
TDataSink sink;
TTableSink table_sink;
TIcebergDeleteSink iceberg_delete_sink;
// Iceberg deletes are modeled as an HDFS-type table sink with a DELETE action.
table_sink.__set_action(TSinkAction::DELETE);
table_sink.__set_type(TTableSinkType::HDFS);
table_sink.__set_iceberg_delete_sink(iceberg_delete_sink);
sink.__set_label("ICEBERG BUFFERED DELETER");
sink.__set_type(TDataSinkType::TABLE_SINK);
sink.__set_table_sink(table_sink);
return sink;
}
// Builds a MULTI_DATA_SINK wrapping 'child_sinks' (e.g. an HDFS writer plus an
// Iceberg deleter, as used by Iceberg UPDATE statements). The children are only
// stored; their labels are later joined into the sink's label_detail in JSON.
static TDataSink createMultiDataSink(const vector<TDataSink>& child_sinks) {
TDataSink sink;
sink.__set_type(TDataSinkType::MULTI_DATA_SINK);
sink.__set_label("MULTI DATA SINK");
sink.__set_child_data_sinks(child_sinks);
return sink;
}
// Builds a MERGE_SINK wrapping 'child_sinks', mirroring createMultiDataSink()
// but with the sink type/label used by Iceberg MERGE statements.
static TDataSink createMergeSink(const vector<TDataSink>& child_sinks) {
TDataSink sink;
sink.__set_type(TDataSinkType::MERGE_SINK);
sink.__set_label("MERGE SINK");
sink.__set_child_data_sinks(child_sinks);
return sink;
}
static void addSinkToFragment(
const TDataSink& sink, TPlanFragment* fragment, TExecSummary* summary) {
fragment->__set_output_sink(sink);
@@ -258,22 +291,22 @@ const string ImpalaHttpHandlerTest::PLAN_SCHEMA_JSON = R"({
// {
// "label": "01:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 4,
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "3.000us",
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 4,
// "output_card": 6,
// "num_instances": 1,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "4.000us",
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [],
// "data_stream_target": "01:EXCHANGE"
// }
@@ -321,24 +354,35 @@ static void prepareSelectStatement(const vector<string>& labels,
// {
// "plan_nodes": [
// {
// "label": "01:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 4,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "3.000us",
// "children": []
// "label": "HDFS WRITER",
// "label_detail": "INSERT",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "01:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// }
// ]
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 4,
// "output_card": 6,
// "num_instances": 1,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "4.000us",
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [],
// "data_stream_target": "01:EXCHANGE"
// }
@@ -370,8 +414,7 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels,
fragments->push_back(fragment00);
}
// Prepares query plan for a join statement with one fragment and the join node having
// two children.
// Prepares query plan for a join statement with three fragments.
//
// F02:PLAN FRAGMENT
// |
@@ -396,42 +439,42 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels,
// {
// "label": "04:EXCHANGE",
// "label_detail": "UNPARTITIONED",
// "output_card": 4,
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "3.000us",
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// },
// {
// "label": "02:HASH_JOIN",
// "label_detail": "INNER JOIN, BROADCAST",
// "output_card": 4,
// "output_card": 6,
// "num_instances": 1,
// "max_time": "4.000us",
// "max_time_val": 4000,
// "avg_time": "4.000us",
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [
// {
// "label": "01:SCAN HDFS",
// "label_detail": "default.table1 t1",
// "output_card": 6,
// "output_card": 8,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "max_time": "8.000us",
// "max_time_val": 8000,
// "avg_time": "8.000us",
// "children": []
// },
// {
// "label": "03:EXCHANGE",
// "label_detail": "BROADCAST",
// "output_card": 18,
// "output_card": 22,
// "num_instances": 2,
// "max_time": "10.000us",
// "max_time_val": 10000,
// "avg_time": "9.000us",
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "11.000us",
// "children": []
// }
// ],
@@ -440,12 +483,12 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels,
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table2 t2",
// "output_card": 10,
// "output_card": 12,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "10.000us",
// "max_time_val": 10000,
// "avg_time": "10.000us",
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "12.000us",
// "children": [],
// "data_stream_target": "03:EXCHANGE"
// }
@@ -494,6 +537,194 @@ static void prepareJoinStatement(const vector<string>& labels,
fragments->push_back(fragment01);
}
// Prepares query plan for an Iceberg update statement. Root sink will be a multi data
// sink, consisting of an HDFS table sink and an Iceberg delete table sink.
//
// F00:PLAN FRAGMENT
// |
// MULTI DATA SINK
// |->WRITE TO HDFS
// |
// |->BUFFERED DELETE FROM ICEBERG
// |
// 00:SCAN HDFS
//
// Expected JSON output for the plan will look like this:
// {
// "plan_nodes": [
// {
// "label": "MULTI DATA SINK",
// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.table",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": []
// }
// ]
// }
// ]
// }
// Populates 'fragments'/'summary' for the single-fragment Iceberg UPDATE plan
// sketched in the comment above: a MULTI DATA SINK (HDFS writer + Iceberg
// deleter) on top of one 00:SCAN HDFS node described by labels[0].
static void prepareUpdateStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
// This plan shape has exactly one plan node, so exactly one label is expected.
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(label_details.size(), 1);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
// Root sink: MULTI DATA SINK with an HDFS writer and an Iceberg deleter child.
TDataSink multi_data_sink = ImpalaHttpHandlerTest::createMultiDataSink(
{ImpalaHttpHandlerTest::createHdfsTableSink(),
ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
ImpalaHttpHandlerTest::addSinkToFragment(multi_data_sink, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 0, &fragment00, summary);
fragments->push_back(fragment00);
}
// Prepares query plan for an Iceberg merge statement. The root sink will be a
// merge sink, consisting of an HDFS writer and an Iceberg buffered delete sink.
//
// F00:PLAN FRAGMENT
// |
// MERGE SINK
// |->WRITE TO HDFS
// |
// |->BUFFERED DELETE FROM ICEBERG
// |
// 03:MERGE
// |
// 02:HASH JOIN
// |
// |--04:EXCHANGE
// | |
// | F01:PLAN FRAGMENT
// | 01:SCAN HDFS
// |
// 00:SCAN HDFS [functional_parquet.target, RANDOM]
//
// Expected JSON output for the plan will look like this:
// {
// "plan_nodes": [
// {
// "label": "MERGE SINK",
// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
// "output_card": 2,
// "num_instances": 1,
// "max_time": "2.000us",
// "max_time_val": 2000,
// "avg_time": "2.000us",
// "children": [
// {
// "label": "03:MERGE",
// "label_detail": "",
// "output_card": 6,
// "num_instances": 2,
// "is_broadcast": true,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "5.000us",
// "children": [
// {
// "label": "02:HASH JOIN",
// "label_detail": "INNER JOIN, BROADCAST",
// "output_card": 6,
// "num_instances": 1,
// "max_time": "6.000us",
// "max_time_val": 6000,
// "avg_time": "6.000us",
// "children": [
// {
// "label": "04:EXCHANGE",
// "label_detail": "BROADCAST",
// "output_card": 8,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "8.000us",
// "max_time_val": 8000,
// "avg_time": "8.000us",
// "children": []
// },
// {
// "label": "00:SCAN HDFS",
// "label_detail": "default.target",
// "output_card": 22,
// "num_instances": 2,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "11.000us",
// "children": []
// }
// ]
// }
// ]
// }
// ]
// },
// {
// "label": "01:SCAN HDFS",
// "label_detail": "default.source",
// "output_card": 12,
// "num_instances": 1,
// "is_broadcast": true,
// "max_time": "12.000us",
// "max_time_val": 12000,
// "avg_time": "12.000us",
// "children": [],
// "data_stream_target": "04:EXCHANGE"
// }
// ]
// }
// Populates 'fragments'/'summary' for the two-fragment Iceberg MERGE plan
// sketched in the comment above: F00 has a MERGE SINK over
// 03:MERGE -> 02:HASH JOIN -> {04:EXCHANGE, 00:SCAN HDFS}; F01 is
// 01:SCAN HDFS streaming into 04:EXCHANGE. 'labels'/'label_details' supply the
// five plan-node labels in that order.
static void prepareMergeStatement(const vector<string>& labels,
const vector<string>& label_details, vector<TPlanFragment>* fragments,
TExecSummary* summary) {
ASSERT_EQ(labels.size(), 5);
ASSERT_EQ(label_details.size(), 5);
// F00:PLAN FRAGMENT
TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
// Root sink: MERGE SINK with an HDFS writer and an Iceberg deleter child.
TDataSink merge_sink = ImpalaHttpHandlerTest::createMergeSink(
{ImpalaHttpHandlerTest::createHdfsTableSink(),
ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
ImpalaHttpHandlerTest::addSinkToFragment(merge_sink, &fragment00, summary);
// 03:MERGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(0), label_details.at(0), 1, &fragment00, summary);
// 02:HASH JOIN
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(1), label_details.at(1), 2, &fragment00, summary);
// 04:EXCHANGE
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(2), label_details.at(2), 0, &fragment00, summary);
// 00:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(3), label_details.at(3), 0, &fragment00, summary);
fragments->push_back(fragment00);
// F01:PLAN FRAGMENT
TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(1);
// nodes[2] is 04:EXCHANGE (added third above), the target of F01's stream sink.
TDataSink stream_sink =
ImpalaHttpHandlerTest::createStreamSink(fragment00.plan.nodes[2].node_id);
ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment01, summary);
// 01:SCAN HDFS
ImpalaHttpHandlerTest::addNodeToFragment(
labels.at(4), label_details.at(4), 0, &fragment01, summary);
fragments->push_back(fragment01);
}
} // namespace impala
TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
@@ -512,8 +743,9 @@ TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
validatePlanSchema(document);
// Check the two nodes in the plan
auto array = document["plan_nodes"].GetArray();
EXPECT_EQ(array.Size(), 2);
ASSERT_EQ(array.Size(), 2);
for (size_t i = 0; i < array.Size(); ++i) {
EXPECT_EQ(array[i]["label"].GetString(), SELECT_LABELS.at(i));
EXPECT_EQ(array[i]["label_detail"].GetString(), SELECT_LABEL_DETAILS.at(i));
@@ -522,6 +754,10 @@ TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
EXPECT_EQ(array[i]["children"].Size(), 0);
}
// The data_stream_target of 00:SCAN HDFS should point to the 01:EXCHANGE node
ASSERT_TRUE(array[1].HasMember("data_stream_target"));
EXPECT_EQ(array[1]["data_stream_target"].GetString(), SELECT_LABELS[0]);
}
TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) {
@@ -543,14 +779,37 @@ TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) {
auto array = document["plan_nodes"].GetArray();
EXPECT_EQ(array.Size(), 2);
for (size_t i = 0; i < array.Size(); ++i) {
EXPECT_EQ(array[i]["label"].GetString(), CTAS_LABELS.at(i));
EXPECT_EQ(array[i]["label_detail"].GetString(), CTAS_LABEL_DETAILS.at(i));
checkPlanNodeStats(array[i], i);
// Check the HDFS WRITER node
ASSERT_GE(array.Size(), 1);
auto& hdfs_writer_node = array[0];
EXPECT_STREQ(hdfs_writer_node["label"].GetString(), "HDFS WRITER");
EXPECT_STREQ(hdfs_writer_node["label_detail"].GetString(), "INSERT");
EXPECT_EQ(array[i]["children"].Size(), 0);
}
checkPlanNodeStats(hdfs_writer_node, -1);
// Check 01:EXCHANGE, which is the child of HDFS WRITER node
EXPECT_EQ(hdfs_writer_node["children"].Size(), 1);
auto& exch01_node = hdfs_writer_node["children"][0];
EXPECT_EQ(exch01_node["label"].GetString(), CTAS_LABELS[0]);
EXPECT_EQ(exch01_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[0]);
checkPlanNodeStats(exch01_node, 0);
EXPECT_EQ(exch01_node["children"].Size(), 0);
// Check the 00:SCAN HDFS node
ASSERT_GE(array.Size(), 2);
auto& scan_node = array[1];
EXPECT_EQ(scan_node["label"].GetString(), CTAS_LABELS[1]);
EXPECT_EQ(scan_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[1]);
checkPlanNodeStats(scan_node, 1);
EXPECT_EQ(scan_node["children"].Size(), 0);
// The data_stream_target should point to the 01:EXCHANGE node
ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), CTAS_LABELS[0]);
}
TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
@@ -575,6 +834,7 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
EXPECT_EQ(array.Size(), 3);
// Check the 04:EXCHANGE node
ASSERT_GE(array.Size(), 1);
auto& exch04_node = array[0];
EXPECT_EQ(exch04_node["label"].GetString(), JOIN_LABELS[0]);
EXPECT_EQ(exch04_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[0]);
@@ -582,18 +842,21 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
checkPlanNodeStats(exch04_node, 0);
EXPECT_EQ(exch04_node["children"].Size(), 0);
// Check the 02:HASH_JOIN node
// Check the 02:HASH JOIN node
ASSERT_GE(array.Size(), 2);
auto& join_node = array[1];
EXPECT_EQ(join_node["label"].GetString(), JOIN_LABELS[1]);
EXPECT_EQ(join_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[1]);
checkPlanNodeStats(join_node, 1);
EXPECT_TRUE(join_node.HasMember("data_stream_target"));
// The data_stream_target of 02:HASH JOIN should point to the 04:EXCHANGE node
ASSERT_TRUE(join_node.HasMember("data_stream_target"));
EXPECT_EQ(join_node["data_stream_target"], JOIN_LABELS[0]);
// Check the two children of join node
// Check the two children of 02:HASH JOIN node
auto children = join_node["children"].GetArray();
EXPECT_EQ(children.Size(), 2);
ASSERT_EQ(children.Size(), 2);
for (size_t i = 0; i < children.Size(); ++i) {
EXPECT_EQ(children[i]["label"].GetString(), JOIN_LABELS.at(i + 2));
@@ -605,12 +868,131 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
}
// Check the 00:SCAN HDFS node
ASSERT_GE(array.Size(), 3);
auto& scan_node = array[2];
EXPECT_EQ(scan_node["label"].GetString(), JOIN_LABELS[4]);
EXPECT_EQ(scan_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[4]);
checkPlanNodeStats(scan_node, 4);
EXPECT_EQ(scan_node["children"].Size(), 0);
EXPECT_TRUE(join_node.HasMember("data_stream_target"));
// The data_stream_target of 00:SCAN HDFS should point to the 03:EXCHANGE node
ASSERT_TRUE(join_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), JOIN_LABELS[3]);
}
// Verifies that an Iceberg UPDATE plan is serialized with the MULTI DATA SINK
// as the single root node and the 00:SCAN HDFS node as its only child.
TEST_F(ImpalaHttpHandlerTest, UpdateStatement) {
const vector<string> UPDATE_LABELS = {"00:SCAN HDFS"};
const vector<string> UPDATE_LABEL_DETAILS = {"default.table"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareUpdateStatement(UPDATE_LABELS, UPDATE_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
// Check the MULTI DATA SINK node
ASSERT_EQ(array.Size(), 1);
auto& multi_data_sink_node = array[0];
EXPECT_STREQ(multi_data_sink_node["label"].GetString(), "MULTI DATA SINK");
// label_detail of a multi data sink is the comma-joined labels of its children.
EXPECT_STREQ(multi_data_sink_node["label_detail"].GetString(),
"HDFS WRITER, ICEBERG BUFFERED DELETER");
// Sink summaries are stored under id -1 (SINK_ID).
checkPlanNodeStats(multi_data_sink_node, -1);
// Check 00:SCAN HDFS node, which is the child of MULTI DATA SINK node
ASSERT_EQ(multi_data_sink_node["children"].Size(), 1);
auto& scan_node = multi_data_sink_node["children"][0];
EXPECT_EQ(scan_node["label"].GetString(), UPDATE_LABELS[0]);
EXPECT_EQ(scan_node["label_detail"].GetString(), UPDATE_LABEL_DETAILS[0]);
checkPlanNodeStats(scan_node, 0);
EXPECT_EQ(scan_node["children"].Size(), 0);
}
// Verifies that an Iceberg MERGE plan is serialized with the MERGE SINK as the
// root of fragment F00 (MERGE SINK -> 03:MERGE -> 02:HASH JOIN -> two leaves),
// plus F01's 01:SCAN HDFS as a second top-level node streaming to 04:EXCHANGE.
TEST_F(ImpalaHttpHandlerTest, MergeStatement) {
const vector<string> MERGE_LABELS = {
"03:MERGE", "02:HASH JOIN", "04:EXCHANGE", "00:SCAN HDFS", "01:SCAN HDFS"};
const vector<string> MERGE_LABEL_DETAILS = {
"", "INNER JOIN, BROADCAST", "BROADCAST", "default.target", "default.source"};
vector<TPlanFragment> fragments;
TExecSummary summary;
prepareMergeStatement(MERGE_LABELS, MERGE_LABEL_DETAILS, &fragments, &summary);
Document document;
Value value(kObjectType);
PlanToJson(fragments, summary, &document, &value);
document.CopyFrom(value, document.GetAllocator());
validatePlanSchema(document);
auto array = document["plan_nodes"].GetArray();
// One top-level entry per fragment: F00's sink subtree and F01's scan.
EXPECT_EQ(array.Size(), 2);
// Check the MERGE SINK node
ASSERT_GE(array.Size(), 1);
auto& merge_sink_node = array[0];
EXPECT_STREQ(merge_sink_node["label"].GetString(), "MERGE SINK");
// label_detail of a merge sink is the comma-joined labels of its children.
EXPECT_STREQ(merge_sink_node["label_detail"].GetString(),
"HDFS WRITER, ICEBERG BUFFERED DELETER");
// Sink summaries are stored under id -1 (SINK_ID).
checkPlanNodeStats(merge_sink_node, -1);
// Check 03:MERGE node, which is the child of the MERGE SINK node
ASSERT_EQ(merge_sink_node["children"].Size(), 1);
auto& merge_node = merge_sink_node["children"][0];
EXPECT_EQ(merge_node["label"].GetString(), MERGE_LABELS[0]);
EXPECT_EQ(merge_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[0]);
checkPlanNodeStats(merge_node, 0);
// Check the 02:HASH JOIN node, which is a child of the 03:MERGE node
ASSERT_EQ(merge_node["children"].Size(), 1);
auto& hash_join_node = merge_node["children"][0];
EXPECT_EQ(hash_join_node["label"].GetString(), MERGE_LABELS[1]);
EXPECT_EQ(hash_join_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[1]);
checkPlanNodeStats(hash_join_node, 1);
// Check the two children of the 02:HASH JOIN node
auto children = hash_join_node["children"].GetArray();
ASSERT_EQ(children.Size(), 2);
for (size_t i = 0; i < children.Size(); ++i) {
EXPECT_EQ(children[i]["label"].GetString(), MERGE_LABELS.at(i + 2));
EXPECT_EQ(children[i]["label_detail"].GetString(), MERGE_LABEL_DETAILS.at(i + 2));
checkPlanNodeStats(children[i], i + 2);
EXPECT_EQ(children[i]["children"].Size(), 0);
}
// Check the 01:SCAN HDFS node
ASSERT_EQ(array.Size(), 2);
auto& scan_node = array[1];
EXPECT_EQ(scan_node["label"].GetString(), MERGE_LABELS[4]);
EXPECT_EQ(scan_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[4]);
checkPlanNodeStats(scan_node, 4);
EXPECT_EQ(scan_node["children"].Size(), 0);
// The data_stream_target of 01:SCAN HDFS should point to the 04:EXCHANGE node
ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
EXPECT_EQ(scan_node["data_stream_target"].GetString(), MERGE_LABELS[2]);
}

View File

@@ -1083,12 +1083,50 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::WebRequest& req,
namespace {
// Summary is stored with -1 as id if it is for a data sink at the root of a fragment.
constexpr int SINK_ID = -1;
// Aggregates the per-instance exec stats in 'summary' and adds them to 'value'
// as JSON members: "output_card", "num_instances", optional "is_broadcast",
// "max_time"/"max_time_val" and "avg_time". For broadcast nodes the cardinality
// is the max across instances rather than the sum (see inline comment).
void ExecStatsToJsonHelper(
const TPlanNodeExecSummary& summary, rapidjson::Document* document, Value* value) {
int64_t cardinality = 0;
int64_t max_time = 0;
int64_t total_time = 0;
for (const TExecStats& stat : summary.exec_stats) {
if (summary.is_broadcast) {
// Avoid multiple-counting for recipients of broadcasts.
cardinality = ::max(cardinality, stat.cardinality);
} else {
cardinality += stat.cardinality;
}
total_time += stat.latency_ns;
max_time = ::max(max_time, stat.latency_ns);
}
value->AddMember("output_card", cardinality, document->GetAllocator());
value->AddMember("num_instances", static_cast<uint64_t>(summary.exec_stats.size()),
document->GetAllocator());
// "is_broadcast" is only emitted when true, never as an explicit false.
if (summary.is_broadcast) {
value->AddMember("is_broadcast", true, document->GetAllocator());
}
const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
Value max_time_str_json(max_time_str, document->GetAllocator());
value->AddMember("max_time", max_time_str_json, document->GetAllocator());
value->AddMember("max_time_val", max_time, document->GetAllocator());
// Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a
// ns. See IMPALA-1800.
const string& avg_time_str = PrettyPrinter::Print(
// A bug may occasionally cause 1-instance nodes to appear to have 0 instances.
total_time / ::max(static_cast<int>(summary.exec_stats.size()), 1), TUnit::TIME_NS);
Value avg_time_str_json(avg_time_str, document->GetAllocator());
value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
}
// Helper for PlanToJson(), processes a single list of plan nodes which are the
// DFS-flattened representation of a single plan fragment. Called recursively, the
// iterator parameter is updated in place so that when a recursive call returns, the
// caller is pointing at the next of its children.
void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
const vector<TPlanNode>& nodes,
vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) {
Value children(kArrayType);
Value label((*it)->label, document->GetAllocator());
@@ -1098,54 +1136,67 @@ void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
value->AddMember("label_detail", label_detail, document->GetAllocator());
TPlanNodeId id = (*it)->node_id;
map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(id);
if (summary != summaries.end()) {
int64_t cardinality = 0;
int64_t max_time = 0L;
int64_t total_time = 0;
for (const TExecStats& stat: summary->second.exec_stats) {
if (summary->second.is_broadcast) {
// Avoid multiple-counting for recipients of broadcasts.
cardinality = ::max(cardinality, stat.cardinality);
} else {
cardinality += stat.cardinality;
}
total_time += stat.latency_ns;
max_time = ::max(max_time, stat.latency_ns);
}
value->AddMember("output_card", cardinality, document->GetAllocator());
value->AddMember("num_instances",
static_cast<uint64_t>(summary->second.exec_stats.size()),
document->GetAllocator());
if (summary->second.is_broadcast) {
value->AddMember("is_broadcast", true, document->GetAllocator());
}
const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
Value max_time_str_json(max_time_str, document->GetAllocator());
value->AddMember("max_time", max_time_str_json, document->GetAllocator());
value->AddMember("max_time_val", max_time, document->GetAllocator());
// Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a
// ns. See IMPALA-1800.
const string& avg_time_str = PrettyPrinter::Print(
// A bug may occasionally cause 1-instance nodes to appear to have 0 instances.
total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1),
TUnit::TIME_NS);
Value avg_time_str_json(avg_time_str, document->GetAllocator());
value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it = summaries.find(id);
if (summary_it != summaries.end()) {
ExecStatsToJsonHelper(summary_it->second, document, value);
}
int num_children = (*it)->num_children;
for (int i = 0; i < num_children; ++i) {
++(*it);
Value container(kObjectType);
PlanToJsonHelper(summaries, nodes, it, document, &container);
PlanToJsonHelper(summaries, it, document, &container);
children.PushBack(container, document->GetAllocator());
}
value->AddMember("children", children, document->GetAllocator());
}
// Helper for PlanToJson(), called only when the plan fragment's root data sink must be
// one of the following types:
// - table sink,
// - multi data sink,
// - merge sink.
// Plan nodes of the plan fragment will be listed as children of the root data sink.
void SinkToJsonHelper(const TDataSink& sink,
const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) {
Value label(sink.label, document->GetAllocator());
value->AddMember("label", label, document->GetAllocator());
// label_detail: comma-joined child labels for merge/multi sinks, the sink
// action (e.g. INSERT/DELETE) for plain table sinks.
string label_detail_str = "";
switch (sink.type) {
case TDataSinkType::type::MERGE_SINK:
case TDataSinkType::type::MULTI_DATA_SINK:
label_detail_str = sink.child_data_sinks.at(0).label;
for (std::size_t i = 1; i < sink.child_data_sinks.size(); ++i) {
label_detail_str += ", ";
label_detail_str += sink.child_data_sinks.at(i).label;
}
break;
case TDataSinkType::type::TABLE_SINK:
label_detail_str = to_string(sink.table_sink.action);
break;
default:
// Should not call SinkToJsonHelper() with any other sink type.
DCHECK(false) << "Invalid sink type: " << sink.type;
}
Value label_detail(label_detail_str, document->GetAllocator());
value->AddMember("label_detail", label_detail, document->GetAllocator());
// Sink exec summaries are stored under SINK_ID (-1); see the namespace comment.
map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it =
summaries.find(SINK_ID);
DCHECK(summary_it != summaries.end());
ExecStatsToJsonHelper(summary_it->second, document, value);
// The fragment's plan-node tree becomes the single child of the sink node.
Value children(kArrayType);
Value container(kObjectType);
PlanToJsonHelper(summaries, it, document, &container);
children.PushBack(container, document->GetAllocator());
value->AddMember("children", children, document->GetAllocator());
}
} // unnamed namespace
void impala::PlanToJson(const vector<TPlanFragment>& fragments,
@@ -1153,32 +1204,43 @@ void impala::PlanToJson(const vector<TPlanFragment>& fragments,
// Build a map from id to label so that we can resolve the targets of data-stream sinks
// and connect plan fragments.
map<TPlanNodeId, string> label_map;
for (const TPlanFragment& fragment: fragments) {
for (const TPlanNode& node: fragment.plan.nodes) {
for (const TPlanFragment& fragment : fragments) {
for (const TPlanNode& node : fragment.plan.nodes) {
label_map[node.node_id] = node.label;
}
}
map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries;
for (const TPlanNodeExecSummary& s: summary.nodes) {
exec_summaries[s.node_id] = s;
for (const TPlanNodeExecSummary& s : summary.nodes) {
// All sinks have -1 as node_id; we want to store the summary of the first one (the
// root of the plan tree), and insert() will not overwrite the existing value
// if the key is already present.
exec_summaries.insert({s.node_id, s});
}
Value nodes(kArrayType);
for (const TPlanFragment& fragment: fragments) {
for (const TPlanFragment& fragment : fragments) {
Value plan_fragment(kObjectType);
vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin();
PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment);
if (fragment.__isset.output_sink) {
const TDataSink& sink = fragment.output_sink;
if (sink.__isset.stream_sink) {
Value target(label_map[sink.stream_sink.dest_node_id],
document->GetAllocator());
plan_fragment.AddMember("data_stream_target", target, document->GetAllocator());
} else if (sink.__isset.join_build_sink) {
Value target(label_map[sink.join_build_sink.dest_node_id],
document->GetAllocator());
plan_fragment.AddMember("join_build_target", target, document->GetAllocator());
if (fragment.__isset.output_sink
&& (fragment.output_sink.type == TDataSinkType::type::MERGE_SINK
|| fragment.output_sink.type == TDataSinkType::type::MULTI_DATA_SINK
|| fragment.output_sink.type == TDataSinkType::type::TABLE_SINK)) {
SinkToJsonHelper(
fragment.output_sink, exec_summaries, &it, document, &plan_fragment);
} else {
PlanToJsonHelper(exec_summaries, &it, document, &plan_fragment);
if (fragment.__isset.output_sink) {
const TDataSink& sink = fragment.output_sink;
if (sink.__isset.stream_sink) {
Value target(
label_map[sink.stream_sink.dest_node_id], document->GetAllocator());
plan_fragment.AddMember("data_stream_target", target, document->GetAllocator());
} else if (sink.__isset.join_build_sink) {
Value target(
label_map[sink.join_build_sink.dest_node_id], document->GetAllocator());
plan_fragment.AddMember("join_build_target", target, document->GetAllocator());
}
}
}
nodes.PushBack(plan_fragment, document->GetAllocator());

View File

@@ -1049,9 +1049,9 @@ struct TFinalizeParams {
9: optional TIcebergDmlFinalizeParams iceberg_params;
}
// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
// Result of call to ImpalaPlanService/JniFrontend.createExecRequest()
struct TQueryExecRequest {
// exec info for all plans; the first one materializes the query result, and subsequent
// Exec info for all plans; the first one materializes the query result, and subsequent
// plans materialize the build sides of joins. Each plan appears before its
// dependencies in the list.
1: optional list<TPlanExecInfo> plan_exec_info