首页 > php框架 > php实现多线程

php实现多线程

php实现多线程
相比通过向服务器发送多个请求来模拟多进程,这种方式要方便很多。它只能在 CLI 模式下使用,适用于一些特殊场合,如邮件发送任务等。
资源的共享访问使用了文件锁,并不是很可靠,这样做主要是为了能够在 Windows 下使用。如果确实有必要,可以考虑自己改用相应的信号量(semaphore)机制(该扩展只能用于 *NIX 系统)。

实例
[php]
// Example bootstrap: these constants must be defined before the library is loaded.
// Command used to launch the PHP CLI for each child process.
define('DIR_PHP_EXEC', 'php');
// Script that every child process re-executes (this file itself).
define('DIR_MAIN_EXEC', __FILE__);
// Directory in which the lock files are created.
define('DIR_TMP', '/tmp');
require_once('my_process.php');

/**
 * Example task: prints its parameter once per second, four times.
 */
class pp extends my_process_base {
    /**
     * Task body executed in the child process.
     *
     * @param mixed $param Value handed over by the caller of run_task().
     */
    public function run($param = null) {
        for ($i = 0; $i < 4; $i++) {
            // The listing lost its backslash: "$paramn" referenced an undefined variable.
            echo "111 $param\n";
            sleep(1);
        }
    }
}

// Builds the command-line parser and the process manager; when this script was
// started as a child, init_my_process() already runs the requested task.
init_my_process();
$obj = $GLOBALS['gal_obj_process_m'];
if ($obj->is_main()) {
    // Main process: start four children, each running class "pp" with its own parameter.
    $obj->run_task('pp', 'a');
    $obj->run_task('pp', 'b');
    $obj->run_task('pp', 'c');
    $obj->run_task('pp', 'd');
    //$obj->run_task('pp', 'b');
    // Stop polling after at most 10 rounds (about one second per round).
    $obj->set_max_run(10);
    $obj->run();
}

[/php]

进程管理类
[php]
/**
* @copyright 2007 movivi
* @author  徐智  <php>
*
* $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
*/
// Command used to start the PHP CLI for child processes (overridable by the caller).
if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');
// DIR_MAIN_EXEC has no default here: the entry script has to define it
// (the example uses __FILE__) because children re-execute that script.
// Directory for lock files. An empty default would place them in the filesystem
// root, which is normally not writable, so fall back to the system temp directory.
if (!defined('DIR_TMP')) define('DIR_TMP', sys_get_temp_dir());
/*****************************************************************************/
/* Initialisation: names of the key=value arguments passed to child processes */
define('CMD_MAIN_PROCESS_KEY', 'main_process_key');
define('CMD_CHILD_PROCESS_NAME', 'child_process_name');
define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');

/**
 * Creates the global command-line parser and process manager.
 *
 * Stores a my_cmd_argv instance in $GLOBALS['gal_obj_cmd'] and a my_process_m
 * instance in $GLOBALS['gal_obj_process_m']. When the CMD_MAIN_PROCESS_KEY
 * argument is present the manager is built in child mode and its run() is
 * invoked immediately; otherwise the manager is built in main mode.
 */
function init_my_process() {
    $GLOBALS['gal_obj_cmd'] = new my_cmd_argv();
    $key = $GLOBALS['gal_obj_cmd']->get_value(CMD_MAIN_PROCESS_KEY);
    // get_value() returns false when the argument is absent; the manager expects ''.
    $key = $key === false ? '' : $key;
    $GLOBALS['gal_obj_process_m'] = new my_process_m($key);
    if (!$GLOBALS['gal_obj_process_m']->is_main()) {
        $GLOBALS['gal_obj_process_m']->run();
    }
}

/**
 * Base class for a task that runs in its own PHP process.
 *
 * Extend this class and implement run() with the work to perform.
 */
abstract class my_process_base {
    /**
     * Both arguments are accepted but not used by this base implementation.
     *
     * @param bool   $auto_run
     * @param string $name
     */
    public function __construct($auto_run=true, $name='') {
    }

    /**
     * Prints the "@end" marker line when the task object is destroyed.
     * NOTE(review): presumably the parent process watches the child's output
     * for this marker to detect completion - confirm against the manager class.
     */
    public function __destruct() {
        echo "@end\n";
    }

    /**
     * The work performed by the task.
     *
     * @param mixed $param Optional task parameter.
     */
    abstract public function run($param = null);
}


/**
 * Parses "key=value" command-line arguments from $_SERVER['argv'].
 *
 * The first element (the script name) is skipped. An argument without "="
 * is stored with an empty string as its value.
 */
class my_cmd_argv {
    /** @var array argument name => argument value */
    private $cmd_argv = array();

    public function __construct() {
        // $_SERVER['argv'] may be missing outside the CLI; treat that as "no arguments".
        $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) ? $_SERVER['argv'] : array();
        foreach (array_slice($argv, 1) as $arg) {
            // Split on the first "=" only, so values that themselves contain "="
            // (for example base64 padding) are kept intact.
            $cmd = explode('=', $arg, 2);
            $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';
        }
    }

    /**
     * @param  string $key Argument name.
     * @return bool   True when the argument was given on the command line.
     */
    public function get_key($key) {
        return isset($this->cmd_argv[$key]);
    }

    /**
     * @param  string $key Argument name.
     * @return string|false The argument value, or false when it was not given.
     */
    public function get_value($key) {
        return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;
    }
}

/**
 * Multi-process manager for PHP.
 *
 * Lets one controlling script run several PHP child processes. Console (CLI)
 * use only. The signalling between processes is currently implemented with
 * lock files.
 *
 * Depends on names defined outside this class: the constants DIR_PHP_EXEC,
 * DIR_MAIN_EXEC, DIR_TMP and CMD_*, and the parser in $GLOBALS['gal_obj_cmd'].
 */
class my_process_m {
    /**
     * @var array $task_list
     * Main process: popen() handles of the started children.
     * Child process: the task object that was executed.
     */
    private $task_list = array();
    /** @var array md5(lock file path) => array(file handle, lock file path) */
    private $lock_list = array();
    /** @var array|null array(main process key, lock handle or 0) */
    private $lock = null;
    /** @var bool true in the controlling process, false in a child */
    private $is_main = false;
    /** @var int upper bound on polling rounds in run() */
    private $max_run = 3600000;

    /**
     * Closes the handle of one lock entry and removes its lock file.
     *
     * @param array $entry array(file handle, lock file path)
     */
    private function close_lock_entry($entry) {
        if (is_resource($entry[0])) fclose($entry[0]);
        if (file_exists($entry[1])) unlink($entry[1]);
    }

    /**
     * Releases one lock (when $key is given) or every lock held by this manager.
     *
     * @param  string|null $key
     * @return bool always true
     */
    private function release_lock($key = null) {
        if (!is_null($key)) {
            $id = md5($this->build_lock_id($key));
            if (isset($this->lock_list[$id])) {
                $this->close_lock_entry($this->lock_list[$id]);
                unset($this->lock_list[$id]);
            }
            return true;
        }

        // Each entry is array(handle, path). The earlier code tested the entry
        // itself with is_resource(), so handles were never closed and lock files
        // were left behind; handle it exactly like the single-key case.
        foreach (array_keys($this->lock_list) as $id) {
            $this->close_lock_entry($this->lock_list[$id]);
            unset($this->lock_list[$id]);
        }
        return true;
    }

    /**
     * Drops one task (when $key is given and known) or all tasks, closing
     * process handles with pclose().
     *
     * @param  int|string|null $key
     * @return bool always true
     */
    private function release_task($key = null) {
        if (!is_null($key) && isset($this->task_list[$key])) {
            if (is_resource($this->task_list[$key])) pclose($this->task_list[$key]);
            unset($this->task_list[$key]);
            return true;
        }
        foreach (array_keys($this->task_list) as $k) {
            if (is_resource($this->task_list[$k])) pclose($this->task_list[$k]);
            unset($this->task_list[$k]);
        }
        return true;
    }

    /**
     * @param  string $key
     * @return string path of the lock file used for $key
     */
    private function build_lock_id($key) {
        return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';
    }

    /**
     * Child side: instantiates the class named on the command line and calls
     * its run() with the decoded parameter.
     */
    protected function run_child_process() {
        $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);
        $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);
        // NOTE(review): unserialize() is applied to command-line data. That is only
        // safe while the arguments come from run_task() of a trusted parent process.
        $param = ($param === false || $param === '')
            ? null
            : unserialize(base64_decode(trim($param)));
        $obj = new $class();
        $obj->run($param);
        // Keep the task object referenced until this manager releases its tasks.
        $this->task_list[] = $obj;
    }

    /**
     * @param string $lock Key of the main process. An empty string means "this is
     *                     the main process": a fresh key is generated and its lock
     *                     file is opened.
     */
    public function __construct($lock='') {
        if ($lock === '') {
            $this->is_main = true;
            $key = md5(uniqid()) . '_main.my_process';
            $lock = array($key, $this->get($key));
        } else {
            $this->is_main = false;
            $lock = array($lock, 0);
        }
        $this->lock = $lock;
    }

    public function __destruct() {
        $this->release_lock();
        $this->release_task();
    }

    /**
     * Stop all processes.
     * Not implemented: the method body is empty.
     */
    public function stop_all() {
    }

    /**
     * Whether this is the main process.
     *
     * @return bool
     */
    public function is_main() {
        return $this->is_main;
    }

    /**
     * Whether a lock file for the given key already exists.
     *
     * @param   string      $key
     * @return  bool
     */
    public function exist($key) {
        return file_exists($this->build_lock_id($key));
    }

    /**
     * Opens the lock file for a key and registers it with this manager.
     *
     * @param   string      $key
     * @param   int         $max_acquire    maximum number of blocked requests (currently unused)
     * @return  mixed       the file handle on success; false when this manager
     *                      already holds the key or the file cannot be opened
     */
    public function get($key, $max_acquire=5) {
        $fn = $this->build_lock_id($key);
        $id = md5($fn);
        if (isset($this->lock_list[$id])) return false;
        $handle = fopen($fn, 'a+');
        if ($handle) $this->lock_list[$id] = array($handle, $fn);
        return $handle;
    }

    /**
     * Releases the lock for a key: closes its handle and removes its file.
     *
     * @param   string      $key
     * @return  bool        true
     */
    public function remove($key) {
        return $this->release_lock($key);
    }

    /**
     * Takes an exclusive flock() on a handle returned by get().
     *
     * @param resource  $id         handle returned by get()
     * @param bool      $block      whether to wait until the lock is available
     * @return bool     result of flock()
     */
    public function acquire($id, $block=false) {
        return flock($id, $block ? LOCK_EX : (LOCK_EX | LOCK_NB));
    }

    /**
     * Unlocks a handle previously locked with acquire().
     *
     * @param resource $id handle returned by get()
     */
    public function release($id) {
        flock($id, LOCK_UN);
    }

    /**
     * Starts a child PHP process that runs the given task class.
     *
     * The child re-executes DIR_MAIN_EXEC and receives the class name, the
     * base64-encoded serialized parameter and the main process key as
     * key=value arguments.
     *
     * @param  string $process_name name of the task class to run in the child
     * @param  mixed  $param        serializable value passed to the task's run()
     * @return bool   true when the process was started, false when popen() failed
     */
    public function run_task($process_name, $param=null) {
        // Every argument is shell-escaped so that paths or names containing
        // spaces or shell metacharacters cannot break (or inject into) the command.
        // DIR_PHP_EXEC is left as-is because it may legitimately carry extra flags.
        $command = DIR_PHP_EXEC . ' -f ' . escapeshellarg(DIR_MAIN_EXEC) . ' -- '
            . escapeshellarg(CMD_CHILD_PROCESS_NAME . '=' . $process_name) . ' '
            . escapeshellarg(CMD_CHILD_PROCESS_PARAM . '=' . base64_encode(serialize($param))) . ' '
            . escapeshellarg(CMD_MAIN_PROCESS_KEY . '=' . $this->lock[0]);
        $handle = popen($command, 'r');
        if ($handle === false) {
            return false;
        }
        $this->task_list[] = $handle;
        return true;
    }

    /**
     * Main process: polls the output of every child about once per second,
     * echoes it, and drops children that have finished. Child process: runs
     * the requested task.
     *
     * @param bool $auto_run when false, the main process polls only one round
     */
    public function run($auto_run = true) {
        if (!$this->is_main) {
            $this->run_child_process();
            return;
        }

        $id = 0;
        do {
            //echo "process----------------------------------------: \n";
            $c = 0;
            // Iterate over a snapshot of the keys: release_task() removes entries
            // from the list while the loop is running.
            foreach (array_keys($this->task_list) as $k) {
                $h = $this->task_list[$k];
                $c++;
                if (!is_resource($h)) {
                    $this->release_task($k);
                    continue;
                }
                $msg = fread($h, 8000);
                if ($msg === false) $msg = '';
                // A child is finished when its output ends with the "@end\n" marker,
                // or when its pipe reached EOF (for example after a crash that
                // never printed the marker).
                if (substr($msg, -5, 4) === '@end' || feof($h)) {
                    echo "end process:[$k][$id] echo \n{$msg} \n";
                    $this->release_task($k);
                } else {
                    echo "process:[$k][$id] echo \n{$msg} \n";
                }
            }
            sleep(1);
        } while ($auto_run && $id++ < $this->max_run && $c > 0);
    }

    /**
     * Sets the maximum number of polling rounds performed by run().
     *
     * @param int $max
     */
    public function set_max_run($max=1000) {
        $this->max_run = $max;
    }
}


本文地址:http://www.phprm.com/frame/php1004979.html

转载随意,但请附上文章地址:-)

标签:none

发表留言