Improve retry management on unstable connections, peformance improvements

This commit is contained in:
Daniil Gentili 2018-03-22 02:07:44 +00:00
parent e8a349fa7b
commit eae850eedb
7 changed files with 188 additions and 176 deletions

View File

@ -34,7 +34,8 @@ class HttpProxy implements \danog\MadelineProto\Proxy
} }
public function setExtra(array $extra = []) { public function setExtra(array $extra = []) {
$this->extra = $extra; $this->extra = $extra;
$this->sock = new \Socket(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol); $name = $this->protocol === PHP_INT_MAX ? '\\FSocket' : '\\Socket';
$this->sock = new $name(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol);
} }
public function setOption($level, $name, $value) { public function setOption($level, $name, $value) {
return $this->sock->setOption($level, $name, $value); return $this->sock->setOption($level, $name, $value);

View File

@ -155,14 +155,14 @@ class Connection
if ($has_proxy) { if ($has_proxy) {
if ($this->extra !== []) { if ($this->extra !== []) {
$this->sock->setExtra($this->extra); $this->sock->setExtra($this->extra);
} }/*
if ($this->protocol === 'http') { if ($this->protocol === 'http') {
$this->parsed['path'] = $this->parsed['scheme'].'://'.$this->parsed['host']. $this->parsed['path'] = $this->parsed['scheme'].'://'.$this->parsed['host'].
$this->parsed['path']; $this->parsed['path'];
$port = 80; $port = 80;
} elseif ($this->protocol === 'https') { } elseif ($this->protocol === 'https') {
$port = 443; $port = 443;
} }*/
} }
$this->sock->setOption(\SOL_SOCKET, \SO_RCVTIMEO, $timeout); $this->sock->setOption(\SOL_SOCKET, \SO_RCVTIMEO, $timeout);
$this->sock->setOption(\SOL_SOCKET, \SO_SNDTIMEO, $timeout); $this->sock->setOption(\SOL_SOCKET, \SO_SNDTIMEO, $timeout);

View File

@ -129,7 +129,8 @@ class Logger
$prefix .= ' (p)'; $prefix .= ' (p)';
} }
if (!is_string($param)) { if (!is_string($param)) {
$param = json_encode($param, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); $parame = json_encode($param, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);
if ($parame == '') $param = var_export($param, true); else $param = $parame;
} }
$param = str_pad(basename(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php').$prefix.': ', 16 + strlen($prefix))."\t".$param; $param = str_pad(basename(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php').$prefix.': ', 16 + strlen($prefix))."\t".$param;
/*if (self::$isatty) { /*if (self::$isatty) {

View File

@ -41,7 +41,7 @@ trait AckHandler
// || (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack']) && $this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'])) { // || (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack']) && $this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'])) {
return; return;
} }
$this->datacenter->sockets[$datacenter]->ack_queue[] = $message_id; $this->datacenter->sockets[$datacenter]->ack_queue[$message_id] = $message_id;
//$this->object_call('msgs_ack', ['msg_ids' => [$message_id]], ['datacenter' => $datacenter]); //$this->object_call('msgs_ack', ['msg_ids' => [$message_id]], ['datacenter' => $datacenter]);
return true; return true;
//$this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'] = true; //$this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'] = true;

View File

@ -91,6 +91,7 @@ trait CallHandler
$serialized = $this->serialize_object(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data'); $serialized = $this->serialize_object(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data');
\danog\MadelineProto\Logger::log('Using GZIP compression for '.$method.', saved '.($l - $g).' bytes of data, reduced call size by '.$g * 100 / $l.'%', \danog\MadelineProto\Logger::ULTRA_VERBOSE); \danog\MadelineProto\Logger::log('Using GZIP compression for '.$method.', saved '.($l - $g).' bytes of data, reduced call size by '.$g * 100 / $l.'%', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} }
$to_ack = [];
$last_recv = $this->datacenter->sockets[$aargs['datacenter']]->last_recv; $last_recv = $this->datacenter->sockets[$aargs['datacenter']]->last_recv;
for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) { for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) {
if ($canunset = !$this->updates_state['sync_loading']) { if ($canunset = !$this->updates_state['sync_loading']) {
@ -117,7 +118,7 @@ trait CallHandler
} }
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => $method, 'body' => $serialized, 'content_related' => $content_related, 'msg_id' => $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter'])]; $this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => $method, 'body' => $serialized, 'content_related' => $content_related, 'msg_id' => $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter'])];
if (count($this->datacenter->sockets[$aargs['datacenter']]->ack_queue)) { if (count($to_ack = $this->datacenter->sockets[$aargs['datacenter']]->ack_queue)) {
$this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => 'msgs_ack', 'body' => $this->serialize_object(['type' => 'msgs_ack'], ['msg_ids' => $this->datacenter->sockets[$aargs['datacenter']]->ack_queue], 'msgs_ack'), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])]; $this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => 'msgs_ack', 'body' => $this->serialize_object(['type' => 'msgs_ack'], ['msg_ids' => $this->datacenter->sockets[$aargs['datacenter']]->ack_queue], 'msgs_ack'), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])];
} }
if ($this->is_http($aargs['datacenter']) && $method !== 'http_wait') { if ($this->is_http($aargs['datacenter']) && $method !== 'http_wait') {
@ -129,10 +130,6 @@ trait CallHandler
$aargs['message_id'] = $message_id; $aargs['message_id'] = $message_id;
} }
if ($this->is_http($aargs['datacenter']) && isset($aargs['reopen'])) {
\danog\MadelineProto\Logger::log('Closing and reopening connection...', \danog\MadelineProto\Logger::WARNING);
$this->close_and_reopen($aargs['datacenter']);
}
if (isset($queue)) { if (isset($queue)) {
$this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue][] = $message_id; $this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue][] = $message_id;
if (count($this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue]) > $this->settings['msg_array_limit']['call_queue']) { if (count($this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue]) > $this->settings['msg_array_limit']['call_queue']) {
@ -198,10 +195,15 @@ trait CallHandler
} catch (\danog\MadelineProto\NothingInTheSocketException $e) { } catch (\danog\MadelineProto\NothingInTheSocketException $e) {
$last_error = 'Nothing in the socket'; $last_error = 'Nothing in the socket';
\danog\MadelineProto\Logger::log('An error getting response of method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Retrying...', \danog\MadelineProto\Logger::WARNING); \danog\MadelineProto\Logger::log('An error getting response of method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Retrying...', \danog\MadelineProto\Logger::WARNING);
if ($last_recv === $this->datacenter->sockets[$aargs['datacenter']]->last_recv || ($this->datacenter->sockets[$aargs['datacenter']]->last_recv < time() - 1 && $this->is_http($aargs['datacenter']))) {
$this->close_and_reopen($aargs['datacenter']);
}
$only_updates = false; $only_updates = false;
if ($last_recv === $this->datacenter->sockets[$aargs['datacenter']]->last_recv) { // the socket is dead, resend request
$this->close_and_reopen($aargs['datacenter']);
continue 2;
}
//if ($this->datacenter->sockets[$aargs['datacenter']]->last_recv < time() - 1 && $this->is_http($aargs['datacenter'])) {
// $this->close_and_reopen($aargs['datacenter']);
// continue 2;
//}
continue; //2; continue; //2;
} }
} }
@ -216,6 +218,12 @@ trait CallHandler
$this->postpone_updates = false; $this->postpone_updates = false;
$this->handle_pending_updates(); $this->handle_pending_updates();
} }
foreach ($to_ack as $msg_id) {
$this->datacenter->sockets[$aargs['datacenter']]->incoming_messages[$msg_id]['ack'] = true;
if (isset($this->datacenter->sockets[$aargs['datacenter']]->ack_queue[$msg_id])) {
unset($this->datacenter->sockets[$aargs['datacenter']]->ack_queue[$msg_id]);
}
}
if ($server_answer === null) { if ($server_answer === null) {
throw new \danog\MadelineProto\Exception("Couldn't get response"); throw new \danog\MadelineProto\Exception("Couldn't get response");
} }

View File

@ -28,7 +28,7 @@ trait MessageHandler
public function send_messages($datacenter) public function send_messages($datacenter)
{ {
$has_ack = false; //$has_ack = false;
if (count($this->datacenter->sockets[$datacenter]->object_queue) > 1) { if (count($this->datacenter->sockets[$datacenter]->object_queue) > 1) {
$messages = []; $messages = [];
@ -37,11 +37,11 @@ trait MessageHandler
foreach ($this->datacenter->sockets[$datacenter]->object_queue as $message) { foreach ($this->datacenter->sockets[$datacenter]->object_queue as $message) {
$message['seqno'] = $this->generate_out_seq_no($datacenter, $message['content_related']); $message['seqno'] = $this->generate_out_seq_no($datacenter, $message['content_related']);
$message['bytes'] = strlen($message['body']); $message['bytes'] = strlen($message['body']);
$has_ack = $has_ack || $message['_'] === 'msgs_ack'; //$has_ack = $has_ack || $message['_'] === 'msgs_ack';
\danog\MadelineProto\Logger::log("Inside of msg_container, sending {$message['_']} as encrypted message to DC $datacenter", \danog\MadelineProto\Logger::ULTRA_VERBOSE); \danog\MadelineProto\Logger::log("Inside of msg_container, sending {$message['_']} as encrypted message to DC $datacenter", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message['_'] = 'MTmessage'; $message['_'] = 'MTmessage';
$messages[] = $message; $messages[] = $message;
$this->datacenter->sockets[$datacenter]->outgoing_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'response' => -1, 'content' => $this->deserialize($message['body'], ['type' => '', 'datacenter' => $datacenter])]; $this->datacenter->sockets[$datacenter]->outgoing_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'response' => -1];//, 'content' => $this->deserialize($message['body'], ['type' => '', 'datacenter' => $datacenter])];
} }
$message_data = $this->serialize_object(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'lol'); $message_data = $this->serialize_object(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'lol');
$message_id = $this->generate_message_id($datacenter); $message_id = $this->generate_message_id($datacenter);
@ -68,12 +68,12 @@ trait MessageHandler
$this->datacenter->sockets[$datacenter]->send_message($message); $this->datacenter->sockets[$datacenter]->send_message($message);
$this->datacenter->sockets[$datacenter]->object_queue = []; $this->datacenter->sockets[$datacenter]->object_queue = [];
if ($has_ack) { /*if ($has_ack) {
foreach ($this->datacenter->sockets[$datacenter]->ack_queue as $msg_id) { foreach ($this->datacenter->sockets[$datacenter]->ack_queue as $msg_id) {
$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['ack'] = true; $this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['ack'] = true;
} }
$this->datacenter->sockets[$datacenter]->ack_queue = []; $this->datacenter->sockets[$datacenter]->ack_queue = [];
} }*/
} }
/** /**

View File

@ -67,6 +67,8 @@ trait ResponseHandler
// Acknowledge that I received the server's response // Acknowledge that I received the server's response
$this->ack_outgoing_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id'], $datacenter); $this->ack_outgoing_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id'], $datacenter);
// Acknowledge that the server received my request // Acknowledge that the server received my request
//\danog\MadelineProto\Logger::log($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']]);
//\danog\MadelineProto\Logger::log($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
$this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']]['response'] = $current_msg_id; $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']]['response'] = $current_msg_id;
$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['result']; $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['result'];
$this->check_in_seq_no($datacenter, $current_msg_id); $this->check_in_seq_no($datacenter, $current_msg_id);