Skip to content

Commit 4da77db

Browse files
committed
Support sending streaming request
1 parent 25bd3cb commit 4da77db

File tree

5 files changed

+124
-10
lines changed

5 files changed

+124
-10
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,19 @@ $body->read(); // throws BadMethodCallException
262262
$body->getContents(); // throws BadMethodCallException
263263
```
264264

265+
Besides streaming the response body, you can also stream the request body.
266+
This can be useful if you want to send big POST requests (uploading files etc.)
267+
or process many outgoing streams at once.
268+
Instead of passing the body as a string, you can simply pass an instance
269+
implementing React's [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
270+
to the [HTTP methods](#methods) like this:
271+
272+
```php
273+
$browser->post($url, array(), $stream)->then(function (ResponseInterface $response) {
274+
echo 'Successfully sent.';
275+
});
276+
```
277+
265278
#### submit()
266279

267280
The `submit($url, array $fields, $headers = array(), $method = 'POST')` method can be used to submit an array of field values similar to submitting a form (`application/x-www-form-urlencoded`).

examples/stream-stdin.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
use Clue\React\Buzz\Browser;
4+
use React\Stream\ReadableStreamInterface;
5+
use Psr\Http\Message\ResponseInterface;
6+
use React\Stream\Stream;
7+
use RingCentral\Psr7;
8+
9+
$url = isset($argv[1]) ? $argv[1] : 'https://httpbin.org/post';
10+
11+
require __DIR__ . '/../vendor/autoload.php';
12+
13+
$loop = React\EventLoop\Factory::create();
14+
$client = new Browser($loop);
15+
16+
$in = new Stream(STDIN, $loop);
17+
18+
echo 'Sending STDIN as POST to ' . $url . '' . PHP_EOL;
19+
20+
$client->post($url, array(), $in)->then(function (ResponseInterface $response) {
21+
echo 'Received' . PHP_EOL . Psr7\str($response);
22+
}, 'printf');
23+
24+
$loop->run();

src/Io/Sender.php

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use React\Dns\Resolver\Resolver;
1818
use React\Promise;
1919
use Clue\React\Buzz\Message\MessageFactory;
20+
use React\Stream\ReadableStreamInterface;
2021

2122
class Sender
2223
{
@@ -115,11 +116,15 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
115116
return Promise\reject(new \InvalidArgumentException('Sending request requires absolute URI with scheme and host'));
116117
}
117118

118-
$body = (string)$request->getBody();
119+
$body = $request->getBody();
119120

120-
// automatically assign a Content-Length header if the body is not empty
121-
if ($body !== '' && $request->hasHeader('Content-Length') !== null) {
122-
$request = $request->withHeader('Content-Length', strlen($body));
121+
// automatically assign a Content-Length header if the body size is known
122+
if ($body->getSize() !== null && $body->getSize() !== 0 && $request->hasHeader('Content-Length') !== null) {
123+
$request = $request->withHeader('Content-Length', $body->getSize());
124+
}
125+
126+
if ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
127+
$request = $request->withHeader('Transfer-Encoding', 'chunked');
123128
}
124129

125130
$headers = array();
@@ -146,7 +151,29 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
146151
));
147152
});
148153

149-
$requestStream->end($body);
154+
if ($body instanceof ReadableStreamInterface) {
155+
if ($body->isReadable()) {
156+
if ($request->hasHeader('Content-Length')) {
157+
// length is known => just write to request
158+
$body->pipe($requestStream);
159+
} else {
160+
// length unknown => apply chunked transfer-encoding
161+
// this should be moved somewhere else obviously
162+
$body->on('data', function ($data) use ($requestStream) {
163+
$requestStream->write(dechex(strlen($data)) . "\r\n" . $data . "\r\n");
164+
});
165+
$body->on('end', function() use ($requestStream) {
166+
$requestStream->end("0\r\n\r\n");
167+
});
168+
}
169+
} else {
170+
// stream is not readable => end request without body
171+
$requestStream->end();
172+
}
173+
} else {
174+
// body is fully buffered => write as one chunk
175+
$requestStream->end((string)$body);
176+
}
150177

151178
return $deferred->promise();
152179
}

src/Message/MessageFactory.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ class MessageFactory
1717
/**
1818
* Creates a new instance of RequestInterface for the given request parameters
1919
*
20-
* @param string $method
21-
* @param string|UriInterface $uri
22-
* @param array $headers
23-
* @param string $content
20+
* @param string $method
21+
* @param string|UriInterface $uri
22+
* @param array $headers
23+
* @param string|ReadableStreamInterface $content
2424
* @return RequestInterface
2525
*/
2626
public function request($method, $uri, $headers = array(), $content = '')
2727
{
28-
return new Request($method, $uri, $headers, $content);
28+
return new Request($method, $uri, $headers, $this->body($content));
2929
}
3030

3131
/**

tests/FunctionalBrowserTest.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use React\SocketClient\DnsConnector;
1010
use Clue\React\Buzz\Message\ResponseException;
1111
use Clue\React\Block;
12+
use React\Stream\ReadableStream;
1213

1314
class FunctionalBrowserTest extends TestCase
1415
{
@@ -130,4 +131,53 @@ public function testErrorStatusCodeRejectsWithResponseException()
130131
$this->assertEquals(404, $e->getResponse()->getStatusCode());
131132
}
132133
}
134+
135+
public function testPostString()
136+
{
137+
$response = Block\await($this->browser->post($this->base . 'post', array(), 'hello world'), $this->loop);
138+
$data = json_decode((string)$response->getBody(), true);
139+
140+
$this->assertEquals('hello world', $data['data']);
141+
}
142+
143+
public function testPostStreamChunked()
144+
{
145+
$stream = new ReadableStream();
146+
147+
$this->loop->addTimer(0.001, function () use ($stream) {
148+
$stream->emit('data', array('hello world'));
149+
$stream->close();
150+
});
151+
152+
$response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop);
153+
$data = json_decode((string)$response->getBody(), true);
154+
155+
$this->assertEquals('hello world', $data['data']);
156+
}
157+
158+
public function testPostStreamKnownLength()
159+
{
160+
$stream = new ReadableStream();
161+
162+
$this->loop->addTimer(0.001, function () use ($stream) {
163+
$stream->emit('data', array('hello world'));
164+
$stream->close();
165+
});
166+
167+
$response = Block\await($this->browser->post($this->base . 'post', array('Content-Length' => 11), $stream), $this->loop);
168+
$data = json_decode((string)$response->getBody(), true);
169+
170+
$this->assertEquals('hello world', $data['data']);
171+
}
172+
173+
public function testPostStreamClosed()
174+
{
175+
$stream = new ReadableStream();
176+
$stream->close();
177+
178+
$response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop);
179+
$data = json_decode((string)$response->getBody(), true);
180+
181+
$this->assertEquals('', $data['data']);
182+
}
133183
}

0 commit comments

Comments
 (0)