並列処理プログラミングのメモ#5 (Server通信)
| 登録日 | :2025/05/10 10:14 |
|---|---|
| カテゴリ | :Python基礎 |
ピザ注文を受け付けるサーバのサンプルを通して、通信による処理の並列化を考える。
- Socket通信によってクライアントからの応答をまつ処理(並列化なし)
バッファサイズ(1024)、ポート番号12345を使ってクライアントからの応答をListenする
"""
Chapter 10 pizza_server.py
"""
from socket import socket, create_server
BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)
class Server:
def __init__(self) -> None:
try:
print(f"Starting up at: {ADDRESS}")
self.server_socket: socket = create_server(ADDRESS)
except OSError:
self.server_socket.close()
print("¥nServer stopped.")
def accept(self) -> socket:
conn, client_address = self.server_socket.accept()
print(f"Connected to {client_address}")
return conn
def serve(self, conn: socket) -> None:
try:
while True:
data = conn.recv(BUFFER_SIZE)
if not data:
break
try:
order = int(data.decode())
response = f"Thank you for ordering {order} pizzas!¥n"
except ValueError:
response = f"Wrong number of pizzas, please try agen¥n"
print(f"Sending message to {conn.getpeername()}")
conn.send(response.encode())
finally:
print(f"Connection with {conn.getpeername()} has been closed")
conn.close()
def start(self) -> None:
print("Start listening for incoming connections")
try:
while True:
conn = self.accept()
self.serve(conn)
finally:
self.server_socket.close()
print("¥nServer stopped")
if __name__ == '__main__':
server = Server()
server.start()
サンプルプログラムの欠点は、1クライアントがTCP通信を開放されるまで、別のクライアントはサーバに通信できず待たされること。多くのクライアントからのリクエストに答えることが難しい。(ピザ注文を失注してしまうだろう)
- Threadによるサーバ処理の並列化
改善策の一つとしては、リクエストごとにThreadを立てて並列化する方法がある
"""
Chapter 10 /threaded_pizza_server.py
"""
from socket import socket, create_server
from threading import Thread
BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)
class Handler(Thread):
def __init__(self, conn: socket):
super(Handler, self).__init__()
self.conn = conn
def run(self) -> None:
print(f"Connected to {self.conn.getpeername()}")
try:
while True:
data = self.conn.recv(BUFFER_SIZE)
if not data:
break
try:
order = int(data.decode())
response = f"Thank you for ordering {order} pizzas!¥n"
except ValueError:
response = "Wrong number of pizzas, please try agen¥n"
print(f"Sending message to {self.conn.getpeername()}")
self.conn.send(response.encode())
finally:
print(f"Connection with {self.conn.getpeername()} has been closed")
self.conn.close()
class Server:
def __init__(self) -> None:
try:
print(f"Starting up at : {ADDRESS}")
self.server_socket = create_server(ADDRESS)
except OSError:
self.server_socket.close()
print("¥n Server stopped.")
def start(self) -> None:
print("Server listening for incoming connections")
try:
while True:
conn, address = self.server_socket.accept()
print(f"Client connection request from {address}")
thread = Handler(conn)
thread.start()
finally:
self.server_socket.close()
print("¥n Server stopped.")
if __name__ == '__main__':
server = Server()
server.start()
これによって、複数のクライアントからリクエスト(注文)が同時にあっても、Server側で別のThreadを立てることで並行して対応することができる。
Threadを立てることによる注意点としては、共有リソースへのアクセスをする場合(例えば注文内容をデータベースに入出力するなど)は、適切に共有ロックをかけるなどの調整が必要となる。
また、Threadは起動するのにサーバのCPUへの負荷がかかることやメモリを消費するなど、少なくない負荷がサーバにかかるため、多数のリクエストが発生した時に、サーバがオーバフローするなどのリスクがあるため、Threadの起動数も注意しなければならない。
- ノンブロッキングI/O
Threadによる並列化のデメリットである共有リソース管理や、多数のThreadが立ち上がることによるサーバリソースへの負荷への対策として、ノンブロッキングI/Oがある。
これは、通信やI/O処理に時間がかかる場合などに、CPUやリソースに空きができることを利用して別の処理を実行することで、効率的に処理をする考え方である。
(pythonのAsyncioもこれの考え方に近いと思われる)
Threadとの違いとして、ノンブロッキングI/Oの場合はThread一つであり、一つの処理の中でCPUの空きがある時に別の処理をするようにスケジュールする。
そのアプローチの一つとして、busy-waitingがある
"""
Chapter 10 pizza_server_busy_wait.py
"""
import typing as T
from socket import socket, create_server
BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)
class Server:
clients: T.Set[socket] = set()
def __init__(self) -> None:
try:
print(f"Starting up at: {ADDRESS}")
self.server_socket = create_server(ADDRESS)
# non blocking mode: busy wait
self.server_socket.setblocking(False)
except OSError:
self.server_socket.close()
print("Server Stopped.")
def accept(self) -> None:
try:
conn, address = self.server_socket.accept()
print(f"Connected to {address}")
conn.setblocking(False)
self.clients.add(conn)
except BlockingIOError:
"""
If there is no readable socket data, fetch non blocking error.
This is for avoiding blocking, and continuing the server process that recieves the client message.
"""
pass
def serve(self, conn: socket) -> None:
try:
while True:
data = conn.recv(BUFFER_SIZE)
if not data:
break
try:
order = int(data.decode())
response = f"Thank you for ordering {order} pizzas!"
except ValueError:
response = "Wrong number of pizzas, please try again"
print(f"Sending message to {conn.getpeername()}")
conn.send(response.encode())
except BlockingIOError:
pass
def start(self) -> None:
print("Server listening for incoming connections")
try:
while True:
self.accept()
for conn in self.clients.copy():
self.serve(conn)
finally:
self.server_socket.close()
print("Server stopped.")
if __name__ == '__main__':
server = Server()
server.start()
busy-waitingアプローチにも非効率な点はある。
すべてのソケットをポーリングするため、ソケットが増えてくると無駄なチェックが多くなる可能性があり、それによるCPU消費が高くなる可能性がある。