KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

PythonによるSSHログコレクター

登録日 :2026/03/01 15:32
カテゴリ :Python基礎

SSHにて、多数の機器のログやコンフィグ取得、そのほかコマンドを流すためのプログラム

対象機器リスト
 settings/config.ini
出力結果
 out
メインプログラミング
 ssh_log_collector.py

"""
Description:
    SSH-based log collection tool for network devices and Linux hosts.

    This script connects to one or more targets over SSH (via Paramiko),
    runs a predefined set of commands (Executor), writes the command output
    to timestamped log files, and prints a JSON-formatted execution summary.

    Targets are loaded from a simple CSV-style config file:
        settings/config.ini  (one line per target: "hostname,ipaddr")

    Key components:
      - ParamikoSSHClient: minimal interactive-shell SSH runner (invoke_shell)
      - ExecutorInterface: defines a command set per task (e.g., file list, pwd)
      - FetchLogFactory: orchestrates execution across all targets and builds a summary

    Usage:
        python <script_name>.py <target> [--debug|--info]

        target:
            1 = FetchFileListExecutor  (e.g., "ls -l /home --color=never", "df -h")
            2 = FetchPwdExecutor       (e.g., "pwd")

    Output:
        - Per-target log file under ./out/
        - JSON summary printed to stdout/log

Author:
    Nobuyuki Tagawa

Version:
    1.0.0  (2026-02-28)
"""
from abc import ABC, abstractmethod
import paramiko
import subprocess
import datetime
import time
import os
import sys
import socket
import logging
import json
from typing import List, Type
import gc


# -----------------------------
#Config
# -----------------------------
class Config(object):
    USERNAME = "root"
    PASSWORD = "rootroot"
    PORT = 22

    CONFIG_FILE = "config.ini"
    SETTINGS_DIR = "settings"
    OUTPUT_DIR = "out"
    # LEVEL = logging.DEBUG
    LEVEL = logging.INFO


# -----------------------------
# Logger
# -----------------------------
def setup_logger(name, level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        h = logging.StreamHandler()
        fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        h.setFormatter(fmt)
        logger.addHandler(h)
    return logger


# -----------------------------
# SSH Client Interface
# -----------------------------
class SSHClientInterface(ABC):
    @abstractmethod
    def execute_command(self) -> str:
        pass


# -----------------------------
# Paramiko SSH Client Implementation
# -----------------------------
class ParamikoSSHClient(SSHClientInterface):
    def __init__(self,
                 ip: str,
                 username: str,
                 password: str,
                 port: int = 22,
                 commands:  List[str] = None,
                 level=logging.INFO):
        self.ip = ip
        self.username = username
        self.password = password
        self.port = port
        self.commands = commands
        self.logger = setup_logger("ParamikoSSHClient", level)

    @staticmethod
    def _recv_all(chan: paramiko.Channel, max_wait_s: float = 1.0) -> str:
        """
        簡易版: 受信が止まるまで読む(プロンプト判定なし)
        - max_wait_s: 最後の受信からこの秒数何も来なければ終了
        """
        data = bytearray()
        last_rx = time.time()

        while True:
            if chan.recv_ready():
                chunk = chan.recv(65535)
                if not chunk:
                    break
                data.extend(chunk)
                last_rx = time.time()
            else:
                if (time.time() - last_rx) >= max_wait_s:
                    break
                time.sleep(0.05)

        return data.decode("utf-8", errors="replace")

    def execute_command(self) -> str:
        self.logger.debug("execute command: %s", self.commands)

        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            client.connect(
                self.ip,
                username=self.username,
                password=self.password,
                port=self.port,
                timeout=30,
                banner_timeout=30,
                auth_timeout=30,
                look_for_keys=False,
                allow_agent=False,
            )

            chan = client.invoke_shell()
            time.sleep(0.2)

            # 接続直後のバナー/プロンプト等を読み捨て(必要なら残してもOK)
            _ = self._recv_all(chan, max_wait_s=0.5)
            out_parts = []
            if self.commands:
                for cmd in self.commands:
                    if not cmd.endswith("\n"):
                        cmd += "\n"
                    chan.send(cmd)
                    # コマンド出力を収集(コマンドにより待ち時間は変わるので少し長めでもOK)
                    out_parts.append(self._recv_all(chan, max_wait_s=1.2))
            return "".join(out_parts)

        except (paramiko.SSHException, socket.gaierror, TimeoutError) as e:
            return f"[ERROR] {self.ip} : {e}"

        finally:
            try:
                client.close()
            except Exception as e:
                self.logger.debug("Error during close(): %s", e)


# -----------------------------
# Executor
# -----------------------------
class ExecutorInterface(ABC):
    def __init__(self,
                 ip: str,
                 username: str,
                 password: str,
                 port: int = 22,
                 filename: str = 'hostname',
                 out_dir: str = None,
                 level = logging.INFO,
                 ):
        self.commands = self.build_command()
        self.filename = filename
        self.out_filename = self.set_out_filename(self.filename, out_dir)
        self.logger = setup_logger(self.name, level=level)
        self.results = None
        self.ssh_client = ParamikoSSHClient(
            ip = ip,
            username = username,
            password = password,
            port = port,
            commands=self.commands,
            level=level,
        )

    @staticmethod
    @abstractmethod
    def build_command() -> List[str]:
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @staticmethod
    def set_out_filename(filename: str, out_dir) -> str:
        now_date = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"{filename}_{now_date}.txt"
        if out_dir is None:
            return filename
        else:
            filepath = os.path.join(out_dir, filename)
            return filepath

    def write_log(self):
        """
        output command results
        :return:
        """
        if self.results is None:
            self.run()

        self.logger.info(f'write to {self.out_filename}')
        with open(self.out_filename, mode="w") as f:
            for text in self.results:
                text = str(text).lstrip("b'")
                text = str(text).lstrip("'")
                f.write(text + "\n")
                self.logger.debug(text)
        self.logger.info('end to write logs')

    def run(self) -> None:
        self.results = self.ssh_client.execute_command().split("\n")


# -----------------------------
# Concrete Executor.
# -----------------------------
class FetchFileListExecutor(ExecutorInterface):
    """
    2026.03.01 sample code for Linux command.
    show /home file list.
    show nfs volume.
    """
    @staticmethod
    def build_command() -> List[str]:
        return [
            "ls -l /home --color=never",
            "df -h"
        ]

    @property
    def name(self) -> str:
        return "FetchFileListExecutor"


class FetchPwdExecutor(ExecutorInterface):
    """
    2026.03.01 sample code for Linux command.
    show current directory.
    """
    @staticmethod
    def build_command() -> List[str]:
        return [
            "pwd",
        ]

    @property
    def name(self) -> str:
        return "FetchPwdExecutor"


# -----------------------------
# Utils
# -----------------------------
class SwitchListDataset(object):
    """
    fetch ip address and hostname list from config life.
    """
    def __init__(self, config_file :str):
        self.config_file = config_file
        self.hostname_list = []
        self.ipaddr_list = []
        self.import_config()

    def import_config(self):
        with open(self.config_file, mode="r") as f:
            items = f.read().splitlines()

        for item in items:
            hostname, ipaddr = item.split(",")
            self.hostname_list.append(hostname)
            self.ipaddr_list.append(ipaddr)

    def __str__(self):
        for cnt in range(len(self.hostname_list)):
            print(f' --- {cnt} ---')
            print(f'hostname : {self.hostname_list[cnt]}')
            print(f'ipaddr   : {self.ipaddr_list[cnt]}')
        return 'end of SwitchConfigList'


# -----------------------------
# Orchestrator
# -----------------------------
class FetchLogFactory(object):
    def __init__(self,
                 dataset_cls: Type[SwitchListDataset],
                 executor_cls: Type[ExecutorInterface]):
        self.dataset_cls = dataset_cls
        self.executor_cls = executor_cls
        self.username = Config.USERNAME
        self.password = Config.PASSWORD
        self.port = Config.PORT
        self.level = Config.LEVEL
        self.logger = setup_logger(self.name, level=self.level)
        self.config_file = None
        self.output_dir = None
        self.create_file_path()
        self.summary = []

    def create_file_path(self):
        cur_dir = os.getcwd()
        self.config_file = os.path.join(cur_dir, Config.SETTINGS_DIR, Config.CONFIG_FILE)
        self. output_dir = os.path.join(cur_dir, Config.OUTPUT_DIR)
        self.logger.debug(f"config file: {self.config_file}")
        self.logger.debug(f"output_dir: {self.output_dir}")

    @property
    def name(self) -> str:
        """
        The method for generating the summary can be freely customized by overriding it.
        :return: None
        """
        return 'FetchLogFactory'

    def fetch_summary(self, hostname: str, ipaddr: str, command_result: List[str]):
        """
        The method for generating the summary can be freely customized by overriding it.
        :return: None
        """
        result = dict()
        result['HOSTNAME'] = hostname
        result['IPADDR'] = ipaddr
        texts = []
        for text in command_result:
            text = str(text).lstrip("b'")
            text = str(text).lstrip("'")
            self.logger.debug(text)
            texts.append(text)
        result['COMMAND_RESULT'] = texts
        self.summary.append(result)

    def print_summary(self):
        """
        The method for generating the summary can be freely customized by overriding it.
        :return: None
        """
        # for item in self.summary:
        #     print(",".join(str(x) for x in item))
        self.logger.info(json.dumps(self.summary, indent=2, ensure_ascii=False))

    def fetch_log_from_targets(self):
        dataset = self.dataset_cls(self.config_file)
        self.logger.info("Start fetch log from dataset in FetchLogs")
        for cnt in range(len(dataset.ipaddr_list)):
            self.logger.info(f' --- {cnt} ---')
            self.logger.info(f'hostname : {dataset.hostname_list[cnt]}')
            self.logger.info(f'ipaddr   : {dataset.ipaddr_list[cnt]}')
            executor_cls = self.executor_cls(
                ip=dataset.ipaddr_list[cnt],
                username=self.username, password=self.password, port=self.port,
                filename=dataset.hostname_list[cnt],out_dir=self.output_dir,
                level=self.level)
            executor_cls.run()
            executor_cls.write_log()
            self.fetch_summary(hostname=dataset.hostname_list[cnt],
                               ipaddr=dataset.ipaddr_list[cnt],
                               command_result=executor_cls.results)

    def run(self):
        self.fetch_log_from_targets()
        self.logger.info("Finish fetch log from targets in FetchLogs")
        self.print_summary()


# -----------------------------
# CLI / main
# -----------------------------
def parse_args(argv):
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_level = logging.WARNING
    debug = False
    info = False

    print(f"[INFO] {today}")
    if len(argv) == 0:
        print("Please input option (number, log level)")
        exit(1)

    target = argv[0]
    i = 1
    while i < len(argv):
        a = argv[i]
        if a == "--debug":
            log_level = logging.DEBUG
            debug = True
            i += 1
            continue
        if a == "--info":
            log_level = logging.INFO
            info = True
            i += 1
            continue
        i += 1
    return int(target), log_level, debug, info


def main_cls(factory_cls: Type[FetchLogFactory], executor_cls: Type[ExecutorInterface]):
    dataset_cls = SwitchListDataset
    executor_factory = factory_cls(dataset_cls, executor_cls)
    executor_factory.run()


def main(argv=None):
    target, log_level, debug, info = parse_args(argv)
    factory = FetchLogFactory
    executor = None
    if target == 1:
        executor = FetchFileListExecutor
    if target == 2:
        executor = FetchPwdExecutor

    if executor is None:
        exit(1)
    else:
        main_cls(factory, executor)


if __name__ == '__main__':
    # print(sys.argv)
    main(sys.argv[1:])

    gc.collect()