From 8ab033e995ad5b364e6dd4e388d72953ce4f34f1 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 14:11:04 +0200 Subject: [PATCH] Add PremadeStream --- .../Stream/Transport/PremadeStream.php | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/danog/MadelineProto/Stream/Transport/PremadeStream.php diff --git a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php new file mode 100644 index 00000000..7fd2dc8d --- /dev/null +++ b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php @@ -0,0 +1,134 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Stream\Transport; + +use Amp\Promise; +use Amp\Socket\Socket; +use danog\MadelineProto\Stream\Async\RawStream; +use danog\MadelineProto\Stream\RawStreamInterface; +use function Amp\Socket\connect; +use function Amp\Socket\cryptoConnect; +use danog\MadelineProto\Stream\ProxyStreamInterface; +use Amp\ByteStream\ClosedException; +use danog\MadelineProto\Stream\ConnectionContext; + +/** + * Premade stream wrapper. + * + * Manages reading data in chunks + * + * @author Daniil Gentili + */ +class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInterface +{ + use RawStream; + private $stream; + + public function __construct() + { + } + + public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise + { + return $this->stream->enableCrypto($tlsContext); + } + + public function getStream() + { + return $this->stream; + } + + public function connectAsync(ConnectionContext $ctx, string $header = ''): \Generator + { + if ($header !== '') { + yield $this->stream->write($header); + } + } + + /** + * Async chunked read. + * + * @return Promise + */ + public function read(): Promise + { + return $this->stream ? $this->stream->read() : new \Amp\Success(null); + } + + /** + * Async write. + * + * @param string $data Data to write + * + * @return Promise + */ + public function write(string $data): Promise + { + if (!$this->stream) { + throw new ClosedException("MadelineProto stream was disconnected"); + } + return $this->stream->write($data); + } + + /** + * Async close. + * + * @return Generator + */ + public function disconnect() + { + try { + if ($this->stream) { + $this->stream->close(); + $this->stream = null; + } + } catch (\Throwable $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } catch (\Exception $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } + } + + public function close() + { + $this->disconnect(); + } + + /** + * {@inheritdoc} + * + * @return \Amp\Socket\Socket + */ + public function getSocket(): \Amp\Socket\Socket + { + return $this->stream; + } + + /** + * {@inheritdoc} + */ + public function setExtra($extra) + { + $this->stream = $extra; + } + public static function getName(): string + { + return __CLASS__; + } +}