From 15336ca7fc0463fe6afdd6d8a6d017e0bf855881 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 29 Jun 2018 11:13:09 -0500 Subject: [PATCH] Proper AbstractPool and add PooledStatement --- src/AbstractPool.php | 486 ++++++++++++++++++++++++---------------- src/PooledStatement.php | 145 ++++++++++++ 2 files changed, 436 insertions(+), 195 deletions(-) create mode 100644 src/PooledStatement.php diff --git a/src/AbstractPool.php b/src/AbstractPool.php index b8887d9..4e4861d 100644 --- a/src/AbstractPool.php +++ b/src/AbstractPool.php @@ -2,72 +2,70 @@ namespace Amp\Sql; -use function Amp\call; use Amp\CallableMaker; -use function Amp\coroutine; use Amp\Deferred; use Amp\Loop; use Amp\Promise; +use function Amp\call; +use function Amp\coroutine; abstract class AbstractPool implements Pool { use CallableMaker; /** @var Connector */ - protected $connector; + private $connector; /** @var ConnectionConfig */ - protected $config; + private $connectionConfig; /** @var int */ - protected $maxConnections; + private $maxConnections; /** @var \SplQueue */ - protected $idle; + private $idle; /** @var \SplObjectStorage */ - protected $connections; + private $connections; - /** @var bool */ - protected $closed = false; + /** @var \Amp\Promise|null */ + private $promise; - /** @var string */ - protected $timeoutWatcher; - - /** @var int */ - protected $idleTimeout = Pool::DEFAULT_IDLE_TIMEOUT; + /** @var \Amp\Deferred|null */ + private $deferred; /** @var callable */ - protected $prepare; + private $prepare; /** @var int */ - protected $lastUsedAt; + private $pending = 0; - /** @var Promise|null */ - protected $promise; + /** @var int */ + private $idleTimeout = self::DEFAULT_IDLE_TIMEOUT; - /** @var Deferred|null */ - protected $deferred; + /** @var string */ + private $timeoutWatcher; - /** @var int Number of pending connections. */ - protected $pending = 0; + /** @var bool */ + private $closed = false; /** - * @param ConnectionConfig $config - * @param int $maxConnections - * @param Connector $connector + * Create a default connector object based on the library of the extending class. * - * @throws \Error If $maxConnections is less than 1. + * @return Connector */ + abstract protected function createDefaultConnector(): Connector; + public function __construct( ConnectionConfig $config, - int $maxConnections = Pool::DEFAULT_MAX_CONNECTIONS, + int $maxConnections = self::DEFAULT_MAX_CONNECTIONS, Connector $connector = null ) { - $this->connector = $connector ?? $this->defaultConnector(); - $this->config = $config; - $this->maxConnections = $maxConnections; + $this->connector = $connector ?? $this->createDefaultConnector(); + $this->connectionConfig = $config; + + $this->maxConnections = $maxConnections; if ($this->maxConnections < 1) { throw new \Error("Pool must contain at least one connection"); } @@ -81,8 +79,8 @@ abstract class AbstractPool implements Pool $this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) { $now = \time(); while (!$idle->isEmpty()) { - /** @var Connection $connection */ $connection = $idle->bottom(); + \assert($connection instanceof Link); if ($connection->lastUsedAt() + $idleTimeout > $now) { return; @@ -96,8 +94,6 @@ abstract class AbstractPool implements Pool }); Loop::unreference($this->timeoutWatcher); - - $this->lastUsedAt = \time(); } public function __destruct() @@ -105,143 +101,6 @@ abstract class AbstractPool implements Pool Loop::cancel($this->timeoutWatcher); } - public function query(string $sql): Promise - { - return call(function () use ($sql) { - /** @var Connection $connection */ - $connection = yield from $this->pop(); - - try { - $result = yield $connection->query($sql); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof Operation) { - $result->onDestruct(function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - $this->lastUsedAt = \time(); - - return $result; - }); - } - - public function prepare(string $sql): Promise - { - return call(function () use ($sql) { - $statement = yield from $this->doPrepare($sql); - - $this->lastUsedAt = \time(); - - return $this->newPooledStatement($this, $statement, $this->prepare); - }); - } - - public function execute(string $sql, array $params = []): Promise - { - return call(function () use ($sql, $params) { - /** @var Connection $connection */ - $connection = yield from $this->pop(); - - try { - $result = yield $connection->execute($sql, $params); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof Operation) { - $result->onDestruct(function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - $this->lastUsedAt = \time(); - - return $result; - }); - } - - public function isAlive(): bool - { - return !$this->closed; - } - - public function close() - { - // TODO: Implement close() method. - } - - public function lastUsedAt(): int - { - return $this->lastUsedAt; - } - - public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise - { - return call(function () use ($isolation) { - /** @var Connection $connection */ - $connection = yield from $this->pop(); - - try { - /** @var Transaction $transaction */ - $transaction = yield $connection->transaction($isolation); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - $transaction->onDestruct(function () use ($connection) { - $this->push($connection); - }); - - $this->lastUsedAt = \time(); - - return $transaction; - }); - } - - /** - * Extracts an idle connection from the pool. The connection is completely removed from the pool and cannot be - * put back into the pool. Useful for operations where connection state must be changed. - * - * @return Promise - */ - public function extractConnection(): Promise - { - return call(function () { - $connection = yield from $this->pop(); - $this->connections->detach($connection); - - $this->lastUsedAt = \time(); - - return $connection; - }); - } - - public function getConnectionCount(): int - { - return $this->connections->count(); - } - - public function getIdleConnectionCount(): int - { - return $this->idle->count(); - } - - public function getMaxConnections(): int - { - return $this->maxConnections; - } - public function getIdleTimeout(): int { return $this->idleTimeout; @@ -256,38 +115,275 @@ abstract class AbstractPool implements Pool $this->idleTimeout = $timeout; } - protected function doPrepare(string $sql): \Generator + public function lastUsedAt(): int { - /** @var Connection $connection */ - $connection = yield from $this->pop(); + // Simple implementation... can be improved if needed. - try { - /** @var Statement $statement */ - $statement = yield $connection->prepare($sql); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; + $time = 0; + + foreach ($this->connections as $connection) { + \assert($connection instanceof Link); + if (($lastUsedAt = $connection->lastUsedAt()) > $time) { + $time = $lastUsedAt; + } } - \assert( - $statement instanceof Operation, - Statement::class . " instances returned from connections must implement " . Operation::class - ); - - $statement->onDestruct(function () use ($connection) { - $this->push($connection); - }); - - $this->lastUsedAt = \time(); - - return $statement; + return $time; } - abstract protected function pop(): \Generator; + /** + * @return bool + */ + public function isAlive(): bool + { + return !$this->closed; + } - abstract protected function push(Connection $connection); + /** + * Close all connections in the pool. No further queries may be made after a pool is closed. + */ + public function close() + { + $this->closed = true; + foreach ($this->connections as $connection) { + $connection->close(); + } + $this->idle = new \SplQueue; + $this->connections = new \SplObjectStorage; + $this->prepare = null; + } - abstract protected function defaultConnector(): Connector; + /** + * {@inheritdoc} + */ + public function extractConnection(): Promise + { + return call(function () { + $connection = yield from $this->pop(); + $this->connections->detach($connection); + return $connection; + }); + } - abstract protected function newPooledStatement(Pool $pool, Statement $statement, callable $prepare): Statement; + /** + * {@inheritdoc} + */ + public function getConnectionCount(): int + { + return $this->connections->count(); + } + + /** + * {@inheritdoc} + */ + public function getIdleConnectionCount(): int + { + return $this->idle->count(); + } + + /** + * {@inheritdoc} + */ + public function getMaxConnections(): int + { + return $this->maxConnections; + } + + /** + * @return \Generator + * + * @resolve Link + * + * @throws FailureException If creating a new connection fails. + * @throws \Error If the pool has been closed. + */ + protected function pop(): \Generator + { + if ($this->closed) { + throw new \Error("The pool has been closed"); + } + + while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) { + yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1. + } + + do { + // While loop to ensure an idle connection is available after promises below are resolved. + while ($this->idle->isEmpty()) { + if ($this->connections->count() + $this->pending < $this->getMaxConnections()) { + // Max connection count has not been reached, so open another connection. + ++$this->pending; + try { + $connection = yield $this->connector->connect($this->connectionConfig); + if (!$connection instanceof Link) { + throw new \Error(\sprintf( + "%s::connect() must resolve to an instance of %s", + \get_class($this->connector), + Link::class + )); + } + } finally { + --$this->pending; + } + + $this->connections->attach($connection); + return $connection; + } + + // All possible connections busy, so wait until one becomes available. + try { + $this->deferred = new Deferred; + // May be resolved with defunct connection, but that connection will not be added to $this->idle. + yield $this->promise = $this->deferred->promise(); + } finally { + $this->deferred = null; + $this->promise = null; + } + } + + $connection = $this->idle->shift(); + \assert($connection instanceof Link); + + if ($connection->isAlive()) { + return $connection; + } + + $this->connections->detach($connection); + } while (!$this->closed); + + throw new FailureException("Pool closed before an active connection could be obtained"); + } + + /** + * @param Link $connection + * + * @throws \Error If the connection is not part of this pool. + */ + protected function push(Link $connection) + { + \assert(isset($this->connections[$connection]), 'Connection is not part of this pool'); + + if ($connection->isAlive()) { + $this->idle->push($connection); + } else { + $this->connections->detach($connection); + } + + if ($this->deferred instanceof Deferred) { + $this->deferred->resolve($connection); + } + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Promise + { + return call(function () use ($sql) { + $connection = yield from $this->pop(); + \assert($connection instanceof Link); + + try { + $result = yield $connection->query($sql); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + if ($result instanceof Operation) { + $result->onDestruct(function () use ($connection) { + $this->push($connection); + }); + } else { + $this->push($connection); + } + + return $result; + }); + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, array $params = []): Promise + { + return call(function () use ($sql, $params) { + $connection = yield from $this->pop(); + \assert($connection instanceof Link); + + try { + $result = yield $connection->execute($sql, $params); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + if ($result instanceof Operation) { + $result->onDestruct(function () use ($connection) { + $this->push($connection); + }); + } else { + $this->push($connection); + } + + return $result; + }); + } + + /** + * {@inheritdoc} + * + * Prepared statements returned by this method will stay alive as long as the pool remains open. + */ + public function prepare(string $sql): Promise + { + return call(function () use ($sql) { + $connection = yield from $this->pop(); + \assert($connection instanceof Link); + + try { + $statement = yield $connection->prepare($sql); + \assert($statement instanceof Statement); + + \assert( + $statement instanceof Operation, + Statement::class . " instances returned from connections must implement " . Operation::class + ); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + $statement->onDestruct(function () use ($connection) { + $this->push($connection); + }); + + return new PooledStatement($this, $statement, $this->prepare); + }); + } + + /** + * {@inheritdoc} + */ + public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise + { + return call(function () use ($isolation) { + $connection = yield from $this->pop(); + \assert($connection instanceof Link); + + try { + $transaction = yield $connection->transaction($isolation); + \assert($transaction instanceof Transaction); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + $transaction->onDestruct(function () use ($connection) { + $this->push($connection); + }); + + return $transaction; + }); + } } diff --git a/src/PooledStatement.php b/src/PooledStatement.php new file mode 100644 index 0000000..a4a80a9 --- /dev/null +++ b/src/PooledStatement.php @@ -0,0 +1,145 @@ +lastUsedAt = \time(); + $this->statements = $statements = new \SplQueue; + $this->pool = $pool; + $this->prepare = $prepare; + $this->sql = $statement->getQuery(); + + $this->statements->push($statement); + + $this->timeoutWatcher = Loop::repeat(1000, static function () use ($pool, $statements) { + $now = \time(); + $idleTimeout = ((int) ($pool->getIdleTimeout() / 10)) ?: 1; + + while (!$statements->isEmpty()) { + /** @var Statement $statement */ + $statement = $statements->bottom(); + + if ($statement->lastUsedAt() + $idleTimeout > $now) { + return; + } + + $statements->shift(); + } + }); + + Loop::unreference($this->timeoutWatcher); + } + + public function __destruct() + { + Loop::cancel($this->timeoutWatcher); + } + + /** + * {@inheritdoc} + * + * Unlike regular statements, as long as the pool is open this statement will not die. + */ + public function execute(array $params = []): Promise + { + $this->lastUsedAt = \time(); + + return call(function () use ($params) { + if (!$this->statements->isEmpty()) { + do { + /** @var Statement $statement */ + $statement = $this->statements->shift(); + } while (!$statement->isAlive() && !$this->statements->isEmpty()); + } else { + $statement = yield ($this->prepare)($this->sql); + } + + try { + $result = yield $statement->execute($params); + } catch (\Throwable $exception) { + $this->push($statement); + throw $exception; + } + + if ($result instanceof Operation) { + $result->onDestruct(function () use ($statement) { + $this->push($statement); + }); + } else { + $this->push($statement); + } + + return $result; + }); + } + + /** + * Only retains statements if less than 10% of the pool is consumed by this statement and the pool has + * available connections. + * + * @param Statement $statement + */ + private function push(Statement $statement) + { + $maxConnections = $this->pool->getMaxConnections(); + + if ($this->statements->count() > ($maxConnections / 10)) { + return; + } + + if ($maxConnections === $this->pool->getConnectionCount() && $this->pool->getIdleConnectionCount() === 0) { + return; + } + + $this->statements->push($statement); + } + + + /** {@inheritdoc} */ + public function isAlive(): bool + { + return $this->pool->isAlive(); + } + + /** {@inheritdoc} */ + public function getQuery(): string + { + return $this->sql; + } + + /** {@inheritdoc} */ + public function lastUsedAt(): int + { + return $this->lastUsedAt; + } +}