getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
        $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

        $ret = Util::pipe($readable, $writable);

        $this->assertSame($writable, $ret);
    }

    /**
     * A source that reports itself as not readable must leave the destination
     * untouched: neither isWritable() nor end() may be invoked on it.
     */
    public function testPipeNonReadableSourceShouldDoNothing()
    {
        $source = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
        $source->expects($this->any())->method('isReadable')->willReturn(false);

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->never())->method('isWritable');
        $dest->expects($this->never())->method('end');

        Util::pipe($source, $dest);
    }

    /**
     * Piping a readable source into a destination that reports itself as not
     * writable must pause the source exactly once and never end the destination.
     */
    public function testPipeIntoNonWritableDestinationShouldPauseSource()
    {
        $source = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
        $source->expects($this->any())->method('isReadable')->willReturn(true);
        $source->expects($this->once())->method('pause');

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->any())->method('isWritable')->willReturn(false);
        $dest->expects($this->never())->method('end');

        Util::pipe($source, $dest);
    }

    /**
     * Closing the destination after the pipe has been set up must pause the
     * source exactly once.
     */
    public function testPipeClosingDestPausesSource()
    {
        $source = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
        $source->expects($this->any())->method('isReadable')->willReturn(true);
        $source->expects($this->once())->method('pause');

        $dest = new ThroughStream();

        Util::pipe($source, $dest);

        $dest->close();
    }

    /**
     * With default options, ending the source must end the destination once.
     */
    public function testPipeWithEnd()
    {
        $source = new Stub\ReadableStreamStub();

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->any())->method('isWritable')->willReturn(true);
        $dest->expects($this->once())->method('end');

        Util::pipe($source, $dest);

        $source->end();
    }

    /**
     * With the 'end' option set to false, ending the source must not end the
     * destination.
     */
    public function testPipeWithoutEnd()
    {
        $source = new Stub\ReadableStreamStub();

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->any())->method('isWritable')->willReturn(true);
        $dest->expects($this->never())->method('end');

        $options = array('end' => false);
        Util::pipe($source, $dest, $options);

        $source->end();
    }

    /**
     * When the destination's write() returns false, the source stub must end
     * up paused after the write.
     */
    public function testPipeWithTooSlowWritableShouldPauseReadable()
    {
        $source = new Stub\ReadableStreamStub();

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->any())->method('isWritable')->willReturn(true);
        $dest->expects($this->once())->method('write')->with('some data')->willReturn(false);

        $source->pipe($dest);
        $this->assertFalse($source->paused);

        $source->write('some data');
        $this->assertTrue($source->paused);
    }

    /**
     * Invoking the 'drain' listener that the pipe registered on the
     * destination must un-pause a previously paused source stub.
     */
    public function testPipeWithTooSlowWritableShouldResumeOnDrain()
    {
        $source = new Stub\ReadableStreamStub();

        // Filled in by the mocked on() below once a 'drain' listener is registered.
        $drainListener = null;

        $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $dest->expects($this->any())->method('isWritable')->willReturn(true);
        $dest->expects($this->any())->method('on')->willReturnCallback(
            function ($event, $listener) use (&$drainListener) {
                if ($event !== 'drain') {
                    return;
                }

                $drainListener = $listener;
            }
        );

        $source->pipe($dest);
        $source->pause();

        $this->assertTrue($source->paused);
        $this->assertNotNull($drainListener);

        $drainListener();

        $this->assertFalse($source->paused);
    }

    /**
     * Two chunks written to the source must arrive, concatenated in order, in
     * the php://temp resource wrapped by a WritableResourceStream.
     */
    public function testPipeWithWritableResourceStream()
    {
        $source = new Stub\ReadableStreamStub();

        $handle = fopen('php://temp', 'r+');
        $sink = new WritableResourceStream($handle, $this->createLoopMock());

        $source->pipe($sink);

        $source->write('hello, I am some ');
        $source->write('random data');

        // The loop is a mock, so the write handler is invoked by hand.
        $sink->handleWrite();

        rewind($handle);
        $written = stream_get_contents($handle);

        $this->assertSame('hello, I am some random data', $written);
    }

    /**
     * Piping registers exactly one 'data' and one 'end' listener on the
     * source and one 'drain' listener on the destination.
     */
    public function testPipeSetsUpListeners()
    {
        $source = new ThroughStream();
        $dest = new ThroughStream();

        foreach (array('data', 'end') as $event) {
            $this->assertCount(0, $source->listeners($event));
        }
        $this->assertCount(0, $dest->listeners('drain'));

        Util::pipe($source, $dest);

        foreach (array('data', 'end') as $event) {
            $this->assertCount(1, $source->listeners($event));
        }
        $this->assertCount(1, $dest->listeners('drain'));
    }

    /**
     * Closing the source leaves no 'data'/'end' listeners on the source and
     * no 'drain' listener on the destination.
     */
    public function testPipeClosingSourceRemovesListeners()
    {
        $source = new ThroughStream();
        $dest = new ThroughStream();

        Util::pipe($source, $dest);

        $source->close();

        foreach (array('data', 'end') as $event) {
            $this->assertCount(0, $source->listeners($event));
        }
        $this->assertCount(0, $dest->listeners('drain'));
    }

    /**
     * Closing the destination leaves no 'data'/'end' listeners on the source
     * and no 'drain' listener on the destination.
     */
    public function testPipeClosingDestRemovesListeners()
    {
        $source = new ThroughStream();
        $dest = new ThroughStream();

        Util::pipe($source, $dest);

        $dest->close();

        foreach (array('data', 'end') as $event) {
            $this->assertCount(0, $source->listeners($event));
        }
        $this->assertCount(0, $dest->listeners('drain'));
    }

    /**
     * A composite stream piped into itself must call end() on its writable
     * side exactly once when it emits 'end'.
     */
    public function testPipeDuplexIntoSelfEndsOnEnd()
    {
        $readSide = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
        $readSide->expects($this->any())->method('isReadable')->willReturn(true);

        $writeSide = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
        $writeSide->expects($this->any())->method('isWritable')->willReturn(true);

        $duplex = new CompositeStream($readSide, $writeSide);

        Util::pipe($duplex, $duplex);

        // This expectation is added after piping, right before 'end' is emitted.
        $writeSide->expects($this->once())->method('end');

        $duplex->emit('end');
    }

    /**
     * Forwarding only the 'data' event must relay 'data' to the target once
     * and must not relay the unlisted 'foo' event.
     *
     * @test
     */
    public function forwardEventsShouldSetupForwards()
    {
        $origin = new ThroughStream();
        $receiver = new ThroughStream();

        Util::forwardEvents($origin, $receiver, array('data'));

        $onData = $this->expectCallableOnce();
        $receiver->on('data', $onData);

        $onFoo = $this->expectCallableNever();
        $receiver->on('foo', $onFoo);

        $origin->emit('data', array('hello'));
        $origin->emit('foo', array('bar'));
    }

    /**
     * Builds a mock of React\EventLoop\LoopInterface.
     */
    private function createLoopMock()
    {
        $builder = $this->getMockBuilder('React\EventLoop\LoopInterface');

        return $builder->getMock();
    }

    /**
     * Wraps the given value in a \PHPUnit_Framework_Constraint_Not instance.
     *
     * NOTE(review): $value is presumably a PHPUnit constraint object; confirm
     * against the callers of this helper.
     */
    private function notEqualTo($value)
    {
        $negated = new \PHPUnit_Framework_Constraint_Not($value);

        return $negated;
    }
}