Threadを使ったSSHログコレクター
| 登録日 | :2026/03/15 08:30 |
|---|---|
| カテゴリ | :Python基礎 |
SSHでサーバやスイッチにログインしてコマンドを実行するプログラム。
構成
out : コマンド実行結果をテキストファイルで出力するフォルダ
settings/config.ini : 接続対象のリスト
config.py : 設定ファイル
executor.py : SSHログインとコマンド実行するクラスを定義しているライブラリ
utils.py : 設定ファイルの読み込みや、サマリーを表示する関数を定義しているライブラリ
main.py : Threadを考慮してプログラムを実行する
(使用手順)
python main.py 1 : 設定の確認
python main.py 2 : 逐次実行(Threadなし)
python main.py 2 -t : Threadモードで実行
python main.py 2 -t --info : インフォメーションを表示しながら実行
python main.py 2 -t --debug : デバッグモードで実行
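新しいコマンドを定義したい場合は、executor.pyでconcrete executor(ISSHExecutorInterfaceを継承したクラス)を作成し、config.pyのexecutor_class_listにクラス名を登録したうえで、executor_idxにそのインデックスを設定して有効化すれば良い。なお、main.pyはクラス名からgetattrでクラスを取得するため、新しいクラスはmain.pyのimportにも追加しておく必要がある。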
サマリーの表示はReporterクラスで行っており、executorクラスと同じ方法(reporter_class_listへの登録とreporter_idxの設定)でカスタマイズ可能。
(プログラム)
main.py
from abc import ABC, abstractmethod
import datetime
import os
import sys
import logging
import queue
import threading
import json
from typing import List, Type
from pprint import pformat
import gc
from rpds.rpds import Queue
from config import Config, setup_logger
from utils import SwitchListDataset, IReporterInterface, ReporterSample
from executor import (
ISSHExecutorInterface,
FetchFileListExecutor,
FetchLSDFExecutor,
ServerInfo,
main_single,
)
#-----------------------------
# Thread Worker
#-----------------------------
class IThreadWorkerInterface(ABC):
    """Abstract base for workers that process a queue of targets with threads.

    The constructor stores the executor class, the queue, the thread count
    and the timeout, creates a logger named after ``self.name`` and prepares
    an empty ``results`` list together with the lock meant to guard it.
    Subclasses provide ``name``, ``run`` and ``worker``.
    """

    def __init__(
        self,
        executor: Type[ISSHExecutorInterface],
        _queue,
        workers=1,
        timeout=Config.TIMEOUT,
        level=Config.LEVEL,
    ):
        # Plain configuration first, so a subclass ``name`` property may
        # already read it when the logger is created below.
        self.executor, self.queue = executor, _queue
        self.workers, self.timeout_s = workers, timeout
        self.logger = setup_logger(self.name, level)
        # Shared result storage and the lock protecting it.
        self.result_lock = threading.Lock()
        self.results = []

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the name used for this worker's logger."""

    @abstractmethod
    def run(self):
        """Start the worker threads and wait for them to finish."""

    @abstractmethod
    def worker(self):
        """Body executed by each worker thread."""
class ThreadWorkers(IThreadWorkerInterface):
    """Thread worker that takes ServerInfo items from a queue and runs an executor.

    - executor: class derived from ISSHExecutorInterface
    - _queue: queue filled with ServerInfo objects
    - workers: number of threads
    - timeout: SSH command timeout in seconds
    - level: log level

    Results are appended to ``self.results`` as ``{hostname: result}`` while
    holding ``self.result_lock``. The logger format includes the thread name,
    so the log shows which thread handled which server.

    Examples:
        ThreadWorkers(executor=FetchLSDFExecutor, _queue=q, workers=5, timeout=10, level=logging.DEBUG).run()
        ThreadWorkers(executor=FetchFileListExecutor, _queue=q, workers=5, timeout=10, level=logging.DEBUG).run()
    """

    @property
    def name(self) -> str:
        return "ThreadWorkers"

    def run(self):
        """Start ``self.workers`` threads, enqueue one stop marker each, and join."""
        threads = []
        for _ in range(self.workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        # One ``None`` sentinel per thread tells each worker to stop once the
        # real items queued before it have been consumed.
        for _ in threads:
            self.queue.put(None)
        for t in threads:
            t.join()

    def worker(self):
        """Consume queue items until a ``None`` sentinel is received."""
        self.logger.debug('workers start')
        while True:
            item = self.queue.get()
            try:
                if item is None:
                    break
                self._process(item)
            finally:
                # Balance every get() (sentinels and failures included) so
                # that queue.join() can be used by callers.
                self.queue.task_done()
        self.logger.debug('workers end')

    def _process(self, item):
        """Run the executor for one target and record its result."""
        self.logger.info({'thread': (item.hostname, item.ipaddr)})
        try:
            executor = self.executor(server_info=item, timeout=self.timeout_s, level=self.logger.level)
            res = executor.execute_command()
        except Exception as exc:
            # Without this, an unexpected error would terminate the thread
            # and the target would be missing from the results.
            self.logger.exception("executor failed for %s", item.hostname)
            res = [f"[ERROR] {exc}"]
        messages = {item.hostname: res}
        self.logger.info(f"done: {messages}")
        with self.result_lock:
            self.results.append({item.hostname: res})
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(type(res))
            if isinstance(res, list):
                self.logger.debug(json.dumps(res, indent=2, ensure_ascii=False))
            else:
                self.logger.debug(res)
#-----------------------------
# Set Queue
#-----------------------------
def set_queue(_targets: List[dict]):
    """Return a ``queue.Queue`` holding one ServerInfo per target dict.

    A username or password that is missing, ``None`` or an empty string is
    replaced by ``Config.USERNAME`` / ``Config.PASSWORD``.
    """

    def _with_default(value, default):
        # Empty string and None both mean "not specified".
        return default if (value is None or value == "") else value

    pending = queue.Queue()
    for target in _targets:
        pending.put(
            ServerInfo(
                ipaddr=target.get("ipaddr", None),
                hostname=target.get("hostname", None),
                username=_with_default(target.get("username", Config.USERNAME), Config.USERNAME),
                password=_with_default(target.get("password", Config.PASSWORD), Config.PASSWORD),
            )
        )
    return pending
#-----------------------------
# Main
#-----------------------------
def main_threads(_q: queue.Queue,
                 workers=1,
                 executor_cls: Type[ISSHExecutorInterface] = FetchFileListExecutor,
                 reporter_cls: Type[IReporterInterface] = ReporterSample,
                 level=Config.LEVEL):
    """Run ``executor_cls`` for every queued target with threads, then report.

    Args:
        _q: standard-library ``queue.Queue`` filled with ServerInfo objects
            (as produced by ``set_queue``). The previous annotation resolved
            to ``rpds.rpds.Queue``, an unrelated type imported at the top of
            main.py.
        workers: number of worker threads.
        executor_cls: executor class instantiated for each target.
        reporter_cls: reporter whose ``print_results`` prints the summary;
            ``None`` falls back to ``ReporterSample``.
        level: log level handed to the worker.
    """
    if reporter_cls is None:
        # Resolve the reporter before any SSH work starts, so a missing
        # reporter cannot discard the collected results at the very end.
        reporter_cls = ReporterSample
    worker = ThreadWorkers(
        executor=executor_cls,
        _queue=_q,
        workers=workers,
        timeout=Config.TIMEOUT,
        level=level)
    worker.run()
    reporter_cls.print_results(worker.results)
# -----------------------------
# CLI / main
# -----------------------------
def parse_args(argv):
    """Parse the command-line arguments (without the program name).

    Args:
        argv: list such as ``["2", "-t", "--debug"]``. The first element is
            the mode number; the rest are flags. Unknown flags are ignored.

    Returns:
        Tuple ``(target, log_level, debug, info, threaded)`` where ``target``
        is the mode number as ``int`` and ``log_level`` starts at
        ``Config.LEVEL``. When both ``--debug`` and ``--info`` are given, the
        later one decides ``log_level``.

    Exits with status 1 (after printing a message) when no argument is given
    or when the mode is not an integer.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_level = Config.LEVEL
    debug = False
    info = False
    threaded = False
    print(f"[INFO] {today}")
    if not argv:
        print("[ERROR]Please input option (number, log level)")
        print("[INFO]Example: python main.py 1 -> Check Config.EXECUTOR_CLS")
        print("[INFO]Example: python main.py 2 -> Run SSH Executor")
        print("[INFO]Example: python main.py 2 -t -> Run SSH Executor with threading")
        print("[INFO]Example: python main.py 2 --debug -> Run SSH Executor with DEBUG log level")
        # sys.exit is used because the bare ``exit`` helper is only installed
        # by the ``site`` module.
        sys.exit(1)
    try:
        target = int(argv[0])
    except ValueError:
        print(f"[ERROR]Option must be a number (1 or 2): {argv[0]!r}")
        sys.exit(1)
    for opt in argv[1:]:
        if opt == "--debug":
            log_level = logging.DEBUG
            debug = True
        elif opt == "--info":
            log_level = logging.INFO
            info = True
        elif opt in ("-t", "--threaded"):
            threaded = True
    return target, log_level, debug, info, threaded
def main(argv):
    """Entry point: check the configuration (mode 1) or run the executor (mode 2).

    Mode 1 prints the configured executor/reporter class names and returns.
    Mode 2 loads the targets from the config file, resolves the executor and
    reporter classes by name from this module and runs them either with
    threads (``-t``) or sequentially. Any other mode exits with status 1.
    """
    # Validate the arguments first so that usage errors are reported without
    # needing a readable config file.
    target, log_level, debug, info, threaded = parse_args(argv)
    if target == 1:
        print("[INFO] Checking EXECUTOR_CLS...")
        print(f"[INFO] EXECUTOR_CLS: {Config.EXECUTOR_CLS}")
        print(f"[INFO] REPORTER_CLS: {Config.REPORTER_CLS}")
        # Previously this fell through to the error branch below and ended
        # with exit status 1 even though the check had succeeded.
        return
    if target != 2:
        print("[ERROR]Please input option (number, log level): number 1 or 2")
        sys.exit(1)

    # targets from config.ini
    main_logger = setup_logger("main", log_level)
    dataset = SwitchListDataset()
    main_logger.debug(dataset)
    targets = dataset.targets_list

    # Classes are looked up by name among the names imported into this module.
    module = sys.modules[__name__]
    executor = getattr(module, Config.EXECUTOR_CLS, None)
    reporter = getattr(module, Config.REPORTER_CLS, None)
    if executor is None:
        print(f"[ERROR]Executor class is not available in main.py: {Config.EXECUTOR_CLS}")
        sys.exit(1)
    if threaded:
        if reporter is None:
            print(f"[ERROR]Reporter class is not available in main.py: {Config.REPORTER_CLS}")
            sys.exit(1)
        # THREADING version
        q = set_queue(_targets=targets)
        main_threads(_q=q,
                     workers=Config.MAX_WORKERS,
                     executor_cls=executor,
                     reporter_cls=reporter,
                     level=log_level)
    else:
        # NO threading version
        main_single(_targets=targets, executor_cls=executor, level=log_level)
if __name__ == "__main__":
    # Script entry point: forward everything after the program name to
    # main(), then request a garbage-collection pass before exiting.
    cli_args = sys.argv[1:]
    main(cli_args)
    gc.collect()
config.py
import logging
# -----------------------------
#Config
# -----------------------------
class Config:
    """Static settings shared by main.py, executor.py and utils.py."""

    # --- SSH connection defaults ---
    # NOTE(review): sample credentials are hard-coded here; confirm this is
    # acceptable before using the file outside a test environment.
    USERNAME, PASSWORD = "root", "rootroot"
    PORT, TIMEOUT = 22, 10

    # --- Files and folders ---
    SETTINGS_DIR = "settings"
    CONFIG_FILE = "config.ini"
    OUTPUT_DIR = "out"

    # --- Default log level ---
    LEVEL = logging.WARNING

    # --- Selectable classes (by name) and the index of the active one ---
    executor_class_list = ["FetchFileListExecutor", "FetchLSDFExecutor"]
    reporter_class_list = ["ReporterSample"]
    executor_idx = 1
    reporter_idx = 0
    EXECUTOR_CLS = executor_class_list[executor_idx]
    REPORTER_CLS = reporter_class_list[reporter_idx]

    # --- Threading ---
    MAX_WORKERS = 3
# -----------------------------
# Logger
# -----------------------------
def setup_logger(name, level=logging.INFO):
    """Return the logger called ``name`` set to ``level``.

    A stream handler with a timestamp / level / logger name / thread name
    format is attached only when the logger has no handler yet, so repeated
    calls for the same name do not add further handlers.
    """
    log = logging.getLogger(name)
    log.setLevel(level)
    if log.handlers:
        return log
    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(threadName)s: %(message)s")
    )
    log.addHandler(stream)
    return log
executor.py
from abc import ABC, abstractmethod
import subprocess
import paramiko
import datetime
import time
import os
import socket
import logging
import shlex
import json
from typing import List, Type
from dataclasses import dataclass
from config import Config, setup_logger
# -----------------------------
# SSH Connection
# -----------------------------
class ISSHClientInterface(ABC):
    """Base class for SSH clients (implemented with subprocess or paramiko).

    Args:
        commands: command strings to run on the target.
        ipaddr: address used as the SSH host.
        hostname: label of the target; stored but not used for connecting.
        username: login user; ``None`` or ``""`` falls back to
            ``Config.USERNAME``.
        password: login password; ``None`` or ``""`` falls back to
            ``Config.PASSWORD``.
        port: SSH port.
        timeout: timeout in seconds.
        level: log level for the client's logger.
    """

    def __init__(self,
                 commands: List[str],
                 ipaddr=None,
                 hostname=None,
                 username: str = Config.USERNAME,
                 password: str = Config.PASSWORD,
                 port: int = Config.PORT,
                 timeout: int = Config.TIMEOUT,
                 level=Config.LEVEL,):
        self.log = self._set_logger(level=level)
        self.commands = commands
        self.hostname = hostname
        self.ssh_host = ipaddr
        # Callers that bypass set_queue() can pass None/"" for the
        # credentials (e.g. an empty config column); use the defaults then.
        if username == "" or username is None:
            self.log.debug("username not specified; using Config.USERNAME")
            username = Config.USERNAME
        self.ssh_user = username
        if password == "" or password is None:
            self.log.debug("password not specified; using Config.PASSWORD")
            password = Config.PASSWORD
        self.password = password
        self.port = port
        self.timeout = timeout

    @staticmethod
    @abstractmethod
    def _set_logger(level):
        """Return the logger for the concrete client class."""
        return setup_logger("ISSHClientInterface", level)

    @abstractmethod
    def execute_command(self):
        """Run ``self.commands`` on the target and return the output."""
        pass
class SSHClientSubprocess(ISSHClientInterface):
    """SSH client implementation using subprocess.

    Only the first entry of ``self.commands`` is executed. When
    ``self.ssh_host`` is set the command is run through the ``ssh`` program;
    otherwise it is run on the local machine.

    NOTE(review): ``self.port`` and ``self.password`` are not used by this
    client — confirm that key-based login on the default port is intended.
    """

    @staticmethod
    def _set_logger(level):
        return setup_logger("SSHClientSubprocess", level)

    def _build_argv(self, command: str) -> List[str]:
        """Return the argument vector used to run ``command``."""
        if self.ssh_host:
            user_at = "{}@{}".format(self.ssh_user, self.ssh_host) if self.ssh_user else self.ssh_host
            # ssh joins its trailing arguments with spaces and hands the
            # result to the remote shell, so pre-splitting with shlex would
            # drop the quoting. Pass the command through as one argument.
            return ["ssh", user_at, command]
        # Local execution without a shell needs a real argument vector.
        return shlex.split(command)

    def execute_command(self):
        """Run the first command and return its non-blank output lines.

        Returns:
            A list of output lines on success, or a one-element list with an
            error/warning message on any failure (no command given, timeout,
            program not found, non-zero exit status, other errors).
        """
        self.log.debug({"commands": self.commands})
        if not self.commands:
            message = "[ERROR] No command specified"
            self.log.error(message)
            return [message]
        if len(self.commands) > 1:
            self.log.warning("only the first of %d commands is executed", len(self.commands))
        try:
            cmd = self._build_argv(self.commands[0])
            self.log.debug({"commands split": cmd})
            self.log.debug("Running commands: %s", " ".join(cmd))
            p = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                timeout=self.timeout,)
        except subprocess.TimeoutExpired as e:
            message = f"[WARN] SSH command timed out: {e.cmd} (after {e.timeout} sec)"
            self.log.error(message)
            return [message]
        except FileNotFoundError as e:
            # The program to run (ssh or the local command) does not exist.
            self.log.error(f"[ERROR] Command not found: {e.filename}")
            return [f"[ERROR] Command not found: {e.filename}"]
        except Exception as e:
            # Any other failure is reported in the result instead of raised.
            self.log.error(f"[ERROR] Unexpected: {e}")
            return [f"[ERROR] {e}"]

        if p.returncode != 0:
            # Report the failure directly (with exit status and stderr)
            # rather than raising an exception only to catch it again.
            message = f"[ERROR] commands failed rc={p.returncode} stderr={p.stderr.strip()}"
            self.log.error(message)
            return [message]

        raw_lines = [ln for ln in p.stdout.splitlines() if ln.strip()]
        # debug: raw all
        if self.log.isEnabledFor(logging.DEBUG):
            self.log.debug("RAW_ALL_BEGIN total=%d", len(raw_lines))
            for ln in raw_lines:
                self.log.debug("RAW|%s", ln)
            self.log.debug("RAW_ALL_END")
        return raw_lines
class ParamikoSSHClient(ISSHClientInterface):
    """SSH client implementation using Paramiko and an interactive shell."""

    @staticmethod
    def _set_logger(level):
        return setup_logger("ParamikoSSHClient", level)

    @staticmethod
    def _recv_all(chan: paramiko.Channel, max_wait_s: float = 1.0) -> str:
        """Read from ``chan`` until it stays silent (no prompt detection).

        - max_wait_s: stop once this many seconds pass without new data
          since the last received chunk.

        Returns the collected bytes decoded as UTF-8 (invalid bytes replaced).
        """
        buf = bytearray()
        idle_since = time.time()
        while True:
            if not chan.recv_ready():
                if time.time() - idle_since >= max_wait_s:
                    break
                time.sleep(0.05)
                continue
            piece = chan.recv(65535)
            if not piece:
                break
            buf += piece
            idle_since = time.time()
        return buf.decode("utf-8", errors="replace")

    def execute_command(self):
        """Send every command to an interactive shell and return the output.

        Returns one string with the concatenated output of all commands, or
        a string starting with ``[ERROR]`` when no host is set or when the
        connection fails.
        """
        self.log.debug("execute command: %s", self.commands)
        if self.ssh_host is None:
            return "[ERROR] SSH host is required (ipaddr or hostname)"

        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification — confirm this is acceptable for the target network.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        connect_kwargs = {
            "hostname": self.ssh_host,
            "username": self.ssh_user,
            "password": self.password,
            "port": self.port,
            "timeout": self.timeout,
            "banner_timeout": 30,
            "auth_timeout": 30,
            "look_for_keys": False,
            "allow_agent": False,
        }
        try:
            client.connect(**connect_kwargs)
            chan = client.invoke_shell()
            time.sleep(0.2)
            # Discard the banner/prompt text printed right after login.
            self._recv_all(chan, max_wait_s=0.5)
            collected = []
            for cmd in self.commands or []:
                line = cmd if cmd.endswith("\n") else cmd + "\n"
                chan.send(line)
                # Collect the output; a slightly longer wait is used because
                # response time differs per command.
                collected.append(self._recv_all(chan, max_wait_s=1.2))
            return "".join(collected)
        except (paramiko.SSHException, socket.error, TimeoutError) as e:
            return f"[ERROR] {self.ssh_host} : {e}"
        finally:
            try:
                client.close()
            except Exception as e:
                self.log.debug("Error during close(): %s", e)
#-----------------------------
# SSH Executor Interface
#-----------------------------
@dataclass
class ServerInfo:
    """Connection parameters describing one SSH target.

    Every field has a default, so an instance can be created from a partial
    target description.
    """
    # Address handed to the SSH client as the host to connect to.
    ipaddr: str = None
    # Label of the target; used for the output file name and as result key.
    hostname: str = None
    # Login credentials; default to the values in Config.
    username: str = Config.USERNAME
    password: str = Config.PASSWORD
    # SSH port and timeout (seconds); default to the values in Config.
    port: int = Config.PORT
    timeout: int = Config.TIMEOUT
class ISSHExecutorInterface(ABC):
    """Base class that runs a fixed command list on one server and logs it.

    Subclasses choose the SSH client class (``build_ssh_client_cls``), the
    commands (``build_command``), a logger name (``name``) and implement
    ``execute_command``.

    Args:
        server_info: target description.
        timeout: timeout in seconds passed to the SSH client.
        level: log level for the executor and the SSH client.
    """

    def __init__(self,
                 server_info: ServerInfo,
                 timeout=Config.TIMEOUT,
                 level=Config.LEVEL):
        self.server_info = server_info
        self.ssh_client_cls = self.build_ssh_client_cls()
        self.logger = setup_logger(self.name, level)
        self.commands = self.build_command()
        self.timeout = timeout
        self.result = None
        self.out_filename = self.set_out_filename(server_info.hostname, Config.OUTPUT_DIR)

    @staticmethod
    def set_out_filename(filename: str, out_dir) -> str:
        """Return ``<filename>_<YYYYmmdd-HHMMSS>.txt``, inside ``out_dir`` if given."""
        now_date = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"{filename}_{now_date}.txt"
        if out_dir is None:
            return filename
        return os.path.join(out_dir, filename)

    @staticmethod
    def _to_text(value) -> str:
        """Convert one result entry to text, decoding bytes as UTF-8."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return str(value)

    def write_log(self):
        """Write the command result to ``self.out_filename``, one entry per line.

        Runs the command first when no result is available yet. The output
        directory is created when it does not exist.
        """
        if self.result is None:
            # Call execute() rather than execute_command(): the concrete
            # execute_command() implementations call write_log() themselves,
            # which would write the file twice.
            self.execute()
        result = self.result
        if result is None:
            lines = []
        elif isinstance(result, (str, bytes, bytearray)):
            # A single string must be split into lines; iterating it would
            # write one character per line.
            lines = self._to_text(result).splitlines()
        else:
            lines = [self._to_text(entry) for entry in result]

        out_dir = os.path.dirname(self.out_filename)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        self.logger.debug(f'write to {self.out_filename}')
        with open(self.out_filename, mode="w", encoding="utf-8") as f:
            for text in lines:
                # The former lstrip("b'") removed every leading "b" or "'"
                # character, corrupting lines such as "bin" or "bob".
                f.write(text + "\n")
                self.logger.debug(text)
        self.logger.debug('end to write logs')

    def execute(self) -> None:
        """Create the SSH client, run the commands and store ``self.result``."""
        ssh_client = self.ssh_client_cls(
            ipaddr=self.server_info.ipaddr,
            hostname=self.server_info.hostname,
            username=self.server_info.username,
            password=self.server_info.password,
            port=self.server_info.port,
            commands=self.commands,
            timeout=self.timeout,
            level=self.logger.level,
        )
        self.result = ssh_client.execute_command()

    @staticmethod
    @abstractmethod
    def build_ssh_client_cls() -> Type[ISSHClientInterface]:
        """Return the SSH client class to use.

        Example: ``return SSHClientSubprocess``
        """
        pass

    @staticmethod
    @abstractmethod
    def build_command() -> List[str]:
        """Return the list of command strings to run."""
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the name used for this executor's logger."""
        pass

    @abstractmethod
    def execute_command(self):
        """Run the commands and return the result for this target."""
        pass
# -----------------------------
# Concrete Executor.
# -----------------------------
class FetchFileListExecutor(ISSHExecutorInterface):
    """Sample executor for a Linux command (2026.03.11).

    Lists the contents of /home using the SSHClientSubprocess client.
    """

    @property
    def name(self) -> str:
        return "FetchFileListExecutor"

    @staticmethod
    def build_ssh_client_cls():
        return SSHClientSubprocess

    @staticmethod
    def build_command() -> List[str]:
        commands = ["ls -l /home"]
        return commands

    def execute_command(self):
        """Run the command, write the output file and return the result."""
        self.execute()
        self.write_log()
        return self.result
class FetchLSDFExecutor(ISSHExecutorInterface):
    """Sample executor running two Linux commands (2026.03.11).

    Lists /home and shows disk usage with ``df -h``. It is an example of
    running more than one command and therefore uses ParamikoSSHClient
    (running several commands is more cumbersome with subprocess).
    """

    @staticmethod
    def build_ssh_client_cls():
        return ParamikoSSHClient

    @staticmethod
    def build_command() -> List[str]:
        return [
            "ls /home --color=never",
            "df -h",
        ]

    @property
    def name(self) -> str:
        # Was "FetchFileListExecutor" (copy-paste), which made this
        # executor log under the other executor's logger name.
        return "FetchLSDFExecutor"

    def execute_command(self):
        """Run the commands, write the full output and return a short summary.

        Returns:
            A list holding the second non-blank output line, or an empty
            list when fewer than two non-blank lines were received.
        """
        self.execute()
        # The interactive shell ends lines with "\r\n"; drop the "\r" so it
        # ends up neither in the output file nor in the summary.
        self.result = [ln.rstrip("\r") for ln in self.result.split("\n")]
        self.write_log()
        text_lines = [ln for ln in self.result if ln.strip()]
        return text_lines[1:2]
#-----------------------------
# Example usage: main_single
#-----------------------------
def main_single(_targets: List[dict],
                executor_cls: Type[ISSHExecutorInterface] = FetchFileListExecutor,
                level=Config.LEVEL):
    """Run ``executor_cls`` for every target one after another (no threads).

    Prints the number of targets first and, at the end, all results as JSON
    in the form ``[{hostname: result}, ...]``.

    Args:
        _targets: target dicts with the keys ``ipaddr``, ``hostname`` and
            optionally ``username`` / ``password``.
        executor_cls: executor class instantiated for each target.
        level: log level handed to each executor.
    """
    datasets = []
    for t in _targets:
        server_info = ServerInfo(
            ipaddr=t.get("ipaddr", None),
            hostname=t.get("hostname", None),
            # dict.get() only uses its default when the key is absent; a key
            # holding None or "" must fall back to the Config value as well.
            username=t.get("username") or Config.USERNAME,
            password=t.get("password") or Config.PASSWORD)
        datasets.append(server_info)
    print(len(datasets))
    results = []
    for dataset in datasets:
        executor = executor_cls(server_info=dataset, level=level)
        res = executor.execute_command()
        results.append({dataset.hostname: res})
    print(json.dumps(results, indent=2, ensure_ascii=False))
#-------------------------------------------------
# Sample Dataset(for testing without config.ini)
#-------------------------------------------------
def sample_datasets() -> List[dict]:
    """Return sample targets for testing without config.ini.

    The list contains a target without credentials, two targets with
    explicit credentials and one target whose address and name are None.
    """
    return [
        {"ipaddr": "192.168.64.2", "hostname": "rx8headnode"},
        {"ipaddr": "192.168.64.4", "hostname": "rx8node01", "username": Config.USERNAME, "password": Config.PASSWORD},
        {"ipaddr": "192.168.64.2", "hostname": "rx8headnode", "username": Config.USERNAME, "password": Config.PASSWORD},
        # Key was "host"; every consumer in this file reads "hostname".
        {"ipaddr": None, "hostname": None},
        # Add more targets as needed
    ]
def sample_one_target() -> List[dict]:
    """Return a list with a single sample target using the Config credentials."""
    target = {
        "ipaddr": "192.168.64.2",
        "hostname": "rx8headnode",
        "username": Config.USERNAME,
        "password": Config.PASSWORD,
    }
    return [target]
#--------------------------------------------------------------
# CLI / main ( Test for single execution without threads )
#--------------------------------------------------------------
if __name__ == '__main__':
    # Manual test: run the two-command executor once, sequentially, against
    # the single sample target with DEBUG logging.
    targets, executor, log_level = sample_one_target(), FetchLSDFExecutor, logging.DEBUG
    main_single(_targets=targets, executor_cls=executor, level=log_level)
utils.py
import os
import json
from abc import ABC, abstractmethod
from typing import List
from pprint import pformat
from config import Config
# -----------------------------------
# Fetch Target List from config file
# -----------------------------------
class SwitchListDataset(object):
    """Load the target list (address, host name, credentials) from the config file.

    The file is comma separated. The first line is the header naming the
    columns (for example ``hostname,ipaddr,username,password``); every later
    non-blank line describes one target. Empty fields are stored as None.

    folder: settings
    file: config.ini
    """

    def __init__(self):
        cur_dir = os.getcwd()
        self.targets_file = os.path.join(cur_dir, Config.SETTINGS_DIR, Config.CONFIG_FILE)
        self.targets_list = []
        self.import_config()

    def import_config(self):
        """Read ``self.targets_file`` and append one dict per data row.

        Each dict maps the header names to the row's values. Fields beyond
        the number of header columns are ignored; missing trailing fields
        are simply absent from the dict.
        """
        with open(self.targets_file, 'r', encoding="utf-8") as f:
            headers = None
            for raw in f:
                line = raw.rstrip("\r\n")
                if headers is None:
                    # The first line always defines the column names.
                    headers = line.split(",")
                    continue
                if not line:
                    continue
                # zip() stops at the shorter side, so a row with more fields
                # than header columns no longer raises IndexError.
                device = {
                    key: (value if value != "" else None)
                    for key, value in zip(headers, line.split(","))
                }
                if device:
                    self.targets_list.append(device)

    def __str__(self):
        return json.dumps(self.targets_list, indent=2, ensure_ascii=False)
# -----------------------------
# Reporter
# -----------------------------
class IReporterInterface(ABC):
    """Interface for classes that print a summary of the collected results."""

    @staticmethod
    @abstractmethod
    def print_results(results: List[dict]) -> None:
        """Print ``results``, a list of ``{hostname: result}`` dicts."""
class ReporterSample(IReporterInterface):
    """Sample reporter printing one ``hostname | line`` row per result line."""

    @staticmethod
    def print_results(results: List[dict]) -> None:
        """Print a summary of ``results``.

        Args:
            results: list of ``{hostname: lines}`` dicts, where ``lines`` is
                a list of output lines, a single string, or an empty/None
                value. A hostname of None is shown as "Unknown Host".
        """
        print('*' * 80)
        print("[INFO]: Results Summary")
        width = 12
        for res in results:
            for hostname, lines in res.items():
                print("-" * 80)
                hostname_str = str(hostname) if hostname is not None else "Unknown Host"
                if not lines:
                    print(f"{hostname_str:<{width}} | No data or error occurred.")
                    continue
                if isinstance(lines, str):
                    # A bare string would otherwise be printed one character
                    # per row.
                    lines = lines.splitlines()
                for line in lines:
                    # Remove real carriage returns (terminal output ends
                    # lines with "\r\n") as well as the escaped two-character
                    # forms that were already handled before.
                    msg = str(line).replace("\r", "").replace("\\r", "").replace("\\n", "\n")
                    print(f"{hostname_str:<{width}} | {msg}")
settings/config.ini (サンプル)
hostname,ipaddr,username,password
rx8headnode,192.168.64.2,root,rootroot
rx8node01,192.168.64.4,root,rootroot
rx8headnode,192.168.64.2,,
,,,
実行例
(.venv) 03_Threads$ python main.py 2 -t --info
[INFO] 2026-03-15
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-1: {'thread': ('rx8headnode', '192.168.64.2')}
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-2: {'thread': ('rx8node01', '192.168.64.4')}
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-3: {'thread': ('rx8headnode', '192.168.64.2')}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: done: {'rx8headnode': []}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: {'thread': (None, None)}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: done: {None: []}
2026-03-15 08:37:25,848 INFO ThreadWorkers: Thread-1: done: {'rx8headnode': ['APPLI n1001 n1003 n1005 n1007 n1009 user01\r']}
2026-03-15 08:37:27,095 INFO ThreadWorkers: Thread-2: done: {'rx8node01': ['df -h\r']}
********************************************************************************
[INFO]: Results Summary
--------------------------------------------------------------------------------
rx8headnode | No data or error occurred.
--------------------------------------------------------------------------------
Unknown Host | No data or error occurred.
--------------------------------------------------------------------------------
rx8headnode | APPLI n1001 n1003 n1005 n1007 n1009 user01
--------------------------------------------------------------------------------
rx8node01 | df -h
(.venv) 03_Threads$
2026.03.26
時間を計測するための追記
import time
if __name__ == "__main__":
    # Script entry point that also reports how long the run took.
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    # perf_counter() is a monotonic clock intended for measuring intervals;
    # time.time() can jump when the system clock is adjusted.
    start = time.perf_counter()
    main(sys.argv[1:])
    end = time.perf_counter()
    print(f"[INFO] Start: {dt_now}, Elapsed Time: {end - start:.2f} seconds")
    gc.collect()
def main(argv)には以下があると良い
print(f"[INFO] Target: {target}, Log Level: {logging.getLevelName(log_level)}, Threaded: {threaded}")
ISSHClientInterface(ABC)には、set_queueを経由しない場合(例: main_single)に、username, passwordがNoneや空文字のまま渡されてしまうバグがあるので、以下のようにデフォルト値へ置き換える処理を追加する
class ISSHClientInterface(ABC):
    """SSH Client implementation using subprocess or paramiko."""

    # Excerpt: only the constructor is shown here.
    def __init__(
        self,
        commands: List[str],
        ipaddr=None,
        hostname=None,
        username: str = Config.USERNAME,
        password: str = Config.PASSWORD,
        port: int = Config.PORT,
        timeout: int = Config.TIMEOUT,
        level=Config.LEVEL,
    ):
        self.log = self._set_logger(level=level)
        self.commands = commands
        self.hostname = hostname
        self.ssh_host = ipaddr

        # Missing credentials (None or empty string) fall back to Config.
        if username is None or username == "":
            self.log.debug(username)
            username = Config.USERNAME
        self.ssh_user = username

        if password is None or password == "":
            self.log.debug(password)
            password = Config.PASSWORD
        self.password = password

        self.port = port
        self.timeout = timeout