feat(audit): Live-Stream im Dashboard via WebSocket
Recent-Activity-Karte zeigt neue audit_log-Events jetzt sofort statt
in 15s-Polls.
internal/services/audit/audit.go:
- Repo bekommt Subscribe()-Methode mit fan-out-channel (Buffer 32,
non-blocking-send — langsame Clients droppen Events statt die
Pipeline zu blockieren).
- Log() macht jetzt INSERT … RETURNING id, created_at und broadcastet
den fertigen Entry an alle Subscribers. Broadcast nur nach
erfolgreichem INSERT — failed inserts erscheinen nicht.
internal/handlers/audit.go:
- Neuer GET /api/v1/audit/live (WebSocket): sendet beim Connect die
letzten 50 Einträge (oldest→newest), danach Live-Stream aus
Subscribe-Channel. 30s-Ping gegen HAProxy-Tunnel-Timeout.
- Recent (Poll-Endpoint) bleibt für Fallbacks erhalten.
UI Dashboard:
- useAuditLive(keep=15)-Hook ersetzt das 15s-useQuery-Poll.
- WebSocket auf wss://<host>/api/v1/audit/live; Auto-Reconnect alle
2s nach Drop.
- dedupe per id (Snapshot + erste live-Events können sich kurz
überschneiden während des Subscribe-Race).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -52,7 +52,7 @@ import (
|
|||||||
wgsvc "git.netcell-it.de/projekte/edgeguard-native/internal/services/wireguard"
|
wgsvc "git.netcell-it.de/projekte/edgeguard-native/internal/services/wireguard"
|
||||||
)
|
)
|
||||||
|
|
||||||
var version = "1.0.72"
|
var version = "1.0.73"
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
addr := os.Getenv("EDGEGUARD_API_ADDR")
|
addr := os.Getenv("EDGEGUARD_API_ADDR")
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
var version = "1.0.72"
|
var version = "1.0.73"
|
||||||
|
|
||||||
const usage = `edgeguard-ctl — EdgeGuard CLI
|
const usage = `edgeguard-ctl — EdgeGuard CLI
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ import (
|
|||||||
"git.netcell-it.de/projekte/edgeguard-native/internal/services/tlscerts"
|
"git.netcell-it.de/projekte/edgeguard-native/internal/services/tlscerts"
|
||||||
)
|
)
|
||||||
|
|
||||||
var version = "1.0.72"
|
var version = "1.0.73"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// renewTickInterval — how often we re-evaluate expiring certs.
|
// renewTickInterval — how often we re-evaluate expiring certs.
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
package handlers
|
package handlers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
|
||||||
"git.netcell-it.de/projekte/edgeguard-native/internal/handlers/response"
|
"git.netcell-it.de/projekte/edgeguard-native/internal/handlers/response"
|
||||||
"git.netcell-it.de/projekte/edgeguard-native/internal/services/audit"
|
"git.netcell-it.de/projekte/edgeguard-native/internal/services/audit"
|
||||||
@@ -18,10 +22,12 @@ func NewAuditHandler(repo *audit.Repo) *AuditHandler { return &AuditHandler{Repo
|
|||||||
func (h *AuditHandler) Register(rg *gin.RouterGroup) {
|
func (h *AuditHandler) Register(rg *gin.RouterGroup) {
|
||||||
g := rg.Group("/audit")
|
g := rg.Group("/audit")
|
||||||
g.GET("/recent", h.Recent)
|
g.GET("/recent", h.Recent)
|
||||||
|
g.GET("/live", h.Live)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recent returns the most recent audit_log entries — used by the
|
// Recent returns the most recent audit_log entries — used by the
|
||||||
// dashboard's recent-activity card. ?limit=N (1–100, default 10).
|
// dashboard fallback path (z.B. wenn WebSocket nicht verbinden kann).
|
||||||
|
// ?limit=N (1–100, default 10).
|
||||||
func (h *AuditHandler) Recent(c *gin.Context) {
|
func (h *AuditHandler) Recent(c *gin.Context) {
|
||||||
limit := 10
|
limit := 10
|
||||||
if v := c.Query("limit"); v != "" {
|
if v := c.Query("limit"); v != "" {
|
||||||
@@ -36,3 +42,74 @@ func (h *AuditHandler) Recent(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
response.OK(c, gin.H{"entries": rows})
|
response.OK(c, gin.H{"entries": rows})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// auditUpgrader: same-origin durch HAProxy, kein CheckOrigin.
|
||||||
|
var auditUpgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 4 * 1024,
|
||||||
|
CheckOrigin: func(r *http.Request) bool { return true },
|
||||||
|
}
|
||||||
|
|
||||||
|
// Live upgraded auf WebSocket: sendet einen Snapshot der letzten 50
|
||||||
|
// audit_log-Rows, danach jeden neuen Eintrag direkt aus dem
|
||||||
|
// Repo.broadcast()-Channel.
|
||||||
|
func (h *AuditHandler) Live(c *gin.Context) {
|
||||||
|
conn, err := auditUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Snapshot
|
||||||
|
if rows, err := h.Repo.ListRecent(c.Request.Context(), 50); err == nil {
|
||||||
|
// In aufsteigender Reihenfolge schicken (newest last) damit der
|
||||||
|
// Client nach unten scrollt + neue Events natürlich anhängt.
|
||||||
|
for i := len(rows) - 1; i >= 0; i-- {
|
||||||
|
if err := conn.WriteJSON(rows[i]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Live-Subscribe
|
||||||
|
ch, unsub := h.Repo.Subscribe()
|
||||||
|
defer unsub()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Read-Loop für close-frame + ping-pong
|
||||||
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
for {
|
||||||
|
if _, _, err := conn.NextReader(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ping := time.NewTicker(30 * time.Second)
|
||||||
|
defer ping.Stop()
|
||||||
|
_ = conn.SetWriteDeadline(time.Now().Add(60 * time.Second))
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case e, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
if err := conn.WriteJSON(e); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ping.C:
|
||||||
|
_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
if err := conn.WriteControl(websocket.PingMessage, nil,
|
||||||
|
time.Now().Add(5*time.Second)); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ package audit
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
@@ -13,9 +14,48 @@ import (
|
|||||||
|
|
||||||
type Repo struct {
|
type Repo struct {
|
||||||
Pool *pgxpool.Pool
|
Pool *pgxpool.Pool
|
||||||
|
|
||||||
|
subsMu sync.RWMutex
|
||||||
|
subs map[chan Entry]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(pool *pgxpool.Pool) *Repo { return &Repo{Pool: pool} }
|
func New(pool *pgxpool.Pool) *Repo {
|
||||||
|
return &Repo{Pool: pool, subs: map[chan Entry]struct{}{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe gibt einen Channel für Live-Audit-Events zurück + ein
|
||||||
|
// Unsubscribe-Cleanup. Channel-Buffer 32 — bei stehenden Clients
|
||||||
|
// werden Events gedropt (non-blocking-send).
|
||||||
|
func (r *Repo) Subscribe() (<-chan Entry, func()) {
|
||||||
|
c := make(chan Entry, 32)
|
||||||
|
r.subsMu.Lock()
|
||||||
|
if r.subs == nil {
|
||||||
|
r.subs = map[chan Entry]struct{}{}
|
||||||
|
}
|
||||||
|
r.subs[c] = struct{}{}
|
||||||
|
r.subsMu.Unlock()
|
||||||
|
return c, func() {
|
||||||
|
r.subsMu.Lock()
|
||||||
|
delete(r.subs, c)
|
||||||
|
r.subsMu.Unlock()
|
||||||
|
close(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Repo) broadcast(e Entry) {
|
||||||
|
r.subsMu.RLock()
|
||||||
|
subs := make([]chan Entry, 0, len(r.subs))
|
||||||
|
for c := range r.subs {
|
||||||
|
subs = append(subs, c)
|
||||||
|
}
|
||||||
|
r.subsMu.RUnlock()
|
||||||
|
for _, c := range subs {
|
||||||
|
select {
|
||||||
|
case c <- e:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Entry mirrors one audit_log row — ListRecent returns these for
|
// Entry mirrors one audit_log row — ListRecent returns these for
|
||||||
// the dashboard's recent-activity card.
|
// the dashboard's recent-activity card.
|
||||||
@@ -82,9 +122,34 @@ func (r *Repo) Log(ctx context.Context, actor, action, subject string, detail an
|
|||||||
if nodeID == "" {
|
if nodeID == "" {
|
||||||
nodeArg = nil
|
nodeArg = nil
|
||||||
}
|
}
|
||||||
_, err := r.Pool.Exec(ctx,
|
// RETURNING id+created_at damit der Subscribe-Channel direkt einen
|
||||||
`INSERT INTO audit_log (actor, action, subject, detail, node_id)
|
// vollständigen Entry verteilen kann — Subscriber müssen nicht
|
||||||
VALUES ($1, $2, $3, $4, $5)`,
|
// erneut die DB hitten für die Anzeige.
|
||||||
actor, action, subjectArg, detailJSON, nodeArg)
|
var e Entry
|
||||||
|
e.Actor = actor
|
||||||
|
e.Action = action
|
||||||
|
if subject != "" {
|
||||||
|
s := subject
|
||||||
|
e.Subject = &s
|
||||||
|
}
|
||||||
|
if len(detailJSON) > 0 {
|
||||||
|
e.Detail = json.RawMessage(detailJSON)
|
||||||
|
}
|
||||||
|
if nodeID != "" {
|
||||||
|
n := nodeID
|
||||||
|
e.NodeID = &n
|
||||||
|
}
|
||||||
|
err := r.Pool.QueryRow(ctx, `
|
||||||
|
INSERT INTO audit_log (actor, action, subject, detail, node_id)
|
||||||
|
VALUES ($1, $2, $3, $4, $5)
|
||||||
|
RETURNING id, created_at`,
|
||||||
|
actor, action, subjectArg, detailJSON, nodeArg).
|
||||||
|
Scan(&e.ID, &e.CreatedAt)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Broadcast nach erfolgreichem INSERT — wenn DB ablehnt, sollen
|
||||||
|
// Subscribers das Event auch nicht sehen.
|
||||||
|
r.broadcast(e)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ const NAV: NavSection[] = [
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
const VERSION = '1.0.72'
|
const VERSION = '1.0.73'
|
||||||
|
|
||||||
// Sidebar-Pattern 1:1 aus netcell-webpanel (enconf) übernommen:
|
// Sidebar-Pattern 1:1 aus netcell-webpanel (enconf) übernommen:
|
||||||
// - <nav> als root, dunkler Gradient + Teal/Blue-Accent
|
// - <nav> als root, dunkler Gradient + Teal/Blue-Accent
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import {
|
|||||||
SafetyCertificateOutlined, ThunderboltOutlined,
|
SafetyCertificateOutlined, ThunderboltOutlined,
|
||||||
} from '@ant-design/icons'
|
} from '@ant-design/icons'
|
||||||
import { useQuery } from '@tanstack/react-query'
|
import { useQuery } from '@tanstack/react-query'
|
||||||
|
import { useEffect, useRef, useState } from 'react'
|
||||||
import { useTranslation } from 'react-i18next'
|
import { useTranslation } from 'react-i18next'
|
||||||
|
|
||||||
import apiClient, { isEnvelope } from '../../api/client'
|
import apiClient, { isEnvelope } from '../../api/client'
|
||||||
@@ -13,6 +14,51 @@ import StatusDot from '../../components/StatusDot'
|
|||||||
|
|
||||||
const { Text } = Typography
|
const { Text } = Typography
|
||||||
|
|
||||||
|
// useAuditLive verbindet sich mit dem WebSocket-Stream /audit/live.
|
||||||
|
// Server sendet beim Connect die letzten 50 Einträge (oldest→newest),
|
||||||
|
// danach jeden neuen INSERT direkt. UI behält das letzte `keep` und
|
||||||
|
// zeigt newest-first. Reconnect alle 2s bei Drop.
|
||||||
|
function useAuditLive(keep = 15) {
|
||||||
|
const [data, setData] = useState<AuditEntry[]>([])
|
||||||
|
const wsRef = useRef<WebSocket | null>(null)
|
||||||
|
useEffect(() => {
|
||||||
|
let cancelled = false
|
||||||
|
let timer: ReturnType<typeof setTimeout> | null = null
|
||||||
|
const connect = () => {
|
||||||
|
if (cancelled) return
|
||||||
|
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
||||||
|
const url = `${proto}//${window.location.host}/api/v1/audit/live`
|
||||||
|
let ws: WebSocket
|
||||||
|
try { ws = new WebSocket(url) } catch { scheduleReconnect(); return }
|
||||||
|
wsRef.current = ws
|
||||||
|
ws.onmessage = (ev) => {
|
||||||
|
try {
|
||||||
|
const e: AuditEntry = JSON.parse(ev.data as string)
|
||||||
|
setData((prev) => {
|
||||||
|
// dedupe by id (Snapshot + live können sich kurz überschneiden)
|
||||||
|
if (prev.some((p) => p.id === e.id)) return prev
|
||||||
|
const next = [e, ...prev]
|
||||||
|
return next.length > keep ? next.slice(0, keep) : next
|
||||||
|
})
|
||||||
|
} catch { /* ignore parse fail */ }
|
||||||
|
}
|
||||||
|
ws.onclose = () => { if (!cancelled) scheduleReconnect() }
|
||||||
|
ws.onerror = () => { /* onclose feuert danach */ }
|
||||||
|
}
|
||||||
|
const scheduleReconnect = () => {
|
||||||
|
if (timer) clearTimeout(timer)
|
||||||
|
timer = setTimeout(connect, 2000)
|
||||||
|
}
|
||||||
|
connect()
|
||||||
|
return () => {
|
||||||
|
cancelled = true
|
||||||
|
if (timer) clearTimeout(timer)
|
||||||
|
if (wsRef.current) wsRef.current.close()
|
||||||
|
}
|
||||||
|
}, [keep])
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
// ── Wire shapes ───────────────────────────────────────────────
|
// ── Wire shapes ───────────────────────────────────────────────
|
||||||
|
|
||||||
interface Domain { id: number; active: boolean; primary_backend_id?: number | null }
|
interface Domain { id: number; active: boolean; primary_backend_id?: number | null }
|
||||||
@@ -130,11 +176,7 @@ export default function DashboardPage() {
|
|||||||
queryFn: () => fetchList<HAProxyBackend>('/haproxy/stats', 'backends'),
|
queryFn: () => fetchList<HAProxyBackend>('/haproxy/stats', 'backends'),
|
||||||
refetchInterval: 10_000,
|
refetchInterval: 10_000,
|
||||||
})
|
})
|
||||||
const auditEntries = useQuery({
|
const auditEntries = useAuditLive(15)
|
||||||
queryKey: ['audit', 'recent'],
|
|
||||||
queryFn: () => fetchList<AuditEntry>('/audit/recent?limit=10', 'entries'),
|
|
||||||
refetchInterval: 15_000,
|
|
||||||
})
|
|
||||||
|
|
||||||
const domains = useQuery({ queryKey: ['domains'], queryFn: () => fetchList<Domain>('/domains', 'domains') })
|
const domains = useQuery({ queryKey: ['domains'], queryFn: () => fetchList<Domain>('/domains', 'domains') })
|
||||||
const backends = useQuery({ queryKey: ['backends'], queryFn: () => fetchList<Backend>('/backends', 'backends') })
|
const backends = useQuery({ queryKey: ['backends'], queryFn: () => fetchList<Backend>('/backends', 'backends') })
|
||||||
@@ -220,11 +262,11 @@ export default function DashboardPage() {
|
|||||||
{/* ── Recent activity (audit log) ─────────────────── */}
|
{/* ── Recent activity (audit log) ─────────────────── */}
|
||||||
<Col xs={24} lg={12}>
|
<Col xs={24} lg={12}>
|
||||||
<Card size="small" title={<><ApiOutlined /> {t('dashboard.activityCard.title')}</>} className="h-100">
|
<Card size="small" title={<><ApiOutlined /> {t('dashboard.activityCard.title')}</>} className="h-100">
|
||||||
{(auditEntries.data ?? []).length === 0 ? (
|
{auditEntries.length === 0 ? (
|
||||||
<Text type="secondary">{t('dashboard.activityCard.empty')}</Text>
|
<Text type="secondary">{t('dashboard.activityCard.empty')}</Text>
|
||||||
) : (
|
) : (
|
||||||
<Space direction="vertical" style={{ width: '100%' }} size={2}>
|
<Space direction="vertical" style={{ width: '100%' }} size={2}>
|
||||||
{(auditEntries.data ?? []).map(e => (
|
{auditEntries.map(e => (
|
||||||
<div key={e.id} style={{ fontSize: 12, color: '#334155', borderBottom: '1px solid #F1F5F9', padding: '4px 0' }}>
|
<div key={e.id} style={{ fontSize: 12, color: '#334155', borderBottom: '1px solid #F1F5F9', padding: '4px 0' }}>
|
||||||
<Space size={6} wrap>
|
<Space size={6} wrap>
|
||||||
<Tag color="blue" style={{ margin: 0, fontSize: 10 }}>{e.action}</Tag>
|
<Tag color="blue" style={{ margin: 0, fontSize: 10 }}>{e.action}</Tag>
|
||||||
|
|||||||
Reference in New Issue
Block a user