Add polyfill

This commit is contained in:
Daniil Gentili 2017-05-16 21:19:42 +01:00
parent e1bc813beb
commit ab1bd5330d
7 changed files with 337 additions and 133 deletions

View File

@ -34,7 +34,12 @@
},
"files": [
"src/Socket.php",
"src/Volatile.php"
"src/Collectable.php",
"src/Threaded.php",
"src/Volatile.php",
"src/Thread.php",
"src/Worker.php",
"src/Pool.php"
]
}
}

7
src/Collectable.php Normal file
View File

@ -0,0 +1,7 @@
<?php
if (!extension_loaded("pthreads")) {
interface Collectable {
public function isGarbage();
}
}

62
src/Pool.php Normal file
View File

@ -0,0 +1,62 @@
<?php
if (!extension_loaded("pthreads")) {
class Pool {
public function __construct($size, $class = \Worker::class, $ctor = []) {
$this->size = $size;
$this->clazz = $class;
$this->ctor = $ctor;
}
public function submit(Threaded $collectable) {
if ($this->last > $this->size) {
$this->last = 0;
}
if (!isset($this->workers[$this->last])) {
$this->workers[$this->last] =
new $this->clazz(...$this->ctor);
$this->workers[$this->last]->start();
}
$this->workers[$this->last++]->stack($collectable);
}
public function submitTo($worker, Threaded $collectable) {
if (isset($this->workers[$worker])) {
$this->workers[$worker]->stack($collectable);
}
}
public function collect(Closure $collector = null) {
$total = 0;
foreach ($this->workers as $worker)
$total += $worker->collect($collector);
return $total;
}
public function resize($size) {
if ($size < $this->size) {
while ($this->size > $size) {
if (isset($this->workers[$this->size-1]))
$this->workers[$this->size-1]->shutdown();
unset($this->workers[$this->size-1]);
$this->size--;
}
}
}
public function shutdown() {
$this->workers = null;
}
protected $workers;
protected $size;
protected $last;
protected $clazz;
protected $ctor;
}
}

42
src/Thread.php Normal file
View File

@ -0,0 +1,42 @@
<?php
if (!extension_loaded("pthreads")) {
class Thread extends Threaded {
public function isStarted() { return (bool) ($this->state & THREAD::STARTED); }
public function isJoined() { return (bool) ($this->state & THREAD::JOINED); }
public function kill() {
$this->state |= THREAD::ERROR;
return true;
}
public static function getCurrentThreadId() { return 1; }
public function getThreadId() { return 1; }
public function start() {
if ($this->state & THREAD::STARTED) {
throw new \RuntimeException();
}
$this->state |= THREAD::STARTED;
$this->state |= THREAD::RUNNING;
try {
$this->run();
} catch(Exception $t) {
$this->state |= THREAD::ERROR;
}
$this->state &= ~THREAD::RUNNING;
return true;
}
public function join() {
if ($this->state & THREAD::JOINED) {
throw new \RuntimeException();
}
$this->state |= THREAD::JOINED;
return true;
}
}
}

150
src/Threaded.php Normal file
View File

@ -0,0 +1,150 @@
<?php
if (!extension_loaded("pthreads")) {
class Threaded implements ArrayAccess, Countable, IteratorAggregate, Collectable {
const NOTHING = (0);
const STARTED = (1<<0);
const RUNNING = (1<<1);
const JOINED = (1<<2);
const ERROR = (1<<3);
public function offsetSet($offset, $value) {
$this->__set($offset, $value);
}
public function offsetGet($offset) {
return $this->__get($offset);
}
public function offsetUnset($offset) {
$this->__unset($offset);
}
public function offsetExists($offset) {
return $this->__isset($offset);
}
public function count() {
return count($this->data);
}
public function getIterator() {
return new ArrayIterator($this->data);
}
public function __set($offset, $value) {
if ($offset === null) {
$offset = count($this->data);
}
if (!$this instanceof Volatile) {
if (isset($this->data[$offset]) &&
$this->data[$offset] instanceof Threaded) {
throw new \RuntimeException();
}
}
if (is_array($value)) {
$safety =
new Volatile();
$safety->merge(
$this->convertToVolatile($value));
$value = $safety;
}
return $this->data[$offset] = $value;
}
public function __get($offset) {
return $this->data[$offset];
}
public function __isset($offset) {
return isset($this->data[$offset]);
}
public function __unset($offset) {
if (!$this instanceof Volatile) {
if (isset($this->data[$offset]) && $this->data[$offset] instanceof Threaded) {
throw new \RuntimeException();
}
}
unset($this->data[$offset]);
}
public function shift() {
return array_shift($this->data);
}
public function chunk($size) {
$chunk = [];
while (count($chunk) < $size) {
$chunk[] = $this->shift();
}
return $chunk;
}
public function pop() {
return array_pop($this->data);
}
public function merge($merge) {
foreach ($merge as $k => $v) {
$this->data[$k] = $v;
}
}
public function wait($timeout = 0) {
return true;
}
public function notify() {
return true;
}
public function synchronized(Closure $closure, ... $args) {
return $closure(...$args);
}
public function isRunning() {
return $this->state & THREAD::RUNNING;
}
public function isTerminated() {
return $this->state & THREAD::ERROR;
}
public static function extend($class) { return true; }
public function addRef() {}
public function delRef() {}
public function getRefCount() {}
public function lock() { return true; }
public function unlock() { return true; }
public function isWaiting() { return false; }
public function run() {}
public function isGarbage() { return true; }
private function convertToVolatile($value) {
/*
if (is_array($value)) {
foreach ($value as $k => $v) {
if (is_array($v)) {
$value[$k] =
new Volatile();
$value[$k]->merge(
$this->convertToVolatile($v));
}
}
}
*/
return $value;
}
protected $data;
protected $state;
}
}

View File

@ -1,136 +1,21 @@
<?php
if (!extension_loaded("pthreads")) {
if (!extension_loaded('pthreads')) {
class Volatile implements ArrayAccess, Countable, IteratorAggregate
{
const NOTHING = (0);
const STARTED = (1 << 0);
const RUNNING = (1 << 1);
const JOINED = (1 << 2);
const ERROR = (1 << 3);
public function offsetSet($offset, $value)
{
$this->__set($offset, $value);
}
public function offsetGet($offset)
{
return $this->{$offset};
}
public function offsetUnset($offset)
{
$this->__unset($offset);
}
public function offsetExists($offset)
{
return isset($this->{$offset});
}
public function count()
{
return count(get_object_vars($this));
}
public function getIterator()
{
return new ArrayIterator(get_object_vars($this));
}
public function __set($offset, $value)
{
class Volatile extends Threaded {
public function __set($offset, $value) {
if ($offset === null) {
$offset = count(get_object_vars($this));
$offset = count($this->data);
}
if (!$this instanceof self) {
if (isset($this->{$offset}) &&
$this->{$offset} instanceof Threaded) {
throw new \RuntimeException();
}
if (is_array($value)) {
$safety =
new Volatile();
$safety->merge(
$this->convertToVolatile($value));
$value = $safety;
}
return $this->{$offset} = $value;
return $this->data[$offset] = $value;
}
public function __unset($offset)
{
if (!$this instanceof self) {
if (isset($this->{$offset}) && $this->{$offset} instanceof Threaded) {
throw new \RuntimeException();
}
}
unset($this->{$offset});
}
public function wait($timeout = 0)
{
return true;
}
public function notify()
{
return true;
}
public function synchronized(Closure $closure, ...$args)
{
return $closure(...$args);
}
public function isRunning()
{
return $this->state & THREAD::RUNNING;
}
public function isTerminated()
{
return $this->state & THREAD::ERROR;
}
public static function extend($class)
{
return true;
}
public function addRef()
{
}
public function delRef()
{
}
public function getRefCount()
{
}
public function lock()
{
return true;
}
public function unlock()
{
return true;
}
public function isWaiting()
{
return false;
}
public function run()
{
}
public function isGarbage()
{
return true;
}
protected $state;
}
}

53
src/Worker.php Normal file
View File

@ -0,0 +1,53 @@
<?php
if (!extension_loaded("pthreads")) {
class Worker extends Thread {
public function collect(Closure $collector = null) {
foreach ($this->gc as $idx => $collectable) {
if ($collector) {
if ($collector($collectable)) {
unset($this->gc[$idx]);
}
} else {
if ($this->collector($collectable)) {
unset($this->gc[$idx]);
}
}
}
return count($this->gc) + count($this->stack);
}
public function collector(Collectable $collectable) { return $collectable->isGarbage(); }
public function shutdown() { return $this->join(); }
public function isShutdown() { return $this->isJoined(); }
public function getStacked() { return count($this->stack); }
public function unstack() { return array_shift($this->stack); }
public function stack(Threaded $collectable) {
$this->stack[] = $collectable;
if ($this->isStarted()) {
$this->runCollectable(count($this->stack)-1, $collectable);
}
}
public function run() {
foreach ($this->stack as $idx => $collectable) {
$this
->runCollectable($idx, $collectable);
}
}
private function runCollectable($idx, Collectable $collectable) {
$collectable->worker = $this;
$collectable->state |= THREAD::RUNNING;
$collectable->run();
$collectable->state &= ~THREAD::RUNNING;
$this->gc[] = $collectable;
unset($this->stack[$idx]);
}
private $stack = [];
private $gc = [];
}
}