Optimized upload and download

This commit is contained in:
danogentili 2017-01-08 15:21:35 +03:00
parent 8f1b9fcf0f
commit 2012a3e1dd
5 changed files with 28 additions and 47 deletions

View File

@ -17,7 +17,7 @@ namespace danog\MadelineProto\MTProtoTools;
*/ */
trait CallHandler trait CallHandler
{ {
public function method_call($method, $args = [], $message_id = null) public function method_call($method, $args = [], $message_id = null, $heavy = false)
{ {
if (!is_array($args)) { if (!is_array($args)) {
throw new \danog\MadelineProto\Exception("Arguments aren't an array."); throw new \danog\MadelineProto\Exception("Arguments aren't an array.");
@ -38,10 +38,13 @@ trait CallHandler
$this->recv_message(); // This method receives data from the socket, and parses stuff $this->recv_message(); // This method receives data from the socket, and parses stuff
if (!isset($this->datacenter->outgoing_messages[$int_message_id]['response']) || !isset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content'])) { // Checks if I have received the response to the called method, if not continue looping if (!isset($this->datacenter->outgoing_messages[$int_message_id]['response']) || !isset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content'])) { // Checks if I have received the response to the called method, if not continue looping
if ($this->only_updates) $res_count--;
continue; continue;
} }
$server_answer = $this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content']; // continue was not called, so I got a response $server_answer = $this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content']; // continue was not called, so I got a response
if ($heavy) {
unset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]);
}
} catch (\danog\MadelineProto\Exception $e) { } catch (\danog\MadelineProto\Exception $e) {
\danog\MadelineProto\Logger::log('An error getting response of method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Retrying...'); \danog\MadelineProto\Logger::log('An error getting response of method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Retrying...');
continue; continue;
@ -105,6 +108,10 @@ trait CallHandler
$this->datacenter->close_and_reopen(); $this->datacenter->close_and_reopen();
sleep(1); // To avoid flooding sleep(1); // To avoid flooding
continue; continue;
} finally {
if ($heavy) {
unset($this->datacenter->outgoing_messages[$int_message_id]);
}
} }
if ($server_answer == null) { if ($server_answer == null) {
throw new \danog\MadelineProto\Exception('An error occurred while calling method '.$method.'.'); throw new \danog\MadelineProto\Exception('An error occurred while calling method '.$method.'.');

View File

@ -70,6 +70,7 @@ trait ResponseHandler
public function handle_messages() public function handle_messages()
{ {
foreach ($this->datacenter->new_incoming as $current_msg_id) { foreach ($this->datacenter->new_incoming as $current_msg_id) {
$this->only_updates = false;
$response = $this->datacenter->incoming_messages[$current_msg_id]['content']; $response = $this->datacenter->incoming_messages[$current_msg_id]['content'];
\danog\MadelineProto\Logger::log('Received '.$response['_'].'.'); \danog\MadelineProto\Logger::log('Received '.$response['_'].'.');
@ -244,6 +245,7 @@ trait ResponseHandler
case 'Updates': case 'Updates':
unset($this->datacenter->new_incoming[$current_msg_id]); unset($this->datacenter->new_incoming[$current_msg_id]);
$this->handle_updates($response); $this->handle_updates($response);
$this->only_updates = true;
break; break;
default: default:
\danog\MadelineProto\Logger::log('Trying to assign a response of type '.$response_type.' to its request...'); \danog\MadelineProto\Logger::log('Trying to assign a response of type '.$response_type.' to its request...');

View File

@ -182,9 +182,8 @@ trait UpdateHandler
case 'updateNewChannelMessage': case 'updateNewChannelMessage':
case 'updateEditChannelMessage': case 'updateEditChannelMessage':
if ($update['message']['_'] == 'messageEmpty') { if ($update['message']['_'] == 'messageEmpty') {
\danog\MadelineProto\Logger::log('Got message empty, saving...'); \danog\MadelineProto\Logger::log('Got message empty, not saving');
return false;
return $this->save_update($update);
} }
$channel_id = $update['message']['to_id']['channel_id']; $channel_id = $update['message']['to_id']['channel_id'];
break; break;
@ -206,13 +205,13 @@ trait UpdateHandler
} else { } else {
$cur_state = &$this->get_channel_state($channel_id, (isset($update['pts']) ? $update['pts'] : 0) - (isset($update['pts_count']) ? $update['pts_count'] : 0)); $cur_state = &$this->get_channel_state($channel_id, (isset($update['pts']) ? $update['pts'] : 0) - (isset($update['pts_count']) ? $update['pts_count'] : 0));
} }
/*
if ($cur_state['sync_loading']) { if ($cur_state['sync_loading']) {
\danog\MadelineProto\Logger::log('Sync loading, not handling update'); \danog\MadelineProto\Logger::log('Sync loading, not handling update');
return false; return false;
} }
*/
switch ($update['_']) { switch ($update['_']) {
case 'updateChannelTooLong': case 'updateChannelTooLong':
$this->get_channel_difference($channel_id); $this->get_channel_difference($channel_id);
@ -222,12 +221,11 @@ trait UpdateHandler
case 'updateEditMessage': case 'updateEditMessage':
case 'updateNewChannelMessage': case 'updateNewChannelMessage':
case 'updateEditChannelMessage': case 'updateEditChannelMessage':
$message = &$update['message']; if ((isset($update['message']['from_id']) && !$this->peer_isset($update['message']['from_id'])) ||
if ((isset($message['from_id']) && !$this->peer_isset($message['from_id'])) || !$this->peer_isset($update['message']['to_id']) ||
!$this->peer_isset($message['to_id']) || (isset($update['message']['via_bot_id']) && !$this->peer_isset($update['message']['via_bot_id'])) ||
(isset($message['via_bot_id']) && !$this->peer_isset($message['via_bot_id'])) || (isset($update['message']['entities']) && !$this->entities_peer_isset($update['message']['entities'])) ||
(isset($message['entities']) && !$this->entities_peer_isset($message['entities'])) || (isset($update['message']['fwd_from']) && !$this->fwd_peer_isset($update['message']['fwd_from']))) {
(isset($message['fwd_from']) && !$this->fwd_peer_isset($message['fwd_from']))) {
\danog\MadelineProto\Logger::log('Not enough data for message update, getting difference...'); \danog\MadelineProto\Logger::log('Not enough data for message update, getting difference...');
if ($channel_id !== false && $this->peer_isset('-100'.$channel_id)) { if ($channel_id !== false && $this->peer_isset('-100'.$channel_id)) {
@ -238,9 +236,6 @@ trait UpdateHandler
return false; return false;
} }
if (isset($message['from_id']) && $message['from_id'] == $this->datacenter->authorization['user']['id']) {
$message['out'] = true;
}
break; break;
default: default:
if ($channel_id !== false && !$this->peer_isset('channel#'.$channel_id)) { if ($channel_id !== false && !$this->peer_isset('channel#'.$channel_id)) {
@ -373,11 +368,11 @@ trait UpdateHandler
$this->handle_update($update, $options); $this->handle_update($update, $options);
continue 2; continue 2;
} }
$this->handle_update($update); $this->save_update($update);
} }
} else { } else {
foreach ($updates as $update) { foreach ($updates as $update) {
$this->handle_update($update); $this->save_update($update);
} }
} }
} }
@ -385,7 +380,7 @@ trait UpdateHandler
public function handle_update_messages($messages, $channel = false) public function handle_update_messages($messages, $channel = false)
{ {
foreach ($messages as $message) { foreach ($messages as $message) {
$this->handle_update(['_' => $channel == false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel == false ? $this->get_update_state()['pts'] : $this->get_channel_state($channel)['pts'], 'pts_count' => 0]); $this->save_update(['_' => $channel == false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => $channel == false ? $this->get_update_state()['pts'] : $this->get_channel_state($channel)['pts'], 'pts_count' => 0]);
} }
} }
@ -394,6 +389,9 @@ trait UpdateHandler
if (!$this->settings['updates']['handle_updates']) { if (!$this->settings['updates']['handle_updates']) {
return; return;
} }
if (isset($update['message']['from_id']) && $update['message']['from_id'] == $this->datacenter->authorization['user']['id']) {
$update['message']['out'] = true;
}
\danog\MadelineProto\Logger::log('Saving an update of type '.$update['_'].'...'); \danog\MadelineProto\Logger::log('Saving an update of type '.$update['_'].'...');
$this->settings['updates']['callback']($update); $this->settings['updates']['callback']($update);
} }

View File

@ -428,7 +428,7 @@ trait FilesHandler
$f = fopen($file, 'r'); $f = fopen($file, 'r');
fseek($f, 0); fseek($f, 0);
while (ftell($f) !== $file_size) { while (ftell($f) !== $file_size) {
if (!$this->API->method_call($method, ['file_id' => $file_id, 'file_part' => $part_num++, 'file_total_parts' => $part_total_num, 'bytes' => stream_get_contents($f, $part_size)])) { if (!$this->API->method_call($method, ['file_id' => $file_id, 'file_part' => $part_num++, 'file_total_parts' => $part_total_num, 'bytes' => stream_get_contents($f, $part_size)], null, true)) {
throw new \danog\MadelineProto\Exception('An error occurred while uploading file part '.$part_num); throw new \danog\MadelineProto\Exception('An error occurred while uploading file part '.$part_num);
} }
$cb(ftell($f) * 100 / $file_size); $cb(ftell($f) * 100 / $file_size);
@ -531,7 +531,7 @@ trait FilesHandler
$info = $this->get_download_info($message_media); $info = $this->get_download_info($message_media);
$offset = ftell($stream); $offset = ftell($stream);
$part_size = 512 * 1024; $part_size = 512 * 1024;
while (fwrite($stream, $this->API->method_call('upload.getFile', ['location' => $info['InputFileLocation'], 'offset' => $offset += $part_size, 'limit' => $part_size])['bytes']) != false) { while (fwrite($stream, $this->API->method_call('upload.getFile', ['location' => $info['InputFileLocation'], 'offset' => $offset += $part_size, 'limit' => $part_size], null, true)['bytes']) != false) {
$cb(ftell($stream) * 100 / $info['size']); $cb(ftell($stream) * 100 / $info['size']);
} }

View File

@ -111,29 +111,3 @@ foreach (['@pwrtelegramgroup', '@pwrtelegramgroupita'] as $peer) {
} }
sleep(5); sleep(5);
var_dump($MadelineProto->API->get_updates()); var_dump($MadelineProto->API->get_updates());
$MadelineProto = \danog\MadelineProto\Serialization::deserialize('session_old.madeline');
if (file_exists('number.php') && $MadelineProto === false) {
include_once 'number.php';
$settings['tl_schema'] = ['layer' => 46, 'src' => ['telegram' => __DIR__.'/src/danog/MadelineProto/TL_telegram_v46.tl']];
$MadelineProto = new \danog\MadelineProto\API($settings);
$checkedPhone = $MadelineProto->auth->checkPhone(// auth.checkPhone becomes auth->checkPhone
[
'phone_number' => $number,
]
);
\danog\MadelineProto\Logger::log($checkedPhone);
$sentCode = $MadelineProto->phone_login($number);
\danog\MadelineProto\Logger::log($sentCode);
echo 'Enter the code you received: ';
$code = fgets(STDIN, (isset($sentCode['type']['length']) ? $sentCode['type']['length'] : 5) + 1);
$authorization = $MadelineProto->complete_phone_login($code);
\danog\MadelineProto\Logger::log($authorization);
echo 'Serializing MadelineProto to session_old.madeline...'.PHP_EOL;
echo 'Wrote '.\danog\MadelineProto\Serialization::serialize('session_old.madeline', $MadelineProto).' bytes'.PHP_EOL;
}
echo 'Deserializing MadelineProto from session_old.madeline...'.PHP_EOL;
$MadelineProto = \danog\MadelineProto\Serialization::deserialize('session_old.madeline');