
[Multiprocess version] A Linux server backup program using Python and bash

Registered: 2024/09/22 20:49
Category: Python basics

For the backup program, I also wrote a version that uses multiprocessing.
Which is better, threads or multiprocessing?
Since the production server has multiple CPU cores, I expected the multiprocess version to be the better fit there.

On the other hand, the test environment is a single-core Linux server, and on it I confirmed there is no measurable difference from the thread version.
Incidentally, I also found that when the multiprocess program runs on a single-core machine, the OS scheduler simply context-switches the worker processes on that one core, so it does not actually run in parallel across cores.
* Threads behave similarly: they take turns acquiring the GIL and hand control back and forth in small slices, which is probably why the two versions end up looking much the same on a single core.
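
To illustrate the point separately from the backup program, here is a minimal sketch (the CPU-bound busy loop and the job sizes are made up for illustration) that runs the same work under a thread pool and a process pool with concurrent.futures. On a multi-core machine the process pool should finish noticeably faster, while on a single-core machine the two timings come out roughly the same.

#!/usr/bin/python3
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def cpu_bound(n):
    # busy loop standing in for a CPU-bound job
    total = 0
    for i in range(n):
        total += i * i
    return total


def measure(executor_cls, workers, jobs):
    start = time.time()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(cpu_bound, jobs))
    return time.time() - start


if __name__ == '__main__':
    jobs = [5_000_000] * 4
    print('threads  : {:.2f}s'.format(measure(ThreadPoolExecutor, 4, jobs)))
    print('processes: {:.2f}s'.format(measure(ProcessPoolExecutor, 4, jobs)))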

The configuration file, the bash scripts, the log folder, and the rest of the directory layout are exactly the same as in the thread version.
Reference: link to the thread version
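
For reference, the main program reads its settings from config/settings.py. A sketch of the names it expects is shown below; the values are placeholders, and the real file is the one shared with the thread version.

# config/settings.py (sketch: names are the ones the main program reads, values are placeholders)
DEBUG = False
LOG_FILE = '/var/log/backup/backup.log'          # log output path (placeholder)
HOME_DIR = '/home'                               # parent directory whose subdirectories get backed up
TIMEOUT = 3600                                   # per-command timeout in seconds (placeholder)
THREADING_NUM = 4                                # worker count for the thread version
PROCESSES_NUM = 4                                # worker process count for this version
BACKUP_SCRIPT = '/usr/local/bin/backup_home.sh'  # bash backup script invoked per directory (placeholder path)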

With that in mind, only the main program is listed here.

backup_homedir_multiprocess.py

#!/usr/bin/python3

from abc import ABC, abstractmethod
import subprocess
from subprocess import PIPE
#import queue
#import threading
import multiprocessing
from multiprocessing import Queue, Process
import logging
import time
import datetime
import signal
import os
import sys
import gc
import socket
import psutil

dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)

from config import settings

"""
created by Me.
submit shell command
method Multiprocess
version 2024.09.22.1
"""

logging.basicConfig(
        filename=settings.LOG_FILE,
        level=logging.INFO,
        format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)

logger.debug({'add path': dir_path})


class ShellCommand(object):
    def __init__(self, dt_now, timeout: int, command: str):
        self.stdout = False
        self.stderr = False
        self.returncode = False
        self.command = False
        self._timeout = timeout
        self._command = command
        self._dt_now = dt_now
        self._command_result = False
        self._errlog = False

    def submit_command(self, command):
        self.command = command
        result = subprocess.run(
                self.command,
                shell=True,
                stdout=PIPE,
                stderr=PIPE,
                timeout=self._timeout)
        self.stdout = result.stdout.decode('utf-8')
        self.stderr = result.stderr.decode('utf-8')
        self.returncode = result.returncode

        if result.returncode != 0:
            raise Exception(self.stderr)

    def execute_command(self):
        try:
            self.submit_command(self._command)
            self._command_result = self.stdout
        except Exception as e:
            self._command_result = self.stderr
            self._errlog = str(e)
            logger.error({
                'time': self._dt_now,
                'status': 'failed',
                'action': 'ExecuteShellCommand',
                'error': self._errlog,
                'command': self._command})


class FetchHomeDir(object):
    def __init__(self, dt_now, timeout, home):
        self._dt_now = dt_now
        self._timeout = timeout
        self._home = home
        self._status = None
        self._command = 'ls -a ' + home
        self.shell = ShellCommand(dt_now, timeout, self._command)
        self.homedirs = []

    def run_command(self):
        self.shell.execute_command()
        if not self.shell._errlog and self.shell._command_result != "":
            self._status = 'success'
            homedirs = self.shell._command_result.split('\n')
            for _home in homedirs[2:]:
                # skip '.', '..'
                if _home != "":
                    _path = self._home + '/' + _home
                    #print(_path)
                    self.homedirs.append(_path)
        else:
            self._status = 'failed'
            logger.error({
                'time': self._dt_now,
                'status': self._status,
                'action': 'FetchHomeDir',
                'command': self._command,
                'home': self._home})

        if settings.DEBUG:
            print(f'{self._status}: FetchHomeDir from {self._home}')


class IProcessWorker(ABC):
    def __init__(self, dt_now, queue, num_of_process, timeout):
        self.dt_now = dt_now
        self.queue = queue
        self.num_of_process = num_of_process
        self.timeout = timeout
        self.command = None

    def run(self):
        processes = []
        for _ in range(self.num_of_process):
            p = Process(target=self.worker)
            p.start()
            processes.append(p)
        [self.queue.put(None) for _ in range(len(processes))]
        #[p.join(self.timeout) for p in processes]
        [p.join() for p in processes]

        # kill zombie process
        #for p in processes:
        #    if p.is_alive():
        #        logger.warning(f"Process {p.pid} did not terminate in time. Terminating...")
        #        p.terminate()
        #        p.join(1)

        #for p in processes:
        #    if p.is_alive():
        #        logger.error(f"Process {p.pid} could not be terminated. Killing...")
        #        p.kill()

    @abstractmethod
    def worker(self):
        logging.debug('start')
        while True:
            item = self.queue.get()
            if item is None:
                break
            print({'process': item})
            self.some_process()
            #self.queue.task_done()
        logging.debug('end')

    def some_process(self):
        pass


class MultiprocessWorker(IProcessWorker):
    def __init__(self, dt_now, queue, num_of_process, timeout, backup_script):
        super().__init__(dt_now, queue, num_of_process, timeout)
        #self.command = 'ls -l '
        #self.command = backup_script + ' '
        # test command: 'sleep 3' simulates a fixed-length job; use the backup_script line above for the real backup
        self.command = 'sleep 3 || ls -l '
        self.result = []

    def worker(self):
        logging.debug('start')
        while True:
            path_dir = self.queue.get()
            if path_dir is None:
                break
            self.check_home_dir(path_dir)
            #self.queue.task_done()
        logging.debug('end')

    def check_home_dir(self, path_dir):
        try:
            _command = self.command + path_dir
            _shell = ShellCommand(self.dt_now, self.timeout, _command)
            _shell.execute_command()
            logger.info({
                'date': self.dt_now,
                'status': 'success',
                'path': path_dir,
                'command': self.command,
                'result': _shell._command_result.split('\n')[0],
                })

        except Exception as e:
            print({'command Error': str(e)})
            logger.error({
                'time': self.dt_now,
                'status': 'failed',
                'action': 'MultiprocessWorker',
                'message': str(e),
                'path': path_dir})


"""
test code
"""
def test_shell_command():

    timeout = settings.TIMEOUT
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')

    home = '/home'
    check_home_dir = FetchHomeDir(dt_now, timeout, home)
    check_home_dir.run_command()

    print({'result': check_home_dir.homedirs})


if __name__ == '__main__':

    #test_shell_command()

    home = settings.HOME_DIR
    timeout = settings.TIMEOUT
    threads = settings.THREADING_NUM
    processes = settings.PROCESSES_NUM
    backup_script = settings.BACKUP_SCRIPT
    #cpu_num = len(os.sched_getaffinity(0))
    cpu_num = len(psutil.Process().cpu_affinity())

    #if processes > cpu_num:
    #    processes = cpu_num

    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    fetch_home_dir = FetchHomeDir(dt_now, timeout, home)
    homedirs_queue = Queue()
    start = time.time()
    logger.info({'backup start date': dt_now})

    fetch_home_dir.run_command()
    for homedir in fetch_home_dir.homedirs:
        homedirs_queue.put(homedir)


    multiprocess_worker = MultiprocessWorker(
            dt_now, homedirs_queue, processes, timeout, backup_script)
    multiprocess_worker.run()


    end = time.time()

    logger.info({'multiprocess action': 'end'})
    print('multiprocess time: {:.4f}\n'.format(end - start))

    logger.info({
        'time': dt_now,
        'action': 'multiprocess',
        'processes': processes,
        'status': 'finished',
        'elapsed time': 'time: {:.4f}'.format(end - start)})


    del fetch_home_dir, multiprocess_worker, homedirs_queue
    gc.collect()
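
The termination handling in IProcessWorker.run() depends on putting exactly one None sentinel into the queue for each worker process: every worker loops on queue.get() until it receives a None and then exits, so join() returns without hanging. The standalone sketch below (the directory names are made up, and time.sleep stands in for the shell command) shows the same pattern in isolation.

#!/usr/bin/python3
import time
from multiprocessing import Process, Queue


def worker(queue):
    # consume items until the None sentinel arrives, then exit cleanly
    while True:
        item = queue.get()
        if item is None:
            break
        time.sleep(1)  # stand-in for ShellCommand.execute_command()
        print({'done': item})


if __name__ == '__main__':
    queue = Queue()
    for item in ['/home/usera', '/home/userb', '/home/userc']:
        queue.put(item)

    processes = [Process(target=worker, args=(queue,)) for _ in range(2)]
    for p in processes:
        p.start()
    # one sentinel per worker so every process sees a None and stops
    for _ in processes:
        queue.put(None)
    for p in processes:
        p.join()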