KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

並列処理プログラミングのメモ#5 (Server通信)

登録日 :2025/05/10 10:14
カテゴリ :Python基礎

ピザ注文を受け付けるサーバのサンプルを通して、通信による処理の並列化を考える。

  • Socket通信によってクライアントからの応答をまつ処理(並列化なし)
    バッファサイズ(1024)、ポート番号12345を使ってクライアントからの応答をListenする
"""
Chapter 10 pizza_server.py
"""
from socket import socket, create_server


BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Server:
    def __init__(self) -> None:
        try:
            print(f"Starting up at: {ADDRESS}")
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            self.server_socket.close()
            print("¥nServer stopped.")

    def accept(self) -> socket:
        conn, client_address = self.server_socket.accept()
        print(f"Connected to {client_address}")
        return conn

    def serve(self, conn: socket) -> None:
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!¥n"
                except ValueError:
                    response = f"Wrong number of pizzas, please try agen¥n"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        finally:
            print(f"Connection with {conn.getpeername()} has been closed")
            conn.close()

    def start(self) -> None:
        print("Start listening for incoming connections")
        try:
            while True:
                conn = self.accept()
                self.serve(conn)
        finally:
            self.server_socket.close()
            print("¥nServer stopped")


if __name__ == '__main__':
    server = Server()
    server.start()

サンプルプログラムの欠点は、1クライアントがTCP通信を開放されるまで、別のクライアントはサーバに通信できず待たされること。多くのクライアントからのリクエストに答えることが難しい。(ピザ注文を失注してしまうだろう)

  • Threadによるサーバ処理の並列化
    改善策の一つとしては、リクエストごとにThreadを立てて並列化する方法がある
"""
Chapter 10 /threaded_pizza_server.py
"""
from socket import socket, create_server
from threading import Thread


BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Handler(Thread):
    def __init__(self, conn: socket):
        super(Handler, self).__init__()
        self.conn = conn

    def run(self) -> None:
        print(f"Connected to {self.conn.getpeername()}")
        try:
            while True:
                data = self.conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!¥n"
                except ValueError:
                    response = "Wrong number of pizzas, please try agen¥n"
                print(f"Sending message to {self.conn.getpeername()}")
                self.conn.send(response.encode())
        finally:
            print(f"Connection with {self.conn.getpeername()} has been closed")
            self.conn.close()


class Server:
    def __init__(self) -> None:
        try:
            print(f"Starting up at : {ADDRESS}")
            self.server_socket = create_server(ADDRESS)
        except OSError:
            self.server_socket.close()
            print("¥n Server stopped.")

    def start(self) -> None:
        print("Server listening for incoming connections")
        try:
            while True:
                conn, address = self.server_socket.accept()
                print(f"Client connection request from {address}")
                thread = Handler(conn)
                thread.start()
        finally:
            self.server_socket.close()
            print("¥n Server stopped.")


if __name__ == '__main__':
    server = Server()
    server.start()

これによって、複数のクライアントからリクエスト(注文)が同時にあっても、Server側で別のThreadを立てることで並行して対応することができる。
Threadを立てることによる注意点としては、共有リソースへのアクセスをする場合(例えば注文内容をデータベースに入出力するなど)は、適切に共有ロックをかけるなどの調整が必要となる。

また、Threadは起動するのにサーバのCPUへの負荷がかかることやメモリを消費するなど、少なくない負荷がサーバにかかるため、多数のリクエストが発生した時に、サーバがオーバフローするなどのリスクがあるため、Threadの起動数も注意しなければならない。

  • ノンブロッキングI/O

Threadによる並列化のデメリットである共有リソース管理や、多数のThreadが立ち上がることによるサーバリソースへの負荷への対策として、ノンブロッキングI/Oがある。
これは、通信やI/O処理に時間がかかる場合などに、CPUやリソースに空きができることを利用して別の処理を実行することで、効率的に処理をする考え方である。
(pythonのAsyncioもこれの考え方に近いと思われる)

Threadとの違いとして、ノンブロッキングI/Oの場合はThread一つであり、一つの処理の中でCPUの空きがある時に別の処理をするようにスケジュールする。
そのアプローチの一つとして、busy-waitingがある

"""
Chapter 10 pizza_server_busy_wait.py
"""
import typing as T
from socket import socket, create_server


BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)


class Server:
    clients: T.Set[socket] = set()

    def __init__(self) -> None:
        try:
            print(f"Starting up at: {ADDRESS}")
            self.server_socket = create_server(ADDRESS)
            # non blocking mode: busy wait
            self.server_socket.setblocking(False)
        except OSError:
            self.server_socket.close()
            print("Server Stopped.")

    def accept(self) -> None:
        try:
            conn, address = self.server_socket.accept()
            print(f"Connected to {address}")
            conn.setblocking(False)
            self.clients.add(conn)
        except BlockingIOError:
            """
            If there is no readable socket data, fetch non blocking error.
            This is for avoiding blocking, and continuing the server process that recieves the client message.
            """
            pass

    def serve(self, conn: socket) -> None:
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!"
                except ValueError:
                    response = "Wrong number of pizzas, please try again"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        except BlockingIOError:
            pass

    def start(self) -> None:
        print("Server listening for incoming connections")
        try:
            while True:
                self.accept()
                for conn in self.clients.copy():
                    self.serve(conn)
        finally:
            self.server_socket.close()
            print("Server stopped.")


if __name__ == '__main__':
    server = Server()
    server.start()

busy-waitingアプローチにも非効率な点はある。
すべてのソケットをポーリングするため、ソケットが増えてくると無駄なチェックが多くなる可能性があり、それによるCPU消費が高くなる可能性がある。