leveldb: compact the keyspace periodically to reclaim tombstones
Deletes from the retention janitor and tidy passes leave tombstones that LevelDB only compacts opportunistically, slowing prefix scans for hours. A background worker now runs a full-keyspace CompactRange on a configurable interval (-leveldb-compaction-interval, default 24h, 0 disables). Its lifecycle is bound to the store: Close stops the worker before closing the DB so no compaction races the close.
This commit is contained in:
parent
ae5e9f7ed3
commit
337173ec63
3 changed files with 151 additions and 2 deletions
103
internal/storage/leveldb/compaction_test.go
Normal file
103
internal/storage/leveldb/compaction_test.go
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
// This file is part of the happyDomain (R) project.
|
||||
// Copyright (c) 2020-2026 happyDomain
|
||||
// Authors: Pierre-Olivier Mercier, et al.
|
||||
//
|
||||
// This program is offered under a commercial and under the AGPL license.
|
||||
// For commercial licensing, contact us at <contact@happydomain.org>.
|
||||
//
|
||||
// For AGPL licensing:
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
goerrors "errors"
|
||||
|
||||
"git.happydns.org/happyDomain/model"
|
||||
)
|
||||
|
||||
func openTemp(t *testing.T) *LevelDBStorage {
|
||||
t.Helper()
|
||||
s, err := NewLevelDBStorage(t.TempDir(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("NewLevelDBStorage: %v", err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func TestCompactReclaimsDeletedKeys(t *testing.T) {
|
||||
s := openTemp(t)
|
||||
defer s.Close()
|
||||
|
||||
// Write then delete a batch of keys so compaction has tombstones to clear.
|
||||
for i := range 100 {
|
||||
if err := s.Put(fmt.Sprintf("k-%03d", i), i); err != nil {
|
||||
t.Fatalf("Put: %v", err)
|
||||
}
|
||||
}
|
||||
for i := range 100 {
|
||||
if err := s.Delete(fmt.Sprintf("k-%03d", i)); err != nil {
|
||||
t.Fatalf("Delete: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.Compact(); err != nil {
|
||||
t.Fatalf("Compact: %v", err)
|
||||
}
|
||||
|
||||
// The deleted keys must remain logically gone after compaction.
|
||||
var v int
|
||||
if err := s.Get("k-000", &v); !goerrors.Is(err, happydns.ErrNotFound) {
|
||||
t.Errorf("Get after compaction: got err=%v, want ErrNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompactionWorkerStopsOnClose(t *testing.T) {
|
||||
s := openTemp(t)
|
||||
s.StartCompactionWorker(20 * time.Millisecond)
|
||||
|
||||
// Let the worker tick at least once.
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
|
||||
// Close must stop the worker and return without hanging.
|
||||
done := make(chan error, 1)
|
||||
go func() { done <- s.Close() }()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
t.Fatalf("Close: %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Close hung: compaction worker did not stop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompactionWorkerDisabled(t *testing.T) {
|
||||
s := openTemp(t)
|
||||
|
||||
// interval <= 0 must start no goroutine and leave Close working.
|
||||
s.StartCompactionWorker(0)
|
||||
if s.compactionStop != nil || s.compactionDone != nil {
|
||||
t.Fatal("StartCompactionWorker(0) should not create worker channels")
|
||||
}
|
||||
|
||||
if err := s.Close(); err != nil {
|
||||
t.Fatalf("Close: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -23,6 +23,7 @@ package database
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"time"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
|
|
@ -38,6 +39,7 @@ var (
|
|||
openFilesCacheCapacity int
|
||||
bloomFilterBits int
|
||||
compactionTableSizeMiB int
|
||||
compactionInterval time.Duration
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
@ -49,6 +51,7 @@ func init() {
|
|||
flag.IntVar(&openFilesCacheCapacity, "leveldb-open-files-cache", 4096, "LevelDB open files cache capacity (goleveldb default: 500)")
|
||||
flag.IntVar(&bloomFilterBits, "leveldb-bloom-filter-bits", 10, "Bits per key for the LevelDB Bloom filter; 0 disables it (recommended: 10)")
|
||||
flag.IntVar(&compactionTableSizeMiB, "leveldb-compaction-table-size", 8, "LevelDB compaction table size, in MiB (goleveldb default: 2)")
|
||||
flag.DurationVar(&compactionInterval, "leveldb-compaction-interval", 24*time.Hour, "How often to compact the whole LevelDB keyspace to reclaim space from deleted keys; 0 disables")
|
||||
}
|
||||
|
||||
// options builds the goleveldb tuning options from the configured flags.
|
||||
|
|
@ -73,5 +76,7 @@ func Instantiate() (storage.Storage, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
db.StartCompactionWorker(compactionInterval)
|
||||
|
||||
return kv.NewKVDatabase(db)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import (
|
|||
goerrors "errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
|
|
@ -37,7 +38,9 @@ import (
|
|||
)
|
||||
|
||||
type LevelDBStorage struct {
|
||||
db *leveldb.DB
|
||||
db *leveldb.DB
|
||||
compactionStop chan struct{} // closed to ask the worker to stop
|
||||
compactionDone chan struct{} // closed by the worker once it has exited
|
||||
}
|
||||
|
||||
// NewLevelDBStorage establishes the connection to the database. A nil opts
|
||||
|
|
@ -59,11 +62,49 @@ func NewLevelDBStorage(path string, opts *opt.Options) (s *LevelDBStorage, err e
|
|||
}
|
||||
}
|
||||
|
||||
s = &LevelDBStorage{db}
|
||||
s = &LevelDBStorage{db: db}
|
||||
return
|
||||
}
|
||||
|
||||
// Compact runs a full-keyspace LevelDB compaction, reclaiming space from
|
||||
// tombstones left by deletes (retention janitor, tidy).
|
||||
func (s *LevelDBStorage) Compact() error {
|
||||
return s.db.CompactRange(util.Range{})
|
||||
}
|
||||
|
||||
// StartCompactionWorker launches a goroutine that calls Compact every
|
||||
// interval. interval <= 0 is a no-op. The worker is stopped by Close.
|
||||
func (s *LevelDBStorage) StartCompactionWorker(interval time.Duration) {
|
||||
if interval <= 0 {
|
||||
return
|
||||
}
|
||||
s.compactionStop = make(chan struct{})
|
||||
s.compactionDone = make(chan struct{})
|
||||
go func() {
|
||||
defer close(s.compactionDone)
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.compactionStop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
start := time.Now()
|
||||
if err := s.Compact(); err != nil {
|
||||
log.Printf("LevelDB: compaction failed: %v", err)
|
||||
} else {
|
||||
log.Printf("LevelDB: compaction done in %s", time.Since(start))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *LevelDBStorage) Close() error {
|
||||
if s.compactionStop != nil {
|
||||
close(s.compactionStop)
|
||||
<-s.compactionDone
|
||||
}
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue