Database properties types

This commit is contained in:
Alexander Pankratov 2020-04-25 22:57:55 +03:00
parent ff7d93f52d
commit ed493330a6
9 changed files with 650 additions and 13 deletions

View File

@ -27,6 +27,7 @@
"amphp/dns": "^1",
"amphp/byte-stream": "^1",
"amphp/file": "^1",
"amphp/mysql": "^2.0",
"danog/dns-over-https": "^0.2",
"amphp/http-client-cookies": "^1",
"danog/tg-file-decoder": "^0.1",

View File

@ -0,0 +1,11 @@
<?php
namespace danog\MadelineProto\Db;
abstract class DbArray extends \ArrayIterator implements DbType
{
protected function __construct($array = [], $flags = 0)
{
parent::__construct((array) $array, $flags | self::STD_PROP_LIST);
}
}

View File

@ -0,0 +1,49 @@
<?php
namespace danog\MadelineProto\Db;
class DbPropertiesFabric
{
/**
* @param array $dbSettings
* @param string $propertyType
* @param string $name
* @param $value
*
* @return mixed
*
* @uses \danog\MadelineProto\Db\MemoryArray
* @uses \danog\MadelineProto\Db\SharedMemoryArray
* @uses \danog\MadelineProto\Db\MysqlArray
*/
public static function get(array $dbSettings, string $propertyType, string $name, $value = null): DbType
{
$class = __NAMESPACE__;
switch (strtolower($dbSettings['type'])) {
case 'memory':
$class .= '\Memory';
break;
case 'sharedmemory':
$class .= '\SharedMemory';
break;
case 'mysql':
$class .= '\Mysql';
break;
default:
throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}");
}
switch (strtolower($propertyType)){
case 'array':
$class .= 'Array';
break;
default:
throw new \InvalidArgumentException("Unknown $propertyType: {$propertyType}");
}
/** @var DbType $class */
return $class::getInstance($dbSettings, $name, $value);
}
}

View File

@ -0,0 +1,8 @@
<?php
namespace danog\MadelineProto\Db;
interface DbType
{
static function getInstance(array $settings, string $name, $value): self;
}

View File

@ -0,0 +1,15 @@
<?php
namespace danog\MadelineProto\Db;
class MemoryArray extends DbArray
{
static function getInstance(array $settings, string $name, $value = []): DbArray
{
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
return new static($value);
}
}

View File

@ -0,0 +1,45 @@
<?php
namespace danog\MadelineProto\Db;
use Amp\Loop;
use Amp\Mysql\ConnectionConfig;
use Amp\Mysql\Pool;
use function Amp\Mysql\Pool;
class Mysql
{
/** @var Pool[] */
private static array $connections;
private static function connect(
string $host = '127.0.0.1',
int $port = 3306,
string $user = 'root',
string $password = '',
string $db = 'MadelineProto'
) {
$config = ConnectionConfig::fromString(
"host={$host} port={$port} user={$user} password={$password} db={$db}"
);
return Pool($config);
}
public static function getConnection(
string $host = '127.0.0.1',
int $port = 3306,
string $user = 'root',
string $password = '',
string $db = 'MadelineProto'
): Pool
{
$dbKey = "$host:$port:$db";
if (empty(static::$connections[$dbKey])) {
static::$connections[$dbKey] = static::connect($host, $port, $user, $password, $db);
}
return static::$connections[$dbKey];
}
}

View File

@ -0,0 +1,429 @@
<?php
namespace danog\MadelineProto\Db;
use Amp\Mysql\Pool;
use Amp\Sql\ResultSet;
use danog\MadelineProto\Tools;
use function Amp\call;
class MysqlArray extends DbArray
{
private string $table;
private array $settings;
private Pool $db;
private ?string $key = null;
public function __serialize(): array
{
return [
'table' => $this->table,
'key' => $this->key,
'settings' => $this->settings
];
}
public function __unserialize($data): void
{
foreach ($data as $property => $value) {
$this->{$property} = $value;
}
$this->initDbConnection();
}
public static function getInstance(array $settings, string $name, $value = []): DbType
{
$instance = new static();
$instance->table = $name;
$instance->settings = $settings['mysql'];
$instance->initDbConnection();
$instance->prepareTable();
if (!empty($value) && !$value instanceof static) {
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
foreach ((array) $value as $key => $item) {
$instance[$key] = $item;
}
}
return $instance;
}
/**
* Check if offset exists
*
* @link https://php.net/manual/en/arrayiterator.offsetexists.php
*
* @param string $index <p>
* The offset being checked.
* </p>
*
* @return bool true if the offset exists, otherwise false
* @throws \Throwable
*/
public function offsetExists($index)
{
$row = $this->syncRequest(
"SELECT count(`key`) as `count` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $index]
);
$row = reset($row);
return !empty($row['count']);
}
/**
* Get value for an offset
*
* @link https://php.net/manual/en/arrayiterator.offsetget.php
*
* @param string $index <p>
* The offset to get the value from.
* </p>
*
* @return mixed The value at offset <i>index</i>.
* @throws \Throwable
*/
public function offsetGet($index)
{
$row = $this->syncRequest(
"SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $index]
);
$row = reset($row);
if ($row) {
return unserialize($row['value']);
}
return null;
}
/**
* Set value for an offset
*
* @link https://php.net/manual/en/arrayiterator.offsetset.php
*
* @param string $index <p>
* The index to set for.
* </p>
* @param $value
*
* @return void
* @throws \Throwable
*/
public function offsetSet($index, $value)
{
$this->syncRequest("
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => serialize($value),
]
);
}
/**
* Unset value for an offset
*
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
*
* @param string $index <p>
* The offset to unset.
* </p>
*
* @return void
* @throws \Throwable
*/
public function offsetUnset($index)
{
$this->syncRequest("
DELETE FROM `{$this->table}`
WHERE `key` = :index
",
['index' => $index]
);
}
/**
* Append an element
* @link https://php.net/manual/en/arrayiterator.append.php
* @param mixed $value <p>
* The value to append.
* </p>
* @return void
*/
public function append($value)
{
throw new \BadMethodCallException('Append operation does not supported');
}
/**
* Get array copy
*
* @link https://php.net/manual/en/arrayiterator.getarraycopy.php
* @return array A copy of the array, or array of public properties
* if ArrayIterator refers to an object.
* @throws \Throwable
*/
public function getArrayCopy(): array
{
$rows = $this->syncRequest("SELECT `key`, `value` FROM {$this->table}");
$result = [];
foreach ($rows as $row) {
$result[$row['key']] = unserialize($row['value']);
}
return $result;
}
/**
* Count elements
*
* @link https://php.net/manual/en/arrayiterator.count.php
* @return int The number of elements or public properties in the associated
* array or object, respectively.
* @throws \Throwable
*/
public function count(): int
{
return $this->syncRequest("SELECT count(`key`) as `count` FROM {$this->table}")['count'] ?? 0;
}
/**
* Sort array by values
* @link https://php.net/manual/en/arrayiterator.asort.php
* @return void
*/
public function asort()
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* Sort array by keys
* @link https://php.net/manual/en/arrayiterator.ksort.php
* @return void
*/
public function ksort()
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* User defined sort
* @link https://php.net/manual/en/arrayiterator.uasort.php
* @param string $cmp_function <p>
* The compare function used for the sort.
* </p>
* @return void
*/
public function uasort($cmp_function)
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* User defined sort
* @link https://php.net/manual/en/arrayiterator.uksort.php
* @param string $cmp_function <p>
* The compare function used for the sort.
* </p>
* @return void
*/
public function uksort($cmp_function)
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* Sort an array naturally
* @link https://php.net/manual/en/arrayiterator.natsort.php
* @return void
*/
public function natsort()
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* Sort an array naturally, case insensitive
* @link https://php.net/manual/en/arrayiterator.natcasesort.php
* @return void
*/
public function natcasesort()
{
throw new \BadMethodCallException('Sort operation does not supported');
}
/**
* Rewind array back to the start
*
* @link https://php.net/manual/en/arrayiterator.rewind.php
* @return void
* @throws \Throwable
*/
public function rewind()
{
$this->key = null;
$this->key();
}
/**
* Return current array entry
*
* @link https://php.net/manual/en/arrayiterator.current.php
* @return mixed The current array entry.
* @throws \Throwable
*/
public function current()
{
return $this->offsetGet($this->key());
}
/**
* Return current array key
*
* @link https://php.net/manual/en/arrayiterator.key.php
* @return string|float|int|bool|null The current array key.
* @throws \Throwable
*/
public function key(): ?string
{
if ($this->key === null) {
$row = $this->syncRequest(
"SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1"
);
if ($row) {
$row = reset($row);
$this->key = $row['key'] ?? null;
}
}
return $this->key;
}
/**
* Move to next entry
*
* @link https://php.net/manual/en/arrayiterator.next.php
* @return void
* @throws \Throwable
*/
public function next() {
$row = $this->syncRequest(
"SELECT `key` FROM {$this->table} WHERE `key` > :key LIMIT 1",
['key' => $this->key()]
);
$row = reset($row);
$this->key = $row['key'] ?? null;
}
/**
* Check whether array contains more entries
*
* @link https://php.net/manual/en/arrayiterator.valid.php
* @return bool
* @throws \Throwable
*/
public function valid() {
if ($this->key() === null) {
return false;
}
$row = $this->syncRequest(
"SELECT `key` FROM {$this->table} WHERE `key` > :key LIMIT 1",
['key' => $this->key()]
);
return $row !== null;
}
/**
* Seek to position
* @link https://php.net/manual/en/arrayiterator.seek.php
* @param int $position <p>
* The position to seek to.
* </p>
* @return void
*/
public function seek($position)
{
$row = $this->syncRequest(
"SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1, :position",
['offset' => $position]
);
$row = reset($row);
if (isset($row['key'])) {
$this->key = $row['key'];
}
}
private function initDbConnection()
{
$this->db = Mysql::getConnection(
$this->settings['host'],
$this->settings['port'],
$this->settings['user'],
$this->settings['password'],
$this->settings['database'],
);
}
/**
* Create table for property
*
* @return array|null
* @throws \Throwable
*/
private function prepareTable()
{
return $this->syncRequest("
CREATE TABLE IF NOT EXISTS `{$this->table}`
(
`key` VARCHAR(255) NOT NULL,
`value` LONGTEXT NULL,
`ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`key`)
)
");
}
/**
* Perform blocking request to db
*
* @param string $query
* @param array $params
*
* @return array|null
* @throws \Throwable
*/
private function syncRequest(string $query, array $params = []): array
{
return Tools::wait(
call(
function() use($query, $params) {
$request = yield $this->db->execute($query, $params);
$result = [];
if ($request instanceof ResultSet) {
while (yield $request->advance()) {
$row = $request->getCurrent();
if (isset($row['key'])) {
$result[$row['key']] = $row;
} else {
$result[] = $row;
}
}
}
return $result;
}
)
);
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace danog\MadelineProto\Db;
class SharedMemoryArray extends DbArray
{
private static SharedMemoryArray $instance;
public static function getInstance(array $settings, string $name, $value = []): DbArray
{
if (empty(static::$instance)) {
static::$instance = new static($value);
} else {
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
$value = array_replace_recursive(static::$instance->getArrayCopy(), (array) $value);
foreach ($value as $key => $item) {
static::$instance[$key] = $item;
}
}
return static::$instance;
}
}

View File

@ -23,6 +23,12 @@ use Amp\Dns\Resolver;
use Amp\File\StatCache;
use Amp\Http\Client\HttpClient;
use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DbType;
use danog\MadelineProto\Db\Engines\DbInterface;
use danog\MadelineProto\Db\DbPropertiesFabric;
use danog\MadelineProto\Db\Mysql;
use danog\MadelineProto\Db\Types\ArrayType;
use danog\MadelineProto\Loop\Generic\PeriodicLoop;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
@ -85,7 +91,7 @@ class MTProto extends AsyncConstruct implements TLCallback
*
* @var int
*/
const V = 138;
const V = 139;
/**
* String release version.
*
@ -278,15 +284,15 @@ class MTProto extends AsyncConstruct implements TLCallback
/**
* Internal peer database.
*
* @var array
* @var DbArray
*/
public $chats = [];
public $chats;
/**
* Cached parameters for fetching channel participants.
*
* @var array
* @var DbArray
*/
public $channel_participants = [];
public $channel_participants;
/**
* When we last stored data in remote peer database (now doesn't exist anymore).
*
@ -302,9 +308,9 @@ class MTProto extends AsyncConstruct implements TLCallback
/**
* Full chat info database.
*
* @var array
* @var DbArray
*/
public $full_chats = [];
public $full_chats;
/**
* Latest chat message ID map for update handling.
*
@ -407,6 +413,18 @@ class MTProto extends AsyncConstruct implements TLCallback
* @var \danog\MadelineProto\TL\TL
*/
private $TL;
/**
* List of properties stored in database (memory or external)
* @see DbPropertiesFabric
* @var array
*/
private array $dbProperies = [
'chats' => 'array',
'full_chats' => 'array',
'channel_participants' => 'array'
];
/**
* Constructor function.
*
@ -540,6 +558,18 @@ class MTProto extends AsyncConstruct implements TLCallback
'reportDest'
];
}
public function initDb(bool $reset = false): void
{
foreach ($this->dbProperies as $property => $type) {
if ($reset) {
unset($this->{$property});
} else {
$this->{$property} = DbPropertiesFabric::get($this->settings['db'], $type, $property, $this->{$property});
}
}
}
/**
* Cleanup memory and session file.
*
@ -751,6 +781,9 @@ class MTProto extends AsyncConstruct implements TLCallback
}
$this->TL->init($this->settings['tl_schema']['src'], $callbacks);
}
$this->initDb();
}
/**
* Upgrade MadelineProto instance.
@ -777,9 +810,9 @@ class MTProto extends AsyncConstruct implements TLCallback
if (isset($settings['authorization']['rsa_key'])) {
unset($settings['authorization']['rsa_key']);
}
if (!isset($this->full_chats)) {
$this->full_chats = [];
}
$this->initDb();
if (!isset($this->secret_chats)) {
$this->secret_chats = [];
}
@ -799,6 +832,8 @@ class MTProto extends AsyncConstruct implements TLCallback
$chat['mtproto'] = 1;
}
}
unset($chat);
foreach ($settings['connection_settings'] as $key => &$connection) {
if (\in_array($key, ['default_dc', 'media_socket_count', 'robin_period'])) {
continue;
@ -821,6 +856,8 @@ class MTProto extends AsyncConstruct implements TLCallback
$connection['obfuscated'] = true;
}
}
unset($connection);
$this->resetMTProtoSession(true, true);
$this->config = ['expires' => -1];
$this->dh_config = ['version' => 0];
@ -1244,6 +1281,23 @@ class MTProto extends AsyncConstruct implements TLCallback
'run_callback' => true,
], 'secret_chats' => ['accept_chats' => true],
'serialization' => ['serialization_interval' => 30, 'cleanup_before_serialization' => false],
/**
* Where internal database will be stored?
* memory - session file
* sharedMemory - multiples instances share db if run in single process
* mysql - mysql database, shared by all instances in all processes.
*/
'db' => [
'type' => 'memory',
/** @see Mysql */
'mysql' => [
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => '',
'database' => 'MadelineProto'
]
],
'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [
'pwr' => false,
// Need info ?
@ -1469,13 +1523,13 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->authorization = null;
$this->updates = [];
$this->secret_chats = [];
$this->chats = [];
$this->users = [];
$this->initDb(true);
$this->tos = ['expires' => 0, 'accepted' => true];
$this->referenceDatabase = new ReferenceDatabase($this);
$this->minDatabase = new MinDatabase($this);
$this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0];
$this->full_chats = [];
}
/**
* Reset the update state and fetch all updates from the beginning.