一个以前写的生产者-消费者的线程代码的启示

 

一个以前写的生产者-消费者的线程代码的启示

今天研究了一个以前的写的生产者-消费者的线程代码,这个代码会一直hang下去. 代码如下

    # two entities here that try to share a common resource, a queue
    import threading
    from threading import Thread,Event
    from queue import Queue
    import time,random
    import logging
    import sys

    logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-10s) %(message)s',)
    _sentinel=object()
    #_sentinel = None

    class Producer(threading.Thread):
        def __init__(self,queue):
            Thread.__init__(self)
            self.queue=queue
            self.daemon=True

        def run(self):
            for i in range(5):
                item=random.randint(0,256)
                self.queue.put(item)
                logging.debug('Producer notify: item %d appended to queue by %s\n' %(item,self.name))
            self.queue.put(_sentinel)
            

    class Consumer(threading.Thread):
        def __init__(self,queue):
            Thread.__init__(self)
            self.queue=queue
            self.daemon=True
        
        def run(self):
            while True:
                item=self.queue.get()
                if item is _sentinel:
                    self.queue.put(_sentinel)
                    break
                else:
                    logging.debug('Consumer notify: %d poped from queue by %s' %(item,self.name))
                    self.queue.task_done()

    def main():
        shared_queue=Queue()
        consumers = []
        #start the producer
        p1=Producer(shared_queue)
        p1.start()
       
        #start 3 consumer
        for i in range(0, 3):
            t=Consumer(shared_queue)
            t.start()
            consumers.append(t)

         
        #join the queue
        shared_queue.join()
        
        #stop producer
        p1.join()
        
        #stop consumers
        for t in consumers:
            t.join()

    if __name__=='__main__':
        main()

    root@kali:/usr/local/src/py/network# ./thread_share_queue_hang.py
    2018-09-05 20:20:01,481 (Thread-1 ) Producer notify: item 93 appended to queue by Thread-1

    2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 144 appended to queue by Thread-1

    2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 93 poped from queue by Thread-2
    2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 45 appended to queue by Thread-1

    2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 144 poped from queue by Thread-2
    2018-09-05 20:20:01,482 (Thread-3 ) Consumer notify: 45 poped from queue by Thread-3
    2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 106 appended to queue by Thread-1

    2018-09-05 20:20:01,483 (Thread-4 ) Consumer notify: 106 poped from queue by Thread-4
    2018-09-05 20:20:01,483 (Thread-1 ) Producer notify: item 219 appended to queue by Thread-1

    2018-09-05 20:20:01,483 (Thread-2 ) Consumer notify: 219 poped from queue by Thread-2
    ^CTraceback (most recent call last):
      File "./thread_share_queue_hang.py", line 73, in <module>
        main()
      File "./thread_share_queue_hang.py", line 63, in main
        shared_queue.join()

这段代码会hang在63行,shared_queue.join(). 为什么呢?看官方文档的解释,join方法会阻塞直到queue里面所有的item都被处理完。producer 和3个consumer 都往queue里面放了一个None, 共有4个None, 而其中三个None被用来做sentinel来使得3个consumer线程能够结束。最后留下一个None在queue里面,而没有人来处理最后的这个None,所以会导致queue.join()会一直hang.

    Queue.join()

        Blocks until all items in the queue have been gotten and processed.

那么问题找到了,我就重新写了这个main()部分,但是我用了不同的思路:   第一种:   main thread来检查是否只剩下最后一个线程了,如果只剩下最后一个线程(即它自己)   就从queue里面取出最后一个元素处理它.用task_done()函数。  

    def main():
        shared_queue=Queue()
        consumers = []
        #start the producer
        p1=Producer(shared_queue)
        p1.start()
       
        #start 3 consumer
        for i in range(0, 3):
            t=Consumer(shared_queue)
            t.start()
            consumers.append(t)

         
        while True:
            #if I - the main thread also am the last thread
            #print(threading.active_count())
            if threading.active_count() == 1:
                #get the last elem from the queue
                if shared_queue.get() == _sentinel:
                    shared_queue.task_done()
                    break
            else:
                time.sleep(0.5)

第二种:
使用qsize() 方法来检查如果queue里面只剩下一个elem,那么我处理它就可以了。
因为这时候其他线程都已经结束了。但是这个qsize() 根据文档来说并不可靠。not reliable

    #create shared queue
        shared_queue=Queue()
        consumers = []

        #start the producer
        p1=Thread(target=producer, args=(shared_queue, ))
        p1.start()
       
        #start 3 consumer
        for i in range(0, 3):
            t=Thread(target=consumer, args=(shared_queue,))
            t.start()
            consumers.append(t)

         
        while True:
            #print(threading.active_count())
            #qsize() method of queue.Queue instance
            #Return the approximate size of the queue (not
            #use the qsize() method to justify whether there is only 1 None left in the shared queue
            if shared_queue.qsize() == 1:
                #get the last elem from the queue
                if shared_queue.get_nowait() is None:
                    shared_queue.task_done()
                    break
            else:
                time.sleep(0.5)

第三种:
干脆我不对queue做join操作,只要我的线程做join就可以了。

    shared_queue=Queue()
        consumers = []
        #start the producer
        p1=Producer(shared_queue)
        p1.start()
       
        #start 3 consumer
        for i in range(0, 3):
            t=Consumer(shared_queue)
            t.start()
            consumers.append(t)

        #block until all task in shared_queue are done
        #we can not do this, because we put 4 None in the queue, but only use 3 of them
        #as sentinel for consumer, thus use shared_queue.join() will block forever
        #shared_queue.join()
        
        #stop producer
        p1.join()
        
        #stop consumers
        for t in consumers:
            t.join()

这三种写法都达到了同样的效果,但是重新看了queue的文档,让我感觉学到了新知识。