Finish writing feeder logic

This commit is contained in:
Daniil Gentili 2019-05-30 13:28:50 +02:00
parent b4114cd5f8
commit 558afb9183
6 changed files with 159 additions and 156 deletions

View File

@ -18,10 +18,10 @@
namespace danog\MadelineProto\Loop\Update;
use Amp\Loop;
use Amp\Success;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use Amp\Loop;
/**
* update feed loop.
@ -96,7 +96,10 @@ class FeedLoop extends ResumableSignalLoop
}
$this->parsedUpdates = [];
}
if ($API->update_deferred) Loop::defer([$API->update_deferred, 'resolve']);
if ($API->update_deferred) {
Loop::defer([$API->update_deferred, 'resolve']);
}
}
}
public function parse($updates)
@ -152,20 +155,110 @@ class FeedLoop extends ResumableSignalLoop
}
public function feed($updates)
{
$this->incomingUpdates = array_merge($this->incomingUpdates, $updates);
$result = [];
foreach ($updates as $update) {
$res = $this->feedSingle($update);
if ($res instanceof \Generator) {
$res = yield $res;
}
$result[$res] = true;
}
return $result;
}
public function feedSingle($update)
{
$this->incomingUpdates []= $update;
if (!$this->channelId) {
$channelId = false;
switch ($update['_']) {
case 'updateChannelWebPage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$channelId = $update['message']['to_id']['channel_id'];
break;
case 'updateDeleteChannelMessages':
$channelId = $update['channel_id'];
break;
case 'updateChannelTooLong':
$channelId = $update['channel_id'];
if (!isset($update['pts'])) {
$update['pts'] = 1;
}
break;
}
if ($channelId && !$this->API->getChannelStates()->has($channelId)) {
$this->API->loadChannelState($channelId, $update);
if (!isset($this->API->feeders[$channelId])) {
$this->API->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->API->updaters[$channelId])) {
$this->API->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->API->feeders[$channelId]->start();
$this->API->updaters[$channelId]->start();
}
switch ($update['_']) {
case 'updateNewMessage':
case 'updateEditMessage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$to = false;
$from = false;
$via_bot = false;
$entities = false;
if (($from = isset($update['message']['from_id']) && !yield $this->peer_isset_async($update['message']['from_id'])) ||
($to = !yield $this->peer_isset_async($update['message']['to_id'])) ||
($via_bot = isset($update['message']['via_bot_id']) && !yield $this->peer_isset_async($update['message']['via_bot_id'])) ||
($entities = isset($update['message']['entities']) && !yield $this->entities_peer_isset_async($update['message']['entities'])) // ||
//isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from'])
) {
$log = '';
if ($from) {
$log .= "from_id {$update['message']['from_id']}, ";
}
if ($to) {
$log .= "to_id ".json_encode($update['message']['to_id']).", ";
}
if ($via_bot) {
$log .= "via_bot {$update['message']['via_bot_id']}, ";
}
if ($entities) {
$log .= "entities ".json_encode($update['message']['entities']).", ";
}
$this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
$update = ['_' => 'updateChannelTooLong'];
}
break;
default:
if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return;
}
break;
}
if ($channelId) {
return $this->feeders[$channelId]->feedSingle($update);
}
}
$this->logger->logger('Was fed an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
$this->incomingUpdates[] = $update;
return $this->channelId;
}
public function save($update)
{
$this->parsedUpdates []= $update;
$this->parsedUpdates[] = $update;
}
public function saveMessages($messages)
{
foreach ($messages as $message) {
$this->parsedUpdates []= ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1];
$this->parsedUpdates[] = ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1];
}
}

View File

@ -31,8 +31,8 @@ class SeqLoop extends ResumableSignalLoop
{
use \danog\MadelineProto\Tools;
private $incomingUpdates = [];
private $channelId;
private $feeder;
private $pendingWakeups = [];
public function __construct($API)
{
@ -81,21 +81,22 @@ class SeqLoop extends ResumableSignalLoop
$this->exitedLoop();
return;
}
$result = [];
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = [];
$result = array_merge(yield $this->parse($updates), $result);
yield $this->parse($updates);
$updates = null;
}
foreach ($result as $channelId => $boh) {
$this->API->feeders[$channelId]->resumeDefer();
while ($this->pendingWakeups) {
reset($this->pendingWakeups);
$channelId = key($this->pendingWakeups);
unset($this->pendingWakeups[$channelId]);
$this->API->feeders[$channelId]->resume();
}
}
}
public function parse($updates)
{
$fresult = [];
reset($updates);
while ($updates) {
$options = [];
@ -128,9 +129,8 @@ class SeqLoop extends ResumableSignalLoop
$this->state->date($options['date']);
}
$fresult = array_merge(yield $this->save($update), $fresult);
yield $this->save($update);
}
return $fresult;
}
public function feed($updates)
{
@ -138,11 +138,11 @@ class SeqLoop extends ResumableSignalLoop
}
public function save($updates)
{
$result = [];
foreach ($updates['updates'] as $update) {
$result[yield $this->API->feedSingle($update)] = true;
}
return $result;
$this->pendingWakeups = array_merge($this->pendingWakeups, yield $this->feeder->feed($updates['updates']));
}
public function addPendingWakeups($wakeups)
{
$this->pendingWakeups = array_merge($wakeups, $this->pendingWakeups);
}
public function has_all_auth()
{

View File

@ -68,6 +68,7 @@ class UpdateLoop extends ResumableSignalLoop
return;
}
}
$result = [];
$toPts = $this->toPts;
$this->toPts = null;
while (true) {
@ -98,7 +99,7 @@ class UpdateLoop extends ResumableSignalLoop
$difference['pts'] = $state->pts() + 1;
}
$state->update($difference);
$feeder->feed($difference['other_updates']);
$result = array_merge($result, yield $feeder->feed($difference['other_updates']));
$feeder->saveMessages($difference['new_messages']);
if (!$difference['final']) {
@ -135,8 +136,8 @@ class UpdateLoop extends ResumableSignalLoop
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
$result = array_merge($result, yield $feeder->feed($difference['other_updates']));
$result = array_merge($result, yield $feeder->feed($difference['new_encrypted_messages']));
$feeder->saveMessages($difference['new_messages']);
$state->update($difference['state']);
unset($difference);
@ -145,8 +146,8 @@ class UpdateLoop extends ResumableSignalLoop
foreach ($difference['new_encrypted_messages'] as &$encrypted) {
$encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted];
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
$result = array_merge($result, yield $feeder->feed($difference['other_updates']));
$result = array_merge($result, yield $feeder->feed($difference['new_encrypted_messages']));
$feeder->saveMessages($difference['new_messages']);
$state->update($difference['intermediate_state']);
if ($difference['intermediate_state']['pts'] >= $toPts) {
@ -160,7 +161,9 @@ class UpdateLoop extends ResumableSignalLoop
}
}
}
$feeder->resumeDefer();
foreach ($result as $channelId) {
$this->API->feeders[$channelId]->resumeDefer();
}
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting update loop in channel {$this->channelId}");
$this->exitedLoop();

View File

@ -21,15 +21,15 @@ namespace danog\MadelineProto;
use Amp\Loop;
use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
use danog\MadelineProto\Loop\Update\UpdateLoop;
use danog\MadelineProto\MTProtoTools\CombinedUpdatesState;
use danog\MadelineProto\MTProtoTools\ReferenceDatabase;
use danog\MadelineProto\MTProtoTools\UpdatesState;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\TL\TLCallback;
use danog\MadelineProto\Loop\Update\UpdateLoop;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
/**
* Manages all of the mtproto stuff.
@ -223,7 +223,7 @@ class MTProto extends AsyncConstruct implements TLCallback
public function __sleep()
{
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'pending_updates', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
}
public function isAltervista()
@ -840,6 +840,22 @@ class MTProto extends AsyncConstruct implements TLCallback
// Connects to all datacenters and if necessary creates authorization keys, binds them and writes client info
public function connect_to_all_dcs_async(): \Generator
{
foreach ($this->channels_state->get() as $state) {
$channelId = $state->getChannel();
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
if (!isset($this->seqUpdater)) {
$this->seqUpdater = new SeqLoop($this);
}
$this->seqUpdater->start();
$this->datacenter->__construct($this, $this->settings['connection'], $this->settings['connection_settings']);
$dcs = [];
foreach ($this->datacenter->get_dcs() as $new_dc) {
@ -859,21 +875,6 @@ class MTProto extends AsyncConstruct implements TLCallback
yield $this->get_phone_config_async();
foreach ($this->channels_state->get() as $state) {
$channelId = $state->getChannel();
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
if (!isset($this->seqUpdater)) {
$this->seqUpdater = new SeqLoop($this);
}
$this->seqUpdater->start();
}
public function get_phone_config_async($watcherId = null)

View File

@ -341,7 +341,7 @@ trait ResponseHandler
unset($request['serialized_body']);
}
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
}
@ -349,7 +349,7 @@ trait ResponseHandler
case 500:
if ($response['error_message'] === 'MSG_WAIT_FAILED') {
$this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = [];
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
}
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
@ -370,7 +370,7 @@ trait ResponseHandler
$this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc;
}
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]);
//$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
//$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
return;
case 401:
@ -511,7 +511,7 @@ trait ResponseHandler
switch ($response['error_code']) {
case 48:
$this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt'];
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
case 16:
@ -570,8 +570,7 @@ trait ResponseHandler
$updates = $actual_updates;
}
$this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
$result = [];
$this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
switch ($updates['_']) {
case 'updates':
case 'updatesCombined':
@ -580,11 +579,11 @@ trait ResponseHandler
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||
$update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' ||
$update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') {
$result[yield $this->feedSingle($update)] = true;
$result[yield $this->feeder[false]->feedSingle($update)] = true;
unset($updates['updates'][$key]);
}
}
$this->seqUpdater->addPendingWakeups($result);
if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') {
$updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
@ -592,19 +591,18 @@ trait ResponseHandler
$updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
}
$this->seqUpdater->feed($updates);
$this->seqUpdater->resumeDefer();
}
$this->seqUpdater->resume();
break;
case 'updateShort':
$updates['update']['options'] = ['date' => $updates['date']];
$this->feedSingle($updates['update']);
$this->feeders[yield $this->feeder[false]->feedSingle($update)]->resume();
break;
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
$to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']);
if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) {
yield $this->updaters[false]->resumeDefer();
yield $this->updaters[false]->resume();
return;
// TOFIX
}
@ -624,21 +622,17 @@ trait ResponseHandler
break;
}
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
$updates['update']['options'] = ['date' => $updates['date']];
$result[yield $this->feedSingle($update)] = true;
$this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume();
break;
case 'updateShortSentMessage':
//yield $this->set_update_state_async(['date' => $updates['date']]);
break;
case 'updatesTooLong':
$this->updaters[false]->resumeDefer();
$this->updaters[false]->resume();
break;
default:
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true));
break;
}
foreach ($result as $channelId => $Boh) {
$this->feeders[$channelId]->resumeDefer();
}
}
}

View File

@ -31,13 +31,11 @@ use danog\MadelineProto\Loop\Update\UpdateLoop;
*/
trait UpdateHandler
{
private $pending_updates = [];
private $updates_state;
private $got_state = false;
private $channels_state;
public $updates = [];
public $updates_key = 0;
public $last_getdifference = 0;
public function pwr_update_handler($update)
{
@ -132,9 +130,13 @@ trait UpdateHandler
return $this->updates_state;
}
public function loadChannelState($channelId = null)
public function loadChannelState($channelId = null, $init = [])
{
return $this->channels_state->get($channelId);
return $this->channels_state->get($channelId, $init);
}
public function getChannelStates()
{
return $this->channels_state;
}
public function get_updates_state_async()
@ -145,96 +147,6 @@ trait UpdateHandler
return $data;
}
public function feedSingle($update)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
$this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
$channelId = false;
switch ($update['_']) {
case 'updateChannelWebPage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$channelId = $update['message']['to_id']['channel_id'];
break;
case 'updateDeleteChannelMessages':
$channelId = $update['channel_id'];
break;
case 'updateChannelTooLong':
$channelId = $update['channel_id'];
if (!$this->channels_state->has($channelId) && !isset($update['pts'])) {
$update['pts'] = 1;
}
break;
}
if ($channelId && !$this->channels_state->has($channelId)) {
$this->channels_state->get($channelId, $update);
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
switch ($update['_']) {
case 'updateNewMessage':
case 'updateEditMessage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$to = false;
$from = false;
$via_bot = false;
$entities = false;
if (($from = isset($update['message']['from_id']) && !yield $this->peer_isset_async($update['message']['from_id'])) ||
($to = !yield $this->peer_isset_async($update['message']['to_id'])) ||
($via_bot = isset($update['message']['via_bot_id']) && !yield $this->peer_isset_async($update['message']['via_bot_id'])) ||
($entities = isset($update['message']['entities']) && !yield $this->entities_peer_isset_async($update['message']['entities'])) // ||
//isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from'])
) {
$log = '';
if ($from) {
$log .= "from_id {$update['message']['from_id']}, ";
}
if ($to) {
$log .= "to_id ".json_encode($update['message']['to_id']).", ";
}
if ($via_bot) {
$log .= "via_bot {$update['message']['via_bot_id']}, ";
}
if ($entities) {
$log .= "entities ".json_encode($update['message']['entities']).", ";
}
$this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
if ($channelId !== false && yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->updaters[$channelId]->resumeDefer();
} else {
$this->updaters[false]->resumeDefer();
}
return;
}
break;
default:
if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return;
}
break;
}
$this->feeders[$channelId]->feedSingle($update);
return $channelId;
}
public function save_update_async($update)
{
if ($update['_'] === 'updateConfig') {