This commit is contained in:
Daniil Gentili 2019-05-30 14:09:15 +02:00
parent 558afb9183
commit d42f677884
11 changed files with 176 additions and 122 deletions

View File

@ -46,7 +46,7 @@ class CheckLoop extends ResumableSignalLoop
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered check loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
@ -54,7 +54,7 @@ class CheckLoop extends ResumableSignalLoop
while (true) {
while (empty($connection->new_outgoing)) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting check loop in DC {$datacenter}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -140,7 +140,7 @@ class CheckLoop extends ResumableSignalLoop
$connection->writer->resume();
}
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting check loop in DC $datacenter");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -155,7 +155,7 @@ class CheckLoop extends ResumableSignalLoop
}
} else {
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting check loop in DC $datacenter");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -163,4 +163,9 @@ class CheckLoop extends ResumableSignalLoop
}
}
}
public function __toString(): string
{
return "check loop in DC {$this->datacenter}";
}
}

View File

@ -53,13 +53,13 @@ class HttpWaitLoop extends ResumableSignalLoop
}
$this->startedLoop();
$API->logger->logger("Entered HTTP wait loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
//var_dump("http loop DC $datacenter");
if ($a = yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting HTTP wait loop in DC $datacenter");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -72,7 +72,7 @@ class HttpWaitLoop extends ResumableSignalLoop
}
while ($connection->temp_auth_key === null) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting HTTP wait loop in DC $datacenter");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -89,4 +89,9 @@ class HttpWaitLoop extends ResumableSignalLoop
//($connection->last_http_wait + $timeout) - time()
}
}
public function __toString(): string
{
return "HTTP wait loop in DC {$this->datacenter}";
}
}

View File

@ -53,7 +53,7 @@ class ReadLoop extends SignalLoop
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered read loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
//$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
@ -62,7 +62,7 @@ class ReadLoop extends SignalLoop
} catch (NothingInTheSocketException $e) {
if (isset($connection->old)) {
$this->exitedLoop();
$API->logger->logger("Exiting read loop in DC $datacenter");
$API->logger->logger("Exiting $this");
return;
}
@ -231,4 +231,9 @@ class ReadLoop extends SignalLoop
return true;
}
public function __toString(): string
{
return "read loop in DC {$this->datacenter}";
}
}

View File

@ -53,20 +53,20 @@ class WriteLoop extends ResumableSignalLoop
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered write loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$please_wait = false;
while (true) {
if (empty($connection->pending_outgoing) || $please_wait) {
$API->logger->logger("Waiting in write loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE);
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting write loop in DC $datacenter");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
yield new Success(0);
return;
}
$API->logger->logger("Done waiting in write loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE);
}
try {
@ -345,4 +345,9 @@ class WriteLoop extends ResumableSignalLoop
$connection->pending_outgoing_key = 0;
return $skipped;
}
public function __toString(): string
{
return "write loop in DC {$this->datacenter}";
}
}

View File

@ -52,18 +52,22 @@ class PeriodicFetcherLoop extends ResumableSignalLoop
{
$API = $this->API;
$callback = $this->callback;
$name = $this->name;
$this->startedLoop();
$API->logger->logger("Entered $name loop", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (true) {
$timeout = yield $callback();
if ($timeout === self::STOP || yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $name loop");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
}
public function __toString(): string
{
return "{$this->name} loop";
}
}

View File

@ -44,7 +44,7 @@ abstract class Loop implements LoopInterface
public function start()
{
if ($this->count) {
$this->API->logger->logger("NOT entering loop with running count {$this->count}", Logger::ERROR);
$this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR);
return false;
}
@ -67,4 +67,6 @@ abstract class Loop implements LoopInterface
{
return $this->count;
}
abstract public function __toString(): string;
}

View File

@ -53,10 +53,10 @@ class FeedLoop extends ResumableSignalLoop
}
$this->startedLoop();
$API->logger->logger("Entered update feed loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop in channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -67,23 +67,24 @@ class FeedLoop extends ResumableSignalLoop
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
$API->logger->logger("Resumed $this");
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = [];
@ -95,9 +96,9 @@ class FeedLoop extends ResumableSignalLoop
yield $API->save_update_async($update);
}
$this->parsedUpdates = [];
}
if ($API->update_deferred) {
Loop::defer([$API->update_deferred, 'resolve']);
if ($API->update_deferred) {
Loop::defer([$API->update_deferred, 'resolve']);
}
}
}
@ -111,7 +112,7 @@ class FeedLoop extends ResumableSignalLoop
unset($updates[$key]);
if ($update['_'] === 'updateChannelTooLong') {
$this->API->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE);
$this->API->updaters[$this->channelId]->resumeDefer();
$this->API->updaters[$this->channelId]->resume();
continue;
}
@ -167,87 +168,85 @@ class FeedLoop extends ResumableSignalLoop
}
public function feedSingle($update)
{
if (!$this->channelId) {
$channelId = false;
switch ($update['_']) {
case 'updateChannelWebPage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$channelId = $update['message']['to_id']['channel_id'];
break;
case 'updateDeleteChannelMessages':
$channelId = $update['channel_id'];
break;
case 'updateChannelTooLong':
$channelId = $update['channel_id'];
if (!isset($update['pts'])) {
$update['pts'] = 1;
}
break;
}
if ($channelId && !$this->API->getChannelStates()->has($channelId)) {
$this->API->loadChannelState($channelId, $update);
if (!isset($this->API->feeders[$channelId])) {
$this->API->feeders[$channelId] = new FeedLoop($this, $channelId);
$channelId = false;
switch ($update['_']) {
case 'updateChannelWebPage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$channelId = $update['message']['to_id']['channel_id'];
break;
case 'updateDeleteChannelMessages':
$channelId = $update['channel_id'];
break;
case 'updateChannelTooLong':
$channelId = $update['channel_id'];
if (!isset($update['pts'])) {
$update['pts'] = 1;
}
if (!isset($this->API->updaters[$channelId])) {
$this->API->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->API->feeders[$channelId]->start();
$this->API->updaters[$channelId]->start();
}
switch ($update['_']) {
case 'updateNewMessage':
case 'updateEditMessage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$to = false;
$from = false;
$via_bot = false;
$entities = false;
if (($from = isset($update['message']['from_id']) && !yield $this->peer_isset_async($update['message']['from_id'])) ||
($to = !yield $this->peer_isset_async($update['message']['to_id'])) ||
($via_bot = isset($update['message']['via_bot_id']) && !yield $this->peer_isset_async($update['message']['via_bot_id'])) ||
($entities = isset($update['message']['entities']) && !yield $this->entities_peer_isset_async($update['message']['entities'])) // ||
//isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from'])
) {
$log = '';
if ($from) {
$log .= "from_id {$update['message']['from_id']}, ";
}
if ($to) {
$log .= "to_id ".json_encode($update['message']['to_id']).", ";
}
if ($via_bot) {
$log .= "via_bot {$update['message']['via_bot_id']}, ";
}
if ($entities) {
$log .= "entities ".json_encode($update['message']['entities']).", ";
}
$this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
$update = ['_' => 'updateChannelTooLong'];
}
break;
default:
if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) {
$this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return;
}
break;
}
if ($channelId) {
return $this->feeders[$channelId]->feedSingle($update);
}
break;
}
$this->logger->logger('Was fed an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
if ($channelId && !$this->API->getChannelStates()->has($channelId)) {
$this->API->loadChannelState($channelId, $update);
if (!isset($this->API->feeders[$channelId])) {
$this->API->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->API->updaters[$channelId])) {
$this->API->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->API->feeders[$channelId]->start();
$this->API->updaters[$channelId]->start();
}
switch ($update['_']) {
case 'updateNewMessage':
case 'updateEditMessage':
case 'updateNewChannelMessage':
case 'updateEditChannelMessage':
$to = false;
$from = false;
$via_bot = false;
$entities = false;
if (($from = isset($update['message']['from_id']) && !yield $this->API->peer_isset_async($update['message']['from_id'])) ||
($to = !yield $this->API->peer_isset_async($update['message']['to_id'])) ||
($via_bot = isset($update['message']['via_bot_id']) && !yield $this->API->peer_isset_async($update['message']['via_bot_id'])) ||
($entities = isset($update['message']['entities']) && !yield $this->API->entities_peer_isset_async($update['message']['entities'])) // ||
//isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from'])
) {
$log = '';
if ($from) {
$log .= "from_id {$update['message']['from_id']}, ";
}
if ($to) {
$log .= "to_id ".json_encode($update['message']['to_id']).", ";
}
if ($via_bot) {
$log .= "via_bot {$update['message']['via_bot_id']}, ";
}
if ($entities) {
$log .= "entities ".json_encode($update['message']['entities']).", ";
}
$this->API->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
$update = ['_' => 'updateChannelTooLong'];
}
break;
default:
if ($channelId !== false && !yield $this->API->peer_isset_async($this->API->to_supergroup($channelId))) {
$this->API->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR);
return;
}
break;
}
if ($channelId !== $this->channelId) {
return yield $this->API->feeders[$channelId]->feedSingle($update);
}
$this->API->logger->logger('Was fed an update of type '.$update['_']." in $this...", \danog\MadelineProto\Logger::VERBOSE);
$this->incomingUpdates[] = $update;
return $this->channelId;
}
@ -276,4 +275,9 @@ class FeedLoop extends ResumableSignalLoop
return true;
}
public function __toString(): string
{
return !$this->channelId ? "update feed loop generic" : "update feed loop channel {$this->channelId}";
}
}

View File

@ -50,10 +50,10 @@ class SeqLoop extends ResumableSignalLoop
}
$this->startedLoop();
$API->logger->logger("Entered update seq loop", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -64,20 +64,20 @@ class SeqLoop extends ResumableSignalLoop
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update seq loop");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update seq loop");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
@ -134,6 +134,8 @@ class SeqLoop extends ResumableSignalLoop
}
public function feed($updates)
{
$this->API->logger->logger('Was fed updates of type '.$updates['_'].'...', \danog\MadelineProto\Logger::VERBOSE);
$this->incomingUpdates[] = $updates;
}
public function save($updates)
@ -158,4 +160,9 @@ class SeqLoop extends ResumableSignalLoop
return true;
}
public function __toString(): string
{
return "update seq loop";
}
}

View File

@ -46,7 +46,7 @@ class UpdateLoop extends ResumableSignalLoop
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update feed loop in channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -56,13 +56,13 @@ class UpdateLoop extends ResumableSignalLoop
$this->startedLoop();
$API->logger->logger("Entered updates loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE);
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['updates']['getdifference_interval'];
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting update loop in channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -73,7 +73,7 @@ class UpdateLoop extends ResumableSignalLoop
$this->toPts = null;
while (true) {
if ($this->channelId) {
$this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->API->logger->logger('Resumed and fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($state->pts() <= 1) {
$limit = 10;
} else if ($API->authorization['user']['bot']) {
@ -86,14 +86,13 @@ class UpdateLoop extends ResumableSignalLoop
$timeout = $difference['timeout'];
}
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
switch ($difference['_']) {
case 'updates.channelDifferenceEmpty':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
unset($difference);
break 2;
case 'updates.channelDifference':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
if ($state->pts() >= $difference['pts'] && $state->pts() > 1) {
$this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE);
$difference['pts'] = $state->pts() + 1;
@ -113,7 +112,6 @@ class UpdateLoop extends ResumableSignalLoop
unset($difference);
break 2;
case 'updates.channelDifferenceTooLong':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
$feeder->saveMessages($difference['messages']);
unset($difference);
@ -122,7 +120,7 @@ class UpdateLoop extends ResumableSignalLoop
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
}
} else {
$this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->API->logger->logger('Resumed and fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]);
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
@ -161,11 +159,11 @@ class UpdateLoop extends ResumableSignalLoop
}
}
}
foreach ($result as $channelId) {
foreach ($result as $channelId => $boh) {
$this->API->feeders[$channelId]->resumeDefer();
}
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting update loop in channel {$this->channelId}");
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
@ -190,4 +188,9 @@ class UpdateLoop extends ResumableSignalLoop
return true;
}
public function __toString(): string
{
return !$this->channelId ? "getUpdate loop generic" : "getUpdate loop channel {$this->channelId}";
}
}

View File

@ -236,6 +236,10 @@ class MTProto extends AsyncConstruct implements TLCallback
return $this->initing_authorization;
}
public function getHTTPClient()
{
return $this->datacenter->getHTTPClient();
}
public function __wakeup()
{
$backtrace = debug_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT, 3);
@ -848,13 +852,10 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
if (!isset($this->seqUpdater)) {
$this->seqUpdater = new SeqLoop($this);
}
$this->seqUpdater->start();
$this->datacenter->__construct($this, $this->settings['connection'], $this->settings['connection_settings']);
$dcs = [];
@ -875,6 +876,18 @@ class MTProto extends AsyncConstruct implements TLCallback
yield $this->get_phone_config_async();
foreach ($this->channels_state->get() as $state) {
$channelId = $state->getChannel();
if (!isset($this->feeders[$channelId])) {
$this->feeders[$channelId] = new FeedLoop($this, $channelId);
}
if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId);
}
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
$this->seqUpdater->start();
}
public function get_phone_config_async($watcherId = null)

View File

@ -574,6 +574,7 @@ trait ResponseHandler
switch ($updates['_']) {
case 'updates':
case 'updatesCombined':
$result = [];
foreach ($updates['updates'] as $key => $update) {
if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' ||
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||