Implement tests scheduler

This commit is contained in:
nemunaire 2026-02-10 10:33:12 +07:00
commit 5d0e31faca
14 changed files with 1184 additions and 21 deletions

View file

@ -93,7 +93,7 @@ func (sc *ServiceController) AddZoneService(c *gin.Context) {
c.JSON(http.StatusOK, zone)
}
// GetServiceService retrieves the designated Service.
// GetZoneService retrieves the designated Service.
//
// @Summary Get the Service.
// @Schemes

View file

@ -0,0 +1,231 @@
// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2024 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package controller
import (
"fmt"
"net/http"
"github.com/gin-gonic/gin"
"git.happydns.org/happyDomain/internal/api/middleware"
"git.happydns.org/happyDomain/model"
)
// TestScheduleController handles test schedule operations
type TestScheduleController struct {
testScheduleUC happydns.TestScheduleUsecase
}
func NewTestScheduleController(testScheduleUC happydns.TestScheduleUsecase) *TestScheduleController {
return &TestScheduleController{
testScheduleUC: testScheduleUC,
}
}
// ListTestSchedules retrieves schedules for the authenticated user
//
// @Summary List test schedules
// @Description Retrieves test schedules for the authenticated user with optional pagination
// @Tags test-schedules
// @Produce json
// @Param limit query int false "Maximum number of schedules to return (0 = all)"
// @Param offset query int false "Number of schedules to skip (default: 0)"
// @Success 200 {array} happydns.TestSchedule
// @Failure 500 {object} happydns.ErrorResponse
// @Router /plugins/tests/schedules [get]
func (tc *TestScheduleController) ListTestSchedules(c *gin.Context) {
user := middleware.MyUser(c)
schedules, err := tc.testScheduleUC.ListUserSchedules(user.Id)
if err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
return
}
// Apply pagination
limit := 0
offset := 0
fmt.Sscanf(c.Query("limit"), "%d", &limit)
fmt.Sscanf(c.Query("offset"), "%d", &offset)
if offset > len(schedules) {
offset = len(schedules)
}
schedules = schedules[offset:]
if limit > 0 && len(schedules) > limit {
schedules = schedules[:limit]
}
c.JSON(http.StatusOK, schedules)
}
// CreateTestSchedule creates a new test schedule
//
// @Summary Create test schedule
// @Description Creates a new test schedule for the authenticated user
// @Tags test-schedules
// @Accept json
// @Produce json
// @Param body body happydns.TestSchedule true "Test schedule to create"
// @Success 201 {object} happydns.TestSchedule
// @Failure 400 {object} happydns.ErrorResponse
// @Failure 500 {object} happydns.ErrorResponse
// @Router /plugins/tests/schedules [post]
func (tc *TestScheduleController) CreateTestSchedule(c *gin.Context) {
user := middleware.MyUser(c)
var schedule happydns.TestSchedule
if err := c.ShouldBindJSON(&schedule); err != nil {
middleware.ErrorResponse(c, http.StatusBadRequest, err)
return
}
// Set user ID
schedule.OwnerId = user.Id
if err := tc.testScheduleUC.CreateSchedule(&schedule); err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusCreated, schedule)
}
// GetTestSchedule retrieves a specific schedule
//
// @Summary Get test schedule
// @Description Retrieves a specific test schedule by ID
// @Tags test-schedules
// @Produce json
// @Param schedule_id path string true "Schedule ID"
// @Success 200 {object} happydns.TestSchedule
// @Failure 404 {object} happydns.ErrorResponse
// @Failure 500 {object} happydns.ErrorResponse
// @Router /plugins/tests/schedules/{schedule_id} [get]
func (tc *TestScheduleController) GetTestSchedule(c *gin.Context) {
user := middleware.MyUser(c)
scheduleIdStr := c.Param("schedule_id")
scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr)
if err != nil {
middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID"))
return
}
// Verify ownership
if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil {
middleware.ErrorResponse(c, http.StatusForbidden, err)
return
}
schedule, err := tc.testScheduleUC.GetSchedule(scheduleId)
if err != nil {
middleware.ErrorResponse(c, http.StatusNotFound, err)
return
}
c.JSON(http.StatusOK, schedule)
}
// UpdateTestSchedule updates an existing schedule
//
// @Summary Update test schedule
// @Description Updates an existing test schedule
// @Tags test-schedules
// @Accept json
// @Produce json
// @Param schedule_id path string true "Schedule ID"
// @Param body body happydns.TestSchedule true "Updated schedule"
// @Success 200 {object} happydns.TestSchedule
// @Failure 400 {object} happydns.ErrorResponse
// @Failure 404 {object} happydns.ErrorResponse
// @Failure 500 {object} happydns.ErrorResponse
// @Router /plugins/tests/schedules/{schedule_id} [put]
func (tc *TestScheduleController) UpdateTestSchedule(c *gin.Context) {
user := middleware.MyUser(c)
scheduleIdStr := c.Param("schedule_id")
scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr)
if err != nil {
middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID"))
return
}
// Verify ownership
if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil {
middleware.ErrorResponse(c, http.StatusForbidden, err)
return
}
var schedule happydns.TestSchedule
if err := c.ShouldBindJSON(&schedule); err != nil {
middleware.ErrorResponse(c, http.StatusBadRequest, err)
return
}
// Ensure ID matches
schedule.Id = scheduleId
schedule.OwnerId = user.Id
if err := tc.testScheduleUC.UpdateSchedule(&schedule); err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, schedule)
}
// DeleteTestSchedule deletes a schedule
//
// @Summary Delete test schedule
// @Description Deletes a test schedule
// @Tags test-schedules
// @Produce json
// @Param schedule_id path string true "Schedule ID"
// @Success 204 "No Content"
// @Failure 404 {object} happydns.ErrorResponse
// @Failure 500 {object} happydns.ErrorResponse
// @Router /plugins/tests/schedules/{schedule_id} [delete]
func (tc *TestScheduleController) DeleteTestSchedule(c *gin.Context) {
user := middleware.MyUser(c)
scheduleIdStr := c.Param("schedule_id")
scheduleId, err := happydns.NewIdentifierFromString(scheduleIdStr)
if err != nil {
middleware.ErrorResponse(c, http.StatusBadRequest, fmt.Errorf("invalid schedule ID"))
return
}
// Verify ownership
if err := tc.testScheduleUC.ValidateScheduleOwnership(scheduleId, user.Id); err != nil {
middleware.ErrorResponse(c, http.StatusForbidden, err)
return
}
if err := tc.testScheduleUC.DeleteSchedule(scheduleId); err != nil {
middleware.ErrorResponse(c, http.StatusInternalServerError, err)
return
}
c.Status(http.StatusNoContent)
}

View file

@ -81,6 +81,7 @@ func DeclareRoutes(cfg *happydns.Options, router *gin.Engine, dependancies happy
DeclareProviderRoutes(apiAuthRoutes, dependancies)
DeclareProviderSettingsRoutes(apiAuthRoutes, dependancies)
DeclareRecordRoutes(apiAuthRoutes, dependancies)
DeclareTestScheduleRoutes(apiAuthRoutes, dependancies)
DeclareUsersRoutes(apiAuthRoutes, dependancies, lc)
DeclareSessionRoutes(apiAuthRoutes, dependancies)
}

View file

@ -69,6 +69,7 @@ type Usecases struct {
serviceSpecs happydns.ServiceSpecsUsecase
testPlugin happydns.TestPluginUsecase
testResult happydns.TestResultUsecase
testSchedule happydns.TestScheduleUsecase
user happydns.UserUsecase
zone happydns.ZoneUsecase
zoneService happydns.ZoneServiceUsecase
@ -87,6 +88,7 @@ type App struct {
router *gin.Engine
srv *http.Server
store storage.Storage
testScheduler controller.TestSchedulerInterface
usecases Usecases
}
@ -162,6 +164,10 @@ func (a *App) TestResultUsecase() happydns.TestResultUsecase {
return a.usecases.testResult
}
func (a *App) TestScheduleUsecase() happydns.TestScheduleUsecase {
return a.usecases.testSchedule
}
func (a *App) UserUsecase() happydns.UserUsecase {
return a.usecases.user
}
@ -178,6 +184,10 @@ func (a *App) ZoneUsecase() happydns.ZoneUsecase {
return a.usecases.zone
}
func (a *App) TestScheduler() controller.TestSchedulerInterface {
return a.testScheduler
}
func (a *App) ZoneServiceUsecase() happydns.ZoneServiceUsecase {
return a.usecases.zoneService
}
@ -194,6 +204,7 @@ func NewApp(cfg *happydns.Options) *App {
app.initPlugins()
app.initUsecases()
app.initCaptcha()
app.initTestScheduler()
app.setupRouter()
return app
@ -210,6 +221,7 @@ func NewAppWithStorage(cfg *happydns.Options, store storage.Storage) *App {
app.initPlugins()
app.initUsecases()
app.initCaptcha()
app.initTestScheduler()
app.setupRouter()
return app
@ -283,6 +295,20 @@ func (app *App) initInsights() {
}
}
func (app *App) initTestScheduler() {
if app.cfg.DisableScheduler {
// Use a disabled scheduler that returns clear errors
app.testScheduler = &disabledScheduler{}
return
}
app.testScheduler = newTestScheduler(
app.cfg,
app.store,
app.usecases.testPlugin,
)
}
func (app *App) initUsecases() {
sessionService := sessionUC.NewService(app.store)
authUserService := authuserUC.NewAuthUserUsecases(app.cfg, app.mailer, app.store, sessionService)
@ -312,6 +338,7 @@ func (app *App) initUsecases() {
app.usecases.session = sessionService
app.usecases.testPlugin = pluginUC.NewTestPluginUsecase(app.cfg, app.plugins, app.store)
app.usecases.testResult = testresultUC.NewTestResultUsecase(app.store, app.cfg)
app.usecases.testSchedule = testresultUC.NewTestScheduleUsecase(app.store, app.cfg)
app.usecases.orchestrator = orchestrator.NewOrchestrator(
domainLogService,
@ -352,6 +379,11 @@ func (app *App) Start() {
go app.insights.Run()
}
// Start the test scheduler if it's the real implementation (not disabled)
if scheduler, ok := app.testScheduler.(*testScheduler); ok && scheduler != nil {
go scheduler.Run()
}
log.Printf("Public interface listening on %s\n", app.cfg.Bind)
if err := app.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen: %s\n", err)
@ -377,4 +409,9 @@ func (app *App) Stop() {
if app.failureTracker != nil {
app.failureTracker.Close()
}
// Close the test scheduler if it's the real implementation (not disabled)
if scheduler, ok := app.testScheduler.(*testScheduler); ok && scheduler != nil {
scheduler.Close()
}
}

View file

@ -0,0 +1,541 @@
// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2026 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package app
import (
"container/heap"
"context"
"fmt"
"log"
"runtime"
"sync"
"time"
"git.happydns.org/happyDomain/internal/storage"
"git.happydns.org/happyDomain/internal/usecase/testresult"
"git.happydns.org/happyDomain/model"
)
const (
SchedulerCheckInterval = 1 * time.Minute // How often to check for due tests
SchedulerCleanupInterval = 24 * time.Hour // How often to clean up old executions
TestExecutionTimeout = 5 * time.Minute // Max time for a single test
MaxRetries = 3 // Max retry attempts for failed tests
)
// Priority levels for test execution queue
const (
PriorityOnDemand = iota // On-demand tests (highest priority)
PriorityOverdue // Overdue scheduled tests
PriorityScheduled // Regular scheduled tests
)
// testScheduler manages background test execution
type testScheduler struct {
cfg *happydns.Options
store storage.Storage
pluginUsecase happydns.TestPluginUsecase
resultUsecase *testresult.TestResultUsecase
scheduleUsecase *testresult.TestScheduleUsecase
stop chan struct{} // closed to stop the main Run loop
stopWorkers chan struct{} // closed to stop all workers simultaneously
runNowChan chan *queueItem // on-demand items routed through the main loop
workAvail chan struct{} // non-blocking signals that queue has new work
queue *priorityQueue
activeExecutions map[string]*activeExecution
workers []*worker
mu sync.RWMutex
wg sync.WaitGroup
}
// activeExecution tracks a running test execution
type activeExecution struct {
execution *happydns.TestExecution
cancel context.CancelFunc
startTime time.Time
}
// queueItem represents a test execution request in the queue
type queueItem struct {
schedule *happydns.TestSchedule
execution *happydns.TestExecution
priority int
queuedAt time.Time
retries int
}
// --- container/heap implementation for priorityQueue ---
// priorityHeap is the underlying heap, ordered by priority then arrival time.
type priorityHeap []*queueItem
func (h priorityHeap) Len() int { return len(h) }
func (h priorityHeap) Less(i, j int) bool {
if h[i].priority != h[j].priority {
return h[i].priority < h[j].priority
}
return h[i].queuedAt.Before(h[j].queuedAt)
}
func (h priorityHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *priorityHeap) Push(x any) { *h = append(*h, x.(*queueItem)) }
func (h *priorityHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = nil // avoid memory leak
*h = old[:n-1]
return x
}
// priorityQueue is a thread-safe min-heap of queueItems.
type priorityQueue struct {
h priorityHeap
mu sync.Mutex
}
func newPriorityQueue() *priorityQueue {
pq := &priorityQueue{}
heap.Init(&pq.h)
return pq
}
// Push adds an item to the queue.
func (q *priorityQueue) Push(item *queueItem) {
q.mu.Lock()
defer q.mu.Unlock()
heap.Push(&q.h, item)
}
// Pop removes and returns the highest-priority item, or nil if empty.
func (q *priorityQueue) Pop() *queueItem {
q.mu.Lock()
defer q.mu.Unlock()
if q.h.Len() == 0 {
return nil
}
return heap.Pop(&q.h).(*queueItem)
}
// Len returns the queue length.
func (q *priorityQueue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.h.Len()
}
// worker processes tests from the queue
type worker struct {
id int
scheduler *testScheduler
}
// disabledScheduler is a no-op implementation used when scheduler is disabled
type disabledScheduler struct{}
// TriggerOnDemandTest returns an error indicating the scheduler is disabled
func (d *disabledScheduler) TriggerOnDemandTest(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, userId happydns.Identifier, options happydns.PluginOptions) (happydns.Identifier, error) {
return happydns.Identifier{}, fmt.Errorf("test scheduler is disabled in configuration")
}
// newTestScheduler creates a new test scheduler
func newTestScheduler(
cfg *happydns.Options,
store storage.Storage,
pluginUsecase happydns.TestPluginUsecase,
) *testScheduler {
numWorkers := cfg.TestWorkers
if numWorkers <= 0 {
numWorkers = runtime.NumCPU()
}
scheduler := &testScheduler{
cfg: cfg,
store: store,
pluginUsecase: pluginUsecase,
resultUsecase: testresult.NewTestResultUsecase(store, cfg),
scheduleUsecase: testresult.NewTestScheduleUsecase(store, cfg),
stop: make(chan struct{}),
stopWorkers: make(chan struct{}),
runNowChan: make(chan *queueItem, 100),
workAvail: make(chan struct{}, numWorkers),
queue: newPriorityQueue(),
activeExecutions: make(map[string]*activeExecution),
workers: make([]*worker, numWorkers),
}
for i := 0; i < numWorkers; i++ {
scheduler.workers[i] = &worker{
id: i,
scheduler: scheduler,
}
}
return scheduler
}
// enqueue pushes an item to the priority queue and wakes one idle worker.
func (s *testScheduler) enqueue(item *queueItem) {
s.queue.Push(item)
select {
case s.workAvail <- struct{}{}:
default:
// All workers are already busy or already notified; they will drain
// the queue on their own after finishing the current item.
}
}
// Close stops the scheduler and waits for all workers to finish.
func (s *testScheduler) Close() {
log.Println("Stopping test scheduler...")
// Unblock the main Run loop.
close(s.stop)
// Unblock all workers simultaneously.
close(s.stopWorkers)
// Cancel all active test executions.
s.mu.Lock()
for _, exec := range s.activeExecutions {
exec.cancel()
}
s.mu.Unlock()
// Wait for all workers to finish their current item.
s.wg.Wait()
log.Println("Test scheduler stopped")
}
// Run starts the scheduler main loop. It must not be called more than once.
func (s *testScheduler) Run() {
s.mu.Lock()
s.running = true
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.running = false
s.mu.Unlock()
}()
log.Printf("Starting test scheduler with %d workers...\n", len(s.workers))
// Start workers
for _, w := range s.workers {
s.wg.Add(1)
go w.run(&s.wg)
}
// Main scheduling loop
checkTicker := time.NewTicker(SchedulerCheckInterval)
cleanupTicker := time.NewTicker(SchedulerCleanupInterval)
defer checkTicker.Stop()
defer cleanupTicker.Stop()
// Initial check
s.checkSchedules()
for {
select {
case <-checkTicker.C:
s.checkSchedules()
case <-cleanupTicker.C:
s.cleanup()
case item := <-s.runNowChan:
s.enqueue(item)
case <-s.stop:
return
}
}
}
// checkSchedules checks for due tests and queues them
func (s *testScheduler) checkSchedules() {
dueSchedules, err := s.scheduleUsecase.ListDueSchedules()
if err != nil {
log.Printf("Error listing due schedules: %v\n", err)
return
}
now := time.Now()
for _, schedule := range dueSchedules {
// Determine priority based on how overdue the test is
priority := PriorityScheduled
if schedule.NextRun.Add(schedule.Interval).Before(now) {
priority = PriorityOverdue
}
// Create execution record
execution := &happydns.TestExecution{
ScheduleId: &schedule.Id,
PluginName: schedule.PluginName,
OwnerId: schedule.OwnerId,
TargetType: schedule.TargetType,
TargetId: schedule.TargetId,
Status: happydns.TestExecutionPending,
StartedAt: time.Now(),
Options: schedule.Options,
}
if err := s.resultUsecase.CreateTestExecution(execution); err != nil {
log.Printf("Error creating execution for schedule %s: %v\n", schedule.Id.String(), err)
continue
}
s.enqueue(&queueItem{
schedule: schedule,
execution: execution,
priority: priority,
queuedAt: now,
retries: 0,
})
}
// Mark scheduler run
if err := s.store.TestSchedulerRun(); err != nil {
log.Printf("Error marking scheduler run: %v\n", err)
}
}
// TriggerOnDemandTest triggers an immediate test execution.
// It creates the execution record synchronously (so the caller gets an ID back)
// and then routes the item through runNowChan so the main loop controls
// all queue insertions.
func (s *testScheduler) TriggerOnDemandTest(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, ownerId happydns.Identifier, options happydns.PluginOptions) (happydns.Identifier, error) {
schedule := &happydns.TestSchedule{
PluginName: pluginName,
OwnerId: ownerId,
TargetType: targetType,
TargetId: targetId,
Interval: 0, // On-demand, no interval
Enabled: true,
Options: options,
}
execution := &happydns.TestExecution{
ScheduleId: nil,
PluginName: pluginName,
OwnerId: ownerId,
TargetType: targetType,
TargetId: targetId,
Status: happydns.TestExecutionPending,
StartedAt: time.Now(),
Options: options,
}
if err := s.resultUsecase.CreateTestExecution(execution); err != nil {
return happydns.Identifier{}, err
}
item := &queueItem{
schedule: schedule,
execution: execution,
priority: PriorityOnDemand,
queuedAt: time.Now(),
retries: 0,
}
// Route through the main loop when possible; fall back to direct enqueue
// if the channel is full so that the caller never blocks.
select {
case s.runNowChan <- item:
default:
s.enqueue(item)
}
return execution.Id, nil
}
// cleanup removes old execution records and expired test results
func (s *testScheduler) cleanup() {
log.Println("Running scheduler cleanup...")
// Delete completed/failed execution records older than 7 days
if err := s.resultUsecase.DeleteCompletedExecutions(7 * 24 * time.Hour); err != nil {
log.Printf("Error cleaning up old executions: %v\n", err)
}
// Delete test results older than the configured retention period
if err := s.resultUsecase.CleanupOldResults(); err != nil {
log.Printf("Error cleaning up old test results: %v\n", err)
}
log.Println("Scheduler cleanup complete")
}
// run is the worker's main loop. It drains the queue eagerly and waits for a
// workAvail signal when idle, rather than sleeping on a fixed timer.
func (w *worker) run(wg *sync.WaitGroup) {
defer wg.Done()
log.Printf("Worker %d started\n", w.id)
for {
// Drain: try to grab work before blocking.
if item := w.scheduler.queue.Pop(); item != nil {
w.executeTest(item)
continue
}
// Queue is empty; wait for new work or a stop signal.
select {
case <-w.scheduler.workAvail:
// Loop back to attempt a Pop.
case <-w.scheduler.stopWorkers:
log.Printf("Worker %d stopped\n", w.id)
return
}
}
}
// executeTest runs a test plugin and stores the result
func (w *worker) executeTest(item *queueItem) {
ctx, cancel := context.WithTimeout(context.Background(), TestExecutionTimeout)
defer cancel()
execution := item.execution
schedule := item.schedule
// Mark execution as running
execution.Status = happydns.TestExecutionRunning
if err := w.scheduler.resultUsecase.UpdateTestExecution(execution); err != nil {
log.Printf("Worker %d: Error updating execution status: %v\n", w.id, err)
return
}
// Track active execution
w.scheduler.mu.Lock()
w.scheduler.activeExecutions[execution.Id.String()] = &activeExecution{
execution: execution,
cancel: cancel,
startTime: time.Now(),
}
w.scheduler.mu.Unlock()
defer func() {
w.scheduler.mu.Lock()
delete(w.scheduler.activeExecutions, execution.Id.String())
w.scheduler.mu.Unlock()
}()
// Get the plugin
plugin, err := w.scheduler.pluginUsecase.GetTestPlugin(schedule.PluginName)
if err != nil {
errMsg := fmt.Sprintf("plugin not found: %s - %v", schedule.PluginName, err)
log.Printf("Worker %d: %s\n", w.id, errMsg)
_ = w.scheduler.resultUsecase.FailTestExecution(execution.Id, errMsg)
return
}
// Prepare metadata
meta := make(map[string]string)
meta["target_type"] = schedule.TargetType.String()
meta["target_id"] = schedule.TargetId.String()
// Run the test
startTime := time.Now()
resultChan := make(chan *happydns.PluginResult, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errorChan <- fmt.Errorf("plugin panicked: %v", r)
}
}()
result, err := plugin.RunTest(schedule.Options, meta)
if err != nil {
errorChan <- err
} else {
resultChan <- result
}
}()
// Wait for result or timeout
var pluginResult *happydns.PluginResult
var testErr error
select {
case pluginResult = <-resultChan:
// Test completed successfully
case testErr = <-errorChan:
// Test returned an error
case <-ctx.Done():
// Timeout
testErr = fmt.Errorf("test execution timeout after %v", TestExecutionTimeout)
}
duration := time.Since(startTime)
// Store the result
result := &happydns.TestResult{
PluginName: schedule.PluginName,
TestType: schedule.TargetType,
TargetId: schedule.TargetId,
OwnerId: schedule.OwnerId,
ExecutedAt: time.Now(),
ScheduledTest: item.execution.ScheduleId != nil,
Options: schedule.Options,
Duration: duration,
}
if testErr != nil {
result.Status = happydns.PluginResultStatusKO
result.StatusLine = "Test execution failed"
result.Error = testErr.Error()
} else if pluginResult != nil {
result.Status = pluginResult.Status
result.StatusLine = pluginResult.StatusLine
result.Report = pluginResult.Report
} else {
result.Status = happydns.PluginResultStatusKO
result.StatusLine = "Unknown error"
result.Error = "No result or error returned from plugin"
}
// Save the result
if err := w.scheduler.resultUsecase.CreateTestResult(result); err != nil {
log.Printf("Worker %d: Error saving test result: %v\n", w.id, err)
_ = w.scheduler.resultUsecase.FailTestExecution(execution.Id, err.Error())
return
}
// Complete the execution
if err := w.scheduler.resultUsecase.CompleteTestExecution(execution.Id, result.Id); err != nil {
log.Printf("Worker %d: Error completing execution: %v\n", w.id, err)
return
}
// Update schedule if this was a scheduled test
if item.execution.ScheduleId != nil {
if err := w.scheduler.scheduleUsecase.UpdateScheduleAfterRun(*item.execution.ScheduleId); err != nil {
log.Printf("Worker %d: Error updating schedule: %v\n", w.id, err)
}
}
log.Printf("Worker %d: Completed test %s for target %s (status: %s, duration: %v)\n",
w.id, schedule.PluginName, schedule.TargetId.String(), result.Status, duration)
}

View file

@ -55,6 +55,7 @@ func ConsolidateConfig() (opts *happydns.Options, err error) {
StorageEngine: "leveldb",
MaxResultsPerTest: 100,
ResultRetentionDays: 90,
TestWorkers: 2,
}
declareFlags(opts)

View file

@ -179,6 +179,31 @@ func (s *KVStorage) DeleteOldTestResults(pluginName string, targetType happydns.
return nil
}
// DeleteTestResultsBefore removes all test results with ExecutedAt older than cutoff
func (s *KVStorage) DeleteTestResultsBefore(cutoff time.Time) error {
iter := s.db.Search("testresult|")
defer iter.Release()
var toDelete []string
for iter.Next() {
var r happydns.TestResult
if err := s.db.DecodeData(iter.Value(), &r); err != nil {
continue
}
if r.ExecutedAt.Before(cutoff) {
toDelete = append(toDelete, string(iter.Key()))
}
}
for _, key := range toDelete {
if err := s.db.Delete(key); err != nil {
return err
}
}
return nil
}
// Test Schedule storage keys:
// testschedule|{schedule-id}
// testschedule.byuser|{user-id}|{schedule-id}
@ -410,6 +435,34 @@ func (s *KVStorage) DeleteTestExecution(executionId happydns.Identifier) error {
return s.db.Delete(key)
}
// DeleteCompletedExecutionsBefore removes completed or failed execution records older than cutoff
func (s *KVStorage) DeleteCompletedExecutionsBefore(cutoff time.Time) error {
iter := s.db.Search("testexec|")
defer iter.Release()
var toDelete []string
for iter.Next() {
var exec happydns.TestExecution
if err := s.db.DecodeData(iter.Value(), &exec); err != nil {
continue
}
if exec.Status != happydns.TestExecutionCompleted && exec.Status != happydns.TestExecutionFailed {
continue
}
if exec.CompletedAt != nil && exec.CompletedAt.Before(cutoff) {
toDelete = append(toDelete, string(iter.Key()))
}
}
for _, key := range toDelete {
if err := s.db.Delete(key); err != nil {
return err
}
}
return nil
}
// Scheduler state storage key:
// testscheduler.lastrun

View file

@ -51,6 +51,9 @@ type TestResultStorage interface {
// DeleteOldTestResults removes old test results keeping only the most recent N results
DeleteOldTestResults(pluginName string, targetType happydns.TestScopeType, targetId happydns.Identifier, keepCount int) error
// DeleteTestResultsBefore removes all test results older than the given time
DeleteTestResultsBefore(cutoff time.Time) error
// Test Schedules
// ListEnabledTestSchedules retrieves all enabled schedules (for scheduler)
ListEnabledTestSchedules() ([]*happydns.TestSchedule, error)
@ -89,6 +92,9 @@ type TestResultStorage interface {
// DeleteTestExecution removes an execution record
DeleteTestExecution(executionId happydns.Identifier) error
// DeleteCompletedExecutionsBefore removes completed or failed execution records older than the given time
DeleteCompletedExecutionsBefore(cutoff time.Time) error
// Scheduler State
// TestSchedulerRun marks that the scheduler has run at current time
TestSchedulerRun() error

View file

@ -120,7 +120,7 @@ func (u *TestResultUsecase) DeleteAllTestResults(pluginName string, targetType h
return nil
}
// CleanupOldResults removes test results older than retention period
// CleanupOldResults removes test results older than the configured retention period
func (u *TestResultUsecase) CleanupOldResults() error {
retentionDays := u.options.ResultRetentionDays
if retentionDays <= 0 {
@ -128,16 +128,7 @@ func (u *TestResultUsecase) CleanupOldResults() error {
}
cutoffTime := time.Now().AddDate(0, 0, -retentionDays)
// Get all results for all users (inefficient but necessary without a time-based index)
// In a production system, you might want to add a time-based index for this
// For now, we'll iterate through results and delete old ones
// This is a placeholder - the actual implementation would need to be optimized
// based on specific storage patterns
_ = cutoffTime
return nil
return u.storage.DeleteTestResultsBefore(cutoffTime)
}
// GetTestExecution retrieves the status of a test execution
@ -208,15 +199,8 @@ func (u *TestResultUsecase) FailTestExecution(executionId happydns.Identifier, e
return u.storage.UpdateTestExecution(execution)
}
// DeleteCompletedExecutions removes execution records that are completed
// DeleteCompletedExecutions removes completed or failed execution records older than olderThan
func (u *TestResultUsecase) DeleteCompletedExecutions(olderThan time.Duration) error {
cutoffTime := time.Now().Add(-olderThan)
// Get active executions (this won't include completed ones)
// We need a different query to get completed executions older than cutoff
// For now, this is a placeholder
_ = cutoffTime
return nil
return u.storage.DeleteCompletedExecutionsBefore(cutoffTime)
}

View file

@ -0,0 +1,285 @@
// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2026 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package testresult
import (
"fmt"
"time"
"git.happydns.org/happyDomain/model"
)
const (
// Default test intervals
DefaultDomainTestInterval = 24 * time.Hour // 24 hours for domain tests
DefaultServiceTestInterval = 1 * time.Hour // 1 hour for service tests
MinimumTestInterval = 5 * time.Minute // Minimum interval allowed
)
// TestScheduleUsecase implements business logic for test schedules
type TestScheduleUsecase struct {
storage TestResultStorage
options *happydns.Options
}
// NewTestScheduleUsecase creates a new test schedule usecase
func NewTestScheduleUsecase(storage TestResultStorage, options *happydns.Options) *TestScheduleUsecase {
return &TestScheduleUsecase{
storage: storage,
options: options,
}
}
// ListUserSchedules retrieves all schedules for a specific user
func (u *TestScheduleUsecase) ListUserSchedules(userId happydns.Identifier) ([]*happydns.TestSchedule, error) {
return u.storage.ListTestSchedulesByUser(userId)
}
// ListSchedulesByTarget retrieves all schedules for a specific target
func (u *TestScheduleUsecase) ListSchedulesByTarget(targetType happydns.TestScopeType, targetId happydns.Identifier) ([]*happydns.TestSchedule, error) {
return u.storage.ListTestSchedulesByTarget(targetType, targetId)
}
// GetSchedule retrieves a specific schedule by ID
func (u *TestScheduleUsecase) GetSchedule(scheduleId happydns.Identifier) (*happydns.TestSchedule, error) {
return u.storage.GetTestSchedule(scheduleId)
}
// CreateSchedule creates a new test schedule with validation
func (u *TestScheduleUsecase) CreateSchedule(schedule *happydns.TestSchedule) error {
// Validate interval
if schedule.Interval < MinimumTestInterval {
return fmt.Errorf("test interval must be at least %v", MinimumTestInterval)
}
// Set default interval if not specified
if schedule.Interval == 0 {
schedule.Interval = u.getDefaultInterval(schedule.TargetType)
}
// Calculate next run time
if schedule.NextRun.IsZero() {
schedule.NextRun = time.Now().Add(schedule.Interval)
}
// Enable by default if not specified
if !schedule.Enabled {
schedule.Enabled = true
}
return u.storage.CreateTestSchedule(schedule)
}
// UpdateSchedule updates an existing schedule
func (u *TestScheduleUsecase) UpdateSchedule(schedule *happydns.TestSchedule) error {
// Validate interval
if schedule.Interval < MinimumTestInterval {
return fmt.Errorf("test interval must be at least %v", MinimumTestInterval)
}
// Get existing schedule to preserve certain fields
existing, err := u.storage.GetTestSchedule(schedule.Id)
if err != nil {
return err
}
// Preserve LastRun if not explicitly changed
if schedule.LastRun == nil {
schedule.LastRun = existing.LastRun
}
// Recalculate next run time if interval changed
if schedule.Interval != existing.Interval {
if schedule.LastRun != nil {
schedule.NextRun = schedule.LastRun.Add(schedule.Interval)
} else {
schedule.NextRun = time.Now().Add(schedule.Interval)
}
}
return u.storage.UpdateTestSchedule(schedule)
}
// DeleteSchedule removes a schedule
func (u *TestScheduleUsecase) DeleteSchedule(scheduleId happydns.Identifier) error {
return u.storage.DeleteTestSchedule(scheduleId)
}
// EnableSchedule enables a schedule
func (u *TestScheduleUsecase) EnableSchedule(scheduleId happydns.Identifier) error {
schedule, err := u.storage.GetTestSchedule(scheduleId)
if err != nil {
return err
}
schedule.Enabled = true
// Reset next run time if it's in the past
if schedule.NextRun.Before(time.Now()) {
schedule.NextRun = time.Now().Add(schedule.Interval)
}
return u.storage.UpdateTestSchedule(schedule)
}
// DisableSchedule disables a schedule
func (u *TestScheduleUsecase) DisableSchedule(scheduleId happydns.Identifier) error {
schedule, err := u.storage.GetTestSchedule(scheduleId)
if err != nil {
return err
}
schedule.Enabled = false
return u.storage.UpdateTestSchedule(schedule)
}
// UpdateScheduleAfterRun updates a schedule after it has been executed
func (u *TestScheduleUsecase) UpdateScheduleAfterRun(scheduleId happydns.Identifier) error {
schedule, err := u.storage.GetTestSchedule(scheduleId)
if err != nil {
return err
}
now := time.Now()
schedule.LastRun = &now
schedule.NextRun = now.Add(schedule.Interval)
return u.storage.UpdateTestSchedule(schedule)
}
// ListDueSchedules retrieves all enabled schedules that are due to run
func (u *TestScheduleUsecase) ListDueSchedules() ([]*happydns.TestSchedule, error) {
schedules, err := u.storage.ListEnabledTestSchedules()
if err != nil {
return nil, err
}
now := time.Now()
var dueSchedules []*happydns.TestSchedule
for _, schedule := range schedules {
if schedule.Enabled && schedule.NextRun.Before(now) {
dueSchedules = append(dueSchedules, schedule)
}
}
return dueSchedules, nil
}
// getDefaultInterval returns the default test interval based on target type
func (u *TestScheduleUsecase) getDefaultInterval(targetType happydns.TestScopeType) time.Duration {
switch targetType {
case happydns.TestScopeDomain:
return DefaultDomainTestInterval
case happydns.TestScopeService:
return DefaultServiceTestInterval
case happydns.TestScopeZone:
return DefaultDomainTestInterval
default:
return DefaultDomainTestInterval
}
}
// MergePluginOptions merges plugin options from different scopes
// Priority: schedule options > domain options > user options > global options
func (u *TestScheduleUsecase) MergePluginOptions(
globalOpts happydns.PluginOptions,
userOpts happydns.PluginOptions,
domainOpts happydns.PluginOptions,
scheduleOpts happydns.PluginOptions,
) happydns.PluginOptions {
merged := make(happydns.PluginOptions)
// Start with global options
for k, v := range globalOpts {
merged[k] = v
}
// Override with user options
for k, v := range userOpts {
merged[k] = v
}
// Override with domain options
for k, v := range domainOpts {
merged[k] = v
}
// Override with schedule options (highest priority)
for k, v := range scheduleOpts {
merged[k] = v
}
return merged
}
// ValidateScheduleOwnership checks if a user owns a schedule
func (u *TestScheduleUsecase) ValidateScheduleOwnership(scheduleId happydns.Identifier, userId happydns.Identifier) error {
schedule, err := u.storage.GetTestSchedule(scheduleId)
if err != nil {
return err
}
if !schedule.UserId.Equals(userId) {
return fmt.Errorf("user does not own this schedule")
}
return nil
}
// CreateDefaultSchedulesForTarget creates default schedules for a new target
func (u *TestScheduleUsecase) CreateDefaultSchedulesForTarget(
pluginName string,
targetType happydns.TestScopeType,
targetId happydns.Identifier,
userId happydns.Identifier,
enabled bool,
) error {
schedule := &happydns.TestSchedule{
PluginName: pluginName,
UserId: userId,
TargetType: targetType,
TargetId: targetId,
Interval: u.getDefaultInterval(targetType),
Enabled: enabled,
NextRun: time.Now().Add(u.getDefaultInterval(targetType)),
Options: make(happydns.PluginOptions),
}
return u.CreateSchedule(schedule)
}
// DeleteSchedulesForTarget removes all schedules for a target
func (u *TestScheduleUsecase) DeleteSchedulesForTarget(targetType happydns.TestScopeType, targetId happydns.Identifier) error {
schedules, err := u.storage.ListTestSchedulesByTarget(targetType, targetId)
if err != nil {
return err
}
for _, schedule := range schedules {
if err := u.storage.DeleteTestSchedule(schedule.Id); err != nil {
return err
}
}
return nil
}

View file

@ -107,6 +107,12 @@ type Options struct {
// ResultRetentionDays is how long to keep test results before cleanup
ResultRetentionDays int
// TestWorkers is the number of concurrent test executions allowed
TestWorkers int
// DisableScheduler disables the background test scheduler
DisableScheduler bool
}
// GetBaseURL returns the full url to the absolute ExternalURL, including BaseURL.

View file

@ -34,6 +34,7 @@ var (
ErrSessionNotFound = errors.New("session not found")
ErrTestExecutionNotFound = errors.New("test execution not found")
ErrTestResultNotFound = errors.New("test result not found")
ErrTestScheduleNotFound = errors.New("test schedule not found")
ErrUserNotFound = errors.New("user not found")
ErrUserAlreadyExist = errors.New("user already exists")
ErrZoneNotFound = errors.New("zone not found")

View file

@ -264,4 +264,20 @@ type TestScheduleUsecase interface {
// DeleteSchedulesForTarget removes all schedules for a target
DeleteSchedulesForTarget(targetType TestScopeType, targetId Identifier) error
// RescheduleUpcomingTests randomizes next run times for all enabled schedules
// within their respective intervals to spread load evenly.
RescheduleUpcomingTests() (int, error)
// RescheduleOverdueTests reschedules overdue tests to run soon, spread over a
// short window to avoid scheduler famine after a suspend or server restart.
RescheduleOverdueTests() (int, error)
}
// AdminSchedulerUsecase is satisfied by both testScheduler and disabledScheduler
type AdminSchedulerUsecase interface {
TriggerOnDemandTest(pluginName string, targetType TestScopeType, targetID Identifier, userID Identifier, options PluginOptions) (Identifier, error)
GetSchedulerStatus() SchedulerStatus
SetEnabled(enabled bool) error
RescheduleUpcomingTests() (int, error)
}

View file

@ -38,6 +38,7 @@ type UsecaseDependancies interface {
SessionUsecase() SessionUsecase
TestPluginUsecase() TestPluginUsecase
TestResultUsecase() TestResultUsecase
TestScheduleUsecase() TestScheduleUsecase
UserUsecase() UserUsecase
ZoneCorrectionApplierUsecase() ZoneCorrectionApplierUsecase
ZoneImporterUsecase() ZoneImporterUsecase