diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 15fbc1cf..4cbe2cb6 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -50,7 +50,7 @@ trait ResponseHandler } $info .= chr($cur_info); } - $this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter])]['response'] = $req_msg_id; + $this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id; } public $n = 0; @@ -131,8 +131,8 @@ trait ResponseHandler $this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id']; } ksort($this->datacenter->sockets[$datacenter]->new_incoming); - $this->handle_messages($datacenter); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + //$this->handle_messages($datacenter); + //$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); break; @@ -151,7 +151,6 @@ trait ResponseHandler $this->datacenter->sockets[$datacenter]->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]); $this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']]; $this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id']; - $this->handle_messages($datacenter); } unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); @@ -203,7 +202,7 @@ trait ResponseHandler if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { $this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); } } break; @@ -215,7 +214,7 @@ trait ResponseHandler if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { $this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); } break; case 'msg_resend_req': @@ -231,7 +230,7 @@ trait ResponseHandler } if ($ok) { foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { - $this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter]); + $this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]); } } else { $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); @@ -245,7 +244,7 @@ trait ResponseHandler $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']) && isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']])) { - $this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter])); + $this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true])); } } break; @@ -292,6 +291,8 @@ trait ResponseHandler break; } } + $this->datacenter->sockets[$datacenter]->writer->resume(); + //$this->n--; return $only_updates; @@ -341,7 +342,7 @@ trait ResponseHandler unset($request['serialized_body']); } - Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; } @@ -349,7 +350,7 @@ trait ResponseHandler case 500: if ($response['error_message'] === 'MSG_WAIT_FAILED') { $this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = []; - Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; } $this->got_response_for_outgoing_message_id($request_id, $datacenter); @@ -369,8 +370,8 @@ trait ResponseHandler if (isset($request['user_related']) && $request['user_related']) { $this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc; } - Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]); + //$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); return; case 401: @@ -511,7 +512,7 @@ trait ResponseHandler switch ($response['error_code']) { case 48: $this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt']; - Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; case 16: diff --git a/src/danog/MadelineProto/Stream/Common/HashedBufferedStream.php b/src/danog/MadelineProto/Stream/Common/HashedBufferedStream.php index 130b9c65..77e42bc4 100644 --- a/src/danog/MadelineProto/Stream/Common/HashedBufferedStream.php +++ b/src/danog/MadelineProto/Stream/Common/HashedBufferedStream.php @@ -24,7 +24,6 @@ use danog\MadelineProto\Stream\Async\Stream; use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\BufferInterface; use danog\MadelineProto\Stream\ConnectionContext; -use function Amp\call; /** * Hash stream wrapper. @@ -275,7 +274,7 @@ class HashedBufferedStream implements BufferedProxyStreamInterface, BufferInterf return $this->read_buffer->bufferRead($length); } - return call($this->bufferReadAsync($length)); + return $this->call($this->bufferReadAsync($length)); } /**