Files
llvm-project/llvm/lib/Support/Parallel.cpp
Fangrui Song 8daaa26efd [Support] Support nested parallel TaskGroup via work-stealing (#189293)
Nested TaskGroups run serially to prevent deadlock, as documented by
https://reviews.llvm.org/D61115 and refined by
https://reviews.llvm.org/D148984 to use threadIndex.

Enable nested parallelism by having worker threads actively execute
tasks from the work queue while waiting (work-stealing), instead of
just blocking. Root-level TaskGroups (main thread) keep the efficient
blocking Latch::sync(), so there is no overhead for the common
non-nested case.

In lld, https://reviews.llvm.org/D131247 worked around the limitation
by passing a single root TaskGroup into OutputSection::writeTo and
spawning 4MB-chunked tasks into it. However, SyntheticSection::writeTo
calls with internal parallelism (e.g. GdbIndexSection,
MergeNoTailSection) still ran serially on worker threads. With this
change, their internal parallelFor/parallelForEach calls parallelize
automatically via helpSync work-stealing.

The increased parallelism can reorder error messages from parallel
phases (e.g. relocation processing during section writes), so one lld
test is updated to use --threads=1 for deterministic output.
2026-04-01 19:20:16 -07:00

304 lines
8.8 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "llvm/Support/Parallel.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"
#include <atomic>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
using namespace llvm;
using namespace llvm::parallel;
// Global parallel strategy; consulted by TaskGroup/parallelFor to decide
// whether to run tasks in parallel (ThreadsRequested != 1).
llvm::ThreadPoolStrategy parallel::strategy;
#if LLVM_ENABLE_THREADS
#ifdef _WIN32
// Index of the current pool worker thread; UINT_MAX on threads that are not
// pool workers (e.g. the main thread). On Windows the variable is kept
// TU-local and exposed through getThreadIndex() via GET_THREAD_INDEX_IMPL.
static thread_local unsigned threadIndex = UINT_MAX;
unsigned parallel::getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Index of the current pool worker thread; UINT_MAX means "not a worker"
// and is used by isNested() to detect TaskGroups created inside tasks.
thread_local unsigned parallel::threadIndex = UINT_MAX;
#endif
namespace {
/// Runs closures on a thread pool in filo order.
///
/// The executor is process-wide: all TaskGroups share one work queue and one
/// set of worker threads. Each queued task carries the Latch of the TaskGroup
/// that spawned it so a group can wait for exactly its own tasks.
class ThreadPoolExecutor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    if (S.UseJobserver)
      TheJobserver = JobserverClient::getInstance();
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([this, S, I] { work(S, I); });
        if (Stop)
          break;
      }
      // Signal stop() that Threads is fully populated and safe to iterate.
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // To make sure the thread pool executor can only be created with a parallel
  // strategy.
  ThreadPoolExecutor() = delete;

  // Ask all workers to exit and join them. Idempotent; if called from a pool
  // thread itself, that thread is detached instead of joined.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    // Wait until Thread0 has finished creating the worker threads; joining
    // before that would race with the emplace_back loop in the constructor.
    ThreadsCreated.get_future().wait();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  ~ThreadPoolExecutor() { stop(); }

  // Factory/cleanup hooks for the Windows ManagedStatic in
  // getDefaultExecutor().
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  // A queued closure paired with the Latch of the TaskGroup it belongs to;
  // the latch is decremented after the closure has run.
  struct WorkItem {
    std::function<void()> F;
    std::reference_wrapper<parallel::detail::Latch> L;
    void operator()() {
      F();
      L.get().dec();
    }
  };

  // Enqueue F on behalf of the TaskGroup owning L and wake one worker.
  void add(std::function<void()> F, parallel::detail::Latch &L) {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push_back({std::move(F), std::ref(L)});
    }
    Cond.notify_one();
  }

  // Execute tasks from the work queue until the latch reaches zero.
  // Used by nested TaskGroups (on worker threads) to prevent deadlock:
  // instead of blocking in sync(), actively help drain the queue.
  // Note the popped task may belong to a different TaskGroup than L; running
  // any queued work still makes global progress. Returns as soon as the queue
  // is empty (L's tasks may still be in flight on other threads) or the pool
  // is shutting down; callers follow up with L.sync().
  void helpSync(const parallel::detail::Latch &L) {
    while (L.getCount() != 0) {
      std::unique_lock<std::mutex> Lock(Mutex);
      if (Stop || WorkStack.empty())
        return;
      popAndRun(Lock);
    }
  }

  size_t getThreadCount() const { return ThreadCount; }

private:
  // Pop one task from the queue and run it. Must be called with Lock held;
  // releases Lock before executing the task.
  void popAndRun(std::unique_lock<std::mutex> &Lock) {
    auto Item = std::move(WorkStack.back());
    WorkStack.pop_back();
    Lock.unlock();
    Item();
  }

  // Worker-thread main loop: record this thread's pool index (consulted by
  // getThreadIndex()/isNested()), apply the thread strategy, then run queued
  // tasks until stop() is requested.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    // Note on jobserver deadlock avoidance:
    // GNU Make grants each invoked process one implicit job slot. Our
    // JobserverClient models this by returning an implicit JobSlot on the
    // first successful tryAcquire() in a process. This guarantees forward
    // progress without requiring a dedicated "always-on" thread here.
    while (true) {
      if (TheJobserver) {
        // Jobserver-mode scheduling:
        // - Acquire one job slot (with exponential backoff to avoid
        //   busy-wait).
        // - While holding the slot, drain and run tasks from the local queue.
        // - Release the slot when the queue is empty or when shutting down.
        // Rationale: Holding a slot amortizes acquire/release overhead over
        // multiple tasks and avoids requeue/yield churn, while still
        // enforcing the jobserver's global concurrency limit. With K
        // available slots, up to K workers run tasks in parallel; within
        // each worker tasks run sequentially until the local queue is empty.
        ExponentialBackoff Backoff(std::chrono::hours(24));
        JobSlot Slot;
        do {
          if (Stop)
            return;
          Slot = TheJobserver->tryAcquire();
          if (Slot.isValid())
            break;
        } while (Backoff.waitForNextAttempt());
        llvm::scope_exit SlotReleaser(
            [&] { TheJobserver->release(std::move(Slot)); });
        while (true) {
          std::unique_lock<std::mutex> Lock(Mutex);
          Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
          if (Stop && WorkStack.empty())
            return;
          if (WorkStack.empty())
            break; // Queue drained: release the slot, reacquire next round.
          popAndRun(Lock);
        }
      } else {
        std::unique_lock<std::mutex> Lock(Mutex);
        Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
        if (Stop)
          break;
        popAndRun(Lock);
      }
    }
  }

  std::atomic<bool> Stop{false};
  // LIFO stack of pending tasks; guarded by Mutex.
  std::vector<WorkItem> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  // Fulfilled by Thread0 once all worker threads have been created.
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
  // Non-null when running under a GNU-Make-style jobserver.
  JobserverClient *TheJobserver = nullptr;
};
} // namespace
// Returns the lazily-constructed, process-wide executor shared by all
// parallel TaskGroups. Thread-safe via C++ magic-static initialization.
static ThreadPoolExecutor *getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() on Windows. This is important to avoid various race
  // conditions at process exit that can cause crashes or deadlocks.
  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
size_t parallel::getThreadCount() {
return getDefaultExecutor()->getThreadCount();
}
#endif
// Returns true when called from a thread-pool worker, i.e. when a TaskGroup
// is being used from inside another parallel task.
static bool isNested() {
#if LLVM_ENABLE_THREADS
  // Worker threads set threadIndex in ThreadPoolExecutor::work(); it stays
  // UINT_MAX on the main thread and any other non-pool thread.
  if (threadIndex != UINT_MAX)
    return true;
#endif
  return false;
}
// Construct a TaskGroup. Spawned tasks run in parallel only when the global
// strategy requests more than one thread; otherwise (including builds without
// thread support) each task runs inline in spawn().
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel(strategy.ThreadsRequested != 1) {
}
#else
    : Parallel(false) {
}
#endif
TaskGroup::~TaskGroup() {
#if LLVM_ENABLE_THREADS
  // In a nested TaskGroup (threadIndex != -1u), actively help drain the queue
  // instead of just blocking: if every worker merely waited in sync(), queued
  // tasks could be left with no free thread to run them.
  if (Parallel && isNested())
    getDefaultExecutor()->helpSync(L);
#endif
  // Block until all of this group's tasks have finished. helpSync (when it
  // ran) returns as soon as the queue is empty, which can be before the last
  // task completes on another thread, so sync() is still required. Root-level
  // groups come straight here and use the plain blocking wait.
  L.sync();
}
// Run F as part of this group. In parallel mode, F is queued on the shared
// executor and the group's latch tracks it so ~TaskGroup can wait; in serial
// mode F executes immediately on the calling thread.
void TaskGroup::spawn(std::function<void()> F) {
#if LLVM_ENABLE_THREADS
  if (!Parallel) {
    F();
    return;
  }
  // Bump the latch before enqueueing so it accounts for the task before any
  // worker can run it and call dec().
  L.inc();
  getDefaultExecutor()->add(std::move(F), L);
#else
  F();
#endif
}
// Apply Fn to every index in [Begin, End), in parallel when the strategy
// allows it, otherwise serially in order.
void llvm::parallelFor(size_t Begin, size_t End,
                       function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (strategy.ThreadsRequested != 1) {
    size_t NumItems = End - Begin;
    if (NumItems == 0)
      return;
    // Share the range among NumWorkers tasks through one atomic cursor,
    // keeping the task count (and thus Linux futex calls) at O(ThreadCount).
    // For lld, per-file work is somewhat uneven, so a multiplier > 1 is
    // safer. While 2 vs 4 vs 8 makes no measurable difference, 4 is used as
    // a reasonable default.
    size_t NumWorkers = std::min<size_t>(NumItems, getThreadCount());
    size_t ChunkSize = std::max(size_t(1), NumItems / (NumWorkers * 4));
    std::atomic<size_t> NextIndex{Begin};
    // Each task repeatedly claims the next chunk until the range runs out.
    auto RunChunks = [&] {
      for (;;) {
        size_t ChunkBegin =
            NextIndex.fetch_add(ChunkSize, std::memory_order_relaxed);
        if (ChunkBegin >= End)
          return;
        size_t ChunkEnd = std::min(ChunkBegin + ChunkSize, End);
        for (size_t I = ChunkBegin; I != ChunkEnd; ++I)
          Fn(I);
      }
    };
    TaskGroup TG;
    for (size_t W = 0; W != NumWorkers; ++W)
      TG.spawn(RunChunks);
    // ~TaskGroup waits for (or, when nested, helps run) all chunks.
    return;
  }
#endif
  // Serial fallback.
  while (Begin != End)
    Fn(Begin++);
}