Fix websockes

This commit is contained in:
Daniil Gentili 2019-04-20 17:18:45 +02:00
parent 56e8ed7add
commit 6636b7dfa6
6 changed files with 126 additions and 27 deletions

2
docs

@ -1 +1 @@
Subproject commit 24c6ea38c0735a265d47c04eb0d1b8d20c1b585f
Subproject commit 2566f9ab998e645217654356792f82c9e131a06e

View File

@ -248,7 +248,6 @@ class DataCenter
continue;
}
$address = $this->dclist[$test][$ipv6][$dc_number]['ip_address'];
$port = $this->dclist[$test][$ipv6][$dc_number]['port'];
@ -276,7 +275,7 @@ class DataCenter
}
$path = $this->settings[$dc_config_number]['test_mode'] ? 'apiws_test' : 'apiws';
$uri = 'tcp://'.$subdomain.'.web.telegram.org:'.$port.'/'.$path;
$uri = 'tcp://' . $subdomain . '.'.'web.telegram.org'.':' . $port . '/' . $path;
} elseif ($combo[1][0] === WsStream::getName()) {
$subdomain = $this->dclist['ssl_subdomains'][preg_replace('/\D+/', '', $dc_number)];
if (strpos($dc_number, '_media') !== false) {

View File

@ -142,6 +142,7 @@ class ReadLoop extends SignalLoop
return $payload;
}
$auth_key_id = yield $buffer->bufferRead(8);
if ($auth_key_id === "\0\0\0\0\0\0\0\0") {
$message_id = yield $buffer->bufferRead(8);
if (!in_array($message_id, [1, 0])) {
@ -162,6 +163,7 @@ class ReadLoop extends SignalLoop
$message_key = yield $buffer->bufferRead(16);
list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $connection->temp_auth_key['auth_key'], false);
$encrypted_data = yield $buffer->bufferRead($payload_length - 24);
$protocol_padding = strlen($encrypted_data) % 16;
if ($protocol_padding) {
$encrypted_data = substr($encrypted_data, 0, -$protocol_padding);

View File

@ -46,6 +46,11 @@ class DefaultStream extends Socket implements RawStreamInterface
{
}
public function getStream()
{
return $this->stream;
}
public function connectAsync(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator
{
if ($ctx->isSecure()) {

View File

@ -18,14 +18,22 @@
namespace danog\MadelineProto\Stream\Transport;
use Amp\Http\Rfc7230;
use Amp\Http\Status;
use Amp\Promise;
use Amp\Websocket\Handshake;
use Amp\Websocket\Options;
use Amp\Websocket\Rfc6455Connection;
use Amp\Socket\ConnectException;
use Amp\Websocket\Client\ConnectionException;
use Amp\Websocket\Client\Handshake;
use Amp\Websocket\Client\Internal\ClientSocket;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\Client\Rfc6455Connection;
use Amp\Websocket\Rfc7692CompressionFactory;
use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\RawStreamInterface;
use danog\MadelineProto\Tools;
use function Amp\Websocket\generateKey;
use function Amp\Websocket\validateAcceptForKey;
/**
* Websocket stream wrapper.
@ -49,24 +57,37 @@ class WsStream implements RawStreamInterface
*/
public function connectAsync(ConnectionContext $ctx, string $header = ''): \Generator
{
$this->stream = yield $ctx->getStream();
$handshake = new Handshake($ctx->getStringUri());
$this->dc = $ctx->getIntDc();
$stream = yield $ctx->getStream();
$resource = $stream->getStream()->getResource();
yield $this->stream->write($handshake->generateRequest());
$this->compressionFactory = new Rfc7692CompressionFactory;
$handshake = new Handshake(str_replace('tcp://', $ctx->isSecure() ? 'ws://' : 'wss://', $ctx->getStringUri()));
$key = generateKey();
yield $stream->write($this->generateRequest($handshake, $key));
$buffer = '';
while (($chunk = yield $this->stream->read()) !== null) {
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
if ($position = \strpos($buffer, "\r\n\r\n")) {
$headerBuffer = \substr($buffer, 0, $position + 4);
$buffer = \substr($buffer, $position + 4);
$headers = $handshake->decodeResponse($headerBuffer);
$this->stream = new Rfc6455Connection($this->stream, $headers, $buffer, new Options());
$headers = $this->handleResponse($headerBuffer, $key);
$client = new Rfc6455Client(
new ClientSocket($resource, $buffer),
$handshake->getOptions(),
true
);
$this->stream = new Rfc6455Connection($client, $headers);
//$this->stream = new Rfc6455Connection($this->stream, $headers, $buffer);
break;
}
}
if (!$this->stream) {
throw new WebSocketException('Failed to read response from server');
throw new ConnectionException('Failed to read response from server');
}
yield $this->write($header);
}
@ -78,18 +99,22 @@ class WsStream implements RawStreamInterface
{
try {
$this->stream->close();
} catch (\Amp\Websocket\ClosedException $e) {
} catch (Exception $e) {
}
}
public function readAsync(): \Generator
{
try {
if (!$this->message || ($data = yield $this->message->read()) === null) {
if (!$this->message || ($data = yield $this->message->buffer()) === null) {
$this->message = yield $this->stream->receive();
$data = yield $this->message->read();
if (!$this->message) {
return null;
}
} catch (\Amp\Websocket\ClosedException $e) {
$data = yield $this->message->buffer();
$this->message = null;
}
} catch (Exception $e) {
if ($e->getReason() !== 'Client closed the underlying TCP connection') {
throw $e;
}
@ -112,6 +137,74 @@ class WsStream implements RawStreamInterface
return $this->stream->sendBinary($data);
}
private function generateRequest(Handshake $handshake, string $key): string
{
$uri = $handshake->getUri();
$headers = $handshake->getHeaders();
$headers['host'] = [$uri->getAuthority()];
$headers['connection'] = ['Upgrade'];
$headers['upgrade'] = ['websocket'];
$headers['sec-websocket-version'] = ['13'];
$headers['sec-websocket-key'] = [$key];
if ($handshake->getOptions()->isCompressionEnabled()) {
$headers['sec-websocket-extensions'] = [$this->compressionFactory->createRequestHeader()];
}
if (($path = $uri->getPath()) === '') {
$path = '/';
}
if (($query = $uri->getQuery()) !== '') {
$path .= '?' . $query;
}
return \sprintf("GET %s HTTP/1.1\r\n%s\r\n", $path, Rfc7230::formatHeaders($headers));
}
private function handleResponse(string $headerBuffer, string $key): array
{
if (\substr($headerBuffer, -4) !== "\r\n\r\n") {
throw new ConnectException('Invalid header provided');
}
$position = \strpos($headerBuffer, "\r\n");
$startLine = \substr($headerBuffer, 0, $position);
if (!\preg_match("/^HTTP\/(1\.[01]) (\d{3}) ([^\x01-\x08\x10-\x19]*)$/i", $startLine, $matches)) {
throw new ConnectException('Invalid response start line: ' . $startLine);
}
$version = $matches[1];
$status = (int) $matches[2];
$reason = $matches[3];
if ($version !== '1.1' || $status !== Status::SWITCHING_PROTOCOLS) {
throw new ConnectionException(
\sprintf('Did not receive switching protocols response: %d %s on DC %d', $status, $reason, $this->dc),
$status
);
}
$headerBuffer = \substr($headerBuffer, $position + 2, -2);
$headers = Rfc7230::parseHeaders($headerBuffer);
$upgrade = $headers['upgrade'][0] ?? '';
if (\strtolower($upgrade) !== 'websocket') {
throw new ConnectionException('Missing "Upgrade: websocket" header');
}
$connection = $headers['connection'][0] ?? '';
if (!\in_array('upgrade', \array_map('trim', \array_map('strtolower', \explode(',', $connection))), true)) {
throw new ConnectionException('Missing "Connection: upgrade" header');
}
$secWebsocketAccept = $headers['sec-websocket-accept'][0] ?? '';
if (!validateAcceptForKey($secWebsocketAccept, $key)) {
throw new ConnectionException('Invalid "Sec-WebSocket-Accept" header');
}
return $headers;
}
final protected function createCompressionContext(array $headers): ?Websocket\CompressionContext
{
$extensions = $headers['sec-websocket-extensions'][0] ?? '';
$extensions = \array_map('trim', \explode(',', $extensions));
foreach ($extensions as $extension) {
if ($compressionContext = $this->compressionFactory->fromServerHeader($extension)) {
return $compressionContext;
}
}
return null;
}
public static function getName(): string
{
return __CLASS__;