From e3d8eed1a6f9d49f7b863b83bfcef3d168bf4390 Mon Sep 17 00:00:00 2001
From: Pierre-Olivier Mercier
Date: Wed, 8 Apr 2026 11:08:18 +0700
Subject: [PATCH] Instrument check scheduler with Prometheus metrics

Track queue depth on enqueue and pop, active worker count, check
execution duration per checker, and check result status counters.
---
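Note: the register/unregister lifecycle for the queue-depth accessor
(installed in NewScheduler, dropped in Stop) is easy to pin down with a
test. A minimal sketch, assuming prometheus/client_golang's testutil
package, which this patch does not otherwise touch; the test name and the
42 placeholder are illustrative only:

    package metrics_test

    import (
        "testing"

        "github.com/prometheus/client_golang/prometheus/testutil"

        "git.happydns.org/happyDomain/internal/metrics"
    )

    func TestQueueDepthAccessorLifecycle(t *testing.T) {
        // Simulate what NewScheduler does at construction.
        metrics.RegisterSchedulerQueueDepth(func() float64 { return 42 })
        if got := testutil.ToFloat64(metrics.SchedulerQueueDepth); got != 42 {
            t.Fatalf("queue depth = %v, want 42", got)
        }

        // Simulate what Stop does at shutdown; scrapes fall back to 0.
        metrics.RegisterSchedulerQueueDepth(nil)
        if got := testutil.ToFloat64(metrics.SchedulerQueueDepth); got != 0 {
            t.Fatalf("queue depth after unregister = %v, want 0", got)
        }
    }
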
 internal/metrics/metrics.go           | 31 ++++++++++++++++++++++++++-
 internal/usecase/checker/scheduler.go | 30 +++++++++++++++++++++++++-
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 5e17c1fb..a3d5e1e3 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -23,6 +23,7 @@ package metrics

 import (
 	"strconv"
+	"sync/atomic"

 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -47,9 +48,24 @@ var (
 	})

 	// Scheduler metrics
-	SchedulerQueueDepth = promauto.NewGauge(prometheus.GaugeOpts{
+	//
+	// schedulerQueueDepthFn is consulted at scrape time by the GaugeFunc
+	// registered below. The scheduler installs its accessor via
+	// RegisterSchedulerQueueDepth at construction, which avoids sprinkling
+	// gauge.Set calls across every queue mutation site.
+	schedulerQueueDepthFn atomic.Pointer[func() float64]
+
+	// SchedulerQueueDepth is kept as a package-level var (rather than the
+	// blank identifier) so it is discoverable via grep alongside the other
+	// metric vars and easy to reference from tests.
+	SchedulerQueueDepth = promauto.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "happydomain_scheduler_queue_depth",
 		Help: "Number of items currently in the check scheduler queue.",
+	}, func() float64 {
+		if fn := schedulerQueueDepthFn.Load(); fn != nil {
+			return (*fn)()
+		}
+		return 0
 	})

 	SchedulerActiveWorkers = promauto.NewGauge(prometheus.GaugeOpts{
@@ -106,3 +122,16 @@
 func SetBuildInfo(version, revision, buildDate string, dirty bool) {
 	BuildInfo.WithLabelValues(version, revision, strconv.FormatBool(dirty), buildDate).Set(1)
 }
+
+// RegisterSchedulerQueueDepth installs the accessor used at scrape time to
+// report the current scheduler queue depth. The function is invoked from the
+// Prometheus scrape goroutine, so it must be safe to call concurrently with
+// queue mutations and must not block for long. Passing nil unregisters the
+// accessor (the gauge will then report 0).
+func RegisterSchedulerQueueDepth(fn func() float64) {
+	if fn == nil {
+		schedulerQueueDepthFn.Store(nil)
+		return
+	}
+	schedulerQueueDepthFn.Store(&fn)
+}
diff --git a/internal/usecase/checker/scheduler.go b/internal/usecase/checker/scheduler.go
index 145f00c3..9ed86443 100644
--- a/internal/usecase/checker/scheduler.go
+++ b/internal/usecase/checker/scheduler.go
@@ -31,6 +31,7 @@ import (
 	"time"

 	checkerPkg "git.happydns.org/happyDomain/internal/checker"
+	"git.happydns.org/happyDomain/internal/metrics"
 	"git.happydns.org/happyDomain/model"
 )

@@ -129,7 +130,7 @@ func NewScheduler(engine happydns.CheckerEngine, maxConcurrency int, planStore C
 	if maxConcurrency <= 0 {
 		maxConcurrency = 1
 	}
-	return &Scheduler{
+	s := &Scheduler{
 		engine:         engine,
 		planStore:      planStore,
 		domainStore:    domainStore,
 		...
 		wake:           make(chan struct{}, 1),
 		maxConcurrency: maxConcurrency,
 	}
+	// The scheduler queue depth is exposed via a Prometheus GaugeFunc that
+	// reads the live queue length at scrape time. This avoids having to call
+	// gauge.Set after every queue mutation site (Push/Pop/Init/buildQueue/…).
+	metrics.RegisterSchedulerQueueDepth(s.queueDepthForMetrics)
+	return s
+}
+
+// queueDepthForMetrics returns the current queue length under the read lock.
+// It is invoked from the Prometheus scrape goroutine.
+func (s *Scheduler) queueDepthForMetrics() float64 {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return float64(s.queue.Len())
 }

 // Start begins the scheduler loop in a goroutine.
@@ -168,6 +182,10 @@ func (s *Scheduler) Stop() {
 	if done != nil {
 		<-done
 	}
+	// Drop the queue-depth accessor so a stopped scheduler does not keep its
+	// closure (and the captured queue) reachable for the lifetime of the
+	// process. This is essential in tests that spin schedulers up and down.
+	metrics.RegisterSchedulerQueueDepth(nil)
 }

 // GetStatus returns a snapshot of the scheduler's current state.
@@ -319,16 +337,26 @@ func (s *Scheduler) run(ctx context.Context) {
 			s.wg.Add(1)
 			go func(j *SchedulerJob, p *happydns.CheckPlan) {
 				defer func() { <-sem; s.wg.Done() }()
+				metrics.SchedulerActiveWorkers.Inc()
+				checkStart := time.Now()
+				defer func() {
+					metrics.SchedulerActiveWorkers.Dec()
+					metrics.SchedulerCheckDuration.WithLabelValues(j.CheckerID).Observe(time.Since(checkStart).Seconds())
+				}()
 				log.Printf("Scheduler: running checker %s on %s", j.CheckerID, j.Target.String())
 				exec, err := s.engine.CreateExecution(j.CheckerID, j.Target, p)
 				if err != nil {
+					metrics.SchedulerChecksTotal.WithLabelValues(j.CheckerID, "error").Inc()
 					log.Printf("Scheduler: checker %s on %s failed to create execution: %v", j.CheckerID, j.Target.String(), err)
 					return
 				}
 				_, err = s.engine.RunExecution(ctx, exec, p, nil)
+				status := "success"
 				if err != nil {
+					status = "error"
 					log.Printf("Scheduler: checker %s on %s failed: %v", j.CheckerID, j.Target.String(), err)
 				}
+				metrics.SchedulerChecksTotal.WithLabelValues(j.CheckerID, status).Inc()
 				if s.stateStore != nil {
 					if err := s.stateStore.SetLastSchedulerRun(time.Now()); err != nil {
 						log.Printf("Scheduler: failed to persist last run time: %v", err)
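
For reference, the scrape-time indirection added to metrics.go reduces to the
following self-contained pattern. Names here are illustrative, not part of the
patch, and plain prometheus is used rather than promauto so the example
carries no registry side effects:

    package main

    import (
        "fmt"
        "sync/atomic"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    // depthFn plays the role of schedulerQueueDepthFn.
    var depthFn atomic.Pointer[func() float64]

    // queueDepth plays the role of SchedulerQueueDepth: its function is only
    // evaluated when the metric is collected.
    var queueDepth = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "example_queue_depth",
        Help: "Queue depth read lazily at scrape time.",
    }, func() float64 {
        if fn := depthFn.Load(); fn != nil {
            return (*fn)()
        }
        return 0
    })

    func main() {
        fmt.Println(testutil.ToFloat64(queueDepth)) // 0: no accessor installed

        fn := func() float64 { return 3 }
        depthFn.Store(&fn) // register, as NewScheduler does
        fmt.Println(testutil.ToFloat64(queueDepth)) // 3: read live at collect time

        depthFn.Store(nil) // unregister, as Stop does
        fmt.Println(testutil.ToFloat64(queueDepth)) // 0 again
    }

Storing a *func() float64 through atomic.Pointer keeps the scrape path free of
locks: Load in the GaugeFunc and Store in RegisterSchedulerQueueDepth never
block each other.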