parent
15c2090945
commit
e257237b61
@ -11,6 +11,7 @@
|
||||
"homepage": "http://amphp.org",
|
||||
"license": "MIT",
|
||||
"require": {
|
||||
"php": ">=7",
|
||||
"amphp/amp": "^2"
|
||||
},
|
||||
"require-dev": {
|
||||
|
@ -13,6 +13,9 @@ abstract class AbstractPool implements Pool
|
||||
{
|
||||
use CallableMaker;
|
||||
|
||||
const DEFAULT_MAX_CONNECTIONS = 100;
|
||||
const DEFAULT_IDLE_TIMEOUT = 60;
|
||||
|
||||
/** @var Connector */
|
||||
private $connector;
|
||||
|
||||
@ -28,10 +31,10 @@ abstract class AbstractPool implements Pool
|
||||
/** @var \SplObjectStorage */
|
||||
private $connections;
|
||||
|
||||
/** @var \Amp\Promise|null */
|
||||
/** @var Promise|null */
|
||||
private $promise;
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
/** @var Deferred|null */
|
||||
private $deferred;
|
||||
|
||||
/** @var callable */
|
||||
@ -106,8 +109,8 @@ abstract class AbstractPool implements Pool
|
||||
*/
|
||||
public function __construct(
|
||||
ConnectionConfig $config,
|
||||
int $maxConnections = Pool::DEFAULT_MAX_CONNECTIONS,
|
||||
int $idleTimeout = Pool::DEFAULT_IDLE_TIMEOUT,
|
||||
int $maxConnections = self::DEFAULT_MAX_CONNECTIONS,
|
||||
int $idleTimeout = self::DEFAULT_IDLE_TIMEOUT,
|
||||
Connector $connector = null
|
||||
) {
|
||||
$this->connector = $connector ?? $this->createDefaultConnector();
|
||||
@ -136,7 +139,7 @@ abstract class AbstractPool implements Pool
|
||||
$connection = $idle->bottom();
|
||||
\assert($connection instanceof Link);
|
||||
|
||||
if ($connection->lastUsedAt() + $idleTimeout > $now) {
|
||||
if ($connection->getLastUsedAt() + $idleTimeout > $now) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -160,7 +163,7 @@ abstract class AbstractPool implements Pool
|
||||
return $this->idleTimeout;
|
||||
}
|
||||
|
||||
public function lastUsedAt(): int
|
||||
public function getLastUsedAt(): int
|
||||
{
|
||||
// Simple implementation... can be improved if needed.
|
||||
|
||||
@ -168,7 +171,7 @@ abstract class AbstractPool implements Pool
|
||||
|
||||
foreach ($this->connections as $connection) {
|
||||
\assert($connection instanceof Link);
|
||||
if (($lastUsedAt = $connection->lastUsedAt()) > $time) {
|
||||
if (($lastUsedAt = $connection->getLastUsedAt()) > $time) {
|
||||
$time = $lastUsedAt;
|
||||
}
|
||||
}
|
||||
@ -229,7 +232,7 @@ abstract class AbstractPool implements Pool
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMaxConnections(): int
|
||||
public function getConnectionLimit(): int
|
||||
{
|
||||
return $this->maxConnections;
|
||||
}
|
||||
@ -248,14 +251,14 @@ abstract class AbstractPool implements Pool
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) {
|
||||
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getConnectionLimit()) {
|
||||
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
|
||||
}
|
||||
|
||||
do {
|
||||
// While loop to ensure an idle connection is available after promises below are resolved.
|
||||
while ($this->idle->isEmpty()) {
|
||||
if ($this->connections->count() + $this->pending < $this->getMaxConnections()) {
|
||||
if ($this->connections->count() + $this->pending < $this->getConnectionLimit()) {
|
||||
// Max connection count has not been reached, so open another connection.
|
||||
++$this->pending;
|
||||
try {
|
||||
@ -409,14 +412,14 @@ abstract class AbstractPool implements Pool
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise
|
||||
public function beginTransaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise
|
||||
{
|
||||
return call(function () use ($isolation) {
|
||||
$connection = yield from $this->pop();
|
||||
\assert($connection instanceof Link);
|
||||
|
||||
try {
|
||||
$transaction = yield $connection->transaction($isolation);
|
||||
$transaction = yield $connection->beginTransaction($isolation);
|
||||
\assert($transaction instanceof Transaction);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
|
@ -9,5 +9,5 @@ interface CommandResult
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
public function affectedRows(): int;
|
||||
public function getAffectedRowCount(): int;
|
||||
}
|
||||
|
@ -13,5 +13,5 @@ interface Link extends Executor
|
||||
*
|
||||
* @return Promise<Transaction>
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise;
|
||||
public function beginTransaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise;
|
||||
}
|
||||
|
@ -6,9 +6,6 @@ use Amp\Promise;
|
||||
|
||||
interface Pool extends Link
|
||||
{
|
||||
const DEFAULT_MAX_CONNECTIONS = 100;
|
||||
const DEFAULT_IDLE_TIMEOUT = 60;
|
||||
|
||||
/**
|
||||
* @return Promise<Link>
|
||||
*/
|
||||
@ -27,7 +24,7 @@ interface Pool extends Link
|
||||
/**
|
||||
* @return int Maximum number of connections this pool will create.
|
||||
*/
|
||||
public function getMaxConnections(): int;
|
||||
public function getConnectionLimit(): int;
|
||||
|
||||
/**
|
||||
* @return int Number of seconds a connection may remain idle before it is automatically closed.
|
||||
|
@ -9,7 +9,7 @@ class PooledResultSet implements ResultSet
|
||||
/** @var ResultSet */
|
||||
private $result;
|
||||
|
||||
/** @var callable|null */
|
||||
/** @var callable */
|
||||
private $release;
|
||||
|
||||
public function __construct(ResultSet $result, callable $release)
|
||||
|
@ -74,8 +74,8 @@ abstract class PooledStatement implements Statement
|
||||
return $this->statement->getQuery();
|
||||
}
|
||||
|
||||
public function lastUsedAt(): int
|
||||
public function getLastUsedAt(): int
|
||||
{
|
||||
return $this->statement->lastUsedAt();
|
||||
return $this->statement->getLastUsedAt();
|
||||
}
|
||||
}
|
||||
|
@ -123,13 +123,13 @@ abstract class PooledTransaction implements Transaction
|
||||
return $this->transaction && $this->transaction->isAlive();
|
||||
}
|
||||
|
||||
public function lastUsedAt(): int
|
||||
public function getLastUsedAt(): int
|
||||
{
|
||||
if (!$this->transaction) {
|
||||
throw new TransactionError("The transaction has been committed or rolled back");
|
||||
}
|
||||
|
||||
return $this->transaction->lastUsedAt();
|
||||
return $this->transaction->getLastUsedAt();
|
||||
}
|
||||
|
||||
public function close()
|
||||
|
@ -57,7 +57,7 @@ abstract class StatementPool implements Statement
|
||||
$statement = $statements->bottom();
|
||||
\assert($statement instanceof Statement);
|
||||
|
||||
if ($statement->lastUsedAt() + $idleTimeout > $now) {
|
||||
if ($statement->getLastUsedAt() + $idleTimeout > $now) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -113,7 +113,7 @@ abstract class StatementPool implements Statement
|
||||
*/
|
||||
protected function push(Statement $statement)
|
||||
{
|
||||
$maxConnections = $this->pool->getMaxConnections();
|
||||
$maxConnections = $this->pool->getConnectionLimit();
|
||||
|
||||
if ($this->statements->count() > ($maxConnections / 10)) {
|
||||
return;
|
||||
@ -160,7 +160,7 @@ abstract class StatementPool implements Statement
|
||||
}
|
||||
|
||||
/** {@inheritdoc} */
|
||||
public function lastUsedAt(): int
|
||||
public function getLastUsedAt(): int
|
||||
{
|
||||
return $this->lastUsedAt;
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ interface TransientResource
|
||||
/**
|
||||
* Get the timestamp of the last usage of this resource.
|
||||
*
|
||||
* @return int
|
||||
* @return int Unix timestamp in seconds.
|
||||
*/
|
||||
public function lastUsedAt(): int;
|
||||
public function getLastUsedAt(): int;
|
||||
}
|
||||
|
@ -36,14 +36,14 @@ class StatementPoolTest extends TestCase
|
||||
->getMockForAbstractClass();
|
||||
|
||||
$this->assertTrue($statementPool->isAlive());
|
||||
$this->assertSame(\time(), $statementPool->lastUsedAt());
|
||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
||||
|
||||
yield new Delayed(1500); // Give timeout watcher enough time to execute.
|
||||
|
||||
$statementPool->execute();
|
||||
|
||||
$this->assertTrue($statementPool->isAlive());
|
||||
$this->assertSame(\time(), $statementPool->lastUsedAt());
|
||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
||||
});
|
||||
}
|
||||
|
||||
@ -72,7 +72,7 @@ class StatementPoolTest extends TestCase
|
||||
->getMockForAbstractClass();
|
||||
|
||||
$this->assertTrue($statementPool->isAlive());
|
||||
$this->assertSame(\time(), $statementPool->lastUsedAt());
|
||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
||||
|
||||
$statementPool->execute();
|
||||
|
||||
@ -81,7 +81,7 @@ class StatementPoolTest extends TestCase
|
||||
$statementPool->execute();
|
||||
|
||||
$this->assertTrue($statementPool->isAlive());
|
||||
$this->assertSame(\time(), $statementPool->lastUsedAt());
|
||||
$this->assertSame(\time(), $statementPool->getLastUsedAt());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user