feat(logs): Phase 4 — zentrales Logsystem /api/v1/logs + /system/logs
Aggregierter Reader für alle EdgeGuard-Service-Journale + audit_log.
internal/services/syslogs/
- 9 Quellen: edgeguard-api, edgeguard-scheduler, haproxy, squid,
unbound, chrony, wg-quick@*, ulogd2, audit
- journalctl --output=json + parser für __REALTIME_TIMESTAMP,
PRIORITY (0-7 → debug/info/warn/error), MESSAGE, _HOSTNAME
- audit-Reader nutzt bestehende audit.Repo.ListRecent
- Concurrent fan-out über alle gewählten Quellen, dann merge-sort
by Timestamp DESC + cap auf Limit (max 1000)
- Client-Filter: Level, Grep (case-insensitive über message +
actor + action + subject)
internal/handlers/logs.go:
GET /api/v1/logs — Filter via Query-Params
GET /api/v1/logs/sources — statische Quellen-Liste fürs UI
postinst: edgeguard → systemd-journal + adm Gruppen, damit
journalctl ohne sudo lesen kann. Verifiziert auf der Box: id zeigt
`groups=adm,systemd-journal,haproxy,edgeguard`.
UI: management-ui/src/pages/Logs — Multi-Source-Select, Level-Color-
Tags, Time-Range-Picker, Volltext-Suche, Auto-Refresh 5s (Toggle),
CSV-Export. Sidebar-Eintrag "Logs" unter System (FileSearchOutlined).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
347
internal/services/syslogs/syslogs.go
Normal file
347
internal/services/syslogs/syslogs.go
Normal file
@@ -0,0 +1,347 @@
|
||||
// Package syslogs aggregiert Log-Entries aus systemd-journal (per
|
||||
// `journalctl --output=json`) und der audit_log-Tabelle in ein
|
||||
// einheitliches Entry-Format. Phase 4 des Log-Systems.
|
||||
//
|
||||
// Quellen sind statisch konfiguriert — alles was zur EdgeGuard-Box
|
||||
// gehört. ulogd2 ist NICHT enthalten, weil dessen Daten als
|
||||
// strukturiertes File von /firewall-live gehandhabt werden.
|
||||
//
|
||||
// journalctl-Aufruf läuft als edgeguard-User; das funktioniert nur
|
||||
// wenn der in der Gruppe `systemd-journal` (oder `adm`) ist — der
|
||||
// postinst legt das an.
|
||||
package syslogs
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.netcell-it.de/projekte/edgeguard-native/internal/services/audit"
|
||||
)
|
||||
|
||||
// Source bezeichnet eine logische Log-Quelle, die das UI als Filter
|
||||
// auswählt. "audit" ist die DB-Tabelle, alle anderen sind systemd-
|
||||
// Units.
|
||||
type Source string
|
||||
|
||||
const (
|
||||
SourceAPI Source = "edgeguard-api"
|
||||
SourceScheduler Source = "edgeguard-scheduler"
|
||||
SourceHAProxy Source = "haproxy"
|
||||
SourceSquid Source = "squid"
|
||||
SourceUnbound Source = "unbound"
|
||||
SourceChrony Source = "chrony"
|
||||
SourceWireguard Source = "wg-quick" // bezieht alle wg-quick@*.service
|
||||
SourceUlogd Source = "ulogd2"
|
||||
SourceAudit Source = "audit"
|
||||
)
|
||||
|
||||
// AllSources ist die Reihenfolge fürs UI-Dropdown.
|
||||
var AllSources = []Source{
|
||||
SourceAPI, SourceScheduler,
|
||||
SourceHAProxy, SourceSquid, SourceUnbound, SourceChrony, SourceWireguard, SourceUlogd,
|
||||
SourceAudit,
|
||||
}
|
||||
|
||||
// Level mappt syslog-PRIORITY-Werte auf eine UI-freundliche
|
||||
// Kategorie. journalctl liefert Strings; wir normalisieren auf
|
||||
// debug/info/warn/error.
|
||||
type Level string
|
||||
|
||||
const (
|
||||
LevelDebug Level = "debug"
|
||||
LevelInfo Level = "info"
|
||||
LevelWarn Level = "warn"
|
||||
LevelError Level = "error"
|
||||
)
|
||||
|
||||
// Entry ist die unified Sicht — sortiert by Timestamp DESC im
|
||||
// Endergebnis.
|
||||
type Entry struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Source Source `json:"source"`
|
||||
Level Level `json:"level"`
|
||||
Message string `json:"message"`
|
||||
Host string `json:"host,omitempty"`
|
||||
// Audit-spezifisch — optional gesetzt für SourceAudit. UI rendert
|
||||
// das in einer eigenen Tooltip.
|
||||
Actor string `json:"actor,omitempty"`
|
||||
Action string `json:"action,omitempty"`
|
||||
Subject string `json:"subject,omitempty"`
|
||||
}
|
||||
|
||||
// Filter ist der Query-Container für GET /logs.
|
||||
type Filter struct {
|
||||
Sources []Source
|
||||
Levels []Level
|
||||
Since time.Time
|
||||
Until time.Time
|
||||
Grep string // case-insensitive substring auf Message + Actor/Action/Subject
|
||||
Limit int // pro-Source-Limit, default 200, max 1000
|
||||
}
|
||||
|
||||
// Reader bündelt alle Quellen-Zugriffe.
|
||||
type Reader struct {
|
||||
Audit *audit.Repo
|
||||
}
|
||||
|
||||
func New(auditRepo *audit.Repo) *Reader {
|
||||
return &Reader{Audit: auditRepo}
|
||||
}
|
||||
|
||||
// Query führt den Filter über alle gewählten Quellen aus und liefert
|
||||
// eine zusammengeführte, by-Timestamp-DESC sortierte Liste. Pro
|
||||
// Source-Slot maximal Filter.Limit Entries; das End-Result kann also
|
||||
// in der Theorie len(Sources)*Limit groß sein — UI cappt selbst auf
|
||||
// f.Limit beim Render.
|
||||
func (r *Reader) Query(ctx context.Context, f Filter) ([]Entry, error) {
|
||||
if f.Limit <= 0 {
|
||||
f.Limit = 200
|
||||
}
|
||||
if f.Limit > 1000 {
|
||||
f.Limit = 1000
|
||||
}
|
||||
sources := f.Sources
|
||||
if len(sources) == 0 {
|
||||
sources = AllSources
|
||||
}
|
||||
|
||||
var (
|
||||
out []Entry
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
errs := make(chan error, len(sources))
|
||||
|
||||
for _, s := range sources {
|
||||
s := s
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var entries []Entry
|
||||
var err error
|
||||
if s == SourceAudit {
|
||||
entries, err = r.queryAudit(ctx, f)
|
||||
} else {
|
||||
entries, err = readJournal(ctx, s, f)
|
||||
}
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("%s: %w", s, err)
|
||||
return
|
||||
}
|
||||
// Level + Grep nochmal filtern (journalctl-Output kommt
|
||||
// roh durch).
|
||||
entries = filterClient(entries, f)
|
||||
mu.Lock()
|
||||
out = append(out, entries...)
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
|
||||
// Nicht-fatal — failed Source = einfach leere Liste für die,
|
||||
// Operator sieht die Box "kein Eintrag von <source>". Aber wir
|
||||
// flag'en den letzten Fehler oben für slog.Warn.
|
||||
var lastErr error
|
||||
for e := range errs {
|
||||
lastErr = e
|
||||
}
|
||||
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].Timestamp.After(out[j].Timestamp)
|
||||
})
|
||||
if len(out) > f.Limit {
|
||||
out = out[:f.Limit]
|
||||
}
|
||||
return out, lastErr
|
||||
}
|
||||
|
||||
// queryAudit füllt Entries aus der audit_log-Tabelle.
|
||||
func (r *Reader) queryAudit(ctx context.Context, f Filter) ([]Entry, error) {
|
||||
if r.Audit == nil {
|
||||
return nil, nil
|
||||
}
|
||||
// audit.ListRecent hat einen 100-Max-Limit-Guard, was für unsere
|
||||
// Aggregations-Use-Case zu wenig sein kann. Da wir nur die
|
||||
// letzten Einträge brauchen reicht das aber als Source.
|
||||
limit := f.Limit
|
||||
if limit > 100 {
|
||||
limit = 100
|
||||
}
|
||||
rows, err := r.Audit.ListRecent(ctx, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]Entry, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
// Since/Until-Filter direkt anwenden (DB-Repo kennt das
|
||||
// nicht — wir holen die letzten N und filtern hier).
|
||||
if !f.Since.IsZero() && row.CreatedAt.Before(f.Since) {
|
||||
continue
|
||||
}
|
||||
if !f.Until.IsZero() && row.CreatedAt.After(f.Until) {
|
||||
continue
|
||||
}
|
||||
subject := ""
|
||||
if row.Subject != nil {
|
||||
subject = *row.Subject
|
||||
}
|
||||
msg := row.Action
|
||||
if subject != "" {
|
||||
msg = row.Action + ": " + subject
|
||||
}
|
||||
out = append(out, Entry{
|
||||
Timestamp: row.CreatedAt,
|
||||
Source: SourceAudit,
|
||||
Level: LevelInfo,
|
||||
Message: msg,
|
||||
Actor: row.Actor,
|
||||
Action: row.Action,
|
||||
Subject: subject,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// readJournal ruft `journalctl --output=json -u <unit> --since=...`
|
||||
// auf. Mehrere Units (wg-quick@*) sind erlaubt via mehreren -u-Flags;
|
||||
// für SourceWireguard wickeln wir das mit `-u wg-quick@*` ab.
|
||||
func readJournal(ctx context.Context, s Source, f Filter) ([]Entry, error) {
|
||||
args := []string{"--output=json", "--no-pager"}
|
||||
unit := string(s) + ".service"
|
||||
if s == SourceWireguard {
|
||||
// alle wg-quick-Instanzen — glob-Pattern wird vom systemd
|
||||
// matcher unterstützt.
|
||||
args = append(args, "-u", "wg-quick@*.service")
|
||||
} else {
|
||||
args = append(args, "-u", unit)
|
||||
}
|
||||
if !f.Since.IsZero() {
|
||||
args = append(args, "--since", f.Since.Format(time.RFC3339))
|
||||
}
|
||||
if !f.Until.IsZero() {
|
||||
args = append(args, "--until", f.Until.Format(time.RFC3339))
|
||||
}
|
||||
// journalctl -n N gibt die letzten N. Wir nehmen N=Limit als
|
||||
// Obergrenze pro Source.
|
||||
args = append(args, "-n", strconv.Itoa(f.Limit))
|
||||
|
||||
cmd := exec.CommandContext(ctx, "journalctl", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = cmd.Wait() }()
|
||||
|
||||
out := make([]Entry, 0, f.Limit)
|
||||
sc := bufio.NewScanner(stdout)
|
||||
sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
|
||||
for sc.Scan() {
|
||||
line := sc.Bytes()
|
||||
if len(line) == 0 || line[0] != '{' {
|
||||
continue
|
||||
}
|
||||
var raw map[string]json.RawMessage
|
||||
if err := json.Unmarshal(line, &raw); err != nil {
|
||||
continue
|
||||
}
|
||||
e := parseJournalEntry(s, raw)
|
||||
if e.Timestamp.IsZero() {
|
||||
continue
|
||||
}
|
||||
out = append(out, e)
|
||||
}
|
||||
if err := sc.Err(); err != nil && !errors.Is(err, bufio.ErrTooLong) {
|
||||
return out, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func parseJournalEntry(s Source, raw map[string]json.RawMessage) Entry {
|
||||
e := Entry{Source: s}
|
||||
// __REALTIME_TIMESTAMP ist string von Mikrosekunden seit epoch.
|
||||
if v, ok := raw["__REALTIME_TIMESTAMP"]; ok {
|
||||
var ts string
|
||||
if err := json.Unmarshal(v, &ts); err == nil {
|
||||
if usec, err := strconv.ParseInt(ts, 10, 64); err == nil {
|
||||
e.Timestamp = time.UnixMicro(usec)
|
||||
}
|
||||
}
|
||||
}
|
||||
if v, ok := raw["MESSAGE"]; ok {
|
||||
var s string
|
||||
if json.Unmarshal(v, &s) == nil {
|
||||
e.Message = s
|
||||
}
|
||||
}
|
||||
if v, ok := raw["_HOSTNAME"]; ok {
|
||||
var s string
|
||||
if json.Unmarshal(v, &s) == nil {
|
||||
e.Host = s
|
||||
}
|
||||
}
|
||||
if v, ok := raw["PRIORITY"]; ok {
|
||||
var s string
|
||||
if json.Unmarshal(v, &s) == nil {
|
||||
e.Level = priorityToLevel(s)
|
||||
}
|
||||
}
|
||||
if e.Level == "" {
|
||||
e.Level = LevelInfo
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func priorityToLevel(p string) Level {
|
||||
switch p {
|
||||
case "0", "1", "2", "3":
|
||||
return LevelError
|
||||
case "4":
|
||||
return LevelWarn
|
||||
case "7":
|
||||
return LevelDebug
|
||||
default:
|
||||
return LevelInfo
|
||||
}
|
||||
}
|
||||
|
||||
// filterClient wendet Level + Grep clientseitig an (journalctl filtert
|
||||
// die nicht direkt; das einfacher als --priority + --grep zu jonglieren).
|
||||
func filterClient(in []Entry, f Filter) []Entry {
|
||||
hasLevel := len(f.Levels) > 0
|
||||
levelSet := map[Level]bool{}
|
||||
for _, l := range f.Levels {
|
||||
levelSet[l] = true
|
||||
}
|
||||
grep := strings.ToLower(strings.TrimSpace(f.Grep))
|
||||
|
||||
out := in[:0]
|
||||
for _, e := range in {
|
||||
if hasLevel && !levelSet[e.Level] {
|
||||
continue
|
||||
}
|
||||
if grep != "" {
|
||||
hay := strings.ToLower(e.Message)
|
||||
if !strings.Contains(hay, grep) &&
|
||||
!strings.Contains(strings.ToLower(e.Actor), grep) &&
|
||||
!strings.Contains(strings.ToLower(e.Action), grep) &&
|
||||
!strings.Contains(strings.ToLower(e.Subject), grep) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
out = append(out, e)
|
||||
}
|
||||
return out
|
||||
}
|
||||
Reference in New Issue
Block a user