Viewing file: Stream.php (14.51 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
<?php
/** * Hoa * * * @license * * New BSD License * * Copyright © 2007-2017, Hoa community. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of the Hoa nor the names of its contributors may be * used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */
namespace Psy\Readline\Hoa;
/**
 * Class \Hoa\Stream.
 *
 * Static register for all streams (files, sockets etc.).
 *
 * Every opened stream is stored once in a static register, indexed by the
 * MD5 hash of its name. Each handler keeps its own copy of the register
 * entry (the "bucket") holding the name, the first handler, the resource and
 * the context of the stream.
 */
abstract class Stream implements IStream, EventListenable
{
    use EventListens;

    /**
     * Name index in the stream bucket.
     */
    const NAME = 0;

    /**
     * Handler index in the stream bucket.
     */
    const HANDLER = 1;

    /**
     * Resource index in the stream bucket.
     */
    const RESOURCE = 2;

    /**
     * Context index in the stream bucket.
     */
    const CONTEXT = 3;

    /**
     * Default buffer size.
     */
    const DEFAULT_BUFFER_SIZE = 8192;

    /**
     * Current stream bucket.
     */
    protected $_bucket = [];

    /**
     * Static stream register.
     */
    private static $_register = [];

    /**
     * Buffer size (default is 8Ko).
     */
    protected $_bufferSize = self::DEFAULT_BUFFER_SIZE;

    /**
     * Original stream name, given to the stream constructor.
     */
    protected $_streamName = null;

    /**
     * Context name.
     */
    protected $_context = null;

    /**
     * Whether the opening has been deferred.
     */
    protected $_hasBeenDeferred = false;

    /**
     * Whether this stream is already opened by another handler.
     */
    protected $_borrowing = false;

    /**
     * Set the current stream.
     * If not exists in the register, try to call the
     * `$this->_open()` method. Please, see the `self::_getStream()` method.
     *
     * @param string      $streamName Name of the stream to open
     * @param string|null $context    Name (ID) of the stream context, if any
     * @param bool        $wait       When true, the stream is not opened by
     *                                the constructor; `open()` must be called
     *                                explicitly later
     */
    public function __construct(string $streamName, string $context = null, bool $wait = false)
    {
        $this->_streamName = $streamName;
        $this->_context = $context;
        $this->_hasBeenDeferred = $wait;
        $this->setListener(
            new EventListener(
                $this,
                [
                    'authrequire',
                    'authresult',
                    'complete',
                    'connect',
                    'failure',
                    'mimetype',
                    'progress',
                    'redirect',
                    'resolve',
                    'size',
                ]
            )
        );

        // Deferred opening: leave the bucket empty until open() is called.
        if (true === $wait) {
            return;
        }

        $this->open();

        return;
    }

    /**
     * Get a stream in the register.
     * If the stream does not exist, try to open it by calling the
     * $handler->_open() method.
     *
     * @param string      $streamName Name of the stream
     * @param self        $handler    Handler asking for the stream
     * @param string|null $context    Name (ID) of a previously declared
     *                                context, or null
     *
     * @return array The register entry (returned by reference)
     *
     * @throws StreamException If the given context was not declared
     */
    private static function &_getStream(
        string $streamName,
        self $handler,
        string $context = null
    ): array {
        $name = \md5($streamName);

        if (null !== $context) {
            if (false === StreamContext::contextExists($context)) {
                throw new StreamException('Context %s was not previously declared, cannot retrieve '.'this context.', 0, $context);
            }

            // From here on, $context is the context instance, not its name.
            $context = StreamContext::getInstance($context);
        }

        if (!isset(self::$_register[$name])) {
            self::$_register[$name] = [
                self::NAME     => $streamName,
                self::HANDLER  => $handler,
                self::RESOURCE => $handler->_open($streamName, $context),
                self::CONTEXT  => $context,
            ];
            Event::register(
                'hoa://Event/Stream/'.$streamName,
                $handler
            );
            // Add :open-ready?
            Event::register(
                'hoa://Event/Stream/'.$streamName.':close-before',
                $handler
            );
        } else {
            // The entry already exists: this handler reuses it.
            $handler->_borrowing = true;
        }

        // A registered entry without resource is (re)opened by this handler.
        if (null === self::$_register[$name][self::RESOURCE]) {
            self::$_register[$name][self::RESOURCE] = $handler->_open($streamName, $context);
        }

        return self::$_register[$name];
    }

    /**
     * Open the stream and return the associated resource.
     * Note: This method is protected, but do not forget that it could be
     * overloaded into a public context.
     *
     * @param string             $streamName Name of the stream to open
     * @param StreamContext|null $context    Context instance, or null
     */
    abstract protected function &_open(string $streamName, StreamContext $context = null);

    /**
     * Close the current stream.
     * Note: this method is protected, but do not forget that it could be
     * overloaded into a public context.
     *
     * @return bool `close()` only removes the stream from the register when
     *              this does not return false
     */
    abstract protected function _close(): bool;

    /**
     * Open the stream.
     *
     * When the opening has been deferred, a `notification` parameter
     * pointing to `$this->_notify()` is attached to the context: a new
     * context is created if none was given, otherwise the existing context
     * is completed if it has no `notification` parameter yet.
     *
     * @return self
     */
    final public function open(): self
    {
        $context = $this->_context;

        if (true === $this->hasBeenDeferred()) {
            if (null === $context) {
                $handle = StreamContext::getInstance(\uniqid());
                $handle->setParameters([
                    'notification' => [$this, '_notify'],
                ]);
                $context = $handle->getId();
            } elseif (true === StreamContext::contextExists($context)) {
                $handle = StreamContext::getInstance($context);
                $parameters = $handle->getParameters();

                if (!isset($parameters['notification'])) {
                    $handle->setParameters([
                        'notification' => [$this, '_notify'],
                    ]);
                }
            }
        }

        $this->_bufferSize = self::DEFAULT_BUFFER_SIZE;
        // The bucket is a copy of the register entry, not a reference to it.
        $this->_bucket = self::_getStream(
            $this->_streamName,
            $this,
            $context
        );

        return $this;
    }

    /**
     * Close the current stream.
     *
     * Does nothing when the stream has no name yet (empty bucket) or is not
     * in the register. Otherwise the `:close-before` event is notified, then
     * `$this->_close()` is called; if it returns false, the register is left
     * untouched.
     */
    final public function close()
    {
        $streamName = $this->getStreamName();

        if (null === $streamName) {
            return;
        }

        $name = \md5($streamName);

        if (!isset(self::$_register[$name])) {
            return;
        }

        Event::notify(
            'hoa://Event/Stream/'.$streamName.':close-before',
            $this,
            new EventBucket()
        );

        if (false === $this->_close()) {
            return;
        }

        unset(self::$_register[$name]);
        $this->_bucket[self::HANDLER] = null;
        Event::unregister(
            'hoa://Event/Stream/'.$streamName
        );
        Event::unregister(
            'hoa://Event/Stream/'.$streamName.':close-before'
        );

        return;
    }

    /**
     * Get the current stream name.
     *
     * @return string|null Null when the bucket is empty
     */
    public function getStreamName()
    {
        if (empty($this->_bucket)) {
            return null;
        }

        return $this->_bucket[self::NAME];
    }

    /**
     * Get the current stream.
     *
     * @return mixed The stream resource, or null when the bucket is empty
     */
    public function getStream()
    {
        if (empty($this->_bucket)) {
            return null;
        }

        return $this->_bucket[self::RESOURCE];
    }

    /**
     * Get the current stream context.
     *
     * @return StreamContext|null Null when the bucket is empty or when the
     *                            stream has no context
     */
    public function getStreamContext()
    {
        if (empty($this->_bucket)) {
            return null;
        }

        return $this->_bucket[self::CONTEXT];
    }

    /**
     * Get stream handler according to its name.
     *
     * @param string $streamName Name of the stream
     *
     * @return self|null Null when no stream of that name is registered
     */
    public static function getStreamHandler(string $streamName)
    {
        $name = \md5($streamName);

        if (!isset(self::$_register[$name])) {
            return null;
        }

        return self::$_register[$name][self::HANDLER];
    }

    /**
     * Set the current stream. Useful to manage a stack of streams (e.g. socket
     * and select). Notice that it could be unsafe to use this method without
     * taking time to think about it two minutes. Resource of type “Unknown” is
     * considered as valid.
     *
     * @param mixed $stream New stream resource
     *
     * @return mixed The previous resource, or null if the bucket had none
     *
     * @throws StreamException If the given value is not an accepted resource
     */
    public function _setStream($stream)
    {
        // Reject anything that is neither a live resource nor reported by
        // gettype()/get_resource_type() as a resource of type "Unknown".
        if (false === \is_resource($stream) &&
            ('resource' !== \gettype($stream) ||
             'Unknown' !== \get_resource_type($stream))) {
            throw new StreamException('Try to change the stream resource with an invalid one; '.'given %s.', 1, \gettype($stream));
        }

        // The bucket is empty while the opening is deferred; do not read an
        // undefined index in that case.
        $old = $this->_bucket[self::RESOURCE] ?? null;
        $this->_bucket[self::RESOURCE] = $stream;

        return $old;
    }

    /**
     * Check if the stream is opened.
     */
    public function isOpened(): bool
    {
        return \is_resource($this->getStream());
    }

    /**
     * Set the timeout period.
     *
     * @param int $seconds      Seconds part of the timeout
     * @param int $microseconds Microseconds part of the timeout
     *
     * @return bool Result of `stream_set_timeout()`
     */
    public function setStreamTimeout(int $seconds, int $microseconds = 0): bool
    {
        return \stream_set_timeout($this->getStream(), $seconds, $microseconds);
    }

    /**
     * Whether the opening of the stream has been deferred.
     */
    protected function hasBeenDeferred()
    {
        return $this->_hasBeenDeferred;
    }

    /**
     * Check whether the connection has timed out or not.
     * This is basically a shortcut of `getStreamMetaData` + the `timed_out`
     * index, but the resulting code is more readable.
     */
    public function hasTimedOut(): bool
    {
        $metaData = $this->getStreamMetaData();

        return true === $metaData['timed_out'];
    }

    /**
     * Set blocking/non-blocking mode.
     *
     * @param bool $mode Passed as is to `stream_set_blocking()`
     *
     * @return bool Result of `stream_set_blocking()`
     */
    public function setStreamBlocking(bool $mode): bool
    {
        return \stream_set_blocking($this->getStream(), $mode);
    }

    /**
     * Set stream buffer.
     * Output using fwrite() (or similar function) is normally buffered at 8 Ko.
     * This means that if there are two processes wanting to write to the same
     * output stream, each is paused after 8 Ko of data to allow the other to
     * write.
     *
     * @param int $buffer Buffer size, passed to `stream_set_write_buffer()`
     *
     * @return bool True on success; the stored buffer size is only updated
     *              in that case
     */
    public function setStreamBuffer(int $buffer): bool
    {
        // Zero means success.
        $out = 0 === \stream_set_write_buffer($this->getStream(), $buffer);

        if (true === $out) {
            $this->_bufferSize = $buffer;
        }

        return $out;
    }

    /**
     * Disable stream buffering.
     * Alias of $this->setStreamBuffer(0).
     */
    public function disableStreamBuffer(): bool
    {
        return $this->setStreamBuffer(0);
    }

    /**
     * Get stream buffer size.
     */
    public function getStreamBufferSize(): int
    {
        return $this->_bufferSize;
    }

    /**
     * Get stream wrapper name.
     *
     * @return string The part of the stream name before `://`, or `file`
     *                when the name contains no `://` (including when the
     *                stream has no name yet)
     */
    public function getStreamWrapperName(): string
    {
        // The name is null while the bucket is empty; cast it once so that
        // null is never handed to strpos()/substr().
        $streamName = (string) $this->getStreamName();

        if (false === $pos = \strpos($streamName, '://')) {
            return 'file';
        }

        return \substr($streamName, 0, $pos);
    }

    /**
     * Get stream meta data.
     *
     * @return array Result of `stream_get_meta_data()`
     */
    public function getStreamMetaData(): array
    {
        return \stream_get_meta_data($this->getStream());
    }

    /**
     * Whether this stream is already opened by another handler.
     */
    public function isBorrowing(): bool
    {
        return $this->_borrowing;
    }

    /**
     * Notification callback.
     *
     * Translates a `STREAM_NOTIFY_*` code into the matching listener ID and
     * fires it with the notification details.
     *
     * @param int   $ncode       One of the `STREAM_NOTIFY_*` codes
     * @param int   $severity    Severity of the notification
     * @param mixed $message     Message of the notification
     * @param mixed $code        Code of the message
     * @param mixed $transferred Forwarded as `transferred`
     * @param mixed $max         Forwarded as `max`
     */
    public function _notify(
        int $ncode,
        int $severity,
        $message,
        $code,
        $transferred,
        $max
    ) {
        static $_map = [
            \STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire',
            \STREAM_NOTIFY_AUTH_RESULT   => 'authresult',
            \STREAM_NOTIFY_COMPLETED     => 'complete',
            \STREAM_NOTIFY_CONNECT       => 'connect',
            \STREAM_NOTIFY_FAILURE       => 'failure',
            \STREAM_NOTIFY_MIME_TYPE_IS  => 'mimetype',
            \STREAM_NOTIFY_PROGRESS      => 'progress',
            \STREAM_NOTIFY_REDIRECTED    => 'redirect',
            \STREAM_NOTIFY_RESOLVE       => 'resolve',
            \STREAM_NOTIFY_FILE_SIZE_IS  => 'size',
        ];

        $this->getListener()->fire($_map[$ncode], new EventBucket([
            'code'        => $code,
            'severity'    => $severity,
            'message'     => $message,
            'transferred' => $transferred,
            'max'         => $max,
        ]));
    }

    /**
     * Call the $handler->close() method on each stream in the static stream
     * register.
     * This method does not check the return value of $handler->close(). Thus,
     * if a stream is persistent, the $handler->close() should do nothing. It
     * is a very generic method.
     */
    final public static function _Hoa_Stream()
    {
        foreach (self::$_register as $entry) {
            $entry[self::HANDLER]->close();
        }

        return;
    }

    /**
     * Transform object to string.
     *
     * @return string The stream name, or an empty string when the stream
     *                has no name yet
     */
    public function __toString(): string
    {
        // getStreamName() returns null while the bucket is empty, which is
        // not a valid value for the declared string return type.
        return (string) $this->getStreamName();
    }

    /**
     * Close the stream when destructing.
     */
    public function __destruct()
    {
        if (false === $this->isOpened()) {
            return;
        }

        $this->close();

        return;
    }
}
/**
 * Class \Hoa\Stream\_Protocol.
 *
 * Protocol node for `hoa://Library/Stream`: resolves an ID through the
 * static stream register.
 *
 * @license    New BSD License
 */
class _Protocol extends ProtocolNode
{
    /**
     * Name of this component in the protocol tree.
     *
     * @var string
     */
    protected $_name = 'Stream';

    /**
     * Resolve an ID to the handler registered for the stream of that name.
     *
     * @param string $id Stream name to look up
     *
     * @return mixed Whatever `Stream::getStreamHandler()` returns for `$id`
     */
    public function reachId(string $id)
    {
        $handler = Stream::getStreamHandler($id);

        return $handler;
    }
}
/*
 * Shutdown method: Stream::_Hoa_Stream() is registered to run when the
 * script ends. It calls close() on the handler of every stream that is still
 * in the static register.
 */
\register_shutdown_function([Stream::class, '_Hoa_Stream']);

/**
 * Add the `hoa://Library/Stream` node. Should be used to reach/get an entry
 * in the stream register.
 */
$protocol = Protocol::getInstance();
$protocol['Library'][] = new _Protocol();
|