KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

並列処理プログラミングのメモ#2(Semaphoreとミューテックス)駐車場の利用シミュレーション

登録日 :2025/05/02 18:12
カテゴリ :Python基礎

並列化プログラミングでは、同時に動いている複数のプロセス(スレッド)から通信によって共有リソースへのアクセスが発生する。この時、リソースに対する競合状態が発生し、再現性のないバグ(ゼンバグ:heisenbugと呼ばれるしい)が発生する危険性がある。

これらのスレッドを用いた並列プログラミングにおけるバグへの対策として、共有リソースに対する注意が必要となる。スレッドセーフなプログラミングをする方法の一つとして、「同期」がある。
同期を実現する方法(考え方)として、ミューテックスやセマフォがある。
(なぜ再現性がないかというと、並列処理のため、予期せぬ書き込みが常に同じ条件で発生するわけではないことがある。そのため、再現や不具合の分離が難しいのである)

  • ミューテックスとは
    相互排他(mutual exclusion)を略して、ミューテックス(mutex)と呼ぶ。
    並列処理にいて、同じリソースにアクセスすることになるが、適切に排他的制御(書き込みや読み込み中に、別のスレッドから書き換えられることを防ぐ、など)をする必要がある。それを実現する考え方の一つとして、ミューテックスがある。プログラムでは、リソースをつかんだ時にLockを取得し、Lockが開放されるまで、別の処理は当該リソースにアクセスできないようにする。

  • セマフォ(Semaphore)とは
    ミューテックスと考え方は近いが、セマフォは同時にリソースにアクセスできるプロセス(スレッド)数の制御も行うことができる。(ミューテックスは1つだけが占有する)。そのため、リソースのLockの取得・開放は、Semaphoreで許可された複数のプロセスから可能となる。
    ちなみに、セマフォで1つのプロセスに制限した状態がミューテックスと同等である(バイナリセマフォと呼ぶらしい)

以下、Semaphoreとミューテックスの考え方を用いた、駐車場の利用に関するしみゅれーしょん。サンプルでは、3つの駐車スペースに対して、10台の車が待ち行列としてあり、それをSemaphoreでリソース管理して駐車の入庫と退場をシミュレートする。

"""
semaphore.py
"""

import typing as T
import time
import random
from threading import Thread, Semaphore, Lock


TOTAL_SPOTS = 3


class Garage:
    def __init__(self) -> None:
        self.semaphore = Semaphore(TOTAL_SPOTS)
        self.cars_lock = Lock()
        self.parked_cars: T.List[str] = []

    def count_parked_cars(self) -> int:
        return len(self.parked_cars)

    def enter(self, car_name: str) -> None:
        self.semaphore.acquire()
        self.cars_lock.acquire()
        self.parked_cars.append(car_name)
        print(f"{car_name} parked")
        self.cars_lock.release()

    def exit(self, car_name: str) -> None:
        self.cars_lock.acquire()
        self.parked_cars.remove(car_name)
        print(f"{car_name} leaving")
        self.semaphore.release()
        self.cars_lock.release()


def park_car(garage: Garage, _car_name: str) -> None:
    garage.enter(_car_name)
    time.sleep(random.uniform(1, 2))
    garage.exit(_car_name)


def test_garage(garage: Garage, _number_of_cars: int = 10) -> None:
    threads = []
    for car_num in range(_number_of_cars):
        t = Thread(target=park_car, args=(garage, f"Car #{car_num}"))
        threads.append(t)
        t.start()

    for thread in threads:
        thread.join()


if __name__ == "__main__":
    number_of_cars = 10
    garage = Garage()
    test_garage(garage, number_of_cars)

    print("Number of parked cars after a busy day:")
    print(f"Actual: {garage.count_parked_cars()} ¥n Expected: 0")