When building LLVM as a DLL, the `ThreadPoolExecutor` could cause deadlocks on Windows. Previously, the worker threads were only joined in the destructor of the `ThreadPoolExecutor`, not when the pool was stopped. Destruction happens when the DLL is unloaded and global destructors run. On Windows, `std::thread` uses `FreeLibraryAndExitThread` to clean up a thread, which requires the loader lock when the thread terminates. However, while the pool is being destroyed, the loader lock is already held because the DLL is being unloaded. If the threads did not exit fast enough, the destructor would wait for them to join; at the same time, the threads would wait for the destructor to release the loader lock. Joining the threads when stopping the pool fixes this, as it ensures the threads have terminated by the time `llvm_shutdown` is called, outside the loader lock.
286 lines
8.4 KiB
C++
286 lines
8.4 KiB
C++
//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
|
||
//
|
||
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
|
||
// See https://llvm.org/LICENSE.txt for license information.
|
||
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
|
||
//
|
||
//===----------------------------------------------------------------------===//
|
||
|
||
#include "llvm/Support/Parallel.h"
|
||
#include "llvm/ADT/ScopeExit.h"
|
||
#include "llvm/Config/llvm-config.h"
|
||
#include "llvm/Support/ExponentialBackoff.h"
|
||
#include "llvm/Support/Jobserver.h"
|
||
#include "llvm/Support/ManagedStatic.h"
|
||
#include "llvm/Support/Threading.h"
|
||
|
||
#include <atomic>
|
||
#include <future>
|
||
#include <memory>
|
||
#include <mutex>
|
||
#include <thread>
|
||
#include <vector>
|
||
|
||
llvm::ThreadPoolStrategy llvm::parallel::strategy;
|
||
|
||
namespace llvm {
|
||
namespace parallel {
|
||
#if LLVM_ENABLE_THREADS
|
||
|
||
#ifdef _WIN32
|
||
static thread_local unsigned threadIndex = UINT_MAX;
|
||
|
||
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
|
||
#else
|
||
thread_local unsigned threadIndex = UINT_MAX;
|
||
#endif
|
||
|
||
namespace detail {
|
||
|
||
namespace {
|
||
|
||
/// An abstract class that takes closures and runs them asynchronously.
|
||
class Executor {
|
||
public:
|
||
virtual ~Executor() = default;
|
||
virtual void add(std::function<void()> func) = 0;
|
||
virtual size_t getThreadCount() const = 0;
|
||
|
||
static Executor *getDefaultExecutor();
|
||
};
|
||
|
||
/// An implementation of an Executor that runs closures on a thread pool
|
||
/// in filo order.
|
||
class ThreadPoolExecutor : public Executor {
|
||
public:
|
||
explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
|
||
if (S.UseJobserver)
|
||
TheJobserver = JobserverClient::getInstance();
|
||
|
||
ThreadCount = S.compute_thread_count();
|
||
// Spawn all but one of the threads in another thread as spawning threads
|
||
// can take a while.
|
||
Threads.reserve(ThreadCount);
|
||
Threads.resize(1);
|
||
std::lock_guard<std::mutex> Lock(Mutex);
|
||
// Use operator[] before creating the thread to avoid data race in .size()
|
||
// in 'safe libc++' mode.
|
||
auto &Thread0 = Threads[0];
|
||
Thread0 = std::thread([this, S] {
|
||
for (unsigned I = 1; I < ThreadCount; ++I) {
|
||
Threads.emplace_back([this, S, I] { work(S, I); });
|
||
if (Stop)
|
||
break;
|
||
}
|
||
ThreadsCreated.set_value();
|
||
work(S, 0);
|
||
});
|
||
}
|
||
|
||
// To make sure the thread pool executor can only be created with a parallel
|
||
// strategy.
|
||
ThreadPoolExecutor() = delete;
|
||
|
||
void stop() {
|
||
{
|
||
std::lock_guard<std::mutex> Lock(Mutex);
|
||
if (Stop)
|
||
return;
|
||
Stop = true;
|
||
}
|
||
Cond.notify_all();
|
||
ThreadsCreated.get_future().wait();
|
||
|
||
std::thread::id CurrentThreadId = std::this_thread::get_id();
|
||
for (std::thread &T : Threads)
|
||
if (T.get_id() == CurrentThreadId)
|
||
T.detach();
|
||
else
|
||
T.join();
|
||
}
|
||
|
||
~ThreadPoolExecutor() override { stop(); }
|
||
|
||
struct Creator {
|
||
static void *call() { return new ThreadPoolExecutor(strategy); }
|
||
};
|
||
struct Deleter {
|
||
static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
|
||
};
|
||
|
||
void add(std::function<void()> F) override {
|
||
{
|
||
std::lock_guard<std::mutex> Lock(Mutex);
|
||
WorkStack.push_back(std::move(F));
|
||
}
|
||
Cond.notify_one();
|
||
}
|
||
|
||
size_t getThreadCount() const override { return ThreadCount; }
|
||
|
||
private:
|
||
void work(ThreadPoolStrategy S, unsigned ThreadID) {
|
||
threadIndex = ThreadID;
|
||
S.apply_thread_strategy(ThreadID);
|
||
// Note on jobserver deadlock avoidance:
|
||
// GNU Make grants each invoked process one implicit job slot. Our
|
||
// JobserverClient models this by returning an implicit JobSlot on the
|
||
// first successful tryAcquire() in a process. This guarantees forward
|
||
// progress without requiring a dedicated "always-on" thread here.
|
||
|
||
while (true) {
|
||
if (TheJobserver) {
|
||
// Jobserver-mode scheduling:
|
||
// - Acquire one job slot (with exponential backoff to avoid busy-wait).
|
||
// - While holding the slot, drain and run tasks from the local queue.
|
||
// - Release the slot when the queue is empty or when shutting down.
|
||
// Rationale: Holding a slot amortizes acquire/release overhead over
|
||
// multiple tasks and avoids requeue/yield churn, while still enforcing
|
||
// the jobserver’s global concurrency limit. With K available slots,
|
||
// up to K workers run tasks in parallel; within each worker tasks run
|
||
// sequentially until the local queue is empty.
|
||
ExponentialBackoff Backoff(std::chrono::hours(24));
|
||
JobSlot Slot;
|
||
do {
|
||
if (Stop)
|
||
return;
|
||
Slot = TheJobserver->tryAcquire();
|
||
if (Slot.isValid())
|
||
break;
|
||
} while (Backoff.waitForNextAttempt());
|
||
|
||
llvm::scope_exit SlotReleaser(
|
||
[&] { TheJobserver->release(std::move(Slot)); });
|
||
|
||
while (true) {
|
||
std::function<void()> Task;
|
||
{
|
||
std::unique_lock<std::mutex> Lock(Mutex);
|
||
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
|
||
if (Stop && WorkStack.empty())
|
||
return;
|
||
if (WorkStack.empty())
|
||
break;
|
||
Task = std::move(WorkStack.back());
|
||
WorkStack.pop_back();
|
||
}
|
||
Task();
|
||
}
|
||
} else {
|
||
std::unique_lock<std::mutex> Lock(Mutex);
|
||
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
|
||
if (Stop)
|
||
break;
|
||
auto Task = std::move(WorkStack.back());
|
||
WorkStack.pop_back();
|
||
Lock.unlock();
|
||
Task();
|
||
}
|
||
}
|
||
}
|
||
|
||
std::atomic<bool> Stop{false};
|
||
std::vector<std::function<void()>> WorkStack;
|
||
std::mutex Mutex;
|
||
std::condition_variable Cond;
|
||
std::promise<void> ThreadsCreated;
|
||
std::vector<std::thread> Threads;
|
||
unsigned ThreadCount;
|
||
|
||
JobserverClient *TheJobserver = nullptr;
|
||
};
|
||
|
||
Executor *Executor::getDefaultExecutor() {
|
||
#ifdef _WIN32
|
||
// The ManagedStatic enables the ThreadPoolExecutor to be stopped via
|
||
// llvm_shutdown() on Windows. This is important to avoid various race
|
||
// conditions at process exit that can cause crashes or deadlocks.
|
||
|
||
static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
|
||
ThreadPoolExecutor::Deleter>
|
||
ManagedExec;
|
||
static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
|
||
return Exec.get();
|
||
#else
|
||
// ManagedStatic is not desired on other platforms. When `Exec` is destroyed
|
||
// by llvm_shutdown(), worker threads will clean up and invoke TLS
|
||
// destructors. This can lead to race conditions if other threads attempt to
|
||
// access TLS objects that have already been destroyed.
|
||
static ThreadPoolExecutor Exec(strategy);
|
||
return &Exec;
|
||
#endif
|
||
}
|
||
} // namespace
|
||
} // namespace detail
|
||
|
||
size_t getThreadCount() {
|
||
return detail::Executor::getDefaultExecutor()->getThreadCount();
|
||
}
|
||
#endif
|
||
|
||
// Latch::sync() called by the dtor may cause one thread to block. If is a dead
|
||
// lock if all threads in the default executor are blocked. To prevent the dead
|
||
// lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
|
||
// of nested parallel_for_each(), only the outermost one runs parallelly.
|
||
TaskGroup::TaskGroup()
|
||
#if LLVM_ENABLE_THREADS
|
||
: Parallel((parallel::strategy.ThreadsRequested != 1) &&
|
||
(threadIndex == UINT_MAX)) {}
|
||
#else
|
||
: Parallel(false) {}
|
||
#endif
|
||
TaskGroup::~TaskGroup() {
|
||
// We must ensure that all the workloads have finished before decrementing the
|
||
// instances count.
|
||
L.sync();
|
||
}
|
||
|
||
void TaskGroup::spawn(std::function<void()> F) {
|
||
#if LLVM_ENABLE_THREADS
|
||
if (Parallel) {
|
||
L.inc();
|
||
detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
|
||
F();
|
||
L.dec();
|
||
});
|
||
return;
|
||
}
|
||
#endif
|
||
F();
|
||
}
|
||
|
||
} // namespace parallel
|
||
} // namespace llvm
|
||
|
||
void llvm::parallelFor(size_t Begin, size_t End,
|
||
llvm::function_ref<void(size_t)> Fn) {
|
||
#if LLVM_ENABLE_THREADS
|
||
if (parallel::strategy.ThreadsRequested != 1) {
|
||
auto NumItems = End - Begin;
|
||
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
|
||
// overhead on large inputs.
|
||
auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
|
||
if (TaskSize == 0)
|
||
TaskSize = 1;
|
||
|
||
parallel::TaskGroup TG;
|
||
for (; Begin + TaskSize < End; Begin += TaskSize) {
|
||
TG.spawn([=, &Fn] {
|
||
for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
|
||
Fn(I);
|
||
});
|
||
}
|
||
if (Begin != End) {
|
||
TG.spawn([=, &Fn] {
|
||
for (size_t I = Begin; I != End; ++I)
|
||
Fn(I);
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
#endif
|
||
|
||
for (; Begin != End; ++Begin)
|
||
Fn(Begin);
|
||
}
|