// SPDX-FileCopyrightText: 2012 Icinga GmbH // SPDX-License-Identifier: GPL-2.0-or-later #include "icingadb/redisconnection.hpp" #include "base/convert.hpp" #include "base/defer.hpp" #include "base/exception.hpp" #include "base/io-engine.hpp" #include "base/logger.hpp" #include "base/string.hpp" #include "base/tcpsocket.hpp" #include "base/tlsutility.hpp" #include "base/utility.hpp" #include #include #include #include #include #include #include #include #include #include #include using namespace icinga; namespace asio = boost::asio; boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH "); RedisConnection::RedisConnection(const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries) : RedisConnection{IoEngine::Get().GetIoContext(), connInfo, parent, trackOwnPendingQueries} { } RedisConnection::RedisConnection( boost::asio::io_context& io, const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries ) : m_ConnInfo{connInfo}, m_Strand(io), m_Connecting(false), m_Connected(false), m_Started(false), m_QueuedWrites(io), m_QueuedReads(io), m_TrackOwnPendingQueries{trackOwnPendingQueries}, m_LogStatsTimer(io), m_Parent(parent) { if (connInfo->EnableTls && connInfo->Path.IsEmpty()) { UpdateTLSContext(); } } void RedisConnection::UpdateTLSContext() { m_TLSContext = SetupSslContext( m_ConnInfo->TlsCertPath, m_ConnInfo->TlsKeyPath, m_ConnInfo->TlsCaPath, m_ConnInfo->TlsCrlPath, m_ConnInfo->TlsCipherList, m_ConnInfo->TlsProtocolMin, m_ConnInfo->DbgInfo ); } void RedisConnection::Start() { if (!m_Started.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); if (!m_Parent) { IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); } } if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } /** * Append a Redis query to a log message * * @param query Redis query * @param msg Log message */ static inline void LogQuery(RedisConnection::Query& query, Log& msg) { int i = 0; for (std::string_view arg : query) { if (++i == 8) { msg << " ..."; break; } if (arg.length() > 64) { msg << " '" << arg.substr(0, 61) << "...'"; } else { msg << " '" << arg << '\''; } } } /** * Queue a Redis query for sending without waiting for the response in a fire-and-forget manner. * * If the highPriority flag is set to true, the query is treated with high priority and placed at the front of * the write queue, ensuring it is sent before other queued queries. This is useful for time-sensitive operations * that require to be executed promptly, which is the case for IcingaDB heartbeat queries. If there are already * queries with high priority in the queue, the new query is inserted after all existing high priority queries but * before any normal priority queries to maintain the order of high priority items. * * @note The highPriority flag should be used sparingly and only for critical queries, as it can affect the overall * performance and responsiveness of the Redis connection by potentially delaying other queued queries. * * @param query The Redis query to be sent. * @param affects Does the query affect config, state or history data. * @param highPriority Whether the query should be treated with high priority. */ void RedisConnection::FireAndForgetQuery(Query query, QueryAffects affects, bool highPriority) { if (LogDebug >= Logger::GetMinLogSeverity()) { Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:"); LogQuery(query, msg); } auto item (Shared::Make(std::move(query))); asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() { m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority); m_QueuedWrites.Set(); IncreasePendingQueries(1); }); } /** * Queue Redis queries for sending * * @param queries Redis queries */ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, QueryAffects affects) { if (LogDebug >= Logger::GetMinLogSeverity()) { for (auto& query : queries) { Log msg(LogDebug, "IcingaDB", "Firing and forgetting query:"); LogQuery(query, msg); } } auto item (Shared::Make(std::move(queries))); asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() { m_Queues.Push(WriteQueueItem{item, ctime, affects}, false); m_QueuedWrites.Set(); IncreasePendingQueries(item->size()); }); } /** * Queue a Redis query for sending, wait for the response and return (or throw) it * * @param query Redis query * * @return The response */ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, QueryAffects affects) { if (LogDebug >= Logger::GetMinLogSeverity()) { Log msg (LogDebug, "IcingaDB", "Executing query:"); LogQuery(query, msg); } std::promise promise; auto future (promise.get_future()); auto item (Shared>>::Make(std::move(query), std::move(promise))); asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() { m_Queues.Push(WriteQueueItem{item, ctime, affects}, false); m_QueuedWrites.Set(); IncreasePendingQueries(1); }); item = nullptr; future.wait(); return future.get(); } /** * Queue Redis queries for sending, wait for the responses and return (or throw) them * * @param queries Redis queries * * @return The responses */ RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, QueryAffects affects, bool highPriority) { if (LogDebug >= Logger::GetMinLogSeverity()) { for (auto& query : queries) { Log msg(LogDebug, "IcingaDB", "Executing query:"); LogQuery(query, msg); } } std::promise promise; auto future (promise.get_future()); auto item (Shared>>::Make(std::move(queries), std::move(promise))); asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() { m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority); m_QueuedWrites.Set(); IncreasePendingQueries(item->first.size()); }); item = nullptr; future.wait(); return future.get(); } void RedisConnection::EnqueueCallback(const std::function& callback) { asio::post(m_Strand, [this, callback, ctime = Utility::GetTime()]() { m_Queues.Push(WriteQueueItem{callback, ctime, {}}, false); m_QueuedWrites.Set(); }); } /** * Puts a no-op command with a result at the end of the queue and wait for the result, * i.e. for everything enqueued to be processed by the server. * * @ingroup icingadb */ void RedisConnection::Sync() { GetResultOfQuery({"PING"}); } /** * Get the enqueue time of the oldest still queued Redis query * * @return *nix timestamp or 0 */ double RedisConnection::GetOldestPendingQueryTs() const { auto promise (Shared>::Make()); auto future (promise->get_future()); asio::post(m_Strand, [this, promise]() { boost::container::static_vector timestamps; if (!m_Queues.HighWriteQ.empty()) { timestamps.push_back(m_Queues.HighWriteQ.front().CTime); } if (!m_Queues.NormalWriteQ.empty()) { timestamps.push_back(m_Queues.NormalWriteQ.front().CTime); } if (timestamps.empty()) { timestamps.push_back(0); } promise->set_value(*boost::range::min_element(timestamps)); }); future.wait(); return future.get(); } /** * Try to connect to Redis */ void RedisConnection::Connect(asio::yield_context& yc) { Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); boost::asio::deadline_timer timer (m_Strand.context()); for (;;) { try { if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") << "Trying to connect to Redis server (async, TLS) on host '" << m_ConnInfo->Host << ":" << m_ConnInfo->Port << "'"; auto conn (Shared::Make(m_Strand.context(), *m_TLSContext, m_ConnInfo->Host)); auto& tlsConn (conn->next_layer()); auto connectTimeout (MakeTimeout(conn)); icinga::Connect(conn->lowest_layer(), m_ConnInfo->Host, Convert::ToString(m_ConnInfo->Port), yc); tlsConn.async_handshake(tlsConn.client, yc); if (!m_ConnInfo->TlsInsecureNoverify) { std::shared_ptr cert (tlsConn.GetPeerCertificate()); if (!cert) { BOOST_THROW_EXCEPTION(std::runtime_error( "Redis didn't present any TLS certificate." )); } if (!tlsConn.IsVerifyOK()) { BOOST_THROW_EXCEPTION(std::runtime_error( "TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError()) )); } } Handshake(conn, yc); m_QueuedReads.WaitForClear(yc); m_TlsConn = std::move(conn); } else { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") << "Trying to connect to Redis server (async) on host '" << m_ConnInfo->Host << ":" << m_ConnInfo->Port << "'"; auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); icinga::Connect(conn->next_layer(), m_ConnInfo->Host, Convert::ToString(m_ConnInfo->Port), yc); Handshake(conn, yc); m_QueuedReads.WaitForClear(yc); m_TcpConn = std::move(conn); } } else { Log(LogInformation, "IcingaDB") << "Trying to connect to Redis server (async) on unix socket path '" << m_ConnInfo->Path << "'"; auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); conn->next_layer().async_connect(Unix::endpoint(m_ConnInfo->Path.CStr()), yc); Handshake(conn, yc); m_QueuedReads.WaitForClear(yc); m_UnixConn = std::move(conn); } m_Connected.store(true); Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server"); // Operate on a copy so that the callback can set a new callback without destroying itself while running. auto callback (m_ConnectedCallback); if (callback) { callback(yc); } break; } catch (const std::exception& ex) { Log(LogCritical, "IcingaDB") << "Cannot connect to Redis server ('" << (m_ConnInfo->Path.IsEmpty() ? m_ConnInfo->Host+":"+Convert::ToString(m_ConnInfo->Port) : m_ConnInfo->Path) << "'): " << ex.what(); } timer.expires_from_now(boost::posix_time::seconds(5)); timer.async_wait(yc); } } /** * Actually receive the responses to the Redis queries send by WriteItem() and handle them */ void RedisConnection::ReadLoop(asio::yield_context& yc) { for (;;) { m_QueuedReads.WaitForSet(yc); while (!m_Queues.FutureResponseActions.empty()) { auto item (std::move(m_Queues.FutureResponseActions.front())); m_Queues.FutureResponseActions.pop(); switch (item.Action) { case ResponseAction::Ignore: try { for (auto i (item.Amount); i; --i) { ReadOne(yc); } } catch (const std::exception& ex) { Log(LogCritical, "IcingaDB") << "Error during receiving the response to a query which has been fired and forgotten: " << ex.what(); continue; } break; case ResponseAction::Deliver: for (auto i (item.Amount); i; --i) { auto promise (std::move(m_Queues.ReplyPromises.front())); m_Queues.ReplyPromises.pop(); Reply reply; try { reply = ReadOne(yc); } catch (const std::exception&) { promise.set_exception(std::current_exception()); continue; } promise.set_value(std::move(reply)); } break; case ResponseAction::DeliverBulk: { auto promise (std::move(m_Queues.RepliesPromises.front())); m_Queues.RepliesPromises.pop(); Replies replies; replies.reserve(item.Amount); for (auto i (item.Amount); i; --i) { try { replies.emplace_back(ReadOne(yc)); } catch (const std::exception&) { promise.set_exception(std::current_exception()); break; } } try { promise.set_value(std::move(replies)); } catch (const std::future_error&) { // Complaint about the above op is not allowed // due to promise.set_exception() was already called } } } } m_QueuedReads.Clear(); } } /** * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}() */ void RedisConnection::WriteLoop(asio::yield_context& yc) { for (;;) { m_QueuedWrites.Wait(yc); while (m_Queues.HasWrites()) { auto queuedWrite(m_Queues.PopFront()); std::visit( [this, &yc, &queuedWrite](const auto& item) { if (WriteItem(item, yc)) { RecordAffected(queuedWrite.Affects, Utility::GetTime()); } }, queuedWrite.Item ); } m_QueuedWrites.Clear(); } } /** * Periodically log current query performance */ void RedisConnection::LogStats(asio::yield_context& yc) { double lastMessage = 0; m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); for (;;) { m_LogStatsTimer.async_wait(yc); m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); if (!IsConnected()) continue; auto now (Utility::GetTime()); bool timeoutReached = now - lastMessage >= 5 * 60; if (m_PendingQueries < 1 && !timeoutReached) continue; auto output (round(m_OutputQueries.CalculateRate(now, 10))); if (m_PendingQueries < output * 5 && !timeoutReached) continue; Log(LogInformation, "IcingaDB") << "Pending queries: " << m_PendingQueries << " (Input: " << round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)"; lastMessage = now; } } /** * Write a single Redis query in a fire-and-forget manner. * * @param item Redis query * * @return true on success, false on failure. */ bool RedisConnection::WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc) { DecreasePendingQueries(1); try { WriteOne(*item, yc); } catch (const std::exception& ex) { Log msg (LogCritical, "IcingaDB", "Error during sending query"); LogQuery(*item, msg); msg << " which has been fired and forgotten: " << ex.what(); return false; } if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); } else { ++m_Queues.FutureResponseActions.back().Amount; } m_QueuedReads.Set(); return true; } /** * Write multiple Redis queries in a fire-and-forget manner. * * @param item Redis queries * * @return true on success, false on failure. */ bool RedisConnection::WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc) { size_t i = 0; DecreasePendingQueries(item->size()); try { for (auto& query : *item) { WriteOne(query, yc); ++i; } } catch (const std::exception& ex) { Log msg (LogCritical, "IcingaDB", "Error during sending query"); LogQuery((*item)[i], msg); msg << " which has been fired and forgotten: " << ex.what(); return false; } if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->size(), ResponseAction::Ignore}); } else { m_Queues.FutureResponseActions.back().Amount += item->size(); } m_QueuedReads.Set(); return true; } /** * Write a single Redis query and enqueue a response promise to be fulfilled once the response has been received. * * @param item Redis query and promise for the response */ bool RedisConnection::WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc) { DecreasePendingQueries(1); try { WriteOne(item->first, yc); } catch (const std::exception&) { item->second.set_exception(std::current_exception()); return false; } m_Queues.ReplyPromises.push(std::move(item->second)); if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); } else { ++m_Queues.FutureResponseActions.back().Amount; } m_QueuedReads.Set(); return true; } /** * Write multiple Redis queries and enqueue a response promise to be fulfilled once all responses have been received. * * @param item Redis queries and promise for the responses. * * @return true on success, false on failure. */ bool RedisConnection::WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc) { DecreasePendingQueries(item->first.size()); try { for (auto& query : item->first) { WriteOne(query, yc); } } catch (const std::exception&) { item->second.set_exception(std::current_exception()); return false; } m_Queues.RepliesPromises.emplace(std::move(item->second)); m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->first.size(), ResponseAction::DeliverBulk}); m_QueuedReads.Set(); return true; } /** * Invokes the provided callback immediately. * * @param item Callback to execute */ bool RedisConnection::WriteItem(const QueryCallback& item, boost::asio::yield_context& yc) { item(yc); return true; } /** * Receive the response to a Redis query * * @return The response */ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) { if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { return ReadOne(m_TlsConn, yc); } else { return ReadOne(m_TcpConn, yc); } } else { return ReadOne(m_UnixConn, yc); } } /** * Send query * * @param query Redis query */ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) { if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { WriteOne(m_TlsConn, query, yc); } else { WriteOne(m_TcpConn, query, yc); } } else { WriteOne(m_UnixConn, query, yc); } } /** * Specify a callback that is run each time a connection is successfully established * * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations. * * @param callback Callback to execute */ void RedisConnection::SetConnectedCallback(std::function callback) { m_ConnectedCallback = std::move(callback); } int RedisConnection::GetQueryCount(RingBuffer::SizeType span) { return m_OutputQueries.UpdateAndGetValues(Utility::GetTime(), span); } void RedisConnection::IncreasePendingQueries(int count) { if (m_Parent) { m_Parent->IncreasePendingQueries(count); } // Only track the pending queries of the root connection or if explicitly // requested to do so for child connections as well. if (!m_Parent || m_TrackOwnPendingQueries) { m_PendingQueries.fetch_add(count); m_InputQueries.InsertValue(Utility::GetTime(), count); } } void RedisConnection::DecreasePendingQueries(int count) { if (m_Parent) { m_Parent->DecreasePendingQueries(count); } // Same as in IncreasePendingQueries(). if (!m_Parent || m_TrackOwnPendingQueries) { m_PendingQueries.fetch_sub(count); m_OutputQueries.InsertValue(Utility::GetTime(), count); } } void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when) { if (m_Parent) { auto parent (m_Parent); asio::post(parent->m_Strand, [parent, affected, when]() { parent->RecordAffected(affected, when); }); } else { if (affected.Config) { m_WrittenConfig.InsertValue(when, affected.Config); } if (affected.State) { m_WrittenState.InsertValue(when, affected.State); } if (affected.History) { m_WrittenHistory.InsertValue(when, affected.History); } } }