python进阶09并发之五生产者消费者

这也是实际项目中使用较多的一种并发模式,用Queue(JoinableQueue)实现,是Python中最常用的方式(这里的queue特指multiprocess包下的queue,非queue.Queue)。

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,Queue
import time,random

def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一个空,生产者已经完事了')
break

print('\033[31m{}消费了{}\033[0m'.format(name,food))
time.sleep(random.random())

def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生产了{}{}'.format(name,food,i)
print(f)
q.put(f)



if __name__ == '__main__':
q = Queue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','馒头',q))
p1.start()
p2.start()

c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.start()
c2.start()

# 让主程序可以等待子进程的结束.
p1.join()
p2.join()
# 生产者的进程结束,这里需要放置两个空值,供消费者获取,用来判断已经没有存货了
q.put(None)
q.put(None)

print('主程序结束..........')

JoinableQueue

创建可连接的共享进程队列,它们也是队列,但是这些队列比较特殊.它们可以允许消费者通知生产者项目已经被成功处理.注意,这里必须是生产者生产完了,生产者的进程被挂起,等到消费者完全消费的时候,生产者进程就结束,然后主程序结束.将消费者进程设置为守护进程,这样的话,主进程结束的时候,消费进程也就结束了.
JoinableQueue()比普通的Queue()多了两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,JoinableQueue
import time,random

def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一个空,生产者已经完事了')
break

print('\033[31m{}消费了{}\033[0m'.format(name,food))
time.sleep(random.random())
q.task_done() # 向生产者发送信号,表示消费了一个

def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生产了{}{}'.format(name,food,i)
print(f)
q.put(f)
q.join() # 当生产者生产完毕之后,会在这里阻塞.等待消费者的消费.



if __name__ == '__main__':
q = JoinableQueue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','馒头',q))
p1.start()
p2.start()

c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.daemon = True # 将消费者设置为守护进程
c2.daemon = True # 将消费者设置为守护进程
c1.start()
c2.start()

# 让主程序可以等待生产子进程的结束.
p1.join()
p2.join()


print('主程序结束..........')

个人不习惯使用JoinableQueue,为什么呢?因为他是通过生产者来“得知”,整个生产消费流程的终结.
在消费者调用q.task_done()时,会触发一次q.join()的检查(q.join()是用来阻塞进程的,最后一个任务完成后,q.task_done()=》q.join()=》阻塞解除),之后生产者进程退出。而消费者呢?业务逻辑层面上是没有退出的(本例)。比如,本例中通过设置为守护进程的方式进行退出。但如果后续主进程还有其他任务,而没有退出呢?那么这些子进程则沦为僵尸进程,虽然对系统资源消耗很少(消费者的queue.get()也是阻塞的,所以不会执行循环,仅仅会“卡”在那里,但也不会自动消亡),但感觉非常别扭的。所以个人还是倾向于用”生产者queue.put(None) ,消费者见到None则break(退出循环)”的传统方式 进行消费者进程触发退出。如果采用这种方式那么JoinableQueue相比Queue就没有优势了。

一点思考

关于生产者和消费者,曾经思考过这么一种实现方式。
假如有一种队列,内置了状态信息(存活生产者个数),设置目前存活的生产者个数
StatusableQueue(product_n=2,size=20) #product_n=2含义:存活的生产者个数,size=20,队列长度
生产者:生产结束,q.product_n - 1(存活生产者个数-1)
消费者:存活生产者个数=0(生产者均已经完成生成) 且 队列长度=0(队列也已经消费结束) 则退出消费者进程.
这种情况下,只需要 消费者.join() 就可以保证整个生产消费进程的执行结束(这一点和JoinableQueue很像,不过JoinableQueue是 生产者.join())
一共只改动3处,就可以完成生产者消费者的并行化控制。 而且更符合逻辑,因为生产者是明确知道自己的退出条件的,而消费者依赖生产者,所以只需要观察消费者就可以知道(生成者是否结束)整个-生成消费链是否完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

def consumer(q,name):
while not (q.product_n==0 and q.size==0):# 存活生产者=0,意味着全部结束生产,队列不会新增数据,queue.size=0说明消费完毕
food = q.get()
print('\033[31m{}消费了{}\033[0m'.format(name,food))
time.sleep(random.random())

def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生产了{}{}'.format(name,food,i)
print(f)
q.put(f)
q.product_n -= 1 # 当生产者生产完毕之后,q.product_n - 1(存活生产者个数-1)



if __name__ == '__main__':
q = StatusableQueue(product_n=2,size=20)#默认状态=正常,n=2含义:生产者个数,size=20,对列长度
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','馒头',q))
p1.start()
p2.start()

c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.start()
c2.start()

# 消费者消费结束(说明生产也一定结束了),则说明整个生产-消费逻辑完成
c1.join()
c2.join()

文中加注释地方为修改点,这样代码最简单,调用方面,含义清晰。
缺点:必须知道生产者个数,这个数据应该不难获取,毕竟后面在创建生产者时也需要使用这个变量控制。

参考

Python的进程间通信

python进阶系列
python进阶01偏函数
python进阶02yield
python进阶03UnboundLocalError和NameError错误
python进阶04IO的同步异步,阻塞非阻塞
python进阶04IO的同步异步,阻塞非阻塞
python进阶05并发之一基本概念
python进阶05并发之一基本概念
python进阶06并发之二技术点关键词
python进阶07并发之三其他问题
python进阶08并发之四map, apply, map_async, apply_async差异
python进阶09并发之五生产者消费者
python进阶10并发之六并行化改造
python进阶11并发之七多种并发方式的效率测试
python进阶12并发之八多线程与数据同步
python进阶13并发之九多进程和数据共享
python进阶14变量作用域LEGB
python进阶15多继承与Mixin
python进阶16炫技巧
python进阶17正则表达式
python进阶18垃圾回收GC
python进阶19装饰器和闭包
python进阶20之actor
python进阶21再识单例模式

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×