diff --git a/src/AbstractPool.php b/src/AbstractPool.php deleted file mode 100644 index 043a7da..0000000 --- a/src/AbstractPool.php +++ /dev/null @@ -1,439 +0,0 @@ -connector = $connector ?? $this->createDefaultConnector(); - - $this->connectionConfig = $config; - - $this->idleTimeout = $idleTimeout; - if ($this->idleTimeout < 1) { - throw new \Error("The idle timeout must be 1 or greater"); - } - - $this->maxConnections = $maxConnections; - if ($this->maxConnections < 1) { - throw new \Error("Pool must contain at least one connection"); - } - - $this->connections = $connections = new \SplObjectStorage; - $this->idle = $idle = new \SplQueue; - $this->prepare = coroutine($this->callableFromInstanceMethod("prepareStatement")); - - $idleTimeout = &$this->idleTimeout; - - $this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) { - $now = \time(); - while (!$idle->isEmpty()) { - $connection = $idle->bottom(); - \assert($connection instanceof Link); - - if ($connection->getLastUsedAt() + $idleTimeout > $now) { - return; - } - - // Close connection and remove it from the pool. - $idle->shift(); - $connections->detach($connection); - $connection->close(); - } - }); - - Loop::unreference($this->timeoutWatcher); - } - - public function __destruct() - { - Loop::cancel($this->timeoutWatcher); - } - - public function getIdleTimeout(): int - { - return $this->idleTimeout; - } - - public function getLastUsedAt(): int - { - // Simple implementation... can be improved if needed. - - $time = 0; - - foreach ($this->connections as $connection) { - \assert($connection instanceof Link); - if (($lastUsedAt = $connection->getLastUsedAt()) > $time) { - $time = $lastUsedAt; - } - } - - return $time; - } - - /** - * @return bool - */ - public function isAlive(): bool - { - return !$this->closed; - } - - /** - * Close all connections in the pool. No further queries may be made after a pool is closed. - */ - public function close() - { - $this->closed = true; - foreach ($this->connections as $connection) { - $connection->close(); - } - $this->idle = new \SplQueue; - $this->prepare = null; - - if ($this->deferred instanceof Deferred) { - $deferred = $this->deferred; - $this->deferred = null; - $deferred->fail(new FailureException("Connection pool closed")); - } - } - - /** - * {@inheritdoc} - */ - public function extractConnection(): Promise - { - return call(function () { - $connection = yield from $this->pop(); - $this->connections->detach($connection); - return $connection; - }); - } - - /** - * {@inheritdoc} - */ - public function getConnectionCount(): int - { - return $this->connections->count(); - } - - /** - * {@inheritdoc} - */ - public function getIdleConnectionCount(): int - { - return $this->idle->count(); - } - - /** - * {@inheritdoc} - */ - public function getConnectionLimit(): int - { - return $this->maxConnections; - } - - /** - * @return \Generator - * - * @resolve Link - * - * @throws FailureException If creating a new connection fails. - * @throws \Error If the pool has been closed. - */ - protected function pop(): \Generator - { - if ($this->closed) { - throw new \Error("The pool has been closed"); - } - - while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getConnectionLimit()) { - yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1. - } - - do { - // While loop to ensure an idle connection is available after promises below are resolved. - while ($this->idle->isEmpty()) { - if ($this->connections->count() + $this->pending < $this->getConnectionLimit()) { - // Max connection count has not been reached, so open another connection. - ++$this->pending; - try { - $connection = yield $this->connector->connect($this->connectionConfig); - if (!$connection instanceof Link) { - throw new \Error(\sprintf( - "%s::connect() must resolve to an instance of %s", - \get_class($this->connector), - Link::class - )); - } - } finally { - --$this->pending; - } - - $this->connections->attach($connection); - return $connection; - } - - // All possible connections busy, so wait until one becomes available. - try { - $this->deferred = new Deferred; - // May be resolved with defunct connection, but that connection will not be added to $this->idle. - yield $this->promise = $this->deferred->promise(); - } finally { - $this->deferred = null; - $this->promise = null; - } - } - - $connection = $this->idle->shift(); - \assert($connection instanceof Link); - - if ($connection->isAlive()) { - return $connection; - } - - $this->connections->detach($connection); - } while (!$this->closed); - - throw new FailureException("Pool closed before an active connection could be obtained"); - } - - /** - * @param Link $connection - * - * @throws \Error If the connection is not part of this pool. - */ - protected function push(Link $connection) - { - \assert(isset($this->connections[$connection]), 'Connection is not part of this pool'); - - if ($connection->isAlive()) { - $this->idle->push($connection); - } else { - $this->connections->detach($connection); - } - - if ($this->deferred instanceof Deferred) { - $this->deferred->resolve($connection); - } - } - - /** - * {@inheritdoc} - */ - public function query(string $sql): Promise - { - return call(function () use ($sql) { - $connection = yield from $this->pop(); - \assert($connection instanceof Link); - - try { - $result = yield $connection->query($sql); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof ResultSet) { - $result = $this->createResultSet($result, function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - return $result; - }); - } - - /** - * {@inheritdoc} - */ - public function execute(string $sql, array $params = []): Promise - { - return call(function () use ($sql, $params) { - $connection = yield from $this->pop(); - \assert($connection instanceof Link); - - try { - $result = yield $connection->execute($sql, $params); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof ResultSet) { - $result = $this->createResultSet($result, function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - return $result; - }); - } - - /** - * {@inheritdoc} - * - * Prepared statements returned by this method will stay alive as long as the pool remains open. - */ - public function prepare(string $sql): Promise - { - return call(function () use ($sql) { - $statement = yield from $this->prepareStatement($sql); - return $this->createStatementPool($this, $statement, $this->prepare); - }); - } - - private function prepareStatement(string $sql): \Generator - { - $connection = yield from $this->pop(); - \assert($connection instanceof Link); - - try { - $statement = yield $connection->prepare($sql); - \assert($statement instanceof Statement); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - return $this->createStatement($statement, function () use ($connection) { - $this->push($connection); - }); - } - - /** - * {@inheritdoc} - */ - public function beginTransaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise - { - return call(function () use ($isolation) { - $connection = yield from $this->pop(); - \assert($connection instanceof Link); - - try { - $transaction = yield $connection->beginTransaction($isolation); - \assert($transaction instanceof Transaction); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - return $this->createTransaction($transaction, function () use ($connection) { - $this->push($connection); - }); - }); - } -} diff --git a/src/PooledResultSet.php b/src/PooledResultSet.php deleted file mode 100644 index 65b574a..0000000 --- a/src/PooledResultSet.php +++ /dev/null @@ -1,35 +0,0 @@ -result = $result; - $this->release = $release; - } - - public function __destruct() - { - ($this->release)(); - } - - public function advance(): Promise - { - return $this->result->advance(); - } - - public function getCurrent(int $type = self::FETCH_ASSOC) - { - return $this->result->getCurrent($type); - } -} diff --git a/src/PooledStatement.php b/src/PooledStatement.php deleted file mode 100644 index 7c53ce6..0000000 --- a/src/PooledStatement.php +++ /dev/null @@ -1,81 +0,0 @@ -statement = $statement; - - if (!$this->statement->isAlive()) { - $release(); - } else { - $refCount = &$this->refCount; - $this->release = static function () use (&$refCount, $release) { - if (--$refCount === 0) { - $release(); - } - }; - } - } - - public function __destruct() - { - if ($this->release) { - ($this->release)(); - } - } - - public function execute(array $params = []): Promise - { - return call(function () use ($params) { - $result = yield $this->statement->execute($params); - - if ($result instanceof ResultSet) { - ++$this->refCount; - return $this->createResultSet($result, $this->release); - } - - return $result; - }); - } - - public function isAlive(): bool - { - return $this->statement->isAlive(); - } - - public function getQuery(): string - { - return $this->statement->getQuery(); - } - - public function getLastUsedAt(): int - { - return $this->statement->getLastUsedAt(); - } -} diff --git a/src/PooledTransaction.php b/src/PooledTransaction.php deleted file mode 100644 index 0088860..0000000 --- a/src/PooledTransaction.php +++ /dev/null @@ -1,215 +0,0 @@ -transaction = $transaction; - $this->release = $release; - - if (!$this->transaction->isActive()) { - $release(); - $this->transaction = null; - } else { - $refCount = &$this->refCount; - $this->release = static function () use (&$refCount, $release) { - if (--$refCount === 0) { - $release(); - } - }; - } - } - - public function __destruct() - { - if ($this->transaction && $this->transaction->isActive()) { - $this->close(); // Invokes $this->release callback. - } - } - - public function query(string $sql): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return call(function () use ($sql) { - $result = yield $this->transaction->query($sql); - - if ($result instanceof ResultSet) { - ++$this->refCount; - return $this->createResultSet($result, $this->release); - } - - return $result; - }); - } - - public function prepare(string $sql): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return call(function () use ($sql) { - $statement = yield $this->transaction->prepare($sql); - ++$this->refCount; - return $this->createStatement($statement, $this->release); - }); - } - - public function execute(string $sql, array $params = []): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return call(function () use ($sql, $params) { - $result = yield $this->transaction->execute($sql, $params); - - if ($result instanceof ResultSet) { - ++$this->refCount; - return $this->createResultSet($result, $this->release); - } - - return $result; - }); - } - - public function isAlive(): bool - { - return $this->transaction && $this->transaction->isAlive(); - } - - public function getLastUsedAt(): int - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->transaction->getLastUsedAt(); - } - - public function close() - { - if (!$this->transaction) { - return; - } - - $promise = $this->transaction->commit(); - $promise->onResolve($this->release); - - $this->transaction = null; - } - - public function getIsolationLevel(): int - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->transaction->getIsolationLevel(); - } - - public function isActive(): bool - { - return $this->transaction && $this->transaction->isActive(); - } - - public function commit(): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $promise = $this->transaction->commit(); - $promise->onResolve($this->release); - - $this->transaction = null; - - return $promise; - } - - public function rollback(): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $promise = $this->transaction->rollback(); - $promise->onResolve($this->release); - - $this->transaction = null; - - return $promise; - } - - public function createSavepoint(string $identifier): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->transaction->createSavepoint($identifier); - } - - public function rollbackTo(string $identifier): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->transaction->rollbackTo($identifier); - } - - public function releaseSavepoint(string $identifier): Promise - { - if (!$this->transaction) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->transaction->releaseSavepoint($identifier); - } -} diff --git a/src/StatementPool.php b/src/StatementPool.php deleted file mode 100644 index 49eb26d..0000000 --- a/src/StatementPool.php +++ /dev/null @@ -1,179 +0,0 @@ - - */ - abstract protected function prepare(Statement $statement): Promise; - - /** - * @param ResultSet $resultSet - * @param callable $release - * - * @return ResultSet - */ - abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet; - - /** - * @param Pool $pool Pool used to re-create the statement if the original closes. - * @param Statement $statement Original prepared statement returned from the Link. - * @param callable $prepare Callable that returns a new prepared statement. - */ - public function __construct(Pool $pool, Statement $statement, callable $prepare) - { - $this->lastUsedAt = \time(); - $this->statements = $statements = new \SplQueue; - $this->pool = $pool; - $this->prepare = $prepare; - $this->sql = $statement->getQuery(); - - $this->statements->push($statement); - - $this->timeoutWatcher = Loop::repeat(1000, static function () use ($pool, $statements) { - $now = \time(); - $idleTimeout = ((int) ($pool->getIdleTimeout() / 10)) ?: 1; - - while (!$statements->isEmpty()) { - $statement = $statements->bottom(); - \assert($statement instanceof Statement); - - if ($statement->getLastUsedAt() + $idleTimeout > $now) { - return; - } - - $statements->shift(); - } - }); - - Loop::unreference($this->timeoutWatcher); - } - - public function __destruct() - { - Loop::cancel($this->timeoutWatcher); - } - - /** - * {@inheritdoc} - * - * Unlike regular statements, as long as the pool is open this statement will not die. - */ - public function execute(array $params = []): Promise - { - $this->lastUsedAt = \time(); - - return call(function () use ($params) { - $statement = yield from $this->pop(); - \assert($statement instanceof Statement); - - try { - $statement = yield $this->prepare($statement); - \assert($statement instanceof Statement); - $result = yield $statement->execute($params); - } catch (\Throwable $exception) { - $this->push($statement); - throw $exception; - } - - if ($result instanceof ResultSet) { - $result = $this->createResultSet($result, function () use ($statement) { - $this->push($statement); - }); - } else { - $this->push($statement); - } - - return $result; - }); - } - - /** - * Only retains statements if less than 10% of the pool is consumed by this statement and the pool has - * available connections. - * - * @param Statement $statement - */ - protected function push(Statement $statement) - { - $maxConnections = $this->pool->getConnectionLimit(); - - if ($this->statements->count() > ($maxConnections / 10)) { - return; - } - - if ($maxConnections === $this->pool->getConnectionCount() && $this->pool->getIdleConnectionCount() === 0) { - return; - } - - $this->statements->push($statement); - } - - /** - * Coroutine returning a Statement object from the pool or creating a new Statement. - * - * @return \Generator - */ - protected function pop(): \Generator - { - while (!$this->statements->isEmpty()) { - $statement = $this->statements->shift(); - \assert($statement instanceof Statement); - - if ($statement->isAlive()) { - return $statement; - } - } - - $statement = yield ($this->prepare)($this->sql); - \assert($statement instanceof Statement); - return $statement; - } - - /** {@inheritdoc} */ - public function isAlive(): bool - { - return $this->pool->isAlive(); - } - - /** {@inheritdoc} */ - public function getQuery(): string - { - return $this->sql; - } - - /** {@inheritdoc} */ - public function getLastUsedAt(): int - { - return $this->lastUsedAt; - } -} diff --git a/test/AbstractPoolTest.php b/test/AbstractPoolTest.php deleted file mode 100644 index f64dc91..0000000 --- a/test/AbstractPoolTest.php +++ /dev/null @@ -1,76 +0,0 @@ -getMockBuilder(AbstractPool::class) - ->setConstructorArgs([$this->createMock(ConnectionConfig::class), 0]) - ->getMock(); - } - - public function testIdleConnectionsRemovedAfterTimeout() - { - Loop::run(function () { - $now = \time(); - - $connector = $this->createMock(Connector::class); - $connector->method('connect') - ->willReturnCallback(function () use ($now): Promise { - $link = $this->createMock(Link::class); - $link->method('getLastUsedAt') - ->willReturn($now); - - $link->method('isAlive') - ->willReturn(true); - - $link->method('query') - ->willReturnCallback(function () { - return new Delayed(100); - }); - - return new Success($link); - }); - - /** @var AbstractPool $pool */ - $pool = $this->getMockBuilder(AbstractPool::class) - ->setConstructorArgs([$this->createMock(ConnectionConfig::class), 100, 2, $connector]) - ->getMockForAbstractClass(); - - $count = 3; - - $promises = []; - for ($i = 0; $i < $count; ++$i) { - $promises[] = $pool->query("SELECT $i"); - } - - $results = yield $promises; - - $this->assertSame($count, $pool->getConnectionCount()); - - yield new Delayed(1000); - - $this->assertSame($count, $pool->getConnectionCount()); - - yield new Delayed(1000); - - $this->assertSame(0, $pool->getConnectionCount()); - }); - } -} diff --git a/test/StatementPoolTest.php b/test/StatementPoolTest.php deleted file mode 100644 index 3ffa3c1..0000000 --- a/test/StatementPoolTest.php +++ /dev/null @@ -1,98 +0,0 @@ -createMock(Pool::class); - $pool->method('isAlive') - ->willReturn(true); - $pool->method('getIdleTimeout') - ->willReturn(60); - - $statement = $this->createMock(Statement::class); - $statement->method('isAlive') - ->willReturn(true); - $statement->method('getQuery') - ->willReturn('SELECT 1'); - $statement->method('getLastUsedAt') - ->willReturn(\time()); - $statement->expects($this->once()) - ->method('execute'); - - /** @var StatementPool $statementPool */ - $statementPool = $this->getMockBuilder(StatementPool::class) - ->setConstructorArgs([$pool, $statement, $this->createCallback(0)]) - ->getMockForAbstractClass(); - - $statementPool->method('prepare') - ->willReturnCallback(function (Statement $statement) { - return new Success($statement); - }); - - $this->assertTrue($statementPool->isAlive()); - $this->assertSame(\time(), $statementPool->getLastUsedAt()); - - yield new Delayed(1500); // Give timeout watcher enough time to execute. - - $statementPool->execute(); - - $this->assertTrue($statementPool->isAlive()); - $this->assertSame(\time(), $statementPool->getLastUsedAt()); - }); - } - - public function testIdleStatementsRemovedAfterTimeout() - { - Loop::run(function () { - $pool = $this->createMock(Pool::class); - $pool->method('isAlive') - ->willReturn(true); - $pool->method('getIdleTimeout') - ->willReturn(1); - - $statement = $this->createMock(Statement::class); - $statement->method('isAlive') - ->willReturn(true); - $statement->method('getQuery') - ->willReturn('SELECT 1'); - $statement->method('getLastUsedAt') - ->willReturn(\time()); - $statement->expects($this->once()) - ->method('execute'); - - /** @var StatementPool $statementPool */ - $statementPool = $this->getMockBuilder(StatementPool::class) - ->setConstructorArgs([$pool, $statement, $this->createCallback(1)]) - ->getMockForAbstractClass(); - - $statementPool->method('prepare') - ->willReturnCallback(function (Statement $statement) { - return new Success($statement); - }); - - $this->assertTrue($statementPool->isAlive()); - $this->assertSame(\time(), $statementPool->getLastUsedAt()); - - $statementPool->execute(); - - yield new Delayed(1500); // Give timeout watcher enough time to execute. - - $statementPool->execute(); - - $this->assertTrue($statementPool->isAlive()); - $this->assertSame(\time(), $statementPool->getLastUsedAt()); - }); - } -}