Files
Debian 4a34629023 feat(audit): Live-Stream im Dashboard via WebSocket
Recent-Activity-Karte zeigt neue audit_log-Events jetzt sofort statt
in 15s-Polls.

internal/services/audit/audit.go:
  - Repo bekommt Subscribe()-Methode mit fan-out-channel (Buffer 32,
    non-blocking-send — langsame Clients droppen Events statt die
    Pipeline zu blockieren).
  - Log() macht jetzt INSERT … RETURNING id, created_at und broadcastet
    den fertigen Entry an alle Subscribers. Broadcast nur nach
    erfolgreichem INSERT — failed inserts erscheinen nicht.

internal/handlers/audit.go:
  - Neuer GET /api/v1/audit/live (WebSocket): sendet beim Connect die
    letzten 50 Einträge (oldest→newest), danach Live-Stream aus
    Subscribe-Channel. 30s-Ping gegen HAProxy-Tunnel-Timeout.
  - Recent (Poll-Endpoint) bleibt für Fallbacks erhalten.

UI Dashboard:
  - useAuditLive(keep=15)-Hook ersetzt das 15s-useQuery-Poll.
  - WebSocket auf wss://<host>/api/v1/audit/live; Auto-Reconnect alle
    2s nach Drop.
  - dedupe per id (Snapshot + erste live-Events können sich kurz
    überschneiden während des Subscribe-Race).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:39:04 +02:00

156 lines
3.8 KiB
Go

// Package audit appends rows to the audit_log table. Every mutation
// in the API funnels through this so the operator can answer
// "who did what when?" from a single SELECT.
package audit
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repo struct {
Pool *pgxpool.Pool
subsMu sync.RWMutex
subs map[chan Entry]struct{}
}
func New(pool *pgxpool.Pool) *Repo {
return &Repo{Pool: pool, subs: map[chan Entry]struct{}{}}
}
// Subscribe gibt einen Channel für Live-Audit-Events zurück + ein
// Unsubscribe-Cleanup. Channel-Buffer 32 — bei stehenden Clients
// werden Events gedropt (non-blocking-send).
func (r *Repo) Subscribe() (<-chan Entry, func()) {
c := make(chan Entry, 32)
r.subsMu.Lock()
if r.subs == nil {
r.subs = map[chan Entry]struct{}{}
}
r.subs[c] = struct{}{}
r.subsMu.Unlock()
return c, func() {
r.subsMu.Lock()
delete(r.subs, c)
r.subsMu.Unlock()
close(c)
}
}
func (r *Repo) broadcast(e Entry) {
r.subsMu.RLock()
subs := make([]chan Entry, 0, len(r.subs))
for c := range r.subs {
subs = append(subs, c)
}
r.subsMu.RUnlock()
for _, c := range subs {
select {
case c <- e:
default:
}
}
}
// Entry mirrors one audit_log row — ListRecent returns these for
// the dashboard's recent-activity card.
type Entry struct {
ID int64 `json:"id"`
Actor string `json:"actor"`
Action string `json:"action"`
Subject *string `json:"subject,omitempty"`
Detail json.RawMessage `json:"detail,omitempty"`
NodeID *string `json:"node_id,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// ListRecent returns the most recent audit entries, newest first.
// Pass 0 for a sensible default (10).
func (r *Repo) ListRecent(ctx context.Context, limit int) ([]Entry, error) {
if r == nil || r.Pool == nil {
return []Entry{}, nil
}
if limit <= 0 || limit > 100 {
limit = 10
}
rows, err := r.Pool.Query(ctx, `
SELECT id, actor, action, subject, detail, node_id, created_at
FROM audit_log
ORDER BY created_at DESC, id DESC
LIMIT $1`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]Entry, 0, limit)
for rows.Next() {
var e Entry
if err := rows.Scan(&e.ID, &e.Actor, &e.Action, &e.Subject, &e.Detail, &e.NodeID, &e.CreatedAt); err != nil {
return nil, err
}
out = append(out, e)
}
return out, rows.Err()
}
// Log writes one audit_log row. detail is JSON-encodable (typically a
// map[string]any) — empty map means "no payload". If pool is nil
// (e.g. dev env without DB), Log silently no-ops so handlers don't
// have to guard each call site.
func (r *Repo) Log(ctx context.Context, actor, action, subject string, detail any, nodeID string) error {
if r == nil || r.Pool == nil {
return nil
}
var detailJSON []byte
if detail != nil {
var err error
detailJSON, err = json.Marshal(detail)
if err != nil {
return err
}
}
var subjectArg any = subject
if subject == "" {
subjectArg = nil
}
var nodeArg any = nodeID
if nodeID == "" {
nodeArg = nil
}
// RETURNING id+created_at damit der Subscribe-Channel direkt einen
// vollständigen Entry verteilen kann — Subscriber müssen nicht
// erneut die DB hitten für die Anzeige.
var e Entry
e.Actor = actor
e.Action = action
if subject != "" {
s := subject
e.Subject = &s
}
if len(detailJSON) > 0 {
e.Detail = json.RawMessage(detailJSON)
}
if nodeID != "" {
n := nodeID
e.NodeID = &n
}
err := r.Pool.QueryRow(ctx, `
INSERT INTO audit_log (actor, action, subject, detail, node_id)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, created_at`,
actor, action, subjectArg, detailJSON, nodeArg).
Scan(&e.ID, &e.CreatedAt)
if err != nil {
return err
}
// Broadcast nach erfolgreichem INSERT — wenn DB ablehnt, sollen
// Subscribers das Event auch nicht sehen.
r.broadcast(e)
return nil
}