Backend für /firewall-Live-Tail und historische Recherche der
ulogd2-JSONL aus Phase 1.
internal/services/firewalllog/
reader.go — JSONL parser + Filter (since/until/rule_id/src/dst/
proto/action/limit). Proto-Mapping aus IP-Protocol-Number
(1=icmp, 6=tcp, 17=udp, 58=icmpv6). RuleID wird aus
oob.prefix "edgeguard:<id>" extrahiert.
tailer.go — fsnotify-Watcher auf /var/log/edgeguard/, In-Memory
Ring-Buffer 1000 Events, fan-out an Subscribe()-Channel.
Robust gegen logrotate copytruncate (truncate-detection
via stat.Size() < offset → seek(0)). Safety-Net 2s-poll
falls fsnotify einen Write verschluckt. Non-blocking send
an Subscriber — langsame Clients droppen Events statt
die Pipeline zu blockieren.
internal/handlers/firewall_log.go:
GET /api/v1/firewall/log — typed JSON list, Filter via Query
WS /api/v1/firewall/log/live — Snapshot + live broadcast
(gorilla/websocket, 30s-ping)
main.go: Tailer beim Startup gestartet (context.Background) — UI
landet in Phase 3.
deps: gorilla/websocket v1.5.3, fsnotify v1.10.1
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
260 lines
5.9 KiB
Go
260 lines
5.9 KiB
Go
package firewalllog
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
// Tailer hält den In-Memory-Ringbuffer der letzten N Events + verteilt
|
|
// jeden neuen Event an alle aktuell subscribed WebSocket-Clients.
|
|
//
|
|
// Lebt single-instance im edgeguard-api-Prozess (gestartet in main.go).
|
|
// Robust gegen logrotate `copytruncate`: wir erkennen wenn die Datei
|
|
// kürzer wird als unser Read-Offset und seek'en auf 0.
|
|
type Tailer struct {
|
|
path string
|
|
ringSize int
|
|
|
|
mu sync.RWMutex
|
|
ring []Entry
|
|
subs map[chan Entry]struct{} // fan-out targets
|
|
offset int64 // last read offset im file
|
|
|
|
// Wakeup-Signal aus fsnotify-Loop. Buffered=1 damit der fs-Watch
|
|
// nicht blockt wenn der Reader gerade durch ist.
|
|
wake chan struct{}
|
|
}
|
|
|
|
// NewTailer baut einen Tailer für path; ringSize ist der Memory-
|
|
// Ringbuffer (typisch 1000 — bei 200 Bytes/Event = 200 KB, harmlos).
|
|
func NewTailer(path string, ringSize int) *Tailer {
|
|
if ringSize <= 0 {
|
|
ringSize = 1000
|
|
}
|
|
return &Tailer{
|
|
path: path,
|
|
ringSize: ringSize,
|
|
ring: make([]Entry, 0, ringSize),
|
|
subs: map[chan Entry]struct{}{},
|
|
wake: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// Start läuft bis ctx.Done(). Lädt zuerst die existierenden Zeilen,
|
|
// dann fsnotify-watch + auf jeden Write-Event die neuen Bytes lesen.
|
|
// Bei Truncate (logrotate copytruncate) → offset auf 0 zurück.
|
|
func (t *Tailer) Start(ctx context.Context) error {
|
|
if err := t.bootstrap(); err != nil {
|
|
// Kein Fehler wenn Datei noch nicht existiert — erste
|
|
// firewall-rule-with-log triggert ulogd, das das File anlegt.
|
|
if !errors.Is(err, os.ErrNotExist) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
w, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer w.Close()
|
|
|
|
// Watch das Verzeichnis statt der Datei — wenn ulogd das File
|
|
// erstmals anlegt (oder rotate-rename), bekommen wir Create-Events.
|
|
dir := pathDir(t.path)
|
|
if err := w.Add(dir); err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case ev, ok := <-w.Events:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if ev.Name != t.path {
|
|
continue
|
|
}
|
|
t.drainFile()
|
|
case err, ok := <-w.Errors:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
slog.Warn("firewalllog tailer: fsnotify error", "error", err)
|
|
case <-time.After(2 * time.Second):
|
|
// Safety-Net: poll-fallback alle 2s falls fsnotify einen
|
|
// Event verschluckt (selten, aber inotify hat Race-
|
|
// Conditions bei sehr schnellen Writes).
|
|
t.drainFile()
|
|
}
|
|
}
|
|
}
|
|
|
|
// bootstrap lädt die existierenden Zeilen in den Ring (bis ringSize)
|
|
// und merkt sich den Offset für den Tail-Loop.
|
|
func (t *Tailer) bootstrap() error {
|
|
f, err := os.Open(t.path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
sc := bufio.NewScanner(f)
|
|
sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
for sc.Scan() {
|
|
line := sc.Bytes()
|
|
if len(line) == 0 || line[0] != '{' {
|
|
continue
|
|
}
|
|
if e, perr := parseLine(line); perr == nil {
|
|
t.appendNoBroadcast(e)
|
|
}
|
|
}
|
|
end, _ := f.Seek(0, io.SeekEnd)
|
|
t.mu.Lock()
|
|
t.offset = end
|
|
t.mu.Unlock()
|
|
return sc.Err()
|
|
}
|
|
|
|
// drainFile liest alle Bytes ab t.offset, parst Zeilen, schiebt sie in
|
|
// den Ring + broadcastet. Truncate-Detection: wenn file-size < offset →
|
|
// offset auf 0 (logrotate copytruncate, ulogd schreibt weiter ins
|
|
// gleiche Inode).
|
|
func (t *Tailer) drainFile() {
|
|
f, err := os.Open(t.path)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return
|
|
}
|
|
t.mu.Lock()
|
|
off := t.offset
|
|
if stat.Size() < off {
|
|
off = 0
|
|
}
|
|
t.mu.Unlock()
|
|
|
|
if _, err := f.Seek(off, io.SeekStart); err != nil {
|
|
return
|
|
}
|
|
rdr := bufio.NewReader(f)
|
|
var read int64
|
|
for {
|
|
line, err := rdr.ReadBytes('\n')
|
|
if len(line) > 0 {
|
|
// Nur komplette Zeilen verarbeiten — bei partial-write
|
|
// (read überholt write) lassen wir das Fragment liegen
|
|
// und kriegen es beim nächsten Tick.
|
|
if line[len(line)-1] != '\n' {
|
|
break
|
|
}
|
|
read += int64(len(line))
|
|
trim := line[:len(line)-1]
|
|
if len(trim) > 0 && trim[0] == '{' {
|
|
if e, perr := parseLine(trim); perr == nil {
|
|
t.appendAndBroadcast(e)
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
t.mu.Lock()
|
|
t.offset = off + read
|
|
t.mu.Unlock()
|
|
}
|
|
|
|
func (t *Tailer) appendNoBroadcast(e Entry) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
if len(t.ring) >= t.ringSize {
|
|
t.ring = append(t.ring[1:], e)
|
|
} else {
|
|
t.ring = append(t.ring, e)
|
|
}
|
|
}
|
|
|
|
func (t *Tailer) appendAndBroadcast(e Entry) {
|
|
t.mu.Lock()
|
|
if len(t.ring) >= t.ringSize {
|
|
t.ring = append(t.ring[1:], e)
|
|
} else {
|
|
t.ring = append(t.ring, e)
|
|
}
|
|
subs := make([]chan Entry, 0, len(t.subs))
|
|
for c := range t.subs {
|
|
subs = append(subs, c)
|
|
}
|
|
t.mu.Unlock()
|
|
// Non-blocking send — wenn ein Subscriber zu langsam liest, droppen
|
|
// wir den Event für ihn. Besser als alle anderen ausbremsen.
|
|
for _, c := range subs {
|
|
select {
|
|
case c <- e:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Snapshot gibt eine Kopie des Ringbuffers zurück, optional gefiltert.
|
|
// Wird vom WebSocket-Handler beim initial-connect verwendet, damit der
|
|
// Client nicht 0 Events sieht bevor das erste neue Paket reinkommt.
|
|
func (t *Tailer) Snapshot(f Filter) []Entry {
|
|
if f.Limit <= 0 {
|
|
f.Limit = 200
|
|
}
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
out := make([]Entry, 0, f.Limit)
|
|
for _, e := range t.ring {
|
|
if !f.Matches(&e) {
|
|
continue
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
// nur die letzten f.Limit zurückgeben
|
|
if len(out) > f.Limit {
|
|
out = out[len(out)-f.Limit:]
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Subscribe gibt einen Channel zurück über den neue Events fließen +
|
|
// einen Unsubscribe-Cleanup. Channel-Buffer 64 — bei stehenden Clients
|
|
// wird gedropt.
|
|
func (t *Tailer) Subscribe() (<-chan Entry, func()) {
|
|
c := make(chan Entry, 64)
|
|
t.mu.Lock()
|
|
t.subs[c] = struct{}{}
|
|
t.mu.Unlock()
|
|
return c, func() {
|
|
t.mu.Lock()
|
|
delete(t.subs, c)
|
|
t.mu.Unlock()
|
|
close(c)
|
|
}
|
|
}
|
|
|
|
func pathDir(p string) string {
|
|
for i := len(p) - 1; i >= 0; i-- {
|
|
if p[i] == '/' {
|
|
return p[:i]
|
|
}
|
|
}
|
|
return "."
|
|
}
|