Better logging, better ack management

This commit is contained in:
Daniil Gentili 2018-03-01 13:22:18 +01:00
parent 01c02d5c12
commit 6acacdaa00
3 changed files with 29 additions and 12 deletions

View File

@ -83,7 +83,7 @@ trait CallHandler
}
if ($this->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($serialized)) > $this->settings['requests']['gzip_encode_if_gt'] && ($g = strlen($gzipped = gzencode($serialized))) < $l) {
$serialized = $this->serialize_object(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data');
\danog\MadelineProto\Logger::log(['Using GZIP compression for '.$method.', saved '.($l - $g).' bytes of data, reduced call size by '.$g * 100 / $l.'%'], \danog\MadelineProto\Logger::VERBOSE);
\danog\MadelineProto\Logger::log(['Using GZIP compression for '.$method.', saved '.($l - $g).' bytes of data, reduced call size by '.$g * 100 / $l.'%'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
}
$last_recv = $this->last_recv;
for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) {
@ -94,16 +94,22 @@ trait CallHandler
try {
\danog\MadelineProto\Logger::log(['Calling method (try number '.$count.' for '.$method.')...'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key !== null) {
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['body' => $serialized, 'content_related' => $content_related, 'msg_id' => $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter'])];
if (isset($message_id)) {
\danog\MadelineProto\Logger::log(['Clearing old method call'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if (isset($this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$message_id])) unset($this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$message_id]);
if (isset($this->datacenter->sockets[$aargs['datacenter']]->new_outgoing[$message_id])) unset($this->datacenter->sockets[$aargs['datacenter']]->new_outgoing[$message_id]);
}
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => $method, 'body' => $serialized, 'content_related' => $content_related, 'msg_id' => $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter'])];
if (count($this->datacenter->sockets[$aargs['datacenter']]->ack_queue)) {
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['body' => $this->serialize_object(['type' => 'msgs_ack'], ['msg_ids' => $this->datacenter->sockets[$aargs['datacenter']]->ack_queue], 'msgs_ack'), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => 'msgs_ack', 'body' => $this->serialize_object(['type' => 'msgs_ack'], ['msg_ids' => $this->datacenter->sockets[$aargs['datacenter']]->ack_queue], 'msgs_ack'), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
}
if ($this->is_http($aargs['datacenter']) && $method !== 'http_wait') {
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['body' => $this->serialize_method('http_wait', ['max_wait' => 500, 'wait_after' => 150, 'max_delay' => 500]), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => 'http_wait', 'body' => $this->serialize_method('http_wait', ['max_wait' => 500, 'wait_after' => 150, 'max_delay' => 500]), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
}
$this->send_messages($aargs['datacenter']);
} else {
$this->send_unencrypted_message($serialized, $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter']), $aargs['datacenter']);
$this->send_unencrypted_message($method, $serialized, $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter']), $aargs['datacenter']);
$aargs['message_id'] = $message_id;
}
@ -273,6 +279,6 @@ trait CallHandler
throw new \danog\MadelineProto\Exception('No datacenter provided');
}
$serialized = $this->serialize_object(['type' => $object], $args, $object);
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['body' => $serialized, 'content_related' => $this->content_related($object), 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => $object, 'body' => $serialized, 'content_related' => $this->content_related($object), 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
}
}

View File

@ -18,8 +18,9 @@ namespace danog\MadelineProto\MTProtoTools;
*/
trait MessageHandler
{
public function send_unencrypted_message($message_data, $message_id, $datacenter)
public function send_unencrypted_message($type, $message_data, $message_id, $datacenter)
{
\danog\MadelineProto\Logger::log(["Sending $type as unencrypted message"], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_data = "\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int(strlen($message_data)).$message_data;
$this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id] = ['response' => -1];
$this->datacenter->sockets[$datacenter]->send_message($message_data);
@ -27,11 +28,17 @@ trait MessageHandler
public function send_messages($datacenter)
{
$has_ack = false;
if (count($this->datacenter->sockets[$datacenter]->object_queue) > 1) {
$messages = [];
\danog\MadelineProto\Logger::log(["Sending msg_container as encrypted message"], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
foreach ($this->datacenter->sockets[$datacenter]->object_queue as $message) {
$message['seqno'] = $this->generate_out_seq_no($datacenter, $message['content_related']);
$message['bytes'] = strlen($message['body']);
$has_ack = $has_ack || $message['_'] === 'msgs_ack';
\danog\MadelineProto\Logger::log(["Inside of msg_container, sending {$message['_']} as encrypted message"], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message['_'] = 'MTmessage';
$messages[] = $message;
$this->datacenter->sockets[$datacenter]->outgoing_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'response' => -1, 'content' => $this->deserialize($message['body'], ['type' => '', 'datacenter' => $datacenter])];
@ -41,6 +48,7 @@ trait MessageHandler
$seq_no = $this->generate_out_seq_no($datacenter, false);
} elseif (count($this->datacenter->sockets[$datacenter]->object_queue)) {
$message = array_shift($this->datacenter->sockets[$datacenter]->object_queue);
\danog\MadelineProto\Logger::log(["Sending {$message['_']} as encrypted message"], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_data = $message['body'];
$message_id = $message['msg_id'];
$seq_no = $this->generate_out_seq_no($datacenter, $message['content_related']);
@ -59,10 +67,13 @@ trait MessageHandler
$this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id] = ['seq_no' => $seq_no, 'response' => -1];
$this->datacenter->sockets[$datacenter]->send_message($message);
$this->datacenter->sockets[$datacenter]->object_queue = [];
foreach ($this->datacenter->sockets[$datacenter]->ack_queue as $msg_id) {
$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['ack'] = true;
if ($has_ack) {
foreach ($this->datacenter->sockets[$datacenter]->ack_queue as $msg_id) {
$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['ack'] = true;
}
$this->datacenter->sockets[$datacenter]->ack_queue = [];
}
$this->datacenter->sockets[$datacenter]->ack_queue = [];
}
/**

View File

@ -48,7 +48,7 @@ trait ResponseHandler
$only_updates = true;
foreach ($this->datacenter->sockets[$datacenter]->new_incoming as $current_msg_id) {
$unset = false;
\danog\MadelineProto\Logger::log(['Received '.$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'].'.'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
\danog\MadelineProto\Logger::log([(isset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'].'.'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
//\danog\MadelineProto\Logger::log([$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if (\danog\MadelineProto\Logger::$has_thread && is_object(\Thread::getCurrentThread())) {
if (!$this->synchronized(function ($zis, $datacenter, $current_msg_id) {
@ -148,7 +148,7 @@ trait ResponseHandler
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['messages'] as $message) {
$this->check_message_id($message['msg_id'], ['outgoing' => false, 'datacenter' => $datacenter, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body']];
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body'], 'from_container' => true];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id'];
$this->handle_messages($datacenter);
}