Skip to content

Simplify usage by supporting new default loop #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 48 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ Re-attach the data source after a previous `pause()`.
```php
$stream->pause();

$loop->addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream) {
$stream->resume();
});
```
Expand Down Expand Up @@ -737,7 +737,7 @@ stream in order to stop waiting for the stream to flush its final data.

```php
$stream->end();
$loop->addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream) {
$stream->close();
});
```
Expand Down Expand Up @@ -821,7 +821,7 @@ This can be used to represent a read-only resource like a file stream opened in
readable mode or a stream such as `STDIN`:

```php
$stream = new ReadableResourceStream(STDIN, $loop);
$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function ($chunk) {
echo $chunk;
});
Expand All @@ -838,7 +838,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new ReadableResourceStream(false, $loop);
$stream = new ReadableResourceStream(false);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
Expand All @@ -851,14 +851,20 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN, $loop);
$stream = new ReadableResourceStream(STDIN);
```

Once the constructor is called with a valid stream resource, this class will
take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
Expand All @@ -874,7 +880,7 @@ This should read until the stream resource is not readable anymore
mean it reached EOF.

```php
$stream = new ReadableResourceStream(STDIN, $loop, 8192);
$stream = new ReadableResourceStream(STDIN, null, 8192);
```

> PHP bug warning: If the PHP process has explicitly been started without a
Expand All @@ -883,6 +889,9 @@ $stream = new ReadableResourceStream(STDIN, $loop, 8192);
stream like `php test.php < /dev/null` instead of `php test.php <&-`.
See [#81](https://github.com/reactphp/stream/issues/81) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### WritableResourceStream

The `WritableResourceStream` is a concrete implementation of the
Expand All @@ -892,7 +901,7 @@ This can be used to represent a write-only resource like a file stream opened in
writable mode or a stream such as `STDOUT` or `STDERR`:

```php
$stream = new WritableResourceStream(STDOUT, $loop);
$stream = new WritableResourceStream(STDOUT);
$stream->write('hello!');
$stream->end();
```
Expand All @@ -905,7 +914,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new WritableResourceStream(false, $loop);
$stream = new WritableResourceStream(false);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
Expand All @@ -918,7 +927,7 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT, $loop);
$stream = new WritableResourceStream(STDOUT);
```

Once the constructor is called with a valid stream resource, this class will
Expand All @@ -933,13 +942,19 @@ For this, it uses an in-memory buffer string to collect all outstanding writes.
This buffer has a soft-limit applied which defines how much data it is willing
to accept before the caller SHOULD stop sending further data.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $writeBufferSoftLimit` parameter that controls
this maximum buffer size in bytes.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.

```php
$stream = new WritableResourceStream(STDOUT, $loop, 8192);
$stream = new WritableResourceStream(STDOUT, null, 8192);
```

This class takes an optional `int|null $writeChunkSize` parameter that controls
Expand All @@ -954,11 +969,14 @@ This can be `-1` which means "write everything available" to the
underlying stream resource.

```php
$stream = new WritableResourceStream(STDOUT, $loop, null, 8192);
$stream = new WritableResourceStream(STDOUT, null, null, 8192);
```

See also [`write()`](#write) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### DuplexResourceStream

The `DuplexResourceStream` is a concrete implementation of the
Expand All @@ -969,7 +987,7 @@ in read and write mode mode or a stream such as a TCP/IP connection:

```php
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop);
$stream = new DuplexResourceStream($conn);
$stream->write('hello!');
$stream->end();
```
Expand All @@ -982,7 +1000,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new DuplexResourceStream(false, $loop);
$stream = new DuplexResourceStream(false);
```

See also the [`ReadableResourceStream`](#readableresourcestream) for read-only
Expand All @@ -996,14 +1014,20 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT, $loop);
$stream = new DuplexResourceStream(STDOUT);
```

Once the constructor is called with a valid stream resource, this class will
take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
Expand All @@ -1020,7 +1044,7 @@ mean it reached EOF.

```php
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, null, 8192);
```

Any `write()` calls to this class will not be performed instantly, but will
Expand All @@ -1040,12 +1064,15 @@ If you want to change the write buffer soft limit, you can pass an instance of

```php
$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, $loop, null, $buffer);
$buffer = new WritableResourceStream($conn, null, 8192);
$stream = new DuplexResourceStream($conn, null, null, $buffer);
```

See also [`WritableResourceStream`](#writableresourcestream) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### ThroughStream

The `ThroughStream` implements the
Expand Down Expand Up @@ -1123,8 +1150,8 @@ This is useful for some APIs which may require a single
more convenient to work with a single stream instance like this:

```php
$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT, $loop);
$stdin = new ReadableResourceStream(STDIN);
$stdout = new WritableResourceStream(STDOUT);

$stdio = new CompositeStream($stdin, $stdout);

Expand Down Expand Up @@ -1154,14 +1181,10 @@ The following example can be used to pipe the contents of a source file into
a destination file without having to ever read the whole file into memory:

```php
$loop = new React\EventLoop\StreamSelectLoop;

$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'), $loop);
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'), $loop);
$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'));
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'));

$source->pipe($dest);

$loop->run();
```

> Note that this example uses `fopen()` for illustration purposes only.
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
],
"require": {
"php": ">=5.3.8",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5",
"react/event-loop": "^1.2",
"evenement/evenement": "^3.0 || ^2.0 || ^1.0"
},
"require-dev": {
Expand Down
6 changes: 1 addition & 5 deletions examples/01-http.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// $ php examples/01-http.php
// $ php examples/01-http.php reactphp.org

use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;

require __DIR__ . '/../vendor/autoload.php';
Expand All @@ -25,8 +24,7 @@
exit(1);
}

$loop = Factory::create();
$stream = new DuplexResourceStream($resource, $loop);
$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
echo $chunk;
Expand All @@ -36,5 +34,3 @@
});

$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n");

$loop->run();
6 changes: 1 addition & 5 deletions examples/02-https.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// $ php examples/02-https.php
// $ php examples/02-https.php reactphp.org

use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;

require __DIR__ . '/../vendor/autoload.php';
Expand All @@ -25,8 +24,7 @@
exit(1);
}

$loop = Factory::create();
$stream = new DuplexResourceStream($resource, $loop);
$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
echo $chunk;
Expand All @@ -36,5 +34,3 @@
});

$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n");

$loop->run();
9 changes: 2 additions & 7 deletions examples/11-cat.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// $ php examples/11-cat.php < README.md
// $ echo hello | php examples/11-cat.php

use React\EventLoop\Factory;
use React\Stream\ReadableResourceStream;
use React\Stream\WritableResourceStream;

Expand All @@ -19,10 +18,6 @@
exit(1);
}

$loop = Factory::create();

$stdout = new WritableResourceStream(STDOUT, $loop);
$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT);
$stdin = new ReadableResourceStream(STDIN);
$stdin->pipe($stdout);

$loop->run();
18 changes: 8 additions & 10 deletions examples/91-benchmark-throughput.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// $ php examples/91-benchmark-throughput.php -t 10 -o zero.bin
// $ php examples/91-benchmark-throughput.php -t 60 -i zero.bin

use React\EventLoop\Loop;

require __DIR__ . '/../vendor/autoload.php';

if (DIRECTORY_SEPARATOR === '\\') {
Expand All @@ -27,36 +29,32 @@
$if = str_replace('/dev/fd/', 'php://fd/', $if);
$of = str_replace('/dev/fd/', 'php://fd/', $of);

$loop = new React\EventLoop\StreamSelectLoop();

// setup information stream
$info = new React\Stream\WritableResourceStream(STDERR, $loop);
$info = new React\Stream\WritableResourceStream(STDERR);
if (extension_loaded('xdebug')) {
$info->write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL);
}
$info->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL);

// setup input and output streams and pipe inbetween
$fh = fopen($if, 'r');
$in = new React\Stream\ReadableResourceStream($fh, $loop);
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'), $loop);
$in = new React\Stream\ReadableResourceStream($fh);
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'));
$in->pipe($out);

// stop input stream in $t seconds
$start = microtime(true);
$timeout = $loop->addTimer($t, function () use ($in, &$bytes) {
$timeout = Loop::addTimer($t, function () use ($in) {
$in->close();
});

// print stream position once stream closes
$in->on('close', function () use ($fh, $start, $loop, $timeout, $info) {
$in->on('close', function () use ($fh, $start, $timeout, $info) {
$t = microtime(true) - $start;
$loop->cancelTimer($timeout);
Loop::cancelTimer($timeout);

$bytes = ftell($fh);

$info->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL);
$info->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL);
});

$loop->run();
7 changes: 5 additions & 2 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
namespace React\Stream;

use Evenement\EventEmitter;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use InvalidArgumentException;

final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
private $stream;

/** @var LoopInterface */
private $loop;

/**
Expand All @@ -35,7 +38,7 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt
private $closing = false;
private $listening = false;

public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null)
public function __construct($stream, LoopInterface $loop = null, $readChunkSize = null, WritableStreamInterface $buffer = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand Down Expand Up @@ -70,7 +73,7 @@ public function __construct($stream, LoopInterface $loop, $readChunkSize = null,
}

$this->stream = $stream;
$this->loop = $loop;
$this->loop = $loop ?: Loop::get();
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->buffer = $buffer;

Expand Down
Loading