KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

ARPテーブルの見える化(Streamlit)

登録日 :2026/07/02 04:58
カテゴリ :python データ分析

以下のSSHログコレクターの一種
https://nonoco.believeriver.site/detail_mark/596/

フォルダ構成

project/
├── streamlit_app.py                ← ★ 必要(Streamlitのエントリーポイント)
├── config.yaml           ✅
├── config.py             ✅
├── pages/
│   └── arp_table.py      ✅
└── snmp/
    ├── __init__.py       ← ★ 必要(パッケージ化)
    └── arp_collector.py  ✅

プログラムの実行は

streamlit run streamlit_app.py

パッケージ

インストールするパッケージ

pip install sqlalchemy
pip install "pysnmp==4.4.12" "pyasn1==0.4.8"
pip install streamlit-authenticator pyyaml
pip install streamlit

プログラム一式

config.yaml

credentials:
  usernames:
    admin:
      name: admin
      # パスワード生成: python -c "import streamlit_authenticator as sa; print(sa.Hasher(['password']).generate())"
      password: "admin"
      role: admin           # admin / viewer
    viewer:
      name: 閲覧ユーザー01
      password: "$2b$12$YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY"
      role: viewer

cookie:
  name: switch_monitor_auth
  key: "change_this_to_a_random_secret_string_32chars"   # 必ず変更すること
  expiry_days: 1

# ロール別アクセス制御
roles:
  admin:
    can_refresh: true      # 再取得ボタンの表示
    can_download: true     # CSVダウンロードの表示
  viewer:
    can_refresh: false
    can_download: false

streamlit_app.py

"""
ネットワーク監視ツール - メインページ
"""

import streamlit as st

st.set_page_config(
    page_title="ネットワーク監視",
    page_icon="🖥️",
    layout="wide",
    initial_sidebar_state="expanded",
)



# ---------------------------------------------------------------------------
# VLAN 定義(実環境に合わせて編集してください)
# ---------------------------------------------------------------------------

VLAN_LIST = [
    {"VLAN ID": 10,  "ネットワーク名": "業務系 A棟",       "用途": "一般業務PC・プリンター",         "帯域": "1 Gbps"},
    {"VLAN ID": 20,  "ネットワーク名": "業務系 B棟",       "用途": "一般業務PC・プリンター",         "帯域": "1 Gbps"},
    {"VLAN ID": 30,  "ネットワーク名": "研究系",           "用途": "ワークステーション・計算サーバー", "帯域": "10 Gbps"},
    {"VLAN ID": 40,  "ネットワーク名": "管理系",           "用途": "ネットワーク機器・サーバー管理",  "帯域": "1 Gbps"},
    {"VLAN ID": 100, "ネットワーク名": "サーバーセグメント","用途": "ファイルサーバー・認証サーバー",  "帯域": "10 Gbps"},
]


# ---------------------------------------------------------------------------
# ページ本体
# ---------------------------------------------------------------------------

st.title("🖥️ ネットワーク監視ツール")
st.caption("社内ネットワークの接続状況を確認するためのツールです。")

st.divider()

# ---- 使い方 ----
st.subheader("📖 使い方")

col1, col2, col3 = st.columns(3)

with col1:
    st.info(
        "**① 左メニューを選択**\n\n"
        "画面左のサイドバーから\n"
        "確認したい機能を選んでください。"
    )
with col2:
    st.info(
        "**② フィルタで絞り込み**\n\n"
        "VLANやIPアドレスで絞り込むと\n"
        "目的の機器をすばやく見つけられます。"
    )
with col3:
    st.info(
        "**③ CSVで書き出し**\n\n"
        "「CSVダウンロード」ボタンから\n"
        "Excelで開ける形式で保存できます。"
    )

st.divider()

# ---- 機能一覧 ----
st.subheader("🗂️ 機能一覧")

st.markdown("""
| ページ | 内容 | 更新タイミング |
|--------|------|--------------|
| 🔍 ARP確認 | 各VLANに接続中の機器(IPアドレス・MACアドレス)を一覧表示 | ページを開いたとき(60秒キャッシュ) |
""")

st.divider()

# ---- VLAN 一覧 ----
st.subheader("🌐 VLANとネットワーク構成")
st.caption("社内ネットワークは用途ごとに以下のVLANに分かれています。")

import pandas as pd
df_vlan = pd.DataFrame(VLAN_LIST)

st.dataframe(
    df_vlan,
    use_container_width=True,
    hide_index=True,
    column_config={
        "VLAN ID": st.column_config.NumberColumn("VLAN ID", format="%d", width="small"),
        "ネットワーク名": st.column_config.TextColumn("ネットワーク名", width="medium"),
        "用途":          st.column_config.TextColumn("用途",          width="large"),
        "帯域":          st.column_config.TextColumn("帯域",          width="small"),
    },
)

st.divider()

# ---- 注意事項 ----
st.subheader("⚠️ ご利用にあたって")

st.warning(
    "- このツールはネットワーク機器への読み取り専用アクセスを行います。設定変更はできません。\n"
    "- 表示される情報は取得時点のものです。リアルタイムの変化は「再取得」ボタンで更新してください。\n"
    "- 不明な点はネットワーク管理者(情報システム担当)までお問い合わせください。"
)

# ---- フッター ----
st.divider()
st.caption("Network Monitor v1.0 / 管理: 情報システム部門")

snmp/arp_collector.py

"""
SNMP経由でコアスイッチからARPテーブルを取得するモジュール。
ipNetToMediaTable (1.3.6.1.2.1.4.22) を使用。
"""

from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Optional

from pysnmp.hlapi import (
    CommunityData, ContextData, ObjectIdentity, ObjectType,
    SnmpEngine, UdpTransportTarget, nextCmd,
)


# ---------------------------------------------------------------------------
# データクラス
# ---------------------------------------------------------------------------

@dataclass
class ArpEntry:
    switch_label: str       # 表示名(設定ファイル由来)
    switch_host:  str       # IPまたはホスト名
    vlan_id:      Optional[int]
    interface:    str       # "Vlan100" など
    ip_address:   str
    mac_address:  str       # "xx:xx:xx:xx:xx:xx"


# ---------------------------------------------------------------------------
# 内部ユーティリティ
# ---------------------------------------------------------------------------

def _mac_to_str(mac_val) -> str:
    """pysnmp の OctetString を "xx:xx:xx:xx:xx:xx" 形式に変換"""
    try:
        raw = bytes(mac_val)
        if len(raw) == 6:
            return ':'.join(f'{b:02x}' for b in raw)
    except Exception:
        pass
    return str(mac_val)


def _snmp_walk(host: str, community: str, oid: str, port: int) -> dict[str, object]:
    """指定OID以下をwalkしてdict {oid_str: value} で返す"""
    result = {}
    for (err_indication, err_status, _, var_binds) in nextCmd(
        SnmpEngine(),
        CommunityData(community, mpModel=1),          # SNMPv2c
        UdpTransportTarget((host, port), timeout=3, retries=1),
        ContextData(),
        ObjectType(ObjectIdentity(oid)),
        lexicographicMode=False,
    ):
        if err_indication or err_status:
            raise RuntimeError(
                err_indication or err_status.prettyPrint()
            )
        for var_bind in var_binds:
            result[str(var_bind[0])] = var_bind[1]
    return result


# ---------------------------------------------------------------------------
# 公開API
# ---------------------------------------------------------------------------

def get_arp_entries(
    host: str,
    community: str,
    label: str = "",
    port: int = 161,
) -> list[ArpEntry]:
    """
    1台のスイッチからARPエントリ一覧を取得する。

    使用OID:
      1.3.6.1.2.1.4.22.1.2  ipNetToMediaPhysAddress
        └── suffix: {ifIndex}.{a.b.c.d}
      1.3.6.1.2.1.2.2.1.2   ifDescr
        └── suffix: {ifIndex}
    """
    OID_MAC    = "1.3.6.1.2.1.4.22.1.2"
    OID_IFDESC = "1.3.6.1.2.1.2.2.1.2"

    # ifIndex -> インターフェース名
    if_descr_raw = _snmp_walk(host, community, OID_IFDESC, port)
    if_map: dict[str, str] = {}
    for oid_str, val in if_descr_raw.items():
        idx = oid_str.rsplit(".", 1)[-1]
        if_map[idx] = str(val)

    # MACアドレステーブル
    mac_raw = _snmp_walk(host, community, OID_MAC, port)

    entries: list[ArpEntry] = []
    for oid_str, mac_val in mac_raw.items():
        # OID末尾: {ifIndex}.{a.b.c.d}
        suffix = oid_str[len(OID_MAC) + 1:]
        parts = suffix.split(".", 1)
        if len(parts) != 2:
            continue
        if_index, ip_addr = parts

        if_name = if_map.get(if_index, f"ifIndex={if_index}")
        m = re.search(r"[Vv]lan(\d+)", if_name)
        vlan_id = int(m.group(1)) if m else None

        entries.append(ArpEntry(
            switch_label=label or host,
            switch_host=host,
            vlan_id=vlan_id,
            interface=if_name,
            ip_address=ip_addr,
            mac_address=_mac_to_str(mac_val),
        ))

    return entries

pages/arp_table.py

"""
Streamlit ページ: ARPテーブル リアルタイム確認
- YAML ベースのユーザー認証(streamlit-authenticator)
- ロール別機能制御(admin / viewer)
- DB保存なし / st.cache_data(ttl=60) で短期キャッシュ
"""

from __future__ import annotations
import sys
from pathlib import Path

import streamlit as st
import pandas as pd
import yaml
import streamlit_authenticator as stauth
from yaml.loader import SafeLoader

# snmp モジュールをパスに追加(プロジェクト構成に合わせて調整)
sys.path.append(str(Path(__file__).resolve().parents[1]))
from snmp.arp_collector import get_arp_entries


# ---------------------------------------------------------------------------
# 定数
# ---------------------------------------------------------------------------

CONFIG_PATH = Path(__file__).resolve().parents[1] / "config.yaml"

CORE_SWITCHES = [
    {"host": "192.168.0.1", "community": "public", "label": "Core-SW1"},
    {"host": "192.168.0.2", "community": "public", "label": "Core-SW2"},
]


# ---------------------------------------------------------------------------
# 設定ファイル読み込み
# ---------------------------------------------------------------------------

@st.cache_resource
def load_config() -> dict:
    """config.yaml を読み込む(アプリ起動時1回のみ)"""
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return yaml.load(f, Loader=SafeLoader)


# ---------------------------------------------------------------------------
# 認証
# ---------------------------------------------------------------------------

def setup_authenticator(config: dict) -> stauth.Authenticate:
    return stauth.Authenticate(
        credentials=config["credentials"],
        cookie_name=config["cookie"]["name"],
        cookie_key=config["cookie"]["key"],
        cookie_expiry_days=config["cookie"]["expiry_days"],
    )


def get_role(config: dict, username: str) -> str:
    """ユーザー名からロールを取得。未設定の場合は viewer を返す"""
    return (
        config["credentials"]["usernames"]
        .get(username, {})
        .get("role", "viewer")
    )


def can(config: dict, role: str, permission: str) -> bool:
    """ロール権限チェック"""
    return config["roles"].get(role, {}).get(permission, False)


# ---------------------------------------------------------------------------
# データ取得(短期キャッシュ: 60秒)
# ---------------------------------------------------------------------------

@st.cache_data(ttl=60, show_spinner=False)
def fetch_arp_dataframe() -> tuple[pd.DataFrame, list[str]]:
    """
    全コアスイッチからARPエントリを取得してDataFrameに変換。
    errors: 取得失敗スイッチのメッセージリスト。
    """
    rows: list[dict] = []
    errors: list[str] = []

    for sw in CORE_SWITCHES:
        try:
            entries = get_arp_entries(
                host=sw["host"],
                community=sw["community"],
                label=sw["label"],
            )
            for e in entries:
                rows.append({
                    "スイッチ":         e.switch_label,
                    "VLAN":             e.vlan_id,
                    "インターフェース": e.interface,
                    "IPアドレス":       e.ip_address,
                    "MACアドレス":      e.mac_address,
                })
        except Exception as ex:
            errors.append(f"⚠️ {sw['label']} ({sw['host']}): {ex}")

    df = pd.DataFrame(rows) if rows else pd.DataFrame(
        columns=["スイッチ", "VLAN", "インターフェース", "IPアドレス", "MACアドレス"]
    )
    return df, errors


# ---------------------------------------------------------------------------
# ARPテーブル本体 UI
# ---------------------------------------------------------------------------

def render_arp_page(config: dict, username: str, display_name: str):
    role = get_role(config, username)

    st.title("🔍 ARPテーブル リアルタイム確認")
    st.caption("コアスイッチからSNMPで取得(DB保存なし・キャッシュ60秒)")

    # ---- 再取得ボタン(admin のみ表示)----
    if can(config, role, "can_refresh"):
        if st.button("🔄 再取得", use_container_width=False):
            st.cache_data.clear()
            st.rerun()

    # ---- データ取得 ----
    with st.spinner("SNMPでARPテーブルを取得中..."):
        df, errors = fetch_arp_dataframe()

    # ---- エラー表示 ----
    for msg in errors:
        st.warning(msg)

    if df.empty:
        st.error("ARPエントリを取得できませんでした。")
        return

    # ---- フィルタUI ----
    with st.expander("🔎 フィルタ", expanded=True):
        f_col1, f_col2, f_col3, f_col4 = st.columns(4)

        with f_col1:
            sw_labels = sorted(df["スイッチ"].unique().tolist())
            sel_sw = st.multiselect("スイッチ", options=sw_labels)

        with f_col2:
            vlans = sorted(df["VLAN"].dropna().astype(int).unique().tolist())
            sel_vlan = st.multiselect("VLAN", options=vlans)

        with f_col3:
            ip_filter = st.text_input("IPアドレス(部分一致)", placeholder="192.168.10")

        with f_col4:
            mac_filter = st.text_input("MACアドレス(部分一致)", placeholder="aa:bb")

    # ---- フィルタ適用 ----
    filtered = df.copy()
    if sel_sw:
        filtered = filtered[filtered["スイッチ"].isin(sel_sw)]
    if sel_vlan:
        filtered = filtered[filtered["VLAN"].isin(sel_vlan)]
    if ip_filter:
        filtered = filtered[filtered["IPアドレス"].str.contains(ip_filter, na=False)]
    if mac_filter:
        filtered = filtered[
            filtered["MACアドレス"].str.contains(mac_filter, case=False, na=False)
        ]

    # ---- 集計メトリクス ----
    m1, m2, m3 = st.columns(3)
    m1.metric("総エントリ数", len(df))
    m2.metric("フィルタ後", len(filtered))
    m3.metric("取得スイッチ数", df["スイッチ"].nunique())

    # ---- テーブル表示 ----
    st.dataframe(
        filtered.sort_values(["VLAN", "IPアドレス"]),
        use_container_width=True,
        hide_index=True,
        column_config={
            "VLAN": st.column_config.NumberColumn("VLAN", format="%d"),
        },
    )

    # ---- CSVエクスポート(admin のみ表示)----
    if can(config, role, "can_download"):
        st.download_button(
            label="📥 CSV ダウンロード",
            data=filtered.to_csv(index=False).encode("utf-8-sig"),
            file_name="arp_table.csv",
            mime="text/csv",
        )


# ---------------------------------------------------------------------------
# エントリーポイント
# ---------------------------------------------------------------------------

def main():
    st.set_page_config(page_title="ARPテーブル確認", layout="wide")

    config = load_config()
    authenticator = setup_authenticator(config)

    # ---- ログインフォーム ----
    # name, auth_status, username = authenticator.login(
    #     form_name="ログイン",
    #     location="main",
    # )
    result = authenticator.login(location="main")

    if result is not None:
        name, auth_status, username = result
    else:
        name = st.session_state.get("name")
        auth_status = st.session_state.get("authentication_status")
        username = st.session_state.get("username")

    # ---- 認証結果ハンドリング ----
    if auth_status is True:
        # サイドバー: ユーザー情報 + ログアウト
        with st.sidebar:
            role = get_role(config, username)
            st.markdown(f"**👤 {name}**")
            st.caption(f"ロール: `{role}`")
            st.divider()
            authenticator.logout("ログアウト", location="sidebar")

        render_arp_page(config, username, name)

    elif auth_status is False:
        st.error("❌ ユーザー名またはパスワードが正しくありません")

    else:  # None: 未入力
        st.info("ユーザー名とパスワードを入力してください")


if __name__ == "__main__":
    main()