package cluster import ( "context" "errors" "fmt" "os" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "git.netcell-it.de/projekte/edgeguard-native/internal/models" ) var ErrNotFound = errors.New("ha_node not found") // Store wraps the ha_nodes table — used by the cluster handler and by // EnsureSelfRegistered. v1 only ever has one row (the local node); // the table is in place so Phase 3.1 multi-node lands without // schema churn. type Store struct { Pool *pgxpool.Pool } func NewStore(pool *pgxpool.Pool) *Store { return &Store{Pool: pool} } const baseSelect = ` SELECT id, name, fqdn, api_url, public_ip, internal_ip, mgmt_ip, role, version, config_hash, status, last_seen, joined_at, created_at, updated_at FROM ha_nodes ` func (s *Store) List(ctx context.Context) ([]models.HANode, error) { rows, err := s.Pool.Query(ctx, baseSelect+" ORDER BY joined_at ASC") if err != nil { return nil, err } defer rows.Close() out := make([]models.HANode, 0, 4) for rows.Next() { n, err := scanNode(rows) if err != nil { return nil, err } out = append(out, *n) } return out, rows.Err() } func (s *Store) Get(ctx context.Context, id string) (*models.HANode, error) { row := s.Pool.QueryRow(ctx, baseSelect+" WHERE id = $1", id) n, err := scanNode(row) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return nil, err } return n, nil } // UpsertSelf writes the local node's row using the database-side // ON CONFLICT DO UPDATE so the call is safe to make on every boot. // last_seen is also bumped — handy for the heartbeat-by-restart // pattern even before periodic heartbeats land. Phase-3-Felder // (mgmt_ip, version, config_hash, status) werden mit COALESCE // erhalten falls der Caller sie nicht setzt. func (s *Store) UpsertSelf(ctx context.Context, n models.HANode) (*models.HANode, error) { now := time.Now().UTC() if n.Status == "" { n.Status = "online" } row := s.Pool.QueryRow(ctx, ` INSERT INTO ha_nodes (id, name, fqdn, api_url, public_ip, internal_ip, mgmt_ip, role, version, config_hash, status, last_seen, joined_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, fqdn = EXCLUDED.fqdn, api_url = EXCLUDED.api_url, public_ip = COALESCE(EXCLUDED.public_ip, ha_nodes.public_ip), internal_ip = COALESCE(EXCLUDED.internal_ip, ha_nodes.internal_ip), mgmt_ip = COALESCE(EXCLUDED.mgmt_ip, ha_nodes.mgmt_ip), role = EXCLUDED.role, version = COALESCE(EXCLUDED.version, ha_nodes.version), config_hash = COALESCE(EXCLUDED.config_hash, ha_nodes.config_hash), status = EXCLUDED.status, last_seen = EXCLUDED.last_seen, updated_at = NOW() RETURNING id, name, fqdn, api_url, public_ip, internal_ip, mgmt_ip, role, version, config_hash, status, last_seen, joined_at, created_at, updated_at`, n.ID, n.Name, n.FQDN, n.APIURL, n.PublicIP, n.InternalIP, n.MgmtIP, n.Role, n.Version, n.ConfigHash, n.Status, now, now, ) return scanNode(row) } // EnsureSelfRegistered mints the node-id if needed, builds the row // from setup.json + os.Hostname + node.conf, and upserts it. Called // on edgeguard-api boot AFTER the DB pool is reachable. // // fqdn = setup-store fqdn (preferred) or hostname. // apiURL = "https://". // version = edgeguard-api-Version (für Drift-Banner). // mgmtIP = aus /etc/edgeguard/node.conf wenn vorhanden, sonst Auto. func EnsureSelfRegistered(ctx context.Context, store *Store, fqdn, role, version string) (*models.HANode, error) { id, err := EnsureNodeID("") if err != nil { // Even when persistence failed (read-only /var/lib in dev), // EnsureNodeID returns the in-memory id alongside the error // — so we can still register, but the id will rotate on // every boot. _ = err } if id == "" { return nil, fmt.Errorf("could not derive node id") } host, hostErr := os.Hostname() if hostErr != nil { host = "unknown" } if fqdn == "" { fqdn = host } cfg, _ := LoadLocalConfig("") var mgmtIP *string if cfg != nil && cfg.MgmtIP != "" { v := cfg.MgmtIP mgmtIP = &v } var ver *string if version != "" { v := version ver = &v } n := models.HANode{ ID: id, Name: host, FQDN: fqdn, APIURL: "https://" + fqdn, MgmtIP: mgmtIP, Role: role, Version: ver, Status: "online", } return store.UpsertSelf(ctx, n) } func scanNode(row interface{ Scan(...any) error }) (*models.HANode, error) { var n models.HANode if err := row.Scan( &n.ID, &n.Name, &n.FQDN, &n.APIURL, &n.PublicIP, &n.InternalIP, &n.MgmtIP, &n.Role, &n.Version, &n.ConfigHash, &n.Status, &n.LastSeen, &n.JoinedAt, &n.CreatedAt, &n.UpdatedAt, ); err != nil { return nil, err } return &n, nil }