Async file upload with sendMedia & callback fixes

This commit is contained in:
Daniil Gentili 2018-12-27 18:17:18 +01:00
parent e5582beb87
commit 79f7d92767
4 changed files with 31 additions and 26 deletions

View File

@ -190,9 +190,9 @@ class Connection
} }
if ($message['method']) { if ($message['method']) {
$body = $this->API->serialize_method($message['_'], $body); $body = yield $this->API->serialize_method_async($message['_'], $body);
} else { } else {
$body = $this->API->serialize_object(['type' => $message['_']], $body, $message['_']); $body = yield $this->API->serialize_object_async(['type' => $message['_']], $body, $message['_']);
} }
if ($refresh_next) { if ($refresh_next) {
$this->API->referenceDatabase->refreshNext(false); $this->API->referenceDatabase->refreshNext(false);

View File

@ -187,11 +187,11 @@ class WriteLoop extends ResumableSignalLoop
if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') { if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') {
if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') { if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') {
$API->logger->logger(sprintf(\danog\MadelineProto\Lang::$current_lang['write_client_info'], $message['_']), \danog\MadelineProto\Logger::NOTICE); $API->logger->logger(sprintf(\danog\MadelineProto\Lang::$current_lang['write_client_info'], $message['_']), \danog\MadelineProto\Logger::NOTICE);
$MTmessage['body'] = $API->serialize_method( $MTmessage['body'] = yield $API->serialize_method_async(
'invokeWithLayer', 'invokeWithLayer',
[ [
'layer' => $API->settings['tl_schema']['layer'], 'layer' => $API->settings['tl_schema']['layer'],
'query' => $API->serialize_method( 'query' => yield $API->serialize_method_async(
'initConnection', 'initConnection',
[ [
'api_id' => $API->settings['app_info']['api_id'], 'api_id' => $API->settings['app_info']['api_id'],
@ -212,7 +212,7 @@ class WriteLoop extends ResumableSignalLoop
if (!isset($connection->call_queue[$message['queue']])) { if (!isset($connection->call_queue[$message['queue']])) {
$connection->call_queue[$message['queue']] = []; $connection->call_queue[$message['queue']] = [];
} }
$MTmessage['body'] = $API->serialize_method('invokeAfterMsgs', ['msg_ids' => $connection->call_queue[$message['queue']], 'query' => $MTmessage['body']]); $MTmessage['body'] = yield $API->serialize_method_async('invokeAfterMsgs', ['msg_ids' => $connection->call_queue[$message['queue']], 'query' => $MTmessage['body']]);
$connection->call_queue[$message['queue']][$message_id] = $message_id; $connection->call_queue[$message['queue']][$message_id] = $message_id;
if (count($connection->call_queue[$message['queue']]) > $API->settings['msg_array_limit']['call_queue']) { if (count($connection->call_queue[$message['queue']]) > $API->settings['msg_array_limit']['call_queue']) {
@ -224,7 +224,7 @@ class WriteLoop extends ResumableSignalLoop
/* if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) { /* if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) {
if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) { if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) {
$MTmessage['body'] = $API->serialize_object(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data'); $MTmessage['body'] = yield $API->serialize_object_async(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data');
$API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} }
unset($gzipped); unset($gzipped);
@ -259,7 +259,7 @@ class WriteLoop extends ResumableSignalLoop
$keys[$connection->pending_outgoing_key++] = $message_id; $keys[$connection->pending_outgoing_key++] = $message_id;
$connection->pending_outgoing_key %= Connection::PENDING_MAX; $connection->pending_outgoing_key %= Connection::PENDING_MAX;
$message_data = $API->serialize_object(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'); $message_data = yield $API->serialize_object_async(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container');
$message_data_length = strlen($message_data); $message_data_length = strlen($message_data);
$seq_no = $connection->generate_out_seq_no(false); $seq_no = $connection->generate_out_seq_no(false);

View File

@ -110,6 +110,7 @@ trait Files
), ),
['heavy' => true, 'file' => true, 'datacenter' => $datacenter] ['heavy' => true, 'file' => true, 'datacenter' => $datacenter]
); );
$cb(ftell($f) * 100 / $file_size);
$this->logger->logger('Speed for chunk: '.(($part_size * 8 / 1000000) / (microtime(true) - $t))); $this->logger->logger('Speed for chunk: '.(($part_size * 8 / 1000000) / (microtime(true) - $t)));
$part_num++; $part_num++;
$promises[] = $read_deferred->promise(); $promises[] = $read_deferred->promise();
@ -132,19 +133,14 @@ trait Files
fclose($f); fclose($f);
clearstatcache(); clearstatcache();
$this->logger->logger('Speed: '.((filesize($file) * 8) / (microtime(true) - $t) / 1000000)); $this->logger->logger('Speed: '.(($file_size * 8) / (microtime(true) - $t) / 1000000));
return $constructor; return $constructor;
} }
public function upload($file, $file_name = '', $cb = null, $encrypted = false, $datacenter = null) public function upload_encrypted_async($file, $file_name = '', $cb = null)
{ {
return $this->wait(call([$this, 'upload_async'], $file, $file_name, $cb, $encrypted, $datacenter)); return $this->upload_async($file, $file_name, $cb, true);
}
public function upload_encrypted($file, $file_name = '', $cb = null)
{
return $this->upload($file, $file_name, $cb, true);
} }
public function gen_all_file($media, $regenerate) public function gen_all_file($media, $regenerate)

View File

@ -256,7 +256,7 @@ trait TL
return $tl_elem['predicate'] === 'boolTrue'; return $tl_elem['predicate'] === 'boolTrue';
} }
public function serialize_object($type, $object, $ctx, $layer = -1) public function serialize_object_async($type, $object, $ctx, $layer = -1)
{ {
switch ($type['type']) { switch ($type['type']) {
case 'int': case 'int':
@ -366,7 +366,7 @@ trait TL
$concat = $this->constructors->find_by_predicate('vector')['id']; $concat = $this->constructors->find_by_predicate('vector')['id'];
$concat .= $this->pack_unsigned_int(count($object)); $concat .= $this->pack_unsigned_int(count($object));
foreach ($object as $k => $current_object) { foreach ($object as $k => $current_object) {
$concat .= $this->serialize_object(['type' => $type['subtype']], $current_object, $k); $concat .= yield $this->serialize_object_async(['type' => $type['subtype']], $current_object, $k);
} }
return $concat; return $concat;
@ -376,7 +376,7 @@ trait TL
} }
$concat = $this->pack_unsigned_int(count($object)); $concat = $this->pack_unsigned_int(count($object));
foreach ($object as $k => $current_object) { foreach ($object as $k => $current_object) {
$concat .= $this->serialize_object(['type' => $type['subtype']], $current_object, $k); $concat .= yield $this->serialize_object_async(['type' => $type['subtype']], $current_object, $k);
} }
return $concat; return $concat;
@ -430,10 +430,19 @@ trait TL
$concat = $constructorData['id']; $concat = $constructorData['id'];
} }
return $concat.$this->serialize_params($constructorData, $object, '', $layer); return $concat.yield $this->serialize_params($constructorData, $object, '', $layer);
} }
public function serialize_object($type, $object, $ctx, $layer = -1)
{
return $this->wait($this->serialize_object_async($type, $object, $ctx, $layer));
}
public function serialize_method($method, $arguments) public function serialize_method($method, $arguments)
{
return $this->wait($this->serialize_method_async($method, $arguments));
}
public function serialize_method_async($method, $arguments)
{ {
if ($method === 'messages.importChatInvite' && isset($arguments['hash']) && is_string($arguments['hash']) && preg_match('@(?:t|telegram)\.(?:me|dog)/(joinchat/)?([a-z0-9_-]*)@i', $arguments['hash'], $matches)) { if ($method === 'messages.importChatInvite' && isset($arguments['hash']) && is_string($arguments['hash']) && preg_match('@(?:t|telegram)\.(?:me|dog)/(joinchat/)?([a-z0-9_-]*)@i', $arguments['hash'], $matches)) {
if ($matches[1] === '') { if ($matches[1] === '') {
@ -464,7 +473,7 @@ trait TL
} elseif ($method === 'messages.sendEncryptedFile') { } elseif ($method === 'messages.sendEncryptedFile') {
if (isset($arguments['file'])) { if (isset($arguments['file'])) {
if (!is_array($arguments['file']) && $this->settings['upload']['allow_automatic_upload']) { if (!is_array($arguments['file']) && $this->settings['upload']['allow_automatic_upload']) {
$arguments['file'] = $this->upload_encrypted($arguments['file']); $arguments['file'] = yield $this->upload_encrypted_async($arguments['file']);
} }
if (isset($arguments['file']['key'])) { if (isset($arguments['file']['key'])) {
$arguments['message']['media']['key'] = $arguments['file']['key']; $arguments['message']['media']['key'] = $arguments['file']['key'];
@ -504,7 +513,7 @@ trait TL
throw new Exception(\danog\MadelineProto\Lang::$current_lang['method_not_found'].$method); throw new Exception(\danog\MadelineProto\Lang::$current_lang['method_not_found'].$method);
} }
return $tl['id'].$this->serialize_params($tl, $arguments, $method); return $tl['id'].yield $this->serialize_params($tl, $arguments, $method);
} }
public function serialize_params($tl, $arguments, $ctx, $layer = -1) public function serialize_params($tl, $arguments, $ctx, $layer = -1)
@ -540,11 +549,11 @@ trait TL
continue; continue;
} }
if ($current_argument['name'] === 'random_bytes') { if ($current_argument['name'] === 'random_bytes') {
$serialized .= $this->serialize_object(['type' => 'bytes'], $this->random(15 + 4 * $this->random_int($modulus = 3)), 'random_bytes'); $serialized .= yield $this->serialize_object_async(['type' => 'bytes'], $this->random(15 + 4 * $this->random_int($modulus = 3)), 'random_bytes');
continue; continue;
} }
if ($current_argument['name'] === 'data' && isset($tl['method']) && in_array($tl['method'], ['messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService']) && isset($arguments['message'])) { if ($current_argument['name'] === 'data' && isset($tl['method']) && in_array($tl['method'], ['messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService']) && isset($arguments['message'])) {
$serialized .= $this->serialize_object($current_argument, $this->encrypt_secret_message($arguments['peer']['chat_id'], $arguments['message']), 'data'); $serialized .= yield $this->serialize_object_async($current_argument, $this->encrypt_secret_message($arguments['peer']['chat_id'], $arguments['message']), 'data');
continue; continue;
} }
if ($current_argument['name'] === 'random_id') { if ($current_argument['name'] === 'random_id') {
@ -569,7 +578,7 @@ trait TL
continue; continue;
} }
if ($tl['type'] === 'InputMedia' && $current_argument['name'] === 'mime_type') { if ($tl['type'] === 'InputMedia' && $current_argument['name'] === 'mime_type') {
$serialized .= $this->serialize_object($current_argument, $arguments['file']['mime_type'], $current_argument['name'], $layer); $serialized .= yield $this->serialize_object_async($current_argument, $arguments['file']['mime_type'], $current_argument['name'], $layer);
continue; continue;
} }
if ($tl['type'] === 'DocumentAttribute' && in_array($current_argument['name'], ['w', 'h', 'duration'])) { if ($tl['type'] === 'DocumentAttribute' && in_array($current_argument['name'], ['w', 'h', 'duration'])) {
@ -603,7 +612,7 @@ trait TL
} }
if (!is_array($arguments[$current_argument['name']]) && $current_argument['type'] === 'InputFile' && $this->settings['upload']['allow_automatic_upload']) { if (!is_array($arguments[$current_argument['name']]) && $current_argument['type'] === 'InputFile' && $this->settings['upload']['allow_automatic_upload']) {
$arguments[$current_argument['name']] = $this->upload($arguments[$current_argument['name']]); $arguments[$current_argument['name']] = yield $this->upload_async($arguments[$current_argument['name']]);
} }
if ($current_argument['type'] === 'InputEncryptedChat' && (!is_array($arguments[$current_argument['name']]) || isset($arguments[$current_argument['name']]['_']) && $this->constructors->find_by_predicate($arguments[$current_argument['name']]['_'])['type'] !== $current_argument['type'])) { if ($current_argument['type'] === 'InputEncryptedChat' && (!is_array($arguments[$current_argument['name']]) || isset($arguments[$current_argument['name']]['_']) && $this->constructors->find_by_predicate($arguments[$current_argument['name']]['_'])['type'] !== $current_argument['type'])) {
@ -617,7 +626,7 @@ trait TL
} }
} }
//$this->logger->logger('Serializing '.$current_argument['name'].' of type '.$current_argument['type'); //$this->logger->logger('Serializing '.$current_argument['name'].' of type '.$current_argument['type');
$serialized .= $this->serialize_object($current_argument, $arguments[$current_argument['name']], $current_argument['name'], $layer); $serialized .= yield $this->serialize_object_async($current_argument, $arguments[$current_argument['name']], $current_argument['name'], $layer);
} }
return $serialized; return $serialized;