-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintervalroutine.go
165 lines (151 loc) · 4.34 KB
/
intervalroutine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Package goodroutine provide some generic maintenance utilities generally useful for server implementations.
// The goal of this package is to provide predictability of initialization, intervals, retries, and panic recovery
// to maintenance functions that would otherwise just run bare in goroutines.
// It also includes often needed aspects of server's health.
//
// Features include:
// - interval-based goroutine that safely runs a function
// - threshold based up / down healthcheck
//
package goodroutine
import (
"fmt"
"runtime/debug"
"sync"
"time"
)
// Runner implements a function that is run at interval
type Runner interface {
IntervalRun() error
}
// The RunnerFunc type is an adapter to allow the use of
// ordinary functions as Runner. If f is a function
// with the appropriate signature, RunnerFunc(f) is a
// Runner that calls f.
type RunnerFunc func() error
// IntervalRun implements the Runner interface
func (rf RunnerFunc) IntervalRun() error {
return rf()
}
// IntervalRoutine implements a management goroutine.
// It provides a safe way to run a function, at interval, from a single goroutine.
type IntervalRoutine struct {
runner Runner
runInterval time.Duration
retryInterval time.Duration
currentInterval time.Duration
force chan bool
done chan bool
start sync.Once
stop sync.Once
// PanicRecoverDisabled if set to true, panics are not recovered
PanicRecoverDisabled bool
// RetryBackoffDisabled if set to true, retry interval does not increase exponentially
RetryBackoffDisabled bool
OnPanic func(recovered interface{})
}
// NewIntervalRoutine creates a new IntervalRoutine.
// Runs may be triggered in 3 ways:
// - normally at each run interval
// - at the retry interval, if the last run returned an error
// - if TriggerRun was called
// A typical usage is a runInterval of 5min, retryInterval of 30sec.
// By default the retry interval increases exponentially from retryInterval up to runInterval.
// retryInterval cannot be set higher than runInterval.
func NewIntervalRoutine(runner Runner, runInterval time.Duration, retryInterval time.Duration) *IntervalRoutine {
if retryInterval > runInterval {
// wrong interval, disable custom retry
retryInterval = 0
}
return &IntervalRoutine{
runner: runner,
runInterval: runInterval,
retryInterval: retryInterval,
force: make(chan bool, 1),
done: make(chan bool, 1),
}
}
// TriggerRun triggers a run as soon as possible.
// Does nothing if a forced run is already scheduled.
func (rrt *IntervalRoutine) TriggerRun() {
select {
case rrt.force <- true:
default:
// already has a force
}
}
// Start the management routine.
func (rrt *IntervalRoutine) Start() {
rrt.start.Do(func() {
go func() {
// add a force to run once at startup, ticker will get set after
rrt.force <- true
for {
if !rrt.runSafe() {
break
}
}
}()
})
}
// Stop the management routine.
func (rrt *IntervalRoutine) Stop() {
rrt.stop.Do(func() {
close(rrt.done)
})
}
func (rrt *IntervalRoutine) runSafe() bool {
if !rrt.PanicRecoverDisabled {
// recover any panic
defer func() {
if r := recover(); r != nil {
if rrt.OnPanic != nil {
rrt.OnPanic(r)
} else {
fmt.Printf("recovered: %v, stack: %s\n", r, debug.Stack())
}
}
}()
}
var err error
var timerC <-chan time.Time
if rrt.currentInterval > 0 {
timer := time.NewTimer(rrt.currentInterval)
timerC = timer.C
defer timer.Stop()
}
select {
case <-timerC:
select {
case <-rrt.done:
return false
default:
}
err = rrt.runner.IntervalRun()
case <-rrt.force:
select {
case <-rrt.done:
return false
default:
}
err = rrt.runner.IntervalRun()
case <-rrt.done:
return false
}
if err != nil && rrt.retryInterval > 0 {
retryInterval := rrt.retryInterval
// rrt.currentInterval == rrt.runInterval on the first retry only
if !rrt.RetryBackoffDisabled && rrt.currentInterval > 0 && rrt.currentInterval < rrt.runInterval {
// backoff, starting from rrt.retryInterval, up to rrt.runInterval
retryInterval = rrt.currentInterval * 2
if retryInterval >= rrt.runInterval {
// set the interval just under run interval to differentiate
retryInterval = rrt.runInterval - 1
}
}
rrt.currentInterval = retryInterval
} else {
rrt.currentInterval = rrt.runInterval
}
return true
}