並列処理プログラミングのメモ#3(Producer / Consumer)
| 登録日 | :2025/05/02 18:26 |
|---|---|
| カテゴリ | :Python基礎 |
同期を設計する問題(プロデューサー/コンシューマー問題)
以下のような問題を考える。
- プロデューサーが1つ以上存在し、それぞれがアイテムを生成してバッファに保存する。
- 複数のコンシューマーが同じバッファからアイテムを取り出し、一度に一つずつ処理する。
プロデューサーはそれぞれ自分のペースでアイテムを生成してバッファに保存する。
一方でコンシューマも同じように自分のペースでアイテムを消費するが、空のバッファを読み込まないように調整しなければならない(バッファでの相反する操作を阻止する制御が必要)
つまり、
- バッファがいっぱいの状態でプロデューサーはデータを追加しないようにする。
- バッファが空の状態では、コンシューマはデータにアクセスしないようにする。
ミューテックスで共有アクセスを制限しつつ、セマフォを用いて並列処理を実現する。
サンプルコード
"""
Chapter 9 producer_consumer.py
"""
import time
from threading import Thread, Semaphore, Lock
SIZE = 5
BUFFER = ["" for i in range(SIZE)]
producer_idx: int = 0
mutex = Lock()
empty = Semaphore(SIZE)
full = Semaphore(0)
class Producer(Thread):
def __init__(self, name: str, maximum_items: int = 5):
super(Producer, self).__init__()
self.counter = 0
self.name = name
self.maximum_items = maximum_items
@staticmethod
def next_index(index: int) -> int:
return (index + 1) % SIZE
def run(self) -> None:
global producer_idx
while self.counter < self.maximum_items:
# buffer
empty.acquire()
# Lock
mutex.acquire()
self.counter += 1
BUFFER[producer_idx] = f"{self.name} - {self.counter}"
print(f"{self.name} produced:"
f"'{BUFFER[producer_idx]}' into slot {producer_idx}")
producer_idx = self.next_index(producer_idx)
mutex.release()
full.release()
time.sleep(1)
class Consumer(Thread):
def __init__(self, name: str, maximum_items: int = 10):
super(Consumer, self).__init__()
self.name = name
self.idx = 0
self.counter = 0
self.maximum_items = maximum_items
def next_index(self) -> int:
return (self.idx + 1) % SIZE
def run(self) -> None:
while self.counter < self.maximum_items:
full.acquire()
mutex.acquire()
item = BUFFER[self.idx]
print(f"{self.name} consumed item:"
f"'{item}' for slot {self.idx}")
self.idx = self.next_index()
self.counter += 1
mutex.release()
empty.release()
time.sleep(2)
if __name__ == "__main__":
threads = [
Producer("SpongeBob"),
Producer("Patric"),
Consumer("Squidward"),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()