checkers: add incremental scheduler updates on domain/zone changes

Instead of rebuilding the entire scheduler queue, incrementally add or
remove jobs when domains are created/deleted or zones are
imported/published. A wake channel interrupts the run loop so new jobs
are picked up immediately. A jobKeys index prevents duplicate entries.

Hook points: domain creation, domain deletion, zone import, and zone
publish (correction apply) all notify the scheduler via the narrow
SchedulerDomainNotifier interface, wired through setter methods to
avoid initialization ordering issues.
This commit is contained in:
nemunaire 2026-04-05 12:09:07 +07:00
commit 808efa002d
7 changed files with 210 additions and 16 deletions

View file

@ -267,6 +267,10 @@ func (app *App) initUsecases() {
)
app.usecases.checkerScheduler = checkerUC.NewScheduler(app.usecases.checkerEngine, app.cfg.CheckerMaxConcurrency, app.store, app.store, app.store, app.store)
app.usecases.checkerStatusUC.SetPlannedJobProvider(app.usecases.checkerScheduler)
// Wire scheduler notifications for incremental queue updates.
domainService.SetSchedulerNotifier(app.usecases.checkerScheduler)
app.usecases.orchestrator.SetSchedulerNotifier(app.usecases.checkerScheduler)
}
func (app *App) setupRouter() {

View file

@ -96,12 +96,14 @@ type SchedulerStatus struct {
// Scheduler manages periodic execution of checkers.
type Scheduler struct {
queue SchedulerQueue
jobKeys map[string]bool
engine happydns.CheckerEngine
planStore CheckPlanStorage
domainStore DomainLister
zoneStore ZoneGetter
stateStore SchedulerStateStorage
cancel context.CancelFunc
wake chan struct{}
mu sync.RWMutex
running bool
ctx context.Context
@ -119,6 +121,8 @@ func NewScheduler(engine happydns.CheckerEngine, maxConcurrency int, planStore C
domainStore: domainStore,
zoneStore: zoneStore,
stateStore: stateStore,
jobKeys: make(map[string]bool),
wake: make(chan struct{}, 1),
maxConcurrency: maxConcurrency,
}
}
@ -206,6 +210,8 @@ func (s *Scheduler) run(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-s.wake:
continue
case <-time.After(1 * time.Minute):
s.mu.Lock()
s.buildQueue()
@ -228,6 +234,9 @@ func (s *Scheduler) run(ctx context.Context) {
case <-ctx.Done():
timer.Stop()
return
case <-s.wake:
timer.Stop()
continue
case <-timer.C:
}
}
@ -287,14 +296,17 @@ func (s *Scheduler) run(ctx context.Context) {
}
// Add jitter for next cycle.
job.NextRun = job.NextRun.Add(computeJitter(job.CheckerID, job.Target.String(), job.NextRun, job.Interval))
key := job.CheckerID + "|" + job.Target.String()
s.mu.Lock()
heap.Push(&s.queue, job)
s.jobKeys[key] = true
s.mu.Unlock()
}
}
func (s *Scheduler) buildQueue() {
s.queue = s.queue[:0]
s.jobKeys = make(map[string]bool)
var lastRun time.Time
if s.stateStore != nil {
@ -370,6 +382,7 @@ func (s *Scheduler) buildQueue() {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
}
// Service-level discovery: load the latest zone and match services.
@ -403,12 +416,137 @@ func (s *Scheduler) buildQueue() {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
}
}
}
}
}
// NotifyDomainChange incrementally adds scheduler jobs for a domain
// without rebuilding the entire queue. Call this after a domain is
// created or its zone is imported/published.
func (s *Scheduler) NotifyDomainChange(domain *happydns.Domain) {
checkers := checkerPkg.GetCheckers()
// Load plans relevant to this domain.
uid := domain.Owner
did := domain.Id
domainTarget := happydns.CheckTarget{UserId: uid.String(), DomainId: did.String()}
plans, err := s.planStore.ListCheckPlansByTarget(domainTarget)
if err != nil {
log.Printf("Scheduler: NotifyDomainChange: failed to load plans: %v", err)
}
disabledSet := make(map[string]bool)
planMap := make(map[string]*happydns.CheckPlan)
for _, p := range plans {
key := p.CheckerID + "|" + p.Target.String()
planMap[key] = p
if p.IsFullyDisabled() {
disabledSet[key] = true
}
}
var added int
s.mu.Lock()
for checkerID, def := range checkers {
if def.Availability.ApplyToDomain {
key := checkerID + "|" + domainTarget.String()
if s.jobKeys[key] || disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(def, plan)
job := &SchedulerJob{
CheckerID: checkerID,
Target: domainTarget,
Interval: interval,
NextRun: time.Now().Add(computeJitter(checkerID, domainTarget.String(), time.Now(), interval)),
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
added++
}
if def.Availability.ApplyToService {
services := s.loadDomainServices(domain)
for _, svc := range services {
if len(def.Availability.LimitToServices) > 0 && !slices.Contains(def.Availability.LimitToServices, svc.Type) {
continue
}
sid := svc.Id
svcTarget := happydns.CheckTarget{UserId: uid.String(), DomainId: did.String(), ServiceId: sid.String(), ServiceType: svc.Type}
key := checkerID + "|" + svcTarget.String()
if s.jobKeys[key] || disabledSet[key] {
continue
}
plan := planMap[key]
interval := s.effectiveInterval(def, plan)
job := &SchedulerJob{
CheckerID: checkerID,
Target: svcTarget,
Interval: interval,
NextRun: time.Now().Add(computeJitter(checkerID, svcTarget.String(), time.Now(), interval)),
}
if plan != nil {
job.PlanID = &plan.Id
}
heap.Push(&s.queue, job)
s.jobKeys[key] = true
added++
}
}
}
s.mu.Unlock()
if added > 0 {
log.Printf("Scheduler: NotifyDomainChange(%s): added %d jobs", domain.DomainName, added)
// Wake the run loop so it re-evaluates the queue head.
select {
case s.wake <- struct{}{}:
default:
}
}
}
// NotifyDomainRemoved removes all scheduler jobs for the given domain.
func (s *Scheduler) NotifyDomainRemoved(domainID happydns.Identifier) {
s.mu.Lock()
n := 0
for i := 0; i < len(s.queue); {
job := s.queue[i]
if job.Target.DomainId == domainID.String() {
key := job.CheckerID + "|" + job.Target.String()
delete(s.jobKeys, key)
// Swap with last and shrink.
s.queue[i] = s.queue[len(s.queue)-1]
s.queue[len(s.queue)-1] = nil
s.queue = s.queue[:len(s.queue)-1]
n++
} else {
i++
}
}
if n > 0 {
heap.Init(&s.queue)
// Re-index after Init.
for i, job := range s.queue {
job.index = i
}
}
s.mu.Unlock()
if n > 0 {
log.Printf("Scheduler: NotifyDomainRemoved(%s): removed %d jobs", domainID, n)
}
}
func (s *Scheduler) loadAllPlans() ([]*happydns.CheckPlan, error) {
iter, err := s.planStore.ListAllCheckPlans()
if err != nil {

View file

@ -43,6 +43,13 @@ type ZoneGetter interface {
GetZone(id happydns.Identifier) (*happydns.ZoneMessage, error)
}
// SchedulerDomainNotifier is a narrow interface for notifying the scheduler
// about domain changes so it can incrementally update its job queue.
type SchedulerDomainNotifier interface {
NotifyDomainChange(domain *happydns.Domain)
NotifyDomainRemoved(domainID happydns.Identifier)
}
// CheckAutoFillStorage provides access to domain, zone and user data
// needed to resolve auto-fill field values at execution time.
type CheckAutoFillStorage interface {

View file

@ -41,12 +41,20 @@ type DomainExistenceTester interface {
TestDomainExistence(ctx context.Context, provider *happydns.Provider, name string) error
}
// SchedulerDomainNotifier is an optional callback to notify the scheduler
// about domain changes so it can incrementally update its job queue.
type SchedulerDomainNotifier interface {
NotifyDomainChange(domain *happydns.Domain)
NotifyDomainRemoved(domainID happydns.Identifier)
}
type Service struct {
store DomainStorage
providerService ProviderGetter
getZone *zoneUC.GetZoneUsecase
domainExistence DomainExistenceTester
domainLogAppender domainLogUC.DomainLogAppender
store DomainStorage
providerService ProviderGetter
getZone *zoneUC.GetZoneUsecase
domainExistence DomainExistenceTester
domainLogAppender domainLogUC.DomainLogAppender
schedulerNotifier SchedulerDomainNotifier
}
func NewService(
@ -65,6 +73,12 @@ func NewService(
}
}
// SetSchedulerNotifier sets the optional scheduler notifier for incremental
// queue updates on domain creation/deletion.
func (s *Service) SetSchedulerNotifier(notifier SchedulerDomainNotifier) {
s.schedulerNotifier = notifier
}
// CreateDomain creates a new domain for the given user.
func (s *Service) CreateDomain(ctx context.Context, user *happydns.User, uz *happydns.Domain) error {
uz, err := happydns.NewDomain(user, uz.DomainName, uz.ProviderId)
@ -93,6 +107,10 @@ func (s *Service) CreateDomain(ctx context.Context, user *happydns.User, uz *hap
s.domainLogAppender.AppendDomainLog(uz, happydns.NewDomainLog(user, happydns.LOG_INFO, fmt.Sprintf("Domain name %s added.", uz.DomainName)))
}
if s.schedulerNotifier != nil {
s.schedulerNotifier.NotifyDomainChange(uz)
}
return nil
}
@ -194,5 +212,9 @@ func (s *Service) DeleteDomain(domainID happydns.Identifier) error {
}
}
if s.schedulerNotifier != nil {
s.schedulerNotifier.NotifyDomainRemoved(domainID)
}
return nil
}

View file

@ -54,6 +54,12 @@ type ZoneCorrector interface {
ListZoneCorrections(ctx context.Context, provider *happydns.Provider, domain *happydns.Domain, records []happydns.Record) ([]*happydns.Correction, int, error)
}
// SchedulerDomainNotifier is an optional callback to notify the scheduler
// about domain changes so it can incrementally update its job queue.
type SchedulerDomainNotifier interface {
NotifyDomainChange(domain *happydns.Domain)
}
// Orchestrator aggregates the use-cases that together implement the DNS zone
// lifecycle: importing zones from a provider, listing required corrections, and
// applying those corrections back to the provider.
@ -91,3 +97,10 @@ func NewOrchestrator(
ZoneImporter: zoneImporter,
}
}
// SetSchedulerNotifier sets the optional scheduler notifier on the
// sub-usecases that create or publish zones.
func (o *Orchestrator) SetSchedulerNotifier(notifier SchedulerDomainNotifier) {
o.RemoteZoneImporter.schedulerNotifier = notifier
o.ZoneCorrectionApplier.schedulerNotifier = notifier
}

View file

@ -34,10 +34,11 @@ import (
// from the provider and delegates to ZoneImporterUsecase to persist them. It
// also appends a domain log entry on success.
type RemoteZoneImporterUsecase struct {
appendDomainLog domainlogUC.DomainLogAppender
providerService ProviderGetter
zoneImporter happydns.ZoneImporterUsecase
zoneRetriever ZoneRetriever
appendDomainLog domainlogUC.DomainLogAppender
providerService ProviderGetter
zoneImporter happydns.ZoneImporterUsecase
zoneRetriever ZoneRetriever
schedulerNotifier SchedulerDomainNotifier
}
// NewRemoteZoneImporterUsecase creates a RemoteZoneImporterUsecase wired to
@ -79,5 +80,9 @@ func (uc *RemoteZoneImporterUsecase) Import(ctx context.Context, user *happydns.
log.Printf("unable to append domain log for %s: %s", domain.DomainName, err.Error())
}
if uc.schedulerNotifier != nil {
uc.schedulerNotifier.NotifyDomainChange(domain)
}
return myZone, nil
}

View file

@ -41,13 +41,14 @@ import (
// in the domain history. The WIP zone at ZoneHistory[0] is never modified.
type ZoneCorrectionApplierUsecase struct {
*ZoneCorrectionListerUsecase
appendDomainLog domainlogUC.DomainLogAppender
domainUpdater DomainUpdater
zoneCreator *zoneUC.CreateZoneUsecase
zoneGetter *zoneUC.GetZoneUsecase
zoneRetriever ZoneRetriever
zoneUpdater *zoneUC.UpdateZoneUsecase
clock func() time.Time
appendDomainLog domainlogUC.DomainLogAppender
domainUpdater DomainUpdater
zoneCreator *zoneUC.CreateZoneUsecase
zoneGetter *zoneUC.GetZoneUsecase
zoneRetriever ZoneRetriever
zoneUpdater *zoneUC.UpdateZoneUsecase
schedulerNotifier SchedulerDomainNotifier
clock func() time.Time
}
// NewZoneCorrectionApplierUsecase creates a ZoneCorrectionApplierUsecase with
@ -288,6 +289,10 @@ func (uc *ZoneCorrectionApplierUsecase) Apply(
log.Printf("%s: unable to update WIP zone propagation times: %s", domain.DomainName, updateErr)
}
if uc.schedulerNotifier != nil {
uc.schedulerNotifier.NotifyDomainChange(domain)
}
return snapshot, nil
}