icinga2/lib/base/stream.cpp
Yonas Habteab 29a5268961 Replace all existing copyright headers with SPDX headers
I've used the following command to replace the original copyright header
lines in a C-style comment block:

```
$ find . \( -type d \( -name '\..*' -o -name third-party -o -name scripts -o -name prefix -o -name malloc -o -name server -o -name docker -o -name build -o -name doc \) -prune \) -o -type f -exec perl -pi -e 's{/\*[^*]*\(\s*c\s*\)\s*(\d{4})\s*Icinga\s+GmbH[^*]*\*/}{// SPDX-FileCopyrightText: \1 Icinga GmbH <https://icinga.com>\n// SPDX-License-Identifier: GPL-2.0-or-later}gi' {} +
```

For files that use shell-style comments (#) like CMakeLists.txt, I've
used this command:

```
$ find . \( -type d \( -name '\..*' -o -name third-party -o -name scripts -o -name prefix -o -name malloc -o -name server -o -name docker -o -name build -o -name doc \) -prune \) -o -type f -exec perl -pi -e 's{#.*\(\s*c\s*\)\s(\d{4})\sIcinga\s+GmbH.*}{# SPDX-FileCopyrightText: \1 Icinga GmbH <https://icinga.com>\n# SPDX-License-Identifier: GPL-2.0-or-later}gi' {} +
```

And for SQL files:

```
$ find . \( -type d \( -name '\..*' -o -name third-party -o -name scripts -o -name prefix -o -name malloc -o -name server -o -name docker -o -name build -o -name doc \) -prune \) -o -type f \( -name '*.sql' \) -exec perl -pi -e 's{--.*\(c\)\s(\d{4})\sIcinga\sGmbH.*}{-- SPDX-FileCopyrightText: \1 Icinga GmbH <https://icinga.com>\n-- SPDX-License-Identifier: GPL-2.0-or-later}gi' {} +
$ find . \( -type d \( -name '\..*' -o -name third-party -o -name scripts -o -name prefix -o -name malloc -o -name server -o -name docker -o -name build -o -name doc \) -prune \) -o -type f \( -name '*.sql' \) -exec perl -pi -e 's{-- Copyright \(c\)\s(\d{4})\sIcinga\s+Development\sTeam.*}{-- SPDX-FileCopyrightText: \1 Icinga GmbH <https://icinga.com>\n-- SPDX-License-Identifier: GPL-2.0-or-later}gi' {} +
```
2026-02-03 17:54:50 +01:00

145 lines
3.1 KiB
C++

// SPDX-FileCopyrightText: 2012 Icinga GmbH <https://icinga.com>
// SPDX-License-Identifier: GPL-2.0-or-later
#include "base/stream.hpp"
#include <boost/algorithm/string/trim.hpp>
#include <chrono>
using namespace icinga;
void Stream::RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler)
{
if (SupportsWaiting())
OnDataAvailable.connect(handler);
else
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
}
bool Stream::SupportsWaiting() const
{
return false;
}
bool Stream::IsDataAvailable() const
{
return false;
}
void Stream::Shutdown()
{
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
}
void Stream::SignalDataAvailable()
{
OnDataAvailable(this);
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_CV.notify_all();
}
}
bool Stream::WaitForData()
{
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
std::unique_lock<std::mutex> lock(m_Mutex);
while (!IsDataAvailable() && !IsEof())
m_CV.wait(lock);
return IsDataAvailable() || IsEof();
}
bool Stream::WaitForData(int timeout)
{
namespace ch = std::chrono;
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
if (timeout < 0)
BOOST_THROW_EXCEPTION(std::runtime_error("Timeout can't be negative"));
std::unique_lock<std::mutex> lock(m_Mutex);
return m_CV.wait_for(lock, ch::duration<int>(timeout), [this]() { return IsDataAvailable() || IsEof(); });
}
void Stream::Close()
{
OnDataAvailable.disconnect_all_slots();
/* Force signals2 to remove the slots, see https://stackoverflow.com/questions/2049291/force-deletion-of-slot-in-boostsignals2
* for details. */
OnDataAvailable.connect([](const Stream::Ptr&) { });
}
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
{
if (context.Eof)
return StatusEof;
if (context.MustRead) {
if (!context.FillFromStream(this, may_wait)) {
context.Eof = true;
*line = String(context.Buffer, &(context.Buffer[context.Size]));
boost::algorithm::trim_right(*line);
return StatusNewItem;
}
}
for (size_t i = 0; i < context.Size; i++) {
if (context.Buffer[i] == '\n') {
*line = String(context.Buffer, context.Buffer + i);
boost::algorithm::trim_right(*line);
context.DropData(i + 1u);
context.MustRead = !context.Size;
return StatusNewItem;
}
}
context.MustRead = true;
return StatusNeedData;
}
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
{
if (may_wait && stream->SupportsWaiting())
stream->WaitForData();
size_t count = 0;
do {
Buffer = (char *)realloc(Buffer, Size + 4096);
if (!Buffer)
throw std::bad_alloc();
if (stream->IsEof())
break;
size_t rc = stream->Read(Buffer + Size, 4096);
Size += rc;
count += rc;
} while (count < 64 * 1024 && stream->IsDataAvailable());
if (count == 0 && stream->IsEof())
return false;
else
return true;
}
void StreamReadContext::DropData(size_t count)
{
ASSERT(count <= Size);
memmove(Buffer, Buffer + count, Size - count);
Size -= count;
}