diff --git a/Makefile b/Makefile index ee079a40..2d01e990 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -.PHONY: build .PHONY: license build: go build -v -o bin/function-stream ./cmd @@ -24,7 +23,7 @@ lint: build_all: build build_example test: - go test ./... -timeout 10m + go test -race ./... -timeout 10m bench: export FS_TEST_WORK_DIR="$(shell pwd)" && go test -bench=. ./benchmark -timeout 10m diff --git a/README.md b/README.md index 04098784..a2cbd69f 100644 --- a/README.md +++ b/README.md @@ -12,4 +12,146 @@ limitations under the License. --> -# function-stream \ No newline at end of file +# Function Stream + +Function stream is an event-streaming function platform based on Apache Pulsar and WebAssembly. It enables efficient and +scalable processing of data streams by leveraging the power of WebAssembly. Function Stream provides seamless +integration with Apache Pulsar, allowing users to take full advantage of its robust messaging capabilities. + +## Features + +1. **Support for Multiple Programming Languages**: Function Stream aims to provide the capability to write code using + multiple programming languages. This allows developers to use their preferred language and harness its specific + strengths while working with Function Stream. +2. **High Performance and Throughput**: Function Stream is designed to deliver high performance and handle substantial + throughput. It strives to optimize resource utilization and minimize latency, enabling efficient execution of code + and processing of data. +3. **Isolated Environment**: Function Stream offers an isolated environment for executing code. This ensures that each + function runs independently, without interference from other functions or external factors. The isolation enhances + the security, reliability, and predictability of code execution. +4. **Scalability and Fault Tolerance**: Function Stream focuses on scalability by offering the ability to effortlessly + scale up or down based on workload demands. Additionally, it emphasizes fault tolerance, ensuring that system + failures or errors do not disrupt the overall functioning of the platform. +4. **Support for Complex Data Schema**: Function Stream acknowledges the need to handle diverse data types and formats. + It provides support for complex data schema, including bytes data and JSON format data, among others. This + versatility enables developers to process and manipulate data efficiently within the platform. +6. **Stateful/Stateless Computing**: Function Stream caters to both stateful and stateless computing requirements. It + accommodates scenarios where functions require maintaining state between invocations as well as situations where a + stateless approach is more suitable. This flexibility allows developers to implement the desired architectural + patterns. +7. **Cross-Architecture Platform Execution**: Function Stream aims to be a cross-architecture platform capable of + executing code across different hardware architectures seamlessly. It provides compatibility and portability, + allowing developers to run their code on various platforms without concerns about underlying hardware dependencies. + +### Architecture and Components + +Function Stream is composed of three main components: the WebAssembly runtime engine, the Pulsar client, and the +Function Stream service. The following figure shows the overview of the Function Stream architecture. +![Architecture](docs/images/arch.png) + +The **WebAssembly runtime engine** is responsible for executing the WebAssembly modules that implement the stream +processing logic. The runtime engine supports an interface for the underlying wasm runtime library. We use [wazero +](https://github.com/tetratelabs/wazero) as the +WebAssembly runtime library, as they are both fast and lightweight. The WebAssembly runtime +engine communicates with the Pulsar client through standard IO and file systems. + +**The Pulsar client** is responsible for consuming and publishing the messages from and to the Apache Pulsar cluster. We +use [Pulsar Go client](https://github.com/apache/pulsar-client-go), which is a pure go implementation of the pulsar +client library, to interact with the Pulsar brokers. The Pulsar client handles the data schema, the message metadata, +and the processing guarantees of the messages. + +**The Function Stream service** is responsible for managing the lifecycle and coordination of the WebAssembly instances. + +## Building Instructions + +To compile Function Stream, use this command: + +```shell +make build_all +``` + +This creates the function-stream binary program and example wasm files in the `bin` directory, +like `bin/example_basic.wasm`. + +## Running Instructions + +You have two ways to start the function stream server. + +### Option 1: Standalone Mode (for development and testing) + +Use this command to start the standalone server: + +```shell +bin/function-stream standalone +``` + +### Option 2: Server Mode (for production) + +First, start an Apache Pulsar service. See this [doc](https://pulsar.apache.org/docs/en/standalone/) for instructions. + +Then, use this command to start the server based on Apache Pulsar: + +```shell +bin/function-stream server +``` + +### Creating a Function + +We'll use `example_basic.wasm` as an example wasm file. This function increases the money by 1. See the +code [here](examples/basic/main.go). + +After starting the server, create a function with this command: + +```shell +bin/function-stream client create -n example -a "bin/example_basic.wasm" -i example-input -o example-output -r 1 +``` + +This creates a function named `example` using `example_basic.wasm`. It takes messages from `example-input`, produces +messages to `example-output`, and runs with 1 replica. + +### Consuming a Message from the Function Output + +After creating the function, consume a message from the output topic with this command: + +```shell +bin/function-stream client consume -n example-output +``` + +### Producing a Message to the Function Input + +In a new terminal, produce a message to the input topic with this command: + +```shell +bin/function-stream client produce -n example-input -c '{"name":"rbt","money":2}' +``` + +You'll see this log: + +``` +Event produced +``` + +### Checking the Output + +In the terminal where you consume the message from the output topic, you'll see this log: + +``` +"{\"name\":\"rbt\",\"money\":3,\"expected\":0}" +``` + +### Deleting the Function + +After testing, delete the function with this command: + +```shell +bin/function-stream client delete -n example +``` + +## Contributing + +We're happy to receive contributions from the community. If you find a bug or have a feature request, please open an +issue or submit a pull request. + +## License + +This project is licensed under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). \ No newline at end of file diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 3beb1711..ae3bdf4a 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -23,7 +23,6 @@ import ( "github.com/functionstream/functionstream/perf" "github.com/functionstream/functionstream/restclient" "github.com/functionstream/functionstream/server" - "io" "log/slog" "math/rand" "os" @@ -48,12 +47,13 @@ func prepareEnv() { func BenchmarkStressForBasicFunc(b *testing.B) { prepareEnv() - s := server.New() - go func() { - common.RunProcess(func() (io.Closer, error) { - go s.Run() - return s, nil - }) + s := server.New(server.LoadConfigFromEnv()) + go s.Run() + defer func() { + err := s.Close() + if err != nil { + b.Fatal(err) + } }() inputTopic := "test-input-" + strconv.Itoa(rand.Int()) @@ -91,7 +91,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) { b.ReportAllocs() - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) defer cancel() profile := "BenchmarkStressForBasicFunc.pprof" @@ -111,40 +111,33 @@ func BenchmarkStressForBasicFunc(b *testing.B) { perf.New(pConfig).Run(ctx) pprof.StopCPUProfile() - - err = s.Close() - if err != nil { - b.Fatal(err) - } } func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { prepareEnv() - memoryQueueFactory := lib.NewMemoryQueueFactory() + memoryQueueFactory := lib.NewMemoryQueueFactory(context.Background()) svrConf := &lib.Config{ + ListenAddr: common.DefaultAddr, QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) { return memoryQueueFactory, nil }, } - fm, err := lib.NewFunctionManager(svrConf) - if err != nil { - b.Fatal(err) - } - s := server.NewWithFM(fm) - go func() { - common.RunProcess(func() (io.Closer, error) { - go s.Run() - return s, nil - }) + s := server.New(svrConf) + go s.Run() + defer func() { + err := s.Close() + if err != nil { + b.Fatal(err) + } }() inputTopic := "test-input-" + strconv.Itoa(rand.Int()) outputTopic := "test-output-" + strconv.Itoa(rand.Int()) - replicas := int32(15) + replicas := int32(1) pConfig := &perf.Config{ RequestRate: 200000.0, @@ -161,7 +154,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { b.ReportAllocs() - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) defer cancel() profile := "BenchmarkStressForBasicFunc.pprof" diff --git a/cmd/client/cmd.go b/cmd/client/cmd.go new file mode 100644 index 00000000..1c993082 --- /dev/null +++ b/cmd/client/cmd.go @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + c "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/cmd/client/consume" + "github.com/functionstream/functionstream/cmd/client/create" + del "github.com/functionstream/functionstream/cmd/client/delete" + "github.com/functionstream/functionstream/cmd/client/list" + "github.com/functionstream/functionstream/cmd/client/produce" + "github.com/spf13/cobra" +) + +var ( + Cmd = &cobra.Command{ + Use: "client", + Short: "Function Stream Client Tool", + Long: `Operations to manage functions in a function stream server`, + } +) + +func init() { + Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s", "http://localhost:7300", "Service address") + + Cmd.AddCommand(create.Cmd) + Cmd.AddCommand(list.Cmd) + Cmd.AddCommand(del.Cmd) + Cmd.AddCommand(produce.Cmd) + Cmd.AddCommand(consume.Cmd) +} diff --git a/cmd/client/common/config.go b/cmd/client/common/config.go new file mode 100644 index 00000000..33be36a9 --- /dev/null +++ b/cmd/client/common/config.go @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +type ClientConfig struct { + ServiceAddr string +} + +var Config ClientConfig diff --git a/cmd/client/consume/cmd.go b/cmd/client/consume/cmd.go new file mode 100644 index 00000000..3cdfee59 --- /dev/null +++ b/cmd/client/consume/cmd.go @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consume + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/restclient" + "github.com/spf13/cobra" + "os" +) + +var ( + config = flags{} +) + +type flags struct { + name string +} + +var Cmd = &cobra.Command{ + Use: "consume", + Short: "Consume an event", + Long: `Consume an event from a queue`, + Args: cobra.NoArgs, + Run: exec, +} + +func init() { + Cmd.Flags().StringVarP(&config.name, "name", "n", "", "The name of the queue") + Cmd.MarkFlagsRequiredTogether("name") +} + +func exec(_ *cobra.Command, _ []string) { + cfg := restclient.NewConfiguration() + cfg.Servers = []restclient.ServerConfiguration{{ + URL: common.Config.ServiceAddr, + }} + cli := restclient.NewAPIClient(cfg) + + e, res, err := cli.DefaultAPI.ApiV1ConsumeQueueNameGet(context.Background(), config.name).Execute() + if err != nil { + fmt.Printf("Failed to consume event: %v\n", err) + os.Exit(1) + } + if res.StatusCode != 200 { + fmt.Printf("Failed to consume event: %v\n", res.Status) + os.Exit(1) + } + fmt.Printf("%s\n", e) +} diff --git a/cmd/client/create/cmd.go b/cmd/client/create/cmd.go new file mode 100644 index 00000000..4b4d9cf7 --- /dev/null +++ b/cmd/client/create/cmd.go @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package create + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/restclient" + "github.com/spf13/cobra" + "io" + "os" +) + +var ( + config = flags{} +) + +type flags struct { + name string + archive string + inputs []string + output string + replica int32 +} + +var Cmd = &cobra.Command{ + Use: "create", + Short: "Create Function", + Long: `Create a function on the function stream server`, + Args: cobra.NoArgs, + Run: exec, +} + +func init() { + Cmd.Flags().StringVarP(&config.name, "name", "n", "", "The name of the function") + Cmd.Flags().StringVarP(&config.archive, "archive", "a", "", "The archive path of the function") + Cmd.Flags().StringSliceVarP(&config.inputs, "inputs", "i", []string{}, "The inputs of the function") + Cmd.Flags().StringVarP(&config.output, "output", "o", "", "The output of the function") + Cmd.Flags().Int32VarP(&config.replica, "replica", "r", 1, "The replica of the function") + + Cmd.MarkFlagsRequiredTogether("name") +} + +func exec(_ *cobra.Command, _ []string) { + cfg := restclient.NewConfiguration() + cfg.Servers = []restclient.ServerConfiguration{{ + URL: common.Config.ServiceAddr, + }} + cli := restclient.NewAPIClient(cfg) + f := restclient.Function{ + Name: &config.name, + Archive: config.archive, + Inputs: config.inputs, + Output: config.output, + } + + res, err := cli.DefaultAPI.ApiV1FunctionFunctionNamePost(context.Background(), config.name).Function(f).Execute() + if err != nil { + body, e := io.ReadAll(res.Body) + if e != nil { + fmt.Printf("Failed to create function: %v\n", err) + os.Exit(1) + } + fmt.Printf("Failed to create function: %v, %s\n", err, string(body)) + os.Exit(1) + } + if res.StatusCode != 200 { + fmt.Printf("Failed to create function with status code: %d\n", res.StatusCode) + os.Exit(1) + } +} diff --git a/cmd/client/delete/cmd.go b/cmd/client/delete/cmd.go new file mode 100644 index 00000000..3761f4be --- /dev/null +++ b/cmd/client/delete/cmd.go @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package del + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/restclient" + "github.com/spf13/cobra" + "os" +) + +var ( + config = flags{} +) + +type flags struct { + name string +} + +var Cmd = &cobra.Command{ + Use: "delete", + Short: "Delete Function", + Long: `Delete a function on the function stream server`, + Args: cobra.NoArgs, + Run: exec, +} + +func init() { + Cmd.Flags().StringVarP(&config.name, "name", "n", "", "The name of the function") + Cmd.MarkFlagsRequiredTogether("name") +} + +func exec(_ *cobra.Command, _ []string) { + cfg := restclient.NewConfiguration() + cfg.Servers = []restclient.ServerConfiguration{{ + URL: common.Config.ServiceAddr, + }} + cli := restclient.NewAPIClient(cfg) + + res, err := cli.DefaultAPI.ApiV1FunctionFunctionNameDelete(context.Background(), config.name).Execute() + if err != nil { + fmt.Printf("Failed to delete function: %v\n", err) + os.Exit(1) + } + if res.StatusCode != 200 { + fmt.Printf("Failed to delete function with status code: %d\n", res.StatusCode) + os.Exit(1) + } +} diff --git a/cmd/client/list/cmd.go b/cmd/client/list/cmd.go new file mode 100644 index 00000000..c59c9b0a --- /dev/null +++ b/cmd/client/list/cmd.go @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package list + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/restclient" + "github.com/spf13/cobra" + "os" +) + +var Cmd = &cobra.Command{ + Use: "list", + Short: "List All Functions", + Long: `List all functions on the function stream server`, + Args: cobra.NoArgs, + Run: exec, +} + +func exec(_ *cobra.Command, _ []string) { + cfg := restclient.NewConfiguration() + cfg.Servers = []restclient.ServerConfiguration{{ + URL: common.Config.ServiceAddr, + }} + cli := restclient.NewAPIClient(cfg) + + list, res, err := cli.DefaultAPI.ApiV1FunctionsGet(context.Background()).Execute() + if err != nil { + fmt.Printf("Failed to list functions: %v\n", err) + os.Exit(1) + } + if res.StatusCode != 200 { + fmt.Printf("Failed to list functions with status code: %d\n", res.StatusCode) + os.Exit(1) + } + for _, f := range list { + fmt.Println(f) + } +} diff --git a/cmd/client/produce/cmd.go b/cmd/client/produce/cmd.go new file mode 100644 index 00000000..26c83a77 --- /dev/null +++ b/cmd/client/produce/cmd.go @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package produce + +import ( + "context" + "fmt" + "github.com/functionstream/functionstream/cmd/client/common" + "github.com/functionstream/functionstream/restclient" + "github.com/spf13/cobra" + "os" +) + +var ( + config = flags{} +) + +type flags struct { + name string + content string +} + +var Cmd = &cobra.Command{ + Use: "produce", + Short: "Produce an event", + Long: `Produce an event to a queue`, + Args: cobra.NoArgs, + Run: exec, +} + +func init() { + Cmd.Flags().StringVarP(&config.name, "name", "n", "", "The name of the queue") + Cmd.Flags().StringVarP(&config.content, "content", "c", "", "The content of the event") + Cmd.MarkFlagsRequiredTogether("name", "content") +} + +func exec(_ *cobra.Command, _ []string) { + cfg := restclient.NewConfiguration() + cfg.Servers = []restclient.ServerConfiguration{{ + URL: common.Config.ServiceAddr, + }} + cli := restclient.NewAPIClient(cfg) + + res, err := cli.DefaultAPI.ApiV1ProduceQueueNamePut(context.Background(), config.name).Body(config.content).Execute() + if err != nil { + fmt.Printf("Failed to produce event: %v\n", err) + os.Exit(1) + } + if res.StatusCode != 200 { + fmt.Printf("Failed to produce event: %v\n", res.Status) + os.Exit(1) + } + fmt.Println("Event produced") +} diff --git a/cmd/main.go b/cmd/main.go index 418f28d1..abf79e98 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,8 +16,10 @@ package main import ( "fmt" + "github.com/functionstream/functionstream/cmd/client" "github.com/functionstream/functionstream/cmd/perf" "github.com/functionstream/functionstream/cmd/server" + "github.com/functionstream/functionstream/cmd/standalone" "github.com/spf13/cobra" "os" ) @@ -32,7 +34,9 @@ var ( func init() { rootCmd.AddCommand(server.Cmd) + rootCmd.AddCommand(client.Cmd) rootCmd.AddCommand(perf.Cmd) + rootCmd.AddCommand(standalone.Cmd) } func main() { diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index f48d7692..ef165204 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -32,7 +32,7 @@ var ( func exec(*cobra.Command, []string) { common.RunProcess(func() (io.Closer, error) { - s := server.New() + s := server.New(server.LoadConfigFromEnv()) go s.Run() return s, nil }) diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go new file mode 100644 index 00000000..a8b43a09 --- /dev/null +++ b/cmd/standalone/cmd.go @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package standalone + +import ( + "github.com/functionstream/functionstream/common" + "github.com/functionstream/functionstream/server" + "github.com/spf13/cobra" + "io" +) + +var ( + Cmd = &cobra.Command{ + Use: "standalone", + Short: "Start a standalone server", + Long: `Start a standalone server`, + Run: exec, + } +) + +func exec(*cobra.Command, []string) { + common.RunProcess(func() (io.Closer, error) { + s := server.New(server.LoadStandaloneConfigFromEnv()) + go s.Run() + return s, nil + }) +} diff --git a/common/constants.go b/common/constants.go new file mode 100644 index 00000000..ce8168ed --- /dev/null +++ b/common/constants.go @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +const ( + PulsarQueueType = "pulsar" + + DefaultAddr = "localhost:7300" + DefaultPulsarURL = "pulsar://localhost:6650" + DefaultQueueType = PulsarQueueType +) diff --git a/common/signal.go b/common/signal.go index 9e51492d..93315b86 100644 --- a/common/signal.go +++ b/common/signal.go @@ -39,7 +39,7 @@ func WaitUntilSignal(closers ...io.Closer) { "Failed when shutting down server", slog.Any("error", err), ) - os.Exit(1) + code = 1 } } diff --git a/docs/images/arch.png b/docs/images/arch.png new file mode 100644 index 00000000..5e2d07d2 Binary files /dev/null and b/docs/images/arch.png differ diff --git a/lib/event_queue.go b/lib/event_queue.go index 898dc57c..23d8c43e 100644 --- a/lib/event_queue.go +++ b/lib/event_queue.go @@ -36,6 +36,9 @@ type SinkQueueConfig struct { type EventQueueFactory interface { NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) + // NewSinkChan returns a news channel + // The event.Ack() would be invoked after the event is sunk successfully + // The caller should close the channel when it is done NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) } diff --git a/lib/instance.go b/lib/instance.go index 3a915448..beb6ec62 100644 --- a/lib/instance.go +++ b/lib/instance.go @@ -90,7 +90,8 @@ func (instance *FunctionInstance) Run() { slog.InfoContext(instance.ctx, "function instance has been stopped") return } - slog.ErrorContext(ctx, message, args...) + extraArgs := append(args, slog.Any("error", err.Error())) + slog.ErrorContext(ctx, message, extraArgs...) } // Trigger the "_start" function, WASI's "main". @@ -119,6 +120,7 @@ func (instance *FunctionInstance) Run() { instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") return } + defer close(sinkChan) instance.readyCh <- nil for e := range sourceChan { diff --git a/lib/manager.go b/lib/manager.go index 52b12193..ec88fb0e 100644 --- a/lib/manager.go +++ b/lib/manager.go @@ -19,6 +19,8 @@ import ( "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" "log/slog" + "math/rand" + "strconv" "sync" ) @@ -41,9 +43,8 @@ func NewFunctionManager(config *Config) (*FunctionManager, error) { func (fm *FunctionManager) StartFunction(f *model.Function) error { fm.functionsLock.Lock() - defer fm.functionsLock.Unlock() + defer fm.functionsLock.Unlock() // TODO: narrow the lock scope if _, exist := fm.functions[f.Name]; exist { - fm.functionsLock.Unlock() return common.ErrorFunctionExists } fm.functions[f.Name] = make([]*FunctionInstance, f.Replicas) @@ -55,7 +56,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { if err != nil { slog.ErrorContext(instance.ctx, "Error starting function instance", err) } - fm.functionsLock.Unlock() + instance.Stop() return err } } @@ -80,8 +81,31 @@ func (fm *FunctionManager) ListFunctions() (result []string) { fm.functionsLock.Lock() defer fm.functionsLock.Unlock() result = make([]string, len(fm.functions)) + i := 0 for k := range fm.functions { - result = append(result, k) + result[i] = k + i++ } return } + +func (fm *FunctionManager) ProduceEvent(name string, event Event) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c, err := fm.eventQueueFactory.NewSinkChan(ctx, &SinkQueueConfig{Topic: name}) + if err != nil { + return err + } + c <- event + return nil +} + +func (fm *FunctionManager) ConsumeEvent(name string) (Event, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c, err := fm.eventQueueFactory.NewSourceChan(ctx, &SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}) + if err != nil { + return nil, err + } + return <-c, nil +} diff --git a/lib/memory_queue.go b/lib/memory_queue.go index de6b988a..ef201837 100644 --- a/lib/memory_queue.go +++ b/lib/memory_queue.go @@ -16,37 +16,76 @@ package lib import ( "context" + "log/slog" "sync" + "sync/atomic" ) +type queue struct { + c chan Event + refCnt int32 +} + type MemoryQueueFactory struct { + ctx context.Context mu sync.Mutex - queues map[string]chan Event + queues map[string]*queue } -func NewMemoryQueueFactory() EventQueueFactory { +func NewMemoryQueueFactory(ctx context.Context) EventQueueFactory { return &MemoryQueueFactory{ - queues: make(map[string]chan Event), + ctx: ctx, + queues: make(map[string]*queue), } } func (f *MemoryQueueFactory) getOrCreateChan(name string) chan Event { - if queue, ok := f.queues[name]; ok { - return queue - } f.mu.Lock() defer f.mu.Unlock() - c := make(chan Event) - f.queues[name] = c + defer func() { + slog.InfoContext(f.ctx, "Get memory queue chan", + "current_use_count", atomic.LoadInt32(&f.queues[name].refCnt), + "name", name) + }() + if q, ok := f.queues[name]; ok { + atomic.AddInt32(&q.refCnt, 1) + return q.c + } + c := make(chan Event, 100) + f.queues[name] = &queue{ + c: c, + refCnt: 1, + } return c } +func (f *MemoryQueueFactory) release(name string) { + f.mu.Lock() + defer f.mu.Unlock() + q, ok := f.queues[name] + if !ok { + panic("release non-exist queue: " + name) + } + if atomic.AddInt32(&q.refCnt, -1) == 0 { + close(q.c) + delete(f.queues, name) + } + slog.InfoContext(f.ctx, "Released memory queue", + "current_use_count", atomic.LoadInt32(&q.refCnt), + "name", name) +} + func (f *MemoryQueueFactory) NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) { result := make(chan Event) for _, topic := range config.Topics { t := topic + go func() { + <-ctx.Done() + f.release(t) + }() go func() { c := f.getOrCreateChan(t) + defer close(result) for { select { case <-ctx.Done(): @@ -61,5 +100,22 @@ func (f *MemoryQueueFactory) NewSourceChan(ctx context.Context, config *SourceQu } func (f *MemoryQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) { - return f.getOrCreateChan(config.Topic), nil + c := f.getOrCreateChan(config.Topic) + wrapperC := make(chan Event) + go func() { + defer f.release(config.Topic) + for { + select { + case <-ctx.Done(): + return + case event, ok := <-wrapperC: + if !ok { + return + } + event.Ack() + c <- event + } + } + }() + return wrapperC, nil } diff --git a/lib/pulsar_queue.go b/lib/pulsar_queue.go index 55300a48..1737adcc 100644 --- a/lib/pulsar_queue.go +++ b/lib/pulsar_queue.go @@ -19,6 +19,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/pkg/errors" "log/slog" + "sync/atomic" ) type PulsarEventQueueFactory struct { @@ -41,16 +42,23 @@ func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (EventQueue if err != nil { return nil, err } + var closed atomic.Bool // TODO: Remove this after the bug of Producer.Flush is fixed go func() { <-ctx.Done() + slog.InfoContext(ctx, "Closing Pulsar event queue factory", slog.Any("config", config)) + closed.Store(true) pc.Close() }() handleErr := func(ctx context.Context, err error, message string, args ...interface{}) { if errors.Is(err, context.Canceled) { - slog.InfoContext(ctx, "function instance has been stopped") + slog.InfoContext(ctx, "Pulsar queue cancelled", slog.Any("config", config)) return } - slog.ErrorContext(ctx, message, args...) + extraArgs := append(args, slog.Any("config", config), slog.Any("error", err)) + slog.ErrorContext(ctx, message, extraArgs...) + } + log := func(message string, config interface{}, args ...interface{}) { + slog.InfoContext(ctx, message, append(args, slog.Any("config", config))...) } return &PulsarEventQueueFactory{ newSourceChan: func(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) { @@ -63,8 +71,11 @@ func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (EventQueue if err != nil { return nil, errors.Wrap(err, "Error creating consumer") } + log("Pulsar source queue created", config) go func() { + defer log("Pulsar source queue closed", config) defer consumer.Close() + defer close(c) for msg := range consumer.Chan() { c <- NewAckableEvent(msg.Payload(), func() { err := consumer.Ack(msg) @@ -85,18 +96,39 @@ func NewPulsarEventQueueFactory(ctx context.Context, config *Config) (EventQueue if err != nil { return nil, errors.Wrap(err, "Error creating producer") } + log("Pulsar sink queue created", config) go func() { + defer log("Pulsar sink queue closed", config) defer producer.Close() - for e := range c { - producer.SendAsync(ctx, &pulsar.ProducerMessage{ - Payload: e.GetPayload(), - }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { - if err != nil { - handleErr(ctx, err, "Error sending message", "error", err, "messageId", id) + flush := func() { + if closed.Load() { + return + } + err := producer.Flush() + if err != nil { + handleErr(ctx, err, "Error flushing producer", "error", err) + } + } + for { + select { + case e, ok := <-c: + if !ok { + flush() return } - e.Ack() - }) + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: e.GetPayload(), + }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + if err != nil { + handleErr(ctx, err, "Error sending message", "error", err, "messageId", id) + return + } + e.Ack() + }) + case <-ctx.Done(): + flush() + return + } } }() return c, nil diff --git a/openapi.yaml b/openapi.yaml index 0cda13e0..477be060 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -23,7 +23,7 @@ paths: summary: Returns a list of functions responses: '200': - description: A JSON array of user names + description: A JSON array of function names content: application/json: schema: @@ -66,6 +66,42 @@ paths: description: Function deleted successfully '404': description: Function not found + /api/v1/produce/{queue_name}: + put: + summary: Produces an event to a queue + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + type: string + responses: + '200': + description: Event produced successfully + '400': + description: Bad request, failed to parse event or body is empty + /api/v1/consume/{queue_name}: + get: + summary: Consumes an event from a queue + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + responses: + '200': + description: Event consumed successfully + content: + application/json: + schema: + type: string components: schemas: Function: diff --git a/perf/perf.go b/perf/perf.go index d7188112..de5db756 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -90,7 +90,7 @@ func (p *perf) Run(ctx context.Context) { PulsarURL: p.config.PulsarURL, } - queueFactory, err := p.queueBuilder(context.Background(), config) + queueFactory, err := p.queueBuilder(ctx, config) if err != nil { slog.Error( "Failed to create Event Queue Factory", @@ -99,7 +99,7 @@ func (p *perf) Run(ctx context.Context) { os.Exit(1) } - p.input, err = queueFactory.NewSinkChan(context.Background(), &lib.SinkQueueConfig{ + p.input, err = queueFactory.NewSinkChan(ctx, &lib.SinkQueueConfig{ Topic: f.Inputs[0], }) if err != nil { @@ -110,7 +110,7 @@ func (p *perf) Run(ctx context.Context) { os.Exit(1) } - p.output, err = queueFactory.NewSourceChan(context.Background(), &lib.SourceQueueConfig{ + p.output, err = queueFactory.NewSourceChan(ctx, &lib.SourceQueueConfig{ Topics: []string{f.Output}, SubName: "perf", }) @@ -141,6 +141,24 @@ func (p *perf) Run(ctx context.Context) { os.Exit(1) } + defer func() { + res, err := cli.DefaultAPI.ApiV1FunctionFunctionNameDelete(context.Background(), name).Execute() + if err != nil { + slog.Error( + "Failed to delete Function", + slog.Any("error", err), + ) + os.Exit(1) + } + if res.StatusCode != 200 { + slog.Error( + "Failed to delete Function", + slog.Any("statusCode", res.StatusCode), + ) + os.Exit(1) + } + }() + latencyCh := make(chan int64) var failureCount int64 go p.generateTraffic(ctx, latencyCh, &failureCount) @@ -171,38 +189,65 @@ func (p *perf) Run(ctx context.Context) { ops++ q.Insert(float64(l) / 1000.0) // Convert to millis case <-ctx.Done(): + slog.InfoContext(ctx, "Shutting down perf client") return } } } +func SendToChannel[T any](ctx context.Context, c chan<- T, e interface{}) bool { + select { + case c <- e.(T): // It will panic if `e` is not of type `T` or a type that can be converted to `T`. + return true + case <-ctx.Done(): + close(c) + return false + } +} + +func zeroValue[T any]() T { + var v T + return v +} + +func ReceiveFromChannel[T any](ctx context.Context, c <-chan T) (T, bool) { + select { + case e := <-c: + return e, true + case <-ctx.Done(): + return zeroValue[T](), false + } +} + func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failureCount *int64) { limiter := rate.NewLimiter(rate.Limit(p.config.RequestRate), int(p.config.RequestRate)) count := 0 - next := make(chan bool, int(p.config.RequestRate)) for { if err := limiter.Wait(ctx); err != nil { return } c := count count++ - + person := Person{Name: "rbt", Money: c, Expected: c + 1} + jsonBytes, err := json.Marshal(person) + if err != nil { + slog.Error( + "Failed to marshal Person", + slog.Any("error", err), + ) + os.Exit(1) + } + start := time.Now() + if !SendToChannel(ctx, p.input, lib.NewAckableEvent(jsonBytes, func() {})) { + return + } go func() { - person := Person{Name: "rbt", Money: c, Expected: c + 1} - jsonBytes, err := json.Marshal(person) - if err != nil { - slog.Error( - "Failed to marshal Person", - slog.Any("error", err), - ) - os.Exit(1) + e, ok := ReceiveFromChannel(ctx, p.output) + if !ok { + return } - start := time.Now() - p.input <- lib.NewAckableEvent(jsonBytes, func() {}) - next <- true - e := <-p.output latencyCh <- time.Since(start).Microseconds() payload := e.GetPayload() e.Ack() @@ -223,6 +268,5 @@ func (p *perf) generateTraffic(ctx context.Context, latencyCh chan int64, failur atomic.AddInt64(failureCount, 1) } }() - <-next } } diff --git a/restclient/README.md b/restclient/README.md index 51786366..6a0e09cc 100644 --- a/restclient/README.md +++ b/restclient/README.md @@ -77,9 +77,11 @@ All URIs are relative to *http://localhost:7300* Class | Method | HTTP request | Description ------------ | ------------- | ------------- | ------------- +*DefaultAPI* | [**ApiV1ConsumeQueueNameGet**](docs/DefaultAPI.md#apiv1consumequeuenameget) | **Get** /api/v1/consume/{queue_name} | Consumes an event from a queue *DefaultAPI* | [**ApiV1FunctionFunctionNameDelete**](docs/DefaultAPI.md#apiv1functionfunctionnamedelete) | **Delete** /api/v1/function/{function_name} | Deletes a function *DefaultAPI* | [**ApiV1FunctionFunctionNamePost**](docs/DefaultAPI.md#apiv1functionfunctionnamepost) | **Post** /api/v1/function/{function_name} | Starts a function *DefaultAPI* | [**ApiV1FunctionsGet**](docs/DefaultAPI.md#apiv1functionsget) | **Get** /api/v1/functions | Returns a list of functions +*DefaultAPI* | [**ApiV1ProduceQueueNamePut**](docs/DefaultAPI.md#apiv1producequeuenameput) | **Put** /api/v1/produce/{queue_name} | Produces an event to a queue ## Documentation For Models diff --git a/restclient/api_default.go b/restclient/api_default.go index c7987f77..f2b3e4b7 100644 --- a/restclient/api_default.go +++ b/restclient/api_default.go @@ -22,6 +22,108 @@ import ( // DefaultAPIService DefaultAPI service type DefaultAPIService service +type ApiApiV1ConsumeQueueNameGetRequest struct { + ctx context.Context + ApiService *DefaultAPIService + queueName string +} + +func (r ApiApiV1ConsumeQueueNameGetRequest) Execute() (string, *http.Response, error) { + return r.ApiService.ApiV1ConsumeQueueNameGetExecute(r) +} + +/* +ApiV1ConsumeQueueNameGet Consumes an event from a queue + + @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + @param queueName + @return ApiApiV1ConsumeQueueNameGetRequest +*/ +func (a *DefaultAPIService) ApiV1ConsumeQueueNameGet(ctx context.Context, queueName string) ApiApiV1ConsumeQueueNameGetRequest { + return ApiApiV1ConsumeQueueNameGetRequest{ + ApiService: a, + ctx: ctx, + queueName: queueName, + } +} + +// Execute executes the request +// +// @return string +func (a *DefaultAPIService) ApiV1ConsumeQueueNameGetExecute(r ApiApiV1ConsumeQueueNameGetRequest) (string, *http.Response, error) { + var ( + localVarHTTPMethod = http.MethodGet + localVarPostBody interface{} + formFiles []formFile + localVarReturnValue string + ) + + localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "DefaultAPIService.ApiV1ConsumeQueueNameGet") + if err != nil { + return localVarReturnValue, nil, &GenericOpenAPIError{error: err.Error()} + } + + localVarPath := localBasePath + "/api/v1/consume/{queue_name}" + localVarPath = strings.Replace(localVarPath, "{"+"queue_name"+"}", url.PathEscape(parameterValueToString(r.queueName, "queueName")), -1) + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := url.Values{} + localVarFormParams := url.Values{} + + // to determine the Content-Type header + localVarHTTPContentTypes := []string{} + + // set Content-Type header + localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) + if localVarHTTPContentType != "" { + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + } + + // to determine the Accept header + localVarHTTPHeaderAccepts := []string{"application/json"} + + // set Accept header + localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) + if localVarHTTPHeaderAccept != "" { + localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept + } + req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, formFiles) + if err != nil { + return localVarReturnValue, nil, err + } + + localVarHTTPResponse, err := a.client.callAPI(req) + if err != nil || localVarHTTPResponse == nil { + return localVarReturnValue, localVarHTTPResponse, err + } + + localVarBody, err := io.ReadAll(localVarHTTPResponse.Body) + localVarHTTPResponse.Body.Close() + localVarHTTPResponse.Body = io.NopCloser(bytes.NewBuffer(localVarBody)) + if err != nil { + return localVarReturnValue, localVarHTTPResponse, err + } + + if localVarHTTPResponse.StatusCode >= 300 { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: localVarHTTPResponse.Status, + } + return localVarReturnValue, localVarHTTPResponse, newErr + } + + err = a.client.decode(&localVarReturnValue, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: err.Error(), + } + return localVarReturnValue, localVarHTTPResponse, newErr + } + + return localVarReturnValue, localVarHTTPResponse, nil +} + type ApiApiV1FunctionFunctionNameDeleteRequest struct { ctx context.Context ApiService *DefaultAPIService @@ -310,3 +412,104 @@ func (a *DefaultAPIService) ApiV1FunctionsGetExecute(r ApiApiV1FunctionsGetReque return localVarReturnValue, localVarHTTPResponse, nil } + +type ApiApiV1ProduceQueueNamePutRequest struct { + ctx context.Context + ApiService *DefaultAPIService + queueName string + body *string +} + +func (r ApiApiV1ProduceQueueNamePutRequest) Body(body string) ApiApiV1ProduceQueueNamePutRequest { + r.body = &body + return r +} + +func (r ApiApiV1ProduceQueueNamePutRequest) Execute() (*http.Response, error) { + return r.ApiService.ApiV1ProduceQueueNamePutExecute(r) +} + +/* +ApiV1ProduceQueueNamePut Produces an event to a queue + + @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + @param queueName + @return ApiApiV1ProduceQueueNamePutRequest +*/ +func (a *DefaultAPIService) ApiV1ProduceQueueNamePut(ctx context.Context, queueName string) ApiApiV1ProduceQueueNamePutRequest { + return ApiApiV1ProduceQueueNamePutRequest{ + ApiService: a, + ctx: ctx, + queueName: queueName, + } +} + +// Execute executes the request +func (a *DefaultAPIService) ApiV1ProduceQueueNamePutExecute(r ApiApiV1ProduceQueueNamePutRequest) (*http.Response, error) { + var ( + localVarHTTPMethod = http.MethodPut + localVarPostBody interface{} + formFiles []formFile + ) + + localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "DefaultAPIService.ApiV1ProduceQueueNamePut") + if err != nil { + return nil, &GenericOpenAPIError{error: err.Error()} + } + + localVarPath := localBasePath + "/api/v1/produce/{queue_name}" + localVarPath = strings.Replace(localVarPath, "{"+"queue_name"+"}", url.PathEscape(parameterValueToString(r.queueName, "queueName")), -1) + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := url.Values{} + localVarFormParams := url.Values{} + if r.body == nil { + return nil, reportError("body is required and must be specified") + } + + // to determine the Content-Type header + localVarHTTPContentTypes := []string{"application/json"} + + // set Content-Type header + localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) + if localVarHTTPContentType != "" { + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + } + + // to determine the Accept header + localVarHTTPHeaderAccepts := []string{} + + // set Accept header + localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) + if localVarHTTPHeaderAccept != "" { + localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept + } + // body params + localVarPostBody = r.body + req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, formFiles) + if err != nil { + return nil, err + } + + localVarHTTPResponse, err := a.client.callAPI(req) + if err != nil || localVarHTTPResponse == nil { + return localVarHTTPResponse, err + } + + localVarBody, err := io.ReadAll(localVarHTTPResponse.Body) + localVarHTTPResponse.Body.Close() + localVarHTTPResponse.Body = io.NopCloser(bytes.NewBuffer(localVarBody)) + if err != nil { + return localVarHTTPResponse, err + } + + if localVarHTTPResponse.StatusCode >= 300 { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: localVarHTTPResponse.Status, + } + return localVarHTTPResponse, newErr + } + + return localVarHTTPResponse, nil +} diff --git a/server/config_loader.go b/server/config_loader.go index 08b94ac5..771ad0b4 100644 --- a/server/config_loader.go +++ b/server/config_loader.go @@ -16,33 +16,43 @@ package server import ( "context" + "github.com/functionstream/functionstream/common" "log/slog" "os" + "sync" "github.com/functionstream/functionstream/lib" ) var loadedConfig *lib.Config +var initConfig = sync.Once{} -const ( - PulsarQueueType = "pulsar" -) - -func init() { - loadedConfig = &lib.Config{ - ListenAddr: getEnvWithDefault("PORT", ":7300"), - PulsarURL: getEnvWithDefault("PULSAR_URL", "pulsar://localhost:6650"), - } - queueType := getEnvWithDefault("QUEUE_TYPE", PulsarQueueType) - switch queueType { - case PulsarQueueType: - loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { - return lib.NewPulsarEventQueueFactory(ctx, c) +func LoadConfigFromEnv() *lib.Config { + initConfig.Do(func() { + loadedConfig = &lib.Config{ + ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), + PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL), } - } + queueType := getEnvWithDefault("QUEUE_TYPE", common.DefaultQueueType) + switch queueType { + case common.PulsarQueueType: + loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { + return lib.NewPulsarEventQueueFactory(ctx, c) + } + } + }) + return loadedConfig } -func GetConfig() *lib.Config { +func LoadStandaloneConfigFromEnv() *lib.Config { + initConfig.Do(func() { + loadedConfig = &lib.Config{ + ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), + } + loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) { + return lib.NewMemoryQueueFactory(ctx), nil + } + }) return loadedConfig } diff --git a/server/server.go b/server/server.go index 1919fd70..24b0f1b3 100644 --- a/server/server.go +++ b/server/server.go @@ -16,13 +16,13 @@ package server import ( "encoding/json" - "errors" "fmt" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" "github.com/functionstream/functionstream/lib" "github.com/functionstream/functionstream/restclient" "github.com/gorilla/mux" + "github.com/pkg/errors" "io" "log/slog" "net/http" @@ -30,21 +30,17 @@ import ( type Server struct { manager *lib.FunctionManager + config *lib.Config } -func New() *Server { - manager, err := lib.NewFunctionManager(GetConfig()) +func New(config *lib.Config) *Server { + manager, err := lib.NewFunctionManager(config) if err != nil { slog.Error("Error creating function manager", err) } return &Server{ manager: manager, - } -} - -func NewWithFM(fm *lib.FunctionManager) *Server { - return &Server{ - manager: fm, + config: config, } } @@ -93,21 +89,69 @@ func (s *Server) startRESTHandlers() error { http.Error(w, err.Error(), http.StatusBadRequest) return } + slog.Info("Started function", slog.Any("name", functionName)) }).Methods("POST") r.HandleFunc("/api/v1/function/{function_name}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) functionName := vars["function_name"] - slog.Info("Deleting function", slog.Any("name", functionName)) + err := s.manager.DeleteFunction(functionName) if errors.Is(err, common.ErrorFunctionNotFound) { http.Error(w, err.Error(), http.StatusNotFound) return } + slog.Info("Deleted function", slog.Any("name", functionName)) }).Methods("DELETE") - return http.ListenAndServe(GetConfig().ListenAddr, r) + r.HandleFunc("/api/v1/functions", func(w http.ResponseWriter, r *http.Request) { + functions := s.manager.ListFunctions() + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(functions) + if err != nil { + slog.Error("Error when listing functions", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }).Methods("GET") + + r.HandleFunc("/api/v1/produce/{queue_name}", func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue_name"] + slog.Info("Producing event to queue", slog.Any("queue_name", queueName)) + content, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, errors.Wrap(err, "Failed to read body").Error(), http.StatusBadRequest) + return + } + err = s.manager.ProduceEvent(queueName, lib.NewAckableEvent(content, func() {})) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }).Methods("PUT") + + r.HandleFunc("/api/v1/consume/{queue_name}", func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue_name"] + slog.Info("Consuming event from queue", slog.Any("queue_name", queueName)) + event, err := s.manager.ConsumeEvent(queueName) + if err != nil { + slog.Error("Error when consuming event", "error", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(string(event.GetPayload())) + if err != nil { + slog.Error("Error when encoding event", "error", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }).Methods("GET") + + return http.ListenAndServe(s.config.ListenAddr, r) } func (s *Server) Close() error { diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 00000000..5bce1bc2 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "encoding/json" + "github.com/functionstream/functionstream/common/model" + "github.com/functionstream/functionstream/lib" + "github.com/functionstream/functionstream/tests" + "math/rand" + "strconv" + "testing" +) + +func TestStandaloneBasicFunction(t *testing.T) { + + conf := &lib.Config{ + ListenAddr: "localhost:7301", + QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) { + return lib.NewMemoryQueueFactory(ctx), nil + }, + } + s := New(conf) + go s.Run() + defer func() { + err := s.Close() + if err != nil { + t.Fatal(err) + } + }() + + inputTopic := "test-input-" + strconv.Itoa(rand.Int()) + outputTopic := "test-output-" + strconv.Itoa(rand.Int()) + + funcConf := &model.Function{ + Archive: "../bin/example_basic.wasm", + Inputs: []string{inputTopic}, + Output: outputTopic, + Name: "test-func", + Replicas: 1, + } + err := s.manager.StartFunction(funcConf) + if err != nil { + t.Fatal(err) + } + + p := &tests.Person{ + Name: "rbt", + Money: 0, + } + jsonBytes, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + err = s.manager.ProduceEvent(inputTopic, lib.NewAckableEvent(jsonBytes, func() { + })) + if err != nil { + t.Fatal(err) + } + + output := make(chan lib.Event) + go func() { + event, err := s.manager.ConsumeEvent(outputTopic) + if err != nil { + t.Error(err) + return + } + output <- event + }() + + event := <-output + var out tests.Person + err = json.Unmarshal(event.GetPayload(), &out) + if err != nil { + t.Error(err) + return + } + if out.Money != 1 { + t.Errorf("expected 1, got %d", out.Money) + return + } +}