KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

並列処理プログラミングのメモ#3(Producer / Consumer)

登録日 :2025/05/02 18:26
カテゴリ :Python基礎

同期を設計する問題(プロデューサー/コンシューマー問題)

以下のような問題を考える。
- プロデューサーが1つ以上存在し、それぞれがアイテムを生成してバッファに保存する。
- 複数のコンシューマーが同じバッファからアイテムを取り出し、一度に一つずつ処理する。

プロデューサーはそれぞれ自分のペースでアイテムを生成してバッファに保存する。
一方でコンシューマも同じように自分のペースでアイテムを消費するが、空のバッファを読み込まないように調整しなければならない(バッファでの相反する操作を阻止する制御が必要)

つまり、
- バッファがいっぱいの状態でプロデューサーはデータを追加しないようにする。
- バッファが空の状態では、コンシューマはデータにアクセスしないようにする。

ミューテックスで共有アクセスを制限しつつ、セマフォを用いて並列処理を実現する。
サンプルコード

"""
Chapter 9 producer_consumer.py
"""

import time
from threading import Thread, Semaphore, Lock


SIZE = 5
BUFFER = ["" for i in range(SIZE)]
producer_idx: int = 0


mutex = Lock()
empty = Semaphore(SIZE)
full = Semaphore(0)


class Producer(Thread):
    def __init__(self, name: str, maximum_items: int = 5):
        super(Producer, self).__init__()
        self.counter = 0
        self.name = name
        self.maximum_items = maximum_items

    @staticmethod
    def next_index(index: int) -> int:
        return (index + 1) % SIZE

    def run(self) -> None:
        global producer_idx
        while self.counter < self.maximum_items:
            # buffer
            empty.acquire()
            # Lock
            mutex.acquire()
            self.counter += 1
            BUFFER[producer_idx] = f"{self.name} - {self.counter}"
            print(f"{self.name} produced:"
                  f"'{BUFFER[producer_idx]}' into slot {producer_idx}")
            producer_idx = self.next_index(producer_idx)
            mutex.release()
            full.release()
            time.sleep(1)


class Consumer(Thread):
    def __init__(self, name: str, maximum_items: int = 10):
        super(Consumer, self).__init__()
        self.name = name
        self.idx = 0
        self.counter = 0
        self.maximum_items = maximum_items

    def next_index(self) -> int:
        return (self.idx + 1) % SIZE

    def run(self) -> None:
        while self.counter < self.maximum_items:
            full.acquire()
            mutex.acquire()
            item = BUFFER[self.idx]
            print(f"{self.name} consumed item:"
                  f"'{item}' for slot {self.idx}")
            self.idx = self.next_index()
            self.counter += 1
            mutex.release()
            empty.release()
            time.sleep(2)


if __name__ == "__main__":
    threads = [
        Producer("SpongeBob"),
        Producer("Patric"),
        Consumer("Squidward"),
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()