Remove classes moved to sql-common
This commit is contained in:
parent
a87c1e65f4
commit
5c65ef09c4
@ -1,439 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql;
|
|
||||||
|
|
||||||
use Amp\CallableMaker;
|
|
||||||
use Amp\Deferred;
|
|
||||||
use Amp\Loop;
|
|
||||||
use Amp\Promise;
|
|
||||||
use function Amp\call;
|
|
||||||
use function Amp\coroutine;
|
|
||||||
|
|
||||||
abstract class AbstractPool implements Pool
|
|
||||||
{
|
|
||||||
use CallableMaker;
|
|
||||||
|
|
||||||
const DEFAULT_MAX_CONNECTIONS = 100;
|
|
||||||
const DEFAULT_IDLE_TIMEOUT = 60;
|
|
||||||
|
|
||||||
/** @var Connector */
|
|
||||||
private $connector;
|
|
||||||
|
|
||||||
/** @var ConnectionConfig */
|
|
||||||
private $connectionConfig;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $maxConnections;
|
|
||||||
|
|
||||||
/** @var \SplQueue */
|
|
||||||
private $idle;
|
|
||||||
|
|
||||||
/** @var \SplObjectStorage */
|
|
||||||
private $connections;
|
|
||||||
|
|
||||||
/** @var Promise|null */
|
|
||||||
private $promise;
|
|
||||||
|
|
||||||
/** @var Deferred|null */
|
|
||||||
private $deferred;
|
|
||||||
|
|
||||||
/** @var callable */
|
|
||||||
private $prepare;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $pending = 0;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $idleTimeout;
|
|
||||||
|
|
||||||
/** @var string */
|
|
||||||
private $timeoutWatcher;
|
|
||||||
|
|
||||||
/** @var bool */
|
|
||||||
private $closed = false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a default connector object based on the library of the extending class.
|
|
||||||
*
|
|
||||||
* @return Connector
|
|
||||||
*/
|
|
||||||
abstract protected function createDefaultConnector(): Connector;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Link object and the
|
|
||||||
* given release callable.
|
|
||||||
*
|
|
||||||
* @param ResultSet $resultSet
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return ResultSet
|
|
||||||
*/
|
|
||||||
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a Statement of the appropriate type using the Statement object returned by the Link object and the
|
|
||||||
* given release callable.
|
|
||||||
*
|
|
||||||
* @param Statement $statement
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return Statement
|
|
||||||
*/
|
|
||||||
abstract protected function createStatement(Statement $statement, callable $release): Statement;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param Pool $pool
|
|
||||||
* @param Statement $statement
|
|
||||||
* @param callable $prepare
|
|
||||||
*
|
|
||||||
* @return StatementPool
|
|
||||||
*/
|
|
||||||
abstract protected function createStatementPool(Pool $pool, Statement $statement, callable $prepare): StatementPool;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a Transaction of the appropriate type using the Transaction object returned by the Link object and the
|
|
||||||
* given release callable.
|
|
||||||
*
|
|
||||||
* @param Transaction $transaction
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return Transaction
|
|
||||||
*/
|
|
||||||
abstract protected function createTransaction(Transaction $transaction, callable $release): Transaction;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param ConnectionConfig $config
|
|
||||||
* @param int $maxConnections Maximum number of active connections in the pool.
|
|
||||||
* @param int $idleTimeout Number of seconds until idle connections are removed from the pool.
|
|
||||||
* @param Connector|null $connector
|
|
||||||
*/
|
|
||||||
public function __construct(
|
|
||||||
ConnectionConfig $config,
|
|
||||||
int $maxConnections = self::DEFAULT_MAX_CONNECTIONS,
|
|
||||||
int $idleTimeout = self::DEFAULT_IDLE_TIMEOUT,
|
|
||||||
Connector $connector = null
|
|
||||||
) {
|
|
||||||
$this->connector = $connector ?? $this->createDefaultConnector();
|
|
||||||
|
|
||||||
$this->connectionConfig = $config;
|
|
||||||
|
|
||||||
$this->idleTimeout = $idleTimeout;
|
|
||||||
if ($this->idleTimeout < 1) {
|
|
||||||
throw new \Error("The idle timeout must be 1 or greater");
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->maxConnections = $maxConnections;
|
|
||||||
if ($this->maxConnections < 1) {
|
|
||||||
throw new \Error("Pool must contain at least one connection");
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->connections = $connections = new \SplObjectStorage;
|
|
||||||
$this->idle = $idle = new \SplQueue;
|
|
||||||
$this->prepare = coroutine($this->callableFromInstanceMethod("prepareStatement"));
|
|
||||||
|
|
||||||
$idleTimeout = &$this->idleTimeout;
|
|
||||||
|
|
||||||
$this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) {
|
|
||||||
$now = \time();
|
|
||||||
while (!$idle->isEmpty()) {
|
|
||||||
$connection = $idle->bottom();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
if ($connection->getLastUsedAt() + $idleTimeout > $now) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close connection and remove it from the pool.
|
|
||||||
$idle->shift();
|
|
||||||
$connections->detach($connection);
|
|
||||||
$connection->close();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Loop::unreference($this->timeoutWatcher);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __destruct()
|
|
||||||
{
|
|
||||||
Loop::cancel($this->timeoutWatcher);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getIdleTimeout(): int
|
|
||||||
{
|
|
||||||
return $this->idleTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getLastUsedAt(): int
|
|
||||||
{
|
|
||||||
// Simple implementation... can be improved if needed.
|
|
||||||
|
|
||||||
$time = 0;
|
|
||||||
|
|
||||||
foreach ($this->connections as $connection) {
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
if (($lastUsedAt = $connection->getLastUsedAt()) > $time) {
|
|
||||||
$time = $lastUsedAt;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return $time;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
public function isAlive(): bool
|
|
||||||
{
|
|
||||||
return !$this->closed;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close all connections in the pool. No further queries may be made after a pool is closed.
|
|
||||||
*/
|
|
||||||
public function close()
|
|
||||||
{
|
|
||||||
$this->closed = true;
|
|
||||||
foreach ($this->connections as $connection) {
|
|
||||||
$connection->close();
|
|
||||||
}
|
|
||||||
$this->idle = new \SplQueue;
|
|
||||||
$this->prepare = null;
|
|
||||||
|
|
||||||
if ($this->deferred instanceof Deferred) {
|
|
||||||
$deferred = $this->deferred;
|
|
||||||
$this->deferred = null;
|
|
||||||
$deferred->fail(new FailureException("Connection pool closed"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function extractConnection(): Promise
|
|
||||||
{
|
|
||||||
return call(function () {
|
|
||||||
$connection = yield from $this->pop();
|
|
||||||
$this->connections->detach($connection);
|
|
||||||
return $connection;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function getConnectionCount(): int
|
|
||||||
{
|
|
||||||
return $this->connections->count();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function getIdleConnectionCount(): int
|
|
||||||
{
|
|
||||||
return $this->idle->count();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function getConnectionLimit(): int
|
|
||||||
{
|
|
||||||
return $this->maxConnections;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return \Generator
|
|
||||||
*
|
|
||||||
* @resolve Link
|
|
||||||
*
|
|
||||||
* @throws FailureException If creating a new connection fails.
|
|
||||||
* @throws \Error If the pool has been closed.
|
|
||||||
*/
|
|
||||||
protected function pop(): \Generator
|
|
||||||
{
|
|
||||||
if ($this->closed) {
|
|
||||||
throw new \Error("The pool has been closed");
|
|
||||||
}
|
|
||||||
|
|
||||||
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getConnectionLimit()) {
|
|
||||||
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
|
|
||||||
}
|
|
||||||
|
|
||||||
do {
|
|
||||||
// While loop to ensure an idle connection is available after promises below are resolved.
|
|
||||||
while ($this->idle->isEmpty()) {
|
|
||||||
if ($this->connections->count() + $this->pending < $this->getConnectionLimit()) {
|
|
||||||
// Max connection count has not been reached, so open another connection.
|
|
||||||
++$this->pending;
|
|
||||||
try {
|
|
||||||
$connection = yield $this->connector->connect($this->connectionConfig);
|
|
||||||
if (!$connection instanceof Link) {
|
|
||||||
throw new \Error(\sprintf(
|
|
||||||
"%s::connect() must resolve to an instance of %s",
|
|
||||||
\get_class($this->connector),
|
|
||||||
Link::class
|
|
||||||
));
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
--$this->pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->connections->attach($connection);
|
|
||||||
return $connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
// All possible connections busy, so wait until one becomes available.
|
|
||||||
try {
|
|
||||||
$this->deferred = new Deferred;
|
|
||||||
// May be resolved with defunct connection, but that connection will not be added to $this->idle.
|
|
||||||
yield $this->promise = $this->deferred->promise();
|
|
||||||
} finally {
|
|
||||||
$this->deferred = null;
|
|
||||||
$this->promise = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$connection = $this->idle->shift();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
if ($connection->isAlive()) {
|
|
||||||
return $connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->connections->detach($connection);
|
|
||||||
} while (!$this->closed);
|
|
||||||
|
|
||||||
throw new FailureException("Pool closed before an active connection could be obtained");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param Link $connection
|
|
||||||
*
|
|
||||||
* @throws \Error If the connection is not part of this pool.
|
|
||||||
*/
|
|
||||||
protected function push(Link $connection)
|
|
||||||
{
|
|
||||||
\assert(isset($this->connections[$connection]), 'Connection is not part of this pool');
|
|
||||||
|
|
||||||
if ($connection->isAlive()) {
|
|
||||||
$this->idle->push($connection);
|
|
||||||
} else {
|
|
||||||
$this->connections->detach($connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->deferred instanceof Deferred) {
|
|
||||||
$this->deferred->resolve($connection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function query(string $sql): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($sql) {
|
|
||||||
$connection = yield from $this->pop();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$result = yield $connection->query($sql);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->push($connection);
|
|
||||||
throw $exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
$result = $this->createResultSet($result, function () use ($connection) {
|
|
||||||
$this->push($connection);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
$this->push($connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function execute(string $sql, array $params = []): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($sql, $params) {
|
|
||||||
$connection = yield from $this->pop();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$result = yield $connection->execute($sql, $params);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->push($connection);
|
|
||||||
throw $exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
$result = $this->createResultSet($result, function () use ($connection) {
|
|
||||||
$this->push($connection);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
$this->push($connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*
|
|
||||||
* Prepared statements returned by this method will stay alive as long as the pool remains open.
|
|
||||||
*/
|
|
||||||
public function prepare(string $sql): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($sql) {
|
|
||||||
$statement = yield from $this->prepareStatement($sql);
|
|
||||||
return $this->createStatementPool($this, $statement, $this->prepare);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private function prepareStatement(string $sql): \Generator
|
|
||||||
{
|
|
||||||
$connection = yield from $this->pop();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$statement = yield $connection->prepare($sql);
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->push($connection);
|
|
||||||
throw $exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->createStatement($statement, function () use ($connection) {
|
|
||||||
$this->push($connection);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function beginTransaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($isolation) {
|
|
||||||
$connection = yield from $this->pop();
|
|
||||||
\assert($connection instanceof Link);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$transaction = yield $connection->beginTransaction($isolation);
|
|
||||||
\assert($transaction instanceof Transaction);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->push($connection);
|
|
||||||
throw $exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->createTransaction($transaction, function () use ($connection) {
|
|
||||||
$this->push($connection);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,35 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql;
|
|
||||||
|
|
||||||
use Amp\Promise;
|
|
||||||
|
|
||||||
class PooledResultSet implements ResultSet
|
|
||||||
{
|
|
||||||
/** @var ResultSet */
|
|
||||||
private $result;
|
|
||||||
|
|
||||||
/** @var callable */
|
|
||||||
private $release;
|
|
||||||
|
|
||||||
public function __construct(ResultSet $result, callable $release)
|
|
||||||
{
|
|
||||||
$this->result = $result;
|
|
||||||
$this->release = $release;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __destruct()
|
|
||||||
{
|
|
||||||
($this->release)();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function advance(): Promise
|
|
||||||
{
|
|
||||||
return $this->result->advance();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getCurrent(int $type = self::FETCH_ASSOC)
|
|
||||||
{
|
|
||||||
return $this->result->getCurrent($type);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,81 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql;
|
|
||||||
|
|
||||||
use Amp\Promise;
|
|
||||||
use function Amp\call;
|
|
||||||
|
|
||||||
abstract class PooledStatement implements Statement
|
|
||||||
{
|
|
||||||
/** @var Statement */
|
|
||||||
private $statement;
|
|
||||||
|
|
||||||
/** @var callable|null */
|
|
||||||
private $release;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $refCount = 1;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Statement object and
|
|
||||||
* the given release callable.
|
|
||||||
*
|
|
||||||
* @param ResultSet $resultSet
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return ResultSet
|
|
||||||
*/
|
|
||||||
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
|
|
||||||
|
|
||||||
public function __construct(Statement $statement, callable $release)
|
|
||||||
{
|
|
||||||
$this->statement = $statement;
|
|
||||||
|
|
||||||
if (!$this->statement->isAlive()) {
|
|
||||||
$release();
|
|
||||||
} else {
|
|
||||||
$refCount = &$this->refCount;
|
|
||||||
$this->release = static function () use (&$refCount, $release) {
|
|
||||||
if (--$refCount === 0) {
|
|
||||||
$release();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __destruct()
|
|
||||||
{
|
|
||||||
if ($this->release) {
|
|
||||||
($this->release)();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function execute(array $params = []): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($params) {
|
|
||||||
$result = yield $this->statement->execute($params);
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
++$this->refCount;
|
|
||||||
return $this->createResultSet($result, $this->release);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function isAlive(): bool
|
|
||||||
{
|
|
||||||
return $this->statement->isAlive();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getQuery(): string
|
|
||||||
{
|
|
||||||
return $this->statement->getQuery();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getLastUsedAt(): int
|
|
||||||
{
|
|
||||||
return $this->statement->getLastUsedAt();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,215 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql;
|
|
||||||
|
|
||||||
use Amp\Promise;
|
|
||||||
use function Amp\call;
|
|
||||||
|
|
||||||
abstract class PooledTransaction implements Transaction
|
|
||||||
{
|
|
||||||
/** @var Transaction|null */
|
|
||||||
private $transaction;
|
|
||||||
|
|
||||||
/** @var callable|null */
|
|
||||||
private $release;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $refCount = 1;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a Statement of the appropriate type using the Statement object returned by the Transaction object and
|
|
||||||
* the given release callable.
|
|
||||||
*
|
|
||||||
* @param Statement $statement
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return Statement
|
|
||||||
*/
|
|
||||||
abstract protected function createStatement(Statement $statement, callable $release): Statement;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Transaction object and
|
|
||||||
* the given release callable.
|
|
||||||
*
|
|
||||||
* @param ResultSet $resultSet
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return ResultSet
|
|
||||||
*/
|
|
||||||
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* PooledTransaction constructor.
|
|
||||||
*
|
|
||||||
* @param Transaction $transaction
|
|
||||||
* @param callable $release
|
|
||||||
*/
|
|
||||||
public function __construct(Transaction $transaction, callable $release)
|
|
||||||
{
|
|
||||||
$this->transaction = $transaction;
|
|
||||||
$this->release = $release;
|
|
||||||
|
|
||||||
if (!$this->transaction->isActive()) {
|
|
||||||
$release();
|
|
||||||
$this->transaction = null;
|
|
||||||
} else {
|
|
||||||
$refCount = &$this->refCount;
|
|
||||||
$this->release = static function () use (&$refCount, $release) {
|
|
||||||
if (--$refCount === 0) {
|
|
||||||
$release();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __destruct()
|
|
||||||
{
|
|
||||||
if ($this->transaction && $this->transaction->isActive()) {
|
|
||||||
$this->close(); // Invokes $this->release callback.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function query(string $sql): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($sql) {
|
|
||||||
$result = yield $this->transaction->query($sql);
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
++$this->refCount;
|
|
||||||
return $this->createResultSet($result, $this->release);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function prepare(string $sql): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($sql) {
|
|
||||||
$statement = yield $this->transaction->prepare($sql);
|
|
||||||
++$this->refCount;
|
|
||||||
return $this->createStatement($statement, $this->release);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function execute(string $sql, array $params = []): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return call(function () use ($sql, $params) {
|
|
||||||
$result = yield $this->transaction->execute($sql, $params);
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
++$this->refCount;
|
|
||||||
return $this->createResultSet($result, $this->release);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function isAlive(): bool
|
|
||||||
{
|
|
||||||
return $this->transaction && $this->transaction->isAlive();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getLastUsedAt(): int
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->transaction->getLastUsedAt();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function close()
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$promise = $this->transaction->commit();
|
|
||||||
$promise->onResolve($this->release);
|
|
||||||
|
|
||||||
$this->transaction = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getIsolationLevel(): int
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->transaction->getIsolationLevel();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function isActive(): bool
|
|
||||||
{
|
|
||||||
return $this->transaction && $this->transaction->isActive();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function commit(): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
$promise = $this->transaction->commit();
|
|
||||||
$promise->onResolve($this->release);
|
|
||||||
|
|
||||||
$this->transaction = null;
|
|
||||||
|
|
||||||
return $promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function rollback(): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
$promise = $this->transaction->rollback();
|
|
||||||
$promise->onResolve($this->release);
|
|
||||||
|
|
||||||
$this->transaction = null;
|
|
||||||
|
|
||||||
return $promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function createSavepoint(string $identifier): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->transaction->createSavepoint($identifier);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function rollbackTo(string $identifier): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->transaction->rollbackTo($identifier);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function releaseSavepoint(string $identifier): Promise
|
|
||||||
{
|
|
||||||
if (!$this->transaction) {
|
|
||||||
throw new TransactionError("The transaction has been committed or rolled back");
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->transaction->releaseSavepoint($identifier);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,179 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql;
|
|
||||||
|
|
||||||
use Amp\Loop;
|
|
||||||
use Amp\Promise;
|
|
||||||
use function Amp\call;
|
|
||||||
|
|
||||||
abstract class StatementPool implements Statement
|
|
||||||
{
|
|
||||||
/** @var \Amp\Sql\Pool */
|
|
||||||
private $pool;
|
|
||||||
|
|
||||||
/** @var \SplQueue */
|
|
||||||
private $statements;
|
|
||||||
|
|
||||||
/** @var string */
|
|
||||||
private $sql;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
private $lastUsedAt;
|
|
||||||
|
|
||||||
/** @var string */
|
|
||||||
private $timeoutWatcher;
|
|
||||||
|
|
||||||
/** @var callable */
|
|
||||||
private $prepare;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs any necessary actions to the statement to prepare it for execution, returning a promise for the same or
|
|
||||||
* a new Statement object if necessary.
|
|
||||||
*
|
|
||||||
* @param Statement $statement
|
|
||||||
*
|
|
||||||
* @return Promise<Statement>
|
|
||||||
*/
|
|
||||||
abstract protected function prepare(Statement $statement): Promise;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param ResultSet $resultSet
|
|
||||||
* @param callable $release
|
|
||||||
*
|
|
||||||
* @return ResultSet
|
|
||||||
*/
|
|
||||||
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param Pool $pool Pool used to re-create the statement if the original closes.
|
|
||||||
* @param Statement $statement Original prepared statement returned from the Link.
|
|
||||||
* @param callable $prepare Callable that returns a new prepared statement.
|
|
||||||
*/
|
|
||||||
public function __construct(Pool $pool, Statement $statement, callable $prepare)
|
|
||||||
{
|
|
||||||
$this->lastUsedAt = \time();
|
|
||||||
$this->statements = $statements = new \SplQueue;
|
|
||||||
$this->pool = $pool;
|
|
||||||
$this->prepare = $prepare;
|
|
||||||
$this->sql = $statement->getQuery();
|
|
||||||
|
|
||||||
$this->statements->push($statement);
|
|
||||||
|
|
||||||
$this->timeoutWatcher = Loop::repeat(1000, static function () use ($pool, $statements) {
|
|
||||||
$now = \time();
|
|
||||||
$idleTimeout = ((int) ($pool->getIdleTimeout() / 10)) ?: 1;
|
|
||||||
|
|
||||||
while (!$statements->isEmpty()) {
|
|
||||||
$statement = $statements->bottom();
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
|
|
||||||
if ($statement->getLastUsedAt() + $idleTimeout > $now) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$statements->shift();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Loop::unreference($this->timeoutWatcher);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __destruct()
|
|
||||||
{
|
|
||||||
Loop::cancel($this->timeoutWatcher);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*
|
|
||||||
* Unlike regular statements, as long as the pool is open this statement will not die.
|
|
||||||
*/
|
|
||||||
public function execute(array $params = []): Promise
|
|
||||||
{
|
|
||||||
$this->lastUsedAt = \time();
|
|
||||||
|
|
||||||
return call(function () use ($params) {
|
|
||||||
$statement = yield from $this->pop();
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
|
|
||||||
try {
|
|
||||||
$statement = yield $this->prepare($statement);
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
$result = yield $statement->execute($params);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->push($statement);
|
|
||||||
throw $exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($result instanceof ResultSet) {
|
|
||||||
$result = $this->createResultSet($result, function () use ($statement) {
|
|
||||||
$this->push($statement);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
$this->push($statement);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Only retains statements if less than 10% of the pool is consumed by this statement and the pool has
|
|
||||||
* available connections.
|
|
||||||
*
|
|
||||||
* @param Statement $statement
|
|
||||||
*/
|
|
||||||
protected function push(Statement $statement)
|
|
||||||
{
|
|
||||||
$maxConnections = $this->pool->getConnectionLimit();
|
|
||||||
|
|
||||||
if ($this->statements->count() > ($maxConnections / 10)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($maxConnections === $this->pool->getConnectionCount() && $this->pool->getIdleConnectionCount() === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->statements->push($statement);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Coroutine returning a Statement object from the pool or creating a new Statement.
|
|
||||||
*
|
|
||||||
* @return \Generator
|
|
||||||
*/
|
|
||||||
protected function pop(): \Generator
|
|
||||||
{
|
|
||||||
while (!$this->statements->isEmpty()) {
|
|
||||||
$statement = $this->statements->shift();
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
|
|
||||||
if ($statement->isAlive()) {
|
|
||||||
return $statement;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$statement = yield ($this->prepare)($this->sql);
|
|
||||||
\assert($statement instanceof Statement);
|
|
||||||
return $statement;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** {@inheritdoc} */
|
|
||||||
public function isAlive(): bool
|
|
||||||
{
|
|
||||||
return $this->pool->isAlive();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** {@inheritdoc} */
|
|
||||||
public function getQuery(): string
|
|
||||||
{
|
|
||||||
return $this->sql;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** {@inheritdoc} */
|
|
||||||
public function getLastUsedAt(): int
|
|
||||||
{
|
|
||||||
return $this->lastUsedAt;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,76 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql\Test;
|
|
||||||
|
|
||||||
use Amp\Delayed;
|
|
||||||
use Amp\Loop;
|
|
||||||
use Amp\Promise;
|
|
||||||
use Amp\Sql\AbstractPool;
|
|
||||||
use Amp\Sql\ConnectionConfig;
|
|
||||||
use Amp\Sql\Connector;
|
|
||||||
use Amp\Sql\Link;
|
|
||||||
use Amp\Success;
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
|
|
||||||
class AbstractPoolTest extends TestCase
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* @expectedException \Error
|
|
||||||
* @expectedExceptionMessage Pool must contain at least one connection
|
|
||||||
*/
|
|
||||||
public function testInvalidMaxConnections()
|
|
||||||
{
|
|
||||||
$mock = $this->getMockBuilder(AbstractPool::class)
|
|
||||||
->setConstructorArgs([$this->createMock(ConnectionConfig::class), 0])
|
|
||||||
->getMock();
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testIdleConnectionsRemovedAfterTimeout()
|
|
||||||
{
|
|
||||||
Loop::run(function () {
|
|
||||||
$now = \time();
|
|
||||||
|
|
||||||
$connector = $this->createMock(Connector::class);
|
|
||||||
$connector->method('connect')
|
|
||||||
->willReturnCallback(function () use ($now): Promise {
|
|
||||||
$link = $this->createMock(Link::class);
|
|
||||||
$link->method('getLastUsedAt')
|
|
||||||
->willReturn($now);
|
|
||||||
|
|
||||||
$link->method('isAlive')
|
|
||||||
->willReturn(true);
|
|
||||||
|
|
||||||
$link->method('query')
|
|
||||||
->willReturnCallback(function () {
|
|
||||||
return new Delayed(100);
|
|
||||||
});
|
|
||||||
|
|
||||||
return new Success($link);
|
|
||||||
});
|
|
||||||
|
|
||||||
/** @var AbstractPool $pool */
|
|
||||||
$pool = $this->getMockBuilder(AbstractPool::class)
|
|
||||||
->setConstructorArgs([$this->createMock(ConnectionConfig::class), 100, 2, $connector])
|
|
||||||
->getMockForAbstractClass();
|
|
||||||
|
|
||||||
$count = 3;
|
|
||||||
|
|
||||||
$promises = [];
|
|
||||||
for ($i = 0; $i < $count; ++$i) {
|
|
||||||
$promises[] = $pool->query("SELECT $i");
|
|
||||||
}
|
|
||||||
|
|
||||||
$results = yield $promises;
|
|
||||||
|
|
||||||
$this->assertSame($count, $pool->getConnectionCount());
|
|
||||||
|
|
||||||
yield new Delayed(1000);
|
|
||||||
|
|
||||||
$this->assertSame($count, $pool->getConnectionCount());
|
|
||||||
|
|
||||||
yield new Delayed(1000);
|
|
||||||
|
|
||||||
$this->assertSame(0, $pool->getConnectionCount());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,98 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Sql\Test;
|
|
||||||
|
|
||||||
use Amp\Delayed;
|
|
||||||
use Amp\Loop;
|
|
||||||
use Amp\PHPUnit\TestCase;
|
|
||||||
use Amp\Sql\Pool;
|
|
||||||
use Amp\Sql\Statement;
|
|
||||||
use Amp\Sql\StatementPool;
|
|
||||||
use Amp\Success;
|
|
||||||
|
|
||||||
class StatementPoolTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testActiveStatementsRemainAfterTimeout()
|
|
||||||
{
|
|
||||||
Loop::run(function () {
|
|
||||||
$pool = $this->createMock(Pool::class);
|
|
||||||
$pool->method('isAlive')
|
|
||||||
->willReturn(true);
|
|
||||||
$pool->method('getIdleTimeout')
|
|
||||||
->willReturn(60);
|
|
||||||
|
|
||||||
$statement = $this->createMock(Statement::class);
|
|
||||||
$statement->method('isAlive')
|
|
||||||
->willReturn(true);
|
|
||||||
$statement->method('getQuery')
|
|
||||||
->willReturn('SELECT 1');
|
|
||||||
$statement->method('getLastUsedAt')
|
|
||||||
->willReturn(\time());
|
|
||||||
$statement->expects($this->once())
|
|
||||||
->method('execute');
|
|
||||||
|
|
||||||
/** @var StatementPool $statementPool */
|
|
||||||
$statementPool = $this->getMockBuilder(StatementPool::class)
|
|
||||||
->setConstructorArgs([$pool, $statement, $this->createCallback(0)])
|
|
||||||
->getMockForAbstractClass();
|
|
||||||
|
|
||||||
$statementPool->method('prepare')
|
|
||||||
->willReturnCallback(function (Statement $statement) {
|
|
||||||
return new Success($statement);
|
|
||||||
});
|
|
||||||
|
|
||||||
$this->assertTrue($statementPool->isAlive());
|
|
||||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
|
||||||
|
|
||||||
yield new Delayed(1500); // Give timeout watcher enough time to execute.
|
|
||||||
|
|
||||||
$statementPool->execute();
|
|
||||||
|
|
||||||
$this->assertTrue($statementPool->isAlive());
|
|
||||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testIdleStatementsRemovedAfterTimeout()
|
|
||||||
{
|
|
||||||
Loop::run(function () {
|
|
||||||
$pool = $this->createMock(Pool::class);
|
|
||||||
$pool->method('isAlive')
|
|
||||||
->willReturn(true);
|
|
||||||
$pool->method('getIdleTimeout')
|
|
||||||
->willReturn(1);
|
|
||||||
|
|
||||||
$statement = $this->createMock(Statement::class);
|
|
||||||
$statement->method('isAlive')
|
|
||||||
->willReturn(true);
|
|
||||||
$statement->method('getQuery')
|
|
||||||
->willReturn('SELECT 1');
|
|
||||||
$statement->method('getLastUsedAt')
|
|
||||||
->willReturn(\time());
|
|
||||||
$statement->expects($this->once())
|
|
||||||
->method('execute');
|
|
||||||
|
|
||||||
/** @var StatementPool $statementPool */
|
|
||||||
$statementPool = $this->getMockBuilder(StatementPool::class)
|
|
||||||
->setConstructorArgs([$pool, $statement, $this->createCallback(1)])
|
|
||||||
->getMockForAbstractClass();
|
|
||||||
|
|
||||||
$statementPool->method('prepare')
|
|
||||||
->willReturnCallback(function (Statement $statement) {
|
|
||||||
return new Success($statement);
|
|
||||||
});
|
|
||||||
|
|
||||||
$this->assertTrue($statementPool->isAlive());
|
|
||||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
|
||||||
|
|
||||||
$statementPool->execute();
|
|
||||||
|
|
||||||
yield new Delayed(1500); // Give timeout watcher enough time to execute.
|
|
||||||
|
|
||||||
$statementPool->execute();
|
|
||||||
|
|
||||||
$this->assertTrue($statementPool->isAlive());
|
|
||||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user