並列処理プログラミングのメモ#1(Fork/Join)
| 登録日 | :2025/05/01 08:22 |
|---|---|
| カテゴリ | :Python基礎 |
ForkとJoinについて
並行処理アプリケーションを作成するためのプログラミングパターンとしてよく知られるFork/Joinパターン。
アプリケーションには、逐次処理部分と並行処理部分が含まれることが多い。
逐次処理部分・・・独立しておらず、特定の順序で実行しなければならない部分
並行処理部分・・・順不同で、場合によっては並列に処理できる部分
大量のデータを処理する場合、各データは独立していて、それぞれ同じ処理を並列に処理することが可能であると仮定する。(例えば、選挙の投票用紙を一斉に開票してカウントする作業をイメージしてもらう)
処理の順序としては以下のように考える
・Forkでまず並列処理をするグループ(チャンク)に分割する:Poolにする
・Forkで分割したデータグループ(Pool)ごとに並行してそれぞれ独立に処理を実行する。
・最後にJoin処理で、各チャンクごとに並行処理した結果を、集計してまとめる。
ForkとJoinについて理解するためのサンプルプログラム
"""
chapter7 count_votes_concurrent.py
"""
from typing import Mapping, List
import random
import time
from multiprocessing.pool import ThreadPool
Summary = Mapping[int, int]
def process_pile(_pile: List[int]) -> Summary:
summary = {}
for vote in _pile:
if vote in summary:
summary[vote] += 1
else:
summary[vote] = 1
return summary
def process_votes_concurrent(_pile: List[int], _worker_count: int = 4) -> Summary:
vote_count = len(pile)
# Step1: Fork
vpw = vote_count // _worker_count
vote_piles = [
pile[i * vpw:(i+1) * vpw] for i in range(_worker_count)]
# check chunk-----------
# print(len(vote_piles))
# for item in vote_piles:
# print(len(item))
with ThreadPool(_worker_count) as pool:
worker_summaries = pool.map(process_pile, vote_piles)
# Step2: Join
total_summary = {}
for worker_summary in worker_summaries:
# print(f"Votes from staff member: {worker_summary}")
for candidate, count in worker_summary.items():
if candidate in total_summary:
total_summary[candidate] += count
else:
total_summary[candidate] = count
return total_summary
if __name__ == '__main__':
num_candidates = 3
num_voters = 1000000
pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
print("start sequential")
start_time = time.perf_counter()
counts = process_pile(pile)
process_time = time.perf_counter() - start_time
print(f"Total number of votes:{counts}")
print(f"PROCESS TIME: {process_time}")
print("start concurrent")
start_time = time.perf_counter()
counts = process_votes_concurrent(pile, 100)
process_time = time.perf_counter() - start_time
print(f"Total number of votes:{counts}")
print(f"PROCESS TIME: {process_time}")