Currently, when the graceful shutdown deadline is reached, the Impala
daemon exits immediately, leaving any running queries unfinished.
This is not quite graceful, as it may result in unreleased
resources, such as scratch files in remote storage.
This patch adds a new state to the graceful shutdown process.
Before the shutdown deadline is reached, the Impala daemon tries to
cancel any remaining running queries within a period controlled by a
new flag, shutdown_query_cancel_period_s. If this period exceeds
20% of the total shutdown deadline, it is automatically capped at
that value, so that queries are canceled only near the end of the
graceful shutdown deadline. The 20% threshold lets us take a more
aggressive approach to ensuring a graceful shutdown.
If all queries are successfully canceled within this period, the
server shuts down immediately. Otherwise, it shuts down once the
deadline is reached, with queries still running.
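For illustration only, a minimal sketch of the capping logic described
above; the function and variable names are assumptions, not the actual
Impala shutdown code:
```cpp
// Minimal sketch of capping the cancel period at 20% of the shutdown
// deadline; flag and function names here are illustrative only.
#include <algorithm>
#include <cstdint>
#include <iostream>

int64_t EffectiveCancelPeriodS(int64_t shutdown_deadline_s,
                               int64_t shutdown_query_cancel_period_s) {
  const int64_t cap = shutdown_deadline_s / 5;  // 20% of the total deadline
  return std::min(shutdown_query_cancel_period_s, cap);
}

int main() {
  const int64_t deadline_s = 3600;
  const int64_t cancel_period_s = EffectiveCancelPeriodS(deadline_s, 1200);
  // Cancellation starts only near the end of the grace period.
  std::cout << "start cancelling at t=" << (deadline_s - cancel_period_s)
            << "s, cancel period=" << cancel_period_s << "s\n";
  return 0;
}
```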
Tests:
Passed core tests.
Added test cases test_shutdown_coordinator_cancel_query,
test_shutdown_executor_with_query_cancel_period, and
test_shutdown_coordinator_and_executor_cancel_query.
Manually tested shutting down a coordinator or an executor with
long-running queries and verified that they were canceled.
Change-Id: I1cac2e100d329644e21fdceb0b23901b08079130
Reviewed-on: http://gerrit.cloudera.org:8080/22422
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
To support killing queries programmatically, this patch adds a new
type of SQL statement, the KILL QUERY statement, which cancels and
unregisters a query on any coordinator in the cluster.
A KILL QUERY statement looks like
```
KILL QUERY '123:456';
```
where `123:456` is the query id of the query we want to kill. We follow
syntax from HIVE-17483. For backward compatibility, 'KILL' and 'QUERY'
are added as "unreserved keywords", like 'DEFAULT'. This allows the
three keywords to be used as identifiers.
A user is authorized to kill a query only if the user is an admin or is
the owner of the query. KILL QUERY statements are not affected by
admission control.
Implementation:
Since we don't know in advance which impalad is the coordinator of the
query we want to kill, we need to broadcast the kill request to all the
coordinators in the cluster. Upon receiving a kill request, each
coordinator checks whether it is the coordinator of the query:
- If yes, it cancels and unregisters the query,
- If no, it reports "Invalid or unknown query handle".
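For illustration, a self-contained sketch of this broadcast-and-check
flow; the types and names below are assumptions, not Impala's actual
classes:
```cpp
// Illustrative sketch of broadcasting a kill request to every coordinator:
// the query's coordinator cancels and unregisters it, every other coordinator
// reports an unknown handle.
#include <iostream>
#include <string>
#include <vector>

struct Coordinator {
  std::string address;
  std::vector<std::string> registered_queries;  // queries it coordinates

  bool KillQuery(const std::string& query_id) {
    for (const auto& q : registered_queries) {
      if (q == query_id) {
        std::cout << address << ": cancelled and unregistered " << query_id << "\n";
        return true;
      }
    }
    std::cout << address << ": Invalid or unknown query handle\n";
    return false;
  }
};

int main() {
  std::vector<Coordinator> coordinators = {
      {"coord1:27000", {"123:456"}}, {"coord2:27000", {}}};
  bool killed = false;
  for (auto& c : coordinators) killed |= c.KillQuery("123:456");  // broadcast
  std::cout << (killed ? "query killed\n" : "no coordinator owned the query\n");
  return 0;
}
```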
Currently, a KILL QUERY statement is not interruptible; IMPALA-13663
tracks making it interruptible.
For authorization, this patch adds a custom handler of
AuthorizationException for each statement to allow the exception to be
handled by the backend. This is because we don't know whether the user
is the owner of the query until we reach its coordinator.
To support cancelling child queries, this patch changes
ChildQuery::Cancel() to bypass the HS2 layer so that the session of the
child query will not be added to the connection used to execute the
KILL QUERY statement.
Testing:
- A new ParserTest case is added to test using "unreserved keywords" as
identifiers.
- New E2E test cases are added for the KILL QUERY statement.
- Added a new dimension in TestCancellation to use the KILL QUERY
statement.
- Added file tests/common/cluster_config.py and made
CustomClusterTestSuite.with_args() composable so that common cluster
configs can be reused in custom cluster tests.
Change-Id: If12d6e47b256b034ec444f17c7890aa3b40481c0
Reviewed-on: http://gerrit.cloudera.org:8080/21930
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
This patch improves the REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION error
message by naming the specific configuration that must be adjusted for
the query to pass Admission Control. New fields
'per_backend_mem_to_admit_source' and
'coord_backend_mem_to_admit_source' of type MemLimitSourcePB are added
to QuerySchedulePB. These fields explain which limiting factor drives
the final numbers in 'per_backend_mem_to_admit' and
'coord_backend_mem_to_admit' respectively. In turn, Admission Control
uses this information to compose a more informative error message that
the user can act upon. The new error message pattern also explicitly
mentions "Per Host Min Memory Reservation" as the place to look when
investigating memory reservations scheduled for each backend node.
Updated documentation with examples of query rejection by Admission
Control and how to read the error message.
Testing:
- Add BE tests at admission-controller-test.cc
- Adjust and pass affected EE tests
Change-Id: I1ef7fb7e7a194b2036c2948639a06c392590bf66
Reviewed-on: http://gerrit.cloudera.org:8080/21436
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
It is possible for an UpdateFilterFromRemote RPC to arrive at an
impalad executor before the QueryState of the destination query is
created or has completed initialization. This patch adds a wait
mechanism to the UpdateFilterFromRemote RPC endpoint that waits a few
milliseconds until the QueryState exists and has completed
initialization.
The total wait time is fixed at 500ms, with exponentially increasing
sleep periods in between. If the wait time passes and the QueryState
is still not found or initialized, the UpdateFilterFromRemote RPC is
deemed to have failed and query execution moves on without the
complete filter.
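A minimal sketch of such a bounded wait with exponential backoff; the
query_state_ready() callable and the constants are assumptions, not
the actual implementation:
```cpp
// Sketch of waiting up to 500ms for the QueryState to exist and finish
// initializing, sleeping with exponential backoff between checks.
#include <algorithm>
#include <chrono>
#include <thread>

template <typename LookupFn>
bool WaitForQueryStateInit(LookupFn&& query_state_ready) {
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
  std::chrono::milliseconds sleep{1};
  while (std::chrono::steady_clock::now() < deadline) {
    if (query_state_ready()) return true;  // QueryState exists and is initialized
    std::this_thread::sleep_for(sleep);
    sleep = std::min(sleep * 2, std::chrono::milliseconds(100));  // exponential backoff
  }
  // Final check; returning false means the RPC is treated as failed and the
  // query continues without the complete filter.
  return query_state_ready();
}
```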
Testing:
- Add BE tests in network-util-test.cc
- Add test_runtime_filter_aggregation.py::TestLateQueryStateInit
- Pass exhaustive runs of test_runtime_filter_aggregation.py,
test_query_live.py, and test_query_log.py
Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9
Reviewed-on: http://gerrit.cloudera.org:8080/21383
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Defines SystemTables, which are in-memory tables that provide access
to Impala state. Adds 'impala_query_live' to the database 'sys',
which already exists for 'sys.impala_query_log'.
Implements the 'impala_query_live' table to view active queries across
all coordinators sharing the same statestore. SystemTables create new
SystemTableScanNodes as their scan node implementation. When computing
scan range locations, SystemTableScanNode creates a scan range for each
coordinator in the cluster (identified via ClusterMembershipMgr). This
produces a plan that looks like:
Query: explain select * from sys.impala_query_live
+------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 |
| Per-Host Resource Estimates: Memory=11MB |
| WARNING: The following tables are missing relevant table |
| and/or column statistics. |
| sys.impala_query_live |
| |
| PLAN-ROOT SINK |
| | |
| 01:EXCHANGE [UNPARTITIONED] |
| | |
| 00:SCAN SYSTEM_TABLE [sys.impala_query_live] |
| row-size=72B cardinality=20 |
+------------------------------------------------------------+
Impala's scheduler checks whether the query contains fragments that
can be scheduled on coordinators and, if present, includes an
ExecutorGroup containing all coordinators. These are used to schedule
scan ranges that are flagged as 'use_coordinator', allowing
SystemTableScanNodes to be scheduled on dedicated coordinators and
outside the selected executor group.
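A simplified, self-contained sketch of that routing decision; all
names below are stand-ins, not Impala's actual scheduler types:
```cpp
// Scan ranges flagged 'use_coordinator' are assigned to hosts from the
// all-coordinators group, everything else to the selected executor group.
#include <iostream>
#include <string>
#include <vector>

struct ScanRange { std::string path; bool use_coordinator; };

std::string PickHost(const std::vector<std::string>& group, size_t i) {
  return group[i % group.size()];  // simple round-robin placeholder
}

int main() {
  std::vector<std::string> executors = {"exec1", "exec2"};
  std::vector<std::string> coordinators = {"coord1"};
  std::vector<ScanRange> ranges = {{"sys.impala_query_live", true},
                                   {"hdfs://warehouse/t/file0.parq", false}};
  for (size_t i = 0; i < ranges.size(); ++i) {
    const auto& group = ranges[i].use_coordinator ? coordinators : executors;
    std::cout << ranges[i].path << " -> " << PickHost(group, i) << "\n";
  }
  return 0;
}
```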
Execution will pull data from ImpalaServer on the backend via a
SystemTableScanner implementation based on table name.
In the query profile, SYSTEM_TABLE_SCAN_NODE includes
ActiveQueryCollectionTime and PendingQueryCollectionTime to track time
spent collecting QueryState from ImpalaServer.
Grants QueryScanner private access to ImpalaServer, identical to how
ImpalaHttpHandler accesses internal server state.
Adds custom cluster tests for impala_query_live, and unit tests for
changes to planner and scheduler.
Change-Id: Ie2f9a449f0e5502078931e7f1c5df6e0b762c743
Reviewed-on: http://gerrit.cloudera.org:8080/20762
Reviewed-by: Jason Fehr <jfehr@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
We must invoke validateDataFilesExist for RowDelta operations (DELETE/
UPDATE/MERGE). Without this a concurrent RewriteFiles (compaction) and
RowDelta can corrupt a table.
IcebergBufferedDeleteSink now also collects the filenames of the data
files that are referenced in the position delete files. It adds them to
the DML exec state which is then collected by the Coordinator. The
Coordinator passes the file paths to CatalogD which executes Iceberg's
RowDelta operation and now invokes validateDataFilesExist() with the
file paths. Additionally it also invokes validateDeletedFiles().
This patch set also resolves IMPALA-12640 which is about replacing
IcebergDeleteSink with IcebergBufferedDeleteSink, as from now on
we use the buffered version for all DML operations that write
position delete files.
Testing:
* adds new stress test with DELETE + UPDATE + OPTIMIZE
Change-Id: I4869eb863ff0afe8f691ccf2fd681a92d36b405c
Reviewed-on: http://gerrit.cloudera.org:8080/21099
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
IMPALA-4400 improved runtime filters by aggregating them locally
before sending filter updates to the coordinator and by sharing a
single RuntimeFilterBank across all fragment instances of a query.
However, local filter aggregation is still insufficient if the number
of nodes in an Impala cluster is large. For example, in a cluster of
around 700 impalad backends, aggregation of 1 MB bloom filter updates
in the coordinator can take more than 1 second.
This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated
Impala backends before the final aggregation and publishing in the
coordinator. The query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is
added to control this feature. Given N backend executors, excluding
the coordinator, the number of intermediate aggregators is
M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate
aggregator feature. In the backend scheduler, M impalads are selected
randomly as the intermediate aggregators for a runtime filter.
Information about the M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into
filter routing information, TRuntimeFilterAggDesc, that is
piggy-backed on TRuntimeFilterSource.
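A minimal sketch of the aggregator selection rule described above;
names are illustrative, not the actual scheduler code:
```cpp
// Compute M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and pick M
// executors at random to act as intermediate aggregators.
#include <algorithm>
#include <cmath>
#include <random>
#include <string>
#include <vector>

std::vector<std::string> PickIntermediateAggregators(
    std::vector<std::string> executors,  // backend executors, excluding the coordinator
    int max_filters_aggregated_per_host) {
  if (max_filters_aggregated_per_host <= 1) return {};  // feature disabled
  const int n = static_cast<int>(executors.size());
  const int m = static_cast<int>(
      std::ceil(static_cast<double>(n) / max_filters_aggregated_per_host));
  std::shuffle(executors.begin(), executors.end(),
               std::mt19937{std::random_device{}()});
  executors.resize(m);  // keep M randomly chosen aggregators
  return executors;
}
```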
A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate
aggregator then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. The RuntimeFilterBank of the
intermediate aggregator waits for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing
and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is
marked as ALWAYS_TRUE and sent to the coordinator.
This patch currently targets only the bloom filters produced by
partitioned join builds. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter
from a broadcast join requires only one valid filter update for
publishing.
test_runtime_filters.py is modified to clarify the exec_options
dimension and test matrix constraints, and to reduce pytest.skip()
calls in each test. runtime_filters.test is also changed to use counter
aggregation and assert on the ExecSummary table so that the assertions
stay valid irrespective of the number of fragment instances.
We benchmarked the aggregation speed of a 1 MB runtime filter on a
20-executor-node cluster with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time the coordinator publishes the complete filter.
The results are as follows:
+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
| 0 | 1296 |
| 1 | 1229 |
| 2 | 608 |
| 4 | 329 |
| 8 | 205 |
+---------------------+------------------------+
Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
test_runtime_filters.py and query-options-test.cc
- Add TestRuntimeFiltersLateRemoteUpdate.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Reviewed-on: http://gerrit.cloudera.org:8080/20612
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds limited UPDATE support for Iceberg tables. The
limitations mean users cannot update an Iceberg table if any of
the following is true:
* The UPDATE sets the value of a partitioning column
* The table went through partition evolution
* The table has SORT BY properties
The above limitations will be resolved by part 3. The usual
limitations, such as writing non-Parquet files, using copy-on-write,
or modifying V1 tables, are out of scope for IMPALA-12313.
This patch implements UPDATEs with the merge-on-read technique. This
means the UPDATE statement writes both data files and delete files.
Data files contain the updated records, delete files contain the
position delete records of the old data records that have been
touched.
To achieve the above this patch introduces a new sink: MultiDataSink.
We can configure multiple TableSinks for a single MultiDataSink object.
During execution, the row batches sent to the MultiDataSink will be
forwarded to all the TableSinks that have been registered.
The UPDATE statement for an Iceberg table creates a source select
statement with all table columns and virtual columns INPUT__FILE__NAME
and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema
(i int, s string, k int), and we update the table with:
UPDATE tbl SET k = 5 WHERE i % 100 = 11;
The generated source statement will be ==>
SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION
FROM tbl WHERE i % 100 = 11;
Then we create two table sinks that refer to expressions from the above
source statement:
Insert sink (i, s, 5)
Delete sink (INPUT__FILE__NAME, FILE__POSITION)
The tuples in the row batch of MultiDataSink contain slots for all the
above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION).
MultiDataSink forwards each row batch to each registered TableSink.
Each sink picks its relevant expressions from the tuple and writes
data/delete files. The tuples are sorted by INPUT__FILE__NAME and
FILE__POSITION because we need to write the delete records in this
order.
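An illustrative sketch of that forwarding behavior; the class and
method names below are assumptions, not Impala's actual sink classes:
```cpp
// A multi-sink forwards every row batch to all registered sinks; each sink
// evaluates only its own expressions over the shared tuples.
#include <memory>
#include <vector>

struct RowBatch { /* rows with slots for (i, s, 5, INPUT__FILE__NAME, FILE__POSITION) */ };

class TableSinkBase {
 public:
  virtual ~TableSinkBase() = default;
  virtual void Send(const RowBatch& batch) = 0;  // picks its exprs, writes data/delete files
};

class MultiSink {
 public:
  void Register(std::unique_ptr<TableSinkBase> sink) { sinks_.push_back(std::move(sink)); }
  void Send(const RowBatch& batch) {
    // Every registered sink (insert sink, delete sink, ...) sees the same batch.
    for (auto& sink : sinks_) sink->Send(batch);
  }
 private:
  std::vector<std::unique_ptr<TableSinkBase>> sinks_;
};
```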
For partitioned tables we need to shuffle and sort the input tuples.
In this case we also add virtual columns "PARTITION__SPEC__ID" and
"ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle
and sort the rows based on them.
Data files and delete files are now separated in the DmlExecState, so
at the end of the operation we'll have two sets of files. We use these
two sets to create a new Iceberg snapshot.
Why does this patch have the limitations?
- Because we are shuffling and sorting rows based on the delete
records and their partitions. This means that the new data files
might not get written in an efficient way, e.g. there will be
too many of them, or we will need to keep too many open file
handles during writing.
Also, if the table has SORT BY properties, we cannot respect
them as the input rows are ordered in a way that favors the position
deletes.
Part 3 will introduce a buffering writer for position delete
files. This means we will shuffle and sort records based on
the data records' partitions and SORT BY properties while
delete records get buffered and written out at the end (sorted
by file_path and position). In some edge cases the delete records
might not get written efficiently, but that is a smaller problem
than inefficient data files.
Testing:
* negative tests
* planner tests
* update all supported data types
* partitioned tables
* Impala/Hive interop tests
* authz tests
* concurrent tests
Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
Reviewed-on: http://gerrit.cloudera.org:8080/20677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
For Iceberg tables, when joining the data files with the delete files,
both of the current distribution modes (broadcast, partitioned) are
wasteful. The idea is that a row read from a delete file contains the
name of the data file that this particular delete row refers to, so if
we knew where that data file is scheduled we could send that delete
file row directly there.
This patch enhances the scheduler to collect information about
which data file is scheduled on which host. Since the scan node for
the data files is on the same host as the Iceberg join node, we can
send the delete file rows directly to that specific host.
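A self-contained sketch of the mapping the scheduler collects, with
illustrative names and paths only, not the actual scheduler code:
```cpp
// Record which host each data file's scan range was assigned to, so a
// position-delete row referencing that data file can be routed straight there.
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
  // Populated while scheduling the data-file scan ranges.
  std::unordered_map<std::string, std::string> data_file_to_host = {
      {"hdfs://warehouse/tbl/data/file_a.parq", "host-1"},
      {"hdfs://warehouse/tbl/data/file_b.parq", "host-2"},
  };

  // A delete row names the data file it applies to; send it only to the host
  // that scans that data file.
  std::string delete_row_target = "hdfs://warehouse/tbl/data/file_b.parq";
  auto it = data_file_to_host.find(delete_row_target);
  if (it != data_file_to_host.end()) {
    std::cout << "route delete row to " << it->second << "\n";
  } else {
    std::cout << "unknown data file; fall back to broadcast\n";
  }
  return 0;
}
```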
Functional testing:
- Re-run full test suite to check for regressions.
Performance testing:
1) Local machine: SELECT COUNT(1) FROM TPCH10_parquet.lineitem
Around 15% of the rows are deleted.
As the table is unpartitioned I got a small number of delete files with
relatively large size.
Query runtime decreased by ~80%
2) Local machine: SELECT COUNT(1) FROM TPCDS10_store_sales
Around 15% of the rows are deleted.
Table is partitioned, which results in more delete files that are
smaller in size.
Query runtime decreased by ~50%
3) Performance testing in a multi-node cluster with data stored on S3.
SELECT COUNT(1) FROM a scaled store_sales table having ~8.6B rows and
~15% are deleted.
Here we had 2 scenarios:
a) Table is written by Impala: One delete file row is sent exactly to
one host.
b) Table is written by Hive: Here the data files are apparently
bigger and one data file might be spread across multiple scan ranges.
As a result one delete file row might be sent to multiple hosts.
The time difference compared to the a) run is the time spent on
sending out more delete file rows.
- Results with 10-node
a) Runtime decreased by ~80%.
b) Runtime decreased by ~60%.
- Results with 20-node
a) Runtime decreased by ~65%.
b) Runtime decreased by ~42%.
- Results with 40-node
a) Runtime decreased by ~55%.
b) Runtime decreased by ~42%.
Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2
Reviewed-on: http://gerrit.cloudera.org:8080/20548
Reviewed-by: Tamas Mate <tmater@apache.org>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds process start time and version to the /backends page.
Two more optional elements are added to BackendDescriptorPB and can
be broadcast through the statestore topic. This information should be
helpful for users checking all backends in a large cluster.
For display, since two more columns are added to the backend
information table, the table is changed to 'table-responsive' so it
can be scrolled horizontally with ease. A sample screenshot is
attached to the IMPALA-12096 ticket.
Testing:
- Added cases to test_web_pages.py
Change-Id: I5f1f0ba0081986f428840442c247d7dde9e1ba05
Reviewed-on: http://gerrit.cloudera.org:8080/19800
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When operating and maintaining an Impala cluster or analyzing historical
query performance, it is helpful to show the memory consumed, the amount
of data read, and other information about the query on the historical
query page of the web UI. The current historical query page does not
display this information, so we should collect it while the query is
executed and display it on the web page.
This patch modifies the query list page (/queries) and query detail
pages (/query_plan, etc.).
On the list page, some metrics are added for each query record,
including queuing time, memory usage, memory estimation, bytes read, and
bytes sent. In addition, the Details column now shows the query ID,
and its position is adjusted to the top of the record for easy
clicking.
On the query detail page, a similar record table is added to display the
key information of the current query. In addition, a timeline display is
added to the summary page (which is exactly the same as the timeline in
the profile, just for quick viewing). For queries that are running, the
above information will be automatically refreshed (only for the plan and
summary tabs).
To make it clear what each metric means, tooltips are added to all list
headers.
Change-Id: I19c75461a6405025fa433ae84d2c94d013fcaacb
Reviewed-on: http://gerrit.cloudera.org:8080/19417
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Adds a metric bytes-read-encrypted to track encrypted reads.
Testing:
- ran test_io_metrics.py with Ozone (encrypts by default)
- ran test_io_metrics.py with HDFS (no encryption)
Change-Id: I9dbc194a4bc31cb0e01545fb6032a0853db60f34
Reviewed-on: http://gerrit.cloudera.org:8080/19461
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
For Iceberg tables, table-level statistics such as numRows can be
computed from Iceberg partition stats, which are more accurate and
up to date. Obtaining these statistics is independent of
StatsSetupConst.ROW_COUNT and StatsSetupConst.TOTAL_SIZE in HMS. This
improves cardinality estimation for Iceberg tables.
However, the calculation is not yet accurate for V2 Iceberg tables;
once IMPALA-11516 (Return better partition stats for V2 tables) is
ready, these stats can be considered as a replacement for the HMS
statistics.
Testing:
- Existing tests
- Test on 'On-demand Metadata' mode
- For 'select * from
iceberg_v2_positional_not_all_data_files_have_delete_files where i =
(select max(i) from iceberg_v2_positional_update_all_rows)', the 'Join
Order' and 'Distribution Mode' are the same as when table stats are
present
Change-Id: I3e92d3f25e2a57a64556249410d0af3522598c00
Reviewed-on: http://gerrit.cloudera.org:8080/19168
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
For Iceberg tables, when one of the following properties is used, the
table is considered as possibly having data outside the table
location directory:
- 'write.object-storage.enabled' is true
- 'write.data.path' is not empty
- 'write.location-provider.impl' is configured
- 'write.object-storage.path'(Deprecated) is not empty
- 'write.folder-storage.path'(Deprecated) is not empty
We should tolerate the situation where the relative path of a data
file cannot be derived from the table location path, and use the
absolute path in that case. E.g. an ETL program may write a table
whose Iceberg metadata is placed in
'hdfs://nameservice_meta/warehouse/hadoop_catalog/ice_tbl/metadata',
whose recent data files are in
'hdfs://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data', and
whose data files from half a year ago are in
's3a://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data'; such a
table should still be queryable by Impala.
Testing:
- added e2e tests
Change-Id: I666bed21d20d5895f4332e92eb30a94fa24250be
Reviewed-on: http://gerrit.cloudera.org:8080/18894
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This fixes a few different CMake warnings:
1. This removes cmake_minimum_required invocations except for the
top-most CMakeLists.txt. This eliminates the warnings like this:
Compatibility with CMake < 2.8.12 will be removed from a future version of
CMake.
Update the VERSION argument <min> value or use a ...<max> suffix to tell
CMake that the project does not need compatibility with older versions.
Moving to a later version also required setting CMAKE_ENABLE_EXPORTS
to continue exporting symbols.
2. This modifies the module names so that they match the corresponding
module names from Find*.cmake. This is mostly dealing with case
differences. This addresses warnings like:
The package name passed to `find_package_handle_standard_args` (PROTOBUF)
does not match the name of the calling package (Protobuf). This can lead
to problems in calling code that expects `find_package` result variables
(e.g., `_FOUND`) to follow a certain pattern.
This fixed the detection logic for KerberosPrograms, and so it required
adding more Kerberos packages to bin/bootstrap_build.sh.
3. This adds a missing .cc suffix. This addresses the following warning:
CMake Warning (dev) at be/src/util/CMakeLists.txt:141 (add_library):
Policy CMP0115 is not set: Source file extensions must be explicit. Run
"cmake --help-policy CMP0115" for policy details. Use the cmake_policy
command to set the policy and suppress this warning.
These fixes mostly match how these warnings were handled in
Apache Kudu.
Testing:
- Ran GVO
Change-Id: I2a97dd07cdd0831e90882a2035415ac71d670147
Reviewed-on: http://gerrit.cloudera.org:8080/18444
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch makes the following changes to support running KRPC over UDS.
- Add FLAGS_rpc_use_unix_domain_socket to enable running KRPC over
UDS. Add FLAGS_uds_address_unique_id to specify unique Id for UDS
address. It could be 'ip_address', 'backend_id', or 'none'.
- Add variable uds_address in NetworkAddressPB and TNetworkAddress.
Replace TNetworkAddress with NetworkAddressPB for KRPC related
class variables and APIs.
- Set the UDS address for each daemon as @impala-krpc:<unique_id>
during initialization, with unique_id specified by the startup flag
FLAGS_uds_address_unique_id.
- When FLAG_rpc_use_unix_domain_socket is true, the socket of the KRPC
server will be bound to the UDS address of the daemon.
The KRPC client will connect to the KRPC server with the UDS address
of the server when creating the proxy service, which in turn calls
the kudu::Socket::Connect() function to connect to the KRPC server.
- The rpcz Web page shows TCP related stats as 'N/A' when using UDS.
The remote UDS address for KRPC inbound connections is shown on the
rpcz Web page as '*' when using UDS, since the remote UDS addresses
are not available.
- Add new unit-tests for UDS.
- BackendId of admissiond is not available. Use admissiond's IP
address as unique ID for UDS.
TODO: Advertise BackendId of admissiond in global admission
control mode.
Testing:
- Passed core tests with FLAG_rpc_use_unix_domain_socket at its
default value of false.
- Passed core tests with FLAG_rpc_use_unix_domain_socket set to true.
Change-Id: I439f5a03eb425c17451bcaa96a154bb0bca17ee7
Reviewed-on: http://gerrit.cloudera.org:8080/18369
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When Hive (and probably other engines as well) converts a legacy Hive
table to Iceberg it doesn't rewrite the data files. This means that the
data files have neither write ids nor partition column data. Currently
Impala expects the partition columns to be present in the data files,
so it is not able to read converted partitioned tables.
With this patch Impala loads partition values from the Iceberg metadata.
The extra metadata information is attached to the file descriptor
objects and propagated to the scanners. This metadata contains the
Iceberg data file format (later it could be used to handle mixed-format
tables) and the partition data.
We use the partition data in the HdfsScanner to create the template
tuple that contains the partition values of identity-partitioned
columns. This is not only true for migrated tables, but for all Iceberg
tables with identity partitions, which means we also save some IO
and CPU time for such columns. The partition information could also
be used for Dynamic Partition Pruning later.
We use the (human-readable) string representation of the partition data
when storing it in the flat buffers. This helps debugging and also
provides the needed flexibility when the partition columns
evolve (e.g. INT -> BIGINT, DECIMAL(4,2) -> DECIMAL(6,2)).
Testing
* e2e test for all data types that can be used to partition a table
* e2e test for migrated partitioned table + schema evolution (without
renaming columns)
* e2e for table where all columns are used as identity-partitions
Change-Id: Iac11a02de709d43532056f71359c49d20c1be2b8
Reviewed-on: http://gerrit.cloudera.org:8080/18240
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
ORC files have optional bloom filter indexes for each column. Since
ORC-1.7.0, the C++ reader supports pushing down predicates to skip
unrelated RowGroups. The pushed-down predicates are evaluated on
file indexes (i.e. statistics and bloom filter indexes). Note that only
EQUALS and IN-list predicates can leverage bloom filter indexes.
Currently Impala has two kinds of runtime filters: bloom filter and
min-max filter. Unfortunately they can't be converted into EQUALS or
IN-list predicates. So they can't leverage the file level bloom filter
indexes.
This patch adds runtime IN-list filters for this purpose. Currently
they are generated for the build side of a broadcast join. They are
only applied to ORC tables and are pushed down to the ORC reader (i.e.
the ORC lib). To avoid exploding the IN-list, if the number of distinct
values on the build side exceeds a threshold (default 1024), we set the
filter to ALWAYS_TRUE and clear its entries. The threshold can be
configured by a new query option, RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT.
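A minimal sketch of that entry-limit behavior; the class below is
illustrative, not the actual InListFilter implementation:
```cpp
// Once the number of distinct build-side values exceeds the limit, the filter
// degrades to ALWAYS_TRUE and its entries are cleared.
#include <string>
#include <unordered_set>

class SimpleInListFilter {
 public:
  explicit SimpleInListFilter(size_t entry_limit) : entry_limit_(entry_limit) {}

  void Insert(const std::string& value) {
    if (always_true_) return;
    values_.insert(value);
    if (values_.size() > entry_limit_) {
      always_true_ = true;   // too many distinct values: filter nothing
      values_.clear();       // free the entries
    }
  }

  // ALWAYS_TRUE filters pass everything; otherwise test set membership.
  bool Find(const std::string& value) const {
    return always_true_ || values_.count(value) > 0;
  }

 private:
  size_t entry_limit_;
  bool always_true_ = false;
  std::unordered_set<std::string> values_;
};
```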
Evaluating runtime IN-list filters is much slower than evaluating
runtime bloom filters due to the current simple implementation (i.e.
std::unordered_set) and the lack of codegen. So we disable them at the
row level.
For visibility, this patch adds two counters in the HdfsScanNode:
- NumPushedDownPredicates
- NumPushedDownRuntimeFilters
They reflect the predicates and runtime filters that are pushed down to
the ORC reader.
Currently, runtime IN-list filters are disabled by default. This patch
extends the query option ENABLED_RUNTIME_FILTER_TYPES to support a
comma-separated list of filter types. It defaults to "BLOOM,MIN_MAX".
Add "IN_LIST" to it to enable runtime IN-list filters.
Ran perf tests on a 3-instance cluster on my desktop using TPC-DS with
scale factor 20. It shows significant improvements in some queries:
+-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
+-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
| TPCDS(20) | TPCDS-Q67A | orc / snap / block | 35.07 | 44.01 | I -20.32% | 0.38% | 1.38% | 10 | I -25.69% | -3.58 | -45.33 |
| TPCDS(20) | TPCDS-Q37 | orc / snap / block | 1.08 | 1.45 | I -25.23% | 7.14% | 3.09% | 10 | I -34.09% | -3.58 | -12.94 |
| TPCDS(20) | TPCDS-Q70A | orc / snap / block | 6.30 | 8.60 | I -26.81% | 5.24% | 4.21% | 10 | I -36.67% | -3.58 | -14.88 |
| TPCDS(20) | TPCDS-Q16 | orc / snap / block | 1.33 | 1.85 | I -28.28% | 4.98% | 5.92% | 10 | I -39.38% | -3.58 | -12.93 |
| TPCDS(20) | TPCDS-Q18A | orc / snap / block | 5.70 | 8.06 | I -29.25% | 3.00% | 4.12% | 10 | I -40.30% | -3.58 | -19.95 |
| TPCDS(20) | TPCDS-Q22A | orc / snap / block | 2.01 | 2.97 | I -32.21% | 6.12% | 5.94% | 10 | I -47.68% | -3.58 | -14.05 |
| TPCDS(20) | TPCDS-Q77A | orc / snap / block | 8.49 | 12.44 | I -31.75% | 6.44% | 3.96% | 10 | I -49.71% | -3.58 | -16.97 |
| TPCDS(20) | TPCDS-Q75 | orc / snap / block | 7.76 | 12.27 | I -36.76% | 5.01% | 3.87% | 10 | I -59.56% | -3.58 | -23.26 |
| TPCDS(20) | TPCDS-Q21 | orc / snap / block | 0.71 | 1.27 | I -44.26% | 4.56% | 4.24% | 10 | I -77.31% | -3.58 | -28.31 |
| TPCDS(20) | TPCDS-Q80A | orc / snap / block | 9.24 | 20.42 | I -54.77% | 4.03% | 3.82% | 10 | I -123.12% | -3.58 | -40.90 |
| TPCDS(20) | TPCDS-Q39-1 | orc / snap / block | 1.07 | 2.26 | I -52.74% | * 23.83% * | 2.60% | 10 | I -149.68% | -3.58 | -14.43 |
| TPCDS(20) | TPCDS-Q39-2 | orc / snap / block | 1.00 | 2.33 | I -56.95% | * 19.53% * | 2.07% | 10 | I -151.89% | -3.58 | -20.81 |
+-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
"Base Avg" is the avg of the original time. "Avg" is the current time.
However, we also see some regressions due to the suboptimal
implementation. The follow-up JIRAs will focus on improvements:
- IMPALA-11140: Codegen InListFilter::Insert() and InListFilter::Find()
- IMPALA-11141: Use exact data types in IN-list filters instead of
casting data to a set of int64_t or a set of string.
- IMPALA-11142: Consider IN-list filters in partitioned joins.
Tests:
- Test IN-list filter on string, date and all integer types
- Test IN-list filter with NULL
- Test IN-list filter on complex exprs targets
Change-Id: I25080628233799aa0b6be18d5a832f1385414501
Reviewed-on: http://gerrit.cloudera.org:8080/18141
Reviewed-by: Qifan Chen <qchen@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds parquet stats to iceberg manifest as per-datafile
metrics.
The following metrics are supported:
- column_sizes :
Map from column id to the total size on disk of all regions that
store the column. Does not include bytes necessary to read other
columns, like footers.
- null_value_counts :
Map from column id to number of null values in the column.
- lower_bounds :
Map from column id to lower bound in the column serialized as
binary. Each value must be less than or equal to all non-null,
non-NaN values in the column for the file.
- upper_bounds :
Map from column id to upper bound in the column serialized as
binary. Each value must be greater than or equal to all non-null,
non-NaN values in the column for the file.
The corresponding parquet stats are collected by 'ColumnStats'
(in 'min_value_', 'max_value_', 'null_count_' members) and
'HdfsParquetTableWriter::BaseColumnWriter' (in
'total_compressed_byte_size_' member).
Testing:
- New e2e test was added to verify that the metrics are written to the
Iceberg manifest upon inserting data.
- New e2e test was added to verify that lower_bounds/upper_bounds
metrics are used to prune data files on querying iceberg tables.
- Existing e2e tests were updated to work with the new behavior.
- BE test for single-value serialization.
Relevant Iceberg documentation:
- Manifest:
https://iceberg.apache.org/spec/#manifests
- Values in lower_bounds and upper_bounds maps should be Single-value
serialized to binary:
https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
Change-Id: Ic31f2260bc6f6a7f307ac955ff05eb154917675b
Reviewed-on: http://gerrit.cloudera.org:8080/17806
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Attila Jeges <attilaj@cloudera.com>
Before this fix Impala committed an insert first, then reloaded the
table from HMS, and generated the insert events based on the difference
between the two snapshots (e.g. which files were not present in the
old snapshot but are there in the new one).
Hive replication expects the insert events before the commit, so this
may potentially lead to issues there.
The solution is to collect the new files during the insert in the
backend, and send the insert events based on this file set. This wasn't
very hard to do as we were already collecting the files in some cases:
- to move them from staging dir to their final location in case of
non-partitioned tables
- to write the file list to snapshot files in case of Iceberg tables
This patch unifies the paths above and collects all information about
the created files regardless of the table type.
Testing:
- no new tests, insert events were already covered in
test_event_processing.py and MetastoreEventsProcessorTest.java
- ran core tests
Change-Id: I2ed812dbcb5f55efff3a910a3daeeb76cd3295b9
Reviewed-on: http://gerrit.cloudera.org:8080/17313
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Currently, if a ReleaseQuery rpc fails, it's possible for the
admission service to think that some resources are still being used
that are actually free.
This patch fixes the issue by introducing a periodic heartbeat rpc
from coordinators to the admission service which contains a list of
queries registered at that coordinator.
If there is a query that the admission service thinks is running but
is not included in the heartbeat, the admission service can conclude
that the query must have already completed and release its resources.
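A self-contained sketch of that reconciliation; all names below are
stand-ins for the actual admission service structures:
```cpp
// Any query the admission service still tracks but that is absent from a
// coordinator's heartbeat must have finished, so its resources are released.
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

void ReconcileHeartbeat(const std::string& coordinator,
                        const std::unordered_set<std::string>& heartbeat_query_ids,
                        std::unordered_set<std::string>& tracked_query_ids) {
  std::vector<std::string> to_release;
  for (const auto& query_id : tracked_query_ids) {
    if (heartbeat_query_ids.count(query_id) == 0) to_release.push_back(query_id);
  }
  for (const auto& query_id : to_release) {
    tracked_query_ids.erase(query_id);
    std::cout << "releasing resources of " << query_id
              << " (missing from heartbeat of " << coordinator << ")\n";
  }
}
```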
Testing:
- Added a test that uses a debug action to simulate ReleaseQuery rpcs
failing and checks that query resources are released properly.
Change-Id: Ia528d92268cea487ada20b476935a81166f5ad34
Reviewed-on: http://gerrit.cloudera.org:8080/17194
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds retries of the AdmitQuery rpc by coordinators.
This helps to ensure that if an admissiond goes down and is restarted
or is temporarily unreachable, queries won't fail.
The retries are done with backoff and jitter to avoid overloading the
admissiond in these scenarios.
A new flag, --admission_max_retry_time_s, is added to control how long
queries will continue retrying before giving up.
The AdmitQuery rpc is made idempotent - if a query is submitted with
the same query id as one the admissiond already knows about,
AdmitQuery will return OK without submitting the query to be scheduled
again.
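A sketch of retrying an idempotent AdmitQuery-style call with backoff
and jitter, bounded by a maximum total retry time; the callable and
constants are assumptions for illustration only:
```cpp
// Retry until success or until --admission_max_retry_time_s-style budget runs
// out, sleeping for an exponentially growing backoff plus random jitter.
#include <algorithm>
#include <chrono>
#include <functional>
#include <random>
#include <thread>

bool RetryWithBackoff(const std::function<bool()>& try_admit,
                      std::chrono::seconds max_retry_time) {
  const auto deadline = std::chrono::steady_clock::now() + max_retry_time;
  std::chrono::milliseconds backoff{100};
  std::mt19937 rng{std::random_device{}()};
  while (true) {
    if (try_admit()) return true;  // rpc succeeded (idempotent, so safe to repeat)
    if (std::chrono::steady_clock::now() >= deadline) return false;  // give up
    // Sleep for backoff plus random jitter, then grow the backoff.
    std::uniform_int_distribution<int> jitter(0, static_cast<int>(backoff.count()));
    std::this_thread::sleep_for(backoff + std::chrono::milliseconds(jitter(rng)));
    backoff = std::min(backoff * 2, std::chrono::milliseconds{10000});
  }
}
```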
Testing:
- Added a custom cluster test that checks that queries won't fail when
the admissiond goes down.
Change-Id: I8bc0cac666bbd613a1143c0e2c4f84d3b0ad003a
Reviewed-on: http://gerrit.cloudera.org:8080/17188
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch extends blacklist functionality by adding an executor node
to the blacklist if a query fails due to a disk failure during
spill-to-disk. It also classifies disk error codes and defines a
blacklistable error set for non-transient disk errors. The coordinator
blacklists an executor only if the executor hit a blacklistable error
during spill-to-disk.
Adds a new debug action to simulate disk write error during spill-to-
disk. To use, specify in query options as:
'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
where <hostname> and <port> identify the impalad that executes the
fragment instances; <port> is the BE KRPC port (default 27000).
Adds new test cases for blacklist and query-retry to cover the code
changes.
Testing:
- Passed new test cases.
- Passed exhaustive test.
- Manually simulated disk failures in scratch directories on nodes
of a cluster, verified that the nodes were blacklisted as
expected.
Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437
Reviewed-on: http://gerrit.cloudera.org:8080/16949
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for limiting the rows produced by a join node
such that runaway join queries can be prevented.
The limit is specified by a query option. Queries exceeding that limit
get terminated. The checking runs periodically, so the actual rows
produced may go somewhat over the limit.
JOIN_ROWS_PRODUCED_LIMIT is exposed as an advanced query option.
The query profile is updated to include query-wide and per-backend
metrics for rows produced (RowsReturned). Example from "
set JOIN_ROWS_PRODUCED_LIMIT = 10000000;
select count(*) from tpch_parquet.lineitem l1 cross join
(select * from tpch_parquet.lineitem l2 limit 5) l3;":
NESTED_LOOP_JOIN_NODE (id=2):
- InactiveTotalTime: 107.534ms
- PeakMemoryUsage: 16.00 KB (16384)
- ProbeRows: 1.02K (1024)
- ProbeTime: 0.000ns
- RowsReturned: 10.00M (10002025)
- RowsReturnedRate: 749.58 K/sec
- TotalTime: 13s337ms
Testing:
Added tests for JOIN_ROWS_PRODUCED_LIMIT
Change-Id: Idbca7e053b61b4e31b066edcfb3b0398fa859d02
Reviewed-on: http://gerrit.cloudera.org:8080/16706
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch introduces a new krpc service, AdmissionControlService,
which coordinators can use to submit queries for admission.
This patch adds some simple configuration flags that make it possible
to have coordinators use this service to submit their queries for
admission to other coordinators. These flags are only to make this
patch testable and will be replaced when the separate admission
control daemon is introduced in IMPALA-9975.
The interface consists of the following RPCs:
- AdmitQuery: takes a TQueryExecRequest and a TQueryOptions
(serialized into sidecars), places the request on a queue to be
processed by a thread pool and then immediately returns.
- GetQueryStatus: takes a query id and returns the current admission
status, including the QuerySchedulePB if admission has completed
successfully but the query has not been released yet.
- ReleaseQueryBackends: called when individual backends complete but
the overall query is still running to release resources
incrementally. This RPC will be called at most O(log(# backends))
per query due to BackendResourceState, which batches backends to
release together.
- ReleaseQuery: called when the query has completely finished.
Releases all remaining resources.
- CancelAdmission: called if a query is cancelled before an admission
decision has been made to indicate that it should no longer be
considered for admission.
The majority of the patch consists of two classes:
- AdmissionControlClient: used to abstract whether admission is being
performed locally or remotely. In the local case, it is basically
just a wrapper around AdmissionController. In the remote case, it
handles serializing/deserializing of RPC params, polling
GetQueryStatus() until a decision has been made, etc.
- AdmissionControlService: exports the RPC interface and acts as a
wrapper around AdmissionController.
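As a rough illustration of the remote flow handled by
AdmissionControlClient (submit, then poll until a decision is made),
here is a simplified sketch; all types and calls below are stand-ins,
not the actual RPC interface:
```cpp
// Submit via AdmitQuery, then poll GetQueryStatus until admission completes.
#include <chrono>
#include <functional>
#include <optional>
#include <string>
#include <thread>

struct QueryStatus {
  bool decision_made = false;
  bool admitted = false;
  std::string schedule;  // stands in for QuerySchedulePB
};

std::optional<std::string> SubmitForRemoteAdmission(
    const std::function<void()>& admit_query,                // AdmitQuery-style rpc
    const std::function<QueryStatus()>& get_query_status) {  // GetQueryStatus-style rpc
  admit_query();  // enqueue the request; the rpc returns immediately
  while (true) {
    QueryStatus status = get_query_status();
    if (status.decision_made) {
      if (status.admitted) return status.schedule;  // schedule for execution
      return std::nullopt;                          // rejected or timed out in queue
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // poll interval
  }
}
```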
Some notable changes involved:
- AdmissionController::SubmitForAdmission() no longer blocks while a
query is queued. Instead, a new function WaitOnQueued() can be used
to monitor the admission status of a queued query.
- Adding events to the query timeline is moved out of
AdmissionController and into the AdmissionControlClient classes, so
that it always happens on the coordinator.
- When a cluster is run in the new admission control service mode,
only the impalad that is performing admission control exposes the
/admission http endpoint. Observability will be cleaned up in a
subsequent patch.
Testing:
- Modified existing admission control tests to run both with and
without the admission control service enabled, including both the
functional and stress tests. The 'num_queries' param in the stress
test is modified to only use a single value to reduce the number of
tests that are run and keep the running time reasonable.
- Ran tpch10 on a local minicluster and observed no significant
regressions.
Change-Id: I594fc593a27b24b6952e381a9bc1a9a5c6b757ae
Reviewed-on: http://gerrit.cloudera.org:8080/16412
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This reworks the status reporting so that serialized
AggregatedRuntimeProfile objects are sent from executors
to coordinators. These profiles are substantially denser
and faster to process for higher mt_dop values. The aggregation
is also done in a single step, merging the aggregated thrift
profile from the executor directly into the final aggregated
profile, instead of converting it to an unaggregated profile
first.
The changes required were:
* A new Update() method for AggregatedRuntimeProfile that
updates the profile from a serialized AggregatedRuntimeProfile
for a subset of the instances. The code is generalized from the
existing InitFromThrift() code path.
* Per-fragment reports included in the status report protobuf
when --gen_experimental_profile=true.
* Logic on the coordinator that either consumes serialized
AggregatedRuntimeProfile per fragment, when
--gen_experimental_profile=true, or consumes a serialized
RuntimeProfile per finstance otherwise.
This also adds support for event sequences and time series
in the aggregated profile, so the amount of information
in the aggregated profile is now on par with the basic profile.
We also finish off support for JSON profile. The JSON profile is
more stripped down because we do not need to round-trip profiles
via JSON and it is a much less dense profile representation.
Part 3 will clean up and improve the display of the profile.
Testing:
* Add sanity tests for aggregated runtime profile.
* Add unit tests to exercise aggregation of the various counter types
* Ran core tests.
Change-Id: Ic680cbfe94c939c2a8fad9d0943034ed058c6bca
Reviewed-on: http://gerrit.cloudera.org:8080/16057
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The legacy Thrift-based Impala internal service has been removed, so
backend port 22000 can be freed up.
This patch sets the be_port flag as a REMOVED_FLAG and cleans up all
the infrastructure around it. StatestoreSubscriber::subscriber_id is
now set to hostname + krpc_port.
Testing:
- Passed the exhaustive test.
Change-Id: Ic6909a8da449b4d25ee98037b3eb459af4850dc6
Reviewed-on: http://gerrit.cloudera.org:8080/16533
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit adds support for INSERT INTO statements against Iceberg
tables when the table is non-partitioned and the underlying file format
is Parquet.
We still use Impala's HdfsParquetTableWriter to write the data files,
though they needed some modifications to conform to the Iceberg spec,
namely:
* write Iceberg/Parquet 'field_id' for the columns
* TIMESTAMPs are encoded as INT64 micros (without time zone)
We use DmlExecState to transfer information from the table sink
operators to the coordinator, then updateCatalog() invokes the
AppendFiles API to add files atomically. DmlExecState is encoded in
protobuf, communication with the Frontend uses Thrift. Therefore to
avoid defining Iceberg DataFile multiple times they are stored in
FlatBuffers.
The commit also does some corrections on Impala type <-> Iceberg type
mapping:
* Impala TIMESTAMP is Iceberg TIMESTAMP (without time zone)
* Impala CHAR is Iceberg FIXED
Testing:
* Added INSERT tests to iceberg-insert.test
* Added negative tests to iceberg-negative.test
* I also did some manual testing with Spark. Spark is able to read
Iceberg tables written by Impala unless we use TIMESTAMPs. In that
case Spark rejects the data files because it only accepts TIMESTAMPs
with time zone.
* Added concurrent INSERT tests to test_insert_stress.py
Change-Id: I5690fb6c2cc51f0033fa26caf8597c80a11bcd8e
Reviewed-on: http://gerrit.cloudera.org:8080/16545
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This issue was introduced by the patch for IMPALA-5746.
QueryState::exec_rpc_params_.coord_backend_id is set in
QueryState::Init(), but it could be accessed by the QueryExecMgr object
in QueryExecMgr::CancelQueriesForFailedCoordinators() before or while
QueryState::Init() is called, hence causing a data race.
To fix it, move coord_backend_id from class ExecQueryFInstancesRequestPB
to class TQueryCtx. QueryState::query_ctx_ is a constant variable and is
set in the QueryState c'tor, so QueryState::query_ctx_.coord_backend_id
is valid and will not change once the QueryState object is created.
Testing:
- Passed tests/custom_cluster/test_process_failures.py.
- Passed the core tests for normal build.
- Passed the core tests against a TSAN build.
Change-Id: I1c4b51e741a28b80bf3485adff8c97aabe0a3f67
Reviewed-on: http://gerrit.cloudera.org:8080/16437
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The executor registers for updates of the cluster membership. When
coordinators are absent from the active cluster membership list, the
executor cancels all running fragments of the queries that were
scheduled by the inactive coordinators, since the executor cannot send
results back to the inactive/failed coordinators. This lets executors
quickly release the resources allocated for those cancelled fragments.
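A self-contained sketch of that reaction to a membership update;
names are illustrative, not the actual QueryExecMgr API:
```cpp
// Cancel every running query whose coordinator is no longer in the active
// membership, releasing the resources of its fragments.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

void CancelQueriesForFailedCoordinators(
    const std::unordered_set<std::string>& active_coordinators,
    std::unordered_map<std::string, std::string>& running_query_to_coordinator) {
  for (auto it = running_query_to_coordinator.begin();
       it != running_query_to_coordinator.end();) {
    if (active_coordinators.count(it->second) == 0) {
      // Coordinator is gone: cancel the fragments and release their resources.
      std::cout << "cancelling query " << it->first
                << " (coordinator " << it->second << " is inactive)\n";
      it = running_query_to_coordinator.erase(it);
    } else {
      ++it;
    }
  }
}
```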
Testing:
- Added new test case TestProcessFailures::test_kill_coordinator
and ran the test case as following command:
./bin/impala-py.test tests/custom_cluster/test_process_failures.py\
::TestProcessFailures::test_kill_coordinator \
--exploration_strategy=exhaustive.
- Passed the core test.
Change-Id: I918fcc27649d5d2bbe8b6ef47fbd9810ae5f57bd
Reviewed-on: http://gerrit.cloudera.org:8080/16215
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In order to support the new admission control service, we need to be
able to return the results of an admission attempt, i.e. the query
schedule, to the coordinator.
To enable this, this patch moves all parts of the QuerySchedule class
and related classes that are required by the coordinator into a new
message QuerySchedulePB. The main admission control interface,
SubmitForAdmission(), now returns a QuerySchedulePB.
Some notable changes:
- Previously, QuerySchedule was used by Coordinator as a way to pass
around references to parts of the TExecRequest to places like
Coordinator::ExecSummary and Coordinator::BackendState. This has
been replaced with the ExecParams class, which is a container for
references to the TExecRequest and QuerySchedulePB along with
convenience functions for accessing them.
- Similarly, FragmentExecParams, which is part of QuerySchedule,
contains references to the associated TPlanFragment, owned by the
TExecRequest, which were used by the Coordinator when iterating over
the schedule to initiate the query. Since FragmentExecParamsPB can't
contain these references, they were replaced by a map between
fragment idx and TPlanFragment in ExecParams.
- In order to keep payloads reasonable for the eventual RPC interface,
AdmissionController::ReleaseQuery() and ReleaseQueryBackend() now
take a query id as a parameter instead of a QuerySchedule. To
facilitate this, AdmissionController now maintains a map from query
ids of running queries to the resources that were allocated for them
so that it can look the resources up when releasing them. This map
will be necessary when implementing the admission control service to
facilitate proper accounting of resouces in cases like coordinator
failures.
- As scheduling is currently organized, we first construct the
FragmentExecParams with the FInstanceExecParams as their children,
then we construct the BackendExecParams which get references to
their FInstanceExecParams. Since we can't send references like these
through an rpc, we now instead Swap() the FInstanceExecParamsPB
into the BackendExecParamsPB.
Testing:
- Updated related tests.
- Passed a full run of existing tests.
Change-Id: I1db64e72f84604b1d8ac24e0bdd4ad6bedd6bcd9
Reviewed-on: http://gerrit.cloudera.org:8080/15961
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this change the coordinator depended on getting the full
fragment instance profiles from executors to pull out various
things. This change removes that dependency by pulling out the
information on the executor, and including it in the status
report protobuf. This should slightly reduce the amount of work
done on the coordinator, but more importantly, makes it easier
to switch to sending aggregated profiles from executor to
coordinator, because the coordinator no longer depends on
receiving individual instance profiles.
Per-host peak memory is included directly in the status report.
Per-backend stats - where the per-backend total is needed -
are aggregated on the executor with the result included in the
status report. These counters are: BytesRead, ScanRangesComplete,
TotalBytesSent, TotalThreads{User,Sys}Time.
One subtlety to keep in mind is that status reports don't include
stats for instances where the final update was sent in a previous
status report. So the executor needs to ensure that stats for
finished fragment instances are included in updates. This is
achieved by caching those values in FragmentInstanceState.
The stats used in the exec summary were previously also plucked
out of the profile on the coordinator. This change moves the work
to the executor, and includes the per-node stats in the status
report.
I did a little cleanup of the profile counter declarations,
making sure they were consistently inside the impala namespace
in the files that I touched.
Testing:
Ran core tests.
Manually checked exec summary, query profiles and backends
page for a running query.
Change-Id: Ia2aca354d803ce78a798a1a64f9f98353b813e4a
Reviewed-on: http://gerrit.cloudera.org:8080/16050
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The new admission control service will be written in protobuf, so
there are various admission control related structures currently
stored in Thrift that it would be convenient to convert to protobuf,
to minimize the amount of converting back and forth that needs to be
done.
This patch converts some portions of TExecPlanFragmentInfo to
protobuf. TExecPlanFragmentInfo is sent as a sidecar with the Exec()
rpc, so the refactored parts are now just directly included in the
ExecQueryFInstancesRequestPB.
The portions that are converted are those that are part of the
QuerySchedule, in particular the TPlanFragmentDestination,
TScanRangeParams, and TJoinBuildInput.
This patch is just a refactor and doesn't contain any functional
changes.
One notable related change is that DataSink::CreateSink() has two
parameters removed - TPlanFragmentCtx (which no longer exists) and
TPlanFragmentInstanceCtx. These variables and the new PB equivalents
are available via the RuntimeState that was already being passed in as
another parameter and don't need to be individually passed in.
Testing:
- Passed a full run of existing tests.
- Ran the single node perf test and didn't detect any regressions.
Change-Id: I3a8e46767b257bbf677171ac2f4efb1b623ba41b
Reviewed-on: http://gerrit.cloudera.org:8080/15844
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The new admission control service will be written in protobuf, so
there are various admission control related structures currently
stored in Thrift that it would be convenient to convert to protobuf,
to minimize the amount of converting back and forth that needs to be
done.
This patch converts TBackendDescriptor to protobuf. It isn't used
directly in any rpcs - we serialize it ourselves to send to the
statestore as a string, so no rpc definitions are affected.
This patch is just a refactor and doesn't contain any functional
changes.
Testing:
- Passed a full run of existing tests.
Change-Id: Ie7d1e373d9c87887144517ff6a4c2d5996aa88b8
Reviewed-on: http://gerrit.cloudera.org:8080/15822
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Previously, the aggregation and propagation of a runtime filter in Impala
was implemented using Thrift RPC, which has the disadvantage that the
number of connections in a cluster grows with both the number of queries
and the cluster size. This patch ports the functions that implement the
aggregation and propagation of a runtime filter, i.e., UpdateFilter() and
PublishFilter() respectively, to KRPC, which requires only one connection
per direction between every pair of hosts, thus reducing the number of
connections in a cluster.
In addition, this patch also uses a KRPC sidecar when the runtime filter
is a Bloom filter. The sidecar eliminates an extra copy of the Bloom
filter contents when a Bloom filter is serialized for transmission and
hence reduces the serialization overhead. Because of the sidecar, a
SpinLock is also added to prevent a BloomFilter from being deallocated
before its associated KRPC call finishes.
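Below is a minimal standalone sketch, not Impala's actual code, of why
that synchronization is needed: a zero-copy sidecar means the RPC layer
reads the filter's buffer directly, so the owner must not free the buffer
until the RPC has completed. All names are illustrative stand-ins, and
std::mutex plays the role of the SpinLock.
```
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

class BloomFilterLike {
 public:
  explicit BloomFilterLike(size_t n) : directory_(n, 0xFF) {}

  // Starts a pretend async RPC whose "sidecar" is 'directory_'.
  void SendAsync() {
    {
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_flight_ = true;
    }
    rpc_thread_ = std::thread([this] {
      // The RPC layer reads the buffer directly (zero copy) ...
      std::printf("sending %zu sidecar bytes\n", directory_.size());
      // ... and on completion clears the in-flight flag.
      std::lock_guard<std::mutex> l(lock_);
      rpc_in_flight_ = false;
      rpc_done_.notify_all();
    });
  }

  // Must return before the filter (and its buffer) may be deallocated.
  void WaitForInFlightRpc() {
    std::unique_lock<std::mutex> l(lock_);
    rpc_done_.wait(l, [this] { return !rpc_in_flight_; });
  }

  ~BloomFilterLike() {
    WaitForInFlightRpc();
    if (rpc_thread_.joinable()) rpc_thread_.join();
  }

 private:
  std::vector<uint8_t> directory_;  // memory referenced by the sidecar
  std::mutex lock_;                 // stands in for Impala's SpinLock
  std::condition_variable rpc_done_;
  bool rpc_in_flight_ = false;
  std::thread rpc_thread_;
};

int main() {
  BloomFilterLike filter(1 << 20);
  filter.SendAsync();
  return 0;
}  // destructor blocks until the RPC no longer references the buffer
```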
Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc,
are also modified accordingly because of the changes to the signatures of
some functions in BloomFilter.
Testing:
This patch has passed the exhaustive tests.
Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8
Reviewed-on: http://gerrit.cloudera.org:8080/13882
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/14974
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
This patch moves AuxErrorInfoPB from FragmentInstanceExecStatusPB to
StatefulStatusPB. This is necessary because if the report with the
AuxErrorInfoPB is dropped (e.g. due to backpressure at the Coordinator
or a flaky network), the next report won't contain the AuxErrorInfoPB,
and the error info will be lost. StatefulStatus solves this by detecting
any reports that may not have been received by the Coordinator, and
re-transmitting any StatefulStatuses that were not successfully
delivered.
This change also makes the setting of AuxErrorInfoPB stateful, so that
the error info only shows up in one report and is then dropped from the
RuntimeState.
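Sketched below is the general retransmission idea, using hypothetical
types and field names rather than Impala's actual protobuf definitions:
stateful entries stay buffered on the executor until a report that
carried them is known to have been received.
```
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct StatefulStatusLike {
  int64_t report_seq_no = 0;  // sequence number of the report that carried it
  std::string aux_error;      // e.g. a serialized AuxErrorInfoPB
};

class ExecutorReportState {
 public:
  void AddStatefulStatus(std::string aux_error) {
    pending_.push_back({0, std::move(aux_error)});
  }

  // Every outgoing report re-sends all entries not yet acknowledged.
  std::vector<StatefulStatusLike> BuildReport() {
    ++current_seq_no_;
    for (auto& s : pending_) s.report_seq_no = current_seq_no_;
    return pending_;
  }

  // Once the coordinator acknowledges a report, entries carried by that
  // report (or earlier ones) can finally be dropped.
  void HandleAck(int64_t acked_seq_no) {
    pending_.erase(
        std::remove_if(pending_.begin(), pending_.end(),
                       [acked_seq_no](const StatefulStatusLike& s) {
                         return s.report_seq_no <= acked_seq_no;
                       }),
        pending_.end());
  }

 private:
  std::vector<StatefulStatusLike> pending_;
  int64_t current_seq_no_ = 0;
};

int main() {
  ExecutorReportState state;
  state.AddStatefulStatus("RPC to host-3 failed: ECONNREFUSED");
  state.BuildReport();                 // report 1 carries the error ...
  // ... but suppose report 1 is dropped: no ack arrives, so the next
  // report still carries the same entry.
  auto report2 = state.BuildReport();
  state.HandleAck(2);                  // report 2 received; entry dropped
  return report2.empty() ? 1 : 0;
}
```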
Change-Id: Iabbb48dd3ab58ba7b76b1ab6979b92d0e25e72e3
Reviewed-on: http://gerrit.cloudera.org:8080/15046
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Introduces a new optional field to FragmentInstanceExecStatusPB:
AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated
with a failed fragment instance. Currently, AuxErrorInfoPB only contains
one field: RPCErrorInfoPB, which is only set if the fragment failed
because an RPC to another impalad failed. The RPCErrorInfoPB contains
the destination node and the posix error code of the failed RPC.
Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...)
uses the information in RPCErrorInfoPB (if one is set) to blacklist
the target node. While RPCErrorInfoPB::dest_node can be set to the address
of the Coordinator, the Coordinator will not blacklist itself. The
Coordinator only blacklists the node if the RPC failed with a specific
error code (currently ENOTCONN, ECONNREFUSED, or ESHUTDOWN).
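A small illustrative sketch of that blacklisting rule (type names are
stand-ins; only the error codes and the self-exclusion come from the
description above):
```
#include <cerrno>
#include <string>

struct NodeAddress {
  std::string host;
  int port;
  bool operator==(const NodeAddress& o) const {
    return host == o.host && port == o.port;
  }
};

struct RpcErrorInfo {
  NodeAddress dest_node;  // node the failed RPC was sent to
  int posix_error_code;   // errno reported for the failure
};

bool ShouldBlacklist(const RpcErrorInfo& err, const NodeAddress& coordinator) {
  if (err.dest_node == coordinator) return false;  // never blacklist ourselves
  switch (err.posix_error_code) {
    case ENOTCONN:
    case ECONNREFUSED:
    case ESHUTDOWN:
      return true;  // peer is likely down; blacklist it
    default:
      return false;  // other failures don't trigger blacklisting
  }
}

int main() {
  NodeAddress coord{"coord-host", 27000};
  RpcErrorInfo err{{"exec-host-3", 27000}, ECONNREFUSED};
  return ShouldBlacklist(err, coord) ? 0 : 1;
}
```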
Testing:
* Ran core tests
* Added new test to test_blacklist.py
Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c
Reviewed-on: http://gerrit.cloudera.org:8080/14677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When issuing Exec() rpcs to backends, we currently serialize the
TQueryCtx once per backend. This is inefficient as the TQueryCtx is
the same for all backends and really only needs to be serialized once.
Serializing the TQueryCtx can be expensive as it contains both the
full text of the original query and the descriptor table, which can be
quite large. In a synthetic dataset I tested with, scanning a table
with 100k partitions leads to a descriptor table size of ~20MB.
This patch serializes the TQueryCtx in the coordinator and then passes
it to each BackendState when calling Exec().
Followup work might consider if we really need all of the info in the
TQueryCtx to be distributed to all backends.
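The shape of the change, as a hypothetical sketch with illustrative names
only: serialize the shared context once and hand the same bytes to every
backend's Exec() call.
```
#include <memory>
#include <string>
#include <vector>

// Stand-in for the query-wide context (query text, descriptor table, ...).
struct QueryCtxLike {
  std::string sql;
  std::string descriptor_table;  // can be large, e.g. with 100k partitions
};

// Stand-in for Thrift serialization; done exactly once per query.
std::shared_ptr<const std::string> SerializeQueryCtx(const QueryCtxLike& ctx) {
  return std::make_shared<const std::string>(ctx.sql + "|" + ctx.descriptor_table);
}

struct BackendStateLike {
  void Exec(const std::shared_ptr<const std::string>& serialized_query_ctx) {
    // Attach the already-serialized bytes to this backend's Exec() RPC;
    // no per-backend re-serialization happens here.
    (void)serialized_query_ctx;
  }
};

int main() {
  QueryCtxLike ctx{"SELECT ...", "<large descriptor table>"};
  auto serialized = SerializeQueryCtx(ctx);  // serialize once in the coordinator
  std::vector<BackendStateLike> backends(100);
  for (auto& b : backends) b.Exec(serialized);  // reuse the same bytes
  return 0;
}
```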
Testing:
- Passed full run of existing tests.
- Single node perf run showed no significant change.
Change-Id: I6a4dd302fd5602ec2775492a041ddd51e7d7a6c6
Reviewed-on: http://gerrit.cloudera.org:8080/14777
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The previous patch porting runtime filters from Thrift RPC to KRPC
introduced a deadlock when the number of threads on the Impala cluster
is very limited.
Specifically, in that patch a Coordinator used a synchronous KRPC call
to propagate an aggregated filter to other hosts. A deadlock can happen
if no thread is available on the receiving side to answer that KRPC,
especially when the calling and receiving threads come from the same
thread pool. One possible way to address this issue is to make the call
that propagates a runtime filter asynchronous, freeing the calling
thread. Until this issue is resolved, we revert the patch for now.
This reverts commit ec11c18884.
Change-Id: I32371a515fb607da396914502da8c7fb071406bc
Reviewed-on: http://gerrit.cloudera.org:8080/14780
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Previously, the aggregation and propagation of a runtime filter in Impala
was implemented using Thrift RPC, which has the disadvantage that the
number of connections in a cluster grows with both the number of queries
and the cluster size. This patch ports the functions that implement the
aggregation and propagation of a runtime filter, i.e., UpdateFilter() and
PublishFilter() respectively, to KRPC, which requires only one connection
per direction between every pair of hosts, thus reducing the number of
connections in a cluster.
In addition, this patch also uses a KRPC sidecar when the runtime filter
is a Bloom filter. The sidecar eliminates an extra copy of the Bloom
filter contents when a Bloom filter is serialized for transmission and
hence reduces the serialization overhead. Because of the sidecar, a
SpinLock is also added to prevent a BloomFilter from being deallocated
before its associated KRPC call finishes.
Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc,
are also modified accordingly because of the changes to the signatures of
some functions in BloomFilter.
Testing:
This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Reviewed-on: http://gerrit.cloudera.org:8080/13882
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch ports the ExecQueryFInstances rpc to use KRPC. The
parameters for this call contain a huge number of Thrift structs
(eg. everything related to TPlanNode and TExpr), so to avoid
converting all of these to protobuf and the resulting effect that
would have on the FE and catalog, this patch stores most of the
parameters in a sidecar (in particular the TQueryCtx,
TPlanFragmentCtx's, and TPlanFragmentInstanceCtx's).
Testing:
- Passed a full exhaustive run on the minicluster.
Set up a ten node cluster with tpch 500:
- Ran perf tests: 3 iterations per tpch query, 4 concurrent streams,
no perf change.
- Ran the stress test for 1000 queries, passed.
Change-Id: Id3f1c6099109bd8e5361188005a7d0e892147570
Reviewed-on: http://gerrit.cloudera.org:8080/13583
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The :shutdown command is used to shutdown a remote server. The common
case is that a user specifies the impalad to shutdown by specifying a
host e.g. :shutdown('host100'). If a user has more than one impalad on a
remote host then the form :shutdown('<host>:<port>') can be used to
specify the port by which the impalad can be contacted. Prior to
IMPALA-7985 this port was the backend port, e.g.
:shutdown('host100:22000'). With IMPALA-7985 the port to use is the KRPC
port, e.g. :shutdown('host100:27000').
Shutdown is implemented by making an rpc call to the target impalad.
This changes the implementation of this call to use KRPC.
To aid the user in finding the KRPC port, the KRPC address is added to
the /backends section of the debug web page.
We attempt to detect the case where :shutdown is pointed at a thrift
port (like the backend port) and print an informative message.
Documentation of this change will be done in IMPALA-8098.
Further improvements to DoRpcWithRetry() will be done in IMPALA-8143.
For discussion of why it was chosen to implement this change in an
incompatible way, see comments in
https://issues.apache.org/jira/browse/IMPALA-7985.
TESTING
Ran all end-to-end tests.
Enhanced the test for /backends in test_web_pages.py.
In test_restart_services.py, added a call to the old backend port to the
test. Some expected error messages were changed in line with what KRPC
returns.
Change-Id: I4fd00ee4e638f5e71e27893162fd65501ef9e74e
Reviewed-on: http://gerrit.cloudera.org:8080/12260
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
QueryState periodically collects runtime profiles from all of its
fragment instances and sends them to the coordinator. Previously, each
time this happened, if the RPC failed, QueryState would retry twice after
a configurable timeout and then cancel the fragment instances under
the assumption that the coordinator no longer existed.
We've found in real clusters that this logic is too sensitive to
failed RPCs and can result in fragment instances being cancelled even
in cases where the coordinator is still running.
This patch makes a few improvements to this logic:
- When a report fails to send, instead of retrying the same report
quickly (after waiting report_status_retry_interval_ms), we wait the
regular reporting interval (status_report_interval_ms), regenerate
any stale portions of the report, and then retry.
- A new flag, --status_report_max_retries, is introduced, which
controls the number of failed reports that are allowed before the
query is cancelled. --report_status_retry_interval_ms is removed.
- Backoff is used for repeated failed attempts, such that with a base
period between retries of 't', the actual wait before try 'n' is t * n
(see the sketch after this list).
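A small sketch of that backoff rule, assuming a base interval 't' (the
flag names come from this commit message; the values and everything else
are illustrative):
```
#include <chrono>
#include <iostream>

int main() {
  // Hypothetical values; the real flags are configurable at startup.
  const auto status_report_interval = std::chrono::milliseconds(5000);  // 't'
  const int status_report_max_retries = 3;

  for (int attempt = 1; attempt <= status_report_max_retries; ++attempt) {
    const auto wait = status_report_interval * attempt;  // t * n backoff
    std::cout << "failed report: retry " << attempt << " after "
              << wait.count() << " ms\n";
  }
  std::cout << "after " << status_report_max_retries
            << " consecutive failures the query would be cancelled\n";
  return 0;
}
```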
Testing:
- Added a test which results in a large number of failed intermediate
status reports but still succeeds.
Change-Id: Ib6007013fc2c9e8eeba11b752ee58fb3038da971
Reviewed-on: http://gerrit.cloudera.org:8080/12049
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When the Coordinator needs to cancel a query (for example because a user
has hit Control-C), it does this by sending a CancelQueryFInstances
message to each fragment instance. This change switches this code to use
KRPC.
Add new protobuf definitions for the messages, and remove the old thrift
definitions. Move the server-side implementation of Cancel() from
ImpalaInternalService to ControlService. Rework the scheduler so
that the FInstanceExecParams always contains the KRPC address of the
fragment executors; this address can then be used if a query is to be
cancelled.
For now keep the KRPC calls to CancelQueryFInstances() as synchronous.
While moving the client-side code, remove the fault injection code that
was inserted with FAULT_INJECTION_SEND_RPC_EXCEPTION and
FAULT_INJECTION_RECV_RPC_EXCEPTION (triggered by running impalad with
--fault_injection_rpc_exception_type=1) as this tickles code in
client-cache.h which is no longer used.
TESTING:
Ran all end-to-end tests.
No new tests as test_cancellation.py provides good coverage.
Checked in debugger that DebugAction style fault injection (triggered
from test_cancellation.py) was working correctly.
Change-Id: I625030c3f1068061aa029e6e242f016cadd84969
Reviewed-on: http://gerrit.cloudera.org:8080/12142
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Previously, each fragment instance executing on an executor would
independently report its status to the coordinator periodically.
This created a huge number of RPCs to the coordinator under highly
concurrent workloads, causing lock contention in the coordinator's
backend states when multiple fragment instances sent reports at the
same time. In addition, due to the lack of coordination between query
fragment instances, a query could end without collecting the profiles
from all fragment instances when one of them hit an error before
another fragment instance managed to finish Prepare(), leading to
missing profiles for certain fragment instances.
This change fixes the problem above by making a thread per QueryState
(started by QueryExecMgr) responsible for periodically reporting the
status and profiles of all fragment instances of a query running on a
backend. As part of this refactoring, fragment instances no longer
report their errors individually. Instead, a cumulative status is
maintained per QueryState; it is set to the error status of the first
fragment instance that hits an error, or to any general error (e.g.
failure to start a thread) encountered when starting fragment
instances. With this change, the per-fragment-instance status
reporting threads are also removed.
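A simplified, hypothetical sketch of that per-query reporting model (the
class and method names below do not match Impala's real code): one thread
per query gathers all instance state and keeps a first-error-wins
cumulative status.
```
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

class QueryStateLike {
 public:
  // First error wins; later errors do not overwrite it.
  void UpdateCumulativeStatus(const std::string& error) {
    std::lock_guard<std::mutex> l(lock_);
    if (cumulative_status_.empty()) cumulative_status_ = error;
  }

  // One reporting thread per query on this backend.
  void StartReportThread() {
    report_thread_ = std::thread([this] {
      while (!done_.load()) {
        SendReport();  // one RPC covering every instance of this query
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
      SendReport();  // final report
    });
  }

  void Finish() {
    done_.store(true);
    if (report_thread_.joinable()) report_thread_.join();
  }

 private:
  void SendReport() {
    std::lock_guard<std::mutex> l(lock_);
    // Would serialize all instance profiles plus 'cumulative_status_' and
    // send a single status-report RPC to the coordinator here.
  }

  std::mutex lock_;
  std::string cumulative_status_;  // empty means OK
  std::thread report_thread_;
  std::atomic<bool> done_{false};
};

int main() {
  QueryStateLike qs;
  qs.StartReportThread();
  qs.UpdateCumulativeStatus("Failed to start fragment instance thread");
  qs.Finish();
  return 0;
}
```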
Testing done: exhaustive tests
This patch is based on a patch by Sailesh Mukil
Change-Id: I5f95e026ba05631f33f48ce32da6db39c6f421fa
Reviewed-on: http://gerrit.cloudera.org:8080/11615
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change converts the ReportExecStatus() RPC from Thrift-based RPC
to KRPC. This is done partly in preparation for fixing IMPALA-2990, as
we can take advantage of TCP connection multiplexing in KRPC to avoid
overwhelming the coordinator with too many connections, reducing the
number of TCP connections to one per executor.
This patch also introduces a new service pool for all query execution
control related RPCs, now and in the future, so that control commands
from coordinators aren't blocked by long-running DataStream service
RPCs. To avoid unnecessary delays due to sharing network connections
between the DataStream service and the Control service, this change
adds the service name to the user credentials in the ConnectionId so
that each service uses a separate connection.
The majority of this patch is mechanical conversion of some Thrift
structures used in ReportExecStatus() RPC to Protobuf. Note that the
runtime profile is still retained as a Thrift structure as Impala
clients will still fetch query profiles using Thrift RPCs. This also
avoids duplicating the serialization implementation in both Thrift
and Protobuf for the runtime profile. The Thrift runtime profiles
are serialized and sent as a sidecar in ReportExecStatus() RPC.
This patch also fixes IMPALA-7241, which may lead to duplicated DML
stats being applied. The fix adds a monotonically increasing version
number to fragment instances' reports; the coordinator ignores any
report whose version is smaller than or equal to the version of the
last applied report.
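A hedged sketch of that versioning check (field and class names are
illustrative, not Impala's actual ones): the coordinator applies a report
only if its version is strictly greater than the last version applied for
that fragment instance.
```
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

class CoordinatorSide {
 public:
  // Returns true if the report was applied, false if it was a duplicate
  // or a stale retransmission.
  bool ApplyReport(const std::string& instance_id, int64_t report_version) {
    int64_t& last = last_applied_version_[instance_id];  // starts at 0
    if (report_version <= last) return false;  // would double-count dml stats
    last = report_version;
    // ... apply dml stats / profile updates here ...
    return true;
  }

 private:
  std::map<std::string, int64_t> last_applied_version_;
};

int main() {
  CoordinatorSide coord;
  std::cout << coord.ApplyReport("f0:0", 1) << "\n";  // applied: prints 1
  std::cout << coord.ApplyReport("f0:0", 1) << "\n";  // duplicate ignored: 0
  std::cout << coord.ApplyReport("f0:0", 2) << "\n";  // applied: prints 1
  return 0;
}
```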
Testing done:
1. Exhaustive build.
2. Added some targeted test cases for profile serialization failure
and RPC retries/timeout.
Change-Id: I7638583b433dcac066b87198e448743d90415ebe
Reviewed-on: http://gerrit.cloudera.org:8080/10855
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>