impala/be/src/util/condition-variable.h
Michael Ho 705045021e IMPALA-4026: Implement double-buffering for BlockingQueue
With recent changes to improve the parquet scanner's efficiency,
row batches are produced more quickly, leading to higher contention
in the blocking queue shared between scanner threads and the scan
node. The contention happens between different producers (i.e.
the scanner threads) and also, to a lesser extent, between the
scanner threads and the scan node.

This change addresses the contention between the scanner threads
and the scan node by splitting the queue into a 'get_list_' and
a 'put_list_'. The consumers will consume from 'get_list_' until
it's exhausted while the producers will enqueue into 'put_list_'
until it's full. When 'get_list_' is exhausted, the consumer will
atomically swap the 'get_list_' with 'put_list_'. This reduces
the contention: 'get_list_' and 'put_list_' are protected by two
different locks so callers of BlockingGet() only contend for the
'put_lock_' when 'get_list_' is empty.

With this change, primitive_filter_bigint_non_selective improves
by 33.9%, going from 1.60s to 1.06s.

Change-Id: Ib9f4cf351455efefb0f3bb791cf9bc82d1421d54
Reviewed-on: http://gerrit.cloudera.org:8080/4350
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Internal Jenkins
2016-09-30 06:16:07 +00:00
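
The double-buffering scheme described in the commit message can be sketched roughly as follows. This is a minimal illustration, not Impala's BlockingQueue: the class name DoubleBufferedQueue, the 'max_put' bound, and the use of std::mutex and std::condition_variable are assumptions made to keep the example self-contained, and shutdown handling is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical sketch of a double-buffered blocking queue (not Impala's code).
// Producers only ever take 'put_lock_'. Consumers take 'get_lock_' and touch
// 'put_lock_' only when 'get_list_' has been drained.
template <typename T>
class DoubleBufferedQueue {
 public:
  explicit DoubleBufferedQueue(std::size_t max_put) : max_put_(max_put) {
    assert(max_put > 0);
  }

  // Blocks while 'put_list_' is full, then appends 'item' to it.
  void BlockingPut(T item) {
    std::unique_lock<std::mutex> put_guard(put_lock_);
    put_cv_.wait(put_guard, [this] { return put_list_.size() < max_put_; });
    put_list_.push_back(std::move(item));
    put_guard.unlock();
    get_cv_.notify_one();  // Wake a consumer that may be waiting to swap.
  }

  // Blocks until an item is available, then returns the oldest one.
  T BlockingGet() {
    std::unique_lock<std::mutex> get_guard(get_lock_);
    if (get_list_.empty()) {
      // Slow path: wait for producers, then swap the two lists under 'put_lock_'.
      std::unique_lock<std::mutex> put_guard(put_lock_);
      get_cv_.wait(put_guard, [this] { return !put_list_.empty(); });
      get_list_.swap(put_list_);
      put_guard.unlock();
      put_cv_.notify_all();  // 'put_list_' is empty again, so producers can proceed.
    }
    // Fast path: served from 'get_list_' without touching 'put_lock_'.
    T item = std::move(get_list_.front());
    get_list_.pop_front();
    return item;
  }

 private:
  const std::size_t max_put_;
  std::mutex get_lock_;  // Protects 'get_list_'.
  std::mutex put_lock_;  // Protects 'put_list_'.
  std::condition_variable get_cv_;  // Signalled when 'put_list_' becomes non-empty.
  std::condition_variable put_cv_;  // Signalled when 'put_list_' has room again.
  std::deque<T> get_list_;
  std::deque<T> put_list_;
};
```

In this sketch a consumer pays for 'put_lock_' once per batch of items rather than once per item, which is where the reduced contention would come from.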

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UTIL_CONDITION_VARIABLE_H
#define IMPALA_UTIL_CONDITION_VARIABLE_H
#include <boost/thread/pthread/timespec.hpp>
#include <boost/thread/mutex.hpp>
#include <pthread.h>
#include <unistd.h>

#include "common/logging.h" // for DCHECK
namespace impala {

/// Simple wrapper around POSIX pthread condition variable. This has lower overhead than
/// boost's implementation as it doesn't implement boost thread interruption.
class ConditionVariable {
 public:
  ConditionVariable() { pthread_cond_init(&cv_, NULL); }
  ~ConditionVariable() { pthread_cond_destroy(&cv_); }

  /// Wait indefinitely on the condition variable until it's notified. 'lock' must be
  /// held by the caller. Wakeups may be spurious, so callers should re-check their
  /// predicate in a loop.
  inline void Wait(boost::unique_lock<boost::mutex>& lock) {
    DCHECK(lock.owns_lock());
    pthread_mutex_t* mutex = lock.mutex()->native_handle();
    pthread_cond_wait(&cv_, mutex);
  }

  /// Wait until the condition variable is notified or 'timeout' has passed.
  /// 'timeout' is an absolute time, measured against CLOCK_REALTIME because the
  /// condition variable is initialized with default attributes.
  /// Returns true if the wait ended before the absolute timeout specified in
  /// 'timeout' has passed (this includes spurious wakeups). Returns false otherwise.
  inline bool TimedWait(boost::unique_lock<boost::mutex>& lock,
      const struct timespec* timeout) {
    DCHECK(lock.owns_lock());
    pthread_mutex_t* mutex = lock.mutex()->native_handle();
    return pthread_cond_timedwait(&cv_, mutex, timeout) == 0;
  }

  /// Notify a single waiter on this condition variable.
  inline void NotifyOne() { pthread_cond_signal(&cv_); }

  /// Notify all waiters on this condition variable.
  inline void NotifyAll() { pthread_cond_broadcast(&cv_); }

 private:
  pthread_cond_t cv_;
};

} // namespace impala

#endif // IMPALA_UTIL_CONDITION_VARIABLE_H
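
A caller of a wrapper like this usually has to do two things itself: re-check its predicate in a loop, since pthread condition variables may wake spuriously, and turn a relative timeout into the absolute deadline that pthread_cond_timedwait() expects. The sketch below illustrates both. It uses raw pthread calls rather than the ConditionVariable class so that it compiles without Boost, and the names DeadlineFromNow, Event, Signal and WaitFor are hypothetical, not part of Impala.

```cpp
#include <cassert>
#include <pthread.h>
#include <time.h>

// Hypothetical helper: converts a relative timeout in milliseconds into an absolute
// CLOCK_REALTIME deadline, the clock used by a default-initialized pthread_cond_t.
inline timespec DeadlineFromNow(long timeout_ms) {
  assert(timeout_ms >= 0);
  timespec deadline;
  clock_gettime(CLOCK_REALTIME, &deadline);
  deadline.tv_sec += timeout_ms / 1000;
  deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
  if (deadline.tv_nsec >= 1000000000L) {  // Carry overflowing nanoseconds.
    deadline.tv_sec += 1;
    deadline.tv_nsec -= 1000000000L;
  }
  return deadline;
}

// Hypothetical one-shot event: a flag guarded by a mutex and a condition variable.
struct Event {
  Event() : ready(false) {
    pthread_mutex_init(&mu, NULL);
    pthread_cond_init(&cv, NULL);
  }
  ~Event() {
    pthread_cond_destroy(&cv);
    pthread_mutex_destroy(&mu);
  }
  pthread_mutex_t mu;
  pthread_cond_t cv;
  bool ready;
};

// Sets the flag under the mutex, then wakes one waiter.
inline void Signal(Event* e) {
  pthread_mutex_lock(&e->mu);
  e->ready = true;
  pthread_mutex_unlock(&e->mu);
  pthread_cond_signal(&e->cv);
}

// Waits up to 'timeout_ms' for the flag. The loop re-checks the predicate after
// every wakeup and stops once the wait reports an error such as ETIMEDOUT.
inline bool WaitFor(Event* e, long timeout_ms) {
  const timespec deadline = DeadlineFromNow(timeout_ms);
  pthread_mutex_lock(&e->mu);
  int rc = 0;
  while (!e->ready && rc == 0) {
    rc = pthread_cond_timedwait(&e->cv, &e->mu, &deadline);
  }
  const bool ready = e->ready;
  pthread_mutex_unlock(&e->mu);
  return ready;
}
```

Computing the deadline once, outside the loop, keeps the total wait bounded even if several spurious wakeups occur.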