happyDomain/internal/usecase/checker/checker_engine.go
Pierre-Olivier Mercier f17b046e1b checker: add execution callback for notification integration
Add ExecutionCallbackSetter interface and onComplete field to the
checker engine. After a successful execution, the callback is fired
asynchronously so it never blocks the checker pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 08:49:29 +07:00

257 lines
8.5 KiB
Go

// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2025 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package checker
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
checkerPkg "git.happydns.org/happyDomain/internal/checker"
"git.happydns.org/happyDomain/model"
)
// ExecutionCallbackSetter is implemented by checker engines that support
// notification callbacks after execution completion.
type ExecutionCallbackSetter interface {
SetExecutionCallback(func(*happydns.Execution, *happydns.CheckEvaluation))
}
// checkerEngine implements the happydns.CheckerEngine interface.
type checkerEngine struct {
optionsUC *CheckerOptionsUsecase
evalStore CheckEvaluationStorage
execStore ExecutionStorage
snapStore ObservationSnapshotStorage
cacheStore ObservationCacheStorage
onComplete func(exec *happydns.Execution, eval *happydns.CheckEvaluation)
}
// SetExecutionCallback registers a callback invoked after each successful execution.
func (e *checkerEngine) SetExecutionCallback(cb func(*happydns.Execution, *happydns.CheckEvaluation)) {
e.onComplete = cb
}
// NewCheckerEngine creates a new CheckerEngine implementation.
func NewCheckerEngine(
optionsUC *CheckerOptionsUsecase,
evalStore CheckEvaluationStorage,
execStore ExecutionStorage,
snapStore ObservationSnapshotStorage,
cacheStore ObservationCacheStorage,
) happydns.CheckerEngine {
return &checkerEngine{
optionsUC: optionsUC,
evalStore: evalStore,
execStore: execStore,
snapStore: snapStore,
cacheStore: cacheStore,
}
}
// CreateExecution validates the checker and creates a pending Execution record.
func (e *checkerEngine) CreateExecution(checkerID string, target happydns.CheckTarget, plan *happydns.CheckPlan) (*happydns.Execution, error) {
if checkerPkg.FindChecker(checkerID) == nil {
return nil, fmt.Errorf("%w: %s", happydns.ErrCheckerNotFound, checkerID)
}
// Determine trigger info.
trigger := happydns.TriggerInfo{Type: happydns.TriggerManual}
var planID *happydns.Identifier
if plan != nil {
planID = &plan.Id
trigger.PlanID = planID
trigger.Type = happydns.TriggerSchedule
}
// Create execution record.
exec := &happydns.Execution{
CheckerID: checkerID,
PlanID: planID,
Target: target,
Trigger: trigger,
StartedAt: time.Now(),
Status: happydns.ExecutionPending,
}
if err := e.execStore.CreateExecution(exec); err != nil {
return nil, fmt.Errorf("creating execution: %w", err)
}
return exec, nil
}
// RunExecution takes an existing execution and runs the checker pipeline.
func (e *checkerEngine) RunExecution(ctx context.Context, exec *happydns.Execution, plan *happydns.CheckPlan, runOpts happydns.CheckerOptions) (*happydns.CheckEvaluation, error) {
log.Printf("CheckerEngine: running checker %s on %s", exec.CheckerID, exec.Target.String())
def := checkerPkg.FindChecker(exec.CheckerID)
if def == nil {
endTime := time.Now()
exec.Status = happydns.ExecutionFailed
exec.EndedAt = &endTime
exec.Error = fmt.Sprintf("checker not found: %s", exec.CheckerID)
if err := e.execStore.UpdateExecution(exec); err != nil {
log.Printf("CheckerEngine: failed to update execution: %v", err)
}
return nil, fmt.Errorf("%w: %s", happydns.ErrCheckerNotFound, exec.CheckerID)
}
// Mark as running.
exec.Status = happydns.ExecutionRunning
if err := e.execStore.UpdateExecution(exec); err != nil {
log.Printf("CheckerEngine: failed to update execution: %v", err)
}
// Run the pipeline and handle failure.
result, eval, err := e.runPipeline(ctx, def, exec.Target, plan, exec.PlanID, runOpts)
if err != nil {
log.Printf("CheckerEngine: checker %s on %s failed: %v", exec.CheckerID, exec.Target.String(), err)
endTime := time.Now()
exec.Status = happydns.ExecutionFailed
exec.EndedAt = &endTime
exec.Error = err.Error()
if err := e.execStore.UpdateExecution(exec); err != nil {
log.Printf("CheckerEngine: failed to update execution: %v", err)
}
return nil, err
}
// Mark as done.
endTime := time.Now()
exec.Status = happydns.ExecutionDone
exec.EndedAt = &endTime
exec.Result = result
exec.EvaluationID = &eval.Id
if err := e.execStore.UpdateExecution(exec); err != nil {
log.Printf("CheckerEngine: failed to update execution: %v", err)
}
// Fire notification callback asynchronously.
if e.onComplete != nil {
go e.onComplete(exec, eval)
}
return eval, nil
}
func (e *checkerEngine) runPipeline(ctx context.Context, def *happydns.CheckerDefinition, target happydns.CheckTarget, plan *happydns.CheckPlan, planID *happydns.Identifier, runOpts happydns.CheckerOptions) (happydns.CheckState, *happydns.CheckEvaluation, error) {
// Resolve options (stored + run + auto-fill).
mergedOpts, err := e.optionsUC.BuildMergedCheckerOptionsWithAutoFill(def.ID, happydns.TargetIdentifier(target.UserId), happydns.TargetIdentifier(target.DomainId), happydns.TargetIdentifier(target.ServiceId), runOpts)
if err != nil {
return happydns.CheckState{}, nil, fmt.Errorf("resolving options: %w", err)
}
// Build observation cache lookup for cross-checker reuse.
var cacheLookup checkerPkg.ObservationCacheLookup
if e.cacheStore != nil {
cacheLookup = func(target happydns.CheckTarget, key happydns.ObservationKey) (json.RawMessage, time.Time, error) {
entry, err := e.cacheStore.GetCachedObservation(target, key)
if err != nil {
return nil, time.Time{}, err
}
snap, err := e.snapStore.GetSnapshot(entry.SnapshotID)
if err != nil {
return nil, time.Time{}, err
}
raw, ok := snap.Data[key]
if !ok {
return nil, time.Time{}, fmt.Errorf("observation %q not in snapshot", key)
}
return raw, entry.CollectedAt, nil
}
}
var freshness time.Duration
if plan != nil && plan.Interval != nil {
freshness = *plan.Interval
} else if plan != nil && def.Interval != nil {
freshness = def.Interval.Default
}
// Create observation context for lazy data collection.
obsCtx := checkerPkg.NewObservationContext(target, mergedOpts, cacheLookup, freshness)
// If an endpoint is configured, override observation providers with HTTP transport.
if endpoint, ok := mergedOpts["endpoint"].(string); ok && endpoint != "" {
for _, key := range def.ObservationKeys {
obsCtx.SetProviderOverride(key, checkerPkg.NewHTTPObservationProvider(key, endpoint))
}
}
// Evaluate all rules, skipping disabled ones.
states := make([]happydns.CheckState, 0, len(def.Rules))
for _, rule := range def.Rules {
if plan != nil && !plan.IsRuleEnabled(rule.Name()) {
continue
}
state := rule.Evaluate(ctx, obsCtx, mergedOpts)
if state.Code == "" {
state.Code = rule.Name()
}
states = append(states, state)
}
// Aggregate results.
aggregator := def.Aggregator
if aggregator == nil {
aggregator = checkerPkg.WorstStatusAggregator{}
}
result := aggregator.Aggregate(states)
// Persist observation snapshot.
snap := &happydns.ObservationSnapshot{
Target: target,
CollectedAt: time.Now(),
Data: obsCtx.Data(),
}
if err := e.snapStore.CreateSnapshot(snap); err != nil {
return happydns.CheckState{}, nil, fmt.Errorf("creating snapshot: %w", err)
}
// Update observation cache pointers for cross-checker reuse.
if e.cacheStore != nil {
for key := range snap.Data {
_ = e.cacheStore.PutCachedObservation(target, key, &happydns.ObservationCacheEntry{
SnapshotID: snap.Id,
CollectedAt: snap.CollectedAt,
})
}
}
// Persist evaluation.
eval := &happydns.CheckEvaluation{
PlanID: planID,
CheckerID: def.ID,
Target: target,
SnapshotID: snap.Id,
EvaluatedAt: time.Now(),
States: states,
}
if err := e.evalStore.CreateEvaluation(eval); err != nil {
return happydns.CheckState{}, nil, fmt.Errorf("creating evaluation: %w", err)
}
return result, eval, nil
}