happyDomain/internal/usecase/checker/scheduler.go
Pierre-Olivier Mercier 88663ed4cc checkers: add incremental scheduler updates on domain/zone changes
Instead of rebuilding the entire scheduler queue, incrementally add or
remove jobs when domains are created/deleted or zones are
imported/published. A wake channel interrupts the run loop so new jobs
are picked up immediately. A jobKeys index prevents duplicate entries.

Hook points: domain creation, domain deletion, zone import, and zone
publish (correction apply) all notify the scheduler via the narrow
SchedulerDomainNotifier interface, wired through setter methods to
avoid initialization ordering issues.
2026-04-05 17:09:54 +07:00
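
As an illustration of the wiring, here is a minimal caller-side sketch. The
SchedulerDomainNotifier name comes from this commit message and its two methods
mirror the Scheduler's NotifyDomainChange/NotifyDomainRemoved below; the
domainService type, the SetSchedulerNotifier setter, and afterDomainCreated are
assumed names for illustration only, not taken from this repository:

type SchedulerDomainNotifier interface {
	NotifyDomainChange(domain *happydns.Domain)
	NotifyDomainRemoved(domainID happydns.Identifier)
}

// domainService stands in for the domain/zone use case (hypothetical).
type domainService struct {
	scheduler SchedulerDomainNotifier // nil until wired in
}

// SetSchedulerNotifier is the kind of setter the commit message describes:
// attaching the scheduler after construction avoids init-order issues.
func (ds *domainService) SetSchedulerNotifier(n SchedulerDomainNotifier) {
	ds.scheduler = n
}

// afterDomainCreated shows one hook point; domain creation, zone import and
// zone publish would all call NotifyDomainChange the same way.
func (ds *domainService) afterDomainCreated(d *happydns.Domain) {
	if ds.scheduler != nil {
		ds.scheduler.NotifyDomainChange(d)
	}
}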

// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2025 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package checker
import (
"container/heap"
"context"
"hash/fnv"
"log"
"slices"
"sort"
"sync"
"time"
checkerPkg "git.happydns.org/happyDomain/internal/checker"
"git.happydns.org/happyDomain/model"
)
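// minSpacing is the gap between catch-up runs after a restart, maxCatchUpWindow
// caps how long catch-up may be stretched, and defaultInterval is the fallback
// when a checker declares no interval of its own.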
const (
minSpacing = 2 * time.Second
maxCatchUpWindow = 10 * time.Minute
defaultInterval = 24 * time.Hour
)
// SchedulerJob represents a single scheduled checker execution.
type SchedulerJob struct {
CheckerID string `json:"checkerID"`
Target happydns.CheckTarget `json:"target"`
PlanID *happydns.Identifier `json:"planID" swaggertype:"string"`
Interval time.Duration `json:"interval" swaggertype:"integer"`
NextRun time.Time `json:"nextRun"`
index int // heap index
}
// SchedulerQueue is a min-heap of SchedulerJobs sorted by NextRun.
type SchedulerQueue []*SchedulerJob
func (q SchedulerQueue) Len() int { return len(q) }
func (q SchedulerQueue) Less(i, j int) bool { return q[i].NextRun.Before(q[j].NextRun) }
func (q SchedulerQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
func (q *SchedulerQueue) Push(x any) {
n := len(*q)
job := x.(*SchedulerJob)
job.index = n
*q = append(*q, job)
}
func (q *SchedulerQueue) Pop() any {
old := *q
n := len(old)
job := old[n-1]
old[n-1] = nil
job.index = -1
*q = old[:n-1]
return job
}
func (q *SchedulerQueue) Peek() *SchedulerJob {
if len(*q) == 0 {
return nil
}
return (*q)[0]
}
// SchedulerStatus holds a snapshot of the scheduler's current state.
type SchedulerStatus struct {
Running bool `json:"running"`
JobCount int `json:"job_count"`
NextJobs []*SchedulerJob `json:"next_jobs,omitempty"`
}
// Scheduler manages periodic execution of checkers.
type Scheduler struct {
queue SchedulerQueue
jobKeys map[string]bool
engine happydns.CheckerEngine
planStore CheckPlanStorage
domainStore DomainLister
zoneStore ZoneGetter
stateStore SchedulerStateStorage
cancel context.CancelFunc
wake chan struct{}
mu sync.RWMutex
running bool
ctx context.Context
maxConcurrency int
}
// NewScheduler creates a new Scheduler.
func NewScheduler(engine happydns.CheckerEngine, maxConcurrency int, planStore CheckPlanStorage, domainStore DomainLister, zoneStore ZoneGetter, stateStore SchedulerStateStorage) *Scheduler {
if maxConcurrency <= 0 {
maxConcurrency = 1
}
return &Scheduler{
engine: engine,
planStore: planStore,
domainStore: domainStore,
zoneStore: zoneStore,
stateStore: stateStore,
jobKeys: make(map[string]bool),
wake: make(chan struct{}, 1),
maxConcurrency: maxConcurrency,
}
}
// Start begins the scheduler loop in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
s.mu.Lock()
// Keep the parent context so SetEnabled can restart from it later; the run
// loop observes the derived, cancellable context.
s.ctx = ctx
runCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.running = true
s.buildQueue()
s.spreadOverdueJobs()
s.mu.Unlock()
go s.run(runCtx)
}
// Stop halts the scheduler.
func (s *Scheduler) Stop() {
s.mu.Lock()
s.running = false
cancel := s.cancel
s.mu.Unlock()
if cancel != nil {
cancel()
}
}
// GetStatus returns a snapshot of the scheduler's current state.
func (s *Scheduler) GetStatus() SchedulerStatus {
s.mu.RLock()
defer s.mu.RUnlock()
status := SchedulerStatus{
Running: s.running,
JobCount: s.queue.Len(),
}
n := min(20, s.queue.Len())
if n > 0 {
all := make([]*SchedulerJob, s.queue.Len())
copy(all, s.queue)
sort.Slice(all, func(i, j int) bool {
return all[i].NextRun.Before(all[j].NextRun)
})
status.NextJobs = all[:n]
}
return status
}
// SetEnabled starts or stops the scheduler.
func (s *Scheduler) SetEnabled(ctx context.Context, enabled bool) error {
s.Stop()
if enabled {
s.mu.Lock()
parentCtx := s.ctx
s.mu.Unlock()
if parentCtx == nil {
parentCtx = ctx
}
s.Start(parentCtx)
}
return nil
}
// RebuildQueue rebuilds the scheduler queue and returns the new job count.
func (s *Scheduler) RebuildQueue() int {
s.mu.Lock()
defer s.mu.Unlock()
s.buildQueue()
s.spreadOverdueJobs()
return s.queue.Len()
}
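// run is the scheduler's main loop. It sleeps until the head of the queue is
// due (or until a wake signal or cancellation arrives), dispatches the job to
// one of maxConcurrency worker slots, then re-queues it for its next cycle.
// When the queue is empty it retries a rebuild every minute.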
func (s *Scheduler) run(ctx context.Context) {
sem := make(chan struct{}, s.maxConcurrency)
for {
s.mu.RLock()
qLen := s.queue.Len()
s.mu.RUnlock()
if qLen == 0 {
select {
case <-ctx.Done():
return
case <-s.wake:
continue
case <-time.After(1 * time.Minute):
s.mu.Lock()
s.buildQueue()
s.mu.Unlock()
continue
}
}
s.mu.RLock()
next := s.queue.Peek()
var delay time.Duration
if next != nil {
delay = time.Until(next.NextRun)
}
s.mu.RUnlock()
if delay > 0 {
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return
case <-s.wake:
timer.Stop()
continue
case <-timer.C:
}
}
s.mu.Lock()
if s.queue.Len() == 0 {
s.mu.Unlock()
continue
}
job := heap.Pop(&s.queue).(*SchedulerJob)
s.mu.Unlock()
// Find plan if applicable.
var plan *happydns.CheckPlan
if job.PlanID != nil {
p, err := s.planStore.GetCheckPlan(*job.PlanID)
if err == nil {
plan = p
}
}
// Acquire a concurrency slot, but stay responsive to cancellation.
select {
case sem <- struct{}{}:
default:
log.Printf("Scheduler: all %d workers busy, waiting for a slot (checker %s on %s)", s.maxConcurrency, job.CheckerID, job.Target.String())
select {
case sem <- struct{}{}:
case <-ctx.Done():
return
}
}
go func(j *SchedulerJob, p *happydns.CheckPlan) {
defer func() { <-sem }()
log.Printf("Scheduler: running checker %s on %s", j.CheckerID, j.Target.String())
exec, err := s.engine.CreateExecution(j.CheckerID, j.Target, p)
if err != nil {
log.Printf("Scheduler: checker %s on %s failed to create execution: %v", j.CheckerID, j.Target.String(), err)
return
}
_, err = s.engine.RunExecution(ctx, exec, p, nil)
if err != nil {
log.Printf("Scheduler: checker %s on %s failed: %v", j.CheckerID, j.Target.String(), err)
}
if s.stateStore != nil {
if err := s.stateStore.SetLastSchedulerRun(time.Now()); err != nil {
log.Printf("Scheduler: failed to persist last run time: %v", err)
}
}
}(job, plan)
// Advance to the next cycle, skipping any cycles that have already elapsed.
now := time.Now()
for job.NextRun.Before(now) {
job.NextRun = job.NextRun.Add(job.Interval)
}
// Add jitter for next cycle.
job.NextRun = job.NextRun.Add(computeJitter(job.CheckerID, job.Target.String(), job.NextRun, job.Interval))
key := job.CheckerID + "|" + job.Target.String()
s.mu.Lock()
heap.Push(&s.queue, job)
s.jobKeys[key] = true
s.mu.Unlock()
}
}
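// buildQueue rebuilds the whole queue from scratch: every domain gets a job per
// domain-scoped checker, and the services of its latest zone get jobs for
// matching service-scoped checkers; fully disabled (checker, target) pairs are
// skipped. Jobs land on their deterministic grid slot relative to the last
// recorded scheduler run. Callers must hold s.mu.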
func (s *Scheduler) buildQueue() {
s.queue = s.queue[:0]
s.jobKeys = make(map[string]bool)
var lastRun time.Time
if s.stateStore != nil {
if t, err := s.stateStore.GetLastSchedulerRun(); err != nil {
log.Printf("Scheduler: failed to read last run time: %v", err)
} else {
lastRun = t
}
}
checkers := checkerPkg.GetCheckers()
plans, err := s.loadAllPlans()
if err != nil {
log.Printf("Scheduler: failed to load plans: %v", err)
}
// Build a set of disabled (checker, target) pairs.
disabledSet := make(map[string]bool)
planMap := make(map[string]*happydns.CheckPlan)
for _, p := range plans {
key := p.CheckerID + "|" + p.Target.String()
planMap[key] = p
if p.IsFullyDisabled() {
disabledSet[key] = true
}
}
// Collect checkers by scope for efficient iteration.
var domainCheckers, serviceCheckers []struct {
id string
def *happydns.CheckerDefinition
}
for checkerID, def := range checkers {
if def.Availability.ApplyToDomain {
domainCheckers = append(domainCheckers, struct {
id string
def *happydns.CheckerDefinition
}{checkerID, def})
}
if def.Availability.ApplyToService {
serviceCheckers = append(serviceCheckers, struct {
id string
def *happydns.CheckerDefinition
}{checkerID, def})
}
}
// Auto-discovery: enumerate all domains and schedule applicable checkers.
domains := s.loadAllDomains()
for _, domain := range domains {
uid := domain.Owner
did := domain.Id
domainTarget := happydns.CheckTarget{UserId: &uid, DomainId: &did}
for _, c := range domainCheckers {
key := c.id + "|" + domainTarget.String()
if disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(c.def, plan)
offset := computeOffset(c.id, domainTarget.String(), interval)
nextRun := computeNextRun(interval, offset, lastRun)
job := &SchedulerJob{
CheckerID: c.id,
Target: domainTarget,
Interval: interval,
NextRun: nextRun,
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
}
// Service-level discovery: load the latest zone and match services.
if len(serviceCheckers) > 0 {
services := s.loadDomainServices(domain)
for _, svc := range services {
sid := svc.Id
svcTarget := happydns.CheckTarget{UserId: &uid, DomainId: &did, ServiceId: &sid, ServiceType: svc.Type}
for _, c := range serviceCheckers {
if len(c.def.Availability.LimitToServices) > 0 && !slices.Contains(c.def.Availability.LimitToServices, svc.Type) {
continue
}
key := c.id + "|" + svcTarget.String()
if disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(c.def, plan)
offset := computeOffset(c.id, svcTarget.String(), interval)
nextRun := computeNextRun(interval, offset, lastRun)
job := &SchedulerJob{
CheckerID: c.id,
Target: svcTarget,
Interval: interval,
NextRun: nextRun,
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
}
}
}
}
}
// NotifyDomainChange incrementally adds scheduler jobs for a domain
// without rebuilding the entire queue. Call this after a domain is
// created or its zone is imported/published.
func (s *Scheduler) NotifyDomainChange(domain *happydns.Domain) {
checkers := checkerPkg.GetCheckers()
// Load plans relevant to this domain.
uid := domain.Owner
did := domain.Id
domainTarget := happydns.CheckTarget{UserId: &uid, DomainId: &did}
plans, err := s.planStore.ListCheckPlansByTarget(domainTarget)
if err != nil {
log.Printf("Scheduler: NotifyDomainChange: failed to load plans: %v", err)
}
disabledSet := make(map[string]bool)
planMap := make(map[string]*happydns.CheckPlan)
for _, p := range plans {
key := p.CheckerID + "|" + p.Target.String()
planMap[key] = p
if p.IsFullyDisabled() {
disabledSet[key] = true
}
}
var added int
s.mu.Lock()
for checkerID, def := range checkers {
if def.Availability.ApplyToDomain {
key := checkerID + "|" + domainTarget.String()
if s.jobKeys[key] || disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(def, plan)
job := &SchedulerJob{
CheckerID: checkerID,
Target: domainTarget,
Interval: interval,
NextRun: time.Now().Add(computeJitter(checkerID, domainTarget.String(), time.Now(), interval)),
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
added++
}
if def.Availability.ApplyToService {
services := s.loadDomainServices(domain)
for _, svc := range services {
if len(def.Availability.LimitToServices) > 0 && !slices.Contains(def.Availability.LimitToServices, svc.Type) {
continue
}
sid := svc.Id
svcTarget := happydns.CheckTarget{UserId: &uid, DomainId: &did, ServiceId: &sid, ServiceType: svc.Type}
key := checkerID + "|" + svcTarget.String()
if s.jobKeys[key] || disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(def, plan)
job := &SchedulerJob{
CheckerID: checkerID,
Target: svcTarget,
Interval: interval,
NextRun: time.Now().Add(computeJitter(checkerID, svcTarget.String(), time.Now(), interval)),
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
added++
}
}
}
s.mu.Unlock()
if added > 0 {
log.Printf("Scheduler: NotifyDomainChange(%s): added %d jobs", domain.DomainName, added)
// Wake the run loop so it re-evaluates the queue head.
select {
case s.wake <- struct{}{}:
default:
}
}
}
// NotifyDomainRemoved removes all scheduler jobs for the given domain.
func (s *Scheduler) NotifyDomainRemoved(domainID happydns.Identifier) {
s.mu.Lock()
n := 0
for i := 0; i < len(s.queue); {
job := s.queue[i]
if job.Target.DomainId != nil && job.Target.DomainId.Equals(domainID) {
key := job.CheckerID + "|" + job.Target.String()
delete(s.jobKeys, key)
// Swap with last and shrink.
s.queue[i] = s.queue[len(s.queue)-1]
s.queue[len(s.queue)-1] = nil
s.queue = s.queue[:len(s.queue)-1]
n++
} else {
i++
}
}
if n > 0 {
heap.Init(&s.queue)
// Re-index after Init.
for i, job := range s.queue {
job.index = i
}
}
s.mu.Unlock()
if n > 0 {
log.Printf("Scheduler: NotifyDomainRemoved(%s): removed %d jobs", domainID, n)
}
}
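// loadAllPlans drains the check-plan iterator into a slice.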
func (s *Scheduler) loadAllPlans() ([]*happydns.CheckPlan, error) {
iter, err := s.planStore.ListAllCheckPlans()
if err != nil {
return nil, err
}
defer iter.Close()
var plans []*happydns.CheckPlan
for iter.Next() {
plans = append(plans, iter.Item())
}
return plans, nil
}
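// loadAllDomains lists every domain known to the store; on error it logs and
// returns nil so auto-discovery simply yields no jobs.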
func (s *Scheduler) loadAllDomains() []*happydns.Domain {
if s.domainStore == nil {
return nil
}
iter, err := s.domainStore.ListAllDomains()
if err != nil {
log.Printf("Scheduler: failed to list domains for auto-discovery: %v", err)
return nil
}
defer iter.Close()
var domains []*happydns.Domain
for iter.Next() {
d := iter.Item()
domains = append(domains, d)
}
return domains
}
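// loadDomainServices loads the most recent zone from the domain's history and
// returns all of its services flattened into a single slice.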
func (s *Scheduler) loadDomainServices(domain *happydns.Domain) []*happydns.ServiceMessage {
if s.zoneStore == nil || len(domain.ZoneHistory) == 0 {
return nil
}
latestZoneID := domain.ZoneHistory[len(domain.ZoneHistory)-1]
zone, err := s.zoneStore.GetZone(latestZoneID)
if err != nil {
log.Printf("Scheduler: failed to load zone %s for domain %s: %v", latestZoneID, domain.DomainName, err)
return nil
}
var services []*happydns.ServiceMessage
for _, svcs := range zone.Services {
services = append(services, svcs...)
}
return services
}
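// effectiveInterval resolves the interval for a job: the plan's interval if one
// is set, otherwise the checker's default, clamped to the checker's declared
// Min/Max bounds.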
func (s *Scheduler) effectiveInterval(def *happydns.CheckerDefinition, plan *happydns.CheckPlan) time.Duration {
interval := defaultInterval
if def.Interval != nil {
interval = def.Interval.Default
}
if plan != nil && plan.Interval != nil {
interval = *plan.Interval
}
// Clamp to bounds.
if def.Interval != nil {
if interval < def.Interval.Min {
interval = def.Interval.Min
}
if interval > def.Interval.Max {
interval = def.Interval.Max
}
}
return interval
}
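// spreadOverdueJobs reschedules every job whose NextRun is already in the past,
// spacing them evenly over at most maxCatchUpWindow so a restart does not fire
// all overdue checkers at once. Callers must hold s.mu.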
func (s *Scheduler) spreadOverdueJobs() {
now := time.Now()
var overdue []*SchedulerJob
for s.queue.Len() > 0 && s.queue.Peek().NextRun.Before(now) {
overdue = append(overdue, heap.Pop(&s.queue).(*SchedulerJob))
}
if len(overdue) == 0 {
return
}
window := time.Duration(len(overdue)) * minSpacing
window = min(window, maxCatchUpWindow)
for i, job := range overdue {
delay := window * time.Duration(i) / time.Duration(len(overdue))
job.NextRun = now.Add(delay)
heap.Push(&s.queue, job)
}
}
// GetPlannedJobsForChecker returns a snapshot of scheduled jobs for the given checker and target.
func (s *Scheduler) GetPlannedJobsForChecker(checkerID string, target happydns.CheckTarget) []*SchedulerJob {
s.mu.RLock()
defer s.mu.RUnlock()
tStr := target.String()
var result []*SchedulerJob
for _, job := range s.queue {
if job.CheckerID == checkerID && job.Target.String() == tStr {
cp := *job
result = append(result, &cp)
}
}
return result
}
// computeOffset returns a deterministic offset within the interval.
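// For example, with a 24h interval every (checker, target) pair maps to a
// stable point in [0, 24h), spreading daily runs across the day.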
func computeOffset(checkerID, targetStr string, interval time.Duration) time.Duration {
h := fnv.New64a()
h.Write([]byte(checkerID + targetStr))
return time.Duration(h.Sum64()%uint64(interval.Nanoseconds())) * time.Nanosecond
}
// computeJitter returns a small deterministic jitter (~5% of interval).
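// With a 24h interval the jitter falls in [0, 1h12m); it also depends on the
// cycle time, so successive runs of the same job get different jitter.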
func computeJitter(checkerID, targetStr string, cycleTime time.Time, interval time.Duration) time.Duration {
h := fnv.New64a()
h.Write([]byte(checkerID + targetStr + cycleTime.Format(time.RFC3339)))
maxJitter := interval / 20 // 5%
if maxJitter <= 0 {
return 0
}
return time.Duration(h.Sum64()%uint64(maxJitter.Nanoseconds())) * time.Nanosecond
}
// computeNextRun calculates the next run time based on interval, offset, and
// the last time the scheduler was known to be active. When lastActive is zero
// (first execution), it behaves as before. Otherwise it detects jobs that were
// missed during downtime (slot in (lastActive, now]) and schedules them
// immediately so spreadOverdueJobs can stagger them, while skipping jobs that
// already ran (slot <= lastActive).
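//
// Example: with interval=24h and offset=7h the grid slots fall at 07:00 UTC.
// If now is 10:00 and lastActive is zero, the job is scheduled for tomorrow
// 07:00. If the scheduler has been down since yesterday 12:00, today's 07:00
// slot was missed and the job runs now. If it was still up at 08:00, the slot
// already ran and the job waits until tomorrow 07:00.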
func computeNextRun(interval, offset time.Duration, lastActive time.Time) time.Time {
now := time.Now()
// Use Unix nanoseconds to avoid time.Duration overflow with ancient epochs.
nowNano := now.UnixNano()
intervalNano := int64(interval)
offsetNano := int64(offset) % intervalNano
// Find the most recent grid slot <= now.
cycleN := (nowNano - offsetNano) / intervalNano
slotNano := cycleN*intervalNano + offsetNano
if slotNano > nowNano {
slotNano -= intervalNano
}
slot := time.Unix(0, slotNano)
if lastActive.IsZero() {
// First execution: schedule at the next future slot.
if !slot.After(now) {
return slot.Add(interval)
}
return slot
}
// Slot was missed during downtime — schedule now for catch-up.
if slot.After(lastActive) && !slot.After(now) {
return now
}
// Slot already executed before shutdown — advance to next cycle.
return slot.Add(interval)
}