// SPDX-FileCopyrightText: 2012 Icinga GmbH // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/opentsdbwriter.hpp" #include "perfdata/opentsdbwriter-ti.cpp" #include "icinga/service.hpp" #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" #include "base/perfdatavalue.hpp" #include "base/stream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include #include using namespace icinga; REGISTER_TYPE(OpenTsdbWriter); REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc); /* * Enable HA capabilities once the config object is loaded. */ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); m_WorkQueue.SetName("OpenTsdbWriter, " + GetName()); if (!GetEnableHa()) { Log(LogDebug, "OpenTsdbWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); SetHAMode(HARunEverywhere); } else { SetHAMode(HARunOnce); } } /** * Feature stats interface * * @param status Key value pairs for feature stats */ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; nodes.emplace_back( opentsdbwriter->GetName(), new Dictionary({ { "connected", opentsdbwriter->m_Connection->IsConnected() }, {"work_queue_items", workQueueItems}, {"work_queue_item_rate", workQueueItemRate} } ) ); perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems)); perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); } status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); } /** * Resume is equivalent to Start, but with HA capabilities to resume at runtime. */ void OpenTsdbWriter::Resume() { ObjectImpl::Resume(); Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' resumed."; m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) { Log(LogDebug, "OpenTsdbWriter") << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); }); ReadConfigTemplate(); m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); }); } /** * Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); std::promise queueDonePromise; m_WorkQueue.Enqueue([&]() { queueDonePromise.set_value(); }, PriorityLow); auto timeout = std::chrono::duration{GetDisconnectTimeout()}; m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; ObjectImpl::Pause(); } /** * Registered check result handler processing data. * Calculates tags from the config. * * @param checkable Host/service object * @param cr Check result */ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { if (IsPaused()) return; CONTEXT("Processing check result for '" << checkable->GetName() << "'"); if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) return; Service::Ptr service = dynamic_pointer_cast(checkable); Host::Ptr host; Dictionary::Ptr config_tmpl; Dictionary::Ptr config_tmpl_tags; String config_tmpl_metric; if (service) { host = service->GetHost(); config_tmpl = m_ServiceConfigTemplate; } else { host = static_pointer_cast(checkable); config_tmpl = m_HostConfigTemplate; } // Get the tags nested dictionary in the service/host template in the config if (config_tmpl) { config_tmpl_tags = config_tmpl->Get("tags"); config_tmpl_metric = config_tmpl->Get("metric"); } String metric; std::map tags; // Resolve macros in configuration template and build custom tag list if (config_tmpl_tags || !config_tmpl_metric.IsEmpty()) { // Configure config template macro resolver MacroProcessor::ResolverList resolvers; if (service) resolvers.emplace_back("service", service); resolvers.emplace_back("host", host); // Resolve macros for the service and host template config line if (config_tmpl_tags) { ObjectLock olock(config_tmpl_tags); for (const Dictionary::Pair& pair : config_tmpl_tags) { String missing_macro; Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); if (!missing_macro.IsEmpty()) { Log(LogDebug, "OpenTsdbWriter") << "Unable to resolve macro '" << missing_macro << "' for checkable '" << checkable->GetName() << "'."; continue; } if (value.IsEmpty()) { Log(LogDebug, "OpenTsdbWriter") << "Resolved macro '" << pair.second << "' for checkable '" << checkable->GetName() << "' to '', skipping."; continue; } String tagname = Convert::ToString(pair.first); tags[tagname] = EscapeTag(value); } } // Resolve macros for the metric config line if (!config_tmpl_metric.IsEmpty()) { String missing_macro; Value value = MacroProcessor::ResolveMacros(config_tmpl_metric, resolvers, cr, &missing_macro); if (!missing_macro.IsEmpty()) { Log(LogDebug, "OpenTsdbWriter") << "Unable to resolve macro '" << missing_macro << "' for checkable '" << checkable->GetName() << "'."; } else { config_tmpl_metric = Convert::ToString(value); } } } String escaped_hostName = EscapeTag(host->GetName()); tags["host"] = escaped_hostName; std::vector> metadata; if (service) { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { String serviceName = service->GetShortName(); String escaped_serviceName = EscapeMetric(serviceName); metric = "icinga.service." + escaped_serviceName; } metadata.emplace_back("state", service->GetState()); } else { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { metric = "icinga.host"; } metadata.emplace_back("state", host->GetState()); } metadata.emplace_back("state_type", checkable->GetStateType()); metadata.emplace_back("reachable", checkable->IsReachable()); metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth()); metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); m_WorkQueue.Enqueue( [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { if (m_Connection->IsStopped()) { return; } double ts = cr->GetExecutionEnd(); for (auto& [name, val] : metadata) { AddMetric(checkable, metric + "." + name, tags, val, ts); } AddPerfdata(checkable, metric, tags, cr, ts); metric = "icinga.check"; if (service) { tags["type"] = "service"; String serviceName = service->GetShortName(); String escaped_serviceName = EscapeTag(serviceName); tags["service"] = escaped_serviceName; } else { tags["type"] = "host"; } AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); SendMsgBuffer(); } ); } /** * Parse and send performance data metrics to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name * @param tags Tag key pairs * @param cr Check result containing performance data * @param ts Timestamp when the check result was received */ void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { ASSERT(m_WorkQueue.IsWorkerThread()); Array::Ptr perfdata = cr->GetPerformanceData(); if (!perfdata) return; CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); ObjectLock olock(perfdata); for (const Value& val : perfdata) { PerfdataValue::Ptr pdv; if (val.IsObjectType()) pdv = val; else { try { pdv = PerfdataValue::Parse(val); } catch (const std::exception&) { Log(LogWarning, "OpenTsdbWriter") << "Ignoring invalid perfdata for checkable '" << checkable->GetName() << "' and command '" << checkCommand->GetName() << "' with value: " << val; continue; } } String metric_name; std::map tags_new = tags; // Do not break original functionality where perfdata labels form // part of the metric name if (!GetEnableGenericMetrics()) { String escaped_key = EscapeMetric(pdv->GetLabel()); boost::algorithm::replace_all(escaped_key, "::", "."); metric_name = metric + "." + escaped_key; } else { String escaped_key = EscapeTag(pdv->GetLabel()); metric_name = metric; tags_new["label"] = escaped_key; } AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); if (!pdv->GetCrit().IsEmpty()) AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); } } /** * Add given metric to the data buffer to be later sent to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name * @param tags Tag key pairs * @param value Floating point metric value * @param ts Timestamp where the metric was received from the check result */ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { ASSERT(m_WorkQueue.IsWorkerThread()); String tags_string = ""; for (auto& tag : tags) { tags_string += " " + tag.first + "=" + tag.second; } std::ostringstream msgbuf; /* * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html) * put * "tags" must include at least one tag, we use "host=HOSTNAME" */ msgbuf << "put " << metric << " " << static_cast(ts) << " " << Convert::ToString(value) << tags_string; Log(LogDebug, "OpenTsdbWriter") << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'."; /* do not send \n to debug log */ msgbuf << "\n"; m_MsgBuf.append(msgbuf.str()); } void OpenTsdbWriter::SendMsgBuffer() { ASSERT(m_WorkQueue.IsWorkerThread()); Log(LogDebug, "OpenTsdbWriter") << "Flushing data buffer to OpenTsdb."; try { m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); } catch (const PerfdataWriterConnection::Stopped& ex) { Log(LogDebug, "OpenTsdbWriter") << ex.what(); return; } } /** * Escape tags for OpenTSDB * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags * * @param str Tag name * @return Escaped tag */ String OpenTsdbWriter::EscapeTag(const String& str) { String result = str; boost::replace_all(result, " ", "_"); boost::replace_all(result, "\\", "_"); boost::replace_all(result, ":", "_"); return result; } /** * Escape metric name for OpenTSDB * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags * * @param str Metric name * @return Escaped metric */ String OpenTsdbWriter::EscapeMetric(const String& str) { String result = str; boost::replace_all(result, " ", "_"); boost::replace_all(result, ".", "_"); boost::replace_all(result, "\\", "_"); boost::replace_all(result, ":", "_"); return result; } /** * Saves the template dictionaries defined in the config file into running memory * * @param stemplate The dictionary to save the service configuration to * @param htemplate The dictionary to save the host configuration to */ void OpenTsdbWriter::ReadConfigTemplate() { m_ServiceConfigTemplate = GetServiceTemplate(); if (!m_ServiceConfigTemplate) { Log(LogDebug, "OpenTsdbWriter") << "Unable to locate service template configuration."; } else if (m_ServiceConfigTemplate->GetLength() == 0) { Log(LogDebug, "OpenTsdbWriter") << "The service template configuration is empty."; } m_HostConfigTemplate = GetHostTemplate(); if (!m_HostConfigTemplate) { Log(LogDebug, "OpenTsdbWriter") << "Unable to locate host template configuration."; } else if (m_HostConfigTemplate->GetLength() == 0) { Log(LogDebug, "OpenTsdbWriter") << "The host template configuration is empty."; } } /** * Validates the host_template configuration block in the configuration * file and checks for syntax errors. * * @param lvalue The host_template dictionary * @param utils Validation helper utilities */ void OpenTsdbWriter::ValidateHostTemplate(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateHostTemplate(lvalue, utils); String metric = lvalue()->Get("metric"); if (!MacroProcessor::ValidateMacroString(metric)) BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'.")); Dictionary::Ptr tags = lvalue()->Get("tags"); if (tags) { ObjectLock olock(tags); for (const Dictionary::Pair& pair : tags) { if (!MacroProcessor::ValidateMacroString(pair.second)) BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); } } } /** * Validates the service_template configuration block in the * configuration file and checks for syntax errors. * * @param lvalue The service_template dictionary * @param utils Validation helper utilities */ void OpenTsdbWriter::ValidateServiceTemplate(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateServiceTemplate(lvalue, utils); String metric = lvalue()->Get("metric"); if (!MacroProcessor::ValidateMacroString(metric)) BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'.")); Dictionary::Ptr tags = lvalue()->Get("tags"); if (tags) { ObjectLock olock(tags); for (const Dictionary::Pair& pair : tags) { if (!MacroProcessor::ValidateMacroString(pair.second)) BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); } } }