Refactor HttpMessage into generalized templated types

This adds generalized IncomingHttpMessage and OutgoingHttpMessage templates
that support different types of streams (via a std::variant) and can both
be used for either requests or responses.

The tacked on metadata from the old HttpRequest and server connection from
the old HttpServerConnection have been moved to HttpApi(Request|Response)
classes that derive from the above generalized message types.
This commit is contained in:
Johannes Schmidt 2026-01-22 12:41:33 +01:00
parent a0f603f608
commit 1505f09ed6
6 changed files with 237 additions and 106 deletions

View file

@ -13,6 +13,7 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <variant>
#include <boost/asio/buffered_stream.hpp> #include <boost/asio/buffered_stream.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
@ -122,9 +123,9 @@ private:
} }
}; };
typedef boost::asio::buffered_stream<boost::asio::ip::tcp::socket> AsioTcpStream; using AsioTcpStream = boost::asio::buffered_stream<boost::asio::ip::tcp::socket>;
typedef std::pair<Shared<AsioTlsStream>::Ptr, Shared<AsioTcpStream>::Ptr> OptionalTlsStream; using OptionalTlsStream = std::pair<Shared<AsioTlsStream>::Ptr, Shared<AsioTcpStream>::Ptr>;
using AsioTlsOrTcpStream = std::variant<Shared<AsioTlsStream>::Ptr, Shared<AsioTcpStream>::Ptr>;
} }
#endif /* TLSSTREAM_H */ #endif /* TLSSTREAM_H */

View file

@ -1,7 +1,6 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ /* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
#include "remote/httpmessage.hpp" #include "remote/httpmessage.hpp"
#include "base/io-engine.hpp"
#include "base/json.hpp" #include "base/json.hpp"
#include "remote/httputility.hpp" #include "remote/httputility.hpp"
#include "remote/url.hpp" #include "remote/url.hpp"
@ -27,10 +26,15 @@ constexpr std::size_t l_FlushThreshold = 128UL * 1024UL;
* *
* @ingroup base * @ingroup base
*/ */
template<typename Message>
class HttpResponseJsonWriter : public AsyncJsonWriter class HttpResponseJsonWriter : public AsyncJsonWriter
{ {
public: public:
explicit HttpResponseJsonWriter(HttpApiResponse& msg) : m_Message{msg} HttpResponseJsonWriter(const HttpResponseJsonWriter&) = delete;
HttpResponseJsonWriter(HttpResponseJsonWriter&&) = delete;
HttpResponseJsonWriter& operator=(const HttpResponseJsonWriter&) = delete;
HttpResponseJsonWriter& operator=(HttpResponseJsonWriter&&) = delete;
explicit HttpResponseJsonWriter(Message& msg) : m_Message{msg}
{ {
m_Message.body().Start(); m_Message.body().Start();
#if BOOST_VERSION >= 107000 #if BOOST_VERSION >= 107000
@ -59,23 +63,37 @@ public:
} }
private: private:
HttpApiResponse& m_Message; Message& m_Message;
}; };
HttpApiRequest::HttpApiRequest(Shared<AsioTlsStream>::Ptr stream) : m_Stream(std::move(stream)) template<bool isRequest, typename Body, typename StreamVariant>
IncomingHttpMessage<isRequest, Body, StreamVariant>::IncomingHttpMessage(StreamVariant stream)
: m_Stream(std::move(stream))
{ {
} }
void HttpApiRequest::ParseHeader(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) template<bool isRequest, typename Body, typename StreamVariant>
void IncomingHttpMessage<isRequest, Body, StreamVariant>::ParseHeader(
boost::beast::flat_buffer& buf,
boost::asio::yield_context yc
)
{ {
boost::beast::http::async_read_header(*m_Stream, buf, m_Parser, yc); std::visit([&](auto& stream) { boost::beast::http::async_read_header(*stream, buf, m_Parser, yc); }, m_Stream);
base() = m_Parser.get().base(); Base::base() = m_Parser.get().base();
} }
void HttpApiRequest::ParseBody(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) template<bool isRequest, typename Body, typename StreamVariant>
void IncomingHttpMessage<isRequest, Body, StreamVariant>::ParseBody(
boost::beast::flat_buffer& buf,
boost::asio::yield_context yc
)
{
std::visit([&](auto& stream) { boost::beast::http::async_read(*stream, buf, m_Parser, yc); }, m_Stream);
Base::body() = std::move(m_Parser.release().body());
}
HttpApiRequest::HttpApiRequest(Shared<AsioTlsStream>::Ptr stream) : IncomingHttpMessage(std::move(stream))
{ {
boost::beast::http::async_read(*m_Stream, buf, m_Parser, yc);
body() = std::move(m_Parser.release().body());
} }
ApiUser::Ptr HttpApiRequest::User() const ApiUser::Ptr HttpApiRequest::User() const
@ -111,49 +129,72 @@ void HttpApiRequest::DecodeParams()
m_Params = HttpUtility::FetchRequestParameters(m_Url, body()); m_Params = HttpUtility::FetchRequestParameters(m_Url, body());
} }
HttpApiResponse::HttpApiResponse(Shared<AsioTlsStream>::Ptr stream, HttpServerConnection::Ptr server) template<bool isRequest, typename Body, typename StreamVariant>
: m_Server(std::move(server)), m_Stream(std::move(stream)) OutgoingHttpMessage<isRequest, Body, StreamVariant>::OutgoingHttpMessage(StreamVariant stream)
: m_Stream(std::move(stream))
{ {
} }
void HttpApiResponse::Clear() template<bool isRequest, typename Body, typename StreamVariant>
void OutgoingHttpMessage<isRequest, Body, StreamVariant>::Clear()
{ {
ASSERT(!m_SerializationStarted); ASSERT(!m_SerializationStarted);
boost::beast::http::response<body_type>::operator=({}); Base::operator=({});
} }
void HttpApiResponse::Flush(boost::asio::yield_context yc) template<bool isRequest, typename Body, typename StreamVariant>
void OutgoingHttpMessage<isRequest, Body, StreamVariant>::Flush(boost::asio::yield_context yc, bool finish)
{ {
if (!chunked() && !has_content_length()) { if (!Base::chunked() && !Base::has_content_length()) {
ASSERT(!m_SerializationStarted); ASSERT(!m_SerializationStarted);
prepare_payload(); Base::prepare_payload();
} }
m_SerializationStarted = true; std::visit(
[&](auto& stream) {
m_SerializationStarted = true;
if (!m_Serializer.is_header_done()) { if (!m_Serializer.is_header_done()) {
boost::beast::http::write_header(*m_Stream, m_Serializer); boost::beast::http::write_header(*stream, m_Serializer);
} }
boost::system::error_code ec; if (finish) {
boost::beast::http::async_write(*m_Stream, m_Serializer, yc[ec]); Base::body().Finish();
if (ec && ec != boost::beast::http::error::need_buffer) { }
if (yc.ec_) {
*yc.ec_ = ec;
return;
}
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
}
m_Stream->async_flush(yc);
ASSERT(m_Serializer.is_done() || !body().Finished()); boost::system::error_code ec;
boost::beast::http::async_write(*stream, m_Serializer, yc[ec]);
if (ec && ec != boost::beast::http::error::need_buffer) {
if (yc.ec_) {
*yc.ec_ = ec;
return;
}
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
}
stream->async_flush(yc);
ASSERT(m_Serializer.is_done() || !Base::body().Finished());
},
m_Stream
);
}
template<bool isRequest, typename Body, typename StreamVariant>
void OutgoingHttpMessage<isRequest, Body, StreamVariant>::StartStreaming()
{
ASSERT(Base::body().Size() == 0 && !m_SerializationStarted);
Base::body().Start();
Base::chunked(true);
}
HttpApiResponse::HttpApiResponse(Shared<AsioTlsStream>::Ptr stream, HttpServerConnection::Ptr server)
: OutgoingHttpMessage(std::move(stream)), m_Server(std::move(server))
{
} }
void HttpApiResponse::StartStreaming(bool checkForDisconnect) void HttpApiResponse::StartStreaming(bool checkForDisconnect)
{ {
ASSERT(body().Size() == 0 && !m_SerializationStarted); OutgoingHttpMessage::StartStreaming();
body().Start();
chunked(true);
if (checkForDisconnect) { if (checkForDisconnect) {
ASSERT(m_Server); ASSERT(m_Server);
@ -167,7 +208,11 @@ bool HttpApiResponse::IsClientDisconnected() const
return m_Server->Disconnected(); return m_Server->Disconnected();
} }
void HttpApiResponse::SendFile(const String& path, const boost::asio::yield_context& yc) template<bool isRequest, typename Body, typename StreamVariant>
void OutgoingHttpMessage<isRequest, Body, StreamVariant>::SendFile(
const String& path,
const boost::asio::yield_context& yc
)
{ {
std::ifstream fp(path.CStr(), std::ifstream::in | std::ifstream::binary | std::ifstream::ate); std::ifstream fp(path.CStr(), std::ifstream::in | std::ifstream::binary | std::ifstream::ate);
fp.exceptions(std::ifstream::badbit | std::ifstream::eofbit); fp.exceptions(std::ifstream::badbit | std::ifstream::eofbit);
@ -175,22 +220,44 @@ void HttpApiResponse::SendFile(const String& path, const boost::asio::yield_cont
std::uint64_t remaining = fp.tellg(); std::uint64_t remaining = fp.tellg();
fp.seekg(0); fp.seekg(0);
content_length(remaining); Base::content_length(remaining);
body().Start(); Base::body().Start();
while (remaining) { while (remaining) {
auto maxTransfer = std::min(remaining, static_cast<std::uint64_t>(l_FlushThreshold)); auto maxTransfer = std::min(remaining, static_cast<std::uint64_t>(l_FlushThreshold));
auto buf = *body().Buffer().prepare(maxTransfer).begin(); using BodyBuffer = std::decay_t<decltype(std::declval<typename Body::value_type>().Buffer())>;
using BufferOrSequence = typename BodyBuffer::mutable_buffers_type;
boost::asio::mutable_buffer buf;
if constexpr (!std::is_same_v<BufferOrSequence, boost::asio::mutable_buffer>) {
buf = *Base::body().Buffer().prepare(maxTransfer).begin();
} else {
buf = Base::body().Buffer().prepare(maxTransfer);
}
fp.read(static_cast<char*>(buf.data()), buf.size()); fp.read(static_cast<char*>(buf.data()), buf.size());
body().Buffer().commit(buf.size()); Base::body().Buffer().commit(buf.size());
remaining -= buf.size(); remaining -= buf.size();
Flush(yc); Flush(yc);
} }
} }
JsonEncoder HttpApiResponse::GetJsonEncoder(bool pretty) template<bool isRequest, typename Body, typename StreamVariant>
JsonEncoder OutgoingHttpMessage<isRequest, Body, StreamVariant>::GetJsonEncoder(bool pretty)
{ {
return JsonEncoder{std::make_shared<HttpResponseJsonWriter>(*this), pretty}; return JsonEncoder{
std::make_shared<HttpResponseJsonWriter<OutgoingHttpMessage<isRequest, Body, StreamVariant>>>(*this), pretty
};
} }
// More general instantiations
template class icinga::OutgoingHttpMessage<true, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
template class icinga::OutgoingHttpMessage<false, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
template class icinga::IncomingHttpMessage<true, boost::beast::http::string_body, AsioTlsOrTcpStream>;
template class icinga::IncomingHttpMessage<false, boost::beast::http::string_body, AsioTlsOrTcpStream>;
// Instantiations specifically for HttpApi(Request|Response)
template class icinga::IncomingHttpMessage<true, boost::beast::http::string_body, std::variant<Shared<AsioTlsStream>::Ptr>>;
template class icinga::OutgoingHttpMessage<false, SerializableMultiBufferBody, std::variant<Shared<AsioTlsStream>::Ptr>>;

View file

@ -10,6 +10,7 @@
#include "remote/url.hpp" #include "remote/url.hpp"
#include <boost/beast/http.hpp> #include <boost/beast/http.hpp>
#include <boost/version.hpp> #include <boost/version.hpp>
#include <utility>
namespace icinga { namespace icinga {
@ -143,17 +144,17 @@ struct SerializableBody
}; };
}; };
/** using SerializableMultiBufferBody = SerializableBody<boost::beast::multi_buffer>;
* A wrapper class for a boost::beast HTTP request using SerializableFlatBufferBody = SerializableBody<boost::beast::flat_buffer>;
*
* @ingroup remote
*/
class HttpApiRequest : public boost::beast::http::request<boost::beast::http::string_body>
{
public:
using ParserType = boost::beast::http::request_parser<body_type>;
explicit HttpApiRequest(Shared<AsioTlsStream>::Ptr stream); template<bool isRequest, typename Body, typename StreamVariant>
class IncomingHttpMessage : public boost::beast::http::message<isRequest, Body>
{
using ParserType = boost::beast::http::parser<isRequest, Body>;
using Base = boost::beast::http::message<isRequest, Body>;
public:
explicit IncomingHttpMessage(StreamVariant stream);
/** /**
* Parse the header of the response using the internal parser object. * Parse the header of the response using the internal parser object.
@ -176,34 +177,23 @@ public:
ParserType& Parser() { return m_Parser; } ParserType& Parser() { return m_Parser; }
[[nodiscard]] ApiUser::Ptr User() const;
void User(const ApiUser::Ptr& user);
[[nodiscard]] icinga::Url::Ptr Url() const;
void DecodeUrl();
[[nodiscard]] Dictionary::Ptr Params() const;
void DecodeParams();
private: private:
ApiUser::Ptr m_User;
Url::Ptr m_Url;
Dictionary::Ptr m_Params;
ParserType m_Parser; ParserType m_Parser;
Shared<AsioTlsStream>::Ptr m_Stream; StreamVariant m_Stream;
}; };
/** using IncomingHttpRequest = IncomingHttpMessage<true, boost::beast::http::string_body, AsioTlsOrTcpStream>;
* A wrapper class for a boost::beast HTTP response using IncomingHttpResponse = IncomingHttpMessage<false, boost::beast::http::string_body, AsioTlsOrTcpStream>;
*
* @ingroup remote template<bool isRequest, typename Body, typename StreamVariant>
*/ class OutgoingHttpMessage : public boost::beast::http::message<isRequest, Body>
class HttpApiResponse : public boost::beast::http::response<SerializableBody<boost::beast::multi_buffer>>
{ {
using Serializer = boost::beast::http::serializer<isRequest, Body>;
using Base = boost::beast::http::message<isRequest, Body>;
public: public:
explicit HttpApiResponse(Shared<AsioTlsStream>::Ptr stream, HttpServerConnection::Ptr server = nullptr); explicit OutgoingHttpMessage(StreamVariant stream);
/* Delete the base class clear() which is inherited from the fields<> class and doesn't /* Delete the base class clear() which is inherited from the fields<> class and doesn't
* clear things like the body or obviously our own members. * clear things like the body or obviously our own members.
@ -217,6 +207,32 @@ public:
*/ */
void Clear(); void Clear();
/**
* Commits the specified number of bytes (previously obtained via @c prepare()) for reading.
*
* This function makes the specified number of bytes available in the body buffer for reading.
*
* @param size The number of bytes to commit
*/
void Commit(std::size_t size) { Base::body().Buffer().commit(size); }
/**
* Prepare a buffer of the specified size for writing.
*
* The returned buffer serves just as a view onto the internal buffer sequence but does not actually
* own the memory. Thus, destroying the returned buffer will not free any memory it represents.
*
* @param size The size of the buffer to prepare
*
* @return A mutable buffer representing the prepared space
*/
auto Prepare(std::size_t size) { return Base::body().Buffer().prepare(size); }
/**
* Enables chunked encoding.
*/
void StartStreaming();
/** /**
* Writes as much of the response as is currently available. * Writes as much of the response as is currently available.
* *
@ -228,31 +244,10 @@ public:
* *
* @param yc The yield_context for this operation * @param yc The yield_context for this operation
*/ */
void Flush(boost::asio::yield_context yc); void Flush(boost::asio::yield_context yc, bool finish = false);
[[nodiscard]] bool HasSerializationStarted() const { return m_SerializationStarted; } [[nodiscard]] bool HasSerializationStarted() const { return m_SerializationStarted; }
/**
* Enables chunked encoding.
*
* Optionally starts a coroutine that reads from the stream and checks for client-side
* disconnects. In this case, the stream can not be reused after the response has been
* sent and any further requests sent over the connections will be discarded, even if
* no client-side disconnect occurs. This requires that this object has been constructed
* with a valid HttpServerConnection::Ptr.
*
* @param checkForDisconnect Whether to start a coroutine to detect disconnects
*/
void StartStreaming(bool checkForDisconnect = false);
/**
* Check if the server has initiated a disconnect.
*
* @note This requires that the message has been constructed with a pointer to the
* @c HttpServerConnection.
*/
[[nodiscard]] bool IsClientDisconnected() const;
/** /**
* Sends the contents of a file. * Sends the contents of a file.
* *
@ -270,12 +265,80 @@ public:
JsonEncoder GetJsonEncoder(bool pretty = false); JsonEncoder GetJsonEncoder(bool pretty = false);
private: private:
using Serializer = boost::beast::http::response_serializer<HttpApiResponse::body_type>;
Serializer m_Serializer{*this}; Serializer m_Serializer{*this};
bool m_SerializationStarted = false; bool m_SerializationStarted = false;
HttpServerConnection::Ptr m_Server; StreamVariant m_Stream;
Shared<AsioTlsStream>::Ptr m_Stream;
}; };
using OutgoingHttpRequest = OutgoingHttpMessage<true, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
using OutgoingHttpResponse = OutgoingHttpMessage<false, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
class HttpApiRequest
: public IncomingHttpMessage<true, boost::beast::http::string_body, std::variant<Shared<AsioTlsStream>::Ptr>>
{
public:
explicit HttpApiRequest(Shared<AsioTlsStream>::Ptr stream);
[[nodiscard]] ApiUser::Ptr User() const;
void User(const ApiUser::Ptr& user);
[[nodiscard]] icinga::Url::Ptr Url() const;
void DecodeUrl();
[[nodiscard]] Dictionary::Ptr Params() const;
void DecodeParams();
private:
ApiUser::Ptr m_User;
Url::Ptr m_Url;
Dictionary::Ptr m_Params;
};
/**
* A wrapper class for a boost::beast HTTP response for the Icinga 2 API
*
* @ingroup remote
*/
class HttpApiResponse
: public OutgoingHttpMessage<false, SerializableMultiBufferBody, std::variant<Shared<AsioTlsStream>::Ptr>>
{
public:
explicit HttpApiResponse(Shared<AsioTlsStream>::Ptr stream, HttpServerConnection::Ptr server = nullptr);
/**
* Enables chunked encoding.
*
* Optionally starts a coroutine that reads from the stream and checks for client-side
* disconnects. In this case, the stream can not be reused after the response has been
* sent and any further requests sent over the connections will be discarded, even if
* no client-side disconnect occurs. This requires that this object has been constructed
* with a valid HttpServerConnection::Ptr.
*
* @param checkForDisconnect Whether to start a coroutine to detect disconnects
*/
void StartStreaming(bool checkForDisconnect);
/**
* Check if the server has initiated a disconnect.
*
* @note This requires that the message has been constructed with a pointer to the
* @c HttpServerConnection.
*/
[[nodiscard]] bool IsClientDisconnected() const;
private:
HttpServerConnection::Ptr m_Server;
};
// More general instantiations
extern template class OutgoingHttpMessage<true, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
extern template class OutgoingHttpMessage<false, SerializableFlatBufferBody, AsioTlsOrTcpStream>;
extern template class IncomingHttpMessage<true, boost::beast::http::string_body, AsioTlsOrTcpStream>;
extern template class IncomingHttpMessage<false, boost::beast::http::string_body, AsioTlsOrTcpStream>;
// Instantiations specifically for HttpApi(Request|Response)
extern template class IncomingHttpMessage<true, boost::beast::http::string_body, std::variant<Shared<AsioTlsStream>::Ptr>>;
extern template class OutgoingHttpMessage<false, SerializableMultiBufferBody, std::variant<Shared<AsioTlsStream>::Ptr>>;
} // namespace icinga } // namespace icinga

View file

@ -328,7 +328,7 @@ bool ObjectQueryHandler::HandleRequest(
response.result(http::status::ok); response.result(http::status::ok);
response.set(http::field::content_type, "application/json"); response.set(http::field::content_type, "application/json");
response.StartStreaming(); response.StartStreaming(false);
Dictionary::Ptr results = new Dictionary{{"results", new ValueGenerator{generatorFunc}}}; Dictionary::Ptr results = new Dictionary{{"results", new ValueGenerator{generatorFunc}}};
results->Freeze(); results->Freeze();

View file

@ -228,7 +228,7 @@ BOOST_AUTO_TEST_CASE(response_write_chunked)
HttpApiResponse response(server); HttpApiResponse response(server);
response.result(http::status::ok); response.result(http::status::ok);
response.StartStreaming(); response.StartStreaming(false);
BOOST_REQUIRE_NO_THROW(response.Flush(yc)); BOOST_REQUIRE_NO_THROW(response.Flush(yc));
BOOST_REQUIRE(response.HasSerializationStarted()); BOOST_REQUIRE(response.HasSerializationStarted());

View file

@ -422,7 +422,7 @@ BOOST_AUTO_TEST_CASE(client_shutdown)
SetupHttpServerConnection(true); SetupHttpServerConnection(true);
UnitTestHandler::RegisterTestFn("stream", [](HttpApiResponse& response, const boost::asio::yield_context& yc) { UnitTestHandler::RegisterTestFn("stream", [](HttpApiResponse& response, const boost::asio::yield_context& yc) {
response.StartStreaming(); response.StartStreaming(false);
response.Flush(yc); response.Flush(yc);
boost::asio::deadline_timer dt{IoEngine::Get().GetIoContext()}; boost::asio::deadline_timer dt{IoEngine::Get().GetIoContext()};
@ -471,7 +471,7 @@ BOOST_AUTO_TEST_CASE(handler_throw_error)
SetupHttpServerConnection(true); SetupHttpServerConnection(true);
UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context&) { UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context&) {
response.StartStreaming(); response.StartStreaming(false);
response.body() << "test"; response.body() << "test";
boost::system::error_code ec{}; boost::system::error_code ec{};
@ -509,7 +509,7 @@ BOOST_AUTO_TEST_CASE(handler_throw_streaming)
SetupHttpServerConnection(true); SetupHttpServerConnection(true);
UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context& yc) { UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context& yc) {
response.StartStreaming(); response.StartStreaming(false);
response.body() << "test"; response.body() << "test";
response.Flush(yc); response.Flush(yc);