checker-alias/checker/rules_test.go
Pierre-Olivier Mercier c5c13960d5
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
checker: add dname_coexistence rule and refactor sibling probing
Extract querySiblings from observeCoexistence so both CNAME and DNAME
coexistence checks share the same parallel RRset scan. Add
observeDNAMECoexistence (called from Collect) that populates
AliasData.DNAMECoexistence for each DNAME node in DNAMESubstitutions.
Add the dname_coexistence rule (RFC 6672 §2.3) that flags any sibling
RRsets at a DNAME owner as CRIT, with matching tests.
2026-05-16 21:36:20 +08:00

383 lines
12 KiB
Go

package checker
import (
"context"
"encoding/json"
"strings"
"testing"
sdk "git.happydns.org/checker-sdk-go/checker"
)
// JSON-round-tripping mirrors the production read path so tests catch tag
// drift between AliasData fields and rule expectations.
type fakeObs struct {
data *AliasData
}
func (f fakeObs) Get(_ context.Context, _ sdk.ObservationKey, dest any) error {
if f.data == nil {
return nil
}
raw, err := json.Marshal(f.data)
if err != nil {
return err
}
return json.Unmarshal(raw, dest)
}
func (fakeObs) GetRelated(_ context.Context, _ sdk.ObservationKey) ([]sdk.RelatedObservation, error) {
return nil, nil
}
func run(r sdk.CheckRule, data *AliasData, opts sdk.CheckerOptions) []sdk.CheckState {
return r.Evaluate(context.Background(), fakeObs{data: data}, opts)
}
// apexKnownData returns a minimal AliasData whose apex lookup succeeded, used
// as a baseline so non-apex rules can run.
func apexKnownData() *AliasData {
return &AliasData{
Owner: "www.example.com.",
Apex: "example.com.",
}
}
func assertSkipped(t *testing.T, states []sdk.CheckState, wantSubstr string) {
t.Helper()
if len(states) != 1 {
t.Fatalf("want 1 state, got %d: %+v", len(states), states)
}
if states[0].Status != sdk.StatusUnknown {
t.Fatalf("want StatusUnknown (skipped), got %v", states[0].Status)
}
if !strings.Contains(states[0].Message, "skipped") || !strings.Contains(states[0].Message, wantSubstr) {
t.Fatalf("want skipped message containing %q, got %q", wantSubstr, states[0].Message)
}
}
func assertSingle(t *testing.T, states []sdk.CheckState, want sdk.Status) sdk.CheckState {
t.Helper()
if len(states) != 1 {
t.Fatalf("want 1 state, got %d: %+v", len(states), states)
}
if states[0].Status != want {
t.Fatalf("want status %v, got %v (msg=%q)", want, states[0].Status, states[0].Message)
}
return states[0]
}
func TestApexLookupRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
s := assertSingle(t, run(apexLookupRule{}, apexKnownData(), nil), sdk.StatusOK)
if s.Subject != "example.com." {
t.Fatalf("want subject=example.com., got %q", s.Subject)
}
})
t.Run("failure", func(t *testing.T) {
data := &AliasData{Owner: "www.nope.invalid.", ApexLookupError: "no SOA"}
s := assertSingle(t, run(apexLookupRule{}, data, nil), sdk.StatusCrit)
if s.Meta[hintKey] == nil {
t.Fatalf("want hint, got none")
}
})
}
func TestChainLoopRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
assertSingle(t, run(chainLoopRule{}, d, nil), sdk.StatusOK)
})
t.Run("loop", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated = ChainTermination{Reason: TermLoop, Subject: "a.example.com."}
s := assertSingle(t, run(chainLoopRule{}, d, nil), sdk.StatusCrit)
if s.Subject != "a.example.com." {
t.Fatalf("want subject to be loop offender, got %q", s.Subject)
}
})
t.Run("skip when apex unknown", func(t *testing.T) {
d := &AliasData{Owner: "x.", ApexLookupError: "boom"}
assertSkipped(t, run(chainLoopRule{}, d, nil), "apex")
})
}
func TestChainLengthRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
assertSingle(t, run(chainLengthRule{}, d, nil), sdk.StatusOK)
})
t.Run("too long", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated = ChainTermination{Reason: TermTooLong, Subject: "deep.example.com."}
assertSingle(t, run(chainLengthRule{}, d, sdk.CheckerOptions{"maxChainLength": float64(3)}), sdk.StatusCrit)
})
}
func TestChainQueryErrorRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
assertSingle(t, run(chainQueryErrorRule{}, d, nil), sdk.StatusOK)
})
t.Run("query err", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated = ChainTermination{Reason: TermQueryErr, Subject: "broken.example.com.", Detail: "timeout"}
assertSingle(t, run(chainQueryErrorRule{}, d, nil), sdk.StatusWarn)
})
}
func TestChainRcodeRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
assertSingle(t, run(chainRcodeRule{}, d, nil), sdk.StatusOK)
})
t.Run("mid-chain NXDOMAIN", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated = ChainTermination{Reason: TermRcode, Subject: "gone.example.com.", Rcode: "NXDOMAIN"}
assertSingle(t, run(chainRcodeRule{}, d, nil), sdk.StatusCrit)
})
t.Run("final rcode", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
d.FinalTarget = "target.example."
d.FinalRcode = "SERVFAIL"
states := run(chainRcodeRule{}, d, nil)
if len(states) != 1 || states[0].Status != sdk.StatusWarn {
t.Fatalf("want single WARN, got %+v", states)
}
})
}
func TestHopTTLRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.Chain = []ChainHop{{Owner: "a.", Kind: KindCNAME, Target: "b.", TTL: 300}}
d.ChainTerminated.Reason = TermOK
assertSingle(t, run(hopTTLRule{}, d, nil), sdk.StatusOK)
})
t.Run("multi-subject low TTL", func(t *testing.T) {
d := apexKnownData()
d.Chain = []ChainHop{
{Owner: "a.", Kind: KindCNAME, Target: "b.", TTL: 10},
{Owner: "b.", Kind: KindCNAME, Target: "c.", TTL: 20},
{Owner: "c.", Kind: KindTarget},
}
states := run(hopTTLRule{}, d, sdk.CheckerOptions{"minTargetTTL": float64(60)})
if len(states) != 2 {
t.Fatalf("want 2 states (one per low-TTL hop), got %d: %+v", len(states), states)
}
for _, s := range states {
if s.Status != sdk.StatusWarn {
t.Fatalf("want WARN, got %v", s.Status)
}
}
})
t.Run("skip empty chain", func(t *testing.T) {
d := apexKnownData()
assertSkipped(t, run(hopTTLRule{}, d, nil), "chain is empty")
})
}
func TestCnameAtApexRule(t *testing.T) {
t.Run("ok when no cname at apex", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
d.Owner = "example.com."
assertSingle(t, run(cnameAtApexRule{}, d, nil), sdk.StatusOK)
})
t.Run("crit when apex has cname", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
d.ApexHasCNAME = true
d.Owner = "example.com."
assertSingle(t, run(cnameAtApexRule{}, d, nil), sdk.StatusCrit)
})
t.Run("warn when allowApexCNAME", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
d.ApexHasCNAME = true
d.Owner = "example.com."
assertSingle(t, run(cnameAtApexRule{}, d, sdk.CheckerOptions{"allowApexCNAME": true}), sdk.StatusWarn)
})
t.Run("skip when not apex", func(t *testing.T) {
d := apexKnownData()
assertSkipped(t, run(cnameAtApexRule{}, d, nil), "apex")
})
}
func TestApexFlatteningRule(t *testing.T) {
t.Run("ok when no flattening", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
assertSingle(t, run(apexFlatteningRule{}, d, nil), sdk.StatusOK)
})
t.Run("info when flattening recognized", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
d.ApexFlattening = true
assertSingle(t, run(apexFlatteningRule{}, d, nil), sdk.StatusInfo)
})
t.Run("skip when recognizeApexFlattening=false", func(t *testing.T) {
d := apexKnownData()
d.OwnerIsApex = true
d.ApexFlattening = true
assertSkipped(t, run(apexFlatteningRule{}, d, sdk.CheckerOptions{"recognizeApexFlattening": false}), "recognizeApexFlattening")
})
}
func TestCnameCoexistenceRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.OwnerHasCNAME = true
assertSingle(t, run(cnameCoexistenceRule{}, d, nil), sdk.StatusOK)
})
t.Run("multi-subject crit", func(t *testing.T) {
d := apexKnownData()
d.OwnerHasCNAME = true
d.Coexisting = []CoexistingRRset{{Type: "MX"}, {Type: "TXT"}}
states := run(cnameCoexistenceRule{}, d, nil)
if len(states) != 2 {
t.Fatalf("want 2 states, got %d", len(states))
}
})
t.Run("apex A/AAAA excused by flattening", func(t *testing.T) {
d := apexKnownData()
d.Owner = "example.com."
d.OwnerIsApex = true
d.OwnerHasCNAME = true
d.ApexFlattening = true
d.Coexisting = []CoexistingRRset{{Type: "A"}, {Type: "AAAA"}, {Type: "MX"}}
states := run(cnameCoexistenceRule{}, d, nil)
// Only MX remains, A/AAAA excused.
if len(states) != 1 {
t.Fatalf("want 1 state (MX only), got %d: %+v", len(states), states)
}
if states[0].Code != "MX" {
t.Fatalf("want code=MX, got %q", states[0].Code)
}
})
t.Run("skip without cname", func(t *testing.T) {
d := apexKnownData()
assertSkipped(t, run(cnameCoexistenceRule{}, d, nil), "owner has no CNAME")
})
}
func TestDnameCoexistenceRule(t *testing.T) {
t.Run("skip when no DNAME in chain", func(t *testing.T) {
d := apexKnownData()
assertSkipped(t, run(dnameCoexistenceRule{}, d, nil), "no DNAME in chain")
})
t.Run("ok when DNAME has no siblings", func(t *testing.T) {
d := apexKnownData()
d.DNAMESubstitutions = []ChainHop{{Owner: "old.example.com.", Kind: KindDNAME, Target: "new.example.com."}}
assertSingle(t, run(dnameCoexistenceRule{}, d, nil), sdk.StatusOK)
})
t.Run("crit when DNAME has siblings", func(t *testing.T) {
d := apexKnownData()
d.DNAMESubstitutions = []ChainHop{{Owner: "old.example.com.", Kind: KindDNAME, Target: "new.example.com."}}
d.DNAMECoexistence = map[string][]CoexistingRRset{
"old.example.com.": {{Type: "MX"}, {Type: "A"}},
}
states := run(dnameCoexistenceRule{}, d, nil)
if len(states) != 2 {
t.Fatalf("want 2 states, got %d: %+v", len(states), states)
}
for _, s := range states {
if s.Status != sdk.StatusCrit {
t.Fatalf("want CRIT, got %v", s.Status)
}
}
})
t.Run("skip when apex unknown", func(t *testing.T) {
d := &AliasData{Owner: "x.", ApexLookupError: "boom"}
assertSkipped(t, run(dnameCoexistenceRule{}, d, nil), "apex")
})
}
func TestCnameDnssecRule(t *testing.T) {
t.Run("skip unsigned zone", func(t *testing.T) {
d := apexKnownData()
d.OwnerHasCNAME = true
assertSkipped(t, run(cnameDnssecRule{}, d, nil), "zone not DNSSEC")
})
t.Run("ok when signed", func(t *testing.T) {
d := apexKnownData()
d.ZoneSigned = true
d.OwnerHasCNAME = true
d.CNAMESigCheckDone = true
d.CNAMESigned = true
assertSingle(t, run(cnameDnssecRule{}, d, nil), sdk.StatusOK)
})
t.Run("crit when unsigned cname", func(t *testing.T) {
d := apexKnownData()
d.ZoneSigned = true
d.OwnerHasCNAME = true
d.CNAMESigCheckDone = true
assertSingle(t, run(cnameDnssecRule{}, d, nil), sdk.StatusCrit)
})
}
func TestTargetResolvableRule(t *testing.T) {
t.Run("ok when NOERROR with A record", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
d.FinalTarget = "target."
d.FinalA = []string{"1.2.3.4"}
assertSingle(t, run(targetResolvableRule{}, d, nil), sdk.StatusOK)
})
t.Run("ok when NOERROR with no A/AAAA (e.g. service label)", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
d.FinalTarget = "_2772._tcp.znc.example."
assertSingle(t, run(targetResolvableRule{}, d, nil), sdk.StatusOK)
})
t.Run("crit when NXDOMAIN", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermOK
d.FinalTarget = "target."
d.FinalRcode = "NXDOMAIN"
assertSingle(t, run(targetResolvableRule{}, d, nil), sdk.StatusCrit)
})
t.Run("skip when chain did not terminate normally", func(t *testing.T) {
d := apexKnownData()
d.ChainTerminated.Reason = TermLoop
assertSkipped(t, run(targetResolvableRule{}, d, nil), "chain did not terminate normally")
})
}
func TestMultipleRecordsRule(t *testing.T) {
t.Run("ok", func(t *testing.T) {
d := apexKnownData()
d.Chain = []ChainHop{{Owner: "a.", Kind: KindCNAME, Target: "b."}}
assertSingle(t, run(multipleRecordsRule{}, d, nil), sdk.StatusOK)
})
t.Run("duplicate owner", func(t *testing.T) {
d := apexKnownData()
d.Chain = []ChainHop{
{Owner: "dup.", Kind: KindCNAME, Target: "b."},
{Owner: "dup.", Kind: KindCNAME, Target: "c."},
}
states := run(multipleRecordsRule{}, d, nil)
if len(states) != 1 || states[0].Status != sdk.StatusCrit {
t.Fatalf("want 1 CRIT, got %+v", states)
}
})
}
// Sanity: every rule registered in the Definition returns at least one state
// even when asked to judge a blank AliasData (apex lookup failed). This guards
// against a rule slipping through with an empty-slice return path that would
// be replaced by the SDK with StatusUnknown.
func TestAllRulesAlwaysReturnAtLeastOneState(t *testing.T) {
blank := &AliasData{ApexLookupError: "no apex"}
for _, r := range Definition().Rules {
got := run(r, blank, nil)
if len(got) == 0 {
t.Fatalf("rule %s returned no states", r.Name())
}
}
}