From 53b72c8a2983270e1d04f4aeaa5e4d49e742ee95 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 7 Jun 2019 00:48:25 +0000 Subject: [PATCH] Various bugfixes --- docs | 2 +- src/danog/MadelineProto/API.php | 9 ++++- src/danog/MadelineProto/APIFactory.php | 4 +++ .../Loop/Connection/WriteLoop.php | 36 +++++++++---------- src/danog/MadelineProto/Loop/Impl/Loop.php | 2 +- .../MadelineProto/Loop/Update/FeedLoop.php | 1 + .../MadelineProto/Loop/Update/UpdateLoop.php | 30 ++++++++++------ src/danog/MadelineProto/MTProto.php | 5 +-- .../MTProtoTools/PeerHandler.php | 1 + .../MTProtoTools/ResponseHandler.php | 6 ++-- 10 files changed, 60 insertions(+), 36 deletions(-) diff --git a/docs b/docs index 9754f506..dc05dc5c 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 9754f506d36c41260096b384ed82677faf2ab5bf +Subproject commit dc05dc5cebfcec90ac7851928c522a4d635dbab6 diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index 1f3bc846..afefbdcb 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -48,6 +48,11 @@ class API extends APIFactory public function __construct_async($params, $settings, $deferred) { if (is_string($params)) { + if (!\danog\MadelineProto\Logger::$default) { + if (!isset($settings['logger']['logger_param'])) $settings['logger']['logger_param'] = Magic::$script_cwd.'/MadelineProto.log'; + if (!isset($settings['logger']['logger'])) $settings['logger']['logger'] = php_sapi_name() === 'cli' ? 3 : 2; + \danog\MadelineProto\Logger::constructor($settings['logger']['logger'], $settings['logger']['logger_param'], '', isset($settings['logger']['logger_level']) ? $settings['logger']['logger_level'] : Logger::VERBOSE, isset($settings['logger']['max_size']) ? $settings['logger']['max_size'] : 100 * 1024 * 1024); + } $realpaths = Serialization::realpaths($params); $this->session = $realpaths['file']; @@ -85,7 +90,6 @@ class API extends APIFactory if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') { throw $e; } - class_exists('\\Volatile'); $tounserialize = str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); foreach (['RSA', 'TL\\TLMethod', 'TL\\TLConstructor', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { @@ -96,6 +100,9 @@ class API extends APIFactory $tounserialize = str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize); } $unserialized = \danog\Serialization::unserialize($tounserialize); + } catch (\Throwable $e) { + Logger::log((string) $e, Logger::ERROR); + throw $e; } if ($unserialized instanceof \danog\PlaceHolder) { $unserialized = \danog\Serialization::unserialize($tounserialize); diff --git a/src/danog/MadelineProto/APIFactory.php b/src/danog/MadelineProto/APIFactory.php index e1238963..0a375ee5 100644 --- a/src/danog/MadelineProto/APIFactory.php +++ b/src/danog/MadelineProto/APIFactory.php @@ -160,6 +160,10 @@ class APIFactory extends AsyncConstruct if (Magic::is_fork() && !Magic::$processed_fork) { throw new Exception('Forking not supported, use async logic, instead: https://docs.madelineproto.xyz/docs/ASYNC.html'); } + if ($this->API->asyncInitPromise) { + yield $this->API->initAsync(); + $this->API->logger->logger('Finished init asynchronously'); + } if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) { Logger::log("Didn't serialize in a while, doing that now..."); $this->serialize($this->session); diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 854c4fef..34683034 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -53,7 +53,8 @@ class WriteLoop extends ResumableSignalLoop $please_wait = false; while (true) { - if (empty($connection->pending_outgoing) || $please_wait) { + while (empty($connection->pending_outgoing) || $please_wait) { + $please_wait = false; $API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE); if (yield $this->waitSignal($this->pause())) { return; @@ -94,12 +95,10 @@ class WriteLoop extends ResumableSignalLoop } $skipped_all = false; - $body = $message['serialized_body']; - $API->logger->logger("Sending {$message['_']} as unencrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id(); - $length = strlen($body); + $length = strlen($message['serialized_body']); $pad_length = -$length & 15; $pad_length += 16 * $this->random_int($modulus = 16); @@ -107,7 +106,7 @@ class WriteLoop extends ResumableSignalLoop $pad = $this->random($pad_length); $buffer = yield $connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length); - yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$body.$pad); + yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$message['serialized_body'].$pad); //var_dump("plain ".bin2hex($message_id)); $connection->http_req_count++; @@ -184,14 +183,18 @@ class WriteLoop extends ResumableSignalLoop $skipped = true; continue; } - - $body = $message['serialized_body']; + $body_length = strlen($message['serialized_body']); + $actual_length = $body_length + 32; + if ($total_length && $total_length + $actual_length > 32760 || $count >= 1020) { + $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); + break; + } $message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id($datacenter); $API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $body, 'seqno' => $connection->generate_out_seq_no($message['content_related'])]; + $MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generate_out_seq_no($message['content_related'])]; if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') { if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') { @@ -241,25 +244,22 @@ class WriteLoop extends ResumableSignalLoop } } $body_length = strlen($MTmessage['body']); - if ($total_length && $total_length + $body_length + 32 > 655360) { + $actual_length = $body_length + 32; + if ($total_length && $total_length + $actual_length > 32760) { $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); break; } $count++; - $total_length += $body_length + 32; + $total_length += $actual_length; $MTmessage['bytes'] = $body_length; $messages[] = $MTmessage; $keys[$k] = $message_id; - - if ($total_length && $total_length + 32 > 655360) { - $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); - break; - } } + $MTmessage = null; - if (count($messages) > 1) { - $API->logger->logger("Wrapping in msg_container as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); + if ($count > 1) { + $API->logger->logger("Wrapping in msg_container ($count messages of total size $total_length) as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $message_id = $connection->generate_message_id($datacenter); $connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => array_values($keys), 'content_related' => false, 'method' => false, 'unencrypted' => false]; @@ -272,7 +272,7 @@ class WriteLoop extends ResumableSignalLoop $message_data_length = strlen($message_data); $seq_no = $connection->generate_out_seq_no(false); - } elseif (count($messages)) { + } elseif ($count) { $message = $messages[0]; $message_data = $message['body']; $message_data_length = $message['bytes']; diff --git a/src/danog/MadelineProto/Loop/Impl/Loop.php b/src/danog/MadelineProto/Loop/Impl/Loop.php index 1f196b25..31026535 100644 --- a/src/danog/MadelineProto/Loop/Impl/Loop.php +++ b/src/danog/MadelineProto/Loop/Impl/Loop.php @@ -45,7 +45,7 @@ abstract class Loop implements LoopInterface public function start() { if ($this->count) { - $this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR); + //$this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR); return false; } diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index 8cbf58e0..89b5ec1d 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -81,6 +81,7 @@ class FeedLoop extends ResumableSignalLoop foreach ($parsedUpdates as $update) { yield $API->save_update_async($update); } + $parsedUpdates = null; $this->API->signalUpdate(); } } diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index 68cf4497..89b90236 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -47,7 +47,7 @@ class UpdateLoop extends ResumableSignalLoop $API = $this->API; $feeder = $this->feeder = $API->feeders[$this->channelId]; - while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + while (!$API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { if (yield $this->waitSignal($this->pause())) { $API->logger->logger("Exiting $this due to signal"); @@ -57,8 +57,9 @@ class UpdateLoop extends ResumableSignalLoop $this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId); $timeout = $API->settings['updates']['getdifference_interval']; + $first = true; while (true) { - while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + while (!$API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { if (yield $this->waitSignal($this->pause())) { $API->logger->logger("Exiting $this due to signal"); @@ -70,7 +71,7 @@ class UpdateLoop extends ResumableSignalLoop $this->toPts = null; while (true) { if ($this->channelId) { - $this->API->logger->logger('Resumed and fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $API->logger->logger('Resumed and fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); if ($state->pts() <= 1) { $limit = 10; } elseif ($API->authorization['user']['bot']) { @@ -81,10 +82,13 @@ class UpdateLoop extends ResumableSignalLoop $request_pts = $state->pts(); try { - $difference = yield $this->API->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $request_pts, 'limit' => $limit, 'force' => true], ['datacenter' => $this->API->datacenter->curdc]); + $difference = yield $API->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $request_pts, 'limit' => $limit, 'force' => true], ['datacenter' => $API->datacenter->curdc, 'postpone' => $first]); } catch (RPCErrorException $e) { if (in_array($e->rpc, ['CHANNEL_PRIVATE', 'CHAT_FORBIDDEN'])) { $feeder->signal(true); + unset($API->updaters[$this->channelId]); + unset($API->feeders[$this->channelId]); + $API->getChannelStates()->remove($this->channelId); $API->logger->logger("Channel private, exiting $this"); return true; @@ -92,6 +96,9 @@ class UpdateLoop extends ResumableSignalLoop } catch (Exception $e) { if (in_array($e->getMessage(), ['This peer is not present in the internal peer database'])) { $feeder->signal(true); + //$API->getChannelStates()->remove($this->channelId); + unset($API->updaters[$this->channelId]); + unset($API->feeders[$this->channelId]); $API->logger->logger("Channel private, exiting $this"); return true; @@ -103,7 +110,7 @@ class UpdateLoop extends ResumableSignalLoop $timeout = $difference['timeout']; } - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + $API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); switch ($difference['_']) { case 'updates.channelDifferenceEmpty': $state->update($difference); @@ -111,7 +118,7 @@ class UpdateLoop extends ResumableSignalLoop break 2; case 'updates.channelDifference': if ($request_pts >= $difference['pts'] && $request_pts > 1) { - $this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().', using '.($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE); + $API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().', using '.($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE); $difference['pts'] = $request_pts + 1; } $state->update($difference); @@ -137,10 +144,10 @@ class UpdateLoop extends ResumableSignalLoop throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); } } else { - $this->API->logger->logger('Resumed and fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $API->logger->logger('Resumed and fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $difference = yield $API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $API->settings['connection_settings']['default_dc']]); + $API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); switch ($difference['_']) { case 'updates.differenceEmpty': @@ -181,9 +188,10 @@ class UpdateLoop extends ResumableSignalLoop } } foreach ($result as $channelId => $boh) { - $this->API->feeders[$channelId]->resumeDefer(); + $API->feeders[$channelId]->resumeDefer(); } - $this->API->signalUpdate(); + $API->signalUpdate(); + $first = false; if (yield $this->waitSignal($this->pause($timeout))) { $API->logger->logger("Exiting $this due to signal"); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index e81424dc..ab1c6169 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -924,13 +924,14 @@ class MTProto extends AsyncConstruct implements TLCallback if (!isset($this->updaters[$channelId])) { $this->updaters[$channelId] = new UpdateLoop($this, $channelId); } - if ($this->feeders[$channelId]->start()) { + if ($this->feeders[$channelId]->start() && isset($this->feeders[$channelId])) { $this->feeders[$channelId]->resume(); } - if ($this->updaters[$channelId]->start()) { + if ($this->updaters[$channelId]->start() && isset($this->updaters[$channelId])) { $this->updaters[$channelId]->resume(); } } + foreach ($this->datacenter->sockets as $datacenter) { $datacenter->writer->resume(); } if ($this->seqUpdater->start()) { $this->seqUpdater->resume(); } diff --git a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php index 901ab015..b8261297 100644 --- a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php @@ -507,6 +507,7 @@ trait PeerHandler $res['type'] = $constructor['megagroup'] ? 'supergroup' : 'channel'; break; case 'channelForbidden': + throw new \danog\MadelineProto\Exception('This peer is not present in the internal peer database'); throw new \danog\MadelineProto\RPCErrorException('CHAT_FORBIDDEN'); default: throw new \danog\MadelineProto\Exception('Invalid constructor given '.var_export($constructor, true)); diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index ef064fc6..0b78c6c7 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -308,8 +308,9 @@ trait ResponseHandler if (isset($request['promise']) && is_object($request['promise'])) { Loop::defer(function () use (&$request, $data) { if (isset($request['promise'])) { - $request['promise']->fail($data); + $promise = $request['promise']; unset($request['promise']); + $promise->fail($data); } else { $this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-')); $this->logger->logger("Rejecting: $data"); @@ -572,8 +573,9 @@ trait ResponseHandler $response = yield $this->MTProto_to_botAPI_async($response); } if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug - $this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']->resolve($response); + $promise = $this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']; unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']); + $promise->resolve($response); } } )());