diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt index e954583d8..1739375de 100644 --- a/be/src/benchmarks/CMakeLists.txt +++ b/be/src/benchmarks/CMakeLists.txt @@ -56,5 +56,6 @@ ADD_BE_BENCHMARK(string-compare-benchmark) ADD_BE_BENCHMARK(string-search-benchmark) ADD_BE_BENCHMARK(thread-create-benchmark) ADD_BE_BENCHMARK(tuple-layout-benchmark) +ADD_BE_BENCHMARK(convert-timestamp-benchmark) target_link_libraries(hash-benchmark Experiments) diff --git a/be/src/benchmarks/convert-timestamp-benchmark.cc b/be/src/benchmarks/convert-timestamp-benchmark.cc new file mode 100644 index 000000000..6ba718399 --- /dev/null +++ b/be/src/benchmarks/convert-timestamp-benchmark.cc @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "exprs/timezone_db.h" +#include "runtime/timestamp-parse-util.h" +#include "runtime/timestamp-value.h" +#include "util/benchmark.h" +#include "util/cpu-info.h" +#include "util/pretty-printer.h" +#include "util/stopwatch.h" + +#include "common/names.h" + +namespace gregorian = boost::gregorian; +using boost::posix_time::duration_from_string; +using boost::posix_time::hours; +using boost::posix_time::nanoseconds; +using boost::posix_time::ptime; +using boost::posix_time::time_duration; +using boost::posix_time::to_iso_extended_string; +using boost::posix_time::to_simple_string; +using boost::local_time::time_zone_ptr; +using boost::local_time::posix_time_zone; +using namespace impala; + +// Benchmark for converting timestamps from UTC to local time and from UTC to a given time +// zone. +// Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz +// ConvertTimestamp: Function 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// ------------------------------------------------------------------------------------ +// FromUtc 0.0147 0.0152 0.0155 1X 1X 1X +// UtcToLocal 0.0216 0.0228 0.0234 1.47X 1.5X 1.51X + +time_zone_ptr LOCAL_TZ; + +struct TestData { + vector data; + vector result; +}; + +void AddTestDataDateTimes(TestData* data, int n, const string& startstr) { + DateTimeFormatContext dt_ctx; + dt_ctx.Reset("yyyy-MMM-dd HH:mm:ss", 19); + TimestampParser::ParseFormatTokens(&dt_ctx); + + ptime start(boost::posix_time::time_from_string(startstr)); + for (int i = 0; i < n; ++i) { + int val = rand(); + start += gregorian::date_duration(rand() % 100); + start += nanoseconds(val); + stringstream ss; + ss << to_simple_string(start); + string ts = ss.str(); + data->data.push_back(TimestampValue(ts.c_str(), ts.size(), dt_ctx)); + } +} + +void TestFromUtc(int batch_size, void* d) { + TestData* data = reinterpret_cast(d); + for (int i = 
0; i < batch_size; ++i) { + int n = data->data.size(); + for (int j = 0; j < n; ++j) { + TimestampValue ts = data->data[j]; + ts.FromUtc(LOCAL_TZ); + data->result[j] = ts; + } + } +} + +void TestUtcToLocal(int batch_size, void* d) { + TestData* data = reinterpret_cast(d); + for (int i = 0; i < batch_size; ++i) { + int n = data->data.size(); + for (int j = 0; j < n; ++j) { + TimestampValue ts = data->data[j]; + ts.UtcToLocal(); + data->result[j] = ts; + } + } +} + +int main(int argc, char **argv) { + CpuInfo::Init(); + cout << Benchmark::GetMachineInfo() << endl; + + TimestampParser::Init(); + ABORT_IF_ERROR(TimezoneDatabase::Initialize()); + + // Initialize LOCAL_TZ to local time zone + tzset(); + time_t now = time(0); + LOCAL_TZ = time_zone_ptr(new posix_time_zone(tzname[localtime(&now)->tm_isdst])); + + TestData datetimes; + AddTestDataDateTimes(&datetimes, 10000, "1953-04-22 01:02:03"); + datetimes.result.resize(datetimes.data.size()); + + Benchmark timestamp_suite("ConvertTimestamp"); + timestamp_suite.AddBenchmark("FromUtc", TestFromUtc, &datetimes); + timestamp_suite.AddBenchmark("UtcToLocal", TestUtcToLocal, &datetimes); + + cout << timestamp_suite.Measure() << endl; + + // If number of threads is specified, run multithreaded tests. + int num_of_threads = (argc < 2) ? 
0 : atoi(argv[1]); + if (num_of_threads >= 1) { + vector > threads(num_of_threads); + StopWatch sw; + // Test UtcToLocal() + sw.Start(); + for (auto& t: threads) { + t = boost::shared_ptr( + new boost::thread(TestUtcToLocal, 100, &datetimes)); + } + for (auto& t: threads) t->join(); + uint64_t utc_to_local_elapsed_time = sw.ElapsedTime(); + sw.Stop(); + + // Test FromUtc() + sw.Start(); + for (auto& t: threads) { + t = boost::shared_ptr( + new boost::thread(TestFromUtc, 100, &datetimes)); + } + for (auto& t: threads) t->join(); + uint64_t from_utc_elapsed_time = sw.ElapsedTime(); + sw.Stop(); + + cout << "Number of threads: " << num_of_threads << endl + << "TestFromUtc: " + << PrettyPrinter::Print(from_utc_elapsed_time, TUnit::CPU_TICKS) << endl + << "TestUtcToLocal: " + << PrettyPrinter::Print(utc_to_local_elapsed_time, TUnit::CPU_TICKS) << endl; + } + + return 0; +} diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 24802e31e..b22e67ce4 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -88,6 +88,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, min_max_tuple_desc_(nullptr), skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? tnode.hdfs_scan_node.skip_header_line_count : 0), + parquet_mr_write_zone_(tnode.hdfs_scan_node.__isset.parquet_mr_write_zone ? 
+ tnode.hdfs_scan_node.parquet_mr_write_zone : ""), tuple_id_(tnode.hdfs_scan_node.tuple_id), reader_context_(NULL), tuple_desc_(NULL), diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 1711bb5d2..3d97e2e56 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -153,6 +153,7 @@ class HdfsScanNodeBase : public ScanNode { const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); } RuntimeState* runtime_state() { return runtime_state_; } int skip_header_line_count() const { return skip_header_line_count_; } + const std::string& parquet_mr_write_zone() const { return parquet_mr_write_zone_; } DiskIoRequestContext* reader_context() { return reader_context_; } typedef std::map> ConjunctsMap; @@ -312,6 +313,11 @@ class HdfsScanNodeBase : public ScanNode { // to values > 0 for hdfs text files. const int skip_header_line_count_; + // Time zone for adjusting timestamp values read from Parquet files written by + // parquet-mr. If conversion should not occur, this is set to an empty string. Otherwise + // FE guarantees that this is a valid time zone. 
+ const std::string parquet_mr_write_zone_; + /// Tuple id resolved in Prepare() to set tuple_desc_ const int tuple_id_; diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index f92de482c..53848e2ce 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -27,6 +27,7 @@ #include "exec/parquet-metadata-utils.h" #include "exec/parquet-scratch-tuple-batch.h" #include "exec/read-write-util.h" +#include "exprs/timezone_db.h" #include "rpc/thrift-util.h" #include "runtime/collection-value-builder.h" #include "runtime/tuple-row.h" @@ -206,7 +207,9 @@ class ScalarColumnReader : public BaseScalarColumnReader { ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) : BaseScalarColumnReader(parent, node, slot_desc), - dict_decoder_init_(false) { + dict_decoder_init_(false), + timezone_(NULL), + is_timestamp_dependent_timezone_(false) { if (!MATERIALIZED) { // We're not materializing any values, just counting them. No need (or ability) to // initialize state used to materialize values. @@ -223,12 +226,25 @@ class ScalarColumnReader : public BaseScalarColumnReader { } else { fixed_len_size_ = -1; } - needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || - // TODO: Add logic to detect file versions that have unconverted TIMESTAMP - // values. Currently all versions have converted values. 
- (FLAGS_convert_legacy_hive_parquet_utc_timestamps && - slot_desc_->type().type == TYPE_TIMESTAMP && - parent->file_version_.application == "parquet-mr"); + needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || ( + slot_desc_->type().type == TYPE_TIMESTAMP && + parent->file_version_.application == "parquet-mr" && + parent_->scan_node_->parquet_mr_write_zone() != "UTC" && ( + !parent_->scan_node_->parquet_mr_write_zone().empty() || + FLAGS_convert_legacy_hive_parquet_utc_timestamps + ) + ); + + if (needs_conversion_ && slot_desc_->type().type == TYPE_TIMESTAMP && + !parent_->scan_node_->parquet_mr_write_zone().empty()) { + is_timestamp_dependent_timezone_ = TimezoneDatabase::IsTimestampDependentTimezone( + parent_->scan_node_->parquet_mr_write_zone()); + if (!is_timestamp_dependent_timezone_) { + timezone_ = TimezoneDatabase::FindTimezone( + parent_->scan_node_->parquet_mr_write_zone()); + } + DCHECK_EQ(is_timestamp_dependent_timezone_, (timezone_ == NULL)); + } } virtual ~ScalarColumnReader() { } @@ -536,6 +552,16 @@ class ScalarColumnReader : public BaseScalarColumnReader { /// true if decoded values must be converted before being written to an output tuple. bool needs_conversion_; + /// Used to cache the timezone object corresponding to the "parquet.mr.int96.write.zone" + /// table property to avoid repeated calls to TimezoneDatabase::FindTimezone(). Set to + /// NULL if the table property is not set, or if it is set to UTC or to a timestamp + /// dependent timezone. + boost::local_time::time_zone_ptr timezone_; + + /// true if "parquet.mr.int96.write.zone" table property is set to a timestamp dependent + /// timezone. + bool is_timestamp_dependent_timezone_; + /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or /// the max length for VARCHAR columns. Unused otherwise. 
int fixed_len_size_; @@ -579,13 +605,63 @@ inline bool ScalarColumnReader::NeedsConversionInline() co return needs_conversion_; } +/// Sets timestamp conversion error message in 'scanner_status'. Returns false if the +/// execution should be aborted, otherwise returns true. +bool __attribute__((noinline)) SetTimestampConversionError(HdfsScanNodeBase* scan_node, + RuntimeState* scanner_state, const TimestampValue* tv, const string& timezone, + const string& detail, Status* scanner_status) { + ErrorMsg msg(TErrorCode::PARQUET_MR_TIMESTAMP_CONVERSION_FAILED, tv->DebugString(), + timezone, scan_node->hdfs_table()->fully_qualified_name()); + if (!detail.empty()) msg.AddDetail(detail); + Status status = scanner_state->LogOrReturnError(msg); + if (!status.ok()) { + *scanner_status = status; + return false; + } + return true; +} + template<> bool ScalarColumnReader::ConvertSlot( - const TimestampValue* src, TimestampValue* dst, MemPool* pool) { - // Conversion should only happen when this flag is enabled. - DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps); +const TimestampValue* src, TimestampValue* dst, MemPool* pool) { + // Conversion should only happen when "parquet.mr.int96.write.zone" table property is + // not set to "UTC" + DCHECK_NE(parent_->scan_node_->parquet_mr_write_zone(), "UTC"); + *dst = *src; - if (dst->HasDateAndTime()) dst->UtcToLocal(); + if (LIKELY(dst->HasDateAndTime())) { + if (LIKELY(timezone_ != NULL)) { + // Not a timestamp specific timezone. Convert timestamp to the timezone object + // cached in timezone_. + if (UNLIKELY(!dst->FromUtc(timezone_))) { + if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_, + src, parent_->scan_node_->parquet_mr_write_zone(), "", + &parent_->parse_status_)) { + return false; + } + } + } else if (UNLIKELY(is_timestamp_dependent_timezone_)) { + // Timestamp specific timezone (such as Moscow pre 2011). + // Call timestamp conversion function with the timezone string. 
+ if (UNLIKELY(!dst->FromUtc(parent_->scan_node_->parquet_mr_write_zone()))) { + if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_, + src, parent_->scan_node_->parquet_mr_write_zone(), "", + &parent_->parse_status_)) { + return false; + } + } + } else { + DCHECK(parent_->scan_node_->parquet_mr_write_zone().empty()); + DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps); + Status s = dst->UtcToLocal(); + if (UNLIKELY(!s.ok())) { + if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_, + src, "localtime", s.GetDetail(), &parent_->parse_status_)) { + return false; + } + } + } + } return true; } diff --git a/be/src/exprs/timestamp-functions.cc b/be/src/exprs/timestamp-functions.cc index 89bb498ee..b2a33d49e 100644 --- a/be/src/exprs/timestamp-functions.cc +++ b/be/src/exprs/timestamp-functions.cc @@ -196,8 +196,32 @@ void TimestampFunctions::UnixAndFromUnixClose(FunctionContext* context, } } +time_zone_ptr TimezoneDatabase::FindTimezone(const string& tz) { + if (tz.empty()) return NULL; + + // Look up time zone in 'tz_database' by region. + time_zone_ptr tzp = tz_database_.time_zone_from_region(tz); + if (tzp != NULL) return tzp; + + // Look up time zone in 'tz_database' by name variations. 
The following name variations + // are considered: + // - daylight savings abbreviation + // - standard abbreviation + // - daylight savings name + // - standard name + for (const string& tz_region: tz_region_list_) { + time_zone_ptr tzp = tz_database_.time_zone_from_region(tz_region); + DCHECK(tzp != NULL); + if (tzp->dst_zone_abbrev() == tz) return tzp; + if (tzp->std_zone_abbrev() == tz) return tzp; + if (tzp->dst_zone_name() == tz) return tzp; + if (tzp->std_zone_name() == tz) return tzp; + } + return NULL; +} + time_zone_ptr TimezoneDatabase::FindTimezone( - const string& tz, const TimestampValue& tv, bool tv_in_utc) { + const string& tz, const TimestampValue& tv, bool tv_in_utc) { // The backing database does not handle timezone rule changes. if (iequals("Europe/Moscow", tz) || iequals("Moscow", tz) || iequals("MSK", tz)) { if (tv.date().year() < 2011 || (tv.date().year() == 2011 && tv.date().month() < 4)) { @@ -228,20 +252,11 @@ time_zone_ptr TimezoneDatabase::FindTimezone( } } - // See if they specified a zone id - time_zone_ptr tzp = tz_database_.time_zone_from_region(tz); - if (tzp != NULL) return tzp; + return FindTimezone(tz); +} - for (vector::const_iterator iter = tz_region_list_.begin(); - iter != tz_region_list_.end(); ++iter) { - time_zone_ptr tzp = tz_database_.time_zone_from_region(*iter); - DCHECK(tzp != NULL); - if (tzp->dst_zone_abbrev() == tz) return tzp; - if (tzp->std_zone_abbrev() == tz) return tzp; - if (tzp->dst_zone_name() == tz) return tzp; - if (tzp->std_zone_name() == tz) return tzp; - } - return time_zone_ptr(); +bool TimezoneDatabase::IsTimestampDependentTimezone(const string& tz) { + return iequals("Europe/Moscow", tz) || iequals("Moscow", tz) || iequals("MSK", tz); } } diff --git a/be/src/exprs/timezone_db.h b/be/src/exprs/timezone_db.h index 3a1178a42..9a9b14f82 100644 --- a/be/src/exprs/timezone_db.h +++ b/be/src/exprs/timezone_db.h @@ -42,6 +42,15 @@ class TimezoneDatabase { static boost::local_time::time_zone_ptr 
FindTimezone(const std::string& tz, const TimestampValue& tv, bool tv_in_utc); + /// Converts the name of a timezone to a boost timezone object without taking into + /// account the timestamp. May not work correctly when IsTimestampDependentTimezone(tz) + /// is true, e.g. Moscow timezone. + /// If 'tz' is not found in the database, nullptr is returned. + static boost::local_time::time_zone_ptr FindTimezone(const std::string& tz); + + /// Returns true if 'tz' specifies a timezone that was changed in the past. + static bool IsTimestampDependentTimezone(const std::string& tz); + /// Moscow timezone UTC+3 with DST, for use before March 27, 2011. static const boost::local_time::time_zone_ptr TIMEZONE_MSK_PRE_2011_DST; diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc index 72be7fdc7..5d1c8b95b 100644 --- a/be/src/runtime/timestamp-value.cc +++ b/be/src/runtime/timestamp-value.cc @@ -16,6 +16,8 @@ // under the License. #include "runtime/timestamp-value.h" +#include "exprs/timestamp-functions.h" +#include "exprs/timezone_db.h" #include @@ -26,6 +28,8 @@ using boost::date_time::not_a_date_time; using boost::gregorian::date; using boost::gregorian::date_duration; +using boost::local_time::local_date_time; +using boost::local_time::time_zone_ptr; using boost::posix_time::nanoseconds; using boost::posix_time::ptime; using boost::posix_time::ptime_from_tm; @@ -68,7 +72,7 @@ int TimestampValue::Format(const DateTimeFormatContext& dt_ctx, int len, char* b return TimestampParser::Format(dt_ctx, date_, time_, len, buff); } -void TimestampValue::UtcToLocal() { +Status TimestampValue::UtcToLocal() { DCHECK(HasDateAndTime()); // Previously, conversion was done using boost functions but it was found to be // too slow. 
Doing the conversion without function calls (which also avoids some @@ -83,7 +87,7 @@ void TimestampValue::UtcToLocal() { tm temp; if (UNLIKELY(NULL == localtime_r(&utc, &temp))) { *this = ptime(not_a_date_time); - return; + return Status("Failed to convert timestamp to local time."); } // Unlikely but a time zone conversion may push the value over the min/max // boundary resulting in an exception. @@ -93,9 +97,33 @@ void TimestampValue::UtcToLocal() { static_cast(temp.tm_mday)); time_ = time_duration(temp.tm_hour, temp.tm_min, temp.tm_sec, time().fractional_seconds()); - } catch (std::exception& /* from Boost */) { + } catch (std::exception& from_boost) { + Status s("Failed to convert timestamp to local time."); + s.AddDetail(from_boost.what()); *this = ptime(not_a_date_time); + return s; } + return Status::OK(); +} + +bool TimestampValue::FromUtc(const std::string& timezone_str) { + DCHECK(HasDateAndTime()); + time_zone_ptr timezone = TimezoneDatabase::FindTimezone(timezone_str, *this, true); + if (UNLIKELY(timezone == NULL)) { + *this = ptime(not_a_date_time); + return false; + } + return FromUtc(timezone); +} + +bool TimestampValue::FromUtc(time_zone_ptr timezone) { + DCHECK(HasDateAndTime()); + DCHECK(timezone != NULL); + ptime temp; + ToPtime(&temp); + local_date_time lt(temp, timezone); + *this = lt.local_time(); + return true; } ostream& operator<<(ostream& os, const TimestampValue& timestamp_value) { diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h index 58829285f..bdd92d5db 100644 --- a/be/src/runtime/timestamp-value.h +++ b/be/src/runtime/timestamp-value.h @@ -21,12 +21,14 @@ #include #include +#include #include #include #include #include #include +#include "common/status.h" #include "udf/udf.h" #include "util/hash-util.h" @@ -194,8 +196,16 @@ class TimestampValue { } /// Converts from UTC to local time in-place. The caller must ensure the TimestampValue - /// this function is called upon has both a valid date and time. 
- void UtcToLocal(); + /// this function is called upon has both a valid date and time. Returns Status::OK() if + /// conversion was successful and an error Status otherwise. If conversion failed *this + /// is set to a ptime object initialized to not_a_date_time. + Status UtcToLocal(); + + /// Converts from UTC to given timezone in-place. Returns true if conversion was + /// successful and false otherwise. If conversion failed *this is set to a ptime object + /// initialized to not_a_date_time. + bool FromUtc(const std::string& timezone_str); + bool FromUtc(boost::local_time::time_zone_ptr timezone); void set_date(const boost::gregorian::date d) { date_ = d; } void set_time(const boost::posix_time::time_duration t) { time_ = t; } diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 925a59c9b..8f5235245 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -28,6 +28,8 @@ #include "exec/catalog-op-executor.h" #include "exprs/expr-context.h" #include "exprs/expr.h" +#include "exprs/timestamp-functions.h" +#include "exprs/timezone_db.h" #include "gen-cpp/Data_types.h" #include "gen-cpp/Frontend_types.h" #include "rpc/jni-thrift-util.h" @@ -37,6 +39,7 @@ #include "runtime/hdfs-fs-cache.h" #include "runtime/lib-cache.h" #include "runtime/runtime-state.h" +#include "runtime/timestamp-value.h" #include "service/impala-server.h" #include "util/cpu-info.h" #include "util/debug-util.h" @@ -66,6 +69,7 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit( // Init the JVM to load the classes in JniUtil that are needed for returning // exceptions to the FE. InitCommonRuntime(1, &name, true, TestInfo::FE_TEST); + ABORT_IF_ERROR(TimezoneDatabase::Initialize()); LlvmCodeGen::InitializeLlvm(true); ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process. 
exec_env->InitForFeTests(); @@ -346,6 +350,17 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad( return result_bytes; } +// Used to call native code from the FE to check if a timezone string is valid or not. +extern "C" +JNIEXPORT jboolean JNICALL +Java_org_apache_impala_service_FeSupport_NativeCheckIsValidTimeZone( + JNIEnv* env, jclass caller_class, jstring timezone) { + const char *tz = env->GetStringUTFChars(timezone, NULL); + jboolean tz_found = tz != NULL && TimezoneDatabase::FindTimezone(tz) != NULL; + env->ReleaseStringUTFChars(timezone, tz); + return tz_found; +} + namespace impala { static JNINativeMethod native_methods[] = { @@ -369,6 +384,10 @@ static JNINativeMethod native_methods[] = { (char*)"NativePrioritizeLoad", (char*)"([B)[B", (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad }, + { + (char*)"NativeCheckIsValidTimeZone", (char*)"(Ljava/lang/String;)Z", + (void*)::Java_org_apache_impala_service_FeSupport_NativeCheckIsValidTimeZone + } }; void InitFeSupport(bool disable_codegen) { diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 86cb0c92b..613291853 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -120,6 +120,10 @@ DEFINE_string(default_query_options, "", "key=value pair of default query option DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, " "the query log has unbounded size."); DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file."); +DEFINE_bool(set_parquet_mr_int96_write_zone_to_utc_on_new_tables, false, "if true, sets " + "the parquet.mr.int96.write.zone table property to UTC for new tables created using " + "CREATE TABLE and CREATE TABLE LIKE . You can find details in the " + "documentation."); DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client " "may request to be cached on a per-query basis to support restarting fetches. 
This " diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 0781393c3..7b2cdd986 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -26,6 +26,7 @@ // Configs for the Frontend and the Catalog. DECLARE_bool(load_catalog_in_background); DECLARE_bool(load_auth_to_local_rules); +DECLARE_bool(set_parquet_mr_int96_write_zone_to_utc_on_new_tables); DECLARE_int32(non_impala_java_vlog); DECLARE_int32(read_size); DECLARE_int32(num_metadata_loading_threads); @@ -51,6 +52,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { TBackendGflags cfg; cfg.__set_authorization_policy_file(FLAGS_authorization_policy_file); cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background); + cfg.__set_set_parquet_mr_int96_write_zone_to_utc_on_new_tables( + FLAGS_set_parquet_mr_int96_write_zone_to_utc_on_new_tables); cfg.__set_server_name(FLAGS_server_name); cfg.__set_sentry_config(FLAGS_sentry_config); cfg.__set_authorization_policy_provider_class( diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 09cf6f7f0..edf74225d 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -54,4 +54,8 @@ struct TBackendGflags { 16: required i32 kudu_operation_timeout_ms 17: required i32 initial_hms_cnxn_timeout_s + + // If true, new HDFS tables created using CREATE TABLE and CREATE TABLE LIKE + // regardless of format will have the "parquet.mr.int96.write.zone" property set to UTC. + 18: required bool set_parquet_mr_int96_write_zone_to_utc_on_new_tables } diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index b83b84d21..1baffc0eb 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -215,6 +215,11 @@ struct THdfsScanNode { // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible // for dictionary filtering. 
9: optional map> dictionary_filter_conjuncts + + // Specifies a time zone for adjusting timestamp values read from Parquet files written + // by parquet-mr. The actual value comes from "parquet.mr.int96.write.zone" table + // property. This is used for a Hive compatibility fix. + 10: optional string parquet_mr_write_zone } struct TDataSourceScanNode { diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 33bae258a..84a0dadc7 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -315,6 +315,9 @@ error_codes = ( ("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' at " "offset $2: could only read $3 bytes"), + + ("PARQUET_MR_TIMESTAMP_CONVERSION_FAILED", 103, "Failed to convert timestamp '$0' to " + "timezone '$1' for a Parquet file in table '$2'."), ) import sys diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java index 08007b30e..f0e8f1106 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Table; import org.apache.impala.common.AnalysisException; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TAlterTableParams; import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams; import org.apache.impala.thrift.TAlterTableType; @@ -99,6 +100,9 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt { // Analyze 'skip.header.line.format' property. 
analyzeSkipHeaderLineCount(getTargetTable(), tblProperties_); + + // Analyze 'parquet.mr.int96.write.zone' + analyzeParquetMrWriteZone(getTargetTable(), tblProperties_); } /** @@ -155,4 +159,27 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt { if (error.length() > 0) throw new AnalysisException(error.toString()); } } + + /** + * Analyze the 'parquet.mr.int96.write.zone' property to make sure it is set to a valid + * value. It is looked up in 'tblProperties', which must not be null. If 'table' is not + * null, then the method ensures that 'parquet.mr.int96.write.zone' is supported for its + * table type. If it is null, then this check is omitted. + */ + private static void analyzeParquetMrWriteZone(Table table, + Map tblProperties) throws AnalysisException { + if (tblProperties.containsKey(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)) { + if (table != null && !(table instanceof HdfsTable)) { + throw new AnalysisException(String.format( + "Table property '%s' is only supported for HDFS tables.", + HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)); + } + String timezone = tblProperties.get(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE); + if (!FeSupport.CheckIsValidTimeZone(timezone)) { + throw new AnalysisException(String.format( + "Invalid time zone in the '%s' table property: %s", + HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone)); + } + } + } } diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java index 1691315be..2350398d5 100644 --- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java @@ -20,6 +20,8 @@ package org.apache.impala.analysis; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Table; import org.apache.impala.common.AnalysisException; +import org.apache.impala.service.FeSupport; + import com.google.common.base.Preconditions; /** @@ -64,6 +66,7 @@ public 
class BaseTableRef extends TableRef { analyzeHints(analyzer); analyzeJoin(analyzer); analyzeSkipHeaderLineCount(); + analyzeParquetMrWriteZone(); } @Override @@ -95,4 +98,20 @@ public class BaseTableRef extends TableRef { hdfsTable.parseSkipHeaderLineCount(error); if (error.length() > 0) throw new AnalysisException(error.toString()); } + + /** + * Analyze the 'parquet.mr.int96.write.zone' property. + */ + private void analyzeParquetMrWriteZone() throws AnalysisException { + Table table = getTable(); + if (!(table instanceof HdfsTable)) return; + HdfsTable hdfsTable = (HdfsTable)table; + + String timezone = hdfsTable.getParquetMrWriteZone(); + if (timezone != null && !FeSupport.CheckIsValidTimeZone(timezone)) { + throw new AnalysisException(String.format( + "Invalid time zone in the '%s' table property: %s", + HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone)); + } + } } diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index 1139005b1..8ee792763 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -23,10 +23,13 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.impala.authorization.PrivilegeRequestBuilder; +import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.RowFormat; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.service.BackendConfig; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TCreateTableParams; import org.apache.impala.thrift.THdfsFileFormat; import org.apache.impala.thrift.TTableName; @@ -176,6 +179,24 @@ public class CreateTableStmt extends StatementBase { } AvroSchemaUtils.setFromSerdeComment(getColumnDefs()); } 
+ + if (getTblProperties().containsKey(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)) { + if (getFileFormat() == THdfsFileFormat.KUDU) { + throw new AnalysisException(String.format( + "Table property '%s' is only supported for HDFS tables.", + HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)); + } + String timezone = getTblProperties().get(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE); + if (!FeSupport.CheckIsValidTimeZone(timezone)) { + throw new AnalysisException(String.format( + "Invalid time zone in the '%s' table property: %s", + HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone)); + } + } else if (BackendConfig.INSTANCE.isSetParquetMrWriteZoneToUtcOnNewTables()) { + if (getFileFormat() != THdfsFileFormat.KUDU) { + getTblProperties().put(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, "UTC"); + } + } } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 143e2b19b..b6df167c6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -115,6 +115,10 @@ public class HdfsTable extends Table { // Table property key for skip.header.line.count public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count"; + // Table property key for parquet.mr.int96.write.zone + public static final String TBL_PROP_PARQUET_MR_WRITE_ZONE = + "parquet.mr.int96.write.zone"; + // An invalid network address, which will always be treated as remote. private final static TNetworkAddress REMOTE_NETWORK_ADDRESS = new TNetworkAddress("remote*addr", 0); @@ -1376,6 +1380,18 @@ public class HdfsTable extends Table { return skipHeaderLineCount; } + /** + * Returns the value of the 'parquet.mr.int96.write.zone' table property. If the value + * is not set for the table, returns null. 
+ */ + public String getParquetMrWriteZone() { + org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); + if (msTbl == null) return null; + Map tblProperties = msTbl.getParameters(); + if (tblProperties == null) return null; + return tblProperties.get(TBL_PROP_PARQUET_MR_WRITE_ZONE); + } + /** * Sets avroSchema_ if the table or any of the partitions in the table are stored * as Avro. Additionally, this method also reconciles the schema if the column diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 95458281c..2a9c84ce0 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -151,7 +151,6 @@ public class HdfsScanNode extends ScanNode { private static final Configuration CONF = new Configuration(); - // List of conjuncts for min/max values of parquet::Statistics, that are used to skip // data when scanning Parquet files. 
private List minMaxConjuncts_ = Lists.newArrayList(); @@ -719,6 +718,10 @@ public class HdfsScanNode extends ScanNode { if (skipHeaderLineCount_ > 0) { msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_); } + String parquetMrWriteZone = tbl_.getParquetMrWriteZone(); + if (parquetMrWriteZone != null) { + msg.hdfs_scan_node.setParquet_mr_write_zone(parquetMrWriteZone); + } msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); if (!minMaxConjuncts_.isEmpty()) { for (Expr e: minMaxConjuncts_) { diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 5a4a440e5..04de23834 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -54,6 +54,9 @@ public class BackendConfig { !Strings.isNullOrEmpty(backendCfg_.principal); } public int getKuduClientTimeoutMs() { return backendCfg_.kudu_operation_timeout_ms; } + public boolean isSetParquetMrWriteZoneToUtcOnNewTables() { + return backendCfg_.set_parquet_mr_int96_write_zone_to_utc_on_new_tables; + } public int getImpalaLogLevel() { return backendCfg_.impala_log_lvl; } public int getNonImpalaJavaVlogLevel() { return backendCfg_.non_impala_java_vlog; } diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index 8b87962ef..f712752cf 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -84,6 +84,10 @@ public class FeSupport { // using Java Thrift bindings. public native static byte[] NativePrioritizeLoad(byte[] thriftReq); + // Returns true if timezone String is valid according to the BE timezone database, false + // otherwise. + public native static boolean NativeCheckIsValidTimeZone(String timezone); + /** * Locally caches the jar at the specified HDFS location. 
* @@ -261,6 +265,16 @@ public class FeSupport { } } + public static boolean CheckIsValidTimeZone(String timezone) { + if (timezone == null) return false; + try { + return NativeCheckIsValidTimeZone(timezone); + } catch (UnsatisfiedLinkError e) { + loadLibrary(); + } + return NativeCheckIsValidTimeZone(timezone); + } + /** * This function should only be called explicitly by the FeSupport to ensure that * native functions are loaded. diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 1278372b8..4a662f395 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -663,6 +663,58 @@ public class AnalyzeDDLTest extends FrontendTestBase { "ALTER TABLE SET not currently supported on HBase tables."); } + @Test + public void TestParquetMrInt96WriteZone() { + // Attempt to set 'parquet.mr.int96.write.zone' when creating a table. Positive cases. + AnalyzesOk("create table tbl (i int) tblproperties " + + "('parquet.mr.int96.write.zone'='EST')"); + AnalyzesOk("create table tbl tblproperties " + + "('parquet.mr.int96.write.zone'='EST') " + + "as select * from functional.alltypesnopart"); + AnalyzesOk("create external table tbl like parquet " + + "'/test-warehouse/alltypesagg_hive_13_1_parquet/" + + "alltypesagg_hive_13_1.parquet' " + + "stored as parquet " + + "tblproperties ('parquet.mr.int96.write.zone'='EST')"); + // Cannot set 'parquet.mr.int96.write.zone' table property when creating a non-HDFS + // table. 
+ AnalysisError("create external table tbl stored as kudu tblproperties (" + + "'kudu.table_name'='tab'," + + "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081'," + + "'parquet.mr.int96.write.zone'='EST')", + "Table property 'parquet.mr.int96.write.zone' is only supported for HDFS " + + "tables."); + // Cannot set 'parquet.mr.int96.write.zone' table property to an invalid time zone + // when creating a table. + AnalysisError("create table tbl (i int) tblproperties" + + "('parquet.mr.int96.write.zone'='garbage')", + "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage"); + AnalysisError("create table tbl tblproperties " + + "('parquet.mr.int96.write.zone'='garbage') " + + "as select * from functional.alltypesnopart", + "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage"); + AnalysisError("create external table tbl like parquet " + + "'/test-warehouse/alltypesagg_hive_13_1_parquet/" + + "alltypesagg_hive_13_1.parquet' " + + "stored as parquet " + + "tblproperties ('parquet.mr.int96.write.zone'='garbage')", + "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage"); + + // Attempt to set 'parquet.mr.int96.write.zone' table property. Positive case. + AnalyzesOk("alter table functional.alltypes set tblproperties" + + "('parquet.mr.int96.write.zone'='EST')"); + // Cannot set 'parquet.mr.int96.write.zone' table property on a table not backed by + // HDFS. + AnalysisError("alter table functional_kudu.alltypes set tblproperties" + + "('parquet.mr.int96.write.zone'='EST')", + "Table property 'parquet.mr.int96.write.zone' is only supported for HDFS " + + "tables."); + // Cannot set 'parquet.mr.int96.write.zone' table property to an invalid time zone. 
+ AnalysisError("alter table functional.alltypes set tblproperties" + + "('parquet.mr.int96.write.zone'='garbage')", + "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage"); + } + @Test public void TestAlterTableSetCached() { // Positive cases diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 2972cb868..af0ed1d23 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -495,6 +495,29 @@ class ImpalaTestSuite(BaseTestSuite): assert not result.success, "No failure encountered for query %s" % query return result + def _get_properties(self, section_name, table_name): + """Extracts the table properties mapping from the output of DESCRIBE FORMATTED""" + result = self.client.execute("describe formatted " + table_name) + match = False + properties = dict(); + for row in result.data: + if section_name in row: + match = True + elif match: + row = row.split('\t') + if (row[1] == 'NULL'): + break + properties[row[1].rstrip()] = row[2].rstrip() + return properties + + def get_table_properties(self, table_name): + """Extracts the table properties mapping from the output of DESCRIBE FORMATTED""" + return self._get_properties('Table Parameters:', table_name) + + def get_serde_properties(self, table_name): + """Extracts the serde properties mapping from the output of DESCRIBE FORMATTED""" + return self._get_properties('Storage Desc Params:', table_name) + @execute_wrapper def execute_query(self, query, query_options=None): return self.__execute_query(self.client, query, query_options) diff --git a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py index 4d7c20246..fd631fd32 100644 --- a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py +++ b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py @@ -21,12 +21,41 @@ import pytest import time from tests.common.custom_cluster_test_suite 
import CustomClusterTestSuite +from tests.util.filesystem_utils import get_fs_path class TestHiveParquetTimestampConversion(CustomClusterTestSuite): - '''Hive writes timestamps in parquet files by first converting values from local time + '''Hive writes timestamps in Parquet files by first converting values from local time to UTC. The conversion was not expected (other file formats don't convert) and a - startup flag was later added to adjust for this (IMPALA-1658). This file tests that - the conversion and flag behave as expected. + startup flag (-convert_legacy_hive_parquet_utc_timestamps) was later added to adjust + for this (IMPALA-1658). IMPALA-2716 solves the issue in a more general way by + introducing a table property ('parquet.mr.int96.write.zone') that specifies the time + zone to convert the timestamp values to. + + This file tests that the table property and the startup option behave as expected in + the following scenarios: + 1. If the 'parquet.mr.int96.write.zone' table property is set, Impala ignores the + -convert_legacy_hive_parquet_utc_timestamps startup option. It reads Parquet + timestamp data written by Hive and adjusts values using the time zone from the + table property. + 2. If the 'parquet.mr.int96.write.zone' table property is not set, the + -convert_legacy_hive_parquet_utc_timestamps startup option is taken into account. + a. If the startup option is set to true, Impala reads Parquet timestamp data + created by Hive and adjusts values using the local time zone. + b. If the startup option is absent or set to false, no adjustment will be applied + to timestamp values. + + IMPALA-2716 also introduces a startup option + (-set_parquet_mr_int96_write_zone_to_utc_on_new_tables) that determines if the table + property will be set on newly created tables. This file tests the basic behavior of the + startup option: + 1. Tables created with the 'parquet.mr.int96.write.zone' table property explicitly + set, will keep the value the property is set to. 
+ 2. If -set_parquet_mr_int96_write_zone_to_utc_on_new_tables is set to true, tables + created using CREATE TABLE, CREATE TABLE AS SELECT and CREATE TABLE LIKE PARQUET + will set the table property to UTC. + 3. Tables created using CREATE TABLE LIKE (an existing table) will ignore the value of + -set_parquet_mr_int96_write_zone_to_utc_on_new_tables and copy the property of + the table that is copied. ''' @classmethod @@ -36,11 +65,12 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite): v.get_value('table_format').file_format == 'parquet' and v.get_value('table_format').compression_codec == 'none') - def check_sanity(self, expect_converted_result): + def check_sanity(self, expect_converted_result, + tbl_name='functional_parquet.alltypesagg_hive_13_1'): data = self.execute_query_expect_success(self.client, """ SELECT COUNT(timestamp_col), COUNT(DISTINCT timestamp_col), MIN(timestamp_col), MAX(timestamp_col) - FROM functional_parquet.alltypesagg_hive_13_1""")\ + FROM {0}""".format(tbl_name))\ .get_data() assert len(data) > 0 rows = data.split("\n") @@ -58,22 +88,63 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite): assert values[2] == "2010-01-01 00:00:00" assert values[3] == "2010-01-10 18:02:05.100000000" + def get_parquet_mr_write_zone_tbl_prop(self, + tbl_name='functional_parquet.alltypesagg_hive_13_1'): + tbl_prop = self.get_table_properties(tbl_name) + if 'parquet.mr.int96.write.zone' not in tbl_prop: + return None + return tbl_prop['parquet.mr.int96.write.zone'] + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true") - def test_conversion(self, vector): + def test_conversion_to_tbl_prop_timezone(self, vector, unique_database): + # Create table with 'parquet.mr.int96.write.zone' property set to China Standard Time. + # The underlying parquet file has been written by Hive. 
+ hive_tbl = '%s.hive_tbl' % unique_database + parquet_loc = get_fs_path('/test-warehouse/alltypesagg_hive_13_1_parquet') + parquet_path = get_fs_path( + '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet') + self.client.execute('''CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}" + STORED AS PARQUET LOCATION "{2}" + TBLPROPERTIES ('parquet.mr.int96.write.zone'='China Standard Time') + '''.format(hive_tbl, parquet_path, parquet_loc)) + # Make sure that the table property has been properly set. + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=hive_tbl) ==\ + 'China Standard Time' + # Even though -convert_legacy_hive_parquet_utc_timestamps is set to true it is ignored + # because the 'parquet.mr.int96.write.zone' table property is also set. The value read + # from the Hive table should be the same as the corresponding Impala timestamp value + # converted from UTC to China Standard Time. + self.check_sanity(True, tbl_name=hive_tbl) + data = self.execute_query_expect_success(self.client, """ + SELECT h.id, h.day, h.timestamp_col, i.timestamp_col + FROM {0} h + JOIN functional_parquet.alltypesagg i + ON i.id = h.id AND i.day = h.day -- serves as a unique key + WHERE + (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL) + OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, 'China Standard Time') + """.format(hive_tbl))\ + .get_data() + assert len(data) == 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true") + def test_conversion_to_localtime(self, vector): tz_name = time.tzname[time.localtime().tm_isdst] self.check_sanity(tz_name not in ("UTC", "GMT")) + # Make sure that the table property is not set + assert self.get_parquet_mr_write_zone_tbl_prop() == None # The value read from the Hive table should be the same as reading a UTC converted # value from the Impala table. 
tz_name = time.tzname[time.localtime().tm_isdst] data = self.execute_query_expect_success(self.client, """ SELECT h.id, h.day, h.timestamp_col, i.timestamp_col FROM functional_parquet.alltypesagg_hive_13_1 h - JOIN functional_parquet.alltypesagg - i ON i.id = h.id AND i.day = h.day -- serves as a unique key + JOIN functional_parquet.alltypesagg i + ON i.id = h.id AND i.day = h.day -- serves as a unique key WHERE - (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL) - OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL) + (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL) OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s') """ % tz_name)\ .get_data() @@ -83,13 +154,15 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite): @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=false") def test_no_conversion(self, vector): self.check_sanity(False) + # Make sure that the table property is not set + assert self.get_parquet_mr_write_zone_tbl_prop() == None # Without conversion all the values will be different. 
tz_name = time.tzname[time.localtime().tm_isdst] data = self.execute_query_expect_success(self.client, """ SELECT h.id, h.day, h.timestamp_col, i.timestamp_col FROM functional_parquet.alltypesagg_hive_13_1 h - JOIN functional_parquet.alltypesagg - i ON i.id = h.id AND i.day = h.day -- serves as a unique key + JOIN functional_parquet.alltypesagg i + ON i.id = h.id AND i.day = h.day -- serves as a unique key WHERE h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s') """ % tz_name)\ .get_data() @@ -101,11 +174,76 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite): data = self.execute_query_expect_success(self.client, """ SELECT h.id, h.day, h.timestamp_col, i.timestamp_col FROM functional_parquet.alltypesagg_hive_13_1 h - JOIN functional_parquet.alltypesagg - i ON i.id = h.id AND i.day = h.day -- serves as a unique key + JOIN functional_parquet.alltypesagg i + ON i.id = h.id AND i.day = h.day -- serves as a unique key WHERE - (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL) - OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL) + (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL) """)\ .get_data() assert len(data) == 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + "-set_parquet_mr_int96_write_zone_to_utc_on_new_tables=true") + def test_new_table_enable_set_tbl_prop_to_utc(self, unique_database): + # Table created with CREATE TABLE will set the table property to UTC. + tbl1_name = '%s.table1' % unique_database + self.client.execute('CREATE TABLE {0} (id int)'.format(tbl1_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl1_name) == 'UTC' + # Table created with CREATE TABLE will honor the explicitly set property. 
+ tbl_est_name = '%s.table_est' % unique_database + self.client.execute('''CREATE TABLE {0} (id int) + TBLPROPERTIES ('parquet.mr.int96.write.zone'='EST') + '''.format(tbl_est_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl_est_name) == 'EST' + # Table created with CREATE TABLE AS SELECT will set the table property to UTC. Table + # property is not copied from the other table. + tbl2_name = '%s.table2' % unique_database + self.client.execute('CREATE TABLE {0} AS SELECT * FROM {1}'.format( + tbl2_name, tbl_est_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl2_name) == 'UTC' + # Table created with CREATE TABLE LIKE will set the table property to UTC. + tbl3_name = '%s.tbl3_name' % unique_database + parquet_path = get_fs_path( + '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet') + self.client.execute('CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}"'.format( + tbl3_name, parquet_path)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl3_name) == 'UTC' + # Table created with CREATE TABLE LIKE will copy the property from the + # other table. + tbl4_name = '%s.tbl4_name' % unique_database + self.client.execute('CREATE TABLE {0} LIKE {1}'.format(tbl4_name, tbl_est_name)); + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl4_name) == 'EST' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + "-set_parquet_mr_int96_write_zone_to_utc_on_new_tables=false") + def test_new_table_disable_set_tbl_prop_to_utc(self, unique_database): + # Table created with CREATE TABLE will not set the table property. + tbl1_name = '%s.table1' % unique_database + self.client.execute('CREATE TABLE {0} (id int)'.format(tbl1_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl1_name) == None + # Table created with CREATE TABLE will honor the explicitly set property. 
+ tbl_est_name = '%s.table_est' % unique_database + self.client.execute('''CREATE TABLE {0} (id int) + TBLPROPERTIES ('parquet.mr.int96.write.zone'='EST') + '''.format(tbl_est_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl_est_name) == 'EST' + # Table created with CREATE TABLE AS SELECT will not set the table property. Table + # property is not copied from the other table. + tbl2_name = '%s.table2' % unique_database + self.client.execute('CREATE TABLE {0} AS SELECT * FROM {1}'.format( + tbl2_name, tbl_est_name)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl2_name) == None + # Table created with CREATE TABLE LIKE will not set the table property. + tbl3_name = '%s.tbl3_name' % unique_database + parquet_path = get_fs_path( + '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet') + self.client.execute('CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}"'.format( + tbl3_name, parquet_path)) + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl3_name) == None + # Table created with CREATE TABLE LIKE will copy the property from the + # other table. + tbl4_name = '%s.tbl4_name' % unique_database + self.client.execute('CREATE TABLE {0} LIKE {1}'.format(tbl4_name, tbl_est_name)); + assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl4_name) == 'EST' diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 37ca7cd8f..2dc950bd2 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -368,7 +368,7 @@ class TestDdlStatements(TestDdlBase): self.client.execute("""create table {0} (i int) with serdeproperties ('s1'='s2', 's3'='s4') tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name)) - properties = self._get_tbl_properties(fq_tbl_name) + properties = self.get_table_properties(fq_tbl_name) assert len(properties) == 2 # The transient_lastDdlTime is variable, so don't verify the value. 
@@ -376,19 +376,19 @@ class TestDdlStatements(TestDdlBase): del properties['transient_lastDdlTime'] assert {'p1': 'v1'} == properties - properties = self._get_serde_properties(fq_tbl_name) + properties = self.get_serde_properties(fq_tbl_name) assert {'s1': 's2', 's3': 's4'} == properties # Modify the SERDEPROPERTIES using ALTER TABLE SET. self.client.execute("alter table {0} set serdeproperties " "('s1'='new', 's5'='s6')".format(fq_tbl_name)) - properties = self._get_serde_properties(fq_tbl_name) + properties = self.get_serde_properties(fq_tbl_name) assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties # Modify the TBLPROPERTIES using ALTER TABLE SET. self.client.execute("alter table {0} set tblproperties " "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name)) - properties = self._get_tbl_properties(fq_tbl_name) + properties = self.get_table_properties(fq_tbl_name) assert 'transient_lastDdlTime' in properties assert properties['p1'] == 'v1' diff --git a/tests/metadata/test_ddl_base.py b/tests/metadata/test_ddl_base.py index 3044ef015..acda99f56 100644 --- a/tests/metadata/test_ddl_base.py +++ b/tests/metadata/test_ddl_base.py @@ -64,26 +64,3 @@ class TestDdlBase(ImpalaTestSuite): db_name, comment, WAREHOUSE) impala_client.execute(ddl) impala_client.close() - - def _get_tbl_properties(self, table_name): - """Extracts the table properties mapping from the output of DESCRIBE FORMATTED""" - return self._get_properties('Table Parameters:', table_name) - - def _get_serde_properties(self, table_name): - """Extracts the serde properties mapping from the output of DESCRIBE FORMATTED""" - return self._get_properties('Storage Desc Params:', table_name) - - def _get_properties(self, section_name, table_name): - """Extracts the table properties mapping from the output of DESCRIBE FORMATTED""" - result = self.client.execute("describe formatted " + table_name) - match = False - properties = dict(); - for row in result.data: - if section_name in row: - 
match = True - elif match: - row = row.split('\t') - if (row[1] == 'NULL'): - break - properties[row[1].rstrip()] = row[2].rstrip() - return properties diff --git a/tests/query_test/test_parquet_timestamp_compatibility.py b/tests/query_test/test_parquet_timestamp_compatibility.py new file mode 100644 index 000000000..37e6398da --- /dev/null +++ b/tests/query_test/test_parquet_timestamp_compatibility.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import pytest +import time + +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.util.filesystem_utils import WAREHOUSE, get_fs_path + +class TestParquetTimestampCompatibility(ImpalaTestSuite): + '''Hive adjusts timestamps by subtracting the local time zone's offset from all values + when writing data to Parquet files. As a result of this adjustment, Impala may read + "incorrect" timestamp values from Parquet files written by Hive. To fix the problem + a table property ('parquet.mr.int96.write.zone') was introduced in IMPALA-2716 that + specifies the time zone to convert the timestamp values to. + + This file tests the following scenarios: + 1. 
If the 'parquet.mr.int96.write.zone' table property is set to an invalid time zone + (by Hive), Impala throws an error when analyzing a query against the table. + 2. If the 'parquet.mr.int96.write.zone' table property is set to a valid time zone: + a. Impala adjusts timestamp values read from Parquet files created by Hive using + the time zone from the table property. + b. Impala does not adjust timestamp values read from Parquet files created by + Impala. + ''' + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestParquetTimestampCompatibility, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet' and + v.get_value('table_format').compression_codec == 'none') + + def _setup_env(self, hive_tbl_name, impala_tbl_name=None): + parquet_loc = get_fs_path('/test-warehouse/alltypesagg_hive_13_1_parquet') + parquet_fn = get_fs_path( + '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet') + self.client.execute('''CREATE EXTERNAL TABLE {0} + LIKE PARQUET "{1}" + STORED AS PARQUET LOCATION "{2}" + '''.format(hive_tbl_name, parquet_fn, parquet_loc)) + if impala_tbl_name: + self.client.execute('''CREATE TABLE {0} + STORED AS PARQUET AS + SELECT * FROM {1} + '''.format(impala_tbl_name, hive_tbl_name)) + + def _set_tbl_timezone(self, tbl_name, tz_name): + self.client.execute('''ALTER TABLE {0} + SET TBLPROPERTIES ('parquet.mr.int96.write.zone'='{1}') + '''.format(tbl_name, tz_name)) + + def _get_parquet_mr_write_zone_tbl_prop(self, tbl_name): + tbl_prop = self.get_table_properties(tbl_name) + if 'parquet.mr.int96.write.zone' not in tbl_prop: + return None + return tbl_prop['parquet.mr.int96.write.zone'] + + def test_invalid_parquet_mr_write_zone(self, vector, unique_database): + # Hive doesn't allow setting 'parquet.mr.int96.write.zone' table property to an + # invalid time zone anymore. 
+ pytest.skip() + + hive_tbl_name = '%s.hive_table' % unique_database + self._setup_env(hive_tbl_name) + # Hive sets the table property to an invalid time zone + self.run_stmt_in_hive('''ALTER TABLE {0} + SET TBLPROPERTIES ('parquet.mr.int96.write.zone'='garbage') + '''.format(hive_tbl_name)) + self.client.execute('REFRESH %s' % hive_tbl_name) + # Impala throws an error when the table is queried + try: + self.client.execute('SELECT timestamp_col FROM %s' % hive_tbl_name) + except ImpalaBeeswaxException, e: + if "Invalid time zone" not in str(e): + raise e + else: + assert False, "Query was expected to fail" + + def test_parquet_timestamp_conversion(self, vector, unique_database): + hive_tbl_name = '%s.hive_table' % unique_database + impala_tbl_name = '%s.impala_table' % unique_database + self._setup_env(hive_tbl_name, impala_tbl_name) + for tz_name in ['UTC', 'EST', 'China Standard Time', 'CET']: + # impala_table's underlying Parquet file was written by Impala. No conversion is + # performed on the timestamp values, no matter what value + # 'parquet.mr.int96.write.zone' is set to. + self._set_tbl_timezone(impala_tbl_name, tz_name) + data = self.execute_query_expect_success(self.client, """ + SELECT i2.id, i2.day, i2.timestamp_col, i1.timestamp_col + FROM functional.alltypesagg i1 + JOIN {0} i2 + ON i1.id = i2.id AND i1.day = i2.day -- serves as a unique key + WHERE + (i1.timestamp_col IS NULL) != (i2.timestamp_col IS NULL) + OR i1.timestamp_col != i2.timestamp_col + """.format(impala_tbl_name))\ + .get_data() + assert len(data) == 0 + assert self._get_parquet_mr_write_zone_tbl_prop(impala_tbl_name) == tz_name + # hive_table's underlying Parquet file was written by Hive. Setting the + # 'parquet.mr.int96.write.zone' table property to tz_name triggers a 'UTC' -> + # tz_name conversion on the timestamp values. 
+ self._set_tbl_timezone(hive_tbl_name, tz_name) + data = self.execute_query_expect_success(self.client, """ + SELECT h.id, h.day, h.timestamp_col, i.timestamp_col + FROM functional.alltypesagg i + JOIN {0} h + ON i.id = h.id AND i.day = h.day -- serves as a unique key + WHERE + (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL) + OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '{1}') + """.format(hive_tbl_name, tz_name))\ + .get_data() + assert len(data) == 0 + assert self._get_parquet_mr_write_zone_tbl_prop(hive_tbl_name) == tz_name