This commit is contained in:
Johannes Schmidt 2026-02-03 14:57:50 +01:00 committed by GitHub
commit cb9fecde61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 1568 additions and 829 deletions

View file

@ -11,80 +11,80 @@ concurrency:
group: linux-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true
jobs:
linux:
name: ${{ matrix.distro }}${{ matrix.platform != 'linux/amd64' && format(' ({0})', matrix.platform) || '' }}
runs-on: ubuntu-latest
# jobs:
# linux:
# name: ${{ matrix.distro }}${{ matrix.platform != 'linux/amd64' && format(' ({0})', matrix.platform) || '' }}
# runs-on: ubuntu-latest
strategy:
fail-fast: false
max-parallel: 2
matrix:
distro:
# Alpine Linux to build Icinga 2 with LibreSSL, OpenBSD's default.
# The "alpine:bash" image will be built below based on "alpine:3".
- alpine:bash
# strategy:
# fail-fast: false
# max-parallel: 2
# matrix:
# distro:
# # Alpine Linux to build Icinga 2 with LibreSSL, OpenBSD's default.
# # The "alpine:bash" image will be built below based on "alpine:3".
# - alpine:bash
- amazonlinux:2
- amazonlinux:2023
# - amazonlinux:2
# - amazonlinux:2023
# Raspberry Pi OS is close enough to Debian to test just one of them.
# Its architecture is different, though, and covered by the Docker job.
- debian:11
- debian:12
- debian:13
# # Raspberry Pi OS is close enough to Debian to test just one of them.
# # Its architecture is different, though, and covered by the Docker job.
# - debian:11
# - debian:12
# - debian:13
- fedora:41
- fedora:42
- fedora:43
# - fedora:41
# - fedora:42
# - fedora:43
- opensuse/leap:15.6
- opensuse/leap:16.0
# - opensuse/leap:15.6
# - opensuse/leap:16.0
# We don't actually support Rocky Linux as such!
# We just use that RHEL clone to test the original.
- rockylinux:8
- rockylinux:9
- rockylinux/rockylinux:10
# # We don't actually support Rocky Linux as such!
# # We just use that RHEL clone to test the original.
# - rockylinux:8
# - rockylinux:9
# - rockylinux/rockylinux:10
- registry.suse.com/suse/sle15:15.6
- registry.suse.com/suse/sle15:15.7
- registry.suse.com/bci/bci-base:16.0
# - registry.suse.com/suse/sle15:15.6
# - registry.suse.com/suse/sle15:15.7
# - registry.suse.com/bci/bci-base:16.0
- ubuntu:22.04
- ubuntu:24.04
- ubuntu:25.04
- ubuntu:25.10
# - ubuntu:22.04
# - ubuntu:24.04
# - ubuntu:25.04
# - ubuntu:25.10
platform:
- linux/amd64
# platform:
# - linux/amd64
include:
- distro: debian:11
platform: linux/386
- distro: debian:12
platform: linux/386
# include:
# - distro: debian:11
# platform: linux/386
# - distro: debian:12
# platform: linux/386
steps:
- name: Checkout HEAD
uses: actions/checkout@v6
# steps:
# - name: Checkout HEAD
# uses: actions/checkout@v6
- name: Turn on Problem Matcher
run: echo "::add-matcher::.github/problem-matchers/gcc.json"
# - name: Turn on Problem Matcher
# run: echo "::add-matcher::.github/problem-matchers/gcc.json"
- name: Restore/backup ccache
uses: actions/cache@v5
with:
path: ccache
key: ccache/${{ matrix.distro }}
# - name: Restore/backup ccache
# uses: actions/cache@v5
# with:
# path: ccache
# key: ccache/${{ matrix.distro }}
- name: Build Alpine Docker Image
if: "matrix.distro == 'alpine:bash'"
run: >-
docker build --file .github/workflows/alpine-bash.Dockerfile
--tag alpine:bash `mktemp -d`
# - name: Build Alpine Docker Image
# if: "matrix.distro == 'alpine:bash'"
# run: >-
# docker build --file .github/workflows/alpine-bash.Dockerfile
# --tag alpine:bash `mktemp -d`
- name: Build Icinga
run: >-
docker run --rm -v "$(pwd):/icinga2" -e DISTRO=${{ matrix.distro }}
--platform ${{ matrix.platform }} ${{ matrix.distro }} /icinga2/.github/workflows/linux.bash
# - name: Build Icinga
# run: >-
# docker run --rm -v "$(pwd):/icinga2" -e DISTRO=${{ matrix.distro }}
# --platform ${{ matrix.platform }} ${{ matrix.distro }} /icinga2/.github/workflows/linux.bash

View file

@ -11,53 +11,53 @@ concurrency:
group: windows-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true
jobs:
windows:
name: Windows
# jobs:
# windows:
# name: Windows
strategy:
fail-fast: false
max-parallel: 1
matrix:
bits: [32, 64]
# strategy:
# fail-fast: false
# max-parallel: 1
# matrix:
# bits: [32, 64]
runs-on: windows-2025
# runs-on: windows-2025
env:
BITS: '${{ matrix.bits }}'
CMAKE_BUILD_TYPE: RelWithDebInfo
# env:
# BITS: '${{ matrix.bits }}'
# CMAKE_BUILD_TYPE: RelWithDebInfo
steps:
- name: Checkout HEAD
uses: actions/checkout@v6
with:
fetch-depth: 0
# steps:
# - name: Checkout HEAD
# uses: actions/checkout@v6
# with:
# fetch-depth: 0
- name: Build tools
run: |
Set-PSDebug -Trace 1
& .\doc\win-dev.ps1
# - name: Build tools
# run: |
# Set-PSDebug -Trace 1
# & .\doc\win-dev.ps1
- name: Turn on Problem Matcher
run: |
Write-Host "::add-matcher::.github/problem-matchers/msvc.json"
# - name: Turn on Problem Matcher
# run: |
# Write-Host "::add-matcher::.github/problem-matchers/msvc.json"
- name: Binary
run: |
Set-PSDebug -Trace 1
& .\tools\win32\load-vsenv.ps1
& powershell.exe .\tools\win32\configure.ps1
if ($LastExitCode -ne 0) { throw "Error during configure" }
& powershell.exe .\tools\win32\build.ps1
if ($LastExitCode -ne 0) { throw "Error during build" }
& powershell.exe .\tools\win32\test.ps1
if ($LastExitCode -ne 0) { throw "Error during test" }
# - name: Binary
# run: |
# Set-PSDebug -Trace 1
# & .\tools\win32\load-vsenv.ps1
# & powershell.exe .\tools\win32\configure.ps1
# if ($LastExitCode -ne 0) { throw "Error during configure" }
# & powershell.exe .\tools\win32\build.ps1
# if ($LastExitCode -ne 0) { throw "Error during build" }
# & powershell.exe .\tools\win32\test.ps1
# if ($LastExitCode -ne 0) { throw "Error during test" }
- name: Show Log Files
if: ${{ always() }}
run: |
foreach ($file in Get-ChildItem -Recurse -Filter "*.log") {
Write-Host "::group::$($file.FullName)"
Get-Content $file.FullName
Write-Host "::endgroup::"
}
# - name: Show Log Files
# if: ${{ always() }}
# run: |
# foreach ($file in Get-ChildItem -Recurse -Filter "*.log") {
# Write-Host "::group::$($file.FullName)"
# Get-Content $file.FullName
# Write-Host "::endgroup::"
# }

View file

@ -75,5 +75,10 @@
#define BOOST_BIND_NO_PLACEHOLDERS
#include <functional>
#include <chrono>
namespace icinga {
using namespace std::chrono_literals;
} // namespace icinga
#endif /* I2BASE_H */

View file

@ -18,6 +18,7 @@ set(perfdata_SOURCES
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
perfdatawriterconnection.cpp
)
if(ICINGA2_UNITY_BUILD)

View file

@ -8,14 +8,10 @@
#include "icinga/macroprocessor.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/stream.hpp"
#include "base/base64.hpp"
#include "base/json.hpp"
#include "base/utility.hpp"
#include "base/networkstream.hpp"
#include "base/perfdatavalue.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
@ -31,7 +27,6 @@
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility>
@ -81,8 +76,6 @@ void ElasticsearchWriter::Resume()
{
ObjectImpl<ElasticsearchWriter>::Resume();
m_EventPrefix = "icinga2.event.";
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' resumed.";
@ -91,10 +84,25 @@ void ElasticsearchWriter::Resume()
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->OnTimerExpired.connect([this](const Timer* const&) {
m_WorkQueue.Enqueue([&]() { Flush(); });
});
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
Shared<boost::asio::ssl::context>::Ptr sslContext;
if (GetEnableTls()) {
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to create SSL context.";
throw;
}
}
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), sslContext};
/* Register for new metrics. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@ -117,14 +125,16 @@ void ElasticsearchWriter::Pause()
m_HandleCheckResults.disconnect();
m_HandleStateChanges.disconnect();
m_HandleNotifications.disconnect();
m_FlushTimer->Stop();
m_WorkQueue.Enqueue([&]() { Flush(); });
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
m_FlushTimer->Stop(true);
m_WorkQueue.Join();
{
std::unique_lock<std::mutex> lock (m_DataBufferMutex);
Flush();
}
m_Connection->Disconnect();
Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' paused.";
@ -378,14 +388,11 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
{
AssertOnWorkQueue();
/* Atomically buffer the data point. */
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
/* Format the timestamps to dynamically select the date datatype inside the index. */
fields->Set("@timestamp", FormatTimestamp(ts));
fields->Set("timestamp", FormatTimestamp(ts));
String eventType = m_EventPrefix + type;
String eventType = "icinga2.event." + type;
fields->Set("type", eventType);
/* Every payload needs a line describing the index.
@ -407,21 +414,6 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
}
}
void ElasticsearchWriter::FlushTimeout()
{
/* Prevent new data points from being added to the array, there is a
* race condition where they could disappear.
*/
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
/* Flush if there are any data available. */
if (m_DataBuffer.size() > 0) {
Log(LogDebug, "ElasticsearchWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
void ElasticsearchWriter::Flush()
{
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
@ -465,22 +457,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
url->SetPath(path);
OptionalTlsStream stream;
try {
stream = Connect();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticsearchWriter")
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
return;
}
Defer s ([&stream]() {
if (stream.first) {
stream.first->next_layer().shutdown();
}
});
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
@ -510,36 +486,18 @@ void ElasticsearchWriter::SendRequest(const String& body)
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'.";
decltype(m_Connection->Send(request)) response;
try {
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;
try {
if (stream.first) {
http::read(*stream.first, buf, parser);
} else {
http::read(*stream.second, buf, parser);
}
response = m_Connection->Send(request);
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticsearchWriter")
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
throw;
}
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
Log(LogDebug, "ElasticsearchWriter") << "Operation cancelled.";
return;
}
auto& response (parser.get());
Log(LogCritical, "ElasticsearchWriter") << "Error sending Request: " << ex.what();
}
if (response.result_int() > 299) {
if (response.result() == http::status::unauthorized) {
@ -588,66 +546,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
}
}
OptionalTlsStream ElasticsearchWriter::Connect()
{
Log(LogNotice, "ElasticsearchWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
OptionalTlsStream stream;
bool tls = GetEnableTls();
if (tls) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Unable to create SSL context.";
throw;
}
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (tls) {
auto& tlsStream (stream.first->next_layer());
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw;
}
if (!GetInsecureNoverify()) {
if (!tlsStream.GetPeerCertificate()) {
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
}
if (!tlsStream.IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error(
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
));
}
}
}
return stream;
}
void ElasticsearchWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());

View file

@ -4,11 +4,11 @@
#define ELASTICSEARCHWRITER_H
#include "perfdata/elasticsearchwriter-ti.hpp"
#include "icinga/service.hpp"
#include "icinga/checkable.hpp"
#include "base/configobject.hpp"
#include "base/workqueue.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "perfdata/perfdatawriterconnection.hpp"
namespace icinga
{
@ -32,12 +32,12 @@ protected:
void Pause() override;
private:
String m_EventPrefix;
WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
std::mutex m_DataBufferMutex;
PerfdataWriterConnection::Ptr m_Connection;
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
@ -50,10 +50,8 @@ private:
void Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts);
OptionalTlsStream Connect();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);
void FlushTimeout();
void Flush();
void SendRequest(const String& body);
};

View file

@ -39,7 +39,10 @@ class ElasticsearchWriter : ConfigObject
[config] String cert_path;
[config] String key_path;
[config] int flush_interval {
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
[config] double flush_interval {
default {{{ return 10; }}}
};
[config] int flush_threshold {

View file

@ -5,24 +5,19 @@
#include "icinga/service.hpp"
#include "icinga/notification.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/context.hpp"
#include "base/exception.hpp"
#include "base/json.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string/replace.hpp>
#include <utility>
#include "base/io-engine.hpp"
#include <boost/asio/write.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/system/error_code.hpp>
@ -61,7 +56,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "connected", gelfwriter->GetConnected() },
{ "connected", gelfwriter->m_Connection->IsConnected() },
{ "source", gelfwriter->GetSource() }
}));
@ -82,12 +77,19 @@ void GelfWriter::Resume()
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
/* Timer for reconnecting */
m_ReconnectTimer = Timer::Create();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
/* Initialize connection */
Shared<boost::asio::ssl::context>::Ptr sslContext;
if (GetEnableTls()) {
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter")
<< "Unable to create SSL context.";
throw;
}
}
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), sslContext};
/* Register event handlers. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
@ -112,20 +114,14 @@ void GelfWriter::Pause()
m_HandleNotifications.disconnect();
m_HandleStateChanges.disconnect();
m_ReconnectTimer->Stop(true);
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
m_WorkQueue.Enqueue([this]() {
try {
ReconnectInternal();
} catch (const std::exception&) {
Log(LogInformation, "GelfWriter")
<< "Unable to connect, not flushing buffers. Data may be lost.";
}
}, PriorityImmediate);
m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow);
m_WorkQueue.Join();
m_Connection->Disconnect();
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' paused.";
@ -142,125 +138,7 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
DisconnectInternal();
}
void GelfWriter::Reconnect()
{
AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
ReconnectInternal();
}
void GelfWriter::ReconnectInternal()
{
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");
SetShouldConnect(true);
if (GetConnected())
return;
Log(LogNotice, "GelfWriter")
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
bool ssl = GetEnableTls();
if (ssl) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter")
<< "Unable to create SSL context.";
throw;
}
m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter")
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
throw;
}
if (ssl) {
auto& tlsStream (m_Stream.first->next_layer());
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter")
<< "TLS handshake with host '" << GetHost() << " failed.'";
throw;
}
if (!GetInsecureNoverify()) {
if (!tlsStream.GetPeerCertificate()) {
BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
}
if (!tlsStream.IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error(
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
));
}
}
}
SetConnected(true);
Log(LogInformation, "GelfWriter")
<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
void GelfWriter::ReconnectTimerHandler()
{
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
}
void GelfWriter::Disconnect()
{
AssertOnWorkQueue();
DisconnectInternal();
}
void GelfWriter::DisconnectInternal()
{
if (!GetConnected())
return;
if (m_Stream.first) {
boost::system::error_code ec;
m_Stream.first->next_layer().shutdown(ec);
// https://stackoverflow.com/a/25703699
// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
if (ec.category() == boost::asio::error::get_ssl_category()) {
Log(LogCritical, "GelfWriter")
<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
}
} else if (m_Stream.second) {
m_Stream.second->close();
}
SetConnected(false);
// TODO: m_Connection->Disconnect();
}
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -472,26 +350,23 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
msgbuf << gelfMessage;
msgbuf << '\0';
String log = msgbuf.str();
if (!GetConnected())
return;
auto log = msgbuf.str();
try {
Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
if (m_Stream.first) {
boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
m_Stream.first->flush();
} else {
boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
m_Stream.second->flush();
}
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
} catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
Log(LogDebug, "ElasticsearchWriter") << "Operation cancelled.";
return;
}
throw ex;
Log(LogCritical, "GelfWriter")
<< "Error during send operation: " << ex.what();
throw;
}
}

View file

@ -4,12 +4,10 @@
#define GELFWRITER_H
#include "perfdata/gelfwriter-ti.hpp"
#include "icinga/service.hpp"
#include "perfdata/perfdatawriterconnection.hpp"
#include "icinga/checkable.hpp"
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <fstream>
namespace icinga
{
@ -33,11 +31,10 @@ protected:
void Pause() override;
private:
OptionalTlsStream m_Stream;
PerfdataWriterConnection::Ptr m_Connection;
WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
@ -47,13 +44,6 @@ private:
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
void ReconnectTimerHandler();
void Disconnect();
void DisconnectInternal();
void Reconnect();
void ReconnectInternal();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);

View file

@ -24,10 +24,10 @@ class GelfWriter : ConfigObject
default {{{ return false; }}}
};
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
[config] bool enable_ha {
default {{{ return false; }}}
};

View file

@ -6,16 +6,13 @@
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp>
@ -64,7 +61,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
{ "work_queue_items", workQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "connected", graphitewriter->GetConnected() }
{ "connected", graphitewriter->m_Connection->IsConnected() }
}));
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
@ -87,12 +84,7 @@ void GraphiteWriter::Resume()
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
/* Timer for reconnecting */
m_ReconnectTimer = Timer::Create();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
m_Connection = new PerfdataWriterConnection(GetHost(), GetPort(), nullptr);
/* Register event handlers. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
@ -107,20 +99,14 @@ void GraphiteWriter::Resume()
void GraphiteWriter::Pause()
{
m_HandleCheckResults.disconnect();
m_ReconnectTimer->Stop(true);
try {
ReconnectInternal();
} catch (const std::exception&) {
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
ObjectImpl<GraphiteWriter>::Pause();
return;
}
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
m_WorkQueue.Join();
DisconnectInternal();
m_Connection->Disconnect();
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' paused.";
@ -149,105 +135,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
Log(LogDebug, "GraphiteWriter")
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) {
m_Stream->close();
SetConnected(false);
}
}
/**
* Reconnect method, stops when the feature is paused in HA zones.
*
* Called inside the WQ.
*/
void GraphiteWriter::Reconnect()
{
AssertOnWorkQueue();
if (IsPaused()) {
SetConnected(false);
return;
}
ReconnectInternal();
}
/**
* Reconnect method, connects to a TCP Stream
*/
void GraphiteWriter::ReconnectInternal()
{
double startTime = Utility::GetTime();
CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
SetShouldConnect(true);
if (GetConnected())
return;
Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
SetConnected(false);
throw;
}
SetConnected(true);
Log(LogInformation, "GraphiteWriter")
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
* Reconnect handler called by the timer.
*
* Enqueues a reconnect task into the WQ.
*/
void GraphiteWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
}
/**
* Disconnect the stream.
*
* Called inside the WQ.
*/
void GraphiteWriter::Disconnect()
{
AssertOnWorkQueue();
DisconnectInternal();
}
/**
* Disconnect the stream.
*
* Called outside the WQ.
*/
void GraphiteWriter::DisconnectInternal()
{
if (!GetConnected())
return;
m_Stream->close();
SetConnected(false);
}
/**
@ -393,15 +280,15 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
// do not send \n to debug log
msgbuf << "\n";
std::unique_lock<std::mutex> lock(m_StreamMutex);
if (!GetConnected())
return;
try {
asio::write(*m_Stream, asio::buffer(msgbuf.str()));
m_Stream->flush();
m_Connection->Send(asio::buffer(msgbuf.str()));
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
Log(LogDebug, "ElasticsearchWriter") << "Operation Cancelled.";
return;
}
Log(LogCritical, "GraphiteWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";

View file

@ -4,13 +4,10 @@
#define GRAPHITEWRITER_H
#include "perfdata/graphitewriter-ti.hpp"
#include "icinga/service.hpp"
#include "icinga/checkable.hpp"
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <fstream>
#include <mutex>
#include "perfdata/perfdatawriterconnection.hpp"
namespace icinga
{
@ -37,12 +34,11 @@ protected:
void Pause() override;
private:
Shared<AsioTcpStream>::Ptr m_Stream;
std::mutex m_StreamMutex;
PerfdataWriterConnection::Ptr m_Connection;
WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
@ -51,13 +47,6 @@ private:
static String EscapeMetricLabel(const String& str);
static Value EscapeMacroMetric(const Value& value);
void ReconnectTimerHandler();
void Disconnect();
void DisconnectInternal();
void Reconnect();
void ReconnectInternal();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);

View file

@ -26,9 +26,8 @@ class GraphiteWriter : ConfigObject
[config] bool enable_send_thresholds;
[config] bool enable_send_metadata;
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
[config] bool enable_ha {
default {{{ return false; }}}

View file

@ -8,19 +8,11 @@
#include "icinga/icingaapplication.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/stream.hpp"
#include "base/json.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include "base/tlsutility.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -37,7 +29,6 @@
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility>
@ -96,6 +87,19 @@ void InfluxdbCommonWriter::Resume()
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
Shared<boost::asio::ssl::context>::Ptr sslContext;
if (GetSslEnable()) {
try {
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to create SSL context.";
throw;
}
}
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), sslContext};
/* Register for new metrics. */
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
@ -112,12 +116,18 @@ void InfluxdbCommonWriter::Pause()
Log(LogDebug, GetReflectionType()->GetName())
<< "Processing pending tasks and flushing data buffers.";
m_FlushTimer->Stop(true);
m_FlushTimer->Stop();
m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
m_WorkQueue.Join();
m_Connection->Disconnect();
Log(LogInformation, GetReflectionType()->GetName())
<< "'" << GetName() << "' paused.";
@ -139,66 +149,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
//TODO: Close the connection, if we keep it open.
}
OptionalTlsStream InfluxdbCommonWriter::Connect()
{
Log(LogNotice, GetReflectionType()->GetName())
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
OptionalTlsStream stream;
bool ssl = GetSslEnable();
if (ssl) {
Shared<boost::asio::ssl::context>::Ptr sslContext;
try {
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Unable to create SSL context.";
throw;
}
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else {
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
try {
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (ssl) {
auto& tlsStream (stream.first->next_layer());
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "TLS handshake with host '" << GetHost() << "' failed.";
throw;
}
if (!GetSslInsecureNoverify()) {
if (!tlsStream.GetPeerCertificate()) {
BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate."));
}
if (!tlsStream.IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error(
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
));
}
}
}
return stream;
}
void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
@ -443,54 +393,20 @@ void InfluxdbCommonWriter::FlushWQ()
m_DataBuffer.clear();
m_DataBufferSize = 0;
OptionalTlsStream stream;
try {
stream = Connect();
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
return;
}
Defer s ([&stream]() {
if (stream.first) {
stream.first->next_layer().shutdown();
}
});
auto request (AssembleRequest(std::move(body)));
decltype(m_Connection->Send(request)) response;
try {
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
response = m_Connection->Send(request);
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;
try {
if (stream.first) {
http::read(*stream.first, buf, parser);
} else {
http::read(*stream.second, buf, parser);
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
Log(LogDebug, GetReflectionType()->GetName()) << "Operation cancelled.";
return;
}
} catch (const std::exception& ex) {
Log(LogWarning, GetReflectionType()->GetName())
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
throw;
}
auto& response (parser.get());
Log(LogCritical, GetReflectionType()->GetName()) << "Error sending Request: " << ex.what();
}
if (response.result() != http::status::no_content) {
Log(LogWarning, GetReflectionType()->GetName())

View file

@ -4,18 +4,16 @@
#define INFLUXDBCOMMONWRITER_H
#include "perfdata/influxdbcommonwriter-ti.hpp"
#include "icinga/service.hpp"
#include "icinga/checkable.hpp"
#include "base/configobject.hpp"
#include "base/perfdatavalue.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include "remote/url.hpp"
#include "perfdata/perfdatawriterconnection.hpp"
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <atomic>
#include <fstream>
namespace icinga
{
@ -49,10 +47,13 @@ protected:
private:
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_FlushTimer;
WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer;
std::atomic_size_t m_DataBufferSize{0};
PerfdataWriterConnection::Ptr m_Connection;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
const String& label, const Dictionary::Ptr& fields, double ts);
@ -63,8 +64,6 @@ private:
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);
OptionalTlsStream Connect();
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);

View file

@ -51,13 +51,16 @@ abstract class InfluxdbCommonWriter : ConfigObject
});
}}}
};
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
[config] bool enable_send_thresholds {
default {{{ return false; }}}
};
[config] bool enable_send_metadata {
default {{{ return false; }}}
};
[config] int flush_interval {
[config] double flush_interval {
default {{{ return 10; }}}
};
[config] int flush_threshold {

View file

@ -6,17 +6,12 @@
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp>
@ -35,6 +30,8 @@ void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("OpenTsdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "OpenTsdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
@ -56,7 +53,7 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
{ "connected", opentsdbwriter->GetConnected() }
{ "connected", opentsdbwriter->m_Connection->IsConnected() }
}));
}
@ -73,13 +70,14 @@ void OpenTsdbWriter::Resume()
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) {
Log(LogDebug, "ElasticsearchWriter")
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
});
ReadConfigTemplate();
m_ReconnectTimer = Timer::Create();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
m_Connection = new PerfdataWriterConnection{GetHost(), GetPort(), nullptr};
m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr);
@ -92,60 +90,21 @@ void OpenTsdbWriter::Resume()
void OpenTsdbWriter::Pause()
{
m_HandleCheckResults.disconnect();
m_ReconnectTimer->Stop(true);
m_Connection->StartDisconnectTimeout(
std::chrono::milliseconds{static_cast<unsigned>(GetDisconnectTimeout() * 1000)}
);
m_WorkQueue.Join();
m_Connection->Disconnect();
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
m_Stream->close();
SetConnected(false);
ObjectImpl<OpenTsdbWriter>::Pause();
}
/**
* Reconnect handler called by the timer.
* Handles TLS
*/
void OpenTsdbWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
SetShouldConnect(true);
if (GetConnected())
return;
double startTime = Utility::GetTime();
Log(LogNotice, "OpenTsdbWriter")
<< "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
/*
* We're using telnet as input method. Future PRs may change this into using the HTTP API.
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
*/
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "OpenTsdbWriter")
<< "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
SetConnected(false);
return;
}
SetConnected(true);
Log(LogInformation, "OpenTsdbWriter")
<< "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
* Registered check result handler processing data.
* Calculates tags from the config.
@ -199,17 +158,17 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
// Resolve macros for the service and host template config line
if (config_tmpl_tags) {
ObjectLock olock(config_tmpl_tags);
for (const Dictionary::Pair& pair : config_tmpl_tags) {
String missing_macro;
Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
if (!missing_macro.IsEmpty()) {
Log(LogDebug, "OpenTsdbWriter")
<< "Unable to resolve macro '" << missing_macro
<< "Unable to resolve macro '" << missing_macro
<< "' for checkable '" << checkable->GetName() << "'.";
continue;
}
@ -220,29 +179,29 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
continue;
}
String tagname = Convert::ToString(pair.first);
tags[tagname] = EscapeTag(value);
}
}
// Resolve macros for the metric config line
if (!config_tmpl_metric.IsEmpty()) {
String missing_macro;
Value value = MacroProcessor::ResolveMacros(config_tmpl_metric, resolvers, cr, &missing_macro);
if (!missing_macro.IsEmpty()) {
Log(LogDebug, "OpenTsdbWriter")
<< "Unable to resolve macro '" << missing_macro
<< "Unable to resolve macro '" << missing_macro
<< "' for checkable '" << checkable->GetName() << "'.";
}
else {
config_tmpl_metric = Convert::ToString(value);
}
}
}
@ -262,39 +221,43 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
metric = "icinga.service." + escaped_serviceName;
}
SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
AddMetric(checkable, metric + ".state", tags, service->GetState(), ts);
} else {
if (!config_tmpl_metric.IsEmpty()) {
metric = config_tmpl_metric;
} else {
metric = "icinga.host";
}
SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
AddMetric(checkable, metric + ".state", tags, host->GetState(), ts);
}
SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
AddMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
AddMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
AddMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
AddMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
SendPerfdata(checkable, metric, tags, cr, ts);
m_WorkQueue.Enqueue([this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), ts](
) mutable {
AddPerfdata(checkable, metric, tags, cr, ts);
metric = "icinga.check";
metric = "icinga.check";
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
SendMsgBuffer();
});
}
/**
@ -306,7 +269,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
* @param cr Check result containing performance data
* @param ts Timestamp when the check result was received
*/
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
@ -333,7 +296,7 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
continue;
}
}
String metric_name;
std::map<String, String> tags_new = tags;
@ -349,21 +312,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
tags_new["label"] = escaped_key;
}
SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
if (!pdv->GetCrit().IsEmpty())
SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
if (!pdv->GetWarn().IsEmpty())
SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
if (!pdv->GetMin().IsEmpty())
SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
if (!pdv->GetMax().IsEmpty())
SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
}
}
/**
* Send given metric to OpenTSDB
* Add given metric to the data buffer to be later sent to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
@ -371,42 +334,48 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
* @param value Floating point metric value
* @param ts Timestamp where the metric was received from the check result
*/
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
String tags_string = "";
for (auto& tag : tags) {
tags_string += " " + tag.first + "=" + tag.second;
}
std::ostringstream msgbuf;
/*
* must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
* put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
* "tags" must include at least one tag, we use "host=HOSTNAME"
*/
msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << tags_string;
std::string msg{
"put " + metric + " " + std::to_string(static_cast<long>(ts)) + " " + Convert::ToString(value)
};
for (const auto& tag : tags) {
tags_string += " " + tag.first + "=" + tag.second;
}
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msg << "'.";
/* do not send \n to debug log */
msgbuf << "\n";
String put = msgbuf.str();
m_MsgBuf.append(msg);
m_MsgBuf.append(tags_string).push_back('\n');
}
ObjectLock olock(this);
void OpenTsdbWriter::SendMsgBuffer()
{
ASSERT(m_WorkQueue.IsWorkerThread());
if (!GetConnected())
return;
Log(LogDebug, "OpenTsdbWriter")
<< "Flushing data buffer to OpenTsdb.";
try {
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
m_Stream->flush();
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
Log(LogDebug, "ElasticsearchWriter") << "Operation canceled.";
return;
}
Log(LogCritical, "OpenTsdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
}
@ -507,7 +476,7 @@ void OpenTsdbWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, c
}
/**
* Validates the service_template configuration block in the
* Validates the service_template configuration block in the
* configuration file and checks for syntax errors.
*
* @param lvalue The service_template dictionary

View file

@ -4,11 +4,9 @@
#define OPENTSDBWRITER_H
#include "perfdata/opentsdbwriter-ti.hpp"
#include "icinga/service.hpp"
#include "icinga/checkable.hpp"
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include <fstream>
#include "perfdata/perfdatawriterconnection.hpp"
namespace icinga
{
@ -35,24 +33,24 @@ protected:
void Pause() override;
private:
Shared<AsioTcpStream>::Ptr m_Stream;
WorkQueue m_WorkQueue{10000000, 1};
std::string m_MsgBuf;
PerfdataWriterConnection::Ptr m_Connection;
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_ReconnectTimer;
Dictionary::Ptr m_ServiceConfigTemplate;
Dictionary::Ptr m_HostConfigTemplate;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& metric,
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void SendMsgBuffer();
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
static String EscapeTag(const String& str);
static String EscapeMetric(const String& str);
void ReconnectTimerHandler();
void ReadConfigTemplate();
};

View file

@ -30,10 +30,8 @@ class OpenTsdbWriter : ConfigObject
[config] bool enable_generic_metrics {
default {{{ return false; }}}
};
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
[config] double disconnect_timeout {
default {{{ return 0.5; }}}
};
};

View file

@ -0,0 +1,248 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include "perfdata/perfdatawriterconnection.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include <boost/asio/read.hpp>
#include <future>
#include <utility>
using namespace icinga;
using HttpResponse = PerfdataWriterConnection::HttpResponse;
PerfdataWriterConnection::
PerfdataWriterConnection(String host, String port, Shared<boost::asio::ssl::context>::Ptr sslContext, bool verifySecure)
: m_VerifySecure(verifySecure), m_SslContext(std::move(sslContext)), m_Host(std::move(host)),
m_Port(std::move(port)), m_DisconnectTimer(IoEngine::Get().GetIoContext()),
m_ReconnectTimer(IoEngine::Get().GetIoContext()), m_Strand(IoEngine::Get().GetIoContext()), m_Stream(ResetStream())
{
}
void PerfdataWriterConnection::Send(boost::asio::const_buffer data)
{
std::promise<void> promise;
IoEngine::SpawnCoroutine(m_Strand, [&, keepAlive = PerfdataWriterConnection::Ptr(this)](boost::asio::yield_context yc) {
try {
EnsureConnected(yc);
std::visit(
[&](auto& stream) {
boost::asio::async_write(*stream, data, yc);
stream->async_flush(yc);
},
m_Stream
);
promise.set_value();
} catch (const std::exception&) {
promise.set_exception(std::current_exception());
}
});
promise.get_future().get();
}
HttpResponse PerfdataWriterConnection::Send(HttpRequest& request)
{
std::promise<HttpResponse> promise;
IoEngine::SpawnCoroutine(m_Strand, [&, keepAlive = PerfdataWriterConnection::Ptr(this)](boost::asio::yield_context yc) {
try {
EnsureConnected(yc);
std::visit(
[&](auto& stream) {
boost::beast::http::async_write(*stream, request, yc);
stream->async_flush(yc);
},
m_Stream
);
boost::beast::http::response_parser<boost::beast::http::string_body> parser;
std::visit([&](auto& stream) { boost::beast::http::async_read(*stream, m_Streambuf, parser, yc); }, m_Stream);
if (!parser.get().keep_alive()) {
Disconnect(yc);
}
promise.set_value(parser.release());
} catch (const std::exception&) {
promise.set_exception(std::current_exception());
}
});
return promise.get_future().get();
}
/**
* Get the current state of the connection.
*
* This wraps retrieving the state in boost::asio::post() on the strand instead of making it
* atomic, because the only defined states are the suspension points where the coroutine yields.
*/
bool PerfdataWriterConnection::IsConnected()
{
std::promise<bool> p;
boost::asio::post(m_Strand, [&]() { p.set_value(m_State == State::connected); });
return p.get_future().get();
}
void PerfdataWriterConnection::Disconnect()
{
std::promise<void> promise;
IoEngine::SpawnCoroutine(m_Strand, [&, keepAlive = PerfdataWriterConnection::Ptr(this)](boost::asio::yield_context yc) {
try {
Disconnect(std::move(yc));
promise.set_value();
} catch (const std::exception& ex) {
promise.set_exception(std::current_exception());
}
});
promise.get_future().get();
}
void PerfdataWriterConnection::StartDisconnectTimeout(std::chrono::milliseconds timeout)
{
constexpr auto cancelStreamIfOpen = [](auto& stream) {
if (stream->lowest_layer().is_open()) {
stream->lowest_layer().cancel();
}
};
IoEngine::SpawnCoroutine(
m_Strand, [&, timeout, keepAlive = PerfdataWriterConnection::Ptr(this)](boost::asio::yield_context yc) {
try {
if (m_State != State::initial) {
m_DisconnectTimer.expires_after(timeout);
m_DisconnectTimer.async_wait(yc);
} else {
m_Stopped = true;
return;
}
m_Stopped = true;
/* This needs to be done in a loop, because ASIO's cancel isn't guaranteed to
* cancel anything. For example, a connect() operation may already be queued for
* completion after this coroutine yields, so we need to attempt another
* cancellation for a potential handshake.
*/
while (m_State == State::connecting) {
std::visit(cancelStreamIfOpen, m_Stream);
boost::asio::post(yc);
}
/* From here on, the other coroutine is either in failed or connected state.
*/
if (m_State == State::failed) {
m_ReconnectTimer.cancel();
} else if (m_State == State::connected) {
std::visit(cancelStreamIfOpen, m_Stream);
}
/* We can now be sure that the other coroutine will throw an operation_aborted
* error_code. All we need to do is yield to give it a chance to throw it.
*/
boost::asio::post(yc);
/* Disconnect only does anything if the last state was connected.
*/
Disconnect(yc);
} catch (const std::exception& ex) {
Log(LogCritical, "PerfdataWriterConnection") << "Exception during disconnect timeout: " << ex.what();
}
}
);
}
AsioTlsOrTcpStream PerfdataWriterConnection::ResetStream()
{
AsioTlsOrTcpStream ret;
if (m_SslContext) {
ret = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *m_SslContext);
} else {
ret = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
}
return ret;
}
void PerfdataWriterConnection::EnsureConnected(boost::asio::yield_context yc)
{
while (m_State != State::connected) {
if (m_Stopped) {
const boost::system::error_code ec{boost::asio::error::operation_aborted, boost::system::system_category()};
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
}
m_State = State::connecting;
try {
std::visit(
[&](auto& stream) {
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);
if constexpr (std::is_same_v<std::remove_reference_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
using type = boost::asio::ssl::stream_base::handshake_type;
stream->next_layer().async_handshake(type::client, yc);
if (m_VerifySecure && !stream->next_layer().IsVerifyOK()) {
BOOST_THROW_EXCEPTION(std::runtime_error{"TLS certificate validation failed"});
}
}
},
m_Stream
);
m_State = State::connected;
m_RetryTimeout = 1s;
} catch (const std::exception& ex) {
if (m_State == State::connecting) {
m_State = State::failed;
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
se->code() == boost::asio::error::operation_aborted) {
throw;
}
m_Stream = ResetStream();
/* Timeout before making another attempt at connecting.
*/
m_ReconnectTimer.expires_after(m_RetryTimeout);
if (m_RetryTimeout < 30s) {
m_RetryTimeout *= 2;
}
m_ReconnectTimer.async_wait(yc);
}
}
}
}
void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
{
if (m_State != State::connected) {
return;
}
m_State = State::disconnecting;
std::visit(
[&](auto& stream) {
if constexpr (std::is_same_v<std::remove_reference_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
stream->GracefulDisconnect(m_Strand, yc);
} else {
stream->next_layer().shutdown(boost::asio::socket_base::shutdown_both);
stream->lowest_layer().close();
}
},
m_Stream
);
m_Stream = ResetStream();
m_State = State::disconnected;
}

View file

@ -0,0 +1,72 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#pragma once
#include "base/tlsstream.hpp"
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/string_body.hpp>
namespace icinga {
/**
* Class handling the connection to the various Perfdata backends.
*/
class PerfdataWriterConnection final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;
using HttpResponse = boost::beast::http::response<boost::beast::http::string_body>;
explicit PerfdataWriterConnection(
String host,
String port,
Shared<boost::asio::ssl::context>::Ptr sslContext,
bool verifySecure = true
);
void Send(boost::asio::const_buffer data);
HttpResponse Send(HttpRequest& request);
void Disconnect();
void StartDisconnectTimeout(std::chrono::milliseconds timeout);
bool IsConnected();
private:
AsioTlsOrTcpStream ResetStream();
void EnsureConnected(boost::asio::yield_context yc);
void Disconnect(boost::asio::yield_context yc);
enum class State : std::uint8_t
{
initial,
connecting,
connected,
disconnecting,
disconnected,
failed
};
State m_State{State::initial};
bool m_Stopped{};
bool m_VerifySecure;
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
String m_Host;
String m_Port;
std::chrono::milliseconds m_RetryTimeout{1000ms};
boost::asio::steady_timer m_DisconnectTimer;
boost::asio::steady_timer m_ReconnectTimer;
boost::asio::io_context::strand m_Strand;
boost::asio::streambuf m_Streambuf;
AsioTlsOrTcpStream m_Stream;
};
} // namespace icinga

View file

@ -139,6 +139,18 @@ if(ICINGA2_WITH_NOTIFICATION)
)
endif()
if(ICINGA2_WITH_PERFDATA)
list(APPEND base_test_SOURCES
perfdata-elasticsearchwriter.cpp
perfdata-gelfwriter.cpp
perfdata-graphitewriter.cpp
perfdata-influxdbwriter.cpp
perfdata-opentsdbwriter.cpp
perfdata-perfdatawriterconnection.cpp
$<TARGET_OBJECTS:perfdata>
)
endif()
if(ICINGA2_UNITY_BUILD)
mkunity_target(base test base_test_SOURCES)
endif()

View file

@ -10,6 +10,12 @@
#include <boost/test/test_tools.hpp>
#include <future>
#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout))
#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout))
#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout))
#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout))
namespace icinga {
class TestLogger : public Logger
@ -95,8 +101,8 @@ struct TestLoggerFixture
TestLoggerFixture()
{
testLogger->SetSeverity(testLogger->SeverityToString(LogDebug));
testLogger->Activate(true);
testLogger->SetActive(true);
testLogger->Activate(true);
}
~TestLoggerFixture()

View file

@ -142,6 +142,13 @@ object NotificationComponent "nc" {}
return !result;
}
boost::test_tools::assertion_result AssertNoReSendSuppressedLogPattern()
{
auto result = ExpectLogPattern("^Attempting to re-send previously suppressed notification.*$", 0s);
ClearTestLogger();
return !result;
}
void BeginTimePeriod()
{
ObjectLock lock{m_TimePeriod};
@ -252,7 +259,7 @@ BOOST_AUTO_TEST_CASE(notify_send_reminders)
// Rerunning the timer before the next interval should not trigger a reminder notification.
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
@ -272,7 +279,7 @@ BOOST_AUTO_TEST_CASE(notify_send_reminders)
// Now we wait for one interval and check that no reminder has been sent.
WaitUntilNextReminderScheduled();
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 3);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationRecovery);
}
@ -399,7 +406,7 @@ BOOST_AUTO_TEST_CASE(notify_after_timeperiod_simple)
ReceiveCheckResults(3, ServiceCritical);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 0);
BOOST_REQUIRE_EQUAL(GetLastNotification(), 0);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationProblem);
@ -429,7 +436,7 @@ BOOST_AUTO_TEST_CASE(notify_multiple_state_changes_outside_timeperiod)
EndTimePeriod();
ReceiveCheckResults(1, ServiceOK);
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationRecovery);
@ -454,7 +461,7 @@ BOOST_AUTO_TEST_CASE(notify_multiple_state_changes_outside_timeperiod)
// Third Critical check result will set the Critical hard state.
ReceiveCheckResults(2, ServiceCritical);
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), 0);
@ -466,7 +473,7 @@ BOOST_AUTO_TEST_CASE(notify_multiple_state_changes_outside_timeperiod)
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), 0);
ReceiveCheckResults(1, ServiceOK);
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationRecovery);
@ -498,14 +505,14 @@ BOOST_AUTO_TEST_CASE(no_notify_suppressed_cancel_out)
ReceiveCheckResults(3, ServiceCritical);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 0);
BOOST_REQUIRE_EQUAL(GetLastNotification(), 0);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationProblem);
ReceiveCheckResults(1, ServiceOK);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 0);
BOOST_REQUIRE_EQUAL(GetLastNotification(), 0);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), 0);
@ -530,14 +537,14 @@ BOOST_AUTO_TEST_CASE(no_notify_suppressed_cancel_out)
ReceiveCheckResults(1, ServiceOK);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationRecovery);
ReceiveCheckResults(3, ServiceCritical);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 1);
BOOST_REQUIRE_EQUAL(GetLastNotification(), NotificationProblem);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), 0);
@ -574,7 +581,7 @@ BOOST_AUTO_TEST_CASE(no_notify_non_applicable_reason)
// We queue a suppressed notification.
ReceiveCheckResults(3, ServiceCritical);
NotificationTimerHandler();
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 0);
BOOST_REQUIRE_EQUAL(GetLastNotification(), 0);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationProblem);
@ -585,7 +592,7 @@ BOOST_AUTO_TEST_CASE(no_notify_non_applicable_reason)
// before the timer can run again. No notification should be sent, because the last state
// change the user was notified about was the same.
ReceiveCheckResults(1, ServiceOK);
BOOST_REQUIRE(AssertNoAttemptedSendLogPattern());
BOOST_REQUIRE(AssertNoReSendSuppressedLogPattern());
BOOST_REQUIRE_EQUAL(GetNotificationCount(), 0);
BOOST_REQUIRE_EQUAL(GetLastNotification(), 0);
BOOST_REQUIRE_EQUAL(GetSuppressedNotifications(), NotificationProblem);

View file

@ -0,0 +1,53 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <BoostTestTargetConfig.h>
#include "perfdata/elasticsearchwriter.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
#include "test/utils.hpp"
using namespace icinga;
BOOST_FIXTURE_TEST_SUITE(perfdata_elasticsearchwriter, PerfdataWriterFixture<ElasticsearchWriter>,
*boost::unit_test::label("perfdata"))
BOOST_AUTO_TEST_CASE(connect)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical);
Accept();
auto resp = GetSplitDecodedRequestBody();
SendResponse();
// Just some basic sanity tests. It's not important to check if everything is entirely
// correct here.
BOOST_REQUIRE_GT(resp->GetLength(), 1);
Dictionary::Ptr cr = resp->Get(1);
BOOST_CHECK(cr->Contains("@timestamp"));
BOOST_CHECK_EQUAL(cr->Get("check_command"), "dummy");
BOOST_CHECK_EQUAL(cr->Get("host"), "h1");
PauseWriter();
}
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical, [](const CheckResult::Ptr& cr) {
cr->SetOutput(GetRandomString("####", 1024UL * 1024));
});
// Accept the connection, but don't read from it to leave the client hanging.
Accept();
GetDataUntil("####");
// Now try to pause.
PauseWriter();
REQUIRE_LOG_MESSAGE("Operation cancelled\\.", 10s);
REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,51 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <BoostTestTargetConfig.h>
#include "perfdata/gelfwriter.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
#include "test/utils.hpp"
using namespace icinga;
BOOST_FIXTURE_TEST_SUITE(perfdata_gelfwriter, PerfdataWriterFixture<GelfWriter>,
*boost::unit_test::label("perfdata"))
BOOST_AUTO_TEST_CASE(connect)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical);
Accept();
Dictionary::Ptr resp = JsonDecode(GetDataUntil('\0'));
// Just some basic sanity tests. It's not important to check if everything is entirely
// correct here.
BOOST_CHECK_CLOSE(resp->Get("timestamp").Get<double>(), Utility::GetTime(), 0.5);
BOOST_CHECK_EQUAL(resp->Get("_check_command"), "dummy");
BOOST_CHECK_EQUAL(resp->Get("_hostname"), "h1");
PauseWriter();
}
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
{
ResumeWriter();
// Make GelfWriter fill up the connection's buffer with a huge check-result.
ReceiveCheckResults(1, ServiceState::ServiceCritical, [](const CheckResult::Ptr& cr) {
cr->SetOutput(GetRandomString("####", 1024UL * 1024));
});
// Accept the connection, but only read far enough so we know the writer is now stuck.
Accept();
GetDataUntil("####");
// Now try to pause.
PauseWriter();
REQUIRE_LOG_MESSAGE("Operation cancelled\\.", 1s);
REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,52 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <BoostTestTargetConfig.h>
#include "perfdata/graphitewriter.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
#include "test/utils.hpp"
using namespace icinga;
BOOST_FIXTURE_TEST_SUITE(
perfdata_graphitewriter,
PerfdataWriterFixture<GraphiteWriter>,
*boost::unit_test::label("perfdata")
)
BOOST_AUTO_TEST_CASE(connect)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical);
Accept();
auto msg = GetDataUntil('\n');
// Just some basic sanity tests. It's not important to check if everything is entirely correct here.
std::string_view cmpStr{"icinga2.h1.host.dummy.perfdata.thing.value 42"};
BOOST_REQUIRE_EQUAL(msg.substr(0, cmpStr.length()), cmpStr);
PauseWriter();
}
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
{
ResumeWriter();
// Make GraphiteWriter send a huge message that fills up the connection's buffer.
ReceiveCheckResults(1, ServiceState::ServiceCritical, [&](const CheckResult::Ptr& cr) {
cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("aaaa", 24UL * 1024 * 1024), 1});
});
// Accept the connection, but don't read from it to leave the client hanging.
Accept();
GetDataUntil("aaaa");
// Now try to pause.
PauseWriter();
REQUIRE_LOG_MESSAGE("Operation Cancelled\\.", 10s);
REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,50 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <BoostTestTargetConfig.h>
#include "perfdata/influxdb2writer.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
using namespace icinga;
BOOST_FIXTURE_TEST_SUITE(perfdata_influxdbwriter, PerfdataWriterFixture<Influxdb2Writer>,
*boost::unit_test::label("perfdata"))
BOOST_AUTO_TEST_CASE(connect)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical);
Accept();
auto req = GetSplitRequestBody(',');
SendResponse(boost::beast::http::status::no_content);
// Just some basic sanity tests. It's not important to check if everything is entirely
// correct here.
BOOST_REQUIRE_EQUAL(req.size(), 3);
BOOST_CHECK_EQUAL(req[0], "dummy");
BOOST_CHECK_EQUAL(req[1], "hostname=h1");
std::string_view perfData = "metric=thing value=42";
BOOST_CHECK_EQUAL(req[2].substr(0, perfData.length()), perfData);
PauseWriter();
}
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
{
ResumeWriter();
// Make Influxdb2Writer fill up the connection's buffer with a huge check-result.
ReceiveCheckResults(1, ServiceState::ServiceCritical);
// Accept the connection, but only read far enough so we know the writer is now stuck.
Accept();
// Now try to pause.
PauseWriter();
REQUIRE_LOG_MESSAGE("Operation cancelled\\.", 1s);
REQUIRE_LOG_MESSAGE("'Influxdb2Writer' paused\\.", 1s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,56 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <BoostTestTargetConfig.h>
#include "perfdata/opentsdbwriter.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
#include "test/utils.hpp"
using namespace icinga;
BOOST_FIXTURE_TEST_SUITE(perfdata_opentsdbwriter, PerfdataWriterFixture<OpenTsdbWriter>,
*boost::unit_test::label("perfdata"))
BOOST_AUTO_TEST_CASE(connect)
{
ResumeWriter();
ReceiveCheckResults(1, ServiceState::ServiceCritical);
Accept();
auto msg = GetDataUntil('\n');
std::vector<std::string> splitMsg;
boost::split(splitMsg, msg, boost::is_any_of(" "));
// Just some basic sanity tests. It's not important to check if everything is entirely correct here.
BOOST_REQUIRE_EQUAL(splitMsg.size(), 5);
BOOST_REQUIRE_EQUAL(splitMsg[0], "put");
BOOST_REQUIRE_EQUAL(splitMsg[1], "icinga.host.state");
BOOST_REQUIRE_CLOSE(boost::lexical_cast<double>(splitMsg[2]), Utility::GetTime(), 1);
BOOST_REQUIRE_EQUAL(splitMsg[3], "1");
BOOST_REQUIRE_EQUAL(splitMsg[4], "host=h1");
PauseWriter();
}
BOOST_AUTO_TEST_CASE(pause_with_pending_work)
{
ResumeWriter();
// Make OpenTsdbWriter send a huge message that fills up the connection's buffer.
ReceiveCheckResults(1, ServiceState::ServiceCritical, [&](const CheckResult::Ptr& cr) {
cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("aaaaa", 24 * 1024 * 1024), 1});
});
// Accept the connection, and read until OpenTsdbWriter has started sending the large part
// of the PerfdataValue sent above.
Accept();
GetDataUntil("aaaaa");
// Now stop reading and try to pause OpenTsdbWriter.
PauseWriter();
REQUIRE_LOG_MESSAGE("Operation canceled\\.", 10s);
REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,182 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#pragma once
#include <BoostTestTargetConfig.h>
#include "base/io-engine.hpp"
#include "base/json.hpp"
#include "base/tlsstream.hpp"
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/string_body.hpp>
namespace icinga {
/**
* A fixture that provides methods to simulate a perfdata target
*/
class PerfdataWriterTargetFixture
{
public:
PerfdataWriterTargetFixture()
: icinga::PerfdataWriterTargetFixture(Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()))
{
}
explicit PerfdataWriterTargetFixture(const Shared<boost::asio::ssl::context>::Ptr& sslCtx)
: icinga::PerfdataWriterTargetFixture(Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslCtx))
{
m_SslContext = sslCtx;
}
explicit PerfdataWriterTargetFixture(AsioTlsOrTcpStream stream)
: m_Stream(std::move(stream)),
m_Acceptor(
IoEngine::Get().GetIoContext(),
boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::loopback(), 0}
)
{
}
unsigned short GetPort() { return m_Acceptor.local_endpoint().port(); }
void Listen()
{
boost::asio::socket_base::receive_buffer_size option{512};
m_Acceptor.set_option(option);
m_Acceptor.listen();
}
void CloseAcceptor() { m_Acceptor.close(); }
void Accept()
{
BOOST_REQUIRE_NO_THROW(
std::visit([&](auto& stream) { return m_Acceptor.accept(stream->lowest_layer()); }, m_Stream)
);
}
void Handshake()
{
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
using handshake_type = UnbufferedAsioTlsStream::handshake_type;
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
BOOST_REQUIRE_NO_THROW(stream->next_layer().handshake(handshake_type::server));
BOOST_REQUIRE(stream->next_layer().IsVerifyOK());
}
void Shutdown()
{
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
try {
stream->next_layer().shutdown();
} catch (const std::exception& ex) {
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
!se || se->code() != boost::asio::error::eof) {
BOOST_FAIL("Exception in shutdown(): " << ex.what());
}
}
m_Stream = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *m_SslContext);
}
std::string GetRequestBody()
{
using namespace boost::asio::ip;
using namespace boost::beast;
boost::system::error_code ec;
http::request_parser<boost::beast::http::string_body> parser;
std::visit([&](auto& stream) { http::read(*stream, m_Buffer, parser, ec); }, m_Stream);
BOOST_REQUIRE(!ec);
return parser.get().body();
}
auto GetSplitRequestBody(char delim)
{
auto body = GetRequestBody();
std::vector<std::string> result{};
boost::split(result, body, boost::is_any_of(std::string{delim}));
return result;
}
auto GetSplitDecodedRequestBody()
{
Array::Ptr result = new Array;
for (const auto& line : GetSplitRequestBody('\n')) {
if (!line.empty()) {
result->Add(JsonDecode(line));
}
}
return result;
}
template<typename T>
std::string GetDataUntil(T&& delim)
{
using namespace boost::asio::ip;
boost::system::error_code ec;
auto bytesRead = std::visit(
[&](auto& stream) { return boost::asio::read_until(*stream, m_Buffer, std::forward<T>(delim), ec); },
m_Stream
);
BOOST_REQUIRE_MESSAGE(!ec, ec.message());
std::string ret{
boost::asio::buffers_begin(m_Buffer.data()), boost::asio::buffers_begin(m_Buffer.data()) + bytesRead - 1
};
m_Buffer.consume(bytesRead);
return ret;
}
std::size_t ReadRemainingData()
{
boost::system::error_code ec;
std::size_t bytesRead{};
std::visit(
[&](auto& stream) {
while (!ec) {
bytesRead += stream->read_some(m_Buffer.prepare(4096), ec);
std::cout << "bytesRead: " << bytesRead << "; ec: " << ec.message() << std::endl;
}
},
m_Stream
);
// BOOST_REQUIRE_MESSAGE(!ec, ec.message());
return bytesRead;
}
void SendResponse(boost::beast::http::status status = boost::beast::http::status::ok)
{
using namespace boost::asio::ip;
using namespace boost::beast;
boost::system::error_code ec;
http::response<boost::beast::http::empty_body> response;
response.result(status);
std::visit([&](auto& stream) { http::write(*stream, response, ec); }, m_Stream);
BOOST_REQUIRE_MESSAGE(!ec, ec.message());
}
void CloseConnection()
{
std::visit([&](auto& stream) { stream->lowest_layer().close(); }, m_Stream);
}
private:
boost::asio::streambuf m_Buffer;
AsioTlsOrTcpStream m_Stream;
boost::asio::ip::tcp::acceptor m_Acceptor;
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
};
} // namespace icinga

View file

@ -0,0 +1,227 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include <utility>
#include "perfdata/perfdatawriterconnection.hpp"
#include "test/perfdata-perfdatatargetfixture.hpp"
#include "test/perfdata-perfdatawriterfixture.hpp"
#include "test/remote-certificate-fixture.hpp"
#include "test/test-ctest.hpp"
#include "test/test-thread.hpp"
#include "test/utils.hpp"
using namespace icinga;
class TlsPerfdataWriterFixture : public CertificateFixture, public PerfdataWriterTargetFixture
{
public:
TlsPerfdataWriterFixture() : PerfdataWriterTargetFixture(MakeContext())
{
auto pdwCert = EnsureCertFor("client");
m_PdwSslContext = SetupSslContext(
pdwCert.crtFile,
pdwCert.keyFile,
m_CaCrtFile.string(),
"",
DEFAULT_TLS_CIPHERS,
DEFAULT_TLS_PROTOCOLMIN,
DebugInfo()
);
m_Conn = new PerfdataWriterConnection{"127.0.0.1", std::to_string(GetPort()), m_PdwSslContext};
}
auto& GetConnection() { return *m_Conn; }
private:
Shared<boost::asio::ssl::context>::Ptr MakeContext()
{
auto testCert = EnsureCertFor("server");
return SetupSslContext(
testCert.crtFile,
testCert.keyFile,
m_CaCrtFile.string(),
"",
DEFAULT_TLS_CIPHERS,
DEFAULT_TLS_PROTOCOLMIN,
DebugInfo()
);
}
Shared<boost::asio::ssl::context>::Ptr m_PdwSslContext;
PerfdataWriterConnection::Ptr m_Conn;
};
BOOST_FIXTURE_TEST_SUITE(perfdatawriterconnection, TlsPerfdataWriterFixture,
*CTestProperties("FIXTURES_REQUIRED ssl_certs")
*boost::unit_test::label("perfdata"))
/* If there is no acceptor listening on the other side, connecting should fail.
*/
BOOST_AUTO_TEST_CASE(connection_refused)
{
CloseAcceptor();
TestThread mockTargetThread{[&]() { GetConnection().StartDisconnectTimeout(50ms); }};
BOOST_REQUIRE_EXCEPTION(
GetConnection().Send(boost::asio::const_buffer{"foobar", 7}),
boost::system::system_error,
[](const auto& ex) -> bool { return ex.code() == boost::asio::error::operation_aborted; }
);
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
/* The PerfdataWriterConnection connects automatically when sending the first data.
* In case of http we also need to support disconnecting and reconnecting.
*/
BOOST_AUTO_TEST_CASE(ensure_connected)
{
TestThread mockTargetThread{[&]() {
Accept();
Handshake();
auto ret = GetDataUntil('\0');
BOOST_REQUIRE_EQUAL(ret, "foobar");
Shutdown();
/* Test a second cycle to make sure reusing the socket works.
*/
Accept();
Handshake();
ret = GetDataUntil('\0');
BOOST_REQUIRE_EQUAL(ret, "foobar");
Shutdown();
}};
GetConnection().Send(boost::asio::const_buffer{"foobar", 7});
GetConnection().Disconnect();
GetConnection().Send(boost::asio::const_buffer{"foobar", 7});
GetConnection().Disconnect();
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
/* This tests a "manual" disconnect() while in the diconnection timeout, similar to what a
* perfdata writer does if the it manages to finish the WorkQueue before the timeout runs out.
*/
BOOST_AUTO_TEST_CASE(disconnect_during_timeout)
{
TestThread mockTargetThread{[&]() {
Accept();
Handshake();
auto ret = GetDataUntil('\0');
BOOST_REQUIRE_EQUAL(ret, "foobar");
Shutdown();
}};
BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{"foobar", 7}));
GetConnection().StartDisconnectTimeout(50ms);
BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect());
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
/* While the grace-period given through StartDisconnectTimeout is active, data can still be sent,
* assuming we had already connected to the server.
*/
BOOST_AUTO_TEST_CASE(finish_during_timeout)
{
TestThread mockTargetThread{[&]() {
Accept();
Handshake();
auto ret = GetDataUntil('\0');
BOOST_REQUIRE_EQUAL(ret, "foobar");
ret = GetDataUntil('\0');
BOOST_REQUIRE_EQUAL(ret, "foobar");
Shutdown();
}};
GetConnection().Send(boost::asio::const_buffer{"foobar", 7});
GetConnection().StartDisconnectTimeout(50ms);
GetConnection().Send(boost::asio::const_buffer{"foobar", 7});
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
/* Stops the connection before any attempt to connect anywhere has been made. This is needed to
* speed up quick successive reloads of the daemon.
*/
BOOST_AUTO_TEST_CASE(stop_immediately)
{
auto start = std::chrono::steady_clock::now();
GetConnection().StartDisconnectTimeout(50ms);
BOOST_REQUIRE_EXCEPTION(
GetConnection().Send(boost::asio::const_buffer{"foobar", 7}),
boost::system::system_error,
[](const auto& ex) -> bool { return ex.code() == boost::asio::error::operation_aborted; }
);
// Ensure that the Send() has actually returned before the timeout has elapsed.
BOOST_REQUIRE(std::chrono::steady_clock::now() - start < 40ms);
}
/* For the client, even a hanging server will accept the connection immediately, since it's done
* in the kernel. But in that case the TLS handshake will be stuck, so we need to verify that a
* handshake can be interrupted by StartDisconnectTimeout().
*/
BOOST_AUTO_TEST_CASE(stuck_in_handshake)
{
TestThread mockTargetThread{[&]() {
Accept();
GetConnection().StartDisconnectTimeout(50ms);
}};
BOOST_REQUIRE_EXCEPTION(
GetConnection().Send(boost::asio::const_buffer{"foobar", 7}),
boost::system::system_error,
[&](const auto& ex) { return ex.code() == boost::asio::error::operation_aborted; }
);
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
/* When the disconnect timeout runs out while sending something to a slow or blocking server, we
* expect the send to be aborted immediately with an 'operation cancelled' exception, in order to
* not prolong the shutdown of a perfdata writer.
* No regular shutdown can be performed in this case, because the stream has been truncated. The
* server will need to handle this one on their own.
*/
BOOST_AUTO_TEST_CASE(stuck_sending)
{
TestThread mockTargetThread{[&]() {
Accept();
Handshake();
auto ret = GetDataUntil("#");
BOOST_REQUIRE_EQUAL(ret, "foobar");
/* This is necessary so ultry-slow machines (like the Github-workflow building the
* container image) don't interrupt the connection process with the successive 50ms
* disconnect timeout.
*/
auto start = std::chrono::steady_clock::now();
while (!GetConnection().IsConnected() && std::chrono::steady_clock::now() - start < 1s) {
std::this_thread::sleep_for(10ms);
}
GetConnection().StartDisconnectTimeout(1s);
}};
// Allocate a large string that will fill the buffers on both sides of the connection, in
// order to make Send() block.
auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024);
BOOST_REQUIRE_EXCEPTION(
GetConnection().Send(boost::asio::const_buffer{randomData.data(), randomData.size()}),
boost::system::system_error,
[&](const auto& ex) {
BOOST_TEST_INFO("Exception: " << ex.what());
return ex.code() == boost::asio::error::operation_aborted;
}
);
REQUIRE_JOINS_WITHIN(mockTargetThread, 1s);
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -0,0 +1,113 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#pragma once
#include <BoostTestTargetConfig.h>
#include "base/perfdatavalue.hpp"
#include "base/wait-group.hpp"
#include "config/configcompiler.hpp"
#include "config/configitem.hpp"
#include "icinga/host.hpp"
#include "test/base-testloggerfixture.hpp"
#include "test/perfdata-perfdatatargetfixture.hpp"
#include <boost/hana.hpp>
namespace icinga {
template<typename Writer>
class PerfdataWriterFixture : public PerfdataWriterTargetFixture, public TestLoggerFixture
{
public:
PerfdataWriterFixture() : m_Writer(new Writer)
{
auto createObjects = [&]() {
String config = R"CONFIG(
object CheckCommand "dummy" {
command = "/bin/echo"
}
object Host "h1" {
address = "h1"
check_command = "dummy"
enable_notifications = true
enable_active_checks = false
enable_passive_checks = true
}
)CONFIG";
std::unique_ptr<Expression> expr = ConfigCompiler::CompileText("<test>", config);
expr->Evaluate(*ScriptFrame::GetCurrentFrame());
};
ConfigItem::RunWithActivationContext(new Function("CreateTestObjects", createObjects));
m_Host = Host::GetByName("h1");
BOOST_REQUIRE(m_Host);
m_Writer->SetPort(std::to_string(GetPort()));
m_Writer->SetName(m_Writer->GetReflectionType()->GetName());
m_Writer->SetDisconnectTimeout(0.05);
auto hasFlushInterval = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushInterval(0.05)) {});
if constexpr (decltype(hasFlushInterval(std::declval<Writer>()))::value) {
m_Writer->SetFlushInterval(0.05);
}
auto hasFlushThreshold = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushThreshold(1)) {});
if constexpr (decltype(hasFlushThreshold(std::declval<Writer>()))::value) {
m_Writer->SetFlushThreshold(1);
}
}
/**
* Make our test host receive a number of check-results.
*
* @param num The number of check-results to receive
* @param state The state the check results should have
* @param fn A function that will be passed the current check-result
*/
void ReceiveCheckResults(std::size_t num, ServiceState state, const std::function<void(const CheckResult::Ptr&)>& fn = {})
{
StoppableWaitGroup::Ptr wg = new StoppableWaitGroup();
for (auto i = 0UL; i < num; ++i) {
CheckResult::Ptr cr = new CheckResult();
cr->SetState(state);
double now = Utility::GetTime();
cr->SetActive(false);
cr->SetScheduleStart(now);
cr->SetScheduleEnd(now);
cr->SetExecutionStart(now);
cr->SetExecutionEnd(now);
Array::Ptr perfData = new Array;
perfData->Add(new PerfdataValue{"thing", 42});
cr->SetPerformanceData(perfData);
if (fn) {
fn(cr);
}
BOOST_REQUIRE(m_Host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok);
}
}
void ResumeWriter()
{
static_cast<ConfigObject::Ptr>(m_Writer)->OnConfigLoaded();
m_Writer->SetActive(true);
m_Writer->Activate();
BOOST_REQUIRE(!m_Writer->IsPaused());
}
void PauseWriter() { static_cast<ConfigObject::Ptr>(m_Writer)->Pause(); }
auto GetWriter() { return m_Writer; }
private:
Host::Ptr m_Host;
typename Writer::Ptr m_Writer;
};
} // namespace icinga

View file

@ -38,6 +38,11 @@ struct CertificateFixture : ConfigurationDataDirFixture
}
}
CertificateFixture(const CertificateFixture&) = delete;
CertificateFixture(CertificateFixture&&) = delete;
CertificateFixture& operator=(const CertificateFixture&) = delete;
CertificateFixture& operator=(CertificateFixture&&) = delete;
~CertificateFixture()
{
namespace fs = boost::filesystem;

61
test/test-thread.hpp Normal file
View file

@ -0,0 +1,61 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#pragma once
#include <functional>
#include <future>
#include <thread>
#include <utility>
#define REQUIRE_JOINS_WITHIN(t, timeout) \
BOOST_REQUIRE_MESSAGE(t.TryJoinFor(timeout), "Thread not joinable within timeout.")
#define CHECK_JOINS_WITHIN(t, timeout) \
BOOST_REQUIRE_MESSAGE(t.TryJoinFor(timeout), "Thread not joinable within timeout.")
#define TEST_JOINS_WITHIN(t, timeout) \
BOOST_REQUIRE_MESSAGE(t.TryJoinFor(timeout), "Thread not joinable within timeout.")
#define REQUIRE_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.")
#define CHECK_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.")
#define TEST_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.")
namespace icinga {
class TestThread
{
public:
explicit TestThread(std::function<void()> fn) : TestThread(std::move(fn), std::promise<void>{}) {}
bool Joinable()
{
auto status = m_JoinFuture.wait_for(std::chrono::milliseconds{0});
return status == std::future_status::ready;
}
template<class Rep, class Period>
bool TryJoinFor(std::chrono::duration<Rep, Period> timeout)
{
auto status = m_JoinFuture.wait_for(timeout);
if (status == std::future_status::ready) {
m_Thread.join();
return true;
}
return false;
}
bool TryJoin() { return TryJoinFor(std::chrono::milliseconds{0}); }
private:
explicit TestThread(std::function<void()> fn, std::promise<void> joinPromise)
: m_JoinFuture(joinPromise.get_future()), m_Thread([fn = std::move(fn), jp = std::move(joinPromise)]() mutable {
fn();
jp.set_value();
})
{
}
std::future<void> m_JoinFuture;
std::thread m_Thread;
};
} // namespace icinga

View file

@ -5,6 +5,7 @@
#include <iomanip>
#include <sstream>
#include <boost/test/unit_test.hpp>
#include <boost/random.hpp>
tm make_tm(std::string s)
{
@ -65,3 +66,16 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture()
#endif
tzset();
}
std::string GetRandomString(std::string prefix, std::size_t length)
{
std::string alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
boost::random::mt19937 generator;
boost::random::uniform_int_distribution<> distribution(0, alphabet.size() - 1);
for (auto i = 0U; i < length; i++) {
prefix += alphabet[distribution(generator)];
}
return prefix;
}

View file

@ -23,3 +23,5 @@ struct GlobalTimezoneFixture
char *tz;
};
std::string GetRandomString(std::string prefix, std::size_t length);