Handle updateShortSentMessages

This commit is contained in:
Daniil Gentili 2019-06-01 21:16:52 +02:00
parent b6fc41612a
commit fd03b39a46
12 changed files with 47 additions and 128 deletions

View File

@ -214,7 +214,7 @@ image: https://docs.madelineproto.xyz/favicons/android-chrome-256x256.png
$example = '';
if (!isset($this->settings['td'])) {
$example .= '### Can bots use this method: **'.($bot ? 'YES' : 'NO')."**\n\n\n";
$example .= str_replace('[]', '', '### MadelineProto Example ([now async!](https://docs.madelineproto.xyz/docs/ASYNC.html)):
$example .= str_replace('[]', '', '### MadelineProto Example ([now async for huge speed and parallelism!](https://docs.madelineproto.xyz/docs/ASYNC.html)):
```php

View File

@ -38,25 +38,19 @@ class CheckLoop extends ResumableSignalLoop
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop()
{
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
while (true) {
while (empty($connection->new_outgoing)) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
@ -64,7 +58,7 @@ class CheckLoop extends ResumableSignalLoop
if ($connection->hasPendingCalls()) {
$last_recv = $connection->get_max_id(true);
if ($connection->temp_auth_key !== null) {
$message_ids = $connection->getPendingCalls();//array_values($connection->new_outgoing);
$message_ids = $connection->getPendingCalls(); //array_values($connection->new_outgoing);
$deferred = new Deferred();
$deferred->promise()->onResolve(
function ($e, $result) use ($message_ids, $API, $connection, $datacenter) {
@ -140,9 +134,6 @@ class CheckLoop extends ResumableSignalLoop
$connection->writer->resume();
}
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
@ -155,9 +146,6 @@ class CheckLoop extends ResumableSignalLoop
}
} else {
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}

View File

@ -47,46 +47,28 @@ class HttpWaitLoop extends ResumableSignalLoop
$connection = $this->connection;
if (!in_array($connection->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()])) {
yield new Success(0);
return;
}
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
//var_dump("http loop DC $datacenter");
if ($a = yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
if (yield $this->waitSignal($this->pause($timeout))) {
return;
}
if (!in_array($connection->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()])) {
$this->exitedLoop();
yield new Success(0);
return;
}
while ($connection->temp_auth_key === null) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
//if (time() - $connection->last_http_wait >= $timeout) {
$API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}");
if ($connection->http_req_count === $connection->http_res_count && (!empty($connection->pending_outgoing) || (!empty($connection->new_outgoing) && !$connection->hasPendingCalls()))) {
yield $connection->sendMessage(['_' => 'http_wait', 'body' => ['max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'content_related' => true, 'unencrypted' => false, 'method' => false]);
//var_dump('sent wait');
}
$API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}");
//($connection->last_http_wait + $timeout) - time()
}
}

View File

@ -19,7 +19,6 @@
namespace danog\MadelineProto\Loop\Connection;
use Amp\Loop;
use Amp\Promise;
use Amp\Websocket\ClosedException;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\SignalLoop;
@ -52,18 +51,12 @@ class ReadLoop extends SignalLoop
$datacenter = $this->datacenter;
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
//$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
try {
$error = yield $this->waitSignal($this->readMessage());
} catch (NothingInTheSocketException $e) {
if (isset($connection->old)) {
$this->exitedLoop();
$API->logger->logger("Exiting $this");
return;
}
$API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR);
@ -107,14 +100,8 @@ class ReadLoop extends SignalLoop
$connection->http_res_count++;
try {
//$API->logger->logger("Handling messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
Loop::defer([$API, 'handle_messages'], $datacenter);
//$API->logger->logger("Handled messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} finally {
$this->exitedLoop();
}
$this->startedLoop();
Loop::defer([$API, 'handle_messages'], $datacenter);
if ($this->API->is_http($datacenter)) {
Loop::defer([$connection->waiter, 'resume']);
}

View File

@ -18,13 +18,12 @@
namespace danog\MadelineProto\Loop\Connection;
use Amp\Success;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\Magic;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Tools;
use danog\MadelineProto\Magic;
/**
* Socket write loop.
@ -45,41 +44,28 @@ class WriteLoop extends ResumableSignalLoop
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
}
public function loop(): \Generator
{
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$please_wait = false;
while (true) {
if (empty($connection->pending_outgoing) || $please_wait) {
$API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE);
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
yield new Success(0);
return;
}
$API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE);
}
try {
if ($connection->temp_auth_key === null) {
$res = $this->unencryptedWriteLoopAsync();
} else {
$res = $this->encryptedWriteLoopAsync();
}
$please_wait = yield $res;
} finally {
$this->exitedLoop();
if ($connection->temp_auth_key === null) {
$res = $this->unencryptedWriteLoopAsync();
} else {
$res = $this->encryptedWriteLoopAsync();
}
$this->startedLoop();
$please_wait = yield $res;
//$connection->waiter->resume();
}

View File

@ -45,22 +45,16 @@ class PeriodicFetcherLoop extends ResumableSignalLoop
public function __construct($API, $callback, $name)
{
$this->API = $API;
$this->callback = $callback->bindTo($this);
$this->callback = $callback;
$this->name = $name;
}
public function loop()
{
$API = $this->API;
$callback = $this->callback;
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (true) {
$timeout = yield $callback();
if ($timeout === self::STOP || yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}

View File

@ -19,8 +19,8 @@
namespace danog\MadelineProto\Loop\Impl;
use Amp\Promise;
use danog\MadelineProto\Loop\LoopInterface;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\LoopInterface;
/**
* Loop helper trait.
@ -48,14 +48,28 @@ abstract class Loop implements LoopInterface
return false;
}
$this->callFork($this->loop());
$this->callFork($this->loopImpl());
return true;
}
private function loopImpl()
{
$this->startedLoop();
$this->API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
try {
yield $this->loop();
} finally {
$this->exitedLoop();
$this->API->logger->logger("Exited $this", Logger::ULTRA_VERBOSE);
}
}
public function exitedLoop()
{
$this->count--;
if ($this->count) {
$this->count--;
}
}
public function startedLoop()
@ -68,5 +82,4 @@ abstract class Loop implements LoopInterface
return $this->count;
}
abstract public function __toString(): string;
}

View File

@ -38,4 +38,7 @@ interface LoopInterface
* @return void
*/
public function loop();
public function __toString(): string;
}

View File

@ -44,21 +44,14 @@ class FeedLoop extends ResumableSignalLoop
public function loop()
{
$API = $this->API;
$updater = ($this->updater = $API->updaters[$this->channelId]);
$this->updater = $API->updaters[$this->channelId];
if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0);
return false;
}
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
@ -67,21 +60,13 @@ class FeedLoop extends ResumableSignalLoop
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
$API->logger->logger("Resumed $this");
@ -263,6 +248,7 @@ class FeedLoop extends ResumableSignalLoop
continue;
}
$this->API->logger->logger('Getdiff fed me message of type '.$message['_']." in $this...", \danog\MadelineProto\Logger::VERBOSE);
$this->parsedUpdates[] = ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1];
}

View File

@ -41,21 +41,14 @@ class SeqLoop extends ResumableSignalLoop
public function loop()
{
$API = $this->API;
$feeder = $this->feeder = $API->feeders[false];
$this->feeder = $API->feeders[false];
if (!$this->API->settings['updates']['handle_updates']) {
yield new Success(0);
return false;
}
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
@ -64,21 +57,13 @@ class SeqLoop extends ResumableSignalLoop
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
}
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting $this");
$this->exitedLoop();
return;
}
while ($this->incomingUpdates) {

View File

@ -49,24 +49,17 @@ class UpdateLoop extends ResumableSignalLoop
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this due to signal");
$this->exitedLoop();
return;
}
}
$this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId);
$this->startedLoop();
$API->logger->logger("Entered $this", Logger::ULTRA_VERBOSE);
$timeout = $API->settings['updates']['getdifference_interval'];
while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this due to signal");
$this->exitedLoop();
return;
}
}
@ -90,7 +83,6 @@ class UpdateLoop extends ResumableSignalLoop
if (in_array($e->rpc, ['CHANNEL_PRIVATE', 'CHAT_FORBIDDEN'])) {
$feeder->signal(true);
$API->logger->logger("Channel private, exiting $this");
$this->exitedLoop();
return true;
}
throw $e;
@ -183,8 +175,6 @@ class UpdateLoop extends ResumableSignalLoop
if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this due to signal");
$this->exitedLoop();
return;
}
}

View File

@ -553,11 +553,12 @@ trait ResponseHandler
return;
}
$botAPI = isset($request['botAPI']) && $request['botAPI'];
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
if (isset($response['_']) && strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
$response['request'] = $request;
$this->callForkDefer($this->handle_updates_async($response));
}
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$this->logger->logger("Defer sending $r to deferred");
$this->callFork((
@ -610,6 +611,13 @@ trait ResponseHandler
case 'updateShort':
$this->feeders[yield $this->feeders[false]->feedSingle($updates['update'])]->resume();
break;
case 'updateShortSentMessage':
if (!isset($updates['request']['body'])) {
break;
}
$updates['user_id'] = $updates['request']['body']['peer'];
$updates['message'] = $updates['request']['body']['message'];
unset($updates['request']);
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
@ -637,9 +645,6 @@ trait ResponseHandler
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
$this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume();
break;
case 'updateShortSentMessage':
//yield $this->set_update_state_async(['date' => $updates['date']]);
break;
case 'updatesTooLong':
$this->updaters[false]->resume();
break;