diff --git a/.github/workflows/bench.yaml b/.github/workflows/bench.yaml index dd700e38..3c4687e1 100644 --- a/.github/workflows/bench.yaml +++ b/.github/workflows/bench.yaml @@ -41,8 +41,8 @@ jobs: - name: Collect Docker Compose logs if: failure() run: docker-compose -f ./tests/docker-compose.yaml logs || true - - name: Upload artifact + - name: Upload artifacts uses: actions/upload-artifact@v2 with: name: BenchmarkStressForBasicFunc Profile - path: ./BenchmarkStressForBasicFunc.pprof + path: ./benchmark/*.pprof diff --git a/.gitignore b/.gitignore index 6326197c..04ef68fb 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,5 @@ dist .run bin/ .DS_Store + +benchmark/*.pprof \ No newline at end of file diff --git a/Makefile b/Makefile index 2d01e990..2b636bd9 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,10 @@ test: go test -race ./... -timeout 10m bench: - export FS_TEST_WORK_DIR="$(shell pwd)" && go test -bench=. ./benchmark -timeout 10m + go test -bench=. ./benchmark -timeout 10m + +bench_race: + go test -race -bench=. ./benchmark -timeout 10m gen_rest_client: mkdir -p restclient diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index ae3bdf4a..25062caf 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -23,7 +23,6 @@ import ( "github.com/functionstream/functionstream/perf" "github.com/functionstream/functionstream/restclient" "github.com/functionstream/functionstream/server" - "log/slog" "math/rand" "os" "runtime/pprof" @@ -32,21 +31,7 @@ import ( "time" ) -func prepareEnv() { - workingDirectory := os.Getenv("FS_TEST_WORK_DIR") - if workingDirectory != "" { - err := os.Chdir(workingDirectory) - slog.Info("Changing working directory", "working-dir", workingDirectory) - if err != nil { - panic(err) - } - } - -} - func BenchmarkStressForBasicFunc(b *testing.B) { - prepareEnv() - s := server.New(server.LoadConfigFromEnv()) go s.Run() defer func() { @@ -82,7 +67,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) { PulsarURL: "pulsar://localhost:6650", RequestRate: 200000.0, Func: &restclient.Function{ - Archive: "./bin/example_basic.wasm", + Archive: "../bin/example_basic.wasm", Inputs: []string{inputTopic}, Output: outputTopic, Replicas: &replicas, @@ -94,7 +79,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) defer cancel() - profile := "BenchmarkStressForBasicFunc.pprof" + profile := b.Name() + ".pprof" file, err := os.Create(profile) if err != nil { b.Fatal(err) @@ -114,8 +99,6 @@ func BenchmarkStressForBasicFunc(b *testing.B) { } func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { - prepareEnv() - memoryQueueFactory := lib.NewMemoryQueueFactory(context.Background()) svrConf := &lib.Config{ @@ -142,7 +125,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { pConfig := &perf.Config{ RequestRate: 200000.0, Func: &restclient.Function{ - Archive: "./bin/example_basic.wasm", + Archive: "../bin/example_basic.wasm", Inputs: []string{inputTopic}, Output: outputTopic, Replicas: &replicas, @@ -157,7 +140,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) defer cancel() - profile := "BenchmarkStressForBasicFunc.pprof" + profile := b.Name() + ".pprof" file, err := os.Create(profile) if err != nil { b.Fatal(err) diff --git a/common/chan_utils.go b/common/chan_utils.go new file mode 100644 index 00000000..6385b996 --- /dev/null +++ b/common/chan_utils.go @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import "context" + +func SendToChannel[T any](ctx context.Context, c chan<- T, e interface{}) bool { + select { + case c <- e.(T): // It will panic if `e` is not of type `T` or a type that can be converted to `T`. + return true + case <-ctx.Done(): + close(c) + return false + } +} + +func zeroValue[T any]() T { + var v T + return v +} + +func ReceiveFromChannel[T any](ctx context.Context, c <-chan T) (T, bool) { + select { + case e := <-c: + return e, true + case <-ctx.Done(): + return zeroValue[T](), false + } +} diff --git a/perf/perf.go b/perf/perf.go index de5db756..7eb3dced 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "github.com/bmizerany/perks/quantile" + "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/lib" "github.com/functionstream/functionstream/restclient" "golang.org/x/time/rate" @@ -196,30 +197,6 @@ func (p *perf) Run(ctx context.Context) { } -func SendToChannel[T any](ctx context.Context, c chan<- T, e interface{}) bool { - select { - case c <- e.(T): // It will panic if `e` is not of type `T` or a type that can be converted to `T`. - return true - case <-ctx.Done(): - close(c) - return false - } -} - -func zeroValue[T any]() T { - var v T - return v -} - -func ReceiveFromChannel[T any](ctx context.Context, c <-chan T) (T, bool) { - select { - case e := <-c: - return e, true - case <-ctx.Done(): - return zeroValue[T](), false - } -} - func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failureCount *int64) { limiter := rate.NewLimiter(rate.Limit(p.config.RequestRate), int(p.config.RequestRate)) @@ -240,11 +217,11 @@ func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failur os.Exit(1) } start := time.Now() - if !SendToChannel(ctx, p.input, lib.NewAckableEvent(jsonBytes, func() {})) { + if !common.SendToChannel(ctx, p.input, lib.NewAckableEvent(jsonBytes, func() {})) { return } go func() { - e, ok := ReceiveFromChannel(ctx, p.output) + e, ok := common.ReceiveFromChannel(ctx, p.output) if !ok { return } diff --git a/server/server.go b/server/server.go index 24b0f1b3..00c4f182 100644 --- a/server/server.go +++ b/server/server.go @@ -31,6 +31,7 @@ import ( type Server struct { manager *lib.FunctionManager config *lib.Config + httpSvr *http.Server } func New(config *lib.Config) *Server { @@ -48,7 +49,7 @@ func (s *Server) Run() { slog.Info("Hello, Function Stream!") err := s.startRESTHandlers() if err != nil { - slog.Error("Error starting REST handlers", err) + slog.Error("Error starting REST handlers", "error", err) } } @@ -151,11 +152,22 @@ func (s *Server) startRESTHandlers() error { } }).Methods("GET") - return http.ListenAndServe(s.config.ListenAddr, r) + httpSvr := &http.Server{ + Addr: s.config.ListenAddr, + Handler: r, + } + s.httpSvr = httpSvr + + return httpSvr.ListenAndServe() } func (s *Server) Close() error { slog.Info("Shutting down function stream server") + if s.httpSvr != nil { + if err := s.httpSvr.Close(); err != nil { + return err + } + } return nil }