MadelineProto/src/Socket.php
Daniil Gentili df24fa4611
Merge alpha into master (async, huge bugfixes and more) (#546)
* Implement async and lots of bugfixes

* Implement more async

* Implement async, implement bugfixes for the connection module, for the datacenter module, huge bugfixes, huge performance improvements, media DCs for https, advanced selecting, custom var_dump, totally rewritten IOLoop and response mechanism, promises, improvements to the TL parser, custom mb_substr

* Apply fixes from StyleCI

* Bugfixes

* Apply fixes from StyleCI

* Bugfixes, implement combined promises

* Apply fixes from StyleCI

* Support passing method arguments as callable

* Starting to write async upload logic

* Apply fixes from StyleCI

* Start implementing async file upload

* Apply fixes from StyleCI

* bugfix

* Apply fixes from StyleCI

* Start rewriting connection module

* Add PHP file docblocks for all classes

* Start working on new async stream API

* Finish writing stream API

* More stream API fixes

* Apply fixes from StyleCI

* Rewrite DataCenter and Connection modules

* Clean up stream API documentation

* Fixes

* Apply fixes from StyleCI

* Add referenced parameter to get length of buffer to read in getReadBuffer API

* Moved all MessageHandler code in the Connection module, added a PHP version warning in the phar

* Start fixing reads

* Fix all protocol stream wrappers

* Apply fixes from StyleCI

* Implement disconnection, and remove end function

* Working async RPC

* Implement async file upload

* Bugfix

* Method recall bugfixes

* Bugfixes

* Trait bugfixes

* Fix FIFO buffer

* Bugfixes and speedtests

* Async logging

* Implement websocket streams

* Implement loop API, signal API, clean closing and start changing layer

* Small magna, websocket and HTTP fixes

* Clean up loop API

* Improved stack traces, 2FA and async

* Login fixes

* Added instructions for manual verification

* Small fixes

* More app info improvements

* More app info improvements

* TL and 2FA fixes

* Update to layer 89

* More bugfixes

* Implement broken media reporting

* Remove debug comments

* PHP 7.2 backwards compatibility

* Bugfixes

* Async key generation

* Some simplifications

* Transport fixes

* Cleanup

* async API

* Performance fixes

* Fixes to async API

* Bugfixes

* Implement one-time async loop

* Authorization and logging fixes

* Update to layer 91

* 7to5 fix

* Null coalesce conversion

* Implement socks5 proxy

* Implement HTTP proxy

* Fixes to HTTP proxy

* MTProxy and socks5 fixes

* Disable PHP 5 conversion

* Proxies have higher priority

* Avoid error handling in vendor

* Override composer dependencies

* Fix travis build

* Final composer fixes

* Proxy logic fixes

* Fix get_updates update handling

* Do not use parallel file driver if not supported

* Refactor loader and implement HTTP fixes

* Suppress errors in loader

* HTTP and authorization fixes

* HTTP fixes

* Improved peer management

* Use HTTP protocol on altervista

* Small bugfixes

* Minor fixes

* Docufix

* Docufix

* Legacy fixes

* Fix message queue

* Avoid updating if using MTProxy

* Improve logs and examples

* Trim final newlines while converting parse mode

* Reimplement noResponse flag

* Async combined event handler and APIFactory fixes

* Actually return config

* Case-insensitive methods

* Bugfix

* Apply fixes from StyleCI (#545)

* MTProxy fixes

* PHP 5 warning

* Improved PHP 5 warning

* Use <br> along with newlines in web logs

* Update docs
2018-12-26 20:51:14 +01:00

393 lines
13 KiB
PHP

<?php
/**
* Socket module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2018 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
/**
 * Socket-like wrapper around a stream opened with fsockopen().
 *
 * Only client-side operations are implemented: bind(), listen(), accept(),
 * send(), getOption(), getPeerName() and getSockName() always throw
 * "Not supported".
 */
class FSocket
{
    /** @var resource|null Underlying stream; null while no connection is open. */
    private $sock;
    /** @var string|false Stream scheme: 'tls', 'tcp', or the result of getprotobynumber(). */
    private $protocol;
    /** @var array Timeout split into whole seconds ('sec') and microseconds ('usec'). */
    private $timeout = ['sec' => 0, 'usec' => 0];
    /** @var bool|null Blocking mode requested through setBlocking(); null if never requested. */
    private $blocking;
    /** @var int Address family passed to the constructor. */
    private $domain;
    /** @var int Socket type passed to the constructor (stored, not otherwise used here). */
    private $type;

    /**
     * @param int $domain   Address family; compared against AF_INET6 in connect().
     * @param int $type     Socket type.
     * @param int $protocol PHP_INT_MAX selects 'tls', PHP_INT_MAX - 1 selects 'tcp',
     *                      any other value is resolved with getprotobynumber().
     */
    public function __construct(int $domain, int $type, int $protocol)
    {
        $this->domain = $domain;
        $this->type = $type;
        if ($protocol === PHP_INT_MAX) {
            $this->protocol = 'tls';
        } elseif ($protocol === PHP_INT_MAX - 1) {
            $this->protocol = 'tcp';
        } else {
            $this->protocol = getprotobynumber($protocol);
        }
    }

    /**
     * Closes the stream if it is still open.
     */
    public function __destruct()
    {
        if ($this->sock !== null) {
            fclose($this->sock);
            $this->sock = null;
        }
    }

    /**
     * Stores the timeout for SO_RCVTIMEO / SO_SNDTIMEO; every other option is rejected.
     *
     * The stored timeout is only applied by connect().
     *
     * @param int       $level Ignored.
     * @param int       $name  Option name.
     * @param int|float $value Timeout in seconds; the fractional part becomes microseconds.
     *
     * @throws \danog\MadelineProto\Exception For any option other than the two timeouts.
     *
     * @return bool Always true when the option is accepted.
     */
    public function setOption(int $level, int $name, $value)
    {
        if (in_array($name, [\SO_RCVTIMEO, \SO_SNDTIMEO], true)) {
            $seconds = (int) $value;
            $this->timeout = ['sec' => $seconds, 'usec' => (int) (($value - $seconds) * 1000000)];

            return true;
        }

        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function getOption(int $level, int $name)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * Switches the stream between blocking and non-blocking mode.
     *
     * When no stream is open yet the requested mode is remembered and applied
     * by connect(), instead of handing null to stream_set_blocking().
     *
     * @return bool Result of stream_set_blocking(), or true if the request was deferred.
     */
    public function setBlocking(bool $blocking)
    {
        $this->blocking = $blocking;
        if ($this->sock === null) {
            return true;
        }

        return stream_set_blocking($this->sock, $blocking);
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function bind(string $address, int $port = 0)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function listen(int $backlog = 0)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function accept()
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * Opens the stream with fsockopen() and applies the stored timeout.
     *
     * @param string $address Host or IP; an address containing ':' is wrapped in
     *                        brackets when the domain is AF_INET6.
     * @param int    $port    Port passed to fsockopen().
     *
     * @throws \danog\MadelineProto\Exception If fsockopen() fails.
     *
     * @return bool Always true on success.
     */
    public function connect(string $address, int $port = -1)
    {
        if ($this->domain === AF_INET6 && strpos($address, ':') !== false) {
            $address = '['.$address.']';
        }
        $errno = 0;
        $errstr = '';
        $connectTimeout = $this->timeout['sec'] + ($this->timeout['usec'] / 1000000);
        $stream = fsockopen($this->protocol.'://'.$address, $port, $errno, $errstr, $connectTimeout);
        if ($stream === false) {
            // Do not keep false in $this->sock: every later stream call would fail on it.
            throw new \danog\MadelineProto\Exception('Could not connect to '.$address.':'.$port.': '.$errstr.' ('.$errno.')');
        }
        $this->sock = $stream;
        stream_set_timeout($this->sock, $this->timeout['sec'], $this->timeout['usec']);
        if ($this->blocking !== null) {
            // Apply a mode that was requested before the stream existed.
            stream_set_blocking($this->sock, $this->blocking);
        }

        return true;
    }

    /**
     * stream_select() over wrapper objects.
     *
     * Each array holds objects exposing getResource(); on return each array
     * only keeps the entries whose key is still present in the corresponding
     * array after stream_select() ran.
     *
     * @return int|false Result of stream_select().
     */
    public static function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
    {
        $unwrap = function ($socket) {
            return $socket->getResource();
        };
        // array_map() with a single array preserves keys, which the pruning below relies on.
        $actual_read = array_map($unwrap, $read);
        $actual_write = array_map($unwrap, $write);
        $actual_except = array_map($unwrap, $except);

        $res = stream_select($actual_read, $actual_write, $actual_except, $tv_sec, $tv_usec);

        $read = array_intersect_key($read, $actual_read);
        $write = array_intersect_key($write, $actual_write);
        $except = array_intersect_key($except, $actual_except);

        return $res;
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int $length Number of bytes to read.
     * @param int $flags  Ignored.
     *
     * @throws \danog\MadelineProto\NothingInTheSocketException If a read fails, or if a read
     *         still returns nothing after more than ten attempts.
     *
     * @return string
     */
    public function read(int $length, int $flags = 0)
    {
        $packet = '';
        $try = 0;
        while (($current_length = strlen($packet)) < $length) {
            $read = stream_get_contents($this->sock, $length - $current_length);
            if ($read === false || (strlen($read) === 0 && $try > 10)) {
                throw new \danog\MadelineProto\NothingInTheSocketException('Nothing in the socket!');
            }
            $packet .= $read;
            $try++;
        }

        return $packet;
    }

    /**
     * Writes the first $length bytes of $buffer, retrying partial writes.
     *
     * @param string $buffer Data to write.
     * @param int    $length Number of bytes to write; -1 writes the whole buffer.
     *
     * @throws \danog\MadelineProto\NothingInTheSocketException If a write fails, or if a write
     *         still makes no progress after more than ten retries.
     *
     * @return int Number of bytes written.
     */
    public function write(string $buffer, int $length = -1)
    {
        if ($length === -1) {
            $length = strlen($buffer);
        } else {
            $buffer = substr($buffer, 0, $length);
        }
        $try = 0;
        $wrote = fwrite($this->sock, $buffer, $length);
        if ($wrote === false) {
            // A failed first write must not be treated as "0 bytes written so far".
            throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
        }
        while ($wrote < $length) {
            $wrote_now = fwrite($this->sock, substr($buffer, $wrote), $length - $wrote);
            if ($wrote_now === false || ($wrote_now === 0 && $try > 10)) {
                throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
            }
            $wrote += $wrote_now;
            $try++;
        }

        return $wrote;
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function send(string $data, int $length, int $flags)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * Closes the stream; calling it again, or before connect(), does nothing.
     */
    public function close()
    {
        if ($this->sock !== null) {
            fclose($this->sock);
            $this->sock = null;
        }
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function getPeerName(bool $port = true)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * @throws \danog\MadelineProto\Exception Always.
     */
    public function getSockName(bool $port = true)
    {
        throw new \danog\MadelineProto\Exception('Not supported');
    }

    /**
     * Accepted and ignored by this implementation.
     */
    public function setExtra(array $extra = [])
    {
    }

    /**
     * @return string Always an empty string in this implementation.
     */
    public function getProxyHeaders()
    {
        return '';
    }

    /**
     * @return resource|null The underlying stream, or null when not connected.
     */
    public function getResource()
    {
        return $this->sock;
    }
}
if (!extension_loaded('pthreads')) {
if (extension_loaded('sockets')) {
/**
 * Thin object wrapper around a socket created by the sockets extension.
 */
class SocketBase
{
    /** @var mixed Socket handle given to the constructor; null once closed. */
    private $sock;

    /**
     * @param mixed $sock Handle as returned by socket_create() or socket_accept().
     */
    public function __construct($sock)
    {
        $this->sock = $sock;
    }

    /**
     * Closes the socket unless close() already did.
     */
    public function __destruct()
    {
        // After close() the handle is null; socket_close(null) would raise.
        if ($this->sock) {
            socket_close($this->sock);
        }
        $this->sock = null;
    }

    /**
     * Sets a socket option.
     *
     * For SO_RCVTIMEO / SO_SNDTIMEO a numeric number of seconds is converted
     * into the ['sec' => ..., 'usec' => ...] array before being passed on.
     *
     * @return bool Result of socket_set_option().
     */
    public function setOption(int $level, int $name, $value)
    {
        if (in_array($name, [\SO_RCVTIMEO, \SO_SNDTIMEO], true)) {
            $seconds = (int) $value;
            $value = ['sec' => $seconds, 'usec' => (int) (($value - $seconds) * 1000000)];
        }

        return socket_set_option($this->sock, $level, $name, $value);
    }

    /**
     * @return mixed Result of socket_get_option().
     */
    public function getOption(int $level, int $name)
    {
        return socket_get_option($this->sock, $level, $name);
    }

    /**
     * @return bool Result of socket_set_block() or socket_set_nonblock().
     */
    public function setBlocking(bool $blocking)
    {
        return $blocking ? socket_set_block($this->sock) : socket_set_nonblock($this->sock);
    }

    /**
     * @return bool Result of socket_bind().
     */
    public function bind(string $address, int $port = 0)
    {
        return socket_bind($this->sock, $address, $port);
    }

    /**
     * @return bool Result of socket_listen().
     */
    public function listen(int $backlog = 0)
    {
        return socket_listen($this->sock, $backlog);
    }

    /**
     * Accepts a connection.
     *
     * @return self|false A SocketBase wrapping the accepted socket, or the falsy
     *                    result of socket_accept().
     */
    public function accept()
    {
        $socket = socket_accept($this->sock);
        if ($socket) {
            return new self($socket);
        }

        return $socket;
    }

    /**
     * @return bool Result of socket_connect().
     */
    public function connect(string $address, int $port = 0)
    {
        return socket_connect($this->sock, $address, $port);
    }

    /**
     * socket_select() over wrapper objects.
     *
     * Each array holds objects exposing getResource(); on return each array
     * only keeps the entries whose key is still present in the corresponding
     * array after socket_select() ran.
     *
     * @return int|false Result of socket_select().
     */
    public static function select(array &$read, array &$write, array &$except, int $tv_sec, int $tv_usec = 0)
    {
        $unwrap = function ($socket) {
            return $socket->getResource();
        };
        // array_map() with a single array preserves keys, which the pruning below relies on.
        $actual_read = array_map($unwrap, $read);
        $actual_write = array_map($unwrap, $write);
        $actual_except = array_map($unwrap, $except);

        $res = socket_select($actual_read, $actual_write, $actual_except, $tv_sec, $tv_usec);

        $read = array_intersect_key($read, $actual_read);
        $write = array_intersect_key($write, $actual_write);
        $except = array_intersect_key($except, $actual_except);

        return $res;
    }

    /**
     * Reads exactly $length bytes.
     *
     * @param int $length Number of bytes to read.
     * @param int $flags  Passed as the third argument of socket_read().
     *
     * @throws \danog\MadelineProto\NothingInTheSocketException If a read fails, or if a read
     *         still returns nothing after more than ten attempts.
     *
     * @return string
     */
    public function read(int $length, int $flags = 0)
    {
        $packet = '';
        $try = 0;
        while (strlen($packet) < $length) {
            $read = socket_read($this->sock, $length - strlen($packet), $flags);
            if ($read === false || (strlen($read) === 0 && $try > 10)) {
                throw new \danog\MadelineProto\NothingInTheSocketException('Nothing in the socket!');
            }
            $packet .= $read;
            $try++;
        }

        return $packet;
    }

    /**
     * Writes the first $length bytes of $buffer, retrying partial writes.
     *
     * @param string $buffer Data to write.
     * @param int    $length Number of bytes to write; -1 writes the whole buffer.
     *
     * @throws \danog\MadelineProto\NothingInTheSocketException If a write fails, or if a write
     *         still makes no progress after more than ten retries.
     *
     * @return int Number of bytes written.
     */
    public function write(string $buffer, int $length = -1)
    {
        if ($length === -1) {
            $length = strlen($buffer);
        } else {
            $buffer = substr($buffer, 0, $length);
        }
        $try = 0;
        $wrote = socket_write($this->sock, $buffer, $length);
        if ($wrote === false) {
            // A failed first write must not be treated as "0 bytes written so far".
            throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
        }
        while ($wrote < $length) {
            $wrote_now = socket_write($this->sock, substr($buffer, $wrote), $length - $wrote);
            if ($wrote_now === false || ($wrote_now === 0 && $try > 10)) {
                throw new \danog\MadelineProto\NothingInTheSocketException('Nothing could be written in the socket!');
            }
            $wrote += $wrote_now;
            $try++;
        }

        return $wrote;
    }

    /**
     * Sends data with socket_send().
     *
     * @return int|false Result of socket_send().
     */
    public function send(string $data, int $length, int $flags)
    {
        // socket_send() takes the socket as its first argument.
        return socket_send($this->sock, $data, $length, $flags);
    }

    /**
     * Closes the socket; calling it again does nothing.
     */
    public function close()
    {
        if ($this->sock) {
            socket_close($this->sock);
        }
        $this->sock = null;
    }

    /**
     * Returns the remote end of the socket as reported by socket_getpeername().
     *
     * @param bool $port Whether to query and include the port.
     *
     * @return array ['host' => ..., 'port' => ...] or, when $port is false, ['host' => ...].
     */
    public function getPeerName(bool $port = true)
    {
        $address = '';
        if (!$port) {
            socket_getpeername($this->sock, $address);

            return ['host' => $address];
        }
        // Use a separate variable for the result so the $port flag is not overwritten.
        $peerPort = 0;
        socket_getpeername($this->sock, $address, $peerPort);

        return ['host' => $address, 'port' => $peerPort];
    }

    /**
     * Returns the local end of the socket as reported by socket_getsockname().
     *
     * @param bool $port Whether to query and include the port.
     *
     * @return array ['host' => ..., 'port' => ...] or, when $port is false, ['host' => ...].
     */
    public function getSockName(bool $port = true)
    {
        $address = '';
        if (!$port) {
            socket_getsockname($this->sock, $address);

            return ['host' => $address];
        }
        // Use a separate variable for the result so the $port flag is not overwritten.
        $localPort = 0;
        socket_getsockname($this->sock, $address, $localPort);

        return ['host' => $address, 'port' => $localPort];
    }

    /**
     * @return string Always an empty string in this implementation.
     */
    public function getProxyHeaders()
    {
        return '';
    }

    /**
     * @return mixed The wrapped socket handle, or null once closed.
     */
    public function getResource()
    {
        return $this->sock;
    }
}
/**
 * Socket created through the sockets extension.
 */
class Socket extends SocketBase
{
    /**
     * Creates the socket with socket_create() and wraps it.
     *
     * @param int $domain   Passed to socket_create().
     * @param int $type     Passed to socket_create().
     * @param int $protocol Passed to socket_create().
     *
     * @throws \danog\MadelineProto\Exception If socket_create() returns false.
     */
    public function __construct(int $domain, int $type, int $protocol)
    {
        $sock = socket_create($domain, $type, $protocol);
        if ($sock === false) {
            // Fail here rather than wrapping false and failing on the first operation.
            throw new \danog\MadelineProto\Exception('Could not create socket: '.socket_strerror(socket_last_error()));
        }
        parent::__construct($sock);
    }
}
} else {
// This branch is only reached when the sockets extension is not loaded.
// The values below are presumably placeholders standing in for the
// extension's constants so that code referring to them still resolves
// (NOTE(review): confirm nothing relies on the real numeric values).
// array_map() is used so that no loop variables are introduced into the
// including scope.
array_map(
    'define',
    ['AF_INET', 'AF_INET6', 'SOCK_STREAM', 'SOL_SOCKET', 'SO_RCVTIMEO', 'SO_SNDTIMEO'],
    [0, 1, 2, 3, 4, 5]
);

/**
 * Socket implementation backed by FSocket; adds nothing of its own.
 */
class Socket extends FSocket
{
}
}
}