1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
<?php
namespace React\Socket;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use InvalidArgumentException;
use RuntimeException;
/**
 * The `TcpServer` class implements the `ServerInterface` and
 * is responsible for accepting plaintext TCP/IP connections.
 *
 * ```php
 * $server = new TcpServer(8080, $loop);
 * ```
 *
 * Whenever a client connects, it will emit a `connection` event with a connection
 * instance implementing `ConnectionInterface`:
 *
 * ```php
 * $server->on('connection', function (ConnectionInterface $connection) {
 *     echo 'Plaintext connection from ' . $connection->getRemoteAddress() . PHP_EOL;
 *     $connection->write('hello there!' . PHP_EOL);
 *     …
 * });
 * ```
 *
 * Whenever accepting an incoming connection fails, it will emit an `error`
 * event with a `RuntimeException` instead.
 *
 * See also the `ServerInterface` for more details.
 *
 * @see ServerInterface
 * @see ConnectionInterface
 */
final class TcpServer extends EventEmitter implements ServerInterface
{
    /** @var resource listening stream socket as returned by stream_socket_server() */
    private $master;

    /** @var LoopInterface event loop the listening socket is registered with */
    private $loop;

    /** @var bool whether the listening socket is currently registered as a read stream */
    private $listening = false;

    /**
     * Creates a plaintext TCP/IP socket server and starts listening on the given address
     *
     * This starts accepting new incoming connections on the given address.
     * See also the `connection event` documented in the `ServerInterface`
     * for more details.
     *
     * ```php
     * $server = new TcpServer(8080, $loop);
     * ```
     *
     * As above, the `$uri` parameter can consist of only a port, in which case the
     * server will default to listening on the localhost address `127.0.0.1`,
     * which means it will not be reachable from outside of this system.
     *
     * In order to use a random port assignment, you can use the port `0`:
     *
     * ```php
     * $server = new TcpServer(0, $loop);
     * $address = $server->getAddress();
     * ```
     *
     * In order to change the host the socket is listening on, you can provide an IP
     * address through the first parameter provided to the constructor, optionally
     * preceded by the `tcp://` scheme:
     *
     * ```php
     * $server = new TcpServer('192.168.0.1:8080', $loop);
     * ```
     *
     * If you want to listen on an IPv6 address, you MUST enclose the host in square
     * brackets:
     *
     * ```php
     * $server = new TcpServer('[::1]:8080', $loop);
     * ```
     *
     * If the given URI is invalid, does not contain a port, uses any other scheme
     * or if it contains a hostname, it will throw an `InvalidArgumentException`:
     *
     * ```php
     * // throws InvalidArgumentException due to missing port
     * $server = new TcpServer('127.0.0.1', $loop);
     * ```
     *
     * If the given URI appears to be valid, but listening on it fails (such as if port
     * is already in use or port below 1024 may require root access etc.), it will
     * throw a `RuntimeException`:
     *
     * ```php
     * $first = new TcpServer(8080, $loop);
     *
     * // throws RuntimeException because port is already in use
     * $second = new TcpServer(8080, $loop);
     * ```
     *
     * Note that these error conditions may vary depending on your system and/or
     * configuration.
     * See the exception message and code for more details about the actual error
     * condition.
     *
     * Optionally, you can specify [socket context options](http://php.net/manual/en/context.socket.php)
     * for the underlying stream socket resource like this:
     *
     * ```php
     * $server = new TcpServer('[::1]:8080', $loop, array(
     *     'backlog' => 200,
     *     'so_reuseport' => true,
     *     'ipv6_v6only' => true
     * ));
     * ```
     *
     * Note that available [socket context options](http://php.net/manual/en/context.socket.php),
     * their defaults and effects of changing these may vary depending on your system
     * and/or PHP version.
     * Passing unknown context options has no effect.
     *
     * @param string|int    $uri     port, `ip:port` or `tcp://ip:port` to listen on
     * @param LoopInterface $loop    event loop used to watch the listening socket
     * @param array         $context socket context options passed to the stream context
     * @throws InvalidArgumentException if the listening address is invalid
     * @throws RuntimeException if listening on this address fails (already in use etc.)
     */
    public function __construct($uri, LoopInterface $loop, array $context = array())
    {
        $this->loop = $loop;

        // a single port has been given => assume localhost
        if ((string)(int)$uri === (string)$uri) {
            $uri = '127.0.0.1:' . $uri;
        }

        // assume default scheme if none has been given
        if (strpos($uri, '://') === false) {
            $uri = 'tcp://' . $uri;
        }

        // parse_url() does not accept null ports (random port assignment) => manually remove
        if (substr($uri, -2) === ':0') {
            $parts = parse_url(substr($uri, 0, -2));
            if ($parts) {
                $parts['port'] = 0;
            }
        } else {
            $parts = parse_url($uri);
        }

        // ensure URI contains TCP scheme, host and port
        if (!$parts || !isset($parts['scheme'], $parts['host'], $parts['port']) || $parts['scheme'] !== 'tcp') {
            throw new InvalidArgumentException('Invalid URI "' . $uri . '" given');
        }

        // only literal IP addresses are accepted; square brackets around IPv6 hosts are stripped first
        if (false === filter_var(trim($parts['host'], '[]'), FILTER_VALIDATE_IP)) {
            throw new InvalidArgumentException('Given URI "' . $uri . '" does not contain a valid host IP');
        }

        // warnings are suppressed here because failures are reported through the exception below
        $this->master = @stream_socket_server(
            $uri,
            $errno,
            $errstr,
            STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
            stream_context_create(array('socket' => $context))
        );
        if (false === $this->master) {
            throw new RuntimeException('Failed to listen on "' . $uri . '": ' . $errstr, $errno);
        }

        stream_set_blocking($this->master, false);

        $this->resume();
    }

    /**
     * Returns the full address (URI) this server is currently listening on
     *
     * IPv6 addresses are wrapped in square brackets so that the port can be
     * told apart from the host.
     *
     * @return string|null `tcp://host:port` or null once the listening socket has been closed
     */
    public function getAddress()
    {
        if (!is_resource($this->master)) {
            return null;
        }

        // second argument false => local (listening) name instead of the peer name
        $address = stream_socket_get_name($this->master, false);

        // check if this is an IPv6 address which includes multiple colons but no square brackets
        $pos = strrpos($address, ':');
        if ($pos !== false && strpos($address, ':') < $pos && substr($address, 0, 1) !== '[') {
            $port = substr($address, $pos + 1);
            $address = '[' . substr($address, 0, $pos) . ']:' . $port;
        }

        return 'tcp://' . $address;
    }

    /**
     * Pauses accepting new incoming connections
     *
     * Removes the listening socket from the event loop. Does nothing if the
     * server is not currently listening.
     *
     * @return void
     */
    public function pause()
    {
        if (!$this->listening) {
            return;
        }

        $this->loop->removeReadStream($this->master);
        $this->listening = false;
    }

    /**
     * Resumes accepting new incoming connections
     *
     * Registers the listening socket with the event loop. Does nothing if the
     * server is already listening or the listening socket has been closed.
     * Each accepted socket is passed to `handleConnection()`; if accepting
     * fails, an `error` event with a `RuntimeException` is emitted instead.
     *
     * @return void
     */
    public function resume()
    {
        if ($this->listening || !is_resource($this->master)) {
            return;
        }

        // explicit reference instead of $this inside the closure (PHP 5.3 compatible)
        $that = $this;
        $this->loop->addReadStream($this->master, function ($master) use ($that) {
            // temporarily capture the warning raised by a failed accept so its
            // reason can be reported through the `error` event instead of being lost
            $reason = '';
            set_error_handler(function ($_, $message) use (&$reason) {
                // e.g. "stream_socket_accept(): accept failed: Connection timed out"
                $reason = (string)preg_replace('#.*: #', '', $message);
            });

            // a timeout of 0 makes sure this never waits for `default_socket_timeout`
            // seconds (and thus blocks the loop) if no connection is pending anymore
            $newSocket = stream_socket_accept($master, 0);
            restore_error_handler();

            if (false === $newSocket) {
                $that->emit('error', array(new RuntimeException(
                    'Error accepting new connection' . ($reason !== '' ? ': ' . $reason : '')
                )));

                return;
            }

            $that->handleConnection($newSocket);
        });
        $this->listening = true;
    }

    /**
     * Shuts down this listening socket
     *
     * Stops accepting new connections, closes the listening socket and removes
     * all event listeners. Does nothing if the socket has already been closed.
     *
     * @return void
     */
    public function close()
    {
        if (!is_resource($this->master)) {
            return;
        }

        $this->pause();
        fclose($this->master);
        $this->removeAllListeners();
    }

    /**
     * Wraps an accepted client socket in a `Connection` and emits the `connection` event
     *
     * @internal
     * @param resource $socket accepted client stream socket
     * @return void
     */
    public function handleConnection($socket)
    {
        $this->emit('connection', array(
            new Connection($socket, $this->loop)
        ));
    }
}
|