KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

並列処理プログラミングのメモ#1(Fork/Join)

登録日 :2025/05/01 08:22
カテゴリ :Python基礎

ForkとJoinについて
並行処理アプリケーションを作成するためのプログラミングパターンとしてよく知られるFork/Joinパターン。

アプリケーションには、逐次処理部分と並行処理部分が含まれることが多い。
逐次処理部分・・・独立しておらず、特定の順序で実行しなければならない部分
並行処理部分・・・順不同で、場合によっては並列に処理できる部分

大量のデータを処理する場合、各データは独立していて、それぞれ同じ処理を並列に処理することが可能であると仮定する。(例えば、選挙の投票用紙を一斉に開票してカウントする作業をイメージしてもらう)

処理の順序としては以下のように考える
・Forkでまず並列処理をするグループ(チャンク)に分割する:Poolにする
・Forkで分割したデータグループ(Pool)ごとに並行してそれぞれ独立に処理を実行する。
・最後にJoin処理で、各チャンクごとに並行処理した結果を、集計してまとめる。

ForkとJoinについて理解するためのサンプルプログラム

"""
chapter7 count_votes_concurrent.py
"""
from typing import Mapping, List
import random
import time
from multiprocessing.pool import ThreadPool


Summary = Mapping[int, int]


def process_pile(_pile: List[int]) -> Summary:
    summary = {}
    for vote in _pile:
        if vote in summary:
            summary[vote] += 1
        else:
            summary[vote] = 1
    return summary


def process_votes_concurrent(_pile: List[int], _worker_count: int = 4) -> Summary:
    vote_count = len(pile)
    # Step1: Fork
    vpw = vote_count // _worker_count
    vote_piles = [
        pile[i * vpw:(i+1) * vpw] for i in range(_worker_count)]
    # check chunk-----------
    # print(len(vote_piles))
    # for item in vote_piles:
    #     print(len(item))

    with ThreadPool(_worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)

    # Step2: Join
    total_summary = {}
    for worker_summary in worker_summaries:
        # print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            if candidate in total_summary:
                total_summary[candidate] += count
            else:
                total_summary[candidate] = count

    return total_summary


if __name__ == '__main__':
    num_candidates = 3
    num_voters = 1000000
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]

    print("start sequential")
    start_time = time.perf_counter()
    counts = process_pile(pile)
    process_time = time.perf_counter() - start_time
    print(f"Total number of votes:{counts}")
    print(f"PROCESS TIME: {process_time}")

    print("start concurrent")
    start_time = time.perf_counter()
    counts = process_votes_concurrent(pile, 100)
    process_time = time.perf_counter() - start_time
    print(f"Total number of votes:{counts}")
    print(f"PROCESS TIME: {process_time}")