Update websocket logic

This commit is contained in:
Daniil Gentili 2019-12-13 15:37:57 +01:00
parent 89bb80285b
commit a7b0a16a01
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
13 changed files with 92 additions and 154 deletions

View File

@ -32,7 +32,8 @@
"amphp/file": "^1", "amphp/file": "^1",
"amphp/byte-stream": "^1.6", "amphp/byte-stream": "^1.6",
"danog/dns-over-https": "^0.2", "danog/dns-over-https": "^0.2",
"amphp/http-client-cookies": "dev-master" "amphp/http-client-cookies": "dev-master",
"amphp/uri": "dev-master"
}, },
"require-dev": { "require-dev": {
"phpdocumentor/reflection-docblock": "^4.3", "phpdocumentor/reflection-docblock": "^4.3",

View File

@ -18,6 +18,7 @@
*/ */
namespace danog\MadelineProto; namespace danog\MadelineProto;
use Amp\CancellationToken; use Amp\CancellationToken;
use Amp\MultiReasonException; use Amp\MultiReasonException;
use Amp\NullCancellationToken; use Amp\NullCancellationToken;
@ -28,20 +29,22 @@ use Amp\Socket\Connector;
class ContextConnector implements Connector class ContextConnector implements Connector
{ {
private $dataCenter; private $dataCenter;
private $logger;
private $fromDns = false; private $fromDns = false;
public function __construct(DataCenter $dataCenter, bool $fromDns = false) public function __construct(DataCenter $dataCenter, bool $fromDns = false)
{ {
$this->dataCenter = $dataCenter; $this->dataCenter = $dataCenter;
$this->fromDns = false; $this->fromDns = false;
$this->logger = $dataCenter->getAPI()->getLogger();
} }
public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise
{ {
return Tools::call(function () use ($uri, $ctx, $token) { return Tools::call((function () use ($uri, $ctx, $token) {
$ctx = $ctx ?? new ConnectContext; $ctx = $ctx ?? new ConnectContext;
$token = $token ?? new NullCancellationToken; $token = $token ?? new NullCancellationToken;
$ctxs = $this->datacenter->generateContexts(0, $uri, $ctx); $ctxs = $this->dataCenter->generateContexts(0, $uri, $ctx);
if (empty($ctxs)) { if (empty($ctxs)) {
throw new Exception("No contexts for raw connection to URI $uri"); throw new Exception("No contexts for raw connection to URI $uri");
} }
@ -51,24 +54,23 @@ class ContextConnector implements Connector
$ctx->setIsDns($this->fromDns); $ctx->setIsDns($this->fromDns);
$ctx->setCancellationToken($token); $ctx->setCancellationToken($token);
$result = yield $ctx->getStream(); $result = yield $ctx->getStream();
$this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING); $this->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING);
return $result->getSocket(); return $result->getSocket();
} catch (\Throwable $e) { } catch (\Throwable $e) {
if (\MADELINEPROTO_TEST === 'pony') { if (\MADELINEPROTO_TEST === 'pony') {
throw $e; throw $e;
} }
$this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR);
if ($e instanceof MultiReasonException) { if ($e instanceof MultiReasonException) {
foreach ($e->getReasons() as $reason) { foreach ($e->getReasons() as $reason) {
$this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR);
} }
} }
} }
} }
throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
}); throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
})());
} }
} }

View File

@ -31,6 +31,7 @@ use Amp\Http\Client\Cookie\CookieJar;
use Amp\Http\Client\Cookie\InMemoryCookieJar; use Amp\Http\Client\Cookie\InMemoryCookieJar;
use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Socket\ConnectContext; use Amp\Socket\ConnectContext;
use Amp\Websocket\Client\Rfc6455Connector; use Amp\Websocket\Client\Rfc6455Connector;
use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\PermAuthKey;
@ -513,17 +514,7 @@ class DataCenter
$stream[1] = new DoHConnector($this, $ctx); $stream[1] = new DoHConnector($this, $ctx);
} }
if (\in_array($stream[0], [WsStream::class, WssStream::class]) && $stream[1] === []) { if (\in_array($stream[0], [WsStream::class, WssStream::class]) && $stream[1] === []) {
$stream[1] = new Rfc6455Connector( $stream[1] = new Rfc6455Connector($this->HTTPClient);
(new HttpClientBuilder)
->usingPool(
new UnlimitedConnectionPool(
new DefaultConnectionFactory(
new DoHConnector($this, $ctx)
)
)
)
->build()
);
} }
$ctx->addStream(...$stream); $ctx->addStream(...$stream);
} }
@ -547,6 +538,16 @@ class DataCenter
return $ctxs; return $ctxs;
} }
/**
* Get main API.
*
* @return MTProto
*/
public function getAPI(): MTProto
{
return $this->API;
}
/** /**
* Get async HTTP client. * Get async HTTP client.
* *
@ -594,7 +595,7 @@ class DataCenter
*/ */
public function fileGetContents(string $url): \Generator public function fileGetContents(string $url): \Generator
{ {
return yield (yield $this->getHTTPClient()->request($url))->getBody()->buffer(); return yield (yield $this->getHTTPClient()->request(new Request($url)))->getBody()->buffer();
} }
/** /**

View File

@ -37,13 +37,13 @@ use function Amp\Socket\Internal\parseUri;
class DoHConnector implements Connector class DoHConnector implements Connector
{ {
/** /**
* Datacenter instance * Datacenter instance.
* *
* @property DataCenter $dataCenter * @property DataCenter $dataCenter
*/ */
private $dataCenter; private $dataCenter;
/** /**
* Connection context * Connection context.
* *
* @var ConnectionContext * @var ConnectionContext
*/ */
@ -56,7 +56,7 @@ class DoHConnector implements Connector
public function connect(string $uri, ?ConnectContext $socketContext = null, ?CancellationToken $token = null): Promise public function connect(string $uri, ?ConnectContext $socketContext = null, ?CancellationToken $token = null): Promise
{ {
return Tools::call(function () use ($uri, $socketContext, $token) { return Tools::call((function () use ($uri, $socketContext, $token) {
$socketContext = $socketContext ?? new ConnectContext; $socketContext = $socketContext ?? new ConnectContext;
$token = $token ?? new NullCancellationToken; $token = $token ?? new NullCancellationToken;
@ -115,7 +115,7 @@ class DoHConnector implements Connector
} }
} }
} }
$flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT; $flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT;
$timeout = $socketContext->getConnectTimeout(); $timeout = $socketContext->getConnectTimeout();
foreach ($uris as $builtUri) { foreach ($uris as $builtUri) {
@ -176,7 +176,6 @@ class DoHConnector implements Connector
// This is reached if either all URIs failed or the maximum number of attempts is reached. // This is reached if either all URIs failed or the maximum number of attempts is reached.
/** @noinspection PhpUndefinedVariableInspection */ /** @noinspection PhpUndefinedVariableInspection */
throw $e; throw $e;
})());
});
} }
} }

View File

@ -4067,12 +4067,21 @@ class InternalDoc extends APIFactory
{ {
return $this->__call(__FUNCTION__, [$extra]); return $this->__call(__FUNCTION__, [$extra]);
} }
/**
* Get logger.
*
* @return Logger
*/
public function getLogger(array $extra = []): danog\MadelineProto\Logger
{
return $this->__call(__FUNCTION__, [$extra]);
}
/** /**
* Get async HTTP client. * Get async HTTP client.
* *
* @return \Amp\Artax\Client * @return \Amp\Http\Client\DelegateHttpClient
*/ */
public function getHTTPClient(array $extra = []): Amp\Artax\Client public function getHTTPClient(array $extra = []): Amp\Http\Client\DelegateHttpClient
{ {
return $this->__call(__FUNCTION__, [$extra]); return $this->__call(__FUNCTION__, [$extra]);
} }

View File

@ -637,10 +637,19 @@ class MTProto extends AsyncConstruct implements TLCallback
* *
* @return TL * @return TL
*/ */
public function getTL(): TL public function getTL(): \danog\MadelineProto\TL\TL
{ {
return $this->TL; return $this->TL;
} }
/**
* Get logger.
*
* @return Logger
*/
public function getLogger(): Logger
{
return $this->logger;
}
/** /**
* Get async HTTP client. * Get async HTTP client.

View File

@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream;
use Amp\CancellationToken; use Amp\CancellationToken;
use Amp\Socket\ConnectContext; use Amp\Socket\ConnectContext;
use Amp\Uri\Uri;
use danog\MadelineProto\Exception; use danog\MadelineProto\Exception;
use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream;
use danog\MadelineProto\Stream\Transport\DefaultStream; use danog\MadelineProto\Stream\Transport\DefaultStream;

View File

@ -102,7 +102,7 @@ class FullStream implements BufferedStreamInterface, MTProtoBufferInterface
$this->in_seq_no++; $this->in_seq_no++;
$in_seq_no = \unpack('V', yield $buffer->bufferRead(4))[1]; $in_seq_no = \unpack('V', yield $buffer->bufferRead(4))[1];
if ($in_seq_no != $this->in_seq_no) { if ($in_seq_no != $this->in_seq_no) {
throw new Exception('Incoming seq_no mismatch'); throw new \danog\MadelineProto\Exception('Incoming seq_no mismatch');
} }
return $buffer; return $buffer;

View File

@ -20,7 +20,6 @@ namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket; use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Stream\Async\Buffer; use danog\MadelineProto\Stream\Async\Buffer;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
@ -171,7 +170,7 @@ class ObfuscatedStream implements BufferedProxyStreamInterface
$this->append_after = 0; $this->append_after = 0;
$this->append = ''; $this->append = '';
throw new Exception('Tried to send too much out of frame data, cannot append'); throw new \danog\MadelineProto\Exception('Tried to send too much out of frame data, cannot append');
} }
} }

View File

@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream\Proxy;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientTlsContext; use Amp\Socket\ClientTlsContext;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;

View File

@ -21,6 +21,7 @@ namespace danog\MadelineProto\Stream\Transport;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\CancellationToken; use Amp\CancellationToken;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\EncryptableSocket; use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket; use Amp\Socket\Socket;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
@ -67,7 +68,18 @@ class DefaultStream implements
public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator
{ {
$this->stream = yield ($this->connector ?? connector())($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); $ctx = $ctx->getCtx();
$uri = $ctx->getUri();
$secure = $ctx->isSecure();
if ($secure) {
$ctx->setSocketContext(
$ctx->getSocketContext()->withTlsContext(
new ClientTlsContext($uri->getHost())
)
);
}
$this->stream = yield ($this->connector ?? connector())->connect((string) $uri, $ctx->getSocketContext(), $ctx->getCancellationToken());
if ($ctx->isSecure()) { if ($ctx->isSecure()) {
yield $this->stream->setupTls(); yield $this->stream->setupTls();
} }

View File

@ -18,28 +18,22 @@
namespace danog\MadelineProto\Stream\Transport; namespace danog\MadelineProto\Stream\Transport;
use Amp\Http\Rfc7230;
use Amp\Http\Status;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ConnectException; use Amp\Socket\EncryptableSocket;
use Amp\Websocket\Client\ConnectionException;
use Amp\Websocket\Client\Handshake; use Amp\Websocket\Client\Handshake;
use Amp\Websocket\Client\Internal\ClientSocket;
use Amp\Websocket\Client\Rfc6455Connection;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\Rfc7692CompressionFactory;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\ProxyStreamInterface;
use danog\MadelineProto\Stream\RawStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface;
use function Amp\Websocket\generateKey;
use function Amp\Websocket\validateAcceptForKey; use function Amp\Websocket\Client\connector;
/** /**
* Websocket stream wrapper. * Websocket stream wrapper.
* *
* @author Daniil Gentili <daniil@daniil.it> * @author Daniil Gentili <daniil@daniil.it>
*/ */
class WsStream implements RawStreamInterface class WsStream implements RawStreamInterface, ProxyStreamInterface
{ {
use RawStream; use RawStream;
@ -55,6 +49,12 @@ class WsStream implements RawStreamInterface
* @var Message * @var Message
*/ */
private $message; private $message;
/**
* Websocket Connector.
*
* @var Connector
*/
private $connector;
/** /**
* Connect to stream. * Connect to stream.
@ -66,38 +66,11 @@ class WsStream implements RawStreamInterface
public function connectGenerator(ConnectionContext $ctx, string $header = ''): \Generator public function connectGenerator(ConnectionContext $ctx, string $header = ''): \Generator
{ {
$this->dc = $ctx->getIntDc(); $this->dc = $ctx->getIntDc();
$stream = yield $ctx->getStream();
$resource = $stream->getStream()->getResource();
$this->compressionFactory = new Rfc7692CompressionFactory();
$handshake = new Handshake(\str_replace('tcp://', $ctx->isSecure() ? 'ws://' : 'wss://', $ctx->getStringUri())); $handshake = new Handshake(\str_replace('tcp://', $ctx->isSecure() ? 'ws://' : 'wss://', $ctx->getStringUri()));
$key = generateKey(); $this->stream = yield ($this->connector ?? connector())->connect($handshake, $ctx->getCancellationToken());
yield $stream->write($this->generateRequest($handshake, $key));
$buffer = '';
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
if ($position = \strpos($buffer, "\r\n\r\n")) {
$headerBuffer = \substr($buffer, 0, $position + 4);
$buffer = \substr($buffer, $position + 4);
$headers = $this->handleResponse($headerBuffer, $key);
$client = new Rfc6455Client(
new ClientSocket($resource, $buffer),
$handshake->getOptions(),
true
);
$this->stream = new Rfc6455Connection($client, $headers);
//$this->stream = new Rfc6455Connection($this->stream, $headers, $buffer);
break;
}
}
if (!$this->stream) {
throw new ConnectionException('Failed to read response from server');
}
yield $this->write($header); yield $this->write($header);
} }
@ -108,7 +81,7 @@ class WsStream implements RawStreamInterface
{ {
try { try {
$this->stream->close(); $this->stream->close();
} catch (Exception $e) { } catch (\Throwable $e) {
} }
} }
@ -123,7 +96,7 @@ class WsStream implements RawStreamInterface
$data = yield $this->message->buffer(); $data = yield $this->message->buffer();
$this->message = null; $this->message = null;
} }
} catch (Exception $e) { } catch (\Exception $e) {
if ($e->getReason() !== 'Client closed the underlying TCP connection') { if ($e->getReason() !== 'Client closed the underlying TCP connection') {
throw $e; throw $e;
} }
@ -145,88 +118,19 @@ class WsStream implements RawStreamInterface
return $this->stream->sendBinary($data); return $this->stream->sendBinary($data);
} }
private function generateRequest(Handshake $handshake, string $key): string
{
$uri = $handshake->getUri();
$headers = $handshake->getHeaders();
$headers['host'] = [$uri->getAuthority()];
$headers['connection'] = ['Upgrade'];
$headers['upgrade'] = ['websocket'];
$headers['sec-websocket-version'] = ['13'];
$headers['sec-websocket-key'] = [$key];
if ($handshake->getOptions()->isCompressionEnabled()) {
$headers['sec-websocket-extensions'] = [$this->compressionFactory->createRequestHeader()];
}
if (($path = $uri->getPath()) === '') {
$path = '/';
}
if (($query = $uri->getQuery()) !== '') {
$path .= '?'.$query;
}
return \sprintf("GET %s HTTP/1.1\r\n%s\r\n", $path, Rfc7230::formatHeaders($headers));
}
private function handleResponse(string $headerBuffer, string $key): array
{
if (\substr($headerBuffer, -4) !== "\r\n\r\n") {
throw new ConnectException('Invalid header provided');
}
$position = \strpos($headerBuffer, "\r\n");
$startLine = \substr($headerBuffer, 0, $position);
if (!\preg_match("/^HTTP\/(1\.[01]) (\d{3}) ([^\x01-\x08\x10-\x19]*)$/i", $startLine, $matches)) {
throw new ConnectException('Invalid response start line: '.$startLine);
}
$version = $matches[1];
$status = (int) $matches[2];
$reason = $matches[3];
if ($version !== '1.1' || $status !== Status::SWITCHING_PROTOCOLS) {
throw new ConnectionException(
\sprintf('Did not receive switching protocols response: %d %s on DC %d', $status, $reason, $this->dc),
$status
);
}
$headerBuffer = \substr($headerBuffer, $position + 2, -2);
$headers = Rfc7230::parseHeaders($headerBuffer);
$upgrade = $headers['upgrade'][0] ?? '';
if (\strtolower($upgrade) !== 'websocket') {
throw new ConnectionException('Missing "Upgrade: websocket" header');
}
$connection = $headers['connection'][0] ?? '';
if (!\in_array('upgrade', \array_map('trim', \array_map('strtolower', \explode(',', $connection))), true)) {
throw new ConnectionException('Missing "Connection: upgrade" header');
}
$secWebsocketAccept = $headers['sec-websocket-accept'][0] ?? '';
if (!validateAcceptForKey($secWebsocketAccept, $key)) {
throw new ConnectionException('Invalid "Sec-WebSocket-Accept" header');
}
return $headers;
}
final protected function createCompressionContext(array $headers): ?Websocket\CompressionContext
{
$extensions = $headers['sec-websocket-extensions'][0] ?? '';
$extensions = \array_map('trim', \explode(',', $extensions));
foreach ($extensions as $extension) {
if ($compressionContext = $this->compressionFactory->fromServerHeader($extension)) {
return $compressionContext;
}
}
return null;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return \Amp\Socket\Socket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }
public function setExtra($extra)
{
$this->connector = $extra;
}
public static function getName(): string public static function getName(): string
{ {

View File

@ -665,9 +665,9 @@ trait Tools
public static function isArrayOrAlike($var): bool public static function isArrayOrAlike($var): bool
{ {
return \is_array($var) || return \is_array($var) ||
($var instanceof ArrayAccess && ($var instanceof \ArrayAccess &&
$var instanceof Traversable && $var instanceof \Traversable &&
$var instanceof Countable); $var instanceof \Countable);
} }
/** /**