From 5d0e31facab4ad42198b36cbfb3c2f49133dcf8c Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Tue, 10 Feb 2026 10:33:12 +0700 Subject: [PATCH] Implement tests scheduler --- internal/api/controller/service.go | 2 +- .../api/controller/testschedule_controller.go | 231 ++++++++ internal/api/route/route.go | 1 + internal/app/app.go | 37 ++ internal/app/testscheduler.go | 541 ++++++++++++++++++ internal/config/config.go | 1 + internal/storage/kvtpl/testresult.go | 53 ++ internal/usecase/testresult/storage.go | 6 + .../usecase/testresult/testresult_usecase.go | 24 +- .../testresult/testschedule_usecase.go | 285 +++++++++ model/config.go | 6 + model/errors.go | 1 + model/test_result.go | 16 + model/usecase.go | 1 + 14 files changed, 1184 insertions(+), 21 deletions(-) create mode 100644 internal/api/controller/testschedule_controller.go create mode 100644 internal/app/testscheduler.go create mode 100644 internal/usecase/testresult/testschedule_usecase.go diff --git a/internal/api/controller/service.go b/internal/api/controller/service.go index 820f19eb..93c1f900 100644 --- a/internal/api/controller/service.go +++ b/internal/api/controller/service.go @@ -93,7 +93,7 @@ func (sc *ServiceController) AddZoneService(c *gin.Context) { c.JSON(http.StatusOK, zone) } -// GetServiceService retrieves the designated Service. +// GetZoneService retrieves the designated Service. // // @Summary Get the Service. // @Schemes diff --git a/internal/api/controller/testschedule_controller.go b/internal/api/controller/testschedule_controller.go new file mode 100644 index 00000000..611286a4 --- /dev/null +++ b/internal/api/controller/testschedule_controller.go @@ -0,0 +1,231 @@ +// This file is part of the happyDomain (R) project. +// Copyright (c) 2020-2024 happyDomain +// Authors: Pierre-Olivier Mercier, et al. +// +// This program is offered under a commercial and under the AGPL license. +// For commercial licensing, contact us at . +// +// For AGPL licensing: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package controller + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + + "git.happydns.org/happyDomain/internal/api/middleware" + "git.happydns.org/happyDomain/model" +) + +// TestScheduleController handles test schedule operations +type TestScheduleController struct { + testScheduleUC happydns.TestScheduleUsecase +} + +func NewTestScheduleController(testScheduleUC happydns.TestScheduleUsecase) *TestScheduleController { + return &TestScheduleController{ + testScheduleUC: testScheduleUC, + } +} + +// ListTestSchedules retrieves schedules for the authenticated user +// +// @Summary List test schedules +// @Description Retrieves test schedules for the authenticated user with optional pagination +// @Tags test-schedules +// @Produce json +// @Param limit query int false "Maximum number of schedules to return (0 = all)" +// @Param offset query int false "Number of schedules to skip (default: 0)" +// @Success 200 {array} happydns.TestSchedule +// @Failure 500 {object} happydns.ErrorResponse +// @Router /plugins/tests/schedules [get] +func (tc *TestScheduleController) ListTestSchedules(c *gin.Context) { + user := middleware.MyUser(c) + + schedules, err := tc.testScheduleUC.ListUserSchedules(user.Id) + if err != nil { + middleware.ErrorResponse(c, http.StatusInternalServerError, err) + return + } + + // Apply pagination + limit := 0 + offset := 0 + fmt.Sscanf(c.Query("limit"), "%d", &limit) + fmt.Sscanf(c.Query("offset"), "%d", &offset) + + if offset > len(schedules) { + offset = len(schedules) + } + schedules = schedules[offset:] + if limit > 0 && len(schedules) > limit { + schedules = schedules[:limit] + } + + c.JSON(http.StatusOK, schedules) +} + +// CreateTestSchedule creates a new test schedule +// +// @Summary Create test schedule +// @Description Creates a new test schedule for the authenticated user +// @Tags test-schedules +// @Accept json +// @Produce json +// @Param body body happydns.TestSchedule true "Test schedule to create" +// @Success 201 {object} happydns.TestSchedule +// @Failure 400 {object} happydns.ErrorResponse +// @Failure 500 {object} happydns.ErrorResponse +// @Router /plugins/tests/schedules [post] +func (tc *TestScheduleController) CreateTestSchedule(c *gin.Context) { + user := middleware.MyUser(c) + + var schedule happydns.TestSchedule + if err := c.ShouldBindJSON(&schedule); err != nil { + middleware.ErrorResponse(c, http.StatusBadRequest, err) + return + } + + // Set user ID + schedule.OwnerId = user.Id + + if err := tc.testScheduleUC.CreateSchedule(&schedule); err != nil { + middleware.ErrorResponse(c, http.StatusInternalServerError, err) + return + } + + c.JSON(http.StatusCreated, schedule) +} + +// GetTestSchedule retrieves a specific schedule +// +// @Summary Get test schedule +// @Description Retrieves a specific test schedule by ID +// @Tags test-schedules +// @Produce json +// @Param schedule_id path string true "Schedule ID" +// @Success 200 {object} happydns.TestSchedule +// @Failure 404 {object} happydns.ErrorResponse +// @Failure 500 {object} happydns.ErrorResponse +// @Router /plugins/tests/schedules/{schedule_id} [get] +func (tc *TestScheduleController) GetTestSchedule(c *gin.Context) { + user := middleware.MyUser(c) + scheduleIdStr := c.Param("schedule_id") + + scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr) + if err != nil { + middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID")) + return + } + + // Verify ownership + if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil { + middleware.ErrorResponse(c, http.StatusForbidden, err) + return + } + + schedule, err := tc.testScheduleUC.GetSchedule(scheduleId) + if err != nil { + middleware.ErrorResponse(c, http.StatusNotFound, err) + return + } + + c.JSON(http.StatusOK, schedule) +} + +// UpdateTestSchedule updates an existing schedule +// +// @Summary Update test schedule +// @Description Updates an existing test schedule +// @Tags test-schedules +// @Accept json +// @Produce json +// @Param schedule_id path string true "Schedule ID" +// @Param body body happydns.TestSchedule true "Updated schedule" +// @Success 200 {object} happydns.TestSchedule +// @Failure 400 {object} happydns.ErrorResponse +// @Failure 404 {object} happydns.ErrorResponse +// @Failure 500 {object} happydns.ErrorResponse +// @Router /plugins/tests/schedules/{schedule_id} [put] +func (tc *TestScheduleController) UpdateTestSchedule(c *gin.Context) { + user := middleware.MyUser(c) + scheduleIdStr := c.Param("schedule_id") + + scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr) + if err != nil { + middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID")) + return + } + + // Verify ownership + if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil { + middleware.ErrorResponse(c, http.StatusForbidden, err) + return + } + + var schedule happydns.TestSchedule + if err := c.ShouldBindJSON(&schedule); err != nil { + middleware.ErrorResponse(c, http.StatusBadRequest, err) + return + } + + // Ensure ID matches + schedule.Id = scheduleId + schedule.OwnerId = user.Id + + if err := tc.testScheduleUC.UpdateSchedule(&schedule); err != nil { + middleware.ErrorResponse(c, http.StatusInternalServerError, err) + return + } + + c.JSON(http.StatusOK, schedule) +} + +// DeleteTestSchedule deletes a schedule +// +// @Summary Delete test schedule +// @Description Deletes a test schedule +// @Tags test-schedules +// @Produce json +// @Param schedule_id path string true "Schedule ID" +// @Success 204 "No Content" +// @Failure 404 {object} happydns.ErrorResponse +// @Failure 500 {object} happydns.ErrorResponse +// @Router /plugins/tests/schedules/{schedule_id} [delete] +func (tc *TestScheduleController) DeleteTestSchedule(c *gin.Context) { + user := middleware.MyUser(c) + scheduleIdStr := c.Param("schedule_id") + + scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr) + if err != nil { + middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID")) + return + } + + // Verify ownership + if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil { + middleware.ErrorResponse(c, http.StatusForbidden, err) + return + } + + if err := tc.testScheduleUC.DeleteSchedule(scheduleId); err != nil { + middleware.ErrorResponse(c, http.StatusInternalServerError, err) + return + } + + c.Status(http.StatusNoContent) +} diff --git a/internal/api/route/route.go b/internal/api/route/route.go index 11ffd17d..2276eb84 100644 --- a/internal/api/route/route.go +++ b/internal/api/route/route.go @@ -81,6 +81,7 @@ func DeclareRoutes(cfg *happydns.Options, router *gin.Engine, dependancies happy DeclareProviderRoutes(apiAuthRoutes, dependancies) DeclareProviderSettingsRoutes(apiAuthRoutes, dependancies) DeclareRecordRoutes(apiAuthRoutes, dependancies) + DeclareTestScheduleRoutes(apiAuthRoutes, dependancies) DeclareUsersRoutes(apiAuthRoutes, dependancies, lc) DeclareSessionRoutes(apiAuthRoutes, dependancies) } diff --git a/internal/app/app.go b/internal/app/app.go index 361f9c26..af0b8ce2 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -69,6 +69,7 @@ type Usecases struct { serviceSpecs happydns.ServiceSpecsUsecase testPlugin happydns.TestPluginUsecase testResult happydns.TestResultUsecase + testSchedule happydns.TestScheduleUsecase user happydns.UserUsecase zone happydns.ZoneUsecase zoneService happydns.ZoneServiceUsecase @@ -87,6 +88,7 @@ type App struct { router *gin.Engine srv *http.Server store storage.Storage + testScheduler controller.TestSchedulerInterface usecases Usecases } @@ -162,6 +164,10 @@ func (a *App) TestResultUsecase() happydns.TestResultUsecase { return a.usecases.testResult } +func (a *App) TestScheduleUsecase() happydns.TestScheduleUsecase { + return a.usecases.testSchedule +} + func (a *App) UserUsecase() happydns.UserUsecase { return a.usecases.user } @@ -178,6 +184,10 @@ func (a *App) ZoneUsecase() happydns.ZoneUsecase { return a.usecases.zone } +func (a *App) TestScheduler() controller.TestSchedulerInterface { + return a.testScheduler +} + func (a *App) ZoneServiceUsecase() happydns.ZoneServiceUsecase { return a.usecases.zoneService } @@ -194,6 +204,7 @@ func NewApp(cfg *happydns.Options) *App { app.initPlugins() app.initUsecases() app.initCaptcha() + app.initTestScheduler() app.setupRouter() return app @@ -210,6 +221,7 @@ func NewAppWithStorage(cfg *happydns.Options, store storage.Storage) *App { app.initPlugins() app.initUsecases() app.initCaptcha() + app.initTestScheduler() app.setupRouter() return app @@ -283,6 +295,20 @@ func (app *App) initInsights() { } } +func (app *App) initTestScheduler() { + if app.cfg.DisableScheduler { + // Use a disabled scheduler that returns clear errors + app.testScheduler = &disabledScheduler{} + return + } + + app.testScheduler = newTestScheduler( + app.cfg, + app.store, + app.usecases.testPlugin, + ) +} + func (app *App) initUsecases() { sessionService := sessionUC.NewService(app.store) authUserService := authuserUC.NewAuthUserUsecases(app.cfg, app.mailer, app.store, sessionService) @@ -312,6 +338,7 @@ func (app *App) initUsecases() { app.usecases.session = sessionService app.usecases.testPlugin = pluginUC.NewTestPluginUsecase(app.cfg, app.plugins, app.store) app.usecases.testResult = testresultUC.NewTestResultUsecase(app.store, app.cfg) + app.usecases.testSchedule = testresultUC.NewTestScheduleUsecase(app.store, app.cfg) app.usecases.orchestrator = orchestrator.NewOrchestrator( domainLogService, @@ -352,6 +379,11 @@ func (app *App) Start() { go app.insights.Run() } + // Start the test scheduler if it's the real implementation (not disabled) + if scheduler, ok := app.testScheduler.(*testScheduler); ok && scheduler != nil { + go scheduler.Run() + } + log.Printf("Public interface listening on %s\n", app.cfg.Bind) if err := app.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen: %s\n", err) @@ -377,4 +409,9 @@ func (app *App) Stop() { if app.failureTracker != nil { app.failureTracker.Close() } + + // Close the test scheduler if it's the real implementation (not disabled) + if scheduler, ok := app.testScheduler.(*testScheduler); ok && scheduler != nil { + scheduler.Close() + } } diff --git a/internal/app/testscheduler.go b/internal/app/testscheduler.go new file mode 100644 index 00000000..c1e282b4 --- /dev/null +++ b/internal/app/testscheduler.go @@ -0,0 +1,541 @@ +// This file is part of the happyDomain (R) project. +// Copyright (c) 2020-2026 happyDomain +// Authors: Pierre-Olivier Mercier, et al. +// +// This program is offered under a commercial and under the AGPL license. +// For commercial licensing, contact us at . +// +// For AGPL licensing: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package app + +import ( + "container/heap" + "context" + "fmt" + "log" + "runtime" + "sync" + "time" + + "git.happydns.org/happyDomain/internal/storage" + "git.happydns.org/happyDomain/internal/usecase/testresult" + "git.happydns.org/happyDomain/model" +) + +const ( + SchedulerCheckInterval = 1 * time.Minute // How often to check for due tests + SchedulerCleanupInterval = 24 * time.Hour // How often to clean up old executions + TestExecutionTimeout = 5 * time.Minute // Max time for a single test + MaxRetries = 3 // Max retry attempts for failed tests +) + +// Priority levels for test execution queue +const ( + PriorityOnDemand = iota // On-demand tests (highest priority) + PriorityOverdue // Overdue scheduled tests + PriorityScheduled // Regular scheduled tests +) + +// testScheduler manages background test execution +type testScheduler struct { + cfg *happydns.Options + store storage.Storage + pluginUsecase happydns.TestPluginUsecase + resultUsecase *testresult.TestResultUsecase + scheduleUsecase *testresult.TestScheduleUsecase + stop chan struct{} // closed to stop the main Run loop + stopWorkers chan struct{} // closed to stop all workers simultaneously + runNowChan chan *queueItem // on-demand items routed through the main loop + workAvail chan struct{} // non-blocking signals that queue has new work + queue *priorityQueue + activeExecutions map[string]*activeExecution + workers []*worker + mu sync.RWMutex + wg sync.WaitGroup +} + +// activeExecution tracks a running test execution +type activeExecution struct { + execution *happydns.TestExecution + cancel context.CancelFunc + startTime time.Time +} + +// queueItem represents a test execution request in the queue +type queueItem struct { + schedule *happydns.TestSchedule + execution *happydns.TestExecution + priority int + queuedAt time.Time + retries int +} + +// --- container/heap implementation for priorityQueue --- + +// priorityHeap is the underlying heap, ordered by priority then arrival time. +type priorityHeap []*queueItem + +func (h priorityHeap) Len() int { return len(h) } +func (h priorityHeap) Less(i, j int) bool { + if h[i].priority != h[j].priority { + return h[i].priority < h[j].priority + } + return h[i].queuedAt.Before(h[j].queuedAt) +} +func (h priorityHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *priorityHeap) Push(x any) { *h = append(*h, x.(*queueItem)) } +func (h *priorityHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil // avoid memory leak + *h = old[:n-1] + return x +} + +// priorityQueue is a thread-safe min-heap of queueItems. +type priorityQueue struct { + h priorityHeap + mu sync.Mutex +} + +func newPriorityQueue() *priorityQueue { + pq := &priorityQueue{} + heap.Init(&pq.h) + return pq +} + +// Push adds an item to the queue. +func (q *priorityQueue) Push(item *queueItem) { + q.mu.Lock() + defer q.mu.Unlock() + heap.Push(&q.h, item) +} + +// Pop removes and returns the highest-priority item, or nil if empty. +func (q *priorityQueue) Pop() *queueItem { + q.mu.Lock() + defer q.mu.Unlock() + if q.h.Len() == 0 { + return nil + } + return heap.Pop(&q.h).(*queueItem) +} + +// Len returns the queue length. +func (q *priorityQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.h.Len() +} + +// worker processes tests from the queue +type worker struct { + id int + scheduler *testScheduler +} + +// disabledScheduler is a no-op implementation used when scheduler is disabled +type disabledScheduler struct{} + +// TriggerOnDemandTest returns an error indicating the scheduler is disabled +func (d *disabledScheduler) TriggerOnDemandTest(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, userId happydns.Identifier, options happydns.PluginOptions) (happydns.Identifier, error) { + return happydns.Identifier{}, fmt.Errorf("test scheduler is disabled in configuration") +} + +// newTestScheduler creates a new test scheduler +func newTestScheduler( + cfg *happydns.Options, + store storage.Storage, + pluginUsecase happydns.TestPluginUsecase, +) *testScheduler { + numWorkers := cfg.TestWorkers + if numWorkers <= 0 { + numWorkers = runtime.NumCPU() + } + + scheduler := &testScheduler{ + cfg: cfg, + store: store, + pluginUsecase: pluginUsecase, + resultUsecase: testresult.NewTestResultUsecase(store, cfg), + scheduleUsecase: testresult.NewTestScheduleUsecase(store, cfg), + stop: make(chan struct{}), + stopWorkers: make(chan struct{}), + runNowChan: make(chan *queueItem, 100), + workAvail: make(chan struct{}, numWorkers), + queue: newPriorityQueue(), + activeExecutions: make(map[string]*activeExecution), + workers: make([]*worker, numWorkers), + } + + for i := 0; i < numWorkers; i++ { + scheduler.workers[i] = &worker{ + id: i, + scheduler: scheduler, + } + } + + return scheduler +} + +// enqueue pushes an item to the priority queue and wakes one idle worker. +func (s *testScheduler) enqueue(item *queueItem) { + s.queue.Push(item) + select { + case s.workAvail <- struct{}{}: + default: + // All workers are already busy or already notified; they will drain + // the queue on their own after finishing the current item. + } +} + +// Close stops the scheduler and waits for all workers to finish. +func (s *testScheduler) Close() { + log.Println("Stopping test scheduler...") + + // Unblock the main Run loop. + close(s.stop) + + // Unblock all workers simultaneously. + close(s.stopWorkers) + + // Cancel all active test executions. + s.mu.Lock() + for _, exec := range s.activeExecutions { + exec.cancel() + } + s.mu.Unlock() + + // Wait for all workers to finish their current item. + s.wg.Wait() + + log.Println("Test scheduler stopped") +} + +// Run starts the scheduler main loop. It must not be called more than once. +func (s *testScheduler) Run() { + s.mu.Lock() + s.running = true + s.mu.Unlock() + + defer func() { + s.mu.Lock() + s.running = false + s.mu.Unlock() + }() + + log.Printf("Starting test scheduler with %d workers...\n", len(s.workers)) + + // Start workers + for _, w := range s.workers { + s.wg.Add(1) + go w.run(&s.wg) + } + + // Main scheduling loop + checkTicker := time.NewTicker(SchedulerCheckInterval) + cleanupTicker := time.NewTicker(SchedulerCleanupInterval) + defer checkTicker.Stop() + defer cleanupTicker.Stop() + + // Initial check + s.checkSchedules() + + for { + select { + case <-checkTicker.C: + s.checkSchedules() + + case <-cleanupTicker.C: + s.cleanup() + + case item := <-s.runNowChan: + s.enqueue(item) + + case <-s.stop: + return + } + } +} + +// checkSchedules checks for due tests and queues them +func (s *testScheduler) checkSchedules() { + dueSchedules, err := s.scheduleUsecase.ListDueSchedules() + if err != nil { + log.Printf("Error listing due schedules: %v\n", err) + return + } + + now := time.Now() + for _, schedule := range dueSchedules { + // Determine priority based on how overdue the test is + priority := PriorityScheduled + if schedule.NextRun.Add(schedule.Interval).Before(now) { + priority = PriorityOverdue + } + + // Create execution record + execution := &happydns.TestExecution{ + ScheduleId: &schedule.Id, + PluginName: schedule.PluginName, + OwnerId: schedule.OwnerId, + TargetType: schedule.TargetType, + TargetId: schedule.TargetId, + Status: happydns.TestExecutionPending, + StartedAt: time.Now(), + Options: schedule.Options, + } + + if err := s.resultUsecase.CreateTestExecution(execution); err != nil { + log.Printf("Error creating execution for schedule %s: %v\n", schedule.Id.String(), err) + continue + } + + s.enqueue(&queueItem{ + schedule: schedule, + execution: execution, + priority: priority, + queuedAt: now, + retries: 0, + }) + } + + // Mark scheduler run + if err := s.store.TestSchedulerRun(); err != nil { + log.Printf("Error marking scheduler run: %v\n", err) + } +} + +// TriggerOnDemandTest triggers an immediate test execution. +// It creates the execution record synchronously (so the caller gets an ID back) +// and then routes the item through runNowChan so the main loop controls +// all queue insertions. +func (s *testScheduler) TriggerOnDemandTest(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, ownerId happydns.Identifier, options happydns.PluginOptions) (happydns.Identifier, error) { + schedule := &happydns.TestSchedule{ + PluginName: pluginName, + OwnerId: ownerId, + TargetType: targetType, + TargetId: targetId, + Interval: 0, // On-demand, no interval + Enabled: true, + Options: options, + } + + execution := &happydns.TestExecution{ + ScheduleId: nil, + PluginName: pluginName, + OwnerId: ownerId, + TargetType: targetType, + TargetId: targetId, + Status: happydns.TestExecutionPending, + StartedAt: time.Now(), + Options: options, + } + + if err := s.resultUsecase.CreateTestExecution(execution); err != nil { + return happydns.Identifier{}, err + } + + item := &queueItem{ + schedule: schedule, + execution: execution, + priority: PriorityOnDemand, + queuedAt: time.Now(), + retries: 0, + } + + // Route through the main loop when possible; fall back to direct enqueue + // if the channel is full so that the caller never blocks. + select { + case s.runNowChan <- item: + default: + s.enqueue(item) + } + + return execution.Id, nil +} + +// cleanup removes old execution records and expired test results +func (s *testScheduler) cleanup() { + log.Println("Running scheduler cleanup...") + + // Delete completed/failed execution records older than 7 days + if err := s.resultUsecase.DeleteCompletedExecutions(7 * 24 * time.Hour); err != nil { + log.Printf("Error cleaning up old executions: %v\n", err) + } + + // Delete test results older than the configured retention period + if err := s.resultUsecase.CleanupOldResults(); err != nil { + log.Printf("Error cleaning up old test results: %v\n", err) + } + + log.Println("Scheduler cleanup complete") +} + +// run is the worker's main loop. It drains the queue eagerly and waits for a +// workAvail signal when idle, rather than sleeping on a fixed timer. +func (w *worker) run(wg *sync.WaitGroup) { + defer wg.Done() + + log.Printf("Worker %d started\n", w.id) + + for { + // Drain: try to grab work before blocking. + if item := w.scheduler.queue.Pop(); item != nil { + w.executeTest(item) + continue + } + + // Queue is empty; wait for new work or a stop signal. + select { + case <-w.scheduler.workAvail: + // Loop back to attempt a Pop. + case <-w.scheduler.stopWorkers: + log.Printf("Worker %d stopped\n", w.id) + return + } + } +} + +// executeTest runs a test plugin and stores the result +func (w *worker) executeTest(item *queueItem) { + ctx, cancel := context.WithTimeout(context.Background(), TestExecutionTimeout) + defer cancel() + + execution := item.execution + schedule := item.schedule + + // Mark execution as running + execution.Status = happydns.TestExecutionRunning + if err := w.scheduler.resultUsecase.UpdateTestExecution(execution); err != nil { + log.Printf("Worker %d: Error updating execution status: %v\n", w.id, err) + return + } + + // Track active execution + w.scheduler.mu.Lock() + w.scheduler.activeExecutions[execution.Id.String()] = &activeExecution{ + execution: execution, + cancel: cancel, + startTime: time.Now(), + } + w.scheduler.mu.Unlock() + + defer func() { + w.scheduler.mu.Lock() + delete(w.scheduler.activeExecutions, execution.Id.String()) + w.scheduler.mu.Unlock() + }() + + // Get the plugin + plugin, err := w.scheduler.pluginUsecase.GetTestPlugin(schedule.PluginName) + if err != nil { + errMsg := fmt.Sprintf("plugin not found: %s - %v", schedule.PluginName, err) + log.Printf("Worker %d: %s\n", w.id, errMsg) + _ = w.scheduler.resultUsecase.FailTestExecution(execution.Id, errMsg) + return + } + + // Prepare metadata + meta := make(map[string]string) + meta["target_type"] = schedule.TargetType.String() + meta["target_id"] = schedule.TargetId.String() + + // Run the test + startTime := time.Now() + resultChan := make(chan *happydns.PluginResult, 1) + errorChan := make(chan error, 1) + + go func() { + defer func() { + if r := recover(); r != nil { + errorChan <- fmt.Errorf("plugin panicked: %v", r) + } + }() + result, err := plugin.RunTest(schedule.Options, meta) + if err != nil { + errorChan <- err + } else { + resultChan <- result + } + }() + + // Wait for result or timeout + var pluginResult *happydns.PluginResult + var testErr error + + select { + case pluginResult = <-resultChan: + // Test completed successfully + case testErr = <-errorChan: + // Test returned an error + case <-ctx.Done(): + // Timeout + testErr = fmt.Errorf("test execution timeout after %v", TestExecutionTimeout) + } + + duration := time.Since(startTime) + + // Store the result + result := &happydns.TestResult{ + PluginName: schedule.PluginName, + TestType: schedule.TargetType, + TargetId: schedule.TargetId, + OwnerId: schedule.OwnerId, + ExecutedAt: time.Now(), + ScheduledTest: item.execution.ScheduleId != nil, + Options: schedule.Options, + Duration: duration, + } + + if testErr != nil { + result.Status = happydns.PluginResultStatusKO + result.StatusLine = "Test execution failed" + result.Error = testErr.Error() + } else if pluginResult != nil { + result.Status = pluginResult.Status + result.StatusLine = pluginResult.StatusLine + result.Report = pluginResult.Report + } else { + result.Status = happydns.PluginResultStatusKO + result.StatusLine = "Unknown error" + result.Error = "No result or error returned from plugin" + } + + // Save the result + if err := w.scheduler.resultUsecase.CreateTestResult(result); err != nil { + log.Printf("Worker %d: Error saving test result: %v\n", w.id, err) + _ = w.scheduler.resultUsecase.FailTestExecution(execution.Id, err.Error()) + return + } + + // Complete the execution + if err := w.scheduler.resultUsecase.CompleteTestExecution(execution.Id, result.Id); err != nil { + log.Printf("Worker %d: Error completing execution: %v\n", w.id, err) + return + } + + // Update schedule if this was a scheduled test + if item.execution.ScheduleId != nil { + if err := w.scheduler.scheduleUsecase.UpdateScheduleAfterRun(*item.execution.ScheduleId); err != nil { + log.Printf("Worker %d: Error updating schedule: %v\n", w.id, err) + } + } + + log.Printf("Worker %d: Completed test %s for target %s (status: %s, duration: %v)\n", + w.id, schedule.PluginName, schedule.TargetId.String(), result.Status, duration) +} diff --git a/internal/config/config.go b/internal/config/config.go index 696e005a..d1715821 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,7 @@ func ConsolidateConfig() (opts *happydns.Options, err error) { StorageEngine: "leveldb", MaxResultsPerTest: 100, ResultRetentionDays: 90, + TestWorkers: 2, } declareFlags(opts) diff --git a/internal/storage/kvtpl/testresult.go b/internal/storage/kvtpl/testresult.go index 8afed1a6..652444eb 100644 --- a/internal/storage/kvtpl/testresult.go +++ b/internal/storage/kvtpl/testresult.go @@ -179,6 +179,31 @@ func (s *KVStorage) DeleteOldTestResults(pluginName string, targetType happydns. return nil } +// DeleteTestResultsBefore removes all test results with ExecutedAt older than cutoff +func (s *KVStorage) DeleteTestResultsBefore(cutoff time.Time) error { + iter := s.db.Search("testresult|") + defer iter.Release() + + var toDelete []string + for iter.Next() { + var r happydns.TestResult + if err := s.db.DecodeData(iter.Value(), &r); err != nil { + continue + } + if r.ExecutedAt.Before(cutoff) { + toDelete = append(toDelete, string(iter.Key())) + } + } + + for _, key := range toDelete { + if err := s.db.Delete(key); err != nil { + return err + } + } + + return nil +} + // Test Schedule storage keys: // testschedule|{schedule-id} // testschedule.byuser|{user-id}|{schedule-id} @@ -410,6 +435,34 @@ func (s *KVStorage) DeleteTestExecution(executionId happydns.Identifier) error { return s.db.Delete(key) } +// DeleteCompletedExecutionsBefore removes completed or failed execution records older than cutoff +func (s *KVStorage) DeleteCompletedExecutionsBefore(cutoff time.Time) error { + iter := s.db.Search("testexec|") + defer iter.Release() + + var toDelete []string + for iter.Next() { + var exec happydns.TestExecution + if err := s.db.DecodeData(iter.Value(), &exec); err != nil { + continue + } + if exec.Status != happydns.TestExecutionCompleted && exec.Status != happydns.TestExecutionFailed { + continue + } + if exec.CompletedAt != nil && exec.CompletedAt.Before(cutoff) { + toDelete = append(toDelete, string(iter.Key())) + } + } + + for _, key := range toDelete { + if err := s.db.Delete(key); err != nil { + return err + } + } + + return nil +} + // Scheduler state storage key: // testscheduler.lastrun diff --git a/internal/usecase/testresult/storage.go b/internal/usecase/testresult/storage.go index c70c0bd3..4ff9ba9f 100644 --- a/internal/usecase/testresult/storage.go +++ b/internal/usecase/testresult/storage.go @@ -51,6 +51,9 @@ type TestResultStorage interface { // DeleteOldTestResults removes old test results keeping only the most recent N results DeleteOldTestResults(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, keepCount int) error + // DeleteTestResultsBefore removes all test results older than the given time + DeleteTestResultsBefore(cutoff time.Time) error + // Test Schedules // ListEnabledTestSchedules retrieves all enabled schedules (for scheduler) ListEnabledTestSchedules() ([]*happydns.TestSchedule, error) @@ -89,6 +92,9 @@ type TestResultStorage interface { // DeleteTestExecution removes an execution record DeleteTestExecution(executionId happydns.Identifier) error + // DeleteCompletedExecutionsBefore removes completed or failed execution records older than the given time + DeleteCompletedExecutionsBefore(cutoff time.Time) error + // Scheduler State // TestSchedulerRun marks that the scheduler has run at current time TestSchedulerRun() error diff --git a/internal/usecase/testresult/testresult_usecase.go b/internal/usecase/testresult/testresult_usecase.go index 9bc4648c..1d51f990 100644 --- a/internal/usecase/testresult/testresult_usecase.go +++ b/internal/usecase/testresult/testresult_usecase.go @@ -120,7 +120,7 @@ func (u *TestResultUsecase) DeleteAllTestResults(pluginName string, targetType h return nil } -// CleanupOldResults removes test results older than retention period +// CleanupOldResults removes test results older than the configured retention period func (u *TestResultUsecase) CleanupOldResults() error { retentionDays := u.options.ResultRetentionDays if retentionDays <= 0 { @@ -128,16 +128,7 @@ func (u *TestResultUsecase) CleanupOldResults() error { } cutoffTime := time.Now().AddDate(0, 0, -retentionDays) - - // Get all results for all users (inefficient but necessary without a time-based index) - // In a production system, you might want to add a time-based index for this - // For now, we'll iterate through results and delete old ones - - // This is a placeholder - the actual implementation would need to be optimized - // based on specific storage patterns - _ = cutoffTime - - return nil + return u.storage.DeleteTestResultsBefore(cutoffTime) } // GetTestExecution retrieves the status of a test execution @@ -208,15 +199,8 @@ func (u *TestResultUsecase) FailTestExecution(executionId happydns.Identifier, e return u.storage.UpdateTestExecution(execution) } -// DeleteCompletedExecutions removes execution records that are completed +// DeleteCompletedExecutions removes completed or failed execution records older than olderThan func (u *TestResultUsecase) DeleteCompletedExecutions(olderThan time.Duration) error { cutoffTime := time.Now().Add(-olderThan) - - // Get active executions (this won't include completed ones) - // We need a different query to get completed executions older than cutoff - // For now, this is a placeholder - - _ = cutoffTime - - return nil + return u.storage.DeleteCompletedExecutionsBefore(cutoffTime) } diff --git a/internal/usecase/testresult/testschedule_usecase.go b/internal/usecase/testresult/testschedule_usecase.go new file mode 100644 index 00000000..2e44f54f --- /dev/null +++ b/internal/usecase/testresult/testschedule_usecase.go @@ -0,0 +1,285 @@ +// This file is part of the happyDomain (R) project. +// Copyright (c) 2020-2026 happyDomain +// Authors: Pierre-Olivier Mercier, et al. +// +// This program is offered under a commercial and under the AGPL license. +// For commercial licensing, contact us at . +// +// For AGPL licensing: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package testresult + +import ( + "fmt" + "time" + + "git.happydns.org/happyDomain/model" +) + +const ( + // Default test intervals + DefaultDomainTestInterval = 24 * time.Hour // 24 hours for domain tests + DefaultServiceTestInterval = 1 * time.Hour // 1 hour for service tests + MinimumTestInterval = 5 * time.Minute // Minimum interval allowed +) + +// TestScheduleUsecase implements business logic for test schedules +type TestScheduleUsecase struct { + storage TestResultStorage + options *happydns.Options +} + +// NewTestScheduleUsecase creates a new test schedule usecase +func NewTestScheduleUsecase(storage TestResultStorage, options *happydns.Options) *TestScheduleUsecase { + return &TestScheduleUsecase{ + storage: storage, + options: options, + } +} + +// ListUserSchedules retrieves all schedules for a specific user +func (u *TestScheduleUsecase) ListUserSchedules(userId happydns.Identifier) ([]*happydns.TestSchedule, error) { + return u.storage.ListTestSchedulesByUser(userId) +} + +// ListSchedulesByTarget retrieves all schedules for a specific target +func (u *TestScheduleUsecase) ListSchedulesByTarget(targetType happydns.TestScopeType, targetId happydns.Identifier) ([]*happydns.TestSchedule, error) { + return u.storage.ListTestSchedulesByTarget(targetType, targetId) +} + +// GetSchedule retrieves a specific schedule by ID +func (u *TestScheduleUsecase) GetSchedule(scheduleId happydns.Identifier) (*happydns.TestSchedule, error) { + return u.storage.GetTestSchedule(scheduleId) +} + +// CreateSchedule creates a new test schedule with validation +func (u *TestScheduleUsecase) CreateSchedule(schedule *happydns.TestSchedule) error { + // Validate interval + if schedule.Interval < MinimumTestInterval { + return fmt.Errorf("test interval must be at least %v", MinimumTestInterval) + } + + // Set default interval if not specified + if schedule.Interval == 0 { + schedule.Interval = u.getDefaultInterval(schedule.TargetType) + } + + // Calculate next run time + if schedule.NextRun.IsZero() { + schedule.NextRun = time.Now().Add(schedule.Interval) + } + + // Enable by default if not specified + if !schedule.Enabled { + schedule.Enabled = true + } + + return u.storage.CreateTestSchedule(schedule) +} + +// UpdateSchedule updates an existing schedule +func (u *TestScheduleUsecase) UpdateSchedule(schedule *happydns.TestSchedule) error { + // Validate interval + if schedule.Interval < MinimumTestInterval { + return fmt.Errorf("test interval must be at least %v", MinimumTestInterval) + } + + // Get existing schedule to preserve certain fields + existing, err := u.storage.GetTestSchedule(schedule.Id) + if err != nil { + return err + } + + // Preserve LastRun if not explicitly changed + if schedule.LastRun == nil { + schedule.LastRun = existing.LastRun + } + + // Recalculate next run time if interval changed + if schedule.Interval != existing.Interval { + if schedule.LastRun != nil { + schedule.NextRun = schedule.LastRun.Add(schedule.Interval) + } else { + schedule.NextRun = time.Now().Add(schedule.Interval) + } + } + + return u.storage.UpdateTestSchedule(schedule) +} + +// DeleteSchedule removes a schedule +func (u *TestScheduleUsecase) DeleteSchedule(scheduleId happydns.Identifier) error { + return u.storage.DeleteTestSchedule(scheduleId) +} + +// EnableSchedule enables a schedule +func (u *TestScheduleUsecase) EnableSchedule(scheduleId happydns.Identifier) error { + schedule, err := u.storage.GetTestSchedule(scheduleId) + if err != nil { + return err + } + + schedule.Enabled = true + + // Reset next run time if it's in the past + if schedule.NextRun.Before(time.Now()) { + schedule.NextRun = time.Now().Add(schedule.Interval) + } + + return u.storage.UpdateTestSchedule(schedule) +} + +// DisableSchedule disables a schedule +func (u *TestScheduleUsecase) DisableSchedule(scheduleId happydns.Identifier) error { + schedule, err := u.storage.GetTestSchedule(scheduleId) + if err != nil { + return err + } + + schedule.Enabled = false + return u.storage.UpdateTestSchedule(schedule) +} + +// UpdateScheduleAfterRun updates a schedule after it has been executed +func (u *TestScheduleUsecase) UpdateScheduleAfterRun(scheduleId happydns.Identifier) error { + schedule, err := u.storage.GetTestSchedule(scheduleId) + if err != nil { + return err + } + + now := time.Now() + schedule.LastRun = &now + schedule.NextRun = now.Add(schedule.Interval) + + return u.storage.UpdateTestSchedule(schedule) +} + +// ListDueSchedules retrieves all enabled schedules that are due to run +func (u *TestScheduleUsecase) ListDueSchedules() ([]*happydns.TestSchedule, error) { + schedules, err := u.storage.ListEnabledTestSchedules() + if err != nil { + return nil, err + } + + now := time.Now() + var dueSchedules []*happydns.TestSchedule + + for _, schedule := range schedules { + if schedule.Enabled && schedule.NextRun.Before(now) { + dueSchedules = append(dueSchedules, schedule) + } + } + + return dueSchedules, nil +} + +// getDefaultInterval returns the default test interval based on target type +func (u *TestScheduleUsecase) getDefaultInterval(targetType happydns.TestScopeType) time.Duration { + switch targetType { + case happydns.TestScopeDomain: + return DefaultDomainTestInterval + case happydns.TestScopeService: + return DefaultServiceTestInterval + case happydns.TestScopeZone: + return DefaultDomainTestInterval + default: + return DefaultDomainTestInterval + } +} + +// MergePluginOptions merges plugin options from different scopes +// Priority: schedule options > domain options > user options > global options +func (u *TestScheduleUsecase) MergePluginOptions( + globalOpts happydns.PluginOptions, + userOpts happydns.PluginOptions, + domainOpts happydns.PluginOptions, + scheduleOpts happydns.PluginOptions, +) happydns.PluginOptions { + merged := make(happydns.PluginOptions) + + // Start with global options + for k, v := range globalOpts { + merged[k] = v + } + + // Override with user options + for k, v := range userOpts { + merged[k] = v + } + + // Override with domain options + for k, v := range domainOpts { + merged[k] = v + } + + // Override with schedule options (highest priority) + for k, v := range scheduleOpts { + merged[k] = v + } + + return merged +} + +// ValidateScheduleOwnership checks if a user owns a schedule +func (u *TestScheduleUsecase) ValidateScheduleOwnership(scheduleId happydns.Identifier, userId happydns.Identifier) error { + schedule, err := u.storage.GetTestSchedule(scheduleId) + if err != nil { + return err + } + + if !schedule.UserId.Equals(userId) { + return fmt.Errorf("user does not own this schedule") + } + + return nil +} + +// CreateDefaultSchedulesForTarget creates default schedules for a new target +func (u *TestScheduleUsecase) CreateDefaultSchedulesForTarget( + pluginName string, + targetType happydns.TestScopeType, + targetId happydns.Identifier, + userId happydns.Identifier, + enabled bool, +) error { + schedule := &happydns.TestSchedule{ + PluginName: pluginName, + UserId: userId, + TargetType: targetType, + TargetId: targetId, + Interval: u.getDefaultInterval(targetType), + Enabled: enabled, + NextRun: time.Now().Add(u.getDefaultInterval(targetType)), + Options: make(happydns.PluginOptions), + } + + return u.CreateSchedule(schedule) +} + +// DeleteSchedulesForTarget removes all schedules for a target +func (u *TestScheduleUsecase) DeleteSchedulesForTarget(targetType happydns.TestScopeType, targetId happydns.Identifier) error { + schedules, err := u.storage.ListTestSchedulesByTarget(targetType, targetId) + if err != nil { + return err + } + + for _, schedule := range schedules { + if err := u.storage.DeleteTestSchedule(schedule.Id); err != nil { + return err + } + } + + return nil +} diff --git a/model/config.go b/model/config.go index f15ba745..fd0bbf38 100644 --- a/model/config.go +++ b/model/config.go @@ -107,6 +107,12 @@ type Options struct { // ResultRetentionDays is how long to keep test results before cleanup ResultRetentionDays int + + // TestWorkers is the number of concurrent test executions allowed + TestWorkers int + + // DisableScheduler disables the background test scheduler + DisableScheduler bool } // GetBaseURL returns the full url to the absolute ExternalURL, including BaseURL. diff --git a/model/errors.go b/model/errors.go index 2b7efc11..fdc964ac 100644 --- a/model/errors.go +++ b/model/errors.go @@ -34,6 +34,7 @@ var ( ErrSessionNotFound = errors.New("session not found") ErrTestExecutionNotFound = errors.New("test execution not found") ErrTestResultNotFound = errors.New("test result not found") + ErrTestScheduleNotFound = errors.New("test schedule not found") ErrUserNotFound = errors.New("user not found") ErrUserAlreadyExist = errors.New("user already exists") ErrZoneNotFound = errors.New("zone not found") diff --git a/model/test_result.go b/model/test_result.go index dfc02431..8b9af544 100644 --- a/model/test_result.go +++ b/model/test_result.go @@ -264,4 +264,20 @@ type TestScheduleUsecase interface { // DeleteSchedulesForTarget removes all schedules for a target DeleteSchedulesForTarget(targetType TestScopeType, targetId Identifier) error + + // RescheduleUpcomingTests randomizes next run times for all enabled schedules + // within their respective intervals to spread load evenly. + RescheduleUpcomingTests() (int, error) + + // RescheduleOverdueTests reschedules overdue tests to run soon, spread over a + // short window to avoid scheduler famine after a suspend or server restart. + RescheduleOverdueTests() (int, error) +} + +// AdminSchedulerUsecase is satisfied by both testScheduler and disabledScheduler +type AdminSchedulerUsecase interface { + TriggerOnDemandTest(pluginName string, targetType TestScopeType, targetID Identifier, userID Identifier, options PluginOptions) (Identifier, error) + GetSchedulerStatus() SchedulerStatus + SetEnabled(enabled bool) error + RescheduleUpcomingTests() (int, error) } diff --git a/model/usecase.go b/model/usecase.go index 64fd3092..6c7b5d39 100644 --- a/model/usecase.go +++ b/model/usecase.go @@ -38,6 +38,7 @@ type UsecaseDependancies interface { SessionUsecase() SessionUsecase TestPluginUsecase() TestPluginUsecase TestResultUsecase() TestResultUsecase + TestScheduleUsecase() TestScheduleUsecase UserUsecase() UserUsecase ZoneCorrectionApplierUsecase() ZoneCorrectionApplierUsecase ZoneImporterUsecase() ZoneImporterUsecase