@@ -36,13 +36,15 @@ func NewRaftNode(
36
36
Mutex : sync.Mutex {},
37
37
38
38
MessageBuffer : make (chan * pubsub.Message , messageBuffer ),
39
- Resign : make (chan interface {}),
40
39
HeartbeatTimeout : HEARTBEAT_TIMEOUT ,
41
40
42
41
LeaderJobTimeout : leaderJobTimeout ,
43
42
MissedHeartbeats : 0 ,
44
43
CooldownPeriod : DefaultCooldownPeriod ,
45
44
LastElectionTime : time.Time {},
45
+
46
+ blacklist : make (map [string ]struct {}),
47
+ isDelayedNode : false ,
46
48
}
47
49
return r
48
50
}
@@ -102,6 +104,11 @@ func (r *Raft) subscribe(ctx context.Context) {
102
104
// handler for incoming messages
103
105
104
106
func (r * Raft ) handleMessage (ctx context.Context , msg Message ) error {
107
+ if _ , ok := r .blacklist [msg .SentFrom ]; ok {
108
+ // blacklist for testing to simulate network partition
109
+ return nil
110
+ }
111
+
105
112
switch msg .Type {
106
113
case Heartbeat :
107
114
return r .handleHeartbeat (msg )
@@ -137,7 +144,7 @@ func (r *Raft) handleHeartbeat(msg Message) error {
137
144
currentRole := r .Role
138
145
currentTerm := r .Term
139
146
140
- if r .Role == Follower {
147
+ if r .Role != Leader {
141
148
r .startElectionTimer ()
142
149
}
143
150
@@ -162,7 +169,6 @@ func (r *Raft) handleHeartbeat(msg Message) error {
162
169
r .LeaderID = heartbeatMessage .LeaderID
163
170
}
164
171
}
165
-
166
172
return nil
167
173
}
168
174
@@ -239,6 +245,10 @@ func (r *Raft) PublishMessage(ctx context.Context, msg Message) error {
239
245
if err != nil {
240
246
return err
241
247
}
248
+
249
+ if r .isDelayedNode {
250
+ time .Sleep (500 * time .Millisecond )
251
+ }
242
252
return r .Topic .Publish (ctx , data )
243
253
}
244
254
@@ -313,17 +323,16 @@ func (r *Raft) sendRequestVote(ctx context.Context) error {
313
323
// utility functions
314
324
315
325
func (r * Raft ) ResignLeader () {
316
- if r . Resign != nil {
317
- close ( r . Resign )
318
- r . Resign = nil
319
- r . Role = Follower
320
- r . LeaderID = ""
321
- r . startElectionTimer ()
322
- }
326
+ r . Role = Follower
327
+ r . LeaderID = ""
328
+
329
+ r . HeartbeatTicker . Stop ()
330
+ r . LeaderJobTicker . Stop ()
331
+
332
+ r . startElectionTimer ()
323
333
}
324
334
325
335
func (r * Raft ) setLeaderState () {
326
- r .Resign = make (chan interface {})
327
336
r .ElectionTimer .Stop ()
328
337
r .Term ++
329
338
r .Role = Leader
@@ -343,13 +352,6 @@ func (r *Raft) becomeLeader(ctx context.Context) {
343
352
344
353
for {
345
354
select {
346
- case <- r .Resign :
347
- r .Mutex .Lock ()
348
- r .HeartbeatTicker .Stop ()
349
- r .LeaderJobTicker .Stop ()
350
- r .Mutex .Unlock ()
351
- return
352
-
353
355
case <- r .HeartbeatTicker .C :
354
356
err := r .sendHeartbeat (ctx )
355
357
if err != nil {
@@ -445,3 +447,24 @@ func (r *Raft) unmarshalMessage(data []byte) (Message, error) {
445
447
}
446
448
return m , nil
447
449
}
450
+
451
+ func (r * Raft ) addBlacklist (id string ) {
452
+ // used for testing purposes
453
+ r .Mutex .Lock ()
454
+ defer r .Mutex .Unlock ()
455
+ r .blacklist [id ] = struct {}{}
456
+ }
457
+
458
+ func (r * Raft ) removeBlacklist (id string ) {
459
+ // used for testing purposes
460
+ r .Mutex .Lock ()
461
+ defer r .Mutex .Unlock ()
462
+ delete (r .blacklist , id )
463
+ }
464
+
465
+ func (r * Raft ) setDelayedNode (isDelayed bool ) {
466
+ // used for testing purposes
467
+ r .Mutex .Lock ()
468
+ defer r .Mutex .Unlock ()
469
+ r .isDelayedNode = isDelayed
470
+ }
0 commit comments