From bbb7de72f46d27fa3b043231f321d204ca4247aa Mon Sep 17 00:00:00 2001
From: Johannes Schmidt
Date: Fri, 12 Dec 2025 13:27:02 +0100
Subject: [PATCH] Refactor OpenTsdbWriter to use a WorkQueue

---
Review notes (not part of the commit message):
- Resume(): the WorkQueue exception callback now logs under "OpenTsdbWriter";
  its facility and message text previously referred to Elasticsearch.
- SendMsgBuffer(): the pending data is swapped out of m_MsgBuf before it is
  sent. Nothing in this patch cleared m_MsgBuf, so every flush would have
  re-sent all metrics added before it.
- NOTE(review): the ".state", ".state_type", ".reachable", ".downtime_depth"
  and ".acknowledgement" metrics are still appended to m_MsgBuf outside the
  enqueued lambda. Presumably CheckResultHandler() runs on a different thread
  than the WorkQueue worker, which would make that unsynchronized append a
  data race - confirm, and consider moving those AddMetric() calls into the
  lambda.

 lib/perfdata/opentsdbwriter.cpp | 106 ++++++++++++++++++++------------
 lib/perfdata/opentsdbwriter.hpp |   7 ++-
 2 files changed, 70 insertions(+), 43 deletions(-)

diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp
index 4ce1a89a6..b518a75ba 100644
--- a/lib/perfdata/opentsdbwriter.cpp
+++ b/lib/perfdata/opentsdbwriter.cpp
@@ -35,6 +35,8 @@ void OpenTsdbWriter::OnConfigLoaded()
 {
 	ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
 
+	m_WorkQueue.SetName("OpenTsdbWriter, " + GetName());
+
 	if (!GetEnableHa()) {
 		Log(LogDebug, "OpenTsdbWriter")
 			<< "HA functionality disabled. Won't pause connection: " << GetName();
@@ -73,6 +75,11 @@ void OpenTsdbWriter::Resume()
 	Log(LogInformation, "OpentsdbWriter")
 		<< "'" << GetName() << "' resumed.";
 
+	m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) {
+		Log(LogDebug, "OpenTsdbWriter")
+			<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
+	});
+
 	ReadConfigTemplate();
 
 	m_ReconnectTimer = Timer::Create();
@@ -94,6 +101,8 @@ void OpenTsdbWriter::Pause()
 	m_HandleCheckResults.disconnect();
 	m_ReconnectTimer->Stop(true);
 
+	m_WorkQueue.Join();
+
 	Log(LogInformation, "OpentsdbWriter")
 		<< "'" << GetName() << "' paused.";
 
@@ -262,7 +271,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
 			metric = "icinga.service." + escaped_serviceName;
 		}
 
-		SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
+		AddMetric(checkable, metric + ".state", tags, service->GetState(), ts);
 
 	} else {
 		if (!config_tmpl_metric.IsEmpty()) {
@@ -270,31 +279,36 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
 		} else {
 			metric = "icinga.host";
 		}
-		SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
+		AddMetric(checkable, metric + ".state", tags, host->GetState(), ts);
 	}
 
-	SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
-	SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
-	SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
-	SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
+	AddMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
+	AddMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
+	AddMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
+	AddMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
 
-	SendPerfdata(checkable, metric, tags, cr, ts);
+	m_WorkQueue.Enqueue([this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), ts](
+	) mutable {
+		AddPerfdata(checkable, metric, tags, cr, ts);
 
-	metric = "icinga.check";
+		metric = "icinga.check";
 
-	if (service) {
-		tags["type"] = "service";
-		String serviceName = service->GetShortName();
-		String escaped_serviceName = EscapeTag(serviceName);
-		tags["service"] = escaped_serviceName;
-	} else {
-		tags["type"] = "host";
-	}
+		if (service) {
+			tags["type"] = "service";
+			String serviceName = service->GetShortName();
+			String escaped_serviceName = EscapeTag(serviceName);
+			tags["service"] = escaped_serviceName;
+		} else {
+			tags["type"] = "host";
+		}
 
-	SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
-	SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
-	SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
-	SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
+		AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
+		AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
+		AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
+		AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
+
+		SendMsgBuffer();
+	});
 }
 
 /**
@@ -306,7 +320,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
  * @param cr Check result containing performance data
  * @param ts Timestamp when the check result was received
  */
-void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
+void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
 	const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
 {
 	Array::Ptr perfdata = cr->GetPerformanceData();
@@ -349,21 +363,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
 			tags_new["label"] = escaped_key;
 		}
 
-		SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
+		AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
 
 		if (!pdv->GetCrit().IsEmpty())
-			SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
+			AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
 		if (!pdv->GetWarn().IsEmpty())
-			SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
+			AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
 		if (!pdv->GetMin().IsEmpty())
-			SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
+			AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
 		if (!pdv->GetMax().IsEmpty())
-			SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
+			AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
 	}
 }
 
 /**
- * Send given metric to OpenTSDB
+ * Add given metric to the data buffer to be later sent to OpenTSDB
  *
  * @param checkable Host/service object
  * @param metric Full metric name
@@ -371,40 +385,50 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
  * @param value Floating point metric value
  * @param ts Timestamp where the metric was received from the check result
  */
-void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
+void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric,
 	const std::map<String, String>& tags, double value, double ts)
 {
 	String tags_string = "";
 
-	for (auto& tag : tags) {
-		tags_string += " " + tag.first + "=" + tag.second;
-	}
-
-	std::ostringstream msgbuf;
 	/*
 	 * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
 	 * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
 	 * "tags" must include at least one tag, we use "host=HOSTNAME"
 	 */
-	msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << tags_string;
+	std::string msg{
+		"put " + metric + " " + std::to_string(static_cast<long>(ts)) + " " + Convert::ToString(value)
+	};
+
+	for (const auto& tag : tags) {
+		tags_string += " " + tag.first + "=" + tag.second;
+	}
 
 	Log(LogDebug, "OpenTsdbWriter")
-		<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
+		<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msg << "'.";
 
 	/* do not send \n to debug log */
-	msgbuf << "\n";
-	String put = msgbuf.str();
+	m_MsgBuf.append(msg);
+	m_MsgBuf.append(tags_string).push_back('\n');
+}
 
-	ObjectLock olock(this);
+void OpenTsdbWriter::SendMsgBuffer()
+{
+	ASSERT(m_WorkQueue.IsWorkerThread());
+
+	/* Detach the pending data first: each flush then sends its metrics once
+	 * (or drops them when not connected, as SendMetric() did) and m_MsgBuf
+	 * cannot keep growing across flushes. */
+	std::string pending;
+	pending.swap(m_MsgBuf);
 
 	if (!GetConnected())
 		return;
 
-	try {
-		Log(LogDebug, "OpenTsdbWriter")
-			<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
+	Log(LogDebug, "OpenTsdbWriter")
+		<< "Flushing data buffer to OpenTsdb.";
 
-		boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
+	try {
+		boost::asio::write(*m_Stream, boost::asio::buffer(pending));
 		m_Stream->flush();
 	} catch (const std::exception& ex) {
 		Log(LogCritical, "OpenTsdbWriter")
diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp
index de888e5da..65e814a43 100644
--- a/lib/perfdata/opentsdbwriter.hpp
+++ b/lib/perfdata/opentsdbwriter.hpp
@@ -35,6 +35,8 @@ protected:
 	void Pause() override;
 
 private:
+	WorkQueue m_WorkQueue{10000000, 1};
+	std::string m_MsgBuf;
 	Shared<AsioTlsStream>::Ptr m_Stream;
 
 	boost::signals2::connection m_HandleCheckResults;
@@ -44,9 +46,10 @@ private:
 	Dictionary::Ptr m_HostConfigTemplate;
 
 	void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
-	void SendMetric(const Checkable::Ptr& checkable, const String& metric,
+	void AddMetric(const Checkable::Ptr& checkable, const String& metric,
 		const std::map<String, String>& tags, double value, double ts);
-	void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
+	void SendMsgBuffer();
+	void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
 		const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
 	static String EscapeTag(const String& str);
 	static String EscapeMetric(const String& str);