Files
edgeguard-native/internal/services/syslogs/syslogs.go
Debian 827c364335 feat(logs): Phase 4 — zentrales Logsystem /api/v1/logs + /system/logs
Aggregierter Reader für alle EdgeGuard-Service-Journale + audit_log.

internal/services/syslogs/
  - 9 Quellen: edgeguard-api, edgeguard-scheduler, haproxy, squid,
    unbound, chrony, wg-quick@*, ulogd2, audit
  - journalctl --output=json + parser für __REALTIME_TIMESTAMP,
    PRIORITY (0-7 → debug/info/warn/error), MESSAGE, _HOSTNAME
  - audit-Reader nutzt bestehende audit.Repo.ListRecent
  - Concurrent fan-out über alle gewählten Quellen, dann merge-sort
    by Timestamp DESC + cap auf Limit (max 1000)
  - Client-Filter: Level, Grep (case-insensitive über message +
    actor + action + subject)

internal/handlers/logs.go:
  GET /api/v1/logs            — Filter via Query-Params
  GET /api/v1/logs/sources    — statische Quellen-Liste fürs UI

postinst: edgeguard → systemd-journal + adm Gruppen, damit
journalctl ohne sudo lesen kann. Verifiziert auf der Box: id zeigt
`groups=adm,systemd-journal,haproxy,edgeguard`.

UI: management-ui/src/pages/Logs — Multi-Source-Select, Level-Color-
Tags, Time-Range-Picker, Volltext-Suche, Auto-Refresh 5s (Toggle),
CSV-Export. Sidebar-Eintrag "Logs" unter System (FileSearchOutlined).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 21:29:38 +02:00

348 lines
8.8 KiB
Go

// Package syslogs aggregiert Log-Entries aus systemd-journal (per
// `journalctl --output=json`) und der audit_log-Tabelle in ein
// einheitliches Entry-Format. Phase 4 des Log-Systems.
//
// Quellen sind statisch konfiguriert — alles was zur EdgeGuard-Box
// gehört. ulogd2 ist NICHT enthalten, weil dessen Daten als
// strukturiertes File von /firewall-live gehandhabt werden.
//
// journalctl-Aufruf läuft als edgeguard-User; das funktioniert nur
// wenn der in der Gruppe `systemd-journal` (oder `adm`) ist — der
// postinst legt das an.
package syslogs
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"time"
"git.netcell-it.de/projekte/edgeguard-native/internal/services/audit"
)
// Source bezeichnet eine logische Log-Quelle, die das UI als Filter
// auswählt. "audit" ist die DB-Tabelle, alle anderen sind systemd-
// Units.
type Source string
const (
SourceAPI Source = "edgeguard-api"
SourceScheduler Source = "edgeguard-scheduler"
SourceHAProxy Source = "haproxy"
SourceSquid Source = "squid"
SourceUnbound Source = "unbound"
SourceChrony Source = "chrony"
SourceWireguard Source = "wg-quick" // bezieht alle wg-quick@*.service
SourceUlogd Source = "ulogd2"
SourceAudit Source = "audit"
)
// AllSources ist die Reihenfolge fürs UI-Dropdown.
var AllSources = []Source{
SourceAPI, SourceScheduler,
SourceHAProxy, SourceSquid, SourceUnbound, SourceChrony, SourceWireguard, SourceUlogd,
SourceAudit,
}
// Level mappt syslog-PRIORITY-Werte auf eine UI-freundliche
// Kategorie. journalctl liefert Strings; wir normalisieren auf
// debug/info/warn/error.
type Level string
const (
LevelDebug Level = "debug"
LevelInfo Level = "info"
LevelWarn Level = "warn"
LevelError Level = "error"
)
// Entry ist die unified Sicht — sortiert by Timestamp DESC im
// Endergebnis.
type Entry struct {
Timestamp time.Time `json:"timestamp"`
Source Source `json:"source"`
Level Level `json:"level"`
Message string `json:"message"`
Host string `json:"host,omitempty"`
// Audit-spezifisch — optional gesetzt für SourceAudit. UI rendert
// das in einer eigenen Tooltip.
Actor string `json:"actor,omitempty"`
Action string `json:"action,omitempty"`
Subject string `json:"subject,omitempty"`
}
// Filter ist der Query-Container für GET /logs.
type Filter struct {
Sources []Source
Levels []Level
Since time.Time
Until time.Time
Grep string // case-insensitive substring auf Message + Actor/Action/Subject
Limit int // pro-Source-Limit, default 200, max 1000
}
// Reader bündelt alle Quellen-Zugriffe.
type Reader struct {
Audit *audit.Repo
}
func New(auditRepo *audit.Repo) *Reader {
return &Reader{Audit: auditRepo}
}
// Query führt den Filter über alle gewählten Quellen aus und liefert
// eine zusammengeführte, by-Timestamp-DESC sortierte Liste. Pro
// Source-Slot maximal Filter.Limit Entries; das End-Result kann also
// in der Theorie len(Sources)*Limit groß sein — UI cappt selbst auf
// f.Limit beim Render.
func (r *Reader) Query(ctx context.Context, f Filter) ([]Entry, error) {
if f.Limit <= 0 {
f.Limit = 200
}
if f.Limit > 1000 {
f.Limit = 1000
}
sources := f.Sources
if len(sources) == 0 {
sources = AllSources
}
var (
out []Entry
mu sync.Mutex
wg sync.WaitGroup
)
errs := make(chan error, len(sources))
for _, s := range sources {
s := s
wg.Add(1)
go func() {
defer wg.Done()
var entries []Entry
var err error
if s == SourceAudit {
entries, err = r.queryAudit(ctx, f)
} else {
entries, err = readJournal(ctx, s, f)
}
if err != nil {
errs <- fmt.Errorf("%s: %w", s, err)
return
}
// Level + Grep nochmal filtern (journalctl-Output kommt
// roh durch).
entries = filterClient(entries, f)
mu.Lock()
out = append(out, entries...)
mu.Unlock()
}()
}
wg.Wait()
close(errs)
// Nicht-fatal — failed Source = einfach leere Liste für die,
// Operator sieht die Box "kein Eintrag von <source>". Aber wir
// flag'en den letzten Fehler oben für slog.Warn.
var lastErr error
for e := range errs {
lastErr = e
}
sort.Slice(out, func(i, j int) bool {
return out[i].Timestamp.After(out[j].Timestamp)
})
if len(out) > f.Limit {
out = out[:f.Limit]
}
return out, lastErr
}
// queryAudit füllt Entries aus der audit_log-Tabelle.
func (r *Reader) queryAudit(ctx context.Context, f Filter) ([]Entry, error) {
if r.Audit == nil {
return nil, nil
}
// audit.ListRecent hat einen 100-Max-Limit-Guard, was für unsere
// Aggregations-Use-Case zu wenig sein kann. Da wir nur die
// letzten Einträge brauchen reicht das aber als Source.
limit := f.Limit
if limit > 100 {
limit = 100
}
rows, err := r.Audit.ListRecent(ctx, limit)
if err != nil {
return nil, err
}
out := make([]Entry, 0, len(rows))
for _, row := range rows {
// Since/Until-Filter direkt anwenden (DB-Repo kennt das
// nicht — wir holen die letzten N und filtern hier).
if !f.Since.IsZero() && row.CreatedAt.Before(f.Since) {
continue
}
if !f.Until.IsZero() && row.CreatedAt.After(f.Until) {
continue
}
subject := ""
if row.Subject != nil {
subject = *row.Subject
}
msg := row.Action
if subject != "" {
msg = row.Action + ": " + subject
}
out = append(out, Entry{
Timestamp: row.CreatedAt,
Source: SourceAudit,
Level: LevelInfo,
Message: msg,
Actor: row.Actor,
Action: row.Action,
Subject: subject,
})
}
return out, nil
}
// readJournal ruft `journalctl --output=json -u <unit> --since=...`
// auf. Mehrere Units (wg-quick@*) sind erlaubt via mehreren -u-Flags;
// für SourceWireguard wickeln wir das mit `-u wg-quick@*` ab.
func readJournal(ctx context.Context, s Source, f Filter) ([]Entry, error) {
args := []string{"--output=json", "--no-pager"}
unit := string(s) + ".service"
if s == SourceWireguard {
// alle wg-quick-Instanzen — glob-Pattern wird vom systemd
// matcher unterstützt.
args = append(args, "-u", "wg-quick@*.service")
} else {
args = append(args, "-u", unit)
}
if !f.Since.IsZero() {
args = append(args, "--since", f.Since.Format(time.RFC3339))
}
if !f.Until.IsZero() {
args = append(args, "--until", f.Until.Format(time.RFC3339))
}
// journalctl -n N gibt die letzten N. Wir nehmen N=Limit als
// Obergrenze pro Source.
args = append(args, "-n", strconv.Itoa(f.Limit))
cmd := exec.CommandContext(ctx, "journalctl", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
defer func() { _ = cmd.Wait() }()
out := make([]Entry, 0, f.Limit)
sc := bufio.NewScanner(stdout)
sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 || line[0] != '{' {
continue
}
var raw map[string]json.RawMessage
if err := json.Unmarshal(line, &raw); err != nil {
continue
}
e := parseJournalEntry(s, raw)
if e.Timestamp.IsZero() {
continue
}
out = append(out, e)
}
if err := sc.Err(); err != nil && !errors.Is(err, bufio.ErrTooLong) {
return out, err
}
return out, nil
}
func parseJournalEntry(s Source, raw map[string]json.RawMessage) Entry {
e := Entry{Source: s}
// __REALTIME_TIMESTAMP ist string von Mikrosekunden seit epoch.
if v, ok := raw["__REALTIME_TIMESTAMP"]; ok {
var ts string
if err := json.Unmarshal(v, &ts); err == nil {
if usec, err := strconv.ParseInt(ts, 10, 64); err == nil {
e.Timestamp = time.UnixMicro(usec)
}
}
}
if v, ok := raw["MESSAGE"]; ok {
var s string
if json.Unmarshal(v, &s) == nil {
e.Message = s
}
}
if v, ok := raw["_HOSTNAME"]; ok {
var s string
if json.Unmarshal(v, &s) == nil {
e.Host = s
}
}
if v, ok := raw["PRIORITY"]; ok {
var s string
if json.Unmarshal(v, &s) == nil {
e.Level = priorityToLevel(s)
}
}
if e.Level == "" {
e.Level = LevelInfo
}
return e
}
func priorityToLevel(p string) Level {
switch p {
case "0", "1", "2", "3":
return LevelError
case "4":
return LevelWarn
case "7":
return LevelDebug
default:
return LevelInfo
}
}
// filterClient wendet Level + Grep clientseitig an (journalctl filtert
// die nicht direkt; das einfacher als --priority + --grep zu jonglieren).
func filterClient(in []Entry, f Filter) []Entry {
hasLevel := len(f.Levels) > 0
levelSet := map[Level]bool{}
for _, l := range f.Levels {
levelSet[l] = true
}
grep := strings.ToLower(strings.TrimSpace(f.Grep))
out := in[:0]
for _, e := range in {
if hasLevel && !levelSet[e.Level] {
continue
}
if grep != "" {
hay := strings.ToLower(e.Message)
if !strings.Contains(hay, grep) &&
!strings.Contains(strings.ToLower(e.Actor), grep) &&
!strings.Contains(strings.ToLower(e.Action), grep) &&
!strings.Contains(strings.ToLower(e.Subject), grep) {
continue
}
}
out = append(out, e)
}
return out
}