This commit is contained in:
Daniil Gentili 2019-05-29 22:36:16 +02:00
parent 5da683f508
commit b4114cd5f8
6 changed files with 22 additions and 19 deletions

View File

@ -106,6 +106,12 @@ class FeedLoop extends ResumableSignalLoop
$key = key($updates); $key = key($updates);
$update = $updates[$key]; $update = $updates[$key];
unset($updates[$key]); unset($updates[$key]);
if ($update['_'] === 'updateChannelTooLong') {
$this->API->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
$this->API->updaters[$this->channelId]->resumeDefer();
continue;
}
if (isset($update['pts'])) { if (isset($update['pts'])) {
$logger = function ($msg) use ($update) { $logger = function ($msg) use ($update) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
@ -123,7 +129,7 @@ class FeedLoop extends ResumableSignalLoop
} }
if ($result > 0) { if ($result > 0) {
$logger("PTS hole"); $logger("PTS hole");
$this->updater->setLimit($state->pts + $result); $this->updater->setLimit($this->state->pts() + $result);
yield $this->updater->resume(); yield $this->updater->resume();
$updates = array_merge($this->incomingUpdates, $updates); $updates = array_merge($this->incomingUpdates, $updates);
$this->incomingUpdates = null; $this->incomingUpdates = null;

View File

@ -76,7 +76,7 @@ class SeqLoop extends ResumableSignalLoop
return; return;
} }
if (!$this->settings['updates']['handle_updates']) { if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update seq loop"); $API->logger->logger("Exiting update seq loop");
$this->exitedLoop(); $this->exitedLoop();
return; return;
@ -85,7 +85,7 @@ class SeqLoop extends ResumableSignalLoop
while ($this->incomingUpdates) { while ($this->incomingUpdates) {
$updates = $this->incomingUpdates; $updates = $this->incomingUpdates;
$this->incomingUpdates = []; $this->incomingUpdates = [];
$result += yield $this->parse($updates); $result = array_merge(yield $this->parse($updates), $result);
$updates = null; $updates = null;
} }
foreach ($result as $channelId => $boh) { foreach ($result as $channelId => $boh) {
@ -95,22 +95,22 @@ class SeqLoop extends ResumableSignalLoop
} }
public function parse($updates) public function parse($updates)
{ {
$result = []; $fresult = [];
reset($updates); reset($updates);
while ($updates) { while ($updates) {
$options = []; $options = [];
$key = key($updates); $key = key($updates);
$update = $updates[$key]; $update = $updates[$key];
unset($updates[$key]); unset($updates[$key]);
$options = $update['options']; $options = $update['options'];
$updates = $update['updates'];
$seq_start = $options['seq_start']; $seq_start = $options['seq_start'];
$seq_end = $options['seq_end']; $seq_end = $options['seq_end'];
$result = $this->state->checkSeq($seq_start); $result = $this->state->checkSeq($seq_start);
if ($result > 0) { if ($result > 0) {
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR);
yield $this->pause(1.0); yield $this->pause(1.0);
if (!$this->incomingUpdates) { if (!$this->incomingUpdates) {
yield $this->updaters[false]->resume(); yield $this->updaters[false]->resume();
@ -120,7 +120,7 @@ class SeqLoop extends ResumableSignalLoop
continue; continue;
} }
if ($result < 0) { if ($result < 0) {
$this->logger->logger('Seq too old. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger('Seq too old. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR);
continue; continue;
} }
$this->state->seq($seq_end); $this->state->seq($seq_end);
@ -128,9 +128,9 @@ class SeqLoop extends ResumableSignalLoop
$this->state->date($options['date']); $this->state->date($options['date']);
} }
$result += $this->save($updates); $fresult = array_merge(yield $this->save($update), $fresult);
} }
return $result; return $fresult;
} }
public function feed($updates) public function feed($updates)
{ {

View File

@ -80,7 +80,7 @@ class UpdateLoop extends ResumableSignalLoop
} else { } else {
$limit = 100; $limit = 100;
} }
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]); $difference = yield $this->API->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->API->datacenter->curdc]);
if (isset($difference['timeout'])) { if (isset($difference['timeout'])) {
$timeout = $difference['timeout']; $timeout = $difference['timeout'];
} }

View File

@ -587,9 +587,9 @@ trait ResponseHandler
if ($updates['updates']) { if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') { if ($updates['_'] === 'updatesCombined') {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; $updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} else { } else {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; $updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} }
$this->seqUpdater->feed($updates); $this->seqUpdater->feed($updates);
$this->seqUpdater->resumeDefer(); $this->seqUpdater->resumeDefer();

View File

@ -23,6 +23,8 @@ use Amp\Artax\Request;
use Amp\Deferred; use Amp\Deferred;
use Amp\Delayed; use Amp\Delayed;
use function Amp\Promise\any; use function Amp\Promise\any;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\UpdateLoop;
/** /**
* Manages updates. * Manages updates.
@ -180,11 +182,6 @@ trait UpdateHandler
} }
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelTooLong':
$this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
$this->updaters[$channelId]->resumeDefer();
return;
case 'updateNewMessage': case 'updateNewMessage':
case 'updateEditMessage': case 'updateEditMessage':
case 'updateNewChannelMessage': case 'updateNewChannelMessage':

View File

@ -201,6 +201,6 @@ class UpdatesState
*/ */
public function checkSeq($seq) public function checkSeq($seq)
{ {
return $seq - ($this->seq + 1); return $seq ? $seq - ($this->seq + 1) : $seq;
} }
} }