Asyncify update handling
This commit is contained in:
parent
240910475f
commit
7ca89ce869
38
asyncify.php
Normal file
38
asyncify.php
Normal file
@ -0,0 +1,38 @@
|
||||
<?php
|
||||
$not_subbing = [];
|
||||
|
||||
foreach (explode("\n", shell_exec("find src -type f -name '*.php'")) as $file) {
|
||||
if (!$file) continue;
|
||||
if (in_array(basename($file, '.php'), ['APIFactory', 'API', 'Connection', 'Coroutine', 'ReferenceDatabase', 'ProxySocketPool'])) continue;
|
||||
if (strpos($file, 'Loop/')) continue;
|
||||
if (strpos($file, 'Stream/')) continue;
|
||||
if (strpos($file, 'Server/')) continue;
|
||||
if (strpos($file, 'Async/')) continue;
|
||||
$to_sub = [];
|
||||
$last_match = null;
|
||||
foreach (explode("\n", $filec = file_get_contents($file)) as $number => $line) {
|
||||
if (preg_match("/public function (\w*)[(]/", $line, $matches)) {
|
||||
$last_match = stripos($matches[1], 'async') === false ? $matches[1] : null;
|
||||
}
|
||||
if (preg_match("/function [(]/", $line) && stripos($line, 'public function') === false) {
|
||||
$last_match = 0;
|
||||
}
|
||||
if (strpos($line, "yield") !== false) {
|
||||
if ($last_match) {
|
||||
echo ("subbing $last_match for $line at $number in $file".PHP_EOL);
|
||||
$to_sub []= $last_match;
|
||||
} else if ($last_match === 0) {
|
||||
echo ("============\nNOT SUBBING $last_match for $line at $number in $file\n============".PHP_EOL);
|
||||
$not_subbing[$file] = $file;
|
||||
}
|
||||
}
|
||||
}
|
||||
$input = [];
|
||||
$output = [];
|
||||
foreach ($to_sub as $func) {
|
||||
$input []= "public function $func(";
|
||||
$output []= "public function $func"."_async(";
|
||||
}
|
||||
if ($input) file_put_contents($file, str_replace($input, $output, $filec));
|
||||
}
|
||||
var_dump(array_values($not_subbing));
|
@ -29,6 +29,8 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
|
||||
*/
|
||||
class UpdateLoop extends ResumableSignalLoop
|
||||
{
|
||||
use \danog\MadelineProto\Tools;
|
||||
|
||||
public function loop(): \Generator
|
||||
{
|
||||
$API = $this->API;
|
||||
@ -55,7 +57,7 @@ class UpdateLoop extends ResumableSignalLoop
|
||||
}
|
||||
}
|
||||
if (time() - $API->last_getdifference > $timeout) {
|
||||
if (!yield $API->get_updates_difference_async()) {
|
||||
if (!yield $this->call($API->get_updates_difference_async())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -21,9 +21,11 @@ namespace danog\MadelineProto;
|
||||
|
||||
use Amp\Loop;
|
||||
use danog\MadelineProto\MTProtoTools\ReferenceDatabase;
|
||||
use danog\MadelineProto\MTProtoTools\UpdatesState;
|
||||
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
|
||||
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
|
||||
use danog\MadelineProto\TL\TLCallback;
|
||||
use danog\MadelineProto\MTProtoTools\CombinedUpdatesState;
|
||||
|
||||
/**
|
||||
* Manages all of the mtproto stuff.
|
||||
@ -173,6 +175,12 @@ class MTProto implements TLCallback
|
||||
}
|
||||
// Connect to servers
|
||||
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['inst_dc'], Logger::ULTRA_VERBOSE);
|
||||
if (!($this->updates_state instanceof UpdatesState)) {
|
||||
$this->updates_state = new UpdatesState($this->updates_state);
|
||||
}
|
||||
if (!($this->channels_state instanceof CombinedUpdatesState)) {
|
||||
$this->channels_state = new CombinedUpdatesState($this->channels_state);
|
||||
}
|
||||
if (!isset($this->datacenter)) {
|
||||
$this->datacenter = new DataCenter($this, $this->settings['connection'], $this->settings['connection_settings']);
|
||||
}
|
||||
@ -280,10 +288,13 @@ class MTProto implements TLCallback
|
||||
if ($this->authorized === true) {
|
||||
$this->authorized = self::LOGGED_IN;
|
||||
}
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
foreach ($this->channels_state as $key => $state) {
|
||||
$this->channels_state[$key]['sync_loading'] = false;
|
||||
if (is_array($this->updates_state)) {
|
||||
$this->updates_state = new UpdatesState($this->updates_state);
|
||||
}
|
||||
if (is_array($this->channels_state)) {
|
||||
$this->channels_state = new CombinedUpdatesState($this->channels_state);
|
||||
}
|
||||
|
||||
$this->postpone_updates = false;
|
||||
if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) {
|
||||
$this->setEventHandler($this->event_handler);
|
||||
@ -401,7 +412,7 @@ class MTProto implements TLCallback
|
||||
if ($this->authorized === self::LOGGED_IN && !$this->authorization['user']['bot'] && $this->settings['peer']['cache_all_peers_on_startup']) {
|
||||
yield $this->get_dialogs_async($force);
|
||||
}
|
||||
if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state['sync_loading']) {
|
||||
if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state->syncLoading()) {
|
||||
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE);
|
||||
yield $this->get_updates_difference_async();
|
||||
}
|
||||
|
@ -461,12 +461,12 @@ trait AuthKeyHandler
|
||||
|
||||
public function get_dh_config_async()
|
||||
{
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
|
||||
try {
|
||||
$dh_config = yield $this->method_call_async_read('messages.getDhConfig', ['version' => $this->dh_config['version'], 'random_length' => 0], ['datacenter' => $this->datacenter->curdc]);
|
||||
} finally {
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
}
|
||||
if ($dh_config['_'] === 'messages.dhConfigNotModified') {
|
||||
$this->logger->logger(\danog\MadelineProto\Logger::VERBOSE, ['DH configuration not modified']);
|
||||
@ -568,7 +568,7 @@ trait AuthKeyHandler
|
||||
$initing = $this->initing_authorization;
|
||||
|
||||
$this->initing_authorization = true;
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
$this->postpone_updates = true;
|
||||
|
||||
try {
|
||||
@ -590,7 +590,10 @@ trait AuthKeyHandler
|
||||
return $this->init_authorization_socket_async($id, $socket);
|
||||
};
|
||||
}
|
||||
if ($dcs) yield array_shift($dcs)();
|
||||
if ($dcs) {
|
||||
yield array_shift($dcs)();
|
||||
}
|
||||
|
||||
foreach ($dcs as $id => &$dc) {
|
||||
$dc = $dc();
|
||||
}
|
||||
@ -609,7 +612,7 @@ trait AuthKeyHandler
|
||||
$this->pending_auth = false;
|
||||
$this->postpone_updates = false;
|
||||
$this->initing_authorization = $initing;
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
yield $this->handle_pending_updates_async();
|
||||
}
|
||||
}
|
||||
@ -627,7 +630,6 @@ trait AuthKeyHandler
|
||||
$cdn = strpos($id, 'cdn');
|
||||
$media = strpos($id, 'media');
|
||||
|
||||
|
||||
if ($socket->temp_auth_key === null || $socket->auth_key === null) {
|
||||
$dc_config_number = isset($this->settings['connection_settings'][$id]) ? $id : 'all';
|
||||
if ($socket->auth_key === null && !$cdn && !$media) {
|
||||
|
@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* CombinedUpdatesState class.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
*
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
/**
|
||||
* Stores multiple states
|
||||
*/
|
||||
class CombinedUpdatesState
|
||||
{
|
||||
private $states = [];
|
||||
public function __construct($init)
|
||||
{
|
||||
if (!is_array($init)) {
|
||||
return;
|
||||
}
|
||||
foreach ($init as $channel => $state) {
|
||||
if (is_array($state)) {
|
||||
$state = new UpdatesState($state, $channel);
|
||||
}
|
||||
$this->states[$channel] = $state;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Update multiple parameters
|
||||
*
|
||||
* @param array $init
|
||||
* @param integer $channel
|
||||
* @return UpdatesState
|
||||
*/
|
||||
public function get($channel, $init = [])
|
||||
{
|
||||
if (!isset($this->states[$channel])) {
|
||||
return $this->states[$channel] = new UpdatesState($init, $channel);
|
||||
}
|
||||
return $this->states[$channel]->update($init);
|
||||
}
|
||||
/**
|
||||
* Remove update state
|
||||
*
|
||||
* @param integer $channel
|
||||
* @return void
|
||||
*/
|
||||
public function remove($channel)
|
||||
{
|
||||
if (isset($this->states[$channel])) {
|
||||
unset($this->states[$channel]);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Check if update state is present
|
||||
*
|
||||
* @param integer $channel
|
||||
* @return void
|
||||
*/
|
||||
public function has($channel)
|
||||
{
|
||||
return isset($this->states[$channel]);
|
||||
}
|
||||
/**
|
||||
* Are we currently busy?
|
||||
*
|
||||
* @param integer $channel
|
||||
* @param boolean|null $set
|
||||
* @return boolean
|
||||
*/
|
||||
public function syncLoading($channel, $set = null)
|
||||
{
|
||||
return $this->get($channel)->syncLoading($set);
|
||||
}
|
||||
}
|
@ -113,7 +113,7 @@ trait ResponseHandler
|
||||
|
||||
// Acknowledge that I received the server's response
|
||||
if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) {
|
||||
Loop::defer([$this, 'get_updates_difference']);
|
||||
Loop::defer(function () { $this->call($this->get_updates_difference_async()); });
|
||||
}
|
||||
|
||||
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
|
||||
@ -256,7 +256,7 @@ trait ResponseHandler
|
||||
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
|
||||
|
||||
if (strpos($datacenter, 'cdn') === false) {
|
||||
Loop::defer([$this, 'handle_updates'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
|
||||
Loop::defer(function ($updates) { $this->call($this->handle_updates($updates)); }, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
|
||||
}
|
||||
|
||||
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
|
||||
|
@ -29,9 +29,9 @@ use function Amp\Promise\any;
|
||||
trait UpdateHandler
|
||||
{
|
||||
private $pending_updates = [];
|
||||
private $updates_state = ['_' => 'MadelineProto.Updates_state', 'seq' => 0, 'pts' => 0, 'date' => 0, 'qts' => 0];
|
||||
private $updates_state;
|
||||
private $got_state = false;
|
||||
private $channels_state = [];
|
||||
private $channels_state;
|
||||
public $updates = [];
|
||||
public $updates_key = 0;
|
||||
public $last_getdifference = 0;
|
||||
@ -101,22 +101,6 @@ trait UpdateHandler
|
||||
return $updates;
|
||||
}
|
||||
|
||||
public function &load_channel_state($channel, $pts = 1)
|
||||
{
|
||||
if (!isset($this->channels_state[$channel])) {
|
||||
$this->channels_state[$channel] = ['pts' => $pts, 'sync_loading' => false];
|
||||
}
|
||||
|
||||
return $this->channels_state[$channel];
|
||||
}
|
||||
|
||||
public function set_channel_state($channel, $data)
|
||||
{
|
||||
if (isset($data['pts']) && $data['pts'] !== 0) {
|
||||
$this->load_channel_state($channel)['pts'] = $data['pts'];
|
||||
}
|
||||
}
|
||||
|
||||
public function check_msg_id($message)
|
||||
{
|
||||
try {
|
||||
@ -141,12 +125,12 @@ trait UpdateHandler
|
||||
if (!$this->settings['updates']['handle_updates']) {
|
||||
return;
|
||||
}
|
||||
if ($this->load_channel_state($channel)['sync_loading']) {
|
||||
if ($this->channels_state->syncLoading($channel)) {
|
||||
$this->logger->logger('Not fetching '.$channel.' difference, I am already fetching it');
|
||||
|
||||
return;
|
||||
}
|
||||
$this->load_channel_state($channel)['sync_loading'] = true;
|
||||
$this->channels_state->syncLoading($channel, true);
|
||||
$this->postpone_updates = true;
|
||||
|
||||
try {
|
||||
@ -161,14 +145,14 @@ trait UpdateHandler
|
||||
return false;
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->load_channel_state($channel)['sync_loading'] = false;
|
||||
$this->channels_state->syncLoading($channel, false);
|
||||
}
|
||||
$this->logger->logger('Fetching '.$channel.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
|
||||
$this->load_channel_state($channel)['sync_loading'] = true;
|
||||
$this->channels_state->syncLoading($channel, true);
|
||||
$this->postpone_updates = true;
|
||||
|
||||
try {
|
||||
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => $input, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $this->load_channel_state($channel)['pts'], 'limit' => 30], ['datacenter' => $this->datacenter->curdc]);
|
||||
$difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => $input, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $this->channels_state->get($channel)->pts(), 'limit' => 30], ['datacenter' => $this->datacenter->curdc]);
|
||||
} catch (\danog\MadelineProto\RPCErrorException $e) {
|
||||
if ($e->getMessage() === "You haven't joined this channel/supergroup") {
|
||||
return false;
|
||||
@ -177,30 +161,30 @@ trait UpdateHandler
|
||||
throw $e;
|
||||
} catch (\danog\MadelineProto\PTSException $e) {
|
||||
$this->logger->logger($e->getMessage());
|
||||
unset($this->channels_state[$channel]);
|
||||
$this->channels_state->remove($channel);
|
||||
|
||||
return false; //yield $this->get_channel_difference_async($channel);
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->load_channel_state($channel)['sync_loading'] = false;
|
||||
$this->channels_state->syncLoading($channel, false);
|
||||
}
|
||||
unset($input);
|
||||
|
||||
switch ($difference['_']) {
|
||||
case 'updates.channelDifferenceEmpty':
|
||||
$this->set_channel_state($channel, $difference);
|
||||
$this->channels_state->get($channel, $difference);
|
||||
break;
|
||||
case 'updates.channelDifference':
|
||||
$this->load_channel_state($channel)['sync_loading'] = true;
|
||||
$this->channels_state->syncLoading($channel, true);
|
||||
$this->postpone_updates = true;
|
||||
|
||||
try {
|
||||
$this->set_channel_state($channel, $difference);
|
||||
$this->channels_state->get($channel, $difference);
|
||||
yield $this->handle_update_messages_async($difference['new_messages'], $channel);
|
||||
yield $this->handle_multiple_update_async($difference['other_updates'], [], $channel);
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->load_channel_state($channel)['sync_loading'] = false;
|
||||
$this->channels_state->syncLoading($channel, false);
|
||||
}
|
||||
if (!$difference['final']) {
|
||||
unset($difference);
|
||||
@ -209,16 +193,16 @@ trait UpdateHandler
|
||||
break;
|
||||
case 'updates.channelDifferenceTooLong':
|
||||
$this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
|
||||
$this->load_channel_state($channel)['sync_loading'] = true;
|
||||
$this->channels_state->syncLoading($channel, true);
|
||||
$this->postpone_updates = true;
|
||||
|
||||
try {
|
||||
$this->set_channel_state($channel, $difference);
|
||||
$this->channels_state->get($channel, $difference);
|
||||
yield $this->handle_update_messages_async($difference['messages'], $channel);
|
||||
unset($difference);
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->load_channel_state($channel)['sync_loading'] = false;
|
||||
$this->channels_state->syncLoading($channel, false);
|
||||
}
|
||||
yield $this->get_channel_difference_async($channel);
|
||||
break;
|
||||
@ -229,40 +213,11 @@ trait UpdateHandler
|
||||
yield $this->handle_pending_updates_async();
|
||||
}
|
||||
|
||||
public function set_update_state_async($data)
|
||||
{
|
||||
if (isset($data['pts']) && $data['pts'] !== 0) {
|
||||
(yield $this->load_update_state_async())['pts'] = $data['pts'];
|
||||
}
|
||||
if (isset($data['qts']) && $data['qts'] !== 0) {
|
||||
(yield $this->load_update_state_async())['qts'] = $data['qts'];
|
||||
}
|
||||
if (isset($data['seq']) && $data['seq'] !== 0) {
|
||||
(yield $this->load_update_state_async())['seq'] = $data['seq'];
|
||||
}
|
||||
if (isset($data['date']) && $data['date'] > (yield $this->load_update_state_async())['date']) {
|
||||
(yield $this->load_update_state_async())['date'] = $data['date'];
|
||||
}
|
||||
}
|
||||
public function reset_update_state_async()
|
||||
{
|
||||
(yield $this->load_update_state_async())['pts'] = 1;
|
||||
(yield $this->load_update_state_async())['qts'] = 0;
|
||||
(yield $this->load_update_state_async())['seq'] = 0;
|
||||
(yield $this->load_update_state_async())['date'] = 1;
|
||||
foreach ($this->channels_state as &$state) {
|
||||
$state['pts'] = 1;
|
||||
}
|
||||
$this->msg_ids = [];
|
||||
}
|
||||
public function load_update_state_async()
|
||||
{
|
||||
if (!isset($this->updates_state['qts'])) {
|
||||
$this->updates_state['qts'] = 0;
|
||||
}
|
||||
if (!$this->got_state) {
|
||||
$this->got_state = true;
|
||||
yield $this->set_update_state_async(yield $this->get_updates_state_async());
|
||||
$this->updates_state->update(yield $this->get_updates_state_async());
|
||||
}
|
||||
|
||||
return $this->updates_state;
|
||||
@ -273,52 +228,53 @@ trait UpdateHandler
|
||||
if (!$this->settings['updates']['handle_updates']) {
|
||||
return;
|
||||
}
|
||||
if ($this->updates_state['sync_loading']) {
|
||||
if ($this->updates_state->syncLoading()) {
|
||||
$this->logger->logger('Not fetching normal difference, I am already fetching it');
|
||||
|
||||
return false;
|
||||
}
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
$this->postpone_updates = true;
|
||||
$this->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
|
||||
while (!isset($difference)) {
|
||||
try {
|
||||
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => (yield $this->load_update_state_async())['pts'], 'date' => (yield $this->load_update_state_async())['date'], 'qts' => (yield $this->load_update_state_async())['qts']], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
|
||||
$state = yield $this->load_update_state_async();
|
||||
$difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
|
||||
} catch (\danog\MadelineProto\PTSException $e) {
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
$this->got_state = false;
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
}
|
||||
}
|
||||
$this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
|
||||
$this->postpone_updates = true;
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
$this->last_getdifference = time();
|
||||
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->resume();
|
||||
|
||||
try {
|
||||
switch ($difference['_']) {
|
||||
case 'updates.differenceEmpty':
|
||||
yield $this->set_update_state_async($difference);
|
||||
$this->updates_state->update($difference);
|
||||
break;
|
||||
case 'updates.difference':
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
yield $this->handle_multiple_update_async($difference['other_updates']);
|
||||
foreach ($difference['new_encrypted_messages'] as $encrypted) {
|
||||
yield $this->handle_encrypted_update_async(['_' => 'updateNewEncryptedMessage', 'message' => $encrypted], true);
|
||||
}
|
||||
yield $this->handle_update_messages_async($difference['new_messages']);
|
||||
yield $this->set_update_state_async($difference['state']);
|
||||
$this->updates_state->update($difference['state']);
|
||||
break;
|
||||
case 'updates.differenceSlice':
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$this->updates_state->syncLoading(true);
|
||||
yield $this->handle_multiple_update_async($difference['other_updates']);
|
||||
yield $this->handle_update_messages_async($difference['new_messages']);
|
||||
yield $this->set_update_state_async($difference['intermediate_state']);
|
||||
$this->updates_state->update($difference['intermediate_state']);
|
||||
unset($difference);
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
yield $this->get_updates_difference_async();
|
||||
break;
|
||||
default:
|
||||
@ -327,7 +283,7 @@ trait UpdateHandler
|
||||
}
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
$this->updates_state->syncLoading(false);
|
||||
}
|
||||
yield $this->handle_pending_updates_async();
|
||||
|
||||
@ -343,14 +299,14 @@ trait UpdateHandler
|
||||
|
||||
public function get_updates_state_async()
|
||||
{
|
||||
$last = $this->updates_state['sync_loading'];
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$last = $this->updates_state->syncLoading();
|
||||
$this->updates_state->syncLoading(true);
|
||||
|
||||
try {
|
||||
$data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
|
||||
yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']);
|
||||
} finally {
|
||||
$this->updates_state['sync_loading'] = $last;
|
||||
$this->updates_state->syncLoading($last);
|
||||
}
|
||||
|
||||
return $data;
|
||||
@ -379,7 +335,7 @@ trait UpdateHandler
|
||||
case 'updateChannelTooLong':
|
||||
$channel_id = $update['channel_id'];
|
||||
$this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
|
||||
if (!isset($this->channels_state[$channel_id]) && !isset($update['pts'])) {
|
||||
if (!$this->channels_state->has($channel_id) && !isset($update['pts'])) {
|
||||
$this->logger->logger('I do not have the channel in the states and the pts is not set.', \danog\MadelineProto\Logger::ERROR);
|
||||
|
||||
return;
|
||||
@ -390,7 +346,7 @@ trait UpdateHandler
|
||||
if ($channel_id === false) {
|
||||
$cur_state = yield $this->load_update_state_async();
|
||||
} else {
|
||||
$cur_state = &$this->load_channel_state($channel_id, (isset($update['pts']) ? $update['pts'] : 0) - (isset($update['pts_count']) ? $update['pts_count'] : 0));
|
||||
$cur_state = $this->channels_state->get($channel_id, $update);
|
||||
}
|
||||
/*
|
||||
if ($cur_state['sync_loading'] && in_array($update['_'], ['updateNewMessage', 'updateEditMessage', 'updateNewChannelMessage', 'updateEditChannelMessage'])) {
|
||||
@ -519,7 +475,7 @@ trait UpdateHandler
|
||||
return;
|
||||
}
|
||||
foreach ($messages as $message) {
|
||||
yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel === false ? (yield $this->load_update_state_async())['pts'] : $this->load_channel_state($channel)['pts'], 'pts_count' => 0]);
|
||||
yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel === false ? (yield $this->load_update_state_async())->pts() : $this->channels_state->get($channel), 'pts_count' => 0]);
|
||||
}
|
||||
}
|
||||
|
||||
@ -652,42 +608,44 @@ trait UpdateHandler
|
||||
}
|
||||
}
|
||||
|
||||
public function pwr_webhook_async($update)
|
||||
public function pwr_webhook($update)
|
||||
{
|
||||
$payload = json_encode($update);
|
||||
//$this->logger->logger($update, $payload, json_last_error());
|
||||
if ($payload === '') {
|
||||
$this->logger->logger('EMPTY UPDATE');
|
||||
$this->call((function () use ($update) {
|
||||
$payload = json_encode($update);
|
||||
//$this->logger->logger($update, $payload, json_last_error());
|
||||
if ($payload === '') {
|
||||
$this->logger->logger('EMPTY UPDATE');
|
||||
|
||||
return false;
|
||||
}
|
||||
$ch = curl_init();
|
||||
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
||||
curl_setopt($ch, CURLOPT_URL, $this->hook_url);
|
||||
curl_setopt($ch, CURLOPT_POST, true);
|
||||
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
|
||||
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
|
||||
$parse = parse_url($this->hook_url);
|
||||
if (isset($parse['scheme']) && $parse['scheme'] == 'https') {
|
||||
if (isset($this->pem_path) && file_exists($this->pem_path)) {
|
||||
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, true);
|
||||
curl_setopt($ch, CURLOPT_CAINFO, $this->pem_path);
|
||||
} else {
|
||||
//curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
$result = curl_exec($ch);
|
||||
curl_close($ch);
|
||||
$this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE);
|
||||
$result = json_decode($result, true);
|
||||
if (is_array($result) && isset($result['method']) && $result['method'] != '' && is_string($result['method'])) {
|
||||
try {
|
||||
$this->logger->logger('Reverse webhook command returned', yield $this->method_call_async_read($result['method'], $result, ['datacenter' => $this->datacenter->curdc]));
|
||||
} catch (\danog\MadelineProto\Exception $e) {
|
||||
} catch (\danog\MadelineProto\TL\Exception $e) {
|
||||
} catch (\danog\MadelineProto\RPCErrorException $e) {
|
||||
} catch (\danog\MadelineProto\SecurityException $e) {
|
||||
$ch = curl_init();
|
||||
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
||||
curl_setopt($ch, CURLOPT_URL, $this->hook_url);
|
||||
curl_setopt($ch, CURLOPT_POST, true);
|
||||
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
|
||||
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
|
||||
$parse = parse_url($this->hook_url);
|
||||
if (isset($parse['scheme']) && $parse['scheme'] == 'https') {
|
||||
if (isset($this->pem_path) && file_exists($this->pem_path)) {
|
||||
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, true);
|
||||
curl_setopt($ch, CURLOPT_CAINFO, $this->pem_path);
|
||||
} else {
|
||||
//curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
$result = curl_exec($ch);
|
||||
curl_close($ch);
|
||||
$this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE);
|
||||
$result = json_decode($result, true);
|
||||
if (is_array($result) && isset($result['method']) && $result['method'] != '' && is_string($result['method'])) {
|
||||
try {
|
||||
$this->logger->logger('Reverse webhook command returned', yield $this->method_call_async_read($result['method'], $result, ['datacenter' => $this->datacenter->curdc]));
|
||||
} catch (\danog\MadelineProto\Exception $e) {
|
||||
} catch (\danog\MadelineProto\TL\Exception $e) {
|
||||
} catch (\danog\MadelineProto\RPCErrorException $e) {
|
||||
} catch (\danog\MadelineProto\SecurityException $e) {
|
||||
}
|
||||
}
|
||||
})());
|
||||
}
|
||||
}
|
||||
|
185
src/danog/MadelineProto/MTProtoTools/UpdatesState.php
Normal file
185
src/danog/MadelineProto/MTProtoTools/UpdatesState.php
Normal file
@ -0,0 +1,185 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* UpdatesState class.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
*
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
/**
|
||||
* Stores the state of updates
|
||||
*/
|
||||
class UpdatesState
|
||||
{
|
||||
/**
|
||||
* PTS
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $pts = 1;
|
||||
/**
|
||||
* QTS
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $qts = 0;
|
||||
/**
|
||||
* Seq
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $seq = 0;
|
||||
/**
|
||||
* Date
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $date = 1;
|
||||
|
||||
/**
|
||||
* Channel ID
|
||||
*
|
||||
* @var int|bool
|
||||
*/
|
||||
private $channelId;
|
||||
|
||||
/**
|
||||
* Is busy?
|
||||
*
|
||||
* @var boolean
|
||||
*/
|
||||
private $syncLoading = false;
|
||||
|
||||
/**
|
||||
* Init function
|
||||
*
|
||||
* @param array $init Initial parameters
|
||||
* @param boolean $channelId Channel ID
|
||||
*/
|
||||
public function __construct($init = [], $channelId = false)
|
||||
{
|
||||
$this->channelId = $channelId;
|
||||
$this->update($init);
|
||||
}
|
||||
/**
|
||||
* Sleep function
|
||||
*
|
||||
* @return array Parameters to serialize
|
||||
*/
|
||||
public function __sleep()
|
||||
{
|
||||
return $this->channelId ? ['pts', 'channelId'] : ['pts', 'qts', 'seq', 'date', 'channelId'];
|
||||
}
|
||||
/**
|
||||
* Is this state relative to a channel?
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public function isChannel()
|
||||
{
|
||||
return (bool) $this->channelId;
|
||||
}
|
||||
/**
|
||||
* Get the channel ID
|
||||
*
|
||||
* @return int|null
|
||||
*/
|
||||
public function getChannel()
|
||||
{
|
||||
return $this->channelId;
|
||||
}
|
||||
/**
|
||||
* Are we currently busy?
|
||||
*
|
||||
* @param boolean|null $set
|
||||
* @return boolean
|
||||
*/
|
||||
public function syncLoading($set = null)
|
||||
{
|
||||
if ($set !== null) {
|
||||
$this->syncLoading = $set;
|
||||
}
|
||||
return $this->syncLoading;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update multiple parameters
|
||||
*
|
||||
* @param array $init
|
||||
* @return self
|
||||
*/
|
||||
public function update($init)
|
||||
{
|
||||
foreach ($this->channelId ? ['pts'] : ['pts', 'qts', 'seq', 'date'] as $param) {
|
||||
if (isset($init[$param])) {
|
||||
$this->{$param}($init[$param]);
|
||||
}
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
/**
|
||||
* Get/set PTS
|
||||
*
|
||||
* @param integer $set
|
||||
* @return integer
|
||||
*/
|
||||
public function pts($set = 0)
|
||||
{
|
||||
if ($set !== 0) {
|
||||
$this->pts = $set;
|
||||
}
|
||||
return $this->pts;
|
||||
}
|
||||
/**
|
||||
* Get/set QTS
|
||||
*
|
||||
* @param integer $set
|
||||
* @return integer
|
||||
*/
|
||||
public function qts($set = 0)
|
||||
{
|
||||
if ($set !== 0) {
|
||||
$this->qts = $set;
|
||||
}
|
||||
return $this->qts;
|
||||
}
|
||||
/**
|
||||
* Get/set seq
|
||||
*
|
||||
* @param integer $set
|
||||
* @return integer
|
||||
*/
|
||||
public function seq($set = 0)
|
||||
{
|
||||
if ($set !== 0) {
|
||||
$this->seq = $set;
|
||||
}
|
||||
return $this->seq;
|
||||
}
|
||||
/**
|
||||
* Get/set date
|
||||
*
|
||||
* @param integer $set
|
||||
* @return integer
|
||||
*/
|
||||
public function date($set = 0)
|
||||
{
|
||||
if ($set !== 0 && $set > $this->date) {
|
||||
$this->date = $set;
|
||||
}
|
||||
return $this->date;
|
||||
}
|
||||
}
|
@ -29,7 +29,6 @@ trait DialogHandler
|
||||
if (!isset($this->dialog_params['hash'])) {
|
||||
$this->dialog_params['hash'] = 0;
|
||||
}
|
||||
$this->updates_state['sync_loading'] = true;
|
||||
$res = ['dialogs' => [0], 'count' => 1];
|
||||
$datacenter = $this->datacenter->curdc;
|
||||
$peers = [];
|
||||
@ -76,8 +75,7 @@ trait DialogHandler
|
||||
}
|
||||
} finally {
|
||||
$this->postpone_updates = false;
|
||||
$this->updates_state['sync_loading'] = false;
|
||||
yield $this->handle_pending_updates_async();
|
||||
$this->handle_pending_updates_async();
|
||||
}
|
||||
|
||||
return $peers;
|
||||
|
Loading…
Reference in New Issue
Block a user