首頁 > 軟體

python多程序中的生產者和消費者模型詳解

2023-03-27 06:01:33

Python生產者消費者模型

一、消費模式

生產者消費者模式 是Controlnet網路中特有的一種傳輸資料的模式。用於兩個CPU之間傳輸資料,即使是不同型別同一廠家的CPU也可以通過設定來使用。

二、傳輸原理

  • 類似與對等傳送,又略有不同,一個生產者可以對應N個消費者,但是一個消費者只能對應一個生產者;
  • 每個生產者消費者對應一個地址,佔一個網路節點,屬於預定性資料,在網路中優先順序最高;
  • 此模式如果在網路中設定過多會影響網路傳輸速度,一般用在傳輸比較重要的資訊上,比如裝置的啟動、停止、故障、急停等等;
  • 在Controlnet網路中節點數是有限制的,最高節點數為99。
  • 如果兩個控制器之前建立了多個生產者消費者的連線,只要一個失敗,則所有的均失敗,將資料整合到使用者自定義結構或陣列中 ,兩個控制器中只保留一個連線。
  • 生產者消費者資訊可以通過乙太網和Controlnet傳輸,但是同時只能通過一種途徑傳輸;
  • 建立標籤時必須建立在全域性變數裡面,不能建立在區域性變數裡標籤的大小不能超過500B;
  • 如果生產者幾個資料傳輸到到同一個控制器的的幾個消費者中,將幾個資料合併在一個使用者自定義標籤中,可以減少連線數,但合併後的資料將會會用相同的RPI。
  • 生產者消費者標籤只能用DINT和REAL,或它們的陣列,或使用者自定義結構資料,因為對外運算元據必須是32位元的,如果有SINT和INT的資料要傳輸,必須將它們組合在使用者自定義結構中傳送,生產者和消費者的標籤資料格式必須一致,才能確保資料的準確性,如果資料打包後超過了 32位元,那麼生產者和消費者雙方必須使用一個複製緩衝指令,以獲得資料的同步,例如Control Logix中的CPS指令。
  • 如果生產者要傳送的32位元資料,與非Control Logix的對方裝置的資料結構不匹配,例如對方是16位元的資料,為避免偏差,改為使用者自定義結構。
  • 消費者的 RPI必須大於等於網路重新整理時間NUT,如果幾個消費者請求同一個生產者,則會以最小最快的RPI為準。

三、實現方式

方法一:

import threading,queue,time
# 建立一個佇列,佇列最大長度為2
q = queue.Queue(maxsize=2)
def product():
    while True:
        # 生產者往佇列塞資料
        q.put('money')
        print('生產了money, 生產時間:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
def consume():
    while True:
        time.sleep(0.5)
        # 消費者取出資料
        data = q.get()
        print('消費了%s, 消費時間%s' % (data, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
t = threading.Thread(target=product)
t1 = threading.Thread(target=consume)
t.start()
t1.start()

缺點:

    實現了多少個消費者consumer程序,就需要在最後往佇列中新增多少個None標識,方便生產完畢結束消費者consumer程序。否則,p.get() 不到任務會阻塞子程序,因為while迴圈,直到佇列q中有新的任務加進來,才會再次執行。而我們的生產者只能生產這麼多東西,所以相當於程式卡死。

方法二:

from multiprocessing import JoinableQueue,Process
import time
def producer(q):
    for i in range(4):
        time.sleep(0.5)
        f = '生產者:已經生產'
        q.put(f)
        print(f)
    q.join()  # 一直阻塞,等待消耗完所有的資料後才釋放
def consumer(name, q):
    while True:
        food = q.get()
        print('33[消費者:消費了%s33' % name)
        time.sleep(0.5)
        q.task_done()  # 每次消耗減1
if __name__ == '__main__':
    q = JoinableQueue()  # 建立佇列
    # 模擬生產者佇列
    p1 = Process(target=producer, args=(q, ))
    p1.start()
    # 模擬消費者佇列
    c1 = Process(target=consumer, args=('money', q))
    c1.daemon = True  # 守護行程:主程序結束,子程序也會結束
    c1.start()
    p1.join()  # 阻塞主程序,等到p1子程序結束才往下執行

優點:參考地址

  • 使用JoinableQueue元件,是因為JoinableQueue中有兩個方法:task_done()和join() 。首先說join()和Process中的join()的效果類似,都是阻塞當前程序,防止當前程序結束。但是JoinableQueue的join()是和task_down()配合使用的。
  • Process中的join()是等到子程序中的程式碼執行完畢,就會執行主程序join()下面的程式碼。而JoinableQueue中的join()是等到佇列中的任務數量為0的時候才會執行q.join()下面的程式碼,否則會一直阻塞。
  • task_down()方法是每獲取一次佇列中的任務,就需要執行一次。直到佇列中的任務數為0的時候,就會執行JoinableQueue的join()後面的方法了。所以生產者生產完所有的資料後,會一直阻塞著。不讓p1和p2程序結束。等到消費者get()一次資料,就會執行一次task_down()方法,從而佇列中的任務數量減1,當數量為0後,執行JoinableQueue的join()後面程式碼,從而p1和p2程序結束。
  • 因為p1和p2新增了join()方法,所以當子程序中的consumer方法執行完後,才會往下執行。從而主程序結束。因為這裡把消費者程序c1和c2 設定成了守護行程,主程序結束的同時,c1和c2 程序也會隨之結束,程序都結束了。所以消費者consumer方法也會結束。

到此這篇關於python多程序中的生產者和消費者模型詳解的文章就介紹到這了,更多相關python生產者和消費者模型內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com