Cache rebuild #124

Merged: 11 commits, May 27, 2019
7 changes: 7 additions & 0 deletions balloon/balloon.go
@@ -43,6 +43,13 @@ type Balloon struct {
hasher hashing.Hasher
}

func min(x, y uint16) uint16 {
if x < y {
return x
}
return y
}

func NewBalloon(store storage.Store, hasherF func() hashing.Hasher) (*Balloon, error) {

// create trees
21 changes: 14 additions & 7 deletions balloon/hyper/operation.go
@@ -24,12 +24,13 @@ import (
)

type pruningContext struct {
Hasher hashing.Hasher
Cache cache.ModifiableCache
DefaultHashes []hashing.Digest
Mutations []*storage.Mutation
AuditPath AuditPath
Value []byte
Hasher hashing.Hasher
Cache cache.ModifiableCache
RecoveryHeight uint16 // batches cached at this height are also persisted to HyperCacheTable
DefaultHashes []hashing.Digest
Mutations []*storage.Mutation
AuditPath AuditPath
Value []byte
}

type operationCode int
@@ -129,8 +130,14 @@ func putInCache(pos position, batch *batchNode) *operation {
Code: putInCacheCode,
Pos: pos,
Interpret: func(ops *operationsStack, c *pruningContext) hashing.Digest {

hash := ops.Pop().Interpret(ops, c)
c.Cache.Put(pos.Bytes(), batch.Serialize())
key := pos.Bytes()
val := batch.Serialize()
c.Cache.Put(key, val)
if pos.Height == c.RecoveryHeight {
Contributor review comment: Consider including this in the pruner as a new mutateOp (a sketch of that idea follows this hunk).

c.Mutations = append(c.Mutations, storage.NewMutation(storage.HyperCacheTable, key, val))
}
return hash
},
}
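
A minimal sketch of the reviewer's suggestion, relying on the declarations already present in operation.go (operation, operationsStack, pruningContext, position, batchNode). The names mutateCache and mutateCacheCode are hypothetical and not part of this PR; the idea is that the pruner would push this operation at positions whose height equals RecoveryHeight, so putInCache no longer has to know about persistence:

func mutateCache(pos position, batch *batchNode) *operation {
    return &operation{
        Code: mutateCacheCode, // assumed new operationCode value, not in this PR
        Pos:  pos,
        Interpret: func(ops *operationsStack, c *pruningContext) hashing.Digest {
            // compute the subtree hash first, exactly as putInCache does
            hash := ops.Pop().Interpret(ops, c)
            // then record the serialized batch as a HyperCacheTable mutation
            c.Mutations = append(c.Mutations,
                storage.NewMutation(storage.HyperCacheTable, pos.Bytes(), batch.Serialize()))
            return hash
        },
    }
}

With such an operation in place, the RecoveryHeight check inside putInCache could be dropped and the pruners alone would decide where cache mutations are emitted.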
67 changes: 40 additions & 27 deletions balloon/hyper/rebuild.go
@@ -18,52 +18,65 @@ package hyper

import (
"bytes"
"sort"
)

func pruneToRebuild(index, serializedBatch []byte, cacheHeightLimit uint16, batches batchLoader) *operationsStack {
func pruneToRebuild(indexes [][]byte, cacheHeightLimit uint16, batches batchLoader) *operationsStack {

persistedBatch := parseBatchNode(len(index), serializedBatch)
var traverse, traverseThroughCache func(pos position, indexes [][]byte, batch *batchNode, iBatch int8, ops *operationsStack)

var traverse, discardBranch func(pos position, batch *batchNode, iBatch int8, ops *operationsStack)

discardBranch = func(pos position, batch *batchNode, iBatch int8, ops *operationsStack) {

if batch.HasElementAt(iBatch) {
ops.Push(getProvidedHash(pos, iBatch, batch))
} else {
ops.Push(getDefaultHash(pos))
}
split := func(l [][]byte, index []byte) (left, right [][]byte) {
// the smallest index i where l[i] >= index
splitIndex := sort.Search(len(l), func(i int) bool {
return bytes.Compare(l[i], index) >= 0
})
return l[:splitIndex], l[splitIndex:]
}

traverse = func(pos position, batch *batchNode, iBatch int8, ops *operationsStack) {
traverse = func(pos position, indexes [][]byte, batch *batchNode, iBatch int8, ops *operationsStack) {
if batch == nil {
batch = batches.Load(pos)
}

// we don't need to check the length of the leaves because we
// always have to descend to the cache height limit
if pos.Height == cacheHeightLimit {
ops.PushAll(useHash(pos, persistedBatch.GetElementAt(0)), updateBatchNode(pos, iBatch, batch))
if batch.HasElementAt(iBatch) {
ops.Push(getProvidedHash(pos, iBatch, batch))
} else {
ops.Push(getDefaultHash(pos))
}

return
}

if batch == nil {
batch = batches.Load(pos)
traverseThroughCache(pos, indexes, batch, iBatch, ops)
}

traverseThroughCache = func(pos position, indexes [][]byte, batch *batchNode, iBatch int8, ops *operationsStack) {

if len(indexes) == 0 { // discarded branch
if batch.HasElementAt(iBatch) {
ops.Push(getProvidedHash(pos, iBatch, batch))
} else {
ops.Push(getDefaultHash(pos))
}
return
}

// at the end of a batch tree
if iBatch > 0 && pos.Height%4 == 0 {
traverse(pos, nil, 0, ops)
traverse(pos, indexes, nil, 0, ops)
ops.Push(updateBatchNode(pos, iBatch, batch))
return
}

// on an internal node with more than one leaf

rightPos := pos.Right()
leftPos := pos.Left()
if bytes.Compare(index, rightPos.Index) < 0 { // go to left
traverse(pos.Left(), batch, 2*iBatch+1, ops)
discardBranch(rightPos, batch, 2*iBatch+2, ops)
} else { // go to right
discardBranch(leftPos, batch, 2*iBatch+1, ops)
traverse(rightPos, batch, 2*iBatch+2, ops)
}
leftIndexes, rightIndexes := split(indexes, rightPos.Index)

traverseThroughCache(pos.Left(), leftIndexes, batch, 2*iBatch+1, ops)
traverseThroughCache(rightPos, rightIndexes, batch, 2*iBatch+2, ops)

ops.PushAll(innerHash(pos), updateBatchNode(pos, iBatch, batch))
if iBatch == 0 { // it's the root of the batch tree
@@ -73,7 +86,7 @@ func pruneToRebuild(index, serializedBatch []byte, cacheHeightLimit uint16, batc
}

ops := newOperationsStack()
traverse(newRootPosition(uint16(len(index))), nil, 0, ops)
return ops

traverse(newRootPosition(uint16(len(indexes[0]))), indexes, nil, 0, ops)
return ops
}
109 changes: 7 additions & 102 deletions balloon/hyper/rebuild_test.go
@@ -19,8 +19,6 @@ package hyper
import (
"testing"

"github.com/bbva/qed/balloon/cache"
"github.com/bbva/qed/hashing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -57,7 +55,7 @@ func TestPruneToRebuild(t *testing.T) {
{innerHashCode, pos(0, 5)},
{getDefaultHashCode, pos(16, 4)},
{updateBatchNodeCode, pos(0, 4)},
{useHashCode, pos(0, 4)},
{getDefaultHashCode, pos(0, 4)},
},
},
{
@@ -97,7 +95,7 @@
{innerHashCode, pos(0, 5)},
{getDefaultHashCode, pos(16, 4)},
{updateBatchNodeCode, pos(0, 4)},
{useHashCode, pos(0, 4)},
{getDefaultHashCode, pos(0, 4)},
},
},
}
@@ -107,105 +105,12 @@ func TestPruneToRebuild(t *testing.T) {

for i, c := range testCases {
loader := newFakeBatchLoader(c.cachedBatches, nil, cacheHeightLimit)
prunedOps := pruneToRebuild(c.index, c.serializedBatch, cacheHeightLimit, loader).List()
require.Truef(t, len(c.expectedOps) == len(prunedOps), "The size of the pruned ops should match the expected for test case %d", i)
for j := 0; j < len(prunedOps); j++ {
assert.Equalf(t, c.expectedOps[j].Code, prunedOps[j].Code, "The pruned operation's code should match for test case %d", i)
assert.Equalf(t, c.expectedOps[j].Pos, prunedOps[j].Pos, "The pruned operation's position should match for test case %d", i)
}
}
prunedOps := pruneToRebuild([][]byte{c.index}, cacheHeightLimit, loader).List()

}

func TestRebuildInterpretation(t *testing.T) {
Contributor review comment: Recover this test (an adapted sketch follows this file's diff).


testCases := []struct {
index, serializedBatch []byte
cachedBatches map[string][]byte
expectedElements []*cachedElement
}{
{
// insert index = 0 on empty cache
index: []byte{0},
serializedBatch: []byte{
0xe0, 0x00, 0x00, 0x00, // bitmap: 11100000 00000000 00000000 00000000
0x00, 0x01, // iBatch 0 -> hash=0x00 (shortcut index=0)
0x00, 0x02, // iBatch 1 -> key=0x00
0x00, 0x02, // iBatch 2 -> value=0x00
},
cachedBatches: map[string][]byte{},
expectedElements: []*cachedElement{
{
Pos: pos(0, 8),
Value: []byte{
0xd1, 0x01, 0x00, 0x00, // bitmap: 11010001 00000001 00000000 00000000
0x00, 0x00, // iBatch 0 -> hash=0x00
0x00, 0x00, // iBatch 1 -> hash=0x00
0x00, 0x00, // iBatch 3 -> hash=0x00
0x00, 0x00, // iBatch 7 -> hash=0x00
0x00, 0x00, // iBatch 15 -> hash=0x00
},
},
},
},
{
// insert index=1 on tree with 1 leaf (index: 0, value: 0)
index: []byte{1},
serializedBatch: []byte{
0xd1, 0x01, 0x80, 0x00, // bitmap: 11010001 00000001 10000000 00000000
0x01, 0x00, // iBatch 0 -> hash=0x01
0x01, 0x00, // iBatch 1 -> hash=0x01
0x01, 0x00, // iBatch 3 -> hash=0x01
0x01, 0x00, // iBatch 7 -> hash=0x01
0x00, 0x00, // iBatch 15 -> hash=0x00
0x01, 0x00, // iBatch 16 -> hash=0x01
},
cachedBatches: map[string][]byte{
pos(0, 8).StringId(): []byte{
0xd1, 0x01, 0x00, 0x00, // bitmap: 11010001 00000001 00000000 00000000
0x00, 0x00, // iBatch 0 -> hash=0x00
0x00, 0x00, // iBatch 1 -> hash=0x00
0x00, 0x00, // iBatch 3 -> hash=0x00
0x00, 0x00, // iBatch 7 -> hash=0x00
0x00, 0x00, // iBatch 15 -> hash=0x00
},
},
expectedElements: []*cachedElement{
{
Pos: pos(0, 8),
Value: []byte{
0xd1, 0x01, 0x00, 0x00, // bitmap: 11010001 00000001 00000000 00000000
0x01, 0x00, // iBatch 0 -> hash=0x01
0x01, 0x00, // iBatch 1 -> hash=0x01
0x01, 0x00, // iBatch 3 -> hash=0x01
0x01, 0x00, // iBatch 7 -> hash=0x01
0x01, 0x00, // iBatch 15 -> hash=0x01
},
},
},
},
}

batchLevels := uint16(1)
cacheHeightLimit := batchLevels * 4
defaultHashes := []hashing.Digest{{0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}}

for i, c := range testCases {
cache := cache.NewFakeCache([]byte{0x0})
batches := newFakeBatchLoader(c.cachedBatches, nil, cacheHeightLimit)

ops := pruneToRebuild(c.index, c.serializedBatch, cacheHeightLimit, batches)
ctx := &pruningContext{
Hasher: hashing.NewFakeXorHasher(),
Cache: cache,
DefaultHashes: defaultHashes,
}

ops.Pop().Interpret(ops, ctx)

for _, e := range c.expectedElements {
v, _ := cache.Get(e.Pos.Bytes())
assert.Equalf(t, e.Value, v, "The cached element %v should be cached in test case %d", e, i)
require.Equal(t, len(c.expectedOps), len(prunedOps), "The size of the pruned ops should match the expected for test case %d", i)
for j := 0; j < len(prunedOps); j++ {
assert.Equalf(t, c.expectedOps[j].Code, prunedOps[j].Code, "The pruned operation's code should match for test case %d elem %d", i, j)
assert.Equalf(t, c.expectedOps[j].Pos, prunedOps[j].Pos, "The pruned operation's position should match for test case %d elem %d", i, j)
}
}
}
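
A possible recovery of TestRebuildInterpretation, adapted to the new pruneToRebuild signature as the reviewer requested above. This is only a sketch, not code from the PR: the test-case bodies are elided, the cache and hashing imports removed at the top of this file would have to be restored, and passing the same height to the pruner and to RecoveryHeight is an assumption that mirrors how RebuildCache wires them together in tree.go.

func TestRebuildInterpretation(t *testing.T) {
    testCases := []struct {
        indexes          [][]byte
        cachedBatches    map[string][]byte
        expectedElements []*cachedElement
    }{
        // cases analogous to the deleted ones above: rebuilding with index 0x00
        // over an empty cache, then with index 0x01 over a one-leaf tree
    }

    batchLevels := uint16(1)
    cacheHeightLimit := batchLevels * 4
    defaultHashes := []hashing.Digest{{0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}}

    for i, c := range testCases {
        cache := cache.NewFakeCache([]byte{0x0})
        batches := newFakeBatchLoader(c.cachedBatches, nil, cacheHeightLimit)

        ops := pruneToRebuild(c.indexes, cacheHeightLimit, batches)
        ctx := &pruningContext{
            Hasher:         hashing.NewFakeXorHasher(),
            Cache:          cache,
            RecoveryHeight: cacheHeightLimit,
            DefaultHashes:  defaultHashes,
        }
        ops.Pop().Interpret(ops, ctx)

        // every expected element should end up in the cache after interpretation
        for _, e := range c.expectedElements {
            v, _ := cache.Get(e.Pos.Bytes())
            assert.Equalf(t, e.Value, v, "The cached element %v should be cached in test case %d", e, i)
        }
    }
}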
73 changes: 43 additions & 30 deletions balloon/hyper/tree.go
@@ -49,7 +49,7 @@ func NewHyperTree(hasherF func() hashing.Hasher, store storage.Store, cache cach

hasher := hasherF()
numBits := hasher.Len()
cacheHeightLimit := numBits - min(24, (numBits/8)*4)
cacheHeightLimit := numBits - min(24, numBits/8*4)

tree := &HyperTree{
store: store,
@@ -83,10 +83,11 @@ func (t *HyperTree) Add(eventDigest hashing.Digest, version uint64) (hashing.Dig
// build a stack of operations and then interpret it to generate the root hash
ops := pruneToInsert(eventDigest, versionAsBytes, t.cacheHeightLimit, t.batchLoader)
ctx := &pruningContext{
Hasher: t.hasher,
Cache: t.cache,
DefaultHashes: t.defaultHashes,
Mutations: make([]*storage.Mutation, 0),
Hasher: t.hasher,
Cache: t.cache,
RecoveryHeight: t.cacheHeightLimit + 4,
DefaultHashes: t.defaultHashes,
Mutations: make([]*storage.Mutation, 0),
}

rh := ops.Pop().Interpret(ops, ctx)
@@ -108,10 +109,11 @@ func (t *HyperTree) AddBulk(eventDigests []hashing.Digest, versions []uint64) (h
// build a stack of operations and then interpret it to generate the root hash
ops := pruneToInsertBulk(digestsAsBytes, versionsAsBytes, t.cacheHeightLimit, t.batchLoader)
ctx := &pruningContext{
Hasher: t.hasher,
Cache: t.cache,
DefaultHashes: t.defaultHashes,
Mutations: make([]*storage.Mutation, 0),
Hasher: t.hasher,
Cache: t.cache,
RecoveryHeight: t.cacheHeightLimit + 4,
DefaultHashes: t.defaultHashes,
Mutations: make([]*storage.Mutation, 0),
}

rh := ops.Pop().Interpret(ops, ctx)
@@ -128,10 +130,11 @@ func (t *HyperTree) QueryMembership(eventDigest hashing.Digest) (proof *QueryPro
// build a stack of operations and then interpret it to generate the audit path
ops := pruneToFind(eventDigest, t.batchLoader)
ctx := &pruningContext{
Hasher: t.hasher,
Cache: t.cache,
DefaultHashes: t.defaultHashes,
AuditPath: make(AuditPath, 0),
Hasher: t.hasher,
Cache: t.cache,
RecoveryHeight: t.cacheHeightLimit + 4,
DefaultHashes: t.defaultHashes,
AuditPath: make(AuditPath, 0),
}

ops.Pop().Interpret(ops, ctx)
@@ -147,26 +150,36 @@ func (t *HyperTree) RebuildCache() {
// warm up cache
log.Info("Warming up hyper cache...")

// get all nodes at cache limit height
start := make([]byte, 2+t.hasher.Len()/8)
end := make([]byte, 2+t.hasher.Len()/8)
start[1] = byte(t.cacheHeightLimit)
end[1] = byte(t.cacheHeightLimit + 1)
nodes, err := t.store.GetRange(storage.HyperTable, start, end)
if err != nil {
log.Fatalf("Oops, something went wrong: %v", err)
}
indexes := make([][]byte, 0)

// stream all persisted cache tiles in chunks, seeding the in-memory cache and collecting their indexes for the pruner
tileReader := t.store.GetAll(storage.HyperCacheTable)
tiles := make([]*storage.KVPair, 1000)
for {
n, err := tileReader.Read(tiles)
if n == 0 || err != nil {
break
}

// insert every node into cache
for _, node := range nodes {
ops := pruneToRebuild(node.Key[2:], node.Value, t.cacheHeightLimit, t.batchLoader)
ctx := &pruningContext{
Hasher: t.hasher,
Cache: t.cache,
DefaultHashes: t.defaultHashes,
for i := 0; i < n; i++ {
indexes = append(indexes, tiles[i].Key[2:])
t.cache.Put(tiles[i].Key, tiles[i].Value)
}
ops.Pop().Interpret(ops, ctx)
}
// if there are no elements, we start with a clean
// cache
if len(indexes) == 0 {
return
}

ops := pruneToRebuild(indexes, t.cacheHeightLimit+4, t.batchLoader)
ctx := &pruningContext{
Hasher: t.hasher,
Cache: t.cache,
RecoveryHeight: t.cacheHeightLimit + 4,
DefaultHashes: t.defaultHashes,
}
ops.Pop().Interpret(ops, ctx)

}

func (t *HyperTree) Close() {