2GB file limit, async RPC reporting, file reference and MTProto fixes

This commit is contained in:
Daniil Gentili 2020-07-09 18:23:16 +02:00
parent bfd08f5744
commit 4923eec9cd
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
17 changed files with 262 additions and 123 deletions

View File

@ -53,7 +53,8 @@
"danog/7to5": "^1",
"vimeo/psalm": "dev-master",
"phpstan/phpstan": "^0.12.14",
"friendsofphp/php-cs-fixer": "^2.16"
"friendsofphp/php-cs-fixer": "^2.16",
"squizlabs/php_codesniffer": "^3.5"
},
"suggest": {
"ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)"

View File

@ -84,7 +84,6 @@ class MyEventHandler extends EventHandler
if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') {
yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]);
}
yield $this->ping(['multiple' => true] + \array_map(fn ($v) => ['ping_id' => $v], \range(0, 1020)));
}
}
$settings = [

View File

@ -184,9 +184,9 @@ final class APIWrapper
}
$this->serialized = \time();
$realpaths = Serialization::realpaths($this->session);
//Logger::log('Waiting for exclusive lock of serialization lockfile...');
Logger::log('Waiting for exclusive lock of serialization lockfile...');
$unlock = yield Tools::flock($realpaths['lockfile'], LOCK_EX);
//Logger::log('Lock acquired, serializing');
Logger::log('Lock acquired, serializing');
try {
if (!$this->gettingApiId) {
$update_closure = $this->API->settings['updates']['callback'];
@ -207,7 +207,7 @@ final class APIWrapper
}
$unlock();
}
//Logger::log('Done serializing');
Logger::log('Done serializing');
return $wrote;
})());
}

View File

@ -409,8 +409,7 @@ class Connection extends Session
$deferred = new Deferred();
if (!isset($message['serialized_body'])) {
$body = \is_object($message['body']) ? yield from $message['body'] : $message['body'];
$refreshNext = isset($message['refreshNext']) && $message['refreshNext'];
//$refreshNext = true;
$refreshNext = $message['refreshReferences'] ?? false;
if ($refreshNext) {
$this->API->referenceDatabase->refreshNext(true);
}

View File

@ -374,7 +374,7 @@ class DataCenterConnection implements JsonSerializable
$this->decWrite = self::WRITE_WEIGHT;
if ($id === -1 || !isset($this->connections[$id])) {
if ($this->connections) {
$this->API->logger("Already connected!", Logger::WARNING);
$this->API->logger->logger("Already connected!", Logger::WARNING);
return;
}
yield from $this->connectMore($count);

View File

@ -131,14 +131,17 @@ class CheckLoop extends ResumableSignalLoop
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server and was already sent, requesting reply...', \danog\MadelineProto\Logger::ERROR);
$reply[] = $message_id;
} else {
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server, requesting reply...', \danog\MadelineProto\Logger::ERROR);
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server, waiting...', \danog\MadelineProto\Logger::ERROR);
$reply[] = $message_id;
}
}
}
/*
if ($reply) {
\danog\MadelineProto\Tools::callFork($connection->objectCall('msg_resend_ans_req', ['msg_ids' => $reply], ['postpone' => true]));
}
$deferred= new Deferred;
$deferred->promise()->onResolve(fn($e, $res) => var_dump(ord($res['info'][0])));
\danog\MadelineProto\Tools::callFork($connection->objectCall('msg_resend_req', ['msg_ids' => $reply], ['postpone' => true, 'promise' => $deferred]));
}*/
$connection->flush();
});
$list = '';

View File

@ -106,7 +106,7 @@ class ReadLoop extends SignalLoop
yield from $connection->reconnect();
} elseif ($error === -429) {
$API->logger->logger("Got -429 from DC {$datacenter}", Logger::WARNING);
yield Tools::sleep(1);
yield Tools::sleep(3);
yield from $connection->reconnect();
} else {
yield from $connection->reconnect();

View File

@ -20,6 +20,7 @@
namespace danog\MadelineProto\Loop\Connection;
use Amp\ByteStream\StreamException;
use Amp\Loop;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
@ -33,6 +34,10 @@ use danog\MadelineProto\Tools;
*/
class WriteLoop extends ResumableSignalLoop
{
const MAX_COUNT = 1020;
const MAX_SIZE = 1 << 15;
const MAX_IDS = 8192;
/**
* Connection instance.
*
@ -157,37 +162,22 @@ class WriteLoop extends ResumableSignalLoop
if ($shared->isHttp() && empty($connection->pending_outgoing)) {
return;
}
$temporary_keys = [];
if (\count($to_ack = $connection->ack_queue)) {
foreach (\array_chunk($connection->ack_queue, 8192) as $acks) {
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msgs_ack', 'serialized_body' => yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msgs_ack', 'msg_ids' => $acks], 'msgs_ack'), 'contentRelated' => false, 'unencrypted' => false, 'method' => false];
$temporary_keys[$connection->pending_outgoing_key] = true;
$API->logger->logger("Adding msgs_ack {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE);
$connection->pending_outgoing_key++;
}
}
$has_http_wait = false;
\ksort($connection->pending_outgoing);
$messages = [];
$keys = [];
if ($shared->isHttp()) {
foreach ($connection->pending_outgoing as $message) {
if ($message['_'] === 'http_wait') {
$has_http_wait = true;
break;
}
}
if (!$has_http_wait) {
$API->logger->logger("Adding http_wait {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE);
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'http_wait', 'serialized_body' => yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'contentRelated' => true, 'unencrypted' => false, 'method' => true];
$temporary_keys[$connection->pending_outgoing_key] = true;
$connection->pending_outgoing_key++;
}
}
$total_length = 0;
$count = 0;
\ksort($connection->pending_outgoing);
$skipped = false;
$inited = false;
$has_seq = false;
$has_state = false;
$has_resend = false;
$has_http_wait = false;
foreach ($connection->pending_outgoing as $k => $message) {
if ($message['unencrypted']) {
continue;
@ -201,15 +191,42 @@ class WriteLoop extends ResumableSignalLoop
$skipped = true;
continue;
}
if ($message['_'] === 'http_wait') {
$has_http_wait = true;
}
if ($message['_'] === 'msgs_state_req') {
if ($has_state) {
$API->logger->logger("Already have a state request queued for the current container in DC {$datacenter}");
continue;
}
$has_state = true;
}
if ($message['_'] === 'msg_resend_req') {
if ($has_resend) {
$API->logger->logger("Already have a resend request queued for the current container in DC {$datacenter}");
continue;
}
$has_resend = true;
}
$body_length = \strlen($message['serialized_body']);
$actual_length = $body_length + 32;
if ($total_length && $total_length + $actual_length > 32760 || $count >= 1020) {
$API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
break;
}
$message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->msgIdHandler->generateMessageId();
if (isset($message['seqno'])) {
$has_seq = true;
}
$message_id = $message['msg_id'] ?? $connection->msgIdHandler->generateMessageId();
$API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generateOutSeqNo($message['contentRelated'])];
$MTmessage = [
'_' => 'MTmessage',
'msg_id' => $message_id,
'body' => $message['serialized_body'],
'seqno' => $message['seqno'] ?? $connection->generateOutSeqNo($message['contentRelated'])
];
if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') {
if (!$shared->getTempAuthKey()->isInited() && $message['_'] !== 'auth.bindTempAuthKey' && !$inited) {
$inited = true;
@ -229,13 +246,14 @@ class WriteLoop extends ResumableSignalLoop
}
}
// TODO
/* if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) {
if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) {
$MTmessage['body'] = yield $API->getTL()->serializeObject(['type' => ''], ['_' => 'gzip_packed', 'packed_data' => $gzipped], 'gzipped data');
$API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
}
unset($gzipped);
}*/
/*
if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) {
if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) {
$MTmessage['body'] = yield $API->getTL()->serializeObject(['type' => ''], ['_' => 'gzip_packed', 'packed_data' => $gzipped], 'gzipped data');
$API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
}
unset($gzipped);
}*/
}
}
$body_length = \strlen($MTmessage['body']);
@ -249,20 +267,45 @@ class WriteLoop extends ResumableSignalLoop
$MTmessage['bytes'] = $body_length;
$messages[] = $MTmessage;
$keys[$k] = $message_id;
}
if ($shared->isHttp() && $skipped && $count === \count($temporary_keys)) {
foreach ($temporary_keys as $key => $true) {
$API->logger->logger("Removing temporary {$connection->pending_outgoing[$key]['_']} by {$key}", Logger::ULTRA_VERBOSE);
unset($connection->pending_outgoing[$key]);
$count--;
}
$connection->pending_outgoing[$k]['seqno'] = $MTmessage['seqno'];
$connection->pending_outgoing[$k]['msg_id'] = $MTmessage['msg_id'];
}
$MTmessage = null;
if ($count > 1) {
$acks = \array_slice($connection->ack_queue, 0, self::MAX_COUNT);
if ($ackCount = \count($acks)) {
$API->logger->logger("Adding msgs_ack", Logger::ERROR);
$body = yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msgs_ack', 'msg_ids' => $acks], 'msgs_ack');
$messages []= [
'_' => 'MTmessage',
'msg_id' => $connection->msgIdHandler->generateMessageId(),
'body' => $body,
'seqno' => $connection->generateOutSeqNo(false),
'bytes' => \strlen($body)
];
$count++;
unset($acks, $body);
}
if ($shared->isHttp() && !$has_http_wait) {
$API->logger->logger("Adding http_wait", Logger::ULTRA_VERBOSE);
$body = yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait');
$messages []= [
'_' => 'MTmessage',
'msg_id' => $connection->msgIdHandler->generateMessageId(),
'body' => $body,
'seqno' => $connection->generateOutSeqNo(true),
'bytes' => \strlen($body)
];
$count++;
unset($body);
}
if ($count > 1 || $has_seq) {
$API->logger->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_id = $connection->msgIdHandler->generateMessageId();
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => \array_values($keys), 'contentRelated' => false, 'method' => false, 'unencrypted' => false];
//var_dumP("container ".bin2hex($message_id));
$keys[$connection->pending_outgoing_key++] = $message_id;
$message_data = (yield from $API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'));
$message_data_length = \strlen($message_data);
@ -287,31 +330,29 @@ class WriteLoop extends ResumableSignalLoop
$message_key = \substr(\hash('sha256', \substr($shared->getTempAuthKey()->getAuthKey(), 88, 32).$plaintext.$padding, true), 8, 16);
list($aes_key, $aes_iv) = Crypt::aesCalculate($message_key, $shared->getTempAuthKey()->getAuthKey());
$message = $shared->getTempAuthKey()->getID().$message_key.Crypt::igeEncrypt($plaintext.$padding, $aes_key, $aes_iv);
$buffer = yield $connection->stream->getWriteBuffer($len = \strlen($message));
//$t = \microtime(true);
$buffer = yield $connection->stream->getWriteBuffer(\strlen($message));
yield $buffer->bufferWrite($message);
$connection->httpSent();
$API->logger->logger("Sent encrypted payload to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$sent = \time();
if ($to_ack) {
$connection->ack_queue = [];
if ($ackCount) {
$connection->ack_queue = \array_slice($connection->ack_queue, $ackCount);
}
foreach ($keys as $key => $message_id) {
$connection->outgoing_messages[$message_id] =& $connection->pending_outgoing[$key];
if (isset($connection->outgoing_messages[$message_id]['promise'])) {
$connection->new_outgoing[$message_id] = $message_id;
$connection->outgoing_messages[$message_id]['sent'] = $sent;
$connection->outgoing_messages[$message_id]['tries'] = 0;
}
if (isset($connection->outgoing_messages[$message_id]['send_promise'])) {
$connection->outgoing_messages[$message_id]['send_promise']->resolve(isset($connection->outgoing_messages[$message_id]['promise']) ? $connection->outgoing_messages[$message_id]['promise'] : true);
unset($connection->outgoing_messages[$message_id]['send_promise']);
}
//var_dumP("encrypted ".bin2hex($message_id)." ".$connection->outgoing_messages[$message_id]['_']);
unset($connection->pending_outgoing[$key]);
}
//if (!empty($connection->pending_outgoing)) $connection->select();
} while (!empty($connection->pending_outgoing) && !$skipped);
} while ($connection->pending_outgoing && !$skipped);
if (empty($connection->pending_outgoing)) {
$connection->pending_outgoing_key = 'a';
}

View File

@ -91,7 +91,7 @@ class MTProto extends AsyncConstruct implements TLCallback
*
* @var int
*/
const V = 141;
const V = 143;
/**
* String release version.
*
@ -378,6 +378,12 @@ class MTProto extends AsyncConstruct implements TLCallback
* @var PeriodicLoop
*/
private $serializeLoop;
/**
* RPC reporting loop.
*
* @var PeriodicLoop
*/
private $rpcLoop;
/**
* Feeder loops.
*
@ -702,11 +708,15 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!$this->configLoop) {
$this->configLoop = new PeriodicLoop($this, [$this, 'getConfig'], 'config', 24 * 3600);
}
if (!$this->rpcLoop) {
$this->rpcLoop = new PeriodicLoop($this, [$this, 'rpcReport'], 'config', 60);
}
$this->callCheckerLoop->start();
$this->serializeLoop->start();
$this->phoneConfigLoop->start();
$this->configLoop->start();
$this->checkTosLoop->start();
$this->rpcLoop->start();
}
/**
* Stop all internal loops.
@ -735,6 +745,33 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->checkTosLoop->signal(true);
$this->checkTosLoop = null;
}
if ($this->rpcLoop) {
$this->rpcLoop->signal(true);
$this->rpcLoop = null;
}
}
/**
* Report RPC errors.
*
* @internal
*
* @return \Generator
*/
public function rpcReport(): \Generator
{
$toReport = RPCErrorException::$toReport;
RPCErrorException::$toReport = [];
foreach ($toReport as [$method, $code, $error, $time]) {
try {
$res = \json_decode(yield from $this->fileGetContents('https://rpc.pwrtelegram.xyz/?method='.$method.'&code='.$code.'&error='.$error.'&t='.$time), true);
if (isset($res['ok']) && $res['ok'] && isset($res['result'])) {
$description = $res['result'];
RPCErrorException::$descriptions[$error] = $description;
RPCErrorException::$errorMethodMap[$code][$method][$error] = $error;
}
} catch (\Throwable $e) {
}
}
}
/**
* Clean up properties from previous versions of MadelineProto.
@ -909,7 +946,7 @@ class MTProto extends AsyncConstruct implements TLCallback
yield from $this->updateSettings($backtrace['args'][1], false);
}
}
if (($this->settings['tl_schema']['src']['botAPI'] ?? '') !== __DIR__.'/../../../schemas/TL_botAPI.tl') {
if (($this->settings['tl_schema']['src']['botAPI'] ?? '') !== __DIR__.'/TL_botAPI.tl') {
unset($this->v);
}
if (!\file_exists($this->settings['tl_schema']['src']['telegram'])) {
@ -944,8 +981,6 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->startLoops();
if (yield from $this->fullGetSelf()) {
$this->authorized = self::LOGGED_IN;
}
if ($this->authorized === self::LOGGED_IN) {
yield from $this->getCdnConfig($this->datacenter->curdc);
$this->setupLogger();
}
@ -1194,6 +1229,8 @@ class MTProto extends AsyncConstruct implements TLCallback
'ipv6' => Magic::$ipv6,
// decides whether to use ipv6, ipv6 attribute of API attribute of API class contains autodetected boolean
'timeout' => 2,
// RPC timeout
'drop_timeout' => 5*60,
// timeout for sockets
'proxy' => Magic::$altervista ? '\\HttpProxy' : '\\Socket',
// The proxy class to use
@ -1667,6 +1704,8 @@ class MTProto extends AsyncConstruct implements TLCallback
}
$this->config = empty($config) ? yield from $this->methodCallAsyncRead('help.getConfig', $config, $options ?: ['datacenter' => $this->settings['connection_settings']['default_dc']]) : $config;
yield from $this->parseConfig();
$this->logger->logger(Lang::$current_lang['config_updated'], Logger::NOTICE);
$this->logger->logger($this->config, Logger::NOTICE);
return $this->config;
}
/**
@ -1681,8 +1720,6 @@ class MTProto extends AsyncConstruct implements TLCallback
unset($this->config['dc_options']);
yield from $this->parseDcOptions($options);
}
$this->logger->logger(Lang::$current_lang['config_updated'], Logger::NOTICE);
$this->logger->logger($this->config, Logger::NOTICE);
}
/**
* Parse DC options from config.
@ -1705,7 +1742,6 @@ class MTProto extends AsyncConstruct implements TLCallback
}
$id .= $dc['media_only'] ? '_media' : '';
$ipv6 = $dc['ipv6'] ? 'ipv6' : 'ipv4';
//$id .= isset($this->settings['connection'][$test][$ipv6][$id]) && $this->settings['connection'][$test][$ipv6][$id]['ip_address'] != $dc['ip_address'] ? '_bk' : '';
if (\is_numeric($id)) {
$id = (int) $id;
}

View File

@ -19,11 +19,22 @@
namespace danog\MadelineProto\MTProtoSession;
use Amp\Deferred;
use Amp\Promise;
use danog\MadelineProto\Loop\Connection\WriteLoop;
/**
* Manages acknowledgement of messages.
*/
trait AckHandler
{
/**
* Acknowledge outgoing message ID
*
* @param string|int $message_id Message Id
*
* @return boolean
*/
public function ackOutgoingMessageId($message_id): bool
{
// The server acknowledges that it received my message
@ -31,16 +42,15 @@ trait AckHandler
$this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING);
return false;
}
//$this->logger->logger("Ack-ed ".$this->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter");
/*
if (isset($this->outgoing_messages[$message_id]['body'])) {
unset($this->outgoing_messages[$message_id]['body']);
}
if (isset($this->new_outgoing[$message_id])) {
unset($this->new_outgoing[$message_id]);
}*/
return true;
}
/**
* We have gotten response for outgoing message ID
*
* @param string|int $message_id Message ID
*
* @return boolean
*/
public function gotResponseForOutgoingMessageId($message_id): bool
{
// The server acknowledges that it received my message
@ -59,19 +69,23 @@ trait AckHandler
}
return true;
}
/**
* Acknowledge incoming message ID
*
* @param string|int $message_id Message ID
*
* @return boolean
*/
public function ackIncomingMessageId($message_id): bool
{
// I let the server know that I received its message
if (!isset($this->incoming_messages[$message_id])) {
$this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of incoming messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING);
}
/*if ($this->temp_auth_key['id'] === null || $this->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") {
// || (isset($this->incoming_messages[$message_id]['ack']) && $this->incoming_messages[$message_id]['ack'])) {
return;
}*/
$this->ack_queue[$message_id] = $message_id;
return true;
}
/**
* Check if there are some pending calls.
*
@ -103,6 +117,7 @@ trait AckHandler
public function getPendingCalls(): array
{
$settings = $this->shared->getSettings();
$dropTimeout = $settings['drop_timeout'];
$timeout = $settings['timeout'];
$pfs = $settings['pfs'];
$unencrypted = !$this->shared->hasTempAuthKey();
@ -118,6 +133,11 @@ trait AckHandler
unset($this->new_outgoing[$k], $this->outgoing_messages[$message_id]);
continue;
}
if ($this->outgoing_messages[$message_id]['sent'] + $dropTimeout < \time()) {
$this->gotResponseForOutgoingMessageId($message_id);
$this->handleReject($this->outgoing_messages[$message_id], new \danog\MadelineProto\Exception("Request timeout"));
continue;
}
$result[] = $message_id;
}
}

View File

@ -56,8 +56,8 @@ trait CallHandler
} else {
Tools::callFork($this->sendMessage($this->outgoing_messages[$message_id], false));
}
$this->ackOutgoingMessageId($message_id);
$this->gotResponseForOutgoingMessageId($message_id);
//$this->ackOutgoingMessageId($message_id);
//$this->gotResponseForOutgoingMessageId($message_id);
} else {
$this->logger->logger('Could not resend '.(isset($this->outgoing_messages[$message_id]['_']) ? $this->outgoing_messages[$message_id]['_'] : $message_id));
}

View File

@ -0,0 +1,65 @@
<?php
/**
* Reliable module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2020 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\MTProtoSession;
use Amp\Loop;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProto;
/**
* Manages responses.
*/
trait Reliable
{
/**
* Send state info for message IDs
*
* @param string|int $req_msg_id Message ID of msgs_state_req that initiated this
* @param array $msg_ids Message IDs to send info about
*
* @return \Generator
*/
public function sendMsgsStateInfo($req_msg_id, array $msg_ids): \Generator
{
$this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs');
$info = '';
foreach ($msg_ids as $msg_id) {
$cur_info = 0;
if (!isset($this->incoming_messages[$msg_id])) {
$msg_id = new \tgseclib\Math\BigInteger(\strrev($msg_id), 256);
if ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) {
$this->logger->logger("Do not know anything about {$msg_id} and it is too big");
$cur_info |= 3;
} elseif ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) {
$this->logger->logger("Do not know anything about {$msg_id} and it is too small");
$cur_info |= 1;
} else {
$this->logger->logger("Do not know anything about {$msg_id}");
$cur_info |= 2;
}
} else {
$this->logger->logger("Know about {$msg_id}");
$cur_info |= 4;
}
$info .= \chr($cur_info);
}
$this->outgoing_messages[yield from $this->objectCall('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id;
}
}

View File

@ -28,32 +28,6 @@ use danog\MadelineProto\MTProto;
*/
trait ResponseHandler
{
public function sendMsgsStateInfo($req_msg_id, $msg_ids): \Generator
{
$this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs');
$info = '';
foreach ($msg_ids as $msg_id) {
$cur_info = 0;
if (!isset($this->incoming_messages[$msg_id])) {
$msg_id = new \tgseclib\Math\BigInteger(\strrev($msg_id), 256);
if ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) {
$this->logger->logger("Do not know anything about {$msg_id} and it is too small");
$cur_info |= 3;
} elseif ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) {
$this->logger->logger("Do not know anything about {$msg_id} and it is too big");
$cur_info |= 1;
} else {
$this->logger->logger("Do not know anything about {$msg_id}");
$cur_info |= 2;
}
} else {
$this->logger->logger("Know about {$msg_id}");
$cur_info |= 4;
}
$info .= \chr($cur_info);
}
$this->outgoing_messages[yield from $this->objectCall('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id;
}
public $n = 0;
public function handleMessages()
{
@ -271,7 +245,6 @@ trait ResponseHandler
if ($this->pending_outgoing) {
$this->writer->resume();
}
//$this->n--;
return $only_updates;
}
/**

View File

@ -28,6 +28,7 @@ abstract class Session
use ResponseHandler;
use SeqNoHandler;
use CallHandler;
use Reliable;
/**
* Incoming message array.
*

View File

@ -96,7 +96,7 @@ trait Files
}
StatCacheAsync::clear($file);
$size = (yield statAsync($file))['size'];
if ($size > 512 * 1024 * 3000) {
if ($size > 512 * 1024 * 4000) {
throw new \danog\MadelineProto\Exception('Given file is too big!');
}
$stream = yield open($file, 'rb');
@ -127,7 +127,7 @@ trait Files
/** @var $response \Amp\Http\Client\Response */
$request = new Request($url);
$request->setTransferTimeout(10 * 1000 * 3600);
$request->setBodySizeLimit(512 * 1024 * 3000);
$request->setBodySizeLimit(512 * 1024 * 4000);
$response = yield $this->datacenter->getHTTPClient()->request($request);
if (200 !== ($status = $response->getStatus())) {
throw new Exception("Wrong status code: {$status} ".$response->getReason());
@ -269,7 +269,7 @@ trait Files
$datacenter .= '_media';
}
$part_size = $this->settings['upload']['part_size'];
$parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 3000;
$parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 4000;
$part_total_num = (int) \ceil($size / $part_size);
$part_num = 0;
$method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart';
@ -1234,7 +1234,7 @@ trait Files
$end = $messageMedia['size'];
}
$part_size = $part_size ?? $this->settings['download']['part_size'];
$parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000;
$parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 4000;
$datacenter = isset($messageMedia['InputFileLocation']['dc_id']) ? $messageMedia['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc'];
if ($this->datacenter->has($datacenter.'_media')) {
$datacenter .= '_media';
@ -1257,7 +1257,7 @@ trait Files
}
$params = [];
$start_at = $offset % $part_size;
$probable_end = $end !== -1 ? $end : 512 * 1024 * 3000;
$probable_end = $end !== -1 ? $end : 512 * 1024 * 4000;
$breakOut = false;
for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) {
$end_at = $part_size;

View File

@ -45,7 +45,7 @@ class GarbageCollector
\gc_collect_cycles();
static::$memoryConsumption = static::getMemoryConsumption();
$cleanedMemory = $currentMemory - static::$memoryConsumption;
Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::NOTICE);
Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
}
});
}
@ -53,7 +53,7 @@ class GarbageCollector
private static function getMemoryConsumption(): int
{
$memory = \round(\memory_get_usage()/1024/1024, 1);
Logger::log("Memory consumption: $memory Mb", Logger::VERBOSE);
Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
return (int) $memory;
}
}

View File

@ -29,6 +29,7 @@ class RPCErrorException extends \Exception
private $fetched = false;
public static $descriptions = ['RPC_MCGET_FAIL' => 'Telegram is having internal issues, please try again later.', 'RPC_CALL_FAIL' => 'Telegram is having internal issues, please try again later.', 'USER_PRIVACY_RESTRICTED' => "The user's privacy settings do not allow you to do this", 'CHANNEL_PRIVATE' => "You haven't joined this channel/supergroup", 'USER_IS_BOT' => "Bots can't send messages to other bots", 'BOT_METHOD_INVALID' => 'This method cannot be run by a bot', 'PHONE_CODE_EXPIRED' => 'The phone code you provided has expired, this may happen if it was sent to any chat on telegram (if the code is sent through a telegram chat (not the official account) to avoid it append or prepend to the code some chars)', 'USERNAME_INVALID' => 'The provided username is not valid', 'ACCESS_TOKEN_INVALID' => 'The provided token is not valid', 'ACTIVE_USER_REQUIRED' => 'The method is only available to already activated users', 'FIRSTNAME_INVALID' => 'The first name is invalid', 'LASTNAME_INVALID' => 'The last name is invalid', 'PHONE_NUMBER_INVALID' => 'The phone number is invalid', 'PHONE_CODE_HASH_EMPTY' => 'phone_code_hash is missing', 'PHONE_CODE_EMPTY' => 'phone_code is missing', 'API_ID_INVALID' => 'The api_id/api_hash combination is invalid', 'PHONE_NUMBER_OCCUPIED' => 'The phone number is already in use', 'PHONE_NUMBER_UNOCCUPIED' => 'The phone number is not yet being used', 'USERS_TOO_FEW' => 'Not enough users (to create a chat, for example)', 'USERS_TOO_MUCH' => 'The maximum number of users has been exceeded (to create a chat, for example)', 'TYPE_CONSTRUCTOR_INVALID' => 'The type constructor is invalid', 'FILE_PART_INVALID' => 'The file part number is invalid', 'FILE_PARTS_INVALID' => 'The number of file parts is invalid', 'MD5_CHECKSUM_INVALID' => 'The MD5 checksums do not match', 'PHOTO_INVALID_DIMENSIONS' => 'The photo dimensions are invalid', 'FIELD_NAME_INVALID' => 'The field with the name FIELD_NAME is invalid', 'FIELD_NAME_EMPTY' => 'The field with the name FIELD_NAME is missing', 'MSG_WAIT_FAILED' => 'A waiting call returned an error', 'USERNAME_NOT_OCCUPIED' => 'The provided username is not occupied', 'PHONE_NUMBER_BANNED' => 'The provided phone number is banned from telegram', 'AUTH_KEY_UNREGISTERED' => 'The authorization key has expired', 'INVITE_HASH_EXPIRED' => 'The invite link has expired', 'USER_DEACTIVATED' => 'The user was deactivated', 'USER_ALREADY_PARTICIPANT' => 'The user is already in the group', 'MESSAGE_ID_INVALID' => 'The provided message id is invalid', 'PEER_ID_INVALID' => 'The provided peer id is invalid', 'CHAT_ID_INVALID' => 'The provided chat id is invalid', 'MESSAGE_DELETE_FORBIDDEN' => "You can't delete one of the messages you tried to delete, most likely because it is a service message.", 'CHAT_ADMIN_REQUIRED' => 'You must be an admin in this chat to do this', -429 => 'Too many requests', 'PEER_FLOOD' => "You are spamreported, you can't do this"];
public static $errorMethodMap = [];
public static $toReport = [];
private $caller = '';
public static function localizeMessage($method, int $code, string $error)
{
@ -38,12 +39,12 @@ class RPCErrorException extends \Exception
$error = \preg_replace('/\\d+$/', "X", $error);
$description = self::$descriptions[$error] ?? '';
if (!isset(self::$errorMethodMap[$code][$method][$error]) || !isset(self::$descriptions[$error]) || $code === 500) {
$res = \json_decode(@\file_get_contents('https://rpc.pwrtelegram.xyz/?method='.$method.'&code='.$code.'&error='.$error, false, \stream_context_create(['http' => ['timeout' => 3]])), true);
if (isset($res['ok']) && $res['ok'] && isset($res['result'])) {
$description = $res['result'];
self::$descriptions[$error] = $description;
self::$errorMethodMap[$code][$method][$error] = $error;
if (\count(self::$toReport) > 100) {
self::$toReport = \array_slice(self::$toReport, -100);
}
self::$toReport []= [
$method, $code, $error, time()
];
}
if (!$description) {
return $error;