From 4619d44b1285a7a95ac7b7688f324e105f58486c Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Wed, 4 Sep 2019 17:48:07 +0200 Subject: [PATCH] Clean up --- bot.php | 17 ++++--- src/danog/MadelineProto/Connection.php | 11 ++-- src/danog/MadelineProto/DataCenter.php | 2 +- .../MadelineProto/DataCenterConnection.php | 44 ++++++++++++++-- .../Loop/Connection/CheckLoop.php | 6 ++- .../Loop/Connection/ReadLoop.php | 51 ++++++++++--------- .../Loop/Connection/WriteLoop.php | 5 +- .../MadelineProto/Loop/Update/FeedLoop.php | 3 +- .../MTProtoSession/AckHandler.php | 4 +- .../MTProtoSession/CallHandler.php | 2 +- .../MadelineProto/MTProtoSession/Session.php | 16 +++++- .../MTProtoTools/AuthKeyHandler.php | 3 ++ 12 files changed, 109 insertions(+), 55 deletions(-) diff --git a/bot.php b/bot.php index b82dc80f..e0dc41a6 100755 --- a/bot.php +++ b/bot.php @@ -1,6 +1,6 @@ #!/usr/bin/env php . */ -set_include_path(get_include_path().':'.realpath(dirname(__FILE__).'/MadelineProto/')); +\set_include_path(\get_include_path().':'.\realpath(\dirname(__FILE__).'/MadelineProto/')); /* * Various ways to load MadelineProto */ -if (!file_exists(__DIR__.'/vendor/autoload.php')) { - if (!file_exists('madeline.php')) { - copy('https://phar.madelineproto.xyz/madeline.php', 'madeline.php'); +if (!\file_exists(__DIR__.'/vendor/autoload.php')) { + if (!\file_exists('madeline.php')) { + \copy('https://phar.madelineproto.xyz/madeline.php', 'madeline.php'); } include 'madeline.php'; } else { @@ -39,10 +39,10 @@ class EventHandler extends \danog\MadelineProto\EventHandler if (isset($update['message']['_']) && $update['message']['_'] === 'messageEmpty') { return; } - $res = json_encode($update, JSON_PRETTY_PRINT); + $res = \json_encode($update, JSON_PRETTY_PRINT); try { - yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); //'entities' => [['_' => 'messageEntityPre', 'offset' => 0, 'length' => strlen($res), 'language' => 'json']]]); + yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); /* '_' => 'inputMediaUploadedDocument', @@ -56,7 +56,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler } catch (\danog\MadelineProto\RPCErrorException $e) { $this->logger((string) $e, \danog\MadelineProto\Logger::FATAL_ERROR); } catch (\danog\MadelineProto\Exception $e) { - if (stripos($e->getMessage(), 'invalid constructor given') === false) { + if (\stripos($e->getMessage(), 'invalid constructor given') === false) { $this->logger((string) $e, \danog\MadelineProto\Logger::FATAL_ERROR); } //$this->messages->sendMessage(['peer' => '@danogentili', 'message' => $e->getCode().': '.$e->getMessage().PHP_EOL.$e->getTraceAsString()]); @@ -71,4 +71,5 @@ $MadelineProto->loop(function () use ($MadelineProto) { yield $MadelineProto->start(); yield $MadelineProto->setEventHandler('\EventHandler'); }); + $MadelineProto->loop(); diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index f1f378be..566ad05b 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -40,8 +40,6 @@ class Connection extends Session use \danog\Serializable; use Tools; - const PENDING_MAX = 2000000000; - /** * Writer loop. * @@ -305,7 +303,7 @@ class Connection extends Session */ public function connect(ConnectionContext $ctx): \Generator { - $this->API->logger->logger("Trying connection ({$this->id}) via $ctx", \danog\MadelineProto\Logger::WARNING); + //$this->API->logger->logger("Trying connection ({$this->id}) via $ctx", \danog\MadelineProto\Logger::WARNING); $ctx->setReadCallback([$this, 'haveRead']); @@ -411,7 +409,6 @@ class Connection extends Session $message['send_promise'] = $deferred; $this->pending_outgoing[$this->pending_outgoing_key++] = $message; - $this->pending_outgoing_key %= self::PENDING_MAX; if ($flush && isset($this->writer)) { $this->writer->resume(); } @@ -471,7 +468,7 @@ class Connection extends Session */ public function disconnect() { - $this->API->logger->logger("Disconnecting from DC {$this->datacenterId}"); + //$this->API->logger->logger("Disconnecting from DC {$this->datacenterId}"); $this->old = true; foreach (['reader', 'writer', 'checker', 'waiter', 'updater'] as $loop) { if (isset($this->{$loop}) && $this->{$loop}) { @@ -485,7 +482,7 @@ class Connection extends Session $this->API->logger->logger($e); } } - $this->API->logger->logger("Disconnected from DC {$this->datacenterId}"); + //$this->API->logger->logger("Disconnected from DC {$this->datacenterId}"); } /** @@ -495,7 +492,7 @@ class Connection extends Session */ public function reconnect(): \Generator { - $this->API->logger->logger("Reconnecting DC {$this->datacenterId}"); + //$this->API->logger->logger("Reconnecting DC {$this->datacenterId}"); $this->disconnect(); yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc(), $this->id); if ($this->API->hasAllAuth() && !$this->hasPendingCalls()) { diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index 3edc6312..259d9857 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -188,7 +188,7 @@ class DataCenter $this->settings = $settings; foreach ($this->sockets as $key => $socket) { if ($socket instanceof DataCenterConnection && !\strpos($key, '_bk')) { - $this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE); + //$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE); $socket->old = true; $socket->setExtra($this->API); $socket->disconnect(); diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php index 878e59f0..b9bad865 100644 --- a/src/danog/MadelineProto/DataCenterConnection.php +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -107,6 +107,12 @@ class DataCenterConnection implements JsonSerializable */ private $decWrite = 10; + /** + * Backed up messages + * + * @var array + */ + private $backup = []; /** * Get auth key. * @@ -218,6 +224,15 @@ class DataCenterConnection implements JsonSerializable } $this->tempAuthKey->bind($this->permAuthKey, $pfs); } + /** + * Check if auth keys are bound. + * + * @return boolean + */ + public function isBound(): bool + { + return $this->tempAuthKey ? $this->tempAuthKey->isBound() : false; + } /** * Check if we are logged in. * @@ -311,7 +326,7 @@ class DataCenterConnection implements JsonSerializable */ public function connect(ConnectionContext $ctx, int $id = -1): \Generator { - $this->API->logger->logger("Trying shared connection via $ctx", \danog\MadelineProto\Logger::WARNING); + $this->API->logger->logger("Trying shared connection via $ctx"); $this->ctx = $ctx->getCtx(); $this->datacenter = $ctx->getDc(); @@ -331,9 +346,11 @@ class DataCenterConnection implements JsonSerializable if ($id === -1) { if ($this->connections) { - $this->disconnect(); + $this->API->logger("Already connected!", Logger::WARNING); + return; } yield $this->connectMore($count); + yield $this->restoreBackup(); } else { $this->availableConnections[$id] = 0; yield $this->connections[$id]->connect($ctx); @@ -372,9 +389,14 @@ class DataCenterConnection implements JsonSerializable $this->robinLoop->signal(true); $this->robinLoop = null; } + $before = count($this->backup); foreach ($this->connections as $connection) { + $this->backup = \array_merge($this->backup, $connection->backupSession()); $connection->disconnect(); } + $count = count($this->backup) - $before; + $this->API->logger->logger("Backed up $count messages from DC {$this->datacenter}"); + $this->connections = []; $this->availableConnections = []; } @@ -391,6 +413,22 @@ class DataCenterConnection implements JsonSerializable yield $this->connect($this->ctx); } + /** + * Restore backed up messages + * + * @return void + */ + public function restoreBackup() + { + $backup = $this->backup; + $this->backup = []; + $count = count($backup); + $this->API->logger->logger("Restoring $count messages to DC {$this->datacenter}"); + foreach ($backup as $message) { + Tools::callFork($this->getConnection()->sendMessage($message, false)); + } + $this->flush(); + } /** * Get connection for authorization. * @@ -431,7 +469,7 @@ class DataCenterConnection implements JsonSerializable foreach ($this->availableConnections as &$count) { $count += 50; } - } else if ($min < 100) { + } elseif ($min < 100) { $max = $this->isMedia() || $this->isCDN() ? $this->API->settings['connection_settings']['media_socket_count']['max'] : 1; if (\count($this->availableConnections) < $max) { $this->connectMore(2); diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index 398fdca8..4372ef16 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -19,8 +19,10 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\Deferred; +use Amp\Loop; use danog\MadelineProto\Connection; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\MadelineProto\Tools; /** * RPC call status check loop. @@ -160,8 +162,8 @@ class CheckLoop extends ResumableSignalLoop if ($connection->get_max_id(true) === $last_msgid && $connection->getLastChunk() === $last_chunk) { $API->logger->logger("We did not receive a response for $timeout seconds: reconnecting and exiting check loop on DC $datacenter"); - $this->exitedLoop(); - yield $connection->reconnect(); + //$this->exitedLoop(); + Tools::callForkDefer($connection->reconnect()); return; } diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index 347c2c5b..54dd8d8d 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -88,35 +88,38 @@ class ReadLoop extends SignalLoop } if (\is_int($error)) { - $this->exitedLoop(); + //$this->exitedLoop(); - if ($error === -404) { - if ($shared->hasTempAuthKey()) { - $API->logger->logger("WARNING: Resetting auth key in DC {$datacenter}...", \danog\MadelineProto\Logger::WARNING); - $shared->setTempAuthKey(null); - $connection->session_id = null; - foreach ($connection->new_outgoing as $message_id) { - $connection->outgoing_messages[$message_id]['sent'] = 0; + Tools::callForkDefer((function () use ($error, $shared, $connection, $datacenter, $API) { + if ($error === -404) { + if ($shared->hasTempAuthKey()) { + $API->logger->logger("WARNING: Resetting auth key in DC {$datacenter}...", \danog\MadelineProto\Logger::WARNING); + $shared->setTempAuthKey(null); + $shared->resetSession(); + foreach ($connection->new_outgoing as $message_id) { + $connection->outgoing_messages[$message_id]['sent'] = 0; + } + yield $shared->reconnect(); + yield $API->init_authorization_async(); + } else { + yield $connection->reconnect(); } - yield $shared->reconnect(); - yield $API->init_authorization_async(); + } elseif ($error === -1) { + $API->logger->logger("WARNING: Got quick ack from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); + yield $connection->reconnect(); + } elseif ($error === 0) { + $API->logger->logger("Got NOOP from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); + yield $connection->reconnect(); + } elseif ($error === -429) { + $API->logger->logger("Got -429 from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); + yield Tools::sleep(1); + yield $connection->reconnect(); } else { yield $connection->reconnect(); - } - } elseif ($error === -1) { - $API->logger->logger("WARNING: Got quick ack from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); - yield $connection->reconnect(); - } elseif ($error === 0) { - $API->logger->logger("Got NOOP from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); - yield $connection->reconnect(); - } elseif ($error === -429) { - $API->logger->logger("Got -429 from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING); - Loop::delay(1*1000, [$connection, 'reconnect']); - } else { - yield $connection->reconnect(); - throw new \danog\MadelineProto\RPCErrorException($error, $error); - } + throw new \danog\MadelineProto\RPCErrorException($error, $error); + } + })()); return; } diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 77a55142..7b62e504 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -170,7 +170,6 @@ class WriteLoop extends ResumableSignalLoop if (\count($to_ack = $connection->ack_queue)) { foreach (\array_chunk($connection->ack_queue, 8192) as $acks) { $connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'msgs_ack', 'serialized_body' => yield $this->API->serialize_object_async(['type' => 'msgs_ack'], ['msg_ids' => $acks], 'msgs_ack'), 'content_related' => false, 'unencrypted' => false, 'method' => false]; - $connection->pending_outgoing_key %= Connection::PENDING_MAX; } } @@ -186,7 +185,6 @@ class WriteLoop extends ResumableSignalLoop } if ($shared->isHttp() && !$has_http_wait) { $connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'http_wait', 'serialized_body' => yield $this->API->serialize_object_async(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'content_related' => true, 'unencrypted' => false, 'method' => true]; - $connection->pending_outgoing_key %= Connection::PENDING_MAX; $has_http_wait = true; } @@ -202,7 +200,7 @@ class WriteLoop extends ResumableSignalLoop unset($connection->pending_outgoing[$k]); continue; } - if ($shared->getSettings()['pfs'] && !$shared->getTempAuthKey()->isBound() && !$connection->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) { + if ($shared->getSettings()['pfs'] && !$shared->isBound() && !$connection->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) { $API->logger->logger("Skipping {$message['_']} due to unbound keys in DC {$datacenter}"); $skipped = true; continue; @@ -291,7 +289,6 @@ class WriteLoop extends ResumableSignalLoop //var_dumP("container ".bin2hex($message_id)); $keys[$connection->pending_outgoing_key++] = $message_id; - $connection->pending_outgoing_key %= Connection::PENDING_MAX; $message_data = yield $API->serialize_object_async(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'); diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index 6e125c6a..1fd9f94b 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -108,11 +108,10 @@ class FeedLoop extends ResumableSignalLoop if (isset($update['pts'], $update['pts_count'])) { $logger = function ($msg) use ($update) { $pts_count = $update['pts_count']; - $double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-'; $mid = isset($update['message']['id']) ? $update['message']['id'] : '-'; $mypts = $this->state->pts(); $computed = $mypts + $pts_count; - $this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, computed pts: $computed, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); + $this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, computed pts: $computed, msg id: {$mid}, channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); }; $result = $this->state->checkPts($update); if ($result < 0) { diff --git a/src/danog/MadelineProto/MTProtoSession/AckHandler.php b/src/danog/MadelineProto/MTProtoSession/AckHandler.php index cbd36349..483affed 100644 --- a/src/danog/MadelineProto/MTProtoSession/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/AckHandler.php @@ -99,7 +99,7 @@ trait AckHandler && $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted'] && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' ) { - if ($pfs && !$this->shared->getTempAuthKey()->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { + if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { continue; } @@ -128,7 +128,7 @@ trait AckHandler && $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted'] && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' ) { - if ($pfs && !$this->shared->getTempAuthKey()->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { + if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { continue; } diff --git a/src/danog/MadelineProto/MTProtoSession/CallHandler.php b/src/danog/MadelineProto/MTProtoSession/CallHandler.php index 53081f00..105b22f6 100644 --- a/src/danog/MadelineProto/MTProtoSession/CallHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/CallHandler.php @@ -152,7 +152,7 @@ trait CallHandler $aargs['multiple'] = true; } if (isset($args['message']) && \is_string($args['message']) && \mb_strlen($args['message'], 'UTF-8') > (yield $this->API->get_config_async())['message_length_max'] && \mb_strlen((yield $this->API->parse_mode_async($args))['message'], 'UTF-8') > (yield $this->API->get_config_async())['message_length_max']) { - $args = yield $this->split_to_chunks_async($args); + $args = yield $this->API->split_to_chunks_async($args); $promises = []; $aargs['queue'] = $method; $aargs['multiple'] = true; diff --git a/src/danog/MadelineProto/MTProtoSession/Session.php b/src/danog/MadelineProto/MTProtoSession/Session.php index 0e8d0e1c..5e33eb37 100644 --- a/src/danog/MadelineProto/MTProtoSession/Session.php +++ b/src/danog/MadelineProto/MTProtoSession/Session.php @@ -35,7 +35,7 @@ abstract class Session public $new_outgoing = []; public $pending_outgoing = []; - public $pending_outgoing_key = 0; + public $pending_outgoing_key = 'a'; public $time_delta = 0; @@ -68,4 +68,18 @@ abstract class Session $this->session_out_seq_no = 0; } } + + /** + * Backup eventual unsent messages before session deletion + * + * @return array + */ + public function backupSession(): array + { + $pending = array_values($this->pending_outgoing); + foreach ($this->new_outgoing as $id) { + $pending[] = $this->outgoing_messages[$id]; + } + return $pending; + } } diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index ffc7db2f..70f59340 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -712,6 +712,9 @@ trait AuthKeyHandler } if ($media) { $socket->link(\intval($id)); + if ($socket->hasTempAuthKey()) { + return; + } } if ($this->datacenter->getDataCenterConnection($id)->getSettings()['pfs']) { if (!$cdn) {