一个以前写的生产者-消费者的线程代码的启示
今天研究了一个以前写的生产者-消费者线程代码,这段代码会一直 hang 住。代码如下:
# two entities here that try to share a common resource, a queue
import threading
from threading import Thread,Event
from queue import Queue
import time,random
import logging
import sys
# DEBUG-level logging; every record shows the time and the name of the
# thread that emitted it.
logging.basicConfig(
    format='%(asctime)s (%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)

# End-of-work marker passed through the queue. It is a fresh object()
# compared by identity (`is`), so it cannot be mistaken for a real work item.
# The commented-out line is the older variant that used None instead.
_sentinel = object()
#_sentinel = None
class Producer(threading.Thread):
    """Daemon thread that fills a shared queue with random work items.

    Puts ``count`` random integers in the range [0, 256] (5 by default) onto
    ``queue`` and then a single ``_sentinel`` to signal that no more work is
    coming.
    """

    def __init__(self, queue, count=5):
        # daemon=True: the interpreter does not wait for this thread on exit.
        super().__init__(daemon=True)
        self.queue = queue
        self.count = count

    def run(self):
        for _ in range(self.count):
            item = random.randint(0, 256)  # randint is inclusive on both ends
            self.queue.put(item)
            # Hand the arguments to logging instead of pre-formatting with %,
            # so formatting only happens if the record is actually emitted.
            logging.debug('Producer notify: item %d appended to queue by %s\n', item, self.name)
        # Only one sentinel is sent; the consumers re-queue it for each other.
        self.queue.put(_sentinel)
class Consumer(threading.Thread):
    """Daemon thread that takes items off a shared queue and logs them.

    Stops when it receives ``_sentinel``. Before stopping it puts the
    sentinel back, so the other consumers sharing the queue stop too.
    """

    def __init__(self, queue):
        super().__init__(daemon=True)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            try:
                if item is _sentinel:
                    # Re-queue the sentinel *before* the task_done() below, so
                    # the unfinished-task count never drops to zero while a
                    # sentinel is still pending for the other consumers.
                    self.queue.put(_sentinel)
                    break
                logging.debug('Consumer notify: %d poped from queue by %s', item, self.name)
            finally:
                # Every get() must be matched by exactly one task_done(),
                # including the get() that fetched the sentinel. Otherwise
                # queue.join() can never finish. The last re-queued sentinel
                # is still left in the queue, so a queue.join() caller only
                # returns once something else gets it and calls task_done().
                self.queue.task_done()
def main():
    """Start one Producer and three Consumers on a shared queue, then wait.

    Waits on the queue itself (queue.join()) first, then on every thread.
    In practice the queue.join() call never returns; see the comment there.
    """
    shared_queue = Queue()

    # One producer...
    producer = Producer(shared_queue)
    producer.start()

    # ...and three consumers, each started as soon as it is created.
    consumers = []
    for _ in range(3):
        consumer = Consumer(shared_queue)
        consumer.start()
        consumers.append(consumer)

    # queue.join() blocks until every put() has been matched by a task_done().
    # Each consumer puts the sentinel back before it exits, so one sentinel is
    # always left in the queue and nobody gets it and calls task_done() for it.
    # Once all consumers have exited, the unfinished-task count is therefore
    # still above zero and, in practice, this call hangs.
    shared_queue.join()

    # Thread joins: only reached if the queue.join() above returns.
    producer.join()
    for consumer in consumers:
        consumer.join()


if __name__ == '__main__':
    main()
root@kali:/usr/local/src/py/network# ./thread_share_queue_hang.py
2018-09-05 20:20:01,481 (Thread-1 ) Producer notify: item 93 appended to queue by Thread-1
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 144 appended to queue by Thread-1
2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 93 poped from queue by Thread-2
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 45 appended to queue by Thread-1
2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 144 poped from queue by Thread-2
2018-09-05 20:20:01,482 (Thread-3 ) Consumer notify: 45 poped from queue by Thread-3
2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 106 appended to queue by Thread-1
2018-09-05 20:20:01,483 (Thread-4 ) Consumer notify: 106 poped from queue by Thread-4
2018-09-05 20:20:01,483 (Thread-1 ) Producer notify: item 219 appended to queue by Thread-1
2018-09-05 20:20:01,483 (Thread-2 ) Consumer notify: 219 poped from queue by Thread-2
^CTraceback (most recent call last):
File "./thread_share_queue_hang.py", line 73, in <module>
main()
File "./thread_share_queue_hang.py", line 63, in main
shared_queue.join()
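这段代码会 hang 在第 63 行的 shared_queue.join()。为什么呢?看官方文档的解释,join() 会一直阻塞,直到 queue 里所有的 item 都被取出并处理完,也就是每个 put() 进去的 item 都有一次对应的 task_done() 调用。producer 和 3 个 consumer 各往 queue 里放了一个 sentinel,共 4 个。代码里的 sentinel 是 _sentinel;注释掉的版本里是 None。其中 3 个被 consumer 取出,用来结束各自的线程。最后留下的那个 sentinel 一直待在 queue 里,没有人把它取出来并调用 task_done(),所以未完成任务的计数永远不会归零,queue.join() 就会一直 hang。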
Queue.join()
Blocks until all items in the queue have been gotten and processed.
问题找到了,于是我重写了 main() 部分,并尝试了几种不同的思路。第一种:main thread 检查是否只剩下最后一个线程。如果只剩下它自己,就从 queue 里取出最后一个元素(即 sentinel),并调用 task_done() 标记它已处理完。
def main():
    """Variant 1: let the main thread drain the last sentinel itself.

    Starts one Producer and three Consumers, then polls every 0.5 s until the
    main thread is the only live thread. At that point the workers are done,
    and the only thing left in the queue is the sentinel the last consumer
    put back. The main thread takes it and marks it done with task_done().
    """
    shared_queue = Queue()
    consumers = []
    # Start the producer.
    p1 = Producer(shared_queue)
    p1.start()
    # Start 3 consumers.
    for i in range(3):
        t = Consumer(shared_queue)
        t.start()
        consumers.append(t)
    while True:
        # active_count() counts every live Thread object, so this assumes no
        # threads besides ours (and the main thread) are running.
        if threading.active_count() == 1:
            item = shared_queue.get()
            # Pair the get() with a task_done() whatever was fetched.
            shared_queue.task_done()
            # Compare by identity, as Consumer does: the sentinel is a unique object.
            if item is _sentinel:
                break
        else:
            time.sleep(0.5)
第二种:
使用 qsize() 方法检查 queue 里是否只剩下一个元素。如果是,就由 main thread 取出并处理它,因为这时其他线程通常都已经结束了。
但是根据文档,qsize() 返回的只是一个近似值(not reliable)。qsize() == 1 既不保证随后的 get 不会阻塞或取不到东西,也不保证剩下的那个元素就是 sentinel,更不保证其他线程已经结束。
# Variant 2: poll qsize() and drain the final sentinel from the main thread.
# NOTE(review): `producer` / `consumer` are worker functions not shown here;
# this variant assumes they use None as the sentinel and that consumers
# re-queue it — confirm.
from queue import Empty

# Create the shared queue.
shared_queue = Queue()
consumers = []
# Start the producer.
p1 = Thread(target=producer, args=(shared_queue,))
p1.start()
# Start 3 consumers.
for i in range(3):
    t = Thread(target=consumer, args=(shared_queue,))
    t.start()
    consumers.append(t)
while True:
    # qsize() returns only the approximate size of the queue (not reliable!).
    # qsize() == 1 is merely a hint that only the final sentinel is left:
    # consumers may still be running, so the element may be real work, a
    # sentinel a consumer still needs, or already gone by the time we ask.
    if shared_queue.qsize() == 1:
        try:
            item = shared_queue.get_nowait()
        except Empty:
            pass  # a consumer took the element between qsize() and get_nowait()
        else:
            if item is None and not any(c.is_alive() for c in consumers):
                # Every consumer has exited: this is the leftover sentinel.
                shared_queue.task_done()
                break
            # Real work, or a sentinel some consumer still needs: hand it
            # back instead of dropping it. put() before task_done() keeps the
            # unfinished-task count from touching zero in between.
            shared_queue.put(item)
            shared_queue.task_done()
    time.sleep(0.5)
第三种:
干脆不对 queue 调用 join(),只对我的各个线程调用 join() 就可以了。
# Variant 3: skip queue.join() entirely and only wait for the threads.
shared_queue = Queue()
consumers = []

p1 = Producer(shared_queue)  # the single producer
p1.start()

for _ in range(3):  # three consumers
    t = Consumer(shared_queue)
    t.start()
    consumers.append(t)

# Deliberately NOT calling shared_queue.join(). Four sentinels get put into
# the queue: 1 by the producer and 1 re-queued by each of the 3 consumers.
# Only 3 are ever taken, so the last one is never matched by a task_done()
# and queue.join() would, in practice, block forever.
# shared_queue.join()

# Thread.join() waits for each thread to terminate, which is all we need here.
p1.join()
for t in consumers:
    t.join()
这三种写法都达到了同样的效果(程序不再 hang)。重新读了 queue 的文档,也让我学到了新知识:每次 get() 取出一个元素后,都应该有一次对应的 task_done(),否则 queue.join() 等待的未完成任务计数就永远不会归零。