createLoopMock(); new ReadableResourceStream($stream, $loop); } /** * @covers React\Stream\ReadableResourceStream::__construct * @doesNotPerformAssertions */ public function testConstructorWithExcessiveMode() { // excessive flags are ignored for temp streams, so we have to use a file stream $name = tempnam(sys_get_temp_dir(), 'test'); $stream = @fopen($name, 'r+eANYTHING'); unlink($name); $loop = $this->createLoopMock(); $buffer = new ReadableResourceStream($stream, $loop); $buffer->close(); } /** * @covers React\Stream\ReadableResourceStream::__construct * @expectedException InvalidArgumentException */ public function testConstructorThrowsExceptionOnInvalidStream() { $loop = $this->createLoopMock(); new ReadableResourceStream(false, $loop); } /** * @covers React\Stream\ReadableResourceStream::__construct * @expectedException InvalidArgumentException */ public function testConstructorThrowsExceptionOnWriteOnlyStream() { if (defined('HHVM_VERSION')) { $this->markTestSkipped('HHVM does not report fopen mode for STDOUT'); } $loop = $this->createLoopMock(); new ReadableResourceStream(STDOUT, $loop); } /** * @covers React\Stream\ReadableResourceStream::__construct * @expectedException InvalidArgumentException */ public function testConstructorThrowsExceptionOnWriteOnlyStreamWithExcessiveMode() { // excessive flags are ignored for temp streams, so we have to use a file stream $name = tempnam(sys_get_temp_dir(), 'test'); $stream = fopen($name, 'weANYTHING'); unlink($name); $loop = $this->createLoopMock(); new ReadableResourceStream($stream, $loop); } /** * @covers React\Stream\ReadableResourceStream::__construct * @expectedException RuntimeException */ public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() { if (!in_array('blocking', stream_get_wrappers())) { stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); } $stream = fopen('blocking://test', 'r+'); $loop = $this->createLoopMock(); new ReadableResourceStream($stream, $loop); } public function testCloseShouldEmitCloseEvent() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('close', $this->expectCallableOnce()); $conn->close(); $this->assertFalse($conn->isReadable()); } public function testCloseTwiceShouldEmitCloseEventOnce() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('close', $this->expectCallableOnce()); $conn->close(); $conn->close(); } /** * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ public function testDataEvent() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $capturedData = null; $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', function ($data) use (&$capturedData) { $capturedData = $data; }); fwrite($stream, "foobar\n"); rewind($stream); $conn->handleData($stream); $this->assertSame("foobar\n", $capturedData); } /** * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ public function testDataEventDoesEmitOneChunkMatchingBufferSize() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $capturedData = null; $conn = new ReadableResourceStream($stream, $loop, 4321); $conn->on('data', function ($data) use (&$capturedData) { $capturedData = $data; }); fwrite($stream, str_repeat("a", 100000)); rewind($stream); $conn->handleData($stream); $this->assertTrue($conn->isReadable()); $this->assertEquals(4321, strlen($capturedData)); } /** * @covers React\Stream\ReadableResourceStream::__construct * @covers React\Stream\ReadableResourceStream::handleData */ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $capturedData = null; $conn = new ReadableResourceStream($stream, $loop, -1); $conn->on('data', function ($data) use (&$capturedData) { $capturedData = $data; }); fwrite($stream, str_repeat("a", 100000)); rewind($stream); $conn->handleData($stream); $this->assertTrue($conn->isReadable()); $this->assertEquals(100000, strlen($capturedData)); } /** * @covers React\Stream\ReadableResourceStream::handleData */ public function testEmptyStreamShouldNotEmitData() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', $this->expectCallableNever()); $conn->handleData($stream); } public function testPipeShouldReturnDestination() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $this->assertSame($dest, $conn->pipe($dest)); } /** * @covers React\Stream\ReadableResourceStream::handleData */ public function testClosingStreamInDataEventShouldNotTriggerError() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('error', $this->expectCallableNever()); $conn->on('data', function ($data) use ($conn) { $conn->close(); }); fwrite($stream, "foobar\n"); rewind($stream); $conn->handleData($stream); } /** * @covers React\Stream\ReadableResourceStream::pause */ public function testPauseRemovesReadStreamFromLoop() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addReadStream')->with($stream); $loop->expects($this->once())->method('removeReadStream')->with($stream); $conn = new ReadableResourceStream($stream, $loop); $conn->pause(); $conn->pause(); } /** * @covers React\Stream\ReadableResourceStream::pause */ public function testResumeDoesAddStreamToLoopOnlyOnce() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addReadStream')->with($stream); $conn = new ReadableResourceStream($stream, $loop); $conn->resume(); $conn->resume(); } /** * @covers React\Stream\ReadableResourceStream::close */ public function testCloseRemovesReadStreamFromLoop() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addReadStream')->with($stream); $loop->expects($this->once())->method('removeReadStream')->with($stream); $conn = new ReadableResourceStream($stream, $loop); $conn->close(); } /** * @covers React\Stream\ReadableResourceStream::close */ public function testCloseAfterPauseRemovesReadStreamFromLoopOnce() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addReadStream')->with($stream); $loop->expects($this->once())->method('removeReadStream')->with($stream); $conn = new ReadableResourceStream($stream, $loop); $conn->pause(); $conn->close(); } /** * @covers React\Stream\ReadableResourceStream::close */ public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce() { $stream = fopen('php://temp', 'r+'); $loop = $this->createLoopMock(); $loop->expects($this->once())->method('addReadStream')->with($stream); $conn = new ReadableResourceStream($stream, $loop); $conn->close(); $conn->resume(); } /** * @covers React\Stream\ReadableResourceStream::handleData */ public function testDataFiltered() { $stream = fopen('php://temp', 'r+'); // add a filter which removes every 'a' when reading Filter\append($stream, function ($chunk) { return str_replace('a', '', $chunk); }, STREAM_FILTER_READ); $loop = $this->createLoopMock(); $capturedData = null; $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', function ($data) use (&$capturedData) { $capturedData = $data; }); fwrite($stream, "foobar\n"); rewind($stream); $conn->handleData($stream); $this->assertSame("foobr\n", $capturedData); } /** * @covers React\Stream\ReadableResourceStream::handleData */ public function testDataErrorShouldEmitErrorAndClose() { $stream = fopen('php://temp', 'r+'); // add a filter which returns an error when encountering an 'a' when reading Filter\append($stream, function ($chunk) { if (strpos($chunk, 'a') !== false) { throw new \Exception('Invalid'); } return $chunk; }, STREAM_FILTER_READ); $loop = $this->createLoopMock(); $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', $this->expectCallableNever()); $conn->on('error', $this->expectCallableOnce()); $conn->on('close', $this->expectCallableOnce()); fwrite($stream, "foobar\n"); rewind($stream); $conn->handleData($stream); } private function createLoopMock() { return $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); } }