This commit is contained in:
Daniil Gentili 2019-09-02 14:37:30 +02:00
parent ea2b28dbb1
commit 0f7ffc2da7
17 changed files with 332 additions and 215 deletions

View File

@ -71,7 +71,7 @@ class Connection extends Session
*
* @var Stream
*/
private $stream;
public $stream;
/**
* Connection context.
*
@ -126,30 +126,18 @@ class Connection extends Session
protected $datacenter;
/**
* Whether the socket is reading data.
* Connection ID
*
* @var boolean
* @var int
*/
private $reading = false;
/**
* Whether the socket is writing data.
*
* @var boolean
*/
private $writing = false;
private $id = 0;
/**
* Writing callback.
* DC ID and connection ID concatenated
*
* @var callable
* @var
*/
private $writingCallback;
/**
* Reading callback.
*
* @var callable
*/
private $readingCallback;
private $datacenterId = '';
/**
* Check if the socket is writing stuff.
@ -178,8 +166,7 @@ class Connection extends Session
*/
public function writing(bool $writing)
{
$this->writing = $writing;
($this->writingCallback)($writing);
$this->shared->writing($writing, $this->id);
}
/**
* Set reading boolean.
@ -190,8 +177,7 @@ class Connection extends Session
*/
public function reading(bool $reading)
{
$this->reading = $reading;
($this->readingCallback)($writing);
$this->shared->reading($reading, $this->id);
}
/**
@ -250,6 +236,25 @@ class Connection extends Session
return $this->httpReqCount;
}
/**
* Get connection ID
*
* @return integer
*/
public function getID(): int
{
return $this->id;
}
/**
* Get datacenter concatenated with connection ID
*
* @return string
*/
public function getDatacenterID(): string
{
return $this->datacenterId;
}
/**
* Get connection context.
@ -276,6 +281,7 @@ class Connection extends Session
$this->ctx = $ctx->getCtx();
$this->datacenter = $ctx->getDc();
$this->datacenterId = $this->datacenter.'.'.$this->id;
$this->stream = yield $ctx->getStream();
if (isset($this->old)) {
unset($this->old);
@ -395,17 +401,15 @@ class Connection extends Session
/**
* Connect main instance.
*
* @param DataCenterConnection $extra Extra
* @param callable $readingCallback Read callback
* @param callable $writingCallback Write callback
* @param DataCenterConnection $extra Shared instance
* @param int $id Connection ID
*
* @return void
*/
public function setExtra(DataCenterConnection $extra, $readingCallback, $writingCallback)
public function setExtra(DataCenterConnection $extra, int $id)
{
$this->shared = $extra;
$this->readingCallback = $readingCallback;
$this->writingCallback = $writingCallback;
$this->id = $id;
$this->API = $extra->getExtra();
$this->logger = $this->API->logger;
}
@ -420,6 +424,16 @@ class Connection extends Session
return $this->API;
}
/**
* Get shared connection instance.
*
* @return DataCenterConnection
*/
public function getShared(): DataCenterConnection
{
return $this->shared;
}
/**
* Disconnect from DC.
*
@ -427,7 +441,7 @@ class Connection extends Session
*/
public function disconnect()
{
$this->API->logger->logger("Disconnecting from DC {$this->datacenter}");
$this->API->logger->logger("Disconnecting from DC {$this->datacenterId}");
$this->old = true;
foreach (['reader', 'writer', 'checker', 'waiter', 'updater'] as $loop) {
if (isset($this->{$loop}) && $this->{$loop}) {
@ -441,7 +455,7 @@ class Connection extends Session
$this->API->logger->logger($e);
}
}
$this->API->logger->logger("Disconnected from DC {$this->datacenter}");
$this->API->logger->logger("Disconnected from DC {$this->datacenterId}");
}
/**
@ -451,7 +465,7 @@ class Connection extends Session
*/
public function reconnect(): \Generator
{
$this->API->logger->logger("Reconnecting DC {$this->datacenter}");
$this->API->logger->logger("Reconnecting DC {$this->datacenterId}");
$this->disconnect();
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc());
if ($this->API->hasAllAuth() && !$this->hasPendingCalls()) {
@ -474,16 +488,4 @@ class Connection extends Session
{
return __CLASS__;
}
/**
* Sleep function.
*
* @internal
*
* @return array
*/
public function __sleep()
{
return ['peer_tag', 'temp_auth_key', 'auth_key', 'session_id', 'session_out_seq_no', 'session_in_seq_no', 'max_incoming_id', 'max_outgoing_id', 'authorized', 'ack_queue'];
}
}

View File

@ -879,6 +879,26 @@ class DataCenter
{
return isset($this->sockets[$dc]);
}
/**
* Check if connected to datacenter using HTTP
*
* @param string $datacenter DC ID
*
* @return boolean
*/
public function isHttp(string $datacenter)
{
return $this->sockets[$datacenter]->isHttp();
}
/**
* Get all DCs
*
* @param boolean $all
* @return void
*/
public function get_dcs($all = true)
{
$test = $this->settings['all']['test_mode'] ? 'test' : 'main';

View File

@ -22,6 +22,8 @@ use danog\MadelineProto\AuthKey\AuthKey;
use danog\MadelineProto\AuthKey\PermAuthKey;
use danog\MadelineProto\AuthKey\TempAuthKey;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use JsonSerializable;
class DataCenterConnection implements JsonSerializable
@ -87,6 +89,19 @@ class DataCenterConnection implements JsonSerializable
*/
private $robinLoop;
/**
* Decrement roundrobin weight by this value if busy reading.
*
* @var integer
*/
private $decRead = 1;
/**
* Decrement roundrobin weight by this value if busy writing.
*
* @var integer
*/
private $decWrite = 10;
/**
* Get auth key.
*
@ -294,15 +309,7 @@ class DataCenterConnection implements JsonSerializable
for ($x = 0; $x < $count; $x++) {
$this->availableConnections[$x] = 0;
$this->connections[$x] = new Connection();
$this->connections[$x]->setExtra(
$this,
function (bool $reading) use ($x, $incRead) {
$this->availableConnections[$x] += $reading ? -$incRead : $incRead;
},
function (bool $writing) use ($x) {
$this->availableConnections[$x] += $writing ? -10 : 10;
}
);
$this->connections[$x]->setExtra($this, $x);
yield $this->connections[$x]->connect($ctx);
$ctx = $this->ctx->getCtx();
}
@ -380,6 +387,32 @@ class DataCenterConnection implements JsonSerializable
}
}
/**
* Indicate that one of the sockets is busy reading.
*
* @param boolean $reading Whether we're busy reading
* @param int $x Connection ID
*
* @return void
*/
public function reading(bool $reading, int $x)
{
$this->availableConnections[$x] += $reading ? -$this->decRead : $this->decRead;
}
/**
* Indicate that one of the sockets is busy writing.
*
* @param boolean $writing Whether we're busy writing
* @param int $x Connection ID
*
* @return void
*/
public function writing(bool $writing, int $x)
{
$this->availableConnections[$x] += $writing ? -$this->decWrite : $this->decWrite;
}
/**
* Set main instance.
*
@ -401,6 +434,28 @@ class DataCenterConnection implements JsonSerializable
{
return $this->API;
}
/**
* Check if is an HTTP connection.
*
* @return boolean
*/
public function isHttp()
{
return \in_array($this->ctx->getStreamName(), [HttpStream::getName(), HttpsStream::getName()]);
}
/**
* Get DC-specific settings
*
* @return array
*/
public function getSettings(): array
{
$dc_config_number = isset($this->API->settings['connection_settings'][$this->datacenter]) ? $this->datacenter : 'all';
return $this->API->settings['connection_settings'][$dc_config_number];
}
/**
* JSON serialize function.
*

View File

@ -32,7 +32,7 @@ class CheckLoop extends ResumableSignalLoop
/**
* Connection instance.
*
* @var \danog\Madelineproto\Connection
* @var \danog\MadelineProto\Connection
*/
protected $connection;
/**
@ -42,12 +42,18 @@ class CheckLoop extends ResumableSignalLoop
*/
protected $datacenter;
/**
* DataCenterConnection instance.
*
* @var \danog\MadelineProto\DataCenterConnection
*/
protected $datacenterConnection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
$this->datacenter = $connection->getDatacenterID();
$this->datacenterConnection = $connection->getShared();
}
public function loop()
@ -55,10 +61,9 @@ class CheckLoop extends ResumableSignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$shared = $this->datacenterConnection;
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
$timeout = $shared->getSettings()['timeout'];
while (true) {
while (empty($connection->new_outgoing)) {
if (yield $this->waitSignal($this->pause())) {
@ -70,7 +75,7 @@ class CheckLoop extends ResumableSignalLoop
$last_msgid = $connection->get_max_id(true);
$last_chunk = $connection->getLastChunk();
if ($connection->temp_auth_key !== null) {
if ($shared->hasTempAuthKey()) {
$full_message_ids = $connection->getPendingCalls(); //array_values($connection->new_outgoing);
foreach (\array_chunk($full_message_ids, 8192) as $message_ids) {
$deferred = new Deferred();
@ -124,7 +129,7 @@ class CheckLoop extends ResumableSignalLoop
}
}
if ($reply) {
$this->callFork($API->object_call_async('msg_resend_ans_req', ['msg_ids' => $reply], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($connection->object_call_async('msg_resend_ans_req', ['msg_ids' => $reply], ['postpone' => true]));
}
$connection->writer->resume();
}
@ -135,7 +140,7 @@ class CheckLoop extends ResumableSignalLoop
$list .= $connection->outgoing_messages[$message_id]['_'].', ';
}
$API->logger->logger("Still missing $list on DC $datacenter, sending state request", \danog\MadelineProto\Logger::ERROR);
yield $API->object_call_async('msgs_state_req', ['msg_ids' => $message_ids], ['datacenter' => $datacenter, 'promise' => $deferred]);
yield $connection->object_call_async('msgs_state_req', ['msg_ids' => $message_ids], ['promise' => $deferred]);
}
} else {
foreach ($connection->new_outgoing as $message_id) {

View File

@ -33,7 +33,7 @@ class HttpWaitLoop extends ResumableSignalLoop
/**
* Connection instance.
*
* @var \danog\Madelineproto\Connection
* @var \danog\MadelineProto\Connection
*/
protected $connection;
/**
@ -43,12 +43,20 @@ class HttpWaitLoop extends ResumableSignalLoop
*/
protected $datacenter;
/**
* DataCenterConnection instance.
*
* @var \danog\MadelineProto\DataCenterConnection
*/
protected $datacenterConnection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
$this->datacenter = $connection->getDatacenterID();
$this->datacenterConnection = $connection->getShared();
}
public function loop()
@ -56,12 +64,12 @@ class HttpWaitLoop extends ResumableSignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$shared = $this->datacenterConnection;
if (!\in_array($connection->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()])) {
if (!$shared->isHttp()) {
return;
}
$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
if (yield $this->waitSignal($this->pause())) {
return;
@ -69,7 +77,7 @@ class HttpWaitLoop extends ResumableSignalLoop
if (!\in_array($connection->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()])) {
return;
}
while ($connection->temp_auth_key === null) {
while (!$shared->hasTempAuthKey()) {
if (yield $this->waitSignal($this->pause())) {
return;
}

View File

@ -42,9 +42,15 @@ class ReadLoop extends SignalLoop
/**
* Connection instance.
*
* @var \danog\Madelineproto\Connection
* @var \danog\MadelineProto\Connection
*/
protected $connection;
/**
* DataCenterConnection instance.
*
* @var \danog\MadelineProto\DataCenterConnection
*/
protected $datacenterConnection;
/**
* DC ID.
*
@ -57,7 +63,8 @@ class ReadLoop extends SignalLoop
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
$this->datacenter = $connection->getDatacenterID();
$this->datacenterConnection = $connection->getShared();
}
public function loop()
@ -65,8 +72,8 @@ class ReadLoop extends SignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$shared = $this->datacenterConnection;
//$timeout = $API->settings['connection_settings'][isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all']['timeout'];
while (true) {
try {
$error = yield $this->waitSignal($this->readMessage());
@ -84,14 +91,14 @@ class ReadLoop extends SignalLoop
$this->exitedLoop();
if ($error === -404) {
if ($connection->temp_auth_key !== null) {
if ($shared->getTemp) {
$API->logger->logger("WARNING: Resetting auth key in DC {$datacenter}...", \danog\MadelineProto\Logger::WARNING);
$connection->temp_auth_key = null;
$shared->setTempAuthKey(null);
$connection->session_id = null;
foreach ($connection->new_outgoing as $message_id) {
$connection->outgoing_messages[$message_id]['sent'] = 0;
}
yield $connection->reconnect();
yield $shared->reconnect();
yield $API->init_authorization_async();
} else {
yield $connection->reconnect();
@ -115,7 +122,7 @@ class ReadLoop extends SignalLoop
Loop::defer([$connection, 'handle_messages']);
if ($this->API->is_http($datacenter)) {
if ($shared->isHttp()) {
Loop::defer([$connection->waiter, 'resume']);
}
}
@ -126,6 +133,8 @@ class ReadLoop extends SignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$shared = $this->datacenterConnection;
if (isset($this->connection->old)) {
$API->logger->logger('Not reading because connection is old');
@ -152,84 +161,89 @@ class ReadLoop extends SignalLoop
return $payload;
}
$auth_key_id = yield $buffer->bufferRead(8);
if ($auth_key_id === "\0\0\0\0\0\0\0\0") {
$message_id = yield $buffer->bufferRead(8);
if (!\in_array($message_id, [1, 0])) {
$connection->check_message_id($message_id, ['outgoing' => false, 'container' => false]);
}
$message_length = \unpack('V', yield $buffer->bufferRead(4))[1];
$message_data = yield $buffer->bufferRead($message_length);
$left = $payload_length - $message_length - 4 - 8 - 8;
if ($left) {
$API->logger->logger('Padded unencrypted message', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($left < (-$message_length & 15)) {
$API->logger->logger('Protocol padded unencrypted message', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$connection->reading(true);
try {
$auth_key_id = yield $buffer->bufferRead(8);
if ($auth_key_id === "\0\0\0\0\0\0\0\0") {
$message_id = yield $buffer->bufferRead(8);
if (!\in_array($message_id, [1, 0])) {
$connection->check_message_id($message_id, ['outgoing' => false, 'container' => false]);
}
yield $buffer->bufferRead($left);
}
$connection->incoming_messages[$message_id] = [];
} elseif ($auth_key_id === $connection->temp_auth_key['id']) {
$message_key = yield $buffer->bufferRead(16);
list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $connection->temp_auth_key['auth_key'], false);
$encrypted_data = yield $buffer->bufferRead($payload_length - 24);
$message_length = \unpack('V', yield $buffer->bufferRead(4))[1];
$message_data = yield $buffer->bufferRead($message_length);
$left = $payload_length - $message_length - 4 - 8 - 8;
if ($left) {
$API->logger->logger('Padded unencrypted message', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($left < (-$message_length & 15)) {
$API->logger->logger('Protocol padded unencrypted message', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
}
yield $buffer->bufferRead($left);
}
$connection->incoming_messages[$message_id] = [];
} elseif ($auth_key_id === $shared->getTempAuthKey()->getID()) {
$message_key = yield $buffer->bufferRead(16);
list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $shared->getTempAuthKey()->getAuthKey(), false);
$encrypted_data = yield $buffer->bufferRead($payload_length - 24);
$protocol_padding = \strlen($encrypted_data) % 16;
if ($protocol_padding) {
$encrypted_data = \substr($encrypted_data, 0, -$protocol_padding);
}
$decrypted_data = $this->ige_decrypt($encrypted_data, $aes_key, $aes_iv);
/*
$server_salt = substr($decrypted_data, 0, 8);
if ($server_salt != $connection->temp_auth_key['server_salt']) {
$API->logger->logger('WARNING: Server salt mismatch (my server salt '.$connection->temp_auth_key['server_salt'].' is not equal to server server salt '.$server_salt.').', \danog\MadelineProto\Logger::WARNING);
}
*/
$session_id = \substr($decrypted_data, 8, 8);
if ($session_id != $connection->session_id) {
throw new \danog\MadelineProto\Exception('Session id mismatch.');
}
$message_id = \substr($decrypted_data, 16, 8);
$connection->check_message_id($message_id, ['outgoing' => false, 'container' => false]);
$seq_no = \unpack('V', \substr($decrypted_data, 24, 4))[1];
$protocol_padding = \strlen($encrypted_data) % 16;
if ($protocol_padding) {
$encrypted_data = \substr($encrypted_data, 0, -$protocol_padding);
}
$decrypted_data = $this->ige_decrypt($encrypted_data, $aes_key, $aes_iv);
/*
$server_salt = substr($decrypted_data, 0, 8);
if ($server_salt != $shared->getTempAuthKey()->getServerSalt()) {
$API->logger->logger('WARNING: Server salt mismatch (my server salt '.$shared->getTempAuthKey()->getServerSalt().' is not equal to server server salt '.$server_salt.').', \danog\MadelineProto\Logger::WARNING);
}
*/
$session_id = \substr($decrypted_data, 8, 8);
if ($session_id != $connection->session_id) {
throw new \danog\MadelineProto\Exception('Session id mismatch.');
}
$message_id = \substr($decrypted_data, 16, 8);
$connection->check_message_id($message_id, ['outgoing' => false, 'container' => false]);
$seq_no = \unpack('V', \substr($decrypted_data, 24, 4))[1];
$message_data_length = \unpack('V', \substr($decrypted_data, 28, 4))[1];
if ($message_data_length > \strlen($decrypted_data)) {
throw new \danog\MadelineProto\SecurityException('message_data_length is too big');
}
if (\strlen($decrypted_data) - 32 - $message_data_length < 12) {
throw new \danog\MadelineProto\SecurityException('padding is too small');
}
if (\strlen($decrypted_data) - 32 - $message_data_length > 1024) {
throw new \danog\MadelineProto\SecurityException('padding is too big');
}
if ($message_data_length < 0) {
throw new \danog\MadelineProto\SecurityException('message_data_length not positive');
}
if ($message_data_length % 4 != 0) {
throw new \danog\MadelineProto\SecurityException('message_data_length not divisible by 4');
}
$message_data = \substr($decrypted_data, 32, $message_data_length);
if ($message_key != \substr(\hash('sha256', \substr($connection->temp_auth_key['auth_key'], 96, 32).$decrypted_data, true), 8, 16)) {
throw new \danog\MadelineProto\SecurityException('msg_key mismatch');
}
$connection->incoming_messages[$message_id] = ['seq_no' => $seq_no];
} else {
$API->logger->logger('Got unknown auth_key id', \danog\MadelineProto\Logger::ERROR);
$message_data_length = \unpack('V', \substr($decrypted_data, 28, 4))[1];
if ($message_data_length > \strlen($decrypted_data)) {
throw new \danog\MadelineProto\SecurityException('message_data_length is too big');
}
if (\strlen($decrypted_data) - 32 - $message_data_length < 12) {
throw new \danog\MadelineProto\SecurityException('padding is too small');
}
if (\strlen($decrypted_data) - 32 - $message_data_length > 1024) {
throw new \danog\MadelineProto\SecurityException('padding is too big');
}
if ($message_data_length < 0) {
throw new \danog\MadelineProto\SecurityException('message_data_length not positive');
}
if ($message_data_length % 4 != 0) {
throw new \danog\MadelineProto\SecurityException('message_data_length not divisible by 4');
}
$message_data = \substr($decrypted_data, 32, $message_data_length);
if ($message_key != \substr(\hash('sha256', \substr($shared->getTempAuthKey()->getAuthKey(), 96, 32).$decrypted_data, true), 8, 16)) {
throw new \danog\MadelineProto\SecurityException('msg_key mismatch');
}
$connection->incoming_messages[$message_id] = ['seq_no' => $seq_no];
} else {
$API->logger->logger('Got unknown auth_key id', \danog\MadelineProto\Logger::ERROR);
return -404;
return -404;
}
$deserialized = $API->deserialize($message_data, ['type' => '', 'connection' => $connection]);
$API->referenceDatabase->reset();
$connection->incoming_messages[$message_id]['content'] = $deserialized;
$connection->incoming_messages[$message_id]['response'] = -1;
$connection->new_incoming[$message_id] = $message_id;
//$connection->last_http_wait = 0;
$API->logger->logger('Received payload from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} finally {
$connection->reading(false);
}
$deserialized = $API->deserialize($message_data, ['type' => '', 'datacenter' => $datacenter]);
$API->referenceDatabase->reset();
$connection->incoming_messages[$message_id]['content'] = $deserialized;
$connection->incoming_messages[$message_id]['response'] = -1;
$connection->new_incoming[$message_id] = $message_id;
//$connection->last_http_wait = 0;
$API->logger->logger('Received payload from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
return true;
}

View File

@ -39,9 +39,15 @@ class WriteLoop extends ResumableSignalLoop
/**
* Connection instance.
*
* @var \danog\Madelineproto\Connection
* @var \danog\MadelineProto\Connection
*/
protected $connection;
/**
* DataCenterConnection instance.
*
* @var \danog\MadelineProto\DataCenterConnection
*/
protected $datacenterConnection;
/**
* DC ID.
*
@ -52,15 +58,17 @@ class WriteLoop extends ResumableSignalLoop
public function __construct(Connection $connection)
{
$this->connection = $connection;
$this->datacenterConnection = $connection->getShared();
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
$this->datacenter = $connection->getDatacenterID();
}
public function loop(): \Generator
{
$API = $this->API;
$connection = $this->connection;
$shared = $this->datacenterConnection;
$datacenter = $this->datacenter;
$please_wait = false;
@ -74,8 +82,9 @@ class WriteLoop extends ResumableSignalLoop
$API->logger->logger("Done waiting in $this", Logger::ULTRA_VERBOSE);
}
$connection->writing(true);
try {
$please_wait = yield $this->{$connection->temp_auth_key === null ? 'unencryptedWriteLoopAsync' : 'encryptedWriteLoopAsync'}();
$please_wait = yield $this->{$shared->hasTempAuthKey() ? 'encryptedWriteLoopAsync' : 'unencryptedWriteLoopAsync'}();
} catch (StreamException $e) {
if (isset($connection->old)) {
return;
@ -84,6 +93,8 @@ class WriteLoop extends ResumableSignalLoop
$API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR);
yield $connection->reconnect();
continue;
} finally {
$connection->writing(false);
}
//$connection->waiter->resume();
@ -95,11 +106,12 @@ class WriteLoop extends ResumableSignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$shared = $this->datacenterConnection;
while ($connection->pending_outgoing) {
$skipped_all = true;
foreach ($connection->pending_outgoing as $k => $message) {
if ($connection->temp_auth_key !== null) {
if ($shared->hasTempAuthKey()) {
return;
}
if (!$message['unencrypted']) {
@ -146,13 +158,13 @@ class WriteLoop extends ResumableSignalLoop
$API = $this->API;
$datacenter = $this->datacenter;
$connection = $this->connection;
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
$shared = $this->datacenterConnection;
do {
if ($connection->temp_auth_key === null) {
if (!$shared->hasTempAuthKey()) {
return;
}
if ($this->API->is_http($datacenter) && empty($connection->pending_outgoing)) {
if ($shared->isHttp() && empty($connection->pending_outgoing)) {
return;
}
if (\count($to_ack = $connection->ack_queue)) {
@ -172,7 +184,7 @@ class WriteLoop extends ResumableSignalLoop
break;
}
}
if ($API->is_http($datacenter) && !$has_http_wait) {
if ($shared->isHttp() && !$has_http_wait) {
$connection->pending_outgoing[$connection->pending_outgoing_key++] = ['_' => 'http_wait', 'serialized_body' => yield $this->API->serialize_object_async(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'content_related' => true, 'unencrypted' => false, 'method' => true];
$connection->pending_outgoing_key %= Connection::PENDING_MAX;
$has_http_wait = true;
@ -190,7 +202,7 @@ class WriteLoop extends ResumableSignalLoop
unset($connection->pending_outgoing[$k]);
continue;
}
if ($API->settings['connection_settings'][$dc_config_number]['pfs'] && !isset($connection->temp_auth_key['bound']) && !\strpos($datacenter, 'cdn') && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
if ($shared->getSettings()['pfs'] && !$shared->getTempAuthKey()->isBound() && !$shared->getCtx()->isCDN() && !\in_array($message['_'], ['http_wait', 'auth.bindTempAuthKey']) && $message['method']) {
$API->logger->logger("Skipping {$message['_']} due to unbound keys in DC {$datacenter}");
$skipped = true;
continue;
@ -202,14 +214,14 @@ class WriteLoop extends ResumableSignalLoop
break;
}
$message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id($datacenter);
$message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->generate_message_id();
$API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generate_out_seq_no($message['content_related'])];
if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') {
if ((!isset($connection->temp_auth_key['connection_inited']) || $connection->temp_auth_key['connection_inited'] === false) && $message['_'] !== 'auth.bindTempAuthKey') {
if (!$shared->getTempAuthKey()->isInited() && $message['_'] !== 'auth.bindTempAuthKey') {
$API->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['write_client_info'], $message['_']), \danog\MadelineProto\Logger::NOTICE);
$MTmessage['body'] = yield $API->serialize_method_async(
'invokeWithLayer',
@ -220,8 +232,8 @@ class WriteLoop extends ResumableSignalLoop
[
'api_id' => $API->settings['app_info']['api_id'],
'api_hash' => $API->settings['app_info']['api_hash'],
'device_model' => \strpos($datacenter, 'cdn') === false ? $API->settings['app_info']['device_model'] : 'n/a',
'system_version' => \strpos($datacenter, 'cdn') === false ? $API->settings['app_info']['system_version'] : 'n/a',
'device_model' => !$connection->getCtx()->isCDN() ? $API->settings['app_info']['device_model'] : 'n/a',
'system_version' => !$connection->getCtx()->isCDN() ? $API->settings['app_info']['system_version'] : 'n/a',
'app_version' => $API->settings['app_info']['app_version'],
'system_lang_code' => $API->settings['app_info']['lang_code'],
'lang_code' => $API->settings['app_info']['lang_code'],
@ -274,7 +286,7 @@ class WriteLoop extends ResumableSignalLoop
if ($count > 1) {
$API->logger->logger("Wrapping in msg_container ($count messages of total size $total_length) as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$message_id = $connection->generate_message_id($datacenter);
$message_id = $connection->generate_message_id();
$connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => \array_values($keys), 'content_related' => false, 'method' => false, 'unencrypted' => false];
//var_dumP("container ".bin2hex($message_id));
@ -299,19 +311,19 @@ class WriteLoop extends ResumableSignalLoop
unset($messages);
$plaintext = $connection->temp_auth_key['server_salt'].$connection->session_id.$message_id.\pack('VV', $seq_no, $message_data_length).$message_data;
$plaintext = $shared->getTempAuthKey()->getServerSalt().$connection->session_id.$message_id.\pack('VV', $seq_no, $message_data_length).$message_data;
$padding = $this->posmod(-\strlen($plaintext), 16);
if ($padding < 12) {
$padding += 16;
}
$padding = $this->random($padding);
$message_key = \substr(\hash('sha256', \substr($connection->temp_auth_key['auth_key'], 88, 32).$plaintext.$padding, true), 8, 16);
list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $connection->temp_auth_key['auth_key']);
$message = $connection->temp_auth_key['id'].$message_key.$this->ige_encrypt($plaintext.$padding, $aes_key, $aes_iv);
$message_key = \substr(\hash('sha256', \substr($shared->getTempAuthKey()->getAuthKey(), 88, 32).$plaintext.$padding, true), 8, 16);
list($aes_key, $aes_iv) = $this->aes_calculate($message_key, $shared->getTempAuthKey()->getAuthKey());
$message = $shared->getTempAuthKey()->getID().$message_key.$this->ige_encrypt($plaintext.$padding, $aes_key, $aes_iv);
$buffer = yield $connection->stream->getWriteBuffer($len = \strlen($message));
$t = \microtime(true);
//$t = \microtime(true);
yield $buffer->bufferWrite($message);
$connection->httpSent();

View File

@ -1147,9 +1147,16 @@ class MTProto extends AsyncConstruct implements TLCallback
}
}
public function is_http($datacenter)
/**
* Check if connected to datacenter using HTTP
*
* @param string $datacenter DC ID
*
* @return boolean
*/
public function isHttp(string $datacenter)
{
return \in_array($this->datacenter->getDataCenterConection($datacenter)->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()]);
return $this->datacenter->isHttp($datacenter);
}
// Connects to all datacenters and if necessary creates authorization keys, binds them and writes client info

View File

@ -16,14 +16,14 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\AuthKey;
namespace danog\MadelineProto\MTProto;
use JsonSerializable;
/**
* MTProto auth key.
*/
abstract class AuthKey extends JsonSerializable
abstract class AuthKey implements JsonSerializable
{
/**
* Auth key.

View File

@ -16,7 +16,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\AuthKey;
namespace danog\MadelineProto\MTProto;
/**
* MTProto permanent auth key.

View File

@ -16,7 +16,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\AuthKey;
namespace danog\MadelineProto\MTProto;
use JsonSerializable;

View File

@ -24,7 +24,7 @@ namespace danog\MadelineProto\MTProtoSession;
*/
trait AckHandler
{
public function ack_outgoing_message_id($message_id)
public function ack_outgoing_message_id($message_id): bool
{
// The server acknowledges that it received my message
if (!isset($this->outgoing_messages[$message_id])) {
@ -43,7 +43,7 @@ trait AckHandler
return true;
}
public function got_response_for_outgoing_message_id($message_id)
public function got_response_for_outgoing_message_id($message_id): bool
{
// The server acknowledges that it received my message
if (!isset($this->outgoing_messages[$message_id])) {
@ -64,7 +64,7 @@ trait AckHandler
return true;
}
public function ack_incoming_message_id($message_id)
public function ack_incoming_message_id($message_id): bool
{
// I let the server know that I received its message
if (!isset($this->incoming_messages[$message_id])) {
@ -83,26 +83,23 @@ trait AckHandler
/**
* Check if there are some pending calls
* Check if there are some pending calls.
*
* @return boolean
*/
public function hasPendingCalls()
public function hasPendingCalls(): bool
{
$API = $this->API;
$datacenter = $this->datacenter;
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
$settings = $this->shared->getSettings();
$timeout = $settings['timeout'];
$pfs = $settings['pfs'];
foreach ($this->new_outgoing as $message_id) {
if (isset($this->outgoing_messages[$message_id]['sent'])
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
if ($pfs && !$this->shared->getTempAuthKey()->bound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
continue;
}
@ -114,27 +111,24 @@ trait AckHandler
}
/**
* Get all pending calls
* Get all pending calls.
*
* @return void
* @return array
*/
public function getPendingCalls()
public function getPendingCalls(): array
{
$API = $this->API;
$datacenter = $this->datacenter;
$dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all';
$timeout = $API->settings['connection_settings'][$dc_config_number]['timeout'];
$pfs = $API->settings['connection_settings'][$dc_config_number]['pfs'];
$settings = $this->shared->getSettings();
$timeout = $settings['timeout'];
$pfs = $settings['pfs'];
$result = [];
foreach ($this->new_outgoing as $message_id) {
if (isset($this->outgoing_messages[$message_id]['sent'])
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
&& $this->shared->hasTempAuthKey() === !$this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {
if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
if ($pfs && !$this->shared->getTempAuthKey()->bound() && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') {
continue;
}
@ -144,5 +138,4 @@ trait AckHandler
return $result;
}
}

View File

@ -184,7 +184,7 @@ trait CallHandler
$aargs,
[
'_' => $method,
'type' => $this->methods->find_by_method($method)['type'],
'type' => $this->API->methods->find_by_method($method)['type'],
'content_related' => $this->content_related($method),
'promise' => $deferred,
'method' => true,

View File

@ -257,7 +257,7 @@ trait ResponseHandler
$this->check_in_seq_no($current_msg_id);
$this->ack_incoming_message_id($current_msg_id);
// Acknowledge that I received the server's response
$response_type = $this->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type'];
$response_type = $this->API->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type'];
switch ($response_type) {
case 'Updates':
@ -568,7 +568,7 @@ trait ResponseHandler
return;
}
$botAPI = isset($request['botAPI']) && $request['botAPI'];
if (isset($response['_']) && \strpos($this->datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
if (isset($response['_']) && \strpos($this->datacenter, 'cdn') === false && $this->API->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
$response['request'] = $request;
$this->callForkDefer($this->API->handle_updates_async($response));
}

View File

@ -19,6 +19,8 @@
namespace danog\MadelineProto\MTProtoSession;
use danog\MadelineProto\MTProto;
/**
* Manages sequence number.
*/

View File

@ -714,7 +714,6 @@ trait AuthKeyHandler
$cdn = \strpos($id, 'cdn');
$media = \strpos($id, 'media');
if (!$socket->hasTempAuthKey() || !$socket->hasPermAuthKey()) {
$dc_config_number = isset($this->settings['connection_settings'][$id]) ? $id : 'all';
if (!$socket->hasPermAuthKey() && !$cdn && !$media) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_perm_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);
$socket->setPermAuthKey(yield $this->create_auth_key_async(-1, $id));
@ -723,7 +722,7 @@ trait AuthKeyHandler
if ($media) {
$socket->link(\intval($id));
}
if ($this->settings['connection_settings'][$dc_config_number]['pfs']) {
if ($this->datacenter->getDataCenterConnection($id)->getSettings()['pfs']) {
if (!$cdn) {
$this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['gen_temp_auth_key'], $id), \danog\MadelineProto\Logger::NOTICE);

View File

@ -744,7 +744,7 @@ trait TL
}
switch ($constructorData['predicate']) {
case 'gzip_packed':
return $this->deserialize(gzdecode($this->deserialize($stream, ['type' => 'bytes', 'datacenter' => $type['datacenter']])), ['type' => '', 'datacenter' => $type['datacenter']]);
return $this->deserialize(gzdecode($this->deserialize($stream, ['type' => 'bytes', 'connection' => $type['connection']])), ['type' => '', 'connection' => $type['connection']]);
case 'Vector t':
case 'vector':
break;
@ -786,10 +786,10 @@ trait TL
$type['subtype'] = '';
}
return $this->deserialize(gzdecode($this->deserialize($stream, ['type' => 'bytes'])), ['type' => '', 'datacenter' => $type['datacenter'], 'subtype' => $type['subtype']]);
return $this->deserialize(gzdecode($this->deserialize($stream, ['type' => 'bytes'])), ['type' => '', 'connection' => $type['connection'], 'subtype' => $type['subtype']]);
}
if ($constructorData['type'] === 'Vector t') {
$constructorData['datacenter'] = $type['datacenter'];
$constructorData['connection'] = $type['connection'];
$constructorData['subtype'] = isset($type['subtype']) ? $type['subtype'] : '';
$constructorData['type'] = 'vector';
@ -835,18 +835,18 @@ trait TL
$arg['type'] = 'string';
}
if ($x['_'] === 'rpc_result' && $arg['name'] === 'result') {
if (isset($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_'])
&& isset($this->tl_callbacks[TLCallback::METHOD_BEFORE_CALLBACK][$this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_']])
if (isset($type['connection']->outgoing_messages[$x['req_msg_id']]['_'])
&& isset($this->tl_callbacks[TLCallback::METHOD_BEFORE_CALLBACK][$type['connection']->outgoing_messages[$x['req_msg_id']]['_']])
) {
foreach ($this->tl_callbacks[TLCallback::METHOD_BEFORE_CALLBACK][$this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_']] as $callback) {
$callback($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_']);
foreach ($this->tl_callbacks[TLCallback::METHOD_BEFORE_CALLBACK][$type['connection']->outgoing_messages[$x['req_msg_id']]['_']] as $callback) {
$callback($type['connection']->outgoing_messages[$x['req_msg_id']]['_']);
}
}
if (isset($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['type'])
&& stripos($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['type'], '<') !== false
if (isset($type['connection']->outgoing_messages[$x['req_msg_id']]['type'])
&& stripos($type['connection']->outgoing_messages[$x['req_msg_id']]['type'], '<') !== false
) {
$arg['subtype'] = str_replace(['Vector<', '>'], '', $this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['type']);
$arg['subtype'] = str_replace(['Vector<', '>'], '', $type['connection']->outgoing_messages[$x['req_msg_id']]['type']);
}
}
if (isset($type['datacenter'])) {
@ -888,11 +888,11 @@ trait TL
$this->callFork($callback($x));
}
} elseif ($x['_'] === 'rpc_result'
&& isset($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_'])
&& isset($this->tl_callbacks[TLCallback::METHOD_CALLBACK][$this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_']])
&& isset($type['connection']->outgoing_messages[$x['req_msg_id']]['_'])
&& isset($this->tl_callbacks[TLCallback::METHOD_CALLBACK][$type['connection']->outgoing_messages[$x['req_msg_id']]['_']])
) {
foreach ($this->tl_callbacks[TLCallback::METHOD_CALLBACK][$this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']]['_']] as $callback) {
$callback($this->datacenter->sockets[$type['datacenter']]->outgoing_messages[$x['req_msg_id']], $x['result']);
foreach ($this->tl_callbacks[TLCallback::METHOD_CALLBACK][$type['connection']->outgoing_messages[$x['req_msg_id']]['_']] as $callback) {
$callback($type['connection']->outgoing_messages[$x['req_msg_id']], $x['result']);
}
}