server: expose runtime metrics on /health for scheduler routing
Adds HealthResponse carrying inflight count, total requests, 1/5/15-min EWMA load averages, uptime, and NumCPU so a scheduler can pick the least busy worker. A background sampler updates the load averages every 5s, stopped by a new idempotent Close method. Work endpoints (/collect, /evaluate, /report) are wrapped with a trackWork middleware; /health and /definition are excluded so polling traffic does not pollute the signal.
This commit is contained in:
parent
fa5198f78c
commit
6b96ee8c2f
3 changed files with 300 additions and 11 deletions
|
|
@ -20,14 +20,42 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// maxRequestBodySize is the maximum allowed size for incoming request bodies (1 MB).
|
// maxRequestBodySize is the maximum allowed size for incoming request bodies (1 MB).
|
||||||
const maxRequestBodySize = 1 << 20
|
const maxRequestBodySize = 1 << 20
|
||||||
|
|
||||||
|
// loadSampleInterval is the period of the background sampler that refreshes
// the exponentially weighted moving averages published in
// HealthResponse.LoadAvg. Five seconds matches the Unix kernel's loadavg
// cadence.
const loadSampleInterval = 5 * time.Second

// Per-tick EWMA smoothing factors for the 1, 5, and 15-minute load averages.
// Each is 1 - exp(-interval/window) with interval = loadSampleInterval, so a
// constant InFlight of N drives every average toward N, the shorter windows
// getting there sooner.
var (
	loadAlpha1  = ewmaAlpha(1 * time.Minute)
	loadAlpha5  = ewmaAlpha(5 * time.Minute)
	loadAlpha15 = ewmaAlpha(15 * time.Minute)
)

// ewmaAlpha returns the smoothing factor for an averaging window sampled once
// every loadSampleInterval.
func ewmaAlpha(window time.Duration) float64 {
	return 1 - math.Exp(-float64(loadSampleInterval)/float64(window))
}
|
||||||
|
|
||||||
|
// updateLoadAvg advances the three EWMAs by one tick given the current
|
||||||
|
// InFlight sample. It is a pure function to keep the sampler trivially testable.
|
||||||
|
func updateLoadAvg(prev [3]float64, sample float64) [3]float64 {
|
||||||
|
return [3]float64{
|
||||||
|
prev[0] + loadAlpha1*(sample-prev[0]),
|
||||||
|
prev[1] + loadAlpha5*(sample-prev[1]),
|
||||||
|
prev[2] + loadAlpha15*(sample-prev[2]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Server is a generic HTTP server for external checkers.
|
// Server is a generic HTTP server for external checkers.
|
||||||
// It always exposes /health and /collect. If the provider implements
|
// It always exposes /health and /collect. If the provider implements
|
||||||
// CheckerDefinitionProvider, it also exposes /definition and /evaluate.
|
// CheckerDefinitionProvider, it also exposes /definition and /evaluate.
|
||||||
|
|
@ -41,29 +69,65 @@ type Server struct {
|
||||||
provider ObservationProvider
|
provider ObservationProvider
|
||||||
definition *CheckerDefinition
|
definition *CheckerDefinition
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
|
|
||||||
|
// startTime is captured in NewServer and used to compute uptime.
|
||||||
|
startTime time.Time
|
||||||
|
|
||||||
|
// inFlight counts work requests (/collect, /evaluate, /report) currently
|
||||||
|
// being processed. /health and /definition are not tracked.
|
||||||
|
inFlight atomic.Int64
|
||||||
|
|
||||||
|
// totalRequests is the cumulative number of work requests served.
|
||||||
|
totalRequests atomic.Uint64
|
||||||
|
|
||||||
|
// loadBits stores the 1, 5, 15-minute EWMAs of inFlight as float64 bit
|
||||||
|
// patterns (math.Float64bits) so reads and writes are tear-free and
|
||||||
|
// lock-free across the sampler goroutine and the /health handler.
|
||||||
|
loadBits [3]atomic.Uint64
|
||||||
|
|
||||||
|
// cancelSampler stops the background load-average sampler.
|
||||||
|
cancelSampler context.CancelFunc
|
||||||
|
|
||||||
|
// samplerDone is closed when the sampler goroutine returns.
|
||||||
|
samplerDone chan struct{}
|
||||||
|
|
||||||
|
// closeOnce guarantees Close is idempotent.
|
||||||
|
closeOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new checker HTTP server backed by the given provider.
|
// NewServer creates a new checker HTTP server backed by the given provider.
|
||||||
// Additional endpoints are registered based on optional interfaces the provider implements.
|
// Additional endpoints are registered based on optional interfaces the provider implements.
|
||||||
|
//
|
||||||
|
// NewServer also starts a background goroutine that samples the in-flight
|
||||||
|
// request count every loadSampleInterval to compute the load averages
|
||||||
|
// reported on /health. Call Close to stop it.
|
||||||
func NewServer(provider ObservationProvider) *Server {
|
func NewServer(provider ObservationProvider) *Server {
|
||||||
s := &Server{provider: provider}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s := &Server{
|
||||||
|
provider: provider,
|
||||||
|
startTime: time.Now(),
|
||||||
|
cancelSampler: cancel,
|
||||||
|
samplerDone: make(chan struct{}),
|
||||||
|
}
|
||||||
s.mux = http.NewServeMux()
|
s.mux = http.NewServeMux()
|
||||||
s.mux.HandleFunc("GET /health", s.handleHealth)
|
s.mux.HandleFunc("GET /health", s.handleHealth)
|
||||||
s.mux.HandleFunc("POST /collect", s.handleCollect)
|
s.mux.Handle("POST /collect", s.trackWork(http.HandlerFunc(s.handleCollect)))
|
||||||
|
|
||||||
if dp, ok := provider.(CheckerDefinitionProvider); ok {
|
if dp, ok := provider.(CheckerDefinitionProvider); ok {
|
||||||
s.definition = dp.Definition()
|
s.definition = dp.Definition()
|
||||||
s.definition.BuildRulesInfo()
|
s.definition.BuildRulesInfo()
|
||||||
s.mux.HandleFunc("GET /definition", s.handleDefinition)
|
s.mux.HandleFunc("GET /definition", s.handleDefinition)
|
||||||
s.mux.HandleFunc("POST /evaluate", s.handleEvaluate)
|
s.mux.Handle("POST /evaluate", s.trackWork(http.HandlerFunc(s.handleEvaluate)))
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := provider.(CheckerHTMLReporter); ok {
|
if _, ok := provider.(CheckerHTMLReporter); ok {
|
||||||
s.mux.HandleFunc("POST /report", s.handleReport)
|
s.mux.Handle("POST /report", s.trackWork(http.HandlerFunc(s.handleReport)))
|
||||||
} else if _, ok := provider.(CheckerMetricsReporter); ok {
|
} else if _, ok := provider.(CheckerMetricsReporter); ok {
|
||||||
s.mux.HandleFunc("POST /report", s.handleReport)
|
s.mux.Handle("POST /report", s.trackWork(http.HandlerFunc(s.handleReport)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go s.runSampler(ctx)
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,11 +138,61 @@ func (s *Server) Handler() http.Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenAndServe starts the HTTP server on the given address.
|
// ListenAndServe starts the HTTP server on the given address.
|
||||||
|
//
|
||||||
|
// ListenAndServe does not stop the background load-average sampler on return;
|
||||||
|
// call Close to stop it. This is not required for process-scoped usage but is
|
||||||
|
// recommended for tests and embedded lifecycles.
|
||||||
func (s *Server) ListenAndServe(addr string) error {
|
func (s *Server) ListenAndServe(addr string) error {
|
||||||
log.Printf("checker listening on %s", addr)
|
log.Printf("checker listening on %s", addr)
|
||||||
return http.ListenAndServe(addr, requestLogger(s.mux))
|
return http.ListenAndServe(addr, requestLogger(s.mux))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close stops the background load-average sampler goroutine. It is safe to
|
||||||
|
// call multiple times; subsequent calls are no-ops. Close does not shut down
|
||||||
|
// any underlying http.Server — callers own that lifecycle.
|
||||||
|
func (s *Server) Close() error {
|
||||||
|
s.closeOnce.Do(func() {
|
||||||
|
s.cancelSampler()
|
||||||
|
<-s.samplerDone
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// trackWork wraps a handler with in-flight and total-request accounting.
|
||||||
|
// It is applied only to "work" endpoints (/collect, /evaluate, /report) so
|
||||||
|
// that /health polling traffic does not pollute the load signal.
|
||||||
|
func (s *Server) trackWork(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.inFlight.Add(1)
|
||||||
|
s.totalRequests.Add(1)
|
||||||
|
defer s.inFlight.Add(-1)
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// runSampler updates the load-average EWMAs every loadSampleInterval until
|
||||||
|
// ctx is canceled. It closes s.samplerDone on exit.
|
||||||
|
func (s *Server) runSampler(ctx context.Context) {
|
||||||
|
defer close(s.samplerDone)
|
||||||
|
ticker := time.NewTicker(loadSampleInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
var prev [3]float64
|
||||||
|
for i := range prev {
|
||||||
|
prev[i] = math.Float64frombits(s.loadBits[i].Load())
|
||||||
|
}
|
||||||
|
next := updateLoadAvg(prev, float64(s.inFlight.Load()))
|
||||||
|
for i := range next {
|
||||||
|
s.loadBits[i].Store(math.Float64bits(next[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type statusRecorder struct {
|
type statusRecorder struct {
|
||||||
http.ResponseWriter
|
http.ResponseWriter
|
||||||
status int
|
status int
|
||||||
|
|
@ -99,8 +213,18 @@ func requestLogger(next http.Handler) http.Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
var load [3]float64
|
||||||
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
|
for i := range load {
|
||||||
|
load[i] = math.Float64frombits(s.loadBits[i].Load())
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, HealthResponse{
|
||||||
|
Status: "ok",
|
||||||
|
Uptime: time.Since(s.startTime).Seconds(),
|
||||||
|
NumCPU: runtime.NumCPU(),
|
||||||
|
InFlight: s.inFlight.Load(),
|
||||||
|
TotalRequests: s.totalRequests.Load(),
|
||||||
|
LoadAvg: load,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleDefinition(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleDefinition(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
@ -96,14 +97,148 @@ func doRequest(handler http.Handler, method, path string, body any, headers map[
|
||||||
func TestServer_Health(t *testing.T) {
|
func TestServer_Health(t *testing.T) {
|
||||||
p := &testProvider{key: "test", definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}}
|
p := &testProvider{key: "test", definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}}
|
||||||
srv := newTestServer(p)
|
srv := newTestServer(p)
|
||||||
|
defer srv.Close()
|
||||||
rec := doRequest(srv.Handler(), "GET", "/health", nil, nil)
|
rec := doRequest(srv.Handler(), "GET", "/health", nil, nil)
|
||||||
if rec.Code != http.StatusOK {
|
if rec.Code != http.StatusOK {
|
||||||
t.Fatalf("GET /health = %d, want %d", rec.Code, http.StatusOK)
|
t.Fatalf("GET /health = %d, want %d", rec.Code, http.StatusOK)
|
||||||
}
|
}
|
||||||
var resp map[string]string
|
var resp HealthResponse
|
||||||
json.NewDecoder(rec.Body).Decode(&resp)
|
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
|
||||||
if resp["status"] != "ok" {
|
t.Fatalf("decode /health: %v", err)
|
||||||
t.Errorf("GET /health status = %q, want \"ok\"", resp["status"])
|
}
|
||||||
|
if resp.Status != "ok" {
|
||||||
|
t.Errorf("GET /health status = %q, want \"ok\"", resp.Status)
|
||||||
|
}
|
||||||
|
if resp.NumCPU <= 0 {
|
||||||
|
t.Errorf("NumCPU = %d, want > 0", resp.NumCPU)
|
||||||
|
}
|
||||||
|
if resp.Uptime < 0 {
|
||||||
|
t.Errorf("Uptime = %v, want >= 0", resp.Uptime)
|
||||||
|
}
|
||||||
|
if resp.InFlight != 0 {
|
||||||
|
t.Errorf("InFlight = %d on fresh server, want 0", resp.InFlight)
|
||||||
|
}
|
||||||
|
if resp.TotalRequests != 0 {
|
||||||
|
t.Errorf("TotalRequests = %d on fresh server, want 0", resp.TotalRequests)
|
||||||
|
}
|
||||||
|
if resp.LoadAvg != [3]float64{0, 0, 0} {
|
||||||
|
t.Errorf("LoadAvg = %v on fresh server, want all zero", resp.LoadAvg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServer_Health_TracksInFlight(t *testing.T) {
|
||||||
|
release := make(chan struct{})
|
||||||
|
var collectEntered sync.WaitGroup
|
||||||
|
p := &testProvider{
|
||||||
|
key: "test",
|
||||||
|
definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}},
|
||||||
|
collectFn: func(ctx context.Context, opts CheckerOptions) (any, error) {
|
||||||
|
collectEntered.Done()
|
||||||
|
<-release
|
||||||
|
return map[string]string{"ok": "1"}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv := newTestServer(p)
|
||||||
|
defer srv.Close()
|
||||||
|
handler := srv.Handler()
|
||||||
|
|
||||||
|
const n = 3
|
||||||
|
collectEntered.Add(n)
|
||||||
|
var clientsDone sync.WaitGroup
|
||||||
|
clientsDone.Add(n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func() {
|
||||||
|
defer clientsDone.Done()
|
||||||
|
doRequest(handler, "POST", "/collect", ExternalCollectRequest{Key: "test"}, nil)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all n handlers to be inside collectFn (== all n in-flight).
|
||||||
|
collectEntered.Wait()
|
||||||
|
|
||||||
|
// Record /health mid-flight. Also hammer it to verify /health polls
|
||||||
|
// do not inflate InFlight or TotalRequests.
|
||||||
|
var mid HealthResponse
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
rec := doRequest(handler, "GET", "/health", nil, nil)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Fatalf("GET /health = %d, want %d", rec.Code, http.StatusOK)
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(rec.Body).Decode(&mid); err != nil {
|
||||||
|
t.Fatalf("decode /health: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if mid.InFlight != n {
|
||||||
|
t.Errorf("mid-flight InFlight = %d, want %d", mid.InFlight, n)
|
||||||
|
}
|
||||||
|
if mid.TotalRequests != n {
|
||||||
|
t.Errorf("mid-flight TotalRequests = %d, want %d (health polls must not count)", mid.TotalRequests, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release all work and wait for clients to return.
|
||||||
|
close(release)
|
||||||
|
clientsDone.Wait()
|
||||||
|
|
||||||
|
rec := doRequest(handler, "GET", "/health", nil, nil)
|
||||||
|
var after HealthResponse
|
||||||
|
if err := json.NewDecoder(rec.Body).Decode(&after); err != nil {
|
||||||
|
t.Fatalf("decode /health: %v", err)
|
||||||
|
}
|
||||||
|
if after.InFlight != 0 {
|
||||||
|
t.Errorf("post-flight InFlight = %d, want 0", after.InFlight)
|
||||||
|
}
|
||||||
|
if after.TotalRequests != n {
|
||||||
|
t.Errorf("post-flight TotalRequests = %d, want %d", after.TotalRequests, n)
|
||||||
|
}
|
||||||
|
if after.Uptime < mid.Uptime {
|
||||||
|
t.Errorf("Uptime went backwards: mid=%v after=%v", mid.Uptime, after.Uptime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateLoadAvg(t *testing.T) {
|
||||||
|
load := [3]float64{0, 0, 0}
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
load = updateLoadAvg(load, 5)
|
||||||
|
}
|
||||||
|
if !(load[0] > load[1] && load[1] > load[2]) {
|
||||||
|
t.Errorf("expected load[0] > load[1] > load[2], got %v", load)
|
||||||
|
}
|
||||||
|
for i, v := range load {
|
||||||
|
if v <= 0 {
|
||||||
|
t.Errorf("load[%d] = %v, want > 0", i, v)
|
||||||
|
}
|
||||||
|
if v >= 5 {
|
||||||
|
t.Errorf("load[%d] = %v, want < 5 (not yet converged)", i, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Constant sample of zero from a non-zero state must decay toward zero.
|
||||||
|
decaying := load
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
decaying = updateLoadAvg(decaying, 0)
|
||||||
|
}
|
||||||
|
for i := range decaying {
|
||||||
|
if decaying[i] >= load[i] {
|
||||||
|
t.Errorf("decaying[%d] = %v, want < %v", i, decaying[i], load[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServer_Close_Idempotent(t *testing.T) {
|
||||||
|
p := &testProvider{key: "test", definition: &CheckerDefinition{ID: "test", Rules: []CheckRule{}}}
|
||||||
|
srv := newTestServer(p)
|
||||||
|
done := make(chan error, 2)
|
||||||
|
go func() { done <- srv.Close() }()
|
||||||
|
go func() { done <- srv.Close() }()
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Close() returned %v, want nil", err)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Close() deadlocked")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -336,3 +336,33 @@ type ExternalReportRequest struct {
|
||||||
Key ObservationKey `json:"key"`
|
Key ObservationKey `json:"key"`
|
||||||
Data json.RawMessage `json:"data"`
|
Data json.RawMessage `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HealthResponse is the JSON body returned by GET /health on a remote checker
// endpoint. Besides liveness it exposes a few cheap runtime signals so that a
// scheduler choosing among equivalent checker instances can favor the least
// busy one.
//
// LoadAvg follows /proc/loadavg semantics: the 1, 5, and 15-minute
// exponentially weighted moving averages of the InFlight request count,
// sampled every 5 seconds. Dividing by NumCPU gives a saturation estimate.
type HealthResponse struct {
	// Status is a coarse liveness indicator. It is currently always "ok";
	// "degraded" is reserved for future use.
	Status string `json:"status"`

	// Uptime is the time since the server started, in (fractional) seconds.
	Uptime float64 `json:"uptime_seconds"`

	// NumCPU is runtime.NumCPU() as seen by this worker.
	NumCPU int `json:"num_cpu"`

	// InFlight is how many work requests (/collect, /evaluate, /report) are
	// being processed right now. /health and /definition are not counted.
	InFlight int64 `json:"inflight"`

	// TotalRequests is the cumulative count of work requests since the
	// server started. /health and /definition are not counted.
	TotalRequests uint64 `json:"total_requests"`

	// LoadAvg holds the 1, 5, and 15-minute EWMAs of InFlight, in that order.
	LoadAvg [3]float64 `json:"loadavg"`
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue