[libc][CndVar] reimplmement conditional variable with FIFO ordering (#192748)
This PR reimplements conditional variable with two different variants: - futex-based shared condvar with atomic counter for waiters - queue-based private condvar Notice that thread-local queue node cannot be reliably accessed in shared processes, so we cannot use a unified implementation in this case. POSIX.1-2024 (Issue 8) added atomicity conditions to conditional variable: - The `pthread_cond_broadcast()` function shall, **as a single atomic operation**, determine which threads, if any, are blocked on the specified condition variable cond and unblock all of these threads. - The `pthread_cond_signal()` function shall, as a **single atomic operation**, determine which threads, if any, are blocked on the specified condition variable cond and unblock at least one of these threads. This means that threads parked after a condvar signal event shall not steal signals before it. From my read, both implementation fulfills the requirement but glibc claims that single futex does not provide the stronger ordering needed by its users hence switched to a rotational queue, to fulfill the requirements mentioned in https://sourceware.org/bugzilla/show_bug.cgi?id=13165. As in a single futex, the lock acquisition do not happen in order and hence when a spurious wakeup happen, later waiters may "steal" the signal. However musl's shared implementation and bionic's whole implementation and Rust's std condvar still stick to a signal-futex+accounting data style. Musl's private condvar and this implementation are even stronger in the sense that not only the queue is decided as an atomic event, the mutex acquisition also happens in baton-passing style. Our implementation is different from musl in the sense that we have done some spin attempts instead of always do futex syscall to requeue threads (with a new requeued state added). This gives a chance for the signal/waiter to stay in fast path if the queue is really small. Based on the microbenchmark, this implementation is generally more performant. We also abuse the `RawMutex`'s LOCKED state as "waiting for its turn". One caveat we can see from the benchmark (https://github.com/SchrodingerZhu/condvar-benchmark) is that strict FIFO condvar is not that good if users are maintain the order on themselves as in `turn_ring`, because many threads may just wake up first only see it is not its turn while blocking the real thread in turn. However, a semaphore or other synchronization primitives would be more suitable in those cases. Even though in benchmark like `turn_ring/broadcast_stress` made this code perform badly (but still similar to musl anyway), they are not really the correct usage. In other cases, we are always close to the fastest while providing a stronger FIFO semantic. TODO in future commit: - support pthread cancellation. Ideally we should block cancellation when signal is consumed and add cancellation callback for checking illegibility. - think about stronger ordering semantic in shared case (?)
This commit is contained in:
committed by
GitHub
parent
ccc608f119
commit
793bdd8597
@@ -154,6 +154,10 @@ function(_get_compile_options_from_config output_var)
|
||||
libc_add_definition(config_options "LIBC_THREAD_MODE=${LIBC_CONF_THREAD_MODE}")
|
||||
endif()
|
||||
|
||||
if(LIBC_CONF_TIMEOUT_ENSURE_MONOTONICITY)
|
||||
libc_add_definition(config_options "LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY")
|
||||
endif()
|
||||
|
||||
if(LIBC_CONF_TRAP_ON_RAISE_FP_EXCEPT)
|
||||
libc_add_definition(config_options "LIBC_TRAP_ON_RAISE_FP_EXCEPT")
|
||||
endif()
|
||||
|
||||
@@ -15,6 +15,7 @@ typedef struct {
|
||||
void *__qfront;
|
||||
void *__qback;
|
||||
__futex_word __qmtx;
|
||||
char __padding[4];
|
||||
} cnd_t;
|
||||
|
||||
#endif // LLVM_LIBC_TYPES_CND_T_H
|
||||
|
||||
@@ -38,8 +38,6 @@ if(TARGET libc.src.__support.threads.${LIBC_TARGET_OS}.futex_utils)
|
||||
raw_mutex
|
||||
HDRS
|
||||
raw_mutex.h
|
||||
COMPILE_OPTIONS
|
||||
${monotonicity_flags}
|
||||
DEPENDS
|
||||
.futex_utils
|
||||
libc.src.__support.threads.sleep
|
||||
@@ -55,7 +53,6 @@ if(TARGET libc.src.__support.threads.${LIBC_TARGET_OS}.futex_utils)
|
||||
raw_rwlock.h
|
||||
COMPILE_OPTIONS
|
||||
${rwlock_default_spin_count}
|
||||
${monotonicity_flags}
|
||||
DEPENDS
|
||||
.raw_mutex
|
||||
libc.src.__support.common
|
||||
@@ -147,12 +144,24 @@ if(TARGET libc.src.__support.threads.${LIBC_TARGET_OS}.callonce)
|
||||
)
|
||||
endif()
|
||||
|
||||
if(TARGET libc.src.__support.threads.${LIBC_TARGET_OS}.CndVar)
|
||||
add_object_library(
|
||||
if(TARGET libc.src.__support.threads.futex_utils)
|
||||
add_header_library(
|
||||
CndVar
|
||||
ALIAS
|
||||
HDRS
|
||||
CndVar.h
|
||||
DEPENDS
|
||||
.${LIBC_TARGET_OS}.CndVar
|
||||
.futex_utils
|
||||
.mutex
|
||||
.mutex_common
|
||||
.raw_mutex
|
||||
libc.hdr.stdint_proxy
|
||||
libc.src.__support.CPP.expected
|
||||
libc.src.__support.CPP.limits
|
||||
libc.src.__support.CPP.mutex
|
||||
libc.src.__support.CPP.new
|
||||
libc.src.__support.threads.sleep
|
||||
libc.src.__support.time.monotonicity
|
||||
libc.src.__support.time.abs_timeout
|
||||
)
|
||||
endif()
|
||||
|
||||
|
||||
@@ -6,49 +6,348 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#ifndef LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
|
||||
#define LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
|
||||
#ifndef LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
|
||||
#define LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
|
||||
|
||||
#include "hdr/stdint_proxy.h" // uint32_t
|
||||
#include "src/__support/CPP/limits.h"
|
||||
#include "src/__support/CPP/mutex.h"
|
||||
#include "src/__support/CPP/new.h"
|
||||
#include "src/__support/macros/config.h"
|
||||
#include "src/__support/threads/futex_utils.h" // Futex
|
||||
#include "src/__support/threads/mutex.h" // Mutex
|
||||
#include "src/__support/threads/raw_mutex.h" // RawMutex
|
||||
#include "src/__support/threads/futex_utils.h" // Futex
|
||||
#include "src/__support/threads/mutex.h" // Mutex
|
||||
#include "src/__support/threads/raw_mutex.h" // RawMutex
|
||||
#include "src/__support/threads/sleep.h"
|
||||
#include "src/__support/time/abs_timeout.h"
|
||||
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#include "src/__support/time/monotonicity.h"
|
||||
#endif
|
||||
|
||||
namespace LIBC_NAMESPACE_DECL {
|
||||
|
||||
enum class CndVarResult {
|
||||
Success,
|
||||
MutexError,
|
||||
Timeout,
|
||||
};
|
||||
|
||||
class CndVar {
|
||||
enum CndWaiterStatus : uint32_t {
|
||||
WS_Waiting = 0xE,
|
||||
WS_Signalled = 0x5,
|
||||
public:
|
||||
using Timeout = internal::AbsTimeout;
|
||||
|
||||
private:
|
||||
// A single-waiter multiple-notifier barrier used to keep
|
||||
// track of cancellation threads. We use this barrier to
|
||||
// ensure in-queue threads that have posted their cancellation
|
||||
// request have finished dequeue themselves.
|
||||
class CancellationBarrier {
|
||||
LIBC_INLINE_VAR static constexpr size_t CANCEL_STEP = 2;
|
||||
LIBC_INLINE_VAR static constexpr size_t SLEEPING_BIT = 1;
|
||||
|
||||
// LSB indicates whether the waiter is in sleeping state.
|
||||
Futex futex;
|
||||
|
||||
public:
|
||||
LIBC_INLINE CancellationBarrier() : futex(0) {}
|
||||
// Add one more notification request.
|
||||
LIBC_INLINE void add_one() {
|
||||
futex.fetch_add(CANCEL_STEP, cpp::MemoryOrder::RELAXED);
|
||||
}
|
||||
// Send notification to one waiter.
|
||||
LIBC_INLINE void notify() {
|
||||
FutexWordType res = futex.fetch_sub(CANCEL_STEP);
|
||||
// Only need to goto syscall if waiter is sleep and we are the last one
|
||||
if (res <= (CANCEL_STEP | SLEEPING_BIT) && (res & SLEEPING_BIT) != 0)
|
||||
futex.notify_one();
|
||||
}
|
||||
LIBC_INLINE void wait() {
|
||||
size_t spin = 0;
|
||||
while (auto remaining = futex.load(cpp::MemoryOrder::RELAXED)) {
|
||||
// Set LSB to 1 to indicate that the waiter is entering sleeping
|
||||
// state.
|
||||
FutexWordType new_val = remaining | SLEEPING_BIT;
|
||||
if (spin > LIBC_COPT_RAW_MUTEX_DEFAULT_SPIN_COUNT &&
|
||||
futex.compare_exchange_strong(remaining, new_val)) {
|
||||
futex.wait(new_val, /*timeout=*/cpp::nullopt, /*is_pshared=*/false);
|
||||
futex.fetch_sub(1);
|
||||
spin = 0;
|
||||
}
|
||||
sleep_briefly();
|
||||
spin++;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct CndWaiter {
|
||||
Futex futex_word = WS_Waiting;
|
||||
CndWaiter *next = nullptr;
|
||||
enum WaiterState : uint8_t {
|
||||
Waiting = 0,
|
||||
Signalled = 1,
|
||||
Cancelled = 2,
|
||||
Requeued = 3,
|
||||
};
|
||||
|
||||
CndWaiter *waitq_front;
|
||||
CndWaiter *waitq_back;
|
||||
RawMutex qmtx;
|
||||
template <typename T> struct QueueNode {
|
||||
T *prev;
|
||||
T *next;
|
||||
|
||||
LIBC_INLINE T *self() { return static_cast<T *>(this); }
|
||||
|
||||
// We use cyclic dummy node to avoid handing corner cases.
|
||||
LIBC_INLINE void ensure_queue_initialization() {
|
||||
if (LIBC_UNLIKELY(prev == nullptr))
|
||||
prev = next = self();
|
||||
}
|
||||
|
||||
// Assume `this` the dummy node of queue. Push back `waiter` to the queue.
|
||||
LIBC_INLINE void push_back(T *waiter) {
|
||||
ensure_queue_initialization();
|
||||
waiter->next = self();
|
||||
waiter->prev = prev;
|
||||
waiter->next->prev = waiter;
|
||||
waiter->prev->next = waiter;
|
||||
}
|
||||
|
||||
// Remove `waiter` from the queue.
|
||||
LIBC_INLINE static void remove(T *waiter) {
|
||||
waiter->next->prev = waiter->prev;
|
||||
waiter->prev->next = waiter->next;
|
||||
waiter->prev = waiter->next = waiter;
|
||||
}
|
||||
|
||||
LIBC_INLINE bool is_empty() {
|
||||
ensure_queue_initialization();
|
||||
return self() == next;
|
||||
}
|
||||
|
||||
// Assume `this` is the dummy node of the queue. Separate nodes before
|
||||
// cursor into a separate queue.
|
||||
LIBC_INLINE void separate(T *cursor) {
|
||||
T *removed_head = this->next;
|
||||
T *removed_tail = cursor->prev;
|
||||
this->next = cursor;
|
||||
cursor->prev = self();
|
||||
removed_tail->next = removed_head;
|
||||
removed_head->prev = removed_tail;
|
||||
}
|
||||
};
|
||||
|
||||
// This node will be on the per-thread stack.
|
||||
struct CndWaiter : QueueNode<CndWaiter> {
|
||||
cpp::Atomic<CancellationBarrier *> cancellation_barrier;
|
||||
RawMutex barrier;
|
||||
cpp::Atomic<uint8_t> state;
|
||||
|
||||
LIBC_INLINE CndWaiter()
|
||||
: QueueNode{}, cancellation_barrier(nullptr), barrier{},
|
||||
state{Waiting} {
|
||||
// this lock should always success as no contention is possible
|
||||
[[maybe_unused]] bool locked = barrier.try_lock();
|
||||
LIBC_ASSERT(locked);
|
||||
}
|
||||
|
||||
LIBC_INLINE void confirm_cancellation() {
|
||||
if (CancellationBarrier *sender = cancellation_barrier.load())
|
||||
sender->notify();
|
||||
}
|
||||
};
|
||||
|
||||
// Group structures with similar alignment together to
|
||||
// save trailing padding bytes, such that is_shared
|
||||
// can be introduced without extra space.
|
||||
union {
|
||||
QueueNode<CndWaiter> waiter_queue;
|
||||
cpp::Atomic<size_t> shared_waiters;
|
||||
};
|
||||
|
||||
union {
|
||||
RawMutex queue_lock;
|
||||
Futex shared_futex;
|
||||
};
|
||||
|
||||
bool is_shared;
|
||||
|
||||
LIBC_INLINE void notify(bool is_broadcast) {
|
||||
if (LIBC_UNLIKELY(is_shared)) {
|
||||
if (shared_waiters.load() == 0)
|
||||
return;
|
||||
// increase the sequence number
|
||||
shared_futex.fetch_add(1);
|
||||
if (is_broadcast)
|
||||
shared_futex.notify_all();
|
||||
else
|
||||
shared_futex.notify_one();
|
||||
return;
|
||||
}
|
||||
|
||||
size_t limit =
|
||||
is_broadcast ? cpp::numeric_limits<size_t>::max() : size_t{1};
|
||||
CancellationBarrier cancellation_barrier{};
|
||||
CndWaiter *head = nullptr;
|
||||
CndWaiter *cursor = nullptr;
|
||||
// Go through the queue, try send signal to waiters.
|
||||
// 1. if signal is sent, we reduce the number of pending signals
|
||||
// 2. if waiter cancelled before signal is sent, we add it
|
||||
// to cancellation barrier and continue
|
||||
// Notice that cancelled sender will not continue before
|
||||
// we release the queue lock, because they also need to
|
||||
// acquire the lock and dequeue themselves.
|
||||
{
|
||||
cpp::lock_guard lock(queue_lock);
|
||||
if (waiter_queue.is_empty())
|
||||
return;
|
||||
for (cursor = waiter_queue.next; cursor != waiter_queue.self();
|
||||
cursor = cursor->next) {
|
||||
if (limit == 0)
|
||||
break;
|
||||
uint8_t expected = Waiting;
|
||||
if (!cursor->state.compare_exchange_strong(expected, Signalled)) {
|
||||
cancellation_barrier.add_one();
|
||||
cursor->cancellation_barrier.store(&cancellation_barrier);
|
||||
continue;
|
||||
}
|
||||
if (!head)
|
||||
head = cursor;
|
||||
limit--;
|
||||
}
|
||||
// remove everything before cursor
|
||||
waiter_queue.separate(cursor);
|
||||
}
|
||||
// We want to make sure the propagation queue contain only threads
|
||||
// that have consumed the signal. So we wait until all cancelled
|
||||
// finishing their dequeue operation.
|
||||
cancellation_barrier.wait();
|
||||
// Start propagate notification to the first waiter in the queue.
|
||||
// Waiters in the queue will acquire the lock in strict FIFO order:
|
||||
// Only when the predecessor has acquired the lock can the successor
|
||||
// be waken up to compete for the mutex.
|
||||
if (head)
|
||||
head->barrier.unlock();
|
||||
}
|
||||
|
||||
public:
|
||||
LIBC_INLINE static int init(CndVar *cv) {
|
||||
cv->waitq_front = cv->waitq_back = nullptr;
|
||||
RawMutex::init(&cv->qmtx);
|
||||
return 0;
|
||||
LIBC_INLINE constexpr CndVar(bool is_shared)
|
||||
: waiter_queue{}, queue_lock{}, is_shared(is_shared) {
|
||||
if (is_shared) {
|
||||
new (&shared_waiters) cpp::Atomic<size_t>(0);
|
||||
new (&shared_futex) Futex(0);
|
||||
}
|
||||
}
|
||||
|
||||
LIBC_INLINE static void destroy(CndVar *cv) {
|
||||
cv->waitq_front = cv->waitq_back = nullptr;
|
||||
LIBC_INLINE void reset() {
|
||||
if (is_shared) {
|
||||
shared_waiters.store(0);
|
||||
shared_futex.store(0);
|
||||
return;
|
||||
}
|
||||
queue_lock.reset();
|
||||
waiter_queue.prev = nullptr;
|
||||
waiter_queue.next = nullptr;
|
||||
}
|
||||
|
||||
// Returns 0 on success, -1 on error.
|
||||
int wait(Mutex *m);
|
||||
void notify_one();
|
||||
void broadcast();
|
||||
// TODO: register callback for pthread cancellation
|
||||
LIBC_INLINE CndVarResult wait(Mutex *mutex,
|
||||
cpp::optional<Timeout> timeout = cpp::nullopt) {
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
if (timeout)
|
||||
ensure_monotonicity(*timeout);
|
||||
#endif
|
||||
|
||||
if (LIBC_UNLIKELY(is_shared)) {
|
||||
shared_waiters.fetch_add(1);
|
||||
FutexWordType old_val = shared_futex.load();
|
||||
mutex->unlock();
|
||||
ErrorOr<int> result =
|
||||
shared_futex.wait(old_val, timeout, /*is_pshared=*/true);
|
||||
shared_waiters.fetch_sub(1);
|
||||
MutexError mutex_result = mutex->lock();
|
||||
if (!result.has_value() && result.error() == ETIMEDOUT)
|
||||
return CndVarResult::Timeout;
|
||||
return mutex_result == MutexError::NONE ? CndVarResult::Success
|
||||
: CndVarResult::MutexError;
|
||||
}
|
||||
|
||||
CndWaiter waiter{};
|
||||
// Register the waiter to the queue.
|
||||
{
|
||||
cpp::lock_guard lock(queue_lock);
|
||||
waiter_queue.push_back(&waiter);
|
||||
}
|
||||
|
||||
// Unlock the mutex and wait for the signal.
|
||||
mutex->unlock();
|
||||
// Notice that lock is already initialized as LOCKED. We abuse the LOCKED
|
||||
// state to indicate that the waiter is pending.
|
||||
bool locked = waiter.barrier.lock(timeout, /*is_shared=*/false);
|
||||
|
||||
// if we wake up and find that we are still waiting, this means
|
||||
// timeout has been reached.
|
||||
uint8_t old_state = Waiting;
|
||||
if (waiter.state.compare_exchange_strong(old_state, Cancelled,
|
||||
cpp::MemoryOrder::ACQ_REL)) {
|
||||
// we haven't consumed the signal before timeout reaches.
|
||||
{
|
||||
cpp::lock_guard lock(queue_lock);
|
||||
CndWaiter::remove(&waiter);
|
||||
}
|
||||
waiter.confirm_cancellation();
|
||||
} else if (!locked) {
|
||||
// Whenever a signal is already consumed, we compete for the mutex
|
||||
// in the FIFO order of the queue. We only relock if we previously
|
||||
// wake up due to timeout. Otherwise, it means that our turn has
|
||||
// come, so we don't need to relock.
|
||||
waiter.barrier.lock();
|
||||
}
|
||||
|
||||
// Reacquire the mutex lock. If error ever happens, we still wake up
|
||||
// our successor so that remaining waiters can continue. However, we treat
|
||||
// outselves as not owning the mutex and we don't touch the contention
|
||||
// bit.
|
||||
MutexError mutex_result = mutex->lock();
|
||||
// If we are requeued, we need to establish contention after lock, otherwise
|
||||
// requeued thread may clear the contention bit even though
|
||||
// there are still waiters behind it.
|
||||
if (mutex_result == MutexError::NONE &&
|
||||
waiter.state.load(cpp::MemoryOrder::RELAXED) == Requeued)
|
||||
mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
|
||||
// If there is other in the queue after us, we need to wake the next waiter.
|
||||
// If we cancelled, we should naturally have waiter.next == &waiter
|
||||
if (waiter.next != &waiter) {
|
||||
auto *next_waiter = waiter.next;
|
||||
CndWaiter::remove(&waiter);
|
||||
auto &next_barrier_futex = next_waiter->barrier.get_raw_futex();
|
||||
auto &mutex_futex = mutex->get_raw_futex();
|
||||
// the following is basically an inlined version of mutex::unlock
|
||||
// but with requeue instead of wake if it is possible.
|
||||
FutexWordType prev = next_barrier_futex.exchange(
|
||||
RawMutex::UNLOCKED, cpp::MemoryOrder::RELEASE);
|
||||
// If next waiter in queue sleeps, it will establish contention its own
|
||||
// barrier
|
||||
if (prev == RawMutex::IN_CONTENTION) {
|
||||
if (mutex_result == MutexError::NONE && mutex->can_be_requeued()) {
|
||||
ErrorOr<int> res = next_barrier_futex.requeue_to(
|
||||
mutex_futex, cpp::nullopt, /*wake_limit=*/0,
|
||||
/*requeue_limit=*/1,
|
||||
/*is_shared=*/false);
|
||||
if (!res.has_value()) // cannot requeue on this system
|
||||
next_waiter->barrier.wake(/*is_shared=*/false);
|
||||
else if (res.value() > 0) {
|
||||
next_waiter->state.store(Requeued, cpp::MemoryOrder::RELAXED);
|
||||
mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
|
||||
}
|
||||
} else { // cannot requeue under special lock mode
|
||||
next_waiter->barrier.wake(/*is_shared=*/false);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (mutex_result != MutexError::NONE)
|
||||
return CndVarResult::MutexError;
|
||||
return old_state == Waiting ? CndVarResult::Timeout : CndVarResult::Success;
|
||||
}
|
||||
|
||||
LIBC_INLINE void notify_one() { notify(/*is_broadcast=*/false); }
|
||||
LIBC_INLINE void broadcast() { notify(/*is_broadcast=*/true); }
|
||||
};
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
#endif // LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
|
||||
#endif // LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
|
||||
|
||||
@@ -24,13 +24,6 @@ add_header_library(
|
||||
libc.src.__support.time.abs_timeout
|
||||
)
|
||||
|
||||
set(monotonicity_flags)
|
||||
if (LIBC_CONF_TIMEOUT_ENSURE_MONOTONICITY)
|
||||
libc_set_definition(monotonicity_flags LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY=1)
|
||||
else()
|
||||
libc_set_definition(monotonicity_flags LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY=0)
|
||||
endif()
|
||||
|
||||
add_object_library(
|
||||
thread
|
||||
SRCS
|
||||
@@ -65,22 +58,6 @@ add_header_library(
|
||||
libc.src.__support.macros.optimization
|
||||
)
|
||||
|
||||
add_object_library(
|
||||
CndVar
|
||||
SRCS
|
||||
CndVar.cpp
|
||||
HDRS
|
||||
../CndVar.h
|
||||
DEPENDS
|
||||
libc.hdr.stdint_proxy
|
||||
libc.include.sys_syscall
|
||||
libc.src.__support.OSUtil.osutil
|
||||
libc.src.__support.threads.linux.futex_word_type
|
||||
libc.src.__support.threads.mutex
|
||||
libc.src.__support.threads.raw_mutex
|
||||
libc.src.__support.CPP.mutex
|
||||
)
|
||||
|
||||
add_object_library(
|
||||
barrier
|
||||
HDRS
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
//===-- Utility condition variable class ------------------------*- C++ -*-===//
|
||||
//
|
||||
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
|
||||
// See https://llvm.org/LICENSE.txt for license information.
|
||||
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#include "src/__support/threads/CndVar.h"
|
||||
#include "src/__support/CPP/mutex.h"
|
||||
#include "src/__support/OSUtil/syscall.h" // syscall_impl
|
||||
#include "src/__support/macros/config.h"
|
||||
#include "src/__support/threads/linux/futex_word.h" // FutexWordType
|
||||
#include "src/__support/threads/mutex.h" // Mutex
|
||||
#include "src/__support/threads/raw_mutex.h" // RawMutex
|
||||
|
||||
#include <sys/syscall.h> // For syscall numbers.
|
||||
|
||||
namespace LIBC_NAMESPACE_DECL {
|
||||
|
||||
int CndVar::wait(Mutex *m) {
|
||||
// The goal is to perform "unlock |m| and wait" in an
|
||||
// atomic operation. However, it is not possible to do it
|
||||
// in the true sense so we do it in spirit. Before unlocking
|
||||
// |m|, a new waiter object is added to the waiter queue with
|
||||
// the waiter queue locked. Iff a signalling thread signals
|
||||
// the waiter before the waiter actually starts waiting, the
|
||||
// wait operation will not begin at all and the waiter immediately
|
||||
// returns.
|
||||
|
||||
CndWaiter waiter;
|
||||
{
|
||||
cpp::lock_guard ml(qmtx);
|
||||
CndWaiter *old_back = nullptr;
|
||||
if (waitq_front == nullptr) {
|
||||
waitq_front = waitq_back = &waiter;
|
||||
} else {
|
||||
old_back = waitq_back;
|
||||
waitq_back->next = &waiter;
|
||||
waitq_back = &waiter;
|
||||
}
|
||||
|
||||
if (m->unlock() != MutexError::NONE) {
|
||||
// If we do not remove the queued up waiter before returning,
|
||||
// then another thread can potentially signal a non-existing
|
||||
// waiter. Note also that we do this with |qmtx| locked. This
|
||||
// ensures that another thread will not signal the withdrawing
|
||||
// waiter.
|
||||
waitq_back = old_back;
|
||||
if (waitq_back == nullptr)
|
||||
waitq_front = nullptr;
|
||||
else
|
||||
waitq_back->next = nullptr;
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
waiter.futex_word.wait(WS_Waiting, cpp::nullopt, true);
|
||||
|
||||
// At this point, if locking |m| fails, we can simply return as the
|
||||
// queued up waiter would have been removed from the queue.
|
||||
auto err = m->lock();
|
||||
return err == MutexError::NONE ? 0 : -1;
|
||||
}
|
||||
|
||||
void CndVar::notify_one() {
|
||||
// We don't use an RAII locker in this method as we want to unlock
|
||||
// |qmtx| and signal the waiter using a single FUTEX_WAKE_OP signal.
|
||||
qmtx.lock();
|
||||
if (waitq_front == nullptr)
|
||||
qmtx.unlock();
|
||||
|
||||
CndWaiter *first = waitq_front;
|
||||
waitq_front = waitq_front->next;
|
||||
if (waitq_front == nullptr)
|
||||
waitq_back = nullptr;
|
||||
|
||||
qmtx.reset();
|
||||
|
||||
// this is a special WAKE_OP, so we use syscall directly
|
||||
LIBC_NAMESPACE::syscall_impl<long>(
|
||||
FUTEX_SYSCALL_ID, &qmtx.get_raw_futex(), FUTEX_WAKE_OP, 1, 1,
|
||||
&first->futex_word.val,
|
||||
FUTEX_OP(FUTEX_OP_SET, WS_Signalled, FUTEX_OP_CMP_EQ, WS_Waiting));
|
||||
}
|
||||
|
||||
void CndVar::broadcast() {
|
||||
cpp::lock_guard ml(qmtx);
|
||||
uint32_t dummy_futex_word;
|
||||
CndWaiter *waiter = waitq_front;
|
||||
waitq_front = waitq_back = nullptr;
|
||||
while (waiter != nullptr) {
|
||||
// FUTEX_WAKE_OP is used instead of just FUTEX_WAKE as it allows us to
|
||||
// atomically update the waiter status to WS_Signalled before waking
|
||||
// up the waiter. A dummy location is used for the other futex of
|
||||
// FUTEX_WAKE_OP.
|
||||
LIBC_NAMESPACE::syscall_impl<long>(
|
||||
FUTEX_SYSCALL_ID, &dummy_futex_word, FUTEX_WAKE_OP, 1, 1,
|
||||
&waiter->futex_word.val,
|
||||
FUTEX_OP(FUTEX_OP_SET, WS_Signalled, FUTEX_OP_CMP_EQ, WS_Waiting));
|
||||
waiter = waiter->next;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "src/__support/threads/linux/barrier.h"
|
||||
#include "hdr/errno_macros.h"
|
||||
#include "src/__support/CPP/new.h"
|
||||
#include "src/__support/threads/CndVar.h"
|
||||
#include "src/__support/threads/mutex.h"
|
||||
|
||||
@@ -24,16 +25,11 @@ int Barrier::init(Barrier *b,
|
||||
b->waiting = 0;
|
||||
b->blocking = true;
|
||||
|
||||
int err;
|
||||
err = CndVar::init(&b->entering);
|
||||
if (err != 0)
|
||||
return err;
|
||||
new (&b->entering) CndVar(attr ? attr->pshared : false);
|
||||
new (&b->exiting) CndVar(attr ? attr->pshared : false);
|
||||
|
||||
err = CndVar::init(&b->exiting);
|
||||
if (err != 0)
|
||||
return err;
|
||||
|
||||
auto mutex_err = Mutex::init(&b->m, false, false, false, false);
|
||||
auto mutex_err = Mutex::init(&b->m, false, false, false,
|
||||
/*pshared=*/attr ? attr->pshared : false);
|
||||
if (mutex_err != MutexError::NONE)
|
||||
return EAGAIN;
|
||||
|
||||
@@ -76,8 +72,8 @@ int Barrier::wait() {
|
||||
}
|
||||
|
||||
int Barrier::destroy(Barrier *b) {
|
||||
CndVar::destroy(&b->entering);
|
||||
CndVar::destroy(&b->exiting);
|
||||
b->entering.reset();
|
||||
b->exiting.reset();
|
||||
Mutex::destroy(&b->m);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -21,12 +21,8 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#ifndef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#define LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY 1
|
||||
#endif
|
||||
|
||||
// TODO(bojle): check this for darwin impl
|
||||
#if LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#include "src/__support/time/monotonicity.h"
|
||||
#endif
|
||||
|
||||
@@ -45,6 +41,7 @@ protected:
|
||||
LIBC_INLINE_VAR static constexpr FutexWordType UNLOCKED = 0b00;
|
||||
LIBC_INLINE_VAR static constexpr FutexWordType LOCKED = 0b01;
|
||||
LIBC_INLINE_VAR static constexpr FutexWordType IN_CONTENTION = 0b10;
|
||||
friend class CndVar;
|
||||
|
||||
private:
|
||||
LIBC_INLINE FutexWordType spin(unsigned spin_count) {
|
||||
@@ -74,7 +71,7 @@ private:
|
||||
futex.compare_exchange_strong(state, LOCKED, cpp::MemoryOrder::ACQUIRE,
|
||||
cpp::MemoryOrder::RELAXED))
|
||||
return true;
|
||||
#if LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
/* ADL should kick in */
|
||||
if (timeout)
|
||||
ensure_monotonicity(*timeout);
|
||||
|
||||
@@ -24,12 +24,7 @@
|
||||
#define LIBC_COPT_RWLOCK_DEFAULT_SPIN_COUNT 100
|
||||
#endif
|
||||
|
||||
#ifndef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#define LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY 1
|
||||
#warning "LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY is not defined, defaulting to 1"
|
||||
#endif
|
||||
|
||||
#if LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#include "src/__support/time/monotonicity.h"
|
||||
#endif
|
||||
|
||||
@@ -361,7 +356,7 @@ private:
|
||||
LIBC_INLINE LockResult
|
||||
lock_slow(cpp::optional<Futex::Timeout> timeout = cpp::nullopt,
|
||||
unsigned spin_count = LIBC_COPT_RWLOCK_DEFAULT_SPIN_COUNT) {
|
||||
#if LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
|
||||
// Phase 2: convert the timeout if necessary.
|
||||
if (timeout)
|
||||
ensure_monotonicity(*timeout);
|
||||
|
||||
@@ -30,6 +30,9 @@ class Mutex final : private RawMutex {
|
||||
pid_t owner;
|
||||
unsigned long long lock_count;
|
||||
|
||||
// CndVar needs to access Mutex as RawMutex
|
||||
friend class CndVar;
|
||||
|
||||
public:
|
||||
LIBC_INLINE constexpr Mutex(bool is_timed, bool is_recursive, bool is_robust,
|
||||
bool is_pshared)
|
||||
@@ -82,6 +85,10 @@ public:
|
||||
return MutexError::NONE;
|
||||
return MutexError::BUSY;
|
||||
}
|
||||
|
||||
LIBC_INLINE bool can_be_requeued() const {
|
||||
return !this->pshared && !this->robust;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@@ -21,6 +21,7 @@ add_entrypoint_object(
|
||||
../cnd_init.h
|
||||
DEPENDS
|
||||
libc.include.threads
|
||||
libc.src.__support.CPP.new
|
||||
libc.src.__support.threads.CndVar
|
||||
)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ static_assert(sizeof(CndVar) == sizeof(cnd_t));
|
||||
|
||||
LLVM_LIBC_FUNCTION(void, cnd_destroy, (cnd_t * cond)) {
|
||||
CndVar *cndvar = reinterpret_cast<CndVar *>(cond);
|
||||
CndVar::destroy(cndvar);
|
||||
cndvar->reset();
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#include "src/threads/cnd_init.h"
|
||||
#include "src/__support/CPP/new.h"
|
||||
#include "src/__support/common.h"
|
||||
#include "src/__support/macros/config.h"
|
||||
#include "src/__support/threads/CndVar.h"
|
||||
@@ -18,8 +19,8 @@ namespace LIBC_NAMESPACE_DECL {
|
||||
static_assert(sizeof(CndVar) == sizeof(cnd_t));
|
||||
|
||||
LLVM_LIBC_FUNCTION(int, cnd_init, (cnd_t * cond)) {
|
||||
CndVar *cndvar = reinterpret_cast<CndVar *>(cond);
|
||||
return CndVar::init(cndvar) ? thrd_error : thrd_success;
|
||||
new (cond) CndVar(false);
|
||||
return thrd_success;
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@@ -21,7 +21,8 @@ static_assert(sizeof(CndVar) == sizeof(cnd_t));
|
||||
LLVM_LIBC_FUNCTION(int, cnd_wait, (cnd_t * cond, mtx_t *mtx)) {
|
||||
CndVar *cndvar = reinterpret_cast<CndVar *>(cond);
|
||||
Mutex *mutex = reinterpret_cast<Mutex *>(mtx);
|
||||
return cndvar->wait(mutex) ? thrd_error : thrd_success;
|
||||
return cndvar->wait(mutex) == CndVarResult::Success ? thrd_success
|
||||
: thrd_error;
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@@ -59,3 +59,21 @@ add_integration_test(
|
||||
libc.src.pthread.pthread_create
|
||||
libc.src.pthread.pthread_join
|
||||
)
|
||||
|
||||
add_integration_test(
|
||||
cndvar_test
|
||||
SUITE
|
||||
libc-support-threads-integration-tests
|
||||
SRCS
|
||||
cndvar_test.cpp
|
||||
DEPENDS
|
||||
libc.hdr.time_macros
|
||||
libc.src.__support.CPP.expected
|
||||
libc.src.__support.threads.CndVar
|
||||
libc.src.__support.threads.mutex
|
||||
libc.src.__support.threads.mutex_common
|
||||
libc.src.__support.threads.sleep
|
||||
libc.src.__support.time.clock_gettime
|
||||
libc.src.threads.thrd_create
|
||||
libc.src.threads.thrd_join
|
||||
)
|
||||
|
||||
128
libc/test/integration/src/__support/threads/cndvar_test.cpp
Normal file
128
libc/test/integration/src/__support/threads/cndvar_test.cpp
Normal file
@@ -0,0 +1,128 @@
|
||||
//===-- Integration test for CndVar with C11 threads ----------------------===//
|
||||
//
|
||||
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
|
||||
// See https://llvm.org/LICENSE.txt for license information.
|
||||
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#include "hdr/time_macros.h"
|
||||
#include "src/__support/CPP/expected.h"
|
||||
#include "src/__support/threads/CndVar.h"
|
||||
#include "src/__support/threads/mutex.h"
|
||||
#include "src/__support/threads/mutex_common.h"
|
||||
#include "src/__support/threads/sleep.h"
|
||||
#include "src/__support/time/clock_gettime.h"
|
||||
#include "src/threads/thrd_create.h"
|
||||
#include "src/threads/thrd_join.h"
|
||||
#include "test/IntegrationTest/test.h"
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr int THREAD_COUNT = 16;
|
||||
constexpr int NUM_ITERATIONS = 250;
|
||||
|
||||
struct QueueState {
|
||||
LIBC_NAMESPACE::CndVar cnd{false};
|
||||
LIBC_NAMESPACE::Mutex m{false, false, false, false};
|
||||
size_t consumed = 0;
|
||||
size_t produced = 0;
|
||||
LIBC_NAMESPACE::cpp::Atomic<size_t> exited_consumers = 0;
|
||||
bool use_broadcast;
|
||||
bool allow_timeout;
|
||||
};
|
||||
|
||||
void stress_test(bool use_broadcast, bool allow_timeout) {
|
||||
for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
|
||||
QueueState state{};
|
||||
state.use_broadcast = use_broadcast;
|
||||
state.allow_timeout = allow_timeout;
|
||||
constexpr size_t PRODUCER = THREAD_COUNT / 2;
|
||||
constexpr size_t CONSUMER = THREAD_COUNT - PRODUCER;
|
||||
constexpr size_t ITEMS_PER_PRODUCER = 200;
|
||||
thrd_t producer_threads[PRODUCER];
|
||||
thrd_t consumer_threads[CONSUMER];
|
||||
using TimeoutOpt =
|
||||
LIBC_NAMESPACE::cpp::optional<LIBC_NAMESPACE::CndVar::Timeout>;
|
||||
for (size_t i = 0; i < CONSUMER; ++i)
|
||||
ASSERT_EQ(LIBC_NAMESPACE::thrd_create(
|
||||
&consumer_threads[i],
|
||||
[](void *arg) {
|
||||
auto *state = static_cast<QueueState *>(arg);
|
||||
state->m.lock();
|
||||
while (state->consumed != PRODUCER * ITEMS_PER_PRODUCER) {
|
||||
TimeoutOpt timeout = LIBC_NAMESPACE::cpp::nullopt;
|
||||
if (state->allow_timeout) {
|
||||
timespec now{};
|
||||
LIBC_NAMESPACE::internal::clock_gettime(
|
||||
CLOCK_MONOTONIC, &now);
|
||||
size_t sleep_ns = 1000;
|
||||
now.tv_nsec += sleep_ns;
|
||||
if (now.tv_nsec >= 1'000'000'000) {
|
||||
now.tv_sec++;
|
||||
now.tv_nsec -= 1'000'000'000;
|
||||
}
|
||||
timeout = TimeoutOpt(
|
||||
LIBC_NAMESPACE::CndVar::Timeout::from_timespec(
|
||||
now,
|
||||
/*realtime=*/false)
|
||||
.value());
|
||||
}
|
||||
ASSERT_NE(state->cnd.wait(&state->m, timeout),
|
||||
LIBC_NAMESPACE::CndVarResult::MutexError);
|
||||
if (state->produced == 0)
|
||||
continue;
|
||||
state->produced--;
|
||||
state->consumed++;
|
||||
}
|
||||
state->m.unlock();
|
||||
state->exited_consumers.fetch_add(1);
|
||||
return 0;
|
||||
},
|
||||
&state),
|
||||
int(thrd_success));
|
||||
for (size_t i = 0; i < PRODUCER; ++i)
|
||||
ASSERT_EQ(LIBC_NAMESPACE::thrd_create(
|
||||
&producer_threads[i],
|
||||
[](void *arg) {
|
||||
auto *state = static_cast<QueueState *>(arg);
|
||||
for (size_t j = 0; j < ITEMS_PER_PRODUCER; ++j) {
|
||||
state->m.lock();
|
||||
state->produced++;
|
||||
if (state->use_broadcast)
|
||||
state->cnd.broadcast();
|
||||
else
|
||||
state->cnd.notify_one();
|
||||
state->m.unlock();
|
||||
}
|
||||
return 0;
|
||||
},
|
||||
&state),
|
||||
int(thrd_success));
|
||||
|
||||
// join producers
|
||||
for (size_t i = 0; i < PRODUCER; ++i)
|
||||
ASSERT_EQ(LIBC_NAMESPACE::thrd_join(producer_threads[i], nullptr),
|
||||
int(thrd_success));
|
||||
// keep signalling until all consumers have consumed all items
|
||||
while (state.exited_consumers != CONSUMER) {
|
||||
if (state.use_broadcast)
|
||||
state.cnd.broadcast();
|
||||
else
|
||||
state.cnd.notify_one();
|
||||
}
|
||||
// join consumers
|
||||
for (size_t i = 0; i < CONSUMER; ++i)
|
||||
ASSERT_EQ(LIBC_NAMESPACE::thrd_join(consumer_threads[i], nullptr),
|
||||
int(thrd_success));
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_MAIN() {
|
||||
stress_test(false, false);
|
||||
stress_test(true, false);
|
||||
stress_test(false, true);
|
||||
stress_test(true, true);
|
||||
return 0;
|
||||
}
|
||||
@@ -28,11 +28,13 @@ add_integration_test(
|
||||
DEPENDS
|
||||
libc.include.pthread
|
||||
libc.src.errno.errno
|
||||
libc.src.__support.CPP.atomic
|
||||
libc.src.pthread.pthread_barrier_destroy
|
||||
libc.src.pthread.pthread_barrier_wait
|
||||
libc.src.pthread.pthread_barrier_init
|
||||
libc.src.pthread.pthread_create
|
||||
libc.src.pthread.pthread_join
|
||||
libc.src.string.memset
|
||||
libc.src.stdio.printf
|
||||
)
|
||||
|
||||
|
||||
@@ -104,6 +104,7 @@ add_integration_test(
|
||||
cnd_test.cpp
|
||||
DEPENDS
|
||||
libc.include.threads
|
||||
libc.src.__support.CPP.atomic
|
||||
libc.src.threads.cnd_init
|
||||
libc.src.threads.cnd_broadcast
|
||||
libc.src.threads.cnd_signal
|
||||
|
||||
@@ -54,4 +54,7 @@ LIBC_CONFIGURE_OPTIONS = [
|
||||
|
||||
# Documentation in libc/src/__support/libc_errno.h
|
||||
"LIBC_ERRNO_MODE=LIBC_ERRNO_MODE_SYSTEM_INLINE",
|
||||
|
||||
# Documentation in libc/src/__support/time/monotonicity.h
|
||||
"LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user