diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 43e234a3..4c08d748 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -137,7 +137,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { Output: outputTopic, Replicas: replicas, }, - QueueBuilder: func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) { + QueueBuilder: func(ctx context.Context) (contube.TubeFactory, error) { return memoryQueueFactory, nil }, } diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index 248d794f..c3409e4a 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -33,9 +33,37 @@ var ( } ) +type flags struct { + configFile string + loadConfigFromEnv bool +} + +var ( + config = flags{} +) + +func init() { + Cmd.Flags().StringVarP(&config.configFile, "config-file", "c", "conf/function-stream.yaml", + "path to the config file (default is conf/function-stream.yaml)") + Cmd.Flags().BoolVarP(&config.loadConfigFromEnv, "load-config-from-env", "e", false, "load config from env (default is false)") +} + func exec(*cobra.Command, []string) { common.RunProcess(func() (io.Closer, error) { - s, err := server.NewServer() + var c *server.Config + var err error + if config.loadConfigFromEnv { + c, err = server.LoadConfigFromEnv() + if err != nil { + return nil, err + } + } else { + c, err = server.LoadConfigFromFile(config.configFile) + if err != nil { + return nil, err + } + } + s, err := server.NewServer(server.WithConfig(c)) if err != nil { return nil, err } diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go index 384517ea..d2c9b890 100644 --- a/cmd/standalone/cmd.go +++ b/cmd/standalone/cmd.go @@ -35,7 +35,11 @@ var ( func exec(*cobra.Command, []string) { common.RunProcess(func() (io.Closer, error) { - s, err := server.NewServerWithConfig(server.LoadStandaloneConfigFromEnv()) + config, err := server.LoadConfigFromFile("conf/standalone.yaml") + if err != nil { + return nil, err + } + s, err := server.NewServer(server.WithConfig(config)) if err != nil { return nil, err } diff --git a/common/config.go b/common/config.go index 49a1d078..2633211e 100644 --- a/common/config.go +++ b/common/config.go @@ -16,9 +16,18 @@ package common -// Config is a struct that holds the configuration for a function stream. -type Config struct { - ListenAddr string // ListenAddr is the address that the function stream REST service will listen on. - PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube - TubeType string +type ConfigMap map[string]interface{} + +// MergeConfig merges multiple ConfigMap into one +func MergeConfig(configs ...*ConfigMap) *ConfigMap { + result := ConfigMap{} + for _, config := range configs { + if config == nil { + continue + } + for k, v := range *config { + result[k] = v + } + } + return &result } diff --git a/conf/function-stream.yaml b/conf/function-stream.yaml new file mode 100644 index 00000000..001c50dc --- /dev/null +++ b/conf/function-stream.yaml @@ -0,0 +1,29 @@ +# Copyright 2024 Function Stream Org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +listen_addr: ":7300" +tube_factory: + pulsar: + type: "pulsar" + config: + pulsar_url: "pulsar://localhost:6650" + memory: + Type: "memory" + default: + ref: "pulsar" +runtime_factory: + wasm: + type: "wasm" + default: + ref: "wasm" \ No newline at end of file diff --git a/conf/standalone.yaml b/conf/standalone.yaml new file mode 100644 index 00000000..24bfcb85 --- /dev/null +++ b/conf/standalone.yaml @@ -0,0 +1,25 @@ +# Copyright 2024 Function Stream Org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +listen_addr: ":7300" +tube_factory: + memory: + Type: "memory" + default: + ref: "memory" +runtime_factory: + wasm: + type: "wasm" + default: + ref: "wasm" \ No newline at end of file diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index 5616e4d0..be9ea1d4 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -25,19 +25,21 @@ import ( ) const ( - PulsarURLKey = "pulsarURL" + PulsarURLKey = "pulsar_url" ) type PulsarTubeFactoryConfig struct { PulsarURL string } -func NewPulsarTubeFactoryConfig(configMap ConfigMap) *PulsarTubeFactoryConfig { +func NewPulsarTubeFactoryConfig(configMap ConfigMap) (*PulsarTubeFactoryConfig, error) { var result PulsarTubeFactoryConfig if pulsarURL, ok := configMap[PulsarURLKey].(string); ok { result.PulsarURL = pulsarURL + } else { + return nil, errors.Errorf("Missing required field %s", PulsarURLKey) } - return &result + return &result, nil } func (c *PulsarTubeFactoryConfig) ToConfigMap() ConfigMap { @@ -62,7 +64,10 @@ func (f *PulsarEventQueueFactory) NewSinkTube(ctx context.Context, configMap Con } func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error) { - config := NewPulsarTubeFactoryConfig(configMap) + config, err := NewPulsarTubeFactoryConfig(configMap) + if err != nil { + return nil, err + } pc, err := pulsar.NewClient(pulsar.ClientOptions{ URL: config.PulsarURL, }) diff --git a/go.mod b/go.mod index 94560cb6..a2b1273b 100644 --- a/go.mod +++ b/go.mod @@ -35,9 +35,11 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/danieljoos/wincred v1.2.1 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/frankban/quicktest v1.14.6 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -53,6 +55,7 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -60,21 +63,32 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.12.0 // indirect + github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/viper v1.18.2 // indirect + github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/mod v0.15.0 // indirect @@ -85,6 +99,7 @@ require ( google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/apimachinery v0.29.1 // indirect diff --git a/go.sum b/go.sum index 6c88b351..7d1faa37 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,10 @@ github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= @@ -55,6 +59,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= @@ -109,6 +115,8 @@ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/U github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -128,11 +136,15 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -147,6 +159,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -156,6 +170,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= @@ -168,16 +184,29 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= +github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -185,8 +214,12 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -194,6 +227,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -275,6 +310,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/perf/perf.go b/perf/perf.go index 61030ec5..4f130e24 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -33,7 +33,7 @@ import ( "time" ) -type TubeBuilder func(ctx context.Context, config *common.Config) (contube.TubeFactory, error) +type TubeBuilder func(ctx context.Context) (contube.TubeFactory, error) type Config struct { PulsarURL string @@ -58,7 +58,7 @@ func New(config *Config) Perf { config: config, } if config.QueueBuilder == nil { - p.tubeBuilder = func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) { + p.tubeBuilder = func(ctx context.Context) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: config.PulsarURL, }).ToConfigMap()) @@ -98,11 +98,7 @@ func (p *perf) Run(ctx context.Context) { } f.Name = name - config := &common.Config{ - PulsarURL: p.config.PulsarURL, - } - - queueFactory, err := p.tubeBuilder(ctx, config) + queueFactory, err := p.tubeBuilder(ctx) if err != nil { slog.Error( "Failed to create Record Queue Factory", diff --git a/server/config.go b/server/config.go new file mode 100644 index 00000000..7cc1bb75 --- /dev/null +++ b/server/config.go @@ -0,0 +1,125 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "github.com/functionstream/function-stream/common" + "github.com/pkg/errors" + "github.com/spf13/viper" + "k8s.io/utils/set" + "log/slog" + "os" + "strings" +) + +const ( + WASMRuntime = "wasm" + GRPCRuntime = "grpc" +) + +type FactoryConfig struct { + Ref *string `mapstructure:"ref"` + Type *string `mapstructure:"type"` + Config *common.ConfigMap `mapstructure:"config"` +} + +type Config struct { + // ListenAddr is the address that the function stream REST service will listen on. + ListenAddr string `mapstructure:"listen_addr"` + + // TubeFactory is the list of tube factories that the function stream server will use. + TubeFactory map[string]*FactoryConfig `mapstructure:"tube_factory"` + + // RuntimeFactory is the list of runtime factories that the function stream server will use. + RuntimeFactory map[string]*FactoryConfig `mapstructure:"runtime_factory"` +} + +func init() { + viper.SetDefault("ListenAddr", "7300") +} + +func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig, supportedTypes set.Set[string]) error { + for name, factory := range m { + if ref := factory.Ref; ref != nil && *ref != "" { + referred, ok := m[strings.ToLower(*ref)] + if !ok { + return errors.Errorf("%s factory %s refers to non-existent factory %s", n, name, *ref) + } + if factory.Type == nil { + factory.Type = referred.Type + } + factory.Config = common.MergeConfig(referred.Config, factory.Config) + } + } + + for name, factory := range m { + if factory.Type == nil { + return errors.Errorf("%s factory %s has no type", n, name) + } + if !supportedTypes.Has(strings.ToLower(*factory.Type)) { + return errors.Errorf("%s factory %s has unsupported type %s", n, name, *factory.Type) + } + } + return nil +} + +func (c *Config) preprocessConfig() error { + if c.ListenAddr == "" { + return errors.New("ListenAddr shouldn't be empty") + } + err := preprocessFactoriesConfig("Tube", c.TubeFactory, set.New[string](common.PulsarTubeType, common.MemoryTubeType)) + if err != nil { + return err + } + return preprocessFactoriesConfig("Runtime", c.RuntimeFactory, set.New[string](WASMRuntime, GRPCRuntime)) +} + +func loadConfig() (*Config, error) { + var c Config + if err := viper.Unmarshal(&c); err != nil { + return nil, err + } + if err := c.preprocessConfig(); err != nil { + return nil, err + } + return &c, nil +} + +const envPrefix = "FS_" + +func LoadConfigFromFile(filePath string) (*Config, error) { + viper.SetConfigFile(filePath) + if err := viper.ReadInConfig(); err != nil { + return nil, err + } + return loadConfig() +} + +func LoadConfigFromEnv() (*Config, error) { + for _, env := range os.Environ() { + if strings.HasPrefix(env, "FS_") { + parts := strings.SplitN(strings.TrimPrefix(env, envPrefix), "=", 2) + key := parts[0] + value := parts[1] + + slog.Info("Loading environment variable", "key", key, "value", value) + viper.Set(strings.Replace(key, "__", ".", -1), value) + } + } + + return loadConfig() +} diff --git a/server/config_loader.go b/server/config_loader.go deleted file mode 100644 index 4606defe..00000000 --- a/server/config_loader.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2024 Function Stream Org. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package server - -import ( - "github.com/functionstream/function-stream/common" - "log/slog" - "os" - "sync" -) - -var loadedConfig *common.Config -var initConfig = sync.Once{} - -func LoadConfigFromEnv() *common.Config { - initConfig.Do(func() { - loadedConfig = &common.Config{ - ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), - PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL), - TubeType: getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType), - } - }) - return loadedConfig -} - -func LoadStandaloneConfigFromEnv() *common.Config { - initConfig.Do(func() { - loadedConfig = &common.Config{ - ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), - TubeType: getEnvWithDefault("TUBE_TYPE", common.MemoryTubeType), - } - }) - return loadedConfig -} - -func getEnvWithDefault(key string, defaultVal string) string { - if value, ok := os.LookupEnv(key); ok { - return value - } - slog.Info("Environment variable not found, using the default value:", key, defaultVal) - return defaultVal -} diff --git a/server/config_test.go b/server/config_test.go new file mode 100644 index 00000000..291be0ed --- /dev/null +++ b/server/config_test.go @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func TestLoadConfigFromYaml(t *testing.T) { + c, err := LoadConfigFromFile("../tests/test_config.yaml") + require.Nil(t, err) + assertConfig(t, c) +} + +func TestLoadConfigFromJson(t *testing.T) { + c, err := LoadConfigFromFile("../tests/test_config.json") + require.Nil(t, err) + assertConfig(t, c) +} + +func TestLoadConfigFromEnv(t *testing.T) { + assert.Nil(t, os.Setenv("FS_LISTEN_ADDR", ":17300")) + assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_PULSAR__TYPE", "pulsar")) + assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_PULSAR__CONFIG__PULSAR_URL", "pulsar://localhost:6651")) + assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__MY_MEMORY__TYPE", "memory")) + assert.Nil(t, os.Setenv("FS_TUBE_FACTORY__DEFAULT__REF", "my_pulsar")) + + viper.AutomaticEnv() + + c, err := LoadConfigFromEnv() + require.Nil(t, err) + assertConfig(t, c) +} + +func assertConfig(t *testing.T, c *Config) { + assert.Equal(t, ":17300", c.ListenAddr) + require.Contains(t, c.TubeFactory, "my_pulsar") + assert.Equal(t, "pulsar", *c.TubeFactory["my_pulsar"].Type) + + if config := c.TubeFactory["my_pulsar"].Config; config != nil { + assert.Equal(t, "pulsar://localhost:6651", (*config)["pulsar_url"]) + } else { + t.Fatal("pulsar config is nil") + } + + require.Contains(t, c.TubeFactory, "my_memory") + assert.Equal(t, "memory", *c.TubeFactory["my_memory"].Type) + + require.Contains(t, c.TubeFactory, "default") + assert.Equal(t, "my_pulsar", *c.TubeFactory["default"].Ref) + if config := c.TubeFactory["default"].Config; config != nil { + assert.Equal(t, "pulsar://localhost:6651", (*config)["pulsar_url"]) + } else { + t.Fatal("pulsar config is nil") + } +} diff --git a/server/server.go b/server/server.go index d5e9855e..8249f77a 100644 --- a/server/server.go +++ b/server/server.go @@ -22,13 +22,17 @@ import ( "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs" + "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" + "github.com/functionstream/function-stream/fs/runtime/wazero" "github.com/go-openapi/spec" "github.com/pkg/errors" + "k8s.io/utils/set" "log/slog" "net" "net/http" "net/url" + "strings" "sync/atomic" "time" ) @@ -82,6 +86,90 @@ func WithHttpTubeFactory(factory *contube.HttpTubeFactory) ServerOption { }) } +func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[string]) (string, error) { + if visited.Has(name) { + return "", errors.Errorf("circular reference of factory %s", name) + } + visited.Insert(name) + f, ok := m[name] + if !ok { + return "", errors.Errorf("tube factory %s not found", name) + } + if f.Ref != nil { + return getRefFactory(m, strings.ToLower(*f.Ref), visited) + } + return name, nil +} + +func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string, c *FactoryConfig) (T, error), setup func(n string, f T)) error { + factoryMap := make(map[string]T) + + for name := range m { + refName, err := getRefFactory(m, name, set.New[string]()) + if err != nil { + return err + } + if _, ok := factoryMap[refName]; !ok { + fc, exist := m[refName] + if !exist { + return errors.Errorf("factory %s not found, which the factory %s is pointed to", refName, name) + } + f, err := newFactory(refName, fc) + if err != nil { + return err + } + factoryMap[refName] = f + } + factoryMap[name] = factoryMap[refName] + setup(name, factoryMap[name]) + } + return nil +} + +func WithConfig(config *Config) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + ln, err := net.Listen("tcp", config.ListenAddr) + if err != nil { + return nil, err + } + o.httpListener = ln + err = initFactories[contube.TubeFactory](config.TubeFactory, func(n string, c *FactoryConfig) (contube.TubeFactory, error) { + if c.Type == nil { + return nil, errors.Errorf("tube factory %s type is not set", n) + } + switch strings.ToLower(*c.Type) { + case common.PulsarTubeType: + return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config)) + case common.MemoryTubeType: + return contube.NewMemoryQueueFactory(context.Background()), nil + } + return nil, errors.Errorf("unsupported tube type %s", *c.Type) + }, func(n string, f contube.TubeFactory) { + o.managerOpts = append(o.managerOpts, fs.WithTubeFactory(n, f)) + }) + if err != nil { + return nil, err + } + err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, + func(n string, c *FactoryConfig) (api.FunctionRuntimeFactory, error) { + if c.Type == nil { + return nil, errors.Errorf("runtime factory %s type is not set", n) + } + switch strings.ToLower(*c.Type) { + case WASMRuntime: + return wazero.NewWazeroFunctionRuntimeFactory(), nil + } + return nil, errors.Errorf("unsupported runtime type %s", *c.Type) + }, func(n string, f api.FunctionRuntimeFactory) { + o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) + }) + if err != nil { + return nil, err + } + return o, nil + }) +} + func NewServer(opts ...ServerOption) (*Server, error) { options := &serverOptions{} httpTubeFact := contube.NewHttpTubeFactory(context.Background()) @@ -116,36 +204,30 @@ func NewServer(opts ...ServerOption) (*Server, error) { }, nil } -func NewServerWithConfig(config *common.Config) (*Server, error) { - ln, err := net.Listen("tcp", config.ListenAddr) - if err != nil { - return nil, err - } - - var tubeFactory contube.TubeFactory - switch config.TubeType { - case common.PulsarTubeType: - tubeFactory, err = contube.NewPulsarEventQueueFactory(context.Background(), (&contube.PulsarTubeFactoryConfig{ - PulsarURL: config.PulsarURL, - }).ToConfigMap()) - if err != nil { - return nil, errors.Wrap(err, "failed to create default pulsar tube factory") - } - case common.MemoryTubeType: - tubeFactory = contube.NewMemoryQueueFactory(context.Background()) - } - managerOpts := []fs.ManagerOption{ - fs.WithDefaultTubeFactory(tubeFactory), - } - - return NewServer( - WithHttpListener(ln), - WithFunctionManager(managerOpts...), - ) -} - func NewDefaultServer() (*Server, error) { - return NewServerWithConfig(LoadConfigFromEnv()) + defaultConfig := &Config{ + ListenAddr: ":7300", + TubeFactory: map[string]*FactoryConfig{ + "pulsar": { + Type: common.OptionalStr(common.PulsarTubeType), + Config: &common.ConfigMap{ + contube.PulsarURLKey: "pulsar://localhost:6650", + }, + }, + "default": { + Ref: common.OptionalStr("pulsar"), + }, + }, + RuntimeFactory: map[string]*FactoryConfig{ + "wasm": { + Type: common.OptionalStr(WASMRuntime), + }, + "default": { + Ref: common.OptionalStr("wasm"), + }, + }, + } + return NewServer(WithConfig(defaultConfig)) } func (s *Server) Run(context context.Context) { diff --git a/tests/test_config.json b/tests/test_config.json new file mode 100644 index 00000000..8c89ab5d --- /dev/null +++ b/tests/test_config.json @@ -0,0 +1,17 @@ +{ + "listen_addr": ":17300", + "tube_factory": { + "my_pulsar": { + "type": "pulsar", + "config": { + "pulsar_url": "pulsar://localhost:6651" + } + }, + "my_memory": { + "type": "memory" + }, + "default": { + "ref": "my_pulsar" + } + } +} \ No newline at end of file diff --git a/tests/test_config.yaml b/tests/test_config.yaml new file mode 100644 index 00000000..a9a0dd12 --- /dev/null +++ b/tests/test_config.yaml @@ -0,0 +1,24 @@ +# Copyright 2024 Function Stream Org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +listen_addr: ":17300" +tube_factory: + my_pulsar: + type: "pulsar" + config: + pulsar_url: "pulsar://localhost:6651" + my_memory: + type: "memory" + default: + ref: "my_pulsar" \ No newline at end of file