KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

Threadを使って、端末やホストの通信チェック、コマンドの実行などを行う

登録日 :2024/10/28 04:39
カテゴリ :Python基礎

Threadをつかって、SSHやpingを使った通信チェック、Linuxのコマンドを実行するなどを行うことができるライブラリプログラムを作成した。

#!/usr/bin/python3

from abc import ABC, abstractmethod
import subprocess
from subprocess import PIPE
import queue
import threading
import logging
import time
import datetime
import os
import sys
import gc
import socket

dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)

dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dir_path)
#print(dir_path)

import lib_shellcommand as shellcommand
from utils_common import FetchHomeDir
#from tools.shellcommand import ShellCommand
#from tools.utils import FetchHomeDir


"""
created by Tagawa.
submit shell command
method Thread
version 2024.10.27
"""

#logging.basicConfig(
#        level=logging.INFO,
#        format='%(threadName)s: %(message)s')

handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s:%(name)s:%(threadName)s:%(levelname)s:%(message)s')
handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)
logger.addHandler(handler)


logger.debug({'add path': dir_path})


class IThreadWorker(ABC):
    def __init__(self, dt_now, queue, num_of_thread, timeout):
        self.dt_now = dt_now
        self.queue = queue
        self.num_of_thread = num_of_thread
        self.timeout = timeout
        self.command = None

    def run(self):
        ts = []
        for _ in range(self.num_of_thread):
            t = threading.Thread(target=self.worker)
            t.start()
            ts.append(t)
        [self.queue.put(None) for _ in range(len(ts))]
        [t.join() for t in ts]

    @abstractmethod
    def worker(self):
        logging.debug('start')
        while True:
            item = self.queue.get()
            if item is None:
                break
            print({'thread': item})
            self.some_process()
            self.queue.task_done()
        logging.debug('end')

    def some_process(self):
        pass


class ThreadExcecuteCommand(IThreadWorker):
    def __init__(self, _dt_now, _queue, _num_of_thread, _timeout, _command):
        super().__init__(_dt_now, _queue, _num_of_thread, _timeout)
        self.command = _command
        self.result = []

    def worker(self):
        logging.debug('start')
        while True:
            path_dir = self.queue.get()
            if path_dir is None:
                break
            self.check_home_dir(path_dir)
            self.queue.task_done()
        logging.debug('end')

    def check_home_dir(self, path_dir):
        try:
            _command = self.command + path_dir
            _shell = shellcommand.ShellCommand(
                    self.dt_now, self.timeout, _command)
            _shell.execute_command()
            logger.info({
                'date': self.dt_now,
                'status': 'success',
                'path': path_dir,
                'command': self.command,
                'result': _shell._command_result.split('\n')[0],
                })

        except Exception as e:
            print({'command Error': str(e)})
            logger.error({
                'time': self.dt_now,
                'status': 'failed',
                'action': 'ThreadHomeDirChecker',
                'message': str(e),
                'path': path_dir})


class ThreadConnectionChecker(IThreadWorker):
    def __init__(self, dt_now, queue, num_of_thread, timeout):
        super().__init__(dt_now, queue, num_of_thread, timeout)
        self._error_message = 'No route to host'
        self.failed_list = []

    def worker(self):
        logging.debug('thread worker start')
        while True:
            hostname = self.queue.get()
            if hostname is None:
                break
            self.check_server_by_socket(hostname)
            self.queue.task_done()
        logging.debug('thread worker end')

    def check_server_by_socket(self, server):
        #print('check server by socket')
        logger.debug('check server by socket')
        server_address = (server, 22)
        try:
            with socket.create_connection(server_address, timeout=self.timeout) as sock:
                #print(f"Server is UP   -> {server}")
                logger.debug(f"Server is UP   -> {server}")
        except (socket.timeout, ConnectionRefusedError) as e:
            #print(f"Server is DOWN -> {server} :Timeout Error")
            logger.debug(f"Server is DOWN -> {server} :Timeout Error")
            self.failed_list.append(server)
            logger.error({
                'status': 'failed timeout',
                'action': 'ThreadConnectionChecker timeout',
                'message': str(e),
                'hostname': server})
        except OSError as e:
            #print(f"Server is DOWN -> {server} :OSError")
            logger.debug(f"Server is DOWN -> {server} :OSError")
            if self._error_message in str(e):
                self.failed_list.append(server)
            logger.error({
                'status': 'failed',
                'action': 'ThreadConnectionChecker OSError',
                'message': str(e),
                'hostname': server})
        except Exception as e:
            #print(f"Server is DOWN -> {server} :Other Error")
            logger.debug(f"Server is DOWN -> {server} :Other Error")
            if self._error_message in str(e):
                self.failed_list.append(server)
            logger.error({
                'status': 'failed',
                'action': 'ThreadConnectionChecker Other',
                'message': str(e),
                'hostname': server})


class ThreadPingChecker(IThreadWorker):
    def __init__(self, dt_now, queue, num_of_thread, timeout):
        super().__init__(dt_now, queue, num_of_thread, timeout)
        self._error_message = 'No route to host'
        self.success_list = []
        self.failed_list = []

    def worker(self):
        logging.debug('thread worker start')
        while True:
            hostname = self.queue.get()
            if hostname is None:
                break
            self.check_server_by_ping(hostname)
            self.queue.task_done()
        logging.debug('thread worker end')

    def check_server_by_ping(self, server):
        logger.debug('*** check server by ping ***')
        """Check if a server is up by pinging it."""
        result = subprocess.run(
            ["ping", "-c", "1", server],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

        if result.returncode == 0:
            self.success_list.append(server)
            logger.info(f"Server {server} is up.")
        else:
            self.failed_list.append(server)
            logger.info(f"Server {server} is down.")


"""
test code
"""
def test_import_fetch_homedir():

    timeout = 10
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')

    home = '/home'
    check_home_dir = FetchHomeDir(dt_now, timeout, home)
    check_home_dir.run_command()

    print({'result': check_home_dir.homedirs})


def test_main_thread(dt_now, timeout, threads):

    home = '/home'
    script = 'sleep 3; ls -l '

    fetch_home_dir = FetchHomeDir(dt_now, timeout, home)
    homedirs_queue = queue.Queue()

    start = time.time()
    logger.info({'backup start date': dt_now})

    # setting queue
    fetch_home_dir.run_command()
    for homedir in fetch_home_dir.homedirs:
        homedirs_queue.put(homedir)

    # start thead workers
    thread_workers = ThreadExcecuteCommand(
            dt_now, homedirs_queue, threads, timeout, script)
    thread_workers.run()

    end = time.time()
    logger.info({'thread action': 'end'})
    print('thread time: {: 4f}\n'.format(end - start))

    logger.info({
        'time': dt_now,
        'action': 'threads',
        'status': 'finished',
        'elapsed time': 'time: {: 4f}'.format(end - start)})

    del fetch_home_dir,thread_workers,homedirs_queue


def test_ThreadConnectionChecker(dt_now, timeout, num_of_thread):

    targets = ['compute01', 'node1', 'node2', 'node3']
    # check nodes and setting queue
    targets_queue = queue.Queue()
    start = time.time()

    for target in targets:
        targets_queue.put(target)

    # start thread workers and collect failed nodes.
    thread_connection_checker = ThreadConnectionChecker(
            dt_now, targets_queue, num_of_thread, timeout)
    thread_connection_checker.run()

    end = time.time()

    logger.info({
        'test': 'test_ThreadConnectionChecker',
        'status': 'success',
        'failed list': thread_connection_checker.failed_list,
        'elapsed time': '{: 4f}'.format(end - start)})

    del thread_connection_checker


def test_ThreadPingChecker(dt_now, timeout, num_of_thread):
    targets = ['headnode','compute01', 'node1', 'node2', 'node3']
    # check nodes and setting queue
    targets_queue = queue.Queue()
    start = time.time()

    for target in targets:
        targets_queue.put(target)

    # start thread workers and collect failed nodes.
    thread_connection_checker = ThreadPingChecker(
            dt_now, targets_queue, num_of_thread, timeout)
    thread_connection_checker.run()

    end = time.time()

    logger.info({
        'test': 'test_ThreadPingChecker',
        'status': 'success',
        'failed list': thread_connection_checker.failed_list,
        'elapsed time': '{: 4f}'.format(end - start)})

    logger.info(
            {'success': thread_connection_checker.success_list})

    del thread_connection_checker


if __name__ == '__main__':

    timeout = 30
    threads = 3
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')

    #test_main_thread(dt_now, timeout, threads)
    #del test_main_thread

    #test_ThreadConnectionChecker(dt_now, timeout, threads)
    #del test_ThreadConnectionChecker

    test_ThreadPingChecker(dt_now, timeout, threads)
    del test_ThreadPingChecker

    gc.collect()

あなたのプログラムは、Pythonのスレッドとキューを活用して複数ノードへのping疎通を並列で確認する、よく設計されたコードです。
クラス設計やロギング、抽象基底クラスの利用など、しっかりとした構造になっています。
以下に添削ポイントとアドバイスをまとめます。

良い点
抽象基底クラス(ABC)の活用:拡張性の高い設計です。
ロギングの導入:デバッグや運用時に有用です。
スレッドとキューの組み合わせ:Pythonらしい並列処理の実装です。
リストによる成功・失敗ノードの記録:結果の集計が容易です。

添削・改善アドバイス
1. ロギングの使い方
logging.debug() ではなく、logger.debug() を使いましょう(loggerの設定を活かすため)。

print({'thread': item}) も logger.debug() に統一した方が良いです。

  1. スレッドの終了処理
    queue.put(None) をスレッド数分だけ投入するのはOKですが、
    ワーカーがNoneを受け取った後もqueue.task_done()を呼ぶ必要があります(今の実装はOK)。

  2. コードの簡潔化・可読性
    リスト内包表記([self.queue.put(None) for _ in range(len(ts))])は副作用目的なので、
    for文で書いた方がPythonicです。

for _ in range(len(ts)):
    self.queue.put(None)
  1. 抽象メソッドの実装
    IThreadWorker.worker()は抽象メソッドですが、中身があります。
    通常はpassまたはraise NotImplementedError()にして、サブクラスで必ず実装させる形が推奨されます。

  2. サブプロセスのエラー処理
    subprocess.run()で例外が発生する可能性があるため、try-exceptで囲むとより堅牢です。

def check_server_by_ping(self, server):
    try:
        result = subprocess.run(
            ["ping", "-c", "1", server],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=self.timeout
        )
        if result.returncode == 0:
            self.success_list.append(server)
            logger.info(f"Server {server} is up.")
        else:
            self.failed_list.append(server)
            logger.info(f"Server {server} is down.")
    except Exception as e:
        self.failed_list.append(server)
        logger.error(f"Error pinging {server}: {e}")
  1. スレッドセーフなリスト操作
    self.success_listやself.failed_listは複数スレッドから同時にappendされるため、
    厳密にはthreading.Lock()で保護した方が安全です(小規模なら問題になりにくいですが)。
self.lock = threading.Lock()
...
with self.lock:
    self.success_list.append(server)
  1. コードの整理・不要な処理
    del test_ThreadPingCheckerやgc.collect()は通常不要です(Pythonのガベージコレクタに任せてOK)。

sys.path.append()の2回呼び出しは、必要なパスだけに限定しましょう。

  1. その他
    timeoutをsubprocess.run()の引数として使うと、pingのタイムアウトも制御できます。

Windows環境の場合、pingコマンドのオプションが異なるので注意(-nなど)。

添削後の一例(抜粋)

class IThreadWorker(ABC):
    def __init__(self, dt_now, queue, num_of_thread, timeout):
        self.dt_now = dt_now
        self.queue = queue
        self.num_of_thread = num_of_thread
        self.timeout = timeout
        self.command = None

    def run(self):
        ts = []
        for _ in range(self.num_of_thread):
            t = threading.Thread(target=self.worker)
            t.start()
            ts.append(t)
        for _ in range(len(ts)):
            self.queue.put(None)
        for t in ts:
            t.join()

    @abstractmethod
    def worker(self):
        raise NotImplementedError("サブクラスで実装してください")

まとめ
全体としてよくまとまっており、実用的な設計です。
上記の細かな点を修正すれば、さらに堅牢で可読性の高いコードになります。
スレッドセーフやエラー処理を強化することで、プロダクション用途にも耐える品質になります。
もし他にも「こうしたい」「この部分を拡張したい」など要望があれば、ぜひご相談ください!