Home | 简体中文 | 繁体中文 | 杂文 | 打赏(Donations) | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | 知乎专栏 | Search | Email

5.13. Process Control Extensions

5.13.1. pcntl

编译的时候没有带--enable-pcntl参数的补救办法

		
cd php-5.2.17/ext/pcntl

/srv/php-5.2.17/bin/phpsize
./configure --with-php-config=/srv/php-5.2.17/bin/php-config
make && make install
		
		

加入到php.ini

		
cat > /srv/php-5.2.17/etc/conf.d/pcntl.ini <<EOF
extension=pcntl.so
EOF
		
		

php -m | grep pcntl
		
SIGHUP     终止进程     终端线路挂断
SIGINT     终止进程     中断进程
SIGQUIT    建立CORE文件终止进程,并且生成core文件
SIGILL   建立CORE文件       非法指令
SIGTRAP    建立CORE文件       跟踪自陷
SIGBUS   建立CORE文件       总线错误
SIGSEGV   建立CORE文件        段非法错误
SIGFPE   建立CORE文件       浮点异常
SIGIOT   建立CORE文件        执行I/O自陷
SIGKILL   终止进程     杀死进程
SIGPIPE   终止进程      向一个没有读进程的管道写数据
SIGALARM   终止进程     计时器到时
SIGTERM   终止进程      软件终止信号
SIGSTOP   停止进程     非终端来的停止信号
SIGTSTP   停止进程      终端来的停止信号
SIGCONT   忽略信号     继续执行一个停止的进程
SIGURG   忽略信号      I/O紧急信号
SIGIO     忽略信号     描述符上可以进行I/O
SIGCHLD   忽略信号      当子进程停止或退出时通知父进程
SIGTTOU   停止进程     后台进程写终端
SIGTTIN   停止进程      后台进程读终端
SIGXGPU   终止进程     CPU时限超时
SIGXFSZ   终止进程     文件长度过长
SIGWINCH    忽略信号     窗口大小发生变化
SIGPROF   终止进程     统计分布图用计时器到时
SIGUSR1   终止进程      用户定义信号1
SIGUSR2   终止进程     用户定义信号2
SIGVTALRM 终止进程     虚拟计时器到时
 
1)  SIGHUP 本信号在用户终端连接(正常或非正常)结束时发出, 通常是在终端的控 
制进程结束时, 通知同一session内的各个作业,  这时它们与控制终端 
不再关联. 
2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出  
3) SIGQUIT 和SIGINT类似, 但由QUIT字符(通常是Ctrl-)来控制. 进程在因收到 
SIGQUIT退出时会产生core文件,  在这个意义上类似于一个程序错误信 
号. 
4) SIGILL 执行了非法指令. 通常是因为可执行文件本身出现错误, 或者试图执行 
数据段.  堆栈溢出时也有可能产生这个信号. 
5) SIGTRAP 由断点指令或其它trap指令产生. 由debugger使用. 
6) SIGABRT  程序自己发现错误并调用abort时产生. 
6) SIGIOT 在PDP-11上由iot指令产生, 在其它机器上和SIGABRT一样. 
7)  SIGBUS 非法地址, 包括内存地址对齐(alignment)出错. eg: 访问一个四个字长 
的整数, 但其地址不是4的倍数. 
8)  SIGFPE 在发生致命的算术运算错误时发出. 不仅包括浮点运算错误, 还包括溢 
出及除数为0等其它所有的算术的错误. 
9) SIGKILL  用来立即结束程序的运行. 本信号不能被阻塞, 处理和忽略. 
10) SIGUSR1 留给用户使用 
11) SIGSEGV  试图访问未分配给自己的内存, 或试图往没有写权限的内存地址写数据. 
12) SIGUSR2 留给用户使用 
13) SIGPIPE Broken  pipe 
14) SIGALRM 时钟定时信号, 计算的是实际的时间或时钟时间. alarm函数使用该 
信号. 
15) SIGTERM  程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 
处理. 通常用来要求程序自己正常退出.  shell命令kill缺省产生这 
个信号. 
17) SIGCHLD 子进程结束时, 父进程会收到这个信号. 
18) SIGCONT  让一个停止(stopped)的进程继续执行. 本信号不能被阻塞. 可以用 
一个handler来让程序在由stopped状态变为继续执行时完成特定的  
工作. 例如, 重新显示提示符 
19) SIGSTOP 停止(stopped)进程的执行.  注意它和terminate以及interrupt的区别: 
该进程还未结束, 只是暂停执行. 本信号不能被阻塞, 处理或忽略. 
20)  SIGTSTP 停止进程的运行, 但该信号可以被处理和忽略. 用户键入SUSP字符时 
(通常是Ctrl-Z)发出这个信号 
21) SIGTTIN  当后台作业要从用户终端读数据时, 该作业中的所有进程会收到SIGTTIN 
信号. 缺省时这些进程会停止执行. 
22) SIGTTOU  类似于SIGTTIN, 但在写终端(或修改终端模式)时收到. 
23) SIGURG 有"紧急"数据或out-of-band数据到达socket时产生.  
24) SIGXCPU 超过CPU时间资源限制. 这个限制可以由getrlimit/setrlimit来读取/ 
改变 
25)  SIGXFSZ 超过文件大小资源限制. 
26) SIGVTALRM 虚拟时钟信号. 类似于SIGALRM, 但是计算的是该进程占用的CPU时间.  
27) SIGPROF 类似于SIGALRM/SIGVTALRM, 但包括该进程用的CPU时间以及系统调用的 
时间. 
28)  SIGWINCH 窗口大小改变时发出. 
29) SIGIO 文件描述符准备就绪, 可以开始进行输入/输出操作. 
30) SIGPWR Power  failure 
 
有 两个信号可以停止进程:SIGTERM和SIGKILL。  SIGTERM比较友好,进程能捕捉这个信号,根据您的需要来关闭程序。在关闭程序之前,您 可以结束打开的记录文件和完成正在做的任务。在某些情况下,假  如进程正在进行作业而且不能中断,那么进程可以忽略这个SIGTERM信号。
 
对于SIGKILL信号,进程是不能忽略的。这是一个  “我不管您在做什么,立刻停止”的信号。假如您发送SIGKILL信号给进程,Linux就将进程停止在那里。		
		

5.13.1.1. 

			
<?php
declare(ticks = 1);

pcntl_signal(SIGHUP, function ($signal) {
	echo 'HANDLE SIGNAL ' . $signal . PHP_EOL;
});
pcntl_signal(SIGTERM, function ($signal) {
	echo 'HANDLE SIGNAL ' . $signal . PHP_EOL;
	exit(1);
});

posix_kill(posix_getpid(), SIGHUP);			
			
			

在class中使用pcntl_signal

			
<?php
declare(ticks = 1);

class SignalHandler
{
   function __construct() {
       $this->_init();
   }
   function _init() {
       pcntl_signal(SIGTERM, array(&$this,"handleSignals"));
   }
   function handleSignals($signal)
   {
       echo "$signal\n";
   }

}
$o = new SignalHandler();
posix_kill(getmypid(),SIGTERM);
// prints 15
?>			
			
			

5.13.2. pthreads

编译PHP时需要加入 --enable-maintainer-zts 选项才能安装pthreads

# pecl install pthread
	

配置文件

	
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF
extension=pthreads.so
EOF
	
	
$ php -m |grep pthreads
pthreads
	

5.13.2.1. pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16

pthreads 3.0.0 之后不再支持5.6.x,PHP 版本大于等于 7.0.0

			
# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16
No valid packages found
install failed
			
			

解决方法是安装较低版本的pthreads-2.0.10

# pecl install pthreads-2.0.10
			

5.13.2.2. Thread

		
<?php

class test extends Thread {

    public $name   = '';
    public $runing = false;

    public function __construct($name) {

        $this->name   = $name;
        $this->runing = true;
    }

    public function run() {
	$n = 0;
        while ($this->runing) {
		printf("name: %s %s\n",$this->name, $n);
		$n++;
                sleep(1);
        }
    }

}

$pool[] = new test('a');
$pool[] = new test('b');
$pool[] = new test('c');

foreach ($pool as $w) {
    $w->start();
}
		
		

线程池实现方法

		
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while ( true ){
			if(count($pool) < 2000){ //定义线程池数量,小于线程池数量则开启新的线程直到小于2000为止
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					//如果线程已经运行结束,销毁线程,给新的任务使用
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}

		}

	}
		
		

5.13.2.3. Pool

		
<?php
class ExampleWork extends Stackable {
        public function __construct($data) {
                $this->local = $data;
        }
        public function run() {
	//	print_r($this->local);echo "\r\n";
	echo '------------------- '. $this->local . " -----------------\r\n";
	sleep(1);
        }
}
class ExampleWorker extends Worker {

        public function __construct($name) {
                $this->name = $name;
                $this->data = array();
        }
        public function run(){
                $this->name = sprintf("(%lu)", $this->getThreadId());
        }
}
/* Dead simple pthreads pool */
class Pool {
        /* to hold worker threads */
        public $workers;
        /* to hold exit statuses */
        public $status;
        /* prepare $size workers */
        public function __construct($size = 10) {
                $this->size = $size;
        }
        /* submit Stackable to Worker */
        public function submit(Stackable $stackable) {
            if (count($this->workers)<$this->size) {
                    $id = count($this->workers);
                    $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id));
                    $this->workers[$id]->start(PTHREADS_INHERIT_NONE);

                    if ($this->workers[$id]->stack($stackable)) {
                           return $stackable;
                    } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING);
            }else{
				for ($i=0;$i<count($this->workers);$i++){
                	if( ! $this->workers[$i]->isWorking()){
						$this->workers[$i]->stack($stackable);
						return $stackable;
				}
        	}
		}

                return false;
        }
	public function status(){
		for ($i=0;$i<count($this->workers);$i++){
		printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking());
		}
		printf("\r\n");

	}
        /* Shutdown the pool of threads cleanly, retaining exit status locally */
        public function shutdown() {
                foreach($this->workers as $worker) {
                        $this->status[$worker->getThreadId()]=$worker->shutdown();
                }
        }
}
/* Create a pool of ten threads */
$pool = new Pool(100);
/* Create and submit an array of Stackables */
$work = array();
for ($target = 0; $target < 1000; $target++){

    $work[$target]=$pool->submit(new ExampleWork($target));

	if($work[$target] == false){
		$target--;
		sleep(1);
		continue;
	}
	for ($i=0;$i<count($work);$i++){
		if($work[$i]->isRunning()){
			printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning());
		}
	}
	printf("\r\n");
}
$pool->shutdown();
exit();
		
		

pthreads 自带 Pool

		
<?php
class ExampleWorker extends Worker {
 
	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}
 
	protected $logger;	
}
 
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		sleep(1);
		printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number);
		$this->status = "OK";
	}
}
 
class Logging extends Stackable {
 
	protected function log($message, $args = []) {
		$args = func_get_args();	
 
		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}
}
 
$pool = new Pool(5, \ExampleWorker::class, [new Logging()]);
 
foreach (range(0, 100) as $number) {
	$pool->submit(new Work($number));
}
 
 
$pool->shutdown();
 
var_dump($pool);
?>
		
		

例 5.7. Threads - Pool

			
# cat pool.php 
<?php

class MyWork extends Stackable {

    public $name;

    public function __construct($name) {
        echo "Stackable executed  $name\n";
        $this->name = $name;
    }

    public function run() {
        echo "Stackable $this->name start running\n";
        for ($i = 1; $i <= 5; $i++) {
            echo "Run $this->name : $i\n";
            sleep(1);
        }
    }

}

class MyWorker extends Worker {
    public function __construct($name) {
	$this->name = $name;
    }
    public function run() {
        echo "Worker started $this->name\n";
    }
}

$pool = new Pool(3, \MyWorker::class, array("pthreads"));
$pool->submit(new MyWork("A"));
$pool->submit(new MyWork("B"));
$pool->submit(new MyWork("C"));
$pool->shutdown();
			
			

5.13.2.4. FAQ

5.13.2.4.1. You cannot serialize or unserialize PDO instances
			
PHP Fatal error:  Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38
Stack trace:
#0 /home/www/threads.php(38): PDO->__sleep()
#1 [internal function]: SQLWorker->run()
#2 {main}
  thrown in /home/www/threads.php on line 38
 not ready
 			
			
			
<?php
class MyWorker extends Worker{
    public static $pdo;

    function __construct($conf){
        $this->conf = $conf;
    }

    function run(){
        self::$pdo = new PDO(
            'mysql:host=localhost;dbname=test');
    }

    function get_connection(){
        return self::$pdo;
    }
}
?>
			
			

5.13.2.5. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

		
<?php
$counter = 0;
//$handle=fopen("php://memory", "rw"); 
//$handle=fopen("php://temp", "rw"); 
$handle=fopen("/tmp/counter.txt", "w"); 
fwrite($handle, $counter ); 
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter ); 
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
		
		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
		
		

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

5.13.2.6. FAQ

5.13.2.6.1. configure: error: pthreads requires ZTS, please re-compile PHP with ZTS enabled

重新编译加入 --enable-maintainer-zts

5.13.2.6.2. pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
[root@localhost src]# pecl search pthreads
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.0.7 (stable)        Threading API
			
			
[root@localhost src]# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
No valid packages found
install failed
			

解决方法,手工编译旧的安装包

			
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz
[root@localhost src]# tar zxvf pthreads-3.0.6.tgz 		
[root@localhost src]# cd pthreads-3.0.6
[root@localhost pthreads-3.0.6]# phpize
[root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config
[root@localhost pthreads-3.0.6]# make && make install
			
			
5.13.2.6.3. Class 'Stackable' not found in /path/to/file.php

故障出现在PHP 7.x,pecl 已经升级至 3.1.6

Stackable 是 Threaded 的一个别名,这个类使用直到 pthreads v.2.0.0,之后便取消Stackable。

$ pecl search pthread
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.1.6 (stable)        Threading API			
			

源码

/**
 * Stackable is an alias of Threaded. This class name was used in pthreads until
 * version 2.0.0
 * @link http://www.php.net/manual/en/class.threaded.php
 */
class Stackable extends Threaded implements Traversable, Countable, ArrayAccess {

}
			

解决方案,将 Stackable 改为 Threaded

5.13.3. libevent

5.13.3.1. event_base_loop

永远循环

		
<?php
$timeouts = 10000000;
//callback function
function func($fd, $event,$arg) {
$time = time();
for($i=0;$i<2;$i++) {
    echo "Timer-$arg: $time : out-$i\n";
    sleep(3);
}
}
//create base and event 
$base = event_base_new();
for($i=0;$i<2;$i++) {
$event[$i] = event_new();
//set event flags
event_set($event[$i], $i , EV_PERSIST, 'func', "$i");
//set event base
event_base_set($event[$i], $base);
//enable event
event_add($event[$i], $timeouts);
}
//start event loop
event_base_loop($base);
		
		

运行一次然后退出

event_base_loop($base, EVLOOP_ONCE );
		
		
<?php
$timeouts = 10;
//callback function
function func($fd, $event,$arg) {
$time = time();
    echo "Timer-$arg: $time\n";
}
//create base and event 
$base = event_base_new();
for($i=0;$i<10;$i++) {
$event[$i] = event_new();
//set event flags
event_set($event[$i], $i , EV_PERSIST, 'func', "$i");
//set event base
event_base_set($event[$i], $base);
//enable event
event_add($event[$i], $timeouts);
}
//start event loop
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );		
		
		

5.13.3.2.