Now working (with a huge latency but working)

This commit is contained in:
Daniil Gentili 2019-05-29 22:12:36 +02:00
parent 28928e339f
commit 5da683f508
3 changed files with 19 additions and 9 deletions

View File

@ -21,6 +21,7 @@ namespace danog\MadelineProto\Loop\Update;
use Amp\Success; use Amp\Success;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use Amp\Loop;
/** /**
* update feed loop. * update feed loop.
@ -95,6 +96,7 @@ class FeedLoop extends ResumableSignalLoop
} }
$this->parsedUpdates = []; $this->parsedUpdates = [];
} }
if ($API->update_deferred) Loop::defer([$API->update_deferred, 'resolve']);
} }
} }
public function parse($updates) public function parse($updates)
@ -107,11 +109,11 @@ class FeedLoop extends ResumableSignalLoop
if (isset($update['pts'])) { if (isset($update['pts'])) {
$logger = function ($msg) use ($update) { $logger = function ($msg) use ($update) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
$this->logger->logger($update); $this->API->logger->logger($update);
$double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-'; $double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-';
$mid = isset($update['message']['id']) ? $update['message']['id'] : '-'; $mid = isset($update['message']['id']) ? $update['message']['id'] : '-';
$mypts = $this->state->pts(); $mypts = $this->state->pts();
$this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: {$this->channelId}", \danog\MadelineProto\Logger::ERROR);
}; };
$result = $this->state->checkPts($update); $result = $this->state->checkPts($update);
if ($result < 0) { if ($result < 0) {

View File

@ -81,17 +81,21 @@ class SeqLoop extends ResumableSignalLoop
$this->exitedLoop(); $this->exitedLoop();
return; return;
} }
$result = [];
while ($this->incomingUpdates) { while ($this->incomingUpdates) {
$updates = $this->incomingUpdates; $updates = $this->incomingUpdates;
$this->incomingUpdates = []; $this->incomingUpdates = [];
yield $this->parse($updates); $result += yield $this->parse($updates);
$updates = null; $updates = null;
} }
$feeder->resumeDefer(); foreach ($result as $channelId => $boh) {
$this->API->feeders[$channelId]->resumeDefer();
}
} }
} }
public function parse($updates) public function parse($updates)
{ {
$result = [];
reset($updates); reset($updates);
while ($updates) { while ($updates) {
$options = []; $options = [];
@ -124,8 +128,9 @@ class SeqLoop extends ResumableSignalLoop
$this->state->date($options['date']); $this->state->date($options['date']);
} }
$this->save($updates); $result += $this->save($updates);
} }
return $result;
} }
public function feed($updates) public function feed($updates)
{ {
@ -133,7 +138,11 @@ class SeqLoop extends ResumableSignalLoop
} }
public function save($updates) public function save($updates)
{ {
$this->feeder->feed($updates); $result = [];
foreach ($updates['updates'] as $update) {
$result[yield $this->API->feedSingle($update)] = true;
}
return $result;
} }
public function has_all_auth() public function has_all_auth()
{ {

View File

@ -591,9 +591,8 @@ trait ResponseHandler
} else { } else {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} }
foreach ($updates as $update) { $this->seqUpdater->feed($updates);
$result[yield $this->seqUpdater->feed($update)] = true; $this->seqUpdater->resumeDefer();
}
} }
break; break;
case 'updateShort': case 'updateShort':