Skip to content

Commit 25bd3cb

Browse files
committed
Support streaming=true option to resolve response without buffering body
1 parent c68a55b commit 25bd3cb

File tree

5 files changed

+231
-21
lines changed

5 files changed

+231
-21
lines changed

README.md

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ mess with most of the low-level details.
3535
* [Methods](#methods)
3636
* [Promises](#promises)
3737
* [Blocking](#blocking)
38+
* [Streaming](#streaming)
3839
* [submit()](#submit)
3940
* [send()](#send)
4041
* [withOptions()](#withoptions)
@@ -53,7 +54,6 @@ mess with most of the low-level details.
5354
* [SOCKS proxy](#socks-proxy)
5455
* [UNIX domain sockets](#unix-domain-sockets)
5556
* [Options](#options)
56-
* [Streaming](#streaming)
5757
* [Install](#install)
5858
* [License](#license)
5959

@@ -129,6 +129,18 @@ $browser->get($url)->then(
129129

130130
If this looks strange to you, you can also use the more traditional [blocking API](#blocking).
131131

132+
Keep in mind that resolving the Promise with the full response message means the
133+
whole response body has to be kept in memory.
134+
This is easy to get started and works reasonably well for smaller responses
135+
(such as common HTML pages or RESTful or JSON API requests).
136+
137+
You may also want to look into the [streaming API](#streaming):
138+
139+
* If you're dealing with lots of concurrent requests (100+) or
140+
* If you want to process individual data chunks as they happen (without having to wait for the full response body) or
141+
* If you're expecting a big response body size (1 MiB or more, for example when downloading binary files) or
142+
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).
143+
132144
#### Blocking
133145

134146
As stated above, this library provides you a powerful, async API by default.
@@ -167,6 +179,89 @@ $responses = Block\awaitAll($promises, $loop);
167179

168180
Please refer to [clue/block-react](https://github.com/clue/php-block-react#readme) for more details.
169181

182+
Keep in mind the above remark about buffering the whole response message in memory.
183+
As an alternative, you may also see the following chapter for the
184+
[streaming API](#streaming).
185+
186+
#### Streaming
187+
188+
All of the above examples assume you want to store the whole response body in memory.
189+
This is easy to get started and works reasonably well for smaller responses.
190+
191+
However, there are several situations where it's usually a better idea to use a
192+
streaming approach, where only small chunks have to be kept in memory:
193+
194+
* If you're dealing with lots of concurrent requests (100+) or
195+
* If you want to process individual data chunks as they happen (without having to wait for the full response body) or
196+
* If you're expecting a big response body size (1 MiB or more, for example when downloading binary files) or
197+
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).
198+
199+
The streaming API uses the same HTTP message API, but does not buffer the response
200+
message body in memory.
201+
It only processes the response body in small chunks as data is received and
202+
forwards this data through [React's Stream API](https://github.com/reactphp/stream).
203+
This works for (any number of) responses of arbitrary sizes.
204+
205+
This resolves with a normal [`ResponseInterface`](#responseinterface), which
206+
can be used access the response message parameters as usual.
207+
You can access the message body as usual, however it now
208+
implements React's [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
209+
as well as parts of the PSR-7's [`StreamInterface`](http://www.php-fig.org/psr/psr-7/#3-4-psr-http-message-streaminterface).
210+
211+
```php
212+
// turn on streaming responses (does no longer buffer response body)
213+
$streamingBrowser = $browser->withOptions(array('streaming' => true));
214+
215+
// issue a normal GET request
216+
$streamingBrowser->get($url)->then(function (ResponseInterface $response) {
217+
$body = $response->getBody();
218+
/* @var $body \React\Stream\ReadableStreamInterface */
219+
220+
$body->on('data', function ($chunk) {
221+
echo $chunk;
222+
});
223+
224+
$body->on('error', function (Exception $error) {
225+
echo 'Error: ' . $error->getMessage() . PHP_EOL;
226+
});
227+
228+
$body->on('close', function () {
229+
echo '[DONE]' . PHP_EOL;
230+
});
231+
});
232+
```
233+
234+
See also the [stream bandwith example](examples/stream-bandwidth.php) and
235+
the [stream forwarding example](examples/stream-forwarding.php).
236+
237+
You can invoke the following methods on the message body:
238+
239+
```php
240+
$body->on($event, $callback);
241+
$body->eof();
242+
$body->isReadable();
243+
$body->close();
244+
$body->pause();
245+
$body->resume();
246+
```
247+
248+
Because the message body is in a streaming state, invoking the following methods
249+
doesn't make much sense:
250+
251+
```php
252+
$body->__toString(); // ''
253+
$body->detach(); // throws BadMethodCallException
254+
$body->getSize(); // null
255+
$body->tell(); // throws BadMethodCallException
256+
$body->isSeekable(); // false
257+
$body->seek(); // throws BadMethodCallException
258+
$body->rewind(); // throws BadMethodCallException
259+
$body->isWritable(); // false
260+
$body->write(); // throws BadMethodCallException
261+
$body->read(); // throws BadMethodCallException
262+
$body->getContents(); // throws BadMethodCallException
263+
```
264+
170265
#### submit()
171266

172267
The `submit($url, array $fields, $headers = array(), $method = 'POST')` method can be used to submit an array of field values similar to submitting a form (`application/x-www-form-urlencoded`).
@@ -433,31 +528,14 @@ can be controlled via the following API (and their defaults):
433528
$newBrowser = $browser->withOptions(array(
434529
'followRedirects' => true,
435530
'maxRedirects' => 10,
436-
'obeySuccessCode' => true
531+
'obeySuccessCode' => true,
532+
'streaming' => false,
437533
));
438534
```
439535

440536
Notice that the [`Browser`](#browser) is an immutable object, i.e. the `withOptions()` method
441537
actually returns a *new* [`Browser`](#browser) instance with the options applied.
442538

443-
### Streaming
444-
445-
Note: This API is subject to change.
446-
447-
The [`Sender`](#sender) emits a `progress` event array on its `Promise` that can be used
448-
to intercept the underlying outgoing request stream (`React\HttpClient\Request` in the `requestStream` key)
449-
and the incoming response stream (`React\HttpClient\Response` in the `responseStream` key).
450-
451-
```php
452-
$client->get('http://www.google.com/')->then($handler, null, function ($event) {
453-
if (isset($event['responseStream'])) {
454-
/* @var $stream React\HttpClient\Response */
455-
$stream = $event['responseStream'];
456-
$stream->on('data', function ($data) { });
457-
}
458-
});
459-
```
460-
461539
## Install
462540

463541
The recommended way to install this library is [through Composer](http://getcomposer.org).

examples/stream-bandwidth.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
use Clue\React\Buzz\Browser;
4+
use Psr\Http\Message\ResponseInterface;
5+
use React\Stream\ReadableStreamInterface;
6+
use RingCentral\Psr7;
7+
8+
$url = isset($argv[1]) ? $argv[1] : 'http://google.com/';
9+
10+
require __DIR__ . '/../vendor/autoload.php';
11+
12+
$loop = React\EventLoop\Factory::create();
13+
$client = new Browser($loop);
14+
15+
echo 'Requesting ' . $url . '' . PHP_EOL;
16+
17+
$client->withOptions(array('streaming' => true))->get($url)->then(function (ResponseInterface $response) use ($loop) {
18+
echo 'Headers received' . PHP_EOL;
19+
echo Psr7\str($response);
20+
21+
$stream = $response->getBody();
22+
if (!$stream instanceof ReadableStreamInterface) {
23+
throw new UnexpectedValueException();
24+
}
25+
26+
// count number of bytes received
27+
$bytes = 0;
28+
$stream->on('data', function ($chunk) use (&$bytes) {
29+
$bytes += strlen($chunk);
30+
});
31+
32+
// report progress every 0.1s
33+
$timer = $loop->addPeriodicTimer(0.1, function () use (&$bytes) {
34+
echo "\rDownloaded " . $bytes . " bytes…";
35+
});
36+
37+
// report results once the stream closes
38+
$time = microtime(true);
39+
$stream->on('close', function() use (&$bytes, $timer, $loop, $time) {
40+
$loop->cancelTimer($timer);
41+
42+
$time = microtime(true) - $time;
43+
44+
echo "\r" . 'Downloaded ' . $bytes . ' bytes in ' . round($time, 3) . 's => ' . round($bytes / $time / 1024 / 1024, 1) . ' MiB/s' . PHP_EOL;
45+
});
46+
}, 'printf');
47+
48+
$loop->run();

examples/stream-forwarding.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
use Clue\React\Buzz\Browser;
4+
use React\Stream\ReadableStreamInterface;
5+
use Psr\Http\Message\ResponseInterface;
6+
use React\Stream\Stream;
7+
use RingCentral\Psr7;
8+
9+
$url = isset($argv[1]) ? $argv[1] : 'http://google.com/';
10+
11+
require __DIR__ . '/../vendor/autoload.php';
12+
13+
$loop = React\EventLoop\Factory::create();
14+
$client = new Browser($loop);
15+
16+
$out = new Stream(STDOUT, $loop);
17+
$out->pause();
18+
19+
$info = new Stream(STDERR, $loop);
20+
$info->pause();
21+
22+
$info->write('Requesting ' . $url . '' . PHP_EOL);
23+
24+
$client->withOptions(array('streaming' => true))->get($url)->then(function (ResponseInterface $response) use ($info, $out) {
25+
$info->write('Received' . PHP_EOL . Psr7\str($response));
26+
27+
$response->getBody()->pipe($out);
28+
}, 'printf');
29+
30+
$loop->run();

src/Io/Transaction.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class Transaction
3535
// context: http.ignore_errors
3636
private $obeySuccessCode = true;
3737

38+
private $streaming = false;
39+
3840
public function __construct(RequestInterface $request, Sender $sender, array $options = array(), MessageFactory $messageFactory)
3941
{
4042
foreach ($options as $name => $value) {
@@ -62,7 +64,10 @@ protected function next(RequestInterface $request)
6264

6365
return $this->sender->send($request, $this->messageFactory)->then(
6466
function (ResponseInterface $response) use ($that) {
65-
return $that->bufferResponse($response);
67+
if (!$that->streaming) {
68+
return $that->bufferResponse($response);
69+
}
70+
return $response;
6671
}
6772
)->then(
6873
function (ResponseInterface $response) use ($request, $that) {

tests/Io/TransactionTest.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use Clue\React\Buzz\Message\MessageFactory;
77
use React\Promise;
88
use Clue\React\Block;
9+
use React\EventLoop\Factory;
10+
use React\Stream\ReadableStream;
911

1012
class TransactionTest extends TestCase
1113
{
@@ -29,4 +31,51 @@ public function testReceivingErrorResponseWillRejectWithResponseException()
2931
$this->assertSame($response, $exception->getResponse());
3032
}
3133
}
34+
35+
public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefault()
36+
{
37+
$messageFactory = new MessageFactory();
38+
$loop = Factory::create();
39+
40+
$stream = new ReadableStream();
41+
$loop->addTimer(0.001, function () use ($stream) {
42+
$stream->emit('data', array('hello world'));
43+
$stream->close();
44+
});
45+
46+
$request = $this->getMock('Psr\Http\Message\RequestInterface');
47+
$response = $messageFactory->response(1.0, 200, 'OK', array(), $stream);
48+
49+
// mock sender to resolve promise with the given $response in response to the given $request
50+
$sender = $this->getMockBuilder('Clue\React\Buzz\Io\Sender')->disableOriginalConstructor()->getMock();
51+
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));
52+
53+
$transaction = new Transaction($request, $sender, array(), $messageFactory);
54+
$promise = $transaction->send();
55+
56+
$response = Block\await($promise, $loop);
57+
58+
$this->assertEquals(200, $response->getStatusCode());
59+
$this->assertEquals('hello world', (string)$response->getBody());
60+
}
61+
62+
public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled()
63+
{
64+
$messageFactory = new MessageFactory();
65+
66+
$request = $this->getMock('Psr\Http\Message\RequestInterface');
67+
$response = $messageFactory->response(1.0, 200, 'OK', array(), $this->getMock('React\Stream\ReadableStreamInterface'));
68+
69+
// mock sender to resolve promise with the given $response in response to the given $request
70+
$sender = $this->getMockBuilder('Clue\React\Buzz\Io\Sender')->disableOriginalConstructor()->getMock();
71+
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));
72+
73+
$transaction = new Transaction($request, $sender, array('streaming' => true), $messageFactory);
74+
$promise = $transaction->send();
75+
76+
$response = Block\await($promise, $this->getMock('React\EventLoop\LoopInterface'));
77+
78+
$this->assertEquals(200, $response->getStatusCode());
79+
$this->assertEquals('', (string)$response->getBody());
80+
}
3281
}

0 commit comments

Comments
 (0)