Merge pull request #49352 from nextcloud/s3-disable-multipart

improve handling of large single-part s3 uploads
This commit is contained in:
Julius Knorr 2024-12-06 09:22:07 +01:00 committed by GitHub
commit 3328cea2ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 64 additions and 21 deletions

View file

@ -75,6 +75,10 @@ class AssemblyStream implements \Icewind\Streams\File {
$offset = $this->size + $offset;
}
if ($offset === $this->pos) {
return true;
}
if ($offset > $this->size) {
return false;
}
@ -95,7 +99,7 @@ class AssemblyStream implements \Icewind\Streams\File {
$stream = $this->getStream($this->nodes[$nodeIndex]);
$nodeOffset = $offset - $nodeStart;
if (fseek($stream, $nodeOffset) === -1) {
if ($nodeOffset > 0 && fseek($stream, $nodeOffset) === -1) {
return false;
}
$this->currentNode = $nodeIndex;
@ -126,9 +130,14 @@ class AssemblyStream implements \Icewind\Streams\File {
}
}
do {
$collectedData = '';
// read data until we either got all the data requested or there is no more stream left
while ($count > 0 && !is_null($this->currentStream)) {
$data = fread($this->currentStream, $count);
$read = strlen($data);
$count -= $read;
$collectedData .= $data;
$this->currentNodeRead += $read;
if (feof($this->currentStream)) {
@ -145,14 +154,11 @@ class AssemblyStream implements \Icewind\Streams\File {
$this->currentStream = null;
}
}
// if no data read, try again with the next node because
// returning empty data can make the caller think there is no more
// data left to read
} while ($read === 0 && !is_null($this->currentStream));
}
// update position
$this->pos += $read;
return $data;
$this->pos += strlen($collectedData);
return $collectedData;
}
/**

View file

@ -25,12 +25,16 @@ class AssemblyStreamTest extends \Test\TestCase {
/**
* @dataProvider providesNodes()
*/
public function testGetContentsFread($expected, $nodes): void {
public function testGetContentsFread($expected, $nodes, $chunkLength = 3): void {
$stream = AssemblyStream::wrap($nodes);
$content = '';
while (!feof($stream)) {
$content .= fread($stream, 3);
$chunk = fread($stream, $chunkLength);
$content .= $chunk;
if ($chunkLength !== 3) {
$this->assertEquals($chunkLength, strlen($chunk));
}
}
$this->assertEquals($expected, $content);
@ -103,7 +107,19 @@ class AssemblyStreamTest extends \Test\TestCase {
]],
'a ton of nodes' => [
$tonofdata, $tonofnodes
]
],
'one read over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 10],
'two reads over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 5],
];
}

View file

@ -457,6 +457,14 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common implements IChunkedFil
}
public function writeStream(string $path, $stream, ?int $size = null): int {
if ($size === null) {
$stats = fstat($stream);
if (is_array($stats) && isset($stats['size'])) {
$size = $stats['size'];
$this->logger->warning("stream size $size");
}
}
$stat = $this->stat($path);
if (empty($stat)) {
// create new file

View file

@ -140,20 +140,33 @@ trait S3ObjectTrait {
* @since 7.0.0
*/
public function writeObject($urn, $stream, ?string $mimetype = null) {
$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
$psrStream = Utils::streamFor($stream);
// ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream
// so the optimisation does not apply
$buffer = new Psr7\Stream(fopen('php://memory', 'rwb+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
$size = $psrStream->getSize();
if ($size === null || !$canSeek) {
// The s3 single-part upload requires the size to be known for the stream.
// So for input streams that don't have a known size, we need to copy (part of)
// the input into a temporary stream so the size can be determined
$buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
}
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
if ($size < $this->putSizeLimit) {
$this->writeSingle($urn, $psrStream, $mimetype);
} else {
$this->writeMultiPart($urn, $psrStream, $mimetype);
}
}
$psrStream->close();
}
/**