KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

Threadによる並列処理をクラスにまとめた

登録日 :2023/11/19 07:46
カテゴリ :Python基礎

Pythonのスレッド（threading）で並列処理するためのロジックをクラスにまとめてみました。

import logging
import os
import queue
import threading
import time

logging.basicConfig(
    level=logging.INFO, format='%(threadName)s: %(message)s')


def create_log_file(filename, num):
    """Write a sample log of `num` lines ("0 Hello", "1 Hello", ...) to `filename`.

    Any existing file at `filename` is overwritten. Every line, including the
    last one, ends with a newline character.
    """
    lines = (f'{n} Hello\n' for n in range(num))
    with open(filename, 'w') as out:
        out.writelines(lines)


def read_log_to_queue(filename) -> queue.Queue:
    """Read a text file and return a queue holding one entry per line.

    The whole file is read and split on newline characters. Because of that,
    a file that ends with a newline also yields a trailing empty-string entry.

    Args:
        filename: path of the text file to read.

    Returns:
        A queue.Queue with the lines in file order, newline characters removed.
    """
    logfile = queue.Queue()
    with open(filename, 'r') as f:
        rows = f.read().split('\n')

    for row in rows:
        logfile.put(row)

    return logfile


class LogChecker:
    """Process the entries of a queue in parallel with a pool of worker threads.

    Each worker takes entries from the shared queue, passes them to
    ``some_process`` and logs the result. After the workers start, one
    ``None`` sentinel per worker is enqueued; a worker stops when it gets one.
    """

    def __init__(self, logs, num_of_thread):
        """
        Args:
            logs: queue.Queue holding the entries to process. run_read_log()
                consumes it and appends one ``None`` sentinel per worker.
            num_of_thread: number of worker threads to start.
        """
        self.queue = logs
        self.num_of_thread = num_of_thread

    def run_read_log(self):
        """Start the workers, enqueue one stop sentinel each, and wait for them all."""
        threads = []
        for _ in range(self.num_of_thread):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        # One sentinel per worker. For a FIFO queue.Queue, entries already in
        # the queue are handed out before any worker receives a sentinel.
        for _ in threads:
            self.queue.put(None)
        for t in threads:
            t.join()

    def worker(self):
        """Process queue entries until a ``None`` sentinel is received.

        task_done() is called for every item taken from the queue, including
        the sentinel. That keeps the queue's unfinished-task count accurate, so
        ``self.queue.join()`` can return. If ``some_process`` raises, the error
        is logged and the worker moves on, so later entries are still processed.
        """
        logging.debug('start')
        while True:
            item = self.queue.get()
            try:
                if item is None:
                    break
                try:
                    result = self.some_process(item)
                except Exception:
                    # Worker boundary: log the error and keep serving the queue.
                    logging.exception('failed to process %r', item)
                    continue
                # some_process returns None for empty entries; don't log "None".
                if result is not None:
                    logging.info(result)
            finally:
                self.queue.task_done()
        logging.info('end')

    @staticmethod
    def some_process(item):
        """Simulate work on one entry by sleeping 0.4 s.

        Returns:
            ``item + ' : some process is done'`` for a non-empty string,
            or None for the empty string.
        """
        time.sleep(0.4)
        if item != '':
            return item + ' : some process is done'
        return None


def main():
    """Run LogChecker over sample.log with 10 threads and print the elapsed time."""
    logging.debug('start')
    NUM_OF_THREAD = 10
    file_name = 'sample.log'
    if not os.path.exists(file_name):
        # Generate the 100-line sample on first run instead of crashing.
        create_log_file(file_name, 100)

    # perf_counter is monotonic, so it is suited to measuring elapsed time.
    start = time.perf_counter()
    # Named log_queue (not `queue`) so the `queue` module is never shadowed.
    log_queue = read_log_to_queue(file_name)
    log_checker = LogChecker(log_queue, NUM_OF_THREAD)
    log_checker.run_read_log()

    end = time.perf_counter()
    logging.debug('end')
    print('thread time: {:.4f}\n'.format(end - start))


if __name__ == '__main__':
    main()