This commit is contained in:
Daniil Gentili 2019-09-04 17:48:07 +02:00
parent 75945704dd
commit 4619d44b12
12 changed files with 109 additions and 55 deletions

17
bot.php
View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
use danog\MadelineProto\Stream\Proxy\SocksProxy;
/*
Copyright 2016-2019 Daniil Gentili
@ -12,14 +12,14 @@ See the GNU Affero General Public License for more details.
You should have received a copy of the GNU General Public License along with MadelineProto.
If not, see <http://www.gnu.org/licenses/>.
*/
set_include_path(get_include_path().':'.realpath(dirname(__FILE__).'/MadelineProto/'));
\set_include_path(\get_include_path().':'.\realpath(\dirname(__FILE__).'/MadelineProto/'));
/*
* Various ways to load MadelineProto
*/
if (!file_exists(__DIR__.'/vendor/autoload.php')) {
if (!file_exists('madeline.php')) {
copy('https://phar.madelineproto.xyz/madeline.php', 'madeline.php');
if (!\file_exists(__DIR__.'/vendor/autoload.php')) {
if (!\file_exists('madeline.php')) {
\copy('https://phar.madelineproto.xyz/madeline.php', 'madeline.php');
}
include 'madeline.php';
} else {
@ -39,10 +39,10 @@ class EventHandler extends \danog\MadelineProto\EventHandler
if (isset($update['message']['_']) && $update['message']['_'] === 'messageEmpty') {
return;
}
$res = json_encode($update, JSON_PRETTY_PRINT);
$res = \json_encode($update, JSON_PRETTY_PRINT);
try {
yield $this->messages->sendMessage(['peer' => $update, 'message' => "<code>$res</code>", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); //'entities' => [['_' => 'messageEntityPre', 'offset' => 0, 'length' => strlen($res), 'language' => 'json']]]);
yield $this->messages->sendMessage(['peer' => $update, 'message' => "<code>$res</code>", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']);
if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') {
yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]);
/* '_' => 'inputMediaUploadedDocument',
@ -56,7 +56,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler
} catch (\danog\MadelineProto\RPCErrorException $e) {
$this->logger((string) $e, \danog\MadelineProto\Logger::FATAL_ERROR);
} catch (\danog\MadelineProto\Exception $e) {
if (stripos($e->getMessage(), 'invalid constructor given') === false) {
if (\stripos($e->getMessage(), 'invalid constructor given') === false) {
$this->logger((string) $e, \danog\MadelineProto\Logger::FATAL_ERROR);
}
//$this->messages->sendMessage(['peer' => '@danogentili', 'message' => $e->getCode().': '.$e->getMessage().PHP_EOL.$e->getTraceAsString()]);
@ -71,4 +71,5 @@ $MadelineProto->loop(function () use ($MadelineProto) {
yield $MadelineProto->start();
yield $MadelineProto->setEventHandler('\EventHandler');
});
$MadelineProto->loop();

View File

@ -40,8 +40,6 @@ class Connection extends Session
use \danog\Serializable;
use Tools;
const PENDING_MAX = 2000000000;
/**
* Writer loop.
*
@ -305,7 +303,7 @@ class Connection extends Session
*/
public function connect(ConnectionContext $ctx): \Generator
{
$this->API->logger->logger("Trying connection ({$this->id}) via $ctx", \danog\MadelineProto\Logger::WARNING);
//$this->API->logger->logger("Trying connection ({$this->id}) via $ctx", \danog\MadelineProto\Logger::WARNING);
$ctx->setReadCallback([$this, 'haveRead']);
@ -411,7 +409,6 @@ class Connection extends Session
$message['send_promise'] = $deferred;
$this->pending_outgoing[$this->pending_outgoing_key++] = $message;
$this->pending_outgoing_key %= self::PENDING_MAX;
if ($flush && isset($this->writer)) {
$this->writer->resume();
}
@ -471,7 +468,7 @@ class Connection extends Session
*/
public function disconnect()
{
$this->API->logger->logger("Disconnecting from DC {$this->datacenterId}");
//$this->API->logger->logger("Disconnecting from DC {$this->datacenterId}");
$this->old = true;
foreach (['reader', 'writer', 'checker', 'waiter', 'updater'] as $loop) {
if (isset($this->{$loop}) && $this->{$loop}) {
@ -485,7 +482,7 @@ class Connection extends Session
$this->API->logger->logger($e);
}
}
$this->API->logger->logger("Disconnected from DC {$this->datacenterId}");
//$this->API->logger->logger("Disconnected from DC {$this->datacenterId}");
}
/**
@ -495,7 +492,7 @@ class Connection extends Session
*/
public function reconnect(): \Generator
{
$this->API->logger->logger("Reconnecting DC {$this->datacenterId}");
//$this->API->logger->logger("Reconnecting DC {$this->datacenterId}");
$this->disconnect();
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc(), $this->id);
if ($this->API->hasAllAuth() && !$this->hasPendingCalls()) {

View File

@ -188,7 +188,7 @@ class DataCenter
$this->settings = $settings;
foreach ($this->sockets as $key => $socket) {
if ($socket instanceof DataCenterConnection && !\strpos($key, '_bk')) {
$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE);
//$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE);
$socket->old = true;
$socket->setExtra($this->API);
$socket->disconnect();

View File

@ -107,6 +107,12 @@ class DataCenterConnection implements JsonSerializable
*/
private $decWrite = 10;
/**
* Backed up messages
*
* @var array
*/
private $backup = [];
/**
* Get auth key.
*
@ -218,6 +224,15 @@ class DataCenterConnection implements JsonSerializable
}
$this->tempAuthKey->bind($this->permAuthKey, $pfs);
}
/**
* Check if auth keys are bound.
*
* @return boolean
*/
public function isBound(): bool
{
return $this->tempAuthKey ? $this->tempAuthKey->isBound() : false;
}
/**
* Check if we are logged in.
*
@ -311,7 +326,7 @@ class DataCenterConnection implements JsonSerializable
*/
public function connect(ConnectionContext $ctx, int $id = -1): \Generator
{
$this->API->logger->logger("Trying shared connection via $ctx", \danog\MadelineProto\Logger::WARNING);
$this->API->logger->logger("Trying shared connection via $ctx");
$this->ctx = $ctx->getCtx();
$this->datacenter = $ctx->getDc();
@ -331,9 +346,11 @@ class DataCenterConnection implements JsonSerializable
if ($id === -1) {
if ($this->connections) {
$this->disconnect();
$this->API->logger("Already connected!", Logger::WARNING);
return;
}
yield $this->connectMore($count);
yield $this->restoreBackup();
} else {
$this->availableConnections[$id] = 0;
yield $this->connections[$id]->connect($ctx);
@ -372,9 +389,14 @@ class DataCenterConnection implements JsonSerializable
$this->robinLoop->signal(true);
$this->robinLoop = null;
}
$before = count($this->backup);
foreach ($this->connections as $connection) {
$this->backup = \array_merge($this->backup, $connection->backupSession());
$connection->disconnect();
}
$count = count($this->backup) - $before;
$this->API->logger->logger("Backed up $count messages from DC {$this->datacenter}");
$this->connections = [];
$this->availableConnections = [];
}
@ -391,6 +413,22 @@ class DataCenterConnection implements JsonSerializable
yield $this->connect($this->ctx);
}
/**
* Restore backed up messages
*
* @return void
*/
public function restoreBackup()
{
$backup = $this->backup;
$this->backup = [];
$count = count($backup);
$this->API->logger->logger("Restoring $count messages to DC {$this->datacenter}");
foreach ($backup as $message) {
Tools::callFork($this->getConnection()->sendMessage($message, false));
}
$this->flush();
}
/**
* Get connection for authorization.
*

View File

@ -19,8 +19,10 @@
namespace danog\MadelineProto\Loop\Connection;
use Amp\Deferred;
use Amp\Loop;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\Tools;
/**
* RPC call status check loop.
@ -160,8 +162,8 @@ class CheckLoop extends ResumableSignalLoop
if ($connection->get_max_id(true) === $last_msgid && $connection->getLastChunk() === $last_chunk) {
$API->logger->logger("We did not receive a response for $timeout seconds: reconnecting and exiting check loop on DC $datacenter");
$this->exitedLoop();
yield $connection->reconnect();
//$this->exitedLoop();
Tools::callForkDefer($connection->reconnect());
return;
}

View File

@ -88,13 +88,14 @@ class ReadLoop extends SignalLoop
}
if (\is_int($error)) {
$this->exitedLoop();
//$this->exitedLoop();
Tools::callForkDefer((function () use ($error, $shared, $connection, $datacenter, $API) {
if ($error === -404) {
if ($shared->hasTempAuthKey()) {
$API->logger->logger("WARNING: Resetting auth key in DC {$datacenter}...", \danog\MadelineProto\Logger::WARNING);
$shared->setTempAuthKey(null);
$connection->session_id = null;
$shared->resetSession();
foreach ($connection->new_outgoing as $message_id) {
$connection->outgoing_messages[$message_id]['sent'] = 0;
}
@ -111,12 +112,14 @@ class ReadLoop extends SignalLoop
yield $connection->reconnect();
} elseif ($error === -429) {
$API->logger->logger("Got -429 from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING);
Loop::delay(1*1000, [$connection, 'reconnect']);
yield Tools::sleep(1);
yield $connection->reconnect();
} else {
yield $connection->reconnect();
throw new \danog\MadelineProto\RPCErrorException($error, $error);
}
})());
return;
}

View File

@ -170,7 +170,6 @@ class WriteLoop extends ResumableSignalLoop
if (\count($to_ack = $connection->ack_queue)) {
foreach (\array_chunk($connection->ack_queue, 8192) as $acks) {
$connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'msgs_ack', 'serialized_body' => yield $this->API->serialize_object_async(['type' => 'msgs_ack'], ['msg_ids' => $acks], 'msgs_ack'), 'content_related' => false, 'unencrypted' => false, 'method' => false];
$connection->pending_outgoing_key %= Connection::PENDING_MAX;
}
}
@ -186,7 +185,6 @@ class WriteLoop extends ResumableSignalLoop
}
if ($shared->isHttp() && !$has_http_wait) {
$connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'http_wait', 'serialized_body' => yield $this->API->serialize_object_async(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'content_related' => true, 'unencrypted' => false, 'method' => true];
$connection->pending_outgoing_key %= Connection::PENDING_MAX;
$has_http_wait = true;
}
@ -202,7 +200,7 @@ class WriteLoop extends ResumableSignalLoop
unset($connection->pending_outgoing[$k]);
continue;
}
if ($shared->getSettings()['pfs'] && !$shared->getTempAuthKey()->isBound() && !$connection->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
if ($shared->getSettings()['pfs'] && !$shared->isBound() && !$connection->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
$API->logger->logger("Skipping {$message['_']} due to unbound keys in DC {$datacenter}");
$skipped = true;
continue;
@ -291,7 +289,6 @@ class WriteLoop extends ResumableSignalLoop
//var_dumP("container ".bin2hex($message_id));
$keys[$connection->pending_outgoing_key++] = $message_id;
$connection->pending_outgoing_key %= Connection::PENDING_MAX;
$message_data = yield $API->serialize_object_async(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container');

View File

@ -108,11 +108,10 @@ class FeedLoop extends ResumableSignalLoop
if (isset($update['pts'], $update['pts_count'])) {
$logger = function ($msg) use ($update) {
$pts_count = $update['pts_count'];
$double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-';
$mid = isset($update['message']['id']) ? $update['message']['id'] : '-';
$mypts = $this->state->pts();
$computed = $mypts + $pts_count;
$this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, computed pts: $computed, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR);
$this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, computed pts: $computed, msg id: {$mid}, channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR);
};
$result = $this->state->checkPts($update);
if ($result < 0) {

View File

@ -99,7 +99,7 @@ trait AckHandler
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {
if ($pfs && !$this->shared->getTempAuthKey()->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
continue;
}
@ -128,7 +128,7 @@ trait AckHandler
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {
if ($pfs && !$this->shared->getTempAuthKey()->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
continue;
}

View File

@ -152,7 +152,7 @@ trait CallHandler
$aargs['multiple'] = true;
}
if (isset($args['message']) && \is_string($args['message']) && \mb_strlen($args['message'], 'UTF-8') > (yield $this->API->get_config_async())['message_length_max'] && \mb_strlen((yield $this->API->parse_mode_async($args))['message'], 'UTF-8') > (yield $this->API->get_config_async())['message_length_max']) {
$args = yield $this->split_to_chunks_async($args);
$args = yield $this->API->split_to_chunks_async($args);
$promises = [];
$aargs['queue'] = $method;
$aargs['multiple'] = true;

View File

@ -35,7 +35,7 @@ abstract class Session
public $new_outgoing = [];
public $pending_outgoing = [];
public $pending_outgoing_key = 0;
public $pending_outgoing_key = 'a';
public $time_delta = 0;
@ -68,4 +68,18 @@ abstract class Session
$this->session_out_seq_no = 0;
}
}
/**
* Backup eventual unsent messages before session deletion
*
* @return array
*/
public function backupSession(): array
{
$pending = array_values($this->pending_outgoing);
foreach ($this->new_outgoing as $id) {
$pending[] = $this->outgoing_messages[$id];
}
return $pending;
}
}

View File

@ -712,6 +712,9 @@ trait AuthKeyHandler
}
if ($media) {
$socket->link(\intval($id));
if ($socket->hasTempAuthKey()) {
return;
}
}
if ($this->datacenter->getDataCenterConnection($id)->getSettings()['pfs']) {
if (!$cdn) {