forked from reactphp/datagram
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFactory.php
More file actions
133 lines (107 loc) · 3.91 KB
/
Factory.php
File metadata and controls
133 lines (107 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
<?php
namespace React\Datagram;
use React\Datagram\Socket;
use React\Dns\Resolver\Factory as DnsFactory;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\Promise;
use React\Promise\CancellablePromiseInterface;
use \Exception;
/**
 * Creates datagram client and server sockets attached to an event loop.
 *
 * Addresses without an explicit scheme are treated as `udp://` URIs. Any host
 * name in the address is looked up through the configured DNS resolver before
 * the underlying stream socket is opened, which is why both factory methods
 * hand back a promise instead of the socket itself.
 */
class Factory
{
    /** @var LoopInterface event loop passed to every Socket created by this factory */
    protected $loop;

    /** @var Resolver DNS resolver used to turn host names into IP addresses */
    protected $resolver;

    /**
     * @param LoopInterface $loop
     * @param Resolver|null $resolver Resolver instance to use. Will otherwise
     *     default to using Google's public DNS 8.8.8.8
     */
    public function __construct(LoopInterface $loop, Resolver $resolver = null)
    {
        if ($resolver === null) {
            $factory = new DnsFactory();
            $resolver = $factory->create('8.8.8.8', $loop);
        }

        $this->loop = $loop;
        $this->resolver = $resolver;
    }

    /**
     * Creates a client socket connected to the given remote address.
     *
     * @param string $address remote address, e.g. `example.com:1234` or `udp://127.0.0.1:1234`
     * @return mixed promise whose fulfillment callback yields a Socket; the callback
     *     throws an Exception (message and code taken from stream_socket_client())
     *     when the socket cannot be created
     */
    public function createClient($address)
    {
        // closures cannot rely on $this here, so pass the loop in explicitly
        $loop = $this->loop;

        return $this->resolveAddress($address)->then(function ($address) use ($loop) {
            $socket = @stream_socket_client($address, $errno, $errstr);
            if (!$socket) {
                throw new Exception('Unable to create client socket: ' . $errstr, $errno);
            }

            return new Socket($loop, $socket);
        });
    }

    /**
     * Creates a server socket bound to the given local address.
     *
     * @param string $address local address to bind to; a trailing `:0` requests a random port
     * @return mixed promise whose fulfillment callback yields a Socket; the callback
     *     throws an Exception (message and code taken from stream_socket_server())
     *     when the socket cannot be created
     */
    public function createServer($address)
    {
        // closures cannot rely on $this here, so pass the loop in explicitly
        $loop = $this->loop;

        return $this->resolveAddress($address)->then(function ($address) use ($loop) {
            // bind only (no listen flag): datagram sockets do not accept connections
            $socket = @stream_socket_server($address, $errno, $errstr, STREAM_SERVER_BIND);
            if (!$socket) {
                throw new Exception('Unable to create server socket: ' . $errstr, $errno);
            }

            return new Socket($loop, $socket);
        });
    }

    /**
     * Normalizes an address into a `scheme://host[:port]` URI with the host resolved to an IP.
     *
     * An address that cannot be parsed (or has no host component) is passed
     * through as given, apart from the default `udp://` scheme prefix.
     *
     * @param string $address
     * @return mixed promise for the resulting address string
     */
    protected function resolveAddress($address)
    {
        if (strpos($address, '://') === false) {
            $address = 'udp://' . $address;
        }

        // remember the complete URI: the ":0" workaround below must not leak
        // into the value returned for addresses that turn out to be unparsable
        $uri = $address;

        // parse_url() does not accept null ports (random port assignment) => manually remove
        $nullport = false;
        if (substr($address, -2) === ':0') {
            $address = substr($address, 0, -2);
            $nullport = true;
        }

        $parts = parse_url($address);

        if (!$parts || !isset($parts['host'])) {
            // nothing to resolve => hand the address on untouched (including any ":0" port)
            return Promise\resolve($uri);
        }

        if ($nullport) {
            $parts['port'] = 0;
        }

        // remove square brackets for IPv6 addresses
        $host = trim($parts['host'], '[]');

        return $this->resolveHost($host)->then(function ($host) use ($parts) {
            $address = $parts['scheme'] . '://';

            if (isset($parts['port']) && strpos($host, ':') !== false) {
                // enclose IPv6 address in square brackets if a port will be appended
                $host = '[' . $host . ']';
            }

            $address .= $host;

            if (isset($parts['port'])) {
                $address .= ':' . $parts['port'];
            }

            return $address;
        });
    }

    /**
     * Resolves a host name to an IP address; IP addresses are returned as-is.
     *
     * @param string $host host name or IP address (IPv6 without square brackets)
     * @return mixed promise for the IP address; when cancelled during the DNS
     *     lookup it is rejected with a RuntimeException
     */
    protected function resolveHost($host)
    {
        // there's no need to resolve if the host is already given as an IP address
        if (false !== filter_var($host, FILTER_VALIDATE_IP)) {
            return Promise\resolve($host);
        }

        $promise = $this->resolver->resolve($host);

        // wrap DNS lookup in order to control cancellation behavior
        return new Promise\Promise(
            function ($resolve, $reject) use ($promise) {
                // forward promise resolution
                $promise->then($resolve, $reject);
            },
            function ($_, $reject) use ($promise) {
                // reject with custom message once cancelled
                $reject(new \RuntimeException('Cancelled creating socket during DNS lookup'));

                // (try to) cancel pending DNS lookup, otherwise ignoring its results.
                // Besides the dedicated interface, accept any promise exposing cancel():
                // react/promise v3 no longer ships CancellablePromiseInterface and
                // declares cancel() on PromiseInterface instead.
                if ($promise instanceof CancellablePromiseInterface || method_exists($promise, 'cancel')) {
                    $promise->cancel();
                }
            }
        );
    }
}