Nested TaskGroups previously ran their tasks serially to prevent deadlock, as documented in https://reviews.llvm.org/D61115 and refined in https://reviews.llvm.org/D148984 to key off threadIndex. Enable nested parallelism by having worker threads actively execute tasks from the work queue while waiting (work-stealing) instead of merely blocking. Root-level TaskGroups (on the main thread) keep the efficient blocking Latch::sync(), so the common non-nested case pays no overhead.

In lld, https://reviews.llvm.org/D131247 worked around the limitation by passing a single root TaskGroup into OutputSection::writeTo and spawning 4MB-chunked tasks into it. However, SyntheticSection::writeTo implementations with internal parallelism (e.g. GdbIndexSection, MergeNoTailSection) still ran serially on worker threads. With this change, their internal parallelFor/parallelForEach calls parallelize automatically via helpSync work-stealing.

The increased parallelism can reorder error messages emitted from parallel phases (e.g. relocation processing during section writes), so one lld test is updated to use --threads=1 for deterministic output.
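For illustration, a minimal self-contained sketch of the nesting pattern this enables (the section count and loop bounds are made up for the example; TaskGroup, spawn, and parallelFor are the real APIs):

#include "llvm/Support/Parallel.h"

int main() {
  // Root TaskGroup on the main thread: its destructor blocks in Latch::sync().
  llvm::parallel::TaskGroup TG;
  for (int Sec = 0; Sec < 4; ++Sec)
    TG.spawn([Sec] {
      // Nested parallelism on a worker thread, as in a SyntheticSection's
      // writeTo. Previously this inner loop degraded to a serial loop; with
      // helpSync the waiting worker now steals and runs queued tasks itself.
      llvm::parallelFor(0, 1024, [Sec](size_t I) {
        // ... process chunk I of section Sec ...
        (void)Sec;
        (void)I;
      });
    });
}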
//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

using namespace llvm;
using namespace llvm::parallel;

llvm::ThreadPoolStrategy parallel::strategy;

#if LLVM_ENABLE_THREADS

#ifdef _WIN32
static thread_local unsigned threadIndex = UINT_MAX;

unsigned parallel::getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned parallel::threadIndex = UINT_MAX;
#endif

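// threadIndex above stays UINT_MAX on non-worker threads (e.g. the main
// thread); pool workers set it in work(). isNested() below keys off this to
// detect that a TaskGroup is being synced from inside the thread pool.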
namespace {

/// Runs closures on a thread pool in FILO order.
class ThreadPoolExecutor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    if (S.UseJobserver)
      TheJobserver = JobserverClient::getInstance();

    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([this, S, I] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // To make sure the thread pool executor can only be created with a parallel
  // strategy.
  ThreadPoolExecutor() = delete;

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();

    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  ~ThreadPoolExecutor() { stop(); }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

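  // A queued task: the closure to run plus the owning TaskGroup's latch,
  // which operator() decrements once the closure completes.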
  struct WorkItem {
    std::function<void()> F;
    std::reference_wrapper<parallel::detail::Latch> L;
    void operator()() {
      F();
      L.get().dec();
    }
  };

  void add(std::function<void()> F, parallel::detail::Latch &L) {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push_back({std::move(F), std::ref(L)});
    }
    Cond.notify_one();
  }

  // Execute tasks from the work queue until the latch reaches zero.
  // Used by nested TaskGroups (on worker threads) to prevent deadlock:
  // instead of blocking in sync(), actively help drain the queue.
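  // Note: this returns as soon as the queue is empty, even if the latch is
  // still nonzero (tasks may be running on other workers); the caller then
  // falls back to the blocking Latch::sync().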
  void helpSync(const parallel::detail::Latch &L) {
    while (L.getCount() != 0) {
      std::unique_lock<std::mutex> Lock(Mutex);
      if (Stop || WorkStack.empty())
        return;
      popAndRun(Lock);
    }
  }

  size_t getThreadCount() const { return ThreadCount; }

private:
  // Pop one task from the queue and run it. Must be called with Lock held;
  // releases Lock before executing the task.
  void popAndRun(std::unique_lock<std::mutex> &Lock) {
    auto Item = std::move(WorkStack.back());
    WorkStack.pop_back();
    Lock.unlock();
    Item();
  }

  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    // Note on jobserver deadlock avoidance:
    // GNU Make grants each invoked process one implicit job slot. Our
    // JobserverClient models this by returning an implicit JobSlot on the
    // first successful tryAcquire() in a process. This guarantees forward
    // progress without requiring a dedicated "always-on" thread here.

    while (true) {
      if (TheJobserver) {
        // Jobserver-mode scheduling:
        // - Acquire one job slot (with exponential backoff to avoid
        //   busy-waiting).
        // - While holding the slot, drain and run tasks from the local queue.
        // - Release the slot when the queue is empty or when shutting down.
        // Rationale: Holding a slot amortizes acquire/release overhead over
        // multiple tasks and avoids requeue/yield churn, while still enforcing
        // the jobserver's global concurrency limit. With K available slots,
        // up to K workers run tasks in parallel; within each worker tasks run
        // sequentially until the local queue is empty.
        ExponentialBackoff Backoff(std::chrono::hours(24));
        JobSlot Slot;
        do {
          if (Stop)
            return;
          Slot = TheJobserver->tryAcquire();
          if (Slot.isValid())
            break;
        } while (Backoff.waitForNextAttempt());

        auto SlotReleaser =
            make_scope_exit([&] { TheJobserver->release(std::move(Slot)); });

        while (true) {
          std::unique_lock<std::mutex> Lock(Mutex);
          Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
          if (Stop && WorkStack.empty())
            return;
          if (WorkStack.empty())
            break;
          popAndRun(Lock);
        }
      } else {
        std::unique_lock<std::mutex> Lock(Mutex);
        Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
        if (Stop)
          break;
        popAndRun(Lock);
      }
    }
  }

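  // Shared state: WorkStack is a LIFO queue guarded by Mutex; Cond signals
  // newly queued work or shutdown.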
  std::atomic<bool> Stop{false};
  std::vector<WorkItem> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;

  JobserverClient *TheJobserver = nullptr;
};
} // namespace

static ThreadPoolExecutor *getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() on Windows. This is important to avoid various race
  // conditions at process exit that can cause crashes or deadlocks.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}

size_t parallel::getThreadCount() {
  return getDefaultExecutor()->getThreadCount();
}
#endif

static bool isNested() {
#if LLVM_ENABLE_THREADS
  return threadIndex != UINT_MAX;
#else
  return false;
#endif
}

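// A TaskGroup executes in parallel unless the strategy requests exactly one
// thread (or LLVM was built without thread support).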
TaskGroup::TaskGroup()
    : Parallel(
#if LLVM_ENABLE_THREADS
          strategy.ThreadsRequested != 1
#else
          false
#endif
      ) {
}

TaskGroup::~TaskGroup() {
#if LLVM_ENABLE_THREADS
  // In a nested TaskGroup (threadIndex != UINT_MAX), actively help drain the
  // queue instead of blocking.
  if (Parallel && isNested())
    getDefaultExecutor()->helpSync(L);
#endif
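  // Wait for any tasks that are still running on other worker threads;
  // helpSync may return with the latch nonzero once the queue is empty.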
  L.sync();
}

void TaskGroup::spawn(std::function<void()> F) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    getDefaultExecutor()->add(std::move(F), L);
    return;
  }
#endif
  F();
}

void llvm::parallelFor(size_t Begin, size_t End,
                       function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (strategy.ThreadsRequested != 1) {
    size_t NumItems = End - Begin;
    if (NumItems == 0)
      return;
    // Distribute work via an atomic counter shared by NumWorkers threads,
    // keeping the task count (and thus Linux futex calls) at O(ThreadCount).
    // For lld, per-file work is somewhat uneven, so a multiplier > 1 is safer.
    // While 2 vs 4 vs 8 makes no measurable difference, 4 is used as a
    // reasonable default.
    size_t NumWorkers = std::min<size_t>(NumItems, getThreadCount());
    size_t ChunkSize = std::max(size_t(1), NumItems / (NumWorkers * 4));
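    // For example, NumItems = 1000 on 8 threads gives NumWorkers = 8 and
    // ChunkSize = 1000 / 32 = 31, so each fetch_add below claims 31 items.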
    std::atomic<size_t> Idx{Begin};
    auto Worker = [&] {
      while (true) {
        size_t I = Idx.fetch_add(ChunkSize, std::memory_order_relaxed);
        if (I >= End)
          break;
        size_t IEnd = std::min(I + ChunkSize, End);
        for (; I < IEnd; ++I)
          Fn(I);
      }
    };

    TaskGroup TG;
    for (size_t I = 0; I != NumWorkers; ++I)
      TG.spawn(Worker);
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}