Apply fixes from StyleCI

This commit is contained in:
Daniil Gentili 2017-05-16 20:19:54 +00:00 committed by StyleCI Bot
parent ab1bd5330d
commit 1f35f9fdb7
6 changed files with 374 additions and 269 deletions

View File

@ -1,7 +1,8 @@
<?php <?php
if (!extension_loaded("pthreads")) {
interface Collectable { if (!extension_loaded('pthreads')) {
public function isGarbage(); interface Collectable
} {
public function isGarbage();
}
} }

View File

@ -1,62 +1,69 @@
<?php <?php
if (!extension_loaded("pthreads")) {
class Pool { if (!extension_loaded('pthreads')) {
class Pool
{
public function __construct($size, $class = \Worker::class, $ctor = [])
{
$this->size = $size;
$this->clazz = $class;
$this->ctor = $ctor;
}
public function __construct($size, $class = \Worker::class, $ctor = []) { public function submit(Threaded $collectable)
$this->size = $size; {
$this->clazz = $class; if ($this->last > $this->size) {
$this->ctor = $ctor; $this->last = 0;
} }
public function submit(Threaded $collectable) { if (!isset($this->workers[$this->last])) {
if ($this->last > $this->size) { $this->workers[$this->last] =
$this->last = 0; new $this->clazz(...$this->ctor);
} $this->workers[$this->last]->start();
}
if (!isset($this->workers[$this->last])) { $this->workers[$this->last++]->stack($collectable);
$this->workers[$this->last] = }
new $this->clazz(...$this->ctor);
$this->workers[$this->last]->start();
}
$this->workers[$this->last++]->stack($collectable); public function submitTo($worker, Threaded $collectable)
} {
if (isset($this->workers[$worker])) {
$this->workers[$worker]->stack($collectable);
}
}
public function submitTo($worker, Threaded $collectable) { public function collect(Closure $collector = null)
if (isset($this->workers[$worker])) { {
$this->workers[$worker]->stack($collectable); $total = 0;
} foreach ($this->workers as $worker) {
} $total += $worker->collect($collector);
}
public function collect(Closure $collector = null) { return $total;
$total = 0; }
foreach ($this->workers as $worker)
$total += $worker->collect($collector);
return $total;
}
public function resize($size) { public function resize($size)
if ($size < $this->size) { {
while ($this->size > $size) { if ($size < $this->size) {
if (isset($this->workers[$this->size-1])) while ($this->size > $size) {
$this->workers[$this->size-1]->shutdown(); if (isset($this->workers[$this->size - 1])) {
unset($this->workers[$this->size-1]); $this->workers[$this->size - 1]->shutdown();
$this->size--; }
} unset($this->workers[$this->size - 1]);
} $this->size--;
} }
}
}
public function shutdown() { public function shutdown()
$this->workers = null; {
} $this->workers = null;
}
protected $workers; protected $workers;
protected $size; protected $size;
protected $last; protected $last;
protected $clazz; protected $clazz;
protected $ctor; protected $ctor;
} }
} }

View File

@ -1,42 +1,64 @@
<?php <?php
if (!extension_loaded("pthreads")) {
class Thread extends Threaded { if (!extension_loaded('pthreads')) {
public function isStarted() { return (bool) ($this->state & THREAD::STARTED); } class Thread extends Threaded
public function isJoined() { return (bool) ($this->state & THREAD::JOINED); } {
public function kill() { public function isStarted()
$this->state |= THREAD::ERROR; {
return true; return (bool) ($this->state & self::STARTED);
} }
public static function getCurrentThreadId() { return 1; } public function isJoined()
public function getThreadId() { return 1; } {
return (bool) ($this->state & self::JOINED);
}
public function start() { public function kill()
if ($this->state & THREAD::STARTED) { {
throw new \RuntimeException(); $this->state |= self::ERROR;
}
$this->state |= THREAD::STARTED; return true;
$this->state |= THREAD::RUNNING; }
try { public static function getCurrentThreadId()
$this->run(); {
} catch(Exception $t) { return 1;
$this->state |= THREAD::ERROR; }
}
$this->state &= ~THREAD::RUNNING; public function getThreadId()
return true; {
} return 1;
}
public function join() { public function start()
if ($this->state & THREAD::JOINED) { {
throw new \RuntimeException(); if ($this->state & self::STARTED) {
} throw new \RuntimeException();
}
$this->state |= THREAD::JOINED; $this->state |= self::STARTED;
return true; $this->state |= self::RUNNING;
}
} try {
$this->run();
} catch (Exception $t) {
$this->state |= self::ERROR;
}
$this->state &= ~self::RUNNING;
return true;
}
public function join()
{
if ($this->state & self::JOINED) {
throw new \RuntimeException();
}
$this->state |= self::JOINED;
return true;
}
}
} }

View File

@ -1,150 +1,199 @@
<?php <?php
if (!extension_loaded("pthreads")) {
class Threaded implements ArrayAccess, Countable, IteratorAggregate, Collectable {
const NOTHING = (0);
const STARTED = (1<<0);
const RUNNING = (1<<1);
const JOINED = (1<<2);
const ERROR = (1<<3);
public function offsetSet($offset, $value) { if (!extension_loaded('pthreads')) {
$this->__set($offset, $value); class Threaded implements ArrayAccess, Countable, IteratorAggregate, Collectable
} {
const NOTHING = (0);
const STARTED = (1 << 0);
const RUNNING = (1 << 1);
const JOINED = (1 << 2);
const ERROR = (1 << 3);
public function offsetGet($offset) { public function offsetSet($offset, $value)
return $this->__get($offset); {
} $this->__set($offset, $value);
}
public function offsetUnset($offset) { public function offsetGet($offset)
$this->__unset($offset); {
} return $this->__get($offset);
}
public function offsetExists($offset) { public function offsetUnset($offset)
return $this->__isset($offset); {
} $this->__unset($offset);
}
public function count() { public function offsetExists($offset)
return count($this->data); {
} return $this->__isset($offset);
}
public function getIterator() { public function count()
return new ArrayIterator($this->data); {
} return count($this->data);
}
public function __set($offset, $value) { public function getIterator()
if ($offset === null) { {
$offset = count($this->data); return new ArrayIterator($this->data);
} }
if (!$this instanceof Volatile) { public function __set($offset, $value)
if (isset($this->data[$offset]) && {
$this->data[$offset] instanceof Threaded) { if ($offset === null) {
throw new \RuntimeException(); $offset = count($this->data);
} }
}
if (is_array($value)) { if (!$this instanceof Volatile) {
$safety = if (isset($this->data[$offset]) &&
new Volatile(); $this->data[$offset] instanceof self) {
$safety->merge( throw new \RuntimeException();
$this->convertToVolatile($value)); }
$value = $safety; }
}
return $this->data[$offset] = $value;
}
public function __get($offset) { if (is_array($value)) {
return $this->data[$offset]; $safety =
} new Volatile();
$safety->merge(
$this->convertToVolatile($value));
$value = $safety;
}
public function __isset($offset) { return $this->data[$offset] = $value;
return isset($this->data[$offset]); }
}
public function __unset($offset) { public function __get($offset)
if (!$this instanceof Volatile) { {
if (isset($this->data[$offset]) && $this->data[$offset] instanceof Threaded) { return $this->data[$offset];
throw new \RuntimeException(); }
}
}
unset($this->data[$offset]);
}
public function shift() { public function __isset($offset)
return array_shift($this->data); {
} return isset($this->data[$offset]);
}
public function chunk($size) { public function __unset($offset)
$chunk = []; {
while (count($chunk) < $size) { if (!$this instanceof Volatile) {
$chunk[] = $this->shift(); if (isset($this->data[$offset]) && $this->data[$offset] instanceof self) {
} throw new \RuntimeException();
return $chunk; }
} }
unset($this->data[$offset]);
}
public function pop() { public function shift()
return array_pop($this->data); {
} return array_shift($this->data);
}
public function merge($merge) { public function chunk($size)
foreach ($merge as $k => $v) { {
$this->data[$k] = $v; $chunk = [];
} while (count($chunk) < $size) {
} $chunk[] = $this->shift();
}
public function wait($timeout = 0) {
return true;
}
public function notify() { return $chunk;
return true; }
}
public function synchronized(Closure $closure, ... $args) { public function pop()
return $closure(...$args); {
} return array_pop($this->data);
}
public function isRunning() { public function merge($merge)
return $this->state & THREAD::RUNNING; {
} foreach ($merge as $k => $v) {
$this->data[$k] = $v;
}
}
public function isTerminated() { public function wait($timeout = 0)
return $this->state & THREAD::ERROR; {
} return true;
}
public static function extend($class) { return true; } public function notify()
{
return true;
}
public function addRef() {} public function synchronized(Closure $closure, ...$args)
public function delRef() {} {
public function getRefCount() {} return $closure(...$args);
}
public function lock() { return true; } public function isRunning()
public function unlock() { return true; } {
public function isWaiting() { return false; } return $this->state & THREAD::RUNNING;
}
public function run() {} public function isTerminated()
{
return $this->state & THREAD::ERROR;
}
public function isGarbage() { return true; } public static function extend($class)
{
return true;
}
private function convertToVolatile($value) { public function addRef()
/* {
if (is_array($value)) { }
foreach ($value as $k => $v) {
if (is_array($v)) { public function delRef()
$value[$k] = {
new Volatile(); }
$value[$k]->merge(
$this->convertToVolatile($v)); public function getRefCount()
} {
} }
}
public function lock()
{
return true;
}
public function unlock()
{
return true;
}
public function isWaiting()
{
return false;
}
public function run()
{
}
public function isGarbage()
{
return true;
}
private function convertToVolatile($value)
{
/*
if (is_array($value)) {
foreach ($value as $k => $v) {
if (is_array($v)) {
$value[$k] =
new Volatile();
$value[$k]->merge(
$this->convertToVolatile($v));
}
}
}
*/ */
return $value; return $value;
} }
protected $data; protected $data;
protected $state; protected $state;
} }
} }

View File

@ -1,21 +1,23 @@
<?php <?php
if (!extension_loaded("pthreads")) {
class Volatile extends Threaded { if (!extension_loaded('pthreads')) {
public function __set($offset, $value) { class Volatile extends Threaded
if ($offset === null) { {
$offset = count($this->data); public function __set($offset, $value)
} {
if ($offset === null) {
$offset = count($this->data);
}
if (is_array($value)) { if (is_array($value)) {
$safety = $safety =
new Volatile(); new self();
$safety->merge( $safety->merge(
$this->convertToVolatile($value)); $this->convertToVolatile($value));
$value = $safety; $value = $safety;
} }
return $this->data[$offset] = $value; return $this->data[$offset] = $value;
} }
} }
} }

View File

@ -1,53 +1,77 @@
<?php <?php
if (!extension_loaded("pthreads")) {
class Worker extends Thread { if (!extension_loaded('pthreads')) {
public function collect(Closure $collector = null) { class Worker extends Thread
foreach ($this->gc as $idx => $collectable) { {
if ($collector) { public function collect(Closure $collector = null)
if ($collector($collectable)) { {
unset($this->gc[$idx]); foreach ($this->gc as $idx => $collectable) {
} if ($collector) {
} else { if ($collector($collectable)) {
if ($this->collector($collectable)) { unset($this->gc[$idx]);
unset($this->gc[$idx]); }
} } else {
} if ($this->collector($collectable)) {
} unset($this->gc[$idx]);
}
}
}
return count($this->gc) + count($this->stack); return count($this->gc) + count($this->stack);
} }
public function collector(Collectable $collectable) { return $collectable->isGarbage(); }
public function shutdown() { return $this->join(); }
public function isShutdown() { return $this->isJoined(); }
public function getStacked() { return count($this->stack); }
public function unstack() { return array_shift($this->stack); }
public function stack(Threaded $collectable) {
$this->stack[] = $collectable;
if ($this->isStarted()) {
$this->runCollectable(count($this->stack)-1, $collectable);
}
}
public function run() { public function collector(Collectable $collectable)
foreach ($this->stack as $idx => $collectable) { {
$this return $collectable->isGarbage();
->runCollectable($idx, $collectable); }
}
}
private function runCollectable($idx, Collectable $collectable) { public function shutdown()
$collectable->worker = $this; {
$collectable->state |= THREAD::RUNNING; return $this->join();
$collectable->run(); }
$collectable->state &= ~THREAD::RUNNING;
$this->gc[] = $collectable;
unset($this->stack[$idx]);
}
private $stack = []; public function isShutdown()
private $gc = []; {
} return $this->isJoined();
}
public function getStacked()
{
return count($this->stack);
}
public function unstack()
{
return array_shift($this->stack);
}
public function stack(Threaded $collectable)
{
$this->stack[] = $collectable;
if ($this->isStarted()) {
$this->runCollectable(count($this->stack) - 1, $collectable);
}
}
public function run()
{
foreach ($this->stack as $idx => $collectable) {
$this
->runCollectable($idx, $collectable);
}
}
private function runCollectable($idx, Collectable $collectable)
{
$collectable->worker = $this;
$collectable->state |= THREAD::RUNNING;
$collectable->run();
$collectable->state &= ~THREAD::RUNNING;
$this->gc[] = $collectable;
unset($this->stack[$idx]);
}
private $stack = [];
private $gc = [];
}
} }