diff --git a/docs b/docs index 24c6ea38..2566f9ab 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 24c6ea38c0735a265d47c04eb0d1b8d20c1b585f +Subproject commit 2566f9ab998e645217654356792f82c9e131a06e diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index d68c7450..535dc8ba 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -97,9 +97,9 @@ class DataCenter return true; } catch (\Throwable $e) { - $this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR); + $this->API->logger->logger('Connection failed: ' . $e->getMessage(), \danog\MadelineProto\Logger::ERROR); } catch (\Exception $e) { - $this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR); + $this->API->logger->logger('Connection failed: ' . $e->getMessage(), \danog\MadelineProto\Logger::ERROR); } } @@ -203,7 +203,7 @@ class DataCenter throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['proxy_class_invalid']); } if ($proxy === ObfuscatedStream::getName() && in_array(strlen($extra['secret']), [17, 34])) { - $combos []= [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [$proxy, $extra], [IntermediatePaddedStream::getName(), []]]; + $combos[] = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [$proxy, $extra], [IntermediatePaddedStream::getName(), []]]; } foreach ($combos as $k => $orig) { $combo = []; @@ -248,7 +248,6 @@ class DataCenter continue; } - $address = $this->dclist[$test][$ipv6][$dc_number]['ip_address']; $port = $this->dclist[$test][$ipv6][$dc_number]['port']; @@ -262,11 +261,11 @@ class DataCenter } $path = $this->settings[$dc_config_number]['test_mode'] ? 'apiw_test1' : 'apiw1'; - $uri = 'tcp://'.$subdomain.'.web.telegram.org:'.$port.'/'.$path; + $uri = 'tcp://' . $subdomain . '.web.telegram.org:' . $port . '/' . $path; } elseif ($stream === HttpStream::getName()) { - $uri = 'tcp://'.$address.':'.$port.'/api'; + $uri = 'tcp://' . $address . ':' . $port . '/api'; } else { - $uri = 'tcp://'.$address.':'.$port; + $uri = 'tcp://' . $address . ':' . $port; } if ($combo[1][0] === WssStream::getName()) { @@ -276,7 +275,7 @@ class DataCenter } $path = $this->settings[$dc_config_number]['test_mode'] ? 'apiws_test' : 'apiws'; - $uri = 'tcp://'.$subdomain.'.web.telegram.org:'.$port.'/'.$path; + $uri = 'tcp://' . $subdomain . '.'.'web.telegram.org'.':' . $port . '/' . $path; } elseif ($combo[1][0] === WsStream::getName()) { $subdomain = $this->dclist['ssl_subdomains'][preg_replace('/\D+/', '', $dc_number)]; if (strpos($dc_number, '_media') !== false) { @@ -285,7 +284,7 @@ class DataCenter $path = $this->settings[$dc_config_number]['test_mode'] ? 'apiws_test' : 'apiws'; //$uri = 'tcp://' . $subdomain . '.web.telegram.org:' . $port . '/' . $path; - $uri = 'tcp://'.$address.':'.$port.'/'.$path; + $uri = 'tcp://' . $address . ':' . $port . '/' . $path; } /** @var $ctx \danog\MadelineProto\Stream\ConnectionContext */ @@ -304,8 +303,8 @@ class DataCenter } } - if (isset($this->dclist[$test][$ipv6][$dc_number.'_bk']['ip_address'])) { - $ctxs = array_merge($ctxs, $this->generate_contexts($dc_number.'_bk')); + if (isset($this->dclist[$test][$ipv6][$dc_number . '_bk']['ip_address'])) { + $ctxs = array_merge($ctxs, $this->generate_contexts($dc_number . '_bk')); } if (empty($ctxs)) { diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index 09ab3ca2..bc2c328e 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -142,6 +142,7 @@ class ReadLoop extends SignalLoop return $payload; } $auth_key_id = yield $buffer->bufferRead(8); + if ($auth_key_id === "\0\0\0\0\0\0\0\0") { $message_id = yield $buffer->bufferRead(8); if (!in_array($message_id, [1, 0])) { @@ -162,6 +163,7 @@ class ReadLoop extends SignalLoop $message_key = yield $buffer->bufferRead(16); list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $connection->temp_auth_key['auth_key'], false); $encrypted_data = yield $buffer->bufferRead($payload_length - 24); + $protocol_padding = strlen($encrypted_data) % 16; if ($protocol_padding) { $encrypted_data = substr($encrypted_data, 0, -$protocol_padding); diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 0b493ffa..a3c04b1b 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -98,7 +98,7 @@ class WriteLoop extends ResumableSignalLoop $pad_length = -$length & 15; $pad_length += 16 * $this->random_int($modulus = 16); - + $pad = $this->random($pad_length); $buffer = yield $connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length); diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index 46feef9f..9c2a0e5d 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -46,6 +46,11 @@ class DefaultStream extends Socket implements RawStreamInterface { } + public function getStream() + { + return $this->stream; + } + public function connectAsync(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator { if ($ctx->isSecure()) { diff --git a/src/danog/MadelineProto/Stream/Transport/WsStream.php b/src/danog/MadelineProto/Stream/Transport/WsStream.php index 570235f2..582c5627 100644 --- a/src/danog/MadelineProto/Stream/Transport/WsStream.php +++ b/src/danog/MadelineProto/Stream/Transport/WsStream.php @@ -18,14 +18,22 @@ namespace danog\MadelineProto\Stream\Transport; +use Amp\Http\Rfc7230; +use Amp\Http\Status; use Amp\Promise; -use Amp\Websocket\Handshake; -use Amp\Websocket\Options; -use Amp\Websocket\Rfc6455Connection; +use Amp\Socket\ConnectException; +use Amp\Websocket\Client\ConnectionException; +use Amp\Websocket\Client\Handshake; +use Amp\Websocket\Client\Internal\ClientSocket; +use Amp\Websocket\Rfc6455Client; +use Amp\Websocket\Client\Rfc6455Connection; +use Amp\Websocket\Rfc7692CompressionFactory; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\RawStreamInterface; use danog\MadelineProto\Tools; +use function Amp\Websocket\generateKey; +use function Amp\Websocket\validateAcceptForKey; /** * Websocket stream wrapper. @@ -49,24 +57,37 @@ class WsStream implements RawStreamInterface */ public function connectAsync(ConnectionContext $ctx, string $header = ''): \Generator { - $this->stream = yield $ctx->getStream(); - $handshake = new Handshake($ctx->getStringUri()); + $this->dc = $ctx->getIntDc(); + $stream = yield $ctx->getStream(); + $resource = $stream->getStream()->getResource(); - yield $this->stream->write($handshake->generateRequest()); + $this->compressionFactory = new Rfc7692CompressionFactory; + + $handshake = new Handshake(str_replace('tcp://', $ctx->isSecure() ? 'ws://' : 'wss://', $ctx->getStringUri())); + + $key = generateKey(); + yield $stream->write($this->generateRequest($handshake, $key)); $buffer = ''; - while (($chunk = yield $this->stream->read()) !== null) { + while (($chunk = yield $stream->read()) !== null) { $buffer .= $chunk; if ($position = \strpos($buffer, "\r\n\r\n")) { $headerBuffer = \substr($buffer, 0, $position + 4); $buffer = \substr($buffer, $position + 4); - $headers = $handshake->decodeResponse($headerBuffer); - $this->stream = new Rfc6455Connection($this->stream, $headers, $buffer, new Options()); + $headers = $this->handleResponse($headerBuffer, $key); + + $client = new Rfc6455Client( + new ClientSocket($resource, $buffer), + $handshake->getOptions(), + true + ); + $this->stream = new Rfc6455Connection($client, $headers); + //$this->stream = new Rfc6455Connection($this->stream, $headers, $buffer); break; } } if (!$this->stream) { - throw new WebSocketException('Failed to read response from server'); + throw new ConnectionException('Failed to read response from server'); } yield $this->write($header); } @@ -78,18 +99,22 @@ class WsStream implements RawStreamInterface { try { $this->stream->close(); - } catch (\Amp\Websocket\ClosedException $e) { + } catch (Exception $e) { } } public function readAsync(): \Generator { try { - if (!$this->message || ($data = yield $this->message->read()) === null) { + if (!$this->message || ($data = yield $this->message->buffer()) === null) { $this->message = yield $this->stream->receive(); - $data = yield $this->message->read(); + if (!$this->message) { + return null; + } + $data = yield $this->message->buffer(); + $this->message = null; } - } catch (\Amp\Websocket\ClosedException $e) { + } catch (Exception $e) { if ($e->getReason() !== 'Client closed the underlying TCP connection') { throw $e; } @@ -112,6 +137,74 @@ class WsStream implements RawStreamInterface return $this->stream->sendBinary($data); } + private function generateRequest(Handshake $handshake, string $key): string + { + $uri = $handshake->getUri(); + $headers = $handshake->getHeaders(); + $headers['host'] = [$uri->getAuthority()]; + $headers['connection'] = ['Upgrade']; + $headers['upgrade'] = ['websocket']; + $headers['sec-websocket-version'] = ['13']; + $headers['sec-websocket-key'] = [$key]; + if ($handshake->getOptions()->isCompressionEnabled()) { + $headers['sec-websocket-extensions'] = [$this->compressionFactory->createRequestHeader()]; + } + if (($path = $uri->getPath()) === '') { + $path = '/'; + } + if (($query = $uri->getQuery()) !== '') { + $path .= '?' . $query; + } + return \sprintf("GET %s HTTP/1.1\r\n%s\r\n", $path, Rfc7230::formatHeaders($headers)); + } + private function handleResponse(string $headerBuffer, string $key): array + { + if (\substr($headerBuffer, -4) !== "\r\n\r\n") { + throw new ConnectException('Invalid header provided'); + } + $position = \strpos($headerBuffer, "\r\n"); + $startLine = \substr($headerBuffer, 0, $position); + if (!\preg_match("/^HTTP\/(1\.[01]) (\d{3}) ([^\x01-\x08\x10-\x19]*)$/i", $startLine, $matches)) { + throw new ConnectException('Invalid response start line: ' . $startLine); + } + $version = $matches[1]; + $status = (int) $matches[2]; + $reason = $matches[3]; + + + if ($version !== '1.1' || $status !== Status::SWITCHING_PROTOCOLS) { + throw new ConnectionException( + \sprintf('Did not receive switching protocols response: %d %s on DC %d', $status, $reason, $this->dc), + $status + ); + } + $headerBuffer = \substr($headerBuffer, $position + 2, -2); + $headers = Rfc7230::parseHeaders($headerBuffer); + $upgrade = $headers['upgrade'][0] ?? ''; + if (\strtolower($upgrade) !== 'websocket') { + throw new ConnectionException('Missing "Upgrade: websocket" header'); + } + $connection = $headers['connection'][0] ?? ''; + if (!\in_array('upgrade', \array_map('trim', \array_map('strtolower', \explode(',', $connection))), true)) { + throw new ConnectionException('Missing "Connection: upgrade" header'); + } + $secWebsocketAccept = $headers['sec-websocket-accept'][0] ?? ''; + if (!validateAcceptForKey($secWebsocketAccept, $key)) { + throw new ConnectionException('Invalid "Sec-WebSocket-Accept" header'); + } + return $headers; + } + final protected function createCompressionContext(array $headers): ?Websocket\CompressionContext + { + $extensions = $headers['sec-websocket-extensions'][0] ?? ''; + $extensions = \array_map('trim', \explode(',', $extensions)); + foreach ($extensions as $extension) { + if ($compressionContext = $this->compressionFactory->fromServerHeader($extension)) { + return $compressionContext; + } + } + return null; + } public static function getName(): string { return __CLASS__;