Add an auto-cleanup worker
This commit is contained in:
parent
2037a59cdc
commit
8da3878104
5 changed files with 136 additions and 7 deletions
108
internal/app/cleanup.go
Normal file
108
internal/app/cleanup.go
Normal file
|
|
@ -0,0 +1,108 @@
|
||||||
|
// This file is part of the happyDeliver (R) project.
|
||||||
|
// Copyright (c) 2025 happyDomain
|
||||||
|
// Authors: Pierre-Olivier Mercier, et al.
|
||||||
|
//
|
||||||
|
// This program is offered under a commercial and under the AGPL license.
|
||||||
|
// For commercial licensing, contact us at <contact@happydomain.org>.
|
||||||
|
//
|
||||||
|
// For AGPL licensing:
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.happydns.org/happyDeliver/internal/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// How often to run the cleanup check
|
||||||
|
cleanupInterval = 1 * time.Hour
|
||||||
|
)
|
||||||
|
|
||||||
|
// CleanupService handles periodic cleanup of old reports
|
||||||
|
type CleanupService struct {
|
||||||
|
store storage.Storage
|
||||||
|
retention time.Duration
|
||||||
|
ticker *time.Ticker
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCleanupService creates a new cleanup service
|
||||||
|
func NewCleanupService(store storage.Storage, retention time.Duration) *CleanupService {
|
||||||
|
return &CleanupService{
|
||||||
|
store: store,
|
||||||
|
retention: retention,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins the cleanup service in a background goroutine
|
||||||
|
func (s *CleanupService) Start(ctx context.Context) {
|
||||||
|
if s.retention <= 0 {
|
||||||
|
log.Println("Report retention is disabled (keeping reports forever)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Starting cleanup service: will delete reports older than %s", s.retention)
|
||||||
|
|
||||||
|
// Run cleanup immediately on startup
|
||||||
|
s.runCleanup()
|
||||||
|
|
||||||
|
// Then run periodically
|
||||||
|
s.ticker = time.NewTicker(cleanupInterval)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ticker.C:
|
||||||
|
s.runCleanup()
|
||||||
|
case <-ctx.Done():
|
||||||
|
s.Stop()
|
||||||
|
return
|
||||||
|
case <-s.done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the cleanup service
|
||||||
|
func (s *CleanupService) Stop() {
|
||||||
|
if s.ticker != nil {
|
||||||
|
s.ticker.Stop()
|
||||||
|
}
|
||||||
|
close(s.done)
|
||||||
|
}
|
||||||
|
|
||||||
|
// runCleanup performs the actual cleanup operation
|
||||||
|
func (s *CleanupService) runCleanup() {
|
||||||
|
cutoffTime := time.Now().Add(-s.retention)
|
||||||
|
log.Printf("Running cleanup: deleting reports older than %s", cutoffTime.Format(time.RFC3339))
|
||||||
|
|
||||||
|
deleted, err := s.store.DeleteOldReports(cutoffTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error during cleanup: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if deleted > 0 {
|
||||||
|
log.Printf("Cleanup completed: deleted %d old report(s)", deleted)
|
||||||
|
} else {
|
||||||
|
log.Printf("Cleanup completed: no old reports to delete")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -22,6 +22,7 @@
|
||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
|
@ -49,6 +50,12 @@ func RunServer(cfg *config.Config) error {
|
||||||
|
|
||||||
log.Printf("Connected to %s database", cfg.Database.Type)
|
log.Printf("Connected to %s database", cfg.Database.Type)
|
||||||
|
|
||||||
|
// Start cleanup service for old reports
|
||||||
|
ctx := context.Background()
|
||||||
|
cleanupSvc := NewCleanupService(store, cfg.ReportRetention)
|
||||||
|
cleanupSvc.Start(ctx)
|
||||||
|
defer cleanupSvc.Stop()
|
||||||
|
|
||||||
// Start LMTP server in background
|
// Start LMTP server in background
|
||||||
go func() {
|
go func() {
|
||||||
if err := lmtp.StartServer(cfg.Email.LMTPAddr, store, cfg); err != nil {
|
if err := lmtp.StartServer(cfg.Email.LMTPAddr, store, cfg); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ func declareFlags(o *Config) {
|
||||||
flag.DurationVar(&o.Analysis.DNSTimeout, "dns-timeout", o.Analysis.DNSTimeout, "Timeout when performing DNS query")
|
flag.DurationVar(&o.Analysis.DNSTimeout, "dns-timeout", o.Analysis.DNSTimeout, "Timeout when performing DNS query")
|
||||||
flag.DurationVar(&o.Analysis.HTTPTimeout, "http-timeout", o.Analysis.HTTPTimeout, "Timeout when performing HTTP query")
|
flag.DurationVar(&o.Analysis.HTTPTimeout, "http-timeout", o.Analysis.HTTPTimeout, "Timeout when performing HTTP query")
|
||||||
flag.Var(&StringArray{&o.Analysis.RBLs}, "rbl", "Append a RBL (use this option multiple time to append multiple RBLs)")
|
flag.Var(&StringArray{&o.Analysis.RBLs}, "rbl", "Append a RBL (use this option multiple time to append multiple RBLs)")
|
||||||
|
flag.DurationVar(&o.ReportRetention, "report-retention", o.ReportRetention, "How long to keep reports (e.g., 720h, 30d). 0 = keep forever")
|
||||||
|
|
||||||
// Others flags are declared in some other files likes sources, storages, ... when they need specials configurations
|
// Others flags are declared in some other files likes sources, storages, ... when they need specials configurations
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,11 +35,12 @@ import (
|
||||||
|
|
||||||
// Config represents the application configuration
|
// Config represents the application configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
DevProxy string
|
DevProxy string
|
||||||
Bind string
|
Bind string
|
||||||
Database DatabaseConfig
|
Database DatabaseConfig
|
||||||
Email EmailConfig
|
Email EmailConfig
|
||||||
Analysis AnalysisConfig
|
Analysis AnalysisConfig
|
||||||
|
ReportRetention time.Duration // How long to keep reports. 0 = keep forever
|
||||||
}
|
}
|
||||||
|
|
||||||
// DatabaseConfig contains database connection settings
|
// DatabaseConfig contains database connection settings
|
||||||
|
|
@ -65,8 +66,9 @@ type AnalysisConfig struct {
|
||||||
// DefaultConfig returns a configuration with sensible defaults
|
// DefaultConfig returns a configuration with sensible defaults
|
||||||
func DefaultConfig() *Config {
|
func DefaultConfig() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
DevProxy: "",
|
DevProxy: "",
|
||||||
Bind: ":8081",
|
Bind: ":8081",
|
||||||
|
ReportRetention: 0, // Keep reports forever by default
|
||||||
Database: DatabaseConfig{
|
Database: DatabaseConfig{
|
||||||
Type: "sqlite",
|
Type: "sqlite",
|
||||||
DSN: "happydeliver.db",
|
DSN: "happydeliver.db",
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"gorm.io/driver/postgres"
|
"gorm.io/driver/postgres"
|
||||||
|
|
@ -42,6 +43,7 @@ type Storage interface {
|
||||||
CreateReport(testID uuid.UUID, rawEmail []byte, reportJSON []byte) (*Report, error)
|
CreateReport(testID uuid.UUID, rawEmail []byte, reportJSON []byte) (*Report, error)
|
||||||
GetReport(testID uuid.UUID) (reportJSON []byte, rawEmail []byte, err error)
|
GetReport(testID uuid.UUID) (reportJSON []byte, rawEmail []byte, err error)
|
||||||
ReportExists(testID uuid.UUID) (bool, error)
|
ReportExists(testID uuid.UUID) (bool, error)
|
||||||
|
DeleteOldReports(olderThan time.Time) (int64, error)
|
||||||
|
|
||||||
// Close closes the database connection
|
// Close closes the database connection
|
||||||
Close() error
|
Close() error
|
||||||
|
|
@ -115,6 +117,15 @@ func (s *DBStorage) GetReport(testID uuid.UUID) ([]byte, []byte, error) {
|
||||||
return dbReport.ReportJSON, dbReport.RawEmail, nil
|
return dbReport.ReportJSON, dbReport.RawEmail, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteOldReports deletes reports older than the specified time
|
||||||
|
func (s *DBStorage) DeleteOldReports(olderThan time.Time) (int64, error) {
|
||||||
|
result := s.db.Where("created_at < ?", olderThan).Delete(&Report{})
|
||||||
|
if result.Error != nil {
|
||||||
|
return 0, fmt.Errorf("failed to delete old reports: %w", result.Error)
|
||||||
|
}
|
||||||
|
return result.RowsAffected, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes the database connection
|
// Close closes the database connection
|
||||||
func (s *DBStorage) Close() error {
|
func (s *DBStorage) Close() error {
|
||||||
sqlDB, err := s.db.DB()
|
sqlDB, err := s.db.DB()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue