処理をThreadで並列化する処理をクラスにまとめた
| 登録日 | :2023/11/19 07:46 |
|---|---|
| カテゴリ | :Python基礎 |
pythonのスレッドで並列処理するためのロジックをクラスにまとめてみました。
import queue
import threading
import logging
import time
logging.basicConfig(
level=logging.INFO, format='%(threadName)s: %(message)s')
def create_log_file(filename, num):
with open(filename, 'w') as f:
for i in range(num):
f.write(f'{i} Hello\n')
def read_log_to_queue(filename) -> queue:
logfile = queue.Queue()
with open(filename, 'r') as f:
rows = f.read().split('\n')
for row in rows:
# print(row)
logfile.put(row)
return logfile
class LogChecker(object):
def __init__(self, logs, num_of_thread):
self.queue = logs
self.num_of_thread = num_of_thread
def run_read_log(self):
ts = []
for _ in range(self.num_of_thread):
t = threading.Thread(target=self.worker)
t.start()
ts.append(t)
[self.queue.put(None) for _ in range(len(ts))]
[t.join() for t in ts]
def worker(self):
logging.debug('start')
while True:
item = self.queue.get()
if item is None:
break
logging.info(self.some_process(item))
self.queue.task_done()
logging.info('end')
@staticmethod
def some_process(item):
time.sleep(0.4)
if item != '':
return item + ' : some process is done'
if __name__ == '__main__':
logging.debug('start')
NUM_OF_THREAD = 10
file_name = 'sample.log'
# create_log_file(file_name, 100)
start = time.time()
queue = read_log_to_queue(file_name)
log_checker = LogChecker(queue, NUM_OF_THREAD)
log_checker.run_read_log()
end = time.time()
logging.debug('end')
print('thread time: {: 4f}\n'.format(end - start))