diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 548396732..733a90b51 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -118,12 +118,6 @@ QueryState::QueryState( } TQueryOptions& query_options = const_cast(query_ctx_.client_request.query_options); - // max_errors does not indicate how many errors in total have been recorded, but rather - // how many are distinct. It is defined as the sum of the number of generic errors and - // the number of distinct other errors. - if (query_options.max_errors <= 0) { - query_options.max_errors = 100; - } if (query_options.batch_size <= 0) { query_options.__set_batch_size(DEFAULT_BATCH_SIZE); } diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 8a994b35e..d9c83e624 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -59,7 +59,11 @@ using strings::Substitute; -DECLARE_int32(max_errors); +DEFINE_int32(max_error_logs_per_instance, 2000, + "Maximum number of non-fatal error to be logged in log level 1 (INFO). " + "Once this number exceeded, further non-fatal error will be logged at log level 2 " + "(DEBUG) severity. This flag is ignored if user set negative max_errors query " + "option. Default to 2000"); namespace impala { @@ -190,11 +194,37 @@ string RuntimeState::ErrorLog() { bool RuntimeState::LogError(const ErrorMsg& message, int vlog_level) { lock_guard l(error_log_lock_); - // All errors go to the log, unreported_error_count_ is counted independently of the - // size of the error_log to account for errors that were already reported to the - // coordinator - VLOG(vlog_level) << "Error from query " << PrintId(query_id()) << ": " << message.msg(); - if (ErrorCount(error_log_) < query_options().max_errors) { + // All errors go to the log. If the amount of errors logged to vlog level 1 exceed + // or equal max_error_logs_per_instance, then that error will be downgraded to vlog + // level 2. + int user_max_errors = query_options().max_errors; + if (vlog_level == 1 && user_max_errors >= 0 + && vlog_1_errors >= FLAGS_max_error_logs_per_instance) { + vlog_level = 2; + } + + if (VLOG_IS_ON(vlog_level)) { + VLOG(vlog_level) << "Error from query " << PrintId(query_id()) << ": " + << message.msg(); + } + + if (vlog_level == 1 && user_max_errors >= 0) { + vlog_1_errors++; + DCHECK_LE(vlog_1_errors, FLAGS_max_error_logs_per_instance); + if (vlog_1_errors == FLAGS_max_error_logs_per_instance) { + VLOG(vlog_level) << "Query " << PrintId(query_id()) << " printed " + << FLAGS_max_error_logs_per_instance + << " non-fatal error to log level 1 (INFO). Further non-fatal " + << "error will be downgraded to log level 2 (DEBUG)."; + } + } + + TErrorCode::type code = message.error(); + if (ErrorCount(error_log_) < max_errors() + || (code != TErrorCode::GENERAL && error_log_.find(code) != error_log_.end())) { + // Appending general error is expensive since it writes the entire message to the + // error_log_ map. Meanwhile, appending non-general (specific) error that already + // exist in error_log_ is cheap since it only increment count. AppendError(&error_log_, message); return true; } diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 7d2d57bb3..5d00df7ae 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -166,6 +166,15 @@ class RuntimeState { return Status::OK(); } + /// Return maximum number of non-fatal error to report to client through coordinator. + /// max_errors does not indicate how many errors in total have been recorded, but rather + /// how many are distinct. It is defined as the sum of the number of generic errors and + /// the number of distinct other errors. Default to 100 if non-positive number is + /// specified in max_errors query option. + inline int max_errors() const { + return query_options().max_errors <= 0 ? 100 : query_options().max_errors; + } + /// Log an error that will be sent back to the coordinator based on an instance of the /// ErrorMsg class. The runtime state aggregates log messages based on type with one /// exception: messages with the GENERAL type are not aggregated but are kept @@ -318,6 +327,9 @@ class RuntimeState { /// Logs error messages. ErrorLogMap error_log_; + /// Track how many error has been printed to VLOG(1). + int64_t vlog_1_errors = 0; + /// Global QueryState and original thrift descriptors for this fragment instance. QueryState* const query_state_; const TPlanFragment* const fragment_; diff --git a/tests/custom_cluster/test_logging.py b/tests/custom_cluster/test_logging.py new file mode 100644 index 000000000..34cf2578e --- /dev/null +++ b/tests/custom_cluster/test_logging.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + + +class TestLoggingCore(CustomClusterTestSuite): + """Test existence of certain log lines under some scenario.""" + + @classmethod + def get_workload(cls): + return 'functional-query' + + def _test_max_errors(self, max_error_logs_per_instance, max_errors, expect_downgraded): + """Test that number of non-fatal error printed to INFO log is limited by + max_errors and max_error_logs_per_instance.""" + + query = ("select id, bool_col, tinyint_col, smallint_col " + "from functional.alltypeserror order by id") + client = self.create_impala_client() + + self.execute_query_expect_success(client, query, {'max_errors': max_errors}) + self.assert_impalad_log_contains("INFO", "Error parsing row", + max_error_logs_per_instance if expect_downgraded else 8) + self.assert_impalad_log_contains("INFO", + "printed {0} non-fatal error to log level 1".format(max_error_logs_per_instance), + 1 if expect_downgraded else 0) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1, + impalad_args="--max_error_logs_per_instance=2") + def test_max_errors(self): + self._test_max_errors(2, 4, True) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1, + impalad_args="--max_error_logs_per_instance=3") + def test_max_errors_0(self): + self._test_max_errors(3, 0, True) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1, + impalad_args="--max_error_logs_per_instance=2") + def test_max_errors_no_downgrade(self): + self._test_max_errors(2, -1, False)