diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index c08ec7be..a3397c58 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto\Loop\Update; use Amp\Success; use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use Amp\Loop; /** * update feed loop. @@ -95,6 +96,7 @@ class FeedLoop extends ResumableSignalLoop } $this->parsedUpdates = []; } + if ($API->update_deferred) Loop::defer([$API->update_deferred, 'resolve']); } } public function parse($updates) @@ -107,11 +109,11 @@ class FeedLoop extends ResumableSignalLoop if (isset($update['pts'])) { $logger = function ($msg) use ($update) { $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; - $this->logger->logger($update); + $this->API->logger->logger($update); $double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-'; $mid = isset($update['message']['id']) ? $update['message']['id'] : '-'; $mypts = $this->state->pts(); - $this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); + $this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); }; $result = $this->state->checkPts($update); if ($result < 0) { diff --git a/src/danog/MadelineProto/Loop/Update/SeqLoop.php b/src/danog/MadelineProto/Loop/Update/SeqLoop.php index 16e47339..4baef5b6 100644 --- a/src/danog/MadelineProto/Loop/Update/SeqLoop.php +++ b/src/danog/MadelineProto/Loop/Update/SeqLoop.php @@ -81,17 +81,21 @@ class SeqLoop extends ResumableSignalLoop $this->exitedLoop(); return; } + $result = []; while ($this->incomingUpdates) { $updates = $this->incomingUpdates; $this->incomingUpdates = []; - yield $this->parse($updates); + $result += yield $this->parse($updates); $updates = null; } - $feeder->resumeDefer(); + foreach ($result as $channelId => $boh) { + $this->API->feeders[$channelId]->resumeDefer(); + } } } public function parse($updates) { + $result = []; reset($updates); while ($updates) { $options = []; @@ -124,8 +128,9 @@ class SeqLoop extends ResumableSignalLoop $this->state->date($options['date']); } - $this->save($updates); + $result += $this->save($updates); } + return $result; } public function feed($updates) { @@ -133,7 +138,11 @@ class SeqLoop extends ResumableSignalLoop } public function save($updates) { - $this->feeder->feed($updates); + $result = []; + foreach ($updates['updates'] as $update) { + $result[yield $this->API->feedSingle($update)] = true; + } + return $result; } public function has_all_auth() { diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 38fd35a5..26ebf0b4 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -591,9 +591,8 @@ trait ResponseHandler } else { $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } - foreach ($updates as $update) { - $result[yield $this->seqUpdater->feed($update)] = true; - } + $this->seqUpdater->feed($updates); + $this->seqUpdater->resumeDefer(); } break; case 'updateShort':