loop = $loop; if (strpos($path, '://') === false) { $path = 'unix://' . $path; } elseif (substr($path, 0, 7) !== 'unix://') { throw new InvalidArgumentException('Given URI "' . $path . '" is invalid'); } $this->master = @stream_socket_server( $path, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, stream_context_create(array('socket' => $context)) ); if (false === $this->master) { throw new RuntimeException('Failed to listen on unix domain socket "' . $path . '": ' . $errstr, $errno); } stream_set_blocking($this->master, 0); $this->resume(); } public function getAddress() { if (!is_resource($this->master)) { return null; } return 'unix://' . stream_socket_get_name($this->master, false); } public function pause() { if (!$this->listening) { return; } $this->loop->removeReadStream($this->master); $this->listening = false; } public function resume() { if ($this->listening || !is_resource($this->master)) { return; } $that = $this; $this->loop->addReadStream($this->master, function ($master) use ($that) { $newSocket = @stream_socket_accept($master); if (false === $newSocket) { $that->emit('error', array(new RuntimeException('Error accepting new connection'))); return; } $that->handleConnection($newSocket); }); $this->listening = true; } public function close() { if (!is_resource($this->master)) { return; } $this->pause(); fclose($this->master); $this->removeAllListeners(); } /** @internal */ public function handleConnection($socket) { $connection = new Connection($socket, $this->loop); $connection->unix = true; $this->emit('connection', array( $connection )); } }