diff --git a/composer.json b/composer.json index 3e5debca..0555de72 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,7 @@ "amphp/dns": "^1", "amphp/byte-stream": "^1", "amphp/file": "^1", + "amphp/mysql": "^2.0", "danog/dns-over-https": "^0.2", "amphp/http-client-cookies": "^1", "danog/tg-file-decoder": "^0.1", diff --git a/src/danog/MadelineProto/Db/DbArray.php b/src/danog/MadelineProto/Db/DbArray.php new file mode 100644 index 00000000..263d9a93 --- /dev/null +++ b/src/danog/MadelineProto/Db/DbArray.php @@ -0,0 +1,11 @@ +getArrayCopy(); + } + return new static($value); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/Mysql.php b/src/danog/MadelineProto/Db/Mysql.php new file mode 100644 index 00000000..cada90af --- /dev/null +++ b/src/danog/MadelineProto/Db/Mysql.php @@ -0,0 +1,45 @@ + $this->table, + 'key' => $this->key, + 'settings' => $this->settings + ]; + } + + public function __unserialize($data): void + { + foreach ($data as $property => $value) { + $this->{$property} = $value; + } + $this->initDbConnection(); + } + + public static function getInstance(array $settings, string $name, $value = []): DbType + { + $instance = new static(); + $instance->table = $name; + $instance->settings = $settings['mysql']; + $instance->initDbConnection(); + $instance->prepareTable(); + + if (!empty($value) && !$value instanceof static) { + if ($value instanceof DbArray) { + $value = $value->getArrayCopy(); + } + foreach ((array) $value as $key => $item) { + $instance[$key] = $item; + } + } + + + return $instance; + } + + /** + * Check if offset exists + * + * @link https://php.net/manual/en/arrayiterator.offsetexists.php + * + * @param string $index

+ * The offset being checked. + *

+ * + * @return bool true if the offset exists, otherwise false + * @throws \Throwable + */ + public function offsetExists($index) + { + $row = $this->syncRequest( + "SELECT count(`key`) as `count` FROM {$this->table} WHERE `key` = :index LIMIT 1", + ['index' => $index] + ); + + $row = reset($row); + return !empty($row['count']); + } + + /** + * Get value for an offset + * + * @link https://php.net/manual/en/arrayiterator.offsetget.php + * + * @param string $index

+ * The offset to get the value from. + *

+ * + * @return mixed The value at offset index. + * @throws \Throwable + */ + public function offsetGet($index) + { + $row = $this->syncRequest( + "SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1", + ['index' => $index] + ); + $row = reset($row); + if ($row) { + return unserialize($row['value']); + } + return null; + + } + + /** + * Set value for an offset + * + * @link https://php.net/manual/en/arrayiterator.offsetset.php + * + * @param string $index

+ * The index to set for. + *

+ * @param $value + * + * @return void + * @throws \Throwable + */ + public function offsetSet($index, $value) + { + $this->syncRequest(" + INSERT INTO `{$this->table}` + SET `key` = :index, `value` = :value + ON DUPLICATE KEY UPDATE `value` = :value + ", + [ + 'index' => $index, + 'value' => serialize($value), + ] + ); + } + + /** + * Unset value for an offset + * + * @link https://php.net/manual/en/arrayiterator.offsetunset.php + * + * @param string $index

+ * The offset to unset. + *

+ * + * @return void + * @throws \Throwable + */ + public function offsetUnset($index) + { + $this->syncRequest(" + DELETE FROM `{$this->table}` + WHERE `key` = :index + ", + ['index' => $index] + ); + } + + /** + * Append an element + * @link https://php.net/manual/en/arrayiterator.append.php + * @param mixed $value

+ * The value to append. + *

+ * @return void + */ + public function append($value) + { + throw new \BadMethodCallException('Append operation does not supported'); + } + + /** + * Get array copy + * + * @link https://php.net/manual/en/arrayiterator.getarraycopy.php + * @return array A copy of the array, or array of public properties + * if ArrayIterator refers to an object. + * @throws \Throwable + */ + public function getArrayCopy(): array + { + $rows = $this->syncRequest("SELECT `key`, `value` FROM {$this->table}"); + $result = []; + foreach ($rows as $row) { + $result[$row['key']] = unserialize($row['value']); + } + return $result; + } + + /** + * Count elements + * + * @link https://php.net/manual/en/arrayiterator.count.php + * @return int The number of elements or public properties in the associated + * array or object, respectively. + * @throws \Throwable + */ + public function count(): int + { + return $this->syncRequest("SELECT count(`key`) as `count` FROM {$this->table}")['count'] ?? 0; + } + + /** + * Sort array by values + * @link https://php.net/manual/en/arrayiterator.asort.php + * @return void + */ + public function asort() + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * Sort array by keys + * @link https://php.net/manual/en/arrayiterator.ksort.php + * @return void + */ + public function ksort() + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * User defined sort + * @link https://php.net/manual/en/arrayiterator.uasort.php + * @param string $cmp_function

+ * The compare function used for the sort. + *

+ * @return void + */ + public function uasort($cmp_function) + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * User defined sort + * @link https://php.net/manual/en/arrayiterator.uksort.php + * @param string $cmp_function

+ * The compare function used for the sort. + *

+ * @return void + */ + public function uksort($cmp_function) + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * Sort an array naturally + * @link https://php.net/manual/en/arrayiterator.natsort.php + * @return void + */ + public function natsort() + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * Sort an array naturally, case insensitive + * @link https://php.net/manual/en/arrayiterator.natcasesort.php + * @return void + */ + public function natcasesort() + { + throw new \BadMethodCallException('Sort operation does not supported'); + } + + /** + * Rewind array back to the start + * + * @link https://php.net/manual/en/arrayiterator.rewind.php + * @return void + * @throws \Throwable + */ + public function rewind() + { + $this->key = null; + $this->key(); + } + + /** + * Return current array entry + * + * @link https://php.net/manual/en/arrayiterator.current.php + * @return mixed The current array entry. + * @throws \Throwable + */ + public function current() + { + return $this->offsetGet($this->key()); + } + + /** + * Return current array key + * + * @link https://php.net/manual/en/arrayiterator.key.php + * @return string|float|int|bool|null The current array key. + * @throws \Throwable + */ + public function key(): ?string + { + if ($this->key === null) { + $row = $this->syncRequest( + "SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1" + ); + if ($row) { + $row = reset($row); + $this->key = $row['key'] ?? null; + } + + } + return $this->key; + } + + /** + * Move to next entry + * + * @link https://php.net/manual/en/arrayiterator.next.php + * @return void + * @throws \Throwable + */ + public function next() { + $row = $this->syncRequest( + "SELECT `key` FROM {$this->table} WHERE `key` > :key LIMIT 1", + ['key' => $this->key()] + ); + $row = reset($row); + $this->key = $row['key'] ?? null; + } + + /** + * Check whether array contains more entries + * + * @link https://php.net/manual/en/arrayiterator.valid.php + * @return bool + * @throws \Throwable + */ + public function valid() { + if ($this->key() === null) { + return false; + } + + $row = $this->syncRequest( + "SELECT `key` FROM {$this->table} WHERE `key` > :key LIMIT 1", + ['key' => $this->key()] + ); + + return $row !== null; + } + + /** + * Seek to position + * @link https://php.net/manual/en/arrayiterator.seek.php + * @param int $position

+ * The position to seek to. + *

+ * @return void + */ + public function seek($position) + { + $row = $this->syncRequest( + "SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1, :position", + ['offset' => $position] + ); + $row = reset($row); + if (isset($row['key'])) { + $this->key = $row['key']; + } + } + + private function initDbConnection() + { + $this->db = Mysql::getConnection( + $this->settings['host'], + $this->settings['port'], + $this->settings['user'], + $this->settings['password'], + $this->settings['database'], + ); + } + + /** + * Create table for property + * + * @return array|null + * @throws \Throwable + */ + private function prepareTable() + { + return $this->syncRequest(" + CREATE TABLE IF NOT EXISTS `{$this->table}` + ( + `key` VARCHAR(255) NOT NULL, + `value` LONGTEXT NULL, + `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`key`) + ) + "); + } + + /** + * Perform blocking request to db + * + * @param string $query + * @param array $params + * + * @return array|null + * @throws \Throwable + */ + private function syncRequest(string $query, array $params = []): array + { + return Tools::wait( + call( + function() use($query, $params) { + $request = yield $this->db->execute($query, $params); + $result = []; + if ($request instanceof ResultSet) { + while (yield $request->advance()) { + $row = $request->getCurrent(); + if (isset($row['key'])) { + $result[$row['key']] = $row; + } else { + $result[] = $row; + } + + } + } + return $result; + } + ) + ); + } + +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/SharedMemoryArray.php b/src/danog/MadelineProto/Db/SharedMemoryArray.php new file mode 100644 index 00000000..2252556c --- /dev/null +++ b/src/danog/MadelineProto/Db/SharedMemoryArray.php @@ -0,0 +1,25 @@ +getArrayCopy(); + } + $value = array_replace_recursive(static::$instance->getArrayCopy(), (array) $value); + foreach ($value as $key => $item) { + static::$instance[$key] = $item; + } + } + + return static::$instance; + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 3a17eda4..a6c43204 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -23,6 +23,12 @@ use Amp\Dns\Resolver; use Amp\File\StatCache; use Amp\Http\Client\HttpClient; use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Db\DbArray; +use danog\MadelineProto\Db\DbType; +use danog\MadelineProto\Db\Engines\DbInterface; +use danog\MadelineProto\Db\DbPropertiesFabric; +use danog\MadelineProto\Db\Mysql; +use danog\MadelineProto\Db\Types\ArrayType; use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; @@ -85,7 +91,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @var int */ - const V = 138; + const V = 139; /** * String release version. * @@ -278,15 +284,15 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Internal peer database. * - * @var array + * @var DbArray */ - public $chats = []; + public $chats; /** * Cached parameters for fetching channel participants. * - * @var array + * @var DbArray */ - public $channel_participants = []; + public $channel_participants; /** * When we last stored data in remote peer database (now doesn't exist anymore). * @@ -302,9 +308,9 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Full chat info database. * - * @var array + * @var DbArray */ - public $full_chats = []; + public $full_chats; /** * Latest chat message ID map for update handling. * @@ -407,6 +413,18 @@ class MTProto extends AsyncConstruct implements TLCallback * @var \danog\MadelineProto\TL\TL */ private $TL; + + /** + * List of properties stored in database (memory or external) + * @see DbPropertiesFabric + * @var array + */ + private array $dbProperies = [ + 'chats' => 'array', + 'full_chats' => 'array', + 'channel_participants' => 'array' + ]; + /** * Constructor function. * @@ -540,6 +558,18 @@ class MTProto extends AsyncConstruct implements TLCallback 'reportDest' ]; } + + public function initDb(bool $reset = false): void + { + foreach ($this->dbProperies as $property => $type) { + if ($reset) { + unset($this->{$property}); + } else { + $this->{$property} = DbPropertiesFabric::get($this->settings['db'], $type, $property, $this->{$property}); + } + } + } + /** * Cleanup memory and session file. * @@ -751,6 +781,9 @@ class MTProto extends AsyncConstruct implements TLCallback } $this->TL->init($this->settings['tl_schema']['src'], $callbacks); } + + $this->initDb(); + } /** * Upgrade MadelineProto instance. @@ -777,9 +810,9 @@ class MTProto extends AsyncConstruct implements TLCallback if (isset($settings['authorization']['rsa_key'])) { unset($settings['authorization']['rsa_key']); } - if (!isset($this->full_chats)) { - $this->full_chats = []; - } + + $this->initDb(); + if (!isset($this->secret_chats)) { $this->secret_chats = []; } @@ -799,6 +832,8 @@ class MTProto extends AsyncConstruct implements TLCallback $chat['mtproto'] = 1; } } + unset($chat); + foreach ($settings['connection_settings'] as $key => &$connection) { if (\in_array($key, ['default_dc', 'media_socket_count', 'robin_period'])) { continue; @@ -821,6 +856,8 @@ class MTProto extends AsyncConstruct implements TLCallback $connection['obfuscated'] = true; } } + unset($connection); + $this->resetMTProtoSession(true, true); $this->config = ['expires' => -1]; $this->dh_config = ['version' => 0]; @@ -1244,6 +1281,23 @@ class MTProto extends AsyncConstruct implements TLCallback 'run_callback' => true, ], 'secret_chats' => ['accept_chats' => true], 'serialization' => ['serialization_interval' => 30, 'cleanup_before_serialization' => false], + /** + * Where internal database will be stored? + * memory - session file + * sharedMemory - multiples instances share db if run in single process + * mysql - mysql database, shared by all instances in all processes. + */ + 'db' => [ + 'type' => 'memory', + /** @see Mysql */ + 'mysql' => [ + 'host' => '127.0.0.1', + 'port' => 3306, + 'user' => 'root', + 'password' => '', + 'database' => 'MadelineProto' + ] + ], 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'pwr' => false, // Need info ? @@ -1469,13 +1523,13 @@ class MTProto extends AsyncConstruct implements TLCallback $this->authorization = null; $this->updates = []; $this->secret_chats = []; - $this->chats = []; - $this->users = []; + + $this->initDb(true); + $this->tos = ['expires' => 0, 'accepted' => true]; $this->referenceDatabase = new ReferenceDatabase($this); $this->minDatabase = new MinDatabase($this); $this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0]; - $this->full_chats = []; } /** * Reset the update state and fetch all updates from the beginning.