Rewriting update management

This commit is contained in:
Daniil Gentili 2019-05-29 15:17:14 +02:00
parent ff271bbf68
commit 7f47e3de85
16 changed files with 563 additions and 129 deletions

View File

@ -158,12 +158,15 @@ class APIFactory extends AsyncConstruct
yield $this->initAsync(); yield $this->initAsync();
} }
if (Magic::is_fork() && !Magic::$processed_fork) { if (Magic::is_fork() && !Magic::$processed_fork) {
throw new Exception("Forking not supported");
/*
\danog\MadelineProto\Logger::log('Detected fork'); \danog\MadelineProto\Logger::log('Detected fork');
$this->API->reset_session(); $this->API->reset_session();
foreach ($this->API->datacenter->sockets as $datacenter) { foreach ($this->API->datacenter->sockets as $datacenter) {
yield $datacenter->reconnect(); yield $datacenter->reconnect();
} }
Magic::$processed_fork = true; Magic::$processed_fork = true;
*/
} }
if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) { if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) {
Logger::log("Didn't serialize in a while, doing that now..."); Logger::log("Didn't serialize in a while, doing that now...");

View File

@ -23,7 +23,6 @@ use Amp\Promise;
use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\CheckLoop;
use danog\MadelineProto\Loop\Connection\HttpWaitLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
use danog\MadelineProto\Loop\Connection\ReadLoop; use danog\MadelineProto\Loop\Connection\ReadLoop;
use danog\MadelineProto\Loop\Connection\UpdateLoop;
use danog\MadelineProto\Loop\Connection\WriteLoop; use danog\MadelineProto\Loop\Connection\WriteLoop;
use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
@ -128,9 +127,6 @@ class Connection
if (!isset($this->waiter)) { if (!isset($this->waiter)) {
$this->waiter = new HttpWaitLoop($this->API, $this->datacenter); $this->waiter = new HttpWaitLoop($this->API, $this->datacenter);
} }
if (!isset($this->updater)) {
$this->updater = new UpdateLoop($this->API, $this->datacenter);
}
foreach ($this->new_outgoing as $message_id) { foreach ($this->new_outgoing as $message_id) {
if ($this->outgoing_messages[$message_id]['unencrypted']) { if ($this->outgoing_messages[$message_id]['unencrypted']) {
$promise = $this->outgoing_messages[$message_id]['promise']; $promise = $this->outgoing_messages[$message_id]['promise'];
@ -151,9 +147,6 @@ class Connection
} }
$this->waiter->start(); $this->waiter->start();
if ($this->datacenter === $this->API->settings['connection_settings']['default_dc']) {
$this->updater->start();
}
} }
public function sendMessage($message, $flush = true) public function sendMessage($message, $flush = true)

View File

@ -29,6 +29,16 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
*/ */
class CheckLoop extends ResumableSignalLoop class CheckLoop extends ResumableSignalLoop
{ {
protected $connection;
protected $datacenter;
public function __construct($API, $datacenter)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop() public function loop()
{ {
$API = $this->API; $API = $this->API;

View File

@ -31,6 +31,15 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
*/ */
class HttpWaitLoop extends ResumableSignalLoop class HttpWaitLoop extends ResumableSignalLoop
{ {
protected $connection;
protected $datacenter;
public function __construct($API, $datacenter)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop() public function loop()
{ {
$API = $this->API; $API = $this->API;

View File

@ -37,6 +37,15 @@ class ReadLoop extends SignalLoop
use Tools; use Tools;
use Crypt; use Crypt;
protected $connection;
protected $datacenter;
public function __construct($API, $datacenter)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop() public function loop()
{ {
$API = $this->API; $API = $this->API;

View File

@ -1,86 +0,0 @@
<?php
/**
* Update loop.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Connection;
use Amp\Success;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
/**
* Update loop.
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class UpdateLoop extends ResumableSignalLoop
{
use \danog\MadelineProto\Tools;
public function loop()
{
$API = $this->API;
$datacenter = $this->datacenter;
if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0);
return false;
}
$this->startedLoop();
$API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['updates']['getdifference_interval'];
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$this->exitedLoop();
return;
}
}
if (time() - $API->last_getdifference > $timeout) {
if (!yield $API->get_updates_difference_async()) {
return false;
}
}
if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$this->exitedLoop();
return;
}
}
}
public function has_all_auth()
{
if ($this->API->isInitingAuthorization()) {
return false;
}
foreach ($this->API->datacenter->sockets as $dc) {
if (!$dc->authorized || $dc->temp_auth_key === null) {
return false;
}
}
return true;
}
}

View File

@ -36,6 +36,16 @@ class WriteLoop extends ResumableSignalLoop
use Crypt; use Crypt;
use Tools; use Tools;
protected $connection;
protected $datacenter;
public function __construct($API, $datacenter)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop(): \Generator public function loop(): \Generator
{ {
$API = $this->API; $API = $this->API;

View File

@ -0,0 +1,69 @@
<?php
/**
* Generic period fetcher loop
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Generic;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
/**
* Update loop.
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class PeriodicFetcherLoop extends ResumableSignalLoop
{
const STOP = -1;
const PAUSE = null;
protected $callback;
protected $name;
/**
* Constructor
*
* @param \danog\MadelineProto\API $API Instance of MadelineProto
* @param callback $callback Callback to run periodically
* @param int $timeout Timeout
* @param string $name Fetcher name
*/
public function __construct($API, $callback, $name)
{
$this->API = $API;
$this->callback = $callback->bindTo($this);
$this->name = $name;
}
public function loop()
{
$API = $this->API;
$callback = $this->callback;
$name = $this->name;
$this->startedLoop();
$API->logger->logger("Entered $name loop", Logger::ULTRA_VERBOSE);
while (true) {
$timeout = yield $callback();
if ($timeout === self::STOP || yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $name loop");
$this->exitedLoop();
return;
}
}
}
}

View File

@ -19,7 +19,6 @@
namespace danog\MadelineProto\Loop\Impl; namespace danog\MadelineProto\Loop\Impl;
use Amp\Promise; use Amp\Promise;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\LoopInterface; use danog\MadelineProto\Loop\LoopInterface;
/** /**
@ -36,20 +35,15 @@ abstract class Loop implements LoopInterface
private $count = 0; private $count = 0;
protected $API; protected $API;
protected $connection; public function __construct($API)
protected $datacenter;
public function __construct($API, $datacenter)
{ {
$this->API = $API; $this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
} }
public function start() public function start()
{ {
if ($this->count) { if ($this->count) {
$this->API->logger->logger("NOT entering check loop in DC {$this->datacenter} with running count {$this->count}", Logger::ERROR); $this->API->logger->logger("NOT entering loop with running count {$this->count}", Logger::ERROR);
return false; return false;
} }

View File

@ -23,6 +23,7 @@ use Amp\Loop;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use danog\MadelineProto\Loop\ResumableLoopInterface; use danog\MadelineProto\Loop\ResumableLoopInterface;
use danog\MadelineProto\Tools;
/** /**
* Resumable signal loop helper trait. * Resumable signal loop helper trait.
@ -31,7 +32,9 @@ use danog\MadelineProto\Loop\ResumableLoopInterface;
*/ */
abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopInterface abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopInterface
{ {
use Tools;
private $resume; private $resume;
private $pause;
private $resumeWatcher; private $resumeWatcher;
public function pause($time = null): Promise public function pause($time = null): Promise
@ -46,11 +49,14 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn
$this->resumeWatcher = null; $this->resumeWatcher = null;
} }
$this->resumeWatcher = Loop::delay($time * 1000, [$this, 'resume'], $resume); $this->resumeWatcher = Loop::delay($time * 1000, [$this, 'resume'], $resume);
//var_dump("resume {$this->resumeWatcher} ".get_class($this)." DC {$this->datacenter} after ", ($time * 1000), $resume);
} }
} }
$this->resume = new Deferred(); $this->resume = new Deferred();
$pause = $this->pause;
$this->pause = new Deferred;
Loop::defer([$pause, 'resolve']);
return $this->resume->promise(); return $this->resume->promise();
} }
@ -64,14 +70,18 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn
return; return;
} }
} }
/*
if ($expected) {
//var_dump("=======", "resume $watcherId ".get_class($this)." DC {$this->datacenter} diff ".(microtime(true) - $expected).": expected $expected, actual ".microtime(true));
}*/
if ($this->resume) { if ($this->resume) {
$resume = $this->resume; $resume = $this->resume;
$this->resume = null; $this->resume = null;
$resume->resolve(); $resume->resolve();
return $this->pause ? $this->pause->promise() : null;
} }
} }
public function resumeDefer()
{
Loop::defer([$this, 'resume']);
return $this->pause ? $this->pause->promise() : null;
}
} }

View File

@ -0,0 +1,190 @@
<?php
/**
* Update feeder loop.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Connection;
use Amp\Success;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
/**
* update feed loop.
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class FeedLoop extends ResumableSignalLoop
{
use \danog\MadelineProto\Tools;
private $incomingUpdates = [];
private $parsedUpdates = [];
private $channelId;
private $updater;
public function __construct($API, $channelId = false)
{
$this->API = $API;
$this->channelId = $channelId;
}
public function loop()
{
$API = $this->API;
$updater = $this->updater = $API->updater[$this->channelId];
if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0);
return false;
}
$this->startedLoop();
$API->logger->logger("Entered update feed loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop in channel {$this->channelId}");
$this->exitedLoop();
return;
}
}
$this->state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId);
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$this->exitedLoop();
return;
}
if (!$this->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$this->exitedLoop();
return;
}
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = null;
yield $this->parse($updates);
$updates = null;
}
}
}
public function parse($updates)
{
reset($updates);
while ($updates) {
$options = [];
$key = key($updates);
$update = $updates[$key];
unset($updates[$key]);
if (isset($update['options'])) {
$options = $update['options'];
unset($update['options']);
}
if (isset($update['pts'])) {
$logger = function ($msg) use ($update) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
$this->logger->logger($update);
$double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-';
$mid = isset($update['message']['id']) ? $update['message']['id'] : '-';
$mypts = $this->state->pts();
$this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR);
};
$result = $this->state->checkPts($update);
if ($result < 0) {
$logger("PTS duplicate");
continue;
}
if ($result > 0) {
$logger("PTS hole");
$this->updater->setLimit($state->pts + $result);
yield $this->updater->resume();
$updates = array_merge($this->incomingUpdates, $updates);
$this->incomingUpdates = null;
continue;
}
if (isset($update['message']['id'], $update['message']['to_id']) && !in_array($update['_'], ['updateEditMessage', 'updateEditChannelMessage'])) {
if (!$this->API->check_msg_id($update['message'])) {
$logger("MSGID duplicate");
continue;
}
}
$logger("PTS OK");
$this->state->pts($update['pts']);
if ($this->channelId === false && isset($options['date'])) {
$this->state->date($options['date']);
}
}
if ($this->channelId === false && isset($options['seq']) || isset($options['seq_start'])) {
$seq = $options['seq'];
$seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq'];
if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) {
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->get_updates_difference_async();
return false;
}
if ($this->state->seq() !== $seq) {
$this->state->seq($seq);
if (isset($options['date'])) {
$this->state->date($options['date']);
}
}
}
$this->save($update);
}
}
public function feed($updates)
{
$this->incomingUpdates = array_merge($this->incomingUpdates, $updates);
}
public function fetchSlice($to_pts)
{
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $this->state->pts(), 'pts_total_limit' => $to_pts, 'date' => $this->state->date(), 'qts' => $this->state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]);
var_dumP($difference);
}
public function save($update)
{
$this->parsedUpdates []= $update;
}
public function has_all_auth()
{
if ($this->API->isInitingAuthorization()) {
return false;
}
foreach ($this->API->datacenter->sockets as $dc) {
if (!$dc->authorized || $dc->temp_auth_key === null) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,192 @@
<?php
/**
* Update loop.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Update;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
/**
* Update loop.
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class UpdateLoop extends ResumableSignalLoop
{
use \danog\MadelineProto\Tools;
private $toPts;
private $channelId;
private $feeder;
public function __construct($API, $channelId)
{
$this->API = $API;
$this->channelId = $channelId;
}
public function loop()
{
$API = $this->API;
$datacenter = $this->datacenter;
$feeder = $this->feeder = $API->feeder[$this->channelId];
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop in channel {$this->channelId}");
$this->exitedLoop();
return;
}
}
$this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId);
$this->startedLoop();
$API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['updates']['getdifference_interval'];
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$this->exitedLoop();
return;
}
}
if (time() - $API->last_getdifference > $timeout) {
$toPts = $this->toPts;
$this->toPts = null;
while (true) {
if ($this->channelId) {
$this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($state->pts() <= 1) {
$limit = 10;
} else if ($API->authorization['user']['bot']) {
$limit = 100000;
} else {
$limit = 100;
}
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]);
if (isset($difference['timeout'])) {
$timeout = $difference['timeout'];
}
switch ($difference['_']) {
case 'updates.channelDifferenceEmpty':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
unset($difference);
break 2;
case 'updates.channelDifference':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
if ($state->pts() >= $difference['pts'] && $state->pts() > 1) {
$this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts()+1), \danog\MadelineProto\Logger::VERBOSE);
$difference['pts'] = $state->pts() + 1;
}
$state->update($difference);
$feeder->feed($difference['other_updates']);
yield $this->handle_update_messages_async($difference['new_messages'], $channel);
if (!$difference['final']) {
if ($difference['pts'] >= $toPts) {
unset($difference);
break 2;
}
unset($difference);
break;
}
unset($difference);
break 2;
case 'updates.channelDifferenceTooLong':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
yield $this->handle_update_messages_async($difference['messages'], $channel);
unset($difference);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
}
} else {
$this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]);
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
switch ($difference['_']) {
case 'updates.differenceEmpty':
$state->update($difference);
unset($difference);
break 2;
case 'updates.difference':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['state']);
unset($difference);
break 2;
case 'updates.differenceSlice':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['intermediate_state']);
if ($difference['intermediate_state']['pts'] >= $toPts) {
unset($difference);
break 2;
}
unset($difference);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
}
}
}
}
if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$this->exitedLoop();
return;
}
}
}
public function setLimit($toPts)
{
$this->toPts = $toPts;
}
public function has_all_auth()
{
if ($this->API->isInitingAuthorization()) {
return false;
}
foreach ($this->API->datacenter->sockets as $dc) {
if (!$dc->authorized || $dc->temp_auth_key === null) {
return false;
}
}
return true;
}
}

View File

@ -20,13 +20,15 @@
namespace danog\MadelineProto; namespace danog\MadelineProto;
use Amp\Loop; use Amp\Loop;
use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\MTProtoTools\CombinedUpdatesState;
use danog\MadelineProto\MTProtoTools\ReferenceDatabase; use danog\MadelineProto\MTProtoTools\ReferenceDatabase;
use danog\MadelineProto\MTProtoTools\UpdatesState; use danog\MadelineProto\MTProtoTools\UpdatesState;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\TL\TLCallback; use danog\MadelineProto\TL\TLCallback;
use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; use danog\MadelineProto\Loop\Update\UpdateLoop;
use danog\MadelineProto\Async\AsyncConstruct; use danog\MadelineProto\Loop\Update\FeedLoop;
/** /**
* Manages all of the mtproto stuff. * Manages all of the mtproto stuff.
@ -147,7 +149,8 @@ class MTProto extends AsyncConstruct implements TLCallback
public $referenceDatabase; public $referenceDatabase;
public $update_deferred; public $update_deferred;
public $phoneConfigWatcherId; public $phoneConfigWatcherId;
public $feeders = [];
public $updaters = [];
public function __magic_construct($settings = []) public function __magic_construct($settings = [])
{ {
@ -179,6 +182,7 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!($this->channels_state instanceof CombinedUpdatesState)) { if (!($this->channels_state instanceof CombinedUpdatesState)) {
$this->channels_state = new CombinedUpdatesState($this->channels_state); $this->channels_state = new CombinedUpdatesState($this->channels_state);
} }
$this->channels_state->__construct([false => $this->updates_state]);
if (!isset($this->datacenter)) { if (!isset($this->datacenter)) {
$this->datacenter = new DataCenter($this, $this->settings['connection'], $this->settings['connection_settings']); $this->datacenter = new DataCenter($this, $this->settings['connection'], $this->settings['connection_settings']);
} }
@ -273,13 +277,13 @@ class MTProto extends AsyncConstruct implements TLCallback
}*/ }*/
/*$keys = array_keys((array) get_object_vars($this)); /*$keys = array_keys((array) get_object_vars($this));
if (count($keys) !== count(array_unique($keys))) { if (count($keys) !== count(array_unique($keys))) {
throw new Bug74586Exception(); throw new Bug74586Exception();
} }
if (isset($this->data)) { if (isset($this->data)) {
foreach ($this->data as $k => $v) { foreach ($this->data as $k => $v) {
$this->{$k} = $v; $this->{$k} = $v;
} }
unset($this->data); unset($this->data);
}*/ }*/
if ($this->authorized === true) { if ($this->authorized === true) {
$this->authorized = self::LOGGED_IN; $this->authorized = self::LOGGED_IN;
@ -290,6 +294,7 @@ class MTProto extends AsyncConstruct implements TLCallback
if (is_array($this->channels_state)) { if (is_array($this->channels_state)) {
$this->channels_state = new CombinedUpdatesState($this->channels_state); $this->channels_state = new CombinedUpdatesState($this->channels_state);
} }
$this->channels_state->__construct([false => $this->updates_state]);
$this->postpone_updates = false; $this->postpone_updates = false;
if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) { if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) {
@ -333,7 +338,10 @@ class MTProto extends AsyncConstruct implements TLCallback
} }
foreach ($this->full_chats as $id => $full) { foreach ($this->full_chats as $id => $full) {
if (isset($full['full'], $full['last_update'])) $this->full_chats[$id] = ['full' => $full['full'], 'last_update' => $full['last_update']]; if (isset($full['full'], $full['last_update'])) {
$this->full_chats[$id] = ['full' => $full['full'], 'last_update' => $full['last_update']];
}
} }
foreach ($this->secret_chats as $key => &$chat) { foreach ($this->secret_chats as $key => &$chat) {
if (!is_array($chat)) { if (!is_array($chat)) {
@ -385,6 +393,7 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!$this->settings['updates']['handle_old_updates']) { if (!$this->settings['updates']['handle_old_updates']) {
$this->channels_state = new CombinedUpdatesState(); $this->channels_state = new CombinedUpdatesState();
$this->channels_state->__construct([false => $this->updates_state]);
$this->got_state = false; $this->got_state = false;
} }
yield $this->connect_to_all_dcs_async(); yield $this->connect_to_all_dcs_async();
@ -848,6 +857,18 @@ class MTProto extends AsyncConstruct implements TLCallback
} }
yield $this->get_phone_config_async(); yield $this->get_phone_config_async();
foreach ($this->channels_state->get() as $state) {
$channelId = $state->getChannel();
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
} }
public function get_phone_config_async($watcherId = null) public function get_phone_config_async($watcherId = null)

View File

@ -40,12 +40,15 @@ class CombinedUpdatesState
/** /**
* Update multiple parameters * Update multiple parameters
* *
* @param array $init * @param array|null $init
* @param integer $channel * @param integer $channel
* @return UpdatesState * @return UpdatesState
*/ */
public function get($channel, $init = []) public function get($channel = null, $init = [])
{ {
if ($channel === null) {
return $this->states;
}
if (!isset($this->states[$channel])) { if (!isset($this->states[$channel])) {
return $this->states[$channel] = new UpdatesState($init, $channel); return $this->states[$channel] = new UpdatesState($init, $channel);
} }

View File

@ -224,6 +224,10 @@ trait UpdateHandler
return $this->updates_state; return $this->updates_state;
} }
public function loadChannelState($channelId = null)
{
return $this->channels_state->get($channelId);
}
public function get_updates_difference_async($w = null) public function get_updates_difference_async($w = null)
{ {
@ -322,13 +326,9 @@ trait UpdateHandler
$this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); $this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
$channel_id = false; $channel_id = false;
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelWebPage':
case 'updateNewChannelMessage': case 'updateNewChannelMessage':
case 'updateEditChannelMessage': case 'updateEditChannelMessage':
if ($update['message']['_'] === 'messageEmpty') {
$this->logger->logger('Got message empty, not saving', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
return false;
}
$channel_id = $update['message']['to_id']['channel_id']; $channel_id = $update['message']['to_id']['channel_id'];
break; break;
case 'updateDeleteChannelMessages': case 'updateDeleteChannelMessages':
@ -350,14 +350,10 @@ trait UpdateHandler
} else { } else {
$cur_state = $this->channels_state->get($channel_id, $update); $cur_state = $this->channels_state->get($channel_id, $update);
} }
/*
if ($cur_state['sync_loading'] && in_array($update['_'], ['updateNewMessage', 'updateEditMessage', 'updateNewChannelMessage', 'updateEditChannelMessage'])) {
$this->logger->logger('Sync loading, not handling update', \danog\MadelineProto\Logger::NOTICE);
return false;
}*/
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelTooLong': case 'updateChannelTooLong':
$this->datacenter->sockets[]
yield $this->get_channel_difference_async($channel_id); yield $this->get_channel_difference_async($channel_id);
return false; return false;

View File

@ -44,7 +44,7 @@ class UpdatesState
private $seq = 0; private $seq = 0;
/** /**
* Date * Date
* *
* @var int * @var int
*/ */
private $date = 1; private $date = 1;
@ -64,7 +64,7 @@ class UpdatesState
private $syncLoading = false; private $syncLoading = false;
/** /**
* Init function * Init function
* *
* @param array $init Initial parameters * @param array $init Initial parameters
* @param boolean $channelId Channel ID * @param boolean $channelId Channel ID
@ -182,4 +182,15 @@ class UpdatesState
} }
return $this->date; return $this->date;
} }
/**
* Check validity of PTS contained in update
*
* @param array $update
* @return int -1 if it's too old, 0 if it's ok, 1 if it's too new
*/
public function checkPts($update)
{
return ($this->pts + $update['pts_count']) - $update['pts'];
}
} }