Working on async update management...

This commit is contained in:
Daniil Gentili 2019-05-11 17:33:43 +02:00
parent caec0e5109
commit db2eadb217
3 changed files with 26 additions and 24 deletions

View File

@ -55,7 +55,7 @@ class UpdateLoop extends ResumableSignalLoop
}
}
if (time() - $API->last_getdifference > $timeout) {
if (!$API->get_updates_difference()) {
if (!yield $API->get_updates_difference_async()) {
return false;
}
}

View File

@ -556,15 +556,17 @@ trait ResponseHandler
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
Loop::defer(function () use ($request_id, $response, $datacenter, $botAPI) {
$this->call((function ()use ($request_id, $response, $datacenter, $botAPI) {
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$this->logger->logger("Deferred: sent $r to deferred");
if ($botAPI) {
$response = yield $this->MTProto_to_botAPI_async($response);
}
$this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']->resolve($response);
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']);
})());
$this->call((
function () use ($request_id, $response, $datacenter, $botAPI) {
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$this->logger->logger("Deferred: sent $r to deferred");
if ($botAPI) {
$response = yield $this->MTProto_to_botAPI_async($response);
}
$this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']->resolve($response);
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']);
}
)());
});
}

View File

@ -232,30 +232,30 @@ trait UpdateHandler
public function set_update_state_async($data)
{
if (isset($data['pts']) && $data['pts'] !== 0) {
$this->load_update_state()['pts'] = $data['pts'];
yield $this->load_update_state_async()['pts'] = $data['pts'];
}
if (isset($data['qts']) && $data['qts'] !== 0) {
$this->load_update_state()['qts'] = $data['qts'];
yield $this->load_update_state_async()['qts'] = $data['qts'];
}
if (isset($data['seq']) && $data['seq'] !== 0) {
$this->load_update_state()['seq'] = $data['seq'];
yield $this->load_update_state_async()['seq'] = $data['seq'];
}
if (isset($data['date']) && $data['date'] > $this->load_update_state()['date']) {
$this->load_update_state()['date'] = $data['date'];
if (isset($data['date']) && $data['date'] > yield $this->load_update_state_async()['date']) {
yield $this->load_update_state_async()['date'] = $data['date'];
}
}
public function reset_update_state_async()
{
$this->load_update_state()['pts'] = 1;
$this->load_update_state()['qts'] = 0;
$this->load_update_state()['seq'] = 0;
$this->load_update_state()['date'] = 1;
yield $this->load_update_state_async()['pts'] = 1;
yield $this->load_update_state_async()['qts'] = 0;
yield $this->load_update_state_async()['seq'] = 0;
yield $this->load_update_state_async()['date'] = 1;
foreach ($this->channels_state as &$state) {
$state['pts'] = 1;
}
$this->msg_ids = [];
}
public function &load_update_state_async()
public function load_update_state_async()
{
if (!isset($this->updates_state['qts'])) {
$this->updates_state['qts'] = 0;
@ -283,7 +283,7 @@ trait UpdateHandler
$this->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
while (!isset($difference)) {
try {
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $this->load_update_state()['pts'], 'date' => $this->load_update_state()['date'], 'qts' => $this->load_update_state()['qts']], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => yield $this->load_update_state_async()['pts'], 'date' => yield $this->load_update_state_async()['date'], 'qts' => yield $this->load_update_state_async()['qts']], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
} catch (\danog\MadelineProto\PTSException $e) {
$this->updates_state['sync_loading'] = false;
$this->got_state = false;
@ -388,7 +388,7 @@ trait UpdateHandler
}
if ($channel_id === false) {
$cur_state = &$this->load_update_state();
$cur_state = &yield $this->load_update_state_async();
} else {
$cur_state = &$this->load_channel_state($channel_id, (isset($update['pts']) ? $update['pts'] : 0) - (isset($update['pts_count']) ? $update['pts_count'] : 0));
}
@ -519,7 +519,7 @@ trait UpdateHandler
return;
}
foreach ($messages as $message) {
yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel === false ? $this->load_update_state()['pts'] : $this->load_channel_state($channel)['pts'], 'pts_count' => 0]);
yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel === false ? yield $this->load_update_state_async()['pts'] : $this->load_channel_state($channel)['pts'], 'pts_count' => 0]);
}
}
@ -577,7 +577,7 @@ trait UpdateHandler
}
}
if ($update['_'] === 'updateNewEncryptedMessage' && !isset($update['message']['decrypted_message'])) {
$cur_state = $this->load_update_state();
$cur_state = yield $this->load_update_state_async();
if ($cur_state['qts'] === -1) {
$cur_state['qts'] = $update['qts'];
}