Remove Operation

This commit is contained in:
Aaron Piotrowski 2018-07-04 23:05:55 -05:00
parent ed9d2b1002
commit 7df38b63fc
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 71 additions and 34 deletions

View File

@ -56,6 +56,48 @@ abstract class AbstractPool implements Pool
*/
abstract protected function createDefaultConnector(): Connector;
/**
* Creates a ResultSet of the appropriate type using the ResultSet object returned by the Link object and the
* given release callable.
*
* @param ResultSet $resultSet
* @param callable $release
*
* @return ResultSet
*/
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
/**
* Creates a Statement of the appropriate type using the Statement object returned by the Link object and the
* given release callable.
*
* @param Statement $statement
* @param callable $release
*
* @return Statement
*/
abstract protected function createStatement(Statement $statement, callable $release): Statement;
/**
* @param Pool $pool
* @param Statement $statement
* @param callable $prepare
*
* @return PooledStatement
*/
abstract protected function createPooledStatement(Pool $pool, Statement $statement, callable $prepare): PooledStatement;
/**
* Creates a Transaction of the appropriate type using the Transaction object returned by the Link object and the
* given release callable.
*
* @param Transaction $transaction
* @param callable $release
*
* @return Transaction
*/
abstract protected function createTransaction(Transaction $transaction, callable $release): Transaction;
public function __construct(
ConnectionConfig $config,
int $maxConnections = Pool::DEFAULT_MAX_CONNECTIONS,
@ -72,7 +114,7 @@ abstract class AbstractPool implements Pool
$this->connections = $connections = new \SplObjectStorage;
$this->idle = $idle = new \SplQueue;
$this->prepare = coroutine($this->callableFromInstanceMethod("createStatement"));
$this->prepare = coroutine($this->callableFromInstanceMethod("prepareStatement"));
$idleTimeout = &$this->idleTimeout;
@ -290,8 +332,8 @@ abstract class AbstractPool implements Pool
throw $exception;
}
if ($result instanceof Operation) {
$result->onDestruct(function () use ($connection) {
if ($result instanceof ResultSet) {
$result = $this->createResultSet($result, function () use ($connection) {
$this->push($connection);
});
} else {
@ -318,8 +360,8 @@ abstract class AbstractPool implements Pool
throw $exception;
}
if ($result instanceof Operation) {
$result->onDestruct(function () use ($connection) {
if ($result instanceof ResultSet) {
$result = $this->createResultSet($result, function () use ($connection) {
$this->push($connection);
});
} else {
@ -338,12 +380,12 @@ abstract class AbstractPool implements Pool
public function prepare(string $sql): Promise
{
return call(function () use ($sql) {
$statement = yield from $this->createStatement($sql);
return new PooledStatement($this, $statement, $this->prepare);
$statement = yield from $this->prepareStatement($sql);
return $this->createPooledStatement($this, $statement, $this->prepare);
});
}
private function createStatement(string $sql): \Generator
private function prepareStatement(string $sql): \Generator
{
$connection = yield from $this->pop();
\assert($connection instanceof Link);
@ -351,21 +393,14 @@ abstract class AbstractPool implements Pool
try {
$statement = yield $connection->prepare($sql);
\assert($statement instanceof Statement);
\assert(
$statement instanceof Operation,
Statement::class . " instances returned from connections must implement " . Operation::class
);
} catch (\Throwable $exception) {
$this->push($connection);
throw $exception;
}
$statement->onDestruct(function () use ($connection) {
return $this->createStatement($statement, function () use ($connection) {
$this->push($connection);
});
return $statement;
}
/**
@ -385,11 +420,9 @@ abstract class AbstractPool implements Pool
throw $exception;
}
$transaction->onDestruct(function () use ($connection) {
return $this->createTransaction($transaction, function () use ($connection) {
$this->push($connection);
});
return $transaction;
});
}
}

View File

@ -1,11 +0,0 @@
<?php
namespace Amp\Sql;
interface Operation
{
/**
* @param callable $onDestruct Callback executed when the operation completes or the object is destroyed.
*/
public function onDestruct(callable $onDestruct);
}

View File

@ -6,7 +6,7 @@ use Amp\Loop;
use Amp\Promise;
use function Amp\call;
final class PooledStatement implements Statement
abstract class PooledStatement implements Statement
{
/** @var \Amp\Sql\Pool */
private $pool;
@ -26,6 +26,21 @@ final class PooledStatement implements Statement
/** @var callable */
private $prepare;
/**
* @param ResultSet $resultSet
* @param callable $release
*
* @return ResultSet
*/
abstract protected function createResultSet(ResultSet $resultSet, callable $release): ResultSet;
/**
* Perform any necessary operation on the given Statement object before execute() is invoked.
*
* @param Statement $statement
*/
abstract protected function prepare(Statement $statement);
/**
* @param Pool $pool Pool used to re-create the statement if the original closes.
* @param Statement $statement Original prepared statement returned from the Link.
@ -91,8 +106,8 @@ final class PooledStatement implements Statement
throw $exception;
}
if ($result instanceof Operation) {
$result->onDestruct(function () use ($statement) {
if ($result instanceof ResultSet) {
$result = $this->createResultSet($result, function () use ($statement) {
$this->push($statement);
});
} else {

View File

@ -4,7 +4,7 @@ namespace Amp\Sql;
use Amp\Promise;
interface Transaction extends Executor, Operation
interface Transaction extends Executor
{
const ISOLATION_UNCOMMITTED = 0;
const ISOLATION_COMMITTED = 1;