Skip to content

Commit ae28926

Browse files
authored
refactor: improve logging (#146)
Signed-off-by: Zike Yang <zike@apache.org>
1 parent 4e286dd commit ae28926

File tree

9 files changed

+175
-63
lines changed

9 files changed

+175
-63
lines changed

common/utils.go

+31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,37 @@
1616

1717
package common
1818

19+
import "log/slog"
20+
1921
func OptionalStr(s string) *string {
2022
return &s
2123
}
24+
25+
type expensive struct {
26+
slog.LogValuer
27+
f func() slog.Value
28+
}
29+
30+
func (e *expensive) LogValue() slog.Value {
31+
return e.f()
32+
}
33+
34+
func Expensive(f func() slog.Value) slog.LogValuer {
35+
e := &expensive{}
36+
e.f = f
37+
return e
38+
}
39+
40+
type logCounter struct {
41+
slog.LogValuer
42+
count int
43+
}
44+
45+
func (l *logCounter) LogValue() slog.Value {
46+
l.count++
47+
return slog.IntValue(l.count)
48+
}
49+
50+
func LogCounter() slog.LogValuer {
51+
return &logCounter{}
52+
}

fs/api/instance.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ import (
2020
"github.com/functionstream/function-stream/common/model"
2121
"github.com/functionstream/function-stream/fs/contube"
2222
"golang.org/x/net/context"
23+
"log/slog"
2324
)
2425

2526
type FunctionInstance interface {
2627
Context() context.Context
2728
Definition() *model.Function
29+
Index() int32
2830
Stop()
2931
Run(factory FunctionRuntimeFactory)
3032
WaitForReady() <-chan error
33+
Logger() *slog.Logger
3134
}
3235

3336
type FunctionInstanceFactory interface {
34-
NewFunctionInstance(f *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32) FunctionInstance
37+
NewFunctionInstance(f *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance
3538
}

fs/instance_impl.go

+30-13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package fs
1919
import (
2020
"context"
2121
"fmt"
22+
"github.com/functionstream/function-stream/common"
2223
"github.com/functionstream/function-stream/common/model"
2324
"github.com/functionstream/function-stream/fs/api"
2425
"github.com/functionstream/function-stream/fs/contube"
@@ -27,27 +28,35 @@ import (
2728
)
2829

2930
type FunctionInstanceImpl struct {
31+
api.FunctionInstance
3032
ctx context.Context
3133
cancelFunc context.CancelFunc
3234
definition *model.Function
3335
sourceFactory contube.SourceTubeFactory
3436
sinkFactory contube.SinkTubeFactory
3537
readyCh chan error
3638
index int32
39+
parentLog *slog.Logger
40+
log *slog.Logger
3741
}
3842

3943
type CtxKey string
4044

45+
const (
46+
CtxKeyFunctionName CtxKey = "function-name"
47+
CtxKeyInstanceIndex CtxKey = "instance-index"
48+
)
49+
4150
type DefaultInstanceFactory struct{}
4251

4352
func NewDefaultInstanceFactory() api.FunctionInstanceFactory {
4453
return &DefaultInstanceFactory{}
4554
}
4655

47-
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32) api.FunctionInstance {
56+
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32, logger *slog.Logger) api.FunctionInstance {
4857
ctx, cancelFunc := context.WithCancel(context.Background())
49-
ctx = context.WithValue(ctx, CtxKey("function-name"), definition.Name)
50-
ctx = context.WithValue(ctx, CtxKey("function-index"), index)
58+
ctx = context.WithValue(ctx, CtxKeyFunctionName, definition.Name)
59+
ctx = context.WithValue(ctx, CtxKeyInstanceIndex, index)
5160
return &FunctionInstanceImpl{
5261
ctx: ctx,
5362
cancelFunc: cancelFunc,
@@ -56,18 +65,11 @@ func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function,
5665
sinkFactory: sinkFactory,
5766
readyCh: make(chan error),
5867
index: index,
68+
parentLog: logger,
69+
log: logger.With(slog.String("component", "function-instance")),
5970
}
6071
}
6172

62-
func handleErr(ctx context.Context, err error, message string, args ...interface{}) {
63-
if errors.Is(err, context.Canceled) {
64-
slog.InfoContext(ctx, "function instance has been stopped")
65-
return
66-
}
67-
extraArgs := append(args, slog.Error)
68-
slog.ErrorContext(ctx, message, extraArgs...)
69-
}
70-
7173
func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory) {
7274
runtime, err := runtimeFactory.NewFunctionRuntime(instance)
7375
if err != nil {
@@ -100,10 +102,16 @@ func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFact
100102
}
101103

102104
close(instance.readyCh)
105+
defer instance.log.InfoContext(instance.ctx, "function instance has been stopped")
106+
logCounter := common.LogCounter()
103107
for e := range sourceChan {
108+
instance.log.DebugContext(instance.ctx, "calling process function", slog.Any("count", logCounter))
104109
output, err := runtime.Call(e)
105110
if err != nil {
106-
handleErr(instance.ctx, err, "Error calling process function")
111+
if errors.Is(err, context.Canceled) {
112+
return
113+
}
114+
instance.log.ErrorContext(instance.ctx, "Error calling process function", slog.Any("error", err))
107115
return
108116
}
109117
select {
@@ -120,6 +128,7 @@ func (instance *FunctionInstanceImpl) WaitForReady() <-chan error {
120128
}
121129

122130
func (instance *FunctionInstanceImpl) Stop() {
131+
instance.log.InfoContext(instance.ctx, "stopping function instance")
123132
instance.cancelFunc()
124133
}
125134

@@ -130,3 +139,11 @@ func (instance *FunctionInstanceImpl) Context() context.Context {
130139
func (instance *FunctionInstanceImpl) Definition() *model.Function {
131140
return instance.definition
132141
}
142+
143+
func (instance *FunctionInstanceImpl) Index() int32 {
144+
return instance.index
145+
}
146+
147+
func (instance *FunctionInstanceImpl) Logger() *slog.Logger {
148+
return instance.parentLog
149+
}

fs/instance_impl_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package fs
1818

1919
import (
2020
"github.com/functionstream/function-stream/common/model"
21+
"log/slog"
2122
"testing"
2223
)
2324

@@ -27,18 +28,18 @@ func TestFunctionInstanceContextSetting(t *testing.T) {
2728
Name: "test-function",
2829
}
2930
index := int32(1)
30-
instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, index)
31+
instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, index, slog.Default())
3132

3233
if instance == nil {
3334
t.Error("FunctionInstance should not be nil")
3435
}
3536

36-
if ctxValue, ok := instance.Context().Value(CtxKey("function-name")).(string); !ok || ctxValue != definition.Name {
37-
t.Errorf("Expected 'function-name' in ctx to be '%s'", definition.Name)
37+
if ctxValue, ok := instance.Context().Value(CtxKeyFunctionName).(string); !ok || ctxValue != definition.Name {
38+
t.Errorf("Expected '%s' in ctx to be '%s'", CtxKeyFunctionName, definition.Name)
3839
}
3940

40-
if ctxValue, ok := instance.Context().Value(CtxKey("function-index")).(int32); !ok || ctxValue != index {
41-
t.Errorf("Expected 'function-index' in ctx to be '%d'", index)
41+
if ctxValue, ok := instance.Context().Value(CtxKey(CtxKeyInstanceIndex)).(int32); !ok || ctxValue != index {
42+
t.Errorf("Expected '%s' in ctx to be '%d'", CtxKeyInstanceIndex, index)
4243
}
4344

4445
}

fs/manager.go

+34-17
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type FunctionManager struct {
3434
options *managerOptions
3535
functions map[string][]api.FunctionInstance //TODO: Use sync.map
3636
functionsLock sync.Mutex
37+
log *slog.Logger
3738
}
3839

3940
type managerOptions struct {
@@ -95,17 +96,28 @@ func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) {
9596
return nil, err
9697
}
9798
}
99+
log := slog.With()
100+
loadedRuntimeFact := make([]string, 0, len(options.runtimeFactoryMap))
101+
for k := range options.runtimeFactoryMap {
102+
loadedRuntimeFact = append(loadedRuntimeFact, k)
103+
}
104+
loadedTubeFact := make([]string, 0, len(options.tubeFactoryMap))
105+
for k := range options.tubeFactoryMap {
106+
loadedTubeFact = append(loadedTubeFact, k)
107+
}
108+
log.Info("Function manager created", slog.Any("runtime-factories", loadedRuntimeFact), slog.Any("tube-factories", loadedTubeFact))
98109
return &FunctionManager{
99110
options: options,
100111
functions: make(map[string][]api.FunctionInstance),
112+
log: log,
101113
}, nil
102114
}
103115

104116
func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) {
105117
get := func(t string) (contube.TubeFactory, error) {
106118
factory, exist := fm.options.tubeFactoryMap[t]
107119
if !exist {
108-
slog.ErrorContext(context.Background(), "tube factory not found", "type", t)
120+
fm.log.ErrorContext(context.Background(), "tube factory not found", "type", t)
109121
return nil, common.ErrorTubeFactoryNotFound
110122
}
111123
return factory, nil
@@ -117,20 +129,20 @@ func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube
117129
return get(*tubeConfig.Type)
118130
}
119131

120-
func (fm *FunctionManager) getRuntimeFactory(runtimeConfig *model.RuntimeConfig) (api.FunctionRuntimeFactory, error) {
121-
get := func(t string) (api.FunctionRuntimeFactory, error) {
122-
factory, exist := fm.options.runtimeFactoryMap[t]
123-
if !exist {
124-
slog.ErrorContext(context.Background(), "runtime factory not found", "type", t)
125-
return nil, common.ErrorRuntimeFactoryNotFound
126-
}
127-
return factory, nil
128-
129-
}
132+
func (fm *FunctionManager) getRuntimeType(runtimeConfig *model.RuntimeConfig) string {
130133
if runtimeConfig == nil || runtimeConfig.Type == nil {
131-
return get("default")
134+
return "default"
135+
}
136+
return *runtimeConfig.Type
137+
}
138+
139+
func (fm *FunctionManager) getRuntimeFactory(t string) (api.FunctionRuntimeFactory, error) {
140+
factory, exist := fm.options.runtimeFactoryMap[t]
141+
if !exist {
142+
fm.log.ErrorContext(context.Background(), "runtime factory not found", "type", t)
143+
return nil, common.ErrorRuntimeFactoryNotFound
132144
}
133-
return get(*runtimeConfig.Type)
145+
return factory, nil
134146
}
135147

136148
func (fm *FunctionManager) StartFunction(f *model.Function) error {
@@ -153,25 +165,30 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
153165
if err != nil {
154166
return err
155167
}
168+
runtimeType := fm.getRuntimeType(f.Runtime)
156169

157-
instance := fm.options.instanceFactory.NewFunctionInstance(f, sourceFactory, sinkFactory, i)
170+
instance := fm.options.instanceFactory.NewFunctionInstance(f, sourceFactory, sinkFactory, i, slog.With(
171+
slog.String("name", f.Name),
172+
slog.Int("index", int(i)),
173+
slog.String("runtime", runtimeType),
174+
))
158175
fm.functionsLock.Lock()
159176
fm.functions[f.Name][i] = instance
160177
fm.functionsLock.Unlock()
161-
runtimeFactory, err := fm.getRuntimeFactory(f.Runtime)
178+
runtimeFactory, err := fm.getRuntimeFactory(runtimeType)
162179
if err != nil {
163180
return err
164181
}
165182
go instance.Run(runtimeFactory)
166183
select {
167184
case <-instance.WaitForReady():
168185
if err != nil {
169-
slog.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", err.Error()))
186+
fm.log.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", err.Error()))
170187
instance.Stop()
171188
return err
172189
}
173190
case <-instance.Context().Done():
174-
slog.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", "context cancelled"))
191+
fm.log.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", "context cancelled"))
175192
return errors.New("context cancelled")
176193
}
177194
}

0 commit comments

Comments
 (0)