This commit is contained in:
Daniil Gentili 2019-05-12 20:33:18 +02:00
parent 714ade6fbc
commit f7bd6a922e
25 changed files with 82 additions and 153 deletions

View File

@ -54,7 +54,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler
}
}
}
$settings = ['logger' => ['logger_level' => 5], 'connection_settings' => ['all' => ['proxy' => '\\Socket', 'protocol' => 'tcp_abridged']]];
$settings = ['logger' => ['logger_level' => 5], 'connection_settings' => ['all' => ['proxy' => '\\Socket', 'protocol' => 'tcp_abridged', 'ipv6' => false]]];
$MadelineProto = new \danog\MadelineProto\API('bot.madeline', $settings);

View File

@ -292,7 +292,7 @@ if (!class_exists('\\danog\\MadelineProto\\VoIPServerConfig')) die('Install the
'audio_congestion_window' => 4 * 1024,
]
);
$MadelineProto = new \danog\MadelineProto\API('session.madeline', ['secret_chats' => ['accept_chats' => false], 'logger' => ['logger' => 3, 'logger_param' => getcwd() . '/MadelineProto.log']]);
$MadelineProto = new \danog\MadelineProto\API('session.madeline', ['connection_settings' => ['all' => ['ipv6' => false]], 'secret_chats' => ['accept_chats' => false], 'logger' => ['logger' => 3, 'logger_param' => getcwd() . '/MadelineProto.log']]);
$MadelineProto->start();
if (!isset($MadelineProto->programmed_call)) {

View File

@ -33,7 +33,6 @@ use Amp\Failure;
use Amp\Internal;
use Amp\Promise;
use Amp\Success;
use React\Promise\PromiseInterface as ReactPromise;
/**
@ -64,12 +63,6 @@ final class Coroutine implements Promise
*/
public function __construct(\Generator $generator)
{
$backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 3);
$frame = '';
$frame .= basename($backtrace[1]['file']).': '.$backtrace[2]['function'];
$this->frames []= $frame;
var_dump($frame);
$this->generator = $generator;
try {
@ -87,12 +80,9 @@ final class Coroutine implements Promise
$this->resolve(null);
}
return;
}
$yielded = $this->transform($yielded);
} else {
var_dump(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS));
}
} catch (\Throwable $exception) {
$this->fail($exception);
@ -149,6 +139,7 @@ final class Coroutine implements Promise
} catch (\Throwable $exception) {
$this->fail($exception);
$this->onResolve = null;
} finally {
$this->exception = null;
$this->value = null;
@ -168,6 +159,9 @@ final class Coroutine implements Promise
private function transform($yielded): Promise
{
try {
if ($yielded instanceof \Generator) {
return new self($yielded);
}
if (\is_array($yielded)) {
foreach ($yielded as &$val) {
if ($val instanceof \Generator) {
@ -177,9 +171,6 @@ final class Coroutine implements Promise
return Promise\all($yielded);
}
if ($yielded instanceof \Generator) {
return new self($yielded);
}
if ($yielded instanceof ReactPromise) {
return Promise\adapt($yielded);

View File

@ -38,8 +38,6 @@ class CheckLoop extends ResumableSignalLoop
$this->startedLoop();
$API->logger->logger("Entered check loop in DC {$datacenter}", Logger::ULTRA_VERBOSE);
$try_count = 0;
$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
while (empty($connection->new_outgoing)) {
@ -49,18 +47,19 @@ class CheckLoop extends ResumableSignalLoop
return;
}
$try_count = 0;
}
if ($connection->hasPendingCalls()) {
$last_recv = $connection->last_recv;
$last_recv = $connection->get_max_id(true);
if ($connection->temp_auth_key !== null) {
$message_ids = array_values($connection->new_outgoing);
$deferred = new Deferred();
$deferred->promise()->onResolve(
function ($e, $result) use ($message_ids, $API, $connection, $datacenter) {
if ($e) {
throw $e;
$API->logger("Got exception in check loop for DC $datacenter");
$API->logger((string) $e);
return;
}
$reply = [];
foreach (str_split($result['info']) as $key => $chr) {
@ -104,7 +103,7 @@ class CheckLoop extends ResumableSignalLoop
}
}
if ($reply) {
$API->object_call_async('msg_resend_ans_req', ['msg_ids' => $reply], ['datacenter' => $datacenter, 'postpone' => true]);
$this->callFork($API->object_call_async('msg_resend_ans_req', ['msg_ids' => $reply], ['datacenter' => $datacenter, 'postpone' => true]));
}
$connection->writer->resume();
}
@ -136,8 +135,7 @@ class CheckLoop extends ResumableSignalLoop
}
//var_dumP("after ".(time() - $t).", with timeout ".$timeout);
$try_count++;
if ($connection->last_recv === $last_recv) {
if ($connection->get_max_id(true) === $last_recv) {
$API->logger->logger("Reconnecting and exiting check loop on DC $datacenter");
$this->exitedLoop();
yield $connection->reconnect();
@ -151,7 +149,6 @@ class CheckLoop extends ResumableSignalLoop
return;
}
$try_count = 0;
}
}
}

View File

@ -104,11 +104,9 @@ class ReadLoop extends SignalLoop
$this->exitedLoop();
}
$this->startedLoop();
//var_dump(count($connection->incoming_messages));
// Loop::defer(function () use ($datacenter) {
if ($this->API->is_http($datacenter)) {
$this->API->datacenter->sockets[$datacenter]->waiter->resume();
} // });
}
}
}

View File

@ -269,7 +269,7 @@ class WriteLoop extends ResumableSignalLoop
$message_id = $message['msg_id'];
$seq_no = $message['seqno'];
} else {
$API->logger->logger('NO MESSAGE SENT', \danog\MadelineProto\Logger::WARNING);
$API->logger->logger("NO MESSAGE SENT in DC $datacenter", \danog\MadelineProto\Logger::WARNING);
return;
}

View File

@ -53,7 +53,7 @@ abstract class Loop implements LoopInterface
return false;
}
Promise\rethrow($this->call($this->loop()));
$this->callFork($this->loop());
return true;
}

View File

@ -129,7 +129,6 @@ class MTProto implements TLCallback
public $authorized = 0;
public $authorized_dc = -1;
private $rsa_keys = [];
private $last_recv = 0;
private $dh_config = ['version' => 0];
public $chats = [];
public $channel_participants = [];
@ -221,7 +220,7 @@ class MTProto implements TLCallback
public function __sleep()
{
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'last_recv', 'dh_config', 'chats', 'last_stored', 'qres', 'pending_updates', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'pending_updates', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos'];
}
public function isAltervista()

View File

@ -564,7 +564,10 @@ trait AuthKeyHandler
// Creates authorization keys
public function init_authorization_async()
{
var_dump((string) new Exception());
return $this->ainit_authorization_async();
}
public function ainit_authorization_async()
{
if ($this->pending_auth) {
return;
}
@ -595,17 +598,13 @@ trait AuthKeyHandler
}
if ($dcs) {
$first = array_shift($dcs)();
var_dumP("Yielding first ");
var_dump(yield $first);
var_dumP("Yielded first");
yield $first;
}
foreach ($dcs as $id => &$dc) {
$dc = $dc();
}
var_dump("yielding all", array_keys($dcs));
yield $dcs;
var_dump("yielded all");
foreach ($postpone as $id => $socket) {
yield $this->init_authorization_socket_async($id, $socket);
@ -657,9 +656,7 @@ trait AuthKeyHandler
//$socket->authorized = false;
$socket->temp_auth_key = null;
var_dump("creating auth k $id");
$socket->temp_auth_key = yield $this->create_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id);
var_dump("done creating auth k $id");
yield $this->bind_temp_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id);
//$socket->authorized = $authorized;

View File

@ -50,9 +50,11 @@ trait CallHandler
foreach ($message_ids as $message_id) {
if (isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['body'])) {
$this->datacenter->sockets[$new_datacenter]->sendMessage($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id], false);
$this->callFork($this->datacenter->sockets[$new_datacenter]->sendMessage($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id], false));
$this->ack_outgoing_message_id($message_id, $old_datacenter);
$this->got_response_for_outgoing_message_id($message_id, $old_datacenter);
} else {
$this->logger->logger("Could not resend ".isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_']) ? $this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_'] : $message_id);
}
}
if (!$postpone) {
@ -62,12 +64,10 @@ trait CallHandler
public function method_call($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
{
$promise = $this->method_call_async_read($method, $args, $aargs);
return $this->wait($promise);
return $this->wait($this->method_call_async_read($method, $args, $aargs));
}
public function method_call_async_read($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): Promise
public function method_call_async_read($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
{
$deferred = new Deferred();
$this->method_call_async_write($method, $args, $aargs)->onResolve(function ($e, $read_deferred) use ($deferred) {
@ -85,7 +85,7 @@ trait CallHandler
}
});
return isset($aargs['noResponse']) && $aargs['noResponse'] ? new \Amp\Success(0) : $deferred->promise();
return isset($aargs['noResponse']) && $aargs['noResponse'] ? 0 : $deferred->promise();
}
public function method_call_async_write($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): Promise
@ -168,7 +168,7 @@ trait CallHandler
return $deferred;
}
public function object_call_async($object, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): Promise
public function object_call_async($object, $args = [], $aargs = ['msg_id' => null, 'heavy' => false])
{
$message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key === null, 'method' => false];
if (isset($aargs['promise'])) {

View File

@ -140,7 +140,7 @@ trait PeerHandler
public function cache_pwr_chat($id, $full_fetch, $send)
{
$this->call((function () use ($id, $full_fetch, $send) {
$this->callFork((function () use ($id, $full_fetch, $send) {
try {
yield $this->get_pwr_chat_async($id, $full_fetch, $send);
} catch (\danog\MadelineProto\Exception $e) {

View File

@ -454,7 +454,7 @@ class ReferenceDatabase implements TLCallback
}
}
public function refreshReference(int $locationType, array $location): Promise
public function refreshReference(int $locationType, array $location)
{
return $this->refreshReferenceInternal($this->serializeLocation($locationType, $location));
}
@ -525,18 +525,10 @@ class ReferenceDatabase implements TLCallback
throw new Exception('Did not refresh reference');
}
public function populateReference(array $object): Promise
public function populateReference(array $object)
{
$deferred = new Deferred();
$this->getReference(self::LOCATION_CONTEXT[$object['_']], $object)->onResolve(function ($e, $res) use ($deferred, $object) {
if ($e) {
throw $e;
}
$object['file_reference'] = $res;
$deferred->resolve($object);
});
return $deferred->promise();
$object['file_reference'] = yield $this->getReference(self::LOCATION_CONTEXT[$object['_']], $object);
return $object;
}
public function getReference(int $locationType, array $location)

View File

@ -114,7 +114,7 @@ trait ResponseHandler
// Acknowledge that I received the server's response
if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) {
$this->call($this->get_updates_difference_async());
$this->callFork($this->get_updates_difference_async());
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
@ -169,7 +169,7 @@ trait ResponseHandler
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->call($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
break;
case 'msgs_all_info':
@ -200,7 +200,7 @@ trait ResponseHandler
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->call($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
}
}
break;
@ -212,7 +212,7 @@ trait ResponseHandler
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->call($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter]));
}
break;
case 'msg_resend_req':
@ -231,7 +231,7 @@ trait ResponseHandler
$this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter]);
}
} else {
$this->call($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
}
break;
case 'msg_resend_ans_req':
@ -239,10 +239,10 @@ trait ResponseHandler
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->call($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']) && isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']])) {
$this->call($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter]));
$this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter]));
}
}
break;
@ -257,7 +257,7 @@ trait ResponseHandler
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
if (strpos($datacenter, 'cdn') === false) {
$this->call($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']));
$this->callFork($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']));
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
@ -398,7 +398,7 @@ trait ResponseHandler
$this->authorized = self::NOT_LOGGED_IN;
$this->authorization = null;
$this->call((function () use ($datacenter, &$request, &$response) {
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code']));
@ -410,7 +410,7 @@ trait ResponseHandler
if ($this->authorized !== self::LOGGED_IN) {
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->call((function () use ($datacenter, &$request, &$response) {
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code']));
@ -445,7 +445,7 @@ trait ResponseHandler
$this->authorized = self::NOT_LOGGED_IN;
$this->authorization = null;
$this->call((function () use ($datacenter, &$request, &$response) {
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code']));
@ -453,7 +453,7 @@ trait ResponseHandler
return;
}
$this->call((function () use ($request_id, $datacenter) {
$this->callFork((function () use ($request_id, $datacenter) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
@ -464,7 +464,7 @@ trait ResponseHandler
$this->logger->logger('Temporary auth key not bound, resetting temporary auth key...', \danog\MadelineProto\Logger::ERROR);
$this->datacenter->sockets[$datacenter]->temp_auth_key = null;
$this->call((function () use ($request_id, $datacenter) {
$this->callFork((function () use ($request_id, $datacenter) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
})());
@ -517,7 +517,7 @@ trait ResponseHandler
$this->logger->logger('Set time delta to '.$this->datacenter->sockets[$datacenter]->time_delta, \danog\MadelineProto\Logger::WARNING);
$this->reset_session();
$this->datacenter->sockets[$datacenter]->temp_auth_key = null;
$this->call((function () use ($datacenter, $request_id) {
$this->callFork((function () use ($datacenter, $request_id) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
})());
@ -543,7 +543,7 @@ trait ResponseHandler
$botAPI = isset($request['botAPI']) && $request['botAPI'];
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->call((
$this->callFork((
function () use ($request_id, $response, $datacenter, $botAPI) {
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$this->logger->logger("Deferred: sent $r to deferred");

View File

@ -631,7 +631,7 @@ trait UpdateHandler
return false;
}
$this->call((function () use ($payload) {
$this->callFork((function () use ($payload) {
$request = (new Request($this->hook_url, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload);
$result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody();

View File

@ -1,36 +0,0 @@
<?php
/**
* PayloadStream.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto;
use Amp\ByteStream\InputStream;
use Amp\Promise;
/**
* PayloadStream.
*
* Represents an MTProto payload
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class PayloadStream implements InputStream
{
public function read(): Promise
{
}
}

View File

@ -19,7 +19,6 @@
namespace danog\MadelineProto\Stream\Async;
use Amp\Promise;
use function Amp\call;
/**
* Buffer helper trait.
@ -32,11 +31,11 @@ trait Buffer
{
public function bufferRead(int $length): Promise
{
return call([$this, 'bufferReadAsync'], $length);
return $this->call($this->bufferReadAsync($length));
}
public function bufferWrite(string $data): Promise
{
return call([$this, 'bufferWriteAsync'], $data);
return $this->call($this->bufferWriteAsync($data));
}
}

View File

@ -18,11 +18,9 @@
namespace danog\MadelineProto\Stream\Async;
use Amp\Coroutine;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
/**
* Buffered stream helper trait.
@ -44,19 +42,7 @@ trait BufferedStream
*/
public function getReadBuffer(&$length): Promise
{
try {
$result = $this->getReadBufferAsync($length);
} catch (\Throwable $exception) {
return new Failure($exception);
}
if ($result instanceof \Generator) {
return new Coroutine($result);
}
if ($result instanceof Promise) {
return $result;
}
return new Success($result);
return $this->call($this->getReadBufferAsync($length));
}
/**
@ -69,6 +55,6 @@ trait BufferedStream
*/
public function getWriteBuffer(int $length, string $append = ''): Promise
{
return call([$this, 'getWriteBufferAsync'], $length, $append);
return $this->call($this->getWriteBufferAsync($length, $append));
}
}

View File

@ -19,7 +19,6 @@
namespace danog\MadelineProto\Stream\Async;
use Amp\Promise;
use function Amp\call;
/**
* Raw stream helper trait.
@ -34,16 +33,16 @@ trait RawStream
public function read(): Promise
{
return call([$this, 'readAsync']);
return $this->call($this->readAsync());
}
public function write(string $data): Promise
{
return call([$this, 'writeAsync'], $data);
return $this->call($this->writeAsync($data));
}
public function end(string $finalData = ''): Promise
{
return call([$this, 'endAsync'], $finalData);
return $this->call($this->endAsync($finalData));
}
}

View File

@ -21,6 +21,7 @@ namespace danog\MadelineProto\Stream\Async;
use Amp\Promise;
use danog\MadelineProto\Stream\ConnectionContext;
use function Amp\call;
use danog\MadelineProto\Tools;
/**
* Generic stream helper trait.
@ -31,8 +32,9 @@ use function Amp\call;
*/
trait Stream
{
use Tools;
public function connect(ConnectionContext $ctx, string $header = ''): Promise
{
return call([$this, 'connectAsync'], $ctx, $header);
return $this->call($this->connectAsync($ctx, $header));
}
}

View File

@ -155,7 +155,7 @@ class BufferedRawStream implements \danog\MadelineProto\Stream\BufferedStreamInt
return new Success(fread($this->memory_stream, $length));
}
return call([$this, 'bufferReadAsync'], $length);
return $this->call($this->bufferReadAsync($length));
}
/**

View File

@ -275,7 +275,7 @@ class HashedBufferedStream implements BufferedProxyStreamInterface, BufferInterf
return $this->read_buffer->bufferRead($length);
}
return call([$this, 'bufferReadAsync'], $length);
return call($this->bufferReadAsync($length));
}
/**

View File

@ -22,7 +22,6 @@ use Amp\CancellationToken;
use Amp\Promise;
use Amp\Socket\ClientConnectContext;
use Amp\Uri\Uri;
use function Amp\call;
/**
* Connection context class.
@ -321,15 +320,6 @@ class ConnectionContext
return $this->nextStreams[$this->key][0];
}
/**
* Get a stream from the stream chain.
*
* @return Promise
*/
public function getStream(string $buffer = ''): Promise
{
return call([$this, 'getStreamAsync'], $buffer);
}
/**
* Get a stream from the stream chain.
@ -338,7 +328,7 @@ class ConnectionContext
*
* @return \Generator
*/
public function getStreamAsync(string $buffer = ''): \Generator
public function getStream(string $buffer = ''): \Generator
{
list($clazz, $extra) = $this->nextStreams[$this->key--];
$obj = new $clazz();

View File

@ -850,7 +850,7 @@ trait TL
if (isset($this->tl_callbacks[TLCallback::CONSTRUCTOR_CALLBACK][$x['_']])) {
foreach ($this->tl_callbacks[TLCallback::CONSTRUCTOR_CALLBACK][$x['_']] as $callback) {
$this->call($callback($x));
$this->callFork($callback($x));
}
} elseif ($x['_'] === 'rpc_result'
&& isset($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_'])

View File

@ -23,6 +23,7 @@ use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use function Amp\Promise\wait;
use Amp\Failure;
/**
* Some tools.
@ -220,19 +221,33 @@ trait Tools
return $promise;
}
public function callFork($promise)
{
$this->call($promise)->onResolve(function ($e, $res) {
if ($e) {
$this->rethrow($e);
}
});
}
public function rethrow($e)
{
$logger = isset($this->logger) ? $this->logger : Logger::$default;
$logger->logger("Got the following exception within a forked strand, trying to rethrow");
$logger->logger((string) $e);
Promise\rethrow(new Failure($e));
}
public function after($a, $b)
{
$a = $this->call($a());
$deferred = new Deferred();
$a->onResolve(function ($e, $res) use ($b, $deferred) {
if ($e) {
throw $e;
return $this->rethrow($e);
}
$b = $this->call($b());
$b->onResolve(static function ($e, $res) use ($deferred) {
if ($e) {
throw $e;
return $this->rethrow($e);
}
$deferred->resolve($res);
});

View File

@ -109,7 +109,7 @@ trait Loop
foreach ($updates as $update) {
$r = $this->settings['updates']['callback']($update);
if (is_object($r)) {
\Amp\Promise\rethrow($this->call($r));
$this->callFork($r);
}
}
$updates = [];