IMPALA-12426: Workload Management Supporting Changes

Contains several disparate pieces of functionality that support the
overall workload management work.

1. TNetworkAddressComparator
  An unused existing comparator for the Thrift class TNetworkAddress
  already existed in the thrift-util.h file. This comparator has been
  moved to the network-util.h file where it now resides in the same
  place as other utility functions that operator on TNetworkAddress
  instances.

  The existing comparator did not consider the uds address. It only
  considered hostname and port. The new comparator considers all three.

  Testing is accomplished by porting the existing ctests and adding
  additional ctests.

2. StringStreamPop
  This new class extends a std::basic_stringstream<char> to add a
  function that enables removing a character from the end.

  Testing is accomplished using new ctests.

3. Ticker
  This new header-only class notifies a condition variable at periodic
  intervals. It is a lightweight that sleeps until the configured
  duration has passed at which point it wakes up and notifies the
  condition variable. It also enables consumers to offload spurious
  wakeup guards to this class.

  Ctests have been added to test the functionality of this new class.

4. TUniqueId Empty Utility Function
  A new function UUIDEmpty returns true if a provided TUniqueID does
  not contain a UUID or false otherwise.

  Ctests have been added to test this new function.

5. run_clang_tidy.sh
  Additional informational outputs have been added to this script to
  enable tracking the status of the script and to easily identify
  errors found by clang tidy.

6. Summary Util Test
  A ctest was developed for testing the text table generation code that
  generates the exec summary portion of the query profile. This ctest
  was developed as part of an idea that did not ultimately pan out.
  Rather than throwing away that test code, it has been added as a new
  ctest.

Change-Id: Iee23334ec56a18b192a75d052468bf59159b6424
Reviewed-on: http://gerrit.cloudera.org:8080/21048
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
jasonmfehr
2024-02-21 13:41:08 -08:00
committed by Michael Smith
parent 0c0a3fff39
commit 97cd30c607
16 changed files with 647 additions and 28 deletions

View File

@@ -129,18 +129,4 @@ TEST(ThriftUtil, SerDeBuffer100MB) {
}
}
TEST(ThriftUtil, TNetworkAddressComparator) {
EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
MakeNetworkAddress("zzzz", 500)));
EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("zzzz", 500),
MakeNetworkAddress("aaaa", 500)));
EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
MakeNetworkAddress("aaaa", 501)));
EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 501),
MakeNetworkAddress("aaaa", 500)));
EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
MakeNetworkAddress("aaaa", 500)));
}
}

View File

@@ -253,13 +253,6 @@ void PrintTColumnValue(std::ostream& out, const TColumnValue& colval) {
}
}
bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b) {
int cmp = a.hostname.compare(b.hostname);
if (cmp < 0) return true;
if (cmp == 0) return a.port < b.port;
return false;
}
bool IsReadTimeoutTException(const TTransportException& e) {
// String taken from TSocket::read() Thrift's TSocket.cpp and TSSLSocket.cpp.
// Specifically, "THRIFT_EAGAIN (timed out)" from TSocket.cpp,

View File

@@ -175,10 +175,6 @@ Status WaitForServer(const std::string& host, int port, int num_retries,
/// Print a TColumnValue. If null, print "NULL".
void PrintTColumnValue(std::ostream& out, const TColumnValue& colval);
/// Compares two TNetworkAddresses alphanumerically by their host:port
/// string representation
bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b);
/// Returns true if the TTransportException corresponds to a TCP socket read timeout.
bool IsReadTimeoutTException(const apache::thrift::transport::TTransportException& e);

View File

@@ -191,6 +191,7 @@ add_library(UtilTests STATIC
lru-multi-cache-test.cc
metrics-test.cc
min-max-filter-test.cc
network-util-test.cc
openssl-util-test.cc
os-info-test.cc
os-util-test.cc
@@ -208,11 +209,13 @@ add_library(UtilTests STATIC
simple-logger-test.cc
string-parser-test.cc
string-util-test.cc
summary-util-test.cc
symbols-util-test.cc
sys-info-test.cc
system-state-info-test.cc
tagged-ptr-test.cc
thread-pool-test.cc
ticker-test.cc
time-test.cc
tuple-row-compare-test.cc
uid-util-test.cc
@@ -250,6 +253,7 @@ ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
# minidump-test is flaky when the jvm pause monitor is running. So it can't be unified.
ADD_BE_LSAN_TEST(minidump-test)
ADD_UNIFIED_BE_LSAN_TEST(network-util-test "NetworkUtil.*")
ADD_UNIFIED_BE_LSAN_TEST(openssl-util-test "OpenSSLUtilTest.*")
ADD_UNIFIED_BE_LSAN_TEST(os-info-test "OsInfo.*")
ADD_UNIFIED_BE_LSAN_TEST(os-util-test "OsUtil.*")
@@ -267,11 +271,13 @@ ADD_UNIFIED_BE_LSAN_TEST(rle-test "BitArray.*:RleTest.*")
ADD_UNIFIED_BE_LSAN_TEST(runtime-profile-test "CountersTest.*:TimerCounterTest.*:TimeSeriesCounterTest.*:VariousNumbers/TimeSeriesCounterResampleTest.*:ToThrift.*:ToJson.*")
ADD_UNIFIED_BE_LSAN_TEST(simple-logger-test "SimpleLoggerTest.*")
ADD_UNIFIED_BE_LSAN_TEST(string-parser-test "StringToInt.*:StringToIntWithBase.*:StringToFloat.*:StringToBool.*:StringToDate.*")
ADD_UNIFIED_BE_LSAN_TEST(string-util-test "TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*")
ADD_UNIFIED_BE_LSAN_TEST(string-util-test "TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*:StringStreamPopTest.*")
ADD_UNIFIED_BE_LSAN_TEST(summary-util-test "PrintTableTest.*")
ADD_UNIFIED_BE_LSAN_TEST(symbols-util-test "SymbolsUtil.*")
ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")

View File

@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "gen-cpp/Types_types.h"
#include "testutil/gtest-util.h"
#include "util/network-util.h"
namespace impala {
// NetAddrComp Tests: These tests assert the TNetworkAddressComparator sorts two
// TNetworkAddress objects correctly based on their host, port, and uds address fields.
// Assert where host fields are different.
TEST(NetworkUtil, NetAddrCompHostnameDiff) {
TNetworkAddressComparator fixture;
TNetworkAddress first;
TNetworkAddress second;
first.__set_hostname("aaaa");
first.__set_uds_address("uds");
first.__set_port(0);
second.__set_hostname("bbbb");
second.__set_uds_address("uds");
second.__set_port(0);
ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
}
// Assert where host fields are equal but port is different.
TEST(NetworkUtil, NetAddrCompPortDiff) {
TNetworkAddressComparator fixture;
TNetworkAddress first;
TNetworkAddress second;
first.__set_hostname("host");
first.__set_port(0);
first.__set_uds_address("");
second.__set_hostname("host");
second.__set_port(1);
second.__set_uds_address("");
ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
}
// Assert where host and port fields are equal but uds address is different.
TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {
TNetworkAddressComparator fixture;
TNetworkAddress first;
TNetworkAddress second;
first.__set_hostname("host");
first.__set_port(0);
first.__set_uds_address("aaaa");
second.__set_hostname("host");
second.__set_port(0);
second.__set_uds_address("bbbb");
ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
}
// Assert where all three comparison fields are equal.
TEST(NetworkUtil, NetAddrUDSAddrSame) {
TNetworkAddressComparator fixture;
TNetworkAddress first;
TNetworkAddress second;
first.__set_hostname("host");
first.__set_port(0);
first.__set_uds_address("uds");
second.__set_hostname("host");
second.__set_port(0);
second.__set_uds_address("uds");
ASSERT_FALSE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
}
} // namespace impala

View File

@@ -252,6 +252,27 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address) {
return address_pb;
}
bool TNetworkAddressComparator::operator()(const TNetworkAddress& a,
const TNetworkAddress& b) const {
const int host_compare = a.hostname.compare(b.hostname);
if (host_compare < 0) {
return true;
} else if(host_compare > 0) {
return false;
}
// Hostnames were the same, compare on port
if (a.port < b.port) {
return true;
} else if (a.port > b.port) {
return false;
}
// Hostnames and ports were the same, compare on uds address.
return a.uds_address.compare(b.uds_address) < 0;
}
/// Pick a random port in the range of ephemeral ports
/// https://tools.ietf.org/html/rfc6335
int FindUnusedEphemeralPort() {

View File

@@ -99,6 +99,12 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address);
Status NetworkAddressPBToSockaddr(
const NetworkAddressPB& address, bool use_uds, kudu::Sockaddr* sockaddr);
/// Custom comparator to sort network addresses first by host (alphabetically) and then by
/// by port (numerically) and finally by uds address (alphabetically).
struct TNetworkAddressComparator {
bool operator()(const TNetworkAddress& a, const TNetworkAddress& b) const;
};
/// Returns a ephemeral port that is currently unused. Returns -1 on an error or if
/// a free ephemeral port can't be found after 100 tries.
int FindUnusedEphemeralPort();

View File

@@ -269,5 +269,78 @@ TEST(RandomFindUtf8PosTest, Basic) {
}
}
// StringStreamPopTest: These tests assert the functionality of the StringStreamPop class.
// Assert the most common use case where the last character is popped and a new character
// is written to the stream.
TEST(StringStreamPopTest, NotEmptyPopOnce) {
StringStreamPop fixture;
fixture << "this is a tes,";
fixture.move_back();
fixture << "t";
EXPECT_EQ("this is a test", fixture.str());
}
// Asssert where the stream only contains a single character that is popped before another
// character is written to the stream.
TEST(StringStreamPopTest, OneCharPop) {
StringStreamPop fixture;
fixture << "t";
fixture.move_back();
fixture << "v";
EXPECT_EQ("v", fixture.str());
}
// Assert where the last two characters of a non-empty stream are popped.
TEST(StringStreamPopTest, NotEmptyPopTwice) {
StringStreamPop fixture;
fixture << "this is a second te,,";
fixture.move_back();
fixture.move_back();
fixture << "st";
EXPECT_EQ("this is a second test", fixture.str());
}
// Assert where an empty stream has it's last (nonexistant) character popped.
TEST(StringStreamPopTest, EmptyPopOnce) {
StringStreamPop fixture;
fixture.move_back();
EXPECT_TRUE(fixture.str().empty());
}
// Assert where an empty stream has it's last (nonexistant) character popped twice.
TEST(StringStreamPopTest, EmptyPopTwice) {
StringStreamPop fixture;
fixture.move_back();
fixture.move_back();
EXPECT_TRUE(fixture.str().empty());
}
// Assert the move_back functionality does not actually remove the character.
TEST(StringStreamPopTest, PopOnceBeforeAppend) {
StringStreamPop fixture;
fixture.move_back();
fixture << "a";
fixture.move_back();
// This assertion is correct because the move_back() function only moves the write
// pointer, it does not modify the internal buffer.
EXPECT_EQ("a", fixture.str());
}
// Assert the StringStreamPop class behavior matches the behavior of the stringstream
// class.
TEST(StringStreamPopTest, CompareWithStringstream) {
StringStreamPop fixture;
stringstream expected;
expected << "C++ is" << " an " << "invisible found" << "ation of " << "everything!";
fixture << "C++ is" << " an " << "invisible found" << "ation of " << "everything?";
fixture.move_back();
fixture << '!';
EXPECT_EQ(expected.str(), fixture.str());
}
}

View File

@@ -143,4 +143,11 @@ int FindUtf8PosBackward(const uint8_t* ptr, const int len, int index) {
DCHECK_EQ(pos, -1);
return -1;
}
void StringStreamPop::move_back() {
if (tellp() > 0) {
seekp(-1, std::ios_base::cur);
}
}
}

View File

@@ -93,6 +93,19 @@ int FindUtf8PosBackward(const uint8_t* str_ptr, const int str_len, const int ind
inline int FindUtf8PosBackward(const char* str_ptr, const int str_len, const int index) {
return FindUtf8PosBackward(reinterpret_cast<const uint8_t*>(str_ptr), str_len, index);
}
}
/// Subclass of std::stringstream that adds functionality to allow overwriting the very
/// last character of the stream. The purpose of this additional functionality is to
/// enable comma delimited string building where the last instance of the comma needs to
/// be removed (for example when building a list of columns in a sql statement).
class StringStreamPop : public std::basic_stringstream<char> {
public:
/// Directly modifies the underlying stream buffer seeking it backwards 1 position.
/// Then, when additional characters are written, the character at the end of the stream
/// is overwritten. Thus, to truly remove the character at the end of the stream
/// requires writing at least one character to the stream after this function is called.
void move_back();
};
}
#endif

View File

@@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "testutil/gtest-util.h"
#include "gen-cpp/ExecStats_types.h"
#include "util/summary-util.h"
namespace impala {
static const string expected_str = "\n"
"Operator #Hosts #Inst Avg Time Max Time #Rows Est. #Rows Peak Mem "
" Est. Peak Mem Detail \n"
"----------------------------------------------------------------------------------------"
"----------------------------------------------------------\n"
"F01:ROOT 1 1 857.074us 857.074us 4.01 MB "
" 4.00 MB \n"
"01:EXCHANGE 1 1 269.934us 269.934us 99 7 88.00 KB "
" 16.00 KB UNPARTITIONED \n"
"F00:EXCHANGE SENDER 3 3 332.506us 338.449us 7.95 KB "
" 96.00 KB \n"
"00:SCAN HDFS 3 3 1s328ms 1s331ms 99 7 360.00 KB "
" 64.00 MB default.test_query_log_beeswax_1707938440 ";
static TExecStats buildExecStats(int64_t latency, int64_t mem_used,
int64_t cardinality) {
TExecStats stat;
if (latency > -1) {
stat.__set_latency_ns(latency);
}
stat.__set_memory_used(mem_used);
stat.__set_cardinality(cardinality);
return stat;
}
static TPlanNodeExecSummary buildPlanNode(TPlanNodeId node_id, TFragmentIdx fragment_idx,
string label, string detail, int32_t num_hosts, int32_t num_children,
bool is_broadcast, TExecStats estimates) {
TPlanNodeExecSummary node;
node.__set_node_id(node_id);
node.__set_fragment_idx(fragment_idx);
node.__set_label(label);
node.__set_label_detail(detail);
node.__set_num_children(num_children);
node.__set_estimated_stats(estimates);
if (is_broadcast) {
node.__set_is_broadcast(is_broadcast);
}
node.__set_num_hosts(num_hosts);
return node;
}
// Constructs a simple exec summary and ensures the text table is generated correctly for
// that exec summary.
TEST(PrintTableTest, HappyPath) {
TExecSummary input;
TPlanNodeExecSummary node = buildPlanNode(-1, 0, "F01:ROOT", "", 1, 1,
false, buildExecStats(-1, 4194304, -1));
node.exec_stats.push_back(buildExecStats(857074, 4202496, -1));
node.__isset.exec_stats = true;
input.nodes.push_back(node);
node = buildPlanNode(1, 0, "01:EXCHANGE", "UNPARTITIONED", 1, 0,
true, buildExecStats(-1, 16384, 7));
node.exec_stats.push_back(buildExecStats(269934, 90112, 99));
node.__isset.exec_stats = true;
input.nodes.push_back(node);
node = buildPlanNode(-1, 1, "F00:EXCHANGE SENDER", "", 3, 1,
false, buildExecStats(-1, 98304, -1));
node.exec_stats.push_back(buildExecStats(338449, 6862, -1));
node.exec_stats.push_back(buildExecStats(333098, 8139, -1));
node.exec_stats.push_back(buildExecStats(325971, 8139, -1));
node.__isset.exec_stats = true;
input.nodes.push_back(node);
node = buildPlanNode(0, 1, "00:SCAN HDFS",
"default.test_query_log_beeswax_1707938440", 3, 0, false,
buildExecStats(-1, 67108864, 7));
node.exec_stats.push_back(buildExecStats(1331010710, 368640, 39));
node.exec_stats.push_back(buildExecStats(1326558546, 279552, 30));
node.exec_stats.push_back(buildExecStats(1327758097, 283648, 30));
node.__isset.exec_stats = true;
input.nodes.push_back(node);
input.__isset.nodes = true;
input.exch_to_sender_map.emplace(1, 2);
input.__isset.exch_to_sender_map = true;
string actual = PrintExecSummary(input);
EXPECT_EQ(expected_str, actual);
}
} // namespace impala

133
be/src/util/ticker-test.cc Normal file
View File

@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include "testutil/gtest-util.h"
#include "common/status.h"
#include "util/stopwatch.h"
#include "util/ticker.h"
using namespace std;
namespace impala {
static inline float NsToMs(int64_t nanos) {
return static_cast<float>(nanos / 1000000);
}
TEST(TickerTest, TickerSecondsBoolHappyPath) {
condition_variable cv;
mutex mu;
uint8_t cntr = 0;
TickerSecondsBool fixture(1, cv, mu);
MonotonicStopWatch sw;
sw.Start();
ABORT_IF_ERROR(fixture.Start("category", "tickersecondsbool-happy-path"));
while (cntr < 3) {
unique_lock<mutex> l(mu);
cv.wait(l, fixture.WakeupGuard());
fixture.ResetWakeupGuard();
cntr++;
}
sw.Stop();
fixture.RequestStop();
fixture.Join();
EXPECT_EQ(cntr, 3);
// Include a 30 millisecond (1%) margin of error to tolerate differences in the
// precision of time measurements.
EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(3000), 30);
}
TEST(TickerTest, GenericTickerHappyPath) {
condition_variable cv;
mutex mu;
shared_ptr<string> wakeup_guard = make_shared<string>();
uint8_t cntr = 0;
const string wakeup_val = "wakeup";
Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
wakeup_guard, wakeup_val);
MonotonicStopWatch sw;
sw.Start();
ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
while (cntr < 10) {
unique_lock<mutex> l(mu);
cv.wait(l, fixture.WakeupGuard());
*wakeup_guard = "";
cntr++;
}
sw.Stop();
fixture.RequestStop();
fixture.Join();
EXPECT_EQ(cntr, 10);
// Include a 5 millisecond (1%) margin of error to tolerate differences in the
// precision of time measurements.
EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(50), 5);
}
// Tests the case where the wakeup guard is not reset by the consuming code.
TEST(TickerTest, GenericTickerNoWakeupGuardReset) {
condition_variable cv;
mutex mu;
shared_ptr<string> wakeup_guard = make_shared<string>();
uint8_t cntr = 0;
const string wakeup_val = "wakeup";
Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
wakeup_guard, wakeup_val);
MonotonicStopWatch sw;
sw.Start();
ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
while (cntr < 10) {
unique_lock<mutex> l(mu);
cv.wait(l, fixture.WakeupGuard());
// No wakeup guard reset here.
cntr++;
}
sw.Stop();
fixture.RequestStop();
fixture.Join();
EXPECT_EQ(cntr, 10);
// If the wakeup guard was set properly, elapsed time would be 50 milliseconds. Since
// the wakeup guard does not get set, spurious wakeups of the condition variable happen
// much more frequently than they should.
EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(5), 5);
}
} // namespace impala

143
be/src/util/ticker.h Normal file
View File

@@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/bind.hpp>
#include "common/atomic.h"
#include "common/status.h"
#include "util/thread.h"
namespace impala {
// Manages a thread that periodically notifies a condition variable. This thread never
// returns. An indicator variable must be specified to guard against spurious wakeups.
//
// Immediately before this class notfies the condition variable, it sets the indicator
// variable to the `wakeup_value` specified in the constructor. It is the responsibility
// of the thread consuming this class to reset the indicator variable to a value other
// than `wakeup_value` before the consuming thread goes to sleep.
//
// If the periodic code takes longer to run than the specified duration, then the code
// will immediately execute the next time around.
//
// Internally, this class uses std::this_thread:sleep_for which may sleep for longer than
// the specified duration due to scheduling or resource contention delays.
// For details, see https://en.cppreference.com/w/cpp/thread/sleep_for.
//
// Example usage:
//
// #include <chrono>
// #include <condition_variable>
// #include <memory>
// #include <mutex>
//
// #include "common/status.h"
//
// std::condition_variable cv;
// std::mutex mu;
// std::shared_ptr<bool> wakeup_guard = make_shared<bool>();
// Ticker<std::chrono::seconds, bool> ticker(std::chrono::seconds(30), cv, mu,
// wakeup_guard, true);
//
// ABORT_IF_ERROR(ticker.Start());
//
// while(true) {
// unique_lock<mutex> l(mu);
// cv.wait(l, ticker.WakeupGuard());
// *wakeup_guard = false;
//
// run_my_code();
// }
template <typename DurationType, typename IndicatorType>
class Ticker {
public:
Ticker(DurationType interval, std::condition_variable& cv,
std::mutex& lock, std::shared_ptr<IndicatorType> indicator,
IndicatorType wakeup_value) : interval_(interval), cv_(cv), lock_(lock),
indicator_(indicator), wakeup_value_(wakeup_value) {}
Status Start(const std::string& category, const std::string& name) {
return Thread::Create(category, name, &Ticker::run, this, &my_thread_);
}
// Specify that the next iteration of this ticker be the last. This function does not
// block nor does it cause the ticker to wake up earlier than scheduled.
void RequestStop() {
stop_requested_.Store(true);
}
// Wait for the ticker to exit after it's final iteration.
void Join() {
my_thread_->Join();
}
// Provides a default implementation for the condition variable predicate lambda.
std::function<bool()> WakeupGuard() {
return [this]{ return *indicator_ == wakeup_value_; };
}
protected:
const DurationType interval_;
std::condition_variable& cv_;
std::mutex& lock_;
std::shared_ptr<IndicatorType> indicator_;
const IndicatorType wakeup_value_;
private:
std::unique_ptr<Thread> my_thread_;
AtomicBool stop_requested_;
void run() {
while (!stop_requested_.Load()) {
std::this_thread::sleep_for(interval_);
{
std::lock_guard<std::mutex> l(lock_);
*indicator_ = wakeup_value_;
}
cv_.notify_all();
}
}
}; // class Ticker
// Specialization of the Ticker class that uses seconds for the duration and bool as the
// wakeup indicator. The boolean shared_ptr indicator is internally managed. Use the
// ResetWakeupGuard() function in your code immediately after the condition variable wait
// to set the internally managed wakeup guard for the next iteration.
class TickerSecondsBool : public Ticker<std::chrono::seconds, bool> {
public:
TickerSecondsBool(uint32_t interval, std::condition_variable& cv,
std::mutex& lock) :
Ticker(std::chrono::seconds(interval), cv, lock, std::make_shared<bool>(), true) {}
void ResetWakeupGuard() {
*indicator_ = false;
}
}; // class TickerSecondsBool
} // namespace impala

View File

@@ -35,5 +35,28 @@ TEST(UidUtil, FragmentInstanceId) {
}
}
TEST(UidUtil, UuidNotEmpty) {
TUniqueId fixture = GenerateUUID();
EXPECT_FALSE(UUIDEmpty(fixture));
}
TEST(UidUtil, UuidHalfEmptyHi) {
TUniqueId fixture;
fixture.hi = 0;
fixture.lo = 1;
EXPECT_FALSE(UUIDEmpty(fixture));
}
TEST(UidUtil, UuidHalfEmptyLo) {
TUniqueId fixture;
fixture.hi = 1;
fixture.lo = 0;
EXPECT_FALSE(UUIDEmpty(fixture));
}
TEST(UidUtil, UuidEmpty) {
EXPECT_TRUE(UUIDEmpty(TUniqueId()));
}
}

View File

@@ -129,4 +129,9 @@ inline TUniqueId GenerateUUID() {
memcpy(&uid.lo, u.data() + sizeof(int64_t), sizeof(int64_t));
return uid;
}
/// Determines if a query id is empty.
inline bool UUIDEmpty(const TUniqueId& id) {
return id.hi == 0 && id.lo == 0;
}
} // namespace impala

View File

@@ -29,8 +29,8 @@
set -euo pipefail
echo "Compiling"
TMP_BUILDALL_LOG=$(mktemp)
echo "Compiling, for build logs see ${TMP_BUILDALL_LOG}"
if ! ./buildall.sh -skiptests -tidy -so -noclean &> "${TMP_BUILDALL_LOG}"
then
echo "buildall.sh failed, dumping output" >&2
@@ -59,6 +59,7 @@ export PATH="${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/share
:${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/bin/\
:$PATH"
TMP_STDERR=$(mktemp)
echo; echo "Running clang tidy, for error logs see ${TMP_STDERR}"
STRCAT_MESSAGE="Impala-specific note: This can also be fixed using the StrCat() function \
from be/src/gutil/strings strcat.h)"
CLANG_STRING_CONCAT="performance-inefficient-string-concatenation"