Continue rewriting update management

This commit is contained in:
Daniil Gentili 2019-05-29 17:19:42 +02:00
parent 7f47e3de85
commit c86e9d31fb
10 changed files with 185 additions and 487 deletions

View File

@ -55,7 +55,7 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn
$pause = $this->pause; $pause = $this->pause;
$this->pause = new Deferred; $this->pause = new Deferred;
Loop::defer([$pause, 'resolve']); if ($pause) Loop::defer([$pause, 'resolve']);
return $this->resume->promise(); return $this->resume->promise();
} }

View File

@ -16,7 +16,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation * @link https://docs.madelineproto.xyz MadelineProto documentation
*/ */
namespace danog\MadelineProto\Loop\Connection; namespace danog\MadelineProto\Loop\Update;
use Amp\Success; use Amp\Success;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
@ -43,7 +43,7 @@ class FeedLoop extends ResumableSignalLoop
public function loop() public function loop()
{ {
$API = $this->API; $API = $this->API;
$updater = $this->updater = $API->updater[$this->channelId]; $updater = $this->updater = $API->updaters[$this->channelId];
if (!$this->API->settings['updates']['handle_updates']) { if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0); yield new Success(0);
@ -145,7 +145,7 @@ class FeedLoop extends ResumableSignalLoop
$seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq']; $seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq'];
if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) { if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) {
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->get_updates_difference_async(); yield $this->updaters[false]->resume();
return false; return false;
} }
@ -164,10 +164,9 @@ class FeedLoop extends ResumableSignalLoop
{ {
$this->incomingUpdates = array_merge($this->incomingUpdates, $updates); $this->incomingUpdates = array_merge($this->incomingUpdates, $updates);
} }
public function fetchSlice($to_pts) public function feedSingle($update)
{ {
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $this->state->pts(), 'pts_total_limit' => $to_pts, 'date' => $this->state->date(), 'qts' => $this->state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); $this->incomingUpdates []= $update;
var_dumP($difference);
} }
public function save($update) public function save($update)
{ {

View File

@ -42,7 +42,6 @@ class UpdateLoop extends ResumableSignalLoop
public function loop() public function loop()
{ {
$API = $this->API; $API = $this->API;
$datacenter = $this->datacenter;
$feeder = $this->feeder = $API->feeder[$this->channelId]; $feeder = $this->feeder = $API->feeder[$this->channelId];
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
@ -69,101 +68,99 @@ class UpdateLoop extends ResumableSignalLoop
return; return;
} }
} }
if (time() - $API->last_getdifference > $timeout) { $toPts = $this->toPts;
$toPts = $this->toPts; $this->toPts = null;
$this->toPts = null; while (true) {
while (true) { if ($this->channelId) {
if ($this->channelId) { $this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); if ($state->pts() <= 1) {
if ($state->pts() <= 1) { $limit = 10;
$limit = 10; } else if ($API->authorization['user']['bot']) {
} else if ($API->authorization['user']['bot']) { $limit = 100000;
$limit = 100000;
} else {
$limit = 100;
}
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]);
if (isset($difference['timeout'])) {
$timeout = $difference['timeout'];
}
switch ($difference['_']) {
case 'updates.channelDifferenceEmpty':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
unset($difference);
break 2;
case 'updates.channelDifference':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
if ($state->pts() >= $difference['pts'] && $state->pts() > 1) {
$this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts()+1), \danog\MadelineProto\Logger::VERBOSE);
$difference['pts'] = $state->pts() + 1;
}
$state->update($difference);
$feeder->feed($difference['other_updates']);
yield $this->handle_update_messages_async($difference['new_messages'], $channel);
if (!$difference['final']) {
if ($difference['pts'] >= $toPts) {
unset($difference);
break 2;
}
unset($difference);
break;
}
unset($difference);
break 2;
case 'updates.channelDifferenceTooLong':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
yield $this->handle_update_messages_async($difference['messages'], $channel);
unset($difference);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
}
} else { } else {
$this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $limit = 100;
}
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]);
if (isset($difference['timeout'])) {
$timeout = $difference['timeout'];
}
$difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); switch ($difference['_']) {
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); case 'updates.channelDifferenceEmpty':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
unset($difference);
break 2;
case 'updates.channelDifference':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
if ($state->pts() >= $difference['pts'] && $state->pts() > 1) {
$this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE);
$difference['pts'] = $state->pts() + 1;
}
$state->update($difference);
$feeder->feed($difference['other_updates']);
switch ($difference['_']) { yield $this->handle_update_messages_async($difference['new_messages'], $channel);
case 'updates.differenceEmpty': if (!$difference['final']) {
$state->update($difference); if ($difference['pts'] >= $toPts) {
unset($difference);
break 2;
case 'updates.difference':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['state']);
unset($difference);
break 2;
case 'updates.differenceSlice':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['intermediate_state']);
if ($difference['intermediate_state']['pts'] >= $toPts) {
unset($difference); unset($difference);
break 2; break 2;
} }
unset($difference); unset($difference);
break; break;
default: }
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); unset($difference);
} break 2;
case 'updates.channelDifferenceTooLong':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
yield $this->handle_update_messages_async($difference['messages'], $channel);
unset($difference);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
}
} else {
$this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]);
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
switch ($difference['_']) {
case 'updates.differenceEmpty':
$state->update($difference);
unset($difference);
break 2;
case 'updates.difference':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['state']);
unset($difference);
break 2;
case 'updates.differenceSlice':
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$state->update($difference['intermediate_state']);
if ($difference['intermediate_state']['pts'] >= $toPts) {
unset($difference);
break 2;
}
unset($difference);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
} }
} }
} }
if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) { if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting update loop in DC $datacenter"); $API->logger->logger("Exiting update loop in DC $datacenter");
$this->exitedLoop(); $this->exitedLoop();

View File

@ -420,7 +420,7 @@ class MTProto extends AsyncConstruct implements TLCallback
} }
if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state->syncLoading()) { if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state->syncLoading()) {
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE); $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE);
yield $this->get_updates_difference_async(); yield $this->updaters[false]->resume();
} }
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
} }

View File

@ -463,13 +463,7 @@ trait AuthKeyHandler
public function get_dh_config_async() public function get_dh_config_async()
{ {
$this->updates_state->syncLoading(true); $dh_config = yield $this->method_call_async_read('messages.getDhConfig', ['version' => $this->dh_config['version'], 'random_length' => 0], ['datacenter' => $this->datacenter->curdc]);
try {
$dh_config = yield $this->method_call_async_read('messages.getDhConfig', ['version' => $this->dh_config['version'], 'random_length' => 0], ['datacenter' => $this->datacenter->curdc]);
} finally {
$this->updates_state->syncLoading(false);
}
if ($dh_config['_'] === 'messages.dhConfigNotModified') { if ($dh_config['_'] === 'messages.dhConfigNotModified') {
$this->logger->logger(\danog\MadelineProto\Logger::VERBOSE, ['DH configuration not modified']); $this->logger->logger(\danog\MadelineProto\Logger::VERBOSE, ['DH configuration not modified']);
@ -563,12 +557,7 @@ trait AuthKeyHandler
return false; return false;
} }
// Creates authorization keys
public function init_authorization_async() public function init_authorization_async()
{
return $this->ainit_authorization_async();
}
public function ainit_authorization_async()
{ {
if ($this->pending_auth) { if ($this->pending_auth) {
return; return;
@ -576,8 +565,6 @@ trait AuthKeyHandler
$initing = $this->initing_authorization; $initing = $this->initing_authorization;
$this->initing_authorization = true; $this->initing_authorization = true;
$this->updates_state->syncLoading(true);
$this->postpone_updates = true;
try { try {
$dcs = []; $dcs = [];
@ -618,10 +605,7 @@ trait AuthKeyHandler
} }
} finally { } finally {
$this->pending_auth = false; $this->pending_auth = false;
$this->postpone_updates = false;
$this->initing_authorization = $initing; $this->initing_authorization = $initing;
$this->updates_state->syncLoading(false);
yield $this->handle_pending_updates_async();
} }
} }

View File

@ -115,8 +115,7 @@ trait ResponseHandler
// Acknowledge that I received the server's response // Acknowledge that I received the server's response
if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) { if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) {
$this->updaters[false]->resumeDefer();
$this->callFork($this->get_updates_difference_async());
} }
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
@ -292,7 +291,7 @@ trait ResponseHandler
} }
} }
$this->datacenter->sockets[$datacenter]->writer->resume(); $this->datacenter->sockets[$datacenter]->writer->resume();
//$this->n--; //$this->n--;
return $only_updates; return $only_updates;
@ -562,23 +561,6 @@ trait ResponseHandler
)()); )());
} }
public function handle_pending_updates_async()
{
if ($this->postpone_updates) {
return false;
}
if (count($this->pending_updates)) {
$this->logger->logger('Parsing pending updates...');
foreach (array_keys($this->pending_updates) as $key) {
if (isset($this->pending_updates[$key])) {
$updates = $this->pending_updates[$key];
unset($this->pending_updates[$key]);
yield $this->handle_updates_async($updates);
}
}
}
}
public function handle_updates_async($updates, $actual_updates = null) public function handle_updates_async($updates, $actual_updates = null)
{ {
if (!$this->settings['updates']['handle_updates']) { if (!$this->settings['updates']['handle_updates']) {
@ -588,78 +570,71 @@ trait ResponseHandler
$updates = $actual_updates; $updates = $actual_updates;
} }
if ($this->postpone_updates) {
$this->logger->logger('Postpone update handling', \danog\MadelineProto\Logger::VERBOSE);
$this->pending_updates[] = $updates;
return false;
}
yield $this->handle_pending_updates_async();
$this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE); $this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
try { $opts = [];
$this->postpone_updates = true; switch ($updates['_']) {
case 'updates':
$opts = []; case 'updatesCombined':
foreach (['date', 'seq', 'seq_start'] as $key) { $handle_updates = [];
if (isset($updates[$key])) { foreach ($updates['updates'] as $key => $update) {
$opts[$key] = $updates[$key]; if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' ||
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||
$update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' ||
$update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') {
$handle_updates[] = $update;
unset($updates['updates'][$key]);
}
} }
} $this->feeders[false]->feed($handle_updates);
switch ($updates['_']) { if ($updates['updates']) {
case 'updates': if ($updates['_'] === 'updatesCombined') {
case 'updatesCombined': $updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
foreach ($updates['updates'] as $update) { } else {
yield $this->handle_update_async($update, $opts); $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} }
break; $this->feeders[false]->feed($updates);
case 'updateShort': }
yield $this->handle_update_async($updates['update'], $opts); break;
break; case 'updateShort':
case 'updateShortMessage': $updates['update']['options'] = ['date' => $updates['date']];
case 'updateShortChatMessage': $this->feeders[false]->feed([$updates['update']]);
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); break;
$to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); case 'updateShortMessage':
if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { case 'updateShortChatMessage':
$this->logger->logger('getDifference: good - getting user for updateShortMessage', \danog\MadelineProto\Logger::VERBOSE); $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
yield $this->get_updates_difference_async(); $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']);
} if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) {
$message = $updates; yield $this->updaters[false]->resume();
$message['_'] = 'message'; // TOFIX
$message['from_id'] = $from_id; }
$message = $updates;
$message['_'] = 'message';
$message['from_id'] = $from_id;
try { try {
$message['to_id'] = (yield $this->get_info_async($to_id))['Peer']; $message['to_id'] = (yield $this->get_info_async($to_id))['Peer'];
} catch (\danog\MadelineProto\Exception $e) { } catch (\danog\MadelineProto\Exception $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates; //$this->pending_updates[] = $updates;
break;
} catch (\danog\MadelineProto\RPCErrorException $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break;
}
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
yield $this->handle_update_async($update, $opts);
break; break;
case 'updateShortSentMessage': } catch (\danog\MadelineProto\RPCErrorException $e) {
//yield $this->set_update_state_async(['date' => $updates['date']]); $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break; break;
case 'updatesTooLong': }
yield $this->get_updates_difference_async(); $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
break; yield $this->handle_update_async($update, $opts);
default: break;
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); case 'updateShortSentMessage':
break; //yield $this->set_update_state_async(['date' => $updates['date']]);
} break;
} finally { case 'updatesTooLong':
$this->postpone_updates = false; $this->updaters[false]->resumeDefer();
} break;
if ($this->updates && $this->update_deferred) { default:
$d = $this->update_deferred; throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true));
$this->update_deferred = null; break;
Loop::defer([$d, 'resolve']);
} }
} }
} }

View File

@ -122,99 +122,6 @@ trait UpdateHandler
return false; return false;
} }
public function get_channel_difference_async($channel)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
if ($this->channels_state->syncLoading($channel)) {
$this->logger->logger('Not fetching '.$channel.' difference, I am already fetching it');
return;
}
$this->channels_state->syncLoading($channel, true);
$this->postpone_updates = true;
try {
$input = yield $this->get_info_async('channel#'.$channel);
if (!isset($input['InputChannel'])) {
throw new \danog\MadelineProto\Exception('This peer is not present in the internal peer database');
}
$input = $input['InputChannel'];
} catch (\danog\MadelineProto\Exception $e) {
return false;
} catch (\danog\MadelineProto\RPCErrorException $e) {
return false;
} finally {
$this->postpone_updates = false;
$this->channels_state->syncLoading($channel, false);
}
$this->logger->logger('Fetching '.$channel.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->channels_state->syncLoading($channel, true);
$this->postpone_updates = true;
try {
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => $input, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $this->channels_state->get($channel)->pts(), 'limit' => 30], ['datacenter' => $this->datacenter->curdc]);
} catch (\danog\MadelineProto\RPCErrorException $e) {
if ($e->getMessage() === "You haven't joined this channel/supergroup") {
return false;
}
throw $e;
} catch (\danog\MadelineProto\PTSException $e) {
$this->logger->logger($e->getMessage());
$this->channels_state->remove($channel);
return false; //yield $this->get_channel_difference_async($channel);
} finally {
$this->postpone_updates = false;
$this->channels_state->syncLoading($channel, false);
}
unset($input);
switch ($difference['_']) {
case 'updates.channelDifferenceEmpty':
$this->channels_state->get($channel, $difference);
break;
case 'updates.channelDifference':
$this->channels_state->syncLoading($channel, true);
$this->postpone_updates = true;
try {
$this->channels_state->get($channel, $difference);
yield $this->handle_update_messages_async($difference['new_messages'], $channel);
yield $this->handle_multiple_update_async($difference['other_updates'], [], $channel);
} finally {
$this->postpone_updates = false;
$this->channels_state->syncLoading($channel, false);
}
if (!$difference['final']) {
unset($difference);
yield $this->get_channel_difference_async($channel);
}
break;
case 'updates.channelDifferenceTooLong':
$this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$this->channels_state->syncLoading($channel, true);
$this->postpone_updates = true;
try {
$this->channels_state->get($channel, $difference);
yield $this->handle_update_messages_async($difference['messages'], $channel);
unset($difference);
} finally {
$this->postpone_updates = false;
$this->channels_state->syncLoading($channel, false);
}
yield $this->get_channel_difference_async($channel);
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
break;
}
yield $this->handle_pending_updates_async();
}
public function load_update_state_async() public function load_update_state_async()
{ {
if (!$this->got_state) { if (!$this->got_state) {
@ -229,91 +136,10 @@ trait UpdateHandler
return $this->channels_state->get($channelId); return $this->channels_state->get($channelId);
} }
public function get_updates_difference_async($w = null)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
if ($this->updates_state->syncLoading()) {
$this->logger->logger('Not fetching normal difference, I am already fetching it');
return false;
}
$this->updates_state->syncLoading(true);
$this->postpone_updates = true;
$this->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
while (!isset($difference)) {
try {
$state = yield $this->load_update_state_async();
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
} catch (\danog\MadelineProto\PTSException $e) {
$this->updates_state->syncLoading(false);
$this->got_state = false;
} finally {
$this->postpone_updates = false;
$this->updates_state->syncLoading(false);
}
}
$this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->postpone_updates = true;
$this->updates_state->syncLoading(true);
$this->last_getdifference = time();
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->resume();
try {
switch ($difference['_']) {
case 'updates.differenceEmpty':
$this->updates_state->update($difference);
break;
case 'updates.difference':
$this->updates_state->syncLoading(true);
yield $this->handle_multiple_update_async($difference['other_updates']);
foreach ($difference['new_encrypted_messages'] as $encrypted) {
yield $this->handle_encrypted_update_async(['_' => 'updateNewEncryptedMessage', 'message' => $encrypted], true);
}
yield $this->handle_update_messages_async($difference['new_messages']);
$this->updates_state->update($difference['state']);
break;
case 'updates.differenceSlice':
$this->updates_state->syncLoading(true);
yield $this->handle_multiple_update_async($difference['other_updates']);
yield $this->handle_update_messages_async($difference['new_messages']);
$this->updates_state->update($difference['intermediate_state']);
unset($difference);
$this->updates_state->syncLoading(false);
yield $this->get_updates_difference_async();
break;
default:
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
break;
}
} finally {
$this->postpone_updates = false;
$this->updates_state->syncLoading(false);
}
yield $this->handle_pending_updates_async();
if ($this->updates && $this->update_deferred) {
$d = $this->update_deferred;
$this->update_deferred = null;
Loop::defer([$d, 'resolve']);
}
return true;
}
public function get_updates_state_async() public function get_updates_state_async()
{ {
$last = $this->updates_state->syncLoading(); $data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
$this->updates_state->syncLoading(true); yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']);
try {
$data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']);
} finally {
$this->updates_state->syncLoading($last);
}
return $data; return $data;
} }
@ -324,37 +150,44 @@ trait UpdateHandler
return; return;
} }
$this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); $this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
$channel_id = false; $channelId = false;
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelWebPage': case 'updateChannelWebPage':
case 'updateNewChannelMessage': case 'updateNewChannelMessage':
case 'updateEditChannelMessage': case 'updateEditChannelMessage':
$channel_id = $update['message']['to_id']['channel_id']; $channelId = $update['message']['to_id']['channel_id'];
break; break;
case 'updateDeleteChannelMessages': case 'updateDeleteChannelMessages':
$channel_id = $update['channel_id']; $channelId = $update['channel_id'];
break; break;
case 'updateChannelTooLong': case 'updateChannelTooLong':
$channel_id = $update['channel_id']; $channelId = $update['channel_id'];
$this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE); if (!$this->channels_state->has($channelId) && !isset($update['pts'])) {
if (!$this->channels_state->has($channel_id) && !isset($update['pts'])) { $update['pts'] = 1;
$this->logger->logger('I do not have the channel in the states and the pts is not set.', \danog\MadelineProto\Logger::ERROR);
return;
} }
break; break;
} }
if ($channel_id === false) { if ($channelId === false) {
$cur_state = yield $this->load_update_state_async(); $cur_state = yield $this->load_update_state_async();
} else { } else {
$cur_state = $this->channels_state->get($channel_id, $update); if (!$this->channels_state->has($channelId)) {
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
$cur_state = $this->channels_state->get($channelId, $update);
} }
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelTooLong': case 'updateChannelTooLong':
$this->datacenter->sockets[] $this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
yield $this->get_channel_difference_async($channel_id); $this->updaters[$channelId]->resumeDefer();
return false; return false;
case 'updateNewMessage': case 'updateNewMessage':
@ -389,104 +222,29 @@ trait UpdateHandler
} }
$this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE); $this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
if ($channel_id !== false && yield $this->peer_isset_async($this->to_supergroup($channel_id))) { if ($channelId !== false && yield $this->peer_isset_async($this->to_supergroup($channelId))) {
yield $this->get_channel_difference_async($channel_id); $this->updaters[$channelId]->resumeDefer();
} else { } else {
yield $this->get_updates_difference_async(); $this->updaters[false]->resumeDefer();
} }
return false; return false;
} }
break; break;
default: default:
if ($channel_id !== false && !yield $this->peer_isset_async($this->to_supergroup($channel_id))) { if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->logger->logger('Skipping update, I do not have the channel id '.$channel_id, \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return false; return false;
} }
break; break;
} }
if (isset($update['pts'])) {
$logger = function ($msg) use ($update, $cur_state, $channel_id) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
$this->logger->logger($update);
$double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-';
$mid = isset($update['message']['id']) ? $update['message']['id'] : '-';
$mypts = $cur_state->pts();
$this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: $channel_id", \danog\MadelineProto\Logger::ERROR);
};
if ($update['pts'] < $cur_state->pts()) {
$logger("PTS duplicate");
return false;
}
if ($cur_state->pts() + (isset($update['pts_count']) ? $update['pts_count'] : 0) !== $update['pts']) {
$logger("PTS hole");
if ($channel_id !== false && yield $this->peer_isset_async($this->to_supergroup($channel_id))) {
yield $this->get_channel_difference_async($channel_id);
} else {
yield $this->get_updates_difference_async();
}
return false;
}
if (isset($update['message']['id'], $update['message']['to_id']) && !in_array($update['_'], ['updateEditMessage', 'updateEditChannelMessage'])) {
if (!$this->check_msg_id($update['message'])) {
$logger("MSGID duplicate");
return false;
}
}
$logger("PTS OK");
//$this->logger->logger('Applying pts. my pts: '.$cur_state->pts().', remote pts: '.$update['pts'].', channel id: '.$channel_id, \danog\MadelineProto\Logger::VERBOSE);
$cur_state->pts($update['pts']);
if ($channel_id === false && isset($options['date'])) {
$cur_state->date($options['date']);
}
}
if ($channel_id === false && isset($options['seq']) || isset($options['seq_start'])) {
$seq = $options['seq'];
$seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq'];
if ($seq_start != $cur_state->seq() + 1 && $seq_start > $cur_state->seq()) {
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$cur_state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->get_updates_difference_async();
return false;
}
if ($cur_state->seq() !== $seq) {
$cur_state->seq($seq);
if (isset($options['date'])) {
$cur_state->date($options['date']);
}
}
}
yield $this->save_update_async($update);
}
public function handle_multiple_update_async($updates, $options = [], $channel = false)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
if ($channel === false) {
foreach ($updates as $update) {
yield $this->handle_update_async($update, $options);
}
} else {
foreach ($updates as $update) {
yield $this->handle_update_async($update);
}
}
} }
public function handle_update_messages_async($messages, $channel = false) public function handle_update_messages_async($messages, $channel = false)
{ {
if (!$this->settings['updates']['handle_updates']) {
return;
}
foreach ($messages as $message) { foreach ($messages as $message) {
yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]); yield $this->save_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]);
} }
} }
@ -555,7 +313,7 @@ trait UpdateHandler
} }
if ($update['qts'] > $cur_state->qts() + 1) { if ($update['qts'] > $cur_state->qts() + 1) {
$this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR); $this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR);
yield $this->get_updates_difference_async(); $this->updaters[$channelId]->resumeDefer();
return false; return false;
} }

View File

@ -53,7 +53,6 @@ trait AuthKeyHandler
$this->check_G($g_b, $dh_config['p']); $this->check_G($g_b, $dh_config['p']);
yield $this->method_call_async_read('messages.acceptEncryption', ['peer' => $params['id'], 'g_b' => $g_b->toBytes(), 'key_fingerprint' => $key['fingerprint']], ['datacenter' => $this->datacenter->curdc]); yield $this->method_call_async_read('messages.acceptEncryption', ['peer' => $params['id'], 'g_b' => $g_b->toBytes(), 'key_fingerprint' => $key['fingerprint']], ['datacenter' => $this->datacenter->curdc]);
yield $this->notify_layer_async($params['id']); yield $this->notify_layer_async($params['id']);
yield $this->handle_pending_updates_async();
$this->logger->logger('Secret chat '.$params['id'].' accepted successfully!', \danog\MadelineProto\Logger::NOTICE); $this->logger->logger('Secret chat '.$params['id'].' accepted successfully!', \danog\MadelineProto\Logger::NOTICE);
} }
@ -73,8 +72,7 @@ trait AuthKeyHandler
$this->check_G($g_a, $dh_config['p']); $this->check_G($g_a, $dh_config['p']);
$res = yield $this->method_call_async_read('messages.requestEncryption', ['user_id' => $user, 'g_a' => $g_a->toBytes()], ['datacenter' => $this->datacenter->curdc]); $res = yield $this->method_call_async_read('messages.requestEncryption', ['user_id' => $user, 'g_a' => $g_a->toBytes()], ['datacenter' => $this->datacenter->curdc]);
$this->temp_requested_secret_chats[$res['id']] = $a; $this->temp_requested_secret_chats[$res['id']] = $a;
yield $this->handle_pending_updates_async(); $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
$this->logger->logger('Secret chat '.$res['id'].' requested successfully!', \danog\MadelineProto\Logger::NOTICE); $this->logger->logger('Secret chat '.$res['id'].' requested successfully!', \danog\MadelineProto\Logger::NOTICE);
return $res['id']; return $res['id'];
@ -104,7 +102,6 @@ trait AuthKeyHandler
$key['visualization_46'] = substr(hash('sha256', $key['auth_key'], true), 20); $key['visualization_46'] = substr(hash('sha256', $key['auth_key'], true), 20);
$this->secret_chats[$params['id']] = ['key' => $key, 'admin' => true, 'user_id' => $params['participant_id'], 'InputEncryptedChat' => ['chat_id' => $params['id'], 'access_hash' => $params['access_hash'], '_' => 'inputEncryptedChat'], 'in_seq_no_x' => 0, 'out_seq_no_x' => 1, 'in_seq_no' => 0, 'out_seq_no' => 0, 'layer' => 8, 'ttl' => 0, 'ttr' => 100, 'updated' => time(), 'incoming' => [], 'outgoing' => [], 'created' => time(), 'rekeying' => [0], 'key_x' => 'to server', 'mtproto' => 1]; $this->secret_chats[$params['id']] = ['key' => $key, 'admin' => true, 'user_id' => $params['participant_id'], 'InputEncryptedChat' => ['chat_id' => $params['id'], 'access_hash' => $params['access_hash'], '_' => 'inputEncryptedChat'], 'in_seq_no_x' => 0, 'out_seq_no_x' => 1, 'in_seq_no' => 0, 'out_seq_no' => 0, 'layer' => 8, 'ttl' => 0, 'ttr' => 100, 'updated' => time(), 'incoming' => [], 'outgoing' => [], 'created' => time(), 'rekeying' => [0], 'key_x' => 'to server', 'mtproto' => 1];
yield $this->notify_layer_async($params['id']); yield $this->notify_layer_async($params['id']);
yield $this->handle_pending_updates_async();
$this->logger->logger('Secret chat '.$params['id'].' completed successfully!', \danog\MadelineProto\Logger::NOTICE); $this->logger->logger('Secret chat '.$params['id'].' completed successfully!', \danog\MadelineProto\Logger::NOTICE);
} }
@ -131,8 +128,7 @@ trait AuthKeyHandler
$this->temp_rekeyed_secret_chats[$e] = $a; $this->temp_rekeyed_secret_chats[$e] = $a;
$this->secret_chats[$chat]['rekeying'] = [1, $e]; $this->secret_chats[$chat]['rekeying'] = [1, $e];
yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionRequestKey', 'g_a' => $g_a->toBytes(), 'exchange_id' => $e]]], ['datacenter' => $this->datacenter->curdc]); yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionRequestKey', 'g_a' => $g_a->toBytes(), 'exchange_id' => $e]]], ['datacenter' => $this->datacenter->curdc]);
yield $this->handle_pending_updates_async(); $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
return $e; return $e;
} }
@ -167,8 +163,7 @@ trait AuthKeyHandler
$g_b = $dh_config['g']->powMod($b, $dh_config['p']); $g_b = $dh_config['g']->powMod($b, $dh_config['p']);
$this->check_G($g_b, $dh_config['p']); $this->check_G($g_b, $dh_config['p']);
yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionAcceptKey', 'g_b' => $g_b->toBytes(), 'exchange_id' => $params['exchange_id'], 'key_fingerprint' => $key['fingerprint']]]], ['datacenter' => $this->datacenter->curdc]); yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionAcceptKey', 'g_b' => $g_b->toBytes(), 'exchange_id' => $params['exchange_id'], 'key_fingerprint' => $key['fingerprint']]]], ['datacenter' => $this->datacenter->curdc]);
yield $this->handle_pending_updates_async(); $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
} }
public function commit_rekey_async($chat, $params) public function commit_rekey_async($chat, $params)
@ -198,8 +193,7 @@ trait AuthKeyHandler
$this->secret_chats[$chat]['key'] = $key; $this->secret_chats[$chat]['key'] = $key;
$this->secret_chats[$chat]['ttr'] = 100; $this->secret_chats[$chat]['ttr'] = 100;
$this->secret_chats[$chat]['updated'] = time(); $this->secret_chats[$chat]['updated'] = time();
yield $this->handle_pending_updates_async(); $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
} }
public function complete_rekey_async($chat, $params) public function complete_rekey_async($chat, $params)

View File

@ -67,8 +67,7 @@ trait AuthKeyHandler
$res = yield $this->method_call_async_read('phone.requestCall', ['user_id' => $user, 'g_a_hash' => hash('sha256', $g_a->toBytes(), true), 'protocol' => ['_' => 'phoneCallProtocol', 'udp_p2p' => true, 'udp_reflector' => true, 'min_layer' => 65, 'max_layer' => \danog\MadelineProto\VoIP::getConnectionMaxLayer()]], ['datacenter' => $this->datacenter->curdc]); $res = yield $this->method_call_async_read('phone.requestCall', ['user_id' => $user, 'g_a_hash' => hash('sha256', $g_a->toBytes(), true), 'protocol' => ['_' => 'phoneCallProtocol', 'udp_p2p' => true, 'udp_reflector' => true, 'min_layer' => 65, 'max_layer' => \danog\MadelineProto\VoIP::getConnectionMaxLayer()]], ['datacenter' => $this->datacenter->curdc]);
$controller->setCall($res['phone_call']); $controller->setCall($res['phone_call']);
$this->calls[$res['phone_call']['id']] = $controller; $this->calls[$res['phone_call']['id']] = $controller;
yield $this->handle_pending_updates_async(); yield $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
return $controller; return $controller;
} }
@ -113,8 +112,7 @@ trait AuthKeyHandler
throw $e; throw $e;
} }
$this->calls[$res['phone_call']['id']]->storage['b'] = $b; $this->calls[$res['phone_call']['id']]->storage['b'] = $b;
yield $this->handle_pending_updates_async(); yield $this->updaters[false]->resume();
yield $this->get_updates_difference_async();
return true; return true;
} }
@ -152,7 +150,6 @@ trait AuthKeyHandler
$this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration); $this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration);
$this->calls[$params['id']]->parseConfig(); $this->calls[$params['id']]->parseConfig();
$res = $this->calls[$params['id']]->startTheMagic(); $res = $this->calls[$params['id']]->startTheMagic();
yield $this->handle_pending_updates_async();
return $res; return $res;
} }

View File

@ -40,9 +40,7 @@ trait DialogHandler
$res = ['dialogs' => [0], 'count' => 1]; $res = ['dialogs' => [0], 'count' => 1];
$datacenter = $this->datacenter->curdc; $datacenter = $this->datacenter->curdc;
$dialogs = []; $dialogs = [];
$this->postpone_updates = true;
try {
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getting_dialogs']); $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getting_dialogs']);
while ($this->dialog_params['count'] < $res['count']) { while ($this->dialog_params['count'] < $res['count']) {
$res = yield $this->method_call_async_read('messages.getDialogs', $this->dialog_params, ['datacenter' => $datacenter, 'FloodWaitLimit' => 100]); $res = yield $this->method_call_async_read('messages.getDialogs', $this->dialog_params, ['datacenter' => $datacenter, 'FloodWaitLimit' => 100]);
@ -82,10 +80,6 @@ trait DialogHandler
break; break;
} }
} }
} finally {
$this->postpone_updates = false;
$this->callFork($this->handle_pending_updates_async());
}
return $dialogs; return $dialogs;
} }