IMPALA-13627: Handle legacy Hive timezone conversion
After HIVE-12191, Hive has 2 different methods of calculating timestamp
conversion from UTC to local timezone. When Impala has
convert_legacy_hive_parquet_utc_timestamps=true, it assumes times
written by Hive are in UTC and converts them to local time using tzdata,
which matches the newer method introduced by HIVE-12191.
Some dates convert differently between the two methods, such as
Asia/Kuala_Lumpur or Singapore prior to 1982 (also seen in HIVE-24074).
After HIVE-25104, Hive writes 'writer.zone.conversion.legacy' to
distinguish which method is being used. As a result there are three
different cases we have to handle:
1. Hive prior to 3.1 used what’s now called “legacy conversion” using
SimpleDateFormat.
2. Hive 3.1.2 (with HIVE-21290) used a new Java API that’s based on
tzdata and added metadata to identify the timezone.
3. Hive 4 supports both, and adds new file metadata to identify which method was used.
Adds handling for Hive files (identified by created_by=parquet-mr) where the correct
conversion method can be inferred from Parquet file metadata:
1. if writer.zone.conversion.legacy is present (Hive 4), use it to
determine whether to use a legacy conversion method compatible with
Hive's legacy behavior, or convert using tzdata.
2. if writer.zone.conversion.legacy is not present but writer.time.zone
is, we can infer it was written by Hive 3.1.2+ using new APIs.
3. otherwise it was likely written by an earlier Hive version (see the sketch below).
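A minimal sketch of that inference, mirroring the GetHiveZoneConversionLegacy() helper
added in this change (function and parameter names simplified for illustration):
  bool UseHiveLegacyConversion(const parquet::FileMetaData& md, bool legacy_query_option) {
    string legacy_flag, writer_zone;
    for (const parquet::KeyValue& kv : md.key_value_metadata) {
      if (kv.key == "writer.zone.conversion.legacy") legacy_flag = kv.value;
      else if (kv.key == "writer.time.zone") writer_zone = kv.value;
    }
    // Case 1: Hive 4 wrote explicit metadata - trust it (compared case-insensitively).
    if (!legacy_flag.empty()) return iequals(legacy_flag, "true");
    // Case 2: writer.time.zone implies Hive 3.1.2+ using the new tzdata-based APIs.
    if (!writer_zone.empty()) return false;
    // Case 3: no relevant metadata - fall back to the new query option.
    return legacy_query_option;
  }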
Adds a new CLI and query option - use_legacy_hive_timestamp_conversion - to select
which conversion method to use in the third case above, when Impala determines that
the file was written by a Hive version older than 3.1.2.
Defaults to false to minimize changes in Impala's behavior and because
going through JNI is ~50x slower even when the results would not differ;
Hive defaults to true for its equivalent setting:
hive.parquet.timestamp.legacy.conversion.enabled.
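For example, to opt a session into the legacy behavior for such files (the table name
here is hypothetical; the two options are the ones exercised by the new tests):
  SET convert_legacy_hive_parquet_utc_timestamps=true;
  SET use_legacy_hive_timestamp_conversion=true;
  SELECT * FROM legacy_hive_parquet_table;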
Hive's legacy-compatible conversion uses a Java code path that would be complicated to
mimic in C++, roughly:
  DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  formatter.setTimeZone(TimeZone.getTimeZone(timezone_string));
  java.util.Date date = formatter.parse(date_time_string);
  formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
  return formatter.format(date);
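The JNI helper added by this change covers the direction Impala needs at read time -
UTC millis to local civil time - via java.util.Calendar. A sketch of the logic in
JniFrontend.hiveLegacyTimezoneConvert (variable names simplified):
  Calendar c = new GregorianCalendar(TimeZone.getTimeZone(timezoneString));
  c.setTimeInMillis(utcTimeMillis);
  // Calendar months are 0-based, so normalize January to 1.
  int year = c.get(Calendar.YEAR);
  int month = c.get(Calendar.MONTH) + 1;
  int day = c.get(Calendar.DAY_OF_MONTH);
  int hour = c.get(Calendar.HOUR_OF_DAY);
  int minute = c.get(Calendar.MINUTE);
  int second = c.get(Calendar.SECOND);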
IMPALA-9385 added a check against a Timezone pointer in FromUnixTime. That check
dominates the time in FromUnixTimeNanos, masking any benchmark gains from IMPALA-7417.
Moves FromUnixTime to the inline header to allow inlining, and switches the benchmark
to using UTCPTR - as IMPALA-9385 did in most other code - to restore benchmark results.
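After the move, the header version is a thin inline wrapper (as added to
timestamp-value.inline.h in this change):
  inline TimestampValue TimestampValue::FromUnixTime(time_t unix_time,
      const Timezone* local_tz) {
    if (local_tz != UTCPTR) {
      return UnixTimeToLocal(unix_time, *local_tz);
    } else {
      return UtcFromUnixTimeTicks<1>(unix_time);
    }
  }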
Testing:
- Adds JVM conversion method to convert-timestamp-benchmark.
- Adds tests for several cases from Hive conversion tests.
Change-Id: I1271ed1da0b74366ab8315e7ec2d4ee47111e067
Reviewed-on: http://gerrit.cloudera.org:8080/22293
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
@@ -37,8 +37,11 @@
|
||||
#include "exprs/timezone_db.h"
|
||||
#include "exprs/timestamp-functions.h"
|
||||
#include "runtime/datetime-simple-date-format-parser.h"
|
||||
#include "runtime/test-env.h"
|
||||
#include "runtime/timestamp-value.h"
|
||||
#include "runtime/timestamp-value.inline.h"
|
||||
#include "service/fe-support.h"
|
||||
#include "service/frontend.h"
|
||||
#include "util/benchmark.h"
|
||||
#include "util/cpu-info.h"
|
||||
#include "util/pretty-printer.h"
|
||||
@@ -49,118 +52,120 @@
|
||||
using std::random_device;
|
||||
using std::mt19937;
|
||||
using std::uniform_int_distribution;
|
||||
using std::thread;
|
||||
|
||||
using namespace impala;
|
||||
using namespace datetime_parse_util;
|
||||
|
||||
// Benchmark tests for timestamp time-zone conversions
|
||||
/*
|
||||
Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
|
||||
Machine Info: 12th Gen Intel(R) Core(TM) i9-12900K
|
||||
|
||||
UtcToUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(glibc) 7.98 8.14 8.3 1X 1X 1X
|
||||
(Google/CCTZ) 17.9 18.2 18.5 2.24X 2.24X 2.23X
|
||||
(boost) 301 306 311 37.7X 37.5X 37.5X
|
||||
(glibc) 12 12.1 12.2 1X 1X 1X
|
||||
(Google/CCTZ) 25.2 25.5 25.5 2.1X 2.11X 2.09X
|
||||
(boost) 635 643 646 53X 53.3X 52.9X
|
||||
|
||||
LocalToUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(glibc) 0.717 0.732 0.745 1X 1X 1X
|
||||
(Google/CCTZ) 15.3 15.5 15.8 21.3X 21.2X 21.2X
|
||||
(glibc) 0.739 0.745 0.745 1X 1X 1X
|
||||
(Google/CCTZ) 23.2 23.4 23.7 31.4X 31.4X 31.7X
|
||||
|
||||
FromUtc: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(boost) 1.6 1.63 1.67 1X 1X 1X
|
||||
(Google/CCTZ) 14.5 14.8 15.2 9.06X 9.09X 9.11X
|
||||
(boost) 2.59 2.6 2.6 1X 1X 1X
|
||||
(Google/CCTZ) 24.4 24.7 24.9 9.45X 9.5X 9.58X
|
||||
|
||||
ToUtc: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(boost) 1.63 1.67 1.68 1X 1X 1X
|
||||
(Google/CCTZ) 8.7 8.9 9.05 5.34X 5.34X 5.38X
|
||||
(boost) 2.6 2.64 2.65 1X 1X 1X
|
||||
(Google/CCTZ) 13.5 13.6 13.7 5.2X 5.16X 5.18X
|
||||
|
||||
UtcToLocal: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(glibc) 2.68 2.75 2.8 1X 1X 1X
|
||||
(Google/CCTZ) 15 15.2 15.5 5.59X 5.55X 5.53X
|
||||
(glibc) 4.42 4.42 4.44 1X 1X 1X
|
||||
(Google/CCTZ) 25.6 26.6 27 5.78X 6.01X 6.08X
|
||||
(JVM) 0.511 0.596 0.6 0.115X 0.135X 0.135X
|
||||
|
||||
UnixTimeToLocalPtime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(glibc) 2.69 2.75 2.8 1X 1X 1X
|
||||
(Google/CCTZ) 14.8 15.1 15.4 5.5X 5.49X 5.52X
|
||||
(glibc) 4.51 4.53 4.53 1X 1X 1X
|
||||
(Google/CCTZ) 23.7 24.1 24.4 5.25X 5.32X 5.4X
|
||||
|
||||
UnixTimeToUtcPtime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(glibc) 17 17.6 17.9 1X 1X 1X
|
||||
(Google/CCTZ) 6.45 6.71 6.81 0.379X 0.382X 0.38X
|
||||
(fast path) 25.1 26 26.4 1.47X 1.48X 1.48X
|
||||
(day split) 48.6 50.3 51.3 2.85X 2.87X 2.86X
|
||||
(glibc) 24.8 25.1 25.6 1X 1X 1X
|
||||
(Google/CCTZ) 13.9 14 14.1 0.562X 0.557X 0.553X
|
||||
(fast path) 54.3 54.8 55.4 2.19X 2.18X 2.17X
|
||||
(day split) 196 199 200 7.89X 7.92X 7.81X
|
||||
|
||||
UtcFromUnixTimeMicros: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(sec split (old)) 17.9 18.7 19.1 1X 1X 1X
|
||||
(day split) 111 116 118 6.21X 6.19X 6.19X
|
||||
(sec split (old)) 30.1 31 31.7 1X 1X 1X
|
||||
(day split) 526 532 534 17.5X 17.2X 16.9X
|
||||
|
||||
FromUnixTimeNanos: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(sec split (old)) 18.7 19.5 19.8 1X 1X 1X
|
||||
(sec split (new)) 104 108 110 5.58X 5.55X 5.57X
|
||||
(sec split (old)) 36.8 37.5 39 1X 1X 1X
|
||||
(sec split (new)) 319 323 324 8.68X 8.61X 8.33X
|
||||
|
||||
FromSubsecondUnixTime: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile
|
||||
(relative) (relative) (relative)
|
||||
---------------------------------------------------------------------------------------------------------
|
||||
(old) 18.7 18.7 18.7 1X 1X 1X
|
||||
(new) 73.5 74.1 74.1 3.94X 3.96X 3.96X
|
||||
(old) 37.1 38 38.7 1X 1X 1X
|
||||
(new) 175 175 177 4.71X 4.6X 4.59X
|
||||
|
||||
Number of threads: 8
|
||||
|
||||
UtcToUnixTime:
|
||||
(glibc) elapsed time: 1s020ms
|
||||
(Google/CCTZ) elapsed time: 144ms
|
||||
(boost) elapsed time: 10ms
|
||||
cctz speedup: 7.0784
|
||||
boost speedup: 95.1732
|
||||
(glibc) elapsed time: 793ms
|
||||
(Google/CCTZ) elapsed time: 152ms
|
||||
(boost) elapsed time: 3ms
|
||||
cctz speedup: 5.20977
|
||||
boost speedup: 260.517
|
||||
|
||||
LocalToUnixTime:
|
||||
(glibc) elapsed time: 18s050ms
|
||||
(Google/CCTZ) elapsed time: 212ms
|
||||
speedup: 84.9949
|
||||
(glibc) elapsed time: 21s750ms
|
||||
(Google/CCTZ) elapsed time: 242ms
|
||||
speedup: 89.6388
|
||||
|
||||
FromUtc:
|
||||
(boost) elapsed time: 1s519ms
|
||||
(Google/CCTZ) elapsed time: 263ms
|
||||
speedup: 5.77003
|
||||
(boost) elapsed time: 1s298ms
|
||||
(Google/CCTZ) elapsed time: 279ms
|
||||
speedup: 4.64642
|
||||
|
||||
ToUtc:
|
||||
(boost) elapsed time: 1s674ms
|
||||
(Google/CCTZ) elapsed time: 325ms
|
||||
speedup: 5.13874
|
||||
(boost) elapsed time: 1s436ms
|
||||
(Google/CCTZ) elapsed time: 496ms
|
||||
speedup: 2.89263
|
||||
|
||||
UtcToLocal:
|
||||
(glibc) elapsed time: 4s862ms
|
||||
(Google/CCTZ) elapsed time: 263ms
|
||||
speedup: 18.4253
|
||||
(glibc) elapsed time: 4s565ms
|
||||
(Google/CCTZ) elapsed time: 260ms
|
||||
(JVM) elapsed time: 2s507ms
|
||||
cctz speedup: 17.5058
|
||||
jvm speedup: 1.82038
|
||||
|
||||
UnixTimeToLocalPtime:
|
||||
(glibc) elapsed time: 4s856ms
|
||||
(Google/CCTZ) elapsed time: 259ms
|
||||
speedup: 18.7398
|
||||
(glibc) elapsed time: 4s576ms
|
||||
(Google/CCTZ) elapsed time: 268ms
|
||||
speedup: 17.0547
|
||||
|
||||
UnixTimeToUtcPtime:
|
||||
(glibc) elapsed time: 928ms
|
||||
(Google/CCTZ) elapsed time: 282ms
|
||||
(fast path) elapsed time: 90ms
|
||||
cctz speedup: 3.28187
|
||||
fast path speedup: 10.2951
|
||||
(glibc) elapsed time: 478ms
|
||||
(Google/CCTZ) elapsed time: 123ms
|
||||
(fast path) elapsed time: 25ms
|
||||
cctz speedup: 3.87304
|
||||
fast path speedup: 18.9835
|
||||
*/
|
||||
|
||||
vector<TimestampValue> AddTestDataDateTimes(int n, const string& startstr) {
|
||||
@@ -223,11 +228,11 @@ public:
|
||||
}
|
||||
|
||||
// Create and start threads.
|
||||
vector<unique_ptr<thread>> threads(num_of_threads);
|
||||
vector<unique_ptr<std::thread>> threads(num_of_threads);
|
||||
StopWatch sw;
|
||||
sw.Start();
|
||||
for (int i = 0; i < num_of_threads; ++i) {
|
||||
threads[i] = make_unique<thread>(
|
||||
threads[i] = make_unique<std::thread>(
|
||||
run_test, batch_size, test_data[i].get());
|
||||
}
|
||||
|
||||
@@ -403,6 +408,13 @@ TimestampValue cctz_utc_to_local(const TimestampValue& ts_value) {
|
||||
return TimestampValue(d, t);
|
||||
}
|
||||
|
||||
// JVM
|
||||
TimestampValue jvm_utc_to_local(const TimestampValue& ts_value) {
|
||||
TimestampValue result = ts_value;
|
||||
result.HiveLegacyUtcToLocal(*PTR_CCTZ_LOCAL_TZ);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// Test FromUtc (CCTZ is expected to be faster than boost)
|
||||
@@ -603,8 +615,7 @@ TimestampValue old_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix
|
||||
TimestampValue new_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix_time) {
|
||||
// The TimestampValue version is used as it is hard to reproduce the same logic without
|
||||
// accessing private members.
|
||||
return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos,
|
||||
&TimezoneDatabase::GetUtcTimezone());
|
||||
return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos, UTCPTR);
|
||||
}
|
||||
|
||||
TimestampValue from_subsecond_unix_time_old(const double& unix_time) {
|
||||
@@ -621,8 +632,7 @@ TimestampValue from_subsecond_unix_time_new(const double& unix_time) {
|
||||
const double ONE_BILLIONTH = 0.000000001;
|
||||
int64_t unix_time_whole = unix_time;
|
||||
int64_t nanos = (unix_time - unix_time_whole) / ONE_BILLIONTH;
|
||||
return TimestampValue::FromUnixTimeNanos(
|
||||
unix_time_whole, nanos, &TimezoneDatabase::GetUtcTimezone());
|
||||
return TimestampValue::FromUnixTimeNanos(unix_time_whole, nanos, UTCPTR);
|
||||
}
|
||||
|
||||
//
|
||||
@@ -698,7 +708,11 @@ TimestampVal cctz_to_utc(const TimestampVal& ts_val) {
|
||||
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
CpuInfo::Init();
|
||||
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
|
||||
impala::InitFeSupport();
|
||||
TestEnv test_env;
|
||||
CHECK(test_env.Init().ok());
|
||||
|
||||
cout << Benchmark::GetMachineInfo() << endl;
|
||||
|
||||
ABORT_IF_ERROR(TimezoneDatabase::Initialize());
|
||||
@@ -783,9 +797,12 @@ int main(int argc, char* argv[]) {
|
||||
tsvalue_data;
|
||||
TestData<TimestampValue, TimestampValue, cctz_utc_to_local> cctz_utc_to_local_data =
|
||||
tsvalue_data;
|
||||
TestData<TimestampValue, TimestampValue, jvm_utc_to_local> jvm_utc_to_local_data =
|
||||
tsvalue_data;
|
||||
|
||||
glibc_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(glibc)");
|
||||
cctz_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(Google/CCTZ)");
|
||||
jvm_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(JVM)");
|
||||
cout << bm_utc_to_local.Measure() << endl;
|
||||
|
||||
bail_if_results_dont_match(vector<const vector<TimestampValue>*>{
|
||||
@@ -795,7 +812,7 @@ int main(int argc, char* argv[]) {
|
||||
vector<time_t> time_data;
|
||||
for (const TimestampValue& tsvalue: tsvalue_data) {
|
||||
time_t unix_time;
|
||||
tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
|
||||
tsvalue.ToUnixTime(UTCPTR, &unix_time);
|
||||
time_data.push_back(unix_time);
|
||||
}
|
||||
|
||||
@@ -859,7 +876,7 @@ int main(int argc, char* argv[]) {
|
||||
for (int i = 0; i < tsvalue_data.size(); ++i) {
|
||||
const TimestampValue& tsvalue = tsvalue_data[i];
|
||||
time_t unix_time;
|
||||
tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
|
||||
tsvalue.ToUnixTime(UTCPTR, &unix_time);
|
||||
int micros = (i * 1001) % MICROS_PER_SEC; // add some sub-second part
|
||||
microsec_data.push_back(unix_time * MICROS_PER_SEC + micros);
|
||||
}
|
||||
@@ -885,7 +902,7 @@ int main(int argc, char* argv[]) {
|
||||
for (int i = 0; i < tsvalue_data.size(); ++i) {
|
||||
const TimestampValue& tsvalue = tsvalue_data[i];
|
||||
time_t unix_time;
|
||||
tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
|
||||
tsvalue.ToUnixTime(UTCPTR, &unix_time);
|
||||
int nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
|
||||
nanosec_data.push_back(SplitNanoAndSecond {unix_time, nanos} );
|
||||
}
|
||||
@@ -911,7 +928,7 @@ int main(int argc, char* argv[]) {
|
||||
for (int i = 0; i < tsvalue_data.size(); ++i) {
|
||||
const TimestampValue& tsvalue = tsvalue_data[i];
|
||||
time_t unix_time;
|
||||
tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
|
||||
tsvalue.ToUnixTime(UTCPTR, &unix_time);
|
||||
double nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
|
||||
double_data.push_back((double)unix_time + nanos / NANOS_PER_SEC);
|
||||
}
|
||||
@@ -979,7 +996,10 @@ int main(int argc, char* argv[]) {
|
||||
num_of_threads, BATCH_SIZE, tsvalue_data, "(glibc)");
|
||||
m2 = cctz_utc_to_local_data.measure_multithreaded_elapsed_time(
|
||||
num_of_threads, BATCH_SIZE, tsvalue_data, "(Google/CCTZ)");
|
||||
cout << "speedup: " << double(m1)/double(m2) << endl;
|
||||
m3 = jvm_utc_to_local_data.measure_multithreaded_elapsed_time(
|
||||
num_of_threads, BATCH_SIZE, tsvalue_data, "(JVM)");
|
||||
cout << "cctz speedup: " << double(m1)/double(m2) << endl;
|
||||
cout << "jvm speedup: " << double(m1)/double(m3) << endl;
|
||||
|
||||
// UnixTimeToLocalPtime
|
||||
cout << endl << "UnixTimeToLocalPtime:" << endl;
|
||||
|
||||
@@ -26,6 +26,8 @@
|
||||
#include "runtime/datetime-simple-date-format-parser.h"
|
||||
#include "runtime/date-parse-util.h"
|
||||
#include "runtime/timestamp-parse-util.h"
|
||||
#include "runtime/timestamp-value.h"
|
||||
#include "runtime/timestamp-value.inline.h"
|
||||
#include "util/benchmark.h"
|
||||
#include "util/cpu-info.h"
|
||||
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
#include <queue>
|
||||
#include <stack>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <gflags/gflags.h>
|
||||
#include <gutil/strings/substitute.h>
|
||||
|
||||
@@ -53,6 +55,9 @@
|
||||
|
||||
#include "common/names.h"
|
||||
|
||||
using boost::adaptors::transformed;
|
||||
using boost::algorithm::iequals;
|
||||
using boost::algorithm::join;
|
||||
using std::move;
|
||||
using std::sort;
|
||||
using namespace impala;
|
||||
@@ -564,14 +569,13 @@ Status HdfsParquetScanner::ResolveSchemaForStatFiltering(SlotDescriptor* slot_de
|
||||
}
|
||||
|
||||
ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
|
||||
const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
|
||||
SchemaNode* node, const ColumnType& col_type) {
|
||||
const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type) {
|
||||
DCHECK(node);
|
||||
|
||||
int col_idx = node->col_idx;
|
||||
DCHECK_LT(col_idx, row_group.columns.size());
|
||||
|
||||
const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders;
|
||||
const vector<parquet::ColumnOrder>& col_orders = file_metadata_.column_orders;
|
||||
const parquet::ColumnOrder* col_order =
|
||||
col_idx < col_orders.size() ? &col_orders[col_idx] : nullptr;
|
||||
|
||||
@@ -585,8 +589,7 @@ ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
|
||||
return stat_reader;
|
||||
}
|
||||
|
||||
Status HdfsParquetScanner::EvaluateStatsConjuncts(
|
||||
const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
|
||||
Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_group,
|
||||
bool* skip_row_group) {
|
||||
*skip_row_group = false;
|
||||
|
||||
@@ -623,7 +626,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
|
||||
}
|
||||
|
||||
ColumnStatsReader stats_reader =
|
||||
CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
|
||||
CreateStatsReader(row_group, node, slot_desc->type());
|
||||
|
||||
bool all_nulls = false;
|
||||
if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
|
||||
@@ -686,8 +689,7 @@ bool HdfsParquetScanner::FilterAlreadyDisabledOrOverlapWithColumnStats(
|
||||
}
|
||||
|
||||
Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
|
||||
const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
|
||||
bool* skip_row_group) {
|
||||
const parquet::RowGroup& row_group, bool* skip_row_group) {
|
||||
*skip_row_group = false;
|
||||
|
||||
if (!state_->query_options().parquet_read_statistics) return Status::OK();
|
||||
@@ -764,7 +766,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
|
||||
break;
|
||||
}
|
||||
ColumnStatsReader stats_reader =
|
||||
CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
|
||||
CreateStatsReader(row_group, node, slot_desc->type());
|
||||
|
||||
bool all_nulls = false;
|
||||
if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
|
||||
@@ -926,8 +928,7 @@ Status HdfsParquetScanner::NextRowGroup() {
|
||||
|
||||
// Evaluate row group statistics with stats conjuncts.
|
||||
bool skip_row_group_on_stats;
|
||||
RETURN_IF_ERROR(
|
||||
EvaluateStatsConjuncts(file_metadata_, row_group, &skip_row_group_on_stats));
|
||||
RETURN_IF_ERROR(EvaluateStatsConjuncts(row_group, &skip_row_group_on_stats));
|
||||
if (skip_row_group_on_stats) {
|
||||
COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
|
||||
continue;
|
||||
@@ -935,8 +936,7 @@ Status HdfsParquetScanner::NextRowGroup() {
|
||||
|
||||
// Evaluate row group statistics with min/max filters.
|
||||
bool skip_row_group_on_minmax;
|
||||
RETURN_IF_ERROR(
|
||||
EvaluateOverlapForRowGroup(file_metadata_, row_group, &skip_row_group_on_minmax));
|
||||
RETURN_IF_ERROR(EvaluateOverlapForRowGroup(row_group, &skip_row_group_on_minmax));
|
||||
if (skip_row_group_on_minmax) {
|
||||
COUNTER_ADD(num_minmax_filtered_row_groups_counter_, 1);
|
||||
continue;
|
||||
@@ -1487,7 +1487,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
|
||||
}
|
||||
|
||||
ColumnStatsReader stats_reader =
|
||||
CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
|
||||
CreateStatsReader(row_group, node, slot_desc->type());
|
||||
|
||||
DCHECK_LT(col_idx, row_group.columns.size());
|
||||
const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
|
||||
@@ -1570,7 +1570,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
|
||||
}
|
||||
int col_idx = node->col_idx;;
|
||||
ColumnStatsReader stats_reader =
|
||||
CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
|
||||
CreateStatsReader(row_group, node, slot_desc->type());
|
||||
|
||||
DCHECK_LT(col_idx, row_group.columns.size());
|
||||
const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
|
||||
@@ -2826,6 +2826,13 @@ Status HdfsParquetScanner::ProcessFooter() {
|
||||
|
||||
RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
|
||||
|
||||
if (VLOG_FILE_IS_ON) {
|
||||
VLOG_FILE << "Parquet metadata for " << filename() << " created by "
|
||||
<< file_metadata_.created_by << ":\n"
|
||||
<< join(file_metadata_.key_value_metadata | transformed(
|
||||
[](parquet::KeyValue kv) { return kv.key + "=" + kv.value; }), "\n");
|
||||
}
|
||||
|
||||
// IMPALA-3943: Do not throw an error for empty files for backwards compatibility.
|
||||
if (file_metadata_.num_rows == 0) {
|
||||
// Warn if the num_rows is inconsistent with the row group metadata.
|
||||
@@ -3157,8 +3164,41 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
|
||||
state_->query_options().convert_legacy_hive_parquet_utc_timestamps &&
|
||||
state_->local_time_zone() != UTCPTR;
|
||||
|
||||
return ParquetTimestampDecoder(element, state_->local_time_zone(),
|
||||
timestamp_conversion_needed_for_int96_timestamps);
|
||||
const Timezone* timezone = state_->local_time_zone();
|
||||
bool hive_legacy_conversion = false;
|
||||
if (timestamp_conversion_needed_for_int96_timestamps && GetHiveZoneConversionLegacy()) {
|
||||
VLOG_FILE << "Using Hive legacy timezone conversion";
|
||||
hive_legacy_conversion = true;
|
||||
}
|
||||
|
||||
return ParquetTimestampDecoder(element, timezone,
|
||||
timestamp_conversion_needed_for_int96_timestamps, hive_legacy_conversion);
|
||||
}
|
||||
|
||||
bool HdfsParquetScanner::GetHiveZoneConversionLegacy() const {
  string writer_zone_conversion_legacy;
  string writer_time_zone;
  for (const parquet::KeyValue& kv : file_metadata_.key_value_metadata) {
    if (kv.key == "writer.zone.conversion.legacy") {
      writer_zone_conversion_legacy = kv.value;
    } else if (kv.key == "writer.time.zone") {
      writer_time_zone = kv.value;
    }
  }

  if (writer_zone_conversion_legacy != "") {
    return iequals(writer_zone_conversion_legacy, "true");
  }

  // There are no explicit meta about the legacy conversion.
  if (writer_time_zone != "") {
    // There is meta about the timezone thus we can infer that when the file was written,
    // the new APIs were used.
    return false;
  }

  // There is no (relevant) metadata in the file, use the configuration.
  return state_->query_options().use_legacy_hive_timestamp_conversion;
}
|
||||
|
||||
void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) {
|
||||
|
||||
@@ -587,10 +587,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
|
||||
}
|
||||
|
||||
/// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
|
||||
/// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
|
||||
/// compute the statistics. Sets 'skip_row_group' to true if the row group can be
|
||||
/// skipped, 'false' otherwise.
|
||||
Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
|
||||
/// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
|
||||
/// 'false' otherwise.
|
||||
Status EvaluateStatsConjuncts(
|
||||
const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
|
||||
|
||||
/// Advances 'row_group_idx_' to the next non-empty row group and initializes
|
||||
@@ -601,12 +600,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
|
||||
Status NextRowGroup() WARN_UNUSED_RESULT;
|
||||
|
||||
/// Evaluates the overlap predicates of the 'scan_node_' using the parquet::Statistics
|
||||
/// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
|
||||
/// compute the statistics. Sets 'skip_row_group' to true if the row group can be
|
||||
/// skipped, 'false' otherwise.
|
||||
/// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
|
||||
/// 'false' otherwise.
|
||||
Status EvaluateOverlapForRowGroup(
|
||||
const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
|
||||
bool* skip_row_group);
|
||||
const parquet::RowGroup& row_group, bool* skip_row_group);
|
||||
|
||||
/// Return true if filter 'minmax_filter' of fitler id 'filter_id' is too close to
|
||||
/// column min/max stats available at the target desc entry targets[0] in
|
||||
@@ -631,12 +628,12 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
|
||||
SchemaNode** schema_node_ptr = nullptr);
|
||||
|
||||
/// Create a ColumnStatsReader object for a column chunk described by a schema
|
||||
/// path in a slot descriptor 'slot_desc'. 'file_metadata', 'row_group', 'node',
|
||||
/// and 'col_type' provide extra data needed.
|
||||
/// path in a slot descriptor 'slot_desc'. 'row_group', 'node', and 'col_type' provide
|
||||
/// extra data needed.
|
||||
/// On return:
|
||||
/// A column chunk stats reader ('ColumnStatsReader') is returned.
|
||||
ColumnStatsReader CreateStatsReader(const parquet::FileMetaData& file_metadata,
|
||||
const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type);
|
||||
ColumnStatsReader CreateStatsReader(const parquet::RowGroup& row_group,
|
||||
SchemaNode* node, const ColumnType& col_type);
|
||||
|
||||
/// Return the overlap predicate descs from the HDFS scan plan.
|
||||
const vector<TOverlapPredicateDesc>& GetOverlapPredicateDescs();
|
||||
@@ -944,6 +941,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
|
||||
/// then we skip to row index 'skip_to_row'.
|
||||
Status SkipRowsForColumns(const vector<ParquetColumnReader*>& column_readers,
|
||||
int64_t* num_rows_to_skip, int64_t* skip_to_row);
|
||||
|
||||
/// Returns whether Hive legacy zone conversion should be used for transforming
|
||||
/// timestamps based on file metadata and configuration.
|
||||
bool GetHiveZoneConversionLegacy() const;
|
||||
};
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -255,7 +255,8 @@ bool ParquetTimestampDecoder::GetTimestampInfoFromSchema(const parquet::SchemaEl
|
||||
}
|
||||
|
||||
ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
|
||||
const Timezone* timezone, bool convert_int96_timestamps) {
|
||||
const Timezone* timezone, bool convert_int96_timestamps,
|
||||
bool hive_legacy_conversion) : hive_legacy_conversion_(hive_legacy_conversion) {
|
||||
bool needs_conversion = false;
|
||||
bool valid_schema = GetTimestampInfoFromSchema(e, precision_, needs_conversion);
|
||||
DCHECK(valid_schema); // Invalid schemas should be rejected in an earlier step.
|
||||
@@ -267,7 +268,15 @@ void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const
|
||||
DCHECK(timezone_ != nullptr);
|
||||
if (!v->HasDateAndTime()) return;
|
||||
TimestampValue repeated_period_start;
|
||||
v->UtcToLocal(*timezone_, &repeated_period_start);
|
||||
if (hive_legacy_conversion_) {
|
||||
// Hive legacy conversion does not have efficient tools for identifying repeated
|
||||
// periods, so subtract a day to ensure we cover all possible repeated periods
|
||||
// (such as switching from UTC- to UTC+ near the international date line).
|
||||
v->HiveLegacyUtcToLocal(*timezone_);
|
||||
v->Subtract(boost::posix_time::hours(24));
|
||||
} else {
|
||||
v->UtcToLocal(*timezone_, &repeated_period_start);
|
||||
}
|
||||
if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
|
||||
}
|
||||
|
||||
@@ -275,7 +284,15 @@ void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const
|
||||
DCHECK(timezone_ != nullptr);
|
||||
if (!v->HasDateAndTime()) return;
|
||||
TimestampValue repeated_period_end;
|
||||
v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
|
||||
if (hive_legacy_conversion_) {
|
||||
// Hive legacy conversion does not have efficient tools for identifying repeated
|
||||
// periods, so add a day to ensure we cover all possible repeated periods
|
||||
// (such as switching from UTC- to UTC+ near the international date line).
|
||||
v->HiveLegacyUtcToLocal(*timezone_);
|
||||
v->Add(boost::posix_time::hours(24));
|
||||
} else {
|
||||
v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
|
||||
}
|
||||
if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -764,7 +764,7 @@ public:
|
||||
ParquetTimestampDecoder() {}
|
||||
|
||||
ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone,
|
||||
bool convert_int96_timestamps);
|
||||
bool convert_int96_timestamps, bool hive_legacy_conversion);
|
||||
|
||||
bool NeedsConversion() const { return timezone_ != nullptr; }
|
||||
|
||||
@@ -798,7 +798,13 @@ public:
|
||||
|
||||
void ConvertToLocalTime(TimestampValue* v) const {
|
||||
DCHECK(timezone_ != nullptr);
|
||||
if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
|
||||
if (v->HasDateAndTime()) {
|
||||
if (hive_legacy_conversion_) {
|
||||
v->HiveLegacyUtcToLocal(*timezone_);
|
||||
} else {
|
||||
v->UtcToLocal(*timezone_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Timezone conversion of min/max stats need some extra logic because UTC->local
|
||||
@@ -831,6 +837,9 @@ private:
|
||||
/// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
|
||||
/// to be implemented once it is added in Parquet (PARQUET-1387).
|
||||
Precision precision_ = NANO;
|
||||
|
||||
/// Use Hive legacy-compatible conversion with Java DateFormat.
|
||||
bool hive_legacy_conversion_ = false;
|
||||
};
|
||||
|
||||
template <>
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
|
||||
#include "exprs/timezone_db.h"
|
||||
#include "runtime/raw-value.inline.h"
|
||||
#include "runtime/timestamp-value.h"
|
||||
#include "runtime/timestamp-value.inline.h"
|
||||
#include "testutil/gtest-util.h"
|
||||
|
||||
#include "common/names.h"
|
||||
|
||||
@@ -17,14 +17,19 @@
|
||||
|
||||
#include "runtime/timestamp-value.h"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
#include "exprs/timestamp-functions.h"
|
||||
#include "exprs/timezone_db.h"
|
||||
#include "runtime/datetime-simple-date-format-parser.h"
|
||||
#include "runtime/exec-env.h"
|
||||
#include "runtime/timestamp-parse-util.h"
|
||||
#include "runtime/timestamp-value.inline.h"
|
||||
#include "service/frontend.h"
|
||||
|
||||
#include "common/names.h"
|
||||
|
||||
using boost::algorithm::starts_with;
|
||||
using boost::date_time::not_a_date_time;
|
||||
using boost::gregorian::date;
|
||||
using boost::gregorian::date_duration;
|
||||
@@ -155,6 +160,32 @@ void TimestampValue::UtcToLocal(const Timezone& local_tz,
|
||||
}
|
||||
}
|
||||
|
||||
void TimestampValue::HiveLegacyUtcToLocal(const Timezone& local_tz) {
|
||||
DCHECK(HasDateAndTime());
|
||||
int64_t utc_time_millis;
|
||||
if (UNLIKELY(!FloorUtcToUnixTimeMillis(&utc_time_millis))) {
|
||||
SetToInvalidDateTime();
|
||||
return;
|
||||
}
|
||||
|
||||
string tz = local_tz.name();
|
||||
static constexpr std::string_view zoneinfo = "/usr/share/zoneinfo/";
|
||||
if (starts_with(tz, zoneinfo)) {
|
||||
tz = tz.substr(zoneinfo.size());
|
||||
}
|
||||
|
||||
TCivilTime cs;
|
||||
Status status = ExecEnv::GetInstance()->frontend()->HiveLegacyTimezoneConvert(
|
||||
tz, utc_time_millis, &cs);
|
||||
if (UNLIKELY(!status.ok())) {
|
||||
// This would result in log spam. However it should be impossible to fail.
|
||||
LOG(ERROR) << "Timezone " << tz << " cannot be used with legacy Hive conversion.";
|
||||
return;
|
||||
}
|
||||
date_ = boost::gregorian::date(cs.year, cs.month, cs.day);
|
||||
time_ = time_duration(cs.hour, cs.minute, cs.second, time_.fractional_seconds());
|
||||
}
|
||||
|
||||
void TimestampValue::LocalToUtc(const Timezone& local_tz,
|
||||
TimestampValue* pre_utc_if_repeated, TimestampValue* post_utc_if_repeated) {
|
||||
DCHECK(HasDateAndTime());
|
||||
@@ -227,14 +258,6 @@ TimestampValue TimestampValue::UnixTimeToLocal(
|
||||
}
|
||||
}
|
||||
|
||||
TimestampValue TimestampValue::FromUnixTime(time_t unix_time, const Timezone* local_tz) {
|
||||
if (local_tz != UTCPTR) {
|
||||
return UnixTimeToLocal(unix_time, *local_tz);
|
||||
} else {
|
||||
return UtcFromUnixTimeTicks<1>(unix_time);
|
||||
}
|
||||
}
|
||||
|
||||
void TimestampValue::ToString(string& dst) const {
|
||||
dst.resize(SimpleDateFormatTokenizer::DEFAULT_DATE_TIME_FMT_LEN);
|
||||
const int out_len = TimestampParser::FormatDefault(date(), time(), dst.data());
|
||||
|
||||
@@ -296,6 +296,11 @@ class TimestampValue {
|
||||
TimestampValue* start_of_repeated_period = nullptr,
|
||||
TimestampValue* end_of_repeated_period = nullptr);
|
||||
|
||||
/// Converts from UTC to 'local_tz' time zone in-place. The caller must ensure the
|
||||
/// TimestampValue this function is called upon has both a valid date and time. Uses
|
||||
/// Java Calendar for conversion to match Hive's legacy conversion process.
|
||||
void HiveLegacyUtcToLocal(const Timezone& local_tz);
|
||||
|
||||
/// Converts from 'local_tz' to UTC time zone in-place. The caller must ensure the
|
||||
/// TimestampValue this function is called upon has both a valid date and time.
|
||||
///
|
||||
|
||||
@@ -77,6 +77,15 @@ inline TimestampValue TimestampValue::UtcFromUnixTimeLimitedRangeNanos(
|
||||
return UtcFromUnixTimeTicks<NANOS_PER_SEC>(unix_time_nanos);
|
||||
}
|
||||
|
||||
inline TimestampValue TimestampValue::FromUnixTime(time_t unix_time,
|
||||
const Timezone* local_tz) {
|
||||
if (local_tz != UTCPTR) {
|
||||
return UnixTimeToLocal(unix_time, *local_tz);
|
||||
} else {
|
||||
return UtcFromUnixTimeTicks<1>(unix_time);
|
||||
}
|
||||
}
|
||||
|
||||
inline TimestampValue TimestampValue::FromUnixTimeNanos(time_t unix_time, int64_t nanos,
|
||||
const Timezone* local_tz) {
|
||||
unix_time =
|
||||
|
||||
@@ -157,7 +157,8 @@ Frontend::Frontend() {
|
||||
};
|
||||
|
||||
JniMethodDescriptor staticMethods[] = {
|
||||
{"getSecretFromKeyStore", "([B)Ljava/lang/String;", &get_secret_from_key_store_}
|
||||
{"getSecretFromKeyStore", "([B)Ljava/lang/String;", &get_secret_from_key_store_},
|
||||
{"hiveLegacyTimezoneConvert", "([BJ)[B", &hive_legacy_timezone_convert_}
|
||||
};
|
||||
|
||||
JNIEnv* jni_env = JniUtil::GetJNIEnv();
|
||||
@@ -436,3 +437,12 @@ Status Frontend::GetSecretFromKeyStore(const string& secret_key, string* secret)
|
||||
return JniUtil::CallStaticJniMethod(fe_class_, get_secret_from_key_store_, secret_key_t,
|
||||
secret);
|
||||
}
|
||||
|
||||
Status Frontend::HiveLegacyTimezoneConvert(
|
||||
const string& timezone, long utc_time_millis, TCivilTime* local_time) {
|
||||
TStringLiteral timezone_t;
|
||||
timezone_t.__set_value(timezone);
|
||||
return JniCall::static_method(fe_class_, hive_legacy_timezone_convert_)
|
||||
.with_thrift_arg(timezone_t).with_primitive_arg(utc_time_millis)
|
||||
.Call(local_time);
|
||||
}
|
||||
|
||||
@@ -249,6 +249,10 @@ class Frontend {
|
||||
/// Get secret from jceks key store for the input secret_key.
|
||||
Status GetSecretFromKeyStore(const string& secret_key, string* secret);
|
||||
|
||||
/// Convert UTC UNIX time (in millis) to target timezone using Hive legacy conversion.
|
||||
Status HiveLegacyTimezoneConvert(
|
||||
const string& timezone, long utc_time_millis, TCivilTime* local_time);
|
||||
|
||||
private:
|
||||
jclass fe_class_; // org.apache.impala.service.JniFrontend class
|
||||
jobject fe_; // instance of org.apache.impala.service.JniFrontend
|
||||
@@ -292,6 +296,7 @@ class Frontend {
|
||||
jmethodID commit_kudu_txn_; // JniFrontend.commitKuduTransaction()
|
||||
jmethodID convertTable; // JniFrontend.convertTable
|
||||
jmethodID get_secret_from_key_store_; // JniFrontend.getSecretFromKeyStore()
|
||||
jmethodID hive_legacy_timezone_convert_; // JniFrontend.hiveLegacyTimezoneConvert()
|
||||
|
||||
// Only used for testing.
|
||||
jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
|
||||
|
||||
@@ -1330,6 +1330,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
|
||||
query_options->__set_estimate_duplicate_in_preagg(IsTrue(value));
|
||||
break;
|
||||
}
|
||||
case TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION: {
|
||||
query_options->__set_use_legacy_hive_timestamp_conversion(IsTrue(value));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
string key = to_string(option);
|
||||
if (IsRemovedQueryOption(key)) {
|
||||
|
||||
@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
|
||||
// plus one. Thus, the second argument to the DCHECK has to be updated every
|
||||
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
|
||||
constexpr unsigned NUM_QUERY_OPTIONS =
|
||||
TImpalaQueryOptions::ESTIMATE_DUPLICATE_IN_PREAGG + 1;
|
||||
TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION + 1;
|
||||
#define QUERY_OPTS_TABLE \
|
||||
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \
|
||||
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
|
||||
@@ -364,6 +364,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
|
||||
ENABLE_TUPLE_ANALYSIS_IN_AGGREGATE, TQueryOptionLevel::ADVANCED) \
|
||||
QUERY_OPT_FN(estimate_duplicate_in_preagg, \
|
||||
ESTIMATE_DUPLICATE_IN_PREAGG, TQueryOptionLevel::ADVANCED) \
|
||||
QUERY_OPT_FN(use_legacy_hive_timestamp_conversion, \
|
||||
USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED)
|
||||
;
|
||||
|
||||
/// Enforce practical limits on some query options to avoid undesired query state.
|
||||
|
||||
@@ -1092,3 +1092,17 @@ struct TWrappedHttpResponse {
|
||||
5: optional string content
|
||||
6: optional string content_type
|
||||
}
|
||||
|
||||
// Captures civil time - local time in a specific time zone - mirroring
|
||||
// cctz::civil_second. Used to serialize Java timezone conversions back to C++ code.
|
||||
// Omits subsecond measurements because
|
||||
// - matches cctz::civil_second; no known timezone libraries have subsecond adjustments
|
||||
// - Java timezone conversion is only accurate to milliseconds, but we use nanoseconds
|
||||
struct TCivilTime {
|
||||
1: required i32 year
|
||||
2: required i32 month
|
||||
3: required i32 day
|
||||
4: required i32 hour
|
||||
5: required i32 minute
|
||||
6: required i32 second
|
||||
}
|
||||
|
||||
@@ -977,6 +977,15 @@ enum TImpalaQueryOptions {
|
||||
// If True, account for probability of having duplicate grouping key exist in multiple
|
||||
// nodes during preaggreation.
|
||||
ESTIMATE_DUPLICATE_IN_PREAGG = 185
|
||||
|
||||
// When true and CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS is also enabled, TIMESTAMP
|
||||
// conversion to local time will fallback to the timestamp conversion method from Hive
|
||||
// 3.0 and earlier if not specified in the file. This matches the Hive option
|
||||
// 'hive.parquet.timestamp.legacy.conversion.enabled', which defaults to true. Impala
|
||||
// defaults to false because conversion is ~50x slower than Impala's default conversion
|
||||
// method and they produce the same results for modern time periods (post 1970, and in
|
||||
// most instances before that).
|
||||
USE_LEGACY_HIVE_TIMESTAMP_CONVERSION = 186
|
||||
}
|
||||
|
||||
// The summary of a DML statement.
|
||||
|
||||
@@ -759,6 +759,9 @@ struct TQueryOptions {
|
||||
|
||||
// See comment in ImpalaService.thrift
|
||||
186: optional bool estimate_duplicate_in_preagg = true
|
||||
|
||||
// See comment in ImpalaService.thrift
|
||||
187: optional bool use_legacy_hive_timestamp_conversion = false;
|
||||
}
|
||||
|
||||
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
|
||||
|
||||
@@ -263,6 +263,43 @@ DATE_ADD (<varname>timestamp</varname>, INTERVAL <varname>interval</varname> <va
|
||||
Parquet files written by Hive.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Hive versions prior to 3.1 wrote Parquet files in local time using Java's
|
||||
SimpleDateFormat. That method has some cases that differ from both Impala's
|
||||
method and the default method used in Hive 3.1.2+ that are based on the
|
||||
<xref href="https://www.iana.org/time-zones" format="html" scope="external">
|
||||
IANA Time Zone Database</xref>. Hive 4 added the
|
||||
<codeph>writer.zone.conversion.legacy</codeph> Parquet file metadata property
|
||||
to identify which method was used to write the file (controlled by
|
||||
<codeph>hive.parquet.timestamp.write.legacy.conversion.enabled</codeph>). When
|
||||
the Parquet file was written by Parquet Java (<codeph>parquet-mr</codeph>), Hive -
|
||||
and Impala's behavior when
|
||||
<codeph>convert_legacy_hive_parquet_utc_timestamps</codeph> is
|
||||
<codeph>true</codeph> - are:
|
||||
<ul>
|
||||
<li>
|
||||
If <codeph>writer.zone.conversion.legacy</codeph> is present, use the legacy
|
||||
conversion method if true, use the newer method if false.
|
||||
</li>
|
||||
<li>
|
||||
If <codeph>writer.zone.conversion.legacy</codeph> is not present but
|
||||
<codeph>writer.time.zone</codeph> is, we can infer the file was written by
|
||||
Hive 3.1.2+ using new APIs and use the newer method.
|
||||
</li>
|
||||
<li>
|
||||
Otherwise assume it was written by an earlier Hive release. In that case
|
||||
Hive will select conversion method based on
|
||||
<codeph>hive.parquet.timestamp.legacy.conversion.enabled</codeph> (defaults
|
||||
to <codeph>true</codeph>). <keyword keyref="impala45"/> adds the query
|
||||
option <codeph>use_legacy_hive_timestamp_conversion</codeph> to select this
|
||||
behavior. It defaults to <codeph>false</codeph> because conversion is ~50x
|
||||
slower than Impala's default conversion method and they produce the same
|
||||
results for modern time periods (post 1970, and in most instances before
|
||||
that).
|
||||
</li>
|
||||
</ul>
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Hive currently cannot write <codeph>INT64</codeph> <codeph>TIMESTAMP</codeph> values.
|
||||
</p>
|
||||
|
||||
@@ -54,6 +54,7 @@ import org.apache.impala.service.Frontend.PlanCtx;
|
||||
import org.apache.impala.thrift.TBackendGflags;
|
||||
import org.apache.impala.thrift.TBuildTestDescriptorTableParams;
|
||||
import org.apache.impala.thrift.TCatalogObject;
|
||||
import org.apache.impala.thrift.TCivilTime;
|
||||
import org.apache.impala.thrift.TDatabase;
|
||||
import org.apache.impala.thrift.TDescribeDbParams;
|
||||
import org.apache.impala.thrift.TDescribeResult;
|
||||
@@ -115,11 +116,14 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.IllegalArgumentException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
|
||||
/**
|
||||
* JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise
|
||||
@@ -808,6 +812,29 @@ public class JniFrontend {
|
||||
return secret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs Hive legacy-compatible timezone conversion
|
||||
*/
|
||||
public static byte[] hiveLegacyTimezoneConvert(byte[] timezoneT, long utc_time_millis)
|
||||
throws ImpalaException {
|
||||
final TStringLiteral timezone = new TStringLiteral();
|
||||
JniUtil.deserializeThrift(protocolFactory_, timezone, timezoneT);
|
||||
// TimeZone.getTimeZone defaults to GMT if it doesn't recognize the timezone.
|
||||
Calendar c = new GregorianCalendar(TimeZone.getTimeZone(timezone.getValue()));
|
||||
c.setTimeInMillis(utc_time_millis);
|
||||
|
||||
final TCivilTime civilTime = new TCivilTime(
|
||||
// Normalize month so January starts at 1.
|
||||
c.get(Calendar.YEAR), c.get(Calendar.MONTH)+1, c.get(Calendar.DAY_OF_MONTH),
|
||||
c.get(Calendar.HOUR_OF_DAY), c.get(Calendar.MINUTE), c.get(Calendar.SECOND));
|
||||
try {
|
||||
TSerializer serializer = new TSerializer(protocolFactory_);
|
||||
return serializer.serialize(civilTime);
|
||||
} catch (TException e) {
|
||||
throw new InternalException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public String validateSaml2Bearer(byte[] serializedRequest) throws ImpalaException{
|
||||
Preconditions.checkNotNull(frontend_);
|
||||
Preconditions.checkNotNull(frontend_.getSaml2Client());
|
||||
|
||||
BIN  testdata/data/employee_hive_3_1_3_us_pacific.parquet (new vendored file, binary not shown)
BIN  testdata/data/hive_kuala_lumpur_legacy.parquet (new vendored file, binary not shown)
BIN  testdata/data/tbl_parq1/000000_0 (new vendored file, binary not shown)
BIN  testdata/data/tbl_parq1/000000_1 (new vendored file, binary not shown)
BIN  testdata/data/tbl_parq1/000000_2 (new vendored file, binary not shown)
46  testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test (new vendored file)
@@ -0,0 +1,46 @@
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone=PST;
|
||||
SELECT * FROM employee_hive_3_1_3_us_pacific;
|
||||
---- TYPES
|
||||
INT,TIMESTAMP
|
||||
---- RESULTS
|
||||
1,1880-01-01 07:52:58
|
||||
2,1884-01-01 08:00:00
|
||||
3,1990-01-01 08:00:00
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone=PST;
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SELECT * FROM employee_hive_3_1_3_us_pacific;
|
||||
---- TYPES
|
||||
INT,TIMESTAMP
|
||||
---- RESULTS
|
||||
1,1880-01-01 00:00:00
|
||||
2,1884-01-01 00:00:00
|
||||
3,1990-01-01 00:00:00
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone=PST;
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SET use_legacy_hive_timestamp_conversion=true;
|
||||
SELECT * FROM employee_hive_3_1_3_us_pacific;
|
||||
---- TYPES
|
||||
INT,TIMESTAMP
|
||||
---- RESULTS
|
||||
1,1880-01-01 00:00:00
|
||||
2,1884-01-01 00:00:00
|
||||
3,1990-01-01 00:00:00
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone=UTC;
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SET use_legacy_hive_timestamp_conversion=true;
|
||||
SELECT * FROM employee_hive_3_1_3_us_pacific;
|
||||
---- TYPES
|
||||
INT,TIMESTAMP
|
||||
---- RESULTS
|
||||
1,1880-01-01 07:52:58
|
||||
2,1884-01-01 08:00:00
|
||||
3,1990-01-01 08:00:00
|
||||
====
|
||||
34  testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test (new vendored file)
@@ -0,0 +1,34 @@
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone="Asia/Singapore";
|
||||
SELECT * FROM t;
|
||||
---- RESULTS
|
||||
1899-12-31 16:00:00
|
||||
1899-12-31 16:00:00
|
||||
1899-12-31 17:04:35
|
||||
2020-04-08 05:17:05.215000000
|
||||
2020-04-08 05:17:05.215000000
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone="Asia/Singapore";
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SELECT * FROM t;
|
||||
---- RESULTS
|
||||
1899-12-31 22:55:25
|
||||
1899-12-31 22:55:25
|
||||
1900-01-01 00:00:00
|
||||
2020-04-08 13:17:05.215000000
|
||||
2020-04-08 13:17:05.215000000
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone="Asia/Singapore";
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SET use_legacy_hive_timestamp_conversion=true;
|
||||
SELECT * FROM t;
|
||||
---- RESULTS
|
||||
1899-12-31 22:55:25
|
||||
1900-01-01 00:00:00
|
||||
1900-01-01 00:00:00
|
||||
2020-04-08 13:17:05.215000000
|
||||
2020-04-08 13:17:05.215000000
|
||||
====
|
||||
28  testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test (new vendored file)
@@ -0,0 +1,28 @@
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone="Asia/Kuala_Lumpur";
|
||||
SELECT * FROM hive_kuala_lumpur_legacy;
|
||||
---- RESULTS
|
||||
1899-12-31 16:00:00
|
||||
1909-12-31 17:00:00
|
||||
1934-12-31 16:40:00
|
||||
1939-12-31 16:40:00
|
||||
1941-12-31 16:30:00
|
||||
1943-12-31 15:00:00
|
||||
1969-01-28 16:30:00
|
||||
1999-12-31 16:00:00
|
||||
====
|
||||
---- QUERY
|
||||
SET timezone="Asia/Kuala_Lumpur";
|
||||
SET convert_legacy_hive_parquet_utc_timestamps=true;
|
||||
SELECT * FROM hive_kuala_lumpur_legacy;
|
||||
---- RESULTS
|
||||
1900-01-01 00:00:00
|
||||
1910-01-01 00:00:00
|
||||
1935-01-01 00:00:00
|
||||
1940-01-01 00:00:00
|
||||
1942-01-01 00:00:00
|
||||
1944-01-01 00:00:00
|
||||
1969-01-29 00:00:00
|
||||
2000-01-01 00:00:00
|
||||
====
|
||||
76  tests/query_test/test_hive_timestamp_conversion.py (new file)
@@ -0,0 +1,76 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.file_utils import create_table_and_copy_files, create_table_from_parquet
|
||||
|
||||
|
||||
class TestHiveParquetTimestampConversion(ImpalaTestSuite):
|
||||
"""Tests that Impala can read parquet files written by older versions of Hive or with
|
||||
Hive legacy conversion enabled. Tests use convert_legacy_hive_parquet_utc_timestamps,
|
||||
use_legacy_hive_timestamp_conversion, and timezone to test conversion."""
|
||||
|
||||
@classmethod
|
||||
def get_workload(self):
|
||||
return 'functional-query'
|
||||
|
||||
@classmethod
|
||||
def add_test_dimensions(cls):
|
||||
super(TestHiveParquetTimestampConversion, cls).add_test_dimensions()
|
||||
cls.ImpalaTestMatrix.add_constraint(lambda v:
|
||||
v.get_value('table_format').file_format == 'parquet'
|
||||
and v.get_value('table_format').compression_codec == 'none')
|
||||
|
||||
def test_hive_4_legacy(self, vector, unique_database):
|
||||
"""Test that legacy conversion uses the same timezone conversion as Hive when
|
||||
Parquet metadata contains writer.zone.conversion.legacy=true.
|
||||
|
||||
Load test data generated via Hive with TZ=Asia/Kuala_Lumpur:
|
||||
|
||||
create table t (d timestamp) stored as parquet;
|
||||
set hive.parquet.timestamp.write.legacy.conversion.enabled=true;
|
||||
insert into t values ("1900-01-01 00:00:00"), ("1910-01-01 00:00:00"),
|
||||
("1935-01-01 00:00:00"), ("1940-01-01 00:00:00"), ("1942-01-01 00:00:00"),
|
||||
("1944-01-01 00:00:00"), ("1969-01-29 00:00:00"), ("2000-01-01 00:00:00");
|
||||
"""
|
||||
create_table_from_parquet(self.client, unique_database, "hive_kuala_lumpur_legacy")
|
||||
self.run_test_case("QueryTest/timestamp-conversion-hive-4", vector, unique_database)
|
||||
|
||||
def test_hive_313(self, vector, unique_database):
|
||||
"""The parquet file was written with Hive 3.1.3 using the new Date/Time APIs
|
||||
(legacy=false) to convert from US/Pacific to UTC. The presence of writer.time.zone in
|
||||
the metadata of the file allow us to infer that new Date/Time APIS should be used for
|
||||
the conversion. The use_legacy_hive_timestamp_conversion property shouldn't be taken
|
||||
into account in this case.
|
||||
|
||||
Test file from https://github.com/apache/hive/blob/rel/release-4.0.1/data/files/
|
||||
employee_hive_3_1_3_us_pacific.parquet"""
|
||||
create_table_from_parquet(
|
||||
self.client, unique_database, "employee_hive_3_1_3_us_pacific")
|
||||
self.run_test_case("QueryTest/timestamp-conversion-hive-313", vector, unique_database)
|
||||
|
||||
def test_hive_3_mixed(self, vector, unique_database):
|
||||
"""Test table containing Hive legacy timestamps written with Hive prior to 3.1.3.
|
||||
|
||||
Test files target timezone=Asia/Singapore, sourced from
|
||||
https://github.com/apache/hive/tree/rel/release-4.0.1/data/files/tbl_parq1."""
|
||||
create_stmt = "create table %s.t (d timestamp) stored as parquet" % unique_database
|
||||
create_table_and_copy_files(self.client, create_stmt, unique_database, "t",
|
||||
["testdata/data/tbl_parq1/" + f for f in ["000000_0", "000000_1", "000000_2"]])
|
||||
self.run_test_case("QueryTest/timestamp-conversion-hive-3m", vector, unique_database)
|
||||