Implement async HTTP request module, fix TLS handshakes in proxies, general proxy improvements and cleanups

This commit is contained in:
Daniil Gentili 2019-05-10 20:01:39 +02:00
parent d26fa7ef12
commit a890789a5d
31 changed files with 251 additions and 1485 deletions

View File

@ -54,8 +54,10 @@ class EventHandler extends \danog\MadelineProto\EventHandler
}
}
}
$settings = ['logger' => ['logger_level' => 5]];
$MadelineProto = new \danog\MadelineProto\API('bot.madeline', ['logger' => ['logger_level' => 5]]);
$MadelineProto = new \danog\MadelineProto\API('bot.madeline', $settings);
$MadelineProto->async(true);
$MadelineProto->loop(function () use ($MadelineProto) {
yield $MadelineProto->start();

View File

@ -28,7 +28,8 @@
"amphp/file": "dev-master#5a69fca406ac5fd220de0aa68c887bc8046eb93c as 0.3.3",
"amphp/uri": "dev-master#f3195b163275383909ded7770a11d8eb865cbc86 as 0.1.3",
"amphp/websocket": "dev-master",
"amphp/websocket-client": "dev-master"
"amphp/websocket-client": "dev-master",
"amphp/artax": "^3.0"
},
"require-dev": {
"phpdocumentor/reflection-docblock": "^3.1",
@ -51,15 +52,6 @@
},
"files": [
"src/BigIntegor.php",
"src/Socket.php",
"src/Collectable.php",
"src/Threaded.php",
"src/Volatile.php",
"src/Thread.php",
"src/Worker.php",
"src/Pool.php",
"src/HttpProxy.php",
"src/SocksProxy.php",
"src/YieldReturnValue.php"
]
}

2
docs

@ -1 +1 @@
Subproject commit 8dee62b19a4e2c8d03f5602711587403b5f14d75
Subproject commit e8e22ca165ac1bc8a7435d5d55b3e94b1883d9d8

View File

@ -1,8 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
interface Collectable
{
public function isGarbage();
}
}

View File

@ -1,215 +0,0 @@
<?php
/**
* CustomHTTPProxy module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
class CustomHTTPProxy implements \danog\MadelineProto\Proxy
{
private $sock;
private $protocol;
private $timeout = ['sec' => 0, 'usec' => 0];
private $domain;
private $type;
private $options = [];
private $use_connect = false;
private $use_ssl = false;
public function __construct($domain, $type, $protocol)
{
$this->domain = $domain;
$this->type = $type;
$this->protocol = $protocol === PHP_INT_MAX ? 'tls' : 'tcp';
if ($protocol === PHP_INT_MAX) { /* https */
$this->use_connect = $this->use_ssl = true;
} elseif ($protocol !== PHP_INT_MAX - 1) { /* http */
$this->use_connect = true;
}
}
public function __destruct()
{
if ($this->sock !== null) {
fclose($this->sock);
$this->sock = null;
}
}
public function accept()
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function bind($address, $port = 0)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function close()
{
fclose($this->sock);
$this->sock = null;
}
public function connect($address, $port = 0)
{
$errno = 0;
$errstr = '';
if (isset($this->options['host']) && isset($this->options['port'])) {
$this->sock = @fsockopen($this->options['host'], $this->options['port'], $errno, $errstr, $this->timeout['sec'] + ($this->timeout['usec'] / 1000000));
} else {
$this->sock = @fsockopen($address, $port, $errno, $errstr, $this->timeout['sec'] + ($this->timeout['usec'] / 1000000));
}
stream_set_timeout($this->sock, $this->timeout['sec'], $this->timeout['usec']);
if (isset($this->options['host']) && isset($this->options['port']) &&
true === $this->use_connect) {
if ($this->domain === AF_INET6 && strpos($address, ':') !== false) {
$address = '['.$address.']';
}
fwrite($this->sock, 'CONNECT '.$address.':'.$port." HTTP/1.1\r\n".
"Accept: */*\r\n".
'Host: '.$address.':'.$port."\r\n".
$this->getProxyAuthHeader().
"connection: keep-Alive\r\n".
"\r\n");
$response = '';
$status = false;
while ($line = @fgets($this->sock)) {
$status = $status || (strpos($line, 'HTTP') !== false);
if ($status) {
$response .= $line;
if (!rtrim($line)) {
break;
}
}
}
if (substr($response, 0, 13) !== 'HTTP/1.1 200 ') {
return false;
}
}
if (true === $this->use_ssl) {
$modes = [
STREAM_CRYPTO_METHOD_TLS_CLIENT,
STREAM_CRYPTO_METHOD_SSLv3_CLIENT,
STREAM_CRYPTO_METHOD_SSLv23_CLIENT,
STREAM_CRYPTO_METHOD_SSLv2_CLIENT,
];
$contextOptions = [
'ssl' => [
'verify_peer' => false,
'verify_peer_name' => false,
],
];
stream_context_set_option($this->sock, $contextOptions);
$success = false;
foreach ($modes as $mode) {
$success = stream_socket_enable_crypto($this->sock, true, $mode);
if ($success) {
return true;
}
}
return false;
}
return true;
}
public function getOption($level, $name)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function getPeerName($port = true)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function getSockName($port = true)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function listen($backlog = 0)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function read($length, $flags = 0)
{
return stream_get_contents($this->sock, $length);
}
public function select(array &$read, array &$write, array &$except, $tv_sec, $tv_usec = 0)
{
return stream_select($read, $write, $except, $tv_sec, $tv_usec);
}
public function send($data, $length, $flags)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function setBlocking($blocking)
{
return stream_set_blocking($this->sock, $blocking);
}
public function setOption($level, $name, $value)
{
if (in_array($name, [\SO_RCVTIMEO, \SO_SNDTIMEO])) {
$this->timeout = ['sec' => (int) $value, 'usec' => (int) (($value - (int) $value) * 1000000)];
return true;
}
throw new \danog\MadelineProto\Exception('Not supported');
}
public function write($buffer, $length = -1)
{
return $length === -1 ? fwrite($this->sock, $buffer) : fwrite($this->sock, $buffer, $length);
}
private function getProxyAuthHeader()
{
if (!isset($this->options['user']) || !isset($this->options['pass'])) {
return '';
}
return 'Proxy-Authorization: Basic '.base64_encode($this->options['user'].':'.$this->options['pass'])."\r\n";
}
public function getProxyHeaders()
{
return ($this->use_connect === true) ? '' : $this->getProxyAuthHeader();
}
public function setExtra(array $extra = [])
{
$this->options = $extra;
}
public function getResource()
{
return $this->sock->getResource();
}
}

View File

@ -1,194 +0,0 @@
<?php
/**
* HttpProxy module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
class HttpProxy implements \danog\MadelineProto\Proxy
{
private $domain;
private $type;
private $protocol;
private $extra;
private $sock;
public function __construct(int $domain, int $type, int $protocol)
{
if (!in_array($domain, [AF_INET, AF_INET6])) {
throw new \danog\MadelineProto\Exception('Wrong protocol family provided');
}
if (!in_array($type, [SOCK_STREAM])) {
throw new \danog\MadelineProto\Exception('Wrong connection type provided');
}
if (!in_array($protocol, [getprotobyname('tcp'), PHP_INT_MAX])) {
throw new \danog\MadelineProto\Exception('Wrong protocol provided');
}
$this->domain = $domain;
$this->type = $type;
$this->protocol = $protocol;
}
public function setExtra(array $extra = [])
{
$this->extra = $extra;
$name = $this->protocol === PHP_INT_MAX ? '\\FSocket' : '\\Socket';
$this->sock = new $name(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol);
}
public function setOption(int $level, int $name, $value)
{
return $this->sock->setOption($level, $name, $value);
}
public function getOption(int $level, int $name)
{
return $this->sock->getOption($level, $name);
}
public function setBlocking(bool $blocking)
{
return $this->sock->setBlocking($blocking);
}
public function bind(string $address, int $port = 0)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function listen(int $backlog = 0)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function accept()
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
{
return $this->sock->select($read, $write, $except, $tv_sec, $tv_usec);
}
public function connect(string $address, int $port = 0)
{
$this->sock->connect($this->extra['address'], $this->extra['port']);
try {
if (strlen(inet_pton($address)) !== 4) {
$address = '['.$address.']';
}
} catch (\danog\MadelineProto\Exception $e) {
}
$this->sock->write("CONNECT $address:$port HTTP/1.1\r\nHost: $address:$port\r\nAccept: */*\r\n".$this->getProxyAuthHeader()."Connection: keep-Alive\r\n\r\n");
$response = $this->read_http_payload();
if ($response['code'] !== 200) {
\danog\MadelineProto\Logger::log(trim($response['body']));
throw new \danog\MadelineProto\Exception($response['description'], $response['code']);
}
\danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http');
return true;
}
public function read_http_line()
{
$line = $lastchar = $curchar = '';
while ($lastchar.$curchar !== "\r\n") {
$line .= $lastchar;
$lastchar = $curchar;
$curchar = $this->sock->read(1);
}
return $line;
}
public function read_http_payload()
{
list($protocol, $code, $description) = explode(' ', $this->read_http_line(), 3);
list($protocol, $protocol_version) = explode('/', $protocol);
if ($protocol !== 'HTTP') {
throw new \danog\MadelineProto\Exception('Wrong protocol');
}
$code = (int) $code;
$headers = [];
while (strlen($current_header = $this->read_http_line())) {
$current_header = explode(':', $current_header, 2);
$headers[strtolower($current_header[0])] = trim($current_header[1]);
}
$read = '';
if (isset($headers['content-length'])) {
$read = $this->sock->read((int) $headers['content-length']);
}/* elseif (isset($headers['transfer-encoding']) && $headers['transfer-encoding'] === 'chunked') {
do {
$length = hexdec($this->read_http_line());
$read .= $this->sock->read($length);
$this->read_http_line();
} while ($length);
}*/
return ['protocol' => $protocol, 'protocol_version' => $protocol_version, 'code' => $code, 'description' => $description, 'body' => $read, 'headers' => $headers];
}
public function read(int $length, int $flags = 0)
{
return $this->sock->read($length, $flags);
}
public function write(string $buffer, int $length = -1)
{
return $this->sock->write($buffer, $length);
}
public function send(string $data, int $length, int $flags)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function close()
{
$this->sock->close();
}
public function getPeerName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function getSockName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
private function getProxyAuthHeader()
{
if (!isset($this->extra['username']) || !isset($this->extra['password'])) {
return '';
}
return 'Proxy-Authorization: Basic '.base64_encode($this->extra['username'].':'.$this->extra['password'])."\r\n";
}
public function getProxyHeaders()
{
return '';
}
public function getResource()
{
return $this->sock->getResource();
}
}

View File

@ -1,69 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
class Pool
{
public function __construct($size, $class = \Worker::class, $ctor = [])
{
$this->size = $size;
$this->clazz = $class;
$this->ctor = $ctor;
}
public function submit(Threaded $collectable)
{
if ($this->last > $this->size) {
$this->last = 0;
}
if (!isset($this->workers[$this->last])) {
$this->workers[$this->last] =
new $this->clazz(...$this->ctor);
$this->workers[$this->last]->start();
}
$this->workers[$this->last++]->stack($collectable);
}
public function submitTo($worker, Threaded $collectable)
{
if (isset($this->workers[$worker])) {
$this->workers[$worker]->stack($collectable);
}
}
public function collect(Closure $collector = null)
{
$total = 0;
foreach ($this->workers as $worker) {
$total += $worker->collect($collector);
}
return $total;
}
public function resize($size)
{
if ($size < $this->size) {
while ($this->size > $size) {
if (isset($this->workers[$this->size - 1])) {
$this->workers[$this->size - 1]->shutdown();
}
unset($this->workers[$this->size - 1]);
$this->size--;
}
}
}
public function shutdown()
{
$this->workers = null;
}
protected $workers;
protected $size;
protected $last;
protected $clazz;
protected $ctor;
}
}

View File

@ -1,392 +0,0 @@
<?php
/**
* Socket module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
class FSocket
{
private $sock;
private $protocol;
private $timeout = ['sec' => 0, 'usec' => 0];
private $blocking = false;
private $domain;
private $type;
public function __construct(int $domain, int $type, int $protocol)
{
$this->domain = $domain;
$this->type = $type;
$this->protocol = $protocol === PHP_INT_MAX ? 'tls' : ($protocol === PHP_INT_MAX - 1 ? 'tcp' : getprotobynumber($protocol));
}
public function __destruct()
{
if ($this->sock !== null) {
fclose($this->sock);
}
}
public function setOption(int $level, int $name, $value)
{
if (in_array($name, [\SO_RCVTIMEO, \SO_SNDTIMEO])) {
$this->timeout = ['sec' => (int) $value, 'usec' => (int) (($value - (int) $value) * 1000000)];
return true;
}
throw new \danog\MadelineProto\Exception('Not supported');
}
public function getOption(int $level, int $name)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function setBlocking(bool $blocking)
{
return stream_set_blocking($this->sock, $blocking);
}
public function bind(string $address, int $port = 0)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function listen(int $backlog = 0)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function accept()
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function connect(string $address, int $port = -1)
{
if ($this->domain === AF_INET6 && strpos($address, ':') !== false) {
$address = '['.$address.']';
}
$errno = 0;
$errstr = '';
$this->sock = fsockopen($this->protocol.'://'.$address, $port, $errno, $errstr, $this->timeout['sec'] + ($this->timeout['usec'] / 1000000));
stream_set_timeout($this->sock, $this->timeout['sec'], $this->timeout['usec']);
return true;
}
public static function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
{
$actual_read = [];
foreach ($read as $key => $resource) {
$actual_read[$key] = $resource->getResource();
}
$actual_write = [];
foreach ($write as $key => $resource) {
$actual_write[$key] = $resource->getResource();
}
$actual_except = [];
foreach ($except as $key => $resource) {
$actual_except[$key] = $resource->getResource();
}
$res = stream_select($actual_read, $actual_write, $actual_except, $tv_sec, $tv_usec);
foreach ($read as $key => $resource) {
if (!isset($actual_read[$key])) {
unset($read[$key]);
}
}
foreach ($write as $key => $resource) {
if (!isset($actual_write[$key])) {
unset($write[$key]);
}
}
foreach ($except as $key => $resource) {
if (!isset($actual_except[$key])) {
unset($except[$key]);
}
}
return $res;
}
public function read(int $length, int $flags = 0)
{
$packet = '';
$try = 0;
while (($current_length = strlen($packet)) < $length) {
$read = stream_get_contents($this->sock, $length - $current_length);
if ($read === false || (strlen($read) === 0 && $try > 10)) {
throw new \danog\MadelineProto\NothingInTheSocketException('Nothing in the socket!');
}
$packet .= $read;
$try++;
}
return $packet;
}
public function write(string $buffer, int $length = -1)
{
if ($length === -1) {
$length = strlen($buffer);
} else {
$buffer = substr($buffer, 0, $length);
}
$try = 0;
$wrote = fwrite($this->sock, $buffer, $length);
while ($wrote < $length) {
$wrote_now = fwrite($this->sock, substr($buffer, $wrote), $length - $wrote);
if ($wrote_now === false || ($wrote_now === 0 && $try > 10)) {
throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
}
$wrote += $wrote_now;
$try++;
}
return $wrote;
}
public function send(string $data, int $length, int $flags)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function close()
{
fclose($this->sock);
$this->sock = null;
}
public function getPeerName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function getSockName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not supported');
}
public function setExtra(array $extra = [])
{
}
public function getProxyHeaders()
{
return '';
}
public function getResource()
{
return $this->sock;
}
}
if (!extension_loaded('pthreads')) {
if (extension_loaded('sockets')) {
class SocketBase
{
private $sock;
public function __construct($sock)
{
$this->sock = $sock;
}
public function __destruct()
{
socket_close($this->sock);
unset($this->sock);
}
public function setOption(int $level, int $name, $value)
{
if (in_array($name, [\SO_RCVTIMEO, \SO_SNDTIMEO])) {
$value = ['sec' => (int) $value, 'usec' => (int) (($value - (int) $value) * 1000000)];
}
return socket_set_option($this->sock, $level, $name, $value);
}
public function getOption(int $level, int $name)
{
return socket_get_option($this->sock, $level, $name);
}
public function setBlocking(bool $blocking)
{
if ($blocking) {
return socket_set_block($this->sock);
}
return socket_set_nonblock($this->sock);
}
public function bind(string $address, int $port = 0)
{
return socket_bind($this->sock, $address, $port);
}
public function listen(int $backlog = 0)
{
return socket_listen($this->sock, $backlog);
}
public function accept()
{
if ($socket = socket_accept($this->sock)) {
return new self($socket);
} else {
return $socket;
}
}
public function connect(string $address, int $port = 0)
{
return socket_connect($this->sock, $address, $port);
}
public static function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
{
$actual_read = [];
foreach ($read as $key => $resource) {
$actual_read[$key] = $resource->getResource();
}
$actual_write = [];
foreach ($write as $key => $resource) {
$actual_write[$key] = $resource->getResource();
}
$actual_except = [];
foreach ($except as $key => $resource) {
$actual_except[$key] = $resource->getResource();
}
$res = socket_select($actual_read, $actual_write, $actual_except, $tv_sec, $tv_usec);
foreach ($read as $key => $resource) {
if (!isset($actual_read[$key])) {
unset($read[$key]);
}
}
foreach ($write as $key => $resource) {
if (!isset($actual_write[$key])) {
unset($write[$key]);
}
}
foreach ($except as $key => $resource) {
if (!isset($actual_except[$key])) {
unset($except[$key]);
}
}
return $res;
}
public function read(int $length, int $flags = 0)
{
$packet = '';
$try = 0;
while (strlen($packet) < $length) {
$read = socket_read($this->sock, $length - strlen($packet), $flags);
if ($read === false || (strlen($read) === 0 && $try > 10)) {
throw new \danog\MadelineProto\NothingInTheSocketException('Nothing in the socket!');
}
$packet .= $read;
$try++;
}
return $packet;
}
public function write(string $buffer, int $length = -1)
{
if ($length === -1) {
$length = strlen($buffer);
} else {
$buffer = substr($buffer, 0, $length);
}
$try = 0;
$wrote = socket_write($this->sock, $buffer, $length);
while ($wrote < $length) {
$wrote_now = socket_write($this->sock, substr($buffer, $wrote), $length - $wrote);
if ($wrote_now === false || ($wrote_now === 0 && $try > 10)) {
throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
}
$wrote += $wrote_now;
$try++;
}
return $wrote;
}
public function send(string $data, int $length, int $flags)
{
return socket_send($data, $length, $flags);
}
public function close()
{
socket_close($this->sock);
$this->sock = null;
}
public function getPeerName(bool $port = true)
{
$address = '';
$port = 0;
$port ? socket_getpeername($this->sock, $address, $ip) : socket_getpeername($this->sock, $address);
return $port ? ['host' => $address, 'port' => $port] : ['host' => $address];
}
public function getSockName(bool $port = true)
{
$address = '';
$port = 0;
$port ? socket_getsockname($this->sock, $address, $ip) : socket_getsockname($this->sock, $address);
return $port ? ['host' => $address, 'port' => $port] : ['host' => $address];
}
public function getProxyHeaders()
{
return '';
}
public function getResource()
{
return $this->sock;
}
}
class Socket extends SocketBase
{
public function __construct(int $domain, int $type, int $protocol)
{
parent::__construct(socket_create($domain, $type, $protocol));
}
}
} else {
define('AF_INET', 0);
define('AF_INET6', 1);
define('SOCK_STREAM', 2);
define('SOL_SOCKET', 3);
define('SO_RCVTIMEO', 4);
define('SO_SNDTIMEO', 5);
class Socket extends FSocket
{
}
}
}

View File

@ -1,196 +0,0 @@
<?php
/**
* SocksProxy module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
class SocksProxy implements \danog\MadelineProto\Proxy
{
private $domain;
private $type;
private $protocol;
private $extra;
private $sock;
public function __construct(int $domain, int $type, int $protocol)
{
if (!in_array($domain, [AF_INET, AF_INET6])) {
throw new \danog\MadelineProto\Exception('Wrong protocol family provided');
}
if (!in_array($type, [SOCK_STREAM, SOCK_DGRAM])) {
throw new \danog\MadelineProto\Exception('Wrong connection type provided');
}
if (!in_array($protocol, [getprotobyname('tcp'), getprotobyname('udp'), PHP_INT_MAX])) {
throw new \danog\MadelineProto\Exception('Wrong protocol provided');
}
$this->domain = $domain;
$this->type = $type;
$this->protocol = $protocol;
}
public function setExtra(array $extra = [])
{
$this->extra = $extra;
$name = $this->protocol === PHP_INT_MAX ? '\\FSocket' : '\\Socket';
$this->sock = new $name(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol);
}
public function setOption(int $level, int $name, $value)
{
return $this->sock->setOption($level, $name, $value);
}
public function getOption(int $level, int $name)
{
return $this->sock->getOption($level, $name);
}
public function setBlocking(bool $blocking)
{
return $this->sock->setBlocking($blocking);
}
public function bind(string $address, int $port = 0)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function listen(int $backlog = 0)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function accept()
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
{
return $this->sock->select($read, $write, $except, $tv_sec, $tv_usec);
}
public function connect(string $address, int $port = 0)
{
$this->sock->connect($this->extra['address'], $this->extra['port']);
$methods = chr(0);
if (isset($this->extra['username']) && isset($this->extra['password'])) {
$methods .= chr(2);
}
$this->sock->write(chr(5).chr(strlen($methods)).$methods);
$version = ord($this->sock->read(1));
$method = ord($this->sock->read(1));
if ($version !== 5) {
throw new \danog\MadelineProto\Exception("Wrong SOCKS5 version: $version");
}
if ($method === 2) {
$this->sock->write(chr(1).chr(strlen($this->extra['username'])).$this->extra['username'].chr(strlen($this->extra['password'])).$this->extra['password']);
$version = ord($this->sock->read(1));
if ($version !== 1) {
throw new \danog\MadelineProto\Exception("Wrong authorized SOCKS version: $version");
}
$result = ord($this->sock->read(1));
if ($result !== 0) {
throw new \danog\MadelineProto\Exception("Wrong authorization status: $version");
}
} elseif ($method !== 0) {
throw new \danog\MadelineProto\Exception("Wrong method: $method");
}
$payload = pack('C3', 0x05, 0x01, 0x00);
try {
$ip = inet_pton($address);
$payload .= pack('C1', strlen($ip) === 4 ? 0x01 : 0x04).$ip;
} catch (\danog\MadelineProto\Exception $e) {
$payload .= pack('C2', 0x03, strlen($address)).$address;
}
$payload .= pack('n', $port);
$this->sock->write($payload);
$version = ord($this->sock->read(1));
if ($version !== 5) {
throw new \danog\MadelineProto\Exception("Wrong SOCKS5 version: $version");
}
$rep = ord($this->sock->read(1));
if ($rep !== 0) {
throw new \danog\MadelineProto\Exception("Wrong SOCKS5 rep: $rep");
}
$rsv = ord($this->sock->read(1));
if ($rsv !== 0) {
throw new \danog\MadelineProto\Exception("Wrong socks5 final RSV: $rsv");
}
switch (ord($this->sock->read(1))) {
case 1:
$ip = inet_ntop($this->sock->read(4));
break;
case 4:
$ip = inet_ntop($this->sock->read(16));
break;
case 3:
$ip = $this->sock->read(ord($this->sock->read(1)));
break;
}
$port = unpack('n', $this->sock->read(2))[1];
\danog\MadelineProto\Logger::log(['Connected to '.$ip.':'.$port.' via socks5']);
return true;
}
public function read(int $length, int $flags = 0)
{
return $this->sock->read($length, $flags);
}
public function write(string $buffer, int $length = -1)
{
return $this->sock->write($buffer, $length);
}
public function send(string $data, int $length, int $flags)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function close()
{
$this->sock->close();
}
public function getPeerName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function getSockName(bool $port = true)
{
throw new \danog\MadelineProto\Exception('Not Implemented');
}
public function getProxyHeaders()
{
return '';
}
public function getResource()
{
return $this->sock->getResource();
}
}

View File

@ -1,67 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
class Thread extends Threaded
{
public function isStarted()
{
return (bool) ($this->state & self::STARTED);
}
public function isJoined()
{
return (bool) ($this->state & self::JOINED);
}
public function kill()
{
$this->state |= self::ERROR;
return true;
}
public static function getCurrentThreadId()
{
return 1;
}
public function getThreadId()
{
return 1;
}
public function start()
{
if (!isset($this->state)) {
$this->state = 0;
}
if ($this->state & self::STARTED) {
throw new \RuntimeException();
}
$this->state |= self::STARTED;
$this->state |= self::RUNNING;
try {
$this->run();
} catch (Exception $t) {
$this->state |= self::ERROR;
}
$this->state &= ~self::RUNNING;
return true;
}
public function join()
{
if ($this->state & self::JOINED) {
throw new \RuntimeException();
}
$this->state |= self::JOINED;
return true;
}
}
}

View File

@ -1,186 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
class Threaded implements ArrayAccess, Countable, IteratorAggregate, Collectable
{
const NOTHING = (0);
const STARTED = (1 << 0);
const RUNNING = (1 << 1);
const JOINED = (1 << 2);
const ERROR = (1 << 3);
public function offsetSet($offset, $value)
{
$this->__set($offset, $value);
}
public function offsetGet($offset)
{
return $this->__get($offset);
}
public function offsetUnset($offset)
{
$this->__unset($offset);
}
public function offsetExists($offset)
{
return $this->__isset($offset);
}
public function count()
{
return count((array) $this);
}
public function getIterator()
{
return new ArrayIterator($this);
}
public function __set($offset, $value)
{
if ($offset === null) {
$offset = count($this);
}
if (!$this instanceof Volatile) {
if (isset($this->{$offset}) &&
$this->{$offset} instanceof self) {
throw new \RuntimeException();
}
}
return $this->{$offset} = $value;
}
public function __get($offset)
{
return $this->{$offset};
}
public function __isset($offset)
{
return isset($this->{$offset});
}
public function __unset($offset)
{
if (!$this instanceof Volatile) {
if (isset($this->{$offset}) && $this->{$offset} instanceof self) {
throw new \RuntimeException();
}
}
unset($this->{$offset});
}
public function shift()
{
}
public function chunk($size)
{
$chunk = [];
while (count($chunk) < $size) {
$chunk[] = $this->shift();
}
return $chunk;
}
public function pop()
{
}
public function merge($merge)
{
foreach ($merge as $k => $v) {
$this->{$k} = $v;
}
}
public function wait($timeout = 0)
{
return true;
}
public function notify()
{
return true;
}
public function synchronized(Closure $closure, ...$args)
{
return $closure(...$args);
}
public function isRunning()
{
return $this->state & THREAD::RUNNING;
}
public function isTerminated()
{
return $this->state & THREAD::ERROR;
}
public static function extend($class)
{
return true;
}
public function addRef()
{
}
public function delRef()
{
}
public function getRefCount()
{
}
public function lock()
{
return true;
}
public function unlock()
{
return true;
}
public function isWaiting()
{
return false;
}
public function run()
{
}
public function isGarbage()
{
return true;
}
public function convertToVolatile($value)
{
/*
if (is_array($value)) {
foreach ($value as $k => $v) {
if (is_array($v)) {
$value[$k] =
new Volatile();
$value[$k]->merge(
$this->convertToVolatile($v));
}
}
}
*/
return $value;
}
}
}

View File

@ -1,15 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
class Volatile extends Threaded
{
public function __set($offset, $value)
{
if ($offset === null) {
$offset = count((array) $this);
}
return $this->{$offset} = $value;
}
}
}

View File

@ -1,80 +0,0 @@
<?php
if (!extension_loaded('pthreads')) {
class Worker extends Thread
{
public function collect(Closure $collector = null)
{
foreach ($this->gc as $idx => $collectable) {
if ($collector) {
if ($collector($collectable)) {
unset($this->gc[$idx]);
}
} else {
if ($this->collector($collectable)) {
unset($this->gc[$idx]);
}
}
}
return count($this->gc) + count($this->stack);
}
public function collector(Collectable $collectable)
{
return $collectable->isGarbage();
}
public function shutdown()
{
return $this->join();
}
public function isShutdown()
{
return $this->isJoined();
}
public function getStacked()
{
return count($this->stack);
}
public function unstack()
{
return array_shift($this->stack);
}
public function stack(Threaded $collectable)
{
$this->stack[] = $collectable;
if ($this->isStarted()) {
$this->runCollectable(count($this->stack) - 1, $collectable);
}
}
public function run()
{
foreach ($this->stack as $idx => $collectable) {
$this
->runCollectable($idx, $collectable);
}
}
private function runCollectable($idx, Collectable $collectable)
{
$collectable->worker = $this;
if (!isset($collectable->state)) {
$collectable->state = 0;
}
$collectable->state |= THREAD::RUNNING;
$collectable->run();
$collectable->state &= ~THREAD::RUNNING;
$this->gc[] = $collectable;
unset($this->stack[$idx]);
}
private $stack = [];
private $gc = [];
}
}

View File

@ -238,7 +238,7 @@ class Connection
{
$this->API->logger->logger('Reconnecting');
$this->disconnect();
yield $this->API->datacenter->dc_connect_async($this->ctx->getDc());
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc());
}
public function hasPendingCalls()

View File

@ -19,6 +19,9 @@
namespace danog\MadelineProto;
use Amp\CancellationToken;
use Amp\Artax\DefaultClient;
use Amp\Artax\HttpSocketPool;
use Amp\Socket\ClientConnectContext;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
@ -34,6 +37,7 @@ use danog\MadelineProto\Stream\Proxy\SocksProxy;
use danog\MadelineProto\Stream\Transport\DefaultStream;
use danog\MadelineProto\Stream\Transport\WssStream;
use danog\MadelineProto\Stream\Transport\WsStream;
use danog\MadelineProto\TL\Conversion\Exception;
/**
* Manages datacenters.
@ -47,6 +51,7 @@ class DataCenter
private $API;
private $dclist = [];
private $settings = [];
private $HTTPClient;
public function __sleep()
{
@ -67,14 +72,41 @@ class DataCenter
unset($this->sockets[$key]);
}
}
$this->HTTPClient = new DefaultClient(null, new HttpSocketPool(new ProxySocketPool($this)));
}
public function rawConnectAsync(string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null): \Generator
{
$ctxs = $this->generateContexts(0, $uri, $ctx);
if (empty($ctxs)) {
throw new Exception("No contexts for raw connection to URI $uri");
}
foreach ($ctxs as $ctx) {
/** @var $ctx \danog\MadelineProto\Stream\ConnectionContext */
try {
$ctx->setCancellationToken($token);
$result = yield $ctx->getStream();
$this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING);
return $result->getSocket();
} catch (\Throwable $e) {
if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
throw $e;
}
$this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR);
} catch (\Exception $e) {
$this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR);
}
}
throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
}
public function dc_connect_async($dc_number): \Generator
public function dcConnectAsync($dc_number): \Generator
{
if (isset($this->sockets[$dc_number]) && !isset($this->sockets[$dc_number]->old)) {
return false;
}
$ctxs = $this->generate_contexts($dc_number);
$ctxs = $this->generateContexts($dc_number);
if (empty($ctxs)) {
return false;
}
@ -95,16 +127,16 @@ class DataCenter
if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
throw $e;
}
$this->API->logger->logger('Connection failed: ' . $e->getMessage(), \danog\MadelineProto\Logger::ERROR);
$this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR);
} catch (\Exception $e) {
$this->API->logger->logger('Connection failed: ' . $e->getMessage(), \danog\MadelineProto\Logger::ERROR);
$this->API->logger->logger('Connection failed: '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR);
}
}
throw new \danog\MadelineProto\Exception("Could not connect to DC $dc_number");
}
public function generate_contexts($dc_number)
public function generateContexts($dc_number = 0, string $uri = '', ClientConnectContext $context = null)
{
$ctxs = [];
$combos = [];
@ -164,6 +196,9 @@ class DataCenter
break;
}
}
if (!$dc_number) {
$default = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []]];
}
$combos[] = $default;
if (!isset($this->settings[$dc_config_number]['do_not_retry'])) {
@ -196,6 +231,9 @@ class DataCenter
if ($proxy === DefaultStream::getName()) {
continue;
}
if (!$dc_number && $proxy === ObfuscatedStream::getName()) {
continue;
}
$extra = $proxy_extras[$key];
if (!isset(class_implements($proxy)['danog\\MadelineProto\\Stream\\StreamInterface'])) {
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['proxy_class_invalid']);
@ -232,16 +270,31 @@ class DataCenter
}
}
$combos[] = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [HttpsStream::getName(), []]];
if ($dc_number) {
$combos[] = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [HttpsStream::getName(), []]];
}
$combos = array_unique($combos, SORT_REGULAR);
}
/* @var $context \Amp\ClientConnectContext */
$context = (new ClientConnectContext())->withMaxAttempts(1)->withConnectTimeout(1000 * $this->settings[$dc_config_number]['timeout']);
$context = $context ?? (new ClientConnectContext())->withMaxAttempts(1)->withConnectTimeout(1000 * $this->settings[$dc_config_number]['timeout']);
foreach ($combos as $combo) {
$ipv6 = [$this->settings[$dc_config_number]['ipv6'] ? 'ipv6' : 'ipv4', $this->settings[$dc_config_number]['ipv6'] ? 'ipv4' : 'ipv6'];
foreach ($ipv6 as $ipv6) {
if (!$dc_number) {
/** @var $ctx \danog\MadelineProto\Stream\ConnectionContext */
$ctx = (new ConnectionContext())
->setSocketContext($context)
->setUri($uri)
->setIpv6($ipv6 === 'ipv6');
foreach ($combo as $stream) {
$ctx->addStream(...$stream);
}
$ctxs[] = $ctx;
continue;
}
if (!isset($this->dclist[$test][$ipv6][$dc_number]['ip_address'])) {
continue;
}
@ -259,11 +312,11 @@ class DataCenter
}
$path = $this->settings[$dc_config_number]['test_mode'] ? 'apiw_test1' : 'apiw1';
$uri = 'tcp://' . $subdomain . '.web.telegram.org:' . $port . '/' . $path;
$uri = 'tcp://'.$subdomain.'.web.telegram.org:'.$port.'/'.$path;
} elseif ($stream === HttpStream::getName()) {
$uri = 'tcp://' . $address . ':' . $port . '/api';
$uri = 'tcp://'.$address.':'.$port.'/api';
} else {
$uri = 'tcp://' . $address . ':' . $port;
$uri = 'tcp://'.$address.':'.$port;
}
if ($combo[1][0] === WssStream::getName()) {
@ -273,7 +326,7 @@ class DataCenter
}
$path = $this->settings[$dc_config_number]['test_mode'] ? 'apiws_test' : 'apiws';
$uri = 'tcp://' . $subdomain . '.'.'web.telegram.org'.':' . $port . '/' . $path;
$uri = 'tcp://'.$subdomain.'.'.'web.telegram.org'.':'.$port.'/'.$path;
} elseif ($combo[1][0] === WsStream::getName()) {
$subdomain = $this->dclist['ssl_subdomains'][preg_replace('/\D+/', '', $dc_number)];
if (strpos($dc_number, '_media') !== false) {
@ -282,7 +335,7 @@ class DataCenter
$path = $this->settings[$dc_config_number]['test_mode'] ? 'apiws_test' : 'apiws';
//$uri = 'tcp://' . $subdomain . '.web.telegram.org:' . $port . '/' . $path;
$uri = 'tcp://' . $address . ':' . $port . '/' . $path;
$uri = 'tcp://'.$address.':'.$port.'/'.$path;
}
/** @var $ctx \danog\MadelineProto\Stream\ConnectionContext */
@ -301,8 +354,8 @@ class DataCenter
}
}
if (isset($this->dclist[$test][$ipv6][$dc_number . '_bk']['ip_address'])) {
$ctxs = array_merge($ctxs, $this->generate_contexts($dc_number . '_bk'));
if (isset($this->dclist[$test][$ipv6][$dc_number.'_bk']['ip_address'])) {
$ctxs = array_merge($ctxs, $this->generateContexts($dc_number.'_bk'));
}
if (empty($ctxs)) {
@ -310,14 +363,16 @@ class DataCenter
$this->API->logger->logger("No info for DC $dc_number", \danog\MadelineProto\Logger::ERROR);
}
if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
} else if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
return [$ctxs[0]];
}
return $ctxs;
}
public function getHTTPClient()
{
return $this->HTTPClient;
}
public function get_dcs($all = true)
{
$test = $this->settings['all']['test_mode'] ? 'test' : 'main';

File diff suppressed because one or more lines are too long

View File

@ -213,6 +213,16 @@ class BufferedRawStream implements \danog\MadelineProto\Stream\BufferedStreamInt
return $this->write($data);
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
/**
* Get class name.
*

View File

@ -311,6 +311,15 @@ class HashedBufferedStream implements BufferedProxyStreamInterface, BufferInterf
return $this->write_buffer->bufferWrite($data);
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -98,6 +98,15 @@ class AbridgedStream implements BufferedStreamInterface, MTProtoBufferInterface
return $buffer;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -106,6 +106,15 @@ class FullStream implements BufferedStreamInterface, MTProtoBufferInterface
return $buffer;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -181,6 +181,15 @@ class HttpStream implements MTProtoBufferInterface, BufferedProxyStreamInterface
{
return new Success($this->code);
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -39,6 +39,15 @@ class HttpsStream extends HttpStream implements MTProtoBufferInterface
{
return parent::connectAsync($ctx->getCtx()->secure(true), $header);
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -90,6 +90,15 @@ class IntermediatePaddedStream implements BufferedStreamInterface, MTProtoBuffer
return $buffer;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -87,6 +87,15 @@ class IntermediateStream implements BufferedStreamInterface, MTProtoBufferInterf
return $buffer;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -196,6 +196,15 @@ class ObfuscatedStream implements BufferedProxyStreamInterface
}
$this->extra = $extra;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -23,6 +23,7 @@ use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\RawProxyStreamInterface;
use Amp\Socket\ClientTlsContext;
/**
* HTTP proxy stream wrapper.
@ -45,13 +46,17 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
{
$ctx = $ctx->getCtx();
$uri = $ctx->getUri();
$secure = $ctx->isSecure();
$ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false);
$this->stream = yield $ctx->getStream();
$address = $uri->getHost();
$port = $uri->getPort();
if (strlen(inet_pton($address)) === 16) {
$address = '['.$address.']';
}
yield $this->stream->write("CONNECT $address:$port HTTP/1.1\r\nHost: $address:$port\r\nAccept: */*\r\n".$this->getProxyAuthHeader()."Connection: keep-Alive\r\n\r\n");
$buffer = yield $this->stream->getReadBuffer($l);
@ -100,7 +105,7 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
if ($close) {
$this->disconnect();
yield $this->connect($this->ctx);
yield $this->connect($ctx);
}
\danog\MadelineProto\Logger::log(trim($read));
@ -118,6 +123,10 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
}
\danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http');
if ($secure && method_exists($this->getSocket(), 'enableCrypto')) {
yield $this->getSocket()->enableCrypto((new ClientTlsContext)->withPeerName($uri->getHost()));
}
if (strlen($header)) {
yield (yield $this->stream->getWriteBuffer(strlen($header)))->bufferWrite($header);
}
@ -187,6 +196,15 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
{
$this->extra = $extra;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -23,6 +23,7 @@ use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\RawProxyStreamInterface;
use Amp\Socket\ClientTlsContext;
/**
* Socks5 stream wrapper.
@ -53,7 +54,9 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
$methods .= chr(2);
}
$this->stream = yield $ctx->getStream(chr(5).chr(strlen($methods)).$methods);
$l = 2;
$buffer = yield $this->stream->getReadBuffer($l);
$version = ord(yield $buffer->bufferRead(1));
@ -84,10 +87,11 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
try {
$ip = inet_pton($uri->getHost());
$payload .= pack('C1', strlen($ip) === 4 ? 0x01 : 0x04).$ip;
$payload .= $ip ? pack('C1', strlen($ip) === 4 ? 0x01 : 0x04).$ip : pack('C2', 0x03, strlen($uri->getHost())).$uri->getHost();
} catch (\danog\MadelineProto\Exception $e) {
$payload .= pack('C2', 0x03, strlen($uri->getHost())).$uri->getHost();
}
$payload .= pack('n', $uri->getPort());
yield $this->stream->write($payload);
@ -108,6 +112,7 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
if ($rsv !== 0) {
throw new \danog\MadelineProto\Exception("Wrong socks5 final RSV: $rsv");
}
switch (ord(yield $buffer->bufferRead(1))) {
case 1:
$buffer = yield $this->stream->getReadBuffer($l);
@ -130,10 +135,11 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
$l = 2;
$buffer = yield $this->stream->getReadBuffer($l);
$port = unpack('n', yield $buffer->bufferRead(2))[1];
\danog\MadelineProto\Logger::log(['Connected to '.$ip.':'.$port.' via socks5']);
if ($secure && method_exists($this->stream, 'enableCrypto')) {
yield $this->stream->enableCrypto();
if ($secure && method_exists($this->getSocket(), 'enableCrypto')) {
yield $this->getSocket()->enableCrypto((new ClientTlsContext)->withPeerName($uri->getHost()));
}
if (strlen($header)) {
yield (yield $this->stream->getWriteBuffer(strlen($header)))->bufferWrite($header);
@ -195,6 +201,15 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
{
$this->extra = $extra;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream;
use Amp\Promise;
use Amp\Socket\Socket;
/**
* Generic stream interface.
@ -37,9 +38,16 @@ interface StreamInterface
public function connect(ConnectionContext $ctx, string $header = ''): Promise;
/**
* Disonnect from the server.
* Disconnect from the server.
*
* @return Promise
* @return void
*/
public function disconnect();
/**
* Get underlying AMPHP socket resource
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): Socket;
}

View File

@ -37,14 +37,13 @@ class DefaultStream extends Socket implements RawStreamInterface
use RawStream;
private $stream;
public function enableCrypto(): Promise
{
return $this->stream->enableCrypto();
}
public function __construct()
{
}
public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise
{
return $this->enableCrypto($tlsContext);
}
public function getStream()
{
@ -54,9 +53,9 @@ class DefaultStream extends Socket implements RawStreamInterface
public function connectAsync(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator
{
if ($ctx->isSecure()) {
$this->stream = yield cryptoConnect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
$this->stream = yield cryptoConnect($ctx->getStringUri(), $ctx->getSocketContext(), null, $ctx->getCancellationToken());
} else {
$this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext());
$this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
}
yield $this->stream->write($header);
}
@ -106,6 +105,15 @@ class DefaultStream extends Socket implements RawStreamInterface
{
$this->disconnect();
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream;
}
public static function getName(): string
{

View File

@ -25,8 +25,8 @@ use Amp\Socket\ConnectException;
use Amp\Websocket\Client\ConnectionException;
use Amp\Websocket\Client\Handshake;
use Amp\Websocket\Client\Internal\ClientSocket;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\Client\Rfc6455Connection;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\Rfc7692CompressionFactory;
use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\ConnectionContext;
@ -153,7 +153,7 @@ class WsStream implements RawStreamInterface
$path = '/';
}
if (($query = $uri->getQuery()) !== '') {
$path .= '?' . $query;
$path .= '?'.$query;
}
return \sprintf("GET %s HTTP/1.1\r\n%s\r\n", $path, Rfc7230::formatHeaders($headers));
}
@ -165,13 +165,12 @@ class WsStream implements RawStreamInterface
$position = \strpos($headerBuffer, "\r\n");
$startLine = \substr($headerBuffer, 0, $position);
if (!\preg_match("/^HTTP\/(1\.[01]) (\d{3}) ([^\x01-\x08\x10-\x19]*)$/i", $startLine, $matches)) {
throw new ConnectException('Invalid response start line: ' . $startLine);
throw new ConnectException('Invalid response start line: '.$startLine);
}
$version = $matches[1];
$status = (int) $matches[2];
$reason = $matches[3];
if ($version !== '1.1' || $status !== Status::SWITCHING_PROTOCOLS) {
throw new ConnectionException(
\sprintf('Did not receive switching protocols response: %d %s on DC %d', $status, $reason, $this->dc),
@ -205,6 +204,16 @@ class WsStream implements RawStreamInterface
}
return null;
}
/**
* @inheritDoc
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string
{
return __CLASS__;

View File

@ -19,7 +19,7 @@
namespace danog\MadelineProto\TL\Types;
class Bytes extends \Volatile implements \JsonSerializable
class Bytes implements \JsonSerializable
{
use \danog\Serializable;
private $bytes = [];