More feeders

This commit is contained in:
Daniil Gentili 2019-05-29 18:28:43 +02:00
parent c86e9d31fb
commit 60b20f98f7
7 changed files with 208 additions and 54 deletions

View File

@ -89,6 +89,12 @@ class FeedLoop extends ResumableSignalLoop
yield $this->parse($updates);
$updates = null;
}
if ($this->parsedUpdates) {
foreach ($this->parsedUpdates as $update) {
yield $API->save_update_async($update);
}
$this->parsedUpdates = [];
}
}
}
public function parse($updates)
@ -99,10 +105,6 @@ class FeedLoop extends ResumableSignalLoop
$key = key($updates);
$update = $updates[$key];
unset($updates[$key]);
if (isset($update['options'])) {
$options = $update['options'];
unset($update['options']);
}
if (isset($update['pts'])) {
$logger = function ($msg) use ($update) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
@ -140,23 +142,6 @@ class FeedLoop extends ResumableSignalLoop
$this->state->date($options['date']);
}
}
if ($this->channelId === false && isset($options['seq']) || isset($options['seq_start'])) {
$seq = $options['seq'];
$seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq'];
if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) {
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->updaters[false]->resume();
return false;
}
if ($this->state->seq() !== $seq) {
$this->state->seq($seq);
if (isset($options['date'])) {
$this->state->date($options['date']);
}
}
}
$this->save($update);
}
}

View File

@ -0,0 +1,149 @@
<?php
/**
* Update feeder loop.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Update;
use Amp\Success;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
/**
* update feed loop.
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class SeqLoop extends ResumableSignalLoop
{
use \danog\MadelineProto\Tools;
private $incomingUpdates = [];
private $channelId;
private $feeder;
public function __construct($API)
{
$this->API = $API;
}
public function loop()
{
$API = $this->API;
$feeder = $this->feeder = $API->feeders[false];
if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0);
return false;
}
$this->startedLoop();
$API->logger->logger("Entered update seq loop", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$this->exitedLoop();
return;
}
}
$this->state = yield $API->load_update_state_async();
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$this->exitedLoop();
return;
}
if (!$this->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update seq loop");
$this->exitedLoop();
return;
}
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = null;
yield $this->parse($updates);
$updates = null;
}
$feeder->resumeDefer();
}
}
public function parse($updates)
{
reset($updates);
while ($updates) {
$options = [];
$key = key($updates);
$update = $updates[$key];
unset($updates[$key]);
$options = $update['options'];
$updates = $update['updates'];
unset($update);
$seq_start = $options['seq_start'];
$seq_end = $options['seq_end'];
$result = $this->state->checkSeq($seq_start);
if ($result > 0) {
$this->logger->logger('Seq hole of $result. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->updaters[false]->resume();
continue;
}
if ($result < 0) {
}
if ($this->state->seq() !== $seq) {
$this->state->seq($seq);
if (isset($options['date'])) {
$this->state->date($options['date']);
}
}
$this->save($updates);
}
}
public function feed($updates)
{
$this->incomingUpdates[] = $updates;
}
public function save($updates)
{
$this->feeder->feed($updates);
}
public function has_all_auth()
{
if ($this->API->isInitingAuthorization()) {
return false;
}
foreach ($this->API->datacenter->sockets as $dc) {
if (!$dc->authorized || $dc->temp_auth_key === null) {
return false;
}
}
return true;
}
}

View File

@ -42,7 +42,7 @@ class UpdateLoop extends ResumableSignalLoop
public function loop()
{
$API = $this->API;
$feeder = $this->feeder = $API->feeder[$this->channelId];
$feeder = $this->feeder = $API->feeders[$this->channelId];
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
@ -56,13 +56,13 @@ class UpdateLoop extends ResumableSignalLoop
$this->startedLoop();
$API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered updates loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['updates']['getdifference_interval'];
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$API->logger->logger("Exiting update loop in channel {$this->channelId}");
$this->exitedLoop();
return;
@ -160,8 +160,9 @@ class UpdateLoop extends ResumableSignalLoop
}
}
}
$feeder->resumeDefer();
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting update loop in DC $datacenter");
$API->logger->logger("Exiting update loop in channel {$this->channelId}");
$this->exitedLoop();
return;

View File

@ -258,7 +258,7 @@ trait ResponseHandler
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
if (strpos($datacenter, 'cdn') === false) {
$this->callFork($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']));
$this->callForkDefer($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']));
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
@ -572,40 +572,41 @@ trait ResponseHandler
$this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
$opts = [];
$result = [];
switch ($updates['_']) {
case 'updates':
case 'updatesCombined':
$handle_updates = [];
foreach ($updates['updates'] as $key => $update) {
if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' ||
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||
$update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' ||
$update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') {
$handle_updates[] = $update;
$result[yield $this->feedSingle($update)] = true;
unset($updates['updates'][$key]);
}
}
$this->feeders[false]->feed($handle_updates);
if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} else {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
}
$this->feeders[false]->feed($updates);
foreach ($updates as $update) {
$result[yield $this->feedSingle($update)] = true;
}
}
break;
case 'updateShort':
$updates['update']['options'] = ['date' => $updates['date']];
$this->feeders[false]->feed([$updates['update']]);
$this->feedSingle($updates['update']);
break;
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
$to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']);
if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) {
yield $this->updaters[false]->resume();
yield $this->updaters[false]->resumeDefer();
return;
// TOFIX
}
$message = $updates;
@ -624,7 +625,8 @@ trait ResponseHandler
break;
}
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
yield $this->handle_update_async($update, $opts);
$updates['update']['options'] = ['date' => $updates['date']];
$result[yield $this->feedSingle($update)] = true;
break;
case 'updateShortSentMessage':
//yield $this->set_update_state_async(['date' => $updates['date']]);
@ -636,5 +638,8 @@ trait ResponseHandler
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true));
break;
}
foreach ($result as $channelId => $Boh) {
$this->feeders[$channelId]->resumeDefer();
}
}
}

View File

@ -22,7 +22,6 @@ namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Request;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use function Amp\Promise\any;
/**
@ -144,7 +143,7 @@ trait UpdateHandler
return $data;
}
public function handle_update_async($update, $options = [])
public function feedSingle($update)
{
if (!$this->settings['updates']['handle_updates']) {
return;
@ -168,20 +167,16 @@ trait UpdateHandler
break;
}
if ($channelId === false) {
$cur_state = yield $this->load_update_state_async();
} else {
if (!$this->channels_state->has($channelId)) {
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
if ($channelId && !$this->channels_state->has($channelId)) {
$this->channels_state->get($channelId, $update);
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
$cur_state = $this->channels_state->get($channelId, $update);
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
switch ($update['_']) {
@ -189,7 +184,7 @@ trait UpdateHandler
$this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
$this->updaters[$channelId]->resumeDefer();
return false;
return;
case 'updateNewMessage':
case 'updateEditMessage':
case 'updateNewChannelMessage':
@ -228,17 +223,19 @@ trait UpdateHandler
$this->updaters[false]->resumeDefer();
}
return false;
return;
}
break;
default:
if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return false;
return;
}
break;
}
$this->feeders[$channelId]->feedSingle($update);
return $channelId;
}
public function handle_update_messages_async($messages, $channel = false)
@ -313,7 +310,7 @@ trait UpdateHandler
}
if ($update['qts'] > $cur_state->qts() + 1) {
$this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR);
$this->updaters[$channelId]->resumeDefer();
$this->updaters[false]->resumeDefer();
return false;
}

View File

@ -193,4 +193,14 @@ class UpdatesState
{
return ($this->pts + $update['pts_count']) - $update['pts'];
}
/**
* Check validity of seq contained in update
*
* @param int $seq
* @return int -1 if it's too old, 0 if it's ok, 1 if it's too new
*/
public function checkSeq($seq)
{
return $seq - ($this->seq + 1);
}
}

View File

@ -240,8 +240,11 @@ trait Tools
return $promise;
}
public function callFork($promise)
public function callFork($promise, $actual = null)
{
if ($actual) {
$promise = $actual;
}
if ($promise instanceof \Generator) {
$promise = new Coroutine($promise);
}
@ -254,6 +257,10 @@ trait Tools
}
return $promise;
}
public function callForkDefer($promise)
{
Loop::defer([$this, 'callFork'], $promise);
}
public function rethrow($e)
{
$logger = isset($this->logger) ? $this->logger : Logger::$default;