mirror of
https://github.com/Icinga/icinga2.git
synced 2026-04-25 08:07:31 -04:00
Merge pull request #10668 from Icinga/perfdata-writers-connection-handling
Add PerfdatawriterConnection to handle network requests for Perfdata Writers
This commit is contained in:
commit
6592eae21d
34 changed files with 1752 additions and 762 deletions
|
|
@ -1214,6 +1214,7 @@ Configuration Attributes:
|
|||
port | Number | **Required.** Elasticsearch port. Defaults to `9200`.
|
||||
index | String | **Required.** Prefix for the index names. Defaults to `icinga2`.
|
||||
enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`.
|
||||
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
|
||||
username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy.
|
||||
|
|
@ -1310,6 +1311,7 @@ Configuration Attributes:
|
|||
--------------------------|-----------------------|----------------------------------
|
||||
host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`.
|
||||
port | Number | **Optional.** GELF receiver port. Defaults to `12201`.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
|
||||
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
|
||||
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
|
|
@ -1340,6 +1342,7 @@ Configuration Attributes:
|
|||
--------------------------|-----------------------|----------------------------------
|
||||
host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`.
|
||||
port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`.
|
||||
host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`.
|
||||
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
|
||||
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
|
||||
|
|
@ -1682,6 +1685,7 @@ Configuration Attributes:
|
|||
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
|
||||
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
|
||||
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
|
||||
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
|
|
@ -1745,6 +1749,7 @@ Configuration Attributes:
|
|||
service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol.
|
||||
enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data.
|
||||
enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`.
|
||||
flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
|
|
@ -1860,6 +1865,7 @@ Configuration Attributes:
|
|||
--------------------------|-----------------------|----------------------------------
|
||||
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
|
||||
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.
|
||||
host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). Defaults to an `empty Dictionary`.
|
||||
|
|
|
|||
|
|
@ -76,5 +76,10 @@
|
|||
#define BOOST_BIND_NO_PLACEHOLDERS
|
||||
|
||||
#include <functional>
|
||||
#include <chrono>
|
||||
|
||||
namespace icinga {
|
||||
using namespace std::chrono_literals;
|
||||
} // namespace icinga
|
||||
|
||||
#endif /* I2BASE_H */
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ set(perfdata_SOURCES
|
|||
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
|
||||
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
|
||||
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
|
||||
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
|
||||
)
|
||||
|
||||
if(ICINGA2_UNITY_BUILD)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/elasticsearchwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/elasticsearchwriter-ti.cpp"
|
||||
#include "remote/url.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
|
|
@ -9,30 +10,14 @@
|
|||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/base64.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/beast/core/flat_buffer.hpp>
|
||||
#include <boost/beast/http/field.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/parser.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/status.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <boost/beast/http/verb.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
|
|
@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
|
|||
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
if (GetEnableTls()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "ElasticsearchWriter")
|
||||
<< "Unable to create SSL context: " << ex.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Resume()
|
||||
{
|
||||
ObjectImpl<ElasticsearchWriter>::Resume();
|
||||
|
||||
m_EventPrefix = "icinga2.event.";
|
||||
|
||||
Log(LogInformation, "ElasticsearchWriter")
|
||||
<< "'" << GetName() << "' resumed.";
|
||||
|
||||
|
|
@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume()
|
|||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
|
||||
/* Register for new metrics. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
|
|
@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause()
|
|||
m_HandleNotifications.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
m_WorkQueue.Join();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_DataBufferMutex);
|
||||
std::promise<void> queueDonePromise;
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Flush();
|
||||
}
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "ElasticsearchWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
|
@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
AddCheckResult(fields, checkable, cr);
|
||||
|
|
@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
|
||||
|
||||
AddCheckResult(fields, checkable, cr);
|
||||
|
|
@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "ElasticsearchWriter")
|
||||
|
|
@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
|
|||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
/* Atomically buffer the data point. */
|
||||
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
|
||||
|
||||
/* Format the timestamps to dynamically select the date datatype inside the index. */
|
||||
fields->Set("@timestamp", FormatTimestamp(ts));
|
||||
fields->Set("timestamp", FormatTimestamp(ts));
|
||||
|
||||
String eventType = m_EventPrefix + type;
|
||||
fields->Set("type", eventType);
|
||||
fields->Set("type", "icinga2.event." + type);
|
||||
|
||||
/* Every payload needs a line describing the index.
|
||||
* We do it this way to avoid problems with a near full queue.
|
||||
|
|
@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue if there isn't one queued already.
|
||||
*/
|
||||
void ElasticsearchWriter::FlushTimeout()
|
||||
{
|
||||
/* Prevent new data points from being added to the array, there is a
|
||||
* race condition where they could disappear.
|
||||
*/
|
||||
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
|
||||
|
||||
/* Flush if there are any data available. */
|
||||
if (m_DataBuffer.size() > 0) {
|
||||
Log(LogDebug, "ElasticsearchWriter")
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
Flush();
|
||||
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Defer resetFlushTimer{
|
||||
[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }
|
||||
};
|
||||
Flush();
|
||||
});
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Flush()
|
||||
|
|
@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
|
||||
url->SetPath(path);
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
try {
|
||||
stream = Connect();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
|
||||
return;
|
||||
}
|
||||
|
||||
Defer s ([&stream]() {
|
||||
if (stream.first) {
|
||||
stream.first->next_layer().shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
|
||||
|
||||
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
|
||||
|
|
@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
|
||||
<< " to '" << url->Format() << "'.";
|
||||
|
||||
decltype(m_Connection->Send(request)) response;
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::write(*stream.first, request);
|
||||
stream.first->flush();
|
||||
} else {
|
||||
http::write(*stream.second, request);
|
||||
stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
response = m_Connection->Send(request);
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "ElasticsearchWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
|
||||
http::parser<false, http::string_body> parser;
|
||||
beast::flat_buffer buf;
|
||||
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::read(*stream.first, buf, parser);
|
||||
} else {
|
||||
http::read(*stream.second, buf, parser);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
|
||||
throw;
|
||||
}
|
||||
|
||||
auto& response (parser.get());
|
||||
|
||||
if (response.result_int() > 299) {
|
||||
if (response.result() == http::status::unauthorized) {
|
||||
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
|
||||
|
|
@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
}
|
||||
}
|
||||
|
||||
OptionalTlsStream ElasticsearchWriter::Connect()
|
||||
{
|
||||
Log(LogNotice, "ElasticsearchWriter")
|
||||
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
OptionalTlsStream stream;
|
||||
bool tls = GetEnableTls();
|
||||
|
||||
if (tls) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (tls) {
|
||||
auto& tlsStream (stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::AssertOnWorkQueue()
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
|
|
|||
|
|
@ -5,11 +5,10 @@
|
|||
#define ELASTICSEARCHWRITER_H
|
||||
|
||||
#include "perfdata/elasticsearchwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -29,16 +28,18 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
String m_EventPrefix;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::mutex m_DataBufferMutex;
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
|
|
@ -51,7 +52,6 @@ private:
|
|||
void Enqueue(const Checkable::Ptr& checkable, const String& type,
|
||||
const Dictionary::Ptr& fields, double ts);
|
||||
|
||||
OptionalTlsStream Connect();
|
||||
void AssertOnWorkQueue();
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
void FlushTimeout();
|
||||
|
|
|
|||
|
|
@ -40,7 +40,10 @@ class ElasticsearchWriter : ConfigObject
|
|||
[config] String cert_path;
|
||||
[config] String key_path;
|
||||
|
||||
[config] int flush_interval {
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] double flush_interval {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] int flush_threshold {
|
||||
|
|
|
|||
|
|
@ -6,28 +6,19 @@
|
|||
#include "icinga/service.hpp"
|
||||
#include "icinga/notification.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/context.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <utility>
|
||||
#include "base/io-engine.hpp"
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
|
|
@ -62,7 +53,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
|
|||
nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", gelfwriter->GetConnected() },
|
||||
{ "connected", gelfwriter->m_Connection->IsConnected() },
|
||||
{ "source", gelfwriter->GetSource() }
|
||||
}));
|
||||
|
||||
|
|
@ -73,6 +64,22 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
|
|||
status->Set("gelfwriter", new Dictionary(std::move(nodes)));
|
||||
}
|
||||
|
||||
void GelfWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
/* Initialize connection */
|
||||
if (GetEnableTls()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void GelfWriter::Resume()
|
||||
{
|
||||
ObjectImpl<GelfWriter>::Resume();
|
||||
|
|
@ -83,12 +90,7 @@ void GelfWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Timer for reconnecting */
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
@ -113,18 +115,15 @@ void GelfWriter::Pause()
|
|||
m_HandleNotifications.disconnect();
|
||||
m_HandleStateChanges.disconnect();
|
||||
|
||||
m_ReconnectTimer->Stop(true);
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([this]() {
|
||||
try {
|
||||
ReconnectInternal();
|
||||
} catch (const std::exception&) {
|
||||
Log(LogInformation, "GelfWriter")
|
||||
<< "Unable to connect, not flushing buffers. Data may be lost.";
|
||||
}
|
||||
}, PriorityImmediate);
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow);
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "GelfWriter")
|
||||
|
|
@ -142,126 +141,6 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
{
|
||||
Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
|
||||
Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::Reconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
if (IsPaused()) {
|
||||
SetConnected(false);
|
||||
return;
|
||||
}
|
||||
|
||||
ReconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectInternal()
|
||||
{
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
Log(LogNotice, "GelfWriter")
|
||||
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
bool ssl = GetEnableTls();
|
||||
|
||||
if (ssl) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ssl) {
|
||||
auto& tlsStream (m_Stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "TLS handshake with host '" << GetHost() << " failed.'";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "GelfWriter")
|
||||
<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectTimerHandler()
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
|
||||
}
|
||||
|
||||
void GelfWriter::Disconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::DisconnectInternal()
|
||||
{
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
if (m_Stream.first) {
|
||||
boost::system::error_code ec;
|
||||
m_Stream.first->next_layer().shutdown(ec);
|
||||
|
||||
// https://stackoverflow.com/a/25703699
|
||||
// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
|
||||
if (ec.category() == boost::asio::error::get_ssl_category()) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
|
||||
}
|
||||
} else if (m_Stream.second) {
|
||||
m_Stream.second->close();
|
||||
}
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
}
|
||||
|
||||
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
|
|
@ -298,6 +177,10 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
|
|||
fields->Set("_check_command", checkCommand->GetName());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -405,6 +288,10 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti
|
|||
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -447,6 +334,10 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
|
|||
fields->Set("_check_source", cr->GetCheckSource());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -473,26 +364,15 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
|
|||
msgbuf << gelfMessage;
|
||||
msgbuf << '\0';
|
||||
|
||||
String log = msgbuf.str();
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
auto log = msgbuf.str();
|
||||
|
||||
try {
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
|
||||
|
||||
if (m_Stream.first) {
|
||||
boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
|
||||
m_Stream.first->flush();
|
||||
} else {
|
||||
boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
|
||||
m_Stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
throw ex;
|
||||
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GelfWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,12 +5,10 @@
|
|||
#define GELFWRITER_H
|
||||
|
||||
#include "perfdata/gelfwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -30,15 +28,16 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
OptionalTlsStream m_Stream;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
|
||||
|
|
@ -48,13 +47,6 @@ private:
|
|||
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
|
||||
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void Disconnect();
|
||||
void DisconnectInternal();
|
||||
void Reconnect();
|
||||
void ReconnectInternal();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -25,9 +25,8 @@ class GelfWriter : ConfigObject
|
|||
default {{{ return false; }}}
|
||||
};
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_ha {
|
||||
default {{{ return false; }}}
|
||||
|
|
|
|||
|
|
@ -7,16 +7,13 @@
|
|||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
|
@ -65,7 +62,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
|||
nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", graphitewriter->GetConnected() }
|
||||
{ "connected", graphitewriter->m_Connection->IsConnected() }
|
||||
}));
|
||||
|
||||
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
|
||||
|
|
@ -88,12 +85,7 @@ void GraphiteWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Timer for reconnecting */
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
@ -108,20 +100,17 @@ void GraphiteWriter::Resume()
|
|||
void GraphiteWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer->Stop(true);
|
||||
|
||||
try {
|
||||
ReconnectInternal();
|
||||
} catch (const std::exception&) {
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
ObjectImpl<GraphiteWriter>::Pause();
|
||||
return;
|
||||
}
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
DisconnectInternal();
|
||||
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
|
@ -150,105 +139,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
|
||||
Log(LogDebug, "GraphiteWriter")
|
||||
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
|
||||
|
||||
if (GetConnected()) {
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect method, stops when the feature is paused in HA zones.
|
||||
*
|
||||
* Called inside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::Reconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
if (IsPaused()) {
|
||||
SetConnected(false);
|
||||
return;
|
||||
}
|
||||
|
||||
ReconnectInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect method, connects to a TCP Stream
|
||||
*/
|
||||
void GraphiteWriter::ReconnectInternal()
|
||||
{
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
Log(LogNotice, "GraphiteWriter")
|
||||
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
|
||||
try {
|
||||
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GraphiteWriter")
|
||||
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect handler called by the timer.
|
||||
*
|
||||
* Enqueues a reconnect task into the WQ.
|
||||
*/
|
||||
void GraphiteWriter::ReconnectTimerHandler()
|
||||
{
|
||||
if (IsPaused())
|
||||
return;
|
||||
|
||||
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect the stream.
|
||||
*
|
||||
* Called inside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::Disconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect the stream.
|
||||
*
|
||||
* Called outside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::DisconnectInternal()
|
||||
{
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -302,11 +192,11 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
}
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() {
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* TODO: Deal with missing connection here. Needs refactoring
|
||||
* into parsing the actual performance data and then putting it
|
||||
* into a queue for re-inserting. */
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
for (auto& [name, val] : metadata) {
|
||||
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
|
||||
|
|
@ -394,19 +284,11 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
|
|||
// do not send \n to debug log
|
||||
msgbuf << "\n";
|
||||
|
||||
std::unique_lock<std::mutex> lock(m_StreamMutex);
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
try {
|
||||
asio::write(*m_Stream, asio::buffer(msgbuf.str()));
|
||||
m_Stream->flush();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GraphiteWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
throw ex;
|
||||
m_Connection->Send(asio::buffer(msgbuf.str()));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GraphiteWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,13 +5,10 @@
|
|||
#define GRAPHITEWRITER_H
|
||||
|
||||
#include "perfdata/graphitewriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <fstream>
|
||||
#include <mutex>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -38,12 +35,10 @@ protected:
|
|||
void Pause() override;
|
||||
|
||||
private:
|
||||
Shared<AsioTcpStream>::Ptr m_Stream;
|
||||
std::mutex m_StreamMutex;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
|
||||
|
|
@ -52,13 +47,6 @@ private:
|
|||
static String EscapeMetricLabel(const String& str);
|
||||
static Value EscapeMacroMetric(const Value& value);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void Disconnect();
|
||||
void DisconnectInternal();
|
||||
void Reconnect();
|
||||
void ReconnectInternal();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -27,9 +27,8 @@ class GraphiteWriter : ConfigObject
|
|||
[config] bool enable_send_thresholds;
|
||||
[config] bool enable_send_metadata;
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_ha {
|
||||
default {{{ return false; }}}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/influxdbcommonwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/influxdbcommonwriter-ti.cpp"
|
||||
#include "remote/url.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
|
|
@ -9,36 +10,15 @@
|
|||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include "base/tlsutility.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/beast/core/flat_buffer.hpp>
|
||||
#include <boost/beast/http/field.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/parser.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/status.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <boost/beast/http/verb.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <boost/math/special_functions/fpclassify.hpp>
|
||||
#include <boost/regex.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
|
|
@ -80,6 +60,21 @@ void InfluxdbCommonWriter::OnConfigLoaded()
|
|||
}
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
if (GetSslEnable()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, GetReflectionType()->GetName())
|
||||
<< "Unable to create SSL context: " << ex.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::Resume()
|
||||
{
|
||||
ObjectImpl<InfluxdbCommonWriter>::Resume();
|
||||
|
|
@ -97,6 +92,8 @@ void InfluxdbCommonWriter::Resume()
|
|||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()};
|
||||
|
||||
/* Register for new metrics. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
|
|
@ -114,7 +111,15 @@ void InfluxdbCommonWriter::Pause()
|
|||
<< "Processing pending tasks and flushing data buffers.";
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
FlushWQ();
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
|
||||
m_WorkQueue.Join();
|
||||
|
|
@ -136,68 +141,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
|
||||
Log(LogDebug, GetReflectionType()->GetName())
|
||||
<< "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
|
||||
|
||||
//TODO: Close the connection, if we keep it open.
|
||||
}
|
||||
|
||||
OptionalTlsStream InfluxdbCommonWriter::Connect()
|
||||
{
|
||||
Log(LogNotice, GetReflectionType()->GetName())
|
||||
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
OptionalTlsStream stream;
|
||||
bool ssl = GetSslEnable();
|
||||
|
||||
if (ssl) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ssl) {
|
||||
auto& tlsStream (stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetSslInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
|
|
@ -261,6 +204,10 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c
|
|||
}
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
double ts = cr->GetExecutionEnd();
|
||||
|
|
@ -411,19 +358,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue and restarts the timer.
|
||||
*/
|
||||
void InfluxdbCommonWriter::FlushTimeout()
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
|
||||
}
|
||||
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::FlushTimeoutWQ()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
Log(LogDebug, GetReflectionType()->GetName())
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
|
||||
FlushWQ();
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
|
||||
FlushWQ();
|
||||
});
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::FlushWQ()
|
||||
|
|
@ -444,55 +391,16 @@ void InfluxdbCommonWriter::FlushWQ()
|
|||
m_DataBuffer.clear();
|
||||
m_DataBufferSize = 0;
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
try {
|
||||
stream = Connect();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
|
||||
return;
|
||||
}
|
||||
|
||||
Defer s ([&stream]() {
|
||||
if (stream.first) {
|
||||
stream.first->next_layer().shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
auto request (AssembleRequest(std::move(body)));
|
||||
|
||||
decltype(m_Connection->Send(request)) response;
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::write(*stream.first, request);
|
||||
stream.first->flush();
|
||||
} else {
|
||||
http::write(*stream.second, request);
|
||||
stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
response = m_Connection->Send(request);
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, GetReflectionType()->GetName()) << ex.what();
|
||||
return;
|
||||
}
|
||||
|
||||
http::parser<false, http::string_body> parser;
|
||||
beast::flat_buffer buf;
|
||||
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::read(*stream.first, buf, parser);
|
||||
} else {
|
||||
http::read(*stream.second, buf, parser);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
|
||||
throw;
|
||||
}
|
||||
|
||||
auto& response (parser.get());
|
||||
|
||||
if (response.result() != http::status::no_content) {
|
||||
Log(LogCritical, GetReflectionType()->GetName())
|
||||
<< "Unexpected response code: " << response.result() << ", InfluxDB error message:\n" << response.body();
|
||||
|
|
|
|||
|
|
@ -5,18 +5,13 @@
|
|||
#define INFLUXDBCOMMONWRITER_H
|
||||
|
||||
#include "perfdata/influxdbcommonwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include "remote/url.hpp"
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include <atomic>
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -39,6 +34,7 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
|
|
@ -50,22 +46,22 @@ protected:
|
|||
private:
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::atomic_size_t m_DataBufferSize{0};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
|
||||
const String& label, const Dictionary::Ptr& fields, double ts);
|
||||
void FlushTimeout();
|
||||
void FlushTimeoutWQ();
|
||||
void FlushWQ();
|
||||
|
||||
static String EscapeKeyOrTagValue(const String& str);
|
||||
static String EscapeValue(const Value& value);
|
||||
|
||||
OptionalTlsStream Connect();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -52,13 +52,16 @@ abstract class InfluxdbCommonWriter : ConfigObject
|
|||
});
|
||||
}}}
|
||||
};
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_send_thresholds {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] bool enable_send_metadata {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] int flush_interval {
|
||||
[config] double flush_interval {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] int flush_threshold {
|
||||
|
|
|
|||
|
|
@ -7,17 +7,12 @@
|
|||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
|
@ -36,6 +31,8 @@ void OpenTsdbWriter::OnConfigLoaded()
|
|||
{
|
||||
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
|
||||
|
||||
m_WorkQueue.SetName("OpenTsdbWriter, " + GetName());
|
||||
|
||||
if (!GetEnableHa()) {
|
||||
Log(LogDebug, "OpenTsdbWriter")
|
||||
<< "HA functionality disabled. Won't pause connection: " << GetName();
|
||||
|
|
@ -51,14 +48,26 @@ void OpenTsdbWriter::OnConfigLoaded()
|
|||
*
|
||||
* @param status Key value pairs for feature stats
|
||||
*/
|
||||
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
|
||||
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
|
||||
{
|
||||
DictionaryData nodes;
|
||||
|
||||
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
|
||||
nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
|
||||
{ "connected", opentsdbwriter->GetConnected() }
|
||||
}));
|
||||
size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
|
||||
nodes.emplace_back(
|
||||
opentsdbwriter->GetName(),
|
||||
new Dictionary({
|
||||
{ "connected", opentsdbwriter->m_Connection->IsConnected() },
|
||||
{"work_queue_items", workQueueItems},
|
||||
{"work_queue_item_rate", workQueueItemRate}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems));
|
||||
perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
|
||||
}
|
||||
|
||||
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
|
||||
|
|
@ -74,13 +83,14 @@ void OpenTsdbWriter::Resume()
|
|||
Log(LogInformation, "OpentsdbWriter")
|
||||
<< "'" << GetName() << "' resumed.";
|
||||
|
||||
m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) {
|
||||
Log(LogDebug, "OpenTsdbWriter")
|
||||
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
|
||||
});
|
||||
|
||||
ReadConfigTemplate();
|
||||
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
||||
m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
CheckResultHandler(checkable, cr);
|
||||
|
|
@ -93,60 +103,24 @@ void OpenTsdbWriter::Resume()
|
|||
void OpenTsdbWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer->Stop(true);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "OpentsdbWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
ObjectImpl<OpenTsdbWriter>::Pause();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect handler called by the timer.
|
||||
* Handles TLS
|
||||
*/
|
||||
void OpenTsdbWriter::ReconnectTimerHandler()
|
||||
{
|
||||
if (IsPaused())
|
||||
return;
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
Log(LogNotice, "OpenTsdbWriter")
|
||||
<< "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
/*
|
||||
* We're using telnet as input method. Future PRs may change this into using the HTTP API.
|
||||
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
|
||||
*/
|
||||
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
|
||||
try {
|
||||
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "OpenTsdbWriter")
|
||||
<< "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "OpenTsdbWriter")
|
||||
<< "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
/**
|
||||
* Registered check result handler processing data.
|
||||
* Calculates tags from the config.
|
||||
|
|
@ -251,7 +225,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
String escaped_hostName = EscapeTag(host->GetName());
|
||||
tags["host"] = escaped_hostName;
|
||||
|
||||
double ts = cr->GetExecutionEnd();
|
||||
std::vector<std::pair<String, double>> metadata;
|
||||
|
||||
if (service) {
|
||||
|
||||
|
|
@ -262,40 +236,55 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
String escaped_serviceName = EscapeMetric(serviceName);
|
||||
metric = "icinga.service." + escaped_serviceName;
|
||||
}
|
||||
|
||||
SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
|
||||
|
||||
metadata.emplace_back("state", service->GetState());
|
||||
} else {
|
||||
if (!config_tmpl_metric.IsEmpty()) {
|
||||
metric = config_tmpl_metric;
|
||||
} else {
|
||||
metric = "icinga.host";
|
||||
}
|
||||
SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
|
||||
metadata.emplace_back("state", host->GetState());
|
||||
}
|
||||
|
||||
SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
|
||||
SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
|
||||
SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
|
||||
SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
|
||||
metadata.emplace_back("state_type", checkable->GetStateType());
|
||||
metadata.emplace_back("reachable", checkable->IsReachable());
|
||||
metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth());
|
||||
metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement());
|
||||
|
||||
SendPerfdata(checkable, metric, tags, cr, ts);
|
||||
m_WorkQueue.Enqueue(
|
||||
[this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
metric = "icinga.check";
|
||||
double ts = cr->GetExecutionEnd();
|
||||
|
||||
if (service) {
|
||||
tags["type"] = "service";
|
||||
String serviceName = service->GetShortName();
|
||||
String escaped_serviceName = EscapeTag(serviceName);
|
||||
tags["service"] = escaped_serviceName;
|
||||
} else {
|
||||
tags["type"] = "host";
|
||||
}
|
||||
for (auto& [name, val] : metadata) {
|
||||
AddMetric(checkable, metric + "." + name, tags, val, ts);
|
||||
}
|
||||
|
||||
SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
|
||||
SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
|
||||
SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
|
||||
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
|
||||
AddPerfdata(checkable, metric, tags, cr, ts);
|
||||
|
||||
metric = "icinga.check";
|
||||
|
||||
if (service) {
|
||||
tags["type"] = "service";
|
||||
String serviceName = service->GetShortName();
|
||||
String escaped_serviceName = EscapeTag(serviceName);
|
||||
tags["service"] = escaped_serviceName;
|
||||
} else {
|
||||
tags["type"] = "host";
|
||||
}
|
||||
|
||||
AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
|
||||
AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
|
||||
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
|
||||
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
|
||||
|
||||
SendMsgBuffer();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -307,9 +296,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
* @param cr Check result containing performance data
|
||||
* @param ts Timestamp when the check result was received
|
||||
*/
|
||||
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
|
||||
void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
||||
Array::Ptr perfdata = cr->GetPerformanceData();
|
||||
|
||||
if (!perfdata)
|
||||
|
|
@ -350,21 +341,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
|
|||
tags_new["label"] = escaped_key;
|
||||
}
|
||||
|
||||
SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
|
||||
AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
|
||||
|
||||
if (!pdv->GetCrit().IsEmpty())
|
||||
SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
|
||||
AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
|
||||
if (!pdv->GetWarn().IsEmpty())
|
||||
SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
|
||||
AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
|
||||
if (!pdv->GetMin().IsEmpty())
|
||||
SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
|
||||
AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
|
||||
if (!pdv->GetMax().IsEmpty())
|
||||
SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
|
||||
AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send given metric to OpenTSDB
|
||||
* Add given metric to the data buffer to be later sent to OpenTSDB
|
||||
*
|
||||
* @param checkable Host/service object
|
||||
* @param metric Full metric name
|
||||
|
|
@ -372,9 +363,11 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
|
|||
* @param value Floating point metric value
|
||||
* @param ts Timestamp where the metric was received from the check result
|
||||
*/
|
||||
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
|
||||
void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, double value, double ts)
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
||||
String tags_string = "";
|
||||
|
||||
for (auto& tag : tags) {
|
||||
|
|
@ -394,22 +387,21 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
|
|||
|
||||
/* do not send \n to debug log */
|
||||
msgbuf << "\n";
|
||||
String put = msgbuf.str();
|
||||
m_MsgBuf.append(msgbuf.str());
|
||||
}
|
||||
|
||||
ObjectLock olock(this);
|
||||
void OpenTsdbWriter::SendMsgBuffer()
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
Log(LogDebug, "OpenTsdbWriter")
|
||||
<< "Flushing data buffer to OpenTsdb.";
|
||||
|
||||
try {
|
||||
Log(LogDebug, "OpenTsdbWriter")
|
||||
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
|
||||
|
||||
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
|
||||
m_Stream->flush();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "OpenTsdbWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "OpenTsdbWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,11 +5,9 @@
|
|||
#define OPENTSDBWRITER_H
|
||||
|
||||
#include "perfdata/opentsdbwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include <fstream>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -36,24 +34,24 @@ protected:
|
|||
void Pause() override;
|
||||
|
||||
private:
|
||||
Shared<AsioTcpStream>::Ptr m_Stream;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::string m_MsgBuf;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
Dictionary::Ptr m_ServiceConfigTemplate;
|
||||
Dictionary::Ptr m_HostConfigTemplate;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const String& metric,
|
||||
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, double value, double ts);
|
||||
void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
|
||||
void SendMsgBuffer();
|
||||
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
|
||||
static String EscapeTag(const String& str);
|
||||
static String EscapeMetric(const String& str);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void ReadConfigTemplate();
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -31,10 +31,8 @@ class OpenTsdbWriter : ConfigObject
|
|||
[config] bool enable_generic_metrics {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
|||
209
lib/perfdata/perfdatawriterconnection.cpp
Normal file
209
lib/perfdata/perfdatawriterconnection.cpp
Normal file
|
|
@ -0,0 +1,209 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include <boost/asio/use_future.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <utility>
|
||||
|
||||
using namespace icinga;
|
||||
using HttpResponse = PerfdataWriterConnection::HttpResponse;
|
||||
|
||||
/**
 * Convenience constructor taking the logging identity from a config object.
 *
 * Delegates to the explicit constructor, passing the name of the parent's reflection type as
 * the log facility and the parent's own name as the parent name.
 *
 * @param parent Config object this connection is created for
 * @param host Host of the perfdata backend
 * @param port Port of the perfdata backend
 * @param sslContext TLS context, forwarded unchanged to the explicit constructor
 * @param verifyPeerCertificate Forwarded unchanged to the explicit constructor
 */
PerfdataWriterConnection::PerfdataWriterConnection(
	const ConfigObject::Ptr& parent,
	String host,
	String port,
	Shared<boost::asio::ssl::context>::Ptr sslContext,
	bool verifyPeerCertificate
)
	: PerfdataWriterConnection(
		parent->GetReflectionType()->GetName(),
		parent->GetName(),
		std::move(host),
		std::move(port),
		std::move(sslContext),
		verifyPeerCertificate
	)
{
}
|
||||
|
||||
/**
 * Construct a connection with an explicitly given logging identity.
 *
 * Only stores the parameters and creates an unconnected stream via MakeStream(); no network
 * activity is started here.
 *
 * @param logFacility Facility name used for log messages of this connection
 * @param parentName Name of the owning object, used in log messages
 * @param host Host of the perfdata backend
 * @param port Port of the perfdata backend
 * @param sslContext TLS context; MakeStream() creates a TLS stream if set, a TCP stream otherwise
 * @param verifyPeerCertificate Whether EnsureConnected() rejects a peer whose certificate failed verification
 */
PerfdataWriterConnection::PerfdataWriterConnection(
	String logFacility,
	String parentName,
	String host,
	String port,
	Shared<boost::asio::ssl::context>::Ptr sslContext,
	bool verifyPeerCertificate
)
	: m_VerifyPeerCertificate(verifyPeerCertificate),
	  m_SslContext(std::move(sslContext)),
	  m_LogFacility(std::move(logFacility)),
	  m_ParentName(std::move(parentName)),
	  m_Host(std::move(host)),
	  m_Port(std::move(port)),
	  m_ReconnectTimer(IoEngine::Get().GetIoContext()),
	  m_Strand(IoEngine::Get().GetIoContext()),
	  // MakeStream() reads m_SslContext, which is initialized above.
	  m_Stream(MakeStream())
{
}
|
||||
|
||||
/**
|
||||
* Get the current state of the connection.
|
||||
*/
|
||||
bool PerfdataWriterConnection::IsConnected() const
|
||||
{
|
||||
return m_Connected;
|
||||
}
|
||||
|
||||
bool PerfdataWriterConnection::IsStopped() const
|
||||
{
|
||||
return m_Stopped;
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::Disconnect()
|
||||
{
|
||||
if (m_Stopped.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::promise<void> promise;
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
try {
|
||||
/* Cancel any outstanding operations of the other coroutine.
|
||||
* Since we're on the same strand we're hopefully guaranteed that all cancellations
|
||||
* result in exceptions thrown by the yield_context, even if its already queued for
|
||||
* completion.
|
||||
*/
|
||||
std::visit(
|
||||
[](const auto& stream) {
|
||||
if (stream->lowest_layer().is_open()) {
|
||||
stream->lowest_layer().cancel();
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
m_ReconnectTimer.cancel();
|
||||
|
||||
Disconnect(std::move(yc));
|
||||
promise.set_value();
|
||||
} catch (const std::exception& ex) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
promise.get_future().get();
|
||||
}
|
||||
|
||||
/**
 * Create a fresh, unconnected stream on the IoEngine's io_context.
 *
 * @return a TLS stream if an SSL context was configured, a plain TCP stream otherwise
 */
AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
{
	auto& io = IoEngine::Get().GetIoContext();

	if (m_SslContext) {
		return Shared<AsioTlsStream>::Make(io, *m_SslContext);
	}

	return Shared<AsioTcpStream>::Make(io);
}
|
||||
|
||||
/**
|
||||
* Wait for the next attempt after an error, using a backoff algorithm.
|
||||
*
|
||||
* The waits between retries are doubled for each failure, up to a maximum of 32s, until it is
|
||||
* reset by a successful attempt.
|
||||
*/
|
||||
void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc)
|
||||
{
|
||||
m_ReconnectTimer.expires_after(m_RetryTimeout);
|
||||
if (m_RetryTimeout <= FinalRetryWait / 2) {
|
||||
m_RetryTimeout *= 2;
|
||||
}
|
||||
m_ReconnectTimer.async_wait(yc);
|
||||
}
|
||||
|
||||
/**
 * Establish the connection unless it is already marked as connected.
 *
 * Connects the lowest layer of the current stream to m_Host:m_Port. For a TLS stream the
 * client handshake is performed afterwards and, if peer verification is enabled, a failed
 * certificate verification is turned into a std::runtime_error. On success the connected
 * flag is set.
 *
 * @param yc Yield context of the calling coroutine
 */
void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc)
{
	if (m_Connected) {
		return;
	}

	auto connect = [&](auto& stream) {
		::Connect(stream->lowest_layer(), m_Host, m_Port, yc);

		if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
			using handshake = boost::asio::ssl::stream_base::handshake_type;

			auto& tls = stream->next_layer();
			tls.async_handshake(handshake::client, yc);

			if (m_VerifyPeerCertificate && !tls.IsVerifyOK()) {
				BOOST_THROW_EXCEPTION(
					std::runtime_error{
						"TLS certificate validation failed: " + tls.GetVerifyError()
					}
				);
			}
		}
	};
	std::visit(connect, m_Stream);

	m_Connected = true;
}
|
||||
|
||||
/**
 * Tear down the current connection from within a coroutine.
 *
 * Does nothing if the connection is not marked as connected. Otherwise the connected flag is
 * cleared, a TLS stream is shut down via GracefulDisconnect(), a plain TCP stream is shut
 * down and closed, and finally m_Stream is replaced by a fresh, unconnected stream.
 *
 * @param yc Yield context of the calling coroutine
 */
void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
{
	if (!m_Connected.exchange(false, std::memory_order_relaxed)) {
		return;
	}

	std::visit(
		[&](auto& stream) {
			if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
				stream->GracefulDisconnect(m_Strand, yc);
			} else {
				/* Deliberately use the non-throwing overloads. shutdown() fails if the peer has
				 * already torn the connection down; with the throwing overload that would skip
				 * close() and the stream reset below although m_Connected is already cleared.
				 * It would also discard an HTTP response that was fully received before a
				 * non-keep-alive disconnect. The socket is being discarded, so there is nothing
				 * useful to do with such an error.
				 */
				boost::system::error_code ec;
				stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both, ec);
				stream->lowest_layer().close(ec);
			}
		},
		m_Stream
	);

	m_Stream = MakeStream();
}
|
||||
|
||||
/**
 * Write a raw data buffer to the current stream and flush it.
 *
 * @param buf Data to send
 * @param yc Yield context of the calling coroutine; errors are reported through it
 */
void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc)
{
	auto writeAndFlush = [&buf, &yc](auto& stream) {
		boost::asio::async_write(*stream, buf, yc);
		stream->async_flush(yc);
	};

	std::visit(writeAndFlush, m_Stream);
}
|
||||
|
||||
/**
 * Send an HTTP request over the current stream and read the response.
 *
 * If the response does not indicate keep-alive, the connection is disconnected before
 * returning.
 *
 * @param request HTTP request to send
 * @param yc Yield context of the calling coroutine; errors are reported through it
 * @return the HTTP response read from the stream
 */
HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc)
{
	namespace http = boost::beast::http;

	http::response<http::string_body> response;

	auto exchange = [&](auto& stream) {
		http::request_serializer<http::string_body> sr{request};
		http::async_write(*stream, sr, yc);
		stream->async_flush(yc);

		boost::beast::flat_buffer buf;
		http::async_read(*stream, buf, response, yc);
	};
	std::visit(exchange, m_Stream);

	if (!response.keep_alive()) {
		Disconnect(yc);
	}

	return response;
}
|
||||
157
lib/perfdata/perfdatawriterconnection.hpp
Normal file
157
lib/perfdata/perfdatawriterconnection.hpp
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <future>
|
||||
|
||||
namespace icinga {
|
||||
|
||||
/**
|
||||
* Class handling the connection to the various Perfdata backends.
|
||||
*/
|
||||
class PerfdataWriterConnection final : public Object
|
||||
{
|
||||
static constexpr auto InitialRetryWait = 50ms;
|
||||
static constexpr auto FinalRetryWait = 32s;
|
||||
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);
|
||||
|
||||
struct Stopped : std::exception
|
||||
{
|
||||
[[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; }
|
||||
};
|
||||
|
||||
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;
|
||||
using HttpResponse = boost::beast::http::response<boost::beast::http::string_body>;
|
||||
|
||||
PerfdataWriterConnection(
|
||||
const ConfigObject::Ptr& parent,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext = nullptr,
|
||||
bool verifyPeerCertificate = true
|
||||
);
|
||||
|
||||
PerfdataWriterConnection(
|
||||
String logFacility,
|
||||
String parentName,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext = nullptr,
|
||||
bool verifyPeerCertificate = true
|
||||
);
|
||||
|
||||
/**
|
||||
* Send the given data buffer to the server.
|
||||
*
|
||||
* To support each Buffer type this function needs an overload of the WriteMessage method.
|
||||
* If the selected WriteMessage functions returns something, Send() will return that result.
|
||||
*
|
||||
* @param buf The buffer to send
|
||||
* @return the return value returned by the WriteMessage overload for Buffer, otherwise void
|
||||
*/
|
||||
template<typename Buffer>
|
||||
auto Send(Buffer&& buf)
|
||||
{
|
||||
if (m_Stopped) {
|
||||
BOOST_THROW_EXCEPTION(Stopped{});
|
||||
}
|
||||
|
||||
using RetType = decltype(WriteMessage(std::declval<Buffer>(), std::declval<boost::asio::yield_context>()));
|
||||
std::promise<RetType> promise;
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
while (true) {
|
||||
try {
|
||||
EnsureConnected(yc);
|
||||
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
WriteMessage(std::forward<Buffer>(buf), yc);
|
||||
promise.set_value();
|
||||
} else {
|
||||
promise.set_value(WriteMessage(std::forward<Buffer>(buf), yc));
|
||||
}
|
||||
|
||||
m_RetryTimeout = InitialRetryWait;
|
||||
return;
|
||||
} catch (const std::exception& ex) {
|
||||
if (m_Stopped) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
|
||||
Log(LogCritical, m_LogFacility)
|
||||
<< "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":"
|
||||
<< m_Port << "' for '" << m_ParentName << "': " << ex.what();
|
||||
|
||||
m_Stream = MakeStream();
|
||||
m_Connected = false;
|
||||
|
||||
try {
|
||||
BackoffWait(yc);
|
||||
} catch (const std::exception&) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise.get_future().get();
|
||||
}
|
||||
|
||||
void Disconnect();
|
||||
|
||||
/**
|
||||
* Cancels ongoing operations either after a timeout or a future became ready.
|
||||
*
|
||||
* This will disconnect and set a flag so that no further Send() requests are accepted.
|
||||
*
|
||||
* @param future The future to wait for
|
||||
* @param timeout The timeout after which ongoing operations are canceled
|
||||
*/
|
||||
template<class Rep, class Period>
|
||||
void CancelAfterTimeout(const std::future<void>& future, const std::chrono::duration<Rep, Period>& timeout)
|
||||
{
|
||||
future.wait_for(timeout);
|
||||
Disconnect();
|
||||
}
|
||||
|
||||
bool IsConnected() const;
|
||||
bool IsStopped() const;
|
||||
|
||||
private:
|
||||
AsioTlsOrTcpStream MakeStream() const;
|
||||
void BackoffWait(const boost::asio::yield_context& yc);
|
||||
void EnsureConnected(const boost::asio::yield_context& yc);
|
||||
void Disconnect(boost::asio::yield_context yc);
|
||||
|
||||
void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc);
|
||||
HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc);
|
||||
|
||||
std::atomic_bool m_Stopped{false};
|
||||
std::atomic_bool m_Connected{false};
|
||||
|
||||
bool m_VerifyPeerCertificate;
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
String m_LogFacility;
|
||||
String m_ParentName;
|
||||
String m_Host;
|
||||
String m_Port;
|
||||
|
||||
std::chrono::milliseconds m_RetryTimeout{InitialRetryWait};
|
||||
boost::asio::steady_timer m_ReconnectTimer;
|
||||
boost::asio::io_context::strand m_Strand;
|
||||
AsioTlsOrTcpStream m_Stream;
|
||||
};
|
||||
|
||||
} // namespace icinga
|
||||
|
|
@ -140,6 +140,18 @@ if(ICINGA2_WITH_NOTIFICATION)
|
|||
)
|
||||
endif()
|
||||
|
||||
if(ICINGA2_WITH_PERFDATA)
|
||||
list(APPEND base_test_SOURCES
|
||||
perfdata-elasticsearchwriter.cpp
|
||||
perfdata-gelfwriter.cpp
|
||||
perfdata-graphitewriter.cpp
|
||||
perfdata-influxdbwriter.cpp
|
||||
perfdata-opentsdbwriter.cpp
|
||||
perfdata-perfdatawriterconnection.cpp
|
||||
$<TARGET_OBJECTS:perfdata>
|
||||
)
|
||||
endif()
|
||||
|
||||
if(ICINGA2_UNITY_BUILD)
|
||||
mkunity_target(base test base_test_SOURCES)
|
||||
endif()
|
||||
|
|
|
|||
|
|
@ -11,6 +11,12 @@
|
|||
#include <boost/test/test_tools.hpp>
|
||||
#include <future>
|
||||
|
||||
#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout))
|
||||
#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout))
|
||||
|
||||
#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout))
|
||||
#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout))
|
||||
|
||||
namespace icinga {
|
||||
|
||||
class TestLogger : public Logger
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
#include "base/defer.hpp"
|
||||
#include "remote/apilistener.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
#include "config/configcompiler.hpp"
|
||||
#include "notification/notificationcomponent.hpp"
|
||||
|
||||
|
|
@ -194,22 +195,7 @@ object NotificationComponent "nc" {}
|
|||
|
||||
void ReceiveCheckResults(std::size_t num, ServiceState state)
|
||||
{
|
||||
StoppableWaitGroup::Ptr wg = new StoppableWaitGroup();
|
||||
|
||||
for (auto i = 0UL; i < num; ++i) {
|
||||
CheckResult::Ptr cr = new CheckResult();
|
||||
|
||||
cr->SetState(state);
|
||||
|
||||
double now = Utility::GetTime();
|
||||
cr->SetActive(false);
|
||||
cr->SetScheduleStart(now);
|
||||
cr->SetScheduleEnd(now);
|
||||
cr->SetExecutionStart(now);
|
||||
cr->SetExecutionEnd(now);
|
||||
|
||||
BOOST_REQUIRE(m_Host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok);
|
||||
}
|
||||
::ReceiveCheckResults(m_Host, num, state);
|
||||
}
|
||||
|
||||
double GetLastNotificationTimestamp() { return m_Notification->GetLastNotification(); }
|
||||
|
|
|
|||
57
test/perfdata-elasticsearchwriter.cpp
Normal file
57
test/perfdata-elasticsearchwriter.cpp
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "perfdata/elasticsearchwriter.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatawriterfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_elasticsearchwriter, PerfdataWriterFixture<ElasticsearchWriter>,
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
ReceiveCheckResults(1, ServiceState::ServiceCritical);
|
||||
|
||||
Accept();
|
||||
auto resp = GetSplitDecodedRequestBody();
|
||||
SendResponse();
|
||||
|
||||
// ElasticsearchWriter wants to send the same message twice, once for the check result
|
||||
// and once for the "state change".
|
||||
resp = GetSplitDecodedRequestBody();
|
||||
SendResponse();
|
||||
|
||||
// Just some basic sanity tests. It's not important to check if everything is entirely
|
||||
// correct here.
|
||||
BOOST_REQUIRE_GT(resp->GetLength(), 1);
|
||||
Dictionary::Ptr cr = resp->Get(1);
|
||||
BOOST_CHECK(cr->Contains("@timestamp"));
|
||||
BOOST_CHECK_EQUAL(cr->Get("check_command"), "dummy");
|
||||
BOOST_CHECK_EQUAL(cr->Get("host"), "h1");
|
||||
|
||||
PauseWriter();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
// Process check-results until the writer is stuck.
|
||||
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
|
||||
|
||||
// Now try to pause.
|
||||
PauseWriter();
|
||||
|
||||
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
|
||||
REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
48
test/perfdata-gelfwriter.cpp
Normal file
48
test/perfdata-gelfwriter.cpp
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "perfdata/gelfwriter.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatawriterfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_gelfwriter, PerfdataWriterFixture<GelfWriter>,
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
ReceiveCheckResults(1, ServiceState::ServiceCritical);
|
||||
|
||||
Accept();
|
||||
Dictionary::Ptr resp = JsonDecode(GetDataUntil('\0'));
|
||||
|
||||
// Just some basic sanity tests. It's not important to check if everything is entirely
|
||||
// correct here.
|
||||
BOOST_CHECK_CLOSE(resp->Get("timestamp").Get<double>(), Utility::GetTime(), 0.5);
|
||||
BOOST_CHECK_EQUAL(resp->Get("_check_command"), "dummy");
|
||||
BOOST_CHECK_EQUAL(resp->Get("_hostname"), "h1");
|
||||
PauseWriter();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
// Process check-results until the writer is stuck.
|
||||
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
|
||||
|
||||
// Now stop reading and try to pause OpenTsdbWriter.
|
||||
PauseWriter();
|
||||
|
||||
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 1s);
|
||||
REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
47
test/perfdata-graphitewriter.cpp
Normal file
47
test/perfdata-graphitewriter.cpp
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "perfdata/graphitewriter.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatawriterfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_graphitewriter, PerfdataWriterFixture<GraphiteWriter>,
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
ReceiveCheckResults(1, ServiceState::ServiceCritical);
|
||||
|
||||
Accept();
|
||||
auto msg = GetDataUntil('\n');
|
||||
|
||||
// Just some basic sanity tests. It's not important to check if everything is entirely correct here.
|
||||
std::string_view cmpStr{"icinga2.h1.host.dummy.perfdata.dummy.value 42"};
|
||||
BOOST_REQUIRE_EQUAL(msg.substr(0, cmpStr.length()), cmpStr);
|
||||
PauseWriter();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
// Process check-results until the writer is stuck.
|
||||
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
|
||||
|
||||
// Now stop reading and try to pause OpenTsdbWriter.
|
||||
PauseWriter();
|
||||
|
||||
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
|
||||
REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
50
test/perfdata-influxdbwriter.cpp
Normal file
50
test/perfdata-influxdbwriter.cpp
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "perfdata/influxdb2writer.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatawriterfixture.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_influxdbwriter, PerfdataWriterFixture<Influxdb2Writer>,
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
ReceiveCheckResults(1, ServiceState::ServiceCritical);
|
||||
|
||||
Accept();
|
||||
auto req = GetSplitRequestBody(',');
|
||||
SendResponse(boost::beast::http::status::no_content);
|
||||
|
||||
// Just some basic sanity tests. It's not important to check if everything is entirely
|
||||
// correct here.
|
||||
BOOST_REQUIRE_EQUAL(req.size(), 3);
|
||||
BOOST_CHECK_EQUAL(req[0], "dummy");
|
||||
BOOST_CHECK_EQUAL(req[1], "hostname=h1");
|
||||
std::string_view perfData = "metric=dummy value=42";
|
||||
BOOST_CHECK_EQUAL(req[2].substr(0, perfData.length()), perfData);
|
||||
PauseWriter();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
// Process check-results until the writer is stuck.
|
||||
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
|
||||
|
||||
// Now try to pause.
|
||||
PauseWriter();
|
||||
|
||||
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
|
||||
REQUIRE_LOG_MESSAGE("'Influxdb2Writer' paused\\.", 1s);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
53
test/perfdata-opentsdbwriter.cpp
Normal file
53
test/perfdata-opentsdbwriter.cpp
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "perfdata/opentsdbwriter.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatawriterfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_opentsdbwriter, PerfdataWriterFixture<OpenTsdbWriter>,
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(connect)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
ReceiveCheckResults(1, ServiceState::ServiceCritical);
|
||||
|
||||
Accept();
|
||||
auto msg = GetDataUntil('\n');
|
||||
std::vector<std::string> splitMsg;
|
||||
boost::split(splitMsg, msg, boost::is_any_of(" "));
|
||||
|
||||
// Just some basic sanity tests. It's not important to check if everything is entirely correct here.
|
||||
BOOST_REQUIRE_EQUAL(splitMsg.size(), 5);
|
||||
BOOST_REQUIRE_EQUAL(splitMsg[0], "put");
|
||||
BOOST_REQUIRE_EQUAL(splitMsg[1], "icinga.host.state");
|
||||
BOOST_REQUIRE_CLOSE(boost::lexical_cast<double>(splitMsg[2]), Utility::GetTime(), 1);
|
||||
BOOST_REQUIRE_EQUAL(splitMsg[3], "1");
|
||||
BOOST_REQUIRE_EQUAL(splitMsg[4], "host=h1");
|
||||
PauseWriter();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
|
||||
{
|
||||
ResumeWriter();
|
||||
|
||||
// Process check-results until the writer is stuck.
|
||||
BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck.");
|
||||
|
||||
// Now stop reading and try to pause OpenTsdbWriter.
|
||||
PauseWriter();
|
||||
|
||||
REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s);
|
||||
REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
198
test/perfdata-perfdatatargetfixture.hpp
Normal file
198
test/perfdata-perfdatatargetfixture.hpp
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/asio/read_until.hpp>
|
||||
#include <boost/asio/streambuf.hpp>
|
||||
#include <boost/asio/use_future.hpp>
|
||||
#include <boost/beast/http.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/parser.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
|
||||
namespace icinga {
|
||||
|
||||
/**
|
||||
* A fixture that provides methods to simulate a perfdata target
|
||||
*/
|
||||
class PerfdataWriterTargetFixture
|
||||
{
|
||||
public:
|
||||
PerfdataWriterTargetFixture()
|
||||
: icinga::PerfdataWriterTargetFixture(Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()))
|
||||
{
|
||||
}
|
||||
|
||||
explicit PerfdataWriterTargetFixture(const Shared<boost::asio::ssl::context>::Ptr& sslCtx)
|
||||
: icinga::PerfdataWriterTargetFixture(Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslCtx))
|
||||
{
|
||||
m_SslContext = sslCtx;
|
||||
}
|
||||
|
||||
explicit PerfdataWriterTargetFixture(AsioTlsOrTcpStream stream)
|
||||
: m_Stream(std::move(stream)),
|
||||
m_Acceptor(
|
||||
IoEngine::Get().GetIoContext(),
|
||||
boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::loopback(), 0}
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
unsigned short GetPort() { return m_Acceptor.local_endpoint().port(); }
|
||||
|
||||
void Accept()
|
||||
{
|
||||
BOOST_REQUIRE_NO_THROW(
|
||||
std::visit([&](auto& stream) { return m_Acceptor.accept(stream->lowest_layer()); }, m_Stream)
|
||||
);
|
||||
}
|
||||
|
||||
void Handshake()
|
||||
{
|
||||
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
|
||||
using handshake_type = UnbufferedAsioTlsStream::handshake_type;
|
||||
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
|
||||
BOOST_REQUIRE_NO_THROW(stream->next_layer().handshake(handshake_type::server));
|
||||
BOOST_REQUIRE(stream->next_layer().IsVerifyOK());
|
||||
}
|
||||
|
||||
void Shutdown()
|
||||
{
|
||||
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
|
||||
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
|
||||
try {
|
||||
stream->next_layer().shutdown();
|
||||
} catch (const std::exception& ex) {
|
||||
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
|
||||
!se || se->code() != boost::asio::error::eof) {
|
||||
BOOST_FAIL("Exception in shutdown(): " << ex.what());
|
||||
}
|
||||
}
|
||||
|
||||
ResetStream();
|
||||
}
|
||||
|
||||
void ResetStream()
|
||||
{
|
||||
if (std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream)) {
|
||||
m_Stream = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *m_SslContext);
|
||||
} else {
|
||||
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the HTTP request body from the stream, with an optional limit on the number of bytes to read.
|
||||
*
|
||||
* @param bytes The maximum number of bytes to read from the request body. If 0, there is no limit and the entire body will be read.
|
||||
*
|
||||
* @return The HTTP request read from the stream.
|
||||
*/
|
||||
boost::beast::http::request<boost::beast::http::string_body> GetRequest(std::size_t bytes = 0)
|
||||
{
|
||||
using namespace boost::beast;
|
||||
|
||||
boost::beast::flat_buffer buf;
|
||||
if (bytes > 0) {
|
||||
buf = boost::beast::flat_buffer{bytes};
|
||||
}
|
||||
boost::system::error_code ec;
|
||||
http::request_parser<http::string_body> parser;
|
||||
parser.body_limit(-1);
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
http::read(*stream, buf, parser, ec);
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
if (bytes > 0) {
|
||||
BOOST_REQUIRE_MESSAGE(
|
||||
!ec || ec == http::error::buffer_overflow,
|
||||
"Reading request body with a buffer limit of '" << bytes <<
|
||||
"' should either succeed or fail with a buffer_overflow error, but got: " << ec.message()
|
||||
);
|
||||
} else {
|
||||
BOOST_REQUIRE_MESSAGE(!ec, "Error while reading request body: " << ec.message());
|
||||
BOOST_REQUIRE_MESSAGE(parser.is_done(), "Parser did not finish reading the request, but no error was set.");
|
||||
}
|
||||
return parser.release();
|
||||
}
|
||||
|
||||
auto GetSplitRequestBody(char delim)
|
||||
{
|
||||
auto request = GetRequest();
|
||||
std::vector<std::string> result{};
|
||||
boost::split(result, request.body(), boost::is_any_of(std::string{delim}));
|
||||
return result;
|
||||
}
|
||||
|
||||
auto GetSplitDecodedRequestBody()
|
||||
{
|
||||
Array::Ptr result = new Array;
|
||||
for (const auto& line : GetSplitRequestBody('\n')) {
|
||||
if (!line.empty()) {
|
||||
result->Add(JsonDecode(line));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
std::string GetDataUntil(T&& delim)
|
||||
{
|
||||
using namespace boost::asio::ip;
|
||||
|
||||
std::size_t delimLength{1};
|
||||
if constexpr (!std::is_same_v<std::decay_t<T>, char>) {
|
||||
delimLength = std::string_view{delim}.size();
|
||||
}
|
||||
|
||||
boost::asio::streambuf buf;
|
||||
boost::system::error_code ec;
|
||||
auto bytesRead = std::visit(
|
||||
[&](auto& stream) { return boost::asio::read_until(*stream, buf, std::forward<T>(delim), ec); }, m_Stream
|
||||
);
|
||||
BOOST_REQUIRE_MESSAGE(!ec, ec.message());
|
||||
|
||||
std::string ret{
|
||||
boost::asio::buffers_begin(buf.data()), boost::asio::buffers_begin(buf.data()) + bytesRead - delimLength
|
||||
};
|
||||
buf.consume(bytesRead);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void SendResponse(boost::beast::http::status status = boost::beast::http::status::ok)
|
||||
{
|
||||
using namespace boost::asio::ip;
|
||||
using namespace boost::beast;
|
||||
|
||||
boost::system::error_code ec;
|
||||
http::response<boost::beast::http::empty_body> response;
|
||||
response.result(status);
|
||||
response.prepare_payload();
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
http::write(*stream, response, ec);
|
||||
BOOST_REQUIRE_MESSAGE(!ec, ec.message());
|
||||
stream->flush(ec);
|
||||
BOOST_REQUIRE_MESSAGE(!ec, ec.message());
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
}
|
||||
|
||||
private:
|
||||
AsioTlsOrTcpStream m_Stream;
|
||||
boost::asio::ip::tcp::acceptor m_Acceptor;
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
};
|
||||
|
||||
} // namespace icinga
|
||||
335
test/perfdata-perfdatawriterconnection.cpp
Normal file
335
test/perfdata-perfdatawriterconnection.cpp
Normal file
|
|
@ -0,0 +1,335 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "test/perfdata-perfdatatargetfixture.hpp"
|
||||
#include "test/remote-certificate-fixture.hpp"
|
||||
#include "test/test-ctest.hpp"
|
||||
#include "test/test-thread.hpp"
|
||||
#include "test/utils.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
class TlsPerfdataWriterFixture : public CertificateFixture, public PerfdataWriterTargetFixture
|
||||
{
|
||||
public:
|
||||
TlsPerfdataWriterFixture() : PerfdataWriterTargetFixture(MakeContext("server"))
|
||||
{
|
||||
m_PdwSslContext = MakeContext("client");
|
||||
|
||||
m_Conn = new PerfdataWriterConnection{"Test", "test", "127.0.0.1", std::to_string(GetPort()), m_PdwSslContext};
|
||||
}
|
||||
|
||||
auto& GetConnection() { return *m_Conn; }
|
||||
|
||||
private:
|
||||
Shared<boost::asio::ssl::context>::Ptr MakeContext(const std::string& name)
|
||||
{
|
||||
auto testCert = EnsureCertFor(name);
|
||||
return SetupSslContext(
|
||||
testCert.crtFile,
|
||||
testCert.keyFile,
|
||||
m_CaCrtFile.string(),
|
||||
"",
|
||||
DEFAULT_TLS_CIPHERS,
|
||||
DEFAULT_TLS_PROTOCOLMIN,
|
||||
DebugInfo()
|
||||
);
|
||||
}
|
||||
|
||||
Shared<boost::asio::ssl::context>::Ptr m_PdwSslContext;
|
||||
PerfdataWriterConnection::Ptr m_Conn;
|
||||
};
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(perfdata_connection, TlsPerfdataWriterFixture,
|
||||
*CTestProperties("FIXTURES_REQUIRED ssl_certs")
|
||||
*boost::unit_test::label("perfdata")
|
||||
*boost::unit_test::label("network")
|
||||
)
|
||||
|
||||
/* If there is no acceptor listening on the other side, connecting should fail.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(connection_refused)
{
	// This promise is never fulfilled, so the future handed to CancelAfterTimeout() never becomes ready.
	std::promise<void> neverFulfilled;
	TestThread canceller{[&]() {
		auto pending = neverFulfilled.get_future();
		GetConnection().CancelAfterTimeout(pending, 50ms);
	}};

	// The payload is "foobar" including its terminating NUL byte.
	boost::asio::const_buffer payload{"foobar", 7};
	BOOST_REQUIRE_THROW(GetConnection().Send(payload), PerfdataWriterConnection::Stopped);

	REQUIRE_JOINS_WITHIN(canceller, 1s);
}
|
||||
|
||||
/* The PerfdataWriterConnection connects automatically when sending the first data.
|
||||
* In case of http we also need to support disconnecting and reconnecting.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(ensure_connected)
{
	std::promise<void> clientDisconnected;

	TestThread target{[&]() {
		Accept();
		Handshake();
		auto received = GetDataUntil('\0');
		Shutdown();
		// Only evaluate the received data once the main thread has returned from Disconnect().
		clientDisconnected.get_future().get();
		BOOST_REQUIRE_EQUAL(received, "foobar");
	}};

	// No explicit connect: the first Send() has to establish the connection by itself.
	boost::asio::const_buffer payload{"foobar", 7};
	BOOST_REQUIRE_NO_THROW(GetConnection().Send(payload));
	BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());
	clientDisconnected.set_value();

	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* Verify that data can still be sent while CancelAfterTimeout is waiting and the timeout
|
||||
* can be aborted when all data has been sent successfully.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(finish_during_timeout)
{
	std::promise<void> allDataReceived;
	boost::asio::const_buffer payload{"foobar", 7};

	TestThread target{[&]() {
		Accept();
		Handshake();

		// Both messages sent by the main thread have to arrive, one before and one while the
		// timeout is being waited for.
		for (int message = 0; message < 2; ++message) {
			auto received = GetDataUntil('\0');
			BOOST_REQUIRE_EQUAL(received, "foobar");
		}

		// The promise is fulfilled here rather than in the main thread after Send(), because the
		// assertions in the canceller thread have to be synchronized with this point.
		allDataReceived.set_value();
		Shutdown();
	}};

	GetConnection().Send(payload);

	TestThread canceller{[&]() {
		auto done = allDataReceived.get_future();
		GetConnection().CancelAfterTimeout(done, 50ms);
		BOOST_REQUIRE(done.wait_for(0ms) == std::future_status::ready);
		BOOST_REQUIRE(!GetConnection().IsConnected());
	}};

	GetConnection().Send(payload);

	REQUIRE_JOINS_WITHIN(canceller, 1s);
	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* For the client, even a hanging server will accept the connection immediately, since it's done
|
||||
* in the kernel. But in that case the TLS handshake will be stuck, so we need to verify that a
|
||||
* handshake can be interrupted by CancelAfterTimeout().
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(stuck_in_handshake)
{
	// The target accepts the TCP connection but never performs the TLS handshake.
	TestThread target{[&]() { Accept(); }};

	std::promise<void> neverFulfilled;
	TestThread canceller{[&]() {
		auto pending = neverFulfilled.get_future();
		GetConnection().CancelAfterTimeout(pending, 50ms);
		// Nothing fulfils the promise, so the wait must have ended because of the timeout.
		BOOST_REQUIRE(pending.wait_for(0ms) == std::future_status::timeout);
	}};

	boost::asio::const_buffer payload{"foobar", 7};
	BOOST_REQUIRE_THROW(GetConnection().Send(payload), PerfdataWriterConnection::Stopped);

	REQUIRE_JOINS_WITHIN(canceller, 1s);
	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* When the disconnect timeout runs out while sending something to a slow or blocking server, we
|
||||
* expect the send to be aborted after a timeout with an 'operation cancelled' exception, in
|
||||
* order to not delay the shutdown of a perfdata writer indefinitely.
|
||||
* No orderly TLS shutdown can be performed in this case, because the stream has been truncated.
|
||||
* The server will need to handle this one on their own.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(stuck_sending)
{
	std::promise<void> sendAborted;
	std::promise<void> firstChunkRead;

	TestThread target{[&]() {
		Accept();
		Handshake();
		auto received = GetDataUntil("#");
		BOOST_REQUIRE_EQUAL(received, "foobar");
		firstChunkRead.set_value();

		// Most of the data is still waiting to be read, but from here on the target pretends to
		// be dead and finally drops the socket without reading any further.
		sendAborted.get_future().get();
		ResetStream();
	}};

	TestThread disconnector{[&]() {
		// Wait until the target has read the initial data before disconnecting. This should
		// especially help with timing on slow machines like the ARM GHA runners.
		firstChunkRead.get_future().get();
		BOOST_REQUIRE(GetConnection().IsConnected());
		BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());
		BOOST_REQUIRE(!GetConnection().IsConnected());
	}};

	// A payload large enough to fill the buffers on both sides of the connection, so that
	// Send() blocks.
	auto payload = GetRandomString("foobar#", 4UL * 1024 * 1024);
	boost::asio::const_buffer buf{payload.data(), payload.size()};
	BOOST_REQUIRE_THROW(GetConnection().Send(buf), PerfdataWriterConnection::Stopped);
	sendAborted.set_value();

	REQUIRE_JOINS_WITHIN(disconnector, 1s);
	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* This simulates a server that is stuck after receiving a HTTP request and before sending their
|
||||
* response. Here, the simulated server is polite and still responds to a shutdown request, but
|
||||
* in reality a server might not even do that. That case should be handled by our
|
||||
* AsioTlsStream::GracefulDisconnect() function with an additional 10s timeout.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(stuck_reading_response)
{
	namespace http = boost::beast::http;

	std::promise<void> sendAborted;
	std::promise<void> requestRead;

	TestThread target{[&]() {
		Accept();
		Handshake();
		auto received = GetRequest();
		BOOST_REQUIRE_EQUAL(received.body(), "bar");
		requestRead.set_value();
		// No response is ever sent, but the target still politely takes part in the shutdown.
		sendAborted.get_future().get();
		Shutdown();
	}};

	TestThread disconnector{[&]() {
		// Disconnect only after the target has read the request.
		requestRead.get_future().get();
		BOOST_REQUIRE(GetConnection().IsConnected());
		BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());
		BOOST_REQUIRE(!GetConnection().IsConnected());
	}};

	http::request<http::string_body> request;
	request.method(http::verb::get);
	request.target("foo");
	request.body() = "bar";
	request.prepare_payload();
	BOOST_REQUIRE_THROW(GetConnection().Send(request), PerfdataWriterConnection::Stopped);
	sendAborted.set_value();

	REQUIRE_JOINS_WITHIN(disconnector, 1s);
	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* This test simulates a server that closes the connection and reappears at a later time.
|
||||
* PerfdataWriterConnection should detect the disconnect, catch the exception and attempt to
|
||||
* reconnect without exiting Send().
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(reconnect_failed)
{
	TestThread target{[&]() {
		// First connection: read the beginning of the message, then drop the connection.
		Accept();
		Handshake();
		auto received = GetDataUntil("#");
		BOOST_REQUIRE_EQUAL(received, "foobar");

		ResetStream();

		// Second connection: the message has to start from the beginning again and this time it
		// is read up to its final newline.
		Accept();
		Handshake();

		received = GetDataUntil("#");
		BOOST_REQUIRE_EQUAL(received, "foobar");
		received = GetDataUntil("\n");

		Shutdown();
	}};

	// A payload large enough to fill the buffers on both sides of the connection, so that
	// Send() blocks.
	auto payload = GetRandomString("foobar#", 4UL * 1024 * 1024);
	payload.push_back('\n');
	boost::asio::const_buffer buf{payload.data(), payload.size()};
	BOOST_REQUIRE_NO_THROW(GetConnection().Send(buf));
	BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());

	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
/* This tests if retrying an http send will reproducibly lead to the exact same message being
|
||||
* received. Normally this is guaranteed by the interface only accepting a const reference, but
|
||||
* since on older boost versions the async_write() functions also accept non-const references, it
|
||||
* doesn't hurt to ensure this with a test-case.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(http_send_retry)
{
	namespace http = boost::beast::http;

	/* Checks that apply to the partially read request as well as to the complete one. */
	auto requireExpectedStart = [](const auto& req) {
		BOOST_REQUIRE_MESSAGE(
			req.method() == boost::beast::http::verb::post, "Request method is not POST: " << req.method_string()
		);
		BOOST_REQUIRE_MESSAGE(req.target() == "foo", "Request target is not 'foo': " << req.target());
		BOOST_REQUIRE_MESSAGE(
			req.body().compare(0, 7, "foobar#") == 0,
			"Request body does not start with 'foobar#': " << req.body().substr(0, 7)
		);
	};

	TestThread target{[&] {
		Accept();
		Handshake();

		/* Read with a buffer limit of 512 bytes only, since we don't want to unblock the client yet.
		 */
		auto received = GetRequest(512);
		requireExpectedStart(received);

		ResetStream();
		Accept();
		Handshake();

		/* Read the entire request now and verify that we still get the expected body,
		 * even though the first read was only partial.
		 */
		received = GetRequest();
		requireExpectedStart(received);

		/* The body size is 4MB + 7 bytes (7 bytes for the "foobar#" prefix of the generated message)
		 */
		BOOST_REQUIRE_MESSAGE(
			received.body().size() == (4UL * 1024 * 1024) + 7,
			"Request body is not the expected size: " << received.body().size()
		);

		SendResponse();

		Shutdown();
	}};

	http::request<http::string_body> request{http::verb::post, "foo", 10};
	request.set(http::field::host, "localhost:" + std::to_string(GetPort()));

	/* A body large enough to fill the buffers on both sides of the connection, in order to make
	 * Send() block.
	 */
	request.body() = GetRandomString("foobar#", 4UL * 1024 * 1024);
	request.prepare_payload();
	BOOST_REQUIRE_NO_THROW(GetConnection().Send(request));
	BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());

	REQUIRE_JOINS_WITHIN(target, 1s);
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
139
test/perfdata-perfdatawriterfixture.hpp
Normal file
139
test/perfdata-perfdatawriterfixture.hpp
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <BoostTestTargetConfig.h>
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "config/configcompiler.hpp"
|
||||
#include "config/configitem.hpp"
|
||||
#include "icinga/host.hpp"
|
||||
#include "test/base-testloggerfixture.hpp"
|
||||
#include "test/perfdata-perfdatatargetfixture.hpp"
|
||||
#include "test/utils.hpp"
|
||||
#include <boost/hana.hpp>
|
||||
|
||||
namespace icinga {
|
||||
|
||||
template<typename Writer>
|
||||
class PerfdataWriterFixture : public PerfdataWriterTargetFixture, public TestLoggerFixture
|
||||
{
|
||||
public:
|
||||
PerfdataWriterFixture() : m_Writer(new Writer)
|
||||
{
|
||||
auto createObjects = [&]() {
|
||||
String config = R"CONFIG(
|
||||
object CheckCommand "dummy" {
|
||||
command = "/bin/echo"
|
||||
}
|
||||
object Host "h1" {
|
||||
address = "h1"
|
||||
check_command = "dummy"
|
||||
enable_notifications = true
|
||||
enable_active_checks = false
|
||||
enable_passive_checks = true
|
||||
}
|
||||
)CONFIG";
|
||||
|
||||
std::unique_ptr<Expression> expr = ConfigCompiler::CompileText("<test>", config);
|
||||
expr->Evaluate(*ScriptFrame::GetCurrentFrame());
|
||||
};
|
||||
|
||||
ConfigItem::RunWithActivationContext(new Function("CreateTestObjects", createObjects));
|
||||
|
||||
m_Host = Host::GetByName("h1");
|
||||
BOOST_REQUIRE(m_Host);
|
||||
|
||||
m_Writer->SetPort(std::to_string(GetPort()));
|
||||
m_Writer->SetName(m_Writer->GetReflectionType()->GetName());
|
||||
m_Writer->SetDisconnectTimeout(0.05);
|
||||
m_Writer->Register();
|
||||
|
||||
auto hasFlushInterval = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushInterval(0.05)) {});
|
||||
if constexpr (decltype(hasFlushInterval(std::declval<Writer>()))::value) {
|
||||
m_Writer->SetFlushInterval(0.05);
|
||||
}
|
||||
|
||||
auto hasFlushThreshold = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushThreshold(1)) {});
|
||||
if constexpr (decltype(hasFlushThreshold(std::declval<Writer>()))::value) {
|
||||
m_Writer->SetFlushThreshold(1);
|
||||
}
|
||||
}
|
||||
|
||||
void ReceiveCheckResults(
|
||||
std::size_t num,
|
||||
ServiceState state,
|
||||
const std::function<void(const CheckResult::Ptr&)>& fn = {}
|
||||
)
|
||||
{
|
||||
::ReceiveCheckResults(m_Host, num, state, fn);
|
||||
}
|
||||
|
||||
std::size_t GetWorkQueueLength()
|
||||
{
|
||||
Array::Ptr dummy = new Array;
|
||||
Dictionary::Ptr status = new Dictionary;
|
||||
m_Writer->StatsFunc(status, dummy);
|
||||
ObjectLock lock{status};
|
||||
// Unpack the single-key top-level dictionary
|
||||
Dictionary::Ptr writer = status->Begin()->second;
|
||||
BOOST_REQUIRE(writer);
|
||||
Dictionary::Ptr values = writer->Get(m_Writer->GetName());
|
||||
BOOST_REQUIRE(values);
|
||||
BOOST_REQUIRE(values->Contains("work_queue_items"));
|
||||
return values->Get("work_queue_items");
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes check results until the writer's work queue is no longer moving.
|
||||
*
|
||||
* @param timeout Time after which to give up trying to get the writer stuck
|
||||
* @return true if the writer is now stuck
|
||||
*/
|
||||
bool GetWriterStuck(std::chrono::milliseconds timeout)
|
||||
{
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
std::size_t unchangedCount = 0;
|
||||
while(true){
|
||||
ReceiveCheckResults(10, ServiceCritical, [&](const CheckResult::Ptr& cr) {
|
||||
cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("", 4096), 1});
|
||||
});
|
||||
|
||||
if (std::chrono::steady_clock::now() - start >= timeout) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto numWq = GetWorkQueueLength();
|
||||
if (numWq >= 10) {
|
||||
std::this_thread::sleep_for(1ms);
|
||||
if (numWq == GetWorkQueueLength()) {
|
||||
if (unchangedCount < 5) {
|
||||
++unchangedCount;
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
unchangedCount = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ResumeWriter()
|
||||
{
|
||||
static_cast<ConfigObject::Ptr>(m_Writer)->OnConfigLoaded();
|
||||
m_Writer->SetActive(true);
|
||||
m_Writer->Activate();
|
||||
BOOST_REQUIRE(!m_Writer->IsPaused());
|
||||
}
|
||||
|
||||
void PauseWriter() { static_cast<ConfigObject::Ptr>(m_Writer)->Pause(); }
|
||||
|
||||
auto GetWriter() { return m_Writer; }
|
||||
|
||||
private:
|
||||
Host::Ptr m_Host;
|
||||
typename Writer::Ptr m_Writer;
|
||||
};
|
||||
|
||||
} // namespace icinga
|
||||
60
test/test-thread.hpp
Normal file
60
test/test-thread.hpp
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <thread>
#include <utility>
|
||||
|
||||
/* Assertions for TestThread objects.
 *
 * The prefix selects how a failure is reported: REQUIRE_ aborts the current test case, while
 * CHECK_ and TEST_ record the failure and let the test case continue. Keep in mind that a
 * std::thread that has not been joined must not be destroyed, so a test case can not simply carry
 * on as usual after a failed CHECK_JOINS_WITHIN() or TEST_JOINS_WITHIN().
 */
#define REQUIRE_JOINS_WITHIN(t, timeout) \
	BOOST_REQUIRE_MESSAGE((t).TryJoinWithin(timeout), "Thread not joinable within timeout.")
#define CHECK_JOINS_WITHIN(t, timeout) \
	BOOST_CHECK_MESSAGE((t).TryJoinWithin(timeout), "Thread not joinable within timeout.")
#define TEST_JOINS_WITHIN(t, timeout) \
	BOOST_TEST((t).TryJoinWithin(timeout), "Thread not joinable within timeout.")

#define REQUIRE_JOINABLE(t) BOOST_REQUIRE_MESSAGE((t).Joinable(), "Thread not joinable.")
#define CHECK_JOINABLE(t) BOOST_CHECK_MESSAGE((t).Joinable(), "Thread not joinable.")
#define TEST_JOINABLE(t) BOOST_TEST((t).Joinable(), "Thread not joinable.")
|
||||
|
||||
namespace icinga {
|
||||
|
||||
/**
 * A thread for use in test cases that can be joined with a timeout.
 *
 * The thread starts running the given function on construction. An exception escaping the
 * function does not leave the thread (which would terminate the process), but is stored and
 * rethrown in the thread that joins via TryJoinWithin().
 *
 * The object must be joined successfully before it is destroyed, as it holds a std::thread.
 */
class TestThread
{
public:
	/**
	 * Starts a thread running the given function.
	 *
	 * @param fn The function to run.
	 */
	explicit TestThread(std::function<void()> fn) : TestThread(std::move(fn), std::promise<void>{}) {}

	/**
	 * @return true if the function has finished (normally or by exception), without waiting for it.
	 */
	bool Joinable()
	{
		return m_JoinFuture.wait_for(std::chrono::milliseconds{0}) == std::future_status::ready;
	}

	/**
	 * Waits for the function to finish and joins the thread if it did so in time.
	 *
	 * Calling this again after the thread has been joined is harmless and returns true.
	 *
	 * @param timeout The maximum time to wait.
	 *
	 * @return true if the function has finished, false if the timeout elapsed first.
	 * @throws Whatever the function has thrown, once, by the call that joins the thread.
	 */
	template<class Rep, class Period>
	bool TryJoinWithin(std::chrono::duration<Rep, Period> timeout)
	{
		if (m_JoinFuture.wait_for(timeout) != std::future_status::ready) {
			return false;
		}

		if (m_Thread.joinable()) {
			m_Thread.join();
			// Surface an exception of the function in the joining thread.
			m_JoinFuture.get();
		}

		return true;
	}

private:
	explicit TestThread(std::function<void()> fn, std::promise<void> joinPromise)
		: m_JoinFuture(joinPromise.get_future().share()),
		  m_Thread([fn = std::move(fn), jp = std::move(joinPromise)]() mutable {
			  try {
				  fn();
				  jp.set_value();
			  } catch (...) {
				  jp.set_exception(std::current_exception());
			  }
		  })
	{
	}

	// Shared, so that the result can be queried repeatedly by Joinable() and TryJoinWithin().
	std::shared_future<void> m_JoinFuture;
	std::thread m_Thread;
};
|
||||
|
||||
} // namespace icinga
|
||||
|
|
@ -2,8 +2,10 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "utils.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include <cstring>
|
||||
#include <iomanip>
|
||||
#include <random>
|
||||
#include <sstream>
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
|
|
@ -66,3 +68,58 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture()
|
|||
#endif
|
||||
tzset();
|
||||
}
|
||||
|
||||
/**
 * Generates a string of random printable ASCII characters ('!' to '~') appended to a prefix.
 *
 * @param prefix The string the result starts with
 * @param length The number of random characters to append to the prefix
 *
 * @return The prefix followed by `length` random characters
 */
std::string GetRandomString(std::string prefix, std::size_t length)
{
	std::random_device rd;
	std::mt19937 gen(rd());
	std::uniform_int_distribution<int> distribution('!', '~');

	prefix.reserve(prefix.size() + length);

	// The counter has the same type as length, a narrower one would wrap around before reaching it.
	for (std::size_t i = 0; i < length; ++i) {
		prefix += static_cast<char>(distribution(gen));
	}

	return prefix;
}
|
||||
|
||||
/**
|
||||
* Make our test host receive a number of check-results.
|
||||
*
|
||||
* @param num The number of check-results to receive
|
||||
* @param state The state the check results should have
|
||||
* @param fn A function that will be passed the current check-result
|
||||
*/
|
||||
void ReceiveCheckResults(
|
||||
const icinga::Checkable::Ptr& host,
|
||||
std::size_t num,
|
||||
icinga::ServiceState state,
|
||||
const std::function<void(const icinga::CheckResult::Ptr&)>& fn
|
||||
)
|
||||
{
|
||||
using namespace icinga;
|
||||
|
||||
StoppableWaitGroup::Ptr wg = new StoppableWaitGroup();
|
||||
|
||||
for (auto i = 0UL; i < num; ++i) {
|
||||
CheckResult::Ptr cr = new CheckResult();
|
||||
|
||||
cr->SetState(state);
|
||||
|
||||
double now = Utility::GetTime();
|
||||
cr->SetActive(false);
|
||||
cr->SetScheduleStart(now);
|
||||
cr->SetScheduleEnd(now);
|
||||
cr->SetExecutionStart(now);
|
||||
cr->SetExecutionEnd(now);
|
||||
|
||||
Array::Ptr perfData = new Array;
|
||||
perfData->Add(new PerfdataValue{"dummy", 42});
|
||||
cr->SetPerformanceData(perfData);
|
||||
|
||||
if (fn) {
|
||||
fn(cr);
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "icinga/host.hpp"
|
||||
#include <ctime>
|
||||
#include <functional>
|
||||
#include <string>
|
||||
|
||||
tm make_tm(std::string s);
|
||||
|
|
@ -24,3 +26,12 @@ struct GlobalTimezoneFixture
|
|||
|
||||
char *tz;
|
||||
};
|
||||
|
||||
std::string GetRandomString(std::string prefix, std::size_t length);
|
||||
|
||||
void ReceiveCheckResults(
|
||||
const icinga::Checkable::Ptr& host,
|
||||
std::size_t num,
|
||||
icinga::ServiceState state,
|
||||
const std::function<void(const icinga::CheckResult::Ptr&)>& fn = {}
|
||||
);
|
||||
|
|
|
|||
Loading…
Reference in a new issue