Avoid memory leaking
This commit is contained in:
parent
a63131583b
commit
becc68f41b
@ -451,7 +451,11 @@ class DataCenterConnection implements JsonSerializable
|
|||||||
{
|
{
|
||||||
$backup = $this->connections[$id]->backupSession();
|
$backup = $this->connections[$id]->backupSession();
|
||||||
$list = '';
|
$list = '';
|
||||||
foreach ($backup as $message) {
|
foreach ($backup as $k => $message) {
|
||||||
|
if (($message['_'] ?? '') === 'msgs_state_req') {
|
||||||
|
unset($backup[$k]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
$list .= $message['_'] ?? '-';
|
$list .= $message['_'] ?? '-';
|
||||||
$list .= ', ';
|
$list .= ', ';
|
||||||
}
|
}
|
||||||
|
@ -66,6 +66,7 @@ class CheckLoop extends ResumableSignalLoop
|
|||||||
$shared = $this->datacenterConnection;
|
$shared = $this->datacenterConnection;
|
||||||
|
|
||||||
$timeout = $shared->getSettings()['timeout'];
|
$timeout = $shared->getSettings()['timeout'];
|
||||||
|
$timeoutResend = $timeout * $timeout; // Typically 25 seconds, good enough
|
||||||
while (true) {
|
while (true) {
|
||||||
while (empty($connection->new_outgoing)) {
|
while (empty($connection->new_outgoing)) {
|
||||||
if (yield $this->waitSignal($this->pause())) {
|
if (yield $this->waitSignal($this->pause())) {
|
||||||
@ -82,7 +83,7 @@ class CheckLoop extends ResumableSignalLoop
|
|||||||
foreach (\array_chunk($full_message_ids, 8192) as $message_ids) {
|
foreach (\array_chunk($full_message_ids, 8192) as $message_ids) {
|
||||||
$deferred = new Deferred();
|
$deferred = new Deferred();
|
||||||
$deferred->promise()->onResolve(
|
$deferred->promise()->onResolve(
|
||||||
function ($e, $result) use ($message_ids, $API, $connection, $datacenter) {
|
function ($e, $result) use ($message_ids, $API, $connection, $datacenter, $timeoutResend) {
|
||||||
if ($e) {
|
if ($e) {
|
||||||
$API->logger("Got exception in check loop for DC $datacenter");
|
$API->logger("Got exception in check loop for DC $datacenter");
|
||||||
$API->logger((string) $e);
|
$API->logger((string) $e);
|
||||||
@ -117,7 +118,12 @@ class CheckLoop extends ResumableSignalLoop
|
|||||||
break;
|
break;
|
||||||
case 4:
|
case 4:
|
||||||
if ($chr & 32) {
|
if ($chr & 32) {
|
||||||
|
if ($connection->new_outgoing[$message_id]['sent'] + $timeoutResend < \time()) {
|
||||||
|
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' received by server and is being processed for way too long, resending request...', \danog\MadelineProto\Logger::ERROR);
|
||||||
|
$connection->methodRecall('', ['message_id' => $message_id, 'postpone' => true]);
|
||||||
|
} else {
|
||||||
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' received by server and is being processed, waiting...', \danog\MadelineProto\Logger::ERROR);
|
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' received by server and is being processed, waiting...', \danog\MadelineProto\Logger::ERROR);
|
||||||
|
}
|
||||||
} elseif ($chr & 64) {
|
} elseif ($chr & 64) {
|
||||||
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' received by server and was already processed, requesting reply...', \danog\MadelineProto\Logger::ERROR);
|
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' received by server and was already processed, requesting reply...', \danog\MadelineProto\Logger::ERROR);
|
||||||
$reply[] = $message_id;
|
$reply[] = $message_id;
|
||||||
|
@ -92,14 +92,18 @@ trait AckHandler
|
|||||||
$settings = $this->shared->getSettings();
|
$settings = $this->shared->getSettings();
|
||||||
$timeout = $settings['timeout'];
|
$timeout = $settings['timeout'];
|
||||||
$pfs = $settings['pfs'];
|
$pfs = $settings['pfs'];
|
||||||
|
$unencrypted = !$this->shared->hasTempAuthKey();
|
||||||
|
$notBound = !$this->shared->isBound();
|
||||||
|
|
||||||
|
$pfsNotBound = $pfs && $notBound;
|
||||||
|
|
||||||
foreach ($this->new_outgoing as $message_id) {
|
foreach ($this->new_outgoing as $message_id) {
|
||||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||||
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
|
&& $unencrypted === $this->outgoing_messages[$message_id]['unencrypted']
|
||||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
||||||
) {
|
) {
|
||||||
if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
if ($pfsNotBound && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,7 +115,7 @@ trait AckHandler
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all pending calls.
|
* Get all pending calls (also clear pending state requests).
|
||||||
*
|
*
|
||||||
* @return array
|
* @return array
|
||||||
*/
|
*/
|
||||||
@ -120,15 +124,23 @@ trait AckHandler
|
|||||||
$settings = $this->shared->getSettings();
|
$settings = $this->shared->getSettings();
|
||||||
$timeout = $settings['timeout'];
|
$timeout = $settings['timeout'];
|
||||||
$pfs = $settings['pfs'];
|
$pfs = $settings['pfs'];
|
||||||
|
$unencrypted = !$this->shared->hasTempAuthKey();
|
||||||
|
$notBound = !$this->shared->isBound();
|
||||||
|
|
||||||
|
$pfsNotBound = $pfs && $notBound;
|
||||||
|
|
||||||
$result = [];
|
$result = [];
|
||||||
foreach ($this->new_outgoing as $message_id) {
|
foreach ($this->new_outgoing as $k => $message_id) {
|
||||||
if (isset($this->outgoing_messages[$message_id]['sent'])
|
if (isset($this->outgoing_messages[$message_id]['sent'])
|
||||||
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
|
||||||
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
|
&& $unencrypted === $this->outgoing_messages[$message_id]['unencrypted']
|
||||||
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
|
|
||||||
) {
|
) {
|
||||||
if ($pfs && !$this->shared->isBound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
if ($pfsNotBound && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ($this->outgoing_messages[$message_id]['_'] === 'msgs_state_req') {
|
||||||
|
unset($this->new_outgoing[$k], $this->outgoing_messages[$message_id]);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ trait CallHandler
|
|||||||
*
|
*
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
public function methodRecall(string $watcherId, array $args)
|
public function methodRecall(string $watcherId, array $args): void
|
||||||
{
|
{
|
||||||
$message_id = $args['message_id'];
|
$message_id = $args['message_id'];
|
||||||
$postpone = $args['postpone'] ?? false;
|
$postpone = $args['postpone'] ?? false;
|
||||||
|
Loading…
Reference in New Issue
Block a user