diff options
Diffstat (limited to 'assets/php/vendor/react/stream/tests/DuplexResourceStreamIntegrationTest.php')
-rw-r--r-- | assets/php/vendor/react/stream/tests/DuplexResourceStreamIntegrationTest.php | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/assets/php/vendor/react/stream/tests/DuplexResourceStreamIntegrationTest.php b/assets/php/vendor/react/stream/tests/DuplexResourceStreamIntegrationTest.php new file mode 100644 index 0000000..fb5f02a --- /dev/null +++ b/assets/php/vendor/react/stream/tests/DuplexResourceStreamIntegrationTest.php @@ -0,0 +1,352 @@ +<?php + +namespace React\Tests\Stream; + +use React\Stream\DuplexResourceStream; +use React\Stream\ReadableResourceStream; +use React\EventLoop\ExtEventLoop; +use React\EventLoop\ExtLibeventLoop; +use React\EventLoop\ExtLibevLoop; +use React\EventLoop\LoopInterface; +use React\EventLoop\LibEventLoop; +use React\EventLoop\LibEvLoop; +use React\EventLoop\StreamSelectLoop; + +class DuplexResourceStreamIntegrationTest extends TestCase +{ + public function loopProvider() + { + return array( + array( + function() { + return true; + }, + function () { + return new StreamSelectLoop(); + } + ), + array( + function () { + return function_exists('event_base_new'); + }, + function () { + return class_exists('React\EventLoop\ExtLibeventLoop') ? new ExtLibeventLoop() : new LibEventLoop(); + } + ), + array( + function () { + return class_exists('libev\EventLoop'); + }, + function () { + return class_exists('React\EventLoop\ExtLibevLoop') ? new ExtLibevLoop() : new LibEvLoop(); + } + ), + array( + function () { + return class_exists('EventBase') && class_exists('React\EventLoop\ExtEventLoop'); + }, + function () { + return new ExtEventLoop(); + } + ) + ); + } + + /** + * @dataProvider loopProvider + */ + public function testBufferReadsLargeChunks($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + + $bufferSize = 4096; + $streamA = new DuplexResourceStream($sockA, $loop, $bufferSize); + $streamB = new DuplexResourceStream($sockB, $loop, $bufferSize); + + $testString = str_repeat("*", $bufferSize + 1); + + $buffer = ""; + $streamB->on('data', function ($data) use (&$buffer) { + $buffer .= $data; + }); + + $streamA->write($testString); + + $this->loopTick($loop); + $this->loopTick($loop); + $this->loopTick($loop); + + $streamA->close(); + $streamB->close(); + + $this->assertEquals($testString, $buffer); + } + + /** + * @dataProvider loopProvider + */ + public function testWriteLargeChunk($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + + $streamA = new DuplexResourceStream($sockA, $loop); + $streamB = new DuplexResourceStream($sockB, $loop); + + // limit seems to be 192 KiB + $size = 256 * 1024; + + // sending side sends and expects clean close with no errors + $streamA->end(str_repeat('*', $size)); + $streamA->on('close', $this->expectCallableOnce()); + $streamA->on('error', $this->expectCallableNever()); + + // receiving side counts bytes and expects clean close with no errors + $received = 0; + $streamB->on('data', function ($chunk) use (&$received) { + $received += strlen($chunk); + }); + $streamB->on('close', $this->expectCallableOnce()); + $streamB->on('error', $this->expectCallableNever()); + + $loop->run(); + + $streamA->close(); + $streamB->close(); + + $this->assertEquals($size, $received); + } + + /** + * @dataProvider loopProvider + */ + public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + + $streamA = new DuplexResourceStream($sockA, $loop); + $streamB = new DuplexResourceStream($sockB, $loop); + + // end streamA without writing any data + $streamA->end(); + + // streamB should not emit any data + $streamB->on('data', $this->expectCallableNever()); + + $loop->run(); + + $streamA->close(); + $streamB->close(); + } + + /** + * @dataProvider loopProvider + */ + public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0); + + $streamA = new DuplexResourceStream($sockA, $loop); + $streamB = new DuplexResourceStream($sockB, $loop); + + // end streamA without writing any data + $streamA->pause(); + $streamA->write('hello'); + $streamA->on('close', $this->expectCallableOnce()); + + $streamB->on('data', $this->expectCallableNever()); + $streamB->close(); + + $loop->run(); + + $streamA->close(); + $streamB->close(); + } + + /** + * @dataProvider loopProvider + */ + public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $server = stream_socket_server('tcp://127.0.0.1:0'); + + $client = stream_socket_client(stream_socket_get_name($server, false)); + $peer = stream_socket_accept($server); + + $streamA = new DuplexResourceStream($client, $loop); + $streamB = new DuplexResourceStream($peer, $loop); + + // end streamA without writing any data + $streamA->pause(); + $streamA->write('hello'); + $streamA->on('close', $this->expectCallableOnce()); + + $streamB->on('data', $this->expectCallableNever()); + $streamB->close(); + + $loop->run(); + + $streamA->close(); + $streamB->close(); + } + + /** + * @dataProvider loopProvider + */ + public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $server = stream_socket_server('tcp://127.0.0.1:0'); + + $client = stream_socket_client(stream_socket_get_name($server, false)); + $peer = stream_socket_accept($server); + + $streamA = new DuplexResourceStream($peer, $loop); + $streamB = new DuplexResourceStream($client, $loop); + + // end streamA without writing any data + $streamA->pause(); + $streamA->write('hello'); + $streamA->on('close', $this->expectCallableOnce()); + + $streamB->on('data', $this->expectCallableNever()); + $streamB->close(); + + $loop->run(); + + $streamA->close(); + $streamB->close(); + } + + /** + * @dataProvider loopProvider + */ + public function testReadsSingleChunkFromProcessPipe($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $stream = new ReadableResourceStream(popen('echo test', 'r'), $loop); + $stream->on('data', $this->expectCallableOnceWith("test\n")); + $stream->on('end', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); + + $loop->run(); + } + + /** + * @dataProvider loopProvider + */ + public function testReadsMultipleChunksFromProcessPipe($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $stream = new ReadableResourceStream(popen('echo a;sleep 0.1;echo b;sleep 0.1;echo c', 'r'), $loop); + + $buffer = ''; + $stream->on('data', function ($chunk) use (&$buffer) { + $buffer .= $chunk; + }); + + $stream->on('end', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); + + $loop->run(); + + $this->assertEquals("a\n" . "b\n" . "c\n", $buffer); + } + + /** + * @dataProvider loopProvider + */ + public function testReadsLongChunksFromProcessPipe($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $stream = new ReadableResourceStream(popen('dd if=/dev/zero bs=12345 count=1234 2>&-', 'r'), $loop); + + $bytes = 0; + $stream->on('data', function ($chunk) use (&$bytes) { + $bytes += strlen($chunk); + }); + + $stream->on('end', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); + + $loop->run(); + + $this->assertEquals(12345 * 1234, $bytes); + } + + /** + * @dataProvider loopProvider + */ + public function testReadsNothingFromProcessPipeWithNoOutput($condition, $loopFactory) + { + if (true !== $condition()) { + return $this->markTestSkipped('Loop implementation not available'); + } + + $loop = $loopFactory(); + + $stream = new ReadableResourceStream(popen('true', 'r'), $loop); + $stream->on('data', $this->expectCallableNever()); + $stream->on('end', $this->expectCallableOnce()); + $stream->on('error', $this->expectCallableNever()); + + $loop->run(); + } + + private function loopTick(LoopInterface $loop) + { + $loop->addTimer(0, function () use ($loop) { + $loop->stop(); + }); + $loop->run(); + } +} |