diff --git a/lib/methods/clusterzonechecktask.cpp b/lib/methods/clusterzonechecktask.cpp index 07a03a97a..f98fa7723 100644 --- a/lib/methods/clusterzonechecktask.cpp +++ b/lib/methods/clusterzonechecktask.cpp @@ -134,6 +134,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che double lastMessageSent = 0; double lastMessageReceived = 0; + uint_fast64_t pendingOutgoingMessages = 0; double messagesSentPerSecond = 0; double messagesReceivedPerSecond = 0; double bytesSentPerSecond = 0; @@ -157,6 +158,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che if (endpoint->GetLastMessageReceived() > lastMessageReceived) lastMessageReceived = endpoint->GetLastMessageReceived(); + pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages(); messagesSentPerSecond += endpoint->GetMessagesSentPerSecond(); messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond(); bytesSentPerSecond += endpoint->GetBytesSentPerSecond(); @@ -208,6 +210,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical), new PerfdataValue("last_messages_sent", lastMessageSent), new PerfdataValue("last_messages_received", lastMessageReceived), + new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages), new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond), new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond), new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond), diff --git a/lib/methods/icingachecktask.cpp b/lib/methods/icingachecktask.cpp index aa2061ef0..81541d684 100644 --- a/lib/methods/icingachecktask.cpp +++ b/lib/methods/icingachecktask.cpp @@ -124,6 +124,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes double lastMessageSent = 0; double lastMessageReceived = 0; + uint_fast64_t pendingOutgoingMessages = 0; double messagesSentPerSecond = 0; double messagesReceivedPerSecond = 0; double bytesSentPerSecond = 0; @@ -137,6 +138,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes if (endpoint->GetLastMessageReceived() > lastMessageReceived) lastMessageReceived = endpoint->GetLastMessageReceived(); + pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages(); messagesSentPerSecond += endpoint->GetMessagesSentPerSecond(); messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond(); bytesSentPerSecond += endpoint->GetBytesSentPerSecond(); @@ -145,6 +147,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent)); perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived)); + perfdata->Add(new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages)); perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond)); perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond)); perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond)); diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp index d558347b7..5a4fa76c8 100644 --- a/lib/remote/endpoint.cpp +++ b/lib/remote/endpoint.cpp @@ -110,6 +110,18 @@ Endpoint::Ptr Endpoint::GetLocalEndpoint() return listener->GetLocalEndpoint(); } +uint_fast64_t Endpoint::GetPendingOutgoingMessages() const +{ + uint_fast64_t pending = 0; + std::unique_lock lock (m_ClientsLock); + + for (auto& client : m_Clients) { + pending += client->GetPendingOutgoingMessages(); + } + + return pending; +} + void Endpoint::AddMessageSent(int bytes) { double time = Utility::GetTime(); diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp index dcf5b0f19..28fe7ae84 100644 --- a/lib/remote/endpoint.hpp +++ b/lib/remote/endpoint.hpp @@ -46,6 +46,7 @@ public: static Endpoint::Ptr GetLocalEndpoint(); void SetCachedZone(const intrusive_ptr& zone); + uint_fast64_t GetPendingOutgoingMessages() const override; void AddMessageSent(int bytes); void AddMessageReceived(int bytes); diff --git a/lib/remote/endpoint.ti b/lib/remote/endpoint.ti index 019dda89d..864080e8c 100644 --- a/lib/remote/endpoint.ti +++ b/lib/remote/endpoint.ti @@ -40,6 +40,10 @@ class Endpoint : ConfigObject Timestamp last_message_sent; Timestamp last_message_received; + [no_user_modify, no_storage] uint_fast64_t pending_outgoing_messages { + get; + }; + [no_user_modify, no_storage] double messages_sent_per_second { get; }; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 0d40de1de..df26f6076 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -158,6 +158,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) } size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc); + m_PendingOutgoingMessages.fetch_sub(1, std::memory_order_relaxed); if (m_Endpoint) { m_Endpoint->AddMessageSent(bytesSent); @@ -234,6 +235,7 @@ void JsonRpcConnection::SendRawMessage(const String& message) m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed); }); } @@ -245,6 +247,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed); } void JsonRpcConnection::Disconnect() diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 78b64322b..d13803d27 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -57,6 +57,11 @@ public: Shared::Ptr GetStream() const; ConnectionRole GetRole() const; + auto GetPendingOutgoingMessages() const noexcept + { + return m_PendingOutgoingMessages.load(std::memory_order_relaxed); + } + void Disconnect(); void SendMessage(const Dictionary::Ptr& request); @@ -79,6 +84,7 @@ private: boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioEvent m_OutgoingMessagesQueued; + Atomic m_PendingOutgoingMessages {0}; AsioEvent m_WriterDone; Atomic m_ShuttingDown; WaitGroup::Ptr m_WaitGroup;