. * * @author Daniil Gentili * @copyright 2016-2020 Daniil Gentili * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 * * @link https://docs.madelineproto.xyz MadelineProto documentation */ namespace danog\MadelineProto\Stream\Transport; use Amp\ByteStream\ClosedException; use Amp\CancellationToken; use Amp\Promise; use Amp\Socket\ClientTlsContext; use Amp\Socket\Connector; use Amp\Socket\EncryptableSocket; use Amp\Socket\Socket; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ProxyStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface; use function Amp\Socket\connector; /** * Default stream wrapper. * * Manages reading data in chunks * * @author Daniil Gentili */ class DefaultStream implements RawStreamInterface, ProxyStreamInterface { use RawStream; /** * Socket. * * @var EncryptableSocket */ protected $stream; /** * Connector. * * @var Connector */ private $connector; public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise { return $this->stream->setupTls($cancellationToken); } public function getStream() { return $this->stream; } public function connect(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator { $ctx = $ctx->getCtx(); $uri = $ctx->getUri(); $secure = $ctx->isSecure(); if ($secure) { $ctx->setSocketContext($ctx->getSocketContext()->withTlsContext(new ClientTlsContext($uri->getHost()))); } $this->stream = (yield ($this->connector ?? connector())->connect((string) $uri, $ctx->getSocketContext(), $ctx->getCancellationToken())); if ($secure) { yield $this->stream->setupTls(); } yield $this->stream->write($header); } /** * Async chunked read. * * @return Promise */ public function read(): Promise { return $this->stream ? $this->stream->read() : new \Amp\Success(null); } /** * Async write. * * @param string $data Data to write * * @return Promise */ public function write(string $data): Promise { if (!$this->stream) { throw new ClosedException("MadelineProto stream was disconnected"); } return $this->stream->write($data); } /** * Close. * * @return void */ public function disconnect() { try { if ($this->stream) { $this->stream->close(); $this->stream = null; } } catch (\Throwable $e) { \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); } } /** * Close. * * @return void */ public function close() { $this->disconnect(); } /** * {@inheritdoc} * * @return EncryptableSocket */ public function getSocket(): EncryptableSocket { return $this->stream; } public function setExtra($extra) { $this->connector = $extra; } public static function getName(): string { return __CLASS__; } }