Refactor OpenTsdbWriter to use a WorkQueue

This commit is contained in:
Johannes Schmidt 2025-12-12 13:27:02 +01:00
parent 3eae8c01fc
commit bbb7de72f4
2 changed files with 65 additions and 44 deletions

View file

@ -35,6 +35,8 @@ void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("OpenTsdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "OpenTsdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
@ -73,6 +75,11 @@ void OpenTsdbWriter::Resume()
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) {
Log(LogDebug, "ElasticsearchWriter")
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
});
ReadConfigTemplate();
m_ReconnectTimer = Timer::Create();
@ -94,6 +101,8 @@ void OpenTsdbWriter::Pause()
m_HandleCheckResults.disconnect();
m_ReconnectTimer->Stop(true);
m_WorkQueue.Join();
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
@ -262,7 +271,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
metric = "icinga.service." + escaped_serviceName;
}
SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
AddMetric(checkable, metric + ".state", tags, service->GetState(), ts);
} else {
if (!config_tmpl_metric.IsEmpty()) {
@ -270,31 +279,36 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
} else {
metric = "icinga.host";
}
SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
AddMetric(checkable, metric + ".state", tags, host->GetState(), ts);
}
SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
AddMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
AddMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
AddMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
AddMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
SendPerfdata(checkable, metric, tags, cr, ts);
m_WorkQueue.Enqueue([this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), ts](
) mutable {
AddPerfdata(checkable, metric, tags, cr, ts);
metric = "icinga.check";
metric = "icinga.check";
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
SendMsgBuffer();
});
}
/**
@ -306,7 +320,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
* @param cr Check result containing performance data
* @param ts Timestamp when the check result was received
*/
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
@ -349,21 +363,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
tags_new["label"] = escaped_key;
}
SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
if (!pdv->GetCrit().IsEmpty())
SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
if (!pdv->GetWarn().IsEmpty())
SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
if (!pdv->GetMin().IsEmpty())
SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
if (!pdv->GetMax().IsEmpty())
SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
}
}
/**
* Send given metric to OpenTSDB
* Add given metric to the data buffer to be later sent to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
@ -371,40 +385,44 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
* @param value Floating point metric value
* @param ts Timestamp where the metric was received from the check result
*/
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
String tags_string = "";
for (auto& tag : tags) {
tags_string += " " + tag.first + "=" + tag.second;
}
std::ostringstream msgbuf;
/*
* must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
* put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
* "tags" must include at least one tag, we use "host=HOSTNAME"
*/
msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << tags_string;
std::string msg{
"put " + metric + " " + std::to_string(static_cast<long>(ts)) + " " + Convert::ToString(value)
};
for (const auto& tag : tags) {
tags_string += " " + tag.first + "=" + tag.second;
}
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msg << "'.";
/* do not send \n to debug log */
msgbuf << "\n";
String put = msgbuf.str();
m_MsgBuf.append(msg);
m_MsgBuf.append(tags_string).push_back('\n');
}
ObjectLock olock(this);
void OpenTsdbWriter::SendMsgBuffer()
{
ASSERT(m_WorkQueue.IsWorkerThread());
if (!GetConnected())
return;
try {
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
Log(LogDebug, "OpenTsdbWriter")
<< "Flushing data buffer to OpenTsdb.";
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
try {
boost::asio::write(*m_Stream, boost::asio::buffer(m_MsgBuf));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "OpenTsdbWriter")

View file

@ -35,6 +35,8 @@ protected:
void Pause() override;
private:
WorkQueue m_WorkQueue{10000000, 1};
std::string m_MsgBuf;
Shared<AsioTcpStream>::Ptr m_Stream;
boost::signals2::connection m_HandleCheckResults;
@ -44,9 +46,10 @@ private:
Dictionary::Ptr m_HostConfigTemplate;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& metric,
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void SendMsgBuffer();
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
static String EscapeTag(const String& str);
static String EscapeMetric(const String& str);