Huge bugfixes, avoid problems with unbound keys and add more config loops

This commit is contained in:
Daniil Gentili 2019-09-17 21:35:53 +02:00
parent f6573be332
commit 8147d5fca5
14 changed files with 276 additions and 64 deletions

View File

@ -23,10 +23,15 @@ use Amp\Deferred;
use Amp\Promise;
use danog\MadelineProto\Loop\Connection\CheckLoop;
use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
use danog\MadelineProto\Loop\Connection\PingLoop;
use danog\MadelineProto\Loop\Connection\ReadLoop;
use danog\MadelineProto\Loop\Connection\WriteLoop;
use danog\MadelineProto\MTProtoSession\Session;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\Stream\Transport\WssStream;
use danog\MadelineProto\Stream\Transport\WsStream;
/**
* Connection class.
@ -64,6 +69,12 @@ class Connection extends Session
* @var \danog\MadelineProto\Loop\Connection\HttpWaitLoop
*/
protected $waiter;
/**
* Ping loop.
*
* @var \danog\MadelineProto\Loop\Connection\PingLoop
*/
protected $pinger;
/**
* The actual socket.
*
@ -137,6 +148,32 @@ class Connection extends Session
*/
private $datacenterId = '';
/**
* Whether this socket has to be reconnected.
*
* @var boolean
*/
private $needsReconnect = false;
/**
* Indicate if this socket needs to be reconnected.
*
* @param boolean $needsReconnect Whether the socket has to be reconnected
*
* @return void
*/
public function needReconnect(bool $needsReconnect)
{
$this->needsReconnect = $needsReconnect;
}
/**
* Whether this sockets needs to be reconnected.
*
* @return boolean
*/
public function shouldReconnect(): bool
{
return $this->needsReconnect;
}
/**
* Check if the socket is writing stuff.
*
@ -190,9 +227,9 @@ class Connection extends Session
/**
* Get the receive date of the latest chunk of data from the socket.
*
* @return void
* @return int
*/
public function getLastChunk()
public function getLastChunk(): int
{
return $this->lastChunk;
}
@ -311,9 +348,11 @@ class Connection extends Session
$ctx->setReadCallback([$this, 'haveRead']);
$this->stream = yield $ctx->getStream();
if (isset($this->old)) {
unset($this->old);
if ($this->needsReconnect) {
$this->needsReconnect = false;
}
$this->httpReqCount = 0;
$this->httpResCount = 0;
if (!isset($this->writer)) {
$this->writer = new WriteLoop($this);
@ -327,6 +366,9 @@ class Connection extends Session
if (!isset($this->waiter)) {
$this->waiter = new HttpWaitLoop($this);
}
if (!isset($this->pinger) && ($this->ctx->hasStreamName(WssStream::getName()) || $this->ctx->hasStreamName(WsStream::getName()))) {
$this->pinger = new PingLoop($this);
}
foreach ($this->new_outgoing as $message_id) {
if ($this->outgoing_messages[$message_id]['unencrypted']) {
$promise = $this->outgoing_messages[$message_id]['promise'];
@ -336,8 +378,6 @@ class Connection extends Session
unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]);
}
}
$this->httpReqCount = 0;
$this->httpResCount = 0;
$this->writer->start();
$this->reader->start();
@ -345,6 +385,9 @@ class Connection extends Session
$this->checker->resume();
}
$this->waiter->start();
if ($this->pinger) {
$this->pinger->start();
}
}
/**
@ -428,7 +471,7 @@ class Connection extends Session
}
}
/**
* Resume HttpWaiter
* Resume HttpWaiter.
*
* @return void
*/
@ -437,6 +480,9 @@ class Connection extends Session
if (isset($this->waiter)) {
$this->waiter->resume();
}
if (isset($this->pinger)) {
$this->pinger->resume();
}
}
/**
* Connect main instance.
@ -482,8 +528,8 @@ class Connection extends Session
public function disconnect()
{
$this->API->logger->logger("Disconnecting from DC {$this->datacenterId}");
$this->old = true;
foreach (['reader', 'writer', 'checker', 'waiter', 'updater'] as $loop) {
$this->needsReconnect = true;
foreach (['reader', 'writer', 'checker', 'waiter', 'updater', 'pinger'] as $loop) {
if (isset($this->{$loop}) && $this->{$loop}) {
$this->{$loop}->signal($loop === 'reader' ? new NothingInTheSocketException() : true);
}

View File

@ -189,7 +189,7 @@ class DataCenter
foreach ($this->sockets as $key => $socket) {
if ($socket instanceof DataCenterConnection && !\strpos($key, '_bk')) {
//$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE);
$socket->old = true;
$socket->needReconnect(true);
$socket->setExtra($this->API);
$socket->disconnect();
} else {
@ -496,8 +496,8 @@ class DataCenter
public function dcConnectAsync(string $dc_number, int $id = -1): \Generator
{
$old = isset($this->sockets[$dc_number]) && (
isset($this->sockets[$dc_number]->old) ||
($id !== -1 && isset($this->sockets[$dc_number]->getConnection($id)->old))
$this->sockets[$dc_number]->shouldReconnect() ||
($id !== -1 && $this->sockets[$dc_number]->hasConnection($id) && $this->sockets[$dc_number]->getConnection($id)->shouldReconnect())
);
if (isset($this->sockets[$dc_number]) && !$old) {
return false;
@ -903,6 +903,18 @@ class DataCenter
return $this->sockets[$datacenter]->isHttp();
}
/**
* Check if connected to datacenter directly using IP address.
*
* @param string $datacenter DC ID
*
* @return boolean
*/
public function byIPAddress(string $datacenter): bool
{
return $this->sockets[$datacenter]->byIPAddress();
}
/**
* Get all DCs.
*

View File

@ -25,6 +25,7 @@ use danog\MadelineProto\MTProto\TempAuthKey;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\Stream\Transport\WssStream;
use JsonSerializable;
class DataCenterConnection implements JsonSerializable
@ -113,6 +114,33 @@ class DataCenterConnection implements JsonSerializable
* @var array
*/
private $backup = [];
/**
* Whether this socket has to be reconnected.
*
* @var boolean
*/
private $needsReconnect = false;
/**
* Indicate if this socket needs to be reconnected.
*
* @param boolean $needsReconnect Whether the socket has to be reconnected
*
* @return void
*/
public function needReconnect(bool $needsReconnect)
{
$this->needsReconnect = $needsReconnect;
}
/**
* Whether this sockets needs to be reconnected.
*
* @return boolean
*/
public function shouldReconnect(): bool
{
return $this->needsReconnect;
}
/**
* Get auth key.
*
@ -445,9 +473,22 @@ class DataCenterConnection implements JsonSerializable
return $this->connections[0];
}
/**
* Check if any connection is available.
*
* @param integer $id Connection ID
*
* @return boolean
*/
public function hasConnection(int $id = -1): bool
{
return $id < 0 ? \count($this->connections) : isset($this->connections[$id]);
}
/**
* Get best socket in round robin.
*
* @param integer $id Connection ID, for manual fetching
*
* @return Connection
*/
public function getConnection(int $id = -1): Connection
@ -548,6 +589,16 @@ class DataCenterConnection implements JsonSerializable
return \in_array($this->ctx->getStreamName(), [HttpStream::getName(), HttpsStream::getName()]);
}
/**
* Check if is connected directly by IP address.
*
* @return boolean
*/
public function byIPAddress(): bool
{
return !$this->ctx->hasStreamName(WssStream::getName()) && !$this->ctx->hasStreamName(HttpsStream::getName());
}
/**
* Check if is a media connection.
*

View File

@ -20,8 +20,6 @@ namespace danog\MadelineProto\Loop\Connection;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
/**
* HttpWait loop.
@ -54,7 +52,6 @@ class HttpWaitLoop extends ResumableSignalLoop
{
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $connection->getDatacenterID();
$this->datacenterConnection = $connection->getShared();
}
@ -74,7 +71,7 @@ class HttpWaitLoop extends ResumableSignalLoop
if (yield $this->waitSignal($this->pause())) {
return;
}
if (!\in_array($connection->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()])) {
if (!$connection->isHttp()) {
return;
}
while (!$shared->hasTempAuthKey()) {

View File

@ -77,7 +77,7 @@ class ReadLoop extends SignalLoop
try {
$error = yield $this->waitSignal($this->readMessage());
} catch (NothingInTheSocketException | StreamException | PendingReadError | \Error $e) {
if (isset($connection->old)) {
if ($connection->shouldReconnect()) {
return;
}
Tools::callForkDefer((function () use ($API, $connection, $datacenter, $e) {
@ -142,7 +142,7 @@ class ReadLoop extends SignalLoop
$connection = $this->connection;
$shared = $this->datacenterConnection;
if (isset($this->connection->old)) {
if ($connection->shouldReconnect()) {
$API->logger->logger('Not reading because connection is old');
throw new NothingInTheSocketException();

View File

@ -22,7 +22,6 @@ use Amp\ByteStream\StreamException;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\Magic;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Tools;
@ -74,7 +73,7 @@ class WriteLoop extends ResumableSignalLoop
$please_wait = false;
while (true) {
while (empty($connection->pending_outgoing) || $please_wait) {
if (isset($connection->old)) {
if ($connection->shouldReconnect()) {
$API->logger->logger('Not writing because connection is old');
return;
}
@ -87,7 +86,7 @@ class WriteLoop extends ResumableSignalLoop
}
$API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE);
if (isset($connection->old)) {
if ($connection->shouldReconnect()) {
$API->logger->logger('Not writing because connection is old');
return;
}
@ -97,7 +96,7 @@ class WriteLoop extends ResumableSignalLoop
try {
$please_wait = yield $this->{$shared->hasTempAuthKey() ? 'encryptedWriteLoopAsync' : 'unencryptedWriteLoopAsync'}();
} catch (StreamException $e) {
if (isset($connection->old)) {
if ($connection->shouldReconnect()) {
return;
}
Tools::callForkDefer((function () use ($API, $connection, $datacenter, $e) {
@ -180,16 +179,19 @@ class WriteLoop extends ResumableSignalLoop
if ($shared->isHttp() && empty($connection->pending_outgoing)) {
return;
}
$temporary_keys = [];
if (\count($to_ack = $connection->ack_queue)) {
foreach (\array_chunk($connection->ack_queue, 8192) as $acks) {
$connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'msgs_ack', 'serialized_body' => yield $this->API->serialize_object_async(['type' => 'msgs_ack'], ['msg_ids' => $acks], 'msgs_ack'), 'content_related' => false, 'unencrypted' => false, 'method' => false];
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msgs_ack', 'serialized_body' => yield $this->API->serialize_object_async(['type' => 'msgs_ack'], ['msg_ids' => $acks], 'msgs_ack'), 'content_related' => false, 'unencrypted' => false, 'method' => false];
$temporary_keys[$connection->pending_outgoing_key] = true;
$API->logger->logger("Adding msgs_ack {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE);
$connection->pending_outgoing_key++;
}
}
$has_http_wait = false;
$messages = [];
$keys = [];
foreach ($connection->pending_outgoing as $message) {
if ($message['_'] === 'http_wait') {
$has_http_wait = true;
@ -197,14 +199,17 @@ class WriteLoop extends ResumableSignalLoop
}
}
if ($shared->isHttp() && !$has_http_wait) {
$connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'http_wait', 'serialized_body' => yield $this->API->serialize_object_async(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'content_related' => true, 'unencrypted' => false, 'method' => true];
$has_http_wait = true;
$API->logger->logger("Adding http_wait {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE);
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'http_wait', 'serialized_body' => yield $this->API->serialize_object_async(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'content_related' => true, 'unencrypted' => false, 'method' => true];
$temporary_keys[$connection->pending_outgoing_key] = true;
$connection->pending_outgoing_key++;
}
$total_length = 0;
$count = 0;
\ksort($connection->pending_outgoing);
$skipped = false;
$inited = false;
foreach ($connection->pending_outgoing as $k => $message) {
if ($message['unencrypted']) {
continue;
@ -232,7 +237,8 @@ class WriteLoop extends ResumableSignalLoop
$MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generate_out_seq_no($message['content_related'])];
if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') {
if (!$shared->getTempAuthKey()->isInited() && $message['_'] !== 'auth.bindTempAuthKey') {
if (!$shared->getTempAuthKey()->isInited() && $message['_'] !== 'auth.bindTempAuthKey' && !$inited) {
$inited = true;
$API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['write_client_info'], $message['_']), \danog\MadelineProto\Logger::NOTICE);
$MTmessage['body'] = yield $API->serialize_method_async(
'invokeWithLayer',
@ -292,6 +298,14 @@ class WriteLoop extends ResumableSignalLoop
$messages[] = $MTmessage;
$keys[$k] = $message_id;
}
if ($shared->isHttp() && $skipped && $count === \count($temporary_keys)) {
foreach ($temporary_keys as $key => $true) {
$API->logger->logger("Removing temporary {$connection->pending_outgoing[$key]['_']} by $key", Logger::ULTRA_VERBOSE);
unset($connection->pending_outgoing[$key]);
$count--;
}
}
$MTmessage = null;
if ($count > 1) {
@ -346,12 +360,6 @@ class WriteLoop extends ResumableSignalLoop
$connection->ack_queue = [];
}
/*if ($has_http_wait) {
$connection->last_http_wait = $sent;
} elseif (Magic::$altervista) {
$connection->last_http_wait = PHP_INT_MAX;
}*/
foreach ($keys as $key => $message_id) {
$connection->outgoing_messages[$message_id] = &$connection->pending_outgoing[$key];
@ -371,7 +379,9 @@ class WriteLoop extends ResumableSignalLoop
//if (!empty($connection->pending_outgoing)) $connection->select();
} while (!empty($connection->pending_outgoing) && !$skipped);
$connection->pending_outgoing_key = 0;
if (empty($connection->pending_outgoing)) {
$connection->pending_outgoing_key = 'a';
}
return $skipped;
}

View File

@ -54,7 +54,6 @@ abstract class Loop implements LoopInterface
return false;
}
return $this->callFork($this->loopImpl());
}

View File

@ -354,12 +354,24 @@ class MTProto extends AsyncConstruct implements TLCallback
* @var \danog\MadelineProto\MTProtoTools\MinDatabase
*/
public $minDatabase;
/**
* TOS check loop.
*
* @var \danog\MadelineProto\Loop\Update\PeriodicLoop
*/
public $checkTosLoop;
/**
* Phone config loop.
*
* @var \danog\MadelineProto\Loop\Update\PeriodicLoop
*/
public $phoneConfigLoop;
/**
* Config loop.
*
* @var \danog\MadelineProto\Loop\Update\PeriodicLoop
*/
public $configLoop;
/**
* Call checker loop.
*
@ -578,12 +590,20 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->serializeLoop = new PeriodicLoop($this, [$this, 'serialize'], 'serialize', $this->settings['serialization']['serialization_interval']);
}
if (!$this->phoneConfigLoop) {
$this->phoneConfigLoop = new PeriodicLoop($this, [$this, 'get_phone_config_async'], 'phone config', 24 * 3600 * 1000);
$this->phoneConfigLoop = new PeriodicLoop($this, [$this, 'get_phone_config_async'], 'phone config', 24 * 3600);
}
if (!$this->checkTosLoop) {
$this->checkTosLoop = new PeriodicLoop($this, [$this, 'check_tos_async'], 'TOS', 24 * 3600);
}
if (!$this->configLoop) {
$this->configLoop = new PeriodicLoop($this, [$this, 'get_config_async'], 'config', 24 * 3600);
}
$this->callCheckerLoop->start();
$this->serializeLoop->start();
$this->phoneConfigLoop->start();
$this->configLoop->start();
$this->checkTosLoop->start();
}
public function stopLoops()
{
@ -599,6 +619,14 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->phoneConfigLoop->signal(true);
$this->phoneConfigLoop = null;
}
if ($this->configLoop) {
$this->configLoop->signal(true);
$this->configLoop = null;
}
if ($this->checkTosLoop) {
$this->checkTosLoop->signal(true);
$this->checkTosLoop = null;
}
}
public function __wakeup()
{
@ -1222,12 +1250,14 @@ class MTProto extends AsyncConstruct implements TLCallback
}
yield $this->all($dcs);
yield $this->init_authorization_async();
yield $this->parse_config_async();
$dcs = [];
foreach ($this->datacenter->get_dcs(false) as $new_dc) {
$dcs[] = $this->datacenter->dcConnectAsync($new_dc);
}
yield $this->all($dcs);
yield $this->init_authorization_async();
yield $this->parse_config_async();
yield $this->get_phone_config_async();
}
@ -1340,7 +1370,7 @@ class MTProto extends AsyncConstruct implements TLCallback
public function get_phone_config_async($watcherId = null)
{
if ($this->authorized === self::LOGGED_IN && \class_exists('\\danog\\MadelineProto\\VoIPServerConfigInternal') && !$this->authorization['user']['bot'] && $this->datacenter->getDataCenterConnection($this->settings['connection_settings']['default_dc'])->hasTempAuthKey()) {
if ($this->authorized === self::LOGGED_IN && \class_exists(VoIPServerConfigInternal::class) && !$this->authorization['user']['bot'] && $this->datacenter->getDataCenterConnection($this->settings['connection_settings']['default_dc'])->hasTempAuthKey()) {
$this->logger->logger('Fetching phone config...');
VoIPServerConfig::updateDefault(yield $this->method_call_async_read('phone.getCallConfig', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]));
} else {
@ -1348,16 +1378,6 @@ class MTProto extends AsyncConstruct implements TLCallback
}
}
public function get_config_async($config = [], $options = [])
{
if ($this->config['expires'] > \time()) {
return $this->config;
}
$this->config = empty($config) ? yield $this->method_call_async_read('help.getConfig', $config, empty($options) ? ['datacenter' => $this->settings['connection_settings']['default_dc']] : $options) : $config;
yield $this->parse_config_async();
return $this->config;
}
public function get_cdn_config_async($datacenter)
{
@ -1375,6 +1395,17 @@ class MTProto extends AsyncConstruct implements TLCallback
}
}
public function get_config_async($config = [], $options = [])
{
if ($this->config['expires'] > \time()) {
return $this->config;
}
$this->config = empty($config) ? yield $this->method_call_async_read('help.getConfig', $config, empty($options) ? ['datacenter' => $this->settings['connection_settings']['default_dc']] : $options) : $config;
yield $this->parse_config_async();
return $this->config;
}
public function parse_config_async()
{
if (isset($this->config['dc_options'])) {
@ -1387,7 +1418,7 @@ class MTProto extends AsyncConstruct implements TLCallback
public function parse_dc_options_async($dc_options)
{
unset($this->settings[$this->config['test_mode']]);
$changed = [];
foreach ($dc_options as $dc) {
$test = $this->config['test_mode'] ? 'test' : 'main';
$id = $dc['id'];
@ -1405,13 +1436,35 @@ class MTProto extends AsyncConstruct implements TLCallback
}
unset($dc['cdn'], $dc['media_only'], $dc['id'], $dc['ipv6']);
if ($dc !== $this->settings['connection'][$test][$ipv6][$id] ?? []) {
$changed[$id] = true;
}
$this->settings['connection'][$test][$ipv6][$id] = $dc;
}
$curdc = $this->datacenter->curdc;
$this->logger->logger('Got new DC options, reconnecting');
yield $this->connect_to_all_dcs_async();
if ($changed) {
$this->logger->logger('Got new DC options, reconnecting');
foreach ($this->datacenter->sockets as $key => $socket) {
if ($socket instanceof DataCenterConnection && isset($changed[$key]) && $socket->byIPAddress()) {
//$this->API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['dc_con_stop'], $key), \danog\MadelineProto\Logger::VERBOSE);
$socket->shouldReconnect(true);
$socket->disconnect();
unset($changed[$key]);
}
}
$dcs = [];
foreach ($this->datacenter->get_dcs() as $new_dc) {
$dcs[] = $this->datacenter->dcConnectAsync($new_dc);
}
yield $this->all($dcs);
yield $this->init_authorization_async();
$dcs = [];
foreach ($this->datacenter->get_dcs(false) as $new_dc) {
$dcs[] = $this->datacenter->dcConnectAsync($new_dc);
}
yield $this->all($dcs);
yield $this->init_authorization_async();
}
$this->datacenter->curdc = $curdc;
}

View File

@ -346,7 +346,7 @@ trait ResponseHandler
if (isset($response['_'])) {
switch ($response['_']) {
case 'rpc_error':
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
if (($request['method'] ?? false) && $request['_'] !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
$this->shared->getTempAuthKey()->init(true);
}
@ -558,7 +558,7 @@ trait ResponseHandler
}
}
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
if (($request['method'] ?? false) && $request['_'] !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
$this->shared->getTempAuthKey()->init(true);
}

View File

@ -560,7 +560,7 @@ trait AuthKeyHandler
$encrypted_message = $datacenterConnection->getPermAuthKey()->getID().$message_key.$this->ige_encrypt($encrypted_data.$padding, $aes_key, $aes_iv);
$res = yield $connection->method_call_async_read('auth.bindTempAuthKey', ['perm_auth_key_id' => $perm_auth_key_id, 'nonce' => $nonce, 'expires_at' => $expires_at, 'encrypted_message' => $encrypted_message], ['msg_id' => $message_id]);
if ($res === true) {
$this->logger->logger('Successfully binded temporary and permanent authorization keys, DC '.$datacenter, \danog\MadelineProto\Logger::NOTICE);
$this->logger->logger('Bound temporary and permanent authorization keys, DC '.$datacenter, \danog\MadelineProto\Logger::NOTICE);
$datacenterConnection->bind();
$datacenterConnection->flush();
@ -705,7 +705,7 @@ trait AuthKeyHandler
$cdn = $socket->isCDN();
$media = $socket->isMedia();
if (!$socket->hasTempAuthKey() || !$socket->hasPermAuthKey()) {
if (!$socket->hasTempAuthKey() || !$socket->hasPermAuthKey() || !$socket->isBound()) {
if (!$socket->hasPermAuthKey() && !$cdn && !$media) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_perm_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);
$socket->setPermAuthKey(yield $this->create_auth_key_async(-1, $id));
@ -728,10 +728,9 @@ trait AuthKeyHandler
$socket->setTempAuthKey(yield $this->create_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id));
yield $this->bind_temp_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id);
$config = yield $connection->method_call_async_read('help.getConfig', []);
$this->config = yield $connection->method_call_async_read('help.getConfig', []);
yield $this->sync_authorization_async($id);
yield $this->get_config_async($config);
} elseif (!$socket->hasTempAuthKey()) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_temp_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);
$socket->setTempAuthKey(yield $this->create_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id));
@ -739,9 +738,8 @@ trait AuthKeyHandler
} else {
if (!$cdn) {
$socket->bind(false);
$config = yield $connection->method_call_async_read('help.getConfig', []);
$this->config = yield $connection->method_call_async_read('help.getConfig', []);
yield $this->sync_authorization_async($id);
yield $this->get_config_async($config);
} elseif (!$socket->hasTempAuthKey()) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_temp_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);
$socket->setTempAuthKey(yield $this->create_auth_key_async($this->settings['authorization']['default_temp_auth_key_expires_in'], $id));

View File

@ -263,7 +263,8 @@ trait UpdateHandler
}
if ($update['_'] === 'updateDcOptions') {
$this->logger->logger('Got new dc options', \danog\MadelineProto\Logger::VERBOSE);
yield $this->parse_dc_options_async($update['dc_options']);
$this->config['dc_options'] = $update['dc_options'];
yield $this->parse_config_async();
return;
}

View File

@ -423,6 +423,23 @@ class ConnectionContext
return $this->nextStreams[$this->key][0];
}
/**
* Check if has stream within stream chain.
*
* @param string $stream Stream name
*
* @return boolean
*/
public function hasStreamName(string $stream): bool
{
foreach ($this->nextStreams as list($name)) {
if ($name === $stream) {
return true;
}
}
return false;
}
/**
* Get a stream from the stream chain.
*

View File

@ -28,9 +28,11 @@ use Amp\Websocket\Client\Internal\ClientSocket;
use Amp\Websocket\Client\Rfc6455Connection;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\Rfc7692CompressionFactory;
use danog\MadelineProto\API;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\RawStreamInterface;
use danog\MadelineProto\Stream\RawProxyStreamInterface;
use function Amp\Websocket\generateKey;
use function Amp\Websocket\validateAcceptForKey;
@ -39,11 +41,27 @@ use function Amp\Websocket\validateAcceptForKey;
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class WsStream implements RawStreamInterface
class WsStream implements RawProxyStreamInterface
{
use RawStream;
/**
* API instance.
*
* @var MTProto
*/
private $API;
/**
* Websocket stream.
*
* @var Rfc6455Connection
*/
private $stream;
/**
* Websocket message.
*
* @var Message
*/
private $message;
/**
@ -209,6 +227,16 @@ class WsStream implements RawStreamInterface
return null;
}
/**
* Set API instance.
*
* @param MTProto $extra
* @return void
*/
public function setExtra($extra)
{
$this->API = $extra;
}
/**
* {@inheritdoc}
*

View File

@ -20,7 +20,7 @@
namespace danog\MadelineProto\Wrappers;
/**
* Manages logging in and out.
* Manages terms of service.
*/
trait TOS
{