Aggregierter Reader für alle EdgeGuard-Service-Journale + audit_log.
internal/services/syslogs/
- 9 Quellen: edgeguard-api, edgeguard-scheduler, haproxy, squid,
unbound, chrony, wg-quick@*, ulogd2, audit
- journalctl --output=json + parser für __REALTIME_TIMESTAMP,
PRIORITY (0-7 → debug/info/warn/error), MESSAGE, _HOSTNAME
- audit-Reader nutzt bestehende audit.Repo.ListRecent
- Concurrent fan-out über alle gewählten Quellen, dann merge-sort
by Timestamp DESC + cap auf Limit (max 1000)
- Client-Filter: Level, Grep (case-insensitive über message +
actor + action + subject)
internal/handlers/logs.go:
GET /api/v1/logs — Filter via Query-Params
GET /api/v1/logs/sources — statische Quellen-Liste fürs UI
postinst: edgeguard → systemd-journal + adm Gruppen, damit
journalctl ohne sudo lesen kann. Verifiziert auf der Box: id zeigt
`groups=adm,systemd-journal,haproxy,edgeguard`.
UI: management-ui/src/pages/Logs — Multi-Source-Select, Level-Color-
Tags, Time-Range-Picker, Volltext-Suche, Auto-Refresh 5s (Toggle),
CSV-Export. Sidebar-Eintrag "Logs" unter System (FileSearchOutlined).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
348 lines
8.8 KiB
Go
348 lines
8.8 KiB
Go
// Package syslogs aggregiert Log-Entries aus systemd-journal (per
|
|
// `journalctl --output=json`) und der audit_log-Tabelle in ein
|
|
// einheitliches Entry-Format. Phase 4 des Log-Systems.
|
|
//
|
|
// Quellen sind statisch konfiguriert — alles was zur EdgeGuard-Box
|
|
// gehört. ulogd2 ist NICHT enthalten, weil dessen Daten als
|
|
// strukturiertes File von /firewall-live gehandhabt werden.
|
|
//
|
|
// journalctl-Aufruf läuft als edgeguard-User; das funktioniert nur
|
|
// wenn der in der Gruppe `systemd-journal` (oder `adm`) ist — der
|
|
// postinst legt das an.
|
|
package syslogs
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os/exec"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.netcell-it.de/projekte/edgeguard-native/internal/services/audit"
|
|
)
|
|
|
|
// Source bezeichnet eine logische Log-Quelle, die das UI als Filter
|
|
// auswählt. "audit" ist die DB-Tabelle, alle anderen sind systemd-
|
|
// Units.
|
|
type Source string
|
|
|
|
const (
|
|
SourceAPI Source = "edgeguard-api"
|
|
SourceScheduler Source = "edgeguard-scheduler"
|
|
SourceHAProxy Source = "haproxy"
|
|
SourceSquid Source = "squid"
|
|
SourceUnbound Source = "unbound"
|
|
SourceChrony Source = "chrony"
|
|
SourceWireguard Source = "wg-quick" // bezieht alle wg-quick@*.service
|
|
SourceUlogd Source = "ulogd2"
|
|
SourceAudit Source = "audit"
|
|
)
|
|
|
|
// AllSources ist die Reihenfolge fürs UI-Dropdown.
|
|
var AllSources = []Source{
|
|
SourceAPI, SourceScheduler,
|
|
SourceHAProxy, SourceSquid, SourceUnbound, SourceChrony, SourceWireguard, SourceUlogd,
|
|
SourceAudit,
|
|
}
|
|
|
|
// Level mappt syslog-PRIORITY-Werte auf eine UI-freundliche
|
|
// Kategorie. journalctl liefert Strings; wir normalisieren auf
|
|
// debug/info/warn/error.
|
|
type Level string
|
|
|
|
const (
|
|
LevelDebug Level = "debug"
|
|
LevelInfo Level = "info"
|
|
LevelWarn Level = "warn"
|
|
LevelError Level = "error"
|
|
)
|
|
|
|
// Entry ist die unified Sicht — sortiert by Timestamp DESC im
|
|
// Endergebnis.
|
|
type Entry struct {
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Source Source `json:"source"`
|
|
Level Level `json:"level"`
|
|
Message string `json:"message"`
|
|
Host string `json:"host,omitempty"`
|
|
// Audit-spezifisch — optional gesetzt für SourceAudit. UI rendert
|
|
// das in einer eigenen Tooltip.
|
|
Actor string `json:"actor,omitempty"`
|
|
Action string `json:"action,omitempty"`
|
|
Subject string `json:"subject,omitempty"`
|
|
}
|
|
|
|
// Filter ist der Query-Container für GET /logs.
|
|
type Filter struct {
|
|
Sources []Source
|
|
Levels []Level
|
|
Since time.Time
|
|
Until time.Time
|
|
Grep string // case-insensitive substring auf Message + Actor/Action/Subject
|
|
Limit int // pro-Source-Limit, default 200, max 1000
|
|
}
|
|
|
|
// Reader bündelt alle Quellen-Zugriffe.
|
|
type Reader struct {
|
|
Audit *audit.Repo
|
|
}
|
|
|
|
func New(auditRepo *audit.Repo) *Reader {
|
|
return &Reader{Audit: auditRepo}
|
|
}
|
|
|
|
// Query führt den Filter über alle gewählten Quellen aus und liefert
|
|
// eine zusammengeführte, by-Timestamp-DESC sortierte Liste. Pro
|
|
// Source-Slot maximal Filter.Limit Entries; das End-Result kann also
|
|
// in der Theorie len(Sources)*Limit groß sein — UI cappt selbst auf
|
|
// f.Limit beim Render.
|
|
func (r *Reader) Query(ctx context.Context, f Filter) ([]Entry, error) {
|
|
if f.Limit <= 0 {
|
|
f.Limit = 200
|
|
}
|
|
if f.Limit > 1000 {
|
|
f.Limit = 1000
|
|
}
|
|
sources := f.Sources
|
|
if len(sources) == 0 {
|
|
sources = AllSources
|
|
}
|
|
|
|
var (
|
|
out []Entry
|
|
mu sync.Mutex
|
|
wg sync.WaitGroup
|
|
)
|
|
errs := make(chan error, len(sources))
|
|
|
|
for _, s := range sources {
|
|
s := s
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
var entries []Entry
|
|
var err error
|
|
if s == SourceAudit {
|
|
entries, err = r.queryAudit(ctx, f)
|
|
} else {
|
|
entries, err = readJournal(ctx, s, f)
|
|
}
|
|
if err != nil {
|
|
errs <- fmt.Errorf("%s: %w", s, err)
|
|
return
|
|
}
|
|
// Level + Grep nochmal filtern (journalctl-Output kommt
|
|
// roh durch).
|
|
entries = filterClient(entries, f)
|
|
mu.Lock()
|
|
out = append(out, entries...)
|
|
mu.Unlock()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
close(errs)
|
|
|
|
// Nicht-fatal — failed Source = einfach leere Liste für die,
|
|
// Operator sieht die Box "kein Eintrag von <source>". Aber wir
|
|
// flag'en den letzten Fehler oben für slog.Warn.
|
|
var lastErr error
|
|
for e := range errs {
|
|
lastErr = e
|
|
}
|
|
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Timestamp.After(out[j].Timestamp)
|
|
})
|
|
if len(out) > f.Limit {
|
|
out = out[:f.Limit]
|
|
}
|
|
return out, lastErr
|
|
}
|
|
|
|
// queryAudit füllt Entries aus der audit_log-Tabelle.
|
|
func (r *Reader) queryAudit(ctx context.Context, f Filter) ([]Entry, error) {
|
|
if r.Audit == nil {
|
|
return nil, nil
|
|
}
|
|
// audit.ListRecent hat einen 100-Max-Limit-Guard, was für unsere
|
|
// Aggregations-Use-Case zu wenig sein kann. Da wir nur die
|
|
// letzten Einträge brauchen reicht das aber als Source.
|
|
limit := f.Limit
|
|
if limit > 100 {
|
|
limit = 100
|
|
}
|
|
rows, err := r.Audit.ListRecent(ctx, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]Entry, 0, len(rows))
|
|
for _, row := range rows {
|
|
// Since/Until-Filter direkt anwenden (DB-Repo kennt das
|
|
// nicht — wir holen die letzten N und filtern hier).
|
|
if !f.Since.IsZero() && row.CreatedAt.Before(f.Since) {
|
|
continue
|
|
}
|
|
if !f.Until.IsZero() && row.CreatedAt.After(f.Until) {
|
|
continue
|
|
}
|
|
subject := ""
|
|
if row.Subject != nil {
|
|
subject = *row.Subject
|
|
}
|
|
msg := row.Action
|
|
if subject != "" {
|
|
msg = row.Action + ": " + subject
|
|
}
|
|
out = append(out, Entry{
|
|
Timestamp: row.CreatedAt,
|
|
Source: SourceAudit,
|
|
Level: LevelInfo,
|
|
Message: msg,
|
|
Actor: row.Actor,
|
|
Action: row.Action,
|
|
Subject: subject,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// readJournal ruft `journalctl --output=json -u <unit> --since=...`
|
|
// auf. Mehrere Units (wg-quick@*) sind erlaubt via mehreren -u-Flags;
|
|
// für SourceWireguard wickeln wir das mit `-u wg-quick@*` ab.
|
|
func readJournal(ctx context.Context, s Source, f Filter) ([]Entry, error) {
|
|
args := []string{"--output=json", "--no-pager"}
|
|
unit := string(s) + ".service"
|
|
if s == SourceWireguard {
|
|
// alle wg-quick-Instanzen — glob-Pattern wird vom systemd
|
|
// matcher unterstützt.
|
|
args = append(args, "-u", "wg-quick@*.service")
|
|
} else {
|
|
args = append(args, "-u", unit)
|
|
}
|
|
if !f.Since.IsZero() {
|
|
args = append(args, "--since", f.Since.Format(time.RFC3339))
|
|
}
|
|
if !f.Until.IsZero() {
|
|
args = append(args, "--until", f.Until.Format(time.RFC3339))
|
|
}
|
|
// journalctl -n N gibt die letzten N. Wir nehmen N=Limit als
|
|
// Obergrenze pro Source.
|
|
args = append(args, "-n", strconv.Itoa(f.Limit))
|
|
|
|
cmd := exec.CommandContext(ctx, "journalctl", args...)
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = cmd.Wait() }()
|
|
|
|
out := make([]Entry, 0, f.Limit)
|
|
sc := bufio.NewScanner(stdout)
|
|
sc.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
|
|
for sc.Scan() {
|
|
line := sc.Bytes()
|
|
if len(line) == 0 || line[0] != '{' {
|
|
continue
|
|
}
|
|
var raw map[string]json.RawMessage
|
|
if err := json.Unmarshal(line, &raw); err != nil {
|
|
continue
|
|
}
|
|
e := parseJournalEntry(s, raw)
|
|
if e.Timestamp.IsZero() {
|
|
continue
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
if err := sc.Err(); err != nil && !errors.Is(err, bufio.ErrTooLong) {
|
|
return out, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func parseJournalEntry(s Source, raw map[string]json.RawMessage) Entry {
|
|
e := Entry{Source: s}
|
|
// __REALTIME_TIMESTAMP ist string von Mikrosekunden seit epoch.
|
|
if v, ok := raw["__REALTIME_TIMESTAMP"]; ok {
|
|
var ts string
|
|
if err := json.Unmarshal(v, &ts); err == nil {
|
|
if usec, err := strconv.ParseInt(ts, 10, 64); err == nil {
|
|
e.Timestamp = time.UnixMicro(usec)
|
|
}
|
|
}
|
|
}
|
|
if v, ok := raw["MESSAGE"]; ok {
|
|
var s string
|
|
if json.Unmarshal(v, &s) == nil {
|
|
e.Message = s
|
|
}
|
|
}
|
|
if v, ok := raw["_HOSTNAME"]; ok {
|
|
var s string
|
|
if json.Unmarshal(v, &s) == nil {
|
|
e.Host = s
|
|
}
|
|
}
|
|
if v, ok := raw["PRIORITY"]; ok {
|
|
var s string
|
|
if json.Unmarshal(v, &s) == nil {
|
|
e.Level = priorityToLevel(s)
|
|
}
|
|
}
|
|
if e.Level == "" {
|
|
e.Level = LevelInfo
|
|
}
|
|
return e
|
|
}
|
|
|
|
func priorityToLevel(p string) Level {
|
|
switch p {
|
|
case "0", "1", "2", "3":
|
|
return LevelError
|
|
case "4":
|
|
return LevelWarn
|
|
case "7":
|
|
return LevelDebug
|
|
default:
|
|
return LevelInfo
|
|
}
|
|
}
|
|
|
|
// filterClient wendet Level + Grep clientseitig an (journalctl filtert
|
|
// die nicht direkt; das einfacher als --priority + --grep zu jonglieren).
|
|
func filterClient(in []Entry, f Filter) []Entry {
|
|
hasLevel := len(f.Levels) > 0
|
|
levelSet := map[Level]bool{}
|
|
for _, l := range f.Levels {
|
|
levelSet[l] = true
|
|
}
|
|
grep := strings.ToLower(strings.TrimSpace(f.Grep))
|
|
|
|
out := in[:0]
|
|
for _, e := range in {
|
|
if hasLevel && !levelSet[e.Level] {
|
|
continue
|
|
}
|
|
if grep != "" {
|
|
hay := strings.ToLower(e.Message)
|
|
if !strings.Contains(hay, grep) &&
|
|
!strings.Contains(strings.ToLower(e.Actor), grep) &&
|
|
!strings.Contains(strings.ToLower(e.Action), grep) &&
|
|
!strings.Contains(strings.ToLower(e.Subject), grep) {
|
|
continue
|
|
}
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
return out
|
|
}
|