Alexander Aleksandrovič Klimov 2026-02-03 15:23:57 +01:00 committed by GitHub
commit 7ecbb4e924
7 changed files with 155 additions and 60 deletions


@@ -16,60 +16,116 @@
using namespace icinga;
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
/**
* Acquires a slot for CPU-bound work.
*
* If and as long as the lock-free TryAcquireSlot() doesn't succeed,
* subscribes to the slow path by waiting on a condition variable.
* The waiter is woken up by Done(), typically invoked by another instance's destructor when it frees its slot.
*
* @param yc Needed to asynchronously wait for the condition variable.
* @param strand Where to post the wake-up of the condition variable.
*/
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
: m_Done(false)
{
auto& ioEngine (IoEngine::Get());
VERIFY(strand.running_in_this_thread());
for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
auto& ie (IoEngine::Get());
Shared<AsioConditionVariable>::Ptr cv;
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
while (!TryAcquireSlot()) {
if (!cv) {
cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());
}
break;
{
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
// The lines above may have taken a while, so optimistically re-check first.
// Also mitigate lost wake-ups by re-checking while holding the lock:
//
// During our lock, Done() can't retrieve the subscribers to wake up,
// so any ongoing wake-up is either done at this point or has not started yet.
// If such a wake-up is done, it's a lost wake-up to us unless we re-check here
// whether the slot being freed (just before the wake-up) is still available.
if (TryAcquireSlot()) {
break;
}
// If the (hypothetical) slot mentioned above was taken by another coroutine,
// there are no free slots again, just as if no wake-ups happened just now.
ie.m_CpuBoundWaiting.emplace_back(strand, cv);
}
cv->Wait(yc);
}
}
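// --- Illustrative sketch (not part of this diff) ---
// Minimal usage sketch of the new constructor. It assumes a coroutine spawned on the
// connection's strand via the project's IoEngine::SpawnCoroutine() helper (not shown in
// this diff); the lambda body stands in for arbitrary CPU-heavy work.
void ExampleCpuBoundUsage(boost::asio::io_context::strand& strand)
{
	IoEngine::SpawnCoroutine(strand, [&strand](boost::asio::yield_context yc) {
		CpuBoundWork cpuLock (yc, strand); // may suspend here until a slot becomes free

		// ... CPU-heavy work runs while holding one of the limited slots ...
	}); // leaving the lambda destroys cpuLock, whose destructor calls Done() and frees the slot
}
// --- End of sketch ---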
CpuBoundWork::~CpuBoundWork()
/**
* Tries to acquire a slot for CPU-bound work.
*
* Specifically, decrements the number of free slots (semaphore) by one,
* but only if it's currently greater than zero.
* Not falling below zero requires an atomic#compare_exchange_weak() loop
* instead of a simple atomic#fetch_sub() call, but the check-and-decrement is still atomic.
*
* @return Whether a slot was acquired.
*/
bool CpuBoundWork::TryAcquireSlot()
{
if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
auto& ie (IoEngine::Get());
auto freeSlots (ie.m_CpuBoundSemaphore.load());
while (freeSlots > 0u) {
// If ie.m_CpuBoundSemaphore was changed after the last load,
// compare_exchange_weak() will load its latest value into freeSlots for us to retry until...
if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1u)) {
// ... either we successfully decrement ie.m_CpuBoundSemaphore by one, ...
return true;
}
}
// ... or it becomes zero due to another coroutine.
return false;
}
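// --- Illustrative sketch (not part of this diff) ---
// Why a CAS loop instead of fetch_sub(): a plain fetch_sub() could transiently push the
// counter below zero and would have to be corrected afterwards (as the old constructor did),
// whereas compare_exchange_weak() only publishes freeSlots - 1 while freeSlots is still positive.
// A hypothetical free-standing version of the same pattern:
bool TryAcquire(std::atomic_uint_fast32_t& freeSlots)
{
	for (auto current (freeSlots.load()); current > 0u;) {
		// On failure, compare_exchange_weak() reloads the latest value into current for the retry.
		if (freeSlots.compare_exchange_weak(current, current - 1u)) {
			return true; // decremented from a positive value
		}
	}
	return false; // exhausted, caller must take the slow path
}
// --- End of sketch ---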
/**
* Releases its own slot, acquired by the constructor (TryAcquireSlot()), if not already done.
*
* Precisely, increments the number of free slots (semaphore) by one.
* Also wakes up all waiting constructors (slow path) if necessary.
*/
void CpuBoundWork::Done()
{
if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
m_Done = true;
}
}
IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc)
: yc(yc)
{
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
}
auto& ie (IoEngine::Get());
IoBoundWorkSlot::~IoBoundWorkSlot()
{
auto& ioEngine (IoEngine::Get());
// The constructor takes the slow path only if the semaphore is exhausted (all slots taken),
// so we only have to wake up waiting constructors if it was exhausted just before our fetch_add().
// This works because after the fetch_add(), TryAcquireSlot() (fast path) will succeed.
if (ie.m_CpuBoundSemaphore.fetch_add(1) == 0u) {
// So now, only slow-path subscribers from just before the fetch_add() need to be woken up.
// Precisely: only subscribers from just before our fetch_add(), the one which turned 0 into 1.
for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
decltype(ie.m_CpuBoundWaiting) subscribers;
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
{
// Locking after fetch_add() is safe because a delayed wake-up is fine.
// Wake-up of constructors which subscribed after the fetch_add() is also not a problem.
// In worst case, they will just re-subscribe to the slow path.
// Lost wake-ups are mitigated by the constructor, see its implementation comments.
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
std::swap(subscribers, ie.m_CpuBoundWaiting);
}
// Again, a delayed wake-up is fine, hence unlocked.
for (auto& [strand, cv] : subscribers) {
boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
}
}
break;
}
}
@@ -85,9 +141,8 @@ boost::asio::io_context& IoEngine::GetIoContext()
return m_IoContext;
}
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u))
{
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
for (auto& thread : m_Threads) {
@@ -171,6 +226,30 @@ void AsioDualEvent::WaitForClear(boost::asio::yield_context yc)
m_IsFalse.Wait(std::move(yc));
}
AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io)
: m_Timer(io)
{
m_Timer.expires_at(boost::posix_time::pos_infin);
}
void AsioConditionVariable::Wait(boost::asio::yield_context yc)
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
}
bool AsioConditionVariable::NotifyOne()
{
boost::system::error_code ec;
return m_Timer.cancel_one(ec);
}
size_t AsioConditionVariable::NotifyAll()
{
boost::system::error_code ec;
return m_Timer.cancel(ec);
}
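// --- Illustrative sketch (not part of this diff) ---
// AsioConditionVariable is a deadline_timer that never expires on its own (pos_infin):
// Wait() suspends the calling coroutine in async_wait(), and NotifyOne()/NotifyAll() wake
// waiters by cancelling the timer. A hypothetical example with two coroutines, again
// assuming the project's IoEngine::SpawnCoroutine() helper:
void ExampleConditionVariable(boost::asio::io_context& io)
{
	auto cv (Shared<AsioConditionVariable>::Make(io));

	IoEngine::SpawnCoroutine(io, [cv](boost::asio::yield_context yc) {
		cv->Wait(yc); // suspends this coroutine without blocking an I/O thread
		// ... resumes here once notified ...
	});

	IoEngine::SpawnCoroutine(io, [cv](boost::asio::yield_context) {
		cv->NotifyOne(); // wakes up at most one waiter
	});
}
// --- End of sketch ---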
/**
* Cancels any pending timeout callback.
*


@@ -12,6 +12,7 @@
#include <atomic>
#include <exception>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
@@ -20,6 +21,7 @@
#include <boost/exception/all.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/spawn.hpp>
#if BOOST_VERSION >= 108700
@@ -37,36 +39,41 @@ namespace icinga
class CpuBoundWork
{
public:
CpuBoundWork(boost::asio::yield_context yc);
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
CpuBoundWork(const CpuBoundWork&) = delete;
CpuBoundWork(CpuBoundWork&&) = delete;
CpuBoundWork& operator=(const CpuBoundWork&) = delete;
CpuBoundWork& operator=(CpuBoundWork&&) = delete;
~CpuBoundWork();
inline ~CpuBoundWork()
{
Done();
}
void Done();
private:
static bool TryAcquireSlot();
bool m_Done;
};
/**
* Scope break for CPU-bound work done in an I/O thread
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class IoBoundWorkSlot
class AsioConditionVariable
{
public:
IoBoundWorkSlot(boost::asio::yield_context yc);
IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
~IoBoundWorkSlot();
AsioConditionVariable(boost::asio::io_context& io);
void Wait(boost::asio::yield_context yc);
bool NotifyOne();
size_t NotifyAll();
private:
boost::asio::yield_context yc;
boost::asio::deadline_timer m_Timer;
};
/**
@@ -77,7 +84,6 @@ private:
class IoEngine
{
friend CpuBoundWork;
friend IoBoundWorkSlot;
public:
IoEngine(const IoEngine&) = delete;
@@ -133,12 +139,6 @@
#endif // BOOST_VERSION >= 108700
}
static inline
void YieldCurrentCoroutine(boost::asio::yield_context yc)
{
Get().m_AlreadyExpiredTimer.async_wait(yc);
}
private:
IoEngine();
@@ -149,8 +149,10 @@ private:
boost::asio::io_context m_IoContext;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer;
std::atomic_int_fast32_t m_CpuBoundSemaphore;
std::atomic_uint_fast32_t m_CpuBoundSemaphore;
std::mutex m_CpuBoundWaitingMutex;
std::vector<std::pair<boost::asio::io_context::strand, Shared<AsioConditionVariable>::Ptr>> m_CpuBoundWaiting;
};
class TerminateIoThread : public std::exception


@@ -93,8 +93,6 @@ bool EventsHandler::HandleRequest(
EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
IoBoundWorkSlot dontLockTheIoThread (yc);
response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
response.StartStreaming(true);


@@ -197,6 +197,12 @@ void HttpApiResponse::StartStreaming(bool checkForDisconnect)
OutgoingHttpMessage::StartStreaming();
if (checkForDisconnect) {
auto work (m_CpuBoundWork.lock());
if (work) {
work->Done();
}
ASSERT(m_Server);
m_Server->StartDetectClientSideShutdown();
}
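// --- Illustrative sketch (not part of this diff) ---
// The response only observes the handler's CpuBoundWork through a weak_ptr, so it can
// release the CPU slot early once streaming starts without extending the slot's lifetime.
// Hypothetical illustration of the hand-off; names mirror ProcessRequest() further down
// in this commit.
void ExampleEarlyRelease(boost::asio::yield_context yc, boost::asio::io_context::strand& strand, HttpApiResponse& response)
{
	auto handlingRequest (std::make_shared<CpuBoundWork>(yc, strand)); // owner of the slot
	response.SetCpuBoundWork(handlingRequest); // the response holds only a weak_ptr

	// Later, HttpApiResponse::StartStreaming(true) (see above) releases the slot early:
	// it locks the weak_ptr and, if the work is still alive, calls Done() on it.
	// When handlingRequest itself is destroyed, its destructor calls Done() again,
	// which is a no-op thanks to the m_Done flag.
}
// --- End of sketch ---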


@@ -3,6 +3,7 @@
#pragma once
#include "base/dictionary.hpp"
#include "base/io-engine.hpp"
#include "base/json.hpp"
#include "base/tlsstream.hpp"
#include "remote/apiuser.hpp"
@@ -10,6 +11,7 @@
#include "remote/url.hpp"
#include <boost/beast/http.hpp>
#include <boost/version.hpp>
#include <memory>
#include <utility>
namespace icinga {
@@ -327,8 +329,14 @@ public:
*/
[[nodiscard]] bool IsClientDisconnected() const;
void SetCpuBoundWork(std::weak_ptr<CpuBoundWork> cbw)
{
m_CpuBoundWork = std::move(cbw);
}
private:
HttpServerConnection::Ptr m_Server;
std::weak_ptr<CpuBoundWork> m_CpuBoundWork;
};
// More general instantiations


@@ -417,15 +417,17 @@ void ProcessRequest(
HttpApiResponse& response,
const WaitGroup::Ptr& waitGroup,
std::chrono::steady_clock::duration& cpuBoundWorkTime,
boost::asio::yield_context& yc
boost::asio::yield_context& yc,
boost::asio::io_context::strand& strand
)
{
try {
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
auto start (std::chrono::steady_clock::now());
CpuBoundWork handlingRequest (yc);
auto handlingRequest (std::make_shared<CpuBoundWork>(yc, strand));
cpuBoundWorkTime = std::chrono::steady_clock::now() - start;
response.SetCpuBoundWork(handlingRequest);
HttpHandler::ProcessRequest(waitGroup, request, response, yc);
response.body().Finish();
} catch (const std::exception& ex) {
@@ -535,7 +537,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
m_Seen = ch::steady_clock::time_point::max();
ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc);
ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc, m_IoStrand);
if (!request.keep_alive() || !m_ConnectionReusable) {
break;


@@ -91,7 +91,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
auto start (ch::steady_clock::now());
try {
CpuBoundWork handleMessage (yc);
CpuBoundWork handleMessage (yc, m_IoStrand);
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;