Parallelized uploads and downloads, upload by URL and rename files

This commit is contained in:
Daniil Gentili 2019-09-02 16:54:36 +02:00
parent d1bbc86d21
commit 72d353fff0
13 changed files with 137 additions and 58 deletions

View File

@ -44,7 +44,13 @@ class EventHandler extends \danog\MadelineProto\EventHandler
try {
yield $this->messages->sendMessage(['peer' => $update, 'message' => "<code>$res</code>", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); //'entities' => [['_' => 'messageEntityPre', 'offset' => 0, 'length' => strlen($res), 'language' => 'json']]]);
if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') {
yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]);
yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $media]);
/* '_' => 'inputMediaUploadedDocument',
'file' => $update,
'attributes' => [
['_' => 'documentAttributeFilename', 'file_name' => 'document.txt']
]
],]);*/
//yield $this->download_to_dir($update, '/tmp');
}
} catch (\danog\MadelineProto\RPCErrorException $e) {

View File

@ -497,7 +497,7 @@ class Connection extends Session
{
$this->API->logger->logger("Reconnecting DC {$this->datacenterId}");
$this->disconnect();
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc());
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc(), $this->id);
if ($this->API->hasAllAuth() && !$this->hasPendingCalls()) {
/*$this->callFork((function () {
try {

View File

@ -137,8 +137,8 @@ class DataCenter
$array[$id]['tempAuthKey'] = $socket->temp_auth_key;
}
if ($socket->auth_key) {
$array[$id]['authKey'] = $socket->auth_key;
$array[$id]['authKey']['authorized'] = $socket->authorized;
$array[$id]['permAuthKey'] = $socket->auth_key;
$array[$id]['permAuthKey']['authorized'] = $socket->authorized;
}
}
}
@ -156,15 +156,15 @@ class DataCenter
{
foreach ($saved as $id => $data) {
$connection = $this->sockets[$id] = new DataCenterConnection;
if (isset($data['authKey'])) {
$connection->setPermAuthKey(new PermAuthKey($data['authKey']));
if (isset($data['permAuthKey'])) {
$connection->setPermAuthKey(new PermAuthKey($data['permAuthKey']));
}
if (isset($data['linked'])) {
continue;
}
if (isset($data['tempAuthKey'])) {
$connection->setTempAuthKey(new TempAuthKey($data['tempAuthKey']));
if ($data['tempAuthKey']['bound'] ?? false && $connection->hasPermAuthKey()) {
if (($data['tempAuthKey']['bound'] ?? false) && $connection->hasPermAuthKey()) {
$connection->bind();
}
}
@ -175,7 +175,7 @@ class DataCenter
$connection->link($data['linked']);
if (isset($data['tempAuthKey'])) {
$connection->setTempAuthKey(new TempAuthKey($data['tempAuthKey']));
if ($data['tempAuthKey']['bound'] ?? false && $connection->hasPermAuthKey()) {
if (($data['tempAuthKey']['bound'] ?? false) && $connection->hasPermAuthKey()) {
$connection->bind();
}
}
@ -187,7 +187,7 @@ class DataCenter
$this->dclist = $dclist;
$this->settings = $settings;
foreach ($this->sockets as $key => $socket) {
if ($socket instanceof Connection && !\strpos($key, '_bk')) {
if ($socket instanceof DataCenterConnection && !\strpos($key, '_bk')) {
$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE);
$socket->old = true;
$socket->setExtra($this->API);
@ -490,7 +490,7 @@ class DataCenter
throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
}
public function dcConnectAsync($dc_number): \Generator
public function dcConnectAsync(string $dc_number, int $id = -1): \Generator
{
if (isset($this->sockets[$dc_number]) && !isset($this->sockets[$dc_number]->old)) {
return false;
@ -503,7 +503,7 @@ class DataCenter
try {
if (isset($this->sockets[$dc_number]->old)) {
$this->sockets[$dc_number]->setExtra($this->API);
yield $this->sockets[$dc_number]->connect($ctx);
yield $this->sockets[$dc_number]->connect($ctx, $id);
} else {
$this->sockets[$dc_number] = new DataCenterConnection();
$this->sockets[$dc_number]->setExtra($this->API);

View File

@ -29,6 +29,10 @@ use JsonSerializable;
class DataCenterConnection implements JsonSerializable
{
const READ_WEIGHT = 1;
const READ_WEIGHT_MEDIA = 5;
const WRITE_WEIGHT = 10;
/**
* Temporary auth key.
*
@ -40,7 +44,7 @@ class DataCenterConnection implements JsonSerializable
*
* @var PermAuthKey|null
*/
private $authKey;
private $permAuthKey;
/**
* Connections open to a certain DC.
@ -112,7 +116,7 @@ class DataCenterConnection implements JsonSerializable
*/
public function getAuthKey(bool $temp = true): AuthKey
{
return $this->{$temp ? 'tempAuthKey' : 'authKey'};
return $this->{$temp ? 'tempAuthKey' : 'permAuthKey'};
}
/**
* Check if auth key is present.
@ -123,7 +127,7 @@ class DataCenterConnection implements JsonSerializable
*/
public function hasAuthKey(bool $temp = true): bool
{
return $this->{$temp ? 'tempAuthKey' : 'authKey'} !== null && $this->{$temp ? 'tempAuthKey' : 'authKey'}->hasAuthKey();
return $this->{$temp ? 'tempAuthKey' : 'permAuthKey'} !== null && $this->{$temp ? 'tempAuthKey' : 'permAuthKey'}->hasAuthKey();
}
/**
* Set auth key.
@ -135,7 +139,7 @@ class DataCenterConnection implements JsonSerializable
*/
public function setAuthKey(?AuthKey $key, bool $temp = true)
{
$this->{$temp ? 'tempAuthKey' : 'authKey'} = $key;
$this->{$temp ? 'tempAuthKey' : 'permAuthKey'} = $key;
}
/**
@ -209,7 +213,7 @@ class DataCenterConnection implements JsonSerializable
*/
public function bind(bool $pfs = true)
{
$this->tempAuthKey->bind($this->authKey, $pfs);
$this->tempAuthKey->bind($this->permAuthKey, $pfs);
}
/**
* Check if we are logged in.
@ -232,7 +236,7 @@ class DataCenterConnection implements JsonSerializable
{
if ($authorized) {
$this->getTempAuthKey()->authorized($authorized);
} else if ($this->hasTempAuthKey()) {
} elseif ($this->hasTempAuthKey()) {
$this->getTempAuthKey()->authorized($authorized);
}
}
@ -247,7 +251,7 @@ class DataCenterConnection implements JsonSerializable
public function link(string $dc)
{
$this->linked = $dc;
$this->authKey = &$this->API->datacenter->getDataCenterConnection($dc)->authKey;
$this->permAuthKey = &$this->API->datacenter->getDataCenterConnection($dc)->permAuthKey;
}
/**
@ -261,6 +265,17 @@ class DataCenterConnection implements JsonSerializable
$socket->resetSession();
}
}
/**
* Create MTProto sessions if needed
*
* @return void
*/
public function createSession()
{
foreach ($this->connections as $socket) {
$socket->createSession();
}
}
/**
* Flush all pending packets.
*
@ -287,31 +302,52 @@ class DataCenterConnection implements JsonSerializable
* Connect function.
*
* @param ConnectionContext $ctx Connection context
* @param int $id Optional connection ID to reconnect
*
* @return \Generator
*/
public function connect(ConnectionContext $ctx): \Generator
public function connect(ConnectionContext $ctx, int $id = -1): \Generator
{
$this->API->logger->logger("Trying shared connection via $ctx", \danog\MadelineProto\Logger::WARNING);
$this->ctx = $ctx->getCtx();
$this->datacenter = $ctx->getDc();
$media = $ctx->isMedia();
$media = $ctx->isMedia() || $ctx->isCDN();
$count = $media ? $this->API->settings['connection_settings']['media_socket_count'] : 1;
$count = $media ? $this->API->settings['connection_settings']['media_socket_count']['min'] : 1;
if ($count > 1) {
if (!$this->robinLoop) {
$this->robinLoop = new PeriodicLoop($this->API, [$this, 'even'], "Robin loop DC {$this->datacenter}", 10);
$this->robinLoop = new PeriodicLoop($this->API, [$this, 'even'], "robin loop DC {$this->datacenter}", $this->API->settings['connection_settings']['robin_period']);
}
$this->robinLoop->start();
}
$incRead = $media ? 5 : 1;
$this->decRead = $media ? self::READ_WEIGHT_MEDIA : self::READ_WEIGHT;
$this->decWrite = self::WRITE_WEIGHT;
$this->connections = [];
$this->availableConnections = [];
for ($x = 0; $x < $count; $x++) {
if ($id === -1 || !isset($this->connections[$id])) {
$this->connections = [];
$this->availableConnections = [];
yield $this->connectMore($count);
} else {
yield $this->connections[$id]->connect($ctx);
}
}
/**
* Connect to the DC using count more sockets
*
* @param integer $count Number of sockets to open
*
* @return void
*/
private function connectMore(int $count)
{
$ctx = $this->ctx->getCtx();
$count += $previousCount = count($this->connections);
for ($x = $previousCount; $x < $count; $x++) {
$this->availableConnections[$x] = 0;
$this->connections[$x] = new Connection();
$this->connections[$x]->setExtra($this, $x);
@ -371,8 +407,9 @@ class DataCenterConnection implements JsonSerializable
if (\count($this->availableConnections) <= 1) {
return $this->connections[0];
}
\max($this->availableConnections);
$key = \key($this->availableConnections);
$max = \max($this->availableConnections);
$key = array_search($max, $this->availableConnections);
// Decrease to implement round robin
$this->availableConnections[$key]--;
return $this->connections[$key];
@ -385,9 +422,14 @@ class DataCenterConnection implements JsonSerializable
*/
public function even()
{
if (\min($this->availableConnections) < 1000) {
foreach ($this->availableConnections as &$value) {
$value += 1000;
if (\min($this->availableConnections) < 100) {
$max = $this->isMedia() || $this->isCDN() ? $this->API->settings['connection_settings']['media_socket_count']['max'] : 1;
if (\count($this->availableConnections) < $max) {
$this->connectMore(2);
} else {
foreach ($this->availableConnections as &$value) {
$value += 1000;
}
}
}
}
@ -451,7 +493,7 @@ class DataCenterConnection implements JsonSerializable
}
/**
* Check if is a media connection
* Check if is a media connection.
*
* @return boolean
*/
@ -461,7 +503,7 @@ class DataCenterConnection implements JsonSerializable
}
/**
* Check if is a CDN connection
* Check if is a CDN connection.
*
* @return boolean
*/
@ -471,7 +513,7 @@ class DataCenterConnection implements JsonSerializable
}
/**
* Get DC-specific settings
* Get DC-specific settings.
*
* @return array
*/
@ -494,7 +536,7 @@ class DataCenterConnection implements JsonSerializable
'tempAuthKey' => $this->tempAuthKey
] :
[
'authKey' => $this->authKey,
'permAuthKey' => $this->permAuthKey,
'tempAuthKey' => $this->tempAuthKey
];
}
@ -507,6 +549,6 @@ class DataCenterConnection implements JsonSerializable
*/
public function __sleep()
{
return $this->linked ? ['linked', 'tempAuthKey'] : ['linked', 'authKey', 'tempAuthKey'];
return $this->linked ? ['linked', 'tempAuthKey'] : ['permAuthKey', 'tempAuthKey'];
}
}

View File

@ -131,7 +131,7 @@ class CheckLoop extends ResumableSignalLoop
if ($reply) {
$this->callFork($connection->object_call_async('msg_resend_ans_req', ['msg_ids' => $reply], ['postpone' => true]));
}
$connection->writer->resume();
$connection->flush();
}
);
$list = '';
@ -152,7 +152,7 @@ class CheckLoop extends ResumableSignalLoop
$connection->method_recall('', ['message_id' => $message_id, 'postpone' => true]);
}
}
$connection->writer->resume();
$connection->flush();
}
if (yield $this->waitSignal($this->pause($timeout))) {
return;

View File

@ -91,7 +91,7 @@ class ReadLoop extends SignalLoop
$this->exitedLoop();
if ($error === -404) {
if ($shared->getTemp) {
if ($shared->hasTempAuthKey()) {
$API->logger->logger("WARNING: Resetting auth key in DC {$datacenter}...", \danog\MadelineProto\Logger::WARNING);
$shared->setTempAuthKey(null);
$connection->session_id = null;
@ -104,11 +104,14 @@ class ReadLoop extends SignalLoop
yield $connection->reconnect();
}
} elseif ($error === -1) {
yield $connection->reconnect();
$API->logger->logger("WARNING: Got quick ack from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING);
} elseif ($error === 0) {
yield $connection->reconnect();
} elseif ($error === 0) {
$API->logger->logger("Got NOOP from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING);
yield $connection->reconnect();
} else if ($error === -429) {
$API->logger->logger("Got -429 from DC {$datacenter}", \danog\MadelineProto\Logger::WARNING);
Loop::delay(1*1000, [$connection, 'reconnect']);
} else {
yield $connection->reconnect();

View File

@ -202,7 +202,7 @@ class WriteLoop extends ResumableSignalLoop
unset($connection->pending_outgoing[$k]);
continue;
}
if ($shared->getSettings()['pfs'] && !$shared->getTempAuthKey()->isBound() && !$shared->getCtx()->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
if ($shared->getSettings()['pfs'] && !$shared->getTempAuthKey()->isBound() && !$connection->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
$API->logger->logger("Skipping {$message['_']} due to unbound keys in DC {$datacenter}");
$skipped = true;
continue;
@ -232,8 +232,8 @@ class WriteLoop extends ResumableSignalLoop
[
'api_id' => $API->settings['app_info']['api_id'],
'api_hash' => $API->settings['app_info']['api_hash'],
'device_model' => !$connection->getCtx()->isCDN() ? $API->settings['app_info']['device_model'] : 'n/a',
'system_version' => !$connection->getCtx()->isCDN() ? $API->settings['app_info']['system_version'] : 'n/a',
'device_model' => !$connection->isCDN() ? $API->settings['app_info']['device_model'] : 'n/a',
'system_version' => !$connection->isCDN() ? $API->settings['app_info']['system_version'] : 'n/a',
'app_version' => $API->settings['app_info']['app_version'],
'system_lang_code' => $API->settings['app_info']['lang_code'],
'lang_code' => $API->settings['app_info']['lang_code'],

View File

@ -32,6 +32,11 @@ class FeedLoop extends ResumableSignalLoop
private $incomingUpdates = [];
private $parsedUpdates = [];
private $channelId;
/**
* Update loop
*
* @var UpdateLoop
*/
private $updater;
public function __construct($API, $channelId = false)

View File

@ -66,6 +66,14 @@ class MTProto extends AsyncConstruct implements TLCallback
use \danog\MadelineProto\Wrappers\Templates;
use \danog\MadelineProto\Wrappers\TOS;
/**
* Old internal version of MadelineProto.
*
* DO NOT REMOVE THIS COMMENTED OUT CONSTANT
*
* @var int
*/
/*
const V = 71;
*/
@ -76,7 +84,7 @@ class MTProto extends AsyncConstruct implements TLCallback
*
* @var int
*/
const V = 129;
const V = 130;
/**
* String release version.
*
@ -977,7 +985,11 @@ class MTProto extends AsyncConstruct implements TLCallback
'transport' => 'tcp',
'pfs' => \extension_loaded('gmp'),
],
'media_socket_count' => 5,
'media_socket_count' => [
'min' => 5,
'max' => 10
],
'robin_period' => 10,
'default_dc' => 2,
], 'app_info' => [
// obtained in https://my.telegram.org

View File

@ -52,7 +52,7 @@ trait CallHandler
foreach ($message_ids as $message_id) {
if (isset($this->outgoing_messages[$message_id]['body'])) {
if ($datacenter) {
$res = $this->API->datacenter->getDataCenterConnection($datacenter)->sendMessage($this->outgoing_messages[$message_id], false);
$res = $this->API->datacenter->getConnection($datacenter)->sendMessage($this->outgoing_messages[$message_id], false);
} else {
$res = $this->sendMessage($this->outgoing_messages[$message_id], false);
}
@ -137,7 +137,7 @@ trait CallHandler
) {
return yield $this->API->datacenter->getConnection($args['id']['dc_id'])->method_call_async_write_generator($method, $args, $aargs);
}
if ($aargs['file'] ?? false && !$this->getCtx()->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) {
if (($aargs['file'] ?? false) && !$this->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) {
$this->logger->logger('Using media DC');
return yield $this->API->datacenter->getConnection($this->datacenter.'_media')->method_call_async_write_generator($method, $args, $aargs);
}
@ -218,7 +218,7 @@ trait CallHandler
*
* @return Promise
*/
public function object_call_async(string $object, $args = [], array $aargs = ['msg_id' => null]): Promise
public function object_call_async(string $object, $args = [], array $aargs = ['msg_id' => null]): \Generator
{
$message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => !$this->shared->hasTempAuthKey(), 'method' => false];
if (isset($aargs['promise'])) {

View File

@ -396,7 +396,7 @@ trait ResponseHandler
if (isset($request['user_related']) && $request['user_related']) {
$this->settings['connection_settings']['default_dc'] = $this->API->authorized_dc = $this->API->datacenter->curdc;
}
Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
//$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;

View File

@ -55,4 +55,17 @@ abstract class Session
$this->max_incoming_id = null;
$this->max_outgoing_id = null;
}
/**
* Create MTProto session if needed
*
* @return void
*/
public function createSession()
{
if ($this->session_id === null) {
$this->session_id = $this->random(8);
$this->session_in_seq_no = 0;
$this->session_out_seq_no = 0;
}
}
}

View File

@ -20,10 +20,10 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Request;
use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\MTProto\AuthKey;
use danog\MadelineProto\MTProto\PermAuthKey;
use danog\MadelineProto\MTProto\TempAuthKey;
use danog\MadelineProto\DataCenterConnection;
use phpseclib\Math\BigInteger;
/**
@ -706,13 +706,11 @@ trait AuthKeyHandler
$connection = $socket->getAuthConnection();
try {
if ($connection->session_id === null) {
$connection->session_id = $this->random(8);
$connection->session_in_seq_no = 0;
$connection->session_out_seq_no = 0;
}
$socket->createSession();
$cdn = $socket->isCDN();
$media = $socket->isMedia();
if (!$socket->hasTempAuthKey() || !$socket->hasPermAuthKey()) {
if (!$socket->hasPermAuthKey() && !$cdn && !$media) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_perm_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);
@ -733,7 +731,7 @@ trait AuthKeyHandler
$socket->setTempAuthKey(yield $this->create_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id));
yield $this->bind_temp_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id);
$config = yield $this->method_call_async_read('help.getConfig', [], ['datacenter' => $id]);
$config = yield $connection->method_call_async_read('help.getConfig', []);
yield $this->sync_authorization_async($id);
yield $this->get_config_async($config);
@ -744,7 +742,7 @@ trait AuthKeyHandler
} else {
if (!$cdn) {
$socket->bind(false);
$config = yield $this->method_call_async_read('help.getConfig', [], ['datacenter' => $id]);
$config = yield $connection->method_call_async_read('help.getConfig', []);
yield $this->sync_authorization_async($id);
yield $this->get_config_async($config);
} elseif (!$socket->hasTempAuthKey()) {