assets
data
dependencies
amphp
amp
byte-stream
lib
Base64
ClosedException.php
InMemoryStream.php
InputStream.php
InputStreamChain.php
IteratorStream.php
LineReader.php
Message.php
OutputBuffer.php
OutputStream.php
Payload.php
PendingReadError.php
ResourceInputStream.php
ResourceOutputStream.php
StreamException.php
ZlibInputStream.php
ZlibOutputStream.php
functions.php
psalm.xml
cache
dns
hpack
http
http-client
parser
process
serialization
socket
sync
windows-registry
berlindb
composer
daverandom
delight-im
doctrine
guzzlehttp
hashids
ifsnop
jasny
kelunik
league
mexitek
mpdf
myclabs
nesbot
nyholm
pablo-sg-pacheco
paragonie
phpdocumentor
phpseclib
phpstan
psr
rakit
ralouphie
remotelyliving
rpnzl
scssphp
setasign
spatie
stripe
symfony
webmozart
woocommerce
yahnis-elsts
autoload.php
scoper-autoload.php
inc
lang
views
.gitignore
LICENSE
autoload.php
composer.json
constants.php
loco.xml
readme.txt
sunrise.php
uninstall.php
wp-multisite-waas.php
81 lines
2.5 KiB
PHP
81 lines
2.5 KiB
PHP
<?php
|
|
|
|
namespace WP_Ultimo\Dependencies\Amp\ByteStream;
|
|
|
|
use WP_Ultimo\Dependencies\Amp\Coroutine;
|
|
use WP_Ultimo\Dependencies\Amp\Promise;
|
|
use function WP_Ultimo\Dependencies\Amp\call;
|
|
/**
 * Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API, or it
 * may be buffered and accessed in its entirety by calling buffer(). Once buffering has been requested through
 * buffer(), the stream can no longer be read in chunks. On destruction, any remaining data is read from the
 * InputStream given to this class and discarded.
 */
|
|
class Payload implements InputStream
{
    /** @var InputStream Underlying stream the message is read from. */
    private $stream;

    /** @var Promise|null Promise returned by buffer(); set once buffering has been requested. */
    private $promise;

    /** @var Promise|null Promise returned by the most recent read() call, if any. */
    private $lastRead;

    /**
     * @param InputStream $stream Stream supplying the message data.
     */
    public function __construct(InputStream $stream)
    {
        $this->stream = $stream;
    }

    /**
     * Unless the message was buffered, starts a coroutine that reads and discards whatever is left on the stream.
     */
    public function __destruct()
    {
        if ($this->promise) {
            // buffer() was requested, so its coroutine is the one reading the stream.
            return;
        }

        Promise\rethrow(new Coroutine($this->consume()));
    }

    /**
     * Coroutine body that reads the stream until a read resolves with null, throwing away every chunk.
     *
     * Any exception raised while reading is caught and ignored.
     */
    private function consume(): \Generator
    {
        try {
            if ($this->lastRead) {
                // Wait for the read that was already issued before starting new ones.
                $pending = yield $this->lastRead;

                if ($pending === null) {
                    // That read already reported the end of the stream.
                    return;
                }
            }

            do {
                // Discard unread bytes from message.
                $chunk = yield $this->stream->read();
            } while ($chunk !== null);
        } catch (\Throwable $ignored) {
            // If exception is thrown here the connection closed anyway.
        }
    }

    /**
     * @inheritdoc
     *
     * The returned promise is also remembered so that buffer() and the destructor can wait for it.
     *
     * @throws \Error If a buffered message was requested by calling buffer().
     */
    final public function read(): Promise
    {
        if ($this->promise) {
            throw new \Error("Cannot stream message data once a buffered message has been requested");
        }

        $this->lastRead = $this->stream->read();

        return $this->lastRead;
    }

    /**
     * Buffers the remainder of the message and resolves the returned promise with it.
     *
     * Repeated calls return the same promise. A chunk delivered by a read() issued before this call is awaited
     * but is not included in the buffered result.
     *
     * @return Promise<string> Resolves with the buffered message contents.
     */
    final public function buffer(): Promise
    {
        if ($this->promise) {
            return $this->promise;
        }

        $this->promise = call(function () {
            $contents = '';

            if ($this->lastRead) {
                // Let the outstanding read() finish first; its chunk belongs to whoever requested it.
                $last = yield $this->lastRead;

                if ($last === null) {
                    // The stream had already ended.
                    return $contents;
                }
            }

            for (;;) {
                $chunk = yield $this->stream->read();

                if ($chunk === null) {
                    return $contents;
                }

                $contents .= $chunk;
            }
        });

        return $this->promise;
    }
}
|