diff --git a/checker/server.go b/checker/server.go index c036b07..d9b7f61 100644 --- a/checker/server.go +++ b/checker/server.go @@ -20,14 +20,42 @@ import ( "fmt" "io" "log" + "math" "net/http" + "runtime" "strings" + "sync" + "sync/atomic" "time" ) // maxRequestBodySize is the maximum allowed size for incoming request bodies (1 MB). const maxRequestBodySize = 1 << 20 +// loadSampleInterval is how often the background sampler updates the +// exponentially weighted moving averages reported in HealthResponse.LoadAvg. +// 5 seconds matches the Unix kernel's loadavg cadence. +const loadSampleInterval = 5 * time.Second + +// EWMA smoothing factors for 1, 5, and 15-minute windows sampled every +// loadSampleInterval. Derived as 1 - exp(-interval/window) so that the +// steady-state response to a constant InFlight of N converges to N. +var ( + loadAlpha1 = 1 - math.Exp(-float64(loadSampleInterval)/float64(1*time.Minute)) + loadAlpha5 = 1 - math.Exp(-float64(loadSampleInterval)/float64(5*time.Minute)) + loadAlpha15 = 1 - math.Exp(-float64(loadSampleInterval)/float64(15*time.Minute)) +) + +// updateLoadAvg advances the three EWMAs by one tick given the current +// InFlight sample. It is a pure function to keep the sampler trivially testable. +func updateLoadAvg(prev [3]float64, sample float64) [3]float64 { + return [3]float64{ + prev[0] + loadAlpha1*(sample-prev[0]), + prev[1] + loadAlpha5*(sample-prev[1]), + prev[2] + loadAlpha15*(sample-prev[2]), + } +} + // Server is a generic HTTP server for external checkers. // It always exposes /health and /collect. If the provider implements // CheckerDefinitionProvider, it also exposes /definition and /evaluate. @@ -41,29 +69,65 @@ type Server struct { provider ObservationProvider definition *CheckerDefinition mux *http.ServeMux + + // startTime is captured in NewServer and used to compute uptime. + startTime time.Time + + // inFlight counts work requests (/collect, /evaluate, /report) currently + // being processed. /health and /definition are not tracked. + inFlight atomic.Int64 + + // totalRequests is the cumulative number of work requests served. + totalRequests atomic.Uint64 + + // loadBits stores the 1, 5, 15-minute EWMAs of inFlight as float64 bit + // patterns (math.Float64bits) so reads and writes are tear-free and + // lock-free across the sampler goroutine and the /health handler. + loadBits [3]atomic.Uint64 + + // cancelSampler stops the background load-average sampler. + cancelSampler context.CancelFunc + + // samplerDone is closed when the sampler goroutine returns. + samplerDone chan struct{} + + // closeOnce guarantees Close is idempotent. + closeOnce sync.Once } // NewServer creates a new checker HTTP server backed by the given provider. // Additional endpoints are registered based on optional interfaces the provider implements. +// +// NewServer also starts a background goroutine that samples the in-flight +// request count every loadSampleInterval to compute the load averages +// reported on /health. Call Close to stop it. func NewServer(provider ObservationProvider) *Server { - s := &Server{provider: provider} + ctx, cancel := context.WithCancel(context.Background()) + s := &Server{ + provider: provider, + startTime: time.Now(), + cancelSampler: cancel, + samplerDone: make(chan struct{}), + } s.mux = http.NewServeMux() s.mux.HandleFunc("GET /health", s.handleHealth) - s.mux.HandleFunc("POST /collect", s.handleCollect) + s.mux.Handle("POST /collect", s.trackWork(http.HandlerFunc(s.handleCollect))) if dp, ok := provider.(CheckerDefinitionProvider); ok { s.definition = dp.Definition() s.definition.BuildRulesInfo() s.mux.HandleFunc("GET /definition", s.handleDefinition) - s.mux.HandleFunc("POST /evaluate", s.handleEvaluate) + s.mux.Handle("POST /evaluate", s.trackWork(http.HandlerFunc(s.handleEvaluate))) } if _, ok := provider.(CheckerHTMLReporter); ok { - s.mux.HandleFunc("POST /report", s.handleReport) + s.mux.Handle("POST /report", s.trackWork(http.HandlerFunc(s.handleReport))) } else if _, ok := provider.(CheckerMetricsReporter); ok { - s.mux.HandleFunc("POST /report", s.handleReport) + s.mux.Handle("POST /report", s.trackWork(http.HandlerFunc(s.handleReport))) } + go s.runSampler(ctx) + return s } @@ -74,11 +138,61 @@ func (s *Server) Handler() http.Handler { } // ListenAndServe starts the HTTP server on the given address. +// +// ListenAndServe does not stop the background load-average sampler on return; +// call Close to stop it. This is not required for process-scoped usage but is +// recommended for tests and embedded lifecycles. func (s *Server) ListenAndServe(addr string) error { log.Printf("checker listening on %s", addr) return http.ListenAndServe(addr, requestLogger(s.mux)) } +// Close stops the background load-average sampler goroutine. It is safe to +// call multiple times; subsequent calls are no-ops. Close does not shut down +// any underlying http.Server — callers own that lifecycle. +func (s *Server) Close() error { + s.closeOnce.Do(func() { + s.cancelSampler() + <-s.samplerDone + }) + return nil +} + +// trackWork wraps a handler with in-flight and total-request accounting. +// It is applied only to "work" endpoints (/collect, /evaluate, /report) so +// that /health polling traffic does not pollute the load signal. +func (s *Server) trackWork(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.inFlight.Add(1) + s.totalRequests.Add(1) + defer s.inFlight.Add(-1) + next.ServeHTTP(w, r) + }) +} + +// runSampler updates the load-average EWMAs every loadSampleInterval until +// ctx is canceled. It closes s.samplerDone on exit. +func (s *Server) runSampler(ctx context.Context) { + defer close(s.samplerDone) + ticker := time.NewTicker(loadSampleInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var prev [3]float64 + for i := range prev { + prev[i] = math.Float64frombits(s.loadBits[i].Load()) + } + next := updateLoadAvg(prev, float64(s.inFlight.Load())) + for i := range next { + s.loadBits[i].Store(math.Float64bits(next[i])) + } + } + } +} + type statusRecorder struct { http.ResponseWriter status int @@ -99,8 +213,18 @@ func requestLogger(next http.Handler) http.Handler { } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) + var load [3]float64 + for i := range load { + load[i] = math.Float64frombits(s.loadBits[i].Load()) + } + writeJSON(w, http.StatusOK, HealthResponse{ + Status: "ok", + Uptime: time.Since(s.startTime).Seconds(), + NumCPU: runtime.NumCPU(), + InFlight: s.inFlight.Load(), + TotalRequests: s.totalRequests.Load(), + LoadAvg: load, + }) } func (s *Server) handleDefinition(w http.ResponseWriter, r *http.Request) { diff --git a/checker/server_test.go b/checker/server_test.go index 3bc36bd..a20ee61 100644 --- a/checker/server_test.go +++ b/checker/server_test.go @@ -21,6 +21,7 @@ import ( "errors" "net/http" "net/http/httptest" + "sync" "testing" "time" ) @@ -96,14 +97,148 @@ func doRequest(handler http.Handler, method, path string, body any, headers map[ func TestServer_Health(t *testing.T) { p := &testProvider{key: "test", definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}} srv := newTestServer(p) + defer srv.Close() rec := doRequest(srv.Handler(), "GET", "/health", nil, nil) if rec.Code != http.StatusOK { t.Fatalf("GET /health = %d, want %d", rec.Code, http.StatusOK) } - var resp map[string]string - json.NewDecoder(rec.Body).Decode(&resp) - if resp["status"] != "ok" { - t.Errorf("GET /health status = %q, want \"ok\"", resp["status"]) + var resp HealthResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode /health: %v", err) + } + if resp.Status != "ok" { + t.Errorf("GET /health status = %q, want \"ok\"", resp.Status) + } + if resp.NumCPU <= 0 { + t.Errorf("NumCPU = %d, want > 0", resp.NumCPU) + } + if resp.Uptime < 0 { + t.Errorf("Uptime = %v, want >= 0", resp.Uptime) + } + if resp.InFlight != 0 { + t.Errorf("InFlight = %d on fresh server, want 0", resp.InFlight) + } + if resp.TotalRequests != 0 { + t.Errorf("TotalRequests = %d on fresh server, want 0", resp.TotalRequests) + } + if resp.LoadAvg != [3]float64{0, 0, 0} { + t.Errorf("LoadAvg = %v on fresh server, want all zero", resp.LoadAvg) + } +} + +func TestServer_Health_TracksInFlight(t *testing.T) { + release := make(chan struct{}) + var collectEntered sync.WaitGroup + p := &testProvider{ + key: "test", + definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}, + collectFn: func(ctx context.Context, opts CheckerOptions) (any, error) { + collectEntered.Done() + <-release + return map[string]string{"ok": "1"}, nil + }, + } + srv := newTestServer(p) + defer srv.Close() + handler := srv.Handler() + + const n = 3 + collectEntered.Add(n) + var clientsDone sync.WaitGroup + clientsDone.Add(n) + for i := 0; i < n; i++ { + go func() { + defer clientsDone.Done() + doRequest(handler, "POST", "/collect", ExternalCollectRequest{Key: "test"}, nil) + }() + } + + // Wait for all n handlers to be inside collectFn (== all n in-flight). + collectEntered.Wait() + + // Record /health mid-flight. Also hammer it to verify /health polls + // do not inflate InFlight or TotalRequests. + var mid HealthResponse + for i := 0; i < 5; i++ { + rec := doRequest(handler, "GET", "/health", nil, nil) + if rec.Code != http.StatusOK { + t.Fatalf("GET /health = %d, want %d", rec.Code, http.StatusOK) + } + if err := json.NewDecoder(rec.Body).Decode(&mid); err != nil { + t.Fatalf("decode /health: %v", err) + } + } + if mid.InFlight != n { + t.Errorf("mid-flight InFlight = %d, want %d", mid.InFlight, n) + } + if mid.TotalRequests != n { + t.Errorf("mid-flight TotalRequests = %d, want %d (health polls must not count)", mid.TotalRequests, n) + } + + // Release all work and wait for clients to return. + close(release) + clientsDone.Wait() + + rec := doRequest(handler, "GET", "/health", nil, nil) + var after HealthResponse + if err := json.NewDecoder(rec.Body).Decode(&after); err != nil { + t.Fatalf("decode /health: %v", err) + } + if after.InFlight != 0 { + t.Errorf("post-flight InFlight = %d, want 0", after.InFlight) + } + if after.TotalRequests != n { + t.Errorf("post-flight TotalRequests = %d, want %d", after.TotalRequests, n) + } + if after.Uptime < mid.Uptime { + t.Errorf("Uptime went backwards: mid=%v after=%v", mid.Uptime, after.Uptime) + } +} + +func TestUpdateLoadAvg(t *testing.T) { + load := [3]float64{0, 0, 0} + for i := 0; i < 20; i++ { + load = updateLoadAvg(load, 5) + } + if !(load[0] > load[1] && load[1] > load[2]) { + t.Errorf("expected load[0] > load[1] > load[2], got %v", load) + } + for i, v := range load { + if v <= 0 { + t.Errorf("load[%d] = %v, want > 0", i, v) + } + if v >= 5 { + t.Errorf("load[%d] = %v, want < 5 (not yet converged)", i, v) + } + } + + // Constant sample of zero from a non-zero state must decay toward zero. + decaying := load + for i := 0; i < 50; i++ { + decaying = updateLoadAvg(decaying, 0) + } + for i := range decaying { + if decaying[i] >= load[i] { + t.Errorf("decaying[%d] = %v, want < %v", i, decaying[i], load[i]) + } + } +} + +func TestServer_Close_Idempotent(t *testing.T) { + p := &testProvider{key: "test", definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}} + srv := newTestServer(p) + done := make(chan error, 2) + go func() { done <- srv.Close() }() + go func() { done <- srv.Close() }() + for i := 0; i < 2; i++ { + select { + case err := <-done: + if err != nil { + t.Errorf("Close() returned %v, want nil", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Close() deadlocked") + } } } diff --git a/checker/types.go b/checker/types.go index 9adced0..08f7431 100644 --- a/checker/types.go +++ b/checker/types.go @@ -336,3 +336,33 @@ type ExternalReportRequest struct { Key ObservationKey `json:"key"` Data json.RawMessage `json:"data"` } + +// HealthResponse is returned by GET /health on a remote checker endpoint. +// It carries lightweight runtime signals so a scheduler can pick the least +// busy worker among a set of equivalent checker instances. +// +// LoadAvg mirrors /proc/loadavg semantics: it is the 1, 5, 15-minute +// exponentially weighted moving average of the InFlight request count, +// sampled every 5 seconds. Divide by NumCPU to estimate saturation. +type HealthResponse struct { + // Status is a coarse liveness indicator. Currently always "ok"; + // "degraded" is reserved for future use. + Status string `json:"status"` + + // Uptime is the number of (fractional) seconds since the server started. + Uptime float64 `json:"uptime_seconds"` + + // NumCPU is the value of runtime.NumCPU() on this worker. + NumCPU int `json:"num_cpu"` + + // InFlight is the number of work requests (/collect, /evaluate, /report) + // currently being processed. /health and /definition are not counted. + InFlight int64 `json:"inflight"` + + // TotalRequests is the cumulative number of work requests served since + // the server started. /health and /definition are not counted. + TotalRequests uint64 `json:"total_requests"` + + // LoadAvg holds the 1, 5, 15-minute EWMAs of InFlight. + LoadAvg [3]float64 `json:"loadavg"` +}