Reduce overhead

This commit is contained in:
Daniil Gentili 2020-02-05 17:29:48 +01:00
parent 066a10e2dc
commit cd217c147d
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
5 changed files with 27 additions and 68 deletions

View File

@ -549,9 +549,9 @@ class DataCenter
*
* @param string $dc DC ID
*
* @return Promise<Connection>
* @return \Generator<Connection>
*/
public function waitGetConnection(string $dc): Promise
public function waitGetConnection(string $dc): \Generator
{
return $this->sockets[$dc]->waitGetConnection();
}

View File

@ -506,18 +506,14 @@ class DataCenterConnection implements JsonSerializable
/**
* Get best socket in round robin, asynchronously.
*
* @return Promise<Connection>
* @return \Generator<Connection>
*/
public function waitGetConnection(): Promise
public function waitGetConnection(): \Generator
{
if (empty($this->availableConnections)) {
$deferred = new Deferred();
$this->connectionsPromise->onResolve(function ($e, $v) use ($deferred) {
$deferred->resolve($this->getConnection());
});
return $deferred->promise();
yield $this->connectionsPromise;
}
return new Success($this->getConnection());
return $this->getConnection();
}
/**
* Get best socket in round robin.

View File

@ -82,39 +82,17 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return Promise
* @return \Generator
*/
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{
$deferred = new Deferred();
$this->methodCallAsyncWrite($method, $args, $aargs)->onResolve(
static function (?\Throwable $e, $readDeferred) use ($deferred, $method): void {
if ($e) {
$deferred->fail($e);
} else {
$readDeferred = yield from $this->methodCallAsyncWrite($method, $args, $aargs);
if (\is_array($readDeferred)) {
$readDeferred = \array_map(fn (Deferred $value) => $value->promise(), $readDeferred);
$deferred->resolve(all($readDeferred));
$readDeferred = Tools::all(\array_map(fn (Deferred $value) => $value->promise(), $readDeferred));
} else {
$deferred->resolve($readDeferred->promise());
$readDeferred = $readDeferred->promise();
}
}
}
);
return ($aargs['noResponse'] ?? false) ? new Success() : $deferred->promise();
}
/**
* Call method and make sure it is asynchronously sent.
*
* @param string $method Method name
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return Promise
*/
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
{
return \danog\MadelineProto\Tools::call($this->methodCallAsyncWriteGenerator($method, $args, $aargs));
return ($aargs['noResponse'] ?? false) ? new Success() : $readDeferred;
}
/**
* Call method and make sure it is asynchronously sent (generator).
@ -125,16 +103,16 @@ trait CallHandler
*
* @return Generator
*/
public function methodCallAsyncWriteGenerator(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{
if (\is_array($args) && isset($args['id']['_']) && isset($args['id']['dc_id']) && $args['id']['_'] === 'inputBotInlineMessageID' && $this->datacenter !== $args['id']['dc_id']) {
$aargs['datacenter'] = $args['id']['dc_id'];
return $this->API->methodCallAsyncWrite($method, $args, $aargs);
return yield from $this->API->methodCallAsyncWrite($method, $args, $aargs);
}
if (($aargs['file'] ?? false) && !$this->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) {
$this->logger->logger('Using media DC');
$aargs['datacenter'] = $this->datacenter.'_media';
return $this->API->methodCallAsyncWrite($method, $args, $aargs);
return yield from $this->API->methodCallAsyncWrite($method, $args, $aargs);
}
if (\in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) {
$aargs['queue'] = 'secret';
@ -157,7 +135,7 @@ trait CallHandler
unset($args['multiple']);
}
foreach ($args as $single_args) {
$promises[] = yield from $this->methodCallAsyncWriteGenerator($method, $single_args, $new_aargs);
$promises[] = yield from $this->methodCallAsyncWrite($method, $single_args, $new_aargs);
}
if (!isset($aargs['postpone'])) {
$this->writer->resume();

View File

@ -35,7 +35,7 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return array
* @return mixed
*/
public function methodCall(string $method, $args = [], array $aargs = ['msg_id' => null])
{
@ -50,19 +50,11 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return Promise
* @return \Generator
*/
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{
$deferred = new Deferred();
$this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve(static function (?\Throwable $e, ?Connection $res) use (&$method, &$args, &$aargs, &$deferred): void {
if ($e) {
$deferred->fail($e);
return;
}
$deferred->resolve($res->methodCallAsyncRead($method, $args, $aargs));
});
return $deferred->promise();
return yield from (yield from $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncRead($method, $args, $aargs);
}
/**
* Call method and make sure it is asynchronously sent.
@ -71,17 +63,10 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return Promise
* @return \Generator
*/
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
{
$deferred = new Deferred();
$this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve(static function ($e, $res) use (&$method, &$args, &$aargs, &$deferred) {
if ($e) {
throw $e;
}
$deferred->resolve($res->methodCallAsyncWrite($method, $args, $aargs));
});
return $deferred->promise();
return yield from (yield from $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncWrite($method, $args, $aargs);
}
}

View File

@ -276,7 +276,7 @@ trait Files
$exception = null;
$start = \microtime(true);
while ($part_num < $part_total_num) {
$writePromise = $this->methodCallAsyncWrite($method, $callable($part_num), ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter]);
$writePromise = Tools::call($this->methodCallAsyncWrite($method, $callable($part_num), ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter]));
if (!$seekable) {
yield $writePromise;
}