Avoid problems with reconnections

This commit is contained in:
Daniil Gentili 2019-12-29 14:04:02 +01:00
parent ca074f436e
commit c8baa8092d
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
4 changed files with 67 additions and 9 deletions

View File

@ -32,6 +32,7 @@ use Amp\Http\Client\Cookie\InMemoryCookieJar;
use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request; use Amp\Http\Client\Request;
use Amp\Promise;
use Amp\Socket\ConnectContext; use Amp\Socket\ConnectContext;
use Amp\Websocket\Client\Rfc6455Connector; use Amp\Websocket\Client\Rfc6455Connector;
use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\PermAuthKey;
@ -621,6 +622,17 @@ class DataCenter
{ {
return $this->sockets[$dc]->getConnection(); return $this->sockets[$dc]->getConnection();
} }
/**
* Get Connection instance asynchronously.
*
* @param string $dc DC ID
*
* @return Promise<Connection>
*/
public function waitGetConnection(string $dc): Promise
{
return $this->sockets[$dc]->waitGetConnection();
}
/** /**
* Get DataCenterConnection instance. * Get DataCenterConnection instance.
* *

View File

@ -18,6 +18,9 @@
namespace danog\MadelineProto; namespace danog\MadelineProto;
use Amp\Deferred;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Generic\PeriodicLoop;
use danog\MadelineProto\MTProto\AuthKey; use danog\MadelineProto\MTProto\AuthKey;
use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\PermAuthKey;
@ -34,6 +37,19 @@ class DataCenterConnection implements JsonSerializable
const READ_WEIGHT_MEDIA = 5; const READ_WEIGHT_MEDIA = 5;
const WRITE_WEIGHT = 10; const WRITE_WEIGHT = 10;
/**
* Promise for connection.
*
* @var Promise
*/
private $connectionsPromise;
/**
* Deferred for connection.
*
* @var Deferred
*/
private $connectionsDeferred;
/** /**
* Temporary auth key. * Temporary auth key.
* *
@ -379,6 +395,13 @@ class DataCenterConnection implements JsonSerializable
} }
yield $this->connectMore($count); yield $this->connectMore($count);
yield $this->restoreBackup(); yield $this->restoreBackup();
$this->connectionsPromise = new Success();
if ($this->connectionsDeferred) {
$connectionsDeferred = $this->connectionsDeferred;
$this->connectionsDeferred = null;
$connectionsDeferred->resolve();
}
} else { } else {
$this->availableConnections[$id] = 0; $this->availableConnections[$id] = 0;
yield $this->connections[$id]->connect($ctx); yield $this->connections[$id]->connect($ctx);
@ -435,6 +458,9 @@ class DataCenterConnection implements JsonSerializable
*/ */
public function disconnect() public function disconnect()
{ {
$this->connectionsDeferred = new Deferred;
$this->connectionsPromise = $this->connectionsDeferred->promise();
$this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}"); $this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}");
if ($this->robinLoop) { if ($this->robinLoop) {
$this->robinLoop->signal(true); $this->robinLoop->signal(true);
@ -500,6 +526,20 @@ class DataCenterConnection implements JsonSerializable
{ {
return $id < 0 ? \count($this->connections) : isset($this->connections[$id]); return $id < 0 ? \count($this->connections) : isset($this->connections[$id]);
} }
/**
* Get best socket in round robin, asynchronously.
*
* @return Promise<Connection>
*/
public function waitGetConnection(): Promise
{
if (empty($this->availableConnections)) {
return $this->connectionsPromise->onResolve(function ($e, $v) {
return $this->getConnection();
});
}
return new Success($this->getConnection());
}
/** /**
* Get best socket in round robin. * Get best socket in round robin.
* *

View File

@ -54,7 +54,11 @@ trait CallHandler
foreach ($message_ids as $message_id) { foreach ($message_ids as $message_id) {
if (isset($this->outgoing_messages[$message_id]['body'])) { if (isset($this->outgoing_messages[$message_id]['body'])) {
if ($datacenter) { if ($datacenter) {
$res = $this->API->datacenter->getConnection($datacenter)->sendMessage($this->outgoing_messages[$message_id], false); $res = $this->API->datacenter->waitGetConnection($datacenter)->onResolve(
function ($e, $r) use ($message_id) {
return $r->sendMessage($this->outgoing_messages[$message_id], false);
}
);
} else { } else {
$res = $this->sendMessage($this->outgoing_messages[$message_id], false); $res = $this->sendMessage($this->outgoing_messages[$message_id], false);
} }
@ -140,11 +144,13 @@ trait CallHandler
&& $args['id']['_'] === 'inputBotInlineMessageID' && $args['id']['_'] === 'inputBotInlineMessageID'
&& $this->datacenter !== $args['id']['dc_id'] && $this->datacenter !== $args['id']['dc_id']
) { ) {
return yield $this->API->datacenter->getConnection($args['id']['dc_id'])->methodCallAsyncWriteGenerator($method, $args, $aargs); $aargs['datacenter'] = $args['id']['dc_id'];
return $this->API->methodCallAsyncWriteGenerator($method, $args, $aargs);
} }
if (($aargs['file'] ?? false) && !$this->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) { if (($aargs['file'] ?? false) && !$this->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) {
$this->logger->logger('Using media DC'); $this->logger->logger('Using media DC');
return yield $this->API->datacenter->getConnection($this->datacenter.'_media')->methodCallAsyncWriteGenerator($method, $args, $aargs); $aargs['datacenter'] = $this->datacenter.'_media';
return $this->API->methodCallAsyncWriteGenerator($method, $args, $aargs);
} }
if (\in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) { if (\in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) {
$aargs['queue'] = 'secret'; $aargs['queue'] = 'secret';

View File

@ -49,11 +49,11 @@ trait CallHandler
* @param array $args Arguments * @param array $args Arguments
* @param array $aargs Additional arguments * @param array $aargs Additional arguments
* *
* @return Promise * @return \Generator<Promise>
*/ */
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{ {
return $this->datacenter->getConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->methodCallAsyncRead($method, $args, $aargs); return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncRead($method, $args, $aargs);
} }
/** /**
* Call method and make sure it is asynchronously sent. * Call method and make sure it is asynchronously sent.
@ -62,10 +62,10 @@ trait CallHandler
* @param array $args Arguments * @param array $args Arguments
* @param array $aargs Additional arguments * @param array $aargs Additional arguments
* *
* @return Promise * @return \Generator<Promise>
*/ */
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{ {
return $this->datacenter->getConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->methodCallAsyncWrite($method, $args, $aargs); return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncWrite($method, $args, $aargs);
} }
} }