checkers: store observations as json.RawMessage with cross-checker reuse

Refactor observation data pipeline to serialize once after collection and
keep json.RawMessage throughout storage and API responses. This eliminates
double-serialization and makes DB round-trips lossless.
This commit is contained in:
nemunaire 2026-04-05 11:45:59 +07:00
commit cab205c97f
12 changed files with 440 additions and 37 deletions

View file

@ -22,7 +22,6 @@
package controller
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
@ -196,21 +195,15 @@ func (cc *CheckerController) GetExecutionObservations(c *gin.Context) {
// @Router /domains/{domain}/zone/{zoneid}/{subdomain}/services/{serviceid}/checkers/{checkerId}/executions/{executionId}/observations/{obsKey} [get]
func (cc *CheckerController) GetExecutionObservation(c *gin.Context) {
// The execution is read from the gin context under the "execution" key.
exec := c.MustGet("execution").(*happydns.Execution)
snap, err := cc.statusUC.GetObservationsByExecution(targetFromContext(c), exec.Id)
if err != nil {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observations not available"})
return
}
obsKey := c.Param("obsKey")
val, ok := snap.Data[obsKey]
if !ok {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observation key not found"})
// Look up the single observation named by obsKey for this execution; any
// error from the use case is answered with a 404.
val, err := cc.statusUC.GetSnapshotByExecution(targetFromContext(c), exec.Id, obsKey)
if err != nil {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observation not available"})
return
}
c.JSON(http.StatusOK, val)
// GetSnapshotByExecution returns a json.RawMessage, so val is written to the
// response body unchanged with a JSON content type.
c.Data(http.StatusOK, "application/json; charset=utf-8", val)
}
// GetExecutionResults returns the evaluation (per-rule states) for an execution.
@ -296,27 +289,15 @@ func (cc *CheckerController) GetExecutionResult(c *gin.Context) {
// @Router /domains/{domain}/zone/{zoneid}/{subdomain}/services/{serviceid}/checkers/{checkerId}/executions/{executionId}/observations/{obsKey}/report [get]
func (cc *CheckerController) GetExecutionHTMLReport(c *gin.Context) {
exec := c.MustGet("execution").(*happydns.Execution)
snap, err := cc.statusUC.GetObservationsByExecution(targetFromContext(c), exec.Id)
if err != nil {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observations not available"})
return
}
obsKey := c.Param("obsKey")
val, ok := snap.Data[obsKey]
if !ok {
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observation key not found"})
return
}
raw, err := json.Marshal(val)
val, err := cc.statusUC.GetSnapshotByExecution(targetFromContext(c), exec.Id, obsKey)
if err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
c.AbortWithStatusJSON(http.StatusNotFound, gin.H{"errmsg": "Observation not available"})
return
}
htmlContent, supported, err := checkerPkg.GetHTMLReport(obsKey, json.RawMessage(raw))
htmlContent, supported, err := checkerPkg.GetHTMLReport(obsKey, val)
if err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
return

View file

@ -269,6 +269,7 @@ func (app *App) initUsecases() {
app.store,
app.store,
app.store,
app.store,
)
app.usecases.checkerScheduler = checkerUC.NewScheduler(app.usecases.checkerEngine, app.cfg.CheckerMaxConcurrency, app.store, app.store, app.store, app.store)
}

View file

@ -45,6 +45,7 @@ type Storage interface {
checker.CheckerOptionsStorage
checker.CheckEvaluationStorage
checker.ExecutionStorage
checker.ObservationCacheStorage
checker.ObservationSnapshotStorage
checker.SchedulerStateStorage
domain.DomainStorage

View file

@ -0,0 +1,50 @@
// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2026 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package database
import (
"fmt"
"git.happydns.org/happyDomain/model"
)
// obsCacheKey builds the key-value store key under which the cache entry for
// the given target and observation key is stored. Every key starts with the
// "obscache|" prefix, followed by target.String(), a dash and the observation
// key.
func obsCacheKey(target happydns.CheckTarget, key happydns.ObservationKey) string {
	const layout = "obscache|%s-%s"

	return fmt.Sprintf(layout, target.String(), key)
}
// ListAllCachedObservations returns an iterator over the entries whose key
// matches the "obscache|" search. The returned error is always nil.
func (s *KVStorage) ListAllCachedObservations() (happydns.Iterator[happydns.ObservationCacheEntry], error) {
	return NewKVIterator[happydns.ObservationCacheEntry](s.db, s.db.Search("obscache|")), nil
}
// GetCachedObservation loads the cache entry stored for the given target and
// observation key. Any error reported by the underlying store is returned
// unchanged together with a nil entry.
func (s *KVStorage) GetCachedObservation(target happydns.CheckTarget, key happydns.ObservationKey) (*happydns.ObservationCacheEntry, error) {
	var cached happydns.ObservationCacheEntry

	if err := s.db.Get(obsCacheKey(target, key), &cached); err != nil {
		return nil, err
	}

	return &cached, nil
}
// PutCachedObservation writes entry to the store under the key derived from
// the given target and observation key, and returns the store's error, if any.
func (s *KVStorage) PutCachedObservation(target happydns.CheckTarget, key happydns.ObservationKey, entry *happydns.ObservationCacheEntry) error {
	storageKey := obsCacheKey(target, key)

	return s.db.Put(storageKey, entry)
}

View file

@ -22,6 +22,7 @@
package checker
import (
"encoding/json"
"log"
"slices"
@ -344,3 +345,18 @@ func (u *CheckStatusUsecase) GetMetricsByDomain(domainId happydns.Identifier, li
}
return u.extractMetricsFromExecutions(execs)
}
// GetSnapshotByExecution returns the raw, already-serialized observation
// stored under obsKey in the snapshot of the given execution.
//
// The snapshot is obtained through GetObservationsByExecution, so any error it
// reports for scope and execID is returned as is. When the snapshot holds no
// data for obsKey, happydns.ErrSnapshotNotFound is returned.
func (u *CheckStatusUsecase) GetSnapshotByExecution(scope happydns.CheckTarget, execID happydns.Identifier, obsKey string) (json.RawMessage, error) {
	snapshot, err := u.GetObservationsByExecution(scope, execID)
	if err != nil {
		return nil, err
	}

	if data, found := snapshot.Data[obsKey]; found {
		return data, nil
	}

	return nil, happydns.ErrSnapshotNotFound
}

View file

@ -22,6 +22,7 @@
package checker_test
import (
"encoding/json"
"testing"
"time"
@ -722,3 +723,117 @@ func TestCheckStatusUsecase_GetMetricsByUser_LimitApplied(t *testing.T) {
}
_ = metrics
}
// TestCheckStatusUsecase_GetSnapshotByExecution stores a snapshot, an
// evaluation referencing it and an execution referencing that evaluation, then
// checks that the observation can be fetched by key and decodes as JSON.
func TestCheckStatusUsecase_GetSnapshotByExecution(t *testing.T) {
	uc, _, ms := setupStatusUC(t)

	userID, _ := happydns.NewRandomIdentifier()
	target := happydns.CheckTarget{UserId: userID.String(), DomainId: "d1"}

	// Snapshot carrying a single pre-serialized observation.
	observations := map[happydns.ObservationKey]json.RawMessage{
		"dns_records": json.RawMessage(`{"records":["A 1.2.3.4"]}`),
	}
	snapshot := &happydns.ObservationSnapshot{
		Target:      target,
		CollectedAt: time.Now(),
		Data:        observations,
	}
	err := ms.CreateSnapshot(snapshot)
	if err != nil {
		t.Fatalf("CreateSnapshot() error: %v", err)
	}

	// Evaluation pointing at the snapshot.
	evaluation := &happydns.CheckEvaluation{
		CheckerID:  "status_test_checker",
		Target:     target,
		SnapshotID: snapshot.Id,
	}
	err = ms.CreateEvaluation(evaluation)
	if err != nil {
		t.Fatalf("CreateEvaluation() error: %v", err)
	}

	// Finished execution pointing at the evaluation.
	execution := &happydns.Execution{
		CheckerID:    "status_test_checker",
		Target:       target,
		Status:       happydns.ExecutionDone,
		EvaluationID: &evaluation.Id,
	}
	err = ms.CreateExecution(execution)
	if err != nil {
		t.Fatalf("CreateExecution() error: %v", err)
	}

	payload, err := uc.GetSnapshotByExecution(target, execution.Id, "dns_records")
	if err != nil {
		t.Fatalf("GetSnapshotByExecution() error: %v", err)
	}

	var decoded map[string]any
	if err := json.Unmarshal(payload, &decoded); err != nil {
		t.Fatalf("failed to unmarshal observation data: %v", err)
	}
	if _, present := decoded["records"]; !present {
		t.Error("expected 'records' key in observation data")
	}
}
// TestCheckStatusUsecase_GetSnapshotByExecution_KeyNotFound checks that asking
// for an observation key absent from an otherwise valid (but empty) snapshot
// yields an error.
func TestCheckStatusUsecase_GetSnapshotByExecution_KeyNotFound(t *testing.T) {
	uc, _, ms := setupStatusUC(t)

	userID, _ := happydns.NewRandomIdentifier()
	target := happydns.CheckTarget{UserId: userID.String(), DomainId: "d1"}

	// Snapshot with no observation at all.
	snapshot := &happydns.ObservationSnapshot{
		Target:      target,
		CollectedAt: time.Now(),
		Data:        map[happydns.ObservationKey]json.RawMessage{},
	}
	err := ms.CreateSnapshot(snapshot)
	if err != nil {
		t.Fatalf("CreateSnapshot() error: %v", err)
	}

	evaluation := &happydns.CheckEvaluation{
		CheckerID:  "status_test_checker",
		Target:     target,
		SnapshotID: snapshot.Id,
	}
	err = ms.CreateEvaluation(evaluation)
	if err != nil {
		t.Fatalf("CreateEvaluation() error: %v", err)
	}

	execution := &happydns.Execution{
		CheckerID:    "status_test_checker",
		Target:       target,
		Status:       happydns.ExecutionDone,
		EvaluationID: &evaluation.Id,
	}
	err = ms.CreateExecution(execution)
	if err != nil {
		t.Fatalf("CreateExecution() error: %v", err)
	}

	if _, lookupErr := uc.GetSnapshotByExecution(target, execution.Id, "nonexistent_key"); lookupErr == nil {
		t.Fatal("expected error for nonexistent observation key")
	}
}
// TestCheckStatusUsecase_GetSnapshotByExecution_ScopeMismatch checks that an
// execution created for one user cannot be read through a scope naming a
// different user.
//
// NOTE(review): the execution has no EvaluationID, so the returned error may
// come from something other than the scope check; confirm against
// GetObservationsByExecution.
func TestCheckStatusUsecase_GetSnapshotByExecution_ScopeMismatch(t *testing.T) {
	uc, _, ms := setupStatusUC(t)

	ownerID, _ := happydns.NewRandomIdentifier()
	otherID, _ := happydns.NewRandomIdentifier()

	execution := &happydns.Execution{
		CheckerID: "status_test_checker",
		Target:    happydns.CheckTarget{UserId: ownerID.String(), DomainId: "d1"},
		Status:    happydns.ExecutionDone,
	}
	if createErr := ms.CreateExecution(execution); createErr != nil {
		t.Fatalf("CreateExecution() error: %v", createErr)
	}

	foreignScope := happydns.CheckTarget{UserId: otherID.String()}
	if _, err := uc.GetSnapshotByExecution(foreignScope, execution.Id, "any_key"); err == nil {
		t.Fatal("expected error when scope doesn't match")
	}
}

View file

@ -23,6 +23,7 @@ package checker
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
@ -37,6 +38,7 @@ type checkerEngine struct {
evalStore CheckEvaluationStorage
execStore ExecutionStorage
snapStore ObservationSnapshotStorage
cacheStore ObservationCacheStorage
}
// NewCheckerEngine creates a new CheckerEngine implementation.
@ -45,12 +47,14 @@ func NewCheckerEngine(
evalStore CheckEvaluationStorage,
execStore ExecutionStorage,
snapStore ObservationSnapshotStorage,
cacheStore ObservationCacheStorage,
) happydns.CheckerEngine {
return &checkerEngine{
optionsUC: optionsUC,
evalStore: evalStore,
execStore: execStore,
snapStore: snapStore,
optionsUC: optionsUC,
evalStore: evalStore,
execStore: execStore,
snapStore: snapStore,
cacheStore: cacheStore,
}
}
@ -141,8 +145,35 @@ func (e *checkerEngine) runPipeline(ctx context.Context, def *happydns.CheckerDe
return happydns.CheckState{}, nil, fmt.Errorf("resolving options: %w", err)
}
// Build observation cache lookup for cross-checker reuse.
var cacheLookup checkerPkg.ObservationCacheLookup
if e.cacheStore != nil {
cacheLookup = func(target happydns.CheckTarget, key happydns.ObservationKey) (json.RawMessage, time.Time, error) {
entry, err := e.cacheStore.GetCachedObservation(target, key)
if err != nil {
return nil, time.Time{}, err
}
snap, err := e.snapStore.GetSnapshot(entry.SnapshotID)
if err != nil {
return nil, time.Time{}, err
}
raw, ok := snap.Data[key]
if !ok {
return nil, time.Time{}, fmt.Errorf("observation %q not in snapshot", key)
}
return raw, entry.CollectedAt, nil
}
}
var freshness time.Duration
if plan != nil && plan.Interval != nil {
freshness = *plan.Interval
} else if plan != nil && def.Interval != nil {
freshness = def.Interval.Default
}
// Create observation context for lazy data collection.
obsCtx := checkerPkg.NewObservationContext(target, mergedOpts)
obsCtx := checkerPkg.NewObservationContext(target, mergedOpts, cacheLookup, freshness)
// Evaluate all rules, skipping disabled ones.
states := make([]happydns.CheckState, 0, len(def.Rules))
@ -174,6 +205,18 @@ func (e *checkerEngine) runPipeline(ctx context.Context, def *happydns.CheckerDe
return happydns.CheckState{}, nil, fmt.Errorf("creating snapshot: %w", err)
}
// Update observation cache pointers for cross-checker reuse.
if e.cacheStore != nil {
for key := range snap.Data {
if err := e.cacheStore.PutCachedObservation(target, key, &happydns.ObservationCacheEntry{
SnapshotID: snap.Id,
CollectedAt: snap.CollectedAt,
}); err != nil {
log.Printf("warning: failed to cache observation %q for target %s: %v", key, target.String(), err)
}
}
}
// Persist evaluation.
eval := &happydns.CheckEvaluation{
PlanID: planID,

View file

@ -52,8 +52,8 @@ func (r *testCheckRule) Name() string { return r.name }
func (r *testCheckRule) Description() string { return "test rule: " + r.name }
func (r *testCheckRule) Evaluate(ctx context.Context, obs happydns.ObservationGetter, opts happydns.CheckerOptions) happydns.CheckState {
_, err := obs.Get(ctx, "test_obs")
if err != nil {
var data map[string]any
if err := obs.Get(ctx, "test_obs", &data); err != nil {
return happydns.CheckState{Status: happydns.StatusError, Message: err.Error()}
}
return happydns.CheckState{Status: r.status, Message: r.name + " passed", Code: r.name}
@ -79,7 +79,7 @@ func TestCheckerEngine_RunOK(t *testing.T) {
})
optionsUC := checkerUC.NewCheckerOptionsUsecase(store, nil)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store, store)
uid, _ := happydns.NewRandomIdentifier()
did, _ := happydns.NewRandomIdentifier()
@ -143,7 +143,7 @@ func TestCheckerEngine_RunWarn(t *testing.T) {
})
optionsUC := checkerUC.NewCheckerOptionsUsecase(store, nil)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store, store)
uid, _ := happydns.NewRandomIdentifier()
did, _ := happydns.NewRandomIdentifier()
@ -188,7 +188,7 @@ func TestCheckerEngine_RunPerRuleDisable(t *testing.T) {
})
optionsUC := checkerUC.NewCheckerOptionsUsecase(store, nil)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store, store)
uid, _ := happydns.NewRandomIdentifier()
did, _ := happydns.NewRandomIdentifier()
@ -278,7 +278,7 @@ func TestCheckerEngine_RunNotFound(t *testing.T) {
t.Fatalf("Instantiate() returned error: %v", err)
}
optionsUC := checkerUC.NewCheckerOptionsUsecase(store, nil)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store)
engine := checkerUC.NewCheckerEngine(optionsUC, store, store, store, store)
uid, _ := happydns.NewRandomIdentifier()
target := happydns.CheckTarget{UserId: uid.String()}
@ -388,3 +388,60 @@ func TestCheckerEngine_RunExecution_CheckerDisappeared(t *testing.T) {
t.Errorf("expected execution status Failed, got %d", persisted.Status)
}
}
// TestCheckerEngine_RunPopulatesObservationCache runs a checker whose only rule
// reads the "test_obs" observation, then checks that a cache entry exists for
// that key and that it points to a stored snapshot containing the observation.
func TestCheckerEngine_RunPopulatesObservationCache(t *testing.T) {
	store, err := inmemory.Instantiate()
	if err != nil {
		t.Fatalf("Instantiate() returned error: %v", err)
	}

	checker.RegisterObservationProvider(&testObservationProvider{})

	definition := &happydns.CheckerDefinition{
		ID:           "test_checker_cache",
		Name:         "Test Checker Cache",
		Availability: happydns.CheckerAvailability{ApplyToDomain: true},
		Rules: []happydns.CheckRule{
			&testCheckRule{name: "rule_cache", status: happydns.StatusOK},
		},
	}
	checker.RegisterChecker(definition)

	engine := checkerUC.NewCheckerEngine(
		checkerUC.NewCheckerOptionsUsecase(store, nil),
		store, store, store, store,
	)

	userID, _ := happydns.NewRandomIdentifier()
	domainID, _ := happydns.NewRandomIdentifier()
	target := happydns.CheckTarget{UserId: userID.String(), DomainId: domainID.String()}

	execution, err := engine.CreateExecution("test_checker_cache", target, nil)
	if err != nil {
		t.Fatalf("CreateExecution() returned error: %v", err)
	}

	if _, err = engine.RunExecution(context.Background(), execution, nil, nil); err != nil {
		t.Fatalf("RunExecution() returned error: %v", err)
	}

	// The run must have left a cache pointer for the "test_obs" key.
	cached, err := store.GetCachedObservation(target, "test_obs")
	if err != nil {
		t.Fatalf("GetCachedObservation() returned error: %v", err)
	}
	if cached.SnapshotID.IsEmpty() {
		t.Error("expected non-empty snapshot ID in cache entry")
	}
	if cached.CollectedAt.IsZero() {
		t.Error("expected non-zero CollectedAt in cache entry")
	}

	// The pointer must resolve to a real snapshot that holds the observation.
	snapshot, err := store.GetSnapshot(cached.SnapshotID)
	if err != nil {
		t.Fatalf("GetSnapshot() returned error: %v", err)
	}
	if _, present := snapshot.Data["test_obs"]; !present {
		t.Error("expected 'test_obs' key in snapshot data")
	}
}

View file

@ -118,3 +118,11 @@ type ObservationSnapshotStorage interface {
DeleteSnapshot(snapID happydns.Identifier) error
ClearSnapshots() error
}
// ObservationCacheStorage provides a lightweight cache mapping (target, observation key)
// to the snapshot that holds the most recent data.
type ObservationCacheStorage interface {
// ListAllCachedObservations returns an iterator over the stored cache entries.
ListAllCachedObservations() (happydns.Iterator[happydns.ObservationCacheEntry], error)
// GetCachedObservation returns the cache entry recorded for the given target
// and observation key, or an error when it cannot be retrieved.
GetCachedObservation(target happydns.CheckTarget, key happydns.ObservationKey) (*happydns.ObservationCacheEntry, error)
// PutCachedObservation records entry as the cache entry for the given target
// and observation key (presumably replacing any previous one; confirm per
// implementation).
PutCachedObservation(target happydns.CheckTarget, key happydns.ObservationKey, entry *happydns.ObservationCacheEntry) error
}

View file

@ -54,6 +54,7 @@ func (tu *tidyUpUsecase) TidyAll() error {
tu.TidyExecutions,
tu.TidyCheckEvaluations,
tu.TidySnapshots,
tu.TidyObservationCache,
} {
if err := tidy(); err != nil {
return err
@ -302,6 +303,27 @@ func (tu *tidyUpUsecase) TidyExecutions() error {
return tu.store.TidyExecutionIndexes()
}
// TidyObservationCache walks every observation cache entry and drops those
// whose referenced snapshot no longer exists.
//
// It returns the error from listing the entries, from dropping an entry, or
// from the iterator itself.
//
// NOTE(review): a GetSnapshot error other than happydns.ErrSnapshotNotFound is
// ignored and the entry is kept.
func (tu *tidyUpUsecase) TidyObservationCache() error {
	iter, err := tu.store.ListAllCachedObservations()
	if err != nil {
		return err
	}
	defer iter.Close()

	for iter.Next() {
		cached := iter.Item()

		_, lookupErr := tu.store.GetSnapshot(cached.SnapshotID)
		if !errors.Is(lookupErr, happydns.ErrSnapshotNotFound) {
			continue
		}

		log.Printf("Deleting stale observation cache entry (snapshot %s not found)\n", cached.SnapshotID.String())
		if dropErr := iter.DropItem(); dropErr != nil {
			return dropErr
		}
	}

	return iter.Err()
}
func (tu *tidyUpUsecase) TidyDomains() error {
iter, err := tu.store.ListAllDomains()
if err != nil {

View file

@ -0,0 +1,108 @@
// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2026 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package usecase_test
import (
"encoding/json"
"testing"
"time"
"git.happydns.org/happyDomain/internal/storage/inmemory"
"git.happydns.org/happyDomain/internal/usecase"
"git.happydns.org/happyDomain/model"
)
// TestTidyObservationCache_RemovesStaleEntries seeds the cache with one entry
// pointing to an existing snapshot and one pointing to an unknown snapshot ID,
// then checks that tidying keeps the former and removes the latter.
func TestTidyObservationCache_RemovesStaleEntries(t *testing.T) {
	store, err := inmemory.Instantiate()
	if err != nil {
		t.Fatalf("Instantiate() returned error: %v", err)
	}

	userID, _ := happydns.NewRandomIdentifier()
	target := happydns.CheckTarget{UserId: userID.String(), DomainId: "d1"}

	// Live entry: backed by a snapshot that really exists in the store.
	snapshot := &happydns.ObservationSnapshot{
		Target:      target,
		CollectedAt: time.Now(),
		Data: map[happydns.ObservationKey]json.RawMessage{
			"obs_a": json.RawMessage(`{"x":1}`),
		},
	}
	if err = store.CreateSnapshot(snapshot); err != nil {
		t.Fatalf("CreateSnapshot() error: %v", err)
	}

	live := &happydns.ObservationCacheEntry{
		SnapshotID:  snapshot.Id,
		CollectedAt: snapshot.CollectedAt,
	}
	if err = store.PutCachedObservation(target, "obs_a", live); err != nil {
		t.Fatalf("PutCachedObservation() error: %v", err)
	}

	// Dangling entry: its snapshot ID was never stored.
	missingID, _ := happydns.NewRandomIdentifier()
	dangling := &happydns.ObservationCacheEntry{
		SnapshotID:  missingID,
		CollectedAt: time.Now().Add(-time.Hour),
	}
	if err = store.PutCachedObservation(target, "obs_stale", dangling); err != nil {
		t.Fatalf("PutCachedObservation() error: %v", err)
	}

	// Sanity check: both entries are readable before tidying.
	if _, err = store.GetCachedObservation(target, "obs_a"); err != nil {
		t.Fatalf("expected valid cache entry to exist: %v", err)
	}
	if _, err = store.GetCachedObservation(target, "obs_stale"); err != nil {
		t.Fatalf("expected stale cache entry to exist: %v", err)
	}

	tidier := usecase.NewTidyUpUsecase(store)
	if err = tidier.TidyObservationCache(); err != nil {
		t.Fatalf("TidyObservationCache() error: %v", err)
	}

	// The live entry survives.
	if _, err = store.GetCachedObservation(target, "obs_a"); err != nil {
		t.Errorf("expected valid cache entry to survive tidy: %v", err)
	}

	// The dangling entry is gone.
	if _, err = store.GetCachedObservation(target, "obs_stale"); err == nil {
		t.Error("expected stale cache entry to be removed by tidy")
	}
}
// TestTidyObservationCache_EmptyCache checks that tidying a freshly
// instantiated store, with no cache entries, succeeds.
func TestTidyObservationCache_EmptyCache(t *testing.T) {
	store, err := inmemory.Instantiate()
	if err != nil {
		t.Fatalf("Instantiate() returned error: %v", err)
	}

	tidyErr := usecase.NewTidyUpUsecase(store).TidyObservationCache()
	if tidyErr != nil {
		t.Fatalf("TidyObservationCache() on empty cache error: %v", tidyErr)
	}
}

View file

@ -30,6 +30,7 @@ type TidyUpUseCase interface {
TidyCheckPlans() error
TidyCheckerConfigurations() error
TidyExecutions() error
TidyObservationCache() error
TidySnapshots() error
TidyDomains() error
TidyDomainLogs() error