diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 9eed8d3b1..4bff9d629 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -122,9 +123,9 @@ private: } }; -typedef boost::asio::buffered_stream AsioTcpStream; -typedef std::pair::Ptr, Shared::Ptr> OptionalTlsStream; - +using AsioTcpStream = boost::asio::buffered_stream; +using OptionalTlsStream = std::pair::Ptr, Shared::Ptr>; +using AsioTlsOrTcpStream = std::variant::Ptr, Shared::Ptr>; } #endif /* TLSSTREAM_H */ diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp index 6ad761b8a..c3b80dafc 100644 --- a/lib/remote/httpmessage.cpp +++ b/lib/remote/httpmessage.cpp @@ -1,7 +1,6 @@ /* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ #include "remote/httpmessage.hpp" -#include "base/io-engine.hpp" #include "base/json.hpp" #include "remote/httputility.hpp" #include "remote/url.hpp" @@ -27,10 +26,15 @@ constexpr std::size_t l_FlushThreshold = 128UL * 1024UL; * * @ingroup base */ +template class HttpResponseJsonWriter : public AsyncJsonWriter { public: - explicit HttpResponseJsonWriter(HttpApiResponse& msg) : m_Message{msg} + HttpResponseJsonWriter(const HttpResponseJsonWriter&) = delete; + HttpResponseJsonWriter(HttpResponseJsonWriter&&) = delete; + HttpResponseJsonWriter& operator=(const HttpResponseJsonWriter&) = delete; + HttpResponseJsonWriter& operator=(HttpResponseJsonWriter&&) = delete; + explicit HttpResponseJsonWriter(Message& msg) : m_Message{msg} { m_Message.body().Start(); #if BOOST_VERSION >= 107000 @@ -59,23 +63,37 @@ public: } private: - HttpApiResponse& m_Message; + Message& m_Message; }; -HttpApiRequest::HttpApiRequest(Shared::Ptr stream) : m_Stream(std::move(stream)) +template +IncomingHttpMessage::IncomingHttpMessage(StreamVariant stream) + : m_Stream(std::move(stream)) { } -void HttpApiRequest::ParseHeader(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) +template +void IncomingHttpMessage::ParseHeader( + boost::beast::flat_buffer& buf, + boost::asio::yield_context yc +) { - boost::beast::http::async_read_header(*m_Stream, buf, m_Parser, yc); - base() = m_Parser.get().base(); + std::visit([&](auto& stream) { boost::beast::http::async_read_header(*stream, buf, m_Parser, yc); }, m_Stream); + Base::base() = m_Parser.get().base(); } -void HttpApiRequest::ParseBody(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) +template +void IncomingHttpMessage::ParseBody( + boost::beast::flat_buffer& buf, + boost::asio::yield_context yc +) +{ + std::visit([&](auto& stream) { boost::beast::http::async_read(*stream, buf, m_Parser, yc); }, m_Stream); + Base::body() = std::move(m_Parser.release().body()); +} + +HttpApiRequest::HttpApiRequest(Shared::Ptr stream) : IncomingHttpMessage(std::move(stream)) { - boost::beast::http::async_read(*m_Stream, buf, m_Parser, yc); - body() = std::move(m_Parser.release().body()); } ApiUser::Ptr HttpApiRequest::User() const @@ -111,49 +129,72 @@ void HttpApiRequest::DecodeParams() m_Params = HttpUtility::FetchRequestParameters(m_Url, body()); } -HttpApiResponse::HttpApiResponse(Shared::Ptr stream, HttpServerConnection::Ptr server) - : m_Server(std::move(server)), m_Stream(std::move(stream)) +template +OutgoingHttpMessage::OutgoingHttpMessage(StreamVariant stream) + : m_Stream(std::move(stream)) { } -void HttpApiResponse::Clear() +template +void OutgoingHttpMessage::Clear() { ASSERT(!m_SerializationStarted); - boost::beast::http::response::operator=({}); + Base::operator=({}); } -void HttpApiResponse::Flush(boost::asio::yield_context yc) +template +void OutgoingHttpMessage::Flush(boost::asio::yield_context yc, bool finish) { - if (!chunked() && !has_content_length()) { + if (!Base::chunked() && !Base::has_content_length()) { ASSERT(!m_SerializationStarted); - prepare_payload(); + Base::prepare_payload(); } - m_SerializationStarted = true; + std::visit( + [&](auto& stream) { + m_SerializationStarted = true; - if (!m_Serializer.is_header_done()) { - boost::beast::http::write_header(*m_Stream, m_Serializer); - } + if (!m_Serializer.is_header_done()) { + boost::beast::http::write_header(*stream, m_Serializer); + } - boost::system::error_code ec; - boost::beast::http::async_write(*m_Stream, m_Serializer, yc[ec]); - if (ec && ec != boost::beast::http::error::need_buffer) { - if (yc.ec_) { - *yc.ec_ = ec; - return; - } - BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); - } - m_Stream->async_flush(yc); + if (finish) { + Base::body().Finish(); + } - ASSERT(m_Serializer.is_done() || !body().Finished()); + boost::system::error_code ec; + boost::beast::http::async_write(*stream, m_Serializer, yc[ec]); + if (ec && ec != boost::beast::http::error::need_buffer) { + if (yc.ec_) { + *yc.ec_ = ec; + return; + } + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + } + stream->async_flush(yc); + + ASSERT(m_Serializer.is_done() || !Base::body().Finished()); + }, + m_Stream + ); +} + +template +void OutgoingHttpMessage::StartStreaming() +{ + ASSERT(Base::body().Size() == 0 && !m_SerializationStarted); + Base::body().Start(); + Base::chunked(true); +} + +HttpApiResponse::HttpApiResponse(Shared::Ptr stream, HttpServerConnection::Ptr server) + : OutgoingHttpMessage(std::move(stream)), m_Server(std::move(server)) +{ } void HttpApiResponse::StartStreaming(bool checkForDisconnect) { - ASSERT(body().Size() == 0 && !m_SerializationStarted); - body().Start(); - chunked(true); + OutgoingHttpMessage::StartStreaming(); if (checkForDisconnect) { ASSERT(m_Server); @@ -167,7 +208,11 @@ bool HttpApiResponse::IsClientDisconnected() const return m_Server->Disconnected(); } -void HttpApiResponse::SendFile(const String& path, const boost::asio::yield_context& yc) +template +void OutgoingHttpMessage::SendFile( + const String& path, + const boost::asio::yield_context& yc +) { std::ifstream fp(path.CStr(), std::ifstream::in | std::ifstream::binary | std::ifstream::ate); fp.exceptions(std::ifstream::badbit | std::ifstream::eofbit); @@ -175,22 +220,44 @@ void HttpApiResponse::SendFile(const String& path, const boost::asio::yield_cont std::uint64_t remaining = fp.tellg(); fp.seekg(0); - content_length(remaining); - body().Start(); + Base::content_length(remaining); + Base::body().Start(); while (remaining) { auto maxTransfer = std::min(remaining, static_cast(l_FlushThreshold)); - auto buf = *body().Buffer().prepare(maxTransfer).begin(); + using BodyBuffer = std::decay_t().Buffer())>; + using BufferOrSequence = typename BodyBuffer::mutable_buffers_type; + + boost::asio::mutable_buffer buf; + + if constexpr (!std::is_same_v) { + buf = *Base::body().Buffer().prepare(maxTransfer).begin(); + } else { + buf = Base::body().Buffer().prepare(maxTransfer); + } fp.read(static_cast(buf.data()), buf.size()); - body().Buffer().commit(buf.size()); + Base::body().Buffer().commit(buf.size()); remaining -= buf.size(); Flush(yc); } } -JsonEncoder HttpApiResponse::GetJsonEncoder(bool pretty) +template +JsonEncoder OutgoingHttpMessage::GetJsonEncoder(bool pretty) { - return JsonEncoder{std::make_shared(*this), pretty}; + return JsonEncoder{ + std::make_shared>>(*this), pretty + }; } + +// More general instantiations +template class icinga::OutgoingHttpMessage; +template class icinga::OutgoingHttpMessage; +template class icinga::IncomingHttpMessage; +template class icinga::IncomingHttpMessage; + +// Instantiations specifically for HttpApi(Request|Response) +template class icinga::IncomingHttpMessage::Ptr>>; +template class icinga::OutgoingHttpMessage::Ptr>>; diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp index ef0f70bed..b26d48480 100644 --- a/lib/remote/httpmessage.hpp +++ b/lib/remote/httpmessage.hpp @@ -10,6 +10,7 @@ #include "remote/url.hpp" #include #include +#include namespace icinga { @@ -143,17 +144,17 @@ struct SerializableBody }; }; -/** - * A wrapper class for a boost::beast HTTP request - * - * @ingroup remote - */ -class HttpApiRequest : public boost::beast::http::request -{ -public: - using ParserType = boost::beast::http::request_parser; +using SerializableMultiBufferBody = SerializableBody; +using SerializableFlatBufferBody = SerializableBody; - explicit HttpApiRequest(Shared::Ptr stream); +template +class IncomingHttpMessage : public boost::beast::http::message +{ + using ParserType = boost::beast::http::parser; + using Base = boost::beast::http::message; + +public: + explicit IncomingHttpMessage(StreamVariant stream); /** * Parse the header of the response using the internal parser object. @@ -176,34 +177,23 @@ public: ParserType& Parser() { return m_Parser; } - [[nodiscard]] ApiUser::Ptr User() const; - void User(const ApiUser::Ptr& user); - - [[nodiscard]] icinga::Url::Ptr Url() const; - void DecodeUrl(); - - [[nodiscard]] Dictionary::Ptr Params() const; - void DecodeParams(); - private: - ApiUser::Ptr m_User; - Url::Ptr m_Url; - Dictionary::Ptr m_Params; - ParserType m_Parser; - Shared::Ptr m_Stream; + StreamVariant m_Stream; }; -/** - * A wrapper class for a boost::beast HTTP response - * - * @ingroup remote - */ -class HttpApiResponse : public boost::beast::http::response> +using IncomingHttpRequest = IncomingHttpMessage; +using IncomingHttpResponse = IncomingHttpMessage; + +template +class OutgoingHttpMessage : public boost::beast::http::message { + using Serializer = boost::beast::http::serializer; + using Base = boost::beast::http::message; + public: - explicit HttpApiResponse(Shared::Ptr stream, HttpServerConnection::Ptr server = nullptr); + explicit OutgoingHttpMessage(StreamVariant stream); /* Delete the base class clear() which is inherited from the fields<> class and doesn't * clear things like the body or obviously our own members. @@ -217,6 +207,32 @@ public: */ void Clear(); + /** + * Commits the specified number of bytes (previously obtained via @c prepare()) for reading. + * + * This function makes the specified number of bytes available in the body buffer for reading. + * + * @param size The number of bytes to commit + */ + void Commit(std::size_t size) { Base::body().Buffer().commit(size); } + + /** + * Prepare a buffer of the specified size for writing. + * + * The returned buffer serves just as a view onto the internal buffer sequence but does not actually + * own the memory. Thus, destroying the returned buffer will not free any memory it represents. + * + * @param size The size of the buffer to prepare + * + * @return A mutable buffer representing the prepared space + */ + auto Prepare(std::size_t size) { return Base::body().Buffer().prepare(size); } + + /** + * Enables chunked encoding. + */ + void StartStreaming(); + /** * Writes as much of the response as is currently available. * @@ -228,31 +244,10 @@ public: * * @param yc The yield_context for this operation */ - void Flush(boost::asio::yield_context yc); + void Flush(boost::asio::yield_context yc, bool finish = false); [[nodiscard]] bool HasSerializationStarted() const { return m_SerializationStarted; } - /** - * Enables chunked encoding. - * - * Optionally starts a coroutine that reads from the stream and checks for client-side - * disconnects. In this case, the stream can not be reused after the response has been - * sent and any further requests sent over the connections will be discarded, even if - * no client-side disconnect occurs. This requires that this object has been constructed - * with a valid HttpServerConnection::Ptr. - * - * @param checkForDisconnect Whether to start a coroutine to detect disconnects - */ - void StartStreaming(bool checkForDisconnect = false); - - /** - * Check if the server has initiated a disconnect. - * - * @note This requires that the message has been constructed with a pointer to the - * @c HttpServerConnection. - */ - [[nodiscard]] bool IsClientDisconnected() const; - /** * Sends the contents of a file. * @@ -270,12 +265,80 @@ public: JsonEncoder GetJsonEncoder(bool pretty = false); private: - using Serializer = boost::beast::http::response_serializer; Serializer m_Serializer{*this}; bool m_SerializationStarted = false; - HttpServerConnection::Ptr m_Server; - Shared::Ptr m_Stream; + StreamVariant m_Stream; }; +using OutgoingHttpRequest = OutgoingHttpMessage; +using OutgoingHttpResponse = OutgoingHttpMessage; + +class HttpApiRequest + : public IncomingHttpMessage::Ptr>> +{ +public: + explicit HttpApiRequest(Shared::Ptr stream); + + [[nodiscard]] ApiUser::Ptr User() const; + void User(const ApiUser::Ptr& user); + + [[nodiscard]] icinga::Url::Ptr Url() const; + void DecodeUrl(); + + [[nodiscard]] Dictionary::Ptr Params() const; + void DecodeParams(); + +private: + ApiUser::Ptr m_User; + Url::Ptr m_Url; + Dictionary::Ptr m_Params; +}; + +/** + * A wrapper class for a boost::beast HTTP response for the Icinga 2 API + * + * @ingroup remote + */ +class HttpApiResponse + : public OutgoingHttpMessage::Ptr>> +{ +public: + explicit HttpApiResponse(Shared::Ptr stream, HttpServerConnection::Ptr server = nullptr); + + /** + * Enables chunked encoding. + * + * Optionally starts a coroutine that reads from the stream and checks for client-side + * disconnects. In this case, the stream can not be reused after the response has been + * sent and any further requests sent over the connections will be discarded, even if + * no client-side disconnect occurs. This requires that this object has been constructed + * with a valid HttpServerConnection::Ptr. + * + * @param checkForDisconnect Whether to start a coroutine to detect disconnects + */ + void StartStreaming(bool checkForDisconnect); + + /** + * Check if the server has initiated a disconnect. + * + * @note This requires that the message has been constructed with a pointer to the + * @c HttpServerConnection. + */ + [[nodiscard]] bool IsClientDisconnected() const; + +private: + HttpServerConnection::Ptr m_Server; +}; + +// More general instantiations +extern template class OutgoingHttpMessage; +extern template class OutgoingHttpMessage; +extern template class IncomingHttpMessage; +extern template class IncomingHttpMessage; + +// Instantiations specifically for HttpApi(Request|Response) +extern template class IncomingHttpMessage::Ptr>>; +extern template class OutgoingHttpMessage::Ptr>>; + } // namespace icinga diff --git a/lib/remote/objectqueryhandler.cpp b/lib/remote/objectqueryhandler.cpp index 10757aa1e..fda93e1f0 100644 --- a/lib/remote/objectqueryhandler.cpp +++ b/lib/remote/objectqueryhandler.cpp @@ -328,7 +328,7 @@ bool ObjectQueryHandler::HandleRequest( response.result(http::status::ok); response.set(http::field::content_type, "application/json"); - response.StartStreaming(); + response.StartStreaming(false); Dictionary::Ptr results = new Dictionary{{"results", new ValueGenerator{generatorFunc}}}; results->Freeze(); diff --git a/test/remote-httpmessage.cpp b/test/remote-httpmessage.cpp index 96a43a357..62232a509 100644 --- a/test/remote-httpmessage.cpp +++ b/test/remote-httpmessage.cpp @@ -228,7 +228,7 @@ BOOST_AUTO_TEST_CASE(response_write_chunked) HttpApiResponse response(server); response.result(http::status::ok); - response.StartStreaming(); + response.StartStreaming(false); BOOST_REQUIRE_NO_THROW(response.Flush(yc)); BOOST_REQUIRE(response.HasSerializationStarted()); diff --git a/test/remote-httpserverconnection.cpp b/test/remote-httpserverconnection.cpp index 4a37f0ec0..1c3e747f8 100644 --- a/test/remote-httpserverconnection.cpp +++ b/test/remote-httpserverconnection.cpp @@ -422,7 +422,7 @@ BOOST_AUTO_TEST_CASE(client_shutdown) SetupHttpServerConnection(true); UnitTestHandler::RegisterTestFn("stream", [](HttpApiResponse& response, const boost::asio::yield_context& yc) { - response.StartStreaming(); + response.StartStreaming(false); response.Flush(yc); boost::asio::deadline_timer dt{IoEngine::Get().GetIoContext()}; @@ -471,7 +471,7 @@ BOOST_AUTO_TEST_CASE(handler_throw_error) SetupHttpServerConnection(true); UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context&) { - response.StartStreaming(); + response.StartStreaming(false); response.body() << "test"; boost::system::error_code ec{}; @@ -509,7 +509,7 @@ BOOST_AUTO_TEST_CASE(handler_throw_streaming) SetupHttpServerConnection(true); UnitTestHandler::RegisterTestFn("throw", [](HttpApiResponse& response, const boost::asio::yield_context& yc) { - response.StartStreaming(); + response.StartStreaming(false); response.body() << "test"; response.Flush(yc);