复习
# 信号量from multiprocessing import Semaphore# 用锁的原理实现的,内置了一个计数器# 在同一个事件,只能有指定数量的进程执行某一段被控制住的代码# 事件# wait阻塞受到事件控制的同步组件# 状态 True Flase is_set# true--》false 用clear()# false --->true 用set()# wait方法 状态为true不阻塞 状态为false的时候阻塞# 队列# Queue# put 当队列满的时候阻塞等待队列有空位置# get 当队列空的时候阻塞等待队列有数据# full empty 不完全准确# JoinableQuere# task_done 与get连用# join 与put连用
管道
from multiprocessing import Pipe,Processdef func(conn1,conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except EOFError: conn1.close() breakif __name__ == '__main__': conn1,conn2 = Pipe() Process(target=func,args=(conn1,conn2)).start() conn1.close() for i in range(20): conn2.send('吃了吗') conn2.close()
from multiprocessing import Pipe,Processimport time,randomdef producer(con,pro,name,food): con.close() for i in range(4): time.sleep(random.randint(1,3)) f = '%s生产%s%s'%(name,food,i) print(f) pro.send(f) pro.close()def consumer(con,pro,name): pro.close() while True: try: food = con.recv() print('%s吃了%s'%(name,food)) time.sleep(random.randint(1,3)) except EOFError: con.close() breakif __name__ == '__main__': con,pro = Pipe() p = Process(target=producer,args = (con,pro,'egon','泔水')) p.start() c = Process(target=consumer,args = (con,pro,'alex')) c.start() con.close() pro.close()
进程之间的数据共享
from multiprocessing import Manager,Process,Lockdef main(dic,lock): lock.acquire() dic['count'] -= 1 lock.release()if __name__ == '__main__': m = Manager() l = Lock() dic = m.dict({ 'count':100}) p_list = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_list.append(p) for i in p_list: i.join() print('主进程:',dic)
进程池
# 为什么有进程池 # 效率 # 每开启进程,开启属于这个进程的内存空间 # 寄存器 堆栈 文件 # 进程过多,操作系统调度进程 # 进程池 # python中的先创建一个属于进程的池子 # 这个池子指定能存放多少个进程 # 先将这些进程创建好
from multiprocessing import Poolimport os,timedef func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s'%n,os.getpid())if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(func,args=(i,)) p.close() #结束进程池接受任务 p.join() #感知进程池中的任务执行结束
socket_server-进程池
#serverimport socketfrom multiprocessing import Pooldef func(conn): conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn.close()if __name__ == '__main__': p = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() p.apply_async(func,args=(conn,)) sk.close()#clientimport socketsk = socket.socket()sk.connect(('127.0.0.1',8080))ret = sk.recv(1024).decode('utf-8')print(ret)msg = input('>>>').encode('utf-8')sk.send(msg)sk.close()
进程池返回值
# p.map(funcname,iterable) 默认异步的执行任务,自带close和join # p.apply 同步调用 # p.apply_async 异步调用 和主进程完全异步 需要手动close和join
from multiprocessing import Pooldef func(i): return i*iif __name__ == '__main__': p = Pool(5) for i in range(10): res = p.apply(func,args=(i,)) #apply的结果就是func的返回值 print(res)
import timefrom multiprocessing import Pooldef func(i): time.sleep(0.5) return i*iif __name__ == '__main__': p = Pool(5) res_list = [] for i in range(10): res = p.apply_async(func,args=(i,)) # res_list.append(res) for res in res_list:print(res.get())
#mapimport timefrom multiprocessing import Pooldef func(i): time.sleep(0.5) return i*iif __name__ == '__main__': p = Pool(5) ret = p.map(func,range(10)) print(ret)[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
进程池的回调函数
from multiprocessing import Pooldef func1(n): print('in func1') return n*ndef func2(nn): print('in func2') print(nn)if __name__ == '__main__': p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join()
in func1
in func2100
from multiprocessing import Poolimport osdef func1(n): print('in func1',os.getpid()) return n*ndef func2(nn): #参数只能是func1的返回值 print('in func2',os.getpid()) print(nn)if __name__ == '__main__': print('主进程: ',os.getpid()) p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join()主进程: 11172in func1 11760in func2 11172100