Async HTTP requests

This commit is contained in:
Daniil Gentili 2019-05-12 13:45:52 +02:00
parent 07043f2d7d
commit 5ec4195173
5 changed files with 62 additions and 56 deletions

2
docs

@ -1 +1 @@
Subproject commit 0fcff8e5a31af300511949c4dbafc7be6c0d4dd7
Subproject commit e8e22ca165ac1bc8a7435d5d55b3e94b1883d9d8

View File

@ -369,10 +369,19 @@ class DataCenter
return $ctxs;
}
/**
* Get Artax async HTTP client
*
* @return \Amp\Artax\DefaultClient
*/
public function getHTTPClient()
{
return $this->HTTPClient;
}
public function fileGetContents($url): \Generator
{
return yield (yield $this->getHTTPClient()->request($url))->getBody();
}
public function get_dcs($all = true)
{
$test = $this->settings['all']['test_mode'] ? 'test' : 'main';

View File

@ -518,7 +518,7 @@ trait AuthKeyHandler
public function wolfram_single_async($what)
{
$code = yield (yield $this->datacenter->getHTTPClient()->request('http://www.wolframalpha.com/api/v1/code'))->getBody();
$code = yield $this->datacenter->fileGetContents('http://www.wolframalpha.com/api/v1/code');
$query = 'Do prime factorization of '.$what;
$params = [
'async' => true,

View File

@ -20,6 +20,7 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use Amp\Artax\Request;
/**
* Manages peers.
@ -382,7 +383,12 @@ trait PeerHandler
}
}
if (!isset($this->settings['pwr']['requests']) || $this->settings['pwr']['requests'] === true && $recursive) {
$dbres = json_decode(@file_get_contents('https://id.pwrtelegram.xyz/db/getusername?id='.$id, false, stream_context_create(['http' => ['timeout' => 2]])), true);
$dbres = [];
try {
$dbres = json_decode(yield $this->datacenter->fileGetContents('https://id.pwrtelegram.xyz/db/getusername?id='.$id), true);
} catch (\Throwable $e) {
$this->logger->logger($e);
}
if (isset($dbres['ok']) && $dbres['ok']) {
yield $this->resolve_username_async('@'.$dbres['result']);
@ -814,22 +820,15 @@ trait PeerHandler
//$path = '/tmp/ids'.hash('sha256', $payload);
//file_put_contents($path, $payload);
$id = isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id'];
$ch = curl_init();
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_URL, 'https://id.pwrtelegram.xyz/db'.$this->settings['pwr']['db_token'].'/addnewmadeline?d=pls&from='.$id);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
$result = curl_exec($ch);
curl_close($ch);
//$result = shell_exec('curl '.escapeshellarg('https://id.pwrtelegram.xyz/db'.$this->settings['pwr']['db_token'].'/addnewmadeline?d=pls&from='.$id).' -d '.escapeshellarg('@'.$path).' -s >/dev/null 2>/dev/null & ');
$request = (new Request('https://id.pwrtelegram.xyz/db'.$this->settings['pwr']['db_token'].'/addnewmadeline?d=pls&from='.$id, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload);
$result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody();
$this->logger->logger("============ $result =============", \danog\MadelineProto\Logger::VERBOSE);
$this->qres = [];
$this->last_stored = time() + 10;
} catch (\danog\MadelineProto\Exception $e) {
if (file_exists($path)) {
unlink($path);
}
$this->logger->logger('======= COULD NOT STORE IN DB DUE TO '.$e->getMessage().' =============', \danog\MadelineProto\Logger::VERBOSE);
}
}

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Request;
use Amp\Deferred;
use Amp\Delayed;
use function Amp\Promise\any;
@ -303,7 +304,7 @@ trait UpdateHandler
$this->updates_state->syncLoading(true);
try {
$data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
$data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]);
yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']);
} finally {
$this->updates_state->syncLoading($last);
@ -349,11 +350,11 @@ trait UpdateHandler
$cur_state = $this->channels_state->get($channel_id, $update);
}
/*
if ($cur_state['sync_loading'] && in_array($update['_'], ['updateNewMessage', 'updateEditMessage', 'updateNewChannelMessage', 'updateEditChannelMessage'])) {
$this->logger->logger('Sync loading, not handling update', \danog\MadelineProto\Logger::NOTICE);
if ($cur_state['sync_loading'] && in_array($update['_'], ['updateNewMessage', 'updateEditMessage', 'updateNewChannelMessage', 'updateEditChannelMessage'])) {
$this->logger->logger('Sync loading, not handling update', \danog\MadelineProto\Logger::NOTICE);
return false;
}*/
return false;
}*/
switch ($update['_']) {
case 'updateChannelTooLong':
yield $this->get_channel_difference_async($channel_id);
@ -374,10 +375,22 @@ trait UpdateHandler
//isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from'])
) {
$log = '';
if ($from) $log .= "from_id {$update['message']['from_id']}, ";
if ($to) $log .= "to_id ".json_encode($update['message']['to_id']).", ";
if ($via_bot) $log .= "via_bot {$update['message']['via_bot_id']}, ";
if ($entities) $log .= "entities ".json_encode($update['message']['entities']).", ";
if ($from) {
$log .= "from_id {$update['message']['from_id']}, ";
}
if ($to) {
$log .= "to_id ".json_encode($update['message']['to_id']).", ";
}
if ($via_bot) {
$log .= "via_bot {$update['message']['via_bot_id']}, ";
}
if ($entities) {
$log .= "entities ".json_encode($update['message']['entities']).", ";
}
$this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE);
if ($channel_id !== false && yield $this->peer_isset_async($this->to_supergroup($channel_id))) {
yield $this->get_channel_difference_async($channel_id);
@ -400,7 +413,7 @@ trait UpdateHandler
$logger = function ($msg) use ($update, $cur_state, $channel_id) {
$pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0;
$this->logger->logger($update);
$double = isset($update['message']['id']) ? $update['message']['id']*2 : '-';
$double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-';
$mid = isset($update['message']['id']) ? $update['message']['id'] : '-';
$mypts = $cur_state->pts();
$this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: $channel_id", \danog\MadelineProto\Logger::ERROR);
@ -557,9 +570,9 @@ trait UpdateHandler
}
/*
if ($update['_'] === 'updateEncryptedChatTyping') {
$update = ['_' => 'updateUserTyping', 'user_id' => $this->encrypted_chats[$update['chat_id']]['user_id'], 'action' => ['_' => 'sendMessageTypingAction']];
$update = ['_' => 'updateUserTyping', 'user_id' => $this->encrypted_chats[$update['chat_id']]['user_id'], 'action' => ['_' => 'sendMessageTypingAction']];
}
*/
*/
if ($update['_'] === 'updateEncryption') {
switch ($update['chat']['_']) {
case 'encryptedChatRequested':
@ -590,7 +603,7 @@ trait UpdateHandler
//$this->logger->logger($update, \danog\MadelineProto\Logger::NOTICE);
}
//if ($update['_'] === 'updateServiceNotification' && strpos($update['type'], 'AUTH_KEY_DROP_') === 0) {
//}
if (!$this->settings['updates']['handle_updates']) {
return;
@ -611,40 +624,25 @@ trait UpdateHandler
public function pwr_webhook($update)
{
$this->call((function () use ($update) {
$payload = json_encode($update);
//$this->logger->logger($update, $payload, json_last_error());
if ($payload === '') {
$this->logger->logger('EMPTY UPDATE');
$payload = json_encode($update);
//$this->logger->logger($update, $payload, json_last_error());
if ($payload === '') {
$this->logger->logger('EMPTY UPDATE');
return false;
}
$this->call((function () use ($payload) {
$request = (new Request($this->hook_url, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload);
$result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody();
return false;
}
$ch = curl_init();
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_URL, $this->hook_url);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
$parse = parse_url($this->hook_url);
if (isset($parse['scheme']) && $parse['scheme'] == 'https') {
if (isset($this->pem_path) && file_exists($this->pem_path)) {
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, true);
curl_setopt($ch, CURLOPT_CAINFO, $this->pem_path);
} else {
//curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
}
}
$result = curl_exec($ch);
curl_close($ch);
$this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE);
$result = json_decode($result, true);
if (is_array($result) && isset($result['method']) && $result['method'] != '' && is_string($result['method'])) {
try {
$this->logger->logger('Reverse webhook command returned', yield $this->method_call_async_read($result['method'], $result, ['datacenter' => $this->datacenter->curdc]));
} catch (\danog\MadelineProto\Exception $e) {
} catch (\danog\MadelineProto\TL\Exception $e) {
} catch (\danog\MadelineProto\RPCErrorException $e) {
} catch (\danog\MadelineProto\SecurityException $e) {
} catch (\Throwable $e) {
$this->logger->logger("Reverse webhook command returned: $e");
}
}
})());