Files
llvm-project/llvm/unittests/Support/JobserverTest.cpp
Michał Górny 13f7b308d2 [LLVM] Remove the requirement for named pipe in jobserver (#169154)
Remove the requirement that the jobserver "fifo" is actually a named
pipe. Named pipes are essentially stateless, and therefore carry a high
risk of a killed process leaving the server with no tokens left, and no
clear way to reclaim them. Therefore, multiple jobserver implementations
use FUSE instead:

- [nixos-jobserver](https://github.com/NixOS/nixpkgs/pull/314888) (WIP)
uses a simple file on FUSE

- [steve](https://gitweb.gentoo.org/proj/steve.git) uses a character
device via CUSE

- [guildmaster](https://codeberg.org/amonakov/guildmaster) uses a
character device via CUSE

This is compatible with GNU make and Ninja, since they do not check the
file type, and seems to be the only solution that can achieve state
tracking while preserving compatibility.

CC @amonakov

---------

Signed-off-by: Michał Górny <mgorny@gentoo.org>
2025-12-16 19:42:00 +01:00

528 lines
17 KiB
C++

//===- llvm/unittest/Support/JobserverTest.cpp ----------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
///
/// \file
/// Jobserver.h unit tests.
///
//===----------------------------------------------------------------------===//
#include "llvm/Support/Jobserver.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Debug.h"
#include "llvm/Support/FileUtilities.h"
#include "llvm/Support/Parallel.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/raw_ostream.h"
#include "gtest/gtest.h"
#include <future>
#include <random>
#include <stdlib.h>
#if defined(LLVM_ON_UNIX)
#include "llvm/ADT/SmallString.h"
#include "llvm/Support/FileSystem.h"
#include <atomic>
#include <condition_variable>
#include <fcntl.h>
#include <mutex>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#elif defined(_WIN32)
#include <windows.h>
#endif
#define DEBUG_TYPE "jobserver-test"
using namespace llvm;
// Provided by the unit test main to locate the current test binary.
extern const char *TestMainArgv0;
namespace {
// RAII helper that sets an environment variable for the duration of a scope.
// The constructor remembers whether the variable was set (and to what) and the
// destructor puts that state back: the old value is restored, or the variable
// is removed again if it did not exist before.
//
// Copying is disabled: a copy would carry the same saved state and run the
// restore logic a second time when it is destroyed.
class ScopedEnvironment {
  std::string Name;
  std::string OldValue;
  bool HadOldValue = false;

public:
  // Saves the current state of \p Name and then sets it to \p Value.
  ScopedEnvironment(const char *Name, const char *Value) : Name(Name) {
#if defined(_WIN32)
    char *Old = nullptr;
    size_t OldLen;
    errno_t err = _dupenv_s(&Old, &OldLen, Name);
    if (err == 0 && Old != nullptr) {
      HadOldValue = true;
      OldValue = Old;
      // _dupenv_s hands back a buffer that the caller has to free.
      free(Old);
    }
    _putenv_s(Name, Value);
#else
    if (const char *Old = getenv(Name)) {
      HadOldValue = true;
      OldValue = Old;
    }
    setenv(Name, Value, 1);
#endif
  }

  ScopedEnvironment(const ScopedEnvironment &) = delete;
  ScopedEnvironment &operator=(const ScopedEnvironment &) = delete;

  // Restores the state that was observed by the constructor.
  ~ScopedEnvironment() {
#if defined(_WIN32)
    if (HadOldValue)
      _putenv_s(Name.c_str(), OldValue.c_str());
    else
      // On Windows, setting an environment variable to an empty string
      // unsets it, making getenv() return NULL.
      _putenv_s(Name.c_str(), "");
#else
    if (HadOldValue)
      setenv(Name.c_str(), OldValue.c_str(), 1);
    else
      unsetenv(Name.c_str());
#endif
  }
};
TEST(Jobserver, Slot) {
  // A default-constructed slot is neither valid nor implicit.
  JobSlot Empty;
  EXPECT_FALSE(Empty.isValid());
  EXPECT_FALSE(Empty.isImplicit());

  // createImplicit() yields a valid slot that reports itself as implicit.
  JobSlot Implicit = JobSlot::createImplicit();
  EXPECT_TRUE(Implicit.isValid());
  EXPECT_TRUE(Implicit.isImplicit());

  // createExplicit() yields a valid slot that is not implicit.
  JobSlot Explicit = JobSlot::createExplicit(42);
  EXPECT_TRUE(Explicit.isValid());
  EXPECT_FALSE(Explicit.isImplicit());

  // Move construction transfers the slot and invalidates the source.
  JobSlot MoveConstructed(std::move(Implicit));
  EXPECT_TRUE(MoveConstructed.isValid());
  EXPECT_TRUE(MoveConstructed.isImplicit());
  EXPECT_FALSE(Implicit.isValid());

  // Move assignment does the same for an already existing target.
  Empty = std::move(Explicit);
  EXPECT_TRUE(Empty.isValid());
  EXPECT_FALSE(Empty.isImplicit());
  EXPECT_FALSE(Explicit.isValid());
}
// Fixture for the MAKEFLAGS parsing tests. After every test case it resets the
// JobserverClient singleton so that state does not carry over into the next
// test.
class JobserverParsingTest : public ::testing::Test {
protected:
  void TearDown() override {
    JobserverClient::resetForTesting();
  }
};
TEST_F(JobserverParsingTest, NoMakeflags) {
  // Env records the current MAKEFLAGS state so it is put back when the test
  // ends; the value it sets here is only a placeholder.
  ScopedEnvironment Env("MAKEFLAGS", "");
#if !defined(_WIN32)
  // On Unix an empty value still makes getenv() return a (empty) string rather
  // than NULL, so the variable has to be removed explicitly to exercise the
  // "not present at all" case.
  unsetenv("MAKEFLAGS");
#endif
  // Without MAKEFLAGS there is no jobserver to connect to.
  JobserverClient *Client = JobserverClient::getInstance();
  EXPECT_EQ(Client, nullptr);
}
TEST_F(JobserverParsingTest, EmptyMakeflags) {
  // MAKEFLAGS is set to the empty string for the duration of the test.
  ScopedEnvironment Env("MAKEFLAGS", "");
  // An empty MAKEFLAGS must not produce a client.
  JobserverClient *Client = JobserverClient::getInstance();
  EXPECT_EQ(Client, nullptr);
}
TEST_F(JobserverParsingTest, DryRunFlag) {
  // The leading 'n' is make's dry-run flag; even though a jobserver is
  // advertised, no client must be created.
  ScopedEnvironment Env("MAKEFLAGS", "n -j --jobserver-auth=fifo:/tmp/foo");
  JobserverClient *Client = JobserverClient::getInstance();
  EXPECT_EQ(Client, nullptr);
}
// Separate fixture for non-threaded client tests. It adds no members of its
// own and inherits JobserverParsingTest::TearDown(), which resets the
// JobserverClient singleton after each test.
class JobserverClientTest : public JobserverParsingTest {};
#if defined(LLVM_ON_UNIX)
// RAII helper that creates a FIFO at a unique temporary path and removes it
// again on destruction. Construction can fail; callers must check isValid()
// before using the path.
//
// Copying is disabled: a copy would share the same path and remove the FIFO a
// second time when it is destroyed.
class ScopedFifo {
  SmallString<128> Path;
  bool IsValid = false;

public:
  ScopedFifo() {
    // To get a unique, non-colliding name for the FIFO, let
    // createTemporaryFile reserve a name in the filesystem.
    if (sys::fs::createTemporaryFile("jobserver-test", "fifo", Path))
      return;
    // Remove the regular file it created, but keep the unique path.
    sys::fs::remove(Path);
    // Finally create the FIFO at that path.
    if (mkfifo(Path.c_str(), 0600) != 0)
      return;
    IsValid = true;
  }

  ScopedFifo(const ScopedFifo &) = delete;
  ScopedFifo &operator=(const ScopedFifo &) = delete;

  // Removes the FIFO, but only if this object actually created it.
  ~ScopedFifo() {
    if (IsValid)
      sys::fs::remove(Path);
  }

  // Returns the path of the FIFO.
  // NOTE(review): this returns Path.data() and so relies on the Path.c_str()
  // call in the constructor having left the buffer NUL-terminated — confirm
  // against SmallString if Path is ever modified after construction.
  const char *c_str() const { return Path.data(); }

  // True if the FIFO was created successfully.
  bool isValid() const { return IsValid; }
};
TEST_F(JobserverClientTest, UnixClientFifo) {
  // Basic client creation and behavior against a FIFO that holds no job
  // tokens at all.
  ScopedFifo Fifo;
  ASSERT_TRUE(Fifo.isValid());

  // The tabs and extra blanks in the environment string are intentional.
  const std::string Makeflags =
      std::string(" \t -j4\t \t--jobserver-auth=fifo:") + Fifo.c_str();
  ScopedEnvironment Env("MAKEFLAGS", Makeflags.c_str());

  JobserverClient *Client = JobserverClient::getInstance();
  ASSERT_NE(Client, nullptr);

  // The first acquisition hands out the implicit token.
  JobSlot Implicit = Client->tryAcquire();
  EXPECT_TRUE(Implicit.isValid());
  EXPECT_TRUE(Implicit.isImplicit());

  // Nothing is in the FIFO, so a second acquisition comes back empty.
  JobSlot Extra = Client->tryAcquire();
  EXPECT_FALSE(Extra.isValid());

  // Releasing the implicit token does not write to the pipe ...
  Client->release(std::move(Implicit));

  // ... and it can be acquired again afterwards.
  Implicit = Client->tryAcquire();
  EXPECT_TRUE(Implicit.isValid());
}
TEST_F(JobserverClientTest, UnixClientNonFifo) {
  // Jobservers such as steve or guildmaster do not expose a real FIFO; a
  // regular temporary file stands in for such a non-FIFO path here.
  SmallString<64> FilePath;
  std::error_code EC =
      sys::fs::createTemporaryFile("jobserver-test", "nonfifo", FilePath);
  ASSERT_FALSE(EC);
  FileRemover Cleanup(FilePath);

  // The tabs and extra blanks in the environment string are intentional.
  std::string Makeflags(" \t -j4\t \t--jobserver-auth=fifo:");
  Makeflags.append(FilePath.c_str());
  ScopedEnvironment Env("MAKEFLAGS", Makeflags.c_str());

  JobserverClient *Client = JobserverClient::getInstance();
  ASSERT_NE(Client, nullptr);

  // The first acquisition hands out the implicit token.
  JobSlot Implicit = Client->tryAcquire();
  EXPECT_TRUE(Implicit.isValid());
  EXPECT_TRUE(Implicit.isImplicit());

  // The file is empty, so a second acquisition comes back empty.
  JobSlot Extra = Client->tryAcquire();
  EXPECT_FALSE(Extra.isValid());

  // Releasing the implicit token does not write to the file ...
  Client->release(std::move(Implicit));

  // ... and it can be acquired again afterwards.
  Implicit = Client->tryAcquire();
  EXPECT_TRUE(Implicit.isValid());
}
#if LLVM_ENABLE_THREADS
// Unique anchor whose address helps locate the current test binary: the
// subprocess driver tests pass it, together with TestMainArgv0, to
// sys::fs::getMainExecutable().
static int JobserverTestAnchor = 0;
// Test fixture for tests that use the jobserver strategy. It creates a
// temporary FIFO, sets MAKEFLAGS, and provides a helper to pre-load the FIFO
// with job tokens, simulating `make -jN`.
class JobserverStrategyTest : public JobserverParsingTest {
protected:
std::unique_ptr<ScopedFifo> TheFifo;
std::thread MakeThread;
std::atomic<bool> StopMakeThread{false};
// Save and restore the global parallel strategy to avoid interfering with
// other tests in the same process.
ThreadPoolStrategy SavedStrategy;
void SetUp() override {
SavedStrategy = parallel::strategy;
TheFifo = std::make_unique<ScopedFifo>();
ASSERT_TRUE(TheFifo->isValid());
std::string MakeFlags = "--jobserver-auth=fifo:";
MakeFlags += TheFifo->c_str();
setenv("MAKEFLAGS", MakeFlags.c_str(), 1);
}
void TearDown() override {
if (MakeThread.joinable()) {
StopMakeThread = true;
MakeThread.join();
}
unsetenv("MAKEFLAGS");
TheFifo.reset();
// Restore the original strategy to ensure subsequent tests are unaffected.
parallel::strategy = SavedStrategy;
}
// Starts a background thread that emulates `make`. It populates the FIFO
// with initial tokens and then recycles tokens released by clients.
void startMakeProxy(int NumInitialJobs) {
MakeThread = std::thread([this, NumInitialJobs]() {
LLVM_DEBUG(dbgs() << "[MakeProxy] Thread started.\n");
// Open the FIFO for reading and writing. This call does not block.
int RWFd = open(TheFifo->c_str(), O_RDWR);
LLVM_DEBUG(dbgs() << "[MakeProxy] Opened FIFO " << TheFifo->c_str()
<< " with O_RDWR, FD=" << RWFd << "\n");
if (RWFd == -1) {
LLVM_DEBUG(
dbgs()
<< "[MakeProxy] ERROR: Failed to open FIFO with O_RDWR. Errno: "
<< errno << "\n");
return;
}
// Populate with initial jobs.
LLVM_DEBUG(dbgs() << "[MakeProxy] Writing " << NumInitialJobs
<< " initial tokens.\n");
for (int i = 0; i < NumInitialJobs; ++i) {
if (write(RWFd, "+", 1) != 1) {
LLVM_DEBUG(dbgs()
<< "[MakeProxy] ERROR: Failed to write initial token " << i
<< ".\n");
close(RWFd);
return;
}
}
LLVM_DEBUG(dbgs() << "[MakeProxy] Finished writing initial tokens.\n");
// Make the read non-blocking so we can periodically check StopMakeThread.
int flags = fcntl(RWFd, F_GETFL, 0);
fcntl(RWFd, F_SETFL, flags | O_NONBLOCK);
while (!StopMakeThread) {
char Token;
ssize_t Ret = read(RWFd, &Token, 1);
if (Ret == 1) {
LLVM_DEBUG(dbgs() << "[MakeProxy] Read token '" << Token
<< "' to recycle.\n");
// A client released a token, 'make' makes it available again.
std::this_thread::sleep_for(std::chrono::microseconds(100));
ssize_t WRet;
do {
WRet = write(RWFd, &Token, 1);
} while (WRet < 0 && errno == EINTR);
if (WRet <= 0) {
LLVM_DEBUG(
dbgs()
<< "[MakeProxy] ERROR: Failed to write recycled token.\n");
break; // Error, stop the proxy.
}
LLVM_DEBUG(dbgs()
<< "[MakeProxy] Wrote token '" << Token << "' back.\n");
} else if (Ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
LLVM_DEBUG(dbgs() << "[MakeProxy] ERROR: Read failed with errno "
<< errno << ".\n");
break; // Error, stop the proxy.
}
// Yield to prevent this thread from busy-waiting.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
LLVM_DEBUG(dbgs() << "[MakeProxy] Thread stopping.\n");
close(RWFd);
});
// Give the proxy thread a moment to start and populate the FIFO.
// This is a simple way to avoid a race condition where the client starts
// before the initial tokens are in the pipe.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
};
TEST_F(JobserverStrategyTest, ThreadPoolConcurrencyIsLimited) {
  // This test simulates `make -j3`. We will have 1 implicit job slot and
  // we will add 2 explicit job tokens to the FIFO, for a total of 3.
  const int NumExplicitJobs = 2;
  const int ConcurrencyLimit = NumExplicitJobs + 1; // +1 for the implicit slot
  const int NumTasks = 8; // More tasks than available slots.
  LLVM_DEBUG(dbgs() << "Calling startMakeProxy with " << NumExplicitJobs
                    << " jobs.\n");
  startMakeProxy(NumExplicitJobs);
  LLVM_DEBUG(dbgs() << "MakeProxy is running.\n");
  // Create the thread pool with the strategy returned by
  // jobserver_concurrency(); MAKEFLAGS (set by the fixture) points at our
  // pre-loaded FIFO.
  StdThreadPool Pool(jobserver_concurrency());
  std::atomic<int> ActiveTasks{0};
  std::atomic<int> MaxActiveTasks{0};
  std::atomic<int> CompletedTasks{0};
  std::mutex M;
  std::condition_variable CV;
  // Dispatch more tasks than there are job slots. The pool should block
  // and only run up to `ConcurrencyLimit` tasks at once.
  for (int i = 0; i < NumTasks; ++i) {
    Pool.async([&, i] {
      // Track the number of concurrently running tasks.
      int CurrentActive = ++ActiveTasks;
      LLVM_DEBUG(dbgs() << "Task " << i << ": Active tasks: " << CurrentActive
                        << "\n");
      (void)i;
      // Raise the recorded maximum to CurrentActive unless another task has
      // already recorded something at least as large. A failed exchange
      // refreshes OldMax with the current value.
      int OldMax = MaxActiveTasks.load();
      while (CurrentActive > OldMax &&
             !MaxActiveTasks.compare_exchange_weak(OldMax, CurrentActive)) {
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(25));
      --ActiveTasks;
      // Count the completion and notify while holding the mutex. The waiter
      // checks the counter under the same mutex, so it can only observe the
      // final count after this task is done with M and CV; otherwise the test
      // body could finish and destroy them while this task still uses them.
      {
        std::lock_guard<std::mutex> Lock(M);
        if (++CompletedTasks == NumTasks)
          CV.notify_one();
      }
    });
  }
  // Wait for all tasks to complete.
  {
    std::unique_lock<std::mutex> Lock(M);
    CV.wait(Lock, [&] { return CompletedTasks == NumTasks; });
  }
  LLVM_DEBUG(dbgs() << "Test finished. Max active tasks was "
                    << MaxActiveTasks.load() << ".\n");
  // The key assertion: the maximum number of concurrent tasks should
  // not have exceeded the limit imposed by the jobserver.
  EXPECT_LE(MaxActiveTasks.load(), ConcurrencyLimit);
  EXPECT_EQ(CompletedTasks.load(), NumTasks);
}
// Parent-side driver that spawns a fresh process to run the child test which
// validates that parallelFor respects the jobserver limit when it is the first
// user of the default executor in that process.
TEST_F(JobserverStrategyTest, ParallelForIsLimited_Subprocess) {
  // Mark child execution. The scoped helper restores the variable on every
  // exit path, including the early return of a failing ASSERT_* below.
  ScopedEnvironment ChildMarker("LLVM_JOBSERVER_TEST_CHILD", "1");
  // Find the current test binary and build args to run only the child test.
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &JobserverTestAnchor);
  ASSERT_FALSE(Executable.empty()) << "Failed to get main executable path";
  SmallVector<StringRef, 4> Args{Executable,
                                 "--gtest_filter=JobserverStrategyTest."
                                 "ParallelForIsLimited_SubprocessChild"};
  std::string Error;
  bool ExecFailed = false;
  int RC = sys::ExecuteAndWait(Executable, Args, std::nullopt, {}, 0, 0, &Error,
                               &ExecFailed);
  ASSERT_FALSE(ExecFailed) << Error;
  ASSERT_EQ(RC, 0) << "Executable failed with exit code " << RC;
}
// Child-side test. The fixture provides the FIFO; this test starts the
// make-proxy in this process, selects the jobserver strategy, and then checks
// that llvm::parallelFor never runs more tasks at once than the jobserver
// allows. It is skipped unless the parent driver marked the environment.
TEST_F(JobserverStrategyTest, ParallelForIsLimited_SubprocessChild) {
  const bool InChildMode = getenv("LLVM_JOBSERVER_TEST_CHILD") != nullptr;
  if (!InChildMode)
    GTEST_SKIP() << "Not running in child mode";

  const int NumExplicitJobs = 3;
  const int NumTasks = 20;
  // One implicit slot on top of the explicit tokens.
  const int ConcurrencyLimit = 1 + NumExplicitJobs;

  startMakeProxy(NumExplicitJobs);
  // The global strategy has to be in place before any default executor is
  // created.
  parallel::strategy = jobserver_concurrency();

  std::atomic<int> Running{0};
  std::atomic<int> Peak{0};
  parallelFor(0, NumTasks, [&]([[maybe_unused]] int i) {
    const int NowRunning = ++Running;
    // Record NowRunning as the new peak if it exceeds the stored one; a
    // failed exchange reloads Seen with the current peak.
    int Seen = Peak.load();
    while (Seen < NowRunning)
      Peak.compare_exchange_weak(Seen, NowRunning);
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    --Running;
  });

  EXPECT_LE(Peak, ConcurrencyLimit);
}
// Parent-side driver for the parallelSort child test: re-runs this test binary
// in a fresh process with a filter that selects only the child test.
TEST_F(JobserverStrategyTest, ParallelSortIsLimited_Subprocess) {
  // Mark child execution. The scoped helper restores the variable on every
  // exit path, including the early return of a failing ASSERT_* below.
  ScopedEnvironment ChildMarker("LLVM_JOBSERVER_TEST_CHILD", "1");
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &JobserverTestAnchor);
  ASSERT_FALSE(Executable.empty()) << "Failed to get main executable path";
  SmallVector<StringRef, 4> Args{Executable,
                                 "--gtest_filter=JobserverStrategyTest."
                                 "ParallelSortIsLimited_SubprocessChild"};
  std::string Error;
  bool ExecFailed = false;
  int RC = sys::ExecuteAndWait(Executable, Args, std::nullopt, {}, 0, 0, &Error,
                               &ExecFailed);
  ASSERT_FALSE(ExecFailed) << Error;
  ASSERT_EQ(RC, 0) << "Executable failed with exit code " << RC;
}
// Child-side test: with the jobserver strategy selected before the default
// executor is first used, parallelSort must still run to completion and
// produce a sorted range. It is skipped unless the parent driver marked the
// environment.
TEST_F(JobserverStrategyTest, ParallelSortIsLimited_SubprocessChild) {
  const bool InChildMode = getenv("LLVM_JOBSERVER_TEST_CHILD") != nullptr;
  if (!InChildMode)
    GTEST_SKIP() << "Not running in child mode";

  const int NumExplicitJobs = 3;
  startMakeProxy(NumExplicitJobs);
  parallel::strategy = jobserver_concurrency();

  // Fill the input from a default-seeded engine, one draw per element in
  // index order.
  std::vector<int> Values(1024);
  std::mt19937 Rng;
  std::uniform_int_distribution<int> Dist;
  for (size_t Idx = 0, End = Values.size(); Idx != End; ++Idx)
    Values[Idx] = Dist(Rng);

  parallelSort(Values.begin(), Values.end());
  ASSERT_TRUE(llvm::is_sorted(Values));
}
#endif // LLVM_ENABLE_THREADS
#endif // defined(LLVM_ON_UNIX)
} // end anonymous namespace