Reduce latency

This commit is contained in:
Daniil Gentili 2019-05-27 15:42:16 +02:00
parent ec6ba6ea76
commit ff271bbf68
2 changed files with 14 additions and 14 deletions

View File

@ -50,7 +50,7 @@ trait ResponseHandler
}
$info .= chr($cur_info);
}
$this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter])]['response'] = $req_msg_id;
$this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id;
}
public $n = 0;
@ -131,8 +131,8 @@ trait ResponseHandler
$this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id'];
}
ksort($this->datacenter->sockets[$datacenter]->new_incoming);
$this->handle_messages($datacenter);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
//$this->handle_messages($datacenter);
//$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
break;
@ -151,7 +151,6 @@ trait ResponseHandler
$this->datacenter->sockets[$datacenter]->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id'];
$this->handle_messages($datacenter);
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
@ -203,7 +202,7 @@ trait ResponseHandler
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
}
}
break;
@ -215,7 +214,7 @@ trait ResponseHandler
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
}
break;
case 'msg_resend_req':
@ -231,7 +230,7 @@ trait ResponseHandler
}
if ($ok) {
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
$this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter]);
$this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]);
}
} else {
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
@ -245,7 +244,7 @@ trait ResponseHandler
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']) && isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']])) {
$this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true]));
}
}
break;
@ -292,6 +291,8 @@ trait ResponseHandler
break;
}
}
$this->datacenter->sockets[$datacenter]->writer->resume();
//$this->n--;
return $only_updates;
@ -341,7 +342,7 @@ trait ResponseHandler
unset($request['serialized_body']);
}
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
}
@ -349,7 +350,7 @@ trait ResponseHandler
case 500:
if ($response['error_message'] === 'MSG_WAIT_FAILED') {
$this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = [];
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
}
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
@ -369,8 +370,8 @@ trait ResponseHandler
if (isset($request['user_related']) && $request['user_related']) {
$this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc;
}
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]);
//$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
return;
case 401:
@ -511,7 +512,7 @@ trait ResponseHandler
switch ($response['error_code']) {
case 48:
$this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt'];
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
case 16:

View File

@ -24,7 +24,6 @@ use danog\MadelineProto\Stream\Async\Stream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
use danog\MadelineProto\Stream\BufferInterface;
use danog\MadelineProto\Stream\ConnectionContext;
use function Amp\call;
/**
* Hash stream wrapper.
@ -275,7 +274,7 @@ class HashedBufferedStream implements BufferedProxyStreamInterface, BufferInterf
return $this->read_buffer->bufferRead($length);
}
return call($this->bufferReadAsync($length));
return $this->call($this->bufferReadAsync($length));
}
/**