Properly start update system

This commit is contained in:
Daniil Gentili 2019-06-01 22:04:37 +02:00
parent 0c40d6fb58
commit 5e6beafec6
12 changed files with 54 additions and 32 deletions

View File

@ -14,7 +14,7 @@
* get_full_dialogs
* new APIfactory
* sendmessage with secret messages
* 2fa+++++
Things to expect in the next releases:
Document async apis
optional max_id and min_id
@ -22,4 +22,4 @@ async iterators
Method name changes
#MadelineProtoForNode async
lua async
improved get_pwr_chat
improved get_pwr_chat

View File

@ -55,7 +55,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler
}
}
}
$settings = ['logger' => ['logger_level' => 5], 'connection_settings' => ['all' => ['protocol' => 'https']]];
$settings = ['logger' => ['logger_level' => 5]];
$MadelineProto = new \danog\MadelineProto\API('bot.madeline', $settings);

View File

@ -247,7 +247,7 @@ class CombinedAPI
}
if (!$instance->API->settings['updates']['handle_updates']) {
$instance->API->settings['updates']['handle_updates'] = true;
$instance->API->updaters[false]->start();
$instance->API->startUpdateSystem();
}
$instance->setCallback(function ($update) use ($path) {
return $this->event_update_handler($update, $path);

View File

@ -176,13 +176,16 @@ class MTProto extends AsyncConstruct implements TLCallback
}
// Connect to servers
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['inst_dc'], Logger::ULTRA_VERBOSE);
if (!($this->updates_state instanceof UpdatesState)) {
$this->updates_state = new UpdatesState($this->updates_state);
}
if (!($this->channels_state instanceof CombinedUpdatesState)) {
$this->channels_state = new CombinedUpdatesState($this->channels_state);
}
$this->channels_state->__construct([false => $this->updates_state]);
if (isset($this->updates_state)) {
if (!($this->updates_state instanceof UpdatesState)) {
$this->updates_state = new UpdatesState($this->updates_state);
}
$this->channels_state->__construct([false => $this->updates_state]);
unset($this->updates_state);
}
if (!isset($this->datacenter)) {
$this->datacenter = new DataCenter($this, $this->settings['connection'], $this->settings['connection_settings']);
}
@ -223,7 +226,7 @@ class MTProto extends AsyncConstruct implements TLCallback
public function __sleep()
{
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
}
public function isAltervista()
@ -243,6 +246,7 @@ class MTProto extends AsyncConstruct implements TLCallback
public function __wakeup()
{
$backtrace = debug_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT, 3);
$this->asyncInitPromise = true;
$this->setInitPromise($this->__wakeup_async($backtrace));
}
public function __wakeup_async($backtrace)
@ -293,13 +297,16 @@ class MTProto extends AsyncConstruct implements TLCallback
if ($this->authorized === true) {
$this->authorized = self::LOGGED_IN;
}
if (is_array($this->updates_state)) {
$this->updates_state = new UpdatesState($this->updates_state);
}
if (is_array($this->channels_state)) {
if (!($this->channels_state instanceof CombinedUpdatesState)) {
$this->channels_state = new CombinedUpdatesState($this->channels_state);
}
$this->channels_state->__construct([false => $this->updates_state]);
if (isset($this->updates_state)) {
if (!($this->updates_state instanceof UpdatesState)) {
$this->updates_state = new UpdatesState($this->updates_state);
}
$this->channels_state->__construct([false => $this->updates_state]);
unset($this->updates_state);
}
$this->postpone_updates = false;
if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) {
@ -397,8 +404,7 @@ class MTProto extends AsyncConstruct implements TLCallback
}
if (!$this->settings['updates']['handle_old_updates']) {
$this->channels_state = new CombinedUpdatesState();
$this->channels_state->__construct([false => $this->updates_state]);
$this->channels_state = new CombinedUpdatesState([false => new UpdatesState()]);
$this->got_state = false;
}
yield $this->connect_to_all_dcs_async();
@ -424,7 +430,7 @@ class MTProto extends AsyncConstruct implements TLCallback
if ($this->authorized === self::LOGGED_IN && !$this->authorization['user']['bot'] && $this->settings['peer']['cache_all_peers_on_startup']) {
yield $this->get_dialogs_async($force);
}
if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state->syncLoading()) {
if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates']) {
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE);
yield $this->updaters[false]->resume();
}
@ -879,6 +885,8 @@ class MTProto extends AsyncConstruct implements TLCallback
}
public function startUpdateSystem()
{
if ($this->asyncInitPromise) return;
if (!isset($this->seqUpdater)) {
$this->seqUpdater = new SeqLoop($this);
}
@ -890,10 +898,16 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
if ($this->feeders[$channelId]->start()) {
$this->feeders[$channelId]->resume();
}
if ($this->updaters[$channelId]->start()) {
$this->updaters[$channelId]->resume();
}
}
if ($this->seqUpdater->start()) {
$this->seqUpdater->resume();
}
$this->seqUpdater->start();
}
public function get_phone_config_async($watcherId = null)
{

View File

@ -696,6 +696,7 @@ trait PeerHandler
$gres = yield $this->method_call_async_read('channels.getParticipants', ['channel' => $channel, 'filter' => ['_' => $filter, 'q' => $q], 'offset' => $offset, 'limit' => $limit, 'hash' => $hash = $this->get_participants_hash($channel, $filter, $q, $offset, $limit)], ['datacenter' => $this->datacenter->curdc, 'heavy' => true]);
} catch (\danog\MadelineProto\RPCErrorException $e) {
if ($e->rpc === 'CHAT_ADMIN_REQUIRED') {
$this->logger->logger($e->rpc);
return $has_more;
} else {
throw $e;

View File

@ -29,7 +29,6 @@ use Amp\Loop;
*/
trait UpdateHandler
{
private $updates_state;
private $got_state = false;
private $channels_state;
public $updates = [];
@ -60,7 +59,7 @@ trait UpdateHandler
{
if (!$this->settings['updates']['handle_updates']) {
$this->settings['updates']['handle_updates'] = true;
$this->updaters[false]->start();
$this->startUpdateSystem();
}
if (!$this->settings['updates']['run_callback']) {
$this->settings['updates']['run_callback'] = true;
@ -148,10 +147,10 @@ trait UpdateHandler
{
if (!$this->got_state) {
$this->got_state = true;
$this->updates_state->update(yield $this->get_updates_state_async());
$this->channels_state->get(false, yield $this->get_updates_state_async());
}
return $this->updates_state;
return $this->channels_state->get(false);
}
public function loadChannelState($channelId = null, $init = [])
{

View File

@ -29,7 +29,6 @@ trait Callback
$this->settings['updates']['callback'] = $callback;
$this->settings['updates']['run_callback'] = true;
$this->settings['updates']['handle_updates'] = true;
return $this->updaters[false];
$this->startUpdateSystem();
}
}

View File

@ -62,8 +62,8 @@ trait Events
$this->settings['updates']['handle_updates'] = true;
$this->settings['updates']['run_callback'] = true;
if (isset($this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater) && !$this->asyncInitPromise) {
$this->updaters[false]->start();
if (!$this->asyncInitPromise) {
$this->startUpdateSystem();
}
}

View File

@ -44,6 +44,7 @@ trait Login
$this->tos = ['expires' => 0, 'accepted' => true];
yield $this->method_call_async_read('auth.logOut', [], ['datacenter' => $this->datacenter->curdc]);
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['logout_ok'], \danog\MadelineProto\Logger::NOTICE);
$this->startUpdateSystem();
return true;
}
@ -62,6 +63,8 @@ trait Login
$this->updates = [];
$this->updates_key = 0;
yield $this->init_authorization_async();
$this->startUpdateSystem();
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['login_ok'], \danog\MadelineProto\Logger::NOTICE);
return $this->authorization;
@ -120,6 +123,7 @@ trait Login
$this->datacenter->sockets[$this->datacenter->curdc]->authorized = true;
yield $this->init_authorization_async();
yield $this->get_phone_config_async();
$this->startUpdateSystem();
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['login_ok'], \danog\MadelineProto\Logger::NOTICE);
@ -152,7 +156,10 @@ trait Login
yield $this->init_authorization_async();
yield $this->get_phone_config_async();
return yield $this->get_self_async();
$res = yield $this->get_self_async();
$this->startUpdateSystem();
return $res;
}
public function export_authorization_async()
@ -180,6 +187,7 @@ trait Login
yield $this->get_phone_config_async();
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['signup_ok'], \danog\MadelineProto\Logger::NOTICE);
$this->startUpdateSystem();
return $this->authorization;
}
@ -199,6 +207,7 @@ trait Login
yield $this->init_authorization_async();
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['login_ok'], \danog\MadelineProto\Logger::NOTICE);
yield $this->get_phone_config_async();
$this->startUpdateSystem();
return $this->authorization;
}

View File

@ -99,7 +99,7 @@ trait Loop
if (!$this->settings['updates']['run_callback']) {
$this->settings['updates']['run_callback'] = true;
}
$this->updaters[false]->start();
$this->startUpdateSystem();
$this->logger->logger('Started update loop', \danog\MadelineProto\Logger::NOTICE);

View File

@ -26,7 +26,7 @@ trait Noop
$this->settings['updates']['callback'] = [$this, 'noop'];
$this->settings['updates']['run_callback'] = false;
$this->settings['updates']['handle_updates'] = true;
$this->updaters[false]->start();
$this->startUpdateSystem();
}
public function noop()

View File

@ -31,6 +31,6 @@ trait Webhook
$this->settings['updates']['callback'] = [$this, 'pwr_webhook'];
$this->settings['updates']['run_callback'] = true;
$this->settings['updates']['handle_updates'] = true;
$this->updaters[false]->start();
$this->startUpdateSystem();
}
}