KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

並列処理プログラミングのメモ#7 (イベント:Reactorパターン)

登録日 :2025/05/11 17:49
カテゴリ :Python基礎

I/Oバウンドのアプリケーションでイベントベースの並行処理を実装するための最も一般的なパターンである。
シングルスレッドのイベントループとノンブロッキングイベントを使い、イベントを処理するために適切なコールバックを呼び出す。
(コールバックを呼び出すタイミングは、OSのselectを用いることで、OSの機能を使う)

"""
Chapter 11 /pizza_reactor.py
イベント駆動型のピザサーバ
I/Oの多重化
"""
import typing as T
import select
from socket import socket, create_server


Data = bytes
Action = T.Union[T.Callable[[socket], None], T.Tuple[T.Callable[[socket], None], str]]
Mask = int


class EventLoop:
    def __init__(self) -> None:
        # 読み書きができるI/Oの準備ができてきるソケットの追跡
        self.writers = {}
        self.readers = {}

    def register_event(self, source: socket, event: Mask, action: Action) -> None:
        # ソケットに関連づけられた一意な識別子を取得
        key = source.fileno()
        if event & select.POLLIN:
            # ソケットからデータを読み込めることを示す
            self.readers[key] = (source, event, action)
        elif event & select.POLLOUT:
            # ソケットにデータを書き込めることを示す
            self.writers[key] = (source, event, action)

    def unregister_event(self, source: socket) -> None:
        """
        クライアントが接続を終了したら、リーダ・ライターからソケットを削除
        """
        key = source.fileno()
        if self.readers.get(key):
            del self.readers[key]
        if self.writers.get(key):
            del self.writers[key]

    def run_forever(self) -> None:
        while True:
            readers, writers, _ = select.select(self.readers, self.writers, [])
            # 読み取り可能な状態のソケットごとに対応するアクションを実行した後、リーダからソケットを削除
            for reader in readers:
                source, event, action = self.readers.pop(reader)
                action(source)
            # 書き込み可能な状態のソケットごとに対応するアクションを実行し後、ライターからソケットを削除
            for writer in writers:
                source, event, action = self.writers.pop(writer)
                action, msg = action
                action(source, msg)


BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Server:
    def __init__(self, event_loop: EventLoop) -> None:
        self.event_loop = event_loop
        try:
            print(f"Starting up at : {ADDRESS}")
            self.server_socket = create_server(ADDRESS)
            self.server_socket.setblocking(False)
        except OSError:
            self.server_socket.close()
            print("¥n Server stopped ")

    def _on_accept(self, _: socket) -> None:
        try:
            conn, client_address = self.server_socket.accept()
        except BlockingIOError:
            return
        conn.setblocking(False)
        print(f"Connected to {client_address}")
        self.event_loop.register_event(conn, select.POLLIN, self._on_read)
        self.event_loop.register_event(self.server_socket, select.POLLIN, self._on_accept)

    def _on_read(self, conn: socket) -> None:
        try:
            data = conn.recv(BUFFER_SIZE)
        except BlockingIOError:
            return
        if not data:
            self.event_loop.unregister_event(conn)
            print(f"Connection with {conn.getpeername()} has been closed")
            conn.close()
            return
        message = data.decode().strip()
        self.event_loop.register_event(conn, select.POLLOUT, (self._on_write, message))

    def _on_write(self, conn: socket, message: bytes) -> None:
        try:
            order = int(message)
            response = f"Thank you for ordering {order} pizzas!¥n"
        except ValueError:
            response = "Wrong number of pizzas, please try again¥n"
        print(f"Sending message to {conn.getpeername()}")
        try:
            conn.send(response.encode())
        except BlockingIOError:
            return
        self.event_loop.register_event(conn, select.POLLIN, self._on_read)

    def start(self) -> None:
        print("Server listening for incoming connections")
        self.event_loop.register_event(self.server_socket, select.POLLIN, self._on_accept)


if __name__ == "__main__":
    event_loop = EventLoop()
    Server(event_loop=event_loop).start()
    event_loop.run_forever()

このプログラムは、イベント駆動型の非同期ピザ注文サーバーを実装したPythonコードです。I/O多重化(selectモジュール)を使用して、シングルスレッドで複数クライアントの接続を効率的に処理します。

主要コンポーネントの構造
1. EventLoopクラス(イベントループの核)

class EventLoop:
    def __init__(self)
    def register_event(self, source, event, action)
    def unregister_event(self, source)
    def run_forever(self)

役割: ソケットの監視とイベントディスパッチ

readers: 読み込み可能なソケットを追跡
writers: 書き込み可能なソケットを追跡
select.select()でI/Oの準備状態を監視

  1. Serverクラス(サーバー機能)
class Server:
    def __init__(self, event_loop)
    def _on_accept(self, _)
    def _on_read(self, conn)
    def _on_write(self, conn, message)
    def start(self)

主要メソッド:

_on_accept: 新規接続の受付
_on_read: クライアントからのデータ受信
_on_write: クライアントへのレスポンス送信

動作フロー
サーバー起動

event_loop = EventLoop()
Server(event_loop=event_loop).start()
event_loop.run_forever()

接続受付処理

A[クライアント接続] --> B[selectが読み込み可能検知]
B --> C[_on_accept呼び出し]
C --> D[accept()で接続確立]
D --> E[新しいソケットをreadersに登録]

データ受信処理

A[データ到着] --> B[selectが読み込み可能検知]
B --> C[_on_read呼び出し]
C --> D[recv()でデータ受信]
D --> E[writersに登録]

レスポンス送信処理

A[送信可能状態] --> B[selectが書き込み可能検知]
B --> C[_on_write呼び出し]
C --> D[send()でレスポンス送信]
D --> E[readersに再登録]

主な特徴
非同期I/O処理

1スレッドで複数接続を同時処理
ブロッキング操作を完全に排除
イベント駆動アーキテクチャ
コールバック関数を使用したイベント処理
状態管理を明示的に行わない
効率的なリソース管理
ソケットの登録/解除を動的に管理
必要時のみI/O操作を実行

注意点と改善提案
既知の制約
selectモジュールのファイルディスクリプタ数制限(通常1024)

大文字の¥n使用(正しくは\n)

エラーハンドリングの不足(接続リセット等)

使用例

サーバー起動

$ python pizza_reactor.py
Starting up at : ('127.0.0.1', 12345)
Server listening for incoming connections

クライアント接続例(telnet使用)

$ telnet 127.0.0.1 12345
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
3
Thank you for ordering 3 pizzas!

$nc 127.0.0.1 12345
```

この設計パターンは、現代のWebサーバーやメッセージブローカーで広く使用されるReactorパターンの基本的な実装例です。実際のプロダクション環境では、asyncioやuvloopなどのより高度な実装を使用することが推奨されます。