diff --git a/src/danog/MadelineProto/APIFactory.php b/src/danog/MadelineProto/APIFactory.php index 2624c2bd..a63009ec 100644 --- a/src/danog/MadelineProto/APIFactory.php +++ b/src/danog/MadelineProto/APIFactory.php @@ -158,12 +158,15 @@ class APIFactory extends AsyncConstruct yield $this->initAsync(); } if (Magic::is_fork() && !Magic::$processed_fork) { + throw new Exception("Forking not supported"); + /* \danog\MadelineProto\Logger::log('Detected fork'); $this->API->reset_session(); foreach ($this->API->datacenter->sockets as $datacenter) { yield $datacenter->reconnect(); } Magic::$processed_fork = true; + */ } if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) { Logger::log("Didn't serialize in a while, doing that now..."); diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index 20079b9a..e1db04f1 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -23,7 +23,6 @@ use Amp\Promise; use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop; use danog\MadelineProto\Loop\Connection\ReadLoop; -use danog\MadelineProto\Loop\Connection\UpdateLoop; use danog\MadelineProto\Loop\Connection\WriteLoop; use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\Stream\ConnectionContext; @@ -128,9 +127,6 @@ class Connection if (!isset($this->waiter)) { $this->waiter = new HttpWaitLoop($this->API, $this->datacenter); } - if (!isset($this->updater)) { - $this->updater = new UpdateLoop($this->API, $this->datacenter); - } foreach ($this->new_outgoing as $message_id) { if ($this->outgoing_messages[$message_id]['unencrypted']) { $promise = $this->outgoing_messages[$message_id]['promise']; @@ -151,9 +147,6 @@ class Connection } $this->waiter->start(); - if ($this->datacenter === $this->API->settings['connection_settings']['default_dc']) { - $this->updater->start(); - } } public function sendMessage($message, $flush = true) diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index 6f83febc..b831f59e 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -29,6 +29,16 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; */ class CheckLoop extends ResumableSignalLoop { + protected $connection; + protected $datacenter; + + public function __construct($API, $datacenter) + { + $this->API = $API; + $this->datacenter = $datacenter; + $this->connection = $API->datacenter->sockets[$datacenter]; + } + public function loop() { $API = $this->API; diff --git a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php index 4d20db76..658403fb 100644 --- a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php @@ -31,6 +31,15 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; */ class HttpWaitLoop extends ResumableSignalLoop { + protected $connection; + protected $datacenter; + + public function __construct($API, $datacenter) + { + $this->API = $API; + $this->datacenter = $datacenter; + $this->connection = $API->datacenter->sockets[$datacenter]; + } public function loop() { $API = $this->API; diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index 7a4cca3a..93d96535 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -37,6 +37,15 @@ class ReadLoop extends SignalLoop use Tools; use Crypt; + protected $connection; + protected $datacenter; + + public function __construct($API, $datacenter) + { + $this->API = $API; + $this->datacenter = $datacenter; + $this->connection = $API->datacenter->sockets[$datacenter]; + } public function loop() { $API = $this->API; diff --git a/src/danog/MadelineProto/Loop/Connection/UpdateLoop.php b/src/danog/MadelineProto/Loop/Connection/UpdateLoop.php deleted file mode 100644 index cd3d6c7e..00000000 --- a/src/danog/MadelineProto/Loop/Connection/UpdateLoop.php +++ /dev/null @@ -1,86 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2018 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Connection; - -use Amp\Success; -use danog\MadelineProto\Logger; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; - -/** - * Update loop. - * - * @author Daniil Gentili - */ -class UpdateLoop extends ResumableSignalLoop -{ - use \danog\MadelineProto\Tools; - - public function loop() - { - $API = $this->API; - $datacenter = $this->datacenter; - - if (!$this->API->settings['updates']['handle_updates']) { - yield new Success(0); - - return false; - } - - $this->startedLoop(); - $API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE); - - $timeout = $API->settings['updates']['getdifference_interval']; - while (true) { - while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { - if (yield $this->waitSignal($this->pause())) { - $API->logger->logger("Exiting update loop in DC $datacenter"); - $this->exitedLoop(); - - return; - } - } - if (time() - $API->last_getdifference > $timeout) { - if (!yield $API->get_updates_difference_async()) { - return false; - } - } - if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) { - $API->logger->logger("Exiting update loop in DC $datacenter"); - $this->exitedLoop(); - - return; - } - } - } - - public function has_all_auth() - { - if ($this->API->isInitingAuthorization()) { - return false; - } - - foreach ($this->API->datacenter->sockets as $dc) { - if (!$dc->authorized || $dc->temp_auth_key === null) { - return false; - } - } - - return true; - } -} diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 26b27480..7499b9e2 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -36,6 +36,16 @@ class WriteLoop extends ResumableSignalLoop use Crypt; use Tools; + protected $connection; + protected $datacenter; + + public function __construct($API, $datacenter) + { + $this->API = $API; + $this->datacenter = $datacenter; + $this->connection = $API->datacenter->sockets[$datacenter]; + } + public function loop(): \Generator { $API = $this->API; diff --git a/src/danog/MadelineProto/Loop/Generic/PeriodFetcherLoop.php b/src/danog/MadelineProto/Loop/Generic/PeriodFetcherLoop.php new file mode 100644 index 00000000..0d584450 --- /dev/null +++ b/src/danog/MadelineProto/Loop/Generic/PeriodFetcherLoop.php @@ -0,0 +1,69 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2018 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop\Generic; + +use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; + +/** + * Update loop. + * + * @author Daniil Gentili + */ +class PeriodicFetcherLoop extends ResumableSignalLoop +{ + const STOP = -1; + const PAUSE = null; + + protected $callback; + protected $name; + + /** + * Constructor + * + * @param \danog\MadelineProto\API $API Instance of MadelineProto + * @param callback $callback Callback to run periodically + * @param int $timeout Timeout + * @param string $name Fetcher name + */ + public function __construct($API, $callback, $name) + { + $this->API = $API; + $this->callback = $callback->bindTo($this); + $this->name = $name; + } + public function loop() + { + $API = $this->API; + $callback = $this->callback; + $name = $this->name; + + $this->startedLoop(); + $API->logger->logger("Entered $name loop", Logger::ULTRA_VERBOSE); + while (true) { + $timeout = yield $callback(); + if ($timeout === self::STOP || yield $this->waitSignal($this->pause($timeout))) { + $API->logger->logger("Exiting $name loop"); + $this->exitedLoop(); + + return; + } + } + } +} diff --git a/src/danog/MadelineProto/Loop/Impl/Loop.php b/src/danog/MadelineProto/Loop/Impl/Loop.php index 51e1f62b..45b6b95d 100644 --- a/src/danog/MadelineProto/Loop/Impl/Loop.php +++ b/src/danog/MadelineProto/Loop/Impl/Loop.php @@ -19,7 +19,6 @@ namespace danog\MadelineProto\Loop\Impl; use Amp\Promise; -use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\LoopInterface; /** @@ -36,20 +35,15 @@ abstract class Loop implements LoopInterface private $count = 0; protected $API; - protected $connection; - protected $datacenter; - - public function __construct($API, $datacenter) + public function __construct($API) { $this->API = $API; - $this->datacenter = $datacenter; - $this->connection = $API->datacenter->sockets[$datacenter]; } public function start() { if ($this->count) { - $this->API->logger->logger("NOT entering check loop in DC {$this->datacenter} with running count {$this->count}", Logger::ERROR); + $this->API->logger->logger("NOT entering loop with running count {$this->count}", Logger::ERROR); return false; } diff --git a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php b/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php index 05d1b344..7e008787 100644 --- a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php +++ b/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php @@ -23,6 +23,7 @@ use Amp\Loop; use Amp\Promise; use Amp\Success; use danog\MadelineProto\Loop\ResumableLoopInterface; +use danog\MadelineProto\Tools; /** * Resumable signal loop helper trait. @@ -31,7 +32,9 @@ use danog\MadelineProto\Loop\ResumableLoopInterface; */ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopInterface { + use Tools; private $resume; + private $pause; private $resumeWatcher; public function pause($time = null): Promise @@ -46,11 +49,14 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn $this->resumeWatcher = null; } $this->resumeWatcher = Loop::delay($time * 1000, [$this, 'resume'], $resume); - //var_dump("resume {$this->resumeWatcher} ".get_class($this)." DC {$this->datacenter} after ", ($time * 1000), $resume); } } $this->resume = new Deferred(); + $pause = $this->pause; + $this->pause = new Deferred; + Loop::defer([$pause, 'resolve']); + return $this->resume->promise(); } @@ -64,14 +70,18 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn return; } } - /* - if ($expected) { - //var_dump("=======", "resume $watcherId ".get_class($this)." DC {$this->datacenter} diff ".(microtime(true) - $expected).": expected $expected, actual ".microtime(true)); - }*/ if ($this->resume) { $resume = $this->resume; $this->resume = null; $resume->resolve(); + + return $this->pause ? $this->pause->promise() : null; } } + + public function resumeDefer() + { + Loop::defer([$this, 'resume']); + return $this->pause ? $this->pause->promise() : null; + } } diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php new file mode 100644 index 00000000..d441801f --- /dev/null +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -0,0 +1,190 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2018 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop\Connection; + +use Amp\Success; +use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; + +/** + * update feed loop. + * + * @author Daniil Gentili + */ +class FeedLoop extends ResumableSignalLoop +{ + use \danog\MadelineProto\Tools; + private $incomingUpdates = []; + private $parsedUpdates = []; + private $channelId; + private $updater; + + public function __construct($API, $channelId = false) + { + $this->API = $API; + $this->channelId = $channelId; + } + public function loop() + { + $API = $this->API; + $updater = $this->updater = $API->updater[$this->channelId]; + + if (!$this->API->settings['updates']['handle_updates']) { + yield new Success(0); + + return false; + } + + $this->startedLoop(); + $API->logger->logger("Entered update feed loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE); + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update feed loop in channel {$this->channelId}"); + $this->exitedLoop(); + + return; + } + } + $this->state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId); + + while (true) { + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update feed loop channel {$this->channelId}"); + $this->exitedLoop(); + + return; + } + } + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update feed loop channel {$this->channelId}"); + $this->exitedLoop(); + + return; + } + if (!$this->settings['updates']['handle_updates']) { + $API->logger->logger("Exiting update feed loop channel {$this->channelId}"); + $this->exitedLoop(); + return; + } + while ($this->incomingUpdates) { + $updates = $this->incomingUpdates; + $this->incomingUpdates = null; + yield $this->parse($updates); + $updates = null; + } + } + } + public function parse($updates) + { + reset($updates); + while ($updates) { + $options = []; + $key = key($updates); + $update = $updates[$key]; + unset($updates[$key]); + if (isset($update['options'])) { + $options = $update['options']; + unset($update['options']); + } + if (isset($update['pts'])) { + $logger = function ($msg) use ($update) { + $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; + $this->logger->logger($update); + $double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-'; + $mid = isset($update['message']['id']) ? $update['message']['id'] : '-'; + $mypts = $this->state->pts(); + $this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); + }; + $result = $this->state->checkPts($update); + if ($result < 0) { + $logger("PTS duplicate"); + + continue; + } + if ($result > 0) { + $logger("PTS hole"); + $this->updater->setLimit($state->pts + $result); + yield $this->updater->resume(); + $updates = array_merge($this->incomingUpdates, $updates); + $this->incomingUpdates = null; + continue; + } + if (isset($update['message']['id'], $update['message']['to_id']) && !in_array($update['_'], ['updateEditMessage', 'updateEditChannelMessage'])) { + if (!$this->API->check_msg_id($update['message'])) { + $logger("MSGID duplicate"); + + continue; + } + } + $logger("PTS OK"); + + $this->state->pts($update['pts']); + if ($this->channelId === false && isset($options['date'])) { + $this->state->date($options['date']); + } + } + if ($this->channelId === false && isset($options['seq']) || isset($options['seq_start'])) { + $seq = $options['seq']; + $seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq']; + if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) { + $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); + yield $this->get_updates_difference_async(); + + return false; + } + if ($this->state->seq() !== $seq) { + $this->state->seq($seq); + if (isset($options['date'])) { + $this->state->date($options['date']); + } + } + } + + $this->save($update); + } + } + public function feed($updates) + { + $this->incomingUpdates = array_merge($this->incomingUpdates, $updates); + } + public function fetchSlice($to_pts) + { + $difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $this->state->pts(), 'pts_total_limit' => $to_pts, 'date' => $this->state->date(), 'qts' => $this->state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); + var_dumP($difference); + } + public function save($update) + { + $this->parsedUpdates []= $update; + } + public function has_all_auth() + { + if ($this->API->isInitingAuthorization()) { + return false; + } + + foreach ($this->API->datacenter->sockets as $dc) { + if (!$dc->authorized || $dc->temp_auth_key === null) { + return false; + } + } + + return true; + } +} diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php new file mode 100644 index 00000000..d2b6e60d --- /dev/null +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -0,0 +1,192 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2018 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop\Update; + +use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; + +/** + * Update loop. + * + * @author Daniil Gentili + */ +class UpdateLoop extends ResumableSignalLoop +{ + use \danog\MadelineProto\Tools; + + private $toPts; + private $channelId; + private $feeder; + + public function __construct($API, $channelId) + { + $this->API = $API; + $this->channelId = $channelId; + } + public function loop() + { + $API = $this->API; + $datacenter = $this->datacenter; + $feeder = $this->feeder = $API->feeder[$this->channelId]; + + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update feed loop in channel {$this->channelId}"); + $this->exitedLoop(); + + return; + } + } + $this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId); + + $this->startedLoop(); + + $API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE); + + $timeout = $API->settings['updates']['getdifference_interval']; + while (true) { + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update loop in DC $datacenter"); + $this->exitedLoop(); + + return; + } + } + if (time() - $API->last_getdifference > $timeout) { + $toPts = $this->toPts; + $this->toPts = null; + while (true) { + if ($this->channelId) { + $this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + if ($state->pts() <= 1) { + $limit = 10; + } else if ($API->authorization['user']['bot']) { + $limit = 100000; + } else { + $limit = 100; + } + $difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]); + if (isset($difference['timeout'])) { + $timeout = $difference['timeout']; + } + + switch ($difference['_']) { + case 'updates.channelDifferenceEmpty': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + $state->update($difference); + unset($difference); + break 2; + case 'updates.channelDifference': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + if ($state->pts() >= $difference['pts'] && $state->pts() > 1) { + $this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts()+1), \danog\MadelineProto\Logger::VERBOSE); + $difference['pts'] = $state->pts() + 1; + } + $state->update($difference); + $feeder->feed($difference['other_updates']); + + yield $this->handle_update_messages_async($difference['new_messages'], $channel); + if (!$difference['final']) { + if ($difference['pts'] >= $toPts) { + unset($difference); + break 2; + } + unset($difference); + break; + } + unset($difference); + break 2; + case 'updates.channelDifferenceTooLong': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + $state->update($difference); + yield $this->handle_update_messages_async($difference['messages'], $channel); + unset($difference); + break; + default: + throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); + } + } else { + $this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + + $difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); + + switch ($difference['_']) { + case 'updates.differenceEmpty': + $state->update($difference); + unset($difference); + break 2; + case 'updates.difference': + foreach ($difference['new_encrypted_messages'] as &$encrypted) { + $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; + } + $feeder->feed($difference['other_updates']); + $feeder->feed($difference['new_encrypted_messages']); + yield $this->handle_update_messages_async($difference['new_messages']); + $state->update($difference['state']); + unset($difference); + break 2; + case 'updates.differenceSlice': + foreach ($difference['new_encrypted_messages'] as &$encrypted) { + $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; + } + $feeder->feed($difference['other_updates']); + $feeder->feed($difference['new_encrypted_messages']); + yield $this->handle_update_messages_async($difference['new_messages']); + $state->update($difference['intermediate_state']); + if ($difference['intermediate_state']['pts'] >= $toPts) { + unset($difference); + break 2; + } + unset($difference); + break; + default: + throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); + } + } + } + } + if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) { + $API->logger->logger("Exiting update loop in DC $datacenter"); + $this->exitedLoop(); + + return; + } + } + } + public function setLimit($toPts) + { + $this->toPts = $toPts; + } + public function has_all_auth() + { + if ($this->API->isInitingAuthorization()) { + return false; + } + + foreach ($this->API->datacenter->sockets as $dc) { + if (!$dc->authorized || $dc->temp_auth_key === null) { + return false; + } + } + + return true; + } +} diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index c30d456a..bca3bd7e 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -20,13 +20,15 @@ namespace danog\MadelineProto; use Amp\Loop; +use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; use danog\MadelineProto\MTProtoTools\ReferenceDatabase; use danog\MadelineProto\MTProtoTools\UpdatesState; use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; use danog\MadelineProto\TL\TLCallback; -use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; -use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Loop\Update\UpdateLoop; +use danog\MadelineProto\Loop\Update\FeedLoop; /** * Manages all of the mtproto stuff. @@ -147,7 +149,8 @@ class MTProto extends AsyncConstruct implements TLCallback public $referenceDatabase; public $update_deferred; public $phoneConfigWatcherId; - + public $feeders = []; + public $updaters = []; public function __magic_construct($settings = []) { @@ -179,6 +182,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (!($this->channels_state instanceof CombinedUpdatesState)) { $this->channels_state = new CombinedUpdatesState($this->channels_state); } + $this->channels_state->__construct([false => $this->updates_state]); if (!isset($this->datacenter)) { $this->datacenter = new DataCenter($this, $this->settings['connection'], $this->settings['connection_settings']); } @@ -273,13 +277,13 @@ class MTProto extends AsyncConstruct implements TLCallback }*/ /*$keys = array_keys((array) get_object_vars($this)); if (count($keys) !== count(array_unique($keys))) { - throw new Bug74586Exception(); + throw new Bug74586Exception(); } if (isset($this->data)) { - foreach ($this->data as $k => $v) { - $this->{$k} = $v; - } - unset($this->data); + foreach ($this->data as $k => $v) { + $this->{$k} = $v; + } + unset($this->data); }*/ if ($this->authorized === true) { $this->authorized = self::LOGGED_IN; @@ -290,6 +294,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (is_array($this->channels_state)) { $this->channels_state = new CombinedUpdatesState($this->channels_state); } + $this->channels_state->__construct([false => $this->updates_state]); $this->postpone_updates = false; if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) { @@ -333,7 +338,10 @@ class MTProto extends AsyncConstruct implements TLCallback } foreach ($this->full_chats as $id => $full) { - if (isset($full['full'], $full['last_update'])) $this->full_chats[$id] = ['full' => $full['full'], 'last_update' => $full['last_update']]; + if (isset($full['full'], $full['last_update'])) { + $this->full_chats[$id] = ['full' => $full['full'], 'last_update' => $full['last_update']]; + } + } foreach ($this->secret_chats as $key => &$chat) { if (!is_array($chat)) { @@ -385,6 +393,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (!$this->settings['updates']['handle_old_updates']) { $this->channels_state = new CombinedUpdatesState(); + $this->channels_state->__construct([false => $this->updates_state]); $this->got_state = false; } yield $this->connect_to_all_dcs_async(); @@ -848,6 +857,18 @@ class MTProto extends AsyncConstruct implements TLCallback } yield $this->get_phone_config_async(); + + foreach ($this->channels_state->get() as $state) { + $channelId = $state->getChannel(); + if (!isset($this->feeders[$channelId])) { + $this->feeders[$channelId] = new FeedLoop($this, $channelId); + } + if (!isset($this->updaters[$channelId])) { + $this->updaters[$channelId] = new UpdateLoop($this, $channelId); + } + $this->feeders[$channelId]->start(); + $this->updaters[$channelId]->start(); + } } public function get_phone_config_async($watcherId = null) diff --git a/src/danog/MadelineProto/MTProtoTools/CombinedUpdatesState.php b/src/danog/MadelineProto/MTProtoTools/CombinedUpdatesState.php index 889d6c03..5710caa3 100644 --- a/src/danog/MadelineProto/MTProtoTools/CombinedUpdatesState.php +++ b/src/danog/MadelineProto/MTProtoTools/CombinedUpdatesState.php @@ -40,12 +40,15 @@ class CombinedUpdatesState /** * Update multiple parameters * - * @param array $init + * @param array|null $init * @param integer $channel * @return UpdatesState */ - public function get($channel, $init = []) + public function get($channel = null, $init = []) { + if ($channel === null) { + return $this->states; + } if (!isset($this->states[$channel])) { return $this->states[$channel] = new UpdatesState($init, $channel); } diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index 626a5d87..f88220cf 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -224,6 +224,10 @@ trait UpdateHandler return $this->updates_state; } + public function loadChannelState($channelId = null) + { + return $this->channels_state->get($channelId); + } public function get_updates_difference_async($w = null) { @@ -322,13 +326,9 @@ trait UpdateHandler $this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); $channel_id = false; switch ($update['_']) { + case 'updateChannelWebPage': case 'updateNewChannelMessage': case 'updateEditChannelMessage': - if ($update['message']['_'] === 'messageEmpty') { - $this->logger->logger('Got message empty, not saving', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - - return false; - } $channel_id = $update['message']['to_id']['channel_id']; break; case 'updateDeleteChannelMessages': @@ -350,14 +350,10 @@ trait UpdateHandler } else { $cur_state = $this->channels_state->get($channel_id, $update); } - /* - if ($cur_state['sync_loading'] && in_array($update['_'], ['updateNewMessage', 'updateEditMessage', 'updateNewChannelMessage', 'updateEditChannelMessage'])) { - $this->logger->logger('Sync loading, not handling update', \danog\MadelineProto\Logger::NOTICE); - return false; - }*/ switch ($update['_']) { case 'updateChannelTooLong': + $this->datacenter->sockets[] yield $this->get_channel_difference_async($channel_id); return false; diff --git a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php index d65f1238..8fc4907b 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php @@ -44,7 +44,7 @@ class UpdatesState private $seq = 0; /** * Date - * + * * @var int */ private $date = 1; @@ -64,7 +64,7 @@ class UpdatesState private $syncLoading = false; /** - * Init function + * Init function * * @param array $init Initial parameters * @param boolean $channelId Channel ID @@ -182,4 +182,15 @@ class UpdatesState } return $this->date; } + + /** + * Check validity of PTS contained in update + * + * @param array $update + * @return int -1 if it's too old, 0 if it's ok, 1 if it's too new + */ + public function checkPts($update) + { + return ($this->pts + $update['pts_count']) - $update['pts']; + } }