diff --git a/Makefile b/Makefile index 2b636bd9..cec4a5bb 100644 --- a/Makefile +++ b/Makefile @@ -41,5 +41,18 @@ gen_rest_client: rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \ restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test +proto: + for PROTO_FILE in $$(find . -name '*.proto'); do \ + echo "generating codes for $$PROTO_FILE"; \ + protoc \ + --go_out=. \ + --go_opt paths=source_relative \ + --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" \ + --go-grpc_out=. \ + --go-grpc_opt paths=source_relative \ + --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" \ + $$PROTO_FILE; \ + done + license: ./license-checker/license-checker.sh diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index a757d5cd..8ac8d36d 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -35,7 +35,10 @@ import ( ) func BenchmarkStressForBasicFunc(b *testing.B) { - s := server.New(server.LoadConfigFromEnv()) + s, err := server.NewServer(server.LoadConfigFromEnv()) + if err != nil { + b.Fatal(err) + } svrCtx, svrCancel := context.WithCancel(context.Background()) go s.Run(svrCtx) defer func() { @@ -103,14 +106,18 @@ func BenchmarkStressForBasicFunc(b *testing.B) { func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background()) - svrConf := &fs.Config{ + svrConf := &common.Config{ ListenAddr: common.DefaultAddr, - TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { - return memoryQueueFactory, nil - }, } - s := server.New(svrConf) + fm, err := fs.NewFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory)) + if err != nil { + b.Fatal(err) + } + s, err := server.NewServer(svrConf, server.WithFunctionManager(fm)) + if err != nil { + b.Fatal(err) + } svrCtx, svrCancel := context.WithCancel(context.Background()) go s.Run(svrCtx) defer func() { @@ -130,7 +137,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { Output: outputTopic, Replicas: &replicas, }, - QueueBuilder: func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { + QueueBuilder: func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) { return memoryQueueFactory, nil }, } diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index cf3aaca5..8e7f0f99 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -35,7 +35,10 @@ var ( func exec(*cobra.Command, []string) { common.RunProcess(func() (io.Closer, error) { - s := server.New(server.LoadConfigFromEnv()) + s, err := server.NewServer(server.LoadConfigFromEnv()) + if err != nil { + return nil, err + } go s.Run(context.Background()) return s, nil }) diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go index 74527225..1127178a 100644 --- a/cmd/standalone/cmd.go +++ b/cmd/standalone/cmd.go @@ -35,7 +35,10 @@ var ( func exec(*cobra.Command, []string) { common.RunProcess(func() (io.Closer, error) { - s := server.New(server.LoadStandaloneConfigFromEnv()) + s, err := server.NewServer(server.LoadStandaloneConfigFromEnv()) + if err != nil { + return nil, err + } go s.Run(context.Background()) return s, nil }) diff --git a/fs/config.go b/common/config.go similarity index 59% rename from fs/config.go rename to common/config.go index d9420e9f..49a1d078 100644 --- a/fs/config.go +++ b/common/config.go @@ -14,18 +14,11 @@ * limitations under the License. */ -package fs - -import ( - "context" - "github.com/functionstream/functionstream/fs/contube" -) - -type TubeBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error) +package common // Config is a struct that holds the configuration for a function stream. type Config struct { - ListenAddr string // ListenAddr is the address that the function stream REST service will listen on. - PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube - TubeBuilder TubeBuilder // TubeBuilder is a function that will be used to build the tube. + ListenAddr string // ListenAddr is the address that the function stream REST service will listen on. + PulsarURL string // PulsarURL is the URL of the Pulsar service. It's used for the pulsar_tube + TubeType string } diff --git a/common/constants.go b/common/constants.go index 7f376e1a..26e941e5 100644 --- a/common/constants.go +++ b/common/constants.go @@ -18,6 +18,7 @@ package common const ( PulsarTubeType = "pulsar" + MemoryTubeType = "memory" DefaultAddr = "localhost:7300" DefaultPulsarURL = "pulsar://localhost:6650" diff --git a/common/model/function.go b/common/model/function.go index fd475540..69546584 100644 --- a/common/model/function.go +++ b/common/model/function.go @@ -17,9 +17,13 @@ package model type Function struct { - Name string - Archive string - Inputs []string + Name string + Archive string + Source map[string]any + Sink map[string]any + // Deprecate + Inputs []string + // Deprecate Output string Config map[string]string Replicas int32 diff --git a/fs/api/instance.go b/fs/api/instance.go new file mode 100644 index 00000000..a2c83bb6 --- /dev/null +++ b/fs/api/instance.go @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package api + +import ( + "github.com/functionstream/functionstream/common/model" + "github.com/functionstream/functionstream/fs/contube" + "golang.org/x/net/context" +) + +type FunctionInstance interface { + Context() context.Context + Definition() *model.Function + Stop() + Run(factory FunctionRuntimeFactory) + WaitForReady() <-chan error +} + +type FunctionInstanceFactory interface { + NewFunctionInstance(f *model.Function, queueFactory contube.TubeFactory, i int32) FunctionInstance +} diff --git a/fs/api/runtime.go b/fs/api/runtime.go new file mode 100644 index 00000000..4cae72a2 --- /dev/null +++ b/fs/api/runtime.go @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package api + +import ( + "github.com/functionstream/functionstream/fs/contube" +) + +type FunctionRuntime interface { + WaitForReady() <-chan error + Call(e contube.Record) (contube.Record, error) + Stop() +} + +type FunctionRuntimeFactory interface { + NewFunctionRuntime(instance FunctionInstance) (FunctionRuntime, error) +} diff --git a/fs/contube/event_tube.go b/fs/contube/contube.go similarity index 100% rename from fs/contube/event_tube.go rename to fs/contube/contube.go diff --git a/fs/contube/memory_tube.go b/fs/contube/memory.go similarity index 100% rename from fs/contube/memory_tube.go rename to fs/contube/memory.go diff --git a/fs/contube/pulsar_tube.go b/fs/contube/pulsar.go similarity index 100% rename from fs/contube/pulsar_tube.go rename to fs/contube/pulsar.go diff --git a/fs/instance.go b/fs/instance.go deleted file mode 100644 index 23493192..00000000 --- a/fs/instance.go +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2024 Function Stream Org. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fs - -import ( - "context" - "fmt" - "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/fs/contube" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" - "github.com/tetratelabs/wazero/sys" - "log/slog" - "os" -) - -type FunctionInstance struct { - ctx context.Context - cancelFunc context.CancelFunc - definition *model.Function - tubeFactory contube.TubeFactory - readyCh chan error - index int32 -} - -func NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) *FunctionInstance { - ctx, cancelFunc := context.WithCancel(context.Background()) - ctx.Value(logrus.Fields{ - "function-name": definition.Name, - "function-index": index, - }) - return &FunctionInstance{ - ctx: ctx, - cancelFunc: cancelFunc, - definition: definition, - tubeFactory: queueFactory, - readyCh: make(chan error), - index: index, - } -} - -func (instance *FunctionInstance) Run() { - r := wazero.NewRuntime(instance.ctx) - defer func(runtime wazero.Runtime, ctx context.Context) { - err := runtime.Close(ctx) - if err != nil { - slog.ErrorContext(ctx, "Error closing r", err) - } - }(r, instance.ctx) - - _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, m api.Module, a, b, c, d uint32) { - panic("abort") - }).Export("abort").Instantiate(instance.ctx) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error instantiating function module") - return - } - - stdin := common.NewChanReader() - stdout := common.NewChanWriter() - - config := wazero.NewModuleConfig(). - WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr) - - wasi_snapshot_preview1.MustInstantiate(instance.ctx, r) - - wasmBytes, err := os.ReadFile(instance.definition.Archive) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error reading wasm file") - return - } - - handleErr := func(ctx context.Context, err error, message string, args ...interface{}) { - if errors.Is(err, context.Canceled) { - slog.InfoContext(instance.ctx, "function instance has been stopped") - return - } - extraArgs := append(args, slog.Any("error", err.Error())) - slog.ErrorContext(ctx, message, extraArgs...) - } - - // Trigger the "_start" function, WASI's "main". - mod, err := r.InstantiateWithConfig(instance.ctx, wasmBytes, config) - if err != nil { - if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 { - handleErr(instance.ctx, err, "Function exit with code", "code", exitErr.ExitCode()) - } else if !ok { - handleErr(instance.ctx, err, "Error instantiating function") - } - return - } - process := mod.ExportedFunction("process") - if process == nil { - instance.readyCh <- errors.New("No process function found") - return - } - - sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap()) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating source event queue") - return - } - sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap()) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") - return - } - defer close(sinkChan) - - instance.readyCh <- nil - for e := range sourceChan { - stdin.ResetBuffer(e.GetPayload()) - _, err = process.Call(instance.ctx) - if err != nil { - handleErr(instance.ctx, err, "Error calling process function") - return - } - output := stdout.GetAndReset() - sinkChan <- contube.NewRecordImpl(output, e.Commit) - } -} - -func (instance *FunctionInstance) WaitForReady() <-chan error { - return instance.readyCh -} - -func (instance *FunctionInstance) Stop() { - instance.cancelFunc() -} diff --git a/fs/instance_impl.go b/fs/instance_impl.go new file mode 100644 index 00000000..2c108716 --- /dev/null +++ b/fs/instance_impl.go @@ -0,0 +1,124 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/common/model" + "github.com/functionstream/functionstream/fs/api" + "github.com/functionstream/functionstream/fs/contube" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "log/slog" +) + +type FunctionInstanceImpl struct { + ctx context.Context + cancelFunc context.CancelFunc + definition *model.Function + tubeFactory contube.TubeFactory + readyCh chan error + index int32 +} + +type DefaultInstanceFactory struct{} + +func NewDefaultInstanceFactory() api.FunctionInstanceFactory { + return &DefaultInstanceFactory{} +} + +func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, queueFactory contube.TubeFactory, index int32) api.FunctionInstance { + ctx, cancelFunc := context.WithCancel(context.Background()) + ctx.Value(logrus.Fields{ + "function-name": definition.Name, + "function-index": index, + }) + return &FunctionInstanceImpl{ + ctx: ctx, + cancelFunc: cancelFunc, + definition: definition, + tubeFactory: queueFactory, + readyCh: make(chan error), + index: index, + } +} + +func handleErr(ctx context.Context, err error, message string, args ...interface{}) { + if errors.Is(err, context.Canceled) { + slog.InfoContext(ctx, "function instance has been stopped") + return + } + extraArgs := append(args, slog.Error) + slog.ErrorContext(ctx, message, extraArgs...) +} + +func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory) { + runtime, err := runtimeFactory.NewFunctionRuntime(instance) + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error creating runtime") + return + } + + sourceChan, err := instance.tubeFactory.NewSourceTube(instance.ctx, (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap()) + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error creating source event queue") + return + } + sinkChan, err := instance.tubeFactory.NewSinkTube(instance.ctx, (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap()) + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") + return + } + defer close(sinkChan) + err = <-runtime.WaitForReady() + if err != nil { + instance.readyCh <- errors.Wrap(err, "Error waiting for runtime to be ready") + return + } + + close(instance.readyCh) + for e := range sourceChan { + output, err := runtime.Call(e) + if err != nil { + handleErr(instance.ctx, err, "Error calling process function") + return + } + select { + case sinkChan <- output: + case <-instance.ctx.Done(): + return + } + + } +} + +func (instance *FunctionInstanceImpl) WaitForReady() <-chan error { + return instance.readyCh +} + +func (instance *FunctionInstanceImpl) Stop() { + instance.cancelFunc() +} + +func (instance *FunctionInstanceImpl) Context() context.Context { + return instance.ctx +} + +func (instance *FunctionInstanceImpl) Definition() *model.Function { + return instance.definition +} diff --git a/fs/manager.go b/fs/manager.go index 527718c5..d4ca46d4 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -20,7 +20,9 @@ import ( "context" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" + "github.com/functionstream/functionstream/fs/api" "github.com/functionstream/functionstream/fs/contube" + "github.com/functionstream/functionstream/fs/runtime/wazero" "log/slog" "math/rand" "strconv" @@ -28,19 +30,73 @@ import ( ) type FunctionManager struct { - functions map[string][]*FunctionInstance + options *managerOptions + functions map[string][]api.FunctionInstance //TODO: Use sync.map functionsLock sync.Mutex - tubeFactory contube.TubeFactory } -func NewFunctionManager(config *Config) (*FunctionManager, error) { - tubeFactory, err := config.TubeBuilder(context.Background(), config) - if err != nil { - return nil, err +type managerOptions struct { + tubeFactoryMap map[string]contube.TubeFactory + runtimeFactoryMap map[string]api.FunctionRuntimeFactory + instanceFactory api.FunctionInstanceFactory +} + +type ManagerOption interface { + apply(option *managerOptions) (*managerOptions, error) +} + +type managerOptionFunc func(*managerOptions) (*managerOptions, error) + +func (f managerOptionFunc) apply(c *managerOptions) (*managerOptions, error) { + return f(c) +} + +func WithTubeFactory(name string, factory contube.TubeFactory) ManagerOption { + return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { + c.tubeFactoryMap[name] = factory + return c, nil + }) +} + +func WithDefaultTubeFactory(factory contube.TubeFactory) ManagerOption { + return WithTubeFactory("default", factory) +} + +func WithRuntimeFactory(name string, factory api.FunctionRuntimeFactory) ManagerOption { + return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { + c.runtimeFactoryMap[name] = factory + return c, nil + }) +} + +func WithDefaultRuntimeFactory(factory api.FunctionRuntimeFactory) ManagerOption { + return WithRuntimeFactory("default", factory) +} + +func WithInstanceFactory(factory api.FunctionInstanceFactory) ManagerOption { + return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) { + c.instanceFactory = factory + return c, nil + }) +} + +func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) { + options := &managerOptions{ + tubeFactoryMap: make(map[string]contube.TubeFactory), + runtimeFactoryMap: make(map[string]api.FunctionRuntimeFactory), + } + options.runtimeFactoryMap["default"] = wazero.NewWazeroFunctionRuntimeFactory() + options.tubeFactoryMap["default"] = contube.NewMemoryQueueFactory(context.Background()) + options.instanceFactory = NewDefaultInstanceFactory() + for _, o := range opts { + _, err := o.apply(options) + if err != nil { + return nil, err + } } return &FunctionManager{ - functions: make(map[string][]*FunctionInstance), - tubeFactory: tubeFactory, + options: options, + functions: make(map[string][]api.FunctionInstance), }, nil } @@ -50,14 +106,14 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { if _, exist := fm.functions[f.Name]; exist { return common.ErrorFunctionExists } - fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas) + fm.functions[f.Name] = make([]api.FunctionInstance, f.Replicas) for i := int32(0); i < f.Replicas; i++ { - instance := NewFunctionInstance(f, fm.tubeFactory, i) + instance := fm.options.instanceFactory.NewFunctionInstance(f, fm.options.tubeFactoryMap["default"], i) fm.functions[f.Name][i] = instance - go instance.Run() + go instance.Run(fm.options.runtimeFactoryMap["default"]) if err := <-instance.WaitForReady(); err != nil { if err != nil { - slog.ErrorContext(instance.ctx, "Error starting function instance", err) + slog.ErrorContext(instance.Context(), "Error starting function instance", err) } instance.Stop() return err @@ -95,7 +151,7 @@ func (fm *FunctionManager) ListFunctions() (result []string) { func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.tubeFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) + c, err := fm.options.tubeFactoryMap["default"].NewSinkTube(ctx, (&contube.SinkQueueConfig{Topic: name}).ToConfigMap()) if err != nil { return err } @@ -106,7 +162,7 @@ func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.tubeFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + c, err := fm.options.tubeFactoryMap["default"].NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { return nil, err } diff --git a/fs/runtime/grpc/grpc_func.go b/fs/runtime/grpc/grpc_func.go new file mode 100644 index 00000000..23235f67 --- /dev/null +++ b/fs/runtime/grpc/grpc_func.go @@ -0,0 +1,262 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "fmt" + "github.com/functionstream/functionstream/fs/api" + "github.com/functionstream/functionstream/fs/contube" + "github.com/functionstream/functionstream/fs/runtime/grpc/proto" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "io" + "log/slog" + "net" + "sync" +) + +// TODO: Replace with FunctionInstane after the function instance abstraction is finishedf +type GRPCFuncRuntime struct { + Name string + ctx context.Context + status *proto.FunctionStatus + readyCh chan error + input chan string + output chan string + stopFunc func() +} + +// FSSReconcileServer is the struct that implements the FSServiceServer interface. +type FSSReconcileServer struct { + proto.UnimplementedFSReconcileServer + ctx context.Context + readyCh chan struct{} + connected sync.Once + reconcile chan *proto.FunctionStatus + functions map[string]*GRPCFuncRuntime + functionsMu sync.Mutex +} + +func NewFSReconcile(ctx context.Context) *FSSReconcileServer { + return &FSSReconcileServer{ + ctx: ctx, + readyCh: make(chan struct{}), + reconcile: make(chan *proto.FunctionStatus, 100), + functions: make(map[string]*GRPCFuncRuntime), + } +} + +func (s *FSSReconcileServer) WaitForReady() <-chan struct{} { + return s.readyCh +} + +func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer) error { + s.connected.Do(func() { + close(s.readyCh) + }) + errCh := make(chan error) + go func() { + for { + newStatus, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + errCh <- err + return + } + s.functionsMu.Lock() + instance, ok := s.functions[newStatus.Name] + if !ok { + s.functionsMu.Unlock() + slog.Error("receive non-exist function status update", slog.Any("name", newStatus.Name)) + continue + } + s.functionsMu.Unlock() + instance.Update(newStatus) + } + }() + for { + select { + case status := <-s.reconcile: + err := stream.Send(status) + if err != nil { + slog.ErrorContext(stream.Context(), "failed to send status update", slog.Any("status", status)) + // Continue to send the next status update. + } + case <-stream.Context().Done(): + return nil + case <-s.ctx.Done(): + return nil + case e := <-errCh: + return e + } + } + +} + +func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { + name := instance.Definition().Name + runtime := &GRPCFuncRuntime{ + Name: name, + readyCh: make(chan error), + input: make(chan string), + output: make(chan string), + status: &proto.FunctionStatus{ + Name: name, + Status: proto.FunctionStatus_CREATING, + }, + ctx: instance.Context(), + stopFunc: func() { + s.removeFunction(name) + }, + } + { + s.functionsMu.Lock() + defer s.functionsMu.Unlock() + if _, ok := s.functions[name]; ok { + return nil, fmt.Errorf("function already exists") + } + s.functions[name] = runtime + } + s.reconcile <- runtime.status + slog.InfoContext(runtime.ctx, "Creating function", slog.Any("name", name)) + return runtime, nil +} + +func (s *FSSReconcileServer) getFunc(name string) (*GRPCFuncRuntime, error) { + s.functionsMu.Lock() + defer s.functionsMu.Unlock() + instance, ok := s.functions[name] + if !ok { + return nil, fmt.Errorf("function not found") + } + return instance, nil +} + +func (s *FSSReconcileServer) removeFunction(name string) { + s.functionsMu.Lock() + defer s.functionsMu.Unlock() + instance, ok := s.functions[name] + if !ok { + return + } + slog.InfoContext(instance.ctx, "Removing function", slog.Any("name", name)) + delete(s.functions, name) +} + +func isFinalStatus(status proto.FunctionStatus_Status) bool { + return status == proto.FunctionStatus_FAILED || status == proto.FunctionStatus_DELETED +} + +func (f *GRPCFuncRuntime) Update(new *proto.FunctionStatus) { + if f.status.Status == proto.FunctionStatus_CREATING && isFinalStatus(new.Status) { + f.readyCh <- fmt.Errorf("function failed to start") + } + if f.status.Status != new.Status { + slog.InfoContext(f.ctx, "Function status update", slog.Any("new_status", new.Status), slog.Any("old_status", f.status.Status)) + } + f.status = new +} + +func (f *GRPCFuncRuntime) WaitForReady() <-chan error { + return f.readyCh +} + +// Stop stops the function runtime and remove it +// It is different from the ctx.Cancel. It will make sure the runtime has been deleted after this method returns. +func (f *GRPCFuncRuntime) Stop() { + f.stopFunc() +} + +func (f *GRPCFuncRuntime) Call(event contube.Record) (contube.Record, error) { + f.input <- string(event.GetPayload()) + out := <-f.output + return contube.NewRecordImpl([]byte(out), event.Commit), nil +} + +type FunctionServerImpl struct { + proto.UnimplementedFunctionServer + reconcileSvr *FSSReconcileServer +} + +func NewFunctionServerImpl(s *FSSReconcileServer) *FunctionServerImpl { + return &FunctionServerImpl{ + reconcileSvr: s, + } +} + +func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error { + md, ok := metadata.FromIncomingContext(stream.Context()) + if !ok { + return fmt.Errorf("failed to get metadata") + } + instance, err := f.reconcileSvr.getFunc(md["name"][0]) + if err != nil { + return err + } + slog.InfoContext(stream.Context(), "Start processing events", slog.Any("name", md["name"])) + instance.readyCh <- err + errCh := make(chan error) + go func() { + for { + event, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + errCh <- err + return + } + instance.output <- event.Payload + } + }() + + for { + select { + case payload := <-instance.input: + err := stream.Send(&proto.Event{Payload: payload}) + if err != nil { + slog.Error("failed to send event", slog.Any("error", err)) + return err + } + case <-stream.Context().Done(): + return nil + case <-instance.ctx.Done(): + return nil + case e := <-errCh: + return e + } + } +} + +func StartGRPCServer(f *FSSReconcileServer, port int) (*grpc.Server, error) { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + s := grpc.NewServer() + proto.RegisterFSReconcileServer(s, f) + proto.RegisterFunctionServer(s, NewFunctionServerImpl(f)) + go func() { + if err := s.Serve(lis); err != nil { + slog.Error("failed to serve", slog.Any("error", err)) + } + }() + return s, nil +} diff --git a/fs/runtime/grpc/grpc_func_test.go b/fs/runtime/grpc/grpc_func_test.go new file mode 100644 index 00000000..5049dcd3 --- /dev/null +++ b/fs/runtime/grpc/grpc_func_test.go @@ -0,0 +1,176 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "context" + "github.com/functionstream/functionstream/common/model" + "github.com/functionstream/functionstream/fs" + "github.com/functionstream/functionstream/fs/api" + "github.com/functionstream/functionstream/fs/contube" + "testing" + "time" +) + +type mockInstance struct { + ctx context.Context + definition *model.Function +} + +func (m *mockInstance) Context() context.Context { + return m.ctx +} + +func (m *mockInstance) Definition() *model.Function { + return m.definition +} + +func (m *mockInstance) Stop() { + +} + +func (m *mockInstance) Run(_ api.FunctionRuntimeFactory) { + +} +func (m *mockInstance) WaitForReady() <-chan error { + c := make(chan error) + close(c) + return c +} + +func TestGRPCFunc(t *testing.T) { + ctx, closeFSReconcile := context.WithCancel(context.Background()) + fsService := NewFSReconcile(ctx) + port := 17400 + s, err := StartGRPCServer(fsService, port) // The test may running in parallel with other tests, so we need to specify the port + if err != nil { + t.Fatal(err) + return + } + defer s.Stop() + + go StartMockGRPCFunc(t, port) + + select { + case <-fsService.WaitForReady(): + t.Logf("ready") + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for fs service ready") + return + } + + funcCtx, funcCancel := context.WithCancel(context.Background()) + function, err := fsService.NewFunctionRuntime(&mockInstance{ + ctx: funcCtx, + definition: &model.Function{ + Name: "test", + }, + }) + if err != nil { + t.Error(err) + } + select { + case <-function.WaitForReady(): + t.Logf("function ready") + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for function ready") + } + + result, err := function.Call(contube.NewRecordImpl([]byte("hello"), func() { + t.Logf("commit") + })) + if err != nil { + t.Fatalf("failed to call function: %v", err) + return + } + if string(result.GetPayload()) != "hello!" { + t.Fatalf("unexpected result: %v", result) + return + } + + funcCancel() + function.Stop() + + fsService.functionsMu.Lock() + if _, ok := fsService.functions["test"]; ok { + t.Fatalf("function not removed") + } + fsService.functionsMu.Unlock() + + closeFSReconcile() + + time.Sleep(3 * time.Second) // Wait for some time to make sure the cleanup of function doesn't raise any errors +} + +func TestFMWithGRPCRuntime(t *testing.T) { + ctx, closeFSReconcile := context.WithCancel(context.Background()) + fsService := NewFSReconcile(ctx) + port := 17401 + s, err := StartGRPCServer(fsService, port) + if err != nil { + t.Fatal(err) + return + } + defer s.Stop() + go StartMockGRPCFunc(t, port) + select { + case <-fsService.WaitForReady(): + t.Logf("ready") + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for fs service ready") + return + } + + fm, err := fs.NewFunctionManager( + fs.WithDefaultRuntimeFactory(fsService), + fs.WithDefaultTubeFactory(contube.NewMemoryQueueFactory(ctx))) + if err != nil { + t.Fatal(err) + } + + f := &model.Function{ + Name: "test", + Inputs: []string{"input"}, + Output: "output", + Replicas: 1, + } + + err = fm.StartFunction(f) + if err != nil { + t.Fatal(err) + } + + event := contube.NewRecordImpl([]byte("hello"), func() {}) + err = fm.ProduceEvent(f.Inputs[0], event) + if err != nil { + t.Fatal(err) + } + output, err := fm.ConsumeEvent(f.Output) + if err != nil { + t.Fatal(err) + } + if string(output.GetPayload()) != "hello!" { + t.Fatalf("unexpected result: %v", output) + } + + err = fm.DeleteFunction(f.Name) + if err != nil { + t.Fatal(err) + } + + closeFSReconcile() +} diff --git a/fs/runtime/grpc/mock_grpc_func_test.go b/fs/runtime/grpc/mock_grpc_func_test.go new file mode 100644 index 00000000..ce90e31c --- /dev/null +++ b/fs/runtime/grpc/mock_grpc_func_test.go @@ -0,0 +1,114 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "fmt" + "github.com/functionstream/functionstream/fs/runtime/grpc/proto" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "io" + "log/slog" + "testing" +) + +func StartMockGRPCFunc(t *testing.T, port int) { + addr := fmt.Sprintf("localhost:%d", port) + // Set up a connection to the server. + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + return + } + client := proto.NewFSReconcileClient(conn) + + stream, err := client.Reconcile(context.Background()) + if err != nil { + t.Errorf("failed to get process stream: %v", err) + return + } + + funcCli := proto.NewFunctionClient(conn) + + go func() { + defer func(conn *grpc.ClientConn) { + err := conn.Close() + if err != nil { + t.Errorf("did not close: %v", err) + return + } + }(conn) + defer func() { + err := stream.CloseSend() + if err != nil { + t.Errorf("failed to close: %v", err) + return + } + }() + for { + s, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + s, ok := status.FromError(err) + if ok && s.Code() == codes.Unavailable { + slog.Info("server disconnected") + return + } + t.Errorf("failed to receive: %v", err) + return + } + t.Logf("client received status: %v", s) + s.Status = proto.FunctionStatus_RUNNING + err = stream.Send(s) + if err != nil { + t.Errorf("failed to send: %v", err) + return + } + go func() { + ctx := metadata.AppendToOutgoingContext(context.Background(), "name", s.Name) + processStream, err := funcCli.Process(ctx) + if err != nil { + t.Errorf("failed to get process stream: %v", err) + return + } + for { + event, err := processStream.Recv() + if err == io.EOF { + return + } + if err != nil { + t.Errorf("failed to receive event: %v", err) + return + } + t.Logf("client received event: %v", event) + event.Payload += "!" + err = processStream.Send(event) + if err != nil { + t.Errorf("failed to send event: %v", err) + return + } + } + }() + } + }() +} diff --git a/fs/runtime/grpc/proto/grpc_func.pb.go b/fs/runtime/grpc/proto/grpc_func.pb.go new file mode 100644 index 00000000..9558586d --- /dev/null +++ b/fs/runtime/grpc/proto/grpc_func.pb.go @@ -0,0 +1,526 @@ +// +// Copyright 2024 Function Stream Org. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.25.2 +// source: fs/runtime/grpc/proto/grpc_func.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Response_Status int32 + +const ( + Response_OK Response_Status = 0 + Response_ERROR Response_Status = 1 +) + +// Enum value maps for Response_Status. +var ( + Response_Status_name = map[int32]string{ + 0: "OK", + 1: "ERROR", + } + Response_Status_value = map[string]int32{ + "OK": 0, + "ERROR": 1, + } +) + +func (x Response_Status) Enum() *Response_Status { + p := new(Response_Status) + *p = x + return p +} + +func (x Response_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Response_Status) Descriptor() protoreflect.EnumDescriptor { + return file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes[0].Descriptor() +} + +func (Response_Status) Type() protoreflect.EnumType { + return &file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes[0] +} + +func (x Response_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Response_Status.Descriptor instead. +func (Response_Status) EnumDescriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{1, 0} +} + +type FunctionStatus_Status int32 + +const ( + FunctionStatus_CREATING FunctionStatus_Status = 0 + FunctionStatus_RUNNING FunctionStatus_Status = 1 + FunctionStatus_DELETING FunctionStatus_Status = 2 + FunctionStatus_DELETED FunctionStatus_Status = 3 + FunctionStatus_FAILED FunctionStatus_Status = 4 +) + +// Enum value maps for FunctionStatus_Status. +var ( + FunctionStatus_Status_name = map[int32]string{ + 0: "CREATING", + 1: "RUNNING", + 2: "DELETING", + 3: "DELETED", + 4: "FAILED", + } + FunctionStatus_Status_value = map[string]int32{ + "CREATING": 0, + "RUNNING": 1, + "DELETING": 2, + "DELETED": 3, + "FAILED": 4, + } +) + +func (x FunctionStatus_Status) Enum() *FunctionStatus_Status { + p := new(FunctionStatus_Status) + *p = x + return p +} + +func (x FunctionStatus_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FunctionStatus_Status) Descriptor() protoreflect.EnumDescriptor { + return file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes[1].Descriptor() +} + +func (FunctionStatus_Status) Type() protoreflect.EnumType { + return &file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes[1] +} + +func (x FunctionStatus_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FunctionStatus_Status.Descriptor instead. +func (FunctionStatus_Status) EnumDescriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{3, 0} +} + +// The request message for the Process method. +type Event struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *Event) Reset() { + *x = Event{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. +func (*Event) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{0} +} + +func (x *Event) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status Response_Status `protobuf:"varint,1,opt,name=status,proto3,enum=fs_func.Response_Status" json:"status,omitempty"` + Message *string `protobuf:"bytes,2,opt,name=message,proto3,oneof" json:"message,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{1} +} + +func (x *Response) GetStatus() Response_Status { + if x != nil { + return x.Status + } + return Response_OK +} + +func (x *Response) GetMessage() string { + if x != nil && x.Message != nil { + return *x.Message + } + return "" +} + +// The request message for the SetState method. +type SetStateRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *SetStateRequest) Reset() { + *x = SetStateRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SetStateRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SetStateRequest) ProtoMessage() {} + +func (x *SetStateRequest) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SetStateRequest.ProtoReflect.Descriptor instead. +func (*SetStateRequest) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{2} +} + +func (x *SetStateRequest) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *SetStateRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +type FunctionStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Status FunctionStatus_Status `protobuf:"varint,2,opt,name=status,proto3,enum=fs_func.FunctionStatus_Status" json:"status,omitempty"` + Details *string `protobuf:"bytes,3,opt,name=details,proto3,oneof" json:"details,omitempty"` +} + +func (x *FunctionStatus) Reset() { + *x = FunctionStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FunctionStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FunctionStatus) ProtoMessage() {} + +func (x *FunctionStatus) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FunctionStatus.ProtoReflect.Descriptor instead. +func (*FunctionStatus) Descriptor() ([]byte, []int) { + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP(), []int{3} +} + +func (x *FunctionStatus) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *FunctionStatus) GetStatus() FunctionStatus_Status { + if x != nil { + return x.Status + } + return FunctionStatus_CREATING +} + +func (x *FunctionStatus) GetDetails() string { + if x != nil && x.Details != nil { + return *x.Details + } + return "" +} + +var File_fs_runtime_grpc_proto_grpc_func_proto protoreflect.FileDescriptor + +var file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc = []byte{ + 0x0a, 0x25, 0x66, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x70, + 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x66, 0x75, 0x6e, + 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, + 0x22, 0x21, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x22, 0x84, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x30, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, + 0x01, 0x22, 0x1b, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, + 0x4b, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x01, 0x42, 0x0a, + 0x0a, 0x08, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x39, 0x0a, 0x0f, 0x53, 0x65, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xd3, 0x01, 0x0a, 0x0e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x06, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x66, + 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, + 0x88, 0x01, 0x01, 0x22, 0x4a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, + 0x08, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x52, + 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x4c, 0x45, + 0x54, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, + 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x42, + 0x0a, 0x0a, 0x08, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x32, 0x52, 0x0a, 0x0b, 0x46, + 0x53, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x43, 0x0a, 0x09, 0x52, 0x65, + 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x12, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, + 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x1a, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x46, 0x75, 0x6e, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x32, + 0x76, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2f, 0x0a, 0x07, 0x50, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, + 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x0e, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, + 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x08, + 0x53, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, + 0x6e, 0x63, 0x2e, 0x53, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x66, 0x73, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x19, 0x5a, 0x17, 0x66, 0x73, 0x2f, 0x66, 0x75, + 0x6e, 0x63, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x66, 0x75, 0x6e, 0x63, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_fs_runtime_grpc_proto_grpc_func_proto_rawDescOnce sync.Once + file_fs_runtime_grpc_proto_grpc_func_proto_rawDescData = file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc +) + +func file_fs_runtime_grpc_proto_grpc_func_proto_rawDescGZIP() []byte { + file_fs_runtime_grpc_proto_grpc_func_proto_rawDescOnce.Do(func() { + file_fs_runtime_grpc_proto_grpc_func_proto_rawDescData = protoimpl.X.CompressGZIP(file_fs_runtime_grpc_proto_grpc_func_proto_rawDescData) + }) + return file_fs_runtime_grpc_proto_grpc_func_proto_rawDescData +} + +var file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_fs_runtime_grpc_proto_grpc_func_proto_goTypes = []interface{}{ + (Response_Status)(0), // 0: fs_func.Response.Status + (FunctionStatus_Status)(0), // 1: fs_func.FunctionStatus.Status + (*Event)(nil), // 2: fs_func.Event + (*Response)(nil), // 3: fs_func.Response + (*SetStateRequest)(nil), // 4: fs_func.SetStateRequest + (*FunctionStatus)(nil), // 5: fs_func.FunctionStatus +} +var file_fs_runtime_grpc_proto_grpc_func_proto_depIdxs = []int32{ + 0, // 0: fs_func.Response.status:type_name -> fs_func.Response.Status + 1, // 1: fs_func.FunctionStatus.status:type_name -> fs_func.FunctionStatus.Status + 5, // 2: fs_func.FSReconcile.Reconcile:input_type -> fs_func.FunctionStatus + 2, // 3: fs_func.Function.Process:input_type -> fs_func.Event + 4, // 4: fs_func.Function.SetState:input_type -> fs_func.SetStateRequest + 5, // 5: fs_func.FSReconcile.Reconcile:output_type -> fs_func.FunctionStatus + 2, // 6: fs_func.Function.Process:output_type -> fs_func.Event + 3, // 7: fs_func.Function.SetState:output_type -> fs_func.Response + 5, // [5:8] is the sub-list for method output_type + 2, // [2:5] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_fs_runtime_grpc_proto_grpc_func_proto_init() } +func file_fs_runtime_grpc_proto_grpc_func_proto_init() { + if File_fs_runtime_grpc_proto_grpc_func_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SetStateRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FunctionStatus); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[1].OneofWrappers = []interface{}{} + file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes[3].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc, + NumEnums: 2, + NumMessages: 4, + NumExtensions: 0, + NumServices: 2, + }, + GoTypes: file_fs_runtime_grpc_proto_grpc_func_proto_goTypes, + DependencyIndexes: file_fs_runtime_grpc_proto_grpc_func_proto_depIdxs, + EnumInfos: file_fs_runtime_grpc_proto_grpc_func_proto_enumTypes, + MessageInfos: file_fs_runtime_grpc_proto_grpc_func_proto_msgTypes, + }.Build() + File_fs_runtime_grpc_proto_grpc_func_proto = out.File + file_fs_runtime_grpc_proto_grpc_func_proto_rawDesc = nil + file_fs_runtime_grpc_proto_grpc_func_proto_goTypes = nil + file_fs_runtime_grpc_proto_grpc_func_proto_depIdxs = nil +} diff --git a/fs/runtime/grpc/proto/grpc_func.proto b/fs/runtime/grpc/proto/grpc_func.proto new file mode 100644 index 00000000..ef6ea379 --- /dev/null +++ b/fs/runtime/grpc/proto/grpc_func.proto @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; +option go_package = "fs/func/grpc_func/proto"; +package fs_func; + +// The request message for the Process method. +message Event { + string payload = 1; +} + +message Response { + enum Status { + OK = 0; + ERROR = 1; + } + Status status = 1; + optional string message = 2; +} + +// The request message for the SetState method. +message SetStateRequest { + string key = 1; + string value = 2; +} + +message FunctionStatus { + string name = 1; + enum Status { + CREATING = 0; + RUNNING = 1; + DELETING = 2; + DELETED = 3; + FAILED = 4; + } + Status status = 2; + optional string details = 3; +} + +service FSReconcile { + rpc Reconcile(stream FunctionStatus) returns (stream FunctionStatus) {} +} + +service Function { + rpc Process(stream Event) returns (stream Event) {} + rpc SetState(SetStateRequest) returns (Response) {} +} diff --git a/fs/runtime/grpc/proto/grpc_func_grpc.pb.go b/fs/runtime/grpc/proto/grpc_func_grpc.pb.go new file mode 100644 index 00000000..b6fcc726 --- /dev/null +++ b/fs/runtime/grpc/proto/grpc_func_grpc.pb.go @@ -0,0 +1,288 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// FSReconcileClient is the client API for FSReconcile service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type FSReconcileClient interface { + Reconcile(ctx context.Context, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) +} + +type fSReconcileClient struct { + cc grpc.ClientConnInterface +} + +func NewFSReconcileClient(cc grpc.ClientConnInterface) FSReconcileClient { + return &fSReconcileClient{cc} +} + +func (c *fSReconcileClient) Reconcile(ctx context.Context, opts ...grpc.CallOption) (FSReconcile_ReconcileClient, error) { + stream, err := c.cc.NewStream(ctx, &FSReconcile_ServiceDesc.Streams[0], "/fs_func.FSReconcile/Reconcile", opts...) + if err != nil { + return nil, err + } + x := &fSReconcileReconcileClient{stream} + return x, nil +} + +type FSReconcile_ReconcileClient interface { + Send(*FunctionStatus) error + Recv() (*FunctionStatus, error) + grpc.ClientStream +} + +type fSReconcileReconcileClient struct { + grpc.ClientStream +} + +func (x *fSReconcileReconcileClient) Send(m *FunctionStatus) error { + return x.ClientStream.SendMsg(m) +} + +func (x *fSReconcileReconcileClient) Recv() (*FunctionStatus, error) { + m := new(FunctionStatus) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// FSReconcileServer is the server API for FSReconcile service. +// All implementations must embed UnimplementedFSReconcileServer +// for forward compatibility +type FSReconcileServer interface { + Reconcile(FSReconcile_ReconcileServer) error + mustEmbedUnimplementedFSReconcileServer() +} + +// UnimplementedFSReconcileServer must be embedded to have forward compatible implementations. +type UnimplementedFSReconcileServer struct { +} + +func (UnimplementedFSReconcileServer) Reconcile(FSReconcile_ReconcileServer) error { + return status.Errorf(codes.Unimplemented, "method Reconcile not implemented") +} +func (UnimplementedFSReconcileServer) mustEmbedUnimplementedFSReconcileServer() {} + +// UnsafeFSReconcileServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to FSReconcileServer will +// result in compilation errors. +type UnsafeFSReconcileServer interface { + mustEmbedUnimplementedFSReconcileServer() +} + +func RegisterFSReconcileServer(s grpc.ServiceRegistrar, srv FSReconcileServer) { + s.RegisterService(&FSReconcile_ServiceDesc, srv) +} + +func _FSReconcile_Reconcile_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(FSReconcileServer).Reconcile(&fSReconcileReconcileServer{stream}) +} + +type FSReconcile_ReconcileServer interface { + Send(*FunctionStatus) error + Recv() (*FunctionStatus, error) + grpc.ServerStream +} + +type fSReconcileReconcileServer struct { + grpc.ServerStream +} + +func (x *fSReconcileReconcileServer) Send(m *FunctionStatus) error { + return x.ServerStream.SendMsg(m) +} + +func (x *fSReconcileReconcileServer) Recv() (*FunctionStatus, error) { + m := new(FunctionStatus) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// FSReconcile_ServiceDesc is the grpc.ServiceDesc for FSReconcile service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var FSReconcile_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "fs_func.FSReconcile", + HandlerType: (*FSReconcileServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Reconcile", + Handler: _FSReconcile_Reconcile_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "fs/runtime/grpc/proto/grpc_func.proto", +} + +// FunctionClient is the client API for Function service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type FunctionClient interface { + Process(ctx context.Context, opts ...grpc.CallOption) (Function_ProcessClient, error) + SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*Response, error) +} + +type functionClient struct { + cc grpc.ClientConnInterface +} + +func NewFunctionClient(cc grpc.ClientConnInterface) FunctionClient { + return &functionClient{cc} +} + +func (c *functionClient) Process(ctx context.Context, opts ...grpc.CallOption) (Function_ProcessClient, error) { + stream, err := c.cc.NewStream(ctx, &Function_ServiceDesc.Streams[0], "/fs_func.Function/Process", opts...) + if err != nil { + return nil, err + } + x := &functionProcessClient{stream} + return x, nil +} + +type Function_ProcessClient interface { + Send(*Event) error + Recv() (*Event, error) + grpc.ClientStream +} + +type functionProcessClient struct { + grpc.ClientStream +} + +func (x *functionProcessClient) Send(m *Event) error { + return x.ClientStream.SendMsg(m) +} + +func (x *functionProcessClient) Recv() (*Event, error) { + m := new(Event) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *functionClient) SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fs_func.Function/SetState", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// FunctionServer is the server API for Function service. +// All implementations must embed UnimplementedFunctionServer +// for forward compatibility +type FunctionServer interface { + Process(Function_ProcessServer) error + SetState(context.Context, *SetStateRequest) (*Response, error) + mustEmbedUnimplementedFunctionServer() +} + +// UnimplementedFunctionServer must be embedded to have forward compatible implementations. +type UnimplementedFunctionServer struct { +} + +func (UnimplementedFunctionServer) Process(Function_ProcessServer) error { + return status.Errorf(codes.Unimplemented, "method Process not implemented") +} +func (UnimplementedFunctionServer) SetState(context.Context, *SetStateRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method SetState not implemented") +} +func (UnimplementedFunctionServer) mustEmbedUnimplementedFunctionServer() {} + +// UnsafeFunctionServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to FunctionServer will +// result in compilation errors. +type UnsafeFunctionServer interface { + mustEmbedUnimplementedFunctionServer() +} + +func RegisterFunctionServer(s grpc.ServiceRegistrar, srv FunctionServer) { + s.RegisterService(&Function_ServiceDesc, srv) +} + +func _Function_Process_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(FunctionServer).Process(&functionProcessServer{stream}) +} + +type Function_ProcessServer interface { + Send(*Event) error + Recv() (*Event, error) + grpc.ServerStream +} + +type functionProcessServer struct { + grpc.ServerStream +} + +func (x *functionProcessServer) Send(m *Event) error { + return x.ServerStream.SendMsg(m) +} + +func (x *functionProcessServer) Recv() (*Event, error) { + m := new(Event) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Function_SetState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SetStateRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FunctionServer).SetState(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fs_func.Function/SetState", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FunctionServer).SetState(ctx, req.(*SetStateRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Function_ServiceDesc is the grpc.ServiceDesc for Function service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Function_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "fs_func.Function", + HandlerType: (*FunctionServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SetState", + Handler: _Function_SetState_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Process", + Handler: _Function_Process_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "fs/runtime/grpc/proto/grpc_func.proto", +} diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go new file mode 100644 index 00000000..6e5df931 --- /dev/null +++ b/fs/runtime/wazero/wazero_runtime.go @@ -0,0 +1,110 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package wazero + +import ( + "github.com/functionstream/functionstream/common" + "github.com/functionstream/functionstream/fs/api" + "github.com/functionstream/functionstream/fs/contube" + "github.com/pkg/errors" + "github.com/tetratelabs/wazero" + wazero_api "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/tetratelabs/wazero/sys" + "golang.org/x/net/context" + "log/slog" + "os" + "strconv" +) + +type WazeroFunctionRuntimeFactory struct { +} + +func NewWazeroFunctionRuntimeFactory() api.FunctionRuntimeFactory { + return &WazeroFunctionRuntimeFactory{} +} + +func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { + r := wazero.NewRuntime(instance.Context()) + _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, m wazero_api.Module, a, b, c, d uint32) { + panic("abort") + }).Export("abort").Instantiate(instance.Context()) + if err != nil { + return nil, errors.Wrap(err, "Error instantiating function module") + } + stdin := common.NewChanReader() + stdout := common.NewChanWriter() + + config := wazero.NewModuleConfig(). + WithStdout(stdout).WithStdin(stdin).WithStderr(os.Stderr) + + wasi_snapshot_preview1.MustInstantiate(instance.Context(), r) + + wasmBytes, err := os.ReadFile(instance.Definition().Archive) + if err != nil { + return nil, errors.Wrap(err, "Error reading wasm file") + } + // Trigger the "_start" function, WASI's "main". + mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) + if err != nil { + if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 { + return nil, errors.Wrap(err, "Error instantiating function, function exit with code"+strconv.Itoa(int(exitErr.ExitCode()))) + } else if !ok { + return nil, errors.Wrap(err, "Error instantiating function") + } + } + process := mod.ExportedFunction("process") + if process == nil { + return nil, errors.New("No process function found") + } + return &WazeroFunctionRuntime{ + callFunc: func(e contube.Record) (contube.Record, error) { + stdin.ResetBuffer(e.GetPayload()) + _, err := process.Call(instance.Context()) + if err != nil { + return nil, errors.Wrap(err, "Error calling wasm function") + } + output := stdout.GetAndReset() + return contube.NewRecordImpl(output, e.Commit), nil + }, + stopFunc: func() { + err := r.Close(instance.Context()) + if err != nil { + slog.ErrorContext(instance.Context(), "Error closing r", err) + } + }, + }, nil +} + +type WazeroFunctionRuntime struct { + callFunc func(e contube.Record) (contube.Record, error) + stopFunc func() +} + +func (r *WazeroFunctionRuntime) WaitForReady() <-chan error { + c := make(chan error) + close(c) + return c +} + +func (r *WazeroFunctionRuntime) Call(e contube.Record) (contube.Record, error) { + return r.callFunc(e) +} + +func (r *WazeroFunctionRuntime) Stop() { + r.stopFunc() +} diff --git a/go.mod b/go.mod index 9cdd1251..4438cc55 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/spf13/cobra v1.8.0 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 + google.golang.org/grpc v1.61.1 + google.golang.org/protobuf v1.32.0 ) require ( @@ -52,15 +54,15 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.18.0 // indirect + golang.org/x/crypto v0.19.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.20.0 // indirect + golang.org/x/net v0.21.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/sys v0.16.0 // indirect - golang.org/x/term v0.16.0 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/term v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index e88ac935..d4ec8faf 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -158,8 +158,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -174,12 +174,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= -golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -199,6 +199,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk= +google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= +google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= diff --git a/license-checker/license-checker.sh b/license-checker/license-checker.sh index 12bc1f63..32e87aca 100755 --- a/license-checker/license-checker.sh +++ b/license-checker/license-checker.sh @@ -36,7 +36,8 @@ if [ ! -f "$LICENSE_CHECKER" ]; then export BINDIR=bin && curl -s https://raw.githubusercontent.com/lluissm/license-header-checker/master/install.sh | bash fi -$LICENSE_CHECKER -a -r -i bin,restclient,common/run.go,common/signal.go ./license-checker/license-header.txt . go +$LICENSE_CHECKER -a -r -i bin,restclient,common/run.go,common/signal.go,fs/runtime/grpc/proto ./license-checker/license-header.txt . go +$LICENSE_CHECKER -a -r ./license-checker/license-header.txt . proto $LICENSE_CHECKER -a -r -i bin,restclient ./license-checker/license-header-sh.txt . sh yaml yml $LICENSE_CHECKER -a -r -i bin,restclient ./license-checker/license-header-md.txt . md diff --git a/perf/perf.go b/perf/perf.go index 2ea94aa8..35f16561 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -22,7 +22,6 @@ import ( "fmt" "github.com/bmizerany/perks/quantile" "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/fs" "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/restclient" "golang.org/x/time/rate" @@ -34,11 +33,13 @@ import ( "time" ) +type TubeBuilder func(ctx context.Context, config *common.Config) (contube.TubeFactory, error) + type Config struct { PulsarURL string RequestRate float64 Func *restclient.Function - QueueBuilder fs.TubeBuilder + QueueBuilder TubeBuilder } type Perf interface { @@ -49,7 +50,7 @@ type perf struct { config *Config input chan<- contube.Record output <-chan contube.Record - tubeBuilder fs.TubeBuilder + tubeBuilder TubeBuilder } func New(config *Config) Perf { @@ -57,7 +58,7 @@ func New(config *Config) Perf { config: config, } if config.QueueBuilder == nil { - p.tubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { + p.tubeBuilder = func(ctx context.Context, c *common.Config) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: config.PulsarURL, }).ToConfigMap()) @@ -92,7 +93,7 @@ func (p *perf) Run(ctx context.Context) { } } - config := &fs.Config{ + config := &common.Config{ PulsarURL: p.config.PulsarURL, } diff --git a/server/config_loader.go b/server/config_loader.go index 6b384b56..9f72a99d 100644 --- a/server/config_loader.go +++ b/server/config_loader.go @@ -17,45 +17,31 @@ package server import ( - "context" "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/fs/contube" "log/slog" "os" "sync" - - "github.com/functionstream/functionstream/fs" ) -var loadedConfig *fs.Config +var loadedConfig *common.Config var initConfig = sync.Once{} -func LoadConfigFromEnv() *fs.Config { +func LoadConfigFromEnv() *common.Config { initConfig.Do(func() { - loadedConfig = &fs.Config{ + loadedConfig = &common.Config{ ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL), - } - tubeType := getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType) - switch tubeType { - case common.PulsarTubeType: - loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { - return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ - PulsarURL: c.PulsarURL, - }).ToConfigMap()) - } + TubeType: getEnvWithDefault("TUBE_TYPE", common.DefaultTubeType), } }) return loadedConfig } -func LoadStandaloneConfigFromEnv() *fs.Config { +func LoadStandaloneConfigFromEnv() *common.Config { initConfig.Do(func() { - loadedConfig = &fs.Config{ + loadedConfig = &common.Config{ ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), - } - loadedConfig.TubeBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { - return contube.NewMemoryQueueFactory(ctx), nil + TubeType: getEnvWithDefault("TUBE_TYPE", common.MemoryTubeType), } }) return loadedConfig diff --git a/server/server.go b/server/server.go index 647448bf..4ecdf0a4 100644 --- a/server/server.go +++ b/server/server.go @@ -36,20 +36,76 @@ import ( ) type Server struct { - manager *fs.FunctionManager - config *fs.Config + options *serverOptions + config *common.Config httpSvr atomic.Pointer[http.Server] } -func New(config *fs.Config) *Server { - manager, err := fs.NewFunctionManager(config) +type serverOptions struct { + manager *fs.FunctionManager +} + +type ServerOption interface { + apply(option *serverOptions) (*serverOptions, error) +} + +type serverOptionFunc func(*serverOptions) (*serverOptions, error) + +func (f serverOptionFunc) apply(c *serverOptions) (*serverOptions, error) { + return f(c) +} + +func WithFunctionManager(manager *fs.FunctionManager) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.manager = manager + return o, nil + }) +} + +func newDefaultFunctionManager(config *common.Config) (*fs.FunctionManager, error) { + var tubeFactory contube.TubeFactory + var err error + switch config.TubeType { + case common.PulsarTubeType: + tubeFactory, err = contube.NewPulsarEventQueueFactory(context.Background(), (&contube.PulsarTubeFactoryConfig{ + PulsarURL: config.PulsarURL, + }).ToConfigMap()) + if err != nil { + return nil, errors.Wrap(err, "failed to create default pulsar tube factory") + } + case common.MemoryTubeType: + tubeFactory = contube.NewMemoryQueueFactory(context.Background()) + } + + manager, err := fs.NewFunctionManager( + fs.WithDefaultTubeFactory(tubeFactory), + ) if err != nil { - slog.Error("Error creating function manager", err) + return nil, errors.Wrap(err, "failed to create default function manager") + } + return manager, nil +} + +func NewServer(config *common.Config, opts ...ServerOption) (*Server, error) { + options := &serverOptions{} + for _, o := range opts { + _, err := o.apply(options) + if err != nil { + return nil, err + } + } + if options.manager == nil { + manager, err := newDefaultFunctionManager(config) + if err != nil { + return nil, err + } + options.manager = manager } + return &Server{ - manager: manager, + options: options, config: config, - } + }, nil } func (s *Server) Run(context context.Context) { @@ -104,7 +160,7 @@ func (s *Server) startRESTHandlers() error { } slog.Info("Starting function", slog.Any("name", functionName)) - err = s.manager.StartFunction(f) + err = s.options.manager.StartFunction(f) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -117,7 +173,7 @@ func (s *Server) startRESTHandlers() error { functionName := vars["function_name"] slog.Info("Deleting function", slog.Any("name", functionName)) - err := s.manager.DeleteFunction(functionName) + err := s.options.manager.DeleteFunction(functionName) if errors.Is(err, common.ErrorFunctionNotFound) { http.Error(w, err.Error(), http.StatusNotFound) return @@ -126,7 +182,7 @@ func (s *Server) startRESTHandlers() error { }).Methods("DELETE") r.HandleFunc("/api/v1/functions", func(w http.ResponseWriter, r *http.Request) { - functions := s.manager.ListFunctions() + functions := s.options.manager.ListFunctions() w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(functions) if err != nil { @@ -145,7 +201,7 @@ func (s *Server) startRESTHandlers() error { http.Error(w, errors.Wrap(err, "Failed to read body").Error(), http.StatusBadRequest) return } - err = s.manager.ProduceEvent(queueName, contube.NewRecordImpl(content, func() {})) + err = s.options.manager.ProduceEvent(queueName, contube.NewRecordImpl(content, func() {})) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -156,7 +212,7 @@ func (s *Server) startRESTHandlers() error { vars := mux.Vars(r) queueName := vars["queue_name"] slog.Info("Consuming event from queue", slog.Any("queue_name", queueName)) - event, err := s.manager.ConsumeEvent(queueName) + event, err := s.options.manager.ConsumeEvent(queueName) if err != nil { slog.Error("Error when consuming event", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server/server_test.go b/server/server_test.go index c431f27f..dcfabbb3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,8 +19,8 @@ package server import ( "context" "encoding/json" + "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/fs" "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/tests" "math/rand" @@ -30,13 +30,14 @@ import ( func TestStandaloneBasicFunction(t *testing.T) { - conf := &fs.Config{ + conf := &common.Config{ ListenAddr: "localhost:7301", - TubeBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { - return contube.NewMemoryQueueFactory(ctx), nil - }, + TubeType: common.MemoryTubeType, + } + s, err := NewServer(conf) + if err != nil { + t.Fatal(err) } - s := New(conf) svrCtx, svrCancel := context.WithCancel(context.Background()) go s.Run(svrCtx) defer func() { @@ -53,7 +54,7 @@ func TestStandaloneBasicFunction(t *testing.T) { Name: "test-func", Replicas: 1, } - err := s.manager.StartFunction(funcConf) + err = s.options.manager.StartFunction(funcConf) if err != nil { t.Fatal(err) } @@ -66,23 +67,17 @@ func TestStandaloneBasicFunction(t *testing.T) { if err != nil { t.Fatal(err) } - err = s.manager.ProduceEvent(inputTopic, contube.NewRecordImpl(jsonBytes, func() { + err = s.options.manager.ProduceEvent(inputTopic, contube.NewRecordImpl(jsonBytes, func() { })) if err != nil { t.Fatal(err) } - output := make(chan contube.Record) - go func() { - event, err := s.manager.ConsumeEvent(outputTopic) - if err != nil { - t.Error(err) - return - } - output <- event - }() - - event := <-output + event, err := s.options.manager.ConsumeEvent(outputTopic) + if err != nil { + t.Error(err) + return + } var out tests.Person err = json.Unmarshal(event.GetPayload(), &out) if err != nil {