编译的时候没有带--enable-pcntl参数的补救办法
cd php-5.2.17/ext/pcntl /srv/php-5.2.17/bin/phpsize ./configure --with-php-config=/srv/php-5.2.17/bin/php-config make && make install
加入到php.ini
cat > /srv/php-5.2.17/etc/conf.d/pcntl.ini <<EOF extension=pcntl.so EOF
php -m | grep pcntl
SIGHUP 终止进程 终端线路挂断 SIGINT 终止进程 中断进程 SIGQUIT 建立CORE文件终止进程,并且生成core文件 SIGILL 建立CORE文件 非法指令 SIGTRAP 建立CORE文件 跟踪自陷 SIGBUS 建立CORE文件 总线错误 SIGSEGV 建立CORE文件 段非法错误 SIGFPE 建立CORE文件 浮点异常 SIGIOT 建立CORE文件 执行I/O自陷 SIGKILL 终止进程 杀死进程 SIGPIPE 终止进程 向一个没有读进程的管道写数据 SIGALARM 终止进程 计时器到时 SIGTERM 终止进程 软件终止信号 SIGSTOP 停止进程 非终端来的停止信号 SIGTSTP 停止进程 终端来的停止信号 SIGCONT 忽略信号 继续执行一个停止的进程 SIGURG 忽略信号 I/O紧急信号 SIGIO 忽略信号 描述符上可以进行I/O SIGCHLD 忽略信号 当子进程停止或退出时通知父进程 SIGTTOU 停止进程 后台进程写终端 SIGTTIN 停止进程 后台进程读终端 SIGXGPU 终止进程 CPU时限超时 SIGXFSZ 终止进程 文件长度过长 SIGWINCH 忽略信号 窗口大小发生变化 SIGPROF 终止进程 统计分布图用计时器到时 SIGUSR1 终止进程 用户定义信号1 SIGUSR2 终止进程 用户定义信号2 SIGVTALRM 终止进程 虚拟计时器到时 1) SIGHUP 本信号在用户终端连接(正常或非正常)结束时发出, 通常是在终端的控 制进程结束时, 通知同一session内的各个作业, 这时它们与控制终端 不再关联. 2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出 3) SIGQUIT 和SIGINT类似, 但由QUIT字符(通常是Ctrl-)来控制. 进程在因收到 SIGQUIT退出时会产生core文件, 在这个意义上类似于一个程序错误信 号. 4) SIGILL 执行了非法指令. 通常是因为可执行文件本身出现错误, 或者试图执行 数据段. 堆栈溢出时也有可能产生这个信号. 5) SIGTRAP 由断点指令或其它trap指令产生. 由debugger使用. 6) SIGABRT 程序自己发现错误并调用abort时产生. 6) SIGIOT 在PDP-11上由iot指令产生, 在其它机器上和SIGABRT一样. 7) SIGBUS 非法地址, 包括内存地址对齐(alignment)出错. eg: 访问一个四个字长 的整数, 但其地址不是4的倍数. 8) SIGFPE 在发生致命的算术运算错误时发出. 不仅包括浮点运算错误, 还包括溢 出及除数为0等其它所有的算术的错误. 9) SIGKILL 用来立即结束程序的运行. 本信号不能被阻塞, 处理和忽略. 10) SIGUSR1 留给用户使用 11) SIGSEGV 试图访问未分配给自己的内存, 或试图往没有写权限的内存地址写数据. 12) SIGUSR2 留给用户使用 13) SIGPIPE Broken pipe 14) SIGALRM 时钟定时信号, 计算的是实际的时间或时钟时间. alarm函数使用该 信号. 15) SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号. 17) SIGCHLD 子进程结束时, 父进程会收到这个信号. 18) SIGCONT 让一个停止(stopped)的进程继续执行. 本信号不能被阻塞. 可以用 一个handler来让程序在由stopped状态变为继续执行时完成特定的 工作. 例如, 重新显示提示符 19) SIGSTOP 停止(stopped)进程的执行. 注意它和terminate以及interrupt的区别: 该进程还未结束, 只是暂停执行. 本信号不能被阻塞, 处理或忽略. 20) SIGTSTP 停止进程的运行, 但该信号可以被处理和忽略. 用户键入SUSP字符时 (通常是Ctrl-Z)发出这个信号 21) SIGTTIN 当后台作业要从用户终端读数据时, 该作业中的所有进程会收到SIGTTIN 信号. 缺省时这些进程会停止执行. 22) SIGTTOU 类似于SIGTTIN, 但在写终端(或修改终端模式)时收到. 23) SIGURG 有"紧急"数据或out-of-band数据到达socket时产生. 24) SIGXCPU 超过CPU时间资源限制. 这个限制可以由getrlimit/setrlimit来读取/ 改变 25) SIGXFSZ 超过文件大小资源限制. 26) SIGVTALRM 虚拟时钟信号. 类似于SIGALRM, 但是计算的是该进程占用的CPU时间. 27) SIGPROF 类似于SIGALRM/SIGVTALRM, 但包括该进程用的CPU时间以及系统调用的 时间. 28) SIGWINCH 窗口大小改变时发出. 29) SIGIO 文件描述符准备就绪, 可以开始进行输入/输出操作. 30) SIGPWR Power failure 有 两个信号可以停止进程:SIGTERM和SIGKILL。 SIGTERM比较友好,进程能捕捉这个信号,根据您的需要来关闭程序。在关闭程序之前,您 可以结束打开的记录文件和完成正在做的任务。在某些情况下,假 如进程正在进行作业而且不能中断,那么进程可以忽略这个SIGTERM信号。 对于SIGKILL信号,进程是不能忽略的。这是一个 “我不管您在做什么,立刻停止”的信号。假如您发送SIGKILL信号给进程,Linux就将进程停止在那里。
<?php declare(ticks = 1); pcntl_signal(SIGHUP, function ($signal) { echo 'HANDLE SIGNAL ' . $signal . PHP_EOL; }); pcntl_signal(SIGTERM, function ($signal) { echo 'HANDLE SIGNAL ' . $signal . PHP_EOL; exit(1); }); posix_kill(posix_getpid(), SIGHUP);
在class中使用pcntl_signal
<?php declare(ticks = 1); class SignalHandler { function __construct() { $this->_init(); } function _init() { pcntl_signal(SIGTERM, array(&$this,"handleSignals")); } function handleSignals($signal) { echo "$signal\n"; } } $o = new SignalHandler(); posix_kill(getmypid(),SIGTERM); // prints 15 ?>
编译PHP时需要加入 --enable-maintainer-zts 选项才能安装pthreads
# pecl install pthread
配置文件
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF extension=pthreads.so EOF
$ php -m |grep pthreads pthreads
pthreads 3.0.0 之后不再支持5.6.x,PHP 版本大于等于 7.0.0
# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16 No valid packages found install failed
解决方法是安装较低版本的pthreads-2.0.10
# pecl install pthreads-2.0.10
<?php class test extends Thread { public $name = ''; public $runing = false; public function __construct($name) { $this->name = $name; $this->runing = true; } public function run() { $n = 0; while ($this->runing) { printf("name: %s %s\n",$this->name, $n); $n++; sleep(1); } } } $pool[] = new test('a'); $pool[] = new test('b'); $pool[] = new test('c'); foreach ($pool as $w) { $w->start(); }
线程池实现方法
$pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while ( true ){ if(count($pool) < 2000){ //定义线程池数量,小于线程池数量则开启新的线程直到小于2000为止 $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ //如果线程已经运行结束,销毁线程,给新的任务使用 if(! $worker->isRunning()){ unset($pool[$name]); } } } } }
<?php class ExampleWork extends Stackable { public function __construct($data) { $this->local = $data; } public function run() { // print_r($this->local);echo "\r\n"; echo '------------------- '. $this->local . " -----------------\r\n"; sleep(1); } } class ExampleWorker extends Worker { public function __construct($name) { $this->name = $name; $this->data = array(); } public function run(){ $this->name = sprintf("(%lu)", $this->getThreadId()); } } /* Dead simple pthreads pool */ class Pool { /* to hold worker threads */ public $workers; /* to hold exit statuses */ public $status; /* prepare $size workers */ public function __construct($size = 10) { $this->size = $size; } /* submit Stackable to Worker */ public function submit(Stackable $stackable) { if (count($this->workers)<$this->size) { $id = count($this->workers); $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id)); $this->workers[$id]->start(PTHREADS_INHERIT_NONE); if ($this->workers[$id]->stack($stackable)) { return $stackable; } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING); }else{ for ($i=0;$i<count($this->workers);$i++){ if( ! $this->workers[$i]->isWorking()){ $this->workers[$i]->stack($stackable); return $stackable; } } } return false; } public function status(){ for ($i=0;$i<count($this->workers);$i++){ printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking()); } printf("\r\n"); } /* Shutdown the pool of threads cleanly, retaining exit status locally */ public function shutdown() { foreach($this->workers as $worker) { $this->status[$worker->getThreadId()]=$worker->shutdown(); } } } /* Create a pool of ten threads */ $pool = new Pool(100); /* Create and submit an array of Stackables */ $work = array(); for ($target = 0; $target < 1000; $target++){ $work[$target]=$pool->submit(new ExampleWork($target)); if($work[$target] == false){ $target--; sleep(1); continue; } for ($i=0;$i<count($work);$i++){ if($work[$i]->isRunning()){ printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning()); } } printf("\r\n"); } $pool->shutdown(); exit();
pthreads 自带 Pool
<?php class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); sleep(1); printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number); $this->status = "OK"; } } class Logging extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } } $pool = new Pool(5, \ExampleWorker::class, [new Logging()]); foreach (range(0, 100) as $number) { $pool->submit(new Work($number)); } $pool->shutdown(); var_dump($pool); ?>
例 5.7. Threads - Pool
# cat pool.php <?php class MyWork extends Stackable { public $name; public function __construct($name) { echo "Stackable executed $name\n"; $this->name = $name; } public function run() { echo "Stackable $this->name start running\n"; for ($i = 1; $i <= 5; $i++) { echo "Run $this->name : $i\n"; sleep(1); } } } class MyWorker extends Worker { public function __construct($name) { $this->name = $name; } public function run() { echo "Worker started $this->name\n"; } } $pool = new Pool(3, \MyWorker::class, array("pthreads")); $pool->submit(new MyWork("A")); $pool->submit(new MyWork("B")); $pool->submit(new MyWork("C")); $pool->shutdown();
PHP Fatal error: Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38 Stack trace: #0 /home/www/threads.php(38): PDO->__sleep() #1 [internal function]: SQLWorker->run() #2 {main} thrown in /home/www/threads.php on line 38 not ready
<?php class MyWorker extends Worker{ public static $pdo; function __construct($conf){ $this->conf = $conf; } function run(){ self::$pdo = new PDO( 'mysql:host=localhost;dbname=test'); } function get_connection(){ return self::$pdo; } } ?>
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。
下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。
<?php $counter = 0; //$handle=fopen("php://memory", "rw"); //$handle=fopen("php://temp", "rw"); $handle=fopen("/tmp/counter.txt", "w"); fwrite($handle, $counter ); fclose($handle); class CounterThread extends Thread { public function __construct($mutex = null){ $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //没有互斥锁 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥锁 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。
没有加入锁的结果是计数始终被覆盖,最终结果是2
而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作
上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。
重新编译加入 --enable-maintainer-zts
[root@localhost src]# pecl search pthreads Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.0.7 (stable) Threading API
[root@localhost src]# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13 No valid packages found install failed
解决方法,手工编译旧的安装包
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz [root@localhost src]# tar zxvf pthreads-3.0.6.tgz [root@localhost src]# cd pthreads-3.0.6 [root@localhost pthreads-3.0.6]# phpize [root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config [root@localhost pthreads-3.0.6]# make && make install
故障出现在PHP 7.x,pecl 已经升级至 3.1.6
Stackable 是 Threaded 的一个别名,这个类使用直到 pthreads v.2.0.0,之后便取消Stackable。
$ pecl search pthread Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.1.6 (stable) Threading API
源码
/** * Stackable is an alias of Threaded. This class name was used in pthreads until * version 2.0.0 * @link http://www.php.net/manual/en/class.threaded.php */ class Stackable extends Threaded implements Traversable, Countable, ArrayAccess { }
解决方案,将 Stackable 改为 Threaded
永远循环
<?php $timeouts = 10000000; //callback function function func($fd, $event,$arg) { $time = time(); for($i=0;$i<2;$i++) { echo "Timer-$arg: $time : out-$i\n"; sleep(3); } } //create base and event $base = event_base_new(); for($i=0;$i<2;$i++) { $event[$i] = event_new(); //set event flags event_set($event[$i], $i , EV_PERSIST, 'func', "$i"); //set event base event_base_set($event[$i], $base); //enable event event_add($event[$i], $timeouts); } //start event loop event_base_loop($base);
运行一次然后退出
event_base_loop($base, EVLOOP_ONCE );
<?php $timeouts = 10; //callback function function func($fd, $event,$arg) { $time = time(); echo "Timer-$arg: $time\n"; } //create base and event $base = event_base_new(); for($i=0;$i<10;$i++) { $event[$i] = event_new(); //set event flags event_set($event[$i], $i , EV_PERSIST, 'func', "$i"); //set event base event_base_set($event[$i], $base); //enable event event_add($event[$i], $timeouts); } //start event loop event_base_loop($base, EVLOOP_ONCE ); event_base_loop($base, EVLOOP_ONCE ); event_base_loop($base, EVLOOP_ONCE ); event_base_loop($base, EVLOOP_ONCE );