Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

2.3. 进程与线程

2.3.1. subprocess

2.3.1.1. check_output

			
# Capture everything the command writes to standard output (returned as bytes).
sp = subprocess.check_output(cmd)
# Turn the raw bytes into text before printing.
text = str(sp, encoding='UTF8')
print(text)
		
			

获取IP地址

		 
import subprocess

# Shell pipeline: show eth0's addresses, keep the IPv4 ("inet ") line, take the
# second column (address/prefix) and cut the prefix length off.
command = "/usr/bin/ip addr show eth0 | grep 'inet ' | awk '{print $2}' | cut -d/ -f1"
# shell=True is needed because the command relies on shell pipes.
screen = subprocess.check_output(command, shell=True)
# Remove every newline from the captured output before printing it.
print("".join(screen.decode().split("\n")))
		
			

指定运行目录

		 
#!/usr/bin/python
# -*-coding:utf-8-*-
import subprocess

# List the filesystem root; cwd sets the working directory of the child process.
output = subprocess.check_output("ls", cwd="/")
print(str(output, "utf-8"))

# Run "git pull" through the shell from inside the repository directory.
output = subprocess.check_output("/usr/bin/git pull", shell=True, cwd="/opt/netkiller")
print(str(output, "utf-8"))
		
			

2.3.2. Python 多线程

2.3.2.1. threading 高级线程接口

threading — Higher-level threading interface

创建线程

		
from threading import Thread
import time


def fun1():
    """Print a begin marker, sleep for two seconds, then print an end marker."""
    print("fun1 begin")
    time.sleep(2)
    print("fun1 end")


def fun2():
    """Print a begin marker, sleep for six seconds, then print an end marker."""
    print("fun2 begin")
    time.sleep(6)
    print("fun2 end")


# One Thread object per worker function; nothing runs until start() is called.
threads = [Thread(target=fun1), Thread(target=fun2)]
print(threads)

if __name__ == "__main__":
    for worker in threads:
        print(worker)
        worker.start()
    # Printed right after the threads are started; the main thread does not
    # join them first.
    print("Done")
		
			
			
			
import threading
class MyThread(threading.Thread):
    """Thread that prints its own name when it runs.

    Args:
        name: optional thread name. When omitted, the default name generated
            by ``threading.Thread`` is kept.
    """

    def __init__(self, name=None):
        # Pass the name to Thread.__init__ instead of assigning self.name
        # afterwards: assigning None would turn the name into the string "None".
        super().__init__(name=name)

    def run(self):
        """Print this thread's name."""
        print(self.name)


def test():
    """Start 100 MyThread instances named thread_0 .. thread_99."""
    for i in range(100):
        t = MyThread("thread_" + str(i))
        t.start()


# Module level (not inside the class body): MyThread must be fully defined
# before test() can instantiate it.
if __name__ == '__main__':
    test()
			
			

2.3.2.2. Lock 线程锁

这里实现了一个计数器:count 这个全局变量会被多个线程同时操作,要使其能够被顺序相加,需要靠线程锁的帮助。

		
#-*- encoding: utf-8 -*-
import threading
import time
 
# Shared counter and the lock that guards it. They are created at import time
# so that Test also works when this file is imported rather than run directly.
count = 0
mutex = threading.Lock()


class Test(threading.Thread):
    """Thread that increments the shared ``count`` a fixed number of times.

    Args:
        num: number of increments this thread performs (converted with int()).
        interval: seconds to sleep after each increment; defaults to 1.
    """

    def __init__(self, num, interval=1):
        super().__init__()
        self._run_num = num
        self._interval = interval

    def run(self):
        """Increment the counter ``num`` times, printing (name, step, value)."""
        global count

        for x in range(int(self._run_num)):
            # Update and read the counter inside one critical section so the
            # printed value is the one produced by this thread's increment.
            with mutex:
                count += 1
                current = count
            print(self.name, x, current)
            time.sleep(self._interval)


if __name__ == '__main__':
    num = 5
    # Create the thread objects
    threads = [Test(10) for _ in range(num)]
    # Start the threads
    for t in threads:
        t.start()
    # Wait for the child threads to finish
    for t in threads:
        t.join()
        
			
			
import threading

# Shared variable
count = 0
# Lock protecting every update of the shared variable
lock = threading.Lock()


def add():
    """Increase the shared counter 100000 times, one locked step at a time."""
    global count
    remaining = 100000
    while remaining:
        lock.acquire()  # enter the critical section
        count = count + 1
        lock.release()  # leave the critical section
        remaining -= 1


# Two threads modify the shared variable at the same time
t1, t2 = (threading.Thread(target=add) for _ in range(2))

for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

print("最终结果:", count)  # expected result: 200000
			
			

用 with 加锁

			
import threading

# Shared variable
count = 0
# Lock used as a context manager around each update
lock = threading.Lock()


def add():
    """Increase the shared counter 100000 times, holding the lock per step."""
    global count
    for _step in range(100000):
        # "with" acquires the lock on entry and releases it on exit.
        with lock:
            count = count + 1


# Two threads modify the shared variable at the same time
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)

for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

print("最终结果:", count)  # expected result: 200000
			
			

2.3.2.3. Queue 队列

ref: http://www.ibm.com/developerworks/aix/library/au-threadingpython/

		
 #!/usr/bin/env python
import sys
import threading
import time
import urllib.request
from queue import Queue

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue()


class ThreadUrl(threading.Thread):
    """Threaded Url Grab.

    Worker thread that repeatedly takes a URL from ``queue``, fetches it and
    prints the first 1024 bytes of the response.

    Args:
        queue: the queue of URLs to fetch.
    """

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        """Serve the queue forever; meant to be run as a daemon thread."""
        while True:
            # grabs host from queue
            host = self.queue.get()
            try:
                # grabs urls of hosts and prints first 1024 bytes of page
                with urllib.request.urlopen(host) as url:
                    print(url.read(1024))
            except OSError as exc:
                # URLError is a subclass of OSError: report the failure and
                # keep this worker alive for the remaining hosts.
                print("failed to fetch %s: %s" % (host, exc), file=sys.stderr)
            finally:
                # signals to queue job is done -- always, otherwise
                # queue.join() would block forever after a failed fetch
                self.queue.task_done()


def main():
    """Start five daemon workers, queue every host once and wait for them."""
    # spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue)
        t.daemon = True
        t.start()

    # populate queue with data (once, not once per worker)
    for host in hosts:
        queue.put(host)

    # wait on the queue until everything has been processed
    queue.join()


if __name__ == "__main__":
    start = time.time()
    main()
    print("Elapsed Time: %s" % (time.time() - start))
		
			
			
import threading

import queue

# Queue used for communication between the threads
q = queue.Queue()


def producer():
    """Producer thread: put five items on the queue, reporting each one."""
    for i in range(5):
        q.put(f"数据{i}")
        print(f"生产:数据{i}")


def consumer():
    """Consumer thread: take items off the queue forever, acknowledging each."""
    while True:
        data = q.get()  # blocks until an item is available
        print(f"消费:{data}")
        q.task_done()  # mark this item as processed


# Create the threads; the consumer is a daemon thread
t1 = threading.Thread(target=producer)
t2 = threading.Thread(daemon=True, target=consumer)

# Start both threads
for worker in (t1, t2):
    worker.start()

# Wait for the producer to finish, then for every queued item to be processed
t1.join()
q.join()
print("所有通信完成")
			
			

2.3.2.4. 线程事件

			
import threading
import time

event = threading.Event()


def waiter():
    """Announce the wait, block on the event, then report the signal."""
    print("等待信号...")
    event.wait()  # blocks until another thread calls event.set()
    print("收到信号!")


def sender():
    """Sleep for two seconds, then set the event."""
    time.sleep(2)
    print("发送信号!")
    event.set()


t1, t2 = threading.Thread(target=waiter), threading.Thread(target=sender)

for worker in (t1, t2):
    worker.start()
			
			
			
from datetime import datetime
import threading
import time

event = threading.Event()


def waiter():
    """Forever: wait for the event, report it, clear it, print a timestamp line."""
    rule = "-" * 20
    while True:
        print("等待信号...")
        event.wait()  # blocks until the event is set
        print("收到信号!")
        # Reset the flag so the next wait() blocks again.
        event.clear()
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(rule, stamp, rule)


def sender():
    """Forever: sleep two seconds, then set the event."""
    while True:
        time.sleep(2)
        print("发送信号!")
        event.set()


t1, t2 = threading.Thread(target=waiter), threading.Thread(target=sender)

for worker in (t1, t2):
    worker.start()
			
			
			
from datetime import datetime
import threading
import time

event = threading.Event()


def waiter():
    """Print the waiting banner once, then handle each signal as it arrives."""
    rule = "-" * 20
    print("等待信号...")
    # wait() is the loop condition: it blocks until the event is set.
    while event.wait():
        print("收到信号!")
        # Reset the flag so the next wait() blocks again.
        event.clear()
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(rule, stamp, rule)


def sender():
    """Forever: sleep two seconds, then set the event."""
    while True:
        time.sleep(2)
        print("发送信号!")
        event.set()


t1, t2 = threading.Thread(target=waiter), threading.Thread(target=sender)

for worker in (t1, t2):
    worker.start()
			
			

2.3.2.5. 线程通知

			
import threading
import time

cond = threading.Condition()
products = []


def producer():
    """Append one product and wake a waiting consumer."""
    with cond:
        products.append("商品")
        print("生产完成,通知消费者")
        cond.notify()  # wake the consumer


def consumer():
    """Wait until a product is available, then print the first one."""
    with cond:
        # Check the predicate in a loop: if the producer already ran, its
        # notify() happened before this wait() and would never be seen, so a
        # bare wait() would block forever.
        while not products:
            cond.wait()
        print("消费者拿到:", products[0])


t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)

t1.start()
t2.start()
			
			
			
import threading
import time

cond = threading.Condition()
count = 0


def producer():
    """Notify the condition once every two seconds, forever."""
    while True:
        # Sleep outside the lock. Sleeping while holding it would keep the
        # lock taken almost all the time and leave the woken consumer only a
        # tiny window in which to re-acquire it.
        time.sleep(2)
        with cond:
            cond.notify()


def consumer():
    """Increment and print the counter for every notification received."""
    global count
    print("开始计数器")
    while True:
        with cond:
            # A notification sent while this thread is not waiting is not
            # counted; with a two-second period that is not expected here.
            cond.wait()
            count += 1
            print(count)


t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)

t1.start()
t2.start()
			
			
			
import threading
import time

cond = threading.Condition()
data = None


def consumer():
    """Block until the producer has published ``data``, then print it."""
    # Acquire the lock by hand (instead of using "with")
    cond.acquire()
    try:
        print("消费者:等待数据...")
        # wait() releases the lock, blocks, and re-acquires the lock once
        # woken. Re-check the predicate in a loop so that a notification sent
        # before this thread started waiting is not lost.
        while data is None:
            cond.wait()

        print(f"消费者:拿到数据 -> {data}")
    finally:
        # Release the lock by hand, even if something above raised
        cond.release()


def producer():
    """Produce ``data`` (simulated by a two-second sleep) and notify."""
    global data

    cond.acquire()
    try:
        print("生产者:正在生产数据...")
        time.sleep(2)
        data = "Hello Threading!"
        print("生产者:数据生产完成,通知消费者")

        cond.notify()  # wakes the consumer but does not release the lock
    finally:
        cond.release()  # only after this can the consumer actually run


t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)

t1.start()
# Give the consumer a head start so it reaches wait() first; the predicate
# loop keeps the program correct even if it does not.
time.sleep(0.1)
t2.start()

t1.join()
t2.join()
			
			

2.3.3. 守护进程(Daemon)

http://www.myelin.co.nz/post/2003/3/13/#200303135

	
#!/usr/bin/env python

import os, sys

print("I'm going to fork now - the child will write something to a pipe, and the parent will read it back")
# Flush before forking: otherwise text still sitting in the stdio buffer is
# copied into the child and may be printed twice.
sys.stdout.flush()

r, w = os.pipe()           # r, w are file descriptors, not file objects

pid = os.fork()
if pid:
    # parent process
    os.close(w)           # close the unused write end
    with os.fdopen(r) as reader:    # wrap r in a file object, closed on exit
        print("parent: reading")
        txt = reader.read()
    os.waitpid(pid, 0)    # make sure the child process is reaped
else:
    # child process
    os.close(r)
    with os.fdopen(w, 'w') as writer:
        print("child: writing")
        writer.write("here's some text from the child")
    print("child: closing")
    # Leave via os._exit() so the child does not run the parent's cleanup
    # handlers; flush manually because os._exit() does not.
    sys.stdout.flush()
    os._exit(0)

print("parent: got it; text =", txt)
	
		
	
import sys, os

if __name__ == "__main__":
    # do the UNIX double-fork magic, see Stevens' "Advanced
    # Programming in the UNIX Environment" for details (ISBN 0201563177)
    try:
        pid = os.fork()
    except OSError as e:
        print("fork #1 failed: %d (%s)" % (e.errno, e.strerror), file=sys.stderr)
        sys.exit(1)
    if pid > 0:
        # exit first parent
        sys.exit(0)

    # decouple from parent environment
    os.chdir("/")
    os.setsid()
    os.umask(0)

    # do second fork
    try:
        pid = os.fork()
    except OSError as e:
        print("fork #2 failed: %d (%s)" % (e.errno, e.strerror), file=sys.stderr)
        sys.exit(1)
    if pid > 0:
        # exit from second parent, print eventual PID before
        print("Daemon PID %d" % pid)
        sys.exit(0)

    # start the daemon main loop
	
		

	
# Redirect standard file descriptors
# Rebinding sys.stdin/stdout/stderr alone only swaps the Python-level objects;
# descriptors 0, 1 and 2 would stay attached to the terminal, so anything
# written to them directly (child processes, C extensions) would still get
# through. Point the descriptors themselves at the null device first.
sys.stdout.flush()
sys.stderr.flush()
devnull_fd = os.open(os.devnull, os.O_RDWR)
for std_fd in (0, 1, 2):
    os.dup2(devnull_fd, std_fd)
if devnull_fd > 2:
    # The duplicates keep the null device open; the extra descriptor can go.
    os.close(devnull_fd)
sys.stdin = open(os.devnull, 'r')
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')