diff --git a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php new file mode 100644 index 00000000..7fd2dc8d --- /dev/null +++ b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php @@ -0,0 +1,134 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Stream\Transport; + +use Amp\Promise; +use Amp\Socket\Socket; +use danog\MadelineProto\Stream\Async\RawStream; +use danog\MadelineProto\Stream\RawStreamInterface; +use function Amp\Socket\connect; +use function Amp\Socket\cryptoConnect; +use danog\MadelineProto\Stream\ProxyStreamInterface; +use Amp\ByteStream\ClosedException; +use danog\MadelineProto\Stream\ConnectionContext; + +/** + * Premade stream wrapper. + * + * Manages reading data in chunks + * + * @author Daniil Gentili + */ +class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInterface +{ + use RawStream; + private $stream; + + public function __construct() + { + } + + public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise + { + return $this->stream->enableCrypto($tlsContext); + } + + public function getStream() + { + return $this->stream; + } + + public function connectAsync(ConnectionContext $ctx, string $header = ''): \Generator + { + if ($header !== '') { + yield $this->stream->write($header); + } + } + + /** + * Async chunked read. + * + * @return Promise + */ + public function read(): Promise + { + return $this->stream ? $this->stream->read() : new \Amp\Success(null); + } + + /** + * Async write. + * + * @param string $data Data to write + * + * @return Promise + */ + public function write(string $data): Promise + { + if (!$this->stream) { + throw new ClosedException("MadelineProto stream was disconnected"); + } + return $this->stream->write($data); + } + + /** + * Async close. + * + * @return Generator + */ + public function disconnect() + { + try { + if ($this->stream) { + $this->stream->close(); + $this->stream = null; + } + } catch (\Throwable $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } catch (\Exception $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } + } + + public function close() + { + $this->disconnect(); + } + + /** + * {@inheritdoc} + * + * @return \Amp\Socket\Socket + */ + public function getSocket(): \Amp\Socket\Socket + { + return $this->stream; + } + + /** + * {@inheritdoc} + */ + public function setExtra($extra) + { + $this->stream = $extra; + } + public static function getName(): string + { + return __CLASS__; + } +}