From 1f35f9fdb7af058575fdd5961143e34d9331d943 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 16 May 2017 20:19:54 +0000 Subject: [PATCH] Apply fixes from StyleCI --- src/Collectable.php | 9 +- src/Pool.php | 107 ++++++++-------- src/Thread.php | 86 ++++++++----- src/Threaded.php | 289 ++++++++++++++++++++++++++------------------ src/Volatile.php | 36 +++--- src/Worker.php | 116 +++++++++++------- 6 files changed, 374 insertions(+), 269 deletions(-) diff --git a/src/Collectable.php b/src/Collectable.php index 709b9091..5fbf3343 100644 --- a/src/Collectable.php +++ b/src/Collectable.php @@ -1,7 +1,8 @@ size = $size; + $this->clazz = $class; + $this->ctor = $ctor; + } - public function __construct($size, $class = \Worker::class, $ctor = []) { - $this->size = $size; - $this->clazz = $class; - $this->ctor = $ctor; - } + public function submit(Threaded $collectable) + { + if ($this->last > $this->size) { + $this->last = 0; + } - public function submit(Threaded $collectable) { - if ($this->last > $this->size) { - $this->last = 0; - } + if (!isset($this->workers[$this->last])) { + $this->workers[$this->last] = + new $this->clazz(...$this->ctor); + $this->workers[$this->last]->start(); + } - if (!isset($this->workers[$this->last])) { - $this->workers[$this->last] = - new $this->clazz(...$this->ctor); - $this->workers[$this->last]->start(); - } + $this->workers[$this->last++]->stack($collectable); + } - $this->workers[$this->last++]->stack($collectable); - } + public function submitTo($worker, Threaded $collectable) + { + if (isset($this->workers[$worker])) { + $this->workers[$worker]->stack($collectable); + } + } - public function submitTo($worker, Threaded $collectable) { - if (isset($this->workers[$worker])) { - $this->workers[$worker]->stack($collectable); - } - } + public function collect(Closure $collector = null) + { + $total = 0; + foreach ($this->workers as $worker) { + $total += $worker->collect($collector); + } - public function collect(Closure $collector = null) { - $total = 0; - foreach ($this->workers as $worker) - $total += $worker->collect($collector); - return $total; - } + return $total; + } - public function resize($size) { - if ($size < $this->size) { - while ($this->size > $size) { - if (isset($this->workers[$this->size-1])) - $this->workers[$this->size-1]->shutdown(); - unset($this->workers[$this->size-1]); - $this->size--; - } - } - } + public function resize($size) + { + if ($size < $this->size) { + while ($this->size > $size) { + if (isset($this->workers[$this->size - 1])) { + $this->workers[$this->size - 1]->shutdown(); + } + unset($this->workers[$this->size - 1]); + $this->size--; + } + } + } - public function shutdown() { - $this->workers = null; - } + public function shutdown() + { + $this->workers = null; + } - protected $workers; - protected $size; - protected $last; - protected $clazz; - protected $ctor; - } + protected $workers; + protected $size; + protected $last; + protected $clazz; + protected $ctor; + } } - - diff --git a/src/Thread.php b/src/Thread.php index f53c2e46..254f1f87 100644 --- a/src/Thread.php +++ b/src/Thread.php @@ -1,42 +1,64 @@ state & THREAD::STARTED); } - public function isJoined() { return (bool) ($this->state & THREAD::JOINED); } - public function kill() { - $this->state |= THREAD::ERROR; - return true; - } +if (!extension_loaded('pthreads')) { + class Thread extends Threaded + { + public function isStarted() + { + return (bool) ($this->state & self::STARTED); + } - public static function getCurrentThreadId() { return 1; } - public function getThreadId() { return 1; } + public function isJoined() + { + return (bool) ($this->state & self::JOINED); + } - public function start() { - if ($this->state & THREAD::STARTED) { - throw new \RuntimeException(); - } + public function kill() + { + $this->state |= self::ERROR; - $this->state |= THREAD::STARTED; - $this->state |= THREAD::RUNNING; + return true; + } - try { - $this->run(); - } catch(Exception $t) { - $this->state |= THREAD::ERROR; - } + public static function getCurrentThreadId() + { + return 1; + } - $this->state &= ~THREAD::RUNNING; - return true; - } + public function getThreadId() + { + return 1; + } - public function join() { - if ($this->state & THREAD::JOINED) { - throw new \RuntimeException(); - } + public function start() + { + if ($this->state & self::STARTED) { + throw new \RuntimeException(); + } - $this->state |= THREAD::JOINED; - return true; - } - } + $this->state |= self::STARTED; + $this->state |= self::RUNNING; + + try { + $this->run(); + } catch (Exception $t) { + $this->state |= self::ERROR; + } + + $this->state &= ~self::RUNNING; + + return true; + } + + public function join() + { + if ($this->state & self::JOINED) { + throw new \RuntimeException(); + } + + $this->state |= self::JOINED; + + return true; + } + } } diff --git a/src/Threaded.php b/src/Threaded.php index 6f08545a..e7a79f36 100644 --- a/src/Threaded.php +++ b/src/Threaded.php @@ -1,150 +1,199 @@ __set($offset, $value); - } +if (!extension_loaded('pthreads')) { + class Threaded implements ArrayAccess, Countable, IteratorAggregate, Collectable + { + const NOTHING = (0); + const STARTED = (1 << 0); + const RUNNING = (1 << 1); + const JOINED = (1 << 2); + const ERROR = (1 << 3); - public function offsetGet($offset) { - return $this->__get($offset); - } + public function offsetSet($offset, $value) + { + $this->__set($offset, $value); + } - public function offsetUnset($offset) { - $this->__unset($offset); - } + public function offsetGet($offset) + { + return $this->__get($offset); + } - public function offsetExists($offset) { - return $this->__isset($offset); - } + public function offsetUnset($offset) + { + $this->__unset($offset); + } - public function count() { - return count($this->data); - } + public function offsetExists($offset) + { + return $this->__isset($offset); + } - public function getIterator() { - return new ArrayIterator($this->data); - } + public function count() + { + return count($this->data); + } - public function __set($offset, $value) { - if ($offset === null) { - $offset = count($this->data); - } + public function getIterator() + { + return new ArrayIterator($this->data); + } - if (!$this instanceof Volatile) { - if (isset($this->data[$offset]) && - $this->data[$offset] instanceof Threaded) { - throw new \RuntimeException(); - } - } + public function __set($offset, $value) + { + if ($offset === null) { + $offset = count($this->data); + } - if (is_array($value)) { - $safety = - new Volatile(); - $safety->merge( - $this->convertToVolatile($value)); - $value = $safety; - } - - return $this->data[$offset] = $value; - } + if (!$this instanceof Volatile) { + if (isset($this->data[$offset]) && + $this->data[$offset] instanceof self) { + throw new \RuntimeException(); + } + } - public function __get($offset) { - return $this->data[$offset]; - } + if (is_array($value)) { + $safety = + new Volatile(); + $safety->merge( + $this->convertToVolatile($value)); + $value = $safety; + } - public function __isset($offset) { - return isset($this->data[$offset]); - } + return $this->data[$offset] = $value; + } - public function __unset($offset) { - if (!$this instanceof Volatile) { - if (isset($this->data[$offset]) && $this->data[$offset] instanceof Threaded) { - throw new \RuntimeException(); - } - } - unset($this->data[$offset]); - } + public function __get($offset) + { + return $this->data[$offset]; + } - public function shift() { - return array_shift($this->data); - } + public function __isset($offset) + { + return isset($this->data[$offset]); + } - public function chunk($size) { - $chunk = []; - while (count($chunk) < $size) { - $chunk[] = $this->shift(); - } - return $chunk; - } + public function __unset($offset) + { + if (!$this instanceof Volatile) { + if (isset($this->data[$offset]) && $this->data[$offset] instanceof self) { + throw new \RuntimeException(); + } + } + unset($this->data[$offset]); + } - public function pop() { - return array_pop($this->data); - } + public function shift() + { + return array_shift($this->data); + } - public function merge($merge) { - foreach ($merge as $k => $v) { - $this->data[$k] = $v; - } - } - - public function wait($timeout = 0) { - return true; - } + public function chunk($size) + { + $chunk = []; + while (count($chunk) < $size) { + $chunk[] = $this->shift(); + } - public function notify() { - return true; - } + return $chunk; + } - public function synchronized(Closure $closure, ... $args) { - return $closure(...$args); - } + public function pop() + { + return array_pop($this->data); + } - public function isRunning() { - return $this->state & THREAD::RUNNING; - } + public function merge($merge) + { + foreach ($merge as $k => $v) { + $this->data[$k] = $v; + } + } - public function isTerminated() { - return $this->state & THREAD::ERROR; - } + public function wait($timeout = 0) + { + return true; + } - public static function extend($class) { return true; } + public function notify() + { + return true; + } - public function addRef() {} - public function delRef() {} - public function getRefCount() {} + public function synchronized(Closure $closure, ...$args) + { + return $closure(...$args); + } - public function lock() { return true; } - public function unlock() { return true; } - public function isWaiting() { return false; } + public function isRunning() + { + return $this->state & THREAD::RUNNING; + } - public function run() {} + public function isTerminated() + { + return $this->state & THREAD::ERROR; + } - public function isGarbage() { return true; } + public static function extend($class) + { + return true; + } - private function convertToVolatile($value) { - /* - if (is_array($value)) { - foreach ($value as $k => $v) { - if (is_array($v)) { - $value[$k] = - new Volatile(); - $value[$k]->merge( - $this->convertToVolatile($v)); - } - } - } + public function addRef() + { + } + + public function delRef() + { + } + + public function getRefCount() + { + } + + public function lock() + { + return true; + } + + public function unlock() + { + return true; + } + + public function isWaiting() + { + return false; + } + + public function run() + { + } + + public function isGarbage() + { + return true; + } + + private function convertToVolatile($value) + { + /* + if (is_array($value)) { + foreach ($value as $k => $v) { + if (is_array($v)) { + $value[$k] = + new Volatile(); + $value[$k]->merge( + $this->convertToVolatile($v)); + } + } + } */ - return $value; - } + return $value; + } - protected $data; - protected $state; - } + protected $data; + protected $state; + } } diff --git a/src/Volatile.php b/src/Volatile.php index fadaf436..d62cc378 100644 --- a/src/Volatile.php +++ b/src/Volatile.php @@ -1,21 +1,23 @@ data); - } +if (!extension_loaded('pthreads')) { + class Volatile extends Threaded + { + public function __set($offset, $value) + { + if ($offset === null) { + $offset = count($this->data); + } - if (is_array($value)) { - $safety = - new Volatile(); - $safety->merge( - $this->convertToVolatile($value)); - $value = $safety; - } - - return $this->data[$offset] = $value; - } - } + if (is_array($value)) { + $safety = + new self(); + $safety->merge( + $this->convertToVolatile($value)); + $value = $safety; + } + + return $this->data[$offset] = $value; + } + } } diff --git a/src/Worker.php b/src/Worker.php index c9579398..8b58e5f9 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -1,53 +1,77 @@ gc as $idx => $collectable) { - if ($collector) { - if ($collector($collectable)) { - unset($this->gc[$idx]); - } - } else { - if ($this->collector($collectable)) { - unset($this->gc[$idx]); - } - } - } +if (!extension_loaded('pthreads')) { + class Worker extends Thread + { + public function collect(Closure $collector = null) + { + foreach ($this->gc as $idx => $collectable) { + if ($collector) { + if ($collector($collectable)) { + unset($this->gc[$idx]); + } + } else { + if ($this->collector($collectable)) { + unset($this->gc[$idx]); + } + } + } - return count($this->gc) + count($this->stack); - } - public function collector(Collectable $collectable) { return $collectable->isGarbage(); } - public function shutdown() { return $this->join(); } - public function isShutdown() { return $this->isJoined(); } - public function getStacked() { return count($this->stack); } - public function unstack() { return array_shift($this->stack); } - public function stack(Threaded $collectable) { - $this->stack[] = $collectable; - if ($this->isStarted()) { - $this->runCollectable(count($this->stack)-1, $collectable); - } - } + return count($this->gc) + count($this->stack); + } - public function run() { - foreach ($this->stack as $idx => $collectable) { - $this - ->runCollectable($idx, $collectable); - } - } + public function collector(Collectable $collectable) + { + return $collectable->isGarbage(); + } - private function runCollectable($idx, Collectable $collectable) { - $collectable->worker = $this; - $collectable->state |= THREAD::RUNNING; - $collectable->run(); - $collectable->state &= ~THREAD::RUNNING; - $this->gc[] = $collectable; - unset($this->stack[$idx]); - } + public function shutdown() + { + return $this->join(); + } - private $stack = []; - private $gc = []; - } + public function isShutdown() + { + return $this->isJoined(); + } + + public function getStacked() + { + return count($this->stack); + } + + public function unstack() + { + return array_shift($this->stack); + } + + public function stack(Threaded $collectable) + { + $this->stack[] = $collectable; + if ($this->isStarted()) { + $this->runCollectable(count($this->stack) - 1, $collectable); + } + } + + public function run() + { + foreach ($this->stack as $idx => $collectable) { + $this + ->runCollectable($idx, $collectable); + } + } + + private function runCollectable($idx, Collectable $collectable) + { + $collectable->worker = $this; + $collectable->state |= THREAD::RUNNING; + $collectable->run(); + $collectable->state &= ~THREAD::RUNNING; + $this->gc[] = $collectable; + unset($this->stack[$idx]); + } + + private $stack = []; + private $gc = []; + } } - -