KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

Threadを使ったSSHログコレクター

登録日 :2026/03/15 08:30
カテゴリ :Python基礎

SSHでサーバやスイッチにログインしてコマンドを実行するプログラム。
構成

out : コマンド実行結果をテキストで出力するフォルダ
settings/config.ini  :  接続対象のリスト
config.py : 設定ファイル
executor.py : SSHログインとコマンド実行するクラスを定義しているライブラリ
utils.py : 設定ファイルの読み込みや、サマリーを表示する関数を定義しているライブラリ
main.py : Threadを考慮してプログラムを実行する

(使用手順)

python main.py 1 : 設定の確認
python main.py 2 : 逐次実行(Threadなし)
python main.py 2 -t : Threadモードで実行
python main.py 2 -t --info : インフォメーションを表示しながら実行
python main.py 2 -t --debug : デバッグモードで実行


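新しいコマンドを定義したい場合は、executor.py に concrete executor を作成し、config.py の executor_class_list にそのクラス名を登録したうえで、executor_idx にそのインデックスを設定して有効にすれば良い。なお、main.py はクラス名から getattr でクラスを取得するため、新しい executor クラスは main.py でも import しておく必要がある。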
サマリーの表示はReporterで行っており、サマリーもexecutorクラスの方法と同じようにカスタマイズ可能。


(プログラム)
main.py

from abc import ABC, abstractmethod
import datetime
import os
import sys
import logging
import queue
import threading
import json
from typing import List, Type
from pprint import pformat
import gc

from rpds.rpds import Queue

from config import Config, setup_logger
from utils import SwitchListDataset, IReporterInterface, ReporterSample
from executor import (
    ISSHExecutorInterface,
    FetchFileListExecutor,
    FetchLSDFExecutor,
    ServerInfo,
    main_single,
)


#-----------------------------
# Thread Worker
#-----------------------------
class IThreadWorkerInterface(ABC):
    """Abstract base class for workers that process items taken from a queue.

    Concrete classes supply ``name`` (used as the logger name), ``run`` and
    ``worker``.
    """

    def __init__(self,
                 executor: Type[ISSHExecutorInterface],
                 _queue, workers=1, timeout=Config.TIMEOUT, level=Config.LEVEL):
        """Store the worker settings and prepare the shared result container.

        :param executor: executor class (a subclass of ISSHExecutorInterface)
        :param _queue: queue object the workers read their items from
        :param workers: number of workers
        :param timeout: timeout value kept in ``self.timeout_s``
        :param level: log level passed to ``setup_logger``
        """
        # Result list and the lock that protects appends to it.
        self.results = []
        self.result_lock = threading.Lock()

        # Where the work comes from and how it is processed.
        self.queue = _queue
        self.executor = executor
        self.workers = workers
        self.timeout_s = timeout

        # The logger is named after the concrete class' ``name`` property.
        self.logger = setup_logger(self.name, level)

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the name of the worker implementation."""

    @abstractmethod
    def run(self):
        """Run the workers."""

    @abstractmethod
    def worker(self):
        """Body executed by a single worker."""


class ThreadWorkers(IThreadWorkerInterface):
    """
    Thread worker that takes ServerInfo items from a queue and runs an SSH executor on each.
     - executor: class derived from ISSHExecutorInterface
     - _queue: queue filled with ServerInfo objects
     - workers: number of threads
     - timeout: timeout in seconds for the SSH command
     - level: log level
     - Results are appended to self.results as ``{hostname: result}`` under self.result_lock.
     - If an executor raises, the error is logged and stored as ``["[ERROR] ..."]`` for that
       host so that one failing target neither kills the thread nor loses its result.
     - The log format contains the thread name, so it is visible which thread handles which server.
     - e.g. ThreadWorkers(executor=FetchLSDFExecutor, _queue=q, workers=5, timeout=10, level=logging.DEBUG).run()
     - e.g. ThreadWorkers(executor=FetchFileListExecutor, _queue=q, workers=5, timeout=10, level=logging.DEBUG).run()
    """
    @property
    def name(self) -> str:
        return "ThreadWorkers"

    def run(self):
        """Start ``self.workers`` threads, enqueue one stop sentinel per thread and wait for all."""
        threads = []
        for _ in range(self.workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        # One ``None`` sentinel per thread; each worker stops when it receives one.
        for _ in threads:
            self.queue.put(None)
        for t in threads:
            t.join()

    def worker(self):
        """Process queue items until the ``None`` sentinel is received."""
        self.logger.debug('workers start')
        while True:
            item = self.queue.get()
            try:
                if item is None:
                    break
                self._process(item)
            finally:
                # Every get() is matched by exactly one task_done(), even on errors.
                self.queue.task_done()
        self.logger.debug('workers end')

    def _process(self, item):
        """Run the executor for one ServerInfo item and record its result."""
        self.logger.info({'thread': (item.hostname, item.ipaddr)})
        try:
            executor = self.executor(server_info=item, timeout=self.timeout_s, level=self.logger.level)
            res = executor.execute_command()
        except Exception as e:
            # Thread boundary: keep the worker alive and report the failure as the result.
            self.logger.exception("executor failed for %s", item.hostname)
            res = [f"[ERROR] {e}"]

        messages = {item.hostname: res}
        self.logger.info("done: %s", messages)

        with self.result_lock:
            self.results.append({item.hostname: res})

        # Build the (possibly large) debug dump only when DEBUG is enabled.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(type(res))
            if isinstance(res, list):
                self.logger.debug(json.dumps(res, indent=2, ensure_ascii=False))
            else:
                self.logger.debug(res)


#-----------------------------
# Set Queue
#-----------------------------
def set_queue(_targets: List[dict]):
    """Create a queue holding one ServerInfo per target dictionary.

    A missing, ``None`` or empty ``username`` / ``password`` is replaced by
    ``Config.USERNAME`` / ``Config.PASSWORD``.

    :param _targets: list of dicts with the keys ipaddr, hostname, username, password
    :return: queue.Queue filled with ServerInfo objects
    """
    def _or_default(value, default):
        # Same test as for a missing key: "" and None both select the default.
        if value == "" or value is None:
            return default
        return value

    server_queue = queue.Queue()
    for target in _targets:
        server_queue.put(ServerInfo(
            ipaddr=target.get("ipaddr", None),
            hostname=target.get("hostname", None),
            username=_or_default(target.get("username", Config.USERNAME), Config.USERNAME),
            password=_or_default(target.get("password", Config.PASSWORD), Config.PASSWORD),
        ))
    return server_queue


#-----------------------------
# Main
#-----------------------------
def main_threads(_q: queue.Queue,
                 workers=1,
                 executor_cls: Type[ISSHExecutorInterface] = FetchFileListExecutor,
                 reporter_cls: Type[IReporterInterface] = ReporterSample,
                 level=Config.LEVEL):
    """Run the executor on every queued target with a thread pool and print a summary.

    :param _q: standard-library ``queue.Queue`` filled with ServerInfo objects
               (annotated as queue.Queue; the queue is created by ``queue.Queue()`` in set_queue)
    :param workers: number of worker threads
    :param executor_cls: executor class instantiated for each target
    :param reporter_cls: reporter class whose ``print_results`` prints the summary
    :param level: log level used by the workers
    """
    pool = ThreadWorkers(
        executor=executor_cls,
        _queue=_q,
        workers=workers,
        timeout=Config.TIMEOUT,
        level=level)
    pool.run()

    reporter_cls.print_results(pool.results)


# -----------------------------
# CLI / main
# -----------------------------
def _exit_with_usage():
    """Print the usage examples and terminate the process with exit status 1."""
    print("[ERROR]Please input option (number, log level)")
    print("[INFO]Example: python main.py 1 -> Check Config.EXECUTOR_CLS")
    print("[INFO]Example: python main.py 2 -> Run SSH Executor")
    print("[INFO]Example: python main.py 2 -t -> Run SSH Executor with threading")
    print("[INFO]Example: python main.py 2 --debug -> Run SSH Executor with DEBUG log level")
    # sys.exit is always available; the bare ``exit`` helper is only injected by ``site``.
    sys.exit(1)


def parse_args(argv):
    """Parse the command-line arguments (without the program name).

    The first argument is the numeric mode; the remaining ones are the flags
    ``--debug``, ``--info`` and ``-t`` / ``--threaded``.

    :param argv: list of argument strings
    :return: tuple ``(target, log_level, debug, info, threaded)``
    :raises SystemExit: with status 1 when no argument is given or the mode is not an integer
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    log_level = Config.LEVEL
    debug = False
    info = False
    threaded = False

    print(f"[INFO] {today}")
    if not argv:
        _exit_with_usage()

    try:
        target = int(argv[0])
    except ValueError:
        # e.g. "python main.py -t": show the usage instead of a traceback.
        _exit_with_usage()

    for option in argv[1:]:
        if option == "--debug":
            log_level = logging.DEBUG
            debug = True
        elif option == "--info":
            log_level = logging.INFO
            info = True
        elif option in ("-t", "--threaded"):
            threaded = True
        else:
            print(f"[WARN] Unknown option ignored: {option}")
    return target, log_level, debug, info, threaded


def main(argv):
    """Entry point: check the configuration (mode 1) or run the SSH executor (mode 2).

    Mode 1 prints the configured executor/reporter class names and returns.
    Mode 2 resolves the configured classes by name from this module, loads the
    targets from the config file and runs them threaded (``-t``) or sequentially.

    :param argv: command-line arguments without the program name
    :raises SystemExit: with status 1 on an unknown mode or an unresolvable class name
    """
    target, log_level, _debug, _info, threaded = parse_args(argv)

    if target == 1:
        print("[INFO] Checking EXECUTOR_CLS...")
        print(f"[INFO] EXECUTOR_CLS: {Config.EXECUTOR_CLS}")
        print(f"[INFO] REPORTER_CLS: {Config.REPORTER_CLS}")
        # The check is complete; do not fall through to the error path.
        return

    if target != 2:
        print("[ERROR]Please input option (number, log level): number 1 or 2")
        sys.exit(1)

    # The classes are looked up by name, so they must be imported into this module.
    module = sys.modules[__name__]
    executor = getattr(module, Config.EXECUTOR_CLS, None)
    if executor is None:
        print(f"[ERROR] EXECUTOR_CLS '{Config.EXECUTOR_CLS}' is not imported in main.py")
        sys.exit(1)

    # targets from config.ini (read only after the arguments were validated)
    main_logger = setup_logger("main", log_level)
    dataset = SwitchListDataset()
    main_logger.debug(dataset)
    targets = dataset.targets_list

    if threaded:
        # THREADING version (the reporter is only needed here)
        reporter = getattr(module, Config.REPORTER_CLS, None)
        if reporter is None:
            print(f"[ERROR] REPORTER_CLS '{Config.REPORTER_CLS}' is not imported in main.py")
            sys.exit(1)
        q = set_queue(_targets=targets)
        main_threads(_q=q,
                     workers=Config.MAX_WORKERS,
                     executor_cls=executor,
                     reporter_cls=reporter,
                     level=log_level)
    else:
        # NO threading version
        main_single(_targets=targets, executor_cls=executor, level=log_level)


if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (without the program name) to main().
    main(sys.argv[1:])

    # Explicitly run a garbage-collection pass after main() has returned.
    gc.collect()

config.py

import logging


# -----------------------------
#Config
# -----------------------------
class Config:
    """Static settings shared by main.py, executor.py and utils.py."""

    # --- SSH connection defaults ---
    USERNAME = "root"
    PASSWORD = "rootroot"
    PORT = 22
    TIMEOUT = 10

    # --- Files, folders and logging ---
    SETTINGS_DIR = "settings"
    CONFIG_FILE = "config.ini"
    OUTPUT_DIR = "out"
    LEVEL = logging.WARNING

    # --- Threading ---
    MAX_WORKERS = 3

    # --- Selectable executor / reporter classes (referenced by class name) ---
    executor_class_list = ["FetchFileListExecutor", "FetchLSDFExecutor"]
    reporter_class_list = ["ReporterSample"]

    # Index into the lists above that picks the active class.
    executor_idx = 1
    reporter_idx = 0

    EXECUTOR_CLS = executor_class_list[executor_idx]
    REPORTER_CLS = reporter_class_list[reporter_idx]


# -----------------------------
# Logger
# -----------------------------
def setup_logger(name, level=logging.INFO):
    """Return the logger called *name*, set to *level*.

    A stream handler with the project log format is attached only when the
    logger has no handler yet, so repeated calls do not add duplicates.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        return logger

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(threadName)s: %(message)s"))
    logger.addHandler(stream_handler)
    return logger

executor.py

from abc import ABC, abstractmethod
import subprocess
import paramiko
import datetime
import time
import os
import socket
import logging
import shlex
import json
from typing import List, Type
from dataclasses import dataclass

from config import Config, setup_logger

# -----------------------------
# SSH Connection
# -----------------------------
class ISSHClientInterface(ABC):
    """Abstract SSH client; concrete classes run the commands via subprocess or paramiko."""
    def __init__(self,
                 commands: List[str],
                 ipaddr=None,
                 hostname=None,
                 username: str = Config.USERNAME,
                 password: str = Config.PASSWORD,
                 port: int = Config.PORT,
                 timeout: int = Config.TIMEOUT,
                 level=Config.LEVEL,):
        """Store the connection parameters.

        :param commands: list of command strings to execute
        :param ipaddr: address used as the SSH host
        :param hostname: name of the target (stored, not used for connecting here)
        :param username: login user; ``None`` or ``""`` falls back to ``Config.USERNAME``
        :param password: login password; ``None`` or ``""`` falls back to ``Config.PASSWORD``
        :param port: SSH port
        :param timeout: timeout value handed to the concrete client
        :param level: log level of the client logger
        """
        self.log = self._set_logger(level=level)
        self.commands = commands
        self.hostname = hostname
        self.ssh_host = ipaddr

        # Targets read from the config file may carry None/"" credentials;
        # explicit None would otherwise override the keyword defaults.
        if username == "" or username is None:
            self.log.debug("username not set; using Config.USERNAME")
            username = Config.USERNAME
        self.ssh_user = username
        if password == "" or password is None:
            self.log.debug("password not set; using Config.PASSWORD")
            password = Config.PASSWORD
        self.password = password

        self.port = port
        self.timeout = timeout

    @staticmethod
    @abstractmethod
    def _set_logger(level):
        """Return the logger for the client; concrete classes use their own logger name."""
        return setup_logger("ISSHClientInterface", level)

    @abstractmethod
    def execute_command(self):
        """Execute ``self.commands`` and return the output."""
        pass


class SSHClientSubprocess(ISSHClientInterface):
    """SSH Client implementation using subprocess.

    Only the first entry of ``commands`` is executed. When ``ipaddr`` is not
    set, the command is run without the ``ssh user@host`` prefix.
    """
    @staticmethod
    def _set_logger(level):
        return setup_logger("SSHClientSubprocess", level)

    def execute_command(self):
        """Run the first command and return its non-empty output lines.

        :return: list of stdout lines on success, or a one-element list with an
                 ``[ERROR]`` / ``[WARN]`` message on failure or timeout
        """
        self.log.debug({"commands": self.commands})
        if not self.commands:
            # Avoid an IndexError on commands[0].
            message = "[ERROR] No command to execute"
            self.log.error(message)
            return [message]
        if len(self.commands) > 1:
            self.log.warning("Only the first command is executed; ignored: %s", self.commands[1:])

        try:
            cmd = []
            if self.ssh_host:
                user_at = "{}@{}".format(self.ssh_user, self.ssh_host) if self.ssh_user else self.ssh_host
                cmd += ["ssh", user_at]
            # shlex.split raises ValueError on unbalanced quotes; handled below.
            cmd += shlex.split(self.commands[0])

            self.log.debug({"commands split": cmd})
            self.log.debug("Running commands: %s", " ".join(cmd))
            p = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                timeout=self.timeout,)
        except subprocess.TimeoutExpired as e:
            message = f"[WARN] SSH command timed out: {e.cmd} (after {e.timeout} sec)"
            self.log.error(message)
            return [message]

        except FileNotFoundError as e:
            # The executable (e.g. ssh) was not found.
            self.log.error(f"[ERROR] Command not found: {e.filename}")
            return [f"[ERROR] Command not found: {e.filename}"]

        except Exception as e:
            # Any other failure while building or starting the command.
            self.log.error(f"[ERROR] Unexpected: {e}")
            return [f"[ERROR] {e}"]

        if p.returncode != 0:
            # Keep the return code and stderr in the result so the summary shows the cause.
            stderr = p.stderr.strip()
            self.log.error("commands failed rc=%s stderr=%s", p.returncode, stderr)
            return [f"[ERROR] commands failed rc={p.returncode} stderr={stderr}"]

        raw_lines = [ln for ln in p.stdout.splitlines() if ln.strip()]

        # debug: raw all
        if self.log.isEnabledFor(logging.DEBUG):
            self.log.debug("RAW_ALL_BEGIN total=%d", len(raw_lines))
            for ln in raw_lines:
                self.log.debug("RAW|%s", ln)
            self.log.debug("RAW_ALL_END")

        return raw_lines


class ParamikoSSHClient(ISSHClientInterface):
    """SSH Client implementation using Paramiko (interactive shell channel)."""
    @staticmethod
    def _set_logger(level):
        return setup_logger("ParamikoSSHClient", level)

    @staticmethod
    def _recv_all(chan: paramiko.Channel, max_wait_s: float = 1.0) -> str:
        """
        Simple reader: read until the channel goes quiet (no prompt detection).
        - max_wait_s: stop when nothing was received for this many seconds
                      since the last received data
        """
        data = bytearray()
        # Monotonic clock: the idle measurement must not be affected by wall-clock changes.
        last_rx = time.monotonic()

        while True:
            if chan.recv_ready():
                chunk = chan.recv(65535)
                if not chunk:
                    break
                data.extend(chunk)
                last_rx = time.monotonic()
            else:
                if (time.monotonic() - last_rx) >= max_wait_s:
                    break
                time.sleep(0.05)

        return data.decode("utf-8", errors="replace")

    def execute_command(self):
        """Open a shell on the host, send every command and return the collected output.

        :return: concatenated output of all commands as one string, or an
                 ``[ERROR] ...`` string when the host is missing or the connection fails
        """
        self.log.debug("execute command: %s", self.commands)
        if self.ssh_host is None:
            return "[ERROR] SSH host is required (ipaddr or hostname)"

        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without verification;
        # acceptable for a lab setup, confirm before using on untrusted networks.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            client.connect(
                hostname=self.ssh_host,
                username=self.ssh_user,
                password=self.password,
                port=self.port,
                timeout=self.timeout,
                banner_timeout=30,
                auth_timeout=30,
                look_for_keys=False,
                allow_agent=False,
            )

            chan = client.invoke_shell()
            time.sleep(0.2)

            # Discard the banner/prompt received right after connecting.
            _ = self._recv_all(chan, max_wait_s=0.5)
            out_parts = []
            if self.commands:
                for cmd in self.commands:
                    if not cmd.endswith("\n"):
                        cmd += "\n"
                    # sendall: send() may transmit only part of the data.
                    chan.sendall(cmd)
                    # Collect the command output (the wait time depends on the command).
                    out_parts.append(self._recv_all(chan, max_wait_s=1.2))
            return "".join(out_parts)

        except (paramiko.SSHException,
                socket.error,
                TimeoutError) as e:
            return f"[ERROR] {self.ssh_host} : {e}"

        finally:
            try:
                client.close()
            except Exception as e:
                self.log.debug("Error during close(): %s", e)


#-----------------------------
# SSH Executor Interface
#-----------------------------
@dataclass
class ServerInfo:
    """Connection data for one SSH target; unspecified fields use the Config defaults."""
    # Address of the target; None when not given.
    ipaddr: str = None
    # Name of the target; None when not given.
    hostname: str = None
    # Login credentials.
    username: str = Config.USERNAME
    password: str = Config.PASSWORD
    # SSH port and timeout value.
    port: int = Config.PORT
    timeout: int = Config.TIMEOUT


class ISSHExecutorInterface(ABC):
    """Abstract executor: runs the commands of a concrete class on one server and logs the output.

    Concrete classes define the SSH client class, the command list, a name
    (used as the logger name) and ``execute_command``.
    """
    def __init__(self,
                 server_info: ServerInfo,
                 timeout=Config.TIMEOUT,
                 level=Config.LEVEL):
        """
        :param server_info: connection data of the target
        :param timeout: timeout handed to the SSH client
        :param level: log level of the executor and its SSH client
        """
        self.server_info = server_info
        self.ssh_client_cls = self.build_ssh_client_cls()
        self.logger = setup_logger(self.name, level)
        self.commands = self.build_command()
        self.timeout = timeout
        self.result = None
        self.out_filename = self.set_out_filename(server_info.hostname, Config.OUTPUT_DIR)

    @staticmethod
    def set_out_filename(filename: str, out_dir) -> str:
        """Return ``<filename>_<YYYYmmdd-HHMMSS>.txt``, placed in *out_dir* unless it is None."""
        now_date = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"{filename}_{now_date}.txt"
        if out_dir is None:
            return filename
        return os.path.join(out_dir, filename)

    @staticmethod
    def _to_text(item) -> str:
        """Convert one result item to text; bytes are decoded instead of using their repr."""
        if isinstance(item, (bytes, bytearray)):
            return bytes(item).decode("utf-8", errors="replace")
        return str(item)

    def write_log(self):
        """
        Write the command results to ``self.out_filename``, one item per line.

        If no result exists yet, the commands are executed first. A result that
        is a single string is split into lines; the output directory is created
        when missing.
        :return:
        """
        if self.result is None:
            # execute() only fetches the result; calling execute_command() here
            # could re-enter write_log() from the concrete implementation.
            self.execute()

        lines = self.result
        if lines is None:
            lines = []
        elif isinstance(lines, (str, bytes, bytearray)):
            # Iterating a str would write one character per line.
            lines = self._to_text(lines).splitlines()

        out_dir = os.path.dirname(self.out_filename)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        self.logger.debug(f'write to {self.out_filename}')
        # utf-8: decoded output may contain characters the locale encoding cannot represent.
        with open(self.out_filename, mode="w", encoding="utf-8") as f:
            for item in lines:
                # No lstrip("b'") here: it would also remove leading "b" / "'" of real output.
                text = self._to_text(item)
                f.write(text + "\n")
                self.logger.debug(text)
        self.logger.debug('end to write logs')

    def execute(self) -> None:
        """Create the SSH client for the target, run the commands and store the output in ``self.result``."""
        ssh_client = self.ssh_client_cls(
            ipaddr=self.server_info.ipaddr,
            hostname=self.server_info.hostname,
            username=self.server_info.username,
            password=self.server_info.password,
            port=self.server_info.port,
            commands=self.commands,
            timeout=self.timeout,
            level=self.logger.level,
        )
        self.result = ssh_client.execute_command()

    @staticmethod
    @abstractmethod
    def build_ssh_client_cls() -> Type[ISSHClientInterface]:
        """Return the SSH client class to use.
        ssh_client_cls: Type[ISSHClientInterface]
        e.g. return SSHClientSubprocess
        """
        pass

    @staticmethod
    @abstractmethod
    def build_command() -> List[str]:
        """Return the list of command strings to execute."""
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the executor name (used as the logger name)."""
        pass

    @abstractmethod
    def execute_command(self):
        """Run the commands and return the result of the concrete executor."""
        pass


# -----------------------------
# Concrete Executor.
# -----------------------------
class FetchFileListExecutor(ISSHExecutorInterface):
    """
    2026.03.11 sample code for Linux command.
    Example that uses SSHClientSubprocess as the SSH client.
    show /home file list.
    """
    @property
    def name(self) -> str:
        """Executor name."""
        return "FetchFileListExecutor"

    @staticmethod
    def build_command() -> List[str]:
        """Single command: long listing of /home."""
        return ["ls -l /home"]

    @staticmethod
    def build_ssh_client_cls():
        """Use the subprocess based client."""
        return SSHClientSubprocess

    def execute_command(self):
        """Run the command, write the output file and return the stored result."""
        self.execute()
        self.write_log()
        return self.result


class FetchLSDFExecutor(ISSHExecutorInterface):
    """
    2026.03.11 sample code for Linux command.
    show /home file list.
    show nfs volume.
    Example that executes two or more commands.
    Intended to run with ParamikoSSHClient (multiple commands are awkward with subprocess).
    """
    @staticmethod
    def build_ssh_client_cls():
        return ParamikoSSHClient

    @staticmethod
    def build_command() -> List[str]:
        return [
            "ls /home --color=never",
            "df -h"
        ]

    @property
    def name(self) -> str:
        # Own name (was a copy of "FetchFileListExecutor"), so log lines identify this executor.
        return "FetchLSDFExecutor"

    def execute_command(self):
        """Run the commands, write the full output to the log file and return a short summary.

        :return: list with the second non-empty output line (empty list if there
                 is none); when the client returned an ``[ERROR]`` message, the
                 non-empty error lines are returned so the cause is not lost
        """
        self.execute()
        raw = self.result or ""
        # Shell output uses CRLF line ends; drop the trailing "\r" of every line.
        self.result = [ln.rstrip("\r") for ln in raw.split("\n")]
        self.write_log()

        text_lines = [ln for ln in self.result if ln.strip()]
        if raw.startswith("[ERROR]"):
            return text_lines
        return text_lines[1:2]


#-----------------------------
# Example usage: main_single
#-----------------------------
def main_single(_targets: List[dict],
                executor_cls: Type[ISSHExecutorInterface] = FetchFileListExecutor,
                level=Config.LEVEL):
    """Run the executor on every target sequentially (no threads) and print the results as JSON.

    A missing, ``None`` or empty ``username`` / ``password`` is replaced by the
    Config default, so targets read from the config file with empty credential
    columns still log in with the default account.

    :param _targets: list of dicts with the keys ipaddr, hostname, username, password
    :param executor_cls: executor class instantiated for each target
    :param level: log level passed to the executor
    """
    def _or_default(value, default):
        # dict.get() returns an explicit None as-is, so the fallback is applied here.
        return default if value is None or value == "" else value

    datasets = [
        ServerInfo(
            ipaddr=t.get("ipaddr", None),
            hostname=t.get("hostname", None),
            username=_or_default(t.get("username"), Config.USERNAME),
            password=_or_default(t.get("password"), Config.PASSWORD))
        for t in _targets
    ]

    print(len(datasets))
    results = []
    for dataset in datasets:
        executor = executor_cls(server_info=dataset, level=level)
        res = executor.execute_command()
        results.append({dataset.hostname: res})

    print(json.dumps(results, indent=2, ensure_ascii=False))


#-------------------------------------------------
# Sample Dataset(for testing without config.ini)
#-------------------------------------------------
def sample_datasets() -> List[dict]:
    """Return sample targets for testing without config.ini.

    Includes a target without credentials, a duplicated host and an entry whose
    ipaddr/hostname are None. All entries use the ``hostname`` key that the
    consumers read.
    """
    return [
        {"ipaddr": "192.168.64.2", "hostname": "rx8headnode"},
        {"ipaddr": "192.168.64.4", "hostname": "rx8node01", "username": Config.USERNAME, "password": Config.PASSWORD},
        {"ipaddr": "192.168.64.2", "hostname": "rx8headnode", "username": Config.USERNAME, "password": Config.PASSWORD},
        {"ipaddr": None, "hostname": None},
        # Add more targets as needed
    ]

def sample_one_target() -> List[dict]:
    """Return a target list that contains exactly one sample target."""
    target = {
        "ipaddr": "192.168.64.2",
        "hostname": "rx8headnode",
        "username": Config.USERNAME,
        "password": Config.PASSWORD,
    }
    return [target]


#--------------------------------------------------------------
# CLI / main ( Test for single execution without threads )
#--------------------------------------------------------------
if __name__ == '__main__':
    # Running this module directly performs one sequential (non-threaded) run
    # of FetchLSDFExecutor on the single sample target with DEBUG logging.

    targets = sample_one_target()
    executor = FetchLSDFExecutor
    log_level = logging.DEBUG

    main_single(_targets=targets, executor_cls=executor, level=log_level)

utils.py

import os
import json
from abc import ABC, abstractmethod
from typing import List
from pprint import pformat

from config import Config

# -----------------------------------
# Fetch Target List from config file
# -----------------------------------
class SwitchListDataset(object):
    """
    Fetch the IP address and hostname list from the config file.
        - Reads config.ini and builds a list of target dictionaries.
        - Expected format: the first non-empty line is the header, every
          following non-empty line is one target, e.g.:
        ipaddr,hostname,username,password
        - Empty fields are stored as None.
    folder: settings
    file: config.ini
    """
    def __init__(self):
        cur_dir = os.getcwd()
        self.targets_file = os.path.join(cur_dir, Config.SETTINGS_DIR, Config.CONFIG_FILE)
        self.targets_list = []
        self.import_config()

    def import_config(self):
        """Parse ``self.targets_file`` and append one dict per data line to ``self.targets_list``."""
        # utf-8-sig reads plain UTF-8 as well and drops a leading BOM, which
        # would otherwise become part of the first header name.
        with open(self.targets_file, 'r', encoding="utf-8-sig") as f:
            headers = None
            for line in f:
                line = line.rstrip("\n")
                if not line:
                    # Blank lines carry no data (and are never taken as the header).
                    continue
                items = line.split(",")
                if headers is None:
                    headers = items
                    continue
                device = {
                    headers[idx]: (None if item == "" else item)
                    for idx, item in enumerate(items)
                }
                self.targets_list.append(device)

    def __str__(self):
        return json.dumps(self.targets_list, indent=2, ensure_ascii=False)


# -----------------------------
# Reporter
# -----------------------------
class IReporterInterface(ABC):
    """Interface of a reporter that prints the collected results."""

    @staticmethod
    @abstractmethod
    def print_results(results: List[dict]) -> None:
        """Print *results*; to be implemented by concrete reporters."""
        return None


class ReporterSample(IReporterInterface):
    """Sample reporter that prints one ``hostname | line`` row per result line."""
    @staticmethod
    def print_results(results: List[dict]) -> None:
        """Print a summary of *results*.

        :param results: list of ``{hostname: lines}`` dicts; ``lines`` is a list
                        of output lines (a single string is treated as one line,
                        None/empty is reported as "no data")
        """
        print('*' * 80)
        print("[INFO]: Results Summary")
        width = 12
        for res in results:
            for hostname, lines in res.items():
                print("-" * 80)
                hostname_str = str(hostname) if hostname is not None else "Unknown Host"
                if isinstance(lines, str):
                    # Iterating a str would print one character per row.
                    lines = [lines]
                if not lines:
                    print(f"{hostname_str:<{width}} | No data or error occurred.")
                    continue
                for line in lines:
                    # Remove real carriage returns as well as the escaped
                    # "\\r" / "\\n" sequences handled before.
                    msg = str(line).replace("\r", "").replace("\\r", "").replace("\\n", "\n")
                    print(f"{hostname_str:<{width}} | {msg}")

settings/config.ini (サンプル)

hostname,ipaddr,username,password
rx8headnode,192.168.64.2,root,rootroot
rx8node01,192.168.64.4,root,rootroot
rx8headnode,192.168.64.2,,
,,,

実行例

(.venv) 03_Threads$ python main.py 2 -t --info 
[INFO] 2026-03-15
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-1: {'thread': ('rx8headnode', '192.168.64.2')}
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-2: {'thread': ('rx8node01', '192.168.64.4')}
2026-03-15 08:37:21,703 INFO ThreadWorkers: Thread-3: {'thread': ('rx8headnode', '192.168.64.2')}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: done: {'rx8headnode': []}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: {'thread': (None, None)}
2026-03-15 08:37:21,873 INFO ThreadWorkers: Thread-3: done: {None: []}
2026-03-15 08:37:25,848 INFO ThreadWorkers: Thread-1: done: {'rx8headnode': ['APPLI    n1001  n1003  n1005  n1007  n1009  user01\r']}
2026-03-15 08:37:27,095 INFO ThreadWorkers: Thread-2: done: {'rx8node01': ['df -h\r']}
********************************************************************************
[INFO]: Results Summary
--------------------------------------------------------------------------------
rx8headnode  | No data or error occurred.
--------------------------------------------------------------------------------
Unknown Host | No data or error occurred.
--------------------------------------------------------------------------------
rx8headnode  | APPLI    n1001  n1003  n1005  n1007  n1009  user01
--------------------------------------------------------------------------------
rx8node01    | df -h
(.venv) 03_Threads$ 

2026.03.26
時間を計測するための追記

import time

if __name__ == "__main__":
    # Record the start timestamp (for display) and the start time (for the elapsed time).
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    start = time.time()
    main(sys.argv[1:])
    end = time.time()
    # Report when the run started and how long main() took.
    print(f"[INFO] Start: {dt_now}, Elapsed Time: {end - start:.2f} seconds")

    # Explicitly run a garbage-collection pass after main() has returned.
    gc.collect()

def main(argv)には以下があると良い

print(f"[INFO] Target: {target}, Log Level: {logging.getLevelName(log_level)}, Threaded: {threaded}")

ISSHClientInterface(ABC) には、set_queue を経由しない場合(main_single など)に username, password が None のまま渡されてしまうバグがあるので、None や空文字のときに Config のデフォルト値へ置き換える以下のコードを追加する

class ISSHClientInterface(ABC):
    """SSH Client implementation using subprocess or paramiko."""
    def __init__(self,
                 commands: List[str],
                 ipaddr=None,
                 hostname=None,
                 username: str =Config.USERNAME,
                 password: str = Config.PASSWORD,
                 port: int = Config.PORT,
                 timeout: int = Config.TIMEOUT,
                 level=Config.LEVEL,):
        """Store the connection parameters; empty credentials fall back to the Config defaults."""
        self.log = self._set_logger(level=level)
        self.commands = commands
        self.hostname = hostname
        # The address given as ``ipaddr`` is stored as the SSH host.
        self.ssh_host = ipaddr
        # An explicit None or "" username is replaced by Config.USERNAME.
        if username == "" or username is None:
            self.log.debug(username)
            username = Config.USERNAME
        self.ssh_user = username
        # An explicit None or "" password is replaced by Config.PASSWORD.
        if password == "" or password is None:
            self.log.debug(password)
            password = Config.PASSWORD
        self.password = password
        self.port = port
        self.timeout = timeout