ARPテーブルの見える化(Streamlit)
| 登録日 | :2026/07/02 04:58 |
|---|---|
| カテゴリ | :python データ分析 |
以下のSSHログコレクターの一種
https://nonoco.believeriver.site/detail_mark/596/
フォルダ構成
project/
├── streamlit_app.py ← ★ 必要(Streamlitのエントリーポイント)
├── config.yaml ✅
├── config.py ✅
├── pages/
│ └── arp_table.py ✅
└── snmp/
├── __init__.py ← ★ 必要(パッケージ化)
└── arp_collector.py ✅
プログラムの実行は
streamlit run streamlit_app.py
パッケージ
インストールするパッケージ
pip install sqlalchemy
pip install "pysnmp==4.4.12" "pyasn1==0.4.8"
pip install streamlit-authenticator pyyaml
pip install streamlit
プログラム一式
config.yaml
credentials:
usernames:
admin:
name: admin
# パスワード生成: python -c "import streamlit_authenticator as sa; print(sa.Hasher(['password']).generate())"
password: "admin"
role: admin # admin / viewer
viewer:
name: 閲覧ユーザー01
password: "$2b$12$YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY"
role: viewer
cookie:
name: switch_monitor_auth
key: "change_this_to_a_random_secret_string_32chars" # 必ず変更すること
expiry_days: 1
# ロール別アクセス制御
roles:
admin:
can_refresh: true # 再取得ボタンの表示
can_download: true # CSVダウンロードの表示
viewer:
can_refresh: false
can_download: false
streamlit_app.py
"""
ネットワーク監視ツール - メインページ
"""
import streamlit as st
st.set_page_config(
page_title="ネットワーク監視",
page_icon="🖥️",
layout="wide",
initial_sidebar_state="expanded",
)
# ---------------------------------------------------------------------------
# VLAN 定義(実環境に合わせて編集してください)
# ---------------------------------------------------------------------------
VLAN_LIST = [
{"VLAN ID": 10, "ネットワーク名": "業務系 A棟", "用途": "一般業務PC・プリンター", "帯域": "1 Gbps"},
{"VLAN ID": 20, "ネットワーク名": "業務系 B棟", "用途": "一般業務PC・プリンター", "帯域": "1 Gbps"},
{"VLAN ID": 30, "ネットワーク名": "研究系", "用途": "ワークステーション・計算サーバー", "帯域": "10 Gbps"},
{"VLAN ID": 40, "ネットワーク名": "管理系", "用途": "ネットワーク機器・サーバー管理", "帯域": "1 Gbps"},
{"VLAN ID": 100, "ネットワーク名": "サーバーセグメント","用途": "ファイルサーバー・認証サーバー", "帯域": "10 Gbps"},
]
# ---------------------------------------------------------------------------
# ページ本体
# ---------------------------------------------------------------------------
st.title("🖥️ ネットワーク監視ツール")
st.caption("社内ネットワークの接続状況を確認するためのツールです。")
st.divider()
# ---- 使い方 ----
st.subheader("📖 使い方")
col1, col2, col3 = st.columns(3)
with col1:
st.info(
"**① 左メニューを選択**\n\n"
"画面左のサイドバーから\n"
"確認したい機能を選んでください。"
)
with col2:
st.info(
"**② フィルタで絞り込み**\n\n"
"VLANやIPアドレスで絞り込むと\n"
"目的の機器をすばやく見つけられます。"
)
with col3:
st.info(
"**③ CSVで書き出し**\n\n"
"「CSVダウンロード」ボタンから\n"
"Excelで開ける形式で保存できます。"
)
st.divider()
# ---- 機能一覧 ----
st.subheader("🗂️ 機能一覧")
st.markdown("""
| ページ | 内容 | 更新タイミング |
|--------|------|--------------|
| 🔍 ARP確認 | 各VLANに接続中の機器(IPアドレス・MACアドレス)を一覧表示 | ページを開いたとき(60秒キャッシュ) |
""")
st.divider()
# ---- VLAN 一覧 ----
st.subheader("🌐 VLANとネットワーク構成")
st.caption("社内ネットワークは用途ごとに以下のVLANに分かれています。")
import pandas as pd
df_vlan = pd.DataFrame(VLAN_LIST)
st.dataframe(
df_vlan,
use_container_width=True,
hide_index=True,
column_config={
"VLAN ID": st.column_config.NumberColumn("VLAN ID", format="%d", width="small"),
"ネットワーク名": st.column_config.TextColumn("ネットワーク名", width="medium"),
"用途": st.column_config.TextColumn("用途", width="large"),
"帯域": st.column_config.TextColumn("帯域", width="small"),
},
)
st.divider()
# ---- 注意事項 ----
st.subheader("⚠️ ご利用にあたって")
st.warning(
"- このツールはネットワーク機器への読み取り専用アクセスを行います。設定変更はできません。\n"
"- 表示される情報は取得時点のものです。リアルタイムの変化は「再取得」ボタンで更新してください。\n"
"- 不明な点はネットワーク管理者(情報システム担当)までお問い合わせください。"
)
# ---- フッター ----
st.divider()
st.caption("Network Monitor v1.0 / 管理: 情報システム部門")
snmp/arp_collector.py
"""
SNMP経由でコアスイッチからARPテーブルを取得するモジュール。
ipNetToMediaTable (1.3.6.1.2.1.4.22) を使用。
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Optional
from pysnmp.hlapi import (
CommunityData, ContextData, ObjectIdentity, ObjectType,
SnmpEngine, UdpTransportTarget, nextCmd,
)
# ---------------------------------------------------------------------------
# データクラス
# ---------------------------------------------------------------------------
@dataclass
class ArpEntry:
switch_label: str # 表示名(設定ファイル由来)
switch_host: str # IPまたはホスト名
vlan_id: Optional[int]
interface: str # "Vlan100" など
ip_address: str
mac_address: str # "xx:xx:xx:xx:xx:xx"
# ---------------------------------------------------------------------------
# 内部ユーティリティ
# ---------------------------------------------------------------------------
def _mac_to_str(mac_val) -> str:
"""pysnmp の OctetString を "xx:xx:xx:xx:xx:xx" 形式に変換"""
try:
raw = bytes(mac_val)
if len(raw) == 6:
return ':'.join(f'{b:02x}' for b in raw)
except Exception:
pass
return str(mac_val)
def _snmp_walk(host: str, community: str, oid: str, port: int) -> dict[str, object]:
"""指定OID以下をwalkしてdict {oid_str: value} で返す"""
result = {}
for (err_indication, err_status, _, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(community, mpModel=1), # SNMPv2c
UdpTransportTarget((host, port), timeout=3, retries=1),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False,
):
if err_indication or err_status:
raise RuntimeError(
err_indication or err_status.prettyPrint()
)
for var_bind in var_binds:
result[str(var_bind[0])] = var_bind[1]
return result
# ---------------------------------------------------------------------------
# 公開API
# ---------------------------------------------------------------------------
def get_arp_entries(
host: str,
community: str,
label: str = "",
port: int = 161,
) -> list[ArpEntry]:
"""
1台のスイッチからARPエントリ一覧を取得する。
使用OID:
1.3.6.1.2.1.4.22.1.2 ipNetToMediaPhysAddress
└── suffix: {ifIndex}.{a.b.c.d}
1.3.6.1.2.1.2.2.1.2 ifDescr
└── suffix: {ifIndex}
"""
OID_MAC = "1.3.6.1.2.1.4.22.1.2"
OID_IFDESC = "1.3.6.1.2.1.2.2.1.2"
# ifIndex -> インターフェース名
if_descr_raw = _snmp_walk(host, community, OID_IFDESC, port)
if_map: dict[str, str] = {}
for oid_str, val in if_descr_raw.items():
idx = oid_str.rsplit(".", 1)[-1]
if_map[idx] = str(val)
# MACアドレステーブル
mac_raw = _snmp_walk(host, community, OID_MAC, port)
entries: list[ArpEntry] = []
for oid_str, mac_val in mac_raw.items():
# OID末尾: {ifIndex}.{a.b.c.d}
suffix = oid_str[len(OID_MAC) + 1:]
parts = suffix.split(".", 1)
if len(parts) != 2:
continue
if_index, ip_addr = parts
if_name = if_map.get(if_index, f"ifIndex={if_index}")
m = re.search(r"[Vv]lan(\d+)", if_name)
vlan_id = int(m.group(1)) if m else None
entries.append(ArpEntry(
switch_label=label or host,
switch_host=host,
vlan_id=vlan_id,
interface=if_name,
ip_address=ip_addr,
mac_address=_mac_to_str(mac_val),
))
return entries
pages/arp_table.py
"""
Streamlit ページ: ARPテーブル リアルタイム確認
- YAML ベースのユーザー認証(streamlit-authenticator)
- ロール別機能制御(admin / viewer)
- DB保存なし / st.cache_data(ttl=60) で短期キャッシュ
"""
from __future__ import annotations
import sys
from pathlib import Path
import streamlit as st
import pandas as pd
import yaml
import streamlit_authenticator as stauth
from yaml.loader import SafeLoader
# snmp モジュールをパスに追加(プロジェクト構成に合わせて調整)
sys.path.append(str(Path(__file__).resolve().parents[1]))
from snmp.arp_collector import get_arp_entries
# ---------------------------------------------------------------------------
# 定数
# ---------------------------------------------------------------------------
CONFIG_PATH = Path(__file__).resolve().parents[1] / "config.yaml"
CORE_SWITCHES = [
{"host": "192.168.0.1", "community": "public", "label": "Core-SW1"},
{"host": "192.168.0.2", "community": "public", "label": "Core-SW2"},
]
# ---------------------------------------------------------------------------
# 設定ファイル読み込み
# ---------------------------------------------------------------------------
@st.cache_resource
def load_config() -> dict:
"""config.yaml を読み込む(アプリ起動時1回のみ)"""
with open(CONFIG_PATH, encoding="utf-8") as f:
return yaml.load(f, Loader=SafeLoader)
# ---------------------------------------------------------------------------
# 認証
# ---------------------------------------------------------------------------
def setup_authenticator(config: dict) -> stauth.Authenticate:
return stauth.Authenticate(
credentials=config["credentials"],
cookie_name=config["cookie"]["name"],
cookie_key=config["cookie"]["key"],
cookie_expiry_days=config["cookie"]["expiry_days"],
)
def get_role(config: dict, username: str) -> str:
"""ユーザー名からロールを取得。未設定の場合は viewer を返す"""
return (
config["credentials"]["usernames"]
.get(username, {})
.get("role", "viewer")
)
def can(config: dict, role: str, permission: str) -> bool:
"""ロール権限チェック"""
return config["roles"].get(role, {}).get(permission, False)
# ---------------------------------------------------------------------------
# データ取得(短期キャッシュ: 60秒)
# ---------------------------------------------------------------------------
@st.cache_data(ttl=60, show_spinner=False)
def fetch_arp_dataframe() -> tuple[pd.DataFrame, list[str]]:
"""
全コアスイッチからARPエントリを取得してDataFrameに変換。
errors: 取得失敗スイッチのメッセージリスト。
"""
rows: list[dict] = []
errors: list[str] = []
for sw in CORE_SWITCHES:
try:
entries = get_arp_entries(
host=sw["host"],
community=sw["community"],
label=sw["label"],
)
for e in entries:
rows.append({
"スイッチ": e.switch_label,
"VLAN": e.vlan_id,
"インターフェース": e.interface,
"IPアドレス": e.ip_address,
"MACアドレス": e.mac_address,
})
except Exception as ex:
errors.append(f"⚠️ {sw['label']} ({sw['host']}): {ex}")
df = pd.DataFrame(rows) if rows else pd.DataFrame(
columns=["スイッチ", "VLAN", "インターフェース", "IPアドレス", "MACアドレス"]
)
return df, errors
# ---------------------------------------------------------------------------
# ARPテーブル本体 UI
# ---------------------------------------------------------------------------
def render_arp_page(config: dict, username: str, display_name: str):
role = get_role(config, username)
st.title("🔍 ARPテーブル リアルタイム確認")
st.caption("コアスイッチからSNMPで取得(DB保存なし・キャッシュ60秒)")
# ---- 再取得ボタン(admin のみ表示)----
if can(config, role, "can_refresh"):
if st.button("🔄 再取得", use_container_width=False):
st.cache_data.clear()
st.rerun()
# ---- データ取得 ----
with st.spinner("SNMPでARPテーブルを取得中..."):
df, errors = fetch_arp_dataframe()
# ---- エラー表示 ----
for msg in errors:
st.warning(msg)
if df.empty:
st.error("ARPエントリを取得できませんでした。")
return
# ---- フィルタUI ----
with st.expander("🔎 フィルタ", expanded=True):
f_col1, f_col2, f_col3, f_col4 = st.columns(4)
with f_col1:
sw_labels = sorted(df["スイッチ"].unique().tolist())
sel_sw = st.multiselect("スイッチ", options=sw_labels)
with f_col2:
vlans = sorted(df["VLAN"].dropna().astype(int).unique().tolist())
sel_vlan = st.multiselect("VLAN", options=vlans)
with f_col3:
ip_filter = st.text_input("IPアドレス(部分一致)", placeholder="192.168.10")
with f_col4:
mac_filter = st.text_input("MACアドレス(部分一致)", placeholder="aa:bb")
# ---- フィルタ適用 ----
filtered = df.copy()
if sel_sw:
filtered = filtered[filtered["スイッチ"].isin(sel_sw)]
if sel_vlan:
filtered = filtered[filtered["VLAN"].isin(sel_vlan)]
if ip_filter:
filtered = filtered[filtered["IPアドレス"].str.contains(ip_filter, na=False)]
if mac_filter:
filtered = filtered[
filtered["MACアドレス"].str.contains(mac_filter, case=False, na=False)
]
# ---- 集計メトリクス ----
m1, m2, m3 = st.columns(3)
m1.metric("総エントリ数", len(df))
m2.metric("フィルタ後", len(filtered))
m3.metric("取得スイッチ数", df["スイッチ"].nunique())
# ---- テーブル表示 ----
st.dataframe(
filtered.sort_values(["VLAN", "IPアドレス"]),
use_container_width=True,
hide_index=True,
column_config={
"VLAN": st.column_config.NumberColumn("VLAN", format="%d"),
},
)
# ---- CSVエクスポート(admin のみ表示)----
if can(config, role, "can_download"):
st.download_button(
label="📥 CSV ダウンロード",
data=filtered.to_csv(index=False).encode("utf-8-sig"),
file_name="arp_table.csv",
mime="text/csv",
)
# ---------------------------------------------------------------------------
# エントリーポイント
# ---------------------------------------------------------------------------
def main():
st.set_page_config(page_title="ARPテーブル確認", layout="wide")
config = load_config()
authenticator = setup_authenticator(config)
# ---- ログインフォーム ----
# name, auth_status, username = authenticator.login(
# form_name="ログイン",
# location="main",
# )
result = authenticator.login(location="main")
if result is not None:
name, auth_status, username = result
else:
name = st.session_state.get("name")
auth_status = st.session_state.get("authentication_status")
username = st.session_state.get("username")
# ---- 認証結果ハンドリング ----
if auth_status is True:
# サイドバー: ユーザー情報 + ログアウト
with st.sidebar:
role = get_role(config, username)
st.markdown(f"**👤 {name}**")
st.caption(f"ロール: `{role}`")
st.divider()
authenticator.logout("ログアウト", location="sidebar")
render_arp_page(config, username, name)
elif auth_status is False:
st.error("❌ ユーザー名またはパスワードが正しくありません")
else: # None: 未入力
st.info("ユーザー名とパスワードを入力してください")
if __name__ == "__main__":
main()