IcingaDB: use polymorphism for queue entries

This commit is contained in:
Julian Brost 2025-12-05 16:49:41 +01:00 committed by Yonas Habteab
parent 7b923149b8
commit 6695294d70
2 changed files with 132 additions and 146 deletions

View file

@ -6,27 +6,23 @@
using namespace icinga;
PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits)
: DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()}
PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
: Object{obj}, DirtyBits{bits & DirtyBitsAll}
{
}
PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
: PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj}
{
}
PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup)
: PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup}
: DepGroup{depGroup}
{
}
PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
: PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child}
: DepGroup{depGroup}, Child{child}
{
}
RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeyMap relations)
: PendingQueueItem{id, 0}, Relations{std::move(relations)}
RelationsDeletionItem::RelationsDeletionItem(const String& id, const RelationsKeyMap& relations)
: ID{id}, Relations{relations}
{
}
@ -103,7 +99,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
auto& seqView = m_PendingItems.get<1>();
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
if (it != seqView.begin()) {
if (std::holds_alternative<RelationsDeletionItem>(*it)) {
if (dynamic_cast<const RelationsDeletionItem*>(it->get())) {
// We don't know whether the previous items are related to this deletion item or not,
// thus we can't just process this right now when there are older items in the queue.
// Otherwise, we might delete something that is going to be updated/created.
@ -111,7 +107,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
}
}
auto age = now - std::visit([](const auto& item) { return item.EnqueueTime; }, *it);
auto age = now - (*it)->EnqueueTime;
if (GetActive() && 1000ms > age) {
if (it == seqView.begin()) {
retryAfter = 1000ms - age;
@ -119,26 +115,23 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
break;
}
ConfigObject::Ptr cobj;
if (auto* citem = std::get_if<PendingConfigItem>(&*it); citem) {
cobj = citem->Object;
}
ConfigObject::Ptr cobj = (*it)->GetObjectToLock();
ObjectLock olock(cobj, std::defer_lock);
if (cobj && !olock.TryLock()) {
continue; // Can't lock the object right now, try the next one.
}
PendingItemVariant itemToProcess = *it;
auto itemToProcess = *it;
seqView.erase(it);
madeProgress = true;
lock.unlock();
try {
std::visit([this](const auto& item) { ProcessPendingItem(item); }, itemToProcess);
itemToProcess->Execute(*this);
} catch (const std::exception& ex) {
PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects).
Log(LogCritical, "IcingaDB")
<< "Exception while processing pending item of type index '" << itemToProcess.index() << "': "
<< "Exception while processing pending item of type '" << typeid(itemRef).name() << "': "
<< DiagnosticInformation(ex, GetActive());
}
lock.lock();
@ -152,24 +145,27 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
return retryAfter;
}
/**
* Process a single pending object.
*
* This function processes a single pending object based on its dirty bits. It checks if the object is a
* @c ConfigObject and performs the appropriate actions such as sending configuration updates, state updates,
* or deletions to the Redis connection. The function handles different types of objects, including @c Checkable
* objects, and ensures that the correct updates are sent based on the dirty bits set for the object.
*
* @param item The pending item containing the object and its dirty bits.
*/
void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
ConfigObject::Ptr PendingConfigItem::GetObjectToLock() const
{
if (item.DirtyBits & ConfigDelete) {
String typeName = GetLowerCaseTypeNameDB(item.Object);
m_RconWorker->FireAndForgetQueries(
return Object;
}
/**
* Execute the pending configuration item.
*
* This function processes the pending configuration item by performing the necessary Redis operations based
* on the dirty bits set for the associated configuration object. It handles configuration deletions, updates,
* and state updates for checkable objects.
*
* @param icingadb The IcingaDB instance to use for executing Redis queries.
*/
void PendingConfigItem::Execute(IcingaDB& icingadb) const {
if (DirtyBits & ConfigDelete) {
String typeName = icingadb.GetLowerCaseTypeNameDB(Object);
icingadb.m_RconWorker->FireAndForgetQueries(
{
{"HDEL", m_PrefixConfigObject + typeName, GetObjectIdentifier(item.Object)},
{"HDEL", m_PrefixConfigCheckSum + typeName, GetObjectIdentifier(item.Object)},
{"HDEL", icingadb.m_PrefixConfigObject + typeName, icingadb.GetObjectIdentifier(Object)},
{"HDEL", icingadb.m_PrefixConfigCheckSum + typeName, icingadb.GetObjectIdentifier(Object)},
{
"XADD",
"icinga:runtime",
@ -178,9 +174,9 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
"1000000",
"*",
"redis_key",
m_PrefixConfigObject + typeName,
icingadb.m_PrefixConfigObject + typeName,
"id",
GetObjectIdentifier(item.Object),
icingadb.GetObjectIdentifier(Object),
"runtime_type",
"delete"
}
@ -188,78 +184,74 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
);
}
if (item.DirtyBits & ConfigUpdate) {
if (DirtyBits & ConfigUpdate) {
std::map<String, std::vector<String>> hMSets;
std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(item.Object, GetLowerCaseTypeNameDB(item.Object), hMSets, runtimeUpdates, true);
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
icingadb.CreateConfigUpdate(Object, icingadb.GetLowerCaseTypeNameDB(Object), hMSets, runtimeUpdates, true);
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
}
if (auto checkable = dynamic_pointer_cast<Checkable>(item.Object); checkable) {
if (item.DirtyBits & FullState) {
UpdateState(checkable, item.DirtyBits);
if (auto checkable = dynamic_pointer_cast<Checkable>(Object); checkable) {
if (DirtyBits & FullState) {
icingadb.UpdateState(checkable, DirtyBits);
}
if (item.DirtyBits & NextUpdate) {
SendNextUpdate(checkable);
if (DirtyBits & NextUpdate) {
icingadb.SendNextUpdate(checkable);
}
}
}
/**
* Process a single pending dependency group state item.
* Execute the pending dependency group state item.
*
* This function processes a single pending dependency group state item by updating the dependencies
* state for the associated dependency group. It selects any child checkable from the dependency group
* to initiate the state update process.
* This function processes the pending dependency group state item by updating the state of the
* dependency group in Redis. It selects any child checkable from the dependency group to initiate
* the state update, as all children share the same dependency group state.
*
* @param item The pending dependency group state item containing the dependency group.
* @param icingadb The IcingaDB instance to use for executing Redis queries.
*/
void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const
void PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
{
// For dependency group state updates, we don't actually care which child triggered the update,
// since all children share the same dependency group state. Thus, we can just pick any child to
// start the update from.
if (auto child = item.DepGroup->GetAnyChild(); child) {
UpdateDependenciesState(child, item.DepGroup);
if (auto child = DepGroup->GetAnyChild(); child) {
icingadb.UpdateDependenciesState(child, DepGroup);
}
}
/**
* Process a single pending dependency edge item.
* Execute the pending dependency edge item.
*
* This function fully serializes a single pending dependency edge item (child registration)
* and sends all the resulting Redis queries in a single transaction. The dependencies (edges)
* to serialize are determined by the dependency group and child checkable the provided item represents.
* This function processes the pending dependency edge item and ensures that the necessary Redis
* operations are performed to register the child checkable as part of the dependency group.
*
* @param item The pending dependency edge item containing the dependency group and child checkable.
* @param icingadb The IcingaDB instance to use for executing Redis queries.
*/
void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item)
void PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
{
std::vector<Dictionary::Ptr> runtimeUpdates;
std::map<String, RedisConnection::Query> hMSets;
InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup);
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup);
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
}
/**
* Process a single pending deletion item.
* Execute the pending relations deletion item.
*
* This function processes a single pending deletion item by deleting the specified sub-keys
* from Redis based on the provided deletion keys map. It ensures that the object's ID is
* removed from the specified Redis keys and their corresponding checksum keys if indicated.
* This function processes the pending relations deletion item by deleting the specified relations
* from Redis. It iterates over the map of Redis keys and deletes the relations associated with
* the given ID.
*
* @param item The pending deletion item containing the ID and deletion keys map.
* @param icingadb The IcingaDB instance to use for executing Redis queries.
*/
void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item)
void RelationsDeletionItem::Execute(IcingaDB& icingadb) const
{
ASSERT(std::holds_alternative<std::string>(item.ID)); // Relation deletion items must have real IDs.
auto id = std::get<std::string>(item.ID);
for (auto [redisKey, hasChecksum] : item.Relations) {
if (IsStateKey(redisKey)) {
DeleteState(id, redisKey, hasChecksum);
for (auto [redisKey, hasChecksum] : Relations) {
if (icingadb.IsStateKey(redisKey)) {
icingadb.DeleteState(ID, redisKey, hasChecksum);
} else {
DeleteRelationship(id, redisKey, hasChecksum);
icingadb.DeleteRelationship(ID, redisKey, hasChecksum);
}
}
}
@ -278,24 +270,20 @@ void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bit
{
std::lock_guard lock(m_PendingItemsMutex);
if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) {
m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) mutable {
std::visit(
[&bits](auto& item) {
if (bits & ConfigDelete) {
// A config delete and config update cancel each other out, and we don't need
// to keep any state updates either, as the object is being deleted.
item.DirtyBits &= ~(ConfigUpdate | FullState);
bits &= ~(ConfigUpdate | FullState); // Must not add these bits either.
} else if (bits & ConfigUpdate) {
// A new config update cancels any pending config deletion for the same object.
item.DirtyBits &= ~ConfigDelete;
bits &= ~ConfigDelete;
}
item.DirtyBits |= bits & DirtyBitsAll;
},
itemToProcess
);
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<PendingConfigItem>(object, bits)); !inserted) {
m_PendingItems.modify(it, [bits](const std::shared_ptr<PendingQueueItem>& item) mutable {
auto configItem = dynamic_cast<PendingConfigItem*>(item.get());
if (bits & ConfigDelete) {
// A config delete and config update cancel each other out, and we don't need
// to keep any state updates either, as the object is being deleted.
configItem->DirtyBits &= ~(ConfigUpdate | FullState);
bits &= ~(ConfigUpdate | FullState); // Must not add these bits either.
} else if (bits & ConfigUpdate) {
// A new config update cancels any pending config deletion for the same object.
configItem->DirtyBits &= ~ConfigDelete;
bits &= ~ConfigDelete;
}
configItem->DirtyBits |= bits & DirtyBitsAll;
});
}
}
@ -307,7 +295,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
{
std::lock_guard lock(m_PendingItemsMutex);
m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup});
m_PendingItems.insert(std::make_shared<PendingDependencyGroupStateItem>(depGroup));
}
m_PendingItemsCV.notify_one();
}
@ -327,7 +315,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
{
std::lock_guard lock(m_PendingItemsMutex);
m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child});
m_PendingItems.insert(std::make_shared<PendingDependencyEdgeItem>(depGroup, child));
}
m_PendingItemsCV.notify_one();
}
@ -469,7 +457,7 @@ void IcingaDB::EnqueueDependencyChildRemoved(
* @param id The ID of the relation to be deleted.
* @param relations A map of Redis keys from which to delete the relation.
*/
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations)
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeyMap& relations)
{
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
return; // No need to enqueue anything if we're not connected.
@ -477,10 +465,10 @@ void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap&
{
std::lock_guard lock(m_PendingItemsMutex);
if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) {
m_PendingItems.modify(it, [&relations](PendingItemVariant& val) {
auto& item = std::get<RelationsDeletionItem>(val);
item.Relations.insert(relations.begin(), relations.end());
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<RelationsDeletionItem>(id, relations)); !inserted) {
m_PendingItems.modify(it, [&relations](std::shared_ptr<PendingQueueItem>& val) {
auto item = dynamic_cast<RelationsDeletionItem*>(val.get());
item->Relations.insert(relations.begin(), relations.end());
});
}
}

View file

@ -16,6 +16,7 @@
#include "icinga/downtime.hpp"
#include "remote/messageorigin.hpp"
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <atomic>
@ -121,18 +122,6 @@ enum DirtyBits : uint32_t
DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate
};
/**
* A variant type representing the identifier of a pending item.
*
* This variant can hold either a string representing a real Redis hash key or a pair consisting of
* a configuration object pointer and a dependency group pointer. A pending item identified by the
* latter variant type operates primarily on the associated configuration object or dependency group,
* thus the pairs are used for uniqueness in the pending items container.
*
* @ingroup icingadb
*/
using PendingItemKey = std::variant<std::string /* Redis hash keys */, std::pair<ConfigObject::Ptr, DependencyGroup::Ptr>>;
/**
* A pending queue item.
*
@ -146,11 +135,23 @@ using PendingItemKey = std::variant<std::string /* Redis hash keys */, std::pair
*/
struct PendingQueueItem
{
uint32_t DirtyBits;
PendingItemKey ID;
const std::chrono::steady_clock::time_point EnqueueTime;
/**
* A variant type representing the identifier of a pending item.
*
* This variant can hold either a string representing a real Redis hash key or a pair consisting of
* a configuration object pointer and a dependency group pointer. A pending item identified by the
* latter variant type operates primarily on the associated configuration object or dependency group,
* thus the pairs are used for uniqueness in the pending items container.
*/
using Key = std::variant<std::string /* Redis hash keys */, std::pair<ConfigObject::Ptr, DependencyGroup::Ptr>>;
PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits);
virtual ~PendingQueueItem() = default;
const std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()};
virtual Key GetID() const = 0;
virtual ConfigObject::Ptr GetObjectToLock() const { return nullptr; };
virtual void Execute(IcingaDB& icingadb) const = 0;
};
/**
@ -166,8 +167,13 @@ struct PendingQueueItem
struct PendingConfigItem : PendingQueueItem
{
ConfigObject::Ptr Object;
uint32_t DirtyBits;
PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits);
Key GetID() const override { return std::make_pair(Object, nullptr); }
ConfigObject::Ptr GetObjectToLock() const override;
void Execute(IcingaDB& icingadb) const override;
};
/**
@ -185,6 +191,9 @@ struct PendingDependencyGroupStateItem : PendingQueueItem
DependencyGroup::Ptr DepGroup;
explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup);
Key GetID() const override { return std::make_pair(nullptr, DepGroup.get()); }
void Execute(IcingaDB& icingadb) const override;
};
/**
@ -203,10 +212,10 @@ struct PendingDependencyEdgeItem : PendingQueueItem
Checkable::Ptr Child;
PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
};
// Map of Redis keys to a boolean indicating whether to delete the checksum key as well.
using RelationsKeyMap = std::map<RedisKey, bool /* checksum? */>;
Key GetID() const override { return std::make_pair(Child.get(), DepGroup.get()); }
void Execute(IcingaDB& icingadb) const override;
};
/**
* A pending relations deletion item.
@ -220,9 +229,14 @@ using RelationsKeyMap = std::map<RedisKey, bool /* checksum? */>;
*/
struct RelationsDeletionItem : PendingQueueItem
{
std::string ID;
using RelationsKeyMap = std::map<RedisKey, bool /* checksum? */>;
RelationsKeyMap Relations;
RelationsDeletionItem(const String& id, RelationsKeyMap relations);
RelationsDeletionItem(const String& id, const RelationsKeyMap& relations);
Key GetID() const override { return ID; }
void Execute(IcingaDB& icingadb) const override;
};
/**
@ -464,33 +478,16 @@ private:
static std::unordered_set<Type*> m_IndexedTypes;
// A variant type that can hold any of the pending item types used in the pending items container.
using PendingItemVariant = std::variant<
PendingConfigItem,
PendingDependencyGroupStateItem,
PendingDependencyEdgeItem,
RelationsDeletionItem
>;
struct PendingItemKeyExtractor
{
// The type of the key extracted from a pending item required by Boost.MultiIndex.
using result_type = const PendingItemKey&;
result_type operator()(const PendingItemVariant& item) const
{
return std::visit([](const auto& pendingItem) -> result_type { return pendingItem.ID; }, item);
}
};
// A multi-index container for managing pending items with unique IDs and maintaining insertion order.
// The first index is an ordered unique index based on the pending item key, allowing for efficient
// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the
// order of insertion, enabling FIFO processing of pending items.
using PendingItemsSet = boost::multi_index_container<
PendingItemVariant,
std::shared_ptr<PendingQueueItem>,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<PendingItemKeyExtractor>, // std::variant has operator< defined.
boost::multi_index::ordered_unique<
boost::multi_index::const_mem_fun<PendingQueueItem, PendingQueueItem::Key, &PendingQueueItem::GetID>
>,
boost::multi_index::sequenced<>
>
>;
@ -502,16 +499,17 @@ private:
void PendingItemsThreadProc();
std::chrono::duration<double> DequeueAndProcessOne(std::unique_lock<std::mutex>& lock);
void ProcessPendingItem(const PendingConfigItem& item);
void ProcessPendingItem(const PendingDependencyGroupStateItem& item) const;
void ProcessPendingItem(const PendingDependencyEdgeItem& item);
void ProcessPendingItem(const RelationsDeletionItem& item);
void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits);
void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup);
void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector<Dependency::Ptr>& dependencies, bool removeGroup);
void EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations);
void EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeyMap& relations);
friend struct PendingConfigItem;
friend struct PendingDependencyGroupStateItem;
friend struct PendingDependencyEdgeItem;
friend struct RelationsDeletionItem;
};
}