| 知乎专栏 |
sp = subprocess.check_output(cmd)
text = sp.decode('UTF8')
print(text)
获取IP地址
import subprocess
command = "/usr/bin/ip addr show eth0 | grep 'inet ' | awk '{print $2}' | cut -d/ -f1"
screen = subprocess.check_output(command, shell=True)
print(screen.decode().replace("\n", ""))
制定运行目录
#!/usr/bin/python
# -*-coding:utf-8-*-
import subprocess
output = subprocess.check_output("ls", cwd="/")
print(output.decode())
output = subprocess.check_output("/usr/bin/git pull", cwd="/opt/netkiller", shell=True)
print(output.decode())
创建线程
from threading import Thread
import time
def fun1():
print("fun1 begin")
time.sleep(2)
print("fun1 end")
def fun2():
print("fun2 begin")
time.sleep(6)
print("fun2 end")
threads = []
threads.append(Thread(target=fun1))
threads.append(Thread(target=fun2))
print(threads)
if __name__ == "__main__":
for t in threads:
print(t)
t.start()
print("Done")
import threading
class MyThread(threading.Thread):
def __init__(self, name=None):
threading.Thread.__init__(self)
self.name = name
def run(self):
print self.name
def test():
for i in range(0, 100):
t = MyThread("thread_" + str(i))
t.start()
if __name__=='__main__':
test()
这里实现了一个计数器 count 这个全局变量会被多个线程同时操作,使其能够被顺序相加,需要靠线程锁的帮助。
#-*- encoding: utf-8 -*-
import threading
import time
class Test(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self._run_num = num
def run(self):
global count, mutex
threadname = threading.currentThread().getName()
for x in range(int(self._run_num)):
mutex.acquire()
count = count + 1
mutex.release()
print (threadname, x, count)
time.sleep(1)
if __name__ == '__main__':
global count, mutex
threads = []
num = 5
count = 0
# 创建锁
mutex = threading.Lock()
# 创建线程对象
for x in range(num):
threads.append(Test(10))
# 启动线程
for t in threads:
t.start()
# 等待子线程结束
for t in threads:
t.join()
import threading
# 共享变量
count = 0
# 创建锁
lock = threading.Lock()
def add():
global count
for _ in range(100000):
lock.acquire() # 加锁
count += 1
lock.release() # 释放锁
# 两个线程同时修改共享变量
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("最终结果:", count) # 正确结果 200000
用 with 加锁
import threading
# 共享变量
count = 0
# 创建锁
lock = threading.Lock()
def add():
global count
for _ in range(100000):
with lock:
count += 1
# 两个线程同时修改共享变量
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print("最终结果:", count) # 正确结果 200000
ref: http://www.ibm.com/developerworks/aix/library/au-threadingpython/
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"]
queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and prints first 1024 bytes of page
url = urllib2.urlopen(host)
print url.read(1024)
#signals to queue job is done
self.queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
#wait on the queue until everything has been processed
queue.join()
main()
print "Elapsed Time: %s" % (time.time() - start)
import threading
import queue
# 创建线程通信队列
q = queue.Queue()
# 生产者线程:往队列放数据
def producer():
for i in range(5):
q.put(f"数据{i}")
print(f"生产:数据{i}")
# 消费者线程:从队列取数据
def consumer():
while True:
data = q.get() # 阻塞等待数据
print(f"消费:{data}")
q.task_done() # 标记处理完成
# 创建线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, daemon=True) # 守护线程
# 启动
t1.start()
t2.start()
# 等待生产者结束 + 队列所有数据处理完
t1.join()
q.join()
print("所有通信完成")
import threading
import time
event = threading.Event()
def waiter():
print("等待信号...")
event.wait() # 阻塞等待
print("收到信号!")
def sender():
time.sleep(2)
print("发送信号!")
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=sender)
t1.start()
t2.start()
from datetime import datetime
import threading
import time
event = threading.Event()
def waiter():
while(True):
print("等待信号...")
event.wait() # 阻塞等待
print("收到信号!")
event.clear()
print("-"*20, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "-"*20)
def sender():
while(True):
time.sleep(2)
print("发送信号!")
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=sender)
t1.start()
t2.start()
from datetime import datetime
import threading
import time
event = threading.Event()
def waiter():
print("等待信号...")
while(event.wait()): # 阻塞等待
print("收到信号!")
event.clear()
print("-"*20, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "-"*20)
def sender():
while(True):
time.sleep(2)
print("发送信号!")
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=sender)
t1.start()
t2.start()
import threading
import time
cond = threading.Condition()
products = []
def producer():
with cond:
products.append("商品")
print("生产完成,通知消费者")
cond.notify() # 唤醒消费者
def consumer():
with cond:
cond.wait() # 等待生产
print("消费者拿到:", products[0])
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
import threading
import time
cond = threading.Condition()
count = 0
def producer():
while True:
with cond:
# while True:
time.sleep(2)
cond.notify()
def consumer():
global count
print("开始计数器")
while True:
with cond:
cond.wait()
count += 1
print(count)
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
import threading
import time
cond = threading.Condition()
data = None
def consumer():
global data
# 手动加锁(代替 with)
cond.acquire()
print("消费者:等待数据...")
cond.wait() # 自动释放锁 → 阻塞 → 唤醒后自动重新拿锁
print(f"消费者:拿到数据 -> {data}")
# 手动释放锁
cond.release()
def producer():
global data
cond.acquire()
print("生产者:正在生产数据...")
time.sleep(2)
data = "Hello Threading!"
print("生产者:数据生产完成,通知消费者")
cond.notify() # 唤醒,但不释放锁
cond.release() # 这里释放锁,消费者才能真正运行
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
time.sleep(0.1)
t2.start()
t1.join()
t2.join()
http://www.myelin.co.nz/post/2003/3/13/#200303135
#!/usr/bin/env python
import os, sys
print "I'm going to fork now - the child will write something to a pipe, and the parent will read it back"
r, w = os.pipe() # r,w是文件描述符, 不是文件对象
pid = os.fork()
if pid:
# 父进程
os.close(w) # 关闭一个文件描述符
r = os.fdopen(r) # 将r转化为文件对象
print "parent: reading"
txt = r.read()
os.waitpid(pid, 0) # 确保子进程被撤销
else:
# 子进程
os.close(r)
w = os.fdopen(w, 'w')
print "child: writing"
w.write("here's some text from the child")
w.close()
print "child: closing"
sys.exit(0)
print "parent: got it; text =", txt
import sys, os
if __name__ == "__main__":
# do the UNIX double-fork magic, see Stevens' "Advanced
# Programming in the UNIX Environment" for details (ISBN 0201563177)
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent, print eventual PID before
print "Daemon PID %d" % pid
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# start the daemon main loop
# Redirect standard file descriptors
sys.stdin = open('/dev/null', 'r')
sys.stdout = open('/dev/null', 'w')
sys.stderr = open('/dev/null', 'w')