Various bugfixes

This commit is contained in:
Daniil Gentili 2019-06-07 00:48:25 +00:00
parent 7bed1c6493
commit 53b72c8a29
10 changed files with 60 additions and 36 deletions

2
docs

@ -1 +1 @@
Subproject commit 9754f506d36c41260096b384ed82677faf2ab5bf Subproject commit dc05dc5cebfcec90ac7851928c522a4d635dbab6

View File

@ -48,6 +48,11 @@ class API extends APIFactory
public function __construct_async($params, $settings, $deferred) public function __construct_async($params, $settings, $deferred)
{ {
if (is_string($params)) { if (is_string($params)) {
if (!\danog\MadelineProto\Logger::$default) {
if (!isset($settings['logger']['logger_param'])) $settings['logger']['logger_param'] = Magic::$script_cwd.'/MadelineProto.log';
if (!isset($settings['logger']['logger'])) $settings['logger']['logger'] = php_sapi_name() === 'cli' ? 3 : 2;
\danog\MadelineProto\Logger::constructor($settings['logger']['logger'], $settings['logger']['logger_param'], '', isset($settings['logger']['logger_level']) ? $settings['logger']['logger_level'] : Logger::VERBOSE, isset($settings['logger']['max_size']) ? $settings['logger']['max_size'] : 100 * 1024 * 1024);
}
$realpaths = Serialization::realpaths($params); $realpaths = Serialization::realpaths($params);
$this->session = $realpaths['file']; $this->session = $realpaths['file'];
@ -85,7 +90,6 @@ class API extends APIFactory
if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') { if (defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
throw $e; throw $e;
} }
class_exists('\\Volatile'); class_exists('\\Volatile');
$tounserialize = str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); $tounserialize = str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
foreach (['RSA', 'TL\\TLMethod', 'TL\\TLConstructor', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { foreach (['RSA', 'TL\\TLMethod', 'TL\\TLConstructor', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
@ -96,6 +100,9 @@ class API extends APIFactory
$tounserialize = str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize); $tounserialize = str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize);
} }
$unserialized = \danog\Serialization::unserialize($tounserialize); $unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\Throwable $e) {
Logger::log((string) $e, Logger::ERROR);
throw $e;
} }
if ($unserialized instanceof \danog\PlaceHolder) { if ($unserialized instanceof \danog\PlaceHolder) {
$unserialized = \danog\Serialization::unserialize($tounserialize); $unserialized = \danog\Serialization::unserialize($tounserialize);

View File

@ -160,6 +160,10 @@ class APIFactory extends AsyncConstruct
if (Magic::is_fork() && !Magic::$processed_fork) { if (Magic::is_fork() && !Magic::$processed_fork) {
throw new Exception('Forking not supported, use async logic, instead: https://docs.madelineproto.xyz/docs/ASYNC.html'); throw new Exception('Forking not supported, use async logic, instead: https://docs.madelineproto.xyz/docs/ASYNC.html');
} }
if ($this->API->asyncInitPromise) {
yield $this->API->initAsync();
$this->API->logger->logger('Finished init asynchronously');
}
if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) { if (isset($this->session) && !is_null($this->session) && time() - $this->serialized > $this->API->settings['serialization']['serialization_interval']) {
Logger::log("Didn't serialize in a while, doing that now..."); Logger::log("Didn't serialize in a while, doing that now...");
$this->serialize($this->session); $this->serialize($this->session);

View File

@ -53,7 +53,8 @@ class WriteLoop extends ResumableSignalLoop
$please_wait = false; $please_wait = false;
while (true) { while (true) {
if (empty($connection->pending_outgoing) || $please_wait) { while (empty($connection->pending_outgoing) || $please_wait) {
$please_wait = false;
$API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE); $API->logger->logger("Waiting in $this", Logger::ULTRA_VERBOSE);
if (yield $this->waitSignal($this->pause())) { if (yield $this->waitSignal($this->pause())) {
return; return;
@ -94,12 +95,10 @@ class WriteLoop extends ResumableSignalLoop
} }
$skipped_all = false; $skipped_all = false;
$body = $message['serialized_body'];
$API->logger->logger("Sending {$message['_']} as unencrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger("Sending {$message['_']} as unencrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id(); $message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id();
$length = strlen($body); $length = strlen($message['serialized_body']);
$pad_length = -$length & 15; $pad_length = -$length & 15;
$pad_length += 16 * $this->random_int($modulus = 16); $pad_length += 16 * $this->random_int($modulus = 16);
@ -107,7 +106,7 @@ class WriteLoop extends ResumableSignalLoop
$pad = $this->random($pad_length); $pad = $this->random($pad_length);
$buffer = yield $connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length); $buffer = yield $connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length);
yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$body.$pad); yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$message['serialized_body'].$pad);
//var_dump("plain ".bin2hex($message_id)); //var_dump("plain ".bin2hex($message_id));
$connection->http_req_count++; $connection->http_req_count++;
@ -184,14 +183,18 @@ class WriteLoop extends ResumableSignalLoop
$skipped = true; $skipped = true;
continue; continue;
} }
$body_length = strlen($message['serialized_body']);
$body = $message['serialized_body']; $actual_length = $body_length + 32;
if ($total_length && $total_length + $actual_length > 32760 || $count >= 1020) {
$API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE);
break;
}
$message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id($datacenter); $message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id($datacenter);
$API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $body, 'seqno' => $connection->generate_out_seq_no($message['content_related'])]; $MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generate_out_seq_no($message['content_related'])];
if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') { if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') {
if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') { if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') {
@ -241,25 +244,22 @@ class WriteLoop extends ResumableSignalLoop
} }
} }
$body_length = strlen($MTmessage['body']); $body_length = strlen($MTmessage['body']);
if ($total_length && $total_length + $body_length + 32 > 655360) { $actual_length = $body_length + 32;
if ($total_length && $total_length + $actual_length > 32760) {
$API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE);
break; break;
} }
$count++; $count++;
$total_length += $body_length + 32; $total_length += $actual_length;
$MTmessage['bytes'] = $body_length; $MTmessage['bytes'] = $body_length;
$messages[] = $MTmessage; $messages[] = $MTmessage;
$keys[$k] = $message_id; $keys[$k] = $message_id;
if ($total_length && $total_length + 32 > 655360) {
$API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE);
break;
}
} }
$MTmessage = null;
if (count($messages) > 1) { if ($count > 1) {
$API->logger->logger("Wrapping in msg_container as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger("Wrapping in msg_container ($count messages of total size $total_length) as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_id = $connection->generate_message_id($datacenter); $message_id = $connection->generate_message_id($datacenter);
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => array_values($keys), 'content_related' => false, 'method' => false, 'unencrypted' => false]; $connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => array_values($keys), 'content_related' => false, 'method' => false, 'unencrypted' => false];
@ -272,7 +272,7 @@ class WriteLoop extends ResumableSignalLoop
$message_data_length = strlen($message_data); $message_data_length = strlen($message_data);
$seq_no = $connection->generate_out_seq_no(false); $seq_no = $connection->generate_out_seq_no(false);
} elseif (count($messages)) { } elseif ($count) {
$message = $messages[0]; $message = $messages[0];
$message_data = $message['body']; $message_data = $message['body'];
$message_data_length = $message['bytes']; $message_data_length = $message['bytes'];

View File

@ -45,7 +45,7 @@ abstract class Loop implements LoopInterface
public function start() public function start()
{ {
if ($this->count) { if ($this->count) {
$this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR); //$this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR);
return false; return false;
} }

View File

@ -81,6 +81,7 @@ class FeedLoop extends ResumableSignalLoop
foreach ($parsedUpdates as $update) { foreach ($parsedUpdates as $update) {
yield $API->save_update_async($update); yield $API->save_update_async($update);
} }
$parsedUpdates = null;
$this->API->signalUpdate(); $this->API->signalUpdate();
} }
} }

View File

@ -47,7 +47,7 @@ class UpdateLoop extends ResumableSignalLoop
$API = $this->API; $API = $this->API;
$feeder = $this->feeder = $API->feeders[$this->channelId]; $feeder = $this->feeder = $API->feeders[$this->channelId];
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { while (!$API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) { if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this due to signal"); $API->logger->logger("Exiting $this due to signal");
@ -57,8 +57,9 @@ class UpdateLoop extends ResumableSignalLoop
$this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId); $this->state = $state = $this->channelId === false ? (yield $API->load_update_state_async()) : $API->loadChannelState($this->channelId);
$timeout = $API->settings['updates']['getdifference_interval']; $timeout = $API->settings['updates']['getdifference_interval'];
$first = true;
while (true) { while (true) {
while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { while (!$API->settings['updates']['handle_updates'] || !$this->has_all_auth()) {
if (yield $this->waitSignal($this->pause())) { if (yield $this->waitSignal($this->pause())) {
$API->logger->logger("Exiting $this due to signal"); $API->logger->logger("Exiting $this due to signal");
@ -70,7 +71,7 @@ class UpdateLoop extends ResumableSignalLoop
$this->toPts = null; $this->toPts = null;
while (true) { while (true) {
if ($this->channelId) { if ($this->channelId) {
$this->API->logger->logger('Resumed and fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger('Resumed and fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($state->pts() <= 1) { if ($state->pts() <= 1) {
$limit = 10; $limit = 10;
} elseif ($API->authorization['user']['bot']) { } elseif ($API->authorization['user']['bot']) {
@ -81,10 +82,13 @@ class UpdateLoop extends ResumableSignalLoop
$request_pts = $state->pts(); $request_pts = $state->pts();
try { try {
$difference = yield $this->API->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $request_pts, 'limit' => $limit, 'force' => true], ['datacenter' => $this->API->datacenter->curdc]); $difference = yield $API->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $request_pts, 'limit' => $limit, 'force' => true], ['datacenter' => $API->datacenter->curdc, 'postpone' => $first]);
} catch (RPCErrorException $e) { } catch (RPCErrorException $e) {
if (in_array($e->rpc, ['CHANNEL_PRIVATE', 'CHAT_FORBIDDEN'])) { if (in_array($e->rpc, ['CHANNEL_PRIVATE', 'CHAT_FORBIDDEN'])) {
$feeder->signal(true); $feeder->signal(true);
unset($API->updaters[$this->channelId]);
unset($API->feeders[$this->channelId]);
$API->getChannelStates()->remove($this->channelId);
$API->logger->logger("Channel private, exiting $this"); $API->logger->logger("Channel private, exiting $this");
return true; return true;
@ -92,6 +96,9 @@ class UpdateLoop extends ResumableSignalLoop
} catch (Exception $e) { } catch (Exception $e) {
if (in_array($e->getMessage(), ['This peer is not present in the internal peer database'])) { if (in_array($e->getMessage(), ['This peer is not present in the internal peer database'])) {
$feeder->signal(true); $feeder->signal(true);
//$API->getChannelStates()->remove($this->channelId);
unset($API->updaters[$this->channelId]);
unset($API->feeders[$this->channelId]);
$API->logger->logger("Channel private, exiting $this"); $API->logger->logger("Channel private, exiting $this");
return true; return true;
@ -103,7 +110,7 @@ class UpdateLoop extends ResumableSignalLoop
$timeout = $difference['timeout']; $timeout = $difference['timeout'];
} }
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); $API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
switch ($difference['_']) { switch ($difference['_']) {
case 'updates.channelDifferenceEmpty': case 'updates.channelDifferenceEmpty':
$state->update($difference); $state->update($difference);
@ -111,7 +118,7 @@ class UpdateLoop extends ResumableSignalLoop
break 2; break 2;
case 'updates.channelDifference': case 'updates.channelDifference':
if ($request_pts >= $difference['pts'] && $request_pts > 1) { if ($request_pts >= $difference['pts'] && $request_pts > 1) {
$this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().', using '.($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE); $API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().', using '.($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE);
$difference['pts'] = $request_pts + 1; $difference['pts'] = $request_pts + 1;
} }
$state->update($difference); $state->update($difference);
@ -137,10 +144,10 @@ class UpdateLoop extends ResumableSignalLoop
throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true));
} }
} else { } else {
$this->API->logger->logger('Resumed and fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger('Resumed and fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); $difference = yield $API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $API->settings['connection_settings']['default_dc']]);
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
switch ($difference['_']) { switch ($difference['_']) {
case 'updates.differenceEmpty': case 'updates.differenceEmpty':
@ -181,9 +188,10 @@ class UpdateLoop extends ResumableSignalLoop
} }
} }
foreach ($result as $channelId => $boh) { foreach ($result as $channelId => $boh) {
$this->API->feeders[$channelId]->resumeDefer(); $API->feeders[$channelId]->resumeDefer();
} }
$this->API->signalUpdate(); $API->signalUpdate();
$first = false;
if (yield $this->waitSignal($this->pause($timeout))) { if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this due to signal"); $API->logger->logger("Exiting $this due to signal");

View File

@ -924,13 +924,14 @@ class MTProto extends AsyncConstruct implements TLCallback
if (!isset($this->updaters[$channelId])) { if (!isset($this->updaters[$channelId])) {
$this->updaters[$channelId] = new UpdateLoop($this, $channelId); $this->updaters[$channelId] = new UpdateLoop($this, $channelId);
} }
if ($this->feeders[$channelId]->start()) { if ($this->feeders[$channelId]->start() && isset($this->feeders[$channelId])) {
$this->feeders[$channelId]->resume(); $this->feeders[$channelId]->resume();
} }
if ($this->updaters[$channelId]->start()) { if ($this->updaters[$channelId]->start() && isset($this->updaters[$channelId])) {
$this->updaters[$channelId]->resume(); $this->updaters[$channelId]->resume();
} }
} }
foreach ($this->datacenter->sockets as $datacenter) { $datacenter->writer->resume(); }
if ($this->seqUpdater->start()) { if ($this->seqUpdater->start()) {
$this->seqUpdater->resume(); $this->seqUpdater->resume();
} }

View File

@ -507,6 +507,7 @@ trait PeerHandler
$res['type'] = $constructor['megagroup'] ? 'supergroup' : 'channel'; $res['type'] = $constructor['megagroup'] ? 'supergroup' : 'channel';
break; break;
case 'channelForbidden': case 'channelForbidden':
throw new \danog\MadelineProto\Exception('This peer is not present in the internal peer database');
throw new \danog\MadelineProto\RPCErrorException('CHAT_FORBIDDEN'); throw new \danog\MadelineProto\RPCErrorException('CHAT_FORBIDDEN');
default: default:
throw new \danog\MadelineProto\Exception('Invalid constructor given '.var_export($constructor, true)); throw new \danog\MadelineProto\Exception('Invalid constructor given '.var_export($constructor, true));

View File

@ -308,8 +308,9 @@ trait ResponseHandler
if (isset($request['promise']) && is_object($request['promise'])) { if (isset($request['promise']) && is_object($request['promise'])) {
Loop::defer(function () use (&$request, $data) { Loop::defer(function () use (&$request, $data) {
if (isset($request['promise'])) { if (isset($request['promise'])) {
$request['promise']->fail($data); $promise = $request['promise'];
unset($request['promise']); unset($request['promise']);
$promise->fail($data);
} else { } else {
$this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-')); $this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-'));
$this->logger->logger("Rejecting: $data"); $this->logger->logger("Rejecting: $data");
@ -572,8 +573,9 @@ trait ResponseHandler
$response = yield $this->MTProto_to_botAPI_async($response); $response = yield $this->MTProto_to_botAPI_async($response);
} }
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug
$this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']->resolve($response); $promise = $this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'];
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']); unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']);
$promise->resolve($response);
} }
} }
)()); )());