From 6695294d70dee244821f5a169f10029c0da0c5d2 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 5 Dec 2025 16:49:41 +0100 Subject: [PATCH] IcingaDB: use polymorphism for queue entries --- lib/icingadb/icingadb-worker.cpp | 188 +++++++++++++++---------------- lib/icingadb/icingadb.hpp | 90 ++++++++------- 2 files changed, 132 insertions(+), 146 deletions(-) diff --git a/lib/icingadb/icingadb-worker.cpp b/lib/icingadb/icingadb-worker.cpp index 0d4661d58..a67ccc627 100644 --- a/lib/icingadb/icingadb-worker.cpp +++ b/lib/icingadb/icingadb-worker.cpp @@ -6,27 +6,23 @@ using namespace icinga; -PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits) - : DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()} +PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits) + : Object{obj}, DirtyBits{bits & DirtyBitsAll} { } -PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits) - : PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj} -{ -} PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup) - : PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup} + : DepGroup{depGroup} { } PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child) - : PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child} + : DepGroup{depGroup}, Child{child} { } -RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeyMap relations) - : PendingQueueItem{id, 0}, Relations{std::move(relations)} +RelationsDeletionItem::RelationsDeletionItem(const String& id, const RelationsKeyMap& relations) + : ID{id}, Relations{relations} { } @@ -103,7 +99,7 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock(); for (auto it(seqView.begin()); it != seqView.end(); ++it) { if (it != seqView.begin()) { - if (std::holds_alternative(*it)) { + if (dynamic_cast(it->get())) { // We don't know whether the previous items are related to this deletion item or not, // thus we can't just process this right now when there are older items in the queue. // Otherwise, we might delete something that is going to be updated/created. @@ -111,7 +107,7 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lockEnqueueTime; if (GetActive() && 1000ms > age) { if (it == seqView.begin()) { retryAfter = 1000ms - age; @@ -119,26 +115,23 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock(&*it); citem) { - cobj = citem->Object; - } - + ConfigObject::Ptr cobj = (*it)->GetObjectToLock(); ObjectLock olock(cobj, std::defer_lock); if (cobj && !olock.TryLock()) { continue; // Can't lock the object right now, try the next one. } - PendingItemVariant itemToProcess = *it; + auto itemToProcess = *it; seqView.erase(it); madeProgress = true; lock.unlock(); try { - std::visit([this](const auto& item) { ProcessPendingItem(item); }, itemToProcess); + itemToProcess->Execute(*this); } catch (const std::exception& ex) { + PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects). Log(LogCritical, "IcingaDB") - << "Exception while processing pending item of type index '" << itemToProcess.index() << "': " + << "Exception while processing pending item of type '" << typeid(itemRef).name() << "': " << DiagnosticInformation(ex, GetActive()); } lock.lock(); @@ -152,24 +145,27 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lockFireAndForgetQueries( + return Object; +} + +/** + * Execute the pending configuration item. + * + * This function processes the pending configuration item by performing the necessary Redis operations based + * on the dirty bits set for the associated configuration object. It handles configuration deletions, updates, + * and state updates for checkable objects. + * + * @param icingadb The IcingaDB instance to use for executing Redis queries. + */ +void PendingConfigItem::Execute(IcingaDB& icingadb) const { + if (DirtyBits & ConfigDelete) { + String typeName = icingadb.GetLowerCaseTypeNameDB(Object); + icingadb.m_RconWorker->FireAndForgetQueries( { - {"HDEL", m_PrefixConfigObject + typeName, GetObjectIdentifier(item.Object)}, - {"HDEL", m_PrefixConfigCheckSum + typeName, GetObjectIdentifier(item.Object)}, + {"HDEL", icingadb.m_PrefixConfigObject + typeName, icingadb.GetObjectIdentifier(Object)}, + {"HDEL", icingadb.m_PrefixConfigCheckSum + typeName, icingadb.GetObjectIdentifier(Object)}, { "XADD", "icinga:runtime", @@ -178,9 +174,9 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item) "1000000", "*", "redis_key", - m_PrefixConfigObject + typeName, + icingadb.m_PrefixConfigObject + typeName, "id", - GetObjectIdentifier(item.Object), + icingadb.GetObjectIdentifier(Object), "runtime_type", "delete" } @@ -188,78 +184,74 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item) ); } - if (item.DirtyBits & ConfigUpdate) { + if (DirtyBits & ConfigUpdate) { std::map> hMSets; std::vector runtimeUpdates; - CreateConfigUpdate(item.Object, GetLowerCaseTypeNameDB(item.Object), hMSets, runtimeUpdates, true); - ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); + icingadb.CreateConfigUpdate(Object, icingadb.GetLowerCaseTypeNameDB(Object), hMSets, runtimeUpdates, true); + icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates); } - if (auto checkable = dynamic_pointer_cast(item.Object); checkable) { - if (item.DirtyBits & FullState) { - UpdateState(checkable, item.DirtyBits); + if (auto checkable = dynamic_pointer_cast(Object); checkable) { + if (DirtyBits & FullState) { + icingadb.UpdateState(checkable, DirtyBits); } - if (item.DirtyBits & NextUpdate) { - SendNextUpdate(checkable); + if (DirtyBits & NextUpdate) { + icingadb.SendNextUpdate(checkable); } } } /** - * Process a single pending dependency group state item. + * Execute the pending dependency group state item. * - * This function processes a single pending dependency group state item by updating the dependencies - * state for the associated dependency group. It selects any child checkable from the dependency group - * to initiate the state update process. + * This function processes the pending dependency group state item by updating the state of the + * dependency group in Redis. It selects any child checkable from the dependency group to initiate + * the state update, as all children share the same dependency group state. * - * @param item The pending dependency group state item containing the dependency group. + * @param icingadb The IcingaDB instance to use for executing Redis queries. */ -void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const +void PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const { // For dependency group state updates, we don't actually care which child triggered the update, // since all children share the same dependency group state. Thus, we can just pick any child to // start the update from. - if (auto child = item.DepGroup->GetAnyChild(); child) { - UpdateDependenciesState(child, item.DepGroup); + if (auto child = DepGroup->GetAnyChild(); child) { + icingadb.UpdateDependenciesState(child, DepGroup); } } /** - * Process a single pending dependency edge item. + * Execute the pending dependency edge item. * - * This function fully serializes a single pending dependency edge item (child registration) - * and sends all the resulting Redis queries in a single transaction. The dependencies (edges) - * to serialize are determined by the dependency group and child checkable the provided item represents. + * This function processes the pending dependency edge item and ensures that the necessary Redis + * operations are performed to register the child checkable as part of the dependency group. * - * @param item The pending dependency edge item containing the dependency group and child checkable. + * @param icingadb The IcingaDB instance to use for executing Redis queries. */ -void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item) +void PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const { std::vector runtimeUpdates; std::map hMSets; - InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup); - ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); + icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup); + icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates); } /** - * Process a single pending deletion item. + * Execute the pending relations deletion item. * - * This function processes a single pending deletion item by deleting the specified sub-keys - * from Redis based on the provided deletion keys map. It ensures that the object's ID is - * removed from the specified Redis keys and their corresponding checksum keys if indicated. + * This function processes the pending relations deletion item by deleting the specified relations + * from Redis. It iterates over the map of Redis keys and deletes the relations associated with + * the given ID. * - * @param item The pending deletion item containing the ID and deletion keys map. + * @param icingadb The IcingaDB instance to use for executing Redis queries. */ -void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item) +void RelationsDeletionItem::Execute(IcingaDB& icingadb) const { - ASSERT(std::holds_alternative(item.ID)); // Relation deletion items must have real IDs. - - auto id = std::get(item.ID); - for (auto [redisKey, hasChecksum] : item.Relations) { - if (IsStateKey(redisKey)) { - DeleteState(id, redisKey, hasChecksum); + for (auto [redisKey, hasChecksum] : Relations) { + if (icingadb.IsStateKey(redisKey)) { + icingadb.DeleteState(ID, redisKey, hasChecksum); } else { - DeleteRelationship(id, redisKey, hasChecksum); + icingadb.DeleteRelationship(ID, redisKey, hasChecksum); } } } @@ -278,24 +270,20 @@ void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bit { std::lock_guard lock(m_PendingItemsMutex); - if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) { - m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) mutable { - std::visit( - [&bits](auto& item) { - if (bits & ConfigDelete) { - // A config delete and config update cancel each other out, and we don't need - // to keep any state updates either, as the object is being deleted. - item.DirtyBits &= ~(ConfigUpdate | FullState); - bits &= ~(ConfigUpdate | FullState); // Must not add these bits either. - } else if (bits & ConfigUpdate) { - // A new config update cancels any pending config deletion for the same object. - item.DirtyBits &= ~ConfigDelete; - bits &= ~ConfigDelete; - } - item.DirtyBits |= bits & DirtyBitsAll; - }, - itemToProcess - ); + if (auto [it, inserted] = m_PendingItems.insert(std::make_shared(object, bits)); !inserted) { + m_PendingItems.modify(it, [bits](const std::shared_ptr& item) mutable { + auto configItem = dynamic_cast(item.get()); + if (bits & ConfigDelete) { + // A config delete and config update cancel each other out, and we don't need + // to keep any state updates either, as the object is being deleted. + configItem->DirtyBits &= ~(ConfigUpdate | FullState); + bits &= ~(ConfigUpdate | FullState); // Must not add these bits either. + } else if (bits & ConfigUpdate) { + // A new config update cancels any pending config deletion for the same object. + configItem->DirtyBits &= ~ConfigDelete; + bits &= ~ConfigDelete; + } + configItem->DirtyBits |= bits & DirtyBitsAll; }); } } @@ -307,7 +295,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { { std::lock_guard lock(m_PendingItemsMutex); - m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup}); + m_PendingItems.insert(std::make_shared(depGroup)); } m_PendingItemsCV.notify_one(); } @@ -327,7 +315,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { { std::lock_guard lock(m_PendingItemsMutex); - m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child}); + m_PendingItems.insert(std::make_shared(depGroup, child)); } m_PendingItemsCV.notify_one(); } @@ -469,7 +457,7 @@ void IcingaDB::EnqueueDependencyChildRemoved( * @param id The ID of the relation to be deleted. * @param relations A map of Redis keys from which to delete the relation. */ -void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations) +void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeyMap& relations) { if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { return; // No need to enqueue anything if we're not connected. @@ -477,10 +465,10 @@ void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& { std::lock_guard lock(m_PendingItemsMutex); - if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) { - m_PendingItems.modify(it, [&relations](PendingItemVariant& val) { - auto& item = std::get(val); - item.Relations.insert(relations.begin(), relations.end()); + if (auto [it, inserted] = m_PendingItems.insert(std::make_shared(id, relations)); !inserted) { + m_PendingItems.modify(it, [&relations](std::shared_ptr& val) { + auto item = dynamic_cast(val.get()); + item->Relations.insert(relations.begin(), relations.end()); }); } } diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index c4d6bda87..0f919a22e 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -16,6 +16,7 @@ #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" #include +#include #include #include #include @@ -121,18 +122,6 @@ enum DirtyBits : uint32_t DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate }; -/** - * A variant type representing the identifier of a pending item. - * - * This variant can hold either a string representing a real Redis hash key or a pair consisting of - * a configuration object pointer and a dependency group pointer. A pending item identified by the - * latter variant type operates primarily on the associated configuration object or dependency group, - * thus the pairs are used for uniqueness in the pending items container. - * - * @ingroup icingadb - */ -using PendingItemKey = std::variant>; - /** * A pending queue item. * @@ -146,11 +135,23 @@ using PendingItemKey = std::variant>; - PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits); + virtual ~PendingQueueItem() = default; + + const std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()}; + + virtual Key GetID() const = 0; + virtual ConfigObject::Ptr GetObjectToLock() const { return nullptr; }; + virtual void Execute(IcingaDB& icingadb) const = 0; }; /** @@ -166,8 +167,13 @@ struct PendingQueueItem struct PendingConfigItem : PendingQueueItem { ConfigObject::Ptr Object; + uint32_t DirtyBits; PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits); + + Key GetID() const override { return std::make_pair(Object, nullptr); } + ConfigObject::Ptr GetObjectToLock() const override; + void Execute(IcingaDB& icingadb) const override; }; /** @@ -185,6 +191,9 @@ struct PendingDependencyGroupStateItem : PendingQueueItem DependencyGroup::Ptr DepGroup; explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup); + + Key GetID() const override { return std::make_pair(nullptr, DepGroup.get()); } + void Execute(IcingaDB& icingadb) const override; }; /** @@ -203,10 +212,10 @@ struct PendingDependencyEdgeItem : PendingQueueItem Checkable::Ptr Child; PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); -}; -// Map of Redis keys to a boolean indicating whether to delete the checksum key as well. -using RelationsKeyMap = std::map; + Key GetID() const override { return std::make_pair(Child.get(), DepGroup.get()); } + void Execute(IcingaDB& icingadb) const override; +}; /** * A pending relations deletion item. @@ -220,9 +229,14 @@ using RelationsKeyMap = std::map; */ struct RelationsDeletionItem : PendingQueueItem { + std::string ID; + using RelationsKeyMap = std::map; RelationsKeyMap Relations; - RelationsDeletionItem(const String& id, RelationsKeyMap relations); + RelationsDeletionItem(const String& id, const RelationsKeyMap& relations); + + Key GetID() const override { return ID; } + void Execute(IcingaDB& icingadb) const override; }; /** @@ -464,33 +478,16 @@ private: static std::unordered_set m_IndexedTypes; - // A variant type that can hold any of the pending item types used in the pending items container. - using PendingItemVariant = std::variant< - PendingConfigItem, - PendingDependencyGroupStateItem, - PendingDependencyEdgeItem, - RelationsDeletionItem - >; - - struct PendingItemKeyExtractor - { - // The type of the key extracted from a pending item required by Boost.MultiIndex. - using result_type = const PendingItemKey&; - - result_type operator()(const PendingItemVariant& item) const - { - return std::visit([](const auto& pendingItem) -> result_type { return pendingItem.ID; }, item); - } - }; - // A multi-index container for managing pending items with unique IDs and maintaining insertion order. // The first index is an ordered unique index based on the pending item key, allowing for efficient // lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the // order of insertion, enabling FIFO processing of pending items. using PendingItemsSet = boost::multi_index_container< - PendingItemVariant, + std::shared_ptr, boost::multi_index::indexed_by< - boost::multi_index::ordered_unique, // std::variant has operator< defined. + boost::multi_index::ordered_unique< + boost::multi_index::const_mem_fun + >, boost::multi_index::sequenced<> > >; @@ -502,16 +499,17 @@ private: void PendingItemsThreadProc(); std::chrono::duration DequeueAndProcessOne(std::unique_lock& lock); - void ProcessPendingItem(const PendingConfigItem& item); - void ProcessPendingItem(const PendingDependencyGroupStateItem& item) const; - void ProcessPendingItem(const PendingDependencyEdgeItem& item); - void ProcessPendingItem(const RelationsDeletionItem& item); void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits); void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup); void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector& dependencies, bool removeGroup); - void EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations); + void EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeyMap& relations); + + friend struct PendingConfigItem; + friend struct PendingDependencyGroupStateItem; + friend struct PendingDependencyEdgeItem; + friend struct RelationsDeletionItem; }; }