Skip to content

Commit fd426ab

Browse files
authoredApr 5, 2019
Merge pull request BBVA#102 from aalda/metrics
Metrics
2 parents 1652a0c + 3d571ac commit fd426ab

File tree

12 files changed

+1241
-207
lines changed

12 files changed

+1241
-207
lines changed
 

‎api/apihttp/apihttp.go

-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"time"
2424

2525
"github.com/bbva/qed/log"
26-
"github.com/bbva/qed/metrics"
2726
"github.com/bbva/qed/protocol"
2827
"github.com/bbva/qed/raftwal"
2928
)
@@ -44,8 +43,6 @@ type HealthCheckResponse struct {
4443
// and no body.
4544
func HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
4645

47-
metrics.QedAPIHealthcheckRequestsTotal.Inc()
48-
4946
// Make sure we can only be called with an HTTP POST request.
5047
if r.Method != "HEAD" {
5148
w.Header().Set("Allow", "HEAD")

‎balloon/balloon.go

+1-17
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,6 @@ func (b *Balloon) RefreshVersion() error {
182182

183183
func (b *Balloon) Add(event []byte) (*Snapshot, []*storage.Mutation, error) {
184184

185-
// Metrics
186-
metrics.QedBalloonAddTotal.Inc()
187-
//timer := prometheus.NewTimer(metrics.QedBalloonAddDurationSeconds)
188-
//defer timer.ObserveDuration()
189-
190185
// Activate metrics gathering
191186
stats := metrics.Balloon
192187

@@ -230,21 +225,13 @@ func (b *Balloon) Add(event []byte) (*Snapshot, []*storage.Mutation, error) {
230225
Version: version,
231226
}
232227

233-
// Increment add hits and version
234-
stats.AddFloat("add_hits", 1)
228+
// Increment version
235229
stats.Set("version", metrics.Uint64ToVar(version))
236230

237231
return snapshot, mutations, nil
238232
}
239233

240234
func (b Balloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*MembershipProof, error) {
241-
// Metrics
242-
metrics.QedBalloonDigestMembershipTotal.Inc()
243-
//timer := prometheus.NewTimer(metrics.QedBalloonDigestMembershipDurationSeconds)
244-
//defer timer.ObserveDuration()
245-
246-
stats := metrics.Balloon
247-
stats.AddFloat("QueryMembership", 1)
248235

249236
var proof MembershipProof
250237
var err error
@@ -297,9 +284,6 @@ func (b Balloon) QueryMembership(event []byte, version uint64) (*MembershipProof
297284

298285
func (b Balloon) QueryConsistency(start, end uint64) (*IncrementalProof, error) {
299286

300-
// Metrics
301-
metrics.QedBalloonIncrementalTotal.Inc()
302-
303287
stats := metrics.Balloon
304288
stats.AddFloat("QueryConsistency", 1)
305289
var proof IncrementalProof

‎balloon/balloon_test.go

+10-47
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/bbva/qed/hashing"
2828
"github.com/bbva/qed/log"
29+
metrics_utils "github.com/bbva/qed/testutils/metrics"
2930
"github.com/bbva/qed/testutils/rand"
3031
storage_utils "github.com/bbva/qed/testutils/storage"
3132
"github.com/bbva/qed/util"
@@ -225,50 +226,6 @@ func TestGenIncrementalAndVerify(t *testing.T) {
225226
assert.True(t, correct, "Unable to verify incremental proof")
226227
}
227228

228-
func BenchmarkAddBadger(b *testing.B) {
229-
230-
log.SetLogger("BenchmarkAddBadger", log.SILENT)
231-
232-
store, closeF := storage_utils.OpenBadgerStore(b, "/var/tmp/balloon_bench.db")
233-
defer closeF()
234-
235-
balloon, err := NewBalloon(store, hashing.NewSha256Hasher)
236-
require.NoError(b, err)
237-
238-
b.ResetTimer()
239-
b.N = 100000
240-
for i := 0; i < b.N; i++ {
241-
event := rand.Bytes(128)
242-
_, mutations, _ := balloon.Add(event)
243-
store.Mutate(mutations)
244-
}
245-
246-
}
247-
248-
func BenchmarkQueryBadger(b *testing.B) {
249-
var events [][]byte
250-
log.SetLogger("BenchmarkAddBadger", log.SILENT)
251-
252-
store, closeF := storage_utils.OpenBadgerStore(b, "/var/tmp/ballon_bench.db")
253-
defer closeF()
254-
255-
balloon, err := NewBalloon(store, hashing.NewSha256Hasher)
256-
require.NoError(b, err)
257-
258-
b.N = 100000
259-
for i := 0; i < b.N; i++ {
260-
event := rand.Bytes(128)
261-
events = append(events, event)
262-
_, mutations, _ := balloon.Add(event)
263-
store.Mutate(mutations)
264-
}
265-
266-
b.ResetTimer()
267-
for i, e := range events {
268-
balloon.QueryMembership(e, uint64(i))
269-
}
270-
271-
}
272229
func BenchmarkAddRocksDB(b *testing.B) {
273230

274231
log.SetLogger("BenchmarkAddRocksDB", log.SILENT)
@@ -279,12 +236,18 @@ func BenchmarkAddRocksDB(b *testing.B) {
279236
balloon, err := NewBalloon(store, hashing.NewSha256Hasher)
280237
require.NoError(b, err)
281238

239+
balloonMetrics := metrics_utils.CustomRegister(AddTotal)
240+
srvCloseF := metrics_utils.StartMetricsServer(balloonMetrics, store)
241+
defer srvCloseF()
242+
282243
b.ResetTimer()
283-
b.N = 1000000
244+
b.N = 2000000
284245
for i := 0; i < b.N; i++ {
285246
event := rand.Bytes(128)
286-
_, mutations, _ := balloon.Add(event)
287-
store.Mutate(mutations)
247+
_, mutations, err := balloon.Add(event)
248+
require.NoError(b, err)
249+
require.NoError(b, store.Mutate(mutations))
250+
AddTotal.Inc()
288251
}
289252

290253
}

‎balloon/metrics.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package balloon
18+
19+
import "github.com/prometheus/client_golang/prometheus"
20+
21+
const namespace = "qed"
22+
const subSystem = "balloon"
23+
24+
var (
25+
AddTotal = prometheus.NewCounter(
26+
prometheus.CounterOpts{
27+
Namespace: namespace,
28+
Subsystem: subSystem,
29+
Name: "add_total",
30+
Help: "Number of add operations",
31+
},
32+
)
33+
MembershipTotal = prometheus.NewCounter(
34+
prometheus.CounterOpts{
35+
Namespace: namespace,
36+
Subsystem: subSystem,
37+
Name: "membership_total",
38+
Help: "Number of membership queries.",
39+
},
40+
)
41+
DigestMembershipTotal = prometheus.NewGauge(
42+
prometheus.GaugeOpts{
43+
Namespace: namespace,
44+
Subsystem: subSystem,
45+
Name: "digest_membership_total",
46+
Help: "Number of membership by digest queries.",
47+
},
48+
)
49+
IncrementalTotal = prometheus.NewCounter(
50+
prometheus.CounterOpts{
51+
Namespace: namespace,
52+
Subsystem: subSystem,
53+
Name: "incremental_total",
54+
Help: "Number of incremental queries.",
55+
},
56+
)
57+
)

‎deploy/aws/provision/files/grafana/dashboards/QED.json

+6-6
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@
215215
"tableColumn": "Value",
216216
"targets": [
217217
{
218-
"expr": "sum(qed_balloon_add_total) / sum(qed_server_instances)",
218+
"expr": "sum(qed_raft_balloon_adds) / sum(qed_server_instances)",
219219
"format": "time_series",
220220
"hide": false,
221221
"instant": false,
@@ -296,7 +296,7 @@
296296
"tableColumn": "",
297297
"targets": [
298298
{
299-
"expr": "sum(qed_balloon_digest_membership_total)",
299+
"expr": "sum(qed_raft_balloon_digest_membership_queries)",
300300
"format": "time_series",
301301
"intervalFactor": 1,
302302
"refId": "A"
@@ -375,7 +375,7 @@
375375
"tableColumn": "",
376376
"targets": [
377377
{
378-
"expr": "sum(qed_balloon_incremental_total)",
378+
"expr": "sum(qed_raft_balloon_incremental_queries)",
379379
"format": "time_series",
380380
"intervalFactor": 1,
381381
"refId": "A"
@@ -443,7 +443,7 @@
443443
"steppedLine": false,
444444
"targets": [
445445
{
446-
"expr": "sum(rate(qed_balloon_add_total[$interval])) by (job)",
446+
"expr": "sum(rate(qed_raft_balloon_adds[$interval])) by (job)",
447447
"format": "time_series",
448448
"hide": false,
449449
"interval": "",
@@ -530,7 +530,7 @@
530530
"steppedLine": false,
531531
"targets": [
532532
{
533-
"expr": "sum(rate(qed_balloon_digest_membership_total[$interval])) by (job)",
533+
"expr": "sum(rate(qed_raft_balloon_digest_membership_queries[$interval])) by (job)",
534534
"format": "time_series",
535535
"intervalFactor": 1,
536536
"legendFormat": "{{job}}",
@@ -615,7 +615,7 @@
615615
"steppedLine": false,
616616
"targets": [
617617
{
618-
"expr": "sum(rate(qed_balloon_incremental_total[$interval])) by (job)",
618+
"expr": "sum(rate(qed_raft_balloon_incremental_queries[$interval])) by (job)",
619619
"format": "time_series",
620620
"intervalFactor": 1,
621621
"legendFormat": "{{job}}",

‎metrics/definitions.go

-71
This file was deleted.

‎raftwal/metrics.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package raftwal
18+
19+
import "github.com/prometheus/client_golang/prometheus"
20+
21+
// namespace is the leading part of all published metrics.
22+
const namespace = "qed"
23+
24+
// subsystem associated with metrics for raft balloon
25+
const subSystem = "raft_balloon"
26+
27+
type raftBalloonMetrics struct {
28+
Version prometheus.GaugeFunc
29+
Adds prometheus.Counter
30+
MembershipQueries prometheus.Counter
31+
DigestMembershipQueries prometheus.Counter
32+
IncrementalQueries prometheus.Counter
33+
}
34+
35+
func newRaftBalloonMetrics(b *RaftBalloon) *raftBalloonMetrics {
36+
return &raftBalloonMetrics{
37+
Version: prometheus.NewGaugeFunc(
38+
prometheus.GaugeOpts{
39+
Namespace: namespace,
40+
Subsystem: subSystem,
41+
Name: "version",
42+
Help: "Current balloon version.",
43+
},
44+
func() float64 {
45+
return float64(b.fsm.balloon.Version())
46+
},
47+
),
48+
Adds: prometheus.NewCounter(
49+
prometheus.CounterOpts{
50+
Namespace: namespace,
51+
Subsystem: subSystem,
52+
Name: "adds",
53+
Help: "Number of add operations",
54+
},
55+
),
56+
MembershipQueries: prometheus.NewCounter(
57+
prometheus.CounterOpts{
58+
Namespace: namespace,
59+
Subsystem: subSystem,
60+
Name: "membership_queries",
61+
Help: "Number of membership queries.",
62+
},
63+
),
64+
DigestMembershipQueries: prometheus.NewGauge(
65+
prometheus.GaugeOpts{
66+
Namespace: namespace,
67+
Subsystem: subSystem,
68+
Name: "digest_membership_queries",
69+
Help: "Number of membership by digest queries.",
70+
},
71+
),
72+
IncrementalQueries: prometheus.NewCounter(
73+
prometheus.CounterOpts{
74+
Namespace: namespace,
75+
Subsystem: subSystem,
76+
Name: "incremental_queries",
77+
Help: "Number of incremental queries.",
78+
},
79+
),
80+
}
81+
}
82+
83+
// collectors satisfies the prom.PrometheusCollector interface.
84+
func (m *raftBalloonMetrics) collectors() []prometheus.Collector {
85+
return []prometheus.Collector{
86+
m.Version,
87+
m.Adds,
88+
m.MembershipQueries,
89+
m.DigestMembershipQueries,
90+
m.IncrementalQueries,
91+
}
92+
}

‎raftwal/raft.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/bbva/qed/balloon"
2727
"github.com/bbva/qed/hashing"
2828
"github.com/bbva/qed/log"
29+
"github.com/bbva/qed/metrics"
2930
"github.com/bbva/qed/protocol"
3031
"github.com/bbva/qed/raftwal/commands"
3132
"github.com/bbva/qed/raftwal/raftrocks"
@@ -90,13 +91,14 @@ type RaftBalloon struct {
9091
fsm *BalloonFSM // balloon's finite state machine
9192
snapshotsCh chan *protocol.Snapshot // channel to publish snapshots
9293

94+
metrics *raftBalloonMetrics
9395
}
9496

9597
// NewRaftBalloon returns a new RaftBalloon.
9698
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) {
9799

98100
// Create the log store and stable store
99-
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
101+
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true, EnableStatistics: true})
100102
if err != nil {
101103
return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err)
102104
}
@@ -128,6 +130,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshots
128130
rb.store.db = store
129131
rb.store.log = logStore
130132
rb.store.rocksStore = rocksStore
133+
rb.metrics = newRaftBalloonMetrics(rb)
131134

132135
return rb, nil
133136
}
@@ -233,9 +236,11 @@ func (b *RaftBalloon) Close(wait bool) error {
233236

234237
b.store.rocksStore = nil
235238
b.store.log = nil
239+
b.metrics = nil
236240

237241
// Close FSM
238242
b.fsm.Close()
243+
b.fsm = nil
239244

240245
// close database
241246
if err := b.store.db.Close(); err != nil {
@@ -369,6 +374,7 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
369374
if err != nil {
370375
return nil, err
371376
}
377+
b.metrics.Adds.Inc()
372378
snapshot := resp.(*fsmAddResponse).snapshot
373379

374380
//Send snapshot to the snapshot channel
@@ -383,14 +389,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
383389
}
384390

385391
func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
392+
b.metrics.DigestMembershipQueries.Inc()
386393
return b.fsm.QueryDigestMembership(keyDigest, version)
387394
}
388395

389396
func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) {
397+
b.metrics.MembershipQueries.Inc()
390398
return b.fsm.QueryMembership(event, version)
391399
}
392400

393401
func (b *RaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) {
402+
b.metrics.IncrementalQueries.Inc()
394403
return b.fsm.QueryConsistency(start, end)
395404
}
396405

@@ -467,3 +476,10 @@ func (b *RaftBalloon) Info() map[string]interface{} {
467476
m["meta"] = b.fsm.meta
468477
return m
469478
}
479+
480+
func (b *RaftBalloon) RegisterMetrics(registry metrics.Registry) {
481+
if registry != nil {
482+
b.store.rocksStore.RegisterMetrics(registry)
483+
}
484+
registry.MustRegister(b.metrics.collectors()...)
485+
}

‎raftwal/raft_test.go

+18-11
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"github.com/bbva/qed/log"
3333
"github.com/bbva/qed/storage/rocks"
34+
metrics_utils "github.com/bbva/qed/testutils/metrics"
3435
utilrand "github.com/bbva/qed/testutils/rand"
3536
"github.com/hashicorp/raft"
3637
"github.com/stretchr/testify/require"
@@ -449,34 +450,40 @@ func mustTempDir() string {
449450
}
450451

451452
func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
452-
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)
453+
storePath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id)
453454

454-
err := os.MkdirAll(rocksdbPath, os.FileMode(0755))
455+
err := os.MkdirAll(storePath, os.FileMode(0755))
455456
require.NoError(b, err)
456-
rocksdb, err := rocks.NewRocksDBStore(rocksdbPath)
457+
store, err := rocks.NewRocksDBStore(storePath)
457458
require.NoError(b, err)
458459

459460
raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
460461
err = os.MkdirAll(raftPath, os.FileMode(0755))
461462
require.NoError(b, err)
462463

463464
snapshotsCh := make(chan *protocol.Snapshot, 10000)
464-
startSnapshotsDrainer(snapshotsCh)
465-
//defer close(snapshotsCh)
465+
snapshotsDrainer(snapshotsCh)
466466

467-
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, snapshotsCh)
467+
node, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), store, snapshotsCh)
468468
require.NoError(b, err)
469469

470-
return r, func() {
470+
srvCloseF := metrics_utils.StartMetricsServer(node, store)
471+
472+
return node, func() {
473+
srvCloseF()
474+
close(snapshotsCh)
471475
os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id))
472476
}
473477

474478
}
475479

476-
func startSnapshotsDrainer(snapshotsCh chan *protocol.Snapshot) {
480+
func snapshotsDrainer(snapshotsCh chan *protocol.Snapshot) {
477481
go func() {
478-
for range snapshotsCh {
479-
482+
for {
483+
_, ok := <-snapshotsCh
484+
if !ok {
485+
return
486+
}
480487
}
481488
}()
482489
}
@@ -491,7 +498,7 @@ func BenchmarkRaftAdd(b *testing.B) {
491498
err := raftNode.Open(true, map[string]string{"foo": "bar"})
492499
require.NoError(b, err)
493500

494-
// b.N shoul be eq or greater than 500k to avoid benchmark framework spreding more than one goroutine.
501+
// b.N shoul be eq or greater than 500k to avoid benchmark framework spreading more than one goroutine.
495502
b.N = 2000000
496503
b.ResetTimer()
497504
b.SetParallelism(100)

‎raftwal/raftrocks/metrics.go

+945
Large diffs are not rendered by default.

‎raftwal/raftrocks/rocksdb_store.go

+87-43
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"errors"
2222

23+
"github.com/bbva/qed/metrics"
2324
"github.com/bbva/qed/rocksdb"
2425
"github.com/bbva/qed/util"
2526
"github.com/hashicorp/go-msgpack/codec"
@@ -31,34 +32,58 @@ var (
3132
ErrKeyNotFound = errors.New("not found")
3233
)
3334

35+
// table groups related key-value pairs under a
36+
// consistent space.
37+
type table uint32
38+
3439
const (
35-
stableCF string = "stable"
36-
logCF string = "log"
40+
defaultTable table = iota
41+
stableTable
42+
logTable
3743
)
3844

45+
func (t table) String() string {
46+
var s string
47+
switch t {
48+
case defaultTable:
49+
s = "default"
50+
case stableTable:
51+
s = "stable"
52+
case logTable:
53+
s = "log"
54+
}
55+
return s
56+
}
57+
3958
// RocksDBStore provides access to RocksDB for Raft to store and retrieve
4059
// log entries. It also provides key/value storage, and can be used as
4160
// a LogStore and StableStore.
4261
type RocksDBStore struct {
4362
// db is the underlying handle to the db.
4463
db *rocksdb.DB
4564

65+
stats *rocksdb.Statistics
66+
4667
// The path to the RocksDB database directory.
47-
path string
48-
ro *rocksdb.ReadOptions
49-
wo *rocksdb.WriteOptions
50-
stableCFHandle *rocksdb.ColumnFamilyHandle
51-
logCFHandle *rocksdb.ColumnFamilyHandle
68+
path string
69+
ro *rocksdb.ReadOptions
70+
wo *rocksdb.WriteOptions
71+
// column family handlers
72+
cfHandles rocksdb.ColumnFamilyHandles
5273

5374
// global options
5475
globalOpts *rocksdb.Options
5576
// stable options
5677
stableBbto *rocksdb.BlockBasedTableOptions
5778
stableOpts *rocksdb.Options
5879
// log options
59-
logBbto *rocksdb.BlockBasedTableOptions
60-
logOpts *rocksdb.Options
61-
logCache *rocksdb.Cache
80+
logBbto *rocksdb.BlockBasedTableOptions
81+
logOpts *rocksdb.Options
82+
// block cache
83+
blockCache *rocksdb.Cache
84+
85+
// metrics
86+
metrics *rocksDBMetrics
6287
}
6388

6489
// Options contains all the configuration used to open the RocksDB instance.
@@ -71,6 +96,8 @@ type Options struct {
7196
// write to the log. This is unsafe, so it should be used
7297
// with caution.
7398
NoSync bool
99+
100+
EnableStatistics bool
74101
}
75102

76103
// NewRocksDBStore takes a file path and returns a connected Raft backend.
@@ -85,14 +112,20 @@ func New(options Options) (*RocksDBStore, error) {
85112
// we need two column families, one for stable store and one for log store:
86113
// stable : used for storing key configurations.
87114
// log : used for storing logs in a durable fashion.
88-
cfNames := []string{stableCF, logCF, "default"}
115+
cfNames := []string{defaultTable.String(), stableTable.String(), logTable.String()}
89116

90117
defaultOpts := rocksdb.NewDefaultOptions()
91118

92119
// global options
93120
globalOpts := rocksdb.NewDefaultOptions()
94121
globalOpts.SetCreateIfMissing(true)
95122
globalOpts.SetCreateIfMissingColumnFamilies(true)
123+
blockCache := rocksdb.NewDefaultLRUCache(512 * 1024 * 1024)
124+
var stats *rocksdb.Statistics
125+
if options.EnableStatistics {
126+
stats = rocksdb.NewStatistics()
127+
globalOpts.SetStatistics(stats)
128+
}
96129

97130
// stable store options
98131
stableBbto := rocksdb.NewDefaultBlockBasedTableOptions()
@@ -103,8 +136,7 @@ func New(options Options) (*RocksDBStore, error) {
103136
logBbto := rocksdb.NewDefaultBlockBasedTableOptions()
104137
logBbto.SetBlockSize(32 * 1024)
105138
logBbto.SetCacheIndexAndFilterBlocks(true)
106-
logCache := rocksdb.NewDefaultLRUCache(512 * 1024 * 1024)
107-
logBbto.SetBlockCache(logCache)
139+
logBbto.SetBlockCache(blockCache)
108140
logOpts := rocksdb.NewDefaultOptions()
109141
logOpts.SetUseFsync(!options.NoSync)
110142
// dio := directIOSupported(options.Path)
@@ -146,7 +178,7 @@ func New(options Options) (*RocksDBStore, error) {
146178
logOpts.SetMaxBackgroundCompactions(2)
147179
logOpts.SetMaxBackgroundFlushes(2)
148180

149-
cfOpts := []*rocksdb.Options{stableOpts, logOpts, defaultOpts}
181+
cfOpts := []*rocksdb.Options{defaultOpts, stableOpts, logOpts}
150182
db, cfHandles, err := rocksdb.OpenDBColumnFamilies(options.Path, globalOpts, cfNames, cfOpts)
151183
if err != nil {
152184
return nil, err
@@ -158,29 +190,32 @@ func New(options Options) (*RocksDBStore, error) {
158190
ro := rocksdb.NewDefaultReadOptions()
159191
ro.SetFillCache(false)
160192

161-
return &RocksDBStore{
162-
db: db,
163-
path: options.Path,
164-
stableCFHandle: cfHandles[0],
165-
logCFHandle: cfHandles[1],
166-
stableBbto: stableBbto,
167-
stableOpts: stableOpts,
168-
logBbto: logBbto,
169-
logOpts: logOpts,
170-
logCache: logCache,
171-
globalOpts: globalOpts,
172-
ro: ro,
173-
wo: wo,
174-
}, nil
193+
store := &RocksDBStore{
194+
db: db,
195+
stats: stats,
196+
path: options.Path,
197+
cfHandles: cfHandles,
198+
stableBbto: stableBbto,
199+
stableOpts: stableOpts,
200+
logBbto: logBbto,
201+
logOpts: logOpts,
202+
blockCache: blockCache,
203+
globalOpts: globalOpts,
204+
ro: ro,
205+
wo: wo,
206+
}
207+
208+
if stats != nil {
209+
store.metrics = newRocksDBMetrics(store)
210+
}
211+
212+
return store, nil
175213
}
176214

177215
// Close is used to gracefully close the DB connection.
178216
func (s *RocksDBStore) Close() error {
179-
if s.stableCFHandle != nil {
180-
s.stableCFHandle.Destroy()
181-
}
182-
if s.logCFHandle != nil {
183-
s.logCFHandle.Destroy()
217+
for _, cf := range s.cfHandles {
218+
cf.Destroy()
184219
}
185220
if s.db != nil {
186221
s.db.Close()
@@ -191,15 +226,18 @@ func (s *RocksDBStore) Close() error {
191226
if s.stableOpts != nil {
192227
s.stableOpts.Destroy()
193228
}
194-
if s.logCache != nil {
195-
s.logCache.Destroy()
229+
if s.blockCache != nil {
230+
s.blockCache.Destroy()
196231
}
197232
if s.logBbto != nil {
198233
s.logBbto.Destroy()
199234
}
200235
if s.logOpts != nil {
201236
s.logOpts.Destroy()
202237
}
238+
if s.stats != nil {
239+
s.stats.Destroy()
240+
}
203241
if s.wo != nil {
204242
s.wo.Destroy()
205243
}
@@ -212,7 +250,7 @@ func (s *RocksDBStore) Close() error {
212250

213251
// FirstIndex returns the first known index from the Raft log.
214252
func (s *RocksDBStore) FirstIndex() (uint64, error) {
215-
it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle)
253+
it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.cfHandles[logTable])
216254
defer it.Close()
217255
it.SeekToFirst()
218256
if it.Valid() {
@@ -227,7 +265,7 @@ func (s *RocksDBStore) FirstIndex() (uint64, error) {
227265

228266
// LastIndex returns the last known index from the Raft log.
229267
func (s *RocksDBStore) LastIndex() (uint64, error) {
230-
it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle)
268+
it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.cfHandles[logTable])
231269
defer it.Close()
232270
it.SeekToLast()
233271
if it.Valid() {
@@ -242,7 +280,7 @@ func (s *RocksDBStore) LastIndex() (uint64, error) {
242280

243281
// GetLog gets a log entry at a given index.
244282
func (s *RocksDBStore) GetLog(index uint64, log *raft.Log) error {
245-
val, err := s.db.GetBytesCF(s.ro, s.logCFHandle, util.Uint64AsBytes(index))
283+
val, err := s.db.GetBytesCF(s.ro, s.cfHandles[logTable], util.Uint64AsBytes(index))
246284
if err != nil {
247285
return err
248286
}
@@ -258,7 +296,7 @@ func (s *RocksDBStore) StoreLog(log *raft.Log) error {
258296
if err != nil {
259297
return err
260298
}
261-
return s.db.PutCF(s.wo, s.logCFHandle, util.Uint64AsBytes(log.Index), val.Bytes())
299+
return s.db.PutCF(s.wo, s.cfHandles[logTable], util.Uint64AsBytes(log.Index), val.Bytes())
262300
}
263301

264302
// StoreLogs stores a set of raft logs.
@@ -270,29 +308,29 @@ func (s *RocksDBStore) StoreLogs(logs []*raft.Log) error {
270308
if err != nil {
271309
return err
272310
}
273-
batch.PutCF(s.logCFHandle, key, val.Bytes())
311+
batch.PutCF(s.cfHandles[logTable], key, val.Bytes())
274312
}
275313
return s.db.Write(s.wo, batch)
276314
}
277315

278316
// DeleteRange deletes logs within a given range inclusively.
279317
func (s *RocksDBStore) DeleteRange(min, max uint64) error {
280318
batch := rocksdb.NewWriteBatch()
281-
batch.DeleteRangeCF(s.logCFHandle, util.Uint64AsBytes(min), util.Uint64AsBytes(max+1))
319+
batch.DeleteRangeCF(s.cfHandles[logTable], util.Uint64AsBytes(min), util.Uint64AsBytes(max+1))
282320
return s.db.Write(s.wo, batch)
283321
}
284322

285323
// Set is used to set a key/value set outside of the raft log.
286324
func (s *RocksDBStore) Set(key []byte, val []byte) error {
287-
if err := s.db.PutCF(s.wo, s.stableCFHandle, key, val); err != nil {
325+
if err := s.db.PutCF(s.wo, s.cfHandles[stableTable], key, val); err != nil {
288326
return err
289327
}
290328
return nil
291329
}
292330

293331
// Get is used to retrieve a value from the k/v store by key
294332
func (s *RocksDBStore) Get(key []byte) ([]byte, error) {
295-
val, err := s.db.GetBytesCF(s.ro, s.stableCFHandle, key)
333+
val, err := s.db.GetBytesCF(s.ro, s.cfHandles[stableTable], key)
296334
if err != nil {
297335
return nil, err
298336
}
@@ -316,6 +354,12 @@ func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) {
316354
return util.BytesAsUint64(val), nil
317355
}
318356

357+
func (s *RocksDBStore) RegisterMetrics(registry metrics.Registry) {
358+
if registry != nil {
359+
registry.MustRegister(s.metrics.collectors()...)
360+
}
361+
}
362+
319363
// Decode reverses the encode operation on a byte slice input
320364
func decodeMsgPack(buf []byte, out interface{}) error {
321365
r := bytes.NewBuffer(buf)

‎server/server.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,8 @@ func NewServer(conf *Config) (*Server, error) {
119119
return nil, err
120120
}
121121

122-
// create metrics server and register default qed metrics
123-
server.metrics = newServerMetrics()
122+
// Create metrics server
124123
server.metricsServer = metrics.NewServer(conf.MetricsAddr)
125-
server.RegisterMetrics(server.metricsServer)
126-
store.RegisterMetrics(server.metricsServer)
127-
// server.metricsServer.Register(raft.PrometheusCollectors())
128-
// server.metricsServer.Register(balloon.PrometheusCollectors())
129124

130125
// Create gossip agent
131126
config := gossip.DefaultConfig()
@@ -150,7 +145,6 @@ func NewServer(conf *Config) (*Server, error) {
150145

151146
// Create sender
152147
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
153-
server.sender.RegisterMetrics(server.metricsServer)
154148

155149
// Create RaftBalloon
156150
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh)
@@ -172,6 +166,13 @@ func NewServer(conf *Config) (*Server, error) {
172166
mgmtMux := mgmthttp.NewMgmtHttp(server.raftBalloon)
173167
server.mgmtServer = newHTTPServer(conf.MgmtAddr, mgmtMux)
174168

169+
// register qed metrics
170+
server.metrics = newServerMetrics()
171+
server.RegisterMetrics(server.metricsServer)
172+
store.RegisterMetrics(server.metricsServer)
173+
server.raftBalloon.RegisterMetrics(server.metricsServer)
174+
server.sender.RegisterMetrics(server.metricsServer)
175+
175176
return server, nil
176177
}
177178

@@ -308,7 +309,6 @@ func (s *Server) Stop() error {
308309
func (s *Server) RegisterMetrics(registry metrics.Registry) {
309310
if registry != nil {
310311
registry.MustRegister(s.metrics.collectors()...)
311-
registry.MustRegister(metrics.DefaultMetrics...) // TODO: remove this!!!
312312
}
313313
}
314314

0 commit comments

Comments
 (0)
Please sign in to comment.