Add Pooled* base classes

This commit is contained in:
Aaron Piotrowski 2018-07-13 10:36:10 -05:00
parent 97b10e4361
commit 6f72cb76da
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 331 additions and 0 deletions

35
src/PooledResultSet.php Normal file
View File

@ -0,0 +1,35 @@
<?php
namespace Amp\Sql;
use Amp\Promise;
class PooledResultSet implements ResultSet
{
/** @var ResultSet */
private $result;
/** @var callable|null */
private $release;
public function __construct(ResultSet $result, callable $release)
{
$this->result = $result;
$this->release = $release;
}
public function __destruct()
{
($this->release)();
}
public function advance(int $type = self::FETCH_ASSOC): Promise
{
return $this->result->advance($type);
}
public function getCurrent()
{
return $this->result->getCurrent();
}
}

81
src/PooledStatement.php Normal file
View File

@ -0,0 +1,81 @@
<?php
namespace Amp\Sql;
use Amp\Promise;
use function Amp\call;
abstract class PooledStatement implements Statement
{
/** @var Statement */
private $statement;
/** @var callable|null */
private $release;
/** @var int */
private $refCount = 1;
/**
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Statement object and
* the given release callable.
*
* @param ResultSet $resultSet
* @param callable $release
*
* @return ResultSet
*/
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
public function __construct(Statement $statement, callable $release)
{
$this->statement = $statement;
if (!$this->statement->isAlive()) {
$release();
} else {
$refCount = &$this->refCount;
$this->release = static function () use (&$refCount, $release) {
if (--$refCount === 0) {
$release();
}
};
}
}
public function __destruct()
{
if ($this->release) {
($this->release)();
}
}
public function execute(array $params = []): Promise
{
return call(function () use ($params) {
$result = yield $this->statement->execute($params);
if ($result instanceof ResultSet) {
++$this->refCount;
return $this->createResultSet($result, $this->release);
}
return $result;
});
}
public function isAlive(): bool
{
return $this->statement->isAlive();
}
public function getQuery(): string
{
return $this->statement->getQuery();
}
public function lastUsedAt(): int
{
return $this->statement->lastUsedAt();
}
}

215
src/PooledTransaction.php Normal file
View File

@ -0,0 +1,215 @@
<?php
namespace Amp\Sql;
use Amp\Promise;
use function Amp\call;
abstract class PooledTransaction implements Transaction
{
/** @var Transaction|null */
private $transaction;
/** @var callable|null */
private $release;
/** @var int */
private $refCount = 1;
/**
* Creates a Statement of the appropriate type using the Statement object returned by the Transaction object and
* the given release callable.
*
* @param Statement $statement
* @param callable $release
*
* @return Statement
*/
abstract protected function createStatement(Statement $statement, callable $release): Statement;
/**
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Transaction object and
* the given release callable.
*
* @param ResultSet $resultSet
* @param callable $release
*
* @return ResultSet
*/
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
/**
* PooledTransaction constructor.
*
* @param Transaction $transaction
* @param callable $release
*/
public function __construct(Transaction $transaction, callable $release)
{
$this->transaction = $transaction;
$this->release = $release;
if (!$this->transaction->isActive()) {
$release();
$this->transaction = null;
} else {
$refCount = &$this->refCount;
$this->release = static function () use (&$refCount, $release) {
if (--$refCount === 0) {
$release();
}
};
}
}
public function __destruct()
{
if ($this->transaction && $this->transaction->isActive()) {
$this->close(); // Invokes $this->release callback.
}
}
public function query(string $sql): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($sql) {
$result = yield $this->transaction->query($sql);
if ($result instanceof ResultSet) {
++$this->refCount;
return $this->createResultSet($result, $this->release);
}
return $result;
});
}
public function prepare(string $sql): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($sql) {
$statement = yield $this->transaction->prepare($sql);
++$this->refCount;
return $this->createStatement($statement, $this->release);
});
}
public function execute(string $sql, array $params = []): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($sql, $params) {
$result = yield $this->transaction->execute($sql, $params);
if ($result instanceof ResultSet) {
++$this->refCount;
return $this->createResultSet($result, $this->release);
}
return $result;
});
}
public function isAlive(): bool
{
return $this->transaction && $this->transaction->isAlive();
}
public function lastUsedAt(): int
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->lastUsedAt();
}
public function close()
{
if (!$this->transaction) {
return;
}
$promise = $this->transaction->commit();
$promise->onResolve($this->release);
$this->transaction = null;
}
public function getIsolationLevel(): int
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->getIsolationLevel();
}
public function isActive(): bool
{
return $this->transaction && $this->transaction->isActive();
}
public function commit(): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->transaction->commit();
$promise->onResolve($this->release);
$this->transaction = null;
return $promise;
}
public function rollback(): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->transaction->rollback();
$promise->onResolve($this->release);
$this->transaction = null;
return $promise;
}
public function createSavepoint(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->createSavepoint($identifier);
}
public function rollbackTo(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->rollbackTo($identifier);
}
public function releaseSavepoint(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->releaseSavepoint($identifier);
}
}