Recent-Activity-Karte zeigt neue audit_log-Events jetzt sofort statt
in 15s-Polls.
internal/services/audit/audit.go:
- Repo bekommt Subscribe()-Methode mit fan-out-channel (Buffer 32,
non-blocking-send — langsame Clients droppen Events statt die
Pipeline zu blockieren).
- Log() macht jetzt INSERT … RETURNING id, created_at und broadcastet
den fertigen Entry an alle Subscribers. Broadcast nur nach
erfolgreichem INSERT — failed inserts erscheinen nicht.
internal/handlers/audit.go:
- Neuer GET /api/v1/audit/live (WebSocket): sendet beim Connect die
letzten 50 Einträge (oldest→newest), danach Live-Stream aus
Subscribe-Channel. 30s-Ping gegen HAProxy-Tunnel-Timeout.
- Recent (Poll-Endpoint) bleibt für Fallbacks erhalten.
UI Dashboard:
- useAuditLive(keep=15)-Hook ersetzt das 15s-useQuery-Poll.
- WebSocket auf wss://<host>/api/v1/audit/live; Auto-Reconnect alle
2s nach Drop.
- dedupe per id (Snapshot + erste live-Events können sich kurz
überschneiden während des Subscribe-Race).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
156 lines
3.8 KiB
Go
156 lines
3.8 KiB
Go
// Package audit appends rows to the audit_log table. Every mutation
|
|
// in the API funnels through this so the operator can answer
|
|
// "who did what when?" from a single SELECT.
|
|
package audit
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
type Repo struct {
|
|
Pool *pgxpool.Pool
|
|
|
|
subsMu sync.RWMutex
|
|
subs map[chan Entry]struct{}
|
|
}
|
|
|
|
func New(pool *pgxpool.Pool) *Repo {
|
|
return &Repo{Pool: pool, subs: map[chan Entry]struct{}{}}
|
|
}
|
|
|
|
// Subscribe gibt einen Channel für Live-Audit-Events zurück + ein
|
|
// Unsubscribe-Cleanup. Channel-Buffer 32 — bei stehenden Clients
|
|
// werden Events gedropt (non-blocking-send).
|
|
func (r *Repo) Subscribe() (<-chan Entry, func()) {
|
|
c := make(chan Entry, 32)
|
|
r.subsMu.Lock()
|
|
if r.subs == nil {
|
|
r.subs = map[chan Entry]struct{}{}
|
|
}
|
|
r.subs[c] = struct{}{}
|
|
r.subsMu.Unlock()
|
|
return c, func() {
|
|
r.subsMu.Lock()
|
|
delete(r.subs, c)
|
|
r.subsMu.Unlock()
|
|
close(c)
|
|
}
|
|
}
|
|
|
|
func (r *Repo) broadcast(e Entry) {
|
|
r.subsMu.RLock()
|
|
subs := make([]chan Entry, 0, len(r.subs))
|
|
for c := range r.subs {
|
|
subs = append(subs, c)
|
|
}
|
|
r.subsMu.RUnlock()
|
|
for _, c := range subs {
|
|
select {
|
|
case c <- e:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Entry is one audit_log row in the shape that is serialized to JSON.
// ListRecent returns a slice of these, and Log broadcasts one to the
// subscribers for every row it inserts.
type Entry struct {
	// ID is the row id.
	ID int64 `json:"id"`
	// Actor and Action are always present in the JSON output.
	Actor  string `json:"actor"`
	Action string `json:"action"`
	// Subject is optional; a nil pointer is omitted from the JSON.
	Subject *string `json:"subject,omitempty"`
	// Detail is the raw JSON payload; it is omitted when empty.
	Detail json.RawMessage `json:"detail,omitempty"`
	// NodeID is optional; a nil pointer is omitted from the JSON.
	NodeID *string `json:"node_id,omitempty"`
	// CreatedAt is the row's created_at timestamp.
	CreatedAt time.Time `json:"created_at"`
}
|
|
|
|
// ListRecent returns the most recent audit entries, newest first.
|
|
// Pass 0 for a sensible default (10).
|
|
func (r *Repo) ListRecent(ctx context.Context, limit int) ([]Entry, error) {
|
|
if r == nil || r.Pool == nil {
|
|
return []Entry{}, nil
|
|
}
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 10
|
|
}
|
|
rows, err := r.Pool.Query(ctx, `
|
|
SELECT id, actor, action, subject, detail, node_id, created_at
|
|
FROM audit_log
|
|
ORDER BY created_at DESC, id DESC
|
|
LIMIT $1`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := make([]Entry, 0, limit)
|
|
for rows.Next() {
|
|
var e Entry
|
|
if err := rows.Scan(&e.ID, &e.Actor, &e.Action, &e.Subject, &e.Detail, &e.NodeID, &e.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// Log writes one audit_log row. detail is JSON-encodable (typically a
|
|
// map[string]any) — empty map means "no payload". If pool is nil
|
|
// (e.g. dev env without DB), Log silently no-ops so handlers don't
|
|
// have to guard each call site.
|
|
func (r *Repo) Log(ctx context.Context, actor, action, subject string, detail any, nodeID string) error {
|
|
if r == nil || r.Pool == nil {
|
|
return nil
|
|
}
|
|
var detailJSON []byte
|
|
if detail != nil {
|
|
var err error
|
|
detailJSON, err = json.Marshal(detail)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
var subjectArg any = subject
|
|
if subject == "" {
|
|
subjectArg = nil
|
|
}
|
|
var nodeArg any = nodeID
|
|
if nodeID == "" {
|
|
nodeArg = nil
|
|
}
|
|
// RETURNING id+created_at damit der Subscribe-Channel direkt einen
|
|
// vollständigen Entry verteilen kann — Subscriber müssen nicht
|
|
// erneut die DB hitten für die Anzeige.
|
|
var e Entry
|
|
e.Actor = actor
|
|
e.Action = action
|
|
if subject != "" {
|
|
s := subject
|
|
e.Subject = &s
|
|
}
|
|
if len(detailJSON) > 0 {
|
|
e.Detail = json.RawMessage(detailJSON)
|
|
}
|
|
if nodeID != "" {
|
|
n := nodeID
|
|
e.NodeID = &n
|
|
}
|
|
err := r.Pool.QueryRow(ctx, `
|
|
INSERT INTO audit_log (actor, action, subject, detail, node_id)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING id, created_at`,
|
|
actor, action, subjectArg, detailJSON, nodeArg).
|
|
Scan(&e.ID, &e.CreatedAt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Broadcast nach erfolgreichem INSERT — wenn DB ablehnt, sollen
|
|
// Subscribers das Event auch nicht sehen.
|
|
r.broadcast(e)
|
|
return nil
|
|
}
|