diff --git a/src/PooledResultSet.php b/src/PooledResultSet.php new file mode 100644 index 0000000..14fab65 --- /dev/null +++ b/src/PooledResultSet.php @@ -0,0 +1,35 @@ +result = $result; + $this->release = $release; + } + + public function __destruct() + { + ($this->release)(); + } + + public function advance(int $type = self::FETCH_ASSOC): Promise + { + return $this->result->advance($type); + } + + public function getCurrent() + { + return $this->result->getCurrent(); + } +} diff --git a/src/PooledStatement.php b/src/PooledStatement.php new file mode 100644 index 0000000..c3b1087 --- /dev/null +++ b/src/PooledStatement.php @@ -0,0 +1,81 @@ +statement = $statement; + + if (!$this->statement->isAlive()) { + $release(); + } else { + $refCount = &$this->refCount; + $this->release = static function () use (&$refCount, $release) { + if (--$refCount === 0) { + $release(); + } + }; + } + } + + public function __destruct() + { + if ($this->release) { + ($this->release)(); + } + } + + public function execute(array $params = []): Promise + { + return call(function () use ($params) { + $result = yield $this->statement->execute($params); + + if ($result instanceof ResultSet) { + ++$this->refCount; + return $this->createResultSet($result, $this->release); + } + + return $result; + }); + } + + public function isAlive(): bool + { + return $this->statement->isAlive(); + } + + public function getQuery(): string + { + return $this->statement->getQuery(); + } + + public function lastUsedAt(): int + { + return $this->statement->lastUsedAt(); + } +} diff --git a/src/PooledTransaction.php b/src/PooledTransaction.php new file mode 100644 index 0000000..67e54b3 --- /dev/null +++ b/src/PooledTransaction.php @@ -0,0 +1,215 @@ +transaction = $transaction; + $this->release = $release; + + if (!$this->transaction->isActive()) { + $release(); + $this->transaction = null; + } else { + $refCount = &$this->refCount; + $this->release = static function () use (&$refCount, $release) { + if (--$refCount === 0) { + $release(); + } + }; + } + } + + public function __destruct() + { + if ($this->transaction && $this->transaction->isActive()) { + $this->close(); // Invokes $this->release callback. + } + } + + public function query(string $sql): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return call(function () use ($sql) { + $result = yield $this->transaction->query($sql); + + if ($result instanceof ResultSet) { + ++$this->refCount; + return $this->createResultSet($result, $this->release); + } + + return $result; + }); + } + + public function prepare(string $sql): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return call(function () use ($sql) { + $statement = yield $this->transaction->prepare($sql); + ++$this->refCount; + return $this->createStatement($statement, $this->release); + }); + } + + public function execute(string $sql, array $params = []): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return call(function () use ($sql, $params) { + $result = yield $this->transaction->execute($sql, $params); + + if ($result instanceof ResultSet) { + ++$this->refCount; + return $this->createResultSet($result, $this->release); + } + + return $result; + }); + } + + public function isAlive(): bool + { + return $this->transaction && $this->transaction->isAlive(); + } + + public function lastUsedAt(): int + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->lastUsedAt(); + } + + public function close() + { + if (!$this->transaction) { + return; + } + + $promise = $this->transaction->commit(); + $promise->onResolve($this->release); + + $this->transaction = null; + } + + public function getIsolationLevel(): int + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->getIsolationLevel(); + } + + public function isActive(): bool + { + return $this->transaction && $this->transaction->isActive(); + } + + public function commit(): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->transaction->commit(); + $promise->onResolve($this->release); + + $this->transaction = null; + + return $promise; + } + + public function rollback(): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->transaction->rollback(); + $promise->onResolve($this->release); + + $this->transaction = null; + + return $promise; + } + + public function createSavepoint(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->createSavepoint($identifier); + } + + public function rollbackTo(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->rollbackTo($identifier); + } + + public function releaseSavepoint(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->releaseSavepoint($identifier); + } +}