mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-3741 [part 1]: Upgraded kudu/util for BloomFilter
Ported BlockBloomFilter related source files from Kudu upstream to Impala be/src/kudu/util. The git hash of Kudu to take these files is 389d4f1e1c. Testing: Passed core tests. Change-Id: Ifac41ffb3e1742ffb6a969cb1c368d6d93c23357 Reviewed-on: http://gerrit.cloudera.org:8080/15676 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
f2f4c9891a
commit
f7770f1d20
@@ -15,6 +15,20 @@
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
#######################################
|
||||
# block_bloom_filter_proto
|
||||
#######################################
|
||||
|
||||
PROTOBUF_GENERATE_CPP(
|
||||
BLOCK_BLOOM_FILTER_PROTO_SRCS BLOCK_BLOOM_FILTER_PROTO_HDRS BLOCK_BLOOM_FILTER_PROTO_TGTS
|
||||
SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
|
||||
BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
|
||||
PROTO_FILES block_bloom_filter.proto)
|
||||
ADD_EXPORTABLE_LIBRARY(block_bloom_filter_proto
|
||||
SRCS ${BLOCK_BLOOM_FILTER_PROTO_SRCS}
|
||||
DEPS hash_proto pb_util_proto protobuf
|
||||
NONLINK_DEPS ${BLOCK_BLOOM_FILTER_PROTO_TGTS})
|
||||
|
||||
#######################################
|
||||
# util_compression_proto
|
||||
#######################################
|
||||
@@ -269,6 +283,7 @@ else()
|
||||
endif()
|
||||
|
||||
set(UTIL_LIBS
|
||||
block_bloom_filter_proto
|
||||
crcutil
|
||||
gflags
|
||||
glog
|
||||
|
||||
@@ -20,8 +20,9 @@
|
||||
#include <cmath> // IWYU pragma: keep
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <iosfwd>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@@ -30,13 +31,18 @@
|
||||
#include <glog/logging.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "kudu/util/hash.pb.h"
|
||||
#include "kudu/util/memory/arena.h"
|
||||
#include "kudu/util/slice.h"
|
||||
#include "kudu/util/status.h"
|
||||
#include "kudu/util/test_macros.h"
|
||||
#include "kudu/util/test_util.h"
|
||||
|
||||
DECLARE_bool(disable_blockbloomfilter_avx2);
|
||||
|
||||
using namespace std; // NOLINT(*)
|
||||
using std::unique_ptr;
|
||||
using std::unordered_set;
|
||||
using std::vector;
|
||||
|
||||
namespace kudu {
|
||||
|
||||
@@ -44,6 +50,7 @@ class BlockBloomFilterTest : public KuduTest {
|
||||
public:
|
||||
void SetUp() override {
|
||||
SeedRandom();
|
||||
allocator_ = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
|
||||
}
|
||||
// Make a random uint32_t, avoiding the absent high bit and the low-entropy low bits
|
||||
// produced by rand().
|
||||
@@ -57,9 +64,8 @@ class BlockBloomFilterTest : public KuduTest {
|
||||
BlockBloomFilter* CreateBloomFilter(size_t log_space_bytes) {
|
||||
FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0;
|
||||
|
||||
unique_ptr<BlockBloomFilter> bf(
|
||||
new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
|
||||
CHECK_OK(bf->Init(log_space_bytes));
|
||||
unique_ptr<BlockBloomFilter> bf(new BlockBloomFilter(allocator_));
|
||||
CHECK_OK(bf->Init(log_space_bytes, FAST_HASH, 0));
|
||||
bloom_filters_.emplace_back(move(bf));
|
||||
return bloom_filters_.back().get();
|
||||
}
|
||||
@@ -70,6 +76,9 @@ class BlockBloomFilterTest : public KuduTest {
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
DefaultBlockBloomFilterBufferAllocator* allocator_;
|
||||
|
||||
private:
|
||||
vector<unique_ptr<BlockBloomFilter>> bloom_filters_;
|
||||
};
|
||||
@@ -81,16 +90,56 @@ TEST_F(BlockBloomFilterTest, Constructor) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlockBloomFilterTest, Clone) {
|
||||
Arena arena(1024);
|
||||
ArenaBlockBloomFilterBufferAllocator arena_allocator(&arena);
|
||||
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone = arena_allocator.Clone();
|
||||
|
||||
for (int log_space_bytes = 1; log_space_bytes <= 20; ++log_space_bytes) {
|
||||
auto* bf = CreateBloomFilter(log_space_bytes);
|
||||
int max_elems = BlockBloomFilter::MaxNdv(log_space_bytes, 0.01 /* fpp */);
|
||||
while (max_elems-- > 0) {
|
||||
bf->Insert(MakeRand());
|
||||
}
|
||||
unique_ptr<BlockBloomFilter> bf_clone;
|
||||
ASSERT_OK(bf->Clone(allocator_clone.get(), &bf_clone));
|
||||
ASSERT_NE(nullptr, bf_clone);
|
||||
ASSERT_EQ(*bf_clone, *bf);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlockBloomFilterTest, InvalidSpace) {
|
||||
BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
|
||||
BlockBloomFilter bf(allocator_);
|
||||
// Random number in the range [38, 64).
|
||||
const int log_space_bytes = 38 + rand() % (64 - 38);
|
||||
Status s = bf.Init(log_space_bytes);
|
||||
Status s = bf.Init(log_space_bytes, FAST_HASH, 0);
|
||||
ASSERT_TRUE(s.IsInvalidArgument());
|
||||
ASSERT_STR_CONTAINS(s.ToString(), "Bloom filter too large");
|
||||
bf.Close();
|
||||
}
|
||||
|
||||
// Simple Arena allocator based test that would sometimes trigger
|
||||
// SIGSEGV due to misalignment of the "directory_" ptr on AVX operations
|
||||
// before adding support for 32/64 byte alignment in Arena allocator.
|
||||
// It simulates the allocation pattern in wire-protocol.cc.
|
||||
TEST_F(BlockBloomFilterTest, ArenaAligned) {
|
||||
Arena a(64);
|
||||
auto* allocator = a.NewObject<ArenaBlockBloomFilterBufferAllocator>(&a);
|
||||
auto* bf = a.NewObject<BlockBloomFilter>(allocator);
|
||||
bf->Init(6, FAST_HASH, 0);
|
||||
bool key = true;
|
||||
Slice s(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
|
||||
bf->Insert(s);
|
||||
ASSERT_TRUE(bf->Find(s));
|
||||
}
|
||||
|
||||
TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) {
|
||||
BlockBloomFilter bf(allocator_);
|
||||
Status s = bf.Init(4, UNKNOWN_HASH, 0);
|
||||
ASSERT_TRUE(s.IsInvalidArgument());
|
||||
ASSERT_STR_CONTAINS(s.ToString(), "Invalid/Unsupported hash algorithm");
|
||||
}
|
||||
|
||||
// We can Insert() hashes into a Bloom filter with different spaces.
|
||||
TEST_F(BlockBloomFilterTest, Insert) {
|
||||
for (int i = 13; i < 17; ++i) {
|
||||
@@ -240,4 +289,46 @@ TEST_F(BlockBloomFilterTest, MinSpaceForFpp) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlockBloomFilterTest, Or) {
|
||||
BlockBloomFilter* bf1 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
|
||||
BlockBloomFilter* bf2 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
|
||||
|
||||
for (int i = 60; i < 80; ++i) bf2->Insert(i);
|
||||
for (int i = 0; i < 10; ++i) bf1->Insert(i);
|
||||
|
||||
ASSERT_OK(bf1->Or(*bf2));
|
||||
for (int i = 0; i < 10; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
|
||||
for (int i = 60; i < 80; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
|
||||
|
||||
// Insert another value to aggregated BloomFilter.
|
||||
for (int i = 11; i < 50; ++i) bf1->Insert(i);
|
||||
|
||||
for (int i = 11; i < 50; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
|
||||
ASSERT_FALSE(bf1->Find(81));
|
||||
|
||||
// Check that AlwaysFalse() is updated correctly.
|
||||
BlockBloomFilter* bf3 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
|
||||
BlockBloomFilter* always_false = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
|
||||
ASSERT_OK(bf3->Or(*always_false));
|
||||
EXPECT_TRUE(bf3->always_false());
|
||||
ASSERT_OK(bf3->Or(*bf2));
|
||||
EXPECT_FALSE(bf3->always_false());
|
||||
|
||||
// Invalid argument test cases.
|
||||
BlockBloomFilter* bf4 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
|
||||
BlockBloomFilter* bf5 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100000, 0.01));
|
||||
Status s = bf4->Or(*bf5);
|
||||
ASSERT_TRUE(s.IsInvalidArgument());
|
||||
ASSERT_STR_CONTAINS(s.ToString(), "Directory size don't match");
|
||||
|
||||
// Test the public OrEqualArray() function.
|
||||
static constexpr size_t kNumBytes = 64;
|
||||
unique_ptr<uint8_t[]> a_ptr(new uint8_t[kNumBytes]);
|
||||
unique_ptr<uint8_t[]> b_ptr(new uint8_t[kNumBytes]);
|
||||
memset(a_ptr.get(), 0xDE, kNumBytes);
|
||||
memset(b_ptr.get(), 0, kNumBytes);
|
||||
ASSERT_OK(BlockBloomFilter::OrEqualArray(kNumBytes, a_ptr.get(), b_ptr.get()));
|
||||
ASSERT_EQ(0, memcmp(a_ptr.get(), b_ptr.get(), kNumBytes));
|
||||
}
|
||||
} // namespace kudu
|
||||
|
||||
@@ -24,13 +24,20 @@
|
||||
#include <cmath>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "kudu/gutil/singleton.h"
|
||||
#include "kudu/gutil/cpu.h"
|
||||
#include "kudu/gutil/integral_types.h"
|
||||
#include "kudu/gutil/strings/substitute.h"
|
||||
#include "kudu/util/block_bloom_filter.pb.h"
|
||||
#include "kudu/util/flag_tags.h"
|
||||
#include "kudu/util/memory/arena.h"
|
||||
|
||||
using std::shared_ptr;
|
||||
using std::unique_ptr;
|
||||
using strings::Substitute;
|
||||
|
||||
DEFINE_bool(disable_blockbloomfilter_avx2, false,
|
||||
"Disable AVX2 operations in BlockBloomFilter. This flag has no effect if the target "
|
||||
@@ -40,15 +47,22 @@ TAG_FLAG(disable_blockbloomfilter_avx2, hidden);
|
||||
|
||||
namespace kudu {
|
||||
|
||||
// Initialize the static member variables from BlockBloomFilter class.
|
||||
constexpr uint32_t BlockBloomFilter::kRehash[8] __attribute__((aligned(32)));
|
||||
const base::CPU BlockBloomFilter::kCpu = base::CPU();
|
||||
// constexpr data member requires initialization in the class declaration.
|
||||
// Hence no duplicate initialization in the definition here.
|
||||
constexpr BlockBloomFilter* const BlockBloomFilter::kAlwaysTrueFilter;
|
||||
|
||||
BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_allocator) :
|
||||
always_false_(true),
|
||||
buffer_allocator_(buffer_allocator),
|
||||
log_num_buckets_(0),
|
||||
directory_mask_(0),
|
||||
directory_(nullptr) {
|
||||
directory_(nullptr),
|
||||
hash_algorithm_(UNKNOWN_HASH),
|
||||
hash_seed_(0) {
|
||||
|
||||
#ifdef USE_AVX2
|
||||
if (has_avx2()) {
|
||||
bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsertAVX2;
|
||||
@@ -61,14 +75,22 @@ BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_all
|
||||
bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsert;
|
||||
bucket_find_func_ptr_ = &BlockBloomFilter::BucketFind;
|
||||
#endif
|
||||
|
||||
DCHECK(bucket_insert_func_ptr_);
|
||||
DCHECK(bucket_find_func_ptr_);
|
||||
}
|
||||
|
||||
BlockBloomFilter::~BlockBloomFilter() {
|
||||
DCHECK(directory_ == nullptr) <<
|
||||
"Close() should have been called before the object is destroyed.";
|
||||
Close();
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::Init(const int log_space_bytes) {
|
||||
Status BlockBloomFilter::InitInternal(const int log_space_bytes,
|
||||
HashAlgorithm hash_algorithm,
|
||||
uint32_t hash_seed) {
|
||||
if (!HashUtil::IsComputeHash32Available(hash_algorithm)) {
|
||||
return Status::InvalidArgument(
|
||||
Substitute("Invalid/Unsupported hash algorithm $0", HashAlgorithm_Name(hash_algorithm)));
|
||||
}
|
||||
// Since log_space_bytes is in bytes, we need to convert it to the number of tiny
|
||||
// Bloom filters we will use.
|
||||
log_num_buckets_ = std::max(1, log_space_bytes - kLogBucketByteSize);
|
||||
@@ -76,7 +98,7 @@ Status BlockBloomFilter::Init(const int log_space_bytes) {
|
||||
// must be limited.
|
||||
if (log_num_buckets_ > 32) {
|
||||
return Status::InvalidArgument(
|
||||
strings::Substitute("Bloom filter too large. log_space_bytes: $0", log_space_bytes));
|
||||
Substitute("Bloom filter too large. log_space_bytes: $0", log_space_bytes));
|
||||
}
|
||||
// Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
|
||||
// that is too large.
|
||||
@@ -86,7 +108,59 @@ Status BlockBloomFilter::Init(const int log_space_bytes) {
|
||||
Close(); // Ensure that any previously allocated memory for directory_ is released.
|
||||
RETURN_NOT_OK(buffer_allocator_->AllocateBuffer(alloc_size,
|
||||
reinterpret_cast<void**>(&directory_)));
|
||||
memset(directory_, 0, alloc_size);
|
||||
hash_algorithm_ = hash_algorithm;
|
||||
hash_seed_ = hash_seed;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::Init(const int log_space_bytes, HashAlgorithm hash_algorithm,
|
||||
uint32_t hash_seed) {
|
||||
RETURN_NOT_OK(InitInternal(log_space_bytes, hash_algorithm, hash_seed));
|
||||
DCHECK(directory_);
|
||||
memset(directory_, 0, directory_size());
|
||||
always_false_ = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::InitFromDirectory(int log_space_bytes,
|
||||
const Slice& directory,
|
||||
bool always_false,
|
||||
HashAlgorithm hash_algorithm,
|
||||
uint32_t hash_seed) {
|
||||
RETURN_NOT_OK(InitInternal(log_space_bytes, hash_algorithm, hash_seed));
|
||||
DCHECK(directory_);
|
||||
|
||||
if (directory_size() != directory.size()) {
|
||||
return Status::InvalidArgument(
|
||||
Substitute("Mismatch in BlockBloomFilter source directory size $0 and expected size $1",
|
||||
directory.size(), directory_size()));
|
||||
}
|
||||
memcpy(directory_, directory.data(), directory.size());
|
||||
always_false_ = always_false;
|
||||
return Status();
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::InitFromPB(const BlockBloomFilterPB& bf_src) {
|
||||
if (!bf_src.has_log_space_bytes() || !bf_src.has_bloom_data() ||
|
||||
!bf_src.has_hash_algorithm() || !bf_src.has_always_false()) {
|
||||
return Status::InvalidArgument("Missing arguments to initialize BlockBloomFilter.");
|
||||
}
|
||||
|
||||
return InitFromDirectory(bf_src.log_space_bytes(), bf_src.bloom_data(), bf_src.always_false(),
|
||||
bf_src.hash_algorithm(), bf_src.hash_seed());
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::Clone(BlockBloomFilterBufferAllocatorIf* allocator,
|
||||
unique_ptr<BlockBloomFilter>* bf_out) const {
|
||||
unique_ptr<BlockBloomFilter> bf_clone(new BlockBloomFilter(allocator));
|
||||
|
||||
RETURN_NOT_OK(bf_clone->InitInternal(log_space_bytes(), hash_algorithm_, hash_seed_));
|
||||
DCHECK(bf_clone->directory_);
|
||||
CHECK_EQ(bf_clone->directory_size(), directory_size());
|
||||
memcpy(bf_clone->directory_, directory_, bf_clone->directory_size());
|
||||
bf_clone->always_false_ = always_false_;
|
||||
|
||||
*bf_out = std::move(bf_clone);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -171,6 +245,7 @@ void BlockBloomFilter::InsertNoAvx2(const uint32_t hash) noexcept {
|
||||
void BlockBloomFilter::Insert(const uint32_t hash) noexcept {
|
||||
always_false_ = false;
|
||||
const uint32_t bucket_idx = Rehash32to32(hash) & directory_mask_;
|
||||
DCHECK(bucket_insert_func_ptr_);
|
||||
(this->*bucket_insert_func_ptr_)(bucket_idx, hash);
|
||||
}
|
||||
|
||||
@@ -179,21 +254,162 @@ bool BlockBloomFilter::Find(const uint32_t hash) const noexcept {
|
||||
return false;
|
||||
}
|
||||
const uint32_t bucket_idx = Rehash32to32(hash) & directory_mask_;
|
||||
DCHECK(bucket_find_func_ptr_);
|
||||
return (this->*bucket_find_func_ptr_)(bucket_idx, hash);
|
||||
}
|
||||
|
||||
void BlockBloomFilter::CopyToPB(BlockBloomFilterPB* bf_dst) const {
|
||||
bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(directory_), directory_size());
|
||||
bf_dst->set_log_space_bytes(log_space_bytes());
|
||||
bf_dst->set_always_false(always_false_);
|
||||
bf_dst->set_hash_algorithm(hash_algorithm_);
|
||||
bf_dst->set_hash_seed(hash_seed_);
|
||||
}
|
||||
|
||||
bool BlockBloomFilter::operator==(const BlockBloomFilter& rhs) const {
|
||||
return always_false_ == rhs.always_false_ &&
|
||||
directory_mask_ == rhs.directory_mask_ &&
|
||||
directory_size() == rhs.directory_size() &&
|
||||
hash_algorithm_ == rhs.hash_algorithm_ &&
|
||||
hash_seed_ == rhs.hash_seed_ &&
|
||||
memcmp(directory_, rhs.directory_, directory_size()) == 0;
|
||||
}
|
||||
|
||||
bool BlockBloomFilter::operator!=(const BlockBloomFilter& rhs) const {
|
||||
return !(rhs == *this);
|
||||
}
|
||||
|
||||
void BlockBloomFilter::OrEqualArrayInternal(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out) {
|
||||
#ifdef USE_AVX2
|
||||
if (has_avx2()) {
|
||||
BlockBloomFilter::OrEqualArrayAVX2(n, in, out);
|
||||
} else {
|
||||
BlockBloomFilter::OrEqualArrayNoAVX2(n, in, out);
|
||||
}
|
||||
#else
|
||||
BlockBloomFilter::OrEqualArrayNoAVX2(n, in, out);
|
||||
#endif
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::OrEqualArray(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out) {
|
||||
if ((n % kBucketByteSize) != 0) {
|
||||
return Status::InvalidArgument(Substitute("Input size $0 not a multiple of 32-bytes", n));
|
||||
}
|
||||
|
||||
OrEqualArrayInternal(n, in, out);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlockBloomFilter::OrEqualArrayNoAVX2(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out) {
|
||||
// The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
|
||||
// written in a way that is very friendly to auto-vectorization. Instead, we manually
|
||||
// vectorize, increasing the speed by up to 56x.
|
||||
const __m128i* simd_in = reinterpret_cast<const __m128i*>(in);
|
||||
const __m128i* const simd_in_end = reinterpret_cast<const __m128i*>(in + n);
|
||||
__m128i* simd_out = reinterpret_cast<__m128i*>(out);
|
||||
// in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
|
||||
// == 16, we can do two _mm_or_si128's in each iteration without checking array
|
||||
// bounds.
|
||||
while (simd_in != simd_in_end) {
|
||||
for (int i = 0; i < 2; ++i, ++simd_in, ++simd_out) {
|
||||
_mm_storeu_si128(
|
||||
simd_out, _mm_or_si128(_mm_loadu_si128(simd_out), _mm_loadu_si128(simd_in)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status BlockBloomFilter::Or(const BlockBloomFilter& other) {
|
||||
// AlwaysTrueFilter is a special case implemented with a nullptr.
|
||||
// Hence Or'ing with an AlwaysTrueFilter will result in a Bloom filter that also
|
||||
// always returns true which'll require destructing this Bloom filter.
|
||||
// Moreover for a reference "other" to be an AlwaysTrueFilter the reference needs
|
||||
// to be created from a nullptr and so we get into undefined behavior territory.
|
||||
// Comparing AlwaysTrueFilter with "&other" results in a compiler warning for
|
||||
// comparing a non-null argument "other" with NULL [-Wnonnull-compare].
|
||||
// For above reasons, guard against it.
|
||||
CHECK_NE(kAlwaysTrueFilter, &other);
|
||||
|
||||
if (this == &other) {
|
||||
// No op.
|
||||
return Status::OK();
|
||||
}
|
||||
if (directory_size() != other.directory_size()) {
|
||||
return Status::InvalidArgument(Substitute("Directory size don't match. this: $0, other: $1",
|
||||
directory_size(), other.directory_size()));
|
||||
}
|
||||
if (other.always_false()) {
|
||||
// Nothing to do.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
OrEqualArrayInternal(directory_size(), reinterpret_cast<const uint8*>(other.directory_),
|
||||
reinterpret_cast<uint8*>(directory_));
|
||||
|
||||
always_false_ = false;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool BlockBloomFilter::has_avx2() {
|
||||
return !FLAGS_disable_blockbloomfilter_avx2 && kCpu.has_avx2();
|
||||
}
|
||||
|
||||
shared_ptr<DefaultBlockBloomFilterBufferAllocator>
|
||||
DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr() {
|
||||
// Meyer's Singleton.
|
||||
// Static variable initialization is thread-safe in C++11.
|
||||
static shared_ptr<DefaultBlockBloomFilterBufferAllocator> instance =
|
||||
DefaultBlockBloomFilterBufferAllocator::make_shared();
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator::GetSingleton() {
|
||||
return GetSingletonSharedPtr().get();
|
||||
}
|
||||
|
||||
shared_ptr<BlockBloomFilterBufferAllocatorIf>
|
||||
DefaultBlockBloomFilterBufferAllocator::Clone() const {
|
||||
return GetSingletonSharedPtr();
|
||||
}
|
||||
|
||||
Status DefaultBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
|
||||
int ret_code = posix_memalign(ptr, CACHELINE_SIZE, bytes);
|
||||
return ret_code == 0 ? Status::OK() :
|
||||
Status::RuntimeError(strings::Substitute("bad_alloc. bytes: $0", bytes));
|
||||
Status::RuntimeError(Substitute("bad_alloc. bytes: $0", bytes));
|
||||
}
|
||||
|
||||
void DefaultBlockBloomFilterBufferAllocator::FreeBuffer(void* ptr) {
|
||||
free(DCHECK_NOTNULL(ptr));
|
||||
}
|
||||
|
||||
DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator::GetSingleton() {
|
||||
return Singleton<DefaultBlockBloomFilterBufferAllocator>::get();
|
||||
ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator(Arena* arena) :
|
||||
arena_(DCHECK_NOTNULL(arena)),
|
||||
is_arena_owned_(false) {
|
||||
}
|
||||
|
||||
ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator() :
|
||||
arena_(new Arena(1024)),
|
||||
is_arena_owned_(true) {
|
||||
}
|
||||
|
||||
ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() {
|
||||
if (is_arena_owned_) {
|
||||
delete arena_;
|
||||
}
|
||||
}
|
||||
|
||||
Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
|
||||
DCHECK(arena_);
|
||||
static_assert(CACHELINE_SIZE >= 32,
|
||||
"For AVX operations, need buffers to be 32-bytes aligned or higher");
|
||||
*ptr = arena_->AllocateBytesAligned(bytes, CACHELINE_SIZE);
|
||||
return *ptr == nullptr ?
|
||||
Status::RuntimeError(Substitute("Arena bad_alloc. bytes: $0", bytes)) :
|
||||
Status::OK();
|
||||
}
|
||||
|
||||
} // namespace kudu
|
||||
|
||||
@@ -21,16 +21,27 @@
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
|
||||
#include <gflags/gflags_declare.h>
|
||||
#include <glog/logging.h>
|
||||
// Including glog/logging.h causes problems while compiling in Apache Impala for codegen.
|
||||
// IWYU pragma: no_include <glog/logging.h>
|
||||
|
||||
#include "kudu/gutil/cpu.h"
|
||||
#include "kudu/gutil/macros.h"
|
||||
#include "kudu/gutil/port.h"
|
||||
#include "kudu/util/hash.pb.h"
|
||||
#include "kudu/util/hash_util.h"
|
||||
#include "kudu/util/make_shared.h"
|
||||
#include "kudu/util/slice.h"
|
||||
#include "kudu/util/status.h"
|
||||
|
||||
DECLARE_bool(disable_blockbloomfilter_avx2);
|
||||
namespace base {
|
||||
class CPU;
|
||||
} // namespace base
|
||||
|
||||
namespace kudu {
|
||||
class Arena;
|
||||
class BlockBloomFilterPB;
|
||||
} // namespace kudu
|
||||
|
||||
namespace kudu {
|
||||
|
||||
@@ -68,25 +79,51 @@ class BlockBloomFilter {
|
||||
|
||||
// Reset the filter state, allocate/reallocate the internal data structures.
|
||||
// All calls to Insert() and Find() should only be done between the calls to Init() and
|
||||
// Close().Init and Close are safe to call multiple times.
|
||||
Status Init(int log_space_bytes);
|
||||
// Close(). Init and Close are safe to call multiple times.
|
||||
// BlockBloomFilter offers convenience of both directly inserting 32-bit integers
|
||||
// and letting the Insert()/Find() hash the keys. To avoid mistakes wherein
|
||||
// caller is using specific hash function and directly inserts 32-bit hash values
|
||||
// but misses specifying the hash function in Init() call, default values are not used.
|
||||
// Parameters:
|
||||
// "log_space_bytes": Log2 of the space in bytes for the BloomFilter.
|
||||
// "hash_algorithm": Hash algorithm used to hash the keys to 32-bit integers prior to doing
|
||||
// Insert() or Find().
|
||||
// "hash_seed": Seed used to hash the keys.
|
||||
Status Init(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed);
|
||||
// Initialize the BlockBloomFilter by de-serializing the protobuf message.
|
||||
Status InitFromPB(const BlockBloomFilterPB& bf_src);
|
||||
// Initialize the BlockBloomFilter from a populated "directory" structure.
|
||||
// Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message.
|
||||
Status InitFromDirectory(int log_space_bytes, const Slice& directory, bool always_false,
|
||||
HashAlgorithm hash_algorithm, uint32_t hash_seed);
|
||||
|
||||
// Clones the BlockBloomFilter using the supplied "allocator". The allocator is expected
|
||||
// to remain valid during the lifetime of the cloned BlockBloomFilter.
|
||||
// On success, returns Status::OK with cloned BlockBloomFilter in "bf_out" output parameter.
|
||||
// On failure, returns error status.
|
||||
Status Clone(BlockBloomFilterBufferAllocatorIf* allocator,
|
||||
std::unique_ptr<BlockBloomFilter>* bf_out) const;
|
||||
|
||||
void Close();
|
||||
|
||||
// Representation of a filter which allows all elements to pass.
|
||||
static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr;
|
||||
|
||||
bool AlwaysFalse() const { return always_false_; }
|
||||
|
||||
// Adds an element to the BloomFilter. The function used to generate 'hash' need not
|
||||
// have good uniformity, but it should have low collision probability. For instance, if
|
||||
// the set of values is 32-bit ints, the identity function is a valid hash function for
|
||||
// this Bloom filter, since the collision probability (the probability that two
|
||||
// non-equal values will have the same hash value) is 0.
|
||||
void Insert(uint32_t hash) noexcept;
|
||||
// Same as above with convenience of hashing the key.
|
||||
void Insert(const Slice& key) noexcept {
|
||||
Insert(HashUtil::ComputeHash32(key, hash_algorithm_, hash_seed_));
|
||||
}
|
||||
|
||||
// Finds an element in the BloomFilter, returning true if it is found and false (with
|
||||
// high probability) if it is not.
|
||||
bool Find(uint32_t hash) const noexcept;
|
||||
// Same as above with convenience of hashing the key.
|
||||
bool Find(const Slice& key) const noexcept {
|
||||
return Find(HashUtil::ComputeHash32(key, hash_algorithm_, hash_seed_));
|
||||
}
|
||||
|
||||
// As more distinct items are inserted into a BloomFilter, the false positive rate
|
||||
// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
|
||||
@@ -110,6 +147,43 @@ class BlockBloomFilter {
|
||||
return sizeof(Bucket) * (1LL << std::max<int>(1, log_heap_size - kLogBucketWordBits));
|
||||
}
|
||||
|
||||
// Serializes BlockBloomFilter to protobuf message.
|
||||
void CopyToPB(BlockBloomFilterPB* bf_dst) const;
|
||||
|
||||
bool operator==(const BlockBloomFilter& rhs) const;
|
||||
bool operator!=(const BlockBloomFilter& rhs) const;
|
||||
|
||||
// Computes the logical OR of this filter with 'other' and stores the result in this
|
||||
// filter.
|
||||
// Notes:
|
||||
// - The directory sizes of the Bloom filters must match.
|
||||
// - Or'ing with kAlwaysTrueFilter is disallowed.
|
||||
Status Or(const BlockBloomFilter& other);
|
||||
|
||||
// Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' bytes where 'n'
|
||||
// is multiple of 32-bytes.
|
||||
static Status OrEqualArray(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out);
|
||||
|
||||
// Returns whether the Bloom filter is empty and hence would return false for all lookups.
|
||||
bool always_false() const {
|
||||
return always_false_;
|
||||
}
|
||||
|
||||
// Returns amount of space used in log2 bytes.
|
||||
int log_space_bytes() const {
|
||||
return log_num_buckets_ + kLogBucketByteSize;
|
||||
}
|
||||
|
||||
// Returns the directory structure. Useful for serializing the BlockBloomFilter to
|
||||
// a custom protobuf message.
|
||||
Slice directory() const {
|
||||
return Slice(reinterpret_cast<const uint8_t*>(directory_), directory_size());
|
||||
}
|
||||
|
||||
// Representation of a filter which allows all elements to pass.
|
||||
static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr;
|
||||
|
||||
private:
|
||||
// always_false_ is true when the bloom filter hasn't had any elements inserted.
|
||||
bool always_false_;
|
||||
@@ -127,6 +201,8 @@ class BlockBloomFilter {
|
||||
|
||||
// log2(number of bytes in a bucket)
|
||||
static constexpr int kLogBucketByteSize = 5;
|
||||
// Bucket size in bytes.
|
||||
static constexpr size_t kBucketByteSize = 1UL << kLogBucketByteSize;
|
||||
|
||||
static_assert((1 << kLogBucketWordBits) == std::numeric_limits<BucketWord>::digits,
|
||||
"BucketWord must have a bit-width that is be a power of 2, like 64 for uint64_t.");
|
||||
@@ -144,7 +220,15 @@ class BlockBloomFilter {
|
||||
|
||||
Bucket* directory_;
|
||||
|
||||
// Same as Insert(), but skips the CPU check and assumes that AVX is not available.
|
||||
// Hash algorithm used to hash data to 32-bit value before insertion and lookup.
|
||||
HashAlgorithm hash_algorithm_;
|
||||
// Seed used with hash algorithm.
|
||||
uint32_t hash_seed_;
|
||||
|
||||
// Helper function for public Init() variants.
|
||||
Status InitInternal(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed);
|
||||
|
||||
// Same as Insert(), but skips the CPU check and assumes that AVX2 is not available.
|
||||
void InsertNoAvx2(uint32_t hash) noexcept;
|
||||
|
||||
// Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
|
||||
@@ -153,8 +237,17 @@ class BlockBloomFilter {
|
||||
|
||||
bool BucketFind(uint32_t bucket_idx, uint32_t hash) const noexcept;
|
||||
|
||||
// Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' without using AVX2
|
||||
// operations.
|
||||
static void OrEqualArrayNoAVX2(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out);
|
||||
// Helper function for OrEqualArray functions that encapsulates AVX2 v/s non-AVX2 logic to
|
||||
// invoke the right function.
|
||||
static void OrEqualArrayInternal(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out);
|
||||
|
||||
#ifdef USE_AVX2
|
||||
// Same as Insert(), but skips the CPU check and assumes that AVX is available.
|
||||
// Same as Insert(), but skips the CPU check and assumes that AVX2 is available.
|
||||
void InsertAvx2(uint32_t hash) noexcept __attribute__((__target__("avx2")));
|
||||
|
||||
// A faster SIMD version of BucketInsert().
|
||||
@@ -164,21 +257,25 @@ class BlockBloomFilter {
|
||||
// A faster SIMD version of BucketFind().
|
||||
bool BucketFindAVX2(uint32_t bucket_idx, uint32_t hash) const noexcept
|
||||
__attribute__((__target__("avx2")));
|
||||
|
||||
// Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' using AVX2
|
||||
// instructions. 'n' must be a multiple of 32.
|
||||
static void OrEqualArrayAVX2(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out) __attribute__((target("avx2")));
|
||||
#endif
|
||||
|
||||
// Function pointers initialized in constructor to avoid run-time cost
|
||||
// in hot-path of Find and Insert operations.
|
||||
// Function pointers initialized in the constructor to avoid run-time cost in hot-path
|
||||
// of Find and Insert operations.
|
||||
decltype(&BlockBloomFilter::BucketInsert) bucket_insert_func_ptr_;
|
||||
decltype(&BlockBloomFilter::BucketFind) bucket_find_func_ptr_;
|
||||
|
||||
// Size of the internal directory structure in bytes.
|
||||
int64_t directory_size() const {
|
||||
return 1ULL << (log_num_buckets_ + kLogBucketByteSize);
|
||||
return 1ULL << log_space_bytes();
|
||||
}
|
||||
|
||||
// Detect at run-time whether CPU supports AVX2
|
||||
static bool has_avx2() {
|
||||
return !FLAGS_disable_blockbloomfilter_avx2 && kCpu.has_avx2();
|
||||
}
|
||||
static bool has_avx2();
|
||||
|
||||
// Some constants used in hashing. #defined for efficiency reasons.
|
||||
#define BLOOM_HASH_CONSTANTS \
|
||||
@@ -211,21 +308,58 @@ class BlockBloomFilter {
|
||||
// Generic interface to allocate and de-allocate memory for the BlockBloomFilter.
|
||||
class BlockBloomFilterBufferAllocatorIf {
|
||||
public:
|
||||
virtual ~BlockBloomFilterBufferAllocatorIf() = default;
|
||||
virtual Status AllocateBuffer(size_t bytes, void** ptr) = 0;
|
||||
virtual void FreeBuffer(void* ptr) = 0;
|
||||
// Clones the allocator.
|
||||
virtual std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const = 0;
|
||||
};
|
||||
|
||||
class DefaultBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf {
|
||||
// Default allocator implemented as Singleton.
|
||||
class DefaultBlockBloomFilterBufferAllocator :
|
||||
public BlockBloomFilterBufferAllocatorIf,
|
||||
public enable_make_shared<DefaultBlockBloomFilterBufferAllocator> {
|
||||
public:
|
||||
// Required for Singleton.
|
||||
DefaultBlockBloomFilterBufferAllocator() = default;
|
||||
static std::shared_ptr<DefaultBlockBloomFilterBufferAllocator> GetSingletonSharedPtr();
|
||||
static DefaultBlockBloomFilterBufferAllocator* GetSingleton();
|
||||
~DefaultBlockBloomFilterBufferAllocator() override = default;
|
||||
|
||||
Status AllocateBuffer(size_t bytes, void** ptr) override;
|
||||
void FreeBuffer(void* ptr) override;
|
||||
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override;
|
||||
|
||||
static DefaultBlockBloomFilterBufferAllocator* GetSingleton();
|
||||
protected:
|
||||
// Protected default constructor to allow using make_shared()
|
||||
DefaultBlockBloomFilterBufferAllocator() = default;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(DefaultBlockBloomFilterBufferAllocator);
|
||||
};
|
||||
|
||||
class ArenaBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf {
|
||||
public:
|
||||
// Default constructor with arena that's owned by the allocator.
|
||||
ArenaBlockBloomFilterBufferAllocator();
|
||||
|
||||
// "arena" is expected to remain valid during the lifetime of the allocator.
|
||||
explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena);
|
||||
|
||||
~ArenaBlockBloomFilterBufferAllocator() override;
|
||||
|
||||
Status AllocateBuffer(size_t bytes, void** ptr) override;
|
||||
|
||||
void FreeBuffer(void* ptr) override {
|
||||
// NOP. Buffer will be de-allocated when the arena is destructed.
|
||||
}
|
||||
|
||||
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override {
|
||||
return std::make_shared<ArenaBlockBloomFilterBufferAllocator>();
|
||||
};
|
||||
|
||||
private:
|
||||
Arena* arena_;
|
||||
bool is_arena_owned_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ArenaBlockBloomFilterBufferAllocator);
|
||||
};
|
||||
|
||||
} // namespace kudu
|
||||
|
||||
37
be/src/kudu/util/block_bloom_filter.proto
Normal file
37
be/src/kudu/util/block_bloom_filter.proto
Normal file
@@ -0,0 +1,37 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
syntax = "proto2";
|
||||
package kudu;
|
||||
|
||||
option java_package = "org.apache.kudu";
|
||||
|
||||
import "kudu/util/hash.proto";
|
||||
import "kudu/util/pb_util.proto";
|
||||
|
||||
message BlockBloomFilterPB {
|
||||
// Log2 of the space required for the BlockBloomFilter.
|
||||
optional int32 log_space_bytes = 1;
|
||||
// The bloom filter bitmap.
|
||||
optional bytes bloom_data = 2 [(kudu.REDACT) = true];
|
||||
// Whether the BlockBloomFilter is empty and hence always returns false for lookups.
|
||||
optional bool always_false = 3;
|
||||
// Hash algorithm to generate 32-bit unsigned integer hash values before inserting
|
||||
// in the BlockBloomFilter.
|
||||
optional HashAlgorithm hash_algorithm = 4 [default = FAST_HASH];
|
||||
// Seed used to hash the input values in the hash algorithm.
|
||||
optional uint32 hash_seed = 5 [default = 0];
|
||||
}
|
||||
@@ -24,9 +24,14 @@
|
||||
|
||||
#include "kudu/util/block_bloom_filter.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <immintrin.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <ostream>
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "kudu/gutil/port.h"
|
||||
|
||||
namespace kudu {
|
||||
@@ -75,4 +80,20 @@ void BlockBloomFilter::InsertAvx2(const uint32_t hash) noexcept {
|
||||
BucketInsertAVX2(bucket_idx, hash);
|
||||
}
|
||||
|
||||
void BlockBloomFilter::OrEqualArrayAVX2(size_t n, const uint8_t* __restrict__ in,
|
||||
uint8_t* __restrict__ out) {
|
||||
static constexpr size_t kAVXRegisterBytes = sizeof(__m256d);
|
||||
static_assert(kAVXRegisterBytes == kBucketByteSize,
|
||||
"Unexpected AVX register bytes");
|
||||
DCHECK_EQ(n % kAVXRegisterBytes, 0) << "Invalid Bloom filter directory size";
|
||||
|
||||
const uint8_t* const in_end = in + n;
|
||||
for (; in != in_end; (in += kAVXRegisterBytes), (out += kAVXRegisterBytes)) {
|
||||
const double* double_in = reinterpret_cast<const double*>(in);
|
||||
double* double_out = reinterpret_cast<double*>(out);
|
||||
_mm256_storeu_pd(double_out,
|
||||
_mm256_or_pd(_mm256_loadu_pd(double_out), _mm256_loadu_pd(double_in)));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace kudu
|
||||
|
||||
@@ -24,4 +24,5 @@ enum HashAlgorithm {
|
||||
UNKNOWN_HASH = 0;
|
||||
MURMUR_HASH_2 = 1;
|
||||
CITY_HASH = 2;
|
||||
FAST_HASH = 3;
|
||||
}
|
||||
|
||||
@@ -15,11 +15,15 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "kudu/util/hash_util.h"
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "kudu/util/hash_util.h"
|
||||
#include "kudu/util/hash.pb.h"
|
||||
#include "kudu/util/slice.h"
|
||||
#include "kudu/util/test_macros.h"
|
||||
|
||||
namespace kudu {
|
||||
|
||||
@@ -53,6 +57,12 @@ TEST(HashUtilTest, TestFastHash64) {
|
||||
|
||||
hash = HashUtil::FastHash64("quick brown fox", 15, 42);
|
||||
ASSERT_EQ(3757424404558187042UL, hash);
|
||||
|
||||
hash = HashUtil::FastHash64(nullptr, 0, 0);
|
||||
ASSERT_EQ(12680076593665652444UL, hash);
|
||||
|
||||
hash = HashUtil::FastHash64("", 0, 0);
|
||||
ASSERT_EQ(0, hash);
|
||||
}
|
||||
|
||||
TEST(HashUtilTest, TestFastHash32) {
|
||||
@@ -66,6 +76,22 @@ TEST(HashUtilTest, TestFastHash32) {
|
||||
|
||||
hash = HashUtil::FastHash32("quick brown fox", 15, 42);
|
||||
ASSERT_EQ(1676541068U, hash);
|
||||
|
||||
hash = HashUtil::FastHash32(nullptr, 0, 0);
|
||||
ASSERT_EQ(842467426U, hash);
|
||||
|
||||
hash = HashUtil::FastHash32("", 0, 0);
|
||||
ASSERT_EQ(0, hash);
|
||||
}
|
||||
|
||||
TEST(HashUtilTest, TestComputeHash32Available) {
|
||||
Slice data("abcd");
|
||||
for (int h = HashAlgorithm_MIN; h <= HashAlgorithm_MAX; h++) {
|
||||
const auto hash_algorithm = static_cast<HashAlgorithm>(h);
|
||||
if (HashUtil::IsComputeHash32Available(hash_algorithm)) {
|
||||
NO_FATALS(HashUtil::ComputeHash32(data, hash_algorithm, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace kudu
|
||||
|
||||
@@ -18,17 +18,33 @@
|
||||
#ifndef KUDU_UTIL_HASH_UTIL_H
|
||||
#define KUDU_UTIL_HASH_UTIL_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <cstdint>
|
||||
|
||||
// Including glog/logging.h causes problems while compiling in Apache Impala for codegen.
|
||||
// IWYU pragma: no_include <glog/logging.h>
|
||||
|
||||
#include "kudu/gutil/port.h"
|
||||
#include "kudu/util/hash.pb.h"
|
||||
#include "kudu/util/slice.h"
|
||||
|
||||
namespace kudu {
|
||||
|
||||
/// Utility class to compute hash values.
|
||||
// Constant imported from Apache Impala used to compute hash values for special cases. It's an
|
||||
// arbitrary constant obtained by taking lower bytes of generated UUID. Helps distinguish NULL
|
||||
// values from empty objects.
|
||||
// Impala uses the direct BlockBloomFilter API and inserts hash value directly using its own
|
||||
// implementation of the Fast hash. Hence the value must match with Impala.
|
||||
//
|
||||
// Note: Since address of this static constexpr variable is used, declaring this as
|
||||
// a member variable of HashUtil requires an explicit definition in .cc file
|
||||
// and this class is completely defined in the header file to allow inlining.
|
||||
static constexpr uint32_t kHashValNull = 0x58081667;
|
||||
|
||||
// Utility class to compute hash values.
|
||||
class HashUtil {
|
||||
public:
|
||||
|
||||
/// Murmur2 hash implementation returning 64-bit hashes.
|
||||
// Murmur2 hash implementation returning 64-bit hashes.
|
||||
ATTRIBUTE_NO_SANITIZE_INTEGER
|
||||
static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) {
|
||||
static constexpr uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995UL;
|
||||
@@ -70,9 +86,17 @@ class HashUtil {
|
||||
// FastHash is simple, robust, and efficient general-purpose hash function from Google.
|
||||
// Implementation is adapted from https://code.google.com/archive/p/fast-hash/
|
||||
//
|
||||
// Adds special handling for nullptr input.
|
||||
//
|
||||
// Compute 64-bit FastHash.
|
||||
ATTRIBUTE_NO_SANITIZE_INTEGER
|
||||
static uint64_t FastHash64(const void* buf, size_t len, uint64_t seed) {
|
||||
// Special handling for nullptr input with possible non-zero length as could be the
|
||||
// case with nullable column values.
|
||||
if (buf == nullptr) {
|
||||
buf = &kHashValNull;
|
||||
len = sizeof(kHashValNull);
|
||||
}
|
||||
static constexpr uint64_t kMultiplier = 0x880355f21e6d1965UL;
|
||||
const uint64_t* pos = static_cast<const uint64_t*>(buf);
|
||||
const uint64_t* end = pos + (len / 8);
|
||||
@@ -112,6 +136,30 @@ class HashUtil {
|
||||
return h - (h >> 32);
|
||||
}
|
||||
|
||||
// Checks whether 32-bit version of the hash algorithm is available.
|
||||
// Must be kept in sync with ComputeHash32() function.
|
||||
static bool IsComputeHash32Available(HashAlgorithm hash_algorithm) {
|
||||
switch (hash_algorithm) {
|
||||
case FAST_HASH:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Compute 32-bit hash of the supplied data using the specified hash algorithm.
|
||||
// Must be kept in sync with IsComputeHash32Available() function.
|
||||
static uint32_t ComputeHash32(const Slice& data, HashAlgorithm hash_algorithm, uint32_t seed) {
|
||||
switch (hash_algorithm) {
|
||||
case FAST_HASH:
|
||||
return FastHash32(data.data(), data.size(), seed);
|
||||
default:
|
||||
// Can't use LOG(FATAL)/CHECK() since including glog/logging.h causes problems
|
||||
// with code-gen in Impala.
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
// Compression function for Merkle-Damgard construction.
|
||||
ATTRIBUTE_NO_SANITIZE_INTEGER
|
||||
|
||||
@@ -15,11 +15,14 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "kudu/util/memory/arena.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
@@ -27,9 +30,11 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "kudu/gutil/stringprintf.h"
|
||||
#include "kudu/util/memory/arena.h"
|
||||
#include "kudu/util/memory/memory.h"
|
||||
#include "kudu/util/mem_tracker.h"
|
||||
#include "kudu/util/memory/memory.h"
|
||||
#include "kudu/util/random.h"
|
||||
#include "kudu/util/slice.h"
|
||||
#include "kudu/util/test_util.h"
|
||||
|
||||
DEFINE_int32(num_threads, 16, "Number of threads to test");
|
||||
DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
|
||||
@@ -42,19 +47,52 @@ using std::string;
|
||||
using std::thread;
|
||||
using std::vector;
|
||||
|
||||
// From the "arena" allocate number of bytes required to copy the "to_write" buffer
|
||||
// and add the allocated buffer to the output "ptrs" vector.
|
||||
template<class ArenaType>
|
||||
static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
|
||||
std::vector<void *> ptrs;
|
||||
static void AllocateBytesAndWrite(ArenaType* arena, const Slice& to_write, vector<void*>* ptrs) {
|
||||
void *allocated_bytes = arena->AllocateBytes(to_write.size());
|
||||
ASSERT_NE(nullptr, allocated_bytes);
|
||||
memcpy(allocated_bytes, to_write.data(), to_write.size());
|
||||
ptrs->push_back(allocated_bytes);
|
||||
}
|
||||
|
||||
// From the "arena" allocate aligned bytes as specified by "alignment". Number of bytes
|
||||
// must be at least the size required to copy the "to_write" buffer.
|
||||
// Add the allocated aligned buffer to the output "ptrs" vector.
|
||||
template<class ArenaType, class RNG>
|
||||
static void AllocateAlignedBytesAndWrite(ArenaType* arena, const size_t alignment,
|
||||
const Slice& to_write, RNG* r, vector<void*>* ptrs) {
|
||||
// To test alignment we allocate random number of bytes within bounds
|
||||
// but write to fixed number of bytes "to_write".
|
||||
size_t num_bytes = FLAGS_alloc_size + r->Uniform32(128 - FLAGS_alloc_size + 1);
|
||||
ASSERT_LE(to_write.size(), num_bytes);
|
||||
void* allocated_bytes = arena->AllocateBytesAligned(num_bytes, alignment);
|
||||
ASSERT_NE(nullptr, allocated_bytes);
|
||||
ASSERT_EQ(0, reinterpret_cast<uintptr_t>(allocated_bytes) % alignment) <<
|
||||
"failed to align on " << alignment << "b boundary: " << allocated_bytes;
|
||||
memcpy(allocated_bytes, to_write.data(), to_write.size());
|
||||
ptrs->push_back(allocated_bytes);
|
||||
}
|
||||
|
||||
// Thread callback function used by bunch of test cases below.
|
||||
template<class ArenaType, class RNG, bool InvokeAligned = false>
|
||||
static void AllocateAndTestThreadFunc(ArenaType *arena, uint8_t thread_index, RNG* r) {
|
||||
vector<void *> ptrs;
|
||||
ptrs.reserve(FLAGS_allocs_per_thread);
|
||||
|
||||
char buf[FLAGS_alloc_size];
|
||||
uint8_t buf[FLAGS_alloc_size];
|
||||
memset(buf, thread_index, FLAGS_alloc_size);
|
||||
Slice data(buf, FLAGS_alloc_size);
|
||||
|
||||
for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
|
||||
void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
|
||||
CHECK(alloced);
|
||||
memcpy(alloced, buf, FLAGS_alloc_size);
|
||||
ptrs.push_back(alloced);
|
||||
if (InvokeAligned) {
|
||||
// Test alignment up to 64 bytes.
|
||||
const size_t alignment = 1 << (i % 7);
|
||||
AllocateAlignedBytesAndWrite(arena, alignment, data, r, &ptrs);
|
||||
} else {
|
||||
AllocateBytesAndWrite(arena, data, &ptrs);
|
||||
}
|
||||
}
|
||||
|
||||
for (void *p : ptrs) {
|
||||
@@ -65,26 +103,51 @@ static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
|
||||
}
|
||||
|
||||
// Non-templated function to forward to above -- simplifies thread creation
|
||||
static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
|
||||
AllocateThread(arena, thread_index);
|
||||
static void AllocateAndTestTSArenaFunc(ThreadSafeArena *arena, uint8_t thread_index,
|
||||
ThreadSafeRandom* r) {
|
||||
AllocateAndTestThreadFunc(arena, thread_index, r);
|
||||
}
|
||||
|
||||
template<typename ArenaType, typename RNG>
|
||||
static void TestArenaAlignmentHelper() {
|
||||
RNG r(SeedRandom());
|
||||
|
||||
for (size_t initial_size = 16; initial_size <= 2048; initial_size <<= 1) {
|
||||
ArenaType arena(initial_size);
|
||||
static constexpr bool kIsMultiThreaded = std::is_same<ThreadSafeArena, ArenaType>::value;
|
||||
if (kIsMultiThreaded) {
|
||||
vector<thread> threads;
|
||||
threads.reserve(FLAGS_num_threads);
|
||||
for (auto i = 0; i < FLAGS_num_threads; i++) {
|
||||
threads.emplace_back(
|
||||
AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokeAligned */>, &arena, i, &r);
|
||||
}
|
||||
for (thread& thr : threads) {
|
||||
thr.join();
|
||||
}
|
||||
} else {
|
||||
// Invoke the helper method on the same thread avoiding separate single
|
||||
// thread creation/join.
|
||||
AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokedAligned */>(&arena, 0, &r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TestArena, TestSingleThreaded) {
|
||||
Arena arena(128);
|
||||
AllocateThread(&arena, 0);
|
||||
Random r(SeedRandom());
|
||||
AllocateAndTestThreadFunc(&arena, 0, &r);
|
||||
}
|
||||
|
||||
|
||||
|
||||
TEST(TestArena, TestMultiThreaded) {
|
||||
CHECK(FLAGS_num_threads < 256);
|
||||
|
||||
ThreadSafeRandom r(SeedRandom());
|
||||
ThreadSafeArena arena(1024);
|
||||
|
||||
vector<thread> threads;
|
||||
for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
|
||||
threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
|
||||
threads.reserve(FLAGS_num_threads);
|
||||
for (auto i = 0; i < FLAGS_num_threads; i++) {
|
||||
threads.emplace_back(AllocateAndTestTSArenaFunc, &arena, i, &r);
|
||||
}
|
||||
|
||||
for (thread& thr : threads) {
|
||||
@@ -92,16 +155,24 @@ TEST(TestArena, TestMultiThreaded) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TestArena, TestAlignment) {
|
||||
ThreadSafeArena arena(1024);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int alignment = 1 << (1 % 5);
|
||||
TEST(TestArena, TestAlignmentThreadSafe) {
|
||||
TestArenaAlignmentHelper<ThreadSafeArena, ThreadSafeRandom>();
|
||||
}
|
||||
|
||||
void *ret = arena.AllocateBytesAligned(5, alignment);
|
||||
ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
|
||||
"failed to align on " << alignment << "b boundary: " <<
|
||||
ret;
|
||||
}
|
||||
TEST(TestArena, TestAlignmentNotThreadSafe) {
|
||||
TestArenaAlignmentHelper<Arena, Random>();
|
||||
}
|
||||
|
||||
TEST(TestArena, TestAlignmentSmallArena) {
|
||||
// Start with small initial size and allocate bytes more than the size of the current
|
||||
// component to trigger fallback code path in Arena. Moreover allocate number of bytes
|
||||
// with alignment such that "aligned_size" exceeds "next_component_size".
|
||||
Arena arena(16);
|
||||
constexpr size_t alignment = 32;
|
||||
void *ret = arena.AllocateBytesAligned(33, alignment);
|
||||
ASSERT_NE(nullptr, ret);
|
||||
ASSERT_EQ(0, reinterpret_cast<uintptr_t>(ret) % alignment) <<
|
||||
"failed to align on " << alignment << "b boundary: " << ret;
|
||||
}
|
||||
|
||||
TEST(TestArena, TestObjectAlignment) {
|
||||
|
||||
@@ -76,23 +76,36 @@ void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size
|
||||
// Really need to allocate more space.
|
||||
size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
|
||||
// But, allocate enough, even if the request is large. In this case,
|
||||
// might violate the max_element_size bound.
|
||||
if (next_component_size < size) {
|
||||
next_component_size = size;
|
||||
// might violate the "max_buffer_size_" bound.
|
||||
// Component allocation is guaranteed to be 16-byte aligned, see NewComponent(),
|
||||
// but we also need to support higher alignment values of 32 and 64 bytes and
|
||||
// hence we add padding so that first request to allocate bytes after new
|
||||
// component creation doesn't fail.
|
||||
size_t aligned_size;
|
||||
if (align <= 16) {
|
||||
aligned_size = size;
|
||||
} else {
|
||||
DCHECK(align == 32 || align == 64);
|
||||
aligned_size = size + align - 16;
|
||||
}
|
||||
|
||||
if (next_component_size < aligned_size) {
|
||||
next_component_size = aligned_size;
|
||||
}
|
||||
|
||||
// If soft quota is exhausted we will only get the "minimal" amount of memory
|
||||
// we ask for. In this case if we always use "size" as minimal, we may degrade
|
||||
// we ask for. In this case if we always use "aligned_size" as minimal, we may degrade
|
||||
// to allocating a lot of tiny components, one for each string added to the
|
||||
// arena. This would be very inefficient, so let's first try something between
|
||||
// "size" and "next_component_size". If it fails due to hard quota being
|
||||
// exhausted, we'll fall back to using "size" as minimal.
|
||||
size_t minimal = (size + next_component_size) / 2;
|
||||
CHECK_LE(size, minimal);
|
||||
// "aligned_size" and "next_component_size". If it fails due to hard quota being
|
||||
// exhausted, we'll fall back to using "aligned_size" as minimal.
|
||||
size_t minimal = (aligned_size + next_component_size) / 2;
|
||||
CHECK_LE(aligned_size, minimal);
|
||||
CHECK_LE(minimal, next_component_size);
|
||||
// Now, just make sure we can actually get the memory.
|
||||
Component* component = NewComponent(next_component_size, minimal);
|
||||
if (component == nullptr) {
|
||||
component = NewComponent(next_component_size, size);
|
||||
component = NewComponent(next_component_size, aligned_size);
|
||||
}
|
||||
if (!component) return nullptr;
|
||||
|
||||
|
||||
@@ -19,17 +19,15 @@
|
||||
//
|
||||
//
|
||||
// Memory arena for variable-length datatypes and STL collections.
|
||||
#pragma once
|
||||
|
||||
#ifndef KUDU_UTIL_MEMORY_ARENA_H_
|
||||
#define KUDU_UTIL_MEMORY_ARENA_H_
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <new>
|
||||
#include <ostream>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/signals2/dummy_mutex.hpp>
|
||||
@@ -37,7 +35,6 @@
|
||||
|
||||
#include "kudu/gutil/atomicops.h"
|
||||
#include "kudu/gutil/dynamic_annotations.h"
|
||||
#include "kudu/gutil/gscoped_ptr.h"
|
||||
#include "kudu/gutil/macros.h"
|
||||
#include "kudu/gutil/port.h"
|
||||
#include "kudu/gutil/strings/stringpiece.h"
|
||||
@@ -163,8 +160,8 @@ class ArenaBase {
|
||||
}
|
||||
|
||||
// Allocate bytes, ensuring a specified alignment.
|
||||
// NOTE: alignment MUST be a power of two, or else this will break.
|
||||
void* AllocateBytesAligned(const size_t size, const size_t alignment);
|
||||
// NOTE: alignment MUST be a power of two and only upto 64 bytes is supported.
|
||||
void* AllocateBytesAligned(size_t size, size_t alignment);
|
||||
|
||||
// Removes all data from the arena. (Invalidates all pointers returned by
|
||||
// AddSlice and AllocateBytes). Does not cause memory allocation.
|
||||
@@ -186,8 +183,9 @@ class ArenaBase {
|
||||
class Component;
|
||||
|
||||
// Fallback for AllocateBytes non-fast-path
|
||||
void* AllocateBytesFallback(const size_t size, const size_t align);
|
||||
void* AllocateBytesFallback(size_t size, size_t align);
|
||||
|
||||
// Returned component is guaranteed to be 16-byte aligned.
|
||||
Component* NewComponent(size_t requested_size, size_t minimum_size);
|
||||
void AddComponent(Component *component);
|
||||
|
||||
@@ -370,11 +368,24 @@ class ArenaBase<THREADSAFE>::Component {
|
||||
}
|
||||
|
||||
private:
|
||||
// Adjusts the supplied "offset" such that the combined "data" ptr and "offset" aligns
|
||||
// with "alignment" bytes.
|
||||
//
|
||||
// Component start address "data_" is only guaranteed to be 16-byte aligned with enough
|
||||
// bytes for the first request size plus any padding needed for alignment.
|
||||
// So to support alignment values greater than 16 bytes, align the destination address ptr
|
||||
// that'll be returned by AllocatedBytesAligned() and not just the "offset_".
|
||||
template<typename T>
|
||||
static inline T AlignOffset(const uint8_t* data, const T offset, const size_t alignment) {
|
||||
const auto data_start_addr = reinterpret_cast<uintptr_t>(data);
|
||||
return KUDU_ALIGN_UP((data_start_addr + offset), alignment) - data_start_addr;
|
||||
}
|
||||
|
||||
// Mark the given range unpoisoned in ASAN.
|
||||
// This is a no-op in a non-ASAN build.
|
||||
void AsanUnpoison(const void* addr, size_t size);
|
||||
|
||||
gscoped_ptr<Buffer> buffer_;
|
||||
std::unique_ptr<Buffer> buffer_;
|
||||
uint8_t* const data_;
|
||||
typename ArenaTraits<THREADSAFE>::offset_type offset_;
|
||||
const size_t size_;
|
||||
@@ -389,21 +400,21 @@ class ArenaBase<THREADSAFE>::Component {
|
||||
DISALLOW_COPY_AND_ASSIGN(Component);
|
||||
};
|
||||
|
||||
|
||||
// Thread-safe implementation
|
||||
template <>
|
||||
inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
|
||||
const size_t size, const size_t alignment) {
|
||||
// Special case check the allowed alignments. Currently, we only ensure
|
||||
// the allocated buffer components are 16-byte aligned, and the code path
|
||||
// doesn't support larger alignment.
|
||||
DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
|
||||
alignment == 8 || alignment == 16)
|
||||
// the allocated buffer components are 16-byte aligned and add extra padding
|
||||
// to support 32/64 byte alignment but the code path hasn't been tested
|
||||
// with larger alignment values nor has there been a need.
|
||||
DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
|
||||
alignment == 16 || alignment == 32 || alignment == 64)
|
||||
<< "bad alignment: " << alignment;
|
||||
retry:
|
||||
Atomic32 offset = Acquire_Load(&offset_);
|
||||
|
||||
Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
|
||||
Atomic32 aligned = AlignOffset(data_, offset, alignment);
|
||||
Atomic32 new_offset = aligned + size;
|
||||
|
||||
if (PREDICT_TRUE(new_offset <= size_)) {
|
||||
@@ -424,13 +435,15 @@ inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
|
||||
template <>
|
||||
inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
|
||||
const size_t size, const size_t alignment) {
|
||||
DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
|
||||
alignment == 8 || alignment == 16)
|
||||
DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
|
||||
alignment == 16 || alignment == 32 || alignment == 64)
|
||||
<< "bad alignment: " << alignment;
|
||||
size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
|
||||
|
||||
size_t aligned = AlignOffset(data_, offset_, alignment);
|
||||
uint8_t* destination = data_ + aligned;
|
||||
size_t save_offset = offset_;
|
||||
offset_ = aligned + size;
|
||||
|
||||
if (PREDICT_TRUE(offset_ <= size_)) {
|
||||
AsanUnpoison(data_ + aligned, size);
|
||||
return destination;
|
||||
@@ -497,5 +510,3 @@ inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... args) {
|
||||
}
|
||||
|
||||
} // namespace kudu
|
||||
|
||||
#endif // KUDU_UTIL_MEMORY_ARENA_H_
|
||||
|
||||
Reference in New Issue
Block a user