diff --git a/VERSION b/VERSION index c3ccccf..4bf1778 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.59 +1.0.60 diff --git a/cmd/edgeguard-api/main.go b/cmd/edgeguard-api/main.go index bfe83c2..53f5ec7 100644 --- a/cmd/edgeguard-api/main.go +++ b/cmd/edgeguard-api/main.go @@ -36,6 +36,7 @@ import ( dnssvc "git.netcell-it.de/projekte/edgeguard-native/internal/services/dns" "git.netcell-it.de/projekte/edgeguard-native/internal/services/domains" "git.netcell-it.de/projekte/edgeguard-native/internal/services/firewall" + "git.netcell-it.de/projekte/edgeguard-native/internal/services/firewalllog" "git.netcell-it.de/projekte/edgeguard-native/internal/services/forwardproxy" "git.netcell-it.de/projekte/edgeguard-native/internal/services/ipaddresses" "git.netcell-it.de/projekte/edgeguard-native/internal/services/networkifs" @@ -48,7 +49,7 @@ import ( wgsvc "git.netcell-it.de/projekte/edgeguard-native/internal/services/wireguard" ) -var version = "1.0.59" +var version = "1.0.60" func main() { addr := os.Getenv("EDGEGUARD_API_ADDR") @@ -183,6 +184,12 @@ func main() { handlers.NewClusterHandler(clusterStore, nodeID).Register(authed) handlers.NewAuditHandler(auditRepo).Register(authed) handlers.NewHAProxyStatsHandler().Register(authed) + + // Firewall-Log (Phase 2): Tailer für /var/log/edgeguard/ + // firewall.jsonl + HTTP-Tail + WebSocket-Live-Stream. + fwLogTailer := firewalllog.NewTailer(firewalllog.DefaultLogPath, 1000) + handlers.StartFirewallLogTailer(context.Background(), fwLogTailer) + handlers.NewFirewallLogHandler(fwLogTailer, firewalllog.DefaultLogPath).Register(authed) handlers.NewTLSCertsHandler(tlsRepo, auditRepo, nodeID, acmeService).Register(authed) // Firewall reload: nach jeder Mutation den Renderer neu fahren // (writes ruleset.nft + sudo nft -f). Errors loggen, nicht failen. diff --git a/cmd/edgeguard-ctl/main.go b/cmd/edgeguard-ctl/main.go index e4b6779..ff19009 100644 --- a/cmd/edgeguard-ctl/main.go +++ b/cmd/edgeguard-ctl/main.go @@ -9,7 +9,7 @@ import ( "os" ) -var version = "1.0.59" +var version = "1.0.60" const usage = `edgeguard-ctl — EdgeGuard CLI diff --git a/cmd/edgeguard-scheduler/main.go b/cmd/edgeguard-scheduler/main.go index 8e3247c..5ed7088 100644 --- a/cmd/edgeguard-scheduler/main.go +++ b/cmd/edgeguard-scheduler/main.go @@ -24,7 +24,7 @@ import ( "git.netcell-it.de/projekte/edgeguard-native/internal/services/tlscerts" ) -var version = "1.0.59" +var version = "1.0.60" const ( // renewTickInterval — how often we re-evaluate expiring certs. diff --git a/go.mod b/go.mod index aa11ead..2ba4c90 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,10 @@ module git.netcell-it.de/projekte/edgeguard-native go 1.26.0 require ( + github.com/fsnotify/fsnotify v1.10.1 github.com/gin-gonic/gin v1.10.0 github.com/go-acme/lego/v4 v4.35.2 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.9.2 github.com/pressly/goose/v3 v3.27.1 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e diff --git a/go.sum b/go.sum index 907a1d0..4bd8ede 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/fsnotify/fsnotify v1.10.1 h1:b0/UzAf9yR5rhf3RPm9gf3ehBPpf0oZKIjtpKrx59Ho= +github.com/fsnotify/fsnotify v1.10.1/go.mod h1:TLheqan6HD6GBK6PrDWyDPBaEV8LspOxvPSjC+bVfgo= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -40,6 +42,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/handlers/firewall_log.go b/internal/handlers/firewall_log.go new file mode 100644 index 0000000..b76f5c2 --- /dev/null +++ b/internal/handlers/firewall_log.go @@ -0,0 +1,184 @@ +package handlers + +import ( + "context" + "errors" + "log/slog" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + + "git.netcell-it.de/projekte/edgeguard-native/internal/handlers/response" + "git.netcell-it.de/projekte/edgeguard-native/internal/services/firewalllog" +) + +// FirewallLogHandler exposes: +// +// GET /api/v1/firewall/log?since=&until=&rule_id=&src=&dst=&proto=&action=&limit= +// WS /api/v1/firewall/log/live +// +// since/until: RFC3339 timestamp. limit defaults to 200, max 1000. +// Die HTTP-Variante liest direkt das jsonl-File (Historie der letzten +// 14 Tage), die WS-Variante streamt aus dem In-Memory-Ringbuffer + +// jeden neuen Event ab dem Connect. +type FirewallLogHandler struct { + Tailer *firewalllog.Tailer + LogPath string +} + +func NewFirewallLogHandler(tailer *firewalllog.Tailer, path string) *FirewallLogHandler { + if path == "" { + path = firewalllog.DefaultLogPath + } + return &FirewallLogHandler{Tailer: tailer, LogPath: path} +} + +func (h *FirewallLogHandler) Register(rg *gin.RouterGroup) { + g := rg.Group("/firewall") + g.GET("/log", h.Tail) + g.GET("/log/live", h.Live) +} + +// Tail liest die letzten matching Events aus dem rotated jsonl-File. +// Die HTTP-Antwort ist eine Liste — keine Pagination, der Ring-Cap +// (limit, max 1000) ist die Grenze. UI ruft das beim Page-Open auf +// und holt danach Live-Events über den WS. +func (h *FirewallLogHandler) Tail(c *gin.Context) { + f := parseFilter(c) + entries, err := firewalllog.ReadTail(h.LogPath, f) + if err != nil { + response.Internal(c, err) + return + } + response.OK(c, gin.H{"entries": entries, "count": len(entries)}) +} + +// upgrader: kein CheckOrigin (Same-Origin durch HAProxy bzw. dev-vite- +// proxy; ohne Origin-Check geht's außerdem in der Dev-Konsole auf). +var fwLogUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 32 * 1024, + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// Live upgraded auf WebSocket, schickt einen Initial-Snapshot aus dem +// Ringbuffer (gefiltert nach Query-Params) und dann jeden neuen Event +// als JSON-Zeile. +func (h *FirewallLogHandler) Live(c *gin.Context) { + if h.Tailer == nil { + response.Err(c, http.StatusServiceUnavailable, + errors.New("firewall log tailer not running")) + return + } + conn, err := fwLogUpgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + // Upgrade-Failures sind Browser-side; nichts loggen + return + } + defer conn.Close() + + f := parseFilter(c) + + // 1) Snapshot aus dem Ring — UI sieht sofort die letzten Events. + for _, e := range h.Tailer.Snapshot(f) { + if err := conn.WriteJSON(e); err != nil { + return + } + } + + // 2) Subscribe + stream forward. + ch, unsub := h.Tailer.Subscribe() + defer unsub() + + // Read-loop nebenher um Close-Frames + Ping-Pong sauber zu + // behandeln. Wenn der Client zumacht, returnen wir aus der Write- + // Schleife. + ctx, cancel := context.WithCancel(c.Request.Context()) + defer cancel() + go func() { + defer cancel() + for { + if _, _, err := conn.NextReader(); err != nil { + return + } + } + }() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + _ = conn.SetWriteDeadline(time.Now().Add(60 * time.Second)) + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + return + } + if !f.Matches(&e) { + continue + } + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := conn.WriteJSON(e); err != nil { + return + } + case <-ticker.C: + // Ping um stale-connection-detection durch HAProxy auf + // timeout client/tunnel zu vermeiden. + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := conn.WriteControl(websocket.PingMessage, nil, + time.Now().Add(5*time.Second)); err != nil { + return + } + } + } +} + +func parseFilter(c *gin.Context) firewalllog.Filter { + f := firewalllog.Filter{ + RuleID: c.Query("rule_id"), + SrcIP: c.Query("src"), + DstIP: c.Query("dst"), + Proto: c.Query("proto"), + Action: c.Query("action"), + } + if s := c.Query("since"); s != "" { + if t, err := time.Parse(time.RFC3339, s); err == nil { + f.Since = t + } + } + if s := c.Query("until"); s != "" { + if t, err := time.Parse(time.RFC3339, s); err == nil { + f.Until = t + } + } + if s := c.Query("limit"); s != "" { + if n, err := strconv.Atoi(s); err == nil { + f.Limit = n + } + } + if f.Limit <= 0 { + f.Limit = 200 + } + if f.Limit > 1000 { + f.Limit = 1000 + } + return f +} + +// startFirewallLogTailer is a small helper main.go invokes — wird +// hier definiert damit der Lifecycle (ctx, slog.Warn) eine Heimat hat. +func StartFirewallLogTailer(ctx context.Context, t *firewalllog.Tailer) { + if t == nil { + return + } + go func() { + if err := t.Start(ctx); err != nil && err != context.Canceled { + slog.Warn("firewalllog tailer exited", "error", err) + } + }() +} diff --git a/internal/services/firewalllog/reader.go b/internal/services/firewalllog/reader.go new file mode 100644 index 0000000..90430b0 --- /dev/null +++ b/internal/services/firewalllog/reader.go @@ -0,0 +1,223 @@ +// Package firewalllog liest die von ulogd2 nach /var/log/edgeguard/ +// firewall.jsonl geschriebenen NFLOG-Events. Phase 2 des Log-Systems: +// +// - Reader: parst JSONL-Zeilen, filtert by since/action/src/dst/... +// - Tailer: live-tailt das File via fsnotify, hält einen Ring-Buffer +// der letzten N Events und broadcastet neue Events an WebSocket- +// Clients. +// +// Schema-Felder kommen direkt aus ulogd2 + ulogd2-json — siehe +// /etc/ulogd.conf in postinst (NFLOG + BASE + IFINDEX + IP2STR + +// HWHDR → JSON-output). Beispiel-Line: +// +// {"timestamp":"2026-05-12T20:43:45+0200","oob.prefix":"edgeguard:42 ", +// "src_ip":"10.0.5.1","dest_ip":"10.0.20.23","ip.protocol":6, +// "src_port":54321,"dest_port":443,"oob.in":"ens18","action":"blocked",...} +package firewalllog + +import ( + "bufio" + "encoding/json" + "errors" + "io" + "os" + "strconv" + "strings" + "time" +) + +// DefaultLogPath ist der Pfad, in den ulogd2 (via postinst-gerendertem +// /etc/ulogd.conf) die JSON-Lines schreibt. +const DefaultLogPath = "/var/log/edgeguard/firewall.jsonl" + +// Entry ist die typisierte Sicht auf eine ulogd-JSON-Zeile. Nur die +// Felder die das UI braucht — den Rest können Operatoren mit `jq` auf +// dem File selber rauspulen. +type Entry struct { + Timestamp time.Time `json:"timestamp"` + RuleID string `json:"rule_id,omitempty"` // aus oob.prefix "edgeguard:42 " → "42" + Prefix string `json:"prefix,omitempty"` // raw oob.prefix (zum debuggen) + SrcIP string `json:"src_ip,omitempty"` + DstIP string `json:"dst_ip,omitempty"` + SrcPort int `json:"src_port,omitempty"` + DstPort int `json:"dst_port,omitempty"` + Proto string `json:"proto,omitempty"` // "tcp"/"udp"/"icmp"/"icmpv6" + IfIn string `json:"if_in,omitempty"` + IfOut string `json:"if_out,omitempty"` + PktLen int `json:"pkt_len,omitempty"` + Action string `json:"action,omitempty"` // ulogd setzt das pauschal auf "blocked" — wir leiten besser aus Prefix ab +} + +// Filter ist der Query-Param-Container für GET /firewall/log. +// Empty-String-Felder = kein Filter. +type Filter struct { + Since time.Time // events >= since + Until time.Time // events <= until (Zero = jetzt) + RuleID string + SrcIP string + DstIP string + Proto string // "tcp"/"udp"/"icmp"/... + Action string + Limit int // max entries (default 200) +} + +// Matches prüft ob ein Entry dem Filter entspricht. +func (f *Filter) Matches(e *Entry) bool { + if !f.Since.IsZero() && e.Timestamp.Before(f.Since) { + return false + } + if !f.Until.IsZero() && e.Timestamp.After(f.Until) { + return false + } + if f.RuleID != "" && e.RuleID != f.RuleID { + return false + } + if f.SrcIP != "" && e.SrcIP != f.SrcIP { + return false + } + if f.DstIP != "" && e.DstIP != f.DstIP { + return false + } + if f.Proto != "" && !strings.EqualFold(e.Proto, f.Proto) { + return false + } + if f.Action != "" && !strings.EqualFold(e.Action, f.Action) { + return false + } + return true +} + +// ReadTail liest die letzten Events aus path die zum Filter passen. +// Bewusst forward-read mit limit-Ringbuffer — das jsonl-File ist +// rotated (logrotate, 14d), also nie absurd groß. Bei >10 MB wäre +// reverse-read sinnvoller; das machen wir wenn's relevant wird. +func ReadTail(path string, f Filter) ([]Entry, error) { + if f.Limit <= 0 { + f.Limit = 200 + } + file, err := os.Open(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return []Entry{}, nil // noch keine Events — leeres Result, kein Fehler + } + return nil, err + } + defer file.Close() + + // Ring-Buffer für die letzten f.Limit matching entries. + buf := make([]Entry, 0, f.Limit) + sc := bufio.NewScanner(file) + // ulogd-Lines können breit werden (Mac-Header + alle Optional-Fields). + // Default-Buffer 64 KB reicht knapp; wir geben 1 MB. + sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 || line[0] != '{' { + continue + } + e, err := parseLine(line) + if err != nil { + continue // fehlerhafte Zeile überspringen, nicht abbrechen + } + if !f.Matches(&e) { + continue + } + if len(buf) >= f.Limit { + // FIFO-rotate + buf = append(buf[1:], e) + } else { + buf = append(buf, e) + } + } + return buf, sc.Err() +} + +// rawEntry mirrors the JSON-shape that ulogd2-json emits. Wir +// unmarshallen NICHT direkt in Entry weil ulogd Felder mit Punkten im +// Namen liefert ("ip.protocol", "oob.in", "oob.prefix") — Go-Tags +// können das, aber wir wollen außerdem Type-Coercion (Strings die +// numeric sind, missing fields, etc.). +type rawEntry struct { + Timestamp string `json:"timestamp"` + OobPrefix string `json:"oob.prefix"` + OobIn string `json:"oob.in"` + OobOut string `json:"oob.out"` + SrcIP string `json:"src_ip"` + DstIP string `json:"dest_ip"` + SrcPort int `json:"src_port"` + DstPort int `json:"dest_port"` + IPProto int `json:"ip.protocol"` + PktLen int `json:"raw.pktlen"` + Action string `json:"action"` + ICMPType *int `json:"icmp.type"` + ICMP6Type *int `json:"icmpv6.type"` +} + +func parseLine(b []byte) (Entry, error) { + var r rawEntry + if err := json.Unmarshal(b, &r); err != nil { + return Entry{}, err + } + e := Entry{ + Prefix: strings.TrimSpace(r.OobPrefix), + SrcIP: r.SrcIP, + DstIP: r.DstIP, + SrcPort: r.SrcPort, + DstPort: r.DstPort, + IfIn: r.OobIn, + IfOut: r.OobOut, + PktLen: r.PktLen, + Action: r.Action, + } + // Timestamp parsen — ulogd liefert RFC3339 mit Zone-Offset. + if r.Timestamp != "" { + if t, err := time.Parse(time.RFC3339Nano, r.Timestamp); err == nil { + e.Timestamp = t + } else if t2, err := time.Parse("2006-01-02T15:04:05-0700", r.Timestamp); err == nil { + e.Timestamp = t2 + } + } + // RuleID aus prefix extrahieren: "edgeguard:42" → "42". + if strings.HasPrefix(e.Prefix, "edgeguard:") { + e.RuleID = strings.TrimSpace(strings.TrimPrefix(e.Prefix, "edgeguard:")) + } + // Proto-Mapping aus IP-Protocol-Number. + switch r.IPProto { + case 1: + e.Proto = "icmp" + case 6: + e.Proto = "tcp" + case 17: + e.Proto = "udp" + case 58: + e.Proto = "icmpv6" + default: + if r.IPProto != 0 { + e.Proto = strconv.Itoa(r.IPProto) + } + } + return e, nil +} + +// parseReader ist exportiert für Tests + den Tailer, der zeilenweise +// parsed statt eine ganze Datei zu lesen. +func ParseReader(rd io.Reader) ([]Entry, error) { + sc := bufio.NewScanner(rd) + sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) + out := []Entry{} + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 || line[0] != '{' { + continue + } + e, err := parseLine(line) + if err != nil { + continue + } + out = append(out, e) + } + return out, sc.Err() +} + +// ParseLine ist exportiert für den Tailer (eine Zeile vom inotify-Event). +func ParseLine(b []byte) (Entry, error) { return parseLine(b) } diff --git a/internal/services/firewalllog/tailer.go b/internal/services/firewalllog/tailer.go new file mode 100644 index 0000000..f0d5ca9 --- /dev/null +++ b/internal/services/firewalllog/tailer.go @@ -0,0 +1,259 @@ +package firewalllog + +import ( + "bufio" + "context" + "errors" + "io" + "log/slog" + "os" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +// Tailer hält den In-Memory-Ringbuffer der letzten N Events + verteilt +// jeden neuen Event an alle aktuell subscribed WebSocket-Clients. +// +// Lebt single-instance im edgeguard-api-Prozess (gestartet in main.go). +// Robust gegen logrotate `copytruncate`: wir erkennen wenn die Datei +// kürzer wird als unser Read-Offset und seek'en auf 0. +type Tailer struct { + path string + ringSize int + + mu sync.RWMutex + ring []Entry + subs map[chan Entry]struct{} // fan-out targets + offset int64 // last read offset im file + + // Wakeup-Signal aus fsnotify-Loop. Buffered=1 damit der fs-Watch + // nicht blockt wenn der Reader gerade durch ist. + wake chan struct{} +} + +// NewTailer baut einen Tailer für path; ringSize ist der Memory- +// Ringbuffer (typisch 1000 — bei 200 Bytes/Event = 200 KB, harmlos). +func NewTailer(path string, ringSize int) *Tailer { + if ringSize <= 0 { + ringSize = 1000 + } + return &Tailer{ + path: path, + ringSize: ringSize, + ring: make([]Entry, 0, ringSize), + subs: map[chan Entry]struct{}{}, + wake: make(chan struct{}, 1), + } +} + +// Start läuft bis ctx.Done(). Lädt zuerst die existierenden Zeilen, +// dann fsnotify-watch + auf jeden Write-Event die neuen Bytes lesen. +// Bei Truncate (logrotate copytruncate) → offset auf 0 zurück. +func (t *Tailer) Start(ctx context.Context) error { + if err := t.bootstrap(); err != nil { + // Kein Fehler wenn Datei noch nicht existiert — erste + // firewall-rule-with-log triggert ulogd, das das File anlegt. + if !errors.Is(err, os.ErrNotExist) { + return err + } + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer w.Close() + + // Watch das Verzeichnis statt der Datei — wenn ulogd das File + // erstmals anlegt (oder rotate-rename), bekommen wir Create-Events. + dir := pathDir(t.path) + if err := w.Add(dir); err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev, ok := <-w.Events: + if !ok { + return nil + } + if ev.Name != t.path { + continue + } + t.drainFile() + case err, ok := <-w.Errors: + if !ok { + return nil + } + slog.Warn("firewalllog tailer: fsnotify error", "error", err) + case <-time.After(2 * time.Second): + // Safety-Net: poll-fallback alle 2s falls fsnotify einen + // Event verschluckt (selten, aber inotify hat Race- + // Conditions bei sehr schnellen Writes). + t.drainFile() + } + } +} + +// bootstrap lädt die existierenden Zeilen in den Ring (bis ringSize) +// und merkt sich den Offset für den Tail-Loop. +func (t *Tailer) bootstrap() error { + f, err := os.Open(t.path) + if err != nil { + return err + } + defer f.Close() + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 || line[0] != '{' { + continue + } + if e, perr := parseLine(line); perr == nil { + t.appendNoBroadcast(e) + } + } + end, _ := f.Seek(0, io.SeekEnd) + t.mu.Lock() + t.offset = end + t.mu.Unlock() + return sc.Err() +} + +// drainFile liest alle Bytes ab t.offset, parst Zeilen, schiebt sie in +// den Ring + broadcastet. Truncate-Detection: wenn file-size < offset → +// offset auf 0 (logrotate copytruncate, ulogd schreibt weiter ins +// gleiche Inode). +func (t *Tailer) drainFile() { + f, err := os.Open(t.path) + if err != nil { + return + } + defer f.Close() + + stat, err := f.Stat() + if err != nil { + return + } + t.mu.Lock() + off := t.offset + if stat.Size() < off { + off = 0 + } + t.mu.Unlock() + + if _, err := f.Seek(off, io.SeekStart); err != nil { + return + } + rdr := bufio.NewReader(f) + var read int64 + for { + line, err := rdr.ReadBytes('\n') + if len(line) > 0 { + // Nur komplette Zeilen verarbeiten — bei partial-write + // (read überholt write) lassen wir das Fragment liegen + // und kriegen es beim nächsten Tick. + if line[len(line)-1] != '\n' { + break + } + read += int64(len(line)) + trim := line[:len(line)-1] + if len(trim) > 0 && trim[0] == '{' { + if e, perr := parseLine(trim); perr == nil { + t.appendAndBroadcast(e) + } + } + } + if err != nil { + break + } + } + t.mu.Lock() + t.offset = off + read + t.mu.Unlock() +} + +func (t *Tailer) appendNoBroadcast(e Entry) { + t.mu.Lock() + defer t.mu.Unlock() + if len(t.ring) >= t.ringSize { + t.ring = append(t.ring[1:], e) + } else { + t.ring = append(t.ring, e) + } +} + +func (t *Tailer) appendAndBroadcast(e Entry) { + t.mu.Lock() + if len(t.ring) >= t.ringSize { + t.ring = append(t.ring[1:], e) + } else { + t.ring = append(t.ring, e) + } + subs := make([]chan Entry, 0, len(t.subs)) + for c := range t.subs { + subs = append(subs, c) + } + t.mu.Unlock() + // Non-blocking send — wenn ein Subscriber zu langsam liest, droppen + // wir den Event für ihn. Besser als alle anderen ausbremsen. + for _, c := range subs { + select { + case c <- e: + default: + } + } +} + +// Snapshot gibt eine Kopie des Ringbuffers zurück, optional gefiltert. +// Wird vom WebSocket-Handler beim initial-connect verwendet, damit der +// Client nicht 0 Events sieht bevor das erste neue Paket reinkommt. +func (t *Tailer) Snapshot(f Filter) []Entry { + if f.Limit <= 0 { + f.Limit = 200 + } + t.mu.RLock() + defer t.mu.RUnlock() + out := make([]Entry, 0, f.Limit) + for _, e := range t.ring { + if !f.Matches(&e) { + continue + } + out = append(out, e) + } + // nur die letzten f.Limit zurückgeben + if len(out) > f.Limit { + out = out[len(out)-f.Limit:] + } + return out +} + +// Subscribe gibt einen Channel zurück über den neue Events fließen + +// einen Unsubscribe-Cleanup. Channel-Buffer 64 — bei stehenden Clients +// wird gedropt. +func (t *Tailer) Subscribe() (<-chan Entry, func()) { + c := make(chan Entry, 64) + t.mu.Lock() + t.subs[c] = struct{}{} + t.mu.Unlock() + return c, func() { + t.mu.Lock() + delete(t.subs, c) + t.mu.Unlock() + close(c) + } +} + +func pathDir(p string) string { + for i := len(p) - 1; i >= 0; i-- { + if p[i] == '/' { + return p[:i] + } + } + return "." +} diff --git a/management-ui/src/components/Layout/Sidebar.tsx b/management-ui/src/components/Layout/Sidebar.tsx index c906308..b7c7798 100644 --- a/management-ui/src/components/Layout/Sidebar.tsx +++ b/management-ui/src/components/Layout/Sidebar.tsx @@ -75,7 +75,7 @@ const NAV: NavSection[] = [ }, ] -const VERSION = '1.0.59' +const VERSION = '1.0.60' // Sidebar-Pattern 1:1 aus netcell-webpanel (enconf) übernommen: // -