Avoid issues with bad connections

This commit is contained in:
Daniil Gentili 2019-09-13 18:03:18 +02:00
parent d681df8aff
commit 310b489dfb
5 changed files with 35 additions and 11 deletions

View File

@ -310,7 +310,7 @@ class Connection extends Session
$ctx->setReadCallback([$this, 'haveRead']); $ctx->setReadCallback([$this, 'haveRead']);
$this->stream = yield $ctx->getStream(); $this->stream = yield $ctx->getStream();
if (isset($this->old)) { if (isset($this->old)) {
unset($this->old); unset($this->old);
} }

View File

@ -495,19 +495,26 @@ class DataCenter
public function dcConnectAsync(string $dc_number, int $id = -1): \Generator public function dcConnectAsync(string $dc_number, int $id = -1): \Generator
{ {
if (isset($this->sockets[$dc_number]) && !isset($this->sockets[$dc_number]->old)) { $old = isset($this->sockets[$dc_number]) && (
isset($this->sockets[$dc_number]->old) ||
($id !== -1 && isset($this->sockets[$dc_number]->getConnection($id)->old))
);
if (isset($this->sockets[$dc_number]) && !$old) {
return false; return false;
} }
$ctxs = $this->generateContexts($dc_number); $ctxs = $this->generateContexts($dc_number);
if (empty($ctxs)) { if (empty($ctxs)) {
return false; return false;
} }
foreach ($ctxs as $ctx) { foreach ($ctxs as $ctx) {
try { try {
if (isset($this->sockets[$dc_number]->old)) { if ($old) {
$this->API->logger->logger("Reconnecting to DC $dc_number ($id) from existing", \danog\MadelineProto\Logger::WARNING);
$this->sockets[$dc_number]->setExtra($this->API); $this->sockets[$dc_number]->setExtra($this->API);
yield $this->sockets[$dc_number]->connect($ctx, $id); yield $this->sockets[$dc_number]->connect($ctx, $id);
} else { } else {
$this->API->logger->logger("Connecting to DC $dc_number from scratch", \danog\MadelineProto\Logger::WARNING);
$this->sockets[$dc_number] = new DataCenterConnection(); $this->sockets[$dc_number] = new DataCenterConnection();
$this->sockets[$dc_number]->setExtra($this->API); $this->sockets[$dc_number]->setExtra($this->API);
yield $this->sockets[$dc_number]->connect($ctx); yield $this->sockets[$dc_number]->connect($ctx);

View File

@ -450,8 +450,11 @@ class DataCenterConnection implements JsonSerializable
* *
* @return Connection * @return Connection
*/ */
public function getConnection(): Connection public function getConnection(int $id = -1): Connection
{ {
if ($id >= 0) {
return $this->connections[$id];
}
if (\count($this->availableConnections) <= 1) { if (\count($this->availableConnections) <= 1) {
return $this->connections[0]; return $this->connections[0];
} }

View File

@ -62,7 +62,6 @@ class ReadLoop extends SignalLoop
{ {
$this->connection = $connection; $this->connection = $connection;
$this->API = $connection->getExtra(); $this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $connection->getDatacenterID(); $this->datacenter = $connection->getDatacenterID();
$this->datacenterConnection = $connection->getShared(); $this->datacenterConnection = $connection->getShared();
} }
@ -81,9 +80,11 @@ class ReadLoop extends SignalLoop
if (isset($connection->old)) { if (isset($connection->old)) {
return; return;
} }
$API->logger->logger($e); Tools::callForkDefer((function () use ($API, $connection, $datacenter, $e) {
$API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR); $API->logger->logger($e);
Tools::callForkDefer($connection->reconnect()); $API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR);
yield $connection->reconnect();
})());
return; return;
} }

View File

@ -74,12 +74,23 @@ class WriteLoop extends ResumableSignalLoop
$please_wait = false; $please_wait = false;
while (true) { while (true) {
while (empty($connection->pending_outgoing) || $please_wait) { while (empty($connection->pending_outgoing) || $please_wait) {
if (isset($connection->old)) {
$API->logger->logger('Not writing because connection is old');
return;
}
$please_wait = false; $please_wait = false;
$API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE); $API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE);
if (yield $this->waitSignal($this->pause())) { if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this", Logger::ULTRA_VERBOSE);
return; return;
} }
$API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE); $API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE);
if (isset($connection->old)) {
$API->logger->logger('Not writing because connection is old');
return;
}
} }
$connection->writing(true); $connection->writing(true);
@ -89,9 +100,11 @@ class WriteLoop extends ResumableSignalLoop
if (isset($connection->old)) { if (isset($connection->old)) {
return; return;
} }
$API->logger($e); Tools::callForkDefer((function () use ($API, $connection, $datacenter, $e) {
$API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR); $API->logger->logger($e);
Tools::callForkDefer($connection->reconnect()); $API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR);
yield $connection->reconnect();
})());
return; return;
} finally { } finally {
$connection->writing(false); $connection->writing(false);