Files
edgeguard-native/internal/services/firewalllog/tailer.go
Debian a798d1b796 feat(firewall-log): Phase 2 — HTTP-Tail + WebSocket-Live-Stream
Backend für /firewall-Live-Tail und historische Recherche der
ulogd2-JSONL aus Phase 1.

internal/services/firewalllog/
  reader.go  — JSONL parser + Filter (since/until/rule_id/src/dst/
               proto/action/limit). Proto-Mapping aus IP-Protocol-Number
               (1=icmp, 6=tcp, 17=udp, 58=icmpv6). RuleID wird aus
               oob.prefix "edgeguard:<id>" extrahiert.
  tailer.go  — fsnotify-Watcher auf /var/log/edgeguard/, In-Memory
               Ring-Buffer 1000 Events, fan-out an Subscribe()-Channel.
               Robust gegen logrotate copytruncate (truncate-detection
               via stat.Size() < offset → seek(0)). Safety-Net 2s-poll
               falls fsnotify einen Write verschluckt. Non-blocking send
               an Subscriber — langsame Clients droppen Events statt
               die Pipeline zu blockieren.

internal/handlers/firewall_log.go:
  GET /api/v1/firewall/log     — typed JSON list, Filter via Query
  WS  /api/v1/firewall/log/live — Snapshot + live broadcast
                                  (gorilla/websocket, 30s-ping)

main.go: Tailer beim Startup gestartet (context.Background) — UI
landet in Phase 3.

deps: gorilla/websocket v1.5.3, fsnotify v1.10.1

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 21:05:39 +02:00

260 lines
5.9 KiB
Go

package firewalllog
import (
"bufio"
"context"
"errors"
"io"
"log/slog"
"os"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// Tailer hält den In-Memory-Ringbuffer der letzten N Events + verteilt
// jeden neuen Event an alle aktuell subscribed WebSocket-Clients.
//
// Lebt single-instance im edgeguard-api-Prozess (gestartet in main.go).
// Robust gegen logrotate `copytruncate`: wir erkennen wenn die Datei
// kürzer wird als unser Read-Offset und seek'en auf 0.
type Tailer struct {
path string
ringSize int
mu sync.RWMutex
ring []Entry
subs map[chan Entry]struct{} // fan-out targets
offset int64 // last read offset im file
// Wakeup-Signal aus fsnotify-Loop. Buffered=1 damit der fs-Watch
// nicht blockt wenn der Reader gerade durch ist.
wake chan struct{}
}
// NewTailer baut einen Tailer für path; ringSize ist der Memory-
// Ringbuffer (typisch 1000 — bei 200 Bytes/Event = 200 KB, harmlos).
func NewTailer(path string, ringSize int) *Tailer {
if ringSize <= 0 {
ringSize = 1000
}
return &Tailer{
path: path,
ringSize: ringSize,
ring: make([]Entry, 0, ringSize),
subs: map[chan Entry]struct{}{},
wake: make(chan struct{}, 1),
}
}
// Start läuft bis ctx.Done(). Lädt zuerst die existierenden Zeilen,
// dann fsnotify-watch + auf jeden Write-Event die neuen Bytes lesen.
// Bei Truncate (logrotate copytruncate) → offset auf 0 zurück.
func (t *Tailer) Start(ctx context.Context) error {
if err := t.bootstrap(); err != nil {
// Kein Fehler wenn Datei noch nicht existiert — erste
// firewall-rule-with-log triggert ulogd, das das File anlegt.
if !errors.Is(err, os.ErrNotExist) {
return err
}
}
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close()
// Watch das Verzeichnis statt der Datei — wenn ulogd das File
// erstmals anlegt (oder rotate-rename), bekommen wir Create-Events.
dir := pathDir(t.path)
if err := w.Add(dir); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev, ok := <-w.Events:
if !ok {
return nil
}
if ev.Name != t.path {
continue
}
t.drainFile()
case err, ok := <-w.Errors:
if !ok {
return nil
}
slog.Warn("firewalllog tailer: fsnotify error", "error", err)
case <-time.After(2 * time.Second):
// Safety-Net: poll-fallback alle 2s falls fsnotify einen
// Event verschluckt (selten, aber inotify hat Race-
// Conditions bei sehr schnellen Writes).
t.drainFile()
}
}
}
// bootstrap lädt die existierenden Zeilen in den Ring (bis ringSize)
// und merkt sich den Offset für den Tail-Loop.
func (t *Tailer) bootstrap() error {
f, err := os.Open(t.path)
if err != nil {
return err
}
defer f.Close()
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 || line[0] != '{' {
continue
}
if e, perr := parseLine(line); perr == nil {
t.appendNoBroadcast(e)
}
}
end, _ := f.Seek(0, io.SeekEnd)
t.mu.Lock()
t.offset = end
t.mu.Unlock()
return sc.Err()
}
// drainFile liest alle Bytes ab t.offset, parst Zeilen, schiebt sie in
// den Ring + broadcastet. Truncate-Detection: wenn file-size < offset →
// offset auf 0 (logrotate copytruncate, ulogd schreibt weiter ins
// gleiche Inode).
func (t *Tailer) drainFile() {
f, err := os.Open(t.path)
if err != nil {
return
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
return
}
t.mu.Lock()
off := t.offset
if stat.Size() < off {
off = 0
}
t.mu.Unlock()
if _, err := f.Seek(off, io.SeekStart); err != nil {
return
}
rdr := bufio.NewReader(f)
var read int64
for {
line, err := rdr.ReadBytes('\n')
if len(line) > 0 {
// Nur komplette Zeilen verarbeiten — bei partial-write
// (read überholt write) lassen wir das Fragment liegen
// und kriegen es beim nächsten Tick.
if line[len(line)-1] != '\n' {
break
}
read += int64(len(line))
trim := line[:len(line)-1]
if len(trim) > 0 && trim[0] == '{' {
if e, perr := parseLine(trim); perr == nil {
t.appendAndBroadcast(e)
}
}
}
if err != nil {
break
}
}
t.mu.Lock()
t.offset = off + read
t.mu.Unlock()
}
func (t *Tailer) appendNoBroadcast(e Entry) {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.ring) >= t.ringSize {
t.ring = append(t.ring[1:], e)
} else {
t.ring = append(t.ring, e)
}
}
func (t *Tailer) appendAndBroadcast(e Entry) {
t.mu.Lock()
if len(t.ring) >= t.ringSize {
t.ring = append(t.ring[1:], e)
} else {
t.ring = append(t.ring, e)
}
subs := make([]chan Entry, 0, len(t.subs))
for c := range t.subs {
subs = append(subs, c)
}
t.mu.Unlock()
// Non-blocking send — wenn ein Subscriber zu langsam liest, droppen
// wir den Event für ihn. Besser als alle anderen ausbremsen.
for _, c := range subs {
select {
case c <- e:
default:
}
}
}
// Snapshot gibt eine Kopie des Ringbuffers zurück, optional gefiltert.
// Wird vom WebSocket-Handler beim initial-connect verwendet, damit der
// Client nicht 0 Events sieht bevor das erste neue Paket reinkommt.
func (t *Tailer) Snapshot(f Filter) []Entry {
if f.Limit <= 0 {
f.Limit = 200
}
t.mu.RLock()
defer t.mu.RUnlock()
out := make([]Entry, 0, f.Limit)
for _, e := range t.ring {
if !f.Matches(&e) {
continue
}
out = append(out, e)
}
// nur die letzten f.Limit zurückgeben
if len(out) > f.Limit {
out = out[len(out)-f.Limit:]
}
return out
}
// Subscribe gibt einen Channel zurück über den neue Events fließen +
// einen Unsubscribe-Cleanup. Channel-Buffer 64 — bei stehenden Clients
// wird gedropt.
func (t *Tailer) Subscribe() (<-chan Entry, func()) {
c := make(chan Entry, 64)
t.mu.Lock()
t.subs[c] = struct{}{}
t.mu.Unlock()
return c, func() {
t.mu.Lock()
delete(t.subs, c)
t.mu.Unlock()
close(c)
}
}
func pathDir(p string) string {
for i := len(p) - 1; i >= 0; i-- {
if p[i] == '/' {
return p[:i]
}
}
return "."
}