This patch extends blacklist functionality by adding an executor node to
the blacklist if a query fails due to a disk failure during spill-to-disk.
It also classifies disk error codes and defines a blacklistable error set
for non-transient disk errors. The coordinator blacklists an executor only
if the executor hit a blacklistable error during spill-to-disk.
Adds a new debug action to simulate a disk write error during
spill-to-disk. To use it, specify in the query options:
'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
where <hostname> and <port> identify the impalad that executes the
fragment instances; <port> is the BE KRPC port (default 27000).
Adds new test cases for blacklist and query-retry to cover the code
changes.
Testing:
- Passed new test cases.
- Passed exhaustive test.
- Manually simulated disk failures in scratch directories on nodes
of a cluster, verified that the nodes were blacklisted as
expected.
Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437
Reviewed-on: http://gerrit.cloudera.org:8080/16949
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for limiting the rows produced by a join node
such that runaway join queries can be prevented.
The limit is specified by a query option. Queries exceeding that limit
get terminated. The checking runs periodically, so the actual rows
produced may go somewhat over the limit.
JOIN_ROWS_PRODUCED_LIMIT is exposed as an advanced query option.
The query profile is updated to include query-wide and per-backend
metrics for rows produced (RowsReturned). Example profile excerpt from running:
set JOIN_ROWS_PRODUCED_LIMIT = 10000000;
select count(*) from tpch_parquet.lineitem l1 cross join
(select * from tpch_parquet.lineitem l2 limit 5) l3;
NESTED_LOOP_JOIN_NODE (id=2):
- InactiveTotalTime: 107.534ms
- PeakMemoryUsage: 16.00 KB (16384)
- ProbeRows: 1.02K (1024)
- ProbeTime: 0.000ns
- RowsReturned: 10.00M (10002025)
- RowsReturnedRate: 749.58 K/sec
- TotalTime: 13s337ms
Testing:
Added tests for JOIN_ROWS_PRODUCED_LIMIT
Change-Id: Idbca7e053b61b4e31b066edcfb3b0398fa859d02
Reviewed-on: http://gerrit.cloudera.org:8080/16706
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This work addresses a current limitation in the admission controller by
appending the last known memory consumption statistics for the set of
queries running or waiting on a host or in a pool to the existing memory
exhaustion message. The statistics are logged in impalad.INFO when a
query is queued, or queued and then timed out, due to memory pressure in
the pool or on the host. The statistics can also be part of the query
profile.
The new memory consumption statistics can be either per-host stats or
aggregated pool stats. The per-host stats describe memory consumption
for every pool on a host. The aggregated pool stats describe the
aggregated memory consumption across all hosts for a pool. For each stats
type, the query IDs and memory consumption of up to the top 5 queries
are provided, in addition to the min, max, average and total memory
consumption for the query set.
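As a rough sketch of that top-N summary (hypothetical types and names; the
actual implementation gathers these values from the MemTracker hierarchy),
the reported values can be derived like this:
#include <algorithm>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>
// Sketch: summarize (query id, memory consumed) pairs into the top-5
// consumers plus the min, max, average and total for the whole set.
struct MemSummary {
  std::vector<std::pair<std::string, int64_t>> top5;
  int64_t min = 0, max = 0, total = 0;
  double avg = 0.0;
};
MemSummary Summarize(std::vector<std::pair<std::string, int64_t>> queries) {
  MemSummary s;
  if (queries.empty()) return s;
  std::sort(queries.begin(), queries.end(),
            [](const auto& a, const auto& b) { return a.second > b.second; });
  s.top5.assign(queries.begin(),
                queries.begin() + std::min<size_t>(5, queries.size()));
  s.max = queries.front().second;
  s.min = queries.back().second;
  for (const auto& q : queries) s.total += q.second;
  s.avg = static_cast<double>(s.total) / queries.size();
  return s;
}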
When a query request is queued due to memory exhaustion, the new
consumption statistics are logged when the BE logging level is set
to 2. When a query request times out due to memory exhaustion, the
statistics are logged when the BE logging level is set to 1.
Testing:
1. Added a new test TopNQueryCheck in admission-controller-test.cc to
verify that the topN query memory consumption details are reported
correctly.
2. Added two new tests in test_admission_controller.py to simulate
queries being queued and then timed out due to pool or host memory
pressure.
3. Added a new test TopN in mem-tracker-test.cc to
verify that the topN query memory consumption details are computed
correctly from a mem tracker hierarchy.
4. Ran Core tests successfully.
Change-Id: Id995a9d044082c3b8f044e1ec25bb4c64347f781
Reviewed-on: http://gerrit.cloudera.org:8080/16220
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This fix addresses the current limitation that an ill-formatted
Parquet version string is not properly formatted before appearing
in an error message or impalad.INFO. With the fix, any such string is
converted to a hex string first. The hex string is a sequence of
four hex digit groups separated by spaces, where each group is one or
two hex digits, such as "6c 65 2e a".
Testing:
Ran "core" tests successfully.
Change-Id: I281d6fa7cb2f88f04588110943e3e768678b9cf1
Reviewed-on: http://gerrit.cloudera.org:8080/16331
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
This switches null-aware anti-join (NAAJ) to use shared
join builds with mt_dop > 0. To support this, we
make all access to the join build data structures
from the probe read-only. NAAJ requires iterating
over rows from build partitions at various steps
in the algorithm, and before this patch this was not
thread-safe. We previously avoided that problem by having a
separate builder for each join node and duplicating
the data.
The main challenge was iteration over
null_aware_partition()->build_rows() from the probe
side, because it uses an embedded iterator in the
stream so was not thread-safe (since each thread
would be trying to use the same iterator).
The solution is to extend BufferedTupleStream to
allow multiple read iterators into a pinned,
read-only, stream. Each probe thread can then
iterate over the stream independently with no
thread safety issues.
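As a rough, hypothetical sketch of the idea (not the actual
BufferedTupleStream API): each reader owns its own cursor over an
immutable, fully pinned row sequence, so concurrent probe threads never
share iterator state.
#include <cstddef>
#include <utility>
#include <vector>
// Sketch only: a pinned, read-only stream with independent read iterators.
template <typename Row>
class PinnedStreamSketch {
 public:
  void AddRow(Row r) { rows_.push_back(std::move(r)); }
  // Each iterator keeps its own position; many can read concurrently
  // because the stream is never mutated during the probe phase.
  class ReadIterator {
   public:
    explicit ReadIterator(const PinnedStreamSketch* s) : stream_(s) {}
    bool GetNext(Row* out) {
      if (pos_ >= stream_->rows_.size()) return false;
      *out = stream_->rows_[pos_++];
      return true;
    }
   private:
    const PinnedStreamSketch* stream_;
    size_t pos_ = 0;
  };
  ReadIterator NewReadIterator() const { return ReadIterator(this); }
 private:
  std::vector<Row> rows_;
};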
With the BufferedTupleStream changes, I partially abstracted
ReadIterator from the rest of BufferedTupleStream, but decided
not to do a complete refactor so that this patchset didn't cause
excessive churn. I.e. much BufferedTupleStream code still accesses
internal fields of ReadIterator.
Fix a pre-existing bug in grouping-aggregator where
Spill() hit a DCHECK because the hash table was
destroyed unnecessarily when it hit an OOM. This was
flushed out by the parameter change in test_spilling.
Testing:
Add test to buffered-tuple-stream-test for multiple readers
to BTS.
Tweaked test_spilling_naaj_no_deny_reservation to have
a smaller minimum reservation, required to keep the
test passing with the new, lower, memory requirement.
Updated a TPC-H planner test where resource requirements
slightly decreased for the NAAJ.
Ran the naaj tests in test_spilling.py with TSAN enabled,
confirmed no data races.
Ran exhaustive tests, which passed after fixing IMPALA-9611.
Ran core tests with ASAN.
Ran backend tests with TSAN.
Perf:
I ran this query that exercises EvaluateNullProbe() heavily.
select l_orderkey, l_partkey, l_suppkey, l_linenumber
from tpch30_parquet.lineitem
where l_suppkey = 4162 and l_shipmode = 'AIR'
and l_returnflag = 'A' and l_shipdate > '1993-01-01'
and if(l_orderkey > 5500000, NULL, l_orderkey) not in (
select if(o_orderkey % 2 = 0, NULL, o_orderkey + 1)
from orders
where l_orderkey = o_orderkey)
order by 1,2,3,4;
It went from ~13s to ~11s running on a single impalad with
this change, because of the inlining of CreateOutputRow() and
EvalConjuncts().
I also ran TPC-H SF 30 on Parquet with mt_dop=4, and there was
no change in performance.
Change-Id: I95ead761430b0aa59a4fb2e7848e47d1bf73c1c9
Reviewed-on: http://gerrit.cloudera.org:8080/15612
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
We don't support reading UNION columns. Queries on tables containing
UNION types will fail in planning with a metadata loading error.
However, the scanner may need to read an ORC file with UNION types if
the table schema doesn't map to the UNION columns. Though the UNION
values won't be read, the scanner still needs to resolve the file
schema, including the UNION types, correctly.
In OrcSchemaResolver::BuildSchemaPath, we create a map from ORC type ids
to Impala SchemaPath representation for all types of the file. We should
deal with UNION types as well.
This patch also includes some refactoring to improve code readability.
Tests:
- Added tests for table schema and file schema mismatches for all
complex types.
Change-Id: I452d27b4e281eada00b62ac58af773a3479163ec
Reviewed-on: http://gerrit.cloudera.org:8080/15103
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Implements the read path for the DATE type in the ORC scanner. The
internal representation of a date is an int32 holding the number of days
since the Unix epoch, using the proleptic Gregorian calendar.
Similarly to the Parquet implementation (IMPALA-7370) this
representation introduces an interoperability issue between Impala
and older versions of Hive (before 3.1). For more details see the
commit message of the mentioned Parquet implementation.
Change-Id: I672a2cdd2452a46b676e0e36942fd310f55c4956
Reviewed-on: http://gerrit.cloudera.org:8080/14982
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Hive can write timestamps that are outside Impala's valid
range (Impala: 1400-9999 Hive: 0001-9999). This change adds
validation logic to ORC reading that replaces out-of-range
timestamps with NULLs and adds a warning to the query.
The logic is very similar to the existing validation in
Parquet. Some differences:
- "time of day" is not checked separately as it doesn't make
sense with ORC's encoding
- instead of the column name, only the column id is added to the warning
Testing:
- added a simple EE test that scans an existing ORC file
Change-Id: I8ee2ba83a54f93d37e8832e064f2c8418b503490
Reviewed-on: http://gerrit.cloudera.org:8080/14832
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch the supported year range for DATE type started with
year 0. This contradicts the ANSI SQL standard that defines the valid
DATE value range to be 0001-01-01 to 9999-12-31.
Change-Id: Iefdf1c036834763f52d44d0c39a25a1f04e41e07
Reviewed-on: http://gerrit.cloudera.org:8080/14349
Reviewed-by: Attila Jeges <attilaj@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change is a follow-up to IMPALA-7368 and adds support for DATE
type to the avro scanner.
Similarly to parquet, avro uses DATE logical type for dates. DATE
logical type annotates an INT32 that stores the number of days since
the unix epoch, 1 January 1970.
This representation introduces an avro interoperability issue between
Impala and older versions of Hive:
- Before version 3.1, Hive used Julian calendar to represent dates
up to 1582-10-05 and Gregorian calendar for dates starting with
1582-10-15. Dates between 1582-10-05 and 1582-10-15 were lost.
- Impala uses proleptic Gregorian calendar, extending the Gregorian
calendar backward to dates preceding its official introduction in
1582-10-15.
This means that pre-1582-10-15 dates written to an avro table by Hive
will be read back incorrectly by Impala.
Note that Hive 3.1 switched to proleptic Gregorian calendar too, so
for Hive 3.1+ this is no longer an issue.
Dependency changes:
- BE uses avro 1.7.4-p5 from native-toolchain.
Change-Id: I7a9d5b93a22cf3a00244037e187f8c145cacc959
Reviewed-on: http://gerrit.cloudera.org:8080/13944
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Replaces DequeRowBatchQueue with SpillableRowBatchQueue in
BufferedPlanRootSink. A few changes to BufferedPlanRootSink were
necessary for it to work with the spillable queue, however, all the
synchronization logic is the same.
SpillableRowBatchQueue is a wrapper around a BufferedTupleStream and
a ReservationManager. It takes in a TBackendResourceProfile that
specifies the max / min memory reservation the BufferedTupleStream can
use to buffer rows. The 'max_unpinned_bytes' parameter limits the max
number of bytes that can be unpinned in the BufferedTupleStream. The
limit is a 'soft' limit because calls to AddBatch may push the amount of
unpinned memory over the limit. The queue is non-blocking and not thread
safe. It provides AddBatch and GetBatch methods. Calls to AddBatch spill
if the BufferedTupleStream does not have enough reservation to fit the
entire RowBatch.
Adds two new query options: 'MAX_PINNED_RESULT_SPOOLING_MEMORY' and
'MAX_UNPINNED_RESULT_SPOOLING_MEMORY', which bound the amount of pinned
and unpinned memory that a query can use for spooling, respectively.
MAX_PINNED_RESULT_SPOOLING_MEMORY must be <=
MAX_UNPINNED_RESULT_SPOOLING_MEMORY in order to allow all the pinned
data in the BufferedTupleStream to be unpinned. This is enforced in a
new method in QueryOptions called 'ValidateQueryOptions'.
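A minimal sketch of that cross-option check (hypothetical function name;
the actual validation lives in the new ValidateQueryOptions method):
#include <cstdint>
#include <string>
// Sketch: reject option combinations where the pinned limit exceeds the
// unpinned limit, so pinned data can always be unpinned.
std::string ValidateSpoolingLimits(int64_t max_pinned_bytes,
                                   int64_t max_unpinned_bytes) {
  if (max_pinned_bytes > max_unpinned_bytes) {
    return "MAX_PINNED_RESULT_SPOOLING_MEMORY must be <= "
           "MAX_UNPINNED_RESULT_SPOOLING_MEMORY";
  }
  return "";  // empty string means the options are valid
}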
Planner Changes:
PlanRootSink.java now computes a full ResourceProfile if result spooling
is enabled. The min mem reservation is bounded by the size of the read and
write pages used by the BufferedTupleStream. The max mem reservation is
bounded by 'MAX_PINNED_RESULT_SPOOLING_MEMORY'. The mem estimate is
computed by estimating the size of the result set using stats.
BufferedTupleStream Re-Factoring:
For the most part, using a BufferedTupleStream outside an ExecNode works
properly. However, some changes were necessary:
* The message for the MAX_ROW_SIZE error is ExecNode specific. In order to
fix this, this patch introduces the concept of an ExecNode 'label' which
is a more generic version of an ExecNode 'id'.
* The definition of TBackendResourceProfile lived in PlanNodes.thrift;
it was moved to its own file so it can be used by DataSinks.thrift.
* Modified BufferedTupleStream so it internally tracks how many bytes
are unpinned (necessary for 'MAX_UNPINNED_RESULT_SPOOLING_MEMORY').
Metrics:
* Added a few of the metrics mentioned in IMPALA-8825 to
BufferedPlanRootSink. Specifically, added timers to track how much time
is spent waiting in the BufferedPlanRootSink 'Send' and 'GetNext'
methods.
* The BufferedTupleStream in the SpillableRowBatchQueue exposes several
BufferPool metrics such as number of reserved and unpinned bytes.
Bug Fixes:
* Fixed a bug in BufferedPlanRootSink where the MemPool used by the
expression evaluators was not being cleared incrementally.
* Fixed a bug where the inactive timer was not being properly updated in
BufferedPlanRootSink.
* Fixed a bug where RowBatch memory was not freed if
BufferedPlanRootSink::GetNext terminated early because it could not
handle requests where num_results < BATCH_SIZE.
Testing:
* Added new tests to test_result_spooling.py.
* Updated errors thrown in spilling-large-rows.test.
* Ran exhaustive tests.
Change-Id: I10f9e72374cdf9501c0e5e2c5b39c13688ae65a9
Reviewed-on: http://gerrit.cloudera.org:8080/14039
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Various BI tools generate and run SQL. When used incorrectly or
misconfigured, the tools can generate extremely large SQLs.
Some of these SQL statements reach 10s of megabytes. Large SQL
statements impose costs throughout execution, including
statement rewrite logic in the frontend and codegen in the
backend. The resource usage of these statements can impact
the stability of the system or the ability to run other SQL
statements.
This implements two new query options that provide controls
to reject large SQL statements.
- The first, MAX_STATEMENT_LENGTH_BYTES is a cap on the
total size of the SQL statement (in bytes). It is
applied before any parsing or analysis. It uses a
default value of 16MB.
- The second, STATEMENT_EXPRESSION_LIMIT, is a limit on
the total number of expressions in a statement or any
views that it references. The limit is applied upon the
first round of analysis, but it is not reapplied when
statement rewrite rules are applied. Certain expressions
such as literals in IN lists or VALUES clauses are not
analyzed and do not count towards the limit. It uses
a default value of 250,000.
The two are complementary. Since enforcing the statement
expression limit requires parsing and analyzing the
statement, the MAX_STATEMENT_LENGTH_BYTES sets an upper
bound on the size of statement that needs to be parsed
and analyzed. Testing confirms that even statements
approaching 16MB get through the first round of analysis
within a few seconds and then are rejected.
This also changes the logging in tests/common/impala_connection.py
to limit the total SQL size that it will print to 128KB. This
prevents the JUnitXML (which includes this logging) from being too
large. Existing tests do not run SQL larger than about 80KB, so
this only applies to tests added in this change that run multi-MB
SQLs to verify limits.
Testing:
- This adds frontend tests that verify the low level
semantics about how expressions are counted and verifies
that the expression limits are enforced.
- This adds end-to-end tests that verify both the
MAX_STATEMENT_LENGTH_BYTES and STATEMENT_EXPRESSION_LIMIT
at their default values.
- There is also an end-to-end test that runs in exhaustive
mode that runs a SQL with close to 250,000 expressions.
Change-Id: I5675fb4a08c1dc51ae5bcf467cbb969cc064602c
Reviewed-on: http://gerrit.cloudera.org:8080/14012
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This extends the --scratch_dirs syntax to support specifying a max
capacity per directory, similarly to the --data_cache configuration.
The capacity is delimited from the directory name with ":" and
uses the usual syntax for specifying memory. The following are
valid arguments:
* --scratch_dirs=/dir1,/dir2 (no limits)
* --scratch_dirs=/dir1,/dir2:25G (only a limit on /dir2)
* --scratch_dirs=/dir1:5MB,/dir2 (only a limit on /dir1)
* --scratch_dirs=/dir1:-1,/dir2:0 (alternative ways of
expressing no limit)
The usage is tracked with a metric per directory. Allocations
from that directory start to fail when the limit is exceeded.
These metrics are exposed as
tmp-file-mgr.scratch-space-bytes-used.dir-0,
tmp-file-mgr.scratch-space-bytes-used.dir-1, etc.
Also add support for parsing terabyte specifiers to a utility
function that is used for parsing many configurations.
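For illustration, a rough sketch of this kind of memory-spec parsing (not
the actual utility function; minimal validation, uppercase suffixes only):
#include <cctype>
#include <cstdint>
#include <string>
// Sketch: parse strings like "5MB", "25G", "1T" into bytes; "-1" and "0"
// mean no limit and are returned as -1.
int64_t ParseMemSpecSketch(const std::string& spec) {
  if (spec.empty() || spec == "-1" || spec == "0") return -1;
  size_t i = 0;
  while (i < spec.size() && isdigit(static_cast<unsigned char>(spec[i]))) ++i;
  int64_t value = std::stoll(spec.substr(0, i));
  const std::string suffix = spec.substr(i);
  int64_t multiplier = 1;  // plain bytes when no suffix is given
  if (suffix == "K" || suffix == "KB") multiplier = 1LL << 10;
  else if (suffix == "M" || suffix == "MB") multiplier = 1LL << 20;
  else if (suffix == "G" || suffix == "GB") multiplier = 1LL << 30;
  else if (suffix == "T" || suffix == "TB") multiplier = 1LL << 40;
  return value * multiplier;
}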
Testing:
Added a unit test to exercise TmpFileMgr.
Manually ran a spilling query on an impalad with multiple scratch dirs
configured with different limits. Confirmed via metrics that the
capacities were enforced.
Change-Id: I696146a65dbb97f1ba200ae472358ae2db6eb441
Reviewed-on: http://gerrit.cloudera.org:8080/13986
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
A new enum value LZ4_BLOCKED was added to the THdfsCompression enum to
distinguish it from the existing LZ4 codec. The LZ4_BLOCKED codec
represents the block compression scheme used by Hadoop. It's similar to
SNAPPY_BLOCKED as far as the block format is concerned, with the only
difference being the codec used for compression and decompression.
Added Lz4BlockCompressor and Lz4BlockDecompressor classes for
compressing and decompressing parquet data using Hadoop's
lz4 block compression scheme.
The Lz4BlockCompressor treats the input
as a single block and generates a compressed block with the following
layout:
<4 byte big endian uncompressed size>
<4 byte big endian compressed size>
<lz4 compressed block>
The HDFS Parquet table writer should call the Lz4BlockCompressor
with the ideal input size (the unit of compression in Parquet is a page),
so the Lz4BlockCompressor does not further break down the input
into smaller blocks.
The Lz4BlockDecompressor, on the other hand, should be compatible with
blocks written by Impala and other engines in the Hadoop ecosystem. It
can decompress compressed data in the following format:
<4 byte big endian uncompressed size>
<4 byte big endian compressed size>
<lz4 compressed block>
...
<4 byte big endian compressed size>
<lz4 compressed block>
...
<repeated until the uncompressed size from the outer block is consumed>
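A hedged sketch of the compressor side of this layout, using the lz4
library directly (illustrative only; the real Lz4BlockCompressor goes
through Impala's Codec interface):
#include <lz4.h>
#include <cstdint>
#include <string>
// Write a 32-bit value in big-endian order, as the Hadoop block format expects.
static void PutBigEndianU32(uint32_t v, std::string* out) {
  char buf[4] = {static_cast<char>(v >> 24), static_cast<char>(v >> 16),
                 static_cast<char>(v >> 8), static_cast<char>(v)};
  out->append(buf, 4);
}
// Sketch: compress 'input' as a single LZ4 block with the
// <uncompressed size><compressed size><lz4 block> layout described above.
std::string Lz4BlockCompressSketch(const std::string& input) {
  std::string out;
  PutBigEndianU32(static_cast<uint32_t>(input.size()), &out);
  int bound = LZ4_compressBound(static_cast<int>(input.size()));
  std::string scratch(bound, '\0');
  int compressed_len = LZ4_compress_default(
      input.data(), &scratch[0], static_cast<int>(input.size()), bound);
  if (compressed_len <= 0) return std::string();  // error handling elided
  PutBigEndianU32(static_cast<uint32_t>(compressed_len), &out);
  out.append(scratch.data(), compressed_len);
  return out;
}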
Externally users can now set the lz4 codec for parquet using:
set COMPRESSION_CODEC=lz4
This gets translated into LZ4_BLOCKED codec for the
HdfsParquetTableWriter. Similarly, when reading lz4 compressed parquet
data, the LZ4_BLOCKED codec is used.
Testing:
- Added unit tests for LZ4_BLOCKED in decompress-test.cc
- Added unit tests for Hadoop compatibility in decompress-test.cc,
basically being able to decompress an outer block with multiple inner
blocks (the Lz4BlockDecompressor description above)
- Added interoperability tests for Hive and Impala for all parquet
codecs. New test added to
tests/custom_cluster/test_hive_parquet_codec_interop.py
Change-Id: Ia6850a39ef3f1e0e7ba48e08eef1d4f7cbb74d0c
Reviewed-on: http://gerrit.cloudera.org:8080/13582
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The Makefile was updated to include zstd in the ${IMPALA_HOME}/toolchain
directory. Other changes were made to make the zstd headers and libs
accessible.
The ZstandardCompressor/ZstandardDecompressor classes were added to
provide interfaces for calling the ZSTD_compress/ZSTD_decompress
functions. Zstd supports different compression levels (clevel) from 1 to
ZSTD_maxCLevel(). Zstd also supports negative clevels, but since negative
values represent uncompressed data, they won't be supported. The default
clevel is ZSTD_CLEVEL_DEFAULT.
HdfsParquetTableWriter was updated to support the ZSTD codec. The
new codec can be set using the existing query option as follows:
set COMPRESSION_CODEC=ZSTD:<clevel>;
set COMPRESSION_CODEC=ZSTD; // uses ZSTD_CLEVEL_DEFAULT
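For illustration, a minimal sketch of how this maps onto the zstd API
(the real ZstandardCompressor handles buffers and errors through Impala's
Codec interface):
#include <zstd.h>
#include <string>
// Sketch: compress 'input' at the given clevel; clevel ranges from 1 to
// ZSTD_maxCLevel(), with ZSTD_CLEVEL_DEFAULT used when none is specified.
std::string ZstdCompressSketch(const std::string& input,
                               int clevel = ZSTD_CLEVEL_DEFAULT) {
  size_t bound = ZSTD_compressBound(input.size());
  std::string out(bound, '\0');
  size_t written = ZSTD_compress(&out[0], bound, input.data(), input.size(),
                                 clevel);
  if (ZSTD_isError(written)) return "";  // error handling elided in sketch
  out.resize(written);
  return out;
}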
Testing:
- Added a unit test in the DecompressorTest class with the
ZSTD_CLEVEL_DEFAULT clevel and a random clevel. The test decompresses
compressed input data and validates the result. It also tests for
the expected behavior when passing an over/under sized buffer for
decompression.
- Added unit tests for valid/invalid values for COMPRESSION_CODEC.
- Added an e2e test in test_insert_parquet.py which tests writing and
reading (null/non-null) data into/from a table (with different data
type columns) using multiple codecs. Other existing e2e tests were
updated to also use the parquet/zstd table format.
- Manual interoperability tests were run between Impala and Hive.
Change-Id: Id2c0e26e6f7fb2dc4024309d733983ba5197beb7
Reviewed-on: http://gerrit.cloudera.org:8080/13507
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds an additional hiveserver2 endpoint for clients to
connect to that uses HTTP. The endpoint can be disabled by setting
--hs2_http_port=0. HTTP(S) also works when external TLS is
enabled using --ssl_server_certificate.
Thrift's http transport is modified to support BASIC authentication
via LDAP. For convenience of development and review, this patch
is based on another that copied THttpServer and THttpTransport into
Impala's codebase. Kerberos authentication is not supported, so the
http endpoint is turned off if Kerberos is enabled and LDAP isn't.
TODO
=====
- Fuzz test the http endpoint
- Add tests for LDAP + HTTPS
Testing
=======
- Parameterized JdbcTest and LdapJdbcTest to work for HS2 + HTTP mode
- Added LdapHS2Test, which directly calls into the Hiveserver2
interface using a thrift http client.
Manual testing with the Beeline client (from Apache Hive), which has
built-in support for connecting to HTTP(S)-based HS2-compatible endpoints.
Example
========
-- HTTP mode:
> start-impala-cluster.py
> JDBC_URL="jdbc:hive2://localhost:<port>/default;transportMode=http"
> beeline -u "$JDBC_URL"
-- HTTPS mode:
> cd $IMPALA_HOME
> SSL_ARGS="--ssl_client_ca_certificate=./be/src/testutil/server-cert.pem \
--ssl_server_certificate=./be/src/testutil/server-cert.pem \
--ssl_private_key=./be/src/testutil/server-key.pem --hostname=localhost"
> start-impala-cluster.py --impalad_args="$SSL_ARGS" \
--catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
- Create a local trust store using 'keytool' and import the certificate
from server-cert.pem (./clientkeystore in the example).
> JDBC_URL="jdbc:hive2://localhost:<port>/default;ssl=true;sslTrustStore= \
./clientkeystore;trustStorePassword=password;transportMode=http"
> beeline -u "$JDBC_URL"
-- BASIC Auth with LDAP:
> LDAP_ARGS="--enable_ldap_auth --ldap_uri='ldap://...' \
--ldap_bind_pattern='...' --ldap_passwords_in_clear_ok"
> start-impala-cluster.py --impalad_args="$LDAP_ARGS"
> JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
...;transportMode=http"
> beeline -u "$JDBC_URL"
-- HTTPS mode with LDAP:
> start-impala-cluster.py --impalad_args="$LDAP_ARGS $SSL_ARGS" \
--catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
> JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
...;ssl=true;sslTrustStore=./clientkeystore;trustStorePassword=\
password;transportMode=http"
> beeline -u "$JDBC_URL"
Change-Id: Ic5569ac62ef3af2868b5d0581f5029dac736b2ff
Reviewed-on: http://gerrit.cloudera.org:8080/13299
Reviewed-by: Thomas Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Currently, when a client connection is closed, we always close any
session started over that connection. This is a requirement for
beeswax, which always ties sessions to connections, but it is not
required for hiveserver2, which allows sessions to be used across
connections with a session token.
This patch changes this behavior so that hiveserver2 sessions are no
longer closed when the corresponding connection is closed.
One downside of this change is that clients may inadvertently leave
sessions open indefinitely if they close their connection without
calling CloseSession(), which can waste space on the coordinator.
We already have a flag --idle_session_timeout, but this flag is off
by default and sessions that hit this timeout are expired but not
fully closed.
Rather than changing the default idle session behavior, which could
affect existing users, this patch mitigates this issue by adding a
new flag: --disconnected_session_timeout which is set to 1 hour by
default. When a session has had no open connections for longer than
this time, it will be closed and any associated queries will be
unregistered.
Testing:
- Added e2e tests.
Change-Id: Ia4555cd9b73db5b4dde92cd4fac4f9bfa3664d78
Reviewed-on: http://gerrit.cloudera.org:8080/13306
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change is a follow-up to IMPALA-7368 and adds support for DATE
type to the parquet scanner/writer. CREATE TABLE LIKE PARQUET
statements associated with data files that contain dates are also
supported.
Parquet uses DATE logical type for dates. DATE logical type annotates
an INT32 that stores the number of days from the Unix epoch, 1 January
1970.
This representation introduces a parquet interoperability issue
between Impala and older versions of Hive:
- Before version 3.1, Hive used Julian calendar to represent dates
up to 1582-10-05 and Gregorian calendar for dates starting with
1582-10-15. Dates between 1582-10-05 and 1582-10-15 were lost.
- Impala uses proleptic Gregorian calendar, extending the Gregorian
calendar backward to dates preceding its official introduction in
1582-10-15.
This means that pre-1582-10-15 dates written to a parquet table by
Hive will be read back incorrectly by Impala and vice versa.
Note that Hive 3.1 switched to proleptic Gregorian calendar too, so
for Hive 3.1+ this is no longer an issue.
Change-Id: I67da03754531660bc8de3b6935580d46deae1814
Reviewed-on: http://gerrit.cloudera.org:8080/13189
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The coordinator currently waits indefinitely if it does not receive a
status report from a backend. This could cause a query to hang
indefinitely in certain situations, for example if the backend decides
to cancel itself as a result of failed status report rpcs.
This patch adds a thread to ImpalaServer which periodically iterates
over all queries for which that server is the coordinator and cancels
any that haven't had a report from a backend in a certain amount of
time.
This patch adds two flags:
--status_report_max_retry_s: the maximum number of seconds a backend
will attempt to send status reports before giving up. This is used
in place of --status_report_max_retries which is now deprecated.
--status_report_cancellation_padding: the coordinator will wait
--status_report_max_retry_s *
(1 + --status_report_cancellation_padding / 100)
before concluding a backend is not responding and cancelling the
query.
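A small sketch of that arithmetic (the numbers in the comment are
illustrative, not the flag defaults):
// Seconds the coordinator waits before concluding a backend is unresponsive,
// e.g. a 600s retry window with 50% padding gives a 900s wait.
double BackendUnresponsiveTimeoutSecs(double status_report_max_retry_s,
                                      double status_report_cancellation_padding) {
  return status_report_max_retry_s *
         (1.0 + status_report_cancellation_padding / 100.0);
}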
Testing:
- Added a functional test that runs a query that is cancelled through
the new mechanism.
- Passed a full set of exhaustive tests.
Ran tests on a 10 node cluster loaded with tpch 500:
- Ran the stress test for 1000 queries with the debug actions:
'REPORT_EXEC_STATUS_DELAY:JITTER@1000'
Prior to this patch, this setup results in hanging queries. With
this patch, no hangs were observed.
- Ran perf tests with 4 concurrent streams, 3 iterations per query.
Found no change in performance.
Change-Id: I196c8c6a5633b1960e2c3a3884777be9b3824987
Reviewed-on: http://gerrit.cloudera.org:8080/12299
Reviewed-by: Thomas Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Avoided rewrite if the resulting string literal exceeds a defined limit.
Testing:
Added three statements in testFoldConstantsRule() to verify that the
expression rewrite is accepted only when the size of the rewritten
expression is below a specified threshold.
Change-Id: I8b078113ccc1aa49b0cea0c86dff2e02e1dd0e23
Reviewed-on: http://gerrit.cloudera.org:8080/12814
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
I recently ran into some queries that failed like so:
WARNINGS: Disk I/O error: Could not open file: /data/...: Error(5): Input/output error
These warnings were in the profile, but I had to cross-reference impalad
logs to figure out which machine had the broken disk.
In this commit, I've sprinkled in GetBackendString() so the error message
includes the backend.
Change-Id: Ib977d2c0983ef81ab1338de090239ed57f3efde2
Reviewed-on: http://gerrit.cloudera.org:8080/12402
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch limits the number of rows produced by a query by
tracking it at the PlanRootSink level. When the
NUM_ROWS_PRODUCED_LIMIT is set, it cancels a query when its
execution produces more rows than the specified limit. This limit
only applies when the results are returned to a client, e.g. for a
SELECT query, but not an INSERT query.
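A rough sketch of the kind of tracking this implies at the sink
(hypothetical class; the real accounting is inside PlanRootSink):
#include <cstdint>
// Sketch: accumulate rows returned to the client and report when the
// configured limit (0 meaning unlimited) has been exceeded, which is the
// point at which the query gets cancelled.
class RowsProducedLimiterSketch {
 public:
  explicit RowsProducedLimiterSketch(int64_t limit) : limit_(limit) {}
  // Returns true if the query should be cancelled after adding this batch.
  bool AddRows(int64_t num_rows) {
    rows_produced_ += num_rows;
    return limit_ > 0 && rows_produced_ > limit_;
  }
 private:
  int64_t limit_;
  int64_t rows_produced_ = 0;
};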
Testing:
Added tests to query-resource-limits.test to verify that the rows
produced limit is honored.
Manually tested on various combinations of tables, file formats
and ROWS_RETURNED_LIMIT values.
Change-Id: I7b22dbe130a368f4be1f3662a559eb9aae7f0c1d
Reviewed-on: http://gerrit.cloudera.org:8080/12328
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
There were two races:
* queries were terminated because of an impalad being detected
as failed by the statestore even if the query had finished
executing on that impalad.
* NUM_FRAGMENTS_IN_FLIGHT was used to detect the backend being
idle, but it was decremented before the final status report
was sent.
The fixes are:
* keep track of the backends that triggered the potential cancellation,
and only proceed with the cancellation if the coordinator has fragments
still executing on the backend.
* add a new metric that keeps track of the number of executing queries,
which isn't decremented until the final status report is sent.
Also do some cleanup/improvements in this code:
* use proper error codes for some errors
* more overloads for Status::Expected()
* also add a metric for the total number of queries executed on the
backend
Testing:
Add a new version of test_shutdown_executor with delays that
trigger both races. This test only runs in exhaustive to avoid
adding ~20s to core build time.
Ran exhaustive tests.
Looped test_restart_services overnight.
Change-Id: I7c1a80304cb6695d228aca8314e2231727ab1998
Reviewed-on: http://gerrit.cloudera.org:8080/12082
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Adds additional context about how much scratch was allocated
by the query and the impalad in total. We sometimes see scratch
allocation failures because a query was spilling heavily and
ate up all the disk. In this case, the high values in the
error should provide an additional clue that the volume
of spilling is the problem (rather than disks being full
for other reasons).
Example error after deleting /tmp/impala-scratch:
[localhost:21000] default> set mem_limit=150m; select distinct * from tpch_parquet.lineitem limit 5;
WARNINGS: Could not create files in any configured scratch directories (--scratch_dirs=/tmp/impala-scratch) on backend 'tarmstrong-box:22000'. 2.00 MB of scratch is currently in use by this Impala Daemon (2.00 MB by this query). See logs for previous errors that may have prevented creating or writing scratch files.
Disk I/O error: open() failed for /tmp/impala-scratch/7d473ea7aef26431:c9105f7900000000_3120108e-475b-4616-9825-8bbdb1dc9cc2. The given path doesn't exist. errno=2
Change-Id: Icbedd586c57ec02e784143927e82b74455f98dc8
Reviewed-on: http://gerrit.cloudera.org:8080/12088
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This is part 1 of a push to add timeouts for all HDFS operations.
It adds timeouts for opening an HDFS file handle.
It introduces a new SynchronousThreadPool, which executes
an operation in a thread pool and waits up to a specified
timeout for the operation to complete. This type of thread
pool can accept any subclass of SynchronousWorkItem, and
a single thread pool can process different types of work
items. It is tested by a new test case in thread-pool-test.
This also introduces a new HdfsMonitor which implements
timeouts for HDFS operations, currently limited to
hdfsOpenFile(). This is implemented using a SynchronousThreadPool.
The timeout for hdfs operations is specified by
hdfs_operation_timeout_sec, which defaults to 5 minutes.
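Conceptually (a sketch using the standard library, not the actual
SynchronousThreadPool/HdfsMonitor classes), submitting work and waiting
with a timeout looks like this:
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <utility>
// Sketch: run 'work' on another thread and wait up to 'timeout' for it to
// finish. If it times out, the caller gets an error instead of hanging;
// the worker thread is detached and left to finish (or hang) on its own,
// mirroring the idea of not blocking the caller on a stuck HDFS call.
template <typename Fn>
bool RunWithTimeout(Fn work, std::chrono::seconds timeout, std::string* err) {
  std::packaged_task<void()> task(std::move(work));
  std::future<void> fut = task.get_future();
  std::thread(std::move(task)).detach();
  if (fut.wait_for(timeout) == std::future_status::timeout) {
    *err = "operation timed out";
    return false;
  }
  fut.get();  // propagate exceptions, if any
  return true;
}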
Testing:
1. Added a test to thread-pool-test for the new
SynchronousThreadPool.
2. Core tests
3. Added a custom cluster test that does "kill -STOP"
for the NameNode and verifies that a subsequent
hdfsOpenFile operation times out.
Change-Id: Ia14403ca5f3f19c6d5f61b9ab2306b0ad3267454
Reviewed-on: http://gerrit.cloudera.org:8080/11874
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Move parquet classes into exec/parquet.
Move CollectionColumnReader and ParquetLevelDecoder into separate files.
Remove unnecessary 'encoding_' field from ParquetLevelDecoder.
Switch BOOLEAN decoding to use composition instead of inheritance. This
lets the boolean decoding use the faster batched implementations in
ScalarColumnReader and avoids some confusing aspects of the class
hierarchy, like the ReadValueBatch() implementation on the base class
that was shared between BoolColumnReader and CollectionColumnReader.
Improve compile times by instantiating BitPacking templates in a
separate file (this looks to give a 30s+ speedup for
compiling parquet-column-readers.cc).
Testing:
Ran exhaustive tests.
Change-Id: I0efd5c50b781fe9e3c022b33c66c06cfb529c0b8
Reviewed-on: http://gerrit.cloudera.org:8080/11949
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this fix Impala did not check whether a timestamp's time part
is out of the valid [0, 24 hour) range when reading Parquet files,
so these timestamps were memcopied as-is into slots, leading to
results like:
1970-01-01 -00:00:00.000000001
1970-01-01 24:00:00
Different parts of Impala treat these timestamps differently:
- string conversion leads to invalid representation that cannot be
converted back to timestamp
- timezone conversions handle the overflowing time part and give
a valid timestamp result (at least since CCTZ, I did not check
older versions of Impala)
- Parquet writing inserts these timestamp as they are, so the
resulting Parquet file will also contain corrupt timestamps
The fix adds a check that converts these corrupt timestamps to NULL,
similarly to the handling of timestamps outside the [1400..10000)
range. A new error code is added for this case. If both the date
and the time part are corrupt, then the error about the corrupt time
part is returned.
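A minimal sketch of the new time-of-day check (assuming the time part is
available as nanoseconds; the real validation also covers the
[1400..10000) year range mentioned above):
#include <cstdint>
// Nanoseconds in 24 hours; a valid time-of-day lies in [0, 24h).
constexpr int64_t kNanosPerDay = 24LL * 60 * 60 * 1000 * 1000 * 1000;
// Sketch: returns true if the time part is corrupt, in which case the value
// is replaced with NULL and a warning is added to the query.
bool IsCorruptTimeOfDay(int64_t nanos_of_day) {
  return nanos_of_day < 0 || nanos_of_day >= kNanosPerDay;
}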
Testing:
- added a new scanner test that reads a corrupted Parquet file
with edge values
Change-Id: Ibc0ae651b6a0a028c61a15fd069ef9e904231058
Reviewed-on: http://gerrit.cloudera.org:8080/11521
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This is the same patch except with fixes for the test failures
on EC and S3 noted in the JIRA.
This allows graceful shutdown of executors and partially graceful
shutdown of coordinators (new operations fail, old operations can
continue).
Details:
* In order to allow future admin commands, this is implemented with
function-like syntax and does not add any reserved words.
* ALL privilege is required on the server
* The coordinator impalad that the client is connected to can be shut
down directly with ":shutdown()".
* Remote shutdown of another impalad is supported, e.g. with
":shutdown('hostname')", so that non-coordinators can be shut down
and for the convenience of the client, which does not have to
connect to the specific impalad. There is no assumption that the
other impalad is registered in the statestore; just that the
coordinator can connect to the other daemon's thrift endpoint.
This simplifies things and allows shutdown in various important
cases, e.g. statestore down.
* The shutdown time limit can be overridden to force a quicker or
slower shutdown by specifying a deadline in seconds after the
statement is executed.
* If shutting down, a banner is shown on the root debug page.
Workflow:
1. (if a coordinator) clients are prevented from submitting
queries to this coordinator via some out-of-band mechanism,
e.g. load balancer
2. the shutdown process is started via ":shutdown()"
3. a bit is set in the statestore and propagated to coordinators,
which stop scheduling fragment instances on this daemon
(if an executor).
4. the query startup grace period (which is ideally set to the AC
queueing delay plus some additional leeway) expires
5. once the daemon is quiesced (i.e. no fragments, no registered
queries), it shuts itself down.
6. If the daemon does not successfully quiesce (e.g. rogue clients,
long-running queries), after a longer timeout (counted from the start
of the shutdown process) it will shut down anyway.
What this does:
* Executors can be shut down without causing a service-wide outage
* Shutting down an executor will not disrupt any short-running queries
and will wait for long-running queries up to a threshold.
* Coordinators can be shut down without query failures only if
there is an out-of-band mechanism to prevent submission of more
queries to the shut down coordinator. If queries are submitted to
a coordinator after shutdown has started, they will fail.
* Long running queries or other issues (e.g. stuck fragments) will
slow down but not prevent eventual shutdown.
Limitations:
* The startup grace period needs to be configured to be greater than
the latency of statestore updates + scheduling + admission +
coordinator startup. Otherwise a coordinator may send a
fragment instance to the shutting down impalad. (We could
automate this configuration as a follow-on)
* The startup grace period means a minimum latency for shutdown,
even if the cluster is idle.
* We depend on the statestore detecting the process going down
if queries are still running on that backend when the timeout
expires. This may still be subject to existing problems,
e.g. IMPALA-2990.
Tests:
* Added parser, analysis and authorization tests.
* End-to-end test of shutting down impalads.
* End-to-end test of shutting down then restarting an executor while
queries are running.
* End-to-end test of shutting down a coordinator
- New queries cannot be started on coord, existing queries continue to run
- Exercises various Beeswax and HS2 operations.
Change-Id: I8f3679ef442745a60a0ab97c4e9eac437aef9463
Reviewed-on: http://gerrit.cloudera.org:8080/11484
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
I started by converting scan and spill-to-disk because the
cancellation there is always meant to be internal to the scan and
spill-to-disk subsystems.
I updated all places that checked for TErrorCode::CANCELLED to treat
CANCELLED_INTERNALLY the same.
This is to aid triage and debugging of bugs like IMPALA-7418,
where an "internal" cancellation leaks out into the query state:
it will now be easier to determine if an internal cancellation
somehow "leaked" out.
Testing:
Ran exhaustive tests.
Change-Id: If25d5b539d68981359e4d881cae7b08728ba2999
Reviewed-on: http://gerrit.cloudera.org:8080/11464
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This allows graceful shutdown of executors and partially graceful
shutdown of coordinators (new operations fail, old operations can
continue).
Details:
* In order to allow future admin commands, this is implemented with
function-like syntax and does not add any reserved words.
* ALL privilege is required on the server
* The coordinator impalad that the client is connected to can be shut
down directly with ":shutdown()".
* Remote shutdown of another impalad is supported, e.g. with
":shutdown('hostname')", so that non-coordinators can be shut down
and for the convenience of the client, which does not have to
connect to the specific impalad. There is no assumption that the
other impalad is registered in the statestore; just that the
coordinator can connect to the other daemon's thrift endpoint.
This simplifies things and allows shutdown in various important
cases, e.g. statestore down.
* The shutdown time limit can be overridden to force a quicker or
slower shutdown by specifying a deadline in seconds after the
statement is executed.
* If shutting down, a banner is shown on the root debug page.
Workflow:
1. (if a coordinator) clients are prevented from submitting
queries to this coordinator via some out-of-band mechanism,
e.g. load balancer
2. the shutdown process is started via ":shutdown()"
3. a bit is set in the statestore and propagated to coordinators,
which stop scheduling fragment instances on this daemon
(if an executor).
4. the query startup grace period (which is ideally set to the AC
queueing delay plus some additional leeway) expires
5. once the daemon is quiesced (i.e. no fragments, no registered
queries), it shuts itself down.
6. If the daemon does not successfully quiesce (e.g. rogue clients,
long-running queries), after a longer timeout (counted from the start
of the shutdown process) it will shut down anyway.
What this does:
* Executors can be shut down without causing a service-wide outage
* Shutting down an executor will not disrupt any short-running queries
and will wait for long-running queries up to a threshold.
* Coordinators can be shut down without query failures only if
there is an out-of-band mechanism to prevent submission of more
queries to the shut down coordinator. If queries are submitted to
a coordinator after shutdown has started, they will fail.
* Long running queries or other issues (e.g. stuck fragments) will
slow down but not prevent eventual shutdown.
Limitations:
* The startup grace period needs to be configured to be greater than
the latency of statestore updates + scheduling + admission +
coordinator startup. Otherwise a coordinator may send a
fragment instance to the shutting down impalad. (We could
automate this configuration as a follow-on)
* The startup grace period means a minimum latency for shutdown,
even if the cluster is idle.
* We depend on the statestore detecting the process going down
if queries are still running on that backend when the timeout
expires. This may still be subject to existing problems,
e.g. IMPALA-2990.
Tests:
* Added parser, analysis and authorization tests.
* End-to-end test of shutting down impalads.
* End-to-end test of shutting down then restarting an executor while
queries are running.
* End-to-end test of shutting down a coordinator
- New queries cannot be started on coord, existing queries continue to run
- Exercises various Beeswax and HS2 operations.
Change-Id: I4d5606ccfec84db4482c1e7f0f198103aad141a0
Reviewed-on: http://gerrit.cloudera.org:8080/10744
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The error text with AES-GCM enabled looks like:
Error reading 44 bytes from scratch file
'/tmp/impala-scratch/0:0_d43635d0-8f55-485e-8899-907af289ac86' on
backend tarmstrong-box:22000 at offset 0: verification of read data
failed.
OpenSSL error in EVP_DecryptFinal:
139634997483216:error:0607C083:digital envelope
routines:EVP_CIPHER_CTX_ctrl:no cipher set:evp_enc.c:610:
139634997483216:error:0607C083:digital envelope
routines:EVP_CIPHER_CTX_ctrl:no cipher set:evp_enc.c:610:
139634997483216:error:0607C083:digital envelope
routines:EVP_CIPHER_CTX_ctrl:no cipher set:evp_enc.c:610:
139634997483216:error:0607C083:digital envelope
routines:EVP_CIPHER_CTX_ctrl:no cipher set:evp_enc.c:610:
Testing:
Added a backend test to exercise the code path and verify the error
code.
Change-Id: I0652d6cdfbb4e543dd0ca46b7cc65edc4e41a2d8
Reviewed-on: http://gerrit.cloudera.org:8080/10204
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When an impalad is in executor-only mode, it receives no
catalog updates. As a result, lib-cache entries are never
refreshed. A consequence is that udf queries can return
incorrect results or may not run due to resolution issues.
Both cases are caused by the executor using a stale copy
of the lib file. For incorrect results, an old version of
the method may be used. Resolution issues can come up if
a method is added to a lib file.
The solution in this change is to capture the coordinator's
view of the lib file's last modified time when planning.
This last modified time is then shipped with the plan to
executors. Executors must then use both the lib file path
and the last modified time as a key for the lib-cache.
If the coordinator's last modified time is more recent than
the executor's lib-cache entry, then the entry is refreshed.
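Conceptually (hypothetical types, not the actual LibCache code), the entry
is keyed by the lib path and refreshed whenever the coordinator-supplied
last-modified time is newer than what the executor has cached:
#include <cstdint>
#include <map>
#include <string>
#include <utility>
// Sketch: cache entries keyed by lib path, tagged with the last modified
// time they were loaded at.
struct LibEntry { std::string local_path; };
class LibCacheSketch {
 public:
  // Returns the cached entry for 'path', reloading it when the
  // coordinator-supplied last-modified time is newer than the cached one.
  const LibEntry& GetOrRefresh(const std::string& path, int64_t coord_mtime) {
    auto it = entries_.find(path);
    if (it == entries_.end() || it->second.first < coord_mtime) {
      it = entries_.insert_or_assign(
          path, std::make_pair(coord_mtime, LoadLib(path))).first;
    }
    return it->second.second;
  }
 private:
  LibEntry LoadLib(const std::string& path) { return LibEntry{path}; }
  std::map<std::string, std::pair<int64_t, LibEntry>> entries_;
};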
Brief discussion of alternatives:
- lib-cache always checks last modified time
+ easy/local change to lib-cache
- adds an fs lookup always. rejected for this reason
- keep the last modified time in the catalog
- bound on staleness is too loose. consider the case where
fn's f1, f2, f3 are created with last modified times of
t1, t2, t3. treat the fn's last modified time as a low-watermark;
if the cache entry has a more recent time, use it. Such a scheme
would allow the version at t2 to persist. An old fn may keep the
state from converging to the latest. This could end up with strange
cases where different versions of the lib are used across executors
for a single query.
In contrast, the change in this patch relies on the statestore to
push versions forward at all coordinators, and so will push all
versions at all caches forward as well.
Testing:
- added an e2e custom cluster test
Change-Id: Icf740ea8c6a47e671427d30b4d139cb8507b7ff6
Reviewed-on: http://gerrit.cloudera.org:8080/9697
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins
The serialization format of a row batch relies on
tuple offsets. In its current form, the tuple offsets
are int32s. This means that it is impossible to generate
a valid serialization of a row batch that is larger
than INT_MAX.
This changes RowBatch::SerializeInternal() to return an
error if trying to serialize a row batch larger than INT_MAX.
This prevents a DCHECK on debug builds when creating a row
larger than 2GB.
This also changes the compression logic in RowBatch::Serialize()
to avoid a DCHECK if LZ4 will not be able to compress the
row batch. Instead, it returns an error.
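A sketch of the two limit checks described above (LZ4_MAX_INPUT_SIZE comes
from the lz4 library and INT_MAX from <climits>; the real code returns a
Status rather than a string):
#include <lz4.h>
#include <climits>
#include <cstdint>
#include <string>
// Sketch: validate a serialized row batch size before compressing, returning
// an error message instead of tripping a DCHECK.
std::string CheckSerializedSize(int64_t uncompressed_size) {
  if (uncompressed_size > INT_MAX) {
    return "Row batch too large to serialize (tuple offsets are int32).";
  }
  if (uncompressed_size > LZ4_MAX_INPUT_SIZE) {
    return "Row batch too large for LZ4 compression.";
  }
  return "";  // OK
}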
This modifies row-batch-serialize-test to verify behavior at
each of the limits. Specifically:
RowBatches up to size LZ4_MAX_INPUT_SIZE succeed.
RowBatches with size range [LZ4_MAX_INPUT_SIZE+1, INT_MAX]
fail on LZ4 compression.
RowBatches with size > INT_MAX fail with RowBatch too large.
Change-Id: I3b022acdf3bc93912d6d98829b30e44b65890d91
Reviewed-on: http://gerrit.cloudera.org:8080/9367
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins
The encoding was added in an early version of the Parquet
spec and deprecated even in the Parquet 1.0 spec.
Parquet-MR switched to generating RLE at the same time as
the spec changed in mid-2013. Impala always wrote RLE:
see commit 6e293090e6.
The Impala implementation of BIT_PACKED was never correct
because it implemented little endian bit unpacking instead of
the big endian unpacking required by the spec for levels.
Testing:
Updated tests to reflect expected behaviour for supported
and unsupported def level encodings.
Cherry-picks: not for 2.x.
Change-Id: I12c75b7f162dd7de8e26cf31be142b692e3624ae
Reviewed-on: http://gerrit.cloudera.org:8080/9241
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
This patch adds the following details to the error message encountered
on failure to get minimum memory reservation:
- which ReservationTracker hit its limit
- top 5 admitted queries that are consuming the most memory under the
ReservationTracker that hit its limit
Testing:
- added tests to reservation-tracker-test.cc that verify the error
message returned for different cases.
- tested "initial reservation failed" condition manually to verify
the error message returned.
Change-Id: Ic4675fe923b33fdc4ddefd1872e6d6b803993d74
Reviewed-on: http://gerrit.cloudera.org:8080/8781
Reviewed-by: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Tested-by: Impala Public Jenkins
There is not much benefit in printing the stack trace when a
Thrift RPC hits an error. As long as we print enough info about
the error and identify the caller, that should be sufficient.
In fact, it has been observed that stack crawls caused unnecessary
CPU spikes in the past. This change replaces Status() with
Status::Expected() in DoRpc(), RetryRpc(), RetryRpcRecv() and
Coordinator::BackendState::Exec() to avoid unnecessary stack crawls.
Testing done: private core build. Verified error strings with
test_rpc_timeout.py and test_rpc_exception.py
Change-Id: Ia83294494442ef21f7934f92ba9112e80d81fa58
Reviewed-on: http://gerrit.cloudera.org:8080/8788
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins
Previously, we implicitly created a local string object from
the char* in argv[0] when calling InitAuth(). This string object goes
out of scope once InitAuth() returns, but the pointer to this local
string's buffer is passed to the Sasl library, which may reference
it after the local string has been deleted, leading to a use-after-free.
This bug is exposed by recent change to enable Kerberos with KRPC as
we now always initialize Sasl even if Kerberos is not enabled.
This change fixes the problem above by making a copy of 'appname'
passed to InitAuth(). Also, the new code enforces that multiple
calls to InitAuth() must use the same 'appname' or it will fail.
Testing done: Verified rpc-mgr-test and thrift-server-test no longer
fail in ASAN build.
Change-Id: I1f29c2396df114264dfc23726b8ba778f50e12e9
Reviewed-on: http://gerrit.cloudera.org:8080/8777
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins
This change augments the message of TErrorCode::DATASTREAM_SENDER_TIMEOUT
to include the source address when KRPC is enabled. The source address is
not readily available in Thrift. The new message includes the destination
plan node id in case there are multiple exchange nodes in a fragment instance.
Testing done: Confirmed the error message by testing with following options:
"--stress_datastream_recvr_delay_ms=90000 datastream_sender_timeout_ms=1000"
Change-Id: Ie3e83773fe6feda057296e7d5544690aa9271fa0
Reviewed-on: http://gerrit.cloudera.org:8080/8751
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins
When Lz4Compressor::MaxOutputLen returns 0, it
means that the input is too large to compress.
When Lz4Compressor::ProcessBlock was invoked with
an input that was too large, it silently produced a bogus
result. This bogus result even decompresses
successfully, but not to the data that was
originally compressed.
After this commit, Lz4Compressor::ProcessBlock
will return an error if it cannot compress the
input.
I also added a comment on Codec::MaxOutputLen()
that return value 0 means that the input is too
large.
I added some checks after the invocations of
MaxOutputLen() where the compressor can be
a Lz4Compressor.
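As a hedged sketch of that guard pattern using the lz4 library directly
(LZ4_compressBound() returns 0 when the input is too large, which is
presumably what MaxOutputLen() maps to):
#include <lz4.h>
#include <string>
// Sketch: refuse to compress when the bound computation signals that the
// input is too large, instead of producing a bogus block.
bool TryLz4Compress(const std::string& input, std::string* out) {
  int bound = LZ4_compressBound(static_cast<int>(input.size()));
  if (bound == 0) return false;  // input too large to compress
  out->resize(bound);
  int written = LZ4_compress_default(input.data(), &(*out)[0],
                                     static_cast<int>(input.size()), bound);
  if (written <= 0) return false;
  out->resize(written);
  return true;
}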
I added an automated test case to
be/src/util/decompress-test.cc.
Change-Id: Ifb0bc4ed98c5d7b628b791aa90ead36347b9fbb8
Reviewed-on: http://gerrit.cloudera.org:8080/8748
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
KuduRPC has support for Kerberos. However, since Impala's client transport
still uses the Thrift transport stack, we need to make sure that a single
security configuration applies to both internal communication (KuduRPC)
and external communication (Thrift's TSaslTransport).
This patch changes InitAuth() to start Sasl regardless of security
configuration, since KRPC uses plain SASL for negotiation on insecure
clusters.
It also moves some utility code out of authentication.cc into
auth-util.cc for reuse by the RpcMgr while enabling Kerberos.
The MiniKDC related code is moved out of thrift-server-test.cc into a
new file called mini-kdc-wrapper.h/cc. This file exposes a new class
MiniKdcWrapper which can be easily used by the tests to configure the
kerberos environment, create the keytab, start the KDC and also
initialize the Impala security library.
Tests are added to rpc-mgr-test for kerberos tests over KRPC.
thrift-server-test also has a mechanical change to use MiniKdcWrapper.
Also tested on a live cluster configured to use kerberos.
Change-Id: I8cec5cca5fdb4b1d46bab19e86cb1a8a3ad718fd
Reviewed-on: http://gerrit.cloudera.org:8080/8270
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins
This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.
Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multi-plex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code
for an extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid extra copy into the serialized
request buffer.
Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC may block for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, we store TransmitData() or EndDataStream()
requests which arrive before the receiver is ready in a per-receiver early
sender list stored in KrpcDataStreamMgr. These RPC calls will be processed
and responded to when the receiver is created or when timeout occurs.
Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer
limit to be exceeded, the request is stashed in a queue for deferred processing.
The stashed RPC requests are not responded to until they are processed,
so as to exert back pressure on the senders. An alternative would be to reply
with an error and have the sender retransmit the request and row batches, but
this may end up consuming more network bandwidth than the thrift RPC
implementation. This change adopts the behavior of allowing one stashed
request per sender.
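Both the early-sender list and the full-queue case come down to the same
pattern: park the RPC instead of responding, and respond only once it has been
processed. A simplified, self-contained sketch of that bookkeeping (the names
here are illustrative, not the actual KrpcDataStreamMgr/KrpcDataStreamRecvr
data structures):
  #include <deque>
  #include <functional>
  #include <mutex>
  // 'process_and_respond' wraps the work plus the eventual
  // RpcContext::RespondSuccess()/RespondFailure() call.
  struct DeferredRpc {
    std::function<void()> process_and_respond;
  };
  class DeferredRpcQueue {
   public:
    // Called when a TransmitData()/EndDataStream() arrives before the receiver
    // exists, or when the receiver's batch queue is full. Not responding yet
    // exerts back pressure on the sender.
    void Defer(DeferredRpc rpc) {
      std::lock_guard<std::mutex> l(lock_);
      deferred_.push_back(std::move(rpc));
    }
    // Called when the receiver is created or queue space frees up: process a
    // stashed request and finally respond to it.
    void DrainOne() {
      DeferredRpc rpc;
      {
        std::lock_guard<std::mutex> l(lock_);
        if (deferred_.empty()) return;
        rpc = std::move(deferred_.front());
        deferred_.pop_front();
      }
      rpc.process_and_respond();
    }
   private:
    std::mutex lock_;
    std::deque<DeferredRpc> deferred_;
  };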
All rpc requests and responses are serialized using protobuf. The equivalent of
TRowBatch would be ProtoRowBatch which contains a serialized header about the
meta-data of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).
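Conceptually the in-memory representation is just a small serialized header
plus two references into the sidecar payloads; a simplified sketch of the idea
(not the actual ProtoRowBatch definition):
  #include <kudu/util/slice.h>
  #include <string>
  // Stand-in for ProtoRowBatch: a serialized header describing the row batch
  // (row count, compression, per-tuple byte sizes, ...) plus two Slices that
  // point straight at the sidecar data, so no extra copy is made.
  struct ProtoRowBatchSketch {
    std::string serialized_header;  // Serialized row-batch header protobuf.
    kudu::Slice tuple_offsets;      // Points into the tuple-offsets sidecar.
    kudu::Slice tuple_data;         // Points into the tuple-data sidecar.
  };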
This patch is based on an abandoned patch by Henry Robinson.
TESTING
-------
* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.
TO DO
-----
* Port some BE tests to KRPC services.
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Reviewed-on: http://gerrit.cloudera.org:8080/8023
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins
Prior to this fix, an error in ScannerContext::Stream::GetNextBuffer()
could leave the stream in an inconsistent state:
- The DiskIoMgr hits EOF unexpectedly, cancels the scan range and enqueues
a buffer with eosr set.
- The ScannerContext::Stream tries to read more bytes, but since it has
hit eosr, it tries to read beyond the end of the scan range using
DiskIoMgr::Read().
- The previous read error resulted in a new file handle being opened.
The now truncated, smaller file causes the seek to fail.
- Then during error handling, the BaseSequenceScanner calls SkipToSync()
and trips over the NULL pointer in the IO buffer.
In my reproduction this only happens with the file handle cache enabled,
which causes Impala to see two differently sized file handles: the one from
the cache when the query starts, and the one obtained after reopening the file.
To fix this, we change the I/O manager to always return DISK_IO_ERROR
for errors and we abort a query if we receive such an error in the
scanner.
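The decision the scanner now makes can be pictured with this illustrative,
self-contained sketch (generic names; not the actual scanner code):
  // Disk I/O errors abort the query; other parse errors may still go through
  // the scanner's recovery path (e.g. skipping to the next sync marker).
  enum class ScanError { kNone, kDiskIoError, kParseError };
  enum class Action { kContinue, kAbortQuery, kRecover };
  Action HandleScanError(ScanError err) {
    switch (err) {
      case ScanError::kNone: return Action::kContinue;
      case ScanError::kDiskIoError: return Action::kAbortQuery;  // New behavior.
      case ScanError::kParseError: return Action::kRecover;      // SkipToSync().
    }
    return Action::kAbortQuery;
  }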
This change also fixes GetBytesInternal() to maintain the invariant that
the output buffer points to the boundary buffer whenever the latter
contains some data.
I tested this by running the repro from the JIRA and impalad did not
crash but aborted the queries. I also ran the repro with
abort_on_error=1, and with the file handle cache disabled.
Text files are not affected by this problem, since the
text scanner doesn't try to recover from errors during ProcessRange()
but wraps it in RETURN_IF_ERROR instead. With this change, queries abort
with the same error.
Parquet files are also not affected since they have the metadata at the
end. Truncated files immediately fail with this error:
WARNINGS: File 'hdfs://localhost:20500/test-warehouse/tpch.partsupp_parquet/foo.0.parq'
has an invalid version number: <UTF8 Garbage>
Change-Id: I44dc95184c241fbcdbdbebad54339530680d3509
Reviewed-on: http://gerrit.cloudera.org:8080/8011
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins
Adds GetBackendAddress() (which is host:port) to the error messages for
SCRATCH_LIMIT_EXCEEDED, SCRATCH_READ_TRUNCATED, and
SCRATCH_ALLOCATION_FAILED.
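The resulting messages look like the warning shown in the manual test below; a
minimal sketch of the kind of message construction involved (helper name and
wording are illustrative):
  #include <string>
  // Append the backend's host:port so the failing node is identifiable from
  // the query error message alone.
  std::string ScratchErrorWithBackend(const std::string& base_msg,
                                      const std::string& backend_address) {
    return base_msg + " on backend '" + backend_address + "'.";
  }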
Testing:
* Unit tests assert the string is updated for SCRATCH_LIMIT_EXCEEDED
and SCRATCH_ALLOCATION_FAILED. SCRATCH_READ_TRUNCATED doesn't
have an existing test, and I didn't add a new one.
* Manually testing a query that spills after "chmod 000 /tmp/impala-scratch":
$ chmod 000 /tmp/impala-scratch
$ impala-shell
[dev:21000] > set mem_limit=100m;
MEM_LIMIT set to 100m
[dev:21000] > select count(*) from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey;
Query: select count(*) from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
Query submitted at: 2017-09-11 11:07:06 (Coordinator: http://dev:25000)
Query progress can be monitored at: http://dev:25000/query_plan?query_id=5c48ff8f4103c194:1b40a6c00000000
WARNINGS: Could not create files in any configured scratch directories (--scratch_dirs=/tmp/impala-scratch) on backend 'dev:22002'. See logs for previous errors that may have prevented creating or writing scratch files.
Opening '/tmp/impala-scratch/5c48ff8f4103c194:1b40a6c00000000_08e8d63b-169d-4571-a0fe-c48fa08d73e6' for write failed with errno=13 description=Error(13): Permission denied
Opening '/tmp/impala-scratch/5c48ff8f4103c194:1b40a6c00000000_08e8d63b-169d-4571-a0fe-c48fa08d73e6' for write failed with errno=13 description=Error(13): Permission denied
Opening '/tmp/impala-scratch/5c48ff8f4103c194:1b40a6c00000000_08e8d63b-169d-4571-a0fe-c48fa08d73e6' for write failed with errno=13 description=Error(13): Permission denied
Change-Id: If31a50fdf6031312d0348d48aeb8f9688274cac2
Reviewed-on: http://gerrit.cloudera.org:8080/7816
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
The boost thread constructor will throw boost::thread_resource_error
if it is unable to spawn a thread on the system
(e.g. due to a ulimit). This uncaught exception crashes
Impala. Systems with a large number of nodes and threads
are hitting this limit.
This change catches the exception from the thread
constructor and converts it to a Status. This requires
several changes:
1. util/thread.h's Thread constructor is now private
and all Threads are constructed via a new Create()
static factory method.
2. util/thread-pool.h's ThreadPool requires that Init()
be called after the ThreadPool is constructed.
3. To propagate the Status, Threads cannot be created in
constructors, so this is moved to initialization methods
that can return Status.
4. Threads now use unique_ptr's for management in all
cases. Threads cannot be used as stack-allocated local
variables or direct declarations in classes.
Query execution code paths will now handle the error:
1. If the scan node fails to spawn any scanner thread,
it will abort the query.
2. Failing to spawn a fragment instance from the query
state in StartFInstances() will correctly report the error
to the coordinator and tear down the query.
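A minimal sketch of the factory pattern described above (simplified; the real
Thread::Create() also handles thread naming, metrics and registration):
  #include <boost/thread/thread.hpp>
  #include <functional>
  #include <memory>
  #include <string>
  // The constructor is private, so the only way to make a thread is through
  // Create(), which converts a failed spawn (e.g. hitting a ulimit) into an
  // error instead of an uncaught boost::thread_resource_error.
  class SketchThread {
   public:
    static bool Create(const std::string& name, std::function<void()> fn,
                       std::unique_ptr<SketchThread>* out, std::string* err) {
      std::unique_ptr<SketchThread> t(new SketchThread());
      try {
        t->thread_.reset(new boost::thread(std::move(fn)));
      } catch (boost::thread_resource_error& e) {
        *err = "Could not create thread " + name + ": " + e.what();
        return false;
      }
      *out = std::move(t);
      return true;
    }
   private:
    SketchThread() {}
    std::unique_ptr<boost::thread> thread_;
  };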
Testing:
This introduces the parameter thread_creation_fault_injection,
which causes Thread::Create() calls in eligible
locations to fail randomly, roughly 1% of the time.
Quite a few locations of Thread::Create() and
ThreadPool::Init() are necessary for startup and cannot
be eligible. However, all the locations used for query
execution are marked as eligible and governed by this
parameter. The code was tested by setting this parameter
to true and running queries to verify that queries either
run to completion with the correct result or fail with
appropriate status.
Change-Id: I15a2f278dc71892b7fec09593f81b1a57ab725c0
Reviewed-on: http://gerrit.cloudera.org:8080/7730
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins
Augment the error message to mention that oversubscription is likely the
problem and hint at solutions.
Change-Id: I8e367e1b0cb08e11fdd0546880df23b785e3b7c9
Reviewed-on: http://gerrit.cloudera.org:8080/7861
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins
Sometimes the client is not open when the debug action fires at the
start of Open() or Prepare(). In that case we should set the
probability when the client is opened later.
This caused one of the large row tests to start failing with a "failed
to repartition" error in the aggregation. The error is a false positive
caused by two distinct keys hashing to the same partition. Removing the
check allows the query to succeed because the keys hash to different
partitions in the next round of repartitioning.
If we repeatedly get unlucky and have collisions, the query will still
fail when it reaches MAX_PARTITION_DEPTH.
Testing:
Ran TestSpilling in a loop for a couple of hours, including the
exhaustive-only tests.
Change-Id: Ib26b697544d6c2312a8e1fe91b0cf8c0917e5603
Reviewed-on: http://gerrit.cloudera.org:8080/7771
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins