Better error handling and proxied DNS over HTTPS

This commit is contained in:
Daniil Gentili 2019-06-12 20:51:38 +02:00
parent 1ea8b9683a
commit 3fe04930e7
8 changed files with 270 additions and 15 deletions

View File

@ -61,7 +61,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler
}
}
}
$settings = ['logger' => ['logger_level' => 5]];
$settings = ['logger' => ['logger_level' => 5], 'connection_settings' => ['all' => ['protocol' => 'tcp_abridged']]];
$MadelineProto = new \danog\MadelineProto\API('bot.madeline', $settings);
$MadelineProto->async(true);

View File

@ -9,7 +9,7 @@
"krakjoe/pthreads-polyfill": "*"
},
"require": {
"php": ">=7.0.0",
"php": ">=7.1.0",
"danog/primemodule": "^1.0.3",
"danog/magicalserializer": "^1.0",
"phpseclib/phpseclib": "dev-master#27370df as 2.0.15",

View File

@ -104,14 +104,21 @@ class API extends APIFactory
throw $e;
}
class_exists('\\Volatile');
$tounserialize = str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
foreach (['RSA', 'TL\\TLMethod', 'TL\\TLConstructor', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
class_exists('\\danog\\MadelineProto\\'.$class);
}
Logger::log((string) $e, Logger::ERROR);
$changed = false;
if (strpos($tounserialize, 'O:26:"danog\\MadelineProto\\Button":') !== false) {
$tounserialize = str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
$changed = true;
}
if (strpos($e->getMessage(), "Erroneous data format for unserializing 'phpseclib\\Math\\BigInteger'") === 0) {
$tounserialize = str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize);
$changed = true;
}
Logger::log((string) $e, Logger::ERROR);
if (!$changed) throw $e;
$unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\Throwable $e) {
Logger::log((string) $e, Logger::ERROR);
@ -176,7 +183,7 @@ class API extends APIFactory
public function __wakeup()
{
$this->APIFactory();
//$this->APIFactory();
}
public function __destruct()

View File

@ -160,6 +160,9 @@ class APIFactory extends AsyncConstruct
if (Magic::is_fork() && !Magic::$processed_fork) {
throw new Exception('Forking not supported, use async logic, instead: https://docs.madelineproto.xyz/docs/ASYNC.html');
}
if (!$this->API) {
throw new Exception('API did not init!');
}
if ($this->API->asyncInitPromise) {
yield $this->API->initAsync();
$this->API->logger->logger('Finished init asynchronously');

View File

@ -48,8 +48,11 @@ class AsyncConstruct
public function setInitPromise($promise)
{
$this->asyncInitPromise = $this->call($promise);
$this->asyncInitPromise->onResolve(function () {
$this->asyncInitPromise = $this->callFork($promise);
$this->asyncInitPromise->onResolve(function ($error, $result) {
if ($error) {
throw $error;
}
$this->asyncInitPromise = null;
});
}

View File

@ -19,10 +19,14 @@
namespace danog\MadelineProto;
use Amp\Artax\Client;
use Amp\Artax\Cookie\ArrayCookieJar;
use Amp\Artax\DefaultClient;
use Amp\Artax\HttpSocketPool;
use Amp\CancellationToken;
use Amp\Dns\Resolver;
use Amp\DoH\DoHConfig;
use Amp\DoH\Rfc8484StubResolver;
use Amp\Socket\ClientConnectContext;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
@ -39,6 +43,18 @@ use danog\MadelineProto\Stream\Transport\DefaultStream;
use danog\MadelineProto\Stream\Transport\WssStream;
use danog\MadelineProto\Stream\Transport\WsStream;
use danog\MadelineProto\TL\Conversion\Exception;
use Amp\DoH\Nameserver;
use function Amp\call;
use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Deferred;
use Amp\NullCancellationToken;
use function Amp\Socket\Internal\parseUri;
use Amp\Dns\Record;
use Amp\Socket\ConnectException;
use Amp\Loop;
use Amp\TimeoutException;
use Amp\Socket\ClientSocket;
/**
* Manages datacenters.
@ -53,6 +69,7 @@ class DataCenter
private $dclist = [];
private $settings = [];
private $HTTPClient;
private $DoHClient;
public function __sleep()
{
@ -75,6 +92,197 @@ class DataCenter
}
}
$this->HTTPClient = new DefaultClient(new ArrayCookieJar(), new HttpSocketPool(new ProxySocketPool($this)));
$DoHConfig = new DoHConfig(
[
new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'),
new Nameserver('https://google.com/resolve', Nameserver::GOOGLE_JSON, ["Host" => "dns.google.com"]),
]
);
$this->DoHClient = new Rfc8484StubResolver($DoHConfig);
}
/**
* Asynchronously establish an encrypted TCP connection (non-blocking).
*
* Note: Once resolved the socket stream will already be set to non-blocking mode.
*
* @param string $uricall
* @param ClientConnectContext $socketContext
* @param ClientTlsContext $tlsContext
* @param CancellationToken $token
*
* @return Promise<ClientSocket>
*/
public function cryptoConnect(
string $uri,
ClientConnectContext $socketContext = null,
ClientTlsContext $tlsContext = null,
CancellationToken $token = null
): Promise {
return call(function () use ($uri, $socketContext, $tlsContext, $token) {
$tlsContext = $tlsContext ?? new ClientTlsContext;
if ($tlsContext->getPeerName() === null) {
$tlsContext = $tlsContext->withPeerName(\parse_url($uri, PHP_URL_HOST));
}
/** @var ClientSocket $socket */
$socket = yield $this->socketConnect($uri, $socketContext, $token);
$promise = $socket->enableCrypto($tlsContext);
if ($token) {
$deferred = new Deferred;
$id = $token->subscribe([$deferred, "fail"]);
$promise->onResolve(function ($exception) use ($id, $token, $deferred) {
if ($token->isRequested()) {
return;
}
$token->unsubscribe($id);
if ($exception) {
$deferred->fail($exception);
return;
}
$deferred->resolve();
});
$promise = $deferred->promise();
}
try {
yield $promise;
} catch (\Throwable $exception) {
$socket->close();
throw $exception;
}
return $socket;
});
}
/**
* Asynchronously establish a socket connection to the specified URI.
*
* @param string $uri URI in scheme://host:port format. TCP is assumed if no scheme is present.
* @param ClientConnectContext $socketContext Socket connect context to use when connecting.
* @param CancellationToken|null $token
*
* @return Promise<\Amp\Socket\ClientSocket>
*/
public function socketConnect(string $uri, ClientConnectContext $socketContext = null, CancellationToken $token = null): Promise
{
return call(function () use ($uri, $socketContext, $token) {
$socketContext = $socketContext ?? new ClientConnectContext;
$token = $token ?? new NullCancellationToken;
$attempt = 0;
$uris = [];
$failures = [];
list($scheme, $host, $port) = parseUri($uri);
if ($host[0] === '[') {
$host = substr($host, 1, -1);
}
if ($port === 0 || @\inet_pton($host)) {
// Host is already an IP address or file path.
$uris = [$uri];
} else {
// Host is not an IP address, so resolve the domain name.
$records = yield $this->DoHClient->resolve($host, $socketContext->getDnsTypeRestriction());
// Usually the faster response should be preferred, but we don't have a reliable way of determining IPv6
// support, so we always prefer IPv4 here.
\usort($records, function (Record $a, Record $b) {
return $a->getType() - $b->getType();
});
foreach ($records as $record) {
/** @var Record $record */
if ($record->getType() === Record::AAAA) {
$uris[] = \sprintf("%s://[%s]:%d", $scheme, $record->getValue(), $port);
} else {
$uris[] = \sprintf("%s://%s:%d", $scheme, $record->getValue(), $port);
}
}
}
$flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT;
$timeout = $socketContext->getConnectTimeout();
foreach ($uris as $builtUri) {
try {
$context = \stream_context_create($socketContext->toStreamContextArray());
if (!$socket = @\stream_socket_client($builtUri, $errno, $errstr, null, $flags, $context)) {
throw new ConnectException(\sprintf(
"Connection to %s failed: [Error #%d] %s%s",
$uri,
$errno,
$errstr,
$failures ? "; previous attempts: ".\implode($failures) : ""
), $errno);
}
\stream_set_blocking($socket, false);
$deferred = new Deferred;
$watcher = Loop::onWritable($socket, [$deferred, 'resolve']);
$id = $token->subscribe([$deferred, 'fail']);
try {
yield Promise\timeout($deferred->promise(), $timeout);
} catch (TimeoutException $e) {
throw new ConnectException(\sprintf(
"Connecting to %s failed: timeout exceeded (%d ms)%s",
$uri,
$timeout,
$failures ? "; previous attempts: ".\implode($failures) : ""
), 110); // See ETIMEDOUT in http://www.virtsync.com/c-error-codes-include-errno
} finally {
Loop::cancel($watcher);
$token->unsubscribe($id);
}
// The following hack looks like the only way to detect connection refused errors with PHP's stream sockets.
if (\stream_socket_get_name($socket, true) === false) {
\fclose($socket);
throw new ConnectException(\sprintf(
"Connection to %s refused%s",
$uri,
$failures ? "; previous attempts: ".\implode($failures) : ""
), 111); // See ECONNREFUSED in http://www.virtsync.com/c-error-codes-include-errno
}
} catch (ConnectException $e) {
// Includes only error codes used in this file, as error codes on other OS families might be different.
// In fact, this might show a confusing error message on OS families that return 110 or 111 by itself.
$knownReasons = [
110 => "connection timeout",
111 => "connection refused",
];
$code = $e->getCode();
$reason = $knownReasons[$code] ?? ("Error #".$code);
if (++$attempt === $socketContext->getMaxAttempts()) {
break;
}
$failures[] = "{$uri} ({$reason})";
continue; // Could not connect to host, try next host in the list.
}
return new ClientSocket($socket);
}
// This is reached if either all URIs failed or the maximum number of attempts is reached.
throw $e;
});
}
public function rawConnectAsync(string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null): \Generator
@ -293,6 +501,9 @@ class DataCenter
->setIpv6($ipv6 === 'ipv6');
foreach ($combo as $stream) {
if ($stream[0] === DefaultStream::getName() && $stream[1] === []) {
$stream[1] = [[$this, 'socketConnect'], [$this, 'cryptoConnect']];
}
$ctx->addStream(...$stream);
}
$ctxs[] = $ctx;
@ -350,6 +561,9 @@ class DataCenter
->setIpv6($ipv6 === 'ipv6');
foreach ($combo as $stream) {
if ($stream[0] === DefaultStream::getName() && $stream[1] === []) {
$stream[1] = [[$this, 'socketConnect'], [$this, 'cryptoConnect']];
}
$ctx->addStream(...$stream);
}
$ctxs[] = $ctx;
@ -375,12 +589,21 @@ class DataCenter
/**
* Get Artax async HTTP client.
*
* @return \Amp\Artax\DefaultClient
* @return \Amp\Artax\Client
*/
public function getHTTPClient()
public function getHTTPClient(): Client
{
return $this->HTTPClient;
}
/**
* Get DNS over HTTPS async DNS client.
*
* @return \Amp\Dns\Resolver
*/
public function getDNSClient(): Resolver
{
return $this->DoHClient;
}
public function fileGetContents($url): \Generator
{

View File

@ -140,8 +140,15 @@ class Magic
// Even an empty handler is enough to catch ctrl+c
if (defined('SIGINT')) {
//if (function_exists('pcntl_async_signals')) pcntl_async_signals(true);
Loop::onSignal(SIGINT, static function () {Logger::log('Got sigint', Logger::FATAL_ERROR);die();});
Loop::onSignal(SIGTERM, static function () {Logger::log('Got sigterm', Logger::FATAL_ERROR);die();});
Loop::onSignal(SIGINT, static function () {
Logger::log('Got sigint', Logger::FATAL_ERROR);
die();
});
Loop::onSignal(SIGTERM, static function () {
Logger::log('Got sigterm', Logger::FATAL_ERROR);
Loop::stop();
die();
});
}
$DohConfig = new DoHConfig(
[

View File

@ -24,6 +24,7 @@ use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\RawStreamInterface;
use function Amp\Socket\connect;
use function Amp\Socket\cryptoConnect;
use danog\MadelineProto\Stream\ProxyStreamInterface;
/**
* Default stream wrapper.
@ -32,11 +33,13 @@ use function Amp\Socket\cryptoConnect;
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class DefaultStream extends Socket implements RawStreamInterface
class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInterface
{
use RawStream;
private $stream;
private $connector = 'Amp\\Socket\\connect';
private $cryptoConnector = 'Amp\\Socket\\cryptoConnect';
public function __construct()
{
}
@ -54,9 +57,11 @@ class DefaultStream extends Socket implements RawStreamInterface
public function connectAsync(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator
{
if ($ctx->isSecure()) {
$this->stream = yield cryptoConnect($ctx->getStringUri(), $ctx->getSocketContext(), null, $ctx->getCancellationToken());
$connector = $this->cryptoConnector;
$this->stream = yield $connector($ctx->getStringUri(), $ctx->getSocketContext(), null, $ctx->getCancellationToken());
} else {
$this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
$connector = $this->connector;
$this->stream = yield $connector($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
}
yield $this->stream->write($header);
}
@ -117,6 +122,13 @@ class DefaultStream extends Socket implements RawStreamInterface
return $this->stream;
}
/**
* {@inheritdoc}
*/
public function setExtra($extra)
{
list($this->connector, $this->cryptoConnector) = $extra;
}
public static function getName(): string
{
return __CLASS__;