KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

SLURMによる課金プログラム(Version2 出力日付やGroupの追加)

登録日 :2026/02/25 06:23
カテゴリ :SLURM

Version1
CPU計算はCPUの稼働時間で、インタラクティブジョブやGPUジョブは占有時間で課金する、といった方針を考慮したプログラムを検討した。CPUの稼働時間をSLURMで取得するには、cgroupとの連携が必要になる。

Version2
GroupやCSV出力時のDateフォーマット調整

使用方法
cputime
cputime 20260223
cputime 20260223 --info
cputime 20260223 --debug

プログラム

"""
SLURM billing base (Python 3.6) - dict dataset version.
Created by @nobuyuki on 2026-02-23.
Version 0.1.0 2026-02-22: Initial version.
Version 0.1.1 2026-02-25: Add date formatter in BillingReporter, and parse_arg checks the date format.

Key points:

- sacct output is parsed into dict rows:
    {field: value, ...}

- dataset is dict-based and separated into parents / steps:

    dataset = {
        "parents": {jobid: parent_row_dict},
        "steps": {jobid: [step_row_dict, ...]}
    }

  (step-based CPU sums and final billing rows are generated later by BillingEngine)

- CPU aggregation policy:
    if any step rows exist for jobid -> use step sums only
    else -> fallback to parent row CPU fields

- Billing target selection (A-mode):
    only parent jobs whose End timestamp falls within target day are billed;
    corresponding step rows are always retained for CPU aggregation

- Interactive jobs are detected by SubmitLine ("--pty" or "salloc")

- GPU jobs are detected by AllocTRES keywords (Config.GPUS_KEYWORD);
  Config.GPU_SM_TABLE maps GPU partition names to SM counts and drives the --gpu filter

- Billing metric (TotalCPU / Elapsed) is configurable per job class
  (default / gpu / interactive / interactive_gpu) via Config.BILL_METRIC_BY_CLASS

- Cluster filtering:
    --gpu option selects GPU partitions only;
    default selects CPU partitions only

"""
from abc import ABC, abstractmethod
import re
import math
import logging
import subprocess
import sys
import json
from datetime import datetime, timedelta
import gc


# -----------------------------
# Config
# -----------------------------
class Config(object):
    """Static settings for a billing run: sacct access, job classification and billing policy."""
    # DEFAULT_STARTTIME = "2026-01-01"
    # Look-back window in days; App.run passes it to SacctClient as days_ago.
    DEFAULT_SPAN = 90
    # DEFAULT_SPAN = 0
    # When SSH_HOST is set, SacctClient runs sacct through "ssh SSH_USER@SSH_HOST".
    SSH_HOST = "192.168.64.2"
    SSH_USER = "root"

    # Classification keywords
    # Matched case-insensitively against SubmitLine to flag interactive jobs.
    INTERACTIVE_KEYWORDS = ["--pty", "salloc"]
    # Matched case-insensitively against AllocTRES to flag GPU jobs.
    GPUS_KEYWORD = ["gpu", "gres/gpu"]
    # NOTE(review): not referenced by any code visible in this file - confirm it is still needed.
    UNKNOWN_AS_CPU_BILLING = True

    # Policy switches (easy to flip later)
    # USE_OCCUPIED_FOR_INTERACTIVE = True
    # Job classes in priority order; the first class a job matches is used.
    CLASSIFY_PRIORITY = ["interactive_gpu", "interactive", "gpu", "default"]
    # Billing metric per job class: "elapsed" or "totalcpu".
    BILL_METRIC_BY_CLASS = {
        "interactive": "elapsed",
        "interactive_gpu": "elapsed",
        "gpu": "elapsed",
        "default": "elapsed",
        # "default": "totalcpu",
    }

    # Partition name -> SM count per GPU. Membership marks a partition as a GPU
    # partition for the cluster filter; BillReporter multiplies GPU counts and
    # bill seconds by the SM value.
    GPU_SM_TABLE = {
        "part2": 132,
    }

    SACCT_PATH = "/usr/bin/sacct"
    # log_level = logging.DEBUG
    # Read by App when it creates the logger; main() overwrites it from the CLI flags.
    log_level = logging.INFO


# -----------------------------
# Logger + trace formatting
# -----------------------------
def setup_logger(level):
    """Return the shared "slurm_billing" logger set to ``level``.

    A stream handler is attached only the first time, so repeated calls
    never duplicate log output.
    """
    log = logging.getLogger("slurm_billing")
    log.setLevel(level)
    if log.handlers:
        # Already configured by an earlier call.
        return log

    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    log.addHandler(stream)
    return log


# -----------------------------
# Utilities
# -----------------------------

class SlurmTime(object):
    """Helpers for SLURM duration strings."""

    @staticmethod
    def to_seconds(t):
        """Convert a duration such as "1-02:03:04", "03:23" or "00:56.601" to seconds.

        Empty values, "Unknown" and anything unparsable yield 0.0. A comma is
        accepted as the decimal separator. An unparsable day prefix counts as
        zero days.
        """
        if not t:
            return 0.0
        text = t.strip()
        if text == "Unknown":
            return 0.0
        text = text.replace(",", ".")

        days = 0
        if "-" in text:
            day_part, text = text.split("-", 1)
            try:
                days = int(day_part)
            except ValueError:
                days = 0

        pieces = text.split(":")
        if len(pieces) > 3:
            return 0.0
        # Left-pad to [hours, minutes, seconds] so shorter forms share one parse path.
        hh, mm, ss = ["0"] * (3 - len(pieces)) + pieces
        try:
            hours = int(hh)
            minutes = int(mm)
            seconds = float(ss)
        except ValueError:
            return 0.0
        return days * 86400.0 + hours * 3600.0 + minutes * 60.0 + seconds


# -----------------------------
# sacct access -> dict rows
# -----------------------------
class ISchema(ABC):
    """Interface describing which sacct fields are requested and how a line is parsed.

    Both methods are abstract: the sacct client calls ``format_arg()`` to build
    the ``--format`` option and ``parse_line()`` on every output line, so a
    schema missing either one must fail at construction time instead of
    silently returning None.
    """

    @abstractmethod
    def format_arg(self):
        """Return the value passed to sacct's ``--format`` option."""

    @abstractmethod
    def parse_line(self, line):
        """Convert one raw sacct output line into a row dict."""


class ISacctClientBase(ABC):
    """Base class for sacct clients: stores connection settings and computes the query window.

    Subclasses implement ``fetch_rows()``.
    """

    # Seconds are kept so the window really ends at 23:59:59. A minute-only
    # format would cut the query at 23:59:00 and drop jobs that start and
    # finish within the last minute of the day.
    RANGE_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def __init__(self, sacct_path, logger, schema: "ISchema", days_ago=90,
                 ssh_host=None, ssh_user=None):
        """
        sacct_path: path of the sacct executable
        logger: logger used by the client
        schema: object providing format_arg() / parse_line()
        days_ago: number of days to look back before the target day
        ssh_host / ssh_user: optional remote host and user
        """
        self.sacct_path = sacct_path
        self.log = logger
        self.schema = schema
        self.days_ago = int(days_ago)
        self.ssh_host = ssh_host
        self.ssh_user = ssh_user

    @staticmethod
    def _parse_endtime(endtime):
        """Parse the end time argument into a datetime.

        endtime:
          - "now" or None -> current local time
          - "YYYY-MM-DD"
          - "YYYY-MM-DDTHH:MM"

        Raises ValueError for any other string.
        """
        if endtime is None or endtime == "now":
            return datetime.now()

        # Date only first, then date with hours and minutes.
        for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M"):
            try:
                return datetime.strptime(endtime, fmt)
            except ValueError:
                continue
        raise ValueError("Unsupported endtime format: %s" % endtime)

    def calc_range(self, endtime=None):
        """Return (start_str, end_str) formatted as "YYYY-MM-DDTHH:MM:SS".

        The end is always 23:59:59 of the target day. The start is 00:00:00 of
        the day ``days_ago`` days earlier:
          0 => the target day only (00:00:00 - 23:59:59)
          N => N+1 days including the target day (N=1: previous day + target day)
        """
        end_dt = self._parse_endtime(endtime)

        # Snap to the boundaries of the target day.
        day_start = end_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = end_dt.replace(hour=23, minute=59, second=59, microsecond=0)

        # days_ago == 0 subtracts nothing, so no special case is needed.
        start_dt = day_start - timedelta(days=self.days_ago)

        return (
            start_dt.strftime(self.RANGE_FORMAT),
            day_end.strftime(self.RANGE_FORMAT),
        )

    @abstractmethod
    def fetch_rows(self, endtime="now"):
        """Return the parsed sacct rows for the window ending at ``endtime``."""


class IDatasetBuilderBase(ABC):
    """Interface for objects that turn parsed sacct rows into a dataset."""

    def __init__(self, logger):
        """Store the logger used by the builder."""
        self.log = logger

    @abstractmethod
    def build(self, rows):
        """Build and return the dataset for ``rows``; implemented by subclasses."""


class SacctSchema(ISchema):
    """Field list requested from sacct, plus the matching line parser."""

    def __init__(self, fields=None):
        """Use ``fields`` when given (and non-empty), otherwise the default billing fields."""
        default_fields = [
            "JobID",
            "Group",
            "Elapsed",
            "CPUTime",
            "TotalCPU",
            "UserCPU",
            "SystemCPU",
            "State",
            "SubmitLine",
            "JobName",
            "User",
            "Partition",
            "NCPUS",
            "AllocTRES",
            "NodeList",
            "Start",
            "End",
        ]
        self.FIELDS = fields or default_fields

    def format_arg(self):
        """Return the field names joined with commas."""
        return ",".join(self.FIELDS)

    def parse_line(self, line):
        """Split a "|"-separated line into a {field: value} dict.

        Missing trailing columns become empty strings; surplus columns are ignored.
        """
        values = line.split("|")
        shortfall = len(self.FIELDS) - len(values)
        if shortfall > 0:
            values.extend([""] * shortfall)
        return dict(zip(self.FIELDS, values))


class SacctClient(ISacctClientBase):
    """Runs sacct (through ssh when ssh_host is set) and returns parsed rows."""

    def fetch_rows(self, endtime="now"):
        """Query sacct for the window ending at ``endtime`` and parse every non-blank line.

        Returns a list of row dicts produced by ``self.schema.parse_line``.
        Raises RuntimeError (including sacct's stderr) when sacct exits non-zero.
        """
        starttime, endtime2 = self.calc_range(endtime=endtime)
        self.log.debug({"starttime": starttime, "endtime2": endtime2})
        cmd = []
        if self.ssh_host:
            user_at = "{}@{}".format(self.ssh_user, self.ssh_host) if self.ssh_user else self.ssh_host
            cmd += ["ssh", user_at]

        cmd += [
            self.sacct_path,
            "--starttime", starttime,
            "--endtime", endtime2,
            "--format", self.schema.format_arg(),
            "--parsable2",
            "-n",
        ]

        self.log.debug("Running sacct: %s", " ".join(cmd))
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        if p.returncode != 0:
            stderr = p.stderr.strip()
            # Logged at ERROR and repeated in the exception: the CLI default
            # level is WARNING, where an INFO record would hide the cause.
            self.log.error("sacct failed rc=%s stderr=%s", p.returncode, stderr)
            raise RuntimeError("sacct failed (rc={}): {}".format(p.returncode, stderr))
        raw_lines = [ln for ln in p.stdout.splitlines() if ln.strip()]

        # debug: dump every raw line
        if self.log.isEnabledFor(logging.DEBUG):
            # Report the computed window end, not the raw endtime argument.
            self.log.debug("Query range: %s -> %s", starttime, endtime2)
            self.log.debug("RAW_ALL_BEGIN total=%d", len(raw_lines))
            for ln in raw_lines:
                self.log.debug("RAW|%s", ln)
            self.log.debug("RAW_ALL_END")

        return [self.schema.parse_line(ln) for ln in raw_lines]


class SacctParserEnd(object):
    """Utility that parses a sacct row's End timestamp and tests it against a time range."""

    @staticmethod
    def end_in_range(row, day_start, day_end, log=None):
        """Return True when row["End"] lies within [day_start, day_end] (inclusive).

        A missing or blank End returns False. A value that is not
        "YYYY-MM-DDTHH:MM:SS" also returns False and is reported through
        ``log`` when one is given.
        """
        raw_end = row.get("End") or ""
        raw_end = raw_end.strip()
        if raw_end == "":
            return False

        try:
            finished = datetime.strptime(raw_end, "%Y-%m-%dT%H:%M:%S")
        except ValueError:
            if log:
                log.info("Bad End format: End=%r JobID=%r", raw_end, row.get("JobID"))
            return False

        return day_start <= finished <= day_end


class DatasetBuilder(IDatasetBuilderBase):
    """Splits sacct rows into parent-job rows and step rows keyed by job id."""

    STEP_RE = re.compile(r"^(\d+)\.(.+)$")
    JOB_RE = re.compile(r"^\d+$")

    def build(self, rows):
        """Return {"parents": {jobid: row}, "steps": {jobid: [row, ...]}}.

        Rows whose JobID is blank, or matches neither pattern, are dropped.
        Every parent gets a (possibly empty) step list.
        """
        parents = {}
        steps = {}
        for row in rows:
            job_id = (row.get("JobID") or "").strip()
            if not job_id:
                continue
            step_match = self.STEP_RE.match(job_id)
            if step_match is not None:
                # "<jobid>.<step>": file the row under its parent job id.
                steps.setdefault(step_match.group(1), []).append(row)
            elif self.JOB_RE.match(job_id):
                parents[job_id] = row

        for job_id in parents:
            steps.setdefault(job_id, [])

        self.log.debug("Dataset built: parents=%d step_parents=%d", len(parents), len(steps))
        return {"parents": parents, "steps": steps}


# -----------------------------
# Template Method: TimeCalculator
# -----------------------------
class KeywordClassifier(object):
    """Case-insensitive "contains any of these keywords" matcher."""

    def __init__(self, keywords):
        """Store the keywords lower-cased."""
        self.keywords = [word.lower() for word in keywords]

    def matches(self, text):
        """Return True when ``text`` contains at least one keyword; False for empty text."""
        if not text:
            return False
        haystack = text.lower()
        for needle in self.keywords:
            if needle in haystack:
                return True
        return False


class CpuStepSummer(object):
    """Single responsibility: add up the CPU times of step rows (easy to swap out)."""

    @staticmethod
    def sum_steps(step_rows):
        """Return the summed TotalCPU / UserCPU / SystemCPU of ``step_rows`` in seconds."""
        totals = {"TotalCPU_s": 0.0, "UserCPU_s": 0.0, "SystemCPU_s": 0.0}
        for row in step_rows:
            for field in ("TotalCPU", "UserCPU", "SystemCPU"):
                totals[field + "_s"] += SlurmTime.to_seconds(row.get(field, ""))
        return totals


class ICalculatorBase(ABC):
    """Template-method interface for billing calculators.

    ``calculate(jobid, parent_row, step_rows)`` is the entry point; the other
    abstract methods are the steps a concrete calculator fills in.
    """

    NAME = "base"

    def __init__(self, logger):
        """Store the logger used by the calculator."""
        self.log = logger

    @abstractmethod
    def calculate(self, jobid, parent_row, step_rows):
        """Produce the final billing row for one job."""

    @abstractmethod
    def select_cpu_source(self, parent_row, step_rows):
        """Choose where the CPU times come from.

        Default policy: when steps exist, sum the steps; otherwise use the parent row.
        Returns the source name ("steps" or "parent") together with the CPU sums dict.
        """

    @abstractmethod
    def compute_raw(self, jobid, parent_row, cpu_sums):
        """Compute the raw billable value for the job."""

    @abstractmethod
    def build_final_row(self, parent_row, cpu_sums):
        """Assemble the final row dict from the parent row and the CPU sums."""


class TimeCalculator(ICalculatorBase):
    """Calculator that bills a job by TotalCPU or by Elapsed x (GPUs or CPUs).

    The job class (interactive_gpu / interactive / gpu / default) is chosen
    from SubmitLine and AllocTRES following Config.CLASSIFY_PRIORITY, and the
    metric for that class comes from Config.BILL_METRIC_BY_CLASS.
    """
    NAME = "execute_time_total"

    def __init__(self, logger):
        super().__init__(logger)
        self.tools = SlurmTime()
        # shared context (inject)
        self.ctx = {
            "interactive_classifier": KeywordClassifier(Config.INTERACTIVE_KEYWORDS),
            "gpu_classifier": KeywordClassifier(Config.GPUS_KEYWORD),
            "cpu_summer": CpuStepSummer(),
        }

    @staticmethod
    def parse_gpu_count(alloc_tres):
        """Return the GPU count from an AllocTRES string.

        Adds up the integer values of the "gpu=" and "gres/gpu=" entries.
        Entries with a non-integer value are ignored; empty input gives 0.
        """
        if not alloc_tres:
            return 0
        alloc_tres = alloc_tres.strip().lower()
        gpu_count = 0
        for part in alloc_tres.split(","):
            if part.startswith(("gpu=", "gres/gpu=")):
                try:
                    gpu_count += int(part.split("=", 1)[1])
                except ValueError:
                    pass
        return gpu_count

    @staticmethod
    def uid_to_user(uid):
        """Return the user value for the final row ("" when empty).

        NOTE(review): currently a pass-through. The original comment describes
        a '"1001(jdoe)" -> "jdoe"' conversion that is not implemented here.
        """
        if not uid:
            return ""
        return uid

    def calculate(self, jobid, parent_row, step_rows):
        """Run the pipeline for one job and return its final row dict."""
        _cpu_source, cpu_sums = self.select_cpu_source(parent_row, step_rows)
        raw_seconds, metric, chosen_class, ngpus, reason = self.compute_raw(jobid, parent_row, cpu_sums)
        return self.build_final_row(
            parent_row=parent_row,
            cpu_sums=cpu_sums,
            raw=raw_seconds,
            bill_mode=metric,
            chosen_class=chosen_class,
            ngpus=ngpus,
            reason=reason,
        )

    def select_cpu_source(self, parent_row, step_rows):
        """Return ("steps", sums) when step rows exist, else ("parent", parent CPU fields)."""
        if step_rows:
            cpu_sums = self.ctx["cpu_summer"].sum_steps(step_rows)
            return "steps", cpu_sums

        cpu_sums = {
            "TotalCPU_s": self.tools.to_seconds(parent_row.get("TotalCPU", "")),
            "UserCPU_s": self.tools.to_seconds(parent_row.get("UserCPU", "")),
            "SystemCPU_s": self.tools.to_seconds(parent_row.get("SystemCPU", "")),
        }
        return "parent", cpu_sums

    def compute_raw(self, jobid, parent_row, cpu_sums):
        """Classify the job and compute its raw billable seconds.

        Returns (raw_seconds, metric, chosen_class, ngpus, reason).
        Raises ValueError when the configured metric for the class is unknown.
        """
        # Classification by SubmitLine and AllocTRES
        submit = (parent_row.get("SubmitLine") or "").strip()
        interactive = bool(self.ctx["interactive_classifier"].matches(submit))

        alloc_tres = (parent_row.get("AllocTRES") or "").strip()
        gpu_job = bool(self.ctx["gpu_classifier"].matches(alloc_tres))

        self.log.debug({"alloc_tres": alloc_tres, "gpu_job": gpu_job, "interactive": interactive})

        # CPU/GPU counts used as the multiplier for elapsed-based billing
        # (e.g. 2 GPUs bill twice the wall-clock time).
        elapsed_counter = {
            "ncpus": int(parent_row.get("NCPUS") or 1),
            "ngpus": self.parse_gpu_count(alloc_tres),  # "gres/gpu=2" -> 2
        }

        # Candidate values per metric
        candidates = {
            "totalcpu": float(cpu_sums.get("TotalCPU_s") or 0.0),  # step sums preferred
            "elapsed": self.tools.to_seconds(parent_row.get("Elapsed", "")),
        }

        # Which classes the job belongs to. "interactive_gpu" must be present
        # here: it heads Config.CLASSIFY_PRIORITY, and without it an
        # interactive GPU job could only ever be labelled "interactive".
        classes = {
            "interactive_gpu": interactive and gpu_job,
            "interactive": interactive,
            "gpu": gpu_job,
            "default": True,
        }

        # Adopt the first matching class in priority order
        chosen_class = None
        for c in Config.CLASSIFY_PRIORITY:
            if classes.get(c):
                chosen_class = c
                break

        metric = Config.BILL_METRIC_BY_CLASS.get(chosen_class, "totalcpu")

        if metric not in candidates:
            # Fail fast on configuration mistakes; this is billing data.
            raise ValueError("Unknown metric '{}' for class '{}'".format(metric, chosen_class))

        raw_seconds = candidates[metric]
        if metric == "elapsed":
            # Elapsed is wall-clock time, so scale it by the allocated resources:
            # raw_seconds = Elapsed * NGPUS when GPUs are allocated, else Elapsed * NCPUS.
            # Policy: jobs with GPUs are always scaled by GPU count, interactive or not.
            mult = elapsed_counter["ngpus"] if elapsed_counter["ngpus"] > 0 else elapsed_counter["ncpus"]
            raw_seconds *= max(1, mult)

        # Decision trace stored in the final row
        reason = "class={} metric={} interactive={} gpu={}".format(
            chosen_class, metric, interactive, gpu_job
        )

        return raw_seconds, metric, chosen_class, elapsed_counter["ngpus"], reason

    def build_final_row(
            self, parent_row, cpu_sums, raw=None,
            bill_mode=None, chosen_class=None, ngpus=None, reason=None):
        """Return the final row: parent metadata + CPU sums + seconds + billing decision."""
        fields = [
            "JobID",
            "User",
            "Group",
            "JobName",
            "Partition",
            "NCPUS",
            "NodeList",
            "AllocTRES",
            "Elapsed",
            "CPUTime",
            "TotalCPU",
            "Start",
            "End",
            "State",
            "SubmitLine"]
        final = {}
        for f in fields:
            if f == "User":
                final[f] = self.uid_to_user(parent_row.get(f, ""))
            else:
                final[f] = parent_row.get(f, "")
        final.update(cpu_sums)
        final.update({
            "Elapsed_s": self.tools.to_seconds(parent_row.get("Elapsed", "")),
            "CPUTime_s": self.tools.to_seconds(parent_row.get("CPUTime", "")),
            "NGPUs": ngpus,
            "BillMode": bill_mode,
            "BillSeconds_raw": raw,
            "chosen_class": chosen_class,
            "DecisionNote": reason,
        })
        return final


# -----------------------------
# Orchestrator (pipeline)
# -----------------------------
class BillingEngine(object):
    """Pipeline driver: applies the calculator to every parent job of a dataset."""

    def __init__(self, logger, calculator: ICalculatorBase):
        """Store the logger and the calculator that produces the final rows."""
        self.log = logger
        self.calculator = calculator

    def process(self, dataset):
        """Return {jobid: final_row} for each parent in ``dataset``.

        A job with no entry under dataset["steps"] is calculated with an empty step list.
        """
        return {
            job_id: self.calculator.calculate(
                job_id, parent, dataset["steps"].get(job_id, [])
            )
            for job_id, parent in dataset["parents"].items()
        }


# -----------------------------
# Reporter (minimal)
# -----------------------------
class IReporterBase(ABC):
    """Interface for reporters that print the final billing rows."""

    def __init__(self, logger):
        """Store the logger used by the reporter."""
        self.log = logger

    @abstractmethod
    def print_table(self, final_map):
        """Print the rows of ``final_map``; implemented by subclasses."""


class BillReporter(IReporterBase):
    """Prints one CSV line per billed job to stdout.

    Columns: partition, user, group, start, end, process count, bill seconds, efc.
    """

    def date_formatter(self, day):
        """Format a sacct timestamp: "2026-02-23T14:08:29" -> "2026/02/23 14:08:29".

        Accepts a string, or a tuple/list whose first element is used.
        Empty input returns "".
        """
        self.log.debug("date_formatter: day=%s", day)
        if not day:
            return ""

        # tuple/list: use the first element
        if isinstance(day, (tuple, list)):
            day = day[0]

        day = str(day).replace("T", " ").replace("-", "/")
        self.log.debug("date_formatter: day=%s", day)
        return day

    def print_table(self, final_map):
        """Print the CSV rows of ``final_map`` ordered by numeric job id."""
        for jid in sorted(final_map.keys(), key=lambda x: int(x)):
            r = final_map[jid]

            partition = r.get("Partition", "")
            # "or 0" keeps a missing/empty/None NGPUs from crashing int().
            nums = int(r.get("NGPUs") or 0)
            elapsed = r.get("Elapsed_s", 0.0)
            bill_seconds = r.get("BillSeconds_raw") or 0.0
            # Plain strings are passed on; the stray trailing commas that used
            # to wrap these values in 1-tuples are gone.
            starttime = self.date_formatter(r.get("Start", ""))
            endtime = self.date_formatter(r.get("End", ""))

            sm = int(Config.GPU_SM_TABLE.get(partition, 0))

            self.log.debug({"Partition": partition})
            self.log.debug({'NGPUs': nums})
            self.log.debug({"Elapsed": elapsed})
            self.log.debug({'BillSeconds_raw': bill_seconds})
            self.log.debug({"GPU SM": sm})

            if nums == 0:
                # CPU job: report the allocated CPU count
                nums = int(r.get("NCPUS") or 0)
                self.log.debug({'NCPUS': nums})
            else:
                # GPU job: scale by the SM count of the partition,
                # e.g. 2 GPUs x 132 SM = 264 NCPUS-equivalent.
                if sm == 0:
                    # The arithmetic is unchanged, but a zeroed bill should not go unnoticed.
                    self.log.warning(
                        "JobID=%s has %d GPU(s) but partition %r is not in Config.GPU_SM_TABLE; "
                        "billed as 0", jid, nums, partition)
                nums *= sm
                bill_seconds *= sm
                self.log.debug({'GPU processes(GPUs * SM)': nums})
                self.log.debug({'GPU bill seconds(Elapsed * NGPUs * SM)': bill_seconds})

            efc = 1.0 if bill_seconds > 0 else 0.0

            row = [
                partition,  # Part
                (r.get("User", "") or ""),  # User
                (r.get("Group", "") or ""),  # Group
                starttime,  # Start
                endtime,  # End
                str(nums),  # NCPUS, or GPU count converted to an NCPUS equivalent
                bill_seconds,  # Bill(raw)
                efc,
            ]
            print(",".join(str(x) for x in row))


class DebugReporter(IReporterBase):
    """Prints a fixed-width, human-readable table of the final rows to stdout."""

    def print_table(self, final_map):
        """Print a header, a separator line and one line per job ordered by numeric job id."""
        # (header, alignment, width) per column; Start and CPUTime are intentionally not shown.
        layout = [
            ("JobID", "<", 6),
            ("User", "<", 7),
            ("Group", "<", 7),
            ("JobName", "<", 6),
            ("Part", "<", 8),
            ("NCPUS", ">", 5),
            ("NGPUS", ">", 5),
            ("End", "<", 19),
            ("Elapsed", ">", 10),
            ("TotalCPU", ">", 10),
            ("Elapsed(s)", ">", 11),
            ("CPUTime(s)", ">", 11),
            ("TotalCPU(s)", ">", 12),
            ("BillMode", "<", 10),
            ("Type", "<", 11),
            ("Bill(raw)", ">", 12),
        ]
        # Columns are separated by a single space.
        line_fmt = " ".join("{:%s%d}" % (align, width) for _, align, width in layout)
        print(line_fmt.format(*[title for title, _, _ in layout]))

        total_width = sum(width for _, _, width in layout) + (len(layout) - 1)
        print("-" * total_width)

        ordered_ids = sorted(final_map.keys(), key=lambda x: int(x))
        for jid in ordered_ids:
            rec = final_map[jid]
            cells = [
                str(rec.get("JobID", jid)),  # JobID
                (rec.get("User", "") or "")[:10],  # User
                (rec.get("Group", "") or "")[:10],  # Group
                (rec.get("JobName", "") or "")[:12],  # JobName
                (rec.get("Partition", "") or "")[:8],  # Part
                str(rec.get("NCPUS", "")),  # NCPUS
                str(rec.get("NGPUs", "")),  # NGPUS
                rec.get("End", ""),  # End
                rec.get("Elapsed", ""),  # Elapsed
                rec.get("TotalCPU", ""),  # TotalCPU
                "{:.1f}".format(rec.get("Elapsed_s", 0.0)),  # Elapsed(s)
                "{:.1f}".format(rec.get("CPUTime_s", 0.0)),  # CPUTime(s)
                "{:.3f}".format(rec.get("TotalCPU_s", 0.0)),  # TotalCPU(s)
                rec.get("BillMode", ""),  # BillMode
                rec.get("chosen_class", ""),  # Type
                "{:.3f}".format(rec.get("BillSeconds_raw", 0.0)),  # Bill(raw)
            ]
            print(line_fmt.format(*cells))


class App(object):
    """Wires the pipeline together: fetch sacct rows, select jobs, calculate, report."""

    def __init__(self, target_day=None, gpu_only=False):
        """
        target_day: billing day as "YYYY-MM-DD"
        gpu_only: True -> GPU partitions only, False -> CPU partitions only
        """
        self.rows = None
        self.rows_end = None
        self.dataset = None
        self.bull_datasets = None
        self.logger = setup_logger(Config.log_level)
        self.target_day = target_day
        self.gpu_only = gpu_only

    @staticmethod
    def filter_dataset_by_cluster(ds, gpu_only=False):
        """
        ds: {"parents": {jid: row}, "steps": {jid: [step_rows]}}
        gpu_only:
          True  -> GPU partitions only
          False -> CPU partitions only

        A partition counts as GPU when its name is a key of Config.GPU_SM_TABLE.
        Returns a new dataset of the same shape.
        """
        out = {"parents": {}, "steps": {}}
        all_steps = ds.get("steps", {})

        for jid, parent_row in ds.get("parents", {}).items():
            part = parent_row.get("Partition", "")
            is_gpu = (part or "").strip() in Config.GPU_SM_TABLE

            # Keep the job only when its partition kind matches the requested cluster.
            if bool(gpu_only) != is_gpu:
                continue

            out["parents"][jid] = parent_row
            out["steps"][jid] = all_steps.get(jid, [])
        return out

    def run(self):
        """Fetch rows, keep jobs whose End falls on target_day, filter by cluster and bill them.

        The result is stored in ``self.bull_datasets``.
        """
        # strptime on a date-only string already yields midnight.
        day_start = datetime.strptime(self.target_day, "%Y-%m-%d")
        day_end = day_start.replace(hour=23, minute=59, second=59, microsecond=0)

        cpu_schema = SacctSchema()
        client = SacctClient(
            sacct_path=Config.SACCT_PATH,
            logger=self.logger,
            schema=cpu_schema,
            days_ago=Config.DEFAULT_SPAN,
            ssh_host=Config.SSH_HOST,
            ssh_user=Config.SSH_USER,)
        calculator = TimeCalculator(logger=self.logger)
        engine = BillingEngine(logger=self.logger, calculator=calculator)

        self.rows = client.fetch_rows(endtime=self.target_day)
        # Build the dataset from every row first so parents and steps are both available.
        ds_all = DatasetBuilder(logger=self.logger).build(self.rows)

        # Only parents are filtered by End; they decide which job ids are billed.
        target_ids = [
            jid for jid, parent_row in ds_all["parents"].items()
            if SacctParserEnd.end_in_range(parent_row, day_start, day_end, log=self.logger)
        ]

        # Keep the selected parents together with all of their steps
        # (steps are not filtered by End; a job without steps gets an empty list).
        ds = {"parents": {}, "steps": {}}
        for jid in target_ids:
            ds["parents"][jid] = ds_all["parents"][jid]
            ds["steps"][jid] = ds_all["steps"].get(jid, [])

        self.dataset = ds
        # Serialize the dataset only when it will be logged; json.dumps of
        # every row is wasted work on a normal (non-debug) run.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(json.dumps(self.dataset, indent=2, ensure_ascii=False))
        # Cluster filter: self.gpu_only is True when --gpu was given.
        ds2 = self.filter_dataset_by_cluster(self.dataset, gpu_only=self.gpu_only)
        self.logger.debug(
            "Cluster filter gpu_only=%s: parents=%d",
            self.gpu_only, len(ds2["parents"])
        )
        self.bull_datasets = engine.process(ds2)

    def info_print(self):
        """Log the final rows as JSON (INFO level) and print the debug table."""
        print('')
        if self.logger.isEnabledFor(logging.INFO):
            self.logger.info(json.dumps(self.bull_datasets, indent=2, ensure_ascii=False))
        reporter = DebugReporter(logger=self.logger)
        reporter.print_table(self.bull_datasets)

    def print(self):
        """Print the billing result as CSV lines."""
        reporter = BillReporter(logger=self.logger)
        reporter.print_table(self.bull_datasets)


# -----------------------------
# CLI / main
# -----------------------------
def parse_args(argv):
    """Parse the command line (argv[0] is the program name).

    Recognised arguments: --debug, --info, --gpu and a date as YYYYMMDD.
    Without a date, yesterday is used; with several dates, the last one wins.

    Returns (target, log_level, gpu_only, debug, info), where target is
    "YYYY-MM-DD", or False when an argument is invalid (a message is printed).
    """
    target = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    log_level = logging.WARNING
    debug = info = gpu_only = False

    for arg in argv[1:]:
        if arg == "--debug":
            log_level, debug = logging.DEBUG, True
        elif arg == "--info":
            log_level, info = logging.INFO, True
        elif arg == "--gpu":
            gpu_only = True
        elif len(arg) == 8 and arg.isdigit():
            # Only the 20260223 form is accepted as a date.
            try:
                target = datetime.strptime(arg, "%Y%m%d").strftime("%Y-%m-%d")
            except ValueError:
                print(f"[ERROR]: Invalid date '{arg}'. Please specify a valid YYYYMMDD date (e.g., 20260223)")
                return False, log_level, gpu_only, debug, info
        else:
            print(f"[ERROR]: invalid date format '{arg}'")
            print("[INFO] : Usage:YYYYMMDD (ex: 20260223) or --debug or --info or --gpu")
            return False, log_level, gpu_only, debug, info

    return target, log_level, gpu_only, debug, info


def main(argv=None):
    """CLI entry point: parse arguments, run the billing pipeline and print the results.

    argv: full argument list including the program name; defaults to sys.argv.
    With --info, the debug table is printed before the CSV lines.
    """
    if argv is None:
        # The declared default used to reach parse_args unchanged and crash on len(None).
        argv = sys.argv
    target, log_level, gpu_only, debug, info = parse_args(argv)
    if target is False:
        print("[ERROR]: Failed to parse arguments")
        return
    # App reads Config.log_level when it creates its logger.
    Config.log_level = log_level
    app = App(target_day=target, gpu_only=gpu_only)
    app.run()
    if info:
        app.info_print()
        print('')
        print('--- Billing results (CSV) ---')
    # results
    app.print()

# Script entry point: run the CLI with the process arguments.
if __name__ == "__main__":
    main(sys.argv)

    # NOTE(review): explicit collection right before the interpreter exits - likely unnecessary; confirm intent.
    gc.collect()

出力例(--info)

(.venv) accounting$ python cputime.py 20260223 --info 

2026-02-25 06:37:03,443 INFO slurm_billing: {
  "21": {
    "JobID": "21",
    "User": "user01",
    "Group": "user01",
    "JobName": "test1",
    "Partition": "part1",
    "NCPUS": "1",
    "NodeList": "rx8node01",
    "AllocTRES": "billing=1,cpu=1,node=1",
    "Elapsed": "00:00:00",
    "CPUTime": "00:00:00",
    "TotalCPU": "00:00:00",
    "Start": "2026-02-23T14:08:29",
    "End": "2026-02-23T14:08:29",
    "State": "FAILED",
    "SubmitLine": "sbatch cpu.sh",
    "TotalCPU_s": 0.0,
    "UserCPU_s": 0.0,
    "SystemCPU_s": 0.0,
    "Elapsed_s": 0.0,
    "CPUTime_s": 0.0,
    "NGPUs": 0,
    "BillMode": "elapsed",
    "BillSeconds_raw": 0.0,
    "chosen_class": "default",
    "DecisionNote": "class=default metric=elapsed interactive=False gpu=False"
  },
  "22": {
    "JobID": "22",
    "User": "user01",
    "Group": "user01",
    "JobName": "test1",
    "Partition": "part1",
    "NCPUS": "1",
    "NodeList": "rx8node01",
    "AllocTRES": "billing=1,cpu=1,node=1",
    "Elapsed": "00:00:00",
    "CPUTime": "00:00:00",
    "TotalCPU": "00:00:00",
    "Start": "2026-02-23T14:10:19",
    "End": "2026-02-23T14:10:19",
    "State": "FAILED",
    "SubmitLine": "sbatch cpu.sh",
    "TotalCPU_s": 0.0,
    "UserCPU_s": 0.0,
    "SystemCPU_s": 0.0,
    "Elapsed_s": 0.0,
    "CPUTime_s": 0.0,
    "NGPUs": 0,
    "BillMode": "elapsed",
    "BillSeconds_raw": 0.0,
    "chosen_class": "default",
    "DecisionNote": "class=default metric=elapsed interactive=False gpu=False"
  },
  "23": {
    "JobID": "23",
    "User": "user01",
    "Group": "user01",
    "JobName": "test1",
    "Partition": "part1",
    "NCPUS": "1",
    "NodeList": "rx8node01",
    "AllocTRES": "billing=1,cpu=1,node=1",
    "Elapsed": "00:02:01",
    "CPUTime": "00:02:01",
    "TotalCPU": "00:56.601",
    "Start": "2026-02-23T14:13:47",
    "End": "2026-02-23T14:15:48",
    "State": "COMPLETED",
    "SubmitLine": "sbatch cpu.sh",
    "TotalCPU_s": 56.601,
    "UserCPU_s": 56.428,
    "SystemCPU_s": 0.172,
    "Elapsed_s": 121.0,
    "CPUTime_s": 121.0,
    "NGPUs": 0,
    "BillMode": "elapsed",
    "BillSeconds_raw": 121.0,
    "chosen_class": "default",
    "DecisionNote": "class=default metric=elapsed interactive=False gpu=False"
  },
  "24": {
    "JobID": "24",
    "User": "user01",
    "Group": "user01",
    "JobName": "ash",
    "Partition": "part1",
    "NCPUS": "1",
    "NodeList": "rx8node01",
    "AllocTRES": "billing=1,cpu=1,node=1",
    "Elapsed": "00:00:00",
    "CPUTime": "00:00:00",
    "TotalCPU": "00:00.010",
    "Start": "2026-02-23T14:19:02",
    "End": "2026-02-23T14:19:02",
    "State": "FAILED",
    "SubmitLine": "srun -p part1 --pty ash",
    "TotalCPU_s": 0.01,
    "UserCPU_s": 0.01,
    "SystemCPU_s": 0.0,
    "Elapsed_s": 0.0,
    "CPUTime_s": 0.0,
    "NGPUs": 0,
    "BillMode": "elapsed",
    "BillSeconds_raw": 0.0,
    "chosen_class": "interactive",
    "DecisionNote": "class=interactive metric=elapsed interactive=True gpu=False"
  },
  "25": {
    "JobID": "25",
    "User": "user01",
    "Group": "user01",
    "JobName": "bash",
    "Partition": "part1",
    "NCPUS": "1",
    "NodeList": "rx8node01",
    "AllocTRES": "billing=1,cpu=1,node=1",
    "Elapsed": "00:03:23",
    "CPUTime": "00:03:23",
    "TotalCPU": "00:57.402",
    "Start": "2026-02-23T14:19:05",
    "End": "2026-02-23T14:22:28",
    "State": "COMPLETED",
    "SubmitLine": "srun -p part1 --pty bash",
    "TotalCPU_s": 57.402,
    "UserCPU_s": 56.587,
    "SystemCPU_s": 0.815,
    "Elapsed_s": 203.0,
    "CPUTime_s": 203.0,
    "NGPUs": 0,
    "BillMode": "elapsed",
    "BillSeconds_raw": 203.0,
    "chosen_class": "interactive",
    "DecisionNote": "class=interactive metric=elapsed interactive=True gpu=False"
  }
}
JobID  User    Group   JobName Part     NCPUS NGPUS End                    Elapsed   TotalCPU  Elapsed(s)  CPUTime(s)  TotalCPU(s) BillMode   Type           Bill(raw)
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
21     user01  user01  test1  part1        1     0 2026-02-23T14:08:29   00:00:00   00:00:00         0.0         0.0        0.000 elapsed    default            0.000
22     user01  user01  test1  part1        1     0 2026-02-23T14:10:19   00:00:00   00:00:00         0.0         0.0        0.000 elapsed    default            0.000
23     user01  user01  test1  part1        1     0 2026-02-23T14:15:48   00:02:01  00:56.601       121.0       121.0       56.601 elapsed    default          121.000
24     user01  user01  ash    part1        1     0 2026-02-23T14:19:02   00:00:00  00:00.010         0.0         0.0        0.010 elapsed    interactive        0.000
25     user01  user01  bash   part1        1     0 2026-02-23T14:22:28   00:03:23  00:57.402       203.0       203.0       57.402 elapsed    interactive      203.000

--- Billing results (CSV) ---
part1,user01,user01,2026/02/23 14:08:29,2026/02/23 14:08:29,1,0.0,0.0
part1,user01,user01,2026/02/23 14:10:19,2026/02/23 14:10:19,1,0.0,0.0
part1,user01,user01,2026/02/23 14:13:47,2026/02/23 14:15:48,1,121.0,1.0
part1,user01,user01,2026/02/23 14:19:02,2026/02/23 14:19:02,1,0.0,0.0
part1,user01,user01,2026/02/23 14:19:05,2026/02/23 14:22:28,1,203.0,1.0