mirror of
https://github.com/Icinga/icinga2.git
synced 2026-05-19 08:32:30 -04:00
732 lines
18 KiB
C++
732 lines
18 KiB
C++
// SPDX-FileCopyrightText: 2012 Icinga GmbH <https://icinga.com>
|
|
// SPDX-License-Identifier: GPL-2.0-or-later
|
|
|
|
#ifndef REDISCONNECTION_H
|
|
#define REDISCONNECTION_H
|
|
|
|
#include "base/array.hpp"
|
|
#include "base/atomic.hpp"
|
|
#include "base/convert.hpp"
|
|
#include "base/io-engine.hpp"
|
|
#include "base/object.hpp"
|
|
#include "base/ringbuffer.hpp"
|
|
#include "base/shared.hpp"
|
|
#include "base/shared-object.hpp"
|
|
#include "base/string.hpp"
|
|
#include "base/tlsstream.hpp"
|
|
#include "base/value.hpp"
|
|
#include <boost/asio/buffer.hpp>
|
|
#include <boost/asio/buffered_stream.hpp>
|
|
#include <boost/asio/deadline_timer.hpp>
|
|
#include <boost/asio/io_context.hpp>
|
|
#include <boost/asio/io_context_strand.hpp>
|
|
#include <boost/asio/ip/tcp.hpp>
|
|
#include <boost/asio/local/stream_protocol.hpp>
|
|
#include <boost/asio/read.hpp>
|
|
#include <boost/asio/read_until.hpp>
|
|
#include <boost/asio/ssl/context.hpp>
|
|
#include <boost/asio/streambuf.hpp>
|
|
#include <boost/asio/write.hpp>
|
|
#include <boost/lexical_cast.hpp>
|
|
#include <boost/regex.hpp>
|
|
#include <boost/utility/string_view.hpp>
|
|
#include <cstddef>
|
|
#include <cstdint>
|
|
#include <cstdio>
|
|
#include <cstring>
|
|
#include <future>
|
|
#include <map>
|
|
#include <memory>
|
|
#include <queue>
|
|
#include <stdexcept>
|
|
#include <string_view>
|
|
#include <utility>
|
|
#include <variant>
|
|
#include <vector>
|
|
#include <variant>
|
|
|
|
namespace icinga
|
|
{
|
|
|
|
/**
|
|
* Information required to connect to a Redis server.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
struct RedisConnInfo final : SharedObject
|
|
{
|
|
DECLARE_PTR_TYPEDEFS(RedisConnInfo);
|
|
|
|
bool EnableTls;
|
|
bool TlsInsecureNoverify;
|
|
int Port;
|
|
int DbIndex;
|
|
double ConnectTimeout;
|
|
String Host;
|
|
String Path;
|
|
String User;
|
|
String Password;
|
|
String TlsCertPath;
|
|
String TlsKeyPath;
|
|
String TlsCaPath;
|
|
String TlsCrlPath;
|
|
String TlsProtocolMin;
|
|
String TlsCipherList;
|
|
DebugInfo DbgInfo;
|
|
};
|
|
|
|
/**
|
|
* An Async Redis connection.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
class RedisConnection final : public Object
|
|
{
|
|
public:
|
|
DECLARE_PTR_TYPEDEFS(RedisConnection);
|
|
|
|
/**
|
|
* A Redis query argument. Either owned String or hardcoded const char[].
|
|
* Allows mixing these types in a single query transparently, not requiring any conversions.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
class QueryArg
|
|
{
|
|
public:
|
|
constexpr QueryArg(std::string_view data) noexcept : m_Data(data)
|
|
{
|
|
}
|
|
|
|
QueryArg(const char data[]) noexcept : m_Data(std::in_place_type<std::string_view>, data)
|
|
{
|
|
}
|
|
|
|
QueryArg(String data) noexcept : m_Data(std::move(data))
|
|
{
|
|
}
|
|
|
|
bool operator<(const QueryArg& rhs) const noexcept // For std::map keys
|
|
{
|
|
return static_cast<std::string_view>(*this) < static_cast<std::string_view>(rhs);
|
|
}
|
|
|
|
operator std::string_view() const noexcept
|
|
{
|
|
return std::visit([](auto& data) { return ViewOf(data); }, m_Data);
|
|
}
|
|
|
|
explicit operator String() const
|
|
{
|
|
std::string_view sv (*this);
|
|
|
|
return String(sv.begin(), sv.end());
|
|
}
|
|
|
|
private:
|
|
std::variant<std::string_view, String> m_Data;
|
|
|
|
static std::string_view ViewOf(const std::string_view& data) noexcept
|
|
{
|
|
return data;
|
|
}
|
|
|
|
static std::string_view ViewOf(const String& data) noexcept
|
|
{
|
|
return {data.CStr(), data.GetLength()};
|
|
}
|
|
};
|
|
|
|
typedef std::vector<QueryArg> Query;
|
|
typedef std::vector<Query> Queries;
|
|
typedef Value Reply;
|
|
typedef std::vector<Reply> Replies;
|
|
|
|
// Three counters (config, state, history) accompanying a query.
// NOTE(review): presumably the amounts of written items recorded via RecordAffected() - confirm.
struct QueryAffects
{
	size_t Config;
	size_t State;
	size_t History;

	// Every counter defaults to zero, so `{}` yields an "affects nothing" value.
	QueryAffects(size_t config = 0, size_t state = 0, size_t history = 0)
	{
		Config = config;
		State = state;
		History = history;
	}
};
|
|
|
|
// Creates a connection object for the given connection info.
// NOTE(review): parent and trackOwnPendingQueries presumably end up in m_Parent and
// m_TrackOwnPendingQueries - confirm in the implementation file.
explicit RedisConnection(const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent = nullptr, bool trackOwnPendingQueries = true);
// NOTE(review): presumably (re)creates m_TLSContext from the connection info - confirm.
void UpdateTLSContext();

void Start();

// Returns the current value of the m_Connected flag.
bool IsConnected() const
{
	return m_Connected.load();
}

// Submit one query / several queries without expecting a result.
// NOTE(review): highPriority presumably selects the high priority write queue - confirm.
void FireAndForgetQuery(Query query, QueryAffects affects = {}, bool highPriority = false);
void FireAndForgetQueries(Queries queries, QueryAffects affects = {});

// Submit one query / several queries and return the reply / replies.
Reply GetResultOfQuery(Query query, QueryAffects affects = {});
Replies GetResultsOfQueries(Queries queries, QueryAffects affects = {}, bool highPriority = false);

// NOTE(review): presumably queues the callback as a write queue item (see QueryCallback) - confirm.
void EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback);
void Sync();
double GetOldestPendingQueryTs() const;

// NOTE(review): presumably stored in m_ConnectedCallback - confirm.
void SetConnectedCallback(std::function<void(boost::asio::yield_context& yc)> callback);

int GetQueryCount(RingBuffer::SizeType span);
|
|
|
|
inline std::size_t GetPendingQueryCount() const
|
|
{
|
|
return m_PendingQueries;
|
|
}
|
|
|
|
// The following three getters each forward (tv, span) to UpdateAndGetValues()
// of the config, state and history ring buffer, respectively.
// tv defaults to the value of Utility::GetTime() at the time of the call.

int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	const auto written = m_WrittenConfig.UpdateAndGetValues(tv, span);

	return written;
}

int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	const auto written = m_WrittenState.UpdateAndGetValues(tv, span);

	return written;
}

int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
{
	const auto written = m_WrittenHistory.UpdateAndGetValues(tv, span);

	return written;
}
|
|
|
|
private:
|
|
/**
 * What to do with the responses to Redis queries.
 *
 * Stored in an unsigned char to keep the enum small.
 *
 * @ingroup icingadb
 */
enum class ResponseAction : unsigned char
{
	Ignore, // discard the response
	Deliver, // submit the response to the requestor
	DeliverBulk // submit multiple responses to the requestor at once
};
|
|
|
|
/**
 * What to do with how many responses to Redis queries.
 *
 * @ingroup icingadb
 */
struct FutureResponseAction
{
	size_t Amount; // How many responses the action applies to
	ResponseAction Action; // What to do with these responses
};
|
|
|
|
// The kinds of items a WriteQueueItem can hold.
using FireAndForgetQ = Shared<Query>::Ptr; // A single query that does not expect a result.
using FireAndForgetQs = Shared<Queries>::Ptr; // Multiple queries that do not expect results.
using QueryWithPromise = Shared<std::pair<Query, std::promise<Reply>>>::Ptr; // A single query expecting a result, along with the promise for it.
using QueriesWithPromise = Shared<std::pair<Queries, std::promise<Replies>>>::Ptr; // Multiple queries expecting results, along with the promise for them.
using QueryCallback = std::function<void(boost::asio::yield_context&)>; // A callback to be executed.
|
|
|
|
/**
 * An item in the write queue to be sent to Redis.
 *
 * @ingroup icingadb
 */
struct WriteQueueItem
{
	// The payload: exactly one of the query kinds or a callback.
	std::variant<FireAndForgetQ, FireAndForgetQs, QueryWithPromise, QueriesWithPromise, QueryCallback> Item;
	double CTime; // When was this item queued?
	QueryAffects Affects; // The config/state/history counters submitted along with the item
};
|
|
|
|
// Protocol shortcuts
using Tcp = boost::asio::ip::tcp;
using Unix = boost::asio::local::stream_protocol;

// Buffered plain-text streams on top of the respective sockets
using TcpConn = boost::asio::buffered_stream<Tcp::socket>;
using UnixConn = boost::asio::buffered_stream<Unix::socket>;
|
|
|
|
Shared<boost::asio::ssl::context>::Ptr m_TLSContext;

// Reads one Redis protocol value from stream.
template<class AsyncReadStream>
static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc);

// Reads from stream until \r\n; hint is the expected amount of data.
template<class AsyncReadStream>
static std::vector<char> ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0);

// Writes query to stream as one Redis protocol array.
template<class AsyncWriteStream>
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);

// Searched in AUTH error messages during the handshake: a match is only logged as a warning,
// anything else aborts the handshake with an exception.
static boost::regex m_ErrAuth;

RedisConnection(boost::asio::io_context& io, const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries);

void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
void LogStats(boost::asio::yield_context& yc);
// One overload per alternative of WriteQueueItem::Item.
// NOTE(review): the meaning of the returned bool is not visible here - confirm in the implementation file.
bool WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc);
bool WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc);
bool WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc);
bool WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc);
bool WriteItem(const QueryCallback& item, boost::asio::yield_context& yc);
// NOTE(review): presumably dispatch to the stream-specific overloads below for the active connection - confirm.
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);

// Reads one response from stream; on failure it may trigger a reconnect and rethrows.
template<class StreamPtr>
Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc);

// Writes one query to stream and flushes it; on failure it may trigger a reconnect and rethrows.
template<class StreamPtr>
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);

void IncreasePendingQueries(int count);
void DecreasePendingQueries(int count);
void RecordAffected(QueryAffects affected, double when);

// Sends PING or AUTH/SELECT (depending on the connection info) and checks the replies.
template<class StreamPtr>
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);

// Creates a Timeout cancelling stream's I/O after the configured connect timeout.
template<class StreamPtr>
Timeout MakeTimeout(StreamPtr& stream);

RedisConnInfo::ConstPtr m_ConnInfo; // Redis connection info (immutable)

boost::asio::io_context::strand m_Strand; // Reconnect coroutines and timeouts are run on this strand
// NOTE(review): presumably only one of the following three streams is in use at a time,
// depending on the connection info - confirm.
Shared<TcpConn>::Ptr m_TcpConn;
Shared<UnixConn>::Ptr m_UnixConn;
Shared<AsioTlsStream>::Ptr m_TlsConn;
Atomic<bool> m_Connecting, m_Connected, m_Started;
|
|
|
|
struct {
|
|
std::queue<WriteQueueItem> HighWriteQ; // High priority writes to be sent to Redis.
|
|
std::queue<WriteQueueItem> NormalWriteQ; // Normal priority writes to be sent to Redis.
|
|
// Requestors, each waiting for a single response
|
|
std::queue<std::promise<Reply>> ReplyPromises;
|
|
// Requestors, each waiting for multiple responses at once
|
|
std::queue<std::promise<Replies>> RepliesPromises;
|
|
// Metadata about all of the above
|
|
std::queue<FutureResponseAction> FutureResponseActions;
|
|
|
|
WriteQueueItem PopFront()
|
|
{
|
|
if (!HighWriteQ.empty()) {
|
|
WriteQueueItem item(std::move(HighWriteQ.front()));
|
|
HighWriteQ.pop();
|
|
return item;
|
|
}
|
|
WriteQueueItem item(std::move(NormalWriteQ.front()));
|
|
NormalWriteQ.pop();
|
|
return item;
|
|
}
|
|
|
|
void Push(WriteQueueItem&& item, bool highPriority)
|
|
{
|
|
if (highPriority) {
|
|
HighWriteQ.push(std::move(item));
|
|
} else {
|
|
NormalWriteQ.push(std::move(item));
|
|
}
|
|
}
|
|
|
|
bool HasWrites() const
|
|
{
|
|
return !HighWriteQ.empty() || !NormalWriteQ.empty();
|
|
}
|
|
} m_Queues;
|
|
|
|
// Indicate that there's something to send/receive
AsioEvent m_QueuedWrites;
AsioDualEvent m_QueuedReads;

// NOTE(review): presumably set via SetConnectedCallback() and run once connected - confirm.
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;

// Stats
// (The constructor arguments are the ring buffer sizes: 10 for input queries, 15 * 60 for everything else.)
RingBuffer m_InputQueries{10};
RingBuffer m_OutputQueries{15 * 60};
RingBuffer m_WrittenConfig{15 * 60};
RingBuffer m_WrittenState{15 * 60};
RingBuffer m_WrittenHistory{15 * 60};
// Number of pending Redis queries, always 0 if m_Parent is set unless m_TrackOwnPendingQueries is true.
std::atomic_size_t m_PendingQueries{0};
bool m_TrackOwnPendingQueries; // Whether to track pending queries even if m_Parent is set.
boost::asio::deadline_timer m_LogStatsTimer;
Ptr m_Parent;
|
|
};
|
|
|
|
/**
|
|
* An error response from the Redis server.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
class RedisError final : public Object
|
|
{
|
|
public:
|
|
DECLARE_PTR_TYPEDEFS(RedisError);
|
|
|
|
inline RedisError(String message) : m_Message(std::move(message))
|
|
{
|
|
}
|
|
|
|
inline const String& GetMessage()
|
|
{
|
|
return m_Message;
|
|
}
|
|
|
|
private:
|
|
String m_Message;
|
|
};
|
|
|
|
/**
 * Signals that the connection to the Redis server has already been lost.
 *
 * @ingroup icingadb
 */
class RedisDisconnected : public std::exception
{
public:
	/**
	 * @return A fixed description of the error
	 */
	[[nodiscard]] const char* what() const noexcept override
	{
		static constexpr char l_Message[] = "Redis disconnected";

		return l_Message;
	}
};
|
|
|
|
/**
 * Base of the exceptions thrown on malformed Redis server responses.
 *
 * The constructor is protected, i.e. only subclasses can be instantiated.
 *
 * @ingroup icingadb
 */
class RedisProtocolError : public std::runtime_error
{
protected:
	/**
	 * @param msg Details, get prefixed with "Redis protocol error: "
	 */
	explicit RedisProtocolError(std::string_view msg)
		: std::runtime_error(std::string("Redis protocol error: ").append(msg))
	{
	}
};
|
|
|
|
/**
|
|
* Thrown on malformed types in Redis server responses.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
class BadRedisType : public RedisProtocolError
|
|
{
|
|
public:
|
|
explicit BadRedisType(char type) : RedisProtocolError("bad type: " + std::string(&type, 1))
|
|
{
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Thrown on malformed ints in Redis server responses.
|
|
*
|
|
* @ingroup icingadb
|
|
*/
|
|
class BadRedisInt : public RedisProtocolError
|
|
{
|
|
public:
|
|
explicit BadRedisInt(std::string_view intStr) : RedisProtocolError("bad int: " + std::string(intStr))
|
|
{
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Read a Redis server response from stream
|
|
*
|
|
* @param stream Redis server connection
|
|
*
|
|
* @return The response
|
|
*/
|
|
template<class StreamPtr>
|
|
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
|
|
{
|
|
namespace asio = boost::asio;
|
|
|
|
if (!stream) {
|
|
BOOST_THROW_EXCEPTION(RedisDisconnected());
|
|
}
|
|
|
|
auto strm (stream);
|
|
|
|
try {
|
|
return ReadRESP(*strm, yc);
|
|
} catch (const std::exception&) {
|
|
if (m_Connecting.exchange(false)) {
|
|
m_Connected.store(false);
|
|
stream = nullptr;
|
|
|
|
if (!m_Connecting.exchange(true)) {
|
|
Ptr keepAlive (this);
|
|
|
|
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
|
|
}
|
|
}
|
|
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Write a Redis query to stream
|
|
*
|
|
* @param stream Redis server connection
|
|
* @param query Redis query
|
|
*/
|
|
template<class StreamPtr>
|
|
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
|
|
{
|
|
namespace asio = boost::asio;
|
|
|
|
if (!stream) {
|
|
BOOST_THROW_EXCEPTION(RedisDisconnected());
|
|
}
|
|
|
|
auto strm (stream);
|
|
|
|
try {
|
|
WriteRESP(*strm, query, yc);
|
|
strm->async_flush(yc);
|
|
} catch (const std::exception&) {
|
|
if (m_Connecting.exchange(false)) {
|
|
m_Connected.store(false);
|
|
stream = nullptr;
|
|
|
|
if (!m_Connecting.exchange(true)) {
|
|
Ptr keepAlive (this);
|
|
|
|
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
|
|
}
|
|
}
|
|
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Initialize a Redis stream
|
|
*
|
|
* @param stream Redis server connection
|
|
* @param query Redis query
|
|
*/
|
|
template<class StreamPtr>
|
|
void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc)
|
|
{
|
|
if (m_ConnInfo->Password.IsEmpty() && !m_ConnInfo->DbIndex) {
|
|
// Trigger NOAUTH
|
|
WriteRESP(*strm, {"PING"}, yc);
|
|
} else {
|
|
if (!m_ConnInfo->User.IsEmpty()) {
|
|
WriteRESP(*strm, {"AUTH", m_ConnInfo->User, m_ConnInfo->Password}, yc);
|
|
} else if (!m_ConnInfo->Password.IsEmpty()) {
|
|
WriteRESP(*strm, {"AUTH", m_ConnInfo->Password}, yc);
|
|
}
|
|
|
|
if (m_ConnInfo->DbIndex) {
|
|
WriteRESP(*strm, {"SELECT", Convert::ToString(m_ConnInfo->DbIndex)}, yc);
|
|
}
|
|
}
|
|
|
|
strm->async_flush(yc);
|
|
|
|
if (m_ConnInfo->Password.IsEmpty() && !m_ConnInfo->DbIndex) {
|
|
Reply pong (ReadRESP(*strm, yc));
|
|
|
|
if (pong.IsObjectType<RedisError>()) {
|
|
// Likely NOAUTH
|
|
BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(pong)->GetMessage()));
|
|
}
|
|
} else {
|
|
if (!m_ConnInfo->Password.IsEmpty()) {
|
|
Reply auth (ReadRESP(*strm, yc));
|
|
|
|
if (auth.IsObjectType<RedisError>()) {
|
|
auto& authErr (RedisError::Ptr(auth)->GetMessage().GetData());
|
|
boost::smatch what;
|
|
|
|
if (boost::regex_search(authErr, what, m_ErrAuth)) {
|
|
Log(LogWarning, "IcingaDB") << authErr;
|
|
} else {
|
|
// Likely WRONGPASS
|
|
BOOST_THROW_EXCEPTION(std::runtime_error(authErr));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (m_ConnInfo->DbIndex) {
|
|
Reply select (ReadRESP(*strm, yc));
|
|
|
|
if (select.IsObjectType<RedisError>()) {
|
|
// Likely NOAUTH or ERR DB
|
|
BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(select)->GetMessage()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates a Timeout which cancels stream's I/O after m_ConnectTimeout
|
|
*
|
|
* @param stream Redis server connection
|
|
*/
|
|
template<class StreamPtr>
|
|
Timeout RedisConnection::MakeTimeout(StreamPtr& stream)
|
|
{
|
|
return Timeout(
|
|
m_Strand,
|
|
boost::posix_time::microseconds(intmax_t(m_ConnInfo->ConnectTimeout * 1000000)),
|
|
[stream] {
|
|
boost::system::error_code ec;
|
|
stream->lowest_layer().cancel(ec);
|
|
}
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Read a Redis protocol value from stream
|
|
*
|
|
* @param stream Redis server connection
|
|
*
|
|
* @return The value
|
|
*/
|
|
template<class AsyncReadStream>
|
|
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
|
|
{
|
|
namespace asio = boost::asio;
|
|
|
|
char type = 0;
|
|
asio::async_read(stream, asio::mutable_buffer(&type, 1), yc);
|
|
|
|
switch (type) {
|
|
case '+':
|
|
{
|
|
auto buf (ReadLine(stream, yc));
|
|
return String(buf.begin(), buf.end());
|
|
}
|
|
case '-':
|
|
{
|
|
auto buf (ReadLine(stream, yc));
|
|
return new RedisError(String(buf.begin(), buf.end()));
|
|
}
|
|
case ':':
|
|
{
|
|
auto buf (ReadLine(stream, yc, 21));
|
|
intmax_t i = 0;
|
|
|
|
try {
|
|
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
|
|
} catch (...) {
|
|
BOOST_THROW_EXCEPTION(BadRedisInt(std::string_view(buf.data(), buf.size())));
|
|
}
|
|
|
|
return (double)i;
|
|
}
|
|
case '$':
|
|
{
|
|
auto buf (ReadLine(stream, yc, 21));
|
|
intmax_t i = 0;
|
|
|
|
try {
|
|
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
|
|
} catch (...) {
|
|
BOOST_THROW_EXCEPTION(BadRedisInt(std::string_view(buf.data(), buf.size())));
|
|
}
|
|
|
|
if (i < 0) {
|
|
return Value();
|
|
}
|
|
|
|
buf.clear();
|
|
buf.insert(buf.end(), i, 0);
|
|
asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc);
|
|
|
|
{
|
|
char crlf[2];
|
|
asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc);
|
|
}
|
|
|
|
return String(buf.begin(), buf.end());
|
|
}
|
|
case '*':
|
|
{
|
|
auto buf (ReadLine(stream, yc, 21));
|
|
intmax_t i = 0;
|
|
|
|
try {
|
|
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
|
|
} catch (...) {
|
|
BOOST_THROW_EXCEPTION(BadRedisInt(std::string_view(buf.data(), buf.size())));
|
|
}
|
|
|
|
if (i < 0) {
|
|
return Empty;
|
|
}
|
|
|
|
Array::Ptr arr = new Array();
|
|
|
|
arr->Reserve(i);
|
|
|
|
for (; i; --i) {
|
|
arr->Add(ReadRESP(stream, yc));
|
|
}
|
|
|
|
return arr;
|
|
}
|
|
default:
|
|
BOOST_THROW_EXCEPTION(BadRedisType(type));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Read from stream until \r\n
|
|
*
|
|
* @param stream Redis server connection
|
|
* @param hint Expected amount of data
|
|
*
|
|
* @return Read data ex. \r\n
|
|
*/
|
|
template<class AsyncReadStream>
|
|
std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint)
|
|
{
|
|
namespace asio = boost::asio;
|
|
|
|
std::vector<char> line;
|
|
line.reserve(hint);
|
|
|
|
char next = 0;
|
|
asio::mutable_buffer buf (&next, 1);
|
|
|
|
for (;;) {
|
|
asio::async_read(stream, buf, yc);
|
|
|
|
if (next == '\r') {
|
|
asio::async_read(stream, buf, yc);
|
|
return line;
|
|
}
|
|
|
|
line.emplace_back(next);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Write a Redis protocol value to stream
|
|
*
|
|
* @param stream Redis server connection
|
|
* @param query Redis protocol value
|
|
*/
|
|
template<class AsyncWriteStream>
|
|
void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc)
|
|
{
|
|
namespace asio = boost::asio;
|
|
|
|
asio::streambuf writeBuffer;
|
|
std::ostream msg(&writeBuffer);
|
|
|
|
msg << "*" << query.size() << "\r\n";
|
|
|
|
for (std::string_view arg : query) {
|
|
msg << "$" << arg.length() << "\r\n" << arg << "\r\n";
|
|
}
|
|
|
|
asio::async_write(stream, writeBuffer, yc);
|
|
}
|
|
|
|
}
|
|
|
|
#endif //REDISCONNECTION_H
|