Instrument check scheduler with Prometheus metrics

Track queue depth on enqueue and pop, active worker count, check execution
duration per checker, and check result status counters.
This commit is contained in:
nemunaire 2026-04-08 11:08:18 +07:00
commit e3d8eed1a6
2 changed files with 59 additions and 2 deletions

View file

@ -23,6 +23,7 @@ package metrics
import (
"strconv"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -47,9 +48,24 @@ var (
})
// Scheduler metrics
SchedulerQueueDepth = promauto.NewGauge(prometheus.GaugeOpts{
//
// schedulerQueueDepthFn is consulted at scrape time by the GaugeFunc
// registered below. The scheduler installs its accessor via
// RegisterSchedulerQueueDepth at construction, which avoids sprinkling
// gauge.Set calls across every queue mutation site.
schedulerQueueDepthFn atomic.Pointer[func() float64]
// SchedulerQueueDepth is kept as a package-level var (rather than the
// blank identifier) so it is discoverable via grep alongside the other
// metric vars and easy to reference from tests.
SchedulerQueueDepth = promauto.NewGaugeFunc(prometheus.GaugeOpts{
Name: "happydomain_scheduler_queue_depth",
Help: "Number of items currently in the check scheduler queue.",
}, func() float64 {
if fn := schedulerQueueDepthFn.Load(); fn != nil {
return (*fn)()
}
return 0
})
SchedulerActiveWorkers = promauto.NewGauge(prometheus.GaugeOpts{
@ -106,3 +122,16 @@ var (
// SetBuildInfo records the build metadata on the BuildInfo metric. The
// series value is always 1; the useful information is carried entirely in
// the labels, so it can be joined against other series at query time.
func SetBuildInfo(version, revision, buildDate string, dirty bool) {
	dirtyLabel := strconv.FormatBool(dirty)
	BuildInfo.WithLabelValues(version, revision, dirtyLabel, buildDate).Set(1)
}
// RegisterSchedulerQueueDepth installs the accessor used at scrape time to
// report the current scheduler queue depth. The function is invoked from the
// Prometheus scrape goroutine, so it must be safe to call concurrently with
// queue mutations and must not block for long. Passing nil unregisters the
// accessor (the gauge will then report 0).
//
// NOTE(review): registration is process-global — if two scheduler instances
// ever coexist, the later registration wins and stopping either one clears
// it. Confirm the single-scheduler assumption holds.
func RegisterSchedulerQueueDepth(fn func() float64) {
	var p *func() float64
	if fn != nil {
		p = &fn
	}
	schedulerQueueDepthFn.Store(p)
}

View file

@ -31,6 +31,7 @@ import (
"time"
checkerPkg "git.happydns.org/happyDomain/internal/checker"
"git.happydns.org/happyDomain/internal/metrics"
"git.happydns.org/happyDomain/model"
)
@ -129,7 +130,7 @@ func NewScheduler(engine happydns.CheckerEngine, maxConcurrency int, planStore C
if maxConcurrency <= 0 {
maxConcurrency = 1
}
return &Scheduler{
s := &Scheduler{
engine: engine,
planStore: planStore,
domainStore: domainStore,
@ -139,6 +140,19 @@ func NewScheduler(engine happydns.CheckerEngine, maxConcurrency int, planStore C
wake: make(chan struct{}, 1),
maxConcurrency: maxConcurrency,
}
// The scheduler queue depth is exposed via a Prometheus GaugeFunc that
// reads the live queue length at scrape time. This avoids having to call
// gauge.Set after every queue mutation site (Push/Pop/Init/buildQueue/…).
metrics.RegisterSchedulerQueueDepth(s.queueDepthForMetrics)
return s
}
// queueDepthForMetrics reports the number of jobs currently queued. The
// length is read under the scheduler's read lock, so it is safe to call
// from the Prometheus scrape goroutine concurrently with queue mutations.
func (s *Scheduler) queueDepthForMetrics() float64 {
	s.mu.RLock()
	depth := s.queue.Len()
	s.mu.RUnlock()
	return float64(depth)
}
// Start begins the scheduler loop in a goroutine.
@ -168,6 +182,10 @@ func (s *Scheduler) Stop() {
if done != nil {
<-done
}
// Drop the queue-depth accessor so a stopped scheduler does not keep its
// closure (and the captured queue) reachable for the lifetime of the
// process. This is essential in tests that spin schedulers up and down.
metrics.RegisterSchedulerQueueDepth(nil)
}
// GetStatus returns a snapshot of the scheduler's current state.
@ -319,16 +337,26 @@ func (s *Scheduler) run(ctx context.Context) {
s.wg.Add(1)
go func(j *SchedulerJob, p *happydns.CheckPlan) {
defer func() { <-sem; s.wg.Done() }()
metrics.SchedulerActiveWorkers.Inc()
checkStart := time.Now()
defer func() {
metrics.SchedulerActiveWorkers.Dec()
metrics.SchedulerCheckDuration.WithLabelValues(j.CheckerID).Observe(time.Since(checkStart).Seconds())
}()
log.Printf("Scheduler: running checker %s on %s", j.CheckerID, j.Target.String())
exec, err := s.engine.CreateExecution(j.CheckerID, j.Target, p)
if err != nil {
metrics.SchedulerChecksTotal.WithLabelValues(j.CheckerID, "error").Inc()
log.Printf("Scheduler: checker %s on %s failed to create execution: %v", j.CheckerID, j.Target.String(), err)
return
}
_, err = s.engine.RunExecution(ctx, exec, p, nil)
status := "success"
if err != nil {
status = "error"
log.Printf("Scheduler: checker %s on %s failed: %v", j.CheckerID, j.Target.String(), err)
}
metrics.SchedulerChecksTotal.WithLabelValues(j.CheckerID, status).Inc()
if s.stateStore != nil {
if err := s.stateStore.SetLastSchedulerRun(time.Now()); err != nil {
log.Printf("Scheduler: failed to persist last run time: %v", err)