Almost finished refactoring
This commit is contained in:
parent
a1d139d526
commit
40c0976c5a
0
serializeLoop
Normal file
0
serializeLoop
Normal file
@ -20,6 +20,7 @@ namespace danog\MadelineProto;
|
||||
|
||||
use Amp\ByteStream\ClosedException;
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Loop\Connection\CheckLoop;
|
||||
use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
|
||||
use danog\MadelineProto\Loop\Connection\ReadLoop;
|
||||
@ -42,33 +43,57 @@ class Connection extends Session
|
||||
const PENDING_MAX = 2000000000;
|
||||
|
||||
/**
|
||||
* The actual socket
|
||||
* Writer loop.
|
||||
*
|
||||
* @var \danog\MadelineProto\Loop\Connection\WriteLoop
|
||||
*/
|
||||
protected $writer;
|
||||
/**
|
||||
* Reader loop.
|
||||
*
|
||||
* @var \danog\MadelineProto\Loop\Connection\ReadLoop
|
||||
*/
|
||||
protected $reader;
|
||||
/**
|
||||
* Checker loop.
|
||||
*
|
||||
* @var \danog\MadelineProto\Loop\Connection\CheckLoop
|
||||
*/
|
||||
protected $checker;
|
||||
/**
|
||||
* Waiter loop.
|
||||
*
|
||||
* @var \danog\MadelineProto\Loop\Connection\HttpWaitLoop
|
||||
*/
|
||||
protected $waiter;
|
||||
/**
|
||||
* The actual socket.
|
||||
*
|
||||
* @var Stream
|
||||
*/
|
||||
private $stream;
|
||||
/**
|
||||
* Connection context
|
||||
* Connection context.
|
||||
*
|
||||
* @var Connection context
|
||||
* @var ConnectionContext
|
||||
*/
|
||||
private $ctx;
|
||||
|
||||
/**
|
||||
* HTTP request count
|
||||
* HTTP request count.
|
||||
*
|
||||
* @var integer
|
||||
*/
|
||||
private $httpReqCount = 0;
|
||||
/**
|
||||
* HTTP response count
|
||||
* HTTP response count.
|
||||
*
|
||||
* @var integer
|
||||
*/
|
||||
private $httpResCount = 0;
|
||||
|
||||
/**
|
||||
* Date of last chunk received
|
||||
* Date of last chunk received.
|
||||
*
|
||||
* @var integer
|
||||
*/
|
||||
@ -86,9 +111,15 @@ class Connection extends Session
|
||||
* @var MTProto
|
||||
*/
|
||||
protected $API;
|
||||
/**
|
||||
* Shared connection instance.
|
||||
*
|
||||
* @var DataCenterConnection
|
||||
*/
|
||||
protected $shared;
|
||||
|
||||
/**
|
||||
* DC ID
|
||||
* DC ID.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
@ -107,6 +138,19 @@ class Connection extends Session
|
||||
*/
|
||||
private $writing = false;
|
||||
|
||||
/**
|
||||
* Writing callback.
|
||||
*
|
||||
* @var callable
|
||||
*/
|
||||
private $writingCallback;
|
||||
/**
|
||||
* Reading callback.
|
||||
*
|
||||
* @var callable
|
||||
*/
|
||||
private $readingCallback;
|
||||
|
||||
/**
|
||||
* Check if the socket is writing stuff.
|
||||
*
|
||||
@ -126,7 +170,7 @@ class Connection extends Session
|
||||
return $this->reading;
|
||||
}
|
||||
/**
|
||||
* Set writing boolean
|
||||
* Set writing boolean.
|
||||
*
|
||||
* @param boolean $writing
|
||||
*
|
||||
@ -135,9 +179,10 @@ class Connection extends Session
|
||||
public function writing(bool $writing)
|
||||
{
|
||||
$this->writing = $writing;
|
||||
($this->writingCallback)($writing);
|
||||
}
|
||||
/**
|
||||
* Set reading boolean
|
||||
* Set reading boolean.
|
||||
*
|
||||
* @param boolean $reading
|
||||
*
|
||||
@ -146,10 +191,11 @@ class Connection extends Session
|
||||
public function reading(bool $reading)
|
||||
{
|
||||
$this->reading = $reading;
|
||||
($this->readingCallback)($writing);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell the class that we have read a chunk of data from the socket
|
||||
* Tell the class that we have read a chunk of data from the socket.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
@ -168,7 +214,7 @@ class Connection extends Session
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicate a received HTTP response
|
||||
* Indicate a received HTTP response.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
@ -177,7 +223,7 @@ class Connection extends Session
|
||||
$this->httpResCount++;
|
||||
}
|
||||
/**
|
||||
* Count received HTTP responses
|
||||
* Count received HTTP responses.
|
||||
*
|
||||
* @return integer
|
||||
*/
|
||||
@ -186,7 +232,7 @@ class Connection extends Session
|
||||
return $this->httpResCount;
|
||||
}
|
||||
/**
|
||||
* Indicate a sent HTTP request
|
||||
* Indicate a sent HTTP request.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
@ -195,7 +241,7 @@ class Connection extends Session
|
||||
$this->httpReqCount++;
|
||||
}
|
||||
/**
|
||||
* Count sent HTTP requests
|
||||
* Count sent HTTP requests.
|
||||
*
|
||||
* @return integer
|
||||
*/
|
||||
@ -206,7 +252,7 @@ class Connection extends Session
|
||||
|
||||
|
||||
/**
|
||||
* Get connection context
|
||||
* Get connection context.
|
||||
*
|
||||
* @return ConnectionContext
|
||||
*/
|
||||
@ -271,7 +317,41 @@ class Connection extends Session
|
||||
$this->waiter->start();
|
||||
}
|
||||
|
||||
public function sendMessage($message, $flush = true)
|
||||
/**
|
||||
* Send an MTProto message.
|
||||
*
|
||||
* Structure of message array:
|
||||
* [
|
||||
* // only in outgoing messages
|
||||
* 'body' => deserialized body, (optional if container)
|
||||
* 'serialized_body' => 'serialized body', (optional if container)
|
||||
* 'content_related' => bool,
|
||||
* '_' => 'predicate',
|
||||
* 'promise' => deferred promise that gets resolved when a response to the message is received (optional),
|
||||
* 'send_promise' => deferred promise that gets resolved when the message is sent (optional),
|
||||
* 'file' => bool (optional),
|
||||
* 'type' => 'type' (optional),
|
||||
* 'queue' => queue ID (optional),
|
||||
* 'container' => [message ids] (optional),
|
||||
*
|
||||
* // only in incoming messages
|
||||
* 'content' => deserialized body,
|
||||
* 'seq_no' => number (optional),
|
||||
* 'from_container' => bool (optional),
|
||||
*
|
||||
* // can be present in both
|
||||
* 'response' => message id (optional),
|
||||
* 'msg_id' => message id (optional),
|
||||
* 'sent' => timestamp,
|
||||
* 'tries' => number
|
||||
* ]
|
||||
*
|
||||
* @param array $message The message to send
|
||||
* @param boolean $flush Whether to flush the message right away
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function sendMessage(array $message, bool $flush = true): Promise
|
||||
{
|
||||
$deferred = new Deferred();
|
||||
|
||||
@ -314,14 +394,17 @@ class Connection extends Session
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function setExtra(MTProto $extra)
|
||||
public function setExtra(DataCenterConnection $extra, $readingCallback, $writingCallback)
|
||||
{
|
||||
$this->API = $extra;
|
||||
$this->logger = $extra->logger;
|
||||
$this->shared = $extra;
|
||||
$this->readingCallback = $readingCallback;
|
||||
$this->writingCallback = $writingCallback;
|
||||
$this->API = $extra->getExtra();
|
||||
$this->logger = $this->API->logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get main instance
|
||||
* Get main instance.
|
||||
*
|
||||
* @return MTProto
|
||||
*/
|
||||
@ -329,6 +412,12 @@ class Connection extends Session
|
||||
{
|
||||
return $this->API;
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from DC
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect()
|
||||
{
|
||||
$this->API->logger->logger("Disconnecting from DC {$this->datacenter}");
|
||||
@ -348,6 +437,11 @@ class Connection extends Session
|
||||
$this->API->logger->logger("Disconnected from DC {$this->datacenter}");
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect to DC
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function reconnect(): \Generator
|
||||
{
|
||||
$this->API->logger->logger("Reconnecting DC {$this->datacenter}");
|
||||
@ -364,59 +458,11 @@ class Connection extends Session
|
||||
}
|
||||
}
|
||||
|
||||
public function hasPendingCalls()
|
||||
{
|
||||
$API = $this->API;
|
||||
$datacenter = $this->datacenter;
|
||||
|
||||
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
|
||||
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
|
||||
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
|
||||
|
||||
foreach ($this->new_outgoing as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
|
||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
||||
) {
|
||||
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public function getPendingCalls()
|
||||
{
|
||||
$API = $this->API;
|
||||
$datacenter = $this->datacenter;
|
||||
|
||||
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
|
||||
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
|
||||
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
|
||||
|
||||
$result = [];
|
||||
foreach ($this->new_outgoing as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
|
||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
||||
) {
|
||||
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$result[] = $message_id;
|
||||
}
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get name
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getName(): string
|
||||
{
|
||||
return __CLASS__;
|
||||
|
@ -740,6 +740,27 @@ class DataCenter
|
||||
return yield (yield $this->getHTTPClient()->request($url))->getBody();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Connection instance
|
||||
*
|
||||
* @param string $dc
|
||||
* @return Connection
|
||||
*/
|
||||
public function getConnection(string $dc): Connection
|
||||
{
|
||||
return $this->sockets[$dc]->getConnection();
|
||||
}
|
||||
/**
|
||||
* Check if a DC is present
|
||||
*
|
||||
* @param string $dc DC ID
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public function has(string $dc): bool
|
||||
{
|
||||
return isset($this->sockets[$dc]);
|
||||
}
|
||||
public function get_dcs($all = true)
|
||||
{
|
||||
$test = $this->settings['all']['test_mode'] ? 'test' : 'main';
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
namespace danog\MadelineProto;
|
||||
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Stream\ConnectionContext;
|
||||
|
||||
class DataCenterConnection
|
||||
@ -43,33 +44,53 @@ class DataCenterConnection
|
||||
private $authorized = false;
|
||||
|
||||
/**
|
||||
* Connections open to a certain DC
|
||||
* Connections open to a certain DC.
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $connections = [];
|
||||
/**
|
||||
* Connection weights
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $availableConnections = [];
|
||||
|
||||
/**
|
||||
* Main API instance
|
||||
* Main API instance.
|
||||
*
|
||||
* @var \danog\MadelineProto\MTProto
|
||||
*/
|
||||
private $API;
|
||||
|
||||
/**
|
||||
* Connection context
|
||||
* Connection context.
|
||||
*
|
||||
* @var ConnectionContext
|
||||
*/
|
||||
private $ctx;
|
||||
|
||||
/**
|
||||
* DC ID
|
||||
* DC ID.
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $datacenter;
|
||||
|
||||
/**
|
||||
* Index for round robin.
|
||||
*
|
||||
* @var integer
|
||||
*/
|
||||
private $index = 0;
|
||||
|
||||
/**
|
||||
* Loop to keep weights at sane value
|
||||
*
|
||||
* @var \danog\MadelineProto\Loop\Generic\PeriodicLoop
|
||||
*/
|
||||
private $robinLoop;
|
||||
|
||||
/**
|
||||
* Get auth key.
|
||||
*
|
||||
@ -127,7 +148,7 @@ class DataCenterConnection
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection context
|
||||
* Get connection context.
|
||||
*
|
||||
* @return ConnectionContext
|
||||
*/
|
||||
@ -145,50 +166,126 @@ class DataCenterConnection
|
||||
*/
|
||||
public function connect(ConnectionContext $ctx): \Generator
|
||||
{
|
||||
$this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING);
|
||||
$this->API->logger->logger("Trying shared connection via $ctx", \danog\MadelineProto\Logger::WARNING);
|
||||
|
||||
$this->ctx = $ctx->getCtx();
|
||||
$this->datacenter = $ctx->getDc();
|
||||
$media = $ctx->isMedia();
|
||||
|
||||
|
||||
$count = $media ? $this->API->settings['connection_settings']['media_socket_count'] : 1;
|
||||
|
||||
if ($count > 1) {
|
||||
if (!$this->robinLoop) {
|
||||
$this->robinLoop = new PeriodicLoop($this, [$this, 'even'], "Robin loop DC {$this->datacenter}", 10);
|
||||
}
|
||||
$this->robinLoop->start();
|
||||
}
|
||||
|
||||
$incRead = $media ? 5 : 1;
|
||||
|
||||
$this->connections = [];
|
||||
$this->availableConnections = [];
|
||||
for ($x = 0; $x < $count; $x++) {
|
||||
$this->availableConnections[$x] = 0;
|
||||
$this->connections[$x] = new Connection();
|
||||
$this->connections[$x]->setExtra(
|
||||
$this,
|
||||
function (bool $reading) use ($x, $incRead) {
|
||||
$this->availableConnections[$x] += $reading ? -$incRead : $incRead;
|
||||
},
|
||||
function (bool $writing) use ($x) {
|
||||
$this->availableConnections[$x] += $writing ? -10 : 10;
|
||||
}
|
||||
);
|
||||
yield $this->connections[$x]->connect(yield $ctx->getStream());
|
||||
$ctx = $this->ctx->getCtx();
|
||||
}
|
||||
}
|
||||
|
||||
public function sendMessage($message, $flush = true)
|
||||
{
|
||||
}
|
||||
|
||||
public function setExtra(API $API)
|
||||
{
|
||||
$this->API = $API;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections to DC
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect()
|
||||
{
|
||||
$this->API->logger->logger("Disconnecting from DC {$this->datacenter}");
|
||||
$this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}");
|
||||
if ($this->robinLoop) {
|
||||
$this->robinLoop->signal(true);
|
||||
$this->robinLoop = null;
|
||||
}
|
||||
foreach ($this->connections as $connection) {
|
||||
$connection->disconnect();
|
||||
}
|
||||
$this->connections = [];
|
||||
$this->availableConnections = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect to DC
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function reconnect(): \Generator
|
||||
{
|
||||
$this->API->logger->logger("Reconnecting DC {$this->datacenter}");
|
||||
foreach ($this->connections as $connection) {
|
||||
yield $connection->reconnect();
|
||||
}
|
||||
$this->API->logger->logger("Reconnecting shared DC {$this->datacenter}");
|
||||
$this->disconnect();
|
||||
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc());
|
||||
yield $this->connect($this->ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get best socket in round robin.
|
||||
*
|
||||
* @return Connection
|
||||
*/
|
||||
public function getConnection(): Connection
|
||||
{
|
||||
if (count($this->availableConnections) === 1) {
|
||||
return $this->connections[0];
|
||||
}
|
||||
max($this->availableConnections);
|
||||
$key = key($this->availableConnections);
|
||||
// Decrease to implement round robin
|
||||
$this->availableConnections[$key]--;
|
||||
return $this->connections[$key];
|
||||
}
|
||||
|
||||
/**
|
||||
* Even out round robin values
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function even()
|
||||
{
|
||||
if (\min($this->availableConnections) < 1000) {
|
||||
foreach ($this->availableConnections as &$value) {
|
||||
$value += 1000;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set main instance
|
||||
*
|
||||
* @param MTProto $API Main instance
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function setExtra(MTProto $API)
|
||||
{
|
||||
$this->API = $API;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get main instance
|
||||
*
|
||||
* @return MTProto
|
||||
*/
|
||||
public function getExtra(): MTProto
|
||||
{
|
||||
return $this->API;
|
||||
}
|
||||
/**
|
||||
* Sleep function.
|
||||
*
|
||||
|
@ -106,7 +106,7 @@ class CheckLoop extends ResumableSignalLoop
|
||||
break;
|
||||
}
|
||||
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' not received by server, resending...', \danog\MadelineProto\Logger::ERROR);
|
||||
$API->method_recall('watcherId', ['message_id' => $message_id, 'datacenter' => $datacenter, 'postpone' => true]);
|
||||
$connection->method_recall('watcherId', ['message_id' => $message_id, 'postpone' => true]);
|
||||
break;
|
||||
case 4:
|
||||
if ($chr & 32) {
|
||||
@ -144,7 +144,7 @@ class CheckLoop extends ResumableSignalLoop
|
||||
&& $connection->outgoing_messages[$message_id]['unencrypted']
|
||||
) {
|
||||
$API->logger->logger('Still missing '.$connection->outgoing_messages[$message_id]['_'].' with message id '.($message_id)." on DC $datacenter, resending", \danog\MadelineProto\Logger::ERROR);
|
||||
$API->method_recall('', ['message_id' => $message_id, 'datacenter' => $datacenter, 'postpone' => true]);
|
||||
$connection->method_recall('', ['message_id' => $message_id, 'postpone' => true]);
|
||||
}
|
||||
}
|
||||
$connection->writer->resume();
|
||||
|
File diff suppressed because one or more lines are too long
@ -78,4 +78,71 @@ trait AckHandler
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Check if there are some pending calls
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public function hasPendingCalls()
|
||||
{
|
||||
$API = $this->API;
|
||||
$datacenter = $this->datacenter;
|
||||
|
||||
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
|
||||
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
|
||||
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
|
||||
|
||||
foreach ($this->new_outgoing as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
|
||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
||||
) {
|
||||
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all pending calls
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function getPendingCalls()
|
||||
{
|
||||
$API = $this->API;
|
||||
$datacenter = $this->datacenter;
|
||||
|
||||
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
|
||||
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
|
||||
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
|
||||
|
||||
$result = [];
|
||||
foreach ($this->new_outgoing as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
|
||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
||||
) {
|
||||
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$result[] = $message_id;
|
||||
}
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
}
|
||||
|
241
src/danog/MadelineProto/MTProtoSession/CallHandler.php
Normal file
241
src/danog/MadelineProto/MTProtoSession/CallHandler.php
Normal file
@ -0,0 +1,241 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* CallHandler module.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
*
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\MTProtoSession;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Async\Parameters;
|
||||
use function Amp\Promise\all;
|
||||
|
||||
/**
|
||||
* Manages method and object calls.
|
||||
*/
|
||||
trait CallHandler
|
||||
{
|
||||
/**
|
||||
* Recall method.
|
||||
*
|
||||
* @param string $watcherId Watcher ID for defer
|
||||
* @param array $args Args
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function method_recall(string $watcherId, $args)
|
||||
{
|
||||
$message_id = $args['message_id'];
|
||||
$postpone = $args['postpone'] ?? false;
|
||||
$datacenter = $args['datacenter'] ?? false;
|
||||
|
||||
$message_ids = $this->outgoing_messages[$message_id]['container'] ?? [$message_id];
|
||||
|
||||
foreach ($message_ids as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id]['body'])) {
|
||||
if ($datacenter) {
|
||||
$res = $this->API->datacenter->sockets[$datacenter]->sendMessage($this->outgoing_messages[$message_id], false);
|
||||
} else {
|
||||
$res = $this->sendMessage($this->outgoing_messages[$message_id], false);
|
||||
}
|
||||
$this->callFork($res);
|
||||
$this->ack_outgoing_message_id($message_id);
|
||||
$this->got_response_for_outgoing_message_id($message_id);
|
||||
} else {
|
||||
$this->logger->logger('Could not resend '.isset($this->outgoing_messages[$message_id]['_']) ? $this->outgoing_messages[$message_id]['_'] : $message_id);
|
||||
}
|
||||
}
|
||||
if (!$postpone) {
|
||||
if ($datacenter) {
|
||||
$this->API->datacenter->sockets[$datacenter]->writer->resume();
|
||||
} else {
|
||||
$this->writer->resume();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronous wrapper for method_call.
|
||||
*
|
||||
* @param string $method Method name
|
||||
* @param array $args Arguments
|
||||
* @param array $aargs Additional arguments
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function method_call(string $method, $args = [], array $aargs = ['msg_id' => null])
|
||||
{
|
||||
return $this->wait($this->method_call_async_read($method, $args, $aargs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Call method and wait asynchronously for response.
|
||||
*
|
||||
* If the $aargs['noResponse'] is true, will not wait for a response.
|
||||
*
|
||||
* @param string $method Method name
|
||||
* @param array $args Arguments
|
||||
* @param array $aargs Additional arguments
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function method_call_async_read(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
|
||||
{
|
||||
$deferred = new Deferred();
|
||||
$this->method_call_async_write($method, $args, $aargs)->onResolve(function ($e, $read_deferred) use ($deferred) {
|
||||
if ($e) {
|
||||
$deferred->fail($e);
|
||||
} else {
|
||||
if (\is_array($read_deferred)) {
|
||||
$read_deferred = \array_map(function ($value) {
|
||||
return $value->promise();
|
||||
}, $read_deferred);
|
||||
$deferred->resolve(all($read_deferred));
|
||||
} else {
|
||||
$deferred->resolve($read_deferred->promise());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return ($aargs['noResponse'] ?? false) ? new Success() : $deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Call method and make sure it is asynchronously sent.
|
||||
*
|
||||
* @param string $method Method name
|
||||
* @param array $args Arguments
|
||||
* @param array $aargs Additional arguments
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function method_call_async_write(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
|
||||
{
|
||||
return $this->call($this->method_call_async_write_generator($method, $args, $aargs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Call method and make sure it is asynchronously sent (generator).
|
||||
*
|
||||
* @param string $method Method name
|
||||
* @param array $args Arguments
|
||||
* @param array $aargs Additional arguments
|
||||
*
|
||||
* @return Generator
|
||||
*/
|
||||
public function method_call_async_write_generator(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
|
||||
{
|
||||
if (\is_array($args)
|
||||
&& isset($args['id']['_'])
|
||||
&& isset($args['id']['dc_id'])
|
||||
&& $args['id']['_'] === 'inputBotInlineMessageID'
|
||||
&& $this->datacenter !== $args['id']['dc_id']
|
||||
) {
|
||||
return yield $this->API->datacenter->getConnection($args['id']['dc_id'])->method_call_async_write_generator($method, $args, $aargs);
|
||||
}
|
||||
if ($aargs['file'] ?? false && !$this->getCtx()->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) {
|
||||
$this->logger->logger('Using media DC');
|
||||
return yield $this->API->datacenter->getConnection($this->datacenter.'_media')->method_call_async_write_generator($method, $args, $aargs);
|
||||
}
|
||||
if (\in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) {
|
||||
$aargs['queue'] = 'secret';
|
||||
}
|
||||
|
||||
if (\is_array($args)) {
|
||||
if (isset($args['multiple'])) {
|
||||
$aargs['multiple'] = true;
|
||||
}
|
||||
if (isset($args['message']) && \is_string($args['message']) && \mb_strlen($args['message'], 'UTF-8') > $this->API->config['message_length_max'] && \mb_strlen((yield $this->parse_mode_async($args))['message'], 'UTF-8') > $this->API->config['message_length_max']) {
|
||||
$args = yield $this->split_to_chunks_async($args);
|
||||
$promises = [];
|
||||
$aargs['queue'] = $method;
|
||||
$aargs['multiple'] = true;
|
||||
}
|
||||
if (isset($aargs['multiple'])) {
|
||||
$new_aargs = $aargs;
|
||||
$new_aargs['postpone'] = true;
|
||||
unset($new_aargs['multiple']);
|
||||
|
||||
if (isset($args['multiple'])) {
|
||||
unset($args['multiple']);
|
||||
}
|
||||
foreach ($args as $single_args) {
|
||||
$promises[] = $this->method_call_async_write($method, $single_args, $new_aargs);
|
||||
}
|
||||
|
||||
if (!isset($aargs['postpone'])) {
|
||||
$this->writer->resume();
|
||||
}
|
||||
|
||||
return yield all($promises);
|
||||
}
|
||||
$args = yield $this->botAPI_to_MTProto_async($args);
|
||||
if (isset($args['ping_id']) && \is_int($args['ping_id'])) {
|
||||
$args['ping_id'] = $this->pack_signed_long($args['ping_id']);
|
||||
}
|
||||
}
|
||||
|
||||
$deferred = new Deferred();
|
||||
$message = \array_merge(
|
||||
$aargs,
|
||||
[
|
||||
'_' => $method,
|
||||
'type' => $this->methods->find_by_method($method)['type'],
|
||||
'content_related' => $this->content_related($method),
|
||||
'promise' => $deferred,
|
||||
'method' => true,
|
||||
'unencrypted' => $this->shared->hasAuthKey() && \strpos($method, '.') === false
|
||||
]
|
||||
);
|
||||
|
||||
if (\is_object($args) && $args instanceof Parameters) {
|
||||
$message['body'] = yield $args->fetchParameters();
|
||||
} else {
|
||||
$message['body'] = $args;
|
||||
}
|
||||
|
||||
if (($method === 'users.getUsers' && $args === ['id' => [['_' => 'inputUserSelf']]]) || $method === 'auth.exportAuthorization' || $method === 'updates.getDifference') {
|
||||
$message['user_related'] = true;
|
||||
}
|
||||
$aargs['postpone'] = $aargs['postpone'] ?? false;
|
||||
$deferred = yield $this->sendMessage($message, !$aargs['postpone']);
|
||||
|
||||
$this->checker->resume();
|
||||
|
||||
return $deferred;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send object and make sure it is asynchronously sent (generator).
|
||||
*
|
||||
* @param string $object Object name
|
||||
* @param array $args Arguments
|
||||
* @param array $aargs Additional arguments
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function object_call_async(string $object, $args = [], array $aargs = ['msg_id' => null]): Promise
|
||||
{
|
||||
$message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => $this->shared->hasAuthKey(), 'method' => false];
|
||||
if (isset($aargs['promise'])) {
|
||||
$message['promise'] = $aargs['promise'];
|
||||
}
|
||||
|
||||
return $this->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true);
|
||||
}
|
||||
}
|
@ -385,7 +385,6 @@ trait ResponseHandler
|
||||
|
||||
return;
|
||||
case 303:
|
||||
$old_datacenter = $datacenter;
|
||||
$this->API->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']);
|
||||
|
||||
if (isset($request['file']) && $request['file'] && isset($this->API->datacenter->sockets[$datacenter.'_media'])) {
|
||||
@ -396,8 +395,8 @@ trait ResponseHandler
|
||||
if (isset($request['user_related']) && $request['user_related']) {
|
||||
$this->settings['connection_settings']['default_dc'] = $this->API->authorized_dc = $this->API->datacenter->curdc;
|
||||
}
|
||||
Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]);
|
||||
//$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
|
||||
Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
|
||||
//$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
|
||||
|
||||
return;
|
||||
case 401:
|
||||
|
@ -28,6 +28,7 @@ abstract class Session
|
||||
use ResponseHandler;
|
||||
use SaltHandler;
|
||||
use SeqNoHandler;
|
||||
use CallHandler;
|
||||
|
||||
public $incoming_messages = [];
|
||||
public $outgoing_messages = [];
|
||||
@ -41,4 +42,6 @@ abstract class Session
|
||||
|
||||
public $call_queue = [];
|
||||
public $ack_queue = [];
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,218 +0,0 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* CallHandler module.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
*
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Async\Parameters;
|
||||
use function Amp\Promise\all;
|
||||
|
||||
/**
|
||||
* Manages method and object calls.
|
||||
*/
|
||||
trait CallHandler
|
||||
{
|
||||
public function method_recall($watcherId, $args)
|
||||
{
|
||||
$message_id = $args['message_id'];
|
||||
$new_datacenter = $args['datacenter'];
|
||||
$old_datacenter = $new_datacenter;
|
||||
if (isset($args['old_datacenter'])) {
|
||||
$old_datacenter = $args['old_datacenter'];
|
||||
}
|
||||
$postpone = false;
|
||||
if (isset($args['postpone'])) {
|
||||
$postpone = $args['postpone'];
|
||||
}
|
||||
|
||||
if (isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['container'])) {
|
||||
$message_ids = $this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['container'];
|
||||
} else {
|
||||
$message_ids = [$message_id];
|
||||
}
|
||||
|
||||
foreach ($message_ids as $message_id) {
|
||||
if (isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['body'])) {
|
||||
$this->callFork($this->datacenter->sockets[$new_datacenter]->sendMessage($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id], false));
|
||||
$this->ack_outgoing_message_id($message_id, $old_datacenter);
|
||||
$this->got_response_for_outgoing_message_id($message_id, $old_datacenter);
|
||||
} else {
|
||||
$this->logger->logger('Could not resend '.isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_']) ? $this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_'] : $message_id);
|
||||
}
|
||||
}
|
||||
if (!$postpone) {
|
||||
$this->datacenter->sockets[$new_datacenter]->writer->resume();
|
||||
}
|
||||
}
|
||||
|
||||
public function method_call($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
|
||||
{
|
||||
return $this->wait($this->method_call_async_read($method, $args, $aargs));
|
||||
}
|
||||
|
||||
public function method_call_async_read($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
|
||||
{
|
||||
$deferred = new Deferred();
|
||||
$this->method_call_async_write($method, $args, $aargs)->onResolve(function ($e, $read_deferred) use ($deferred) {
|
||||
if ($e) {
|
||||
$deferred->fail($e);
|
||||
} else {
|
||||
if (is_array($read_deferred)) {
|
||||
$read_deferred = array_map(function ($value) {
|
||||
return $value->promise();
|
||||
}, $read_deferred);
|
||||
$deferred->resolve(all($read_deferred));
|
||||
} else {
|
||||
$deferred->resolve($read_deferred->promise());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return isset($aargs['noResponse']) && $aargs['noResponse'] ? 0 : $deferred->promise();
|
||||
}
|
||||
|
||||
public function method_call_async_write($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): Promise
|
||||
{
|
||||
return $this->call($this->method_call_async_write_generator($method, $args, $aargs));
|
||||
}
|
||||
|
||||
public function method_call_async_write_generator($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): \Generator
|
||||
{
|
||||
if (is_array($args) && isset($args['id']['_']) && isset($args['id']['dc_id']) && $args['id']['_'] === 'inputBotInlineMessageID') {
|
||||
$aargs['datacenter'] = $args['id']['dc_id'];
|
||||
}
|
||||
if ($this->wrapper instanceof \danog\MadelineProto\API && isset($this->wrapper->session) && !is_null($this->wrapper->session) && time() - $this->wrapper->serialized > $this->settings['serialization']['serialization_interval'] && !$this->asyncInitPromise) {
|
||||
$this->logger->logger("Didn't serialize in a while, doing that now...");
|
||||
$this->wrapper->serialize($this->wrapper->session);
|
||||
}
|
||||
if (isset($aargs['file']) && $aargs['file'] && isset($this->datacenter->sockets[$aargs['datacenter'].'_media'])) {
|
||||
$this->logger->logger('Using media DC');
|
||||
$aargs['datacenter'] .= '_media';
|
||||
}
|
||||
if (in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) {
|
||||
$aargs['queue'] = 'secret';
|
||||
}
|
||||
|
||||
if (is_array($args)) {
|
||||
if (isset($args['multiple'])) {
|
||||
$aargs['multiple'] = true;
|
||||
}
|
||||
if (isset($args['message']) && is_string($args['message']) && mb_strlen($args['message'], 'UTF-8') > $this->config['message_length_max'] && mb_strlen((yield $this->parse_mode_async($args))['message'], 'UTF-8') > $this->config['message_length_max']) {
|
||||
$args = yield $this->split_to_chunks_async($args);
|
||||
$promises = [];
|
||||
$aargs['queue'] = $method;
|
||||
$aargs['multiple'] = true;
|
||||
}
|
||||
if (isset($aargs['multiple'])) {
|
||||
$new_aargs = $aargs;
|
||||
$new_aargs['postpone'] = true;
|
||||
unset($new_aargs['multiple']);
|
||||
|
||||
if (isset($args['multiple'])) {
|
||||
unset($args['multiple']);
|
||||
}
|
||||
foreach ($args as $single_args) {
|
||||
$promises[] = $this->method_call_async_write($method, $single_args, $new_aargs);
|
||||
}
|
||||
|
||||
if (!isset($aargs['postpone'])) {
|
||||
$this->datacenter->sockets[$aargs['datacenter']]->writer->resume();
|
||||
}
|
||||
|
||||
return yield all($promises);
|
||||
}
|
||||
$args = yield $this->botAPI_to_MTProto_async($args);
|
||||
if (isset($args['ping_id']) && is_int($args['ping_id'])) {
|
||||
$args['ping_id'] = $this->pack_signed_long($args['ping_id']);
|
||||
}
|
||||
}
|
||||
|
||||
$deferred = new Deferred();
|
||||
$message = ['_' => $method, 'type' => $this->methods->find_by_method($method)['type'], 'content_related' => $this->content_related($method), 'promise' => $deferred, 'method' => true, 'unencrypted' => $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key === null && strpos($method, '.') === false];
|
||||
|
||||
if (is_object($args) && $args instanceof Parameters) {
|
||||
$message['body'] = yield $args->fetchParameters();
|
||||
} else {
|
||||
$message['body'] = $args;
|
||||
}
|
||||
|
||||
if (isset($aargs['msg_id'])) {
|
||||
$message['msg_id'] = $aargs['msg_id'];
|
||||
}
|
||||
if (isset($aargs['queue'])) {
|
||||
$message['queue'] = $aargs['queue'];
|
||||
}
|
||||
if (isset($aargs['file'])) {
|
||||
$message['file'] = $aargs['file'];
|
||||
}
|
||||
if (isset($aargs['botAPI'])) {
|
||||
$message['botAPI'] = $aargs['botAPI'];
|
||||
}
|
||||
if (isset($aargs['FloodWaitLimit'])) {
|
||||
$message['FloodWaitLimit'] = $aargs['FloodWaitLimit'];
|
||||
}
|
||||
if (($method === 'users.getUsers' && $args === ['id' => [['_' => 'inputUserSelf']]]) || $method === 'auth.exportAuthorization' || $method === 'updates.getDifference') {
|
||||
$message['user_related'] = true;
|
||||
}
|
||||
|
||||
$deferred = yield $this->datacenter->sockets[$aargs['datacenter']]->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true);
|
||||
|
||||
$this->datacenter->sockets[$aargs['datacenter']]->checker->resume();
|
||||
|
||||
return $deferred;
|
||||
}
|
||||
|
||||
public function object_call_async($object, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
|
||||
{
|
||||
$message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key === null, 'method' => false];
|
||||
if (isset($aargs['promise'])) {
|
||||
$message['promise'] = $aargs['promise'];
|
||||
}
|
||||
|
||||
return $this->datacenter->sockets[$aargs['datacenter']]->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true);
|
||||
}
|
||||
|
||||
/*
|
||||
$message = [
|
||||
// only in outgoing messages
|
||||
'body' => deserialized body, (optional if container)
|
||||
'serialized_body' => 'serialized body', (optional if container)
|
||||
'content_related' => bool,
|
||||
'_' => 'predicate',
|
||||
'promise' => deferred promise that gets resolved when a response to the message is received (optional),
|
||||
'send_promise' => deferred promise that gets resolved when the message is sent (optional),
|
||||
'file' => bool (optional),
|
||||
'type' => 'type' (optional),
|
||||
'queue' => queue ID (optional),
|
||||
'container' => [message ids] (optional),
|
||||
|
||||
// only in incoming messages
|
||||
'content' => deserialized body,
|
||||
'seq_no' => number (optional),
|
||||
'from_container' => bool (optional),
|
||||
|
||||
// can be present in both
|
||||
'response' => message id (optional),
|
||||
'msg_id' => message id (optional),
|
||||
'sent' => timestamp,
|
||||
'tries' => number
|
||||
];
|
||||
*/
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user