From eae850eedbb1f3ca3022c9d036005db9e01e0634 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 22 Mar 2018 02:07:44 +0000 Subject: [PATCH] Improve retry management on unstable connections, peformance improvements --- src/HttpProxy.php | 319 +++++++++--------- src/danog/MadelineProto/Connection.php | 4 +- src/danog/MadelineProto/Logger.php | 3 +- .../MadelineProto/MTProtoTools/AckHandler.php | 2 +- .../MTProtoTools/CallHandler.php | 24 +- .../MTProtoTools/MessageHandler.php | 10 +- .../MTProtoTools/ResponseHandler.php | 2 + 7 files changed, 188 insertions(+), 176 deletions(-) diff --git a/src/HttpProxy.php b/src/HttpProxy.php index bbdb4c12..fe4073f9 100644 --- a/src/HttpProxy.php +++ b/src/HttpProxy.php @@ -1,160 +1,161 @@ -. -*/ - - -class HttpProxy implements \danog\MadelineProto\Proxy -{ - private $domain; - private $type; - private $protocol; - private $extra; - private $sock; - public function __construct($domain, $type, $protocol) { - if (!in_array($domain, [AF_INET, AF_INET6])) { - throw new \danog\MadelineProto\Exception('Wrong protocol family provided'); - } - if (!in_array($type, [SOCK_STREAM])) { - throw new \danog\MadelineProto\Exception('Wrong connection type provided'); +. +*/ + + +class HttpProxy implements \danog\MadelineProto\Proxy +{ + private $domain; + private $type; + private $protocol; + private $extra; + private $sock; + public function __construct($domain, $type, $protocol) { + if (!in_array($domain, [AF_INET, AF_INET6])) { + throw new \danog\MadelineProto\Exception('Wrong protocol family provided'); } - if (!in_array($protocol, [getprotobyname('tcp'), PHP_INT_MAX])) { - throw new \danog\MadelineProto\Exception('Wrong protocol provided'); - } - $this->domain = $domain; - $this->type = $type; - $this->protocol = $protocol; - } - public function setExtra(array $extra = []) { - $this->extra = $extra; - $this->sock = new \Socket(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol); - } - public function setOption($level, $name, $value) { - return $this->sock->setOption($level, $name, $value); - } - - public function getOption($level, $name) { - return $this->sock->getOption($level, $name); - } - - public function setBlocking($blocking) { - return $this->sock->setBlocking($blocking); - } - - public function bind($address, $port = 0) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - - public function listen($backlog = 0) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - public function accept() { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - - - public function select(array &$read, array &$write, array &$except, $tv_sec, $tv_usec = 0) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - public function connect($address, $port = 0) { - $this->sock->connect($this->extra['address'], $this->extra['port']); - - try { - if (strlen(inet_pton($address)) !== 4) { - $address = '['.$address.']'; - } - } catch (\danog\MadelineProto\Exception $e) { - } - $this->sock->write("CONNECT $address:$port HTTP/1.1\r\nHost: $address:$port\r\n\r\n"); - $response = $this->read_http_payload(); - if ($response['code'] !== 200) { - \danog\MadelineProto\Logger::log(trim($response['body'])); - throw new \danog\MadelineProto\Exception($response['description'], $response['code']); - } - \danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http'); - return true; - } - private function http_read($length) { - $packet = ''; - while (strlen($packet) < $length) { - $packet .= $this->sock->read($length - strlen($packet)); - if ($packet === false || strlen($packet) === 0) { - throw new \danog\MadelineProto\NothingInTheSocketException(\danog\MadelineProto\Lang::$current_lang['nothing_in_socket']); - } - } - return $packet; - } - public function read_http_line() - { - $line = ''; - while (($curchar = $this->http_read(1)) !== "\n") { - $line .= $curchar; - } - - return rtrim($line); - } - - public function read_http_payload() - { - $header = explode(' ', $this->read_http_line(), 3); - $protocol = $header[0]; - $code = (int) $header[1]; - $description = $header[2]; - $headers = []; - while (strlen($current_header = $this->read_http_line())) { - $current_header = explode(':', $current_header, 2); - $headers[strtolower($current_header[0])] = trim($current_header[1]); - } - - $read = ''; - if (isset($headers['content-length'])) { - $read = $this->http_read((int) $headers['content-length']); - }/* elseif (isset($headers['transfer-encoding']) && $headers['transfer-encoding'] === 'chunked') { - do { - $length = hexdec($this->read_http_line()); - $read .= $this->http_read($length); - $this->read_http_line(); - } while ($length); - }*/ - - return ['protocol' => $protocol, 'code' => $code, 'description' => $description, 'body' => $read, 'headers' => $headers]; - } - - public function read($length, $flags = 0) { - $read = $this->sock->read($length, $flags); - if ($read === 0) { - throw new \danog\MadelineProto\Exception('pls reconnect'); - } - return $read; - } - - public function write($buffer, $length = -1) { - return $this->sock->write($buffer, $length); - } - - public function send($data, $length, $flags) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - - public function close() { - $this->sock->close(); - } - - public function getPeerName($port = true) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - - public function getSockName($port = true) { - throw new \danog\MadelineProto\Exception('Not Implemented'); - } - public function getProxyHeaders() { - - } -} \ No newline at end of file + if (!in_array($type, [SOCK_STREAM])) { + throw new \danog\MadelineProto\Exception('Wrong connection type provided'); + } + if (!in_array($protocol, [getprotobyname('tcp'), PHP_INT_MAX])) { + throw new \danog\MadelineProto\Exception('Wrong protocol provided'); + } + $this->domain = $domain; + $this->type = $type; + $this->protocol = $protocol; + } + public function setExtra(array $extra = []) { + $this->extra = $extra; + $name = $this->protocol === PHP_INT_MAX ? '\\FSocket' : '\\Socket'; + $this->sock = new $name(strlen(@inet_pton($this->extra['address'])) !== 4 ? \AF_INET6 : \AF_INET, \SOCK_STREAM, $this->protocol); + } + public function setOption($level, $name, $value) { + return $this->sock->setOption($level, $name, $value); + } + + public function getOption($level, $name) { + return $this->sock->getOption($level, $name); + } + + public function setBlocking($blocking) { + return $this->sock->setBlocking($blocking); + } + + public function bind($address, $port = 0) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + + public function listen($backlog = 0) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + public function accept() { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + + + public function select(array &$read, array &$write, array &$except, $tv_sec, $tv_usec = 0) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + public function connect($address, $port = 0) { + $this->sock->connect($this->extra['address'], $this->extra['port']); + + try { + if (strlen(inet_pton($address)) !== 4) { + $address = '['.$address.']'; + } + } catch (\danog\MadelineProto\Exception $e) { + } + $this->sock->write("CONNECT $address:$port HTTP/1.1\r\nHost: $address:$port\r\n\r\n"); + $response = $this->read_http_payload(); + if ($response['code'] !== 200) { + \danog\MadelineProto\Logger::log(trim($response['body'])); + throw new \danog\MadelineProto\Exception($response['description'], $response['code']); + } + \danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http'); + return true; + } + private function http_read($length) { + $packet = ''; + while (strlen($packet) < $length) { + $packet .= $this->sock->read($length - strlen($packet)); + if ($packet === false || strlen($packet) === 0) { + throw new \danog\MadelineProto\NothingInTheSocketException(\danog\MadelineProto\Lang::$current_lang['nothing_in_socket']); + } + } + return $packet; + } + public function read_http_line() + { + $line = ''; + while (($curchar = $this->http_read(1)) !== "\n") { + $line .= $curchar; + } + + return rtrim($line); + } + + public function read_http_payload() + { + $header = explode(' ', $this->read_http_line(), 3); + $protocol = $header[0]; + $code = (int) $header[1]; + $description = $header[2]; + $headers = []; + while (strlen($current_header = $this->read_http_line())) { + $current_header = explode(':', $current_header, 2); + $headers[strtolower($current_header[0])] = trim($current_header[1]); + } + + $read = ''; + if (isset($headers['content-length'])) { + $read = $this->http_read((int) $headers['content-length']); + }/* elseif (isset($headers['transfer-encoding']) && $headers['transfer-encoding'] === 'chunked') { + do { + $length = hexdec($this->read_http_line()); + $read .= $this->http_read($length); + $this->read_http_line(); + } while ($length); + }*/ + + return ['protocol' => $protocol, 'code' => $code, 'description' => $description, 'body' => $read, 'headers' => $headers]; + } + + public function read($length, $flags = 0) { + $read = $this->sock->read($length, $flags); + if ($read === 0) { + throw new \danog\MadelineProto\Exception('pls reconnect'); + } + return $read; + } + + public function write($buffer, $length = -1) { + return $this->sock->write($buffer, $length); + } + + public function send($data, $length, $flags) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + + public function close() { + $this->sock->close(); + } + + public function getPeerName($port = true) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + + public function getSockName($port = true) { + throw new \danog\MadelineProto\Exception('Not Implemented'); + } + public function getProxyHeaders() { + + } +} diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index 1d71989f..fa5eeec2 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -155,14 +155,14 @@ class Connection if ($has_proxy) { if ($this->extra !== []) { $this->sock->setExtra($this->extra); - } + }/* if ($this->protocol === 'http') { $this->parsed['path'] = $this->parsed['scheme'].'://'.$this->parsed['host']. $this->parsed['path']; $port = 80; } elseif ($this->protocol === 'https') { $port = 443; - } + }*/ } $this->sock->setOption(\SOL_SOCKET, \SO_RCVTIMEO, $timeout); $this->sock->setOption(\SOL_SOCKET, \SO_SNDTIMEO, $timeout); diff --git a/src/danog/MadelineProto/Logger.php b/src/danog/MadelineProto/Logger.php index 5b7dba4c..0eb0ec87 100644 --- a/src/danog/MadelineProto/Logger.php +++ b/src/danog/MadelineProto/Logger.php @@ -129,7 +129,8 @@ class Logger $prefix .= ' (p)'; } if (!is_string($param)) { - $param = json_encode($param, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + $parame = json_encode($param, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + if ($parame == '') $param = var_export($param, true); else $param = $parame; } $param = str_pad(basename(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php').$prefix.': ', 16 + strlen($prefix))."\t".$param; /*if (self::$isatty) { diff --git a/src/danog/MadelineProto/MTProtoTools/AckHandler.php b/src/danog/MadelineProto/MTProtoTools/AckHandler.php index 985cfcbf..ef9ec729 100644 --- a/src/danog/MadelineProto/MTProtoTools/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AckHandler.php @@ -41,7 +41,7 @@ trait AckHandler // || (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack']) && $this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'])) { return; } - $this->datacenter->sockets[$datacenter]->ack_queue[] = $message_id; + $this->datacenter->sockets[$datacenter]->ack_queue[$message_id] = $message_id; //$this->object_call('msgs_ack', ['msg_ids' => [$message_id]], ['datacenter' => $datacenter]); return true; //$this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'] = true; diff --git a/src/danog/MadelineProto/MTProtoTools/CallHandler.php b/src/danog/MadelineProto/MTProtoTools/CallHandler.php index 04a2d448..20e6707d 100644 --- a/src/danog/MadelineProto/MTProtoTools/CallHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/CallHandler.php @@ -91,6 +91,7 @@ trait CallHandler $serialized = $this->serialize_object(['type' => 'gzip_packed'], ['packed_data' => $gzipped], 'gzipped data'); \danog\MadelineProto\Logger::log('Using GZIP compression for '.$method.', saved '.($l - $g).' bytes of data, reduced call size by '.$g * 100 / $l.'%', \danog\MadelineProto\Logger::ULTRA_VERBOSE); } + $to_ack = []; $last_recv = $this->datacenter->sockets[$aargs['datacenter']]->last_recv; for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) { if ($canunset = !$this->updates_state['sync_loading']) { @@ -117,7 +118,7 @@ trait CallHandler } $this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => $method, 'body' => $serialized, 'content_related' => $content_related, 'msg_id' => $message_id = isset($aargs['message_id']) ? $aargs['message_id'] : $this->generate_message_id($aargs['datacenter'])]; - if (count($this->datacenter->sockets[$aargs['datacenter']]->ack_queue)) { + if (count($to_ack = $this->datacenter->sockets[$aargs['datacenter']]->ack_queue)) { $this->datacenter->sockets[$aargs['datacenter']]->object_queue[] = ['_' => 'msgs_ack', 'body' => $this->serialize_object(['type' => 'msgs_ack'], ['msg_ids' => $this->datacenter->sockets[$aargs['datacenter']]->ack_queue], 'msgs_ack'), 'content_related' => false, 'msg_id' => $this->generate_message_id($aargs['datacenter'])]; } if ($this->is_http($aargs['datacenter']) && $method !== 'http_wait') { @@ -129,10 +130,6 @@ trait CallHandler $aargs['message_id'] = $message_id; } - if ($this->is_http($aargs['datacenter']) && isset($aargs['reopen'])) { - \danog\MadelineProto\Logger::log('Closing and reopening connection...', \danog\MadelineProto\Logger::WARNING); - $this->close_and_reopen($aargs['datacenter']); - } if (isset($queue)) { $this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue][] = $message_id; if (count($this->datacenter->sockets[$aargs['datacenter']]->call_queue[$queue]) > $this->settings['msg_array_limit']['call_queue']) { @@ -198,10 +195,15 @@ trait CallHandler } catch (\danog\MadelineProto\NothingInTheSocketException $e) { $last_error = 'Nothing in the socket'; \danog\MadelineProto\Logger::log('An error getting response of method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Retrying...', \danog\MadelineProto\Logger::WARNING); - if ($last_recv === $this->datacenter->sockets[$aargs['datacenter']]->last_recv || ($this->datacenter->sockets[$aargs['datacenter']]->last_recv < time() - 1 && $this->is_http($aargs['datacenter']))) { - $this->close_and_reopen($aargs['datacenter']); - } $only_updates = false; + if ($last_recv === $this->datacenter->sockets[$aargs['datacenter']]->last_recv) { // the socket is dead, resend request + $this->close_and_reopen($aargs['datacenter']); + continue 2; + } + //if ($this->datacenter->sockets[$aargs['datacenter']]->last_recv < time() - 1 && $this->is_http($aargs['datacenter'])) { + // $this->close_and_reopen($aargs['datacenter']); + // continue 2; + //} continue; //2; } } @@ -216,6 +218,12 @@ trait CallHandler $this->postpone_updates = false; $this->handle_pending_updates(); } + foreach ($to_ack as $msg_id) { + $this->datacenter->sockets[$aargs['datacenter']]->incoming_messages[$msg_id]['ack'] = true; + if (isset($this->datacenter->sockets[$aargs['datacenter']]->ack_queue[$msg_id])) { + unset($this->datacenter->sockets[$aargs['datacenter']]->ack_queue[$msg_id]); + } + } if ($server_answer === null) { throw new \danog\MadelineProto\Exception("Couldn't get response"); } diff --git a/src/danog/MadelineProto/MTProtoTools/MessageHandler.php b/src/danog/MadelineProto/MTProtoTools/MessageHandler.php index f17cfd9e..24c71fcd 100644 --- a/src/danog/MadelineProto/MTProtoTools/MessageHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/MessageHandler.php @@ -28,7 +28,7 @@ trait MessageHandler public function send_messages($datacenter) { - $has_ack = false; + //$has_ack = false; if (count($this->datacenter->sockets[$datacenter]->object_queue) > 1) { $messages = []; @@ -37,11 +37,11 @@ trait MessageHandler foreach ($this->datacenter->sockets[$datacenter]->object_queue as $message) { $message['seqno'] = $this->generate_out_seq_no($datacenter, $message['content_related']); $message['bytes'] = strlen($message['body']); - $has_ack = $has_ack || $message['_'] === 'msgs_ack'; + //$has_ack = $has_ack || $message['_'] === 'msgs_ack'; \danog\MadelineProto\Logger::log("Inside of msg_container, sending {$message['_']} as encrypted message to DC $datacenter", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $message['_'] = 'MTmessage'; $messages[] = $message; - $this->datacenter->sockets[$datacenter]->outgoing_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'response' => -1, 'content' => $this->deserialize($message['body'], ['type' => '', 'datacenter' => $datacenter])]; + $this->datacenter->sockets[$datacenter]->outgoing_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'response' => -1];//, 'content' => $this->deserialize($message['body'], ['type' => '', 'datacenter' => $datacenter])]; } $message_data = $this->serialize_object(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'lol'); $message_id = $this->generate_message_id($datacenter); @@ -68,12 +68,12 @@ trait MessageHandler $this->datacenter->sockets[$datacenter]->send_message($message); $this->datacenter->sockets[$datacenter]->object_queue = []; - if ($has_ack) { + /*if ($has_ack) { foreach ($this->datacenter->sockets[$datacenter]->ack_queue as $msg_id) { $this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['ack'] = true; } $this->datacenter->sockets[$datacenter]->ack_queue = []; - } + }*/ } /** diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 3b3301fd..be18b7b4 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -67,6 +67,8 @@ trait ResponseHandler // Acknowledge that I received the server's response $this->ack_outgoing_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id'], $datacenter); // Acknowledge that the server received my request + //\danog\MadelineProto\Logger::log($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']]); + //\danog\MadelineProto\Logger::log($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']]['response'] = $current_msg_id; $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['result']; $this->check_in_seq_no($datacenter, $current_msg_id);