Szopen Xiao

PHP 线程管理

2015-12-13

通过封装pthreads扩展相关类,得到了两个使用起来比较简单的类

在线程中,需要避免直接使用主线程中静态变量保存的外部资源句柄(比如mysql的长连接资源),否则容易导致段错误。对于pthreads扩展,提供了一个Pool类做此类操作

下载 threading.php 文件

<?php

if(!extension_loaded('pthreads')) {
    die('require pthreads extension');
}

/**
 * PHP 线程状态类
 *
 * @author chopin
 */
class Threading extends Thread
{

    private $complete = false;

    private $data = '';

    private $callback = '';

    /**
     * 获取线程是否完成
     *
     * @return boolean
     */
    final public function getComplete(): bool
    {
        return $this->complete;
    }

    /**
     * 设置线程是否完成 继承本类后,在方法中调用设置
     */
    final protected function setComplate()
    {
        $this->complete = true;
    }

    /**
     * 获取线程数据,此数据是序列化后的数据
     */
    final public function getData(): String
    {
        return $this->data;
    }

    /**
     * 设置线程数据
     * 
     * @param mixed $value            
     */
    final public function setData($value)
    {
        $this->data = serialize($value);
    }

    public function run()
    {
        $callback = unserialize($this->callback);
        if (is_callable($callback)) {
            $callback();
        }
        $this->setComplate();
    }

    /**
     * 传入一个函数在另一个线程中执行
     * 
     * @param callable $callback            
     */
    final public function workerCall(callable $callback,$pool=null)
    {
        $this->callback = serialize($callback);
        if($pool) {
           $pool->addWorker($this);
        } else {
           $this->start();
        }
    }
}

/**
 * PHP 线程池
 *
 * @author chopin
 */
class PoolManager extends SplObjectStorage
{

    private $size = 0;

    private $callback;

    /**
     *
     * @param int $size
     *            设置线程池大小
     */
    public function __construct(int $size)
    {
        $this->size = $size;
    }

    /**
     * 设置loop期间调用函数
     *
     * @param callable $call
     *            可调用函数
     */
    public function loopCall(callable $call)
    {
        $this->callback = $call;
    }

    private function checkLoop($size = 0)
    {
        while ($this->count() > $size) {
            foreach ($this as $w) {
                if ($w->getComplete()) {
                    if (is_callable($this->callback)) {
                        ($this->callback)($w);
                    }
                    $this->detach($w);
                }
            }
            usleep(100);
        }
    }

    /**
     * 最后循环等待
     */
    public function loop()
    {
        $this->checkLoop();
    }

    /**
     * 添加一个线程类到线程池
     *
     * @param StateWorker $worker
     *            线程类
     */
    public function addWorker(Threading $worker)
    {
        $this->checkLoop($this->size);
        $worker->start();
        $this->attach($worker);
    }
}

用法:

<?php
class myThread extends Threading
{
    public function run()
    {
        //你的业务逻辑
        $this->setComplate();
    }
}
$pool = new PoolManager(5);
$pool->addWorker(new myThread);
$pool->addWorker(new myThread);
$pool->addWorker(new myThread);
$pool->addWorker(new myThread);
$pool->addWorker(new myThread);

//下面的线程会在前面的执行完毕才会加入
$pool->addWorker(new myThread);

$w = new Threading;
function callFunc() {
}
$w->workerCall('callFunc', $pool);

//等待所有线程执行结束
$pool->loop();

comments powered by Disqus