[Multiprocess版]pythonとbashを用いたLinuxサーバのバックアッププログラム
| 登録日 | :2024/09/22 20:49 |
|---|---|
| カテゴリ | :Python基礎 |
バックアッププログラムについて、Multiprocessを用いたプログラムも作成してみた。
ThreadとMultiprocessとどちらが良いのか。
本番サーバの場合は、スペックとしてマルチコアのため、Multiprocessのほうが良いのではないかと考えたのである。
一方で、テスト環境は1コアのLinuxサーバのため、Threadとの差はないことも確認した。
ちなみに、1コアのマシンでMultiprocessのプログラムを動かした場合も、OSのコンテキストスイッチによってプロセスが切り替わるため、見た目にはあたかも並列で動いているように見えることもわかった。
※Threadも内部でGIL（グローバルインタプリタロック）を取り合って、細かく処理の主導権を渡すことで並行処理を実現しているので、1コアではMultiprocessもThreadと同じような動きになるのだろうと思う。
Thread版と設定ファイルやbash,logフォルダなどの構成は全く同じである。
参考)Thread版へのリンク
そのため、ここではメインプログラムのみを記載する。
backup_homedir_mutiprocess.py
#!/usr/bin/python3
from abc import ABC, abstractmethod
import subprocess
from subprocess import PIPE
#import queue
#import threading
import multiprocessing
from multiprocessing import Queue, Process
import logging
import time
import datetime
import signal
import os
import sys
import gc
import socket
import psutil
# Make the parent directory importable so that ``config.settings`` resolves
# regardless of the directory the script is launched from.
dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)
from config import settings
"""
created by Me.
submit shell command
method Multiprocess
version 2024.09.22.1
"""
# Include the process name in every record so the activity of each
# worker process can be distinguished in the shared log file.
logging.basicConfig(
    filename=settings.LOG_FILE,
    level=logging.INFO,
    format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)
logger.debug({'add path': dir_path})
class ShellCommand(object):
    """Run a shell command via ``subprocess.run`` and capture its outcome.

    Results are exposed through ``stdout``/``stderr``/``returncode``;
    ``_command_result`` aggregates stdout on success or the error text on
    failure, and ``_errlog`` is set (truthy) only when the command failed.
    """

    def __init__(self, dt_now, timeout: int, command: str):
        # Result attributes are None until a command has actually run
        # (the previous False sentinels conflated "not run" with a value).
        self.stdout = None
        self.stderr = None
        self.returncode = None
        self.command = None
        self._timeout = timeout          # seconds forwarded to subprocess.run
        self._command = command          # default command for execute_command
        self._dt_now = dt_now            # timestamp string used in log records
        self._command_result = None      # stdout on success, error text on failure
        self._errlog = None              # error message when the command failed

    def submit_command(self, command):
        """Execute *command* through the shell; raise on non-zero exit.

        Raises:
            Exception: carrying the command's stderr when returncode != 0.
            subprocess.TimeoutExpired: when the command exceeds the timeout.
        """
        self.command = command
        # NOTE: shell=True is intentional (commands come from trusted
        # configuration and use shell syntax); never feed untrusted input here.
        result = subprocess.run(
            self.command,
            shell=True,
            stdout=PIPE,
            stderr=PIPE,
            timeout=self._timeout)
        self.stdout = result.stdout.decode('utf-8')
        self.stderr = result.stderr.decode('utf-8')
        self.returncode = result.returncode
        if result.returncode != 0:
            raise Exception(self.stderr)

    def execute_command(self):
        """Run the configured command, recording the outcome instead of raising."""
        try:
            self.submit_command(self._command)
            self._command_result = self.stdout
        except Exception as e:
            # On a timeout subprocess.run raises before stderr was captured,
            # so fall back to the exception text rather than storing None.
            self._command_result = self.stderr if self.stderr else str(e)
            self._errlog = str(e)
            logger.error({
                'time': self._dt_now,
                'status': 'failed',
                'action': 'ExecuteShellCommand',
                'error': self._errlog,
                'command': self._command})
class FetchHomeDir(object):
    """Collect the sub-directories of *home* (via ``ls -a``) into ``self.homedirs``."""

    def __init__(self, dt_now, timeout, home):
        self._dt_now = dt_now        # timestamp string used in log records
        self._timeout = timeout      # per-command timeout in seconds
        self._home = home            # base directory, e.g. '/home'
        self._status = None          # 'success' / 'failed' after run_command()
        self._command = 'ls -a ' + home
        self.shell = ShellCommand(dt_now, timeout, self._command)
        self.homedirs = []           # absolute paths of the discovered entries

    def run_command(self):
        """Run ``ls -a`` on the base directory and store entries as full paths.

        Sets ``self._status`` to 'success' or 'failed'; a failure (command
        error or empty output) is logged, not raised.
        """
        self.shell.execute_command()
        if not self.shell._errlog and self.shell._command_result != "":
            self._status = 'success'
            homedirs = self.shell._command_result.split('\n')
            # 'ls -a' lists '.' and '..' first, so skip the first two entries.
            # NOTE(review): this relies on ls's default sort order; an entry
            # sorting before '..' would break the assumption — confirm.
            for _home in homedirs[2:]:
                if _home != "":
                    _path = self._home + '/' + _home
                    self.homedirs.append(_path)
        else:
            self._status = 'failed'
            # Log a string label; the original logged the FetchHomeDir class
            # object itself, which serializes as an unwieldy repr.
            logger.error({
                'time': self._dt_now,
                'status': self._status,
                'action': 'FetchHomeDir',
                'command': self._command,
                'home': self._home})
        if settings.DEBUG:
            print(f'{self._status}: FetchHomeDir from {self._home}')
class IProcessWorker(ABC):
    """Abstract worker pool: fan queue items out to ``num_of_process`` processes.

    Subclasses override ``worker`` (and optionally ``some_process``) to define
    what is done with each item; ``None`` is the per-worker shutdown sentinel.
    """

    def __init__(self, dt_now, queue, num_of_process, timeout):
        self.dt_now = dt_now                    # timestamp string for logging
        self.queue = queue                      # multiprocessing.Queue of work items
        self.num_of_process = num_of_process    # size of the process pool
        self.timeout = timeout                  # per-command timeout (seconds)
        self.command = None                     # set by concrete subclasses

    def run(self):
        """Start the pool, enqueue one sentinel per process, and wait for exit."""
        processes = []
        for _ in range(self.num_of_process):
            p = Process(target=self.worker)
            p.start()
            processes.append(p)
        # One None sentinel per worker lets every process leave its loop.
        # (Plain for-loops here: the original used list comprehensions purely
        # for their side effects, which discards a throwaway list each time.)
        for _ in processes:
            self.queue.put(None)
        # NOTE(review): join() without a timeout blocks forever if a worker
        # hangs; consider join(self.timeout) plus terminate()/kill() cleanup.
        for p in processes:
            p.join()

    @abstractmethod
    def worker(self):
        """Default loop: consume queue items until the None sentinel appears."""
        logging.debug('start')
        while True:
            item = self.queue.get()
            if item is None:
                break
            print({'process': item})
            self.some_process()
        logging.debug('end')

    def some_process(self):
        """Hook for subclasses; the default implementation does nothing."""
        pass
class MultiprocessWorker(IProcessWorker):
    """Concrete worker: run the backup script against each queued directory."""

    def __init__(self, dt_now, queue, num_of_thread, timeout, backup_script):
        super().__init__(dt_now, queue, num_of_thread, timeout)
        # Use the configured backup script. The previous hard-coded
        # 'sleep 3 || ls -l ' was a leftover timing placeholder that
        # silently ignored the backup_script argument.
        self.command = backup_script + ' '
        self.result = []

    def worker(self):
        """Consume directory paths from the queue until the None sentinel."""
        logging.debug('start')
        while True:
            path_dir = self.queue.get()
            if path_dir is None:
                break
            self.check_home_dir(path_dir)
        logging.debug('end')

    def check_home_dir(self, path_dir):
        """Run the backup command for *path_dir*, logging success or failure."""
        try:
            _command = self.command + path_dir
            _shell = ShellCommand(self.dt_now, self.timeout, _command)
            _shell.execute_command()
            logger.info({
                'date': self.dt_now,
                'status': 'success',
                'path': path_dir,
                'command': self.command,
                # Only the first output line is kept so log records stay short.
                'result': _shell._command_result.split('\n')[0],
            })
        except Exception as e:
            print({'command Error': str(e)})
            # Label fixed: the original logged 'ThreadHomeDirChecker', a name
            # copied from the Thread version of this program.
            logger.error({
                'time': self.dt_now,
                'status': 'failed',
                'action': 'MultiprocessWorker',
                'message': str(e),
                'path': path_dir})
"""
test code
"""
def test_shell_command():
    """Ad-hoc smoke test: list the entries under /home and print them."""
    now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    checker = FetchHomeDir(now, settings.TIMEOUT, '/home')
    checker.run_command()
    print({'result': checker.homedirs})
if __name__ == '__main__':
    #test_shell_command()
    home = settings.HOME_DIR
    timeout = settings.TIMEOUT
    threads = settings.THREADING_NUM
    processes = settings.PROCESSES_NUM
    backup_script = settings.BACKUP_SCRIPT
    # Number of CPUs this process may actually run on (affinity-aware,
    # unlike os.cpu_count()); currently informational only.
    cpu_num = len(psutil.Process().cpu_affinity())
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    fetch_home_dir = FetchHomeDir(dt_now, timeout, home)
    homedirs_queue = Queue()
    start = time.time()
    logger.info({'backup start date': dt_now})
    # Enumerate the home directories, then feed them to the worker pool.
    fetch_home_dir.run_command()
    for homedir in fetch_home_dir.homedirs:
        homedirs_queue.put(homedir)
    multiprocess_worker = MultiprocessWorker(
        dt_now, homedirs_queue, processes, timeout, backup_script)
    multiprocess_worker.run()
    end = time.time()
    logger.info({'multiprocess action': 'end'})
    # '{:.4f}' fixed: the original '{: 4f}' was a space sign flag plus a
    # field width of 4, which printed six decimal places instead of four.
    print('multiprocess time: {:.4f}\n'.format(end - start))
    logger.info({
        'time': dt_now,
        'action': 'multiprocess',
        'processes': processes,
        'status': 'finished',
        'elapsed time': 'time: {:.4f}'.format(end - start)})
    # Explicit cleanup is largely cosmetic here (the interpreter is about to
    # exit), but kept to mirror the original program's behavior.
    del fetch_home_dir, multiprocess_worker, homedirs_queue
    gc.collect()