Skip to content

Commit 5113b2e

Browse files
Fix compaction with holes (#44)
* Fix compaction with holes This change updates the file selection process to include files regardless of whether the sizes fit. This fixes a long-standing issue where smaller files between larger files (e.g. due to tombstone removal) can cause the data lineage to be corrupted. This adds more tests and bumps the go version to 1.23. fixes #36 * add a regression test for tombstones that are never cleaned up --------- Co-authored-by: Thomas Jungblut <tjungblu@redhat.com>
1 parent abc28eb commit 5113b2e

10 files changed

+334
-106
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ require (
2020

2121
replace github.com/anishathalye/porcupine v0.1.2 => github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569
2222

23-
go 1.22
23+
go 1.23

simpledb/compaction.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
5858
compactionAction := db.sstableManager.candidateTablesForCompaction(db.compactedMaxSizeBytes)
5959
paths := compactionAction.pathsToCompact
6060
numRecords := compactionAction.totalRecords
61-
if len(paths) <= db.compactionThreshold {
61+
if len(paths) <= db.compactionFileThreshold {
6262
return nil, nil
6363
}
6464

@@ -116,11 +116,8 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
116116
}
117117
}()
118118

119-
// we need to keep it if the sstable is not the first with this key.
120-
// SS1(KEY1=toto) SS2(KEY2=deleted) in this case the KEY2 can be removed in SS2
121-
// SS1(KEY2=toto) SS2(KEY2=deleted) in this case the KEY2 can't be removed in SS2
122-
123-
err = sstables.NewSSTableMerger(db.cmp).MergeCompact(iterators, writer, sstables.ScanReduceLatestWins)
119+
reduceFunc := sstables.ScanReduceLatestWinsSkipTombstones
120+
err = sstables.NewSSTableMerger(db.cmp).MergeCompact(iterators, writer, reduceFunc)
124121
if err != nil {
125122
return nil, err
126123
}

simpledb/compaction_test.go

+117-38
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestExecCompactionLessFilesThanExpected(t *testing.T) {
1717
defer cleanDatabaseFolder(t, db)
1818
defer closeDatabase(t, db)
1919

20-
db.compactionThreshold = 1
20+
db.compactionFileThreshold = 1
2121
db.sstableManager.addReader(&MockSSTableReader{
2222
metadata: &proto.MetaData{NumRecords: 10, TotalBytes: 100},
2323
path: "1",
@@ -34,7 +34,7 @@ func TestExecCompactionSameContent(t *testing.T) {
3434
// we'll close the database to mock some internals directly, yes it's very hacky
3535
closeDatabase(t, db)
3636
db.closed = false
37-
db.compactionThreshold = 1
37+
db.compactionFileThreshold = 1
3838

3939
writeSSTableInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
4040
writeSSTableInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
@@ -53,41 +53,12 @@ func TestExecCompactionSameContent(t *testing.T) {
5353
assert.Nil(t, db.sstableManager.currentReader.Close())
5454
}
5555

56-
func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
57-
fakeTablePath := filepath.Join(db.basePath, p)
58-
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
59-
mStore := memstore.NewMemStore()
60-
for i := 0; i < 1000; i++ {
61-
assert.Nil(t, mStore.Add([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))))
62-
}
63-
assert.Nil(t, mStore.Flush(
64-
sstables.WriteBasePath(fakeTablePath),
65-
sstables.WithKeyComparator(db.cmp),
66-
))
67-
}
68-
69-
func writeSSTableWithTombstoneInDatabaseFolder(t *testing.T, db *DB, p string) {
70-
fakeTablePath := filepath.Join(db.basePath, p)
71-
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
72-
mStore := memstore.NewMemStore()
73-
74-
// delete all key between 500 and 800
75-
for i := 500; i < 800; i++ {
76-
assert.Nil(t, mStore.Tombstone([]byte(fmt.Sprintf("%d", i))))
77-
}
78-
assert.Nil(t, mStore.FlushWithTombstones(
79-
sstables.WriteBasePath(fakeTablePath),
80-
sstables.WithKeyComparator(db.cmp),
81-
))
82-
}
83-
8456
func TestExecCompactionWithTombstone(t *testing.T) {
8557
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
8658
defer cleanDatabaseFolder(t, db)
87-
// we'll close the database to mock some internals directly, yes it's very hacky
8859
closeDatabase(t, db)
8960
db.closed = false
90-
db.compactionThreshold = 0
61+
db.compactionFileThreshold = 0
9162

9263
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
9364
// only one SStable with holes should shrink
@@ -100,7 +71,7 @@ func TestExecCompactionWithTombstone(t *testing.T) {
10071
assert.Nil(t, err)
10172
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
10273
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
103-
fmt.Print(compactionMeta)
74+
10475
err = db.sstableManager.reflectCompactionResult(compactionMeta)
10576
assert.NoError(t, err)
10677
v, err := db.Get("512")
@@ -112,16 +83,15 @@ func TestExecCompactionWithTombstone(t *testing.T) {
11283
// check size of compacted sstable
11384
assert.Equal(t, 700, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
11485
}
115-
func TestExecCompactionWithTombstoneRewriten(t *testing.T) {
86+
func TestExecCompactionWithTombstoneRewritten(t *testing.T) {
11687
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
11788
defer cleanDatabaseFolder(t, db)
118-
// we'll close the database to mock some internals directly, yes it's very hacky
11989
closeDatabase(t, db)
12090
db.closed = false
121-
db.compactionThreshold = 0
91+
db.compactionFileThreshold = 0
12292

12393
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
124-
// the tombstone are overwrite
94+
// the tombstone will be overwritten
12595
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
12696
assert.Nil(t, db.reconstructSSTables())
12797
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))
@@ -130,7 +100,38 @@ func TestExecCompactionWithTombstoneRewriten(t *testing.T) {
130100
assert.Nil(t, err)
131101
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
132102
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
133-
fmt.Print(compactionMeta)
103+
104+
err = db.sstableManager.reflectCompactionResult(compactionMeta)
105+
assert.NoError(t, err)
106+
v, err := db.Get("512")
107+
assert.NoError(t, err)
108+
assert.Equal(t, "512", v)
109+
// for cleanups
110+
assert.Nil(t, db.sstableManager.currentReader.Close())
111+
112+
// check size of compacted sstable
113+
assert.Equal(t, 1000, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
114+
}
115+
116+
// regression reported in https://github.com/thomasjungblut/go-sstables/issues/36
117+
func TestExecCompactionWithTombstonesEmptyTable(t *testing.T) {
118+
db := newOpenedSimpleDB(t, "simpledb_compactionEmptyTable")
119+
defer cleanDatabaseFolder(t, db)
120+
closeDatabase(t, db)
121+
db.closed = false
122+
db.compactionFileThreshold = 0
123+
124+
writeEmptySSTableInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 41))
125+
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
126+
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
127+
assert.Nil(t, db.reconstructSSTables())
128+
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))
129+
130+
compactionMeta, err := executeCompaction(db)
131+
assert.Nil(t, err)
132+
assert.Equal(t, "sstable_000000000000041", compactionMeta.ReplacementPath)
133+
assert.Equal(t, []string{"sstable_000000000000041", "sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
134+
134135
err = db.sstableManager.reflectCompactionResult(compactionMeta)
135136
assert.NoError(t, err)
136137
v, err := db.Get("512")
@@ -142,3 +143,81 @@ func TestExecCompactionWithTombstoneRewriten(t *testing.T) {
142143
// check size of compacted sstable
143144
assert.Equal(t, 1000, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
144145
}
146+
147+
// regression reported in https://github.com/thomasjungblut/go-sstables/issues/36
148+
// more specifically this relates to a case where the first table is above the compaction threshold and never compacted.
149+
// The keyspace might already be partially rewritten, thus wasting unnecessary space.
150+
// This test is expected to start failing once that is fixed; its assertions deliberately pin the current, undesired behavior (see the TODOs below).
151+
func TestCompactionWithTombstonesBeyondMaxSize(t *testing.T) {
152+
db := newOpenedSimpleDB(t, "simpledb_compactionTombstoneBeyondMaxSize")
153+
defer cleanDatabaseFolder(t, db)
154+
closeDatabase(t, db)
155+
db.closed = false
156+
db.compactionFileThreshold = 0
157+
// the tombstone file is about 6k, the 10 written entries are 300k
158+
db.compactedMaxSizeBytes = 1024
159+
160+
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
161+
writeSSTableWithDataInDatabaseFolderRange(t, db, fmt.Sprintf(SSTablePattern, 43), 510, 520)
162+
assert.Nil(t, db.reconstructSSTables())
163+
assert.Equal(t, 310, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))
164+
165+
compactionMeta, err := executeCompaction(db)
166+
assert.Nil(t, err)
167+
// TODO(thomas): this should also compact the 42 table, as it wastes a ton of space in tombstones
168+
assert.Equal(t, "sstable_000000000000043", compactionMeta.ReplacementPath)
169+
assert.Equal(t, []string{"sstable_000000000000043"}, compactionMeta.SstablePaths)
170+
171+
err = db.sstableManager.reflectCompactionResult(compactionMeta)
172+
assert.NoError(t, err)
173+
v, err := db.Get("512")
174+
assert.NoError(t, err)
175+
assert.Equal(t, "512", v)
176+
// for cleanups
177+
assert.Nil(t, db.sstableManager.currentReader.Close())
178+
// TODO(thomas): ideally that table should only be 10
179+
assert.Equal(t, 310, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
180+
}
181+
182+
func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
183+
writeSSTableWithDataInDatabaseFolderRange(t, db, p, 0, 1000)
184+
}
185+
186+
func writeSSTableWithDataInDatabaseFolderRange(t *testing.T, db *DB, p string, start, end int) {
187+
fakeTablePath := filepath.Join(db.basePath, p)
188+
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
189+
mStore := memstore.NewMemStore()
190+
for i := start; i < end; i++ {
191+
assert.Nil(t, mStore.Add([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))))
192+
}
193+
assert.Nil(t, mStore.Flush(
194+
sstables.WriteBasePath(fakeTablePath),
195+
sstables.WithKeyComparator(db.cmp),
196+
))
197+
}
198+
199+
func writeSSTableWithTombstoneInDatabaseFolder(t *testing.T, db *DB, p string) {
200+
fakeTablePath := filepath.Join(db.basePath, p)
201+
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
202+
mStore := memstore.NewMemStore()
203+
204+
// delete all keys from 500 (inclusive) to 800 (exclusive)
205+
for i := 500; i < 800; i++ {
206+
assert.Nil(t, mStore.Tombstone([]byte(fmt.Sprintf("%d", i))))
207+
}
208+
assert.Nil(t, mStore.FlushWithTombstones(
209+
sstables.WriteBasePath(fakeTablePath),
210+
sstables.WithKeyComparator(db.cmp),
211+
))
212+
}
213+
214+
func writeEmptySSTableInDatabaseFolder(t *testing.T, db *DB, p string) {
215+
fakeTablePath := filepath.Join(db.basePath, p)
216+
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
217+
mStore := memstore.NewMemStore()
218+
219+
assert.Nil(t, mStore.FlushWithTombstones(
220+
sstables.WriteBasePath(fakeTablePath),
221+
sstables.WithKeyComparator(db.cmp),
222+
))
223+
}

simpledb/db.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ type DatabaseI interface {
5353
}
5454

5555
type compactionAction struct {
56-
pathsToCompact []string
57-
totalRecords uint64
58-
canRemoveTombstone bool // if the compaction don't start from the first sstable we cannot remove tombstone
56+
pathsToCompact []string
57+
totalRecords uint64
5958
}
6059

6160
type memStoreFlushAction struct {
@@ -68,18 +67,18 @@ type DB struct {
6867
// read more here: https://pkg.go.dev/sync/atomic#pkg-note-BUG
6968
currentGeneration uint64
7069

71-
cmp skiplist.Comparator[[]byte]
72-
basePath string
73-
currentSSTablePath string
74-
memstoreMaxSize uint64
75-
compactionThreshold int
76-
compactionInterval time.Duration
77-
compactedMaxSizeBytes uint64
78-
enableCompactions bool
79-
enableAsyncWAL bool
80-
enableDirectIOWAL bool
81-
open bool
82-
closed bool
70+
cmp skiplist.Comparator[[]byte]
71+
basePath string
72+
currentSSTablePath string
73+
memstoreMaxSize uint64
74+
compactionFileThreshold int
75+
compactionInterval time.Duration
76+
compactedMaxSizeBytes uint64
77+
enableCompactions bool
78+
enableAsyncWAL bool
79+
enableDirectIOWAL bool
80+
open bool
81+
closed bool
8382

8483
rwLock *sync.RWMutex
8584
wal wal.WriteAheadLogI
@@ -356,7 +355,7 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
356355
basePath: basePath,
357356
currentSSTablePath: "",
358357
memstoreMaxSize: extraOpts.memstoreSizeBytes,
359-
compactionThreshold: extraOpts.compactionFileThreshold,
358+
compactionFileThreshold: extraOpts.compactionFileThreshold,
360359
compactedMaxSizeBytes: extraOpts.compactionMaxSizeBytes,
361360
enableCompactions: extraOpts.enableCompactions,
362361
enableAsyncWAL: extraOpts.enableAsyncWAL,
@@ -438,7 +437,8 @@ func CompactionFileThreshold(n int) ExtraOption {
438437
}
439438

440439
// CompactionMaxSizeBytes tells whether an SSTable is considered for compaction.
441-
// SSTables over the given threshold will not be compacted any further. Default is 5GB in DefaultCompactionMaxSizeBytes
440+
// This is a best-effort implementation; depending on the write/delete pattern you may need to compact bigger tables.
441+
// Default is 5GB in DefaultCompactionMaxSizeBytes
442442
func CompactionMaxSizeBytes(n uint64) ExtraOption {
443443
return func(args *ExtraOptions) {
444444
args.compactionMaxSizeBytes = n

simpledb/db_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func TestCompactionsFileThreshold(t *testing.T) {
187187
defer cleanDatabaseFolder(t, db)
188188
defer closeDatabase(t, db)
189189

190-
assert.Equal(t, 1337, db.compactionThreshold)
190+
assert.Equal(t, 1337, db.compactionFileThreshold)
191191
}
192192

193193
func assertGet(t *testing.T, db *DB, key string) {

0 commit comments

Comments
 (0)