Use PerfdataWriterConnection in ElasticsearchDatastreamWriter

This commit is contained in:
Johannes Schmidt 2026-02-02 15:45:39 +01:00
parent 0cf6669321
commit 310c1d3c26
3 changed files with 42 additions and 95 deletions

View file

@ -20,18 +20,14 @@
#include "base/application.hpp"
#include "base/base64.hpp"
#include "base/defer.hpp"
#include "base/dictionary.hpp"
#include "base/exception.hpp"
#include "base/io-engine.hpp"
#include "base/json.hpp"
#include "base/logger.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
#include "base/stream.hpp"
#include "base/string.hpp"
#include "base/tcpsocket.hpp"
#include "base/tlsstream.hpp"
#include "base/utility.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/macroprocessor.hpp"
@ -111,6 +107,19 @@ void ElasticsearchDatastreamWriter::Resume()
});
m_FlushTimer->Start();
Shared<boost::asio::ssl::context>::Ptr sslContext;
if (GetEnableTls()) {
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to create SSL context.";
throw;
}
}
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), sslContext};
/* Register for new metrics. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(
[this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@ -126,8 +135,15 @@ void ElasticsearchDatastreamWriter::Pause()
m_FlushTimer->Stop(true);
m_Paused = true;
m_WorkQueue.Enqueue([this]() { Flush(); });
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
m_WorkQueue.Join();
m_Connection->Disconnect();
Log(LogInformation, "ElasticsearchDatastreamWriter")
<< "'" << GetName() << "' paused.";
@ -191,6 +207,11 @@ void ElasticsearchDatastreamWriter::ManageIndexTemplate()
continue;
}
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::system::errc::operation_canceled) {
Log(LogDebug, "ElasticsearchDatastreamWriter") << "Operation cancelled.";
return;
}
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "Failed to install component template 'icinga2@custom', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
Log(LogDebug, "ElasticsearchDatastreamWriter")
@ -208,6 +229,11 @@ void ElasticsearchDatastreamWriter::ManageIndexTemplate()
<< "Response: " << JsonEncode(jsonResponse);
break;
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::system::errc::operation_canceled) {
Log(LogDebug, "ElasticsearchDatastreamWriter") << "Operation cancelled.";
return;
}
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "Failed to install/update index template 'icinga2-metrics', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
Log(LogDebug, "ElasticsearchDatastreamWriter")
@ -531,6 +557,11 @@ void ElasticsearchDatastreamWriter::Flush()
m_DocumentsSent += m_DataBuffer.size();
break;
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::system::errc::operation_canceled) {
Log(LogDebug, "ElasticsearchDatastreamWriter") << "Operation cancelled.";
return;
}
if (m_Paused) {
// We are shutting down, don't retry.
Log(LogWarning, "ElasticsearchDatastreamWriter")
@ -626,35 +657,7 @@ Value ElasticsearchDatastreamWriter::TrySend(const Url::Ptr& url, String body)
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'.";
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;
OptionalTlsStream stream = Connect();
Defer closeStream([&stream]() {
if (stream.first) {
stream.first->lowest_layer().close();
} else if (stream.second) {
stream.second->lowest_layer().close();
}
});
try {
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
http::read(*stream.first, buf, parser);
} else {
http::write(*stream.second, request);
stream.second->flush();
http::read(*stream.second, buf, parser);
}
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "Cannot perform http request API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
auto& response (parser.get());
auto response = m_Connection->Send(request);
if (response.result_int() > 299) {
Log(LogDebug, "ElasticsearchDatastreamWriter")
<< "Unexpected response code " << static_cast<unsigned>(response.result()) << " from URL '" << url->Format() << "'. Error: " << response.body();
@ -686,66 +689,6 @@ Value ElasticsearchDatastreamWriter::TrySend(const Url::Ptr& url, String body)
}
}
OptionalTlsStream ElasticsearchDatastreamWriter::Connect()
{
Log(LogNotice, "ElasticsearchDatastreamWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
OptionalTlsStream stream;
bool tls = GetEnableTls();
if (tls) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "Unable to create SSL context.";
throw;
}
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (tls) {
auto& tlsStream (stream.first->next_layer());
try {
tlsStream.handshake(boost::asio::ssl::stream_base::client);
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchDatastreamWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw;
}
if (!GetInsecureNoverify()) {
if (!tlsStream.GetPeerCertificate()) {
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
}
if (!tlsStream.IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error(
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
));
}
}
}
return stream;
}
void ElasticsearchDatastreamWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());

View file

@ -11,12 +11,12 @@
#include "base/shared-object.hpp"
#include "base/workqueue.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "config/expression.hpp"
#include "icinga/checkable.hpp"
#include "icinga/checkresult.hpp"
#include "icinga/macroprocessor.hpp"
#include "remote/url.hpp"
#include "perfdata/perfdatawriterconnection.hpp"
#include "perfdata/elasticsearchdatastreamwriter-ti.hpp"
@ -68,6 +68,8 @@ private:
Timer::Ptr m_FlushTimer;
bool m_Paused = false;
PerfdataWriterConnection::Ptr m_Connection;
// This buffer should only be accessed from the worker thread.
// Every other access will lead to a race-condition.
std::vector<EcsDocument::Ptr> m_DataBuffer;
@ -86,7 +88,6 @@ private:
const CheckResult::Ptr& cr);
bool Filter(const Checkable::Ptr& checkable);
OptionalTlsStream Connect();
void AssertOnWorkQueue();
void Flush();
void SendRequest(const String& body);

View file

@ -44,6 +44,9 @@ class ElasticsearchDatastreamWriter : ConfigObject
[config] String cert_path;
[config] String key_path;
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
[config] int flush_interval {
default {{{ return 10; }}}
};