checker-ssh/checker/rules_algorithms.go

174 lines
6.4 KiB
Go

// This file is part of the happyDomain (R) project.
// Copyright (c) 2020-2026 happyDomain
// Authors: Pierre-Olivier Mercier, et al.
//
// This program is offered under a commercial and under the AGPL license.
// For commercial licensing, contact us at <contact@happydomain.org>.
//
// For AGPL licensing:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package checker
import (
"context"
sdk "git.happydns.org/checker-sdk-go/checker"
)
// algorithmFamilyRule is the shared implementation for the four
// algorithm posture rules (KEX, host key, cipher, MAC). Each one
// inspects a different field on SSHProbe and uses a different catalog.
type algorithmFamilyRule struct {
ruleName string
description string
passCode string
passMsg string
family string
extract func(p *SSHProbe) []string
table map[string]algoVerdict
}
func (r *algorithmFamilyRule) Name() string { return r.ruleName }
func (r *algorithmFamilyRule) Description() string { return r.description }
func (r *algorithmFamilyRule) Evaluate(ctx context.Context, obs sdk.ObservationGetter, _ sdk.CheckerOptions) []sdk.CheckState {
data, errSt := loadSSHData(ctx, obs)
if errSt != nil {
return []sdk.CheckState{*errSt}
}
eps := reachableEndpoints(data.Endpoints)
if len(eps) == 0 {
return []sdk.CheckState{notTestedState(r.ruleName+".skipped", "No endpoint produced an algorithm listing.")}
}
var issues []Issue
for _, ep := range eps {
issues = append(issues, analyseWeakAlgos(ep.Address, r.family, r.extract(&ep), r.table)...)
}
if len(issues) == 0 {
return []sdk.CheckState{passState(r.passCode, r.passMsg)}
}
return statesFromIssues(issues)
}
type kexAlgorithmsRule struct{ algorithmFamilyRule }
func newKexAlgorithmsRule() *kexAlgorithmsRule {
return &kexAlgorithmsRule{algorithmFamilyRule{
ruleName: "ssh.kex_algorithms",
description: "Flags key-exchange algorithms advertised by the server that are weak or broken.",
passCode: "ssh.kex_algorithms.ok",
passMsg: "Every advertised KEX algorithm is modern.",
family: "kex",
extract: func(p *SSHProbe) []string { return p.KEX },
table: kexAlgos,
}}
}
type hostKeyAlgorithmsRule struct{ algorithmFamilyRule }
func newHostKeyAlgorithmsRule() *hostKeyAlgorithmsRule {
return &hostKeyAlgorithmsRule{algorithmFamilyRule{
ruleName: "ssh.host_key_algorithms",
description: "Flags server host-key algorithms that are weak or deprecated (ssh-rsa/SHA-1, ssh-dss, …).",
passCode: "ssh.host_key_algorithms.ok",
passMsg: "Every advertised host-key algorithm is modern.",
family: "hostkey_alg",
extract: func(p *SSHProbe) []string { return p.HostKey },
table: hostKeyAlgos,
}}
}
type cipherAlgorithmsRule struct{ algorithmFamilyRule }
func newCipherAlgorithmsRule() *cipherAlgorithmsRule {
return &cipherAlgorithmsRule{algorithmFamilyRule{
ruleName: "ssh.cipher_algorithms",
description: "Flags symmetric ciphers advertised by the server that are weak or broken (CBC, 3DES, RC4, …).",
passCode: "ssh.cipher_algorithms.ok",
passMsg: "Every advertised cipher is modern.",
family: "cipher",
extract: func(p *SSHProbe) []string { return uniqueMerge(p.CiphersC2S, p.CiphersS2C) },
table: cipherAlgos,
}}
}
type macAlgorithmsRule struct{ algorithmFamilyRule }
func newMacAlgorithmsRule() *macAlgorithmsRule {
return &macAlgorithmsRule{algorithmFamilyRule{
ruleName: "ssh.mac_algorithms",
description: "Flags MAC algorithms advertised by the server that are weak (SHA-1, non-ETM, …).",
passCode: "ssh.mac_algorithms.ok",
passMsg: "Every advertised MAC algorithm is modern.",
family: "mac",
extract: func(p *SSHProbe) []string { return uniqueMerge(p.MACsC2S, p.MACsS2C) },
table: macAlgos,
}}
}
// strictKexRule flags the absence of the Terrapin mitigation marker.
type strictKexRule struct{}
func (r *strictKexRule) Name() string { return "ssh.strict_kex" }
func (r *strictKexRule) Description() string {
return "Verifies the server advertises the strict-KEX marker (CVE-2023-48795 Terrapin mitigation)."
}
func (r *strictKexRule) Evaluate(ctx context.Context, obs sdk.ObservationGetter, _ sdk.CheckerOptions) []sdk.CheckState {
data, errSt := loadSSHData(ctx, obs)
if errSt != nil {
return []sdk.CheckState{*errSt}
}
eps := reachableEndpoints(data.Endpoints)
if len(eps) == 0 {
return []sdk.CheckState{notTestedState("ssh.strict_kex.skipped", "No endpoint produced an algorithm listing.")}
}
var issues []Issue
for _, ep := range eps {
issues = append(issues, analyseStrictKex(ep.Address, ep.KEX)...)
}
if len(issues) == 0 {
return []sdk.CheckState{passState("ssh.strict_kex.ok", "Every endpoint advertises the Terrapin mitigation marker.")}
}
return statesFromIssues(issues)
}
// preauthCompressionRule flags servers offering "zlib" (pre-auth)
// compression alongside / instead of zlib@openssh.com (post-auth).
type preauthCompressionRule struct{}
func (r *preauthCompressionRule) Name() string { return "ssh.preauth_compression" }
func (r *preauthCompressionRule) Description() string {
return "Flags servers that offer pre-authentication zlib compression (prefer zlib@openssh.com)."
}
func (r *preauthCompressionRule) Evaluate(ctx context.Context, obs sdk.ObservationGetter, _ sdk.CheckerOptions) []sdk.CheckState {
data, errSt := loadSSHData(ctx, obs)
if errSt != nil {
return []sdk.CheckState{*errSt}
}
eps := reachableEndpoints(data.Endpoints)
if len(eps) == 0 {
return []sdk.CheckState{notTestedState("ssh.preauth_compression.skipped", "No endpoint produced compression data.")}
}
var issues []Issue
for _, ep := range eps {
issues = append(issues, analysePreauthCompression(ep.Address, ep.CompC2S)...)
}
if len(issues) == 0 {
return []sdk.CheckState{passState("ssh.preauth_compression.ok", "No endpoint offers pre-authentication zlib compression.")}
}
return statesFromIssues(issues)
}