package firewalllog import ( "bufio" "context" "errors" "io" "log/slog" "os" "sync" "time" "github.com/fsnotify/fsnotify" ) // Tailer hält den In-Memory-Ringbuffer der letzten N Events + verteilt // jeden neuen Event an alle aktuell subscribed WebSocket-Clients. // // Lebt single-instance im edgeguard-api-Prozess (gestartet in main.go). // Robust gegen logrotate `copytruncate`: wir erkennen wenn die Datei // kürzer wird als unser Read-Offset und seek'en auf 0. type Tailer struct { path string ringSize int mu sync.RWMutex ring []Entry subs map[chan Entry]struct{} // fan-out targets offset int64 // last read offset im file // Wakeup-Signal aus fsnotify-Loop. Buffered=1 damit der fs-Watch // nicht blockt wenn der Reader gerade durch ist. wake chan struct{} } // NewTailer baut einen Tailer für path; ringSize ist der Memory- // Ringbuffer (typisch 1000 — bei 200 Bytes/Event = 200 KB, harmlos). func NewTailer(path string, ringSize int) *Tailer { if ringSize <= 0 { ringSize = 1000 } return &Tailer{ path: path, ringSize: ringSize, ring: make([]Entry, 0, ringSize), subs: map[chan Entry]struct{}{}, wake: make(chan struct{}, 1), } } // Start läuft bis ctx.Done(). Lädt zuerst die existierenden Zeilen, // dann fsnotify-watch + auf jeden Write-Event die neuen Bytes lesen. // Bei Truncate (logrotate copytruncate) → offset auf 0 zurück. func (t *Tailer) Start(ctx context.Context) error { if err := t.bootstrap(); err != nil { // Kein Fehler wenn Datei noch nicht existiert — erste // firewall-rule-with-log triggert ulogd, das das File anlegt. if !errors.Is(err, os.ErrNotExist) { return err } } w, err := fsnotify.NewWatcher() if err != nil { return err } defer w.Close() // Watch das Verzeichnis statt der Datei — wenn ulogd das File // erstmals anlegt (oder rotate-rename), bekommen wir Create-Events. dir := pathDir(t.path) if err := w.Add(dir); err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() case ev, ok := <-w.Events: if !ok { return nil } if ev.Name != t.path { continue } t.drainFile() case err, ok := <-w.Errors: if !ok { return nil } slog.Warn("firewalllog tailer: fsnotify error", "error", err) case <-time.After(2 * time.Second): // Safety-Net: poll-fallback alle 2s falls fsnotify einen // Event verschluckt (selten, aber inotify hat Race- // Conditions bei sehr schnellen Writes). t.drainFile() } } } // bootstrap lädt die existierenden Zeilen in den Ring (bis ringSize) // und merkt sich den Offset für den Tail-Loop. func (t *Tailer) bootstrap() error { f, err := os.Open(t.path) if err != nil { return err } defer f.Close() sc := bufio.NewScanner(f) sc.Buffer(make([]byte, 0, 64*1024), 1024*1024) for sc.Scan() { line := sc.Bytes() if len(line) == 0 || line[0] != '{' { continue } if e, perr := parseLine(line); perr == nil { t.appendNoBroadcast(e) } } end, _ := f.Seek(0, io.SeekEnd) t.mu.Lock() t.offset = end t.mu.Unlock() return sc.Err() } // drainFile liest alle Bytes ab t.offset, parst Zeilen, schiebt sie in // den Ring + broadcastet. Truncate-Detection: wenn file-size < offset → // offset auf 0 (logrotate copytruncate, ulogd schreibt weiter ins // gleiche Inode). func (t *Tailer) drainFile() { f, err := os.Open(t.path) if err != nil { return } defer f.Close() stat, err := f.Stat() if err != nil { return } t.mu.Lock() off := t.offset if stat.Size() < off { off = 0 } t.mu.Unlock() if _, err := f.Seek(off, io.SeekStart); err != nil { return } rdr := bufio.NewReader(f) var read int64 for { line, err := rdr.ReadBytes('\n') if len(line) > 0 { // Nur komplette Zeilen verarbeiten — bei partial-write // (read überholt write) lassen wir das Fragment liegen // und kriegen es beim nächsten Tick. if line[len(line)-1] != '\n' { break } read += int64(len(line)) trim := line[:len(line)-1] if len(trim) > 0 && trim[0] == '{' { if e, perr := parseLine(trim); perr == nil { t.appendAndBroadcast(e) } } } if err != nil { break } } t.mu.Lock() t.offset = off + read t.mu.Unlock() } func (t *Tailer) appendNoBroadcast(e Entry) { t.mu.Lock() defer t.mu.Unlock() if len(t.ring) >= t.ringSize { t.ring = append(t.ring[1:], e) } else { t.ring = append(t.ring, e) } } func (t *Tailer) appendAndBroadcast(e Entry) { t.mu.Lock() if len(t.ring) >= t.ringSize { t.ring = append(t.ring[1:], e) } else { t.ring = append(t.ring, e) } subs := make([]chan Entry, 0, len(t.subs)) for c := range t.subs { subs = append(subs, c) } t.mu.Unlock() // Non-blocking send — wenn ein Subscriber zu langsam liest, droppen // wir den Event für ihn. Besser als alle anderen ausbremsen. for _, c := range subs { select { case c <- e: default: } } } // Snapshot gibt eine Kopie des Ringbuffers zurück, optional gefiltert. // Wird vom WebSocket-Handler beim initial-connect verwendet, damit der // Client nicht 0 Events sieht bevor das erste neue Paket reinkommt. func (t *Tailer) Snapshot(f Filter) []Entry { if f.Limit <= 0 { f.Limit = 200 } t.mu.RLock() defer t.mu.RUnlock() out := make([]Entry, 0, f.Limit) for _, e := range t.ring { if !f.Matches(&e) { continue } out = append(out, e) } // nur die letzten f.Limit zurückgeben if len(out) > f.Limit { out = out[len(out)-f.Limit:] } return out } // Subscribe gibt einen Channel zurück über den neue Events fließen + // einen Unsubscribe-Cleanup. Channel-Buffer 64 — bei stehenden Clients // wird gedropt. func (t *Tailer) Subscribe() (<-chan Entry, func()) { c := make(chan Entry, 64) t.mu.Lock() t.subs[c] = struct{}{} t.mu.Unlock() return c, func() { t.mu.Lock() delete(t.subs, c) t.mu.Unlock() close(c) } } func pathDir(p string) string { for i := len(p) - 1; i >= 0; i-- { if p[i] == '/' { return p[:i] } } return "." }