diff --git a/Makefile b/Makefile index 831ba654..9ce6ec53 100644 --- a/Makefile +++ b/Makefile @@ -34,16 +34,18 @@ bench_race: get-apidocs: curl -o apidocs.json http://localhost:7300/apidocs +ADMIN_CLIENT_DIR := admin/client +FILES_TO_REMOVE := go.mod go.sum .travis.yml .openapi-generator-ignore git_push.sh .openapi-generator api test + gen-rest-client: - rm -r restclient - mkdir -p restclient - openapi-generator generate -i ./apidocs.json -g go -o restclient \ + -rm -r $(ADMIN_CLIENT_DIR) + mkdir -p $(ADMIN_CLIENT_DIR) + openapi-generator generate -i ./apidocs.json -g go -o $(ADMIN_CLIENT_DIR) \ --git-user-id functionstream \ - --git-repo-id functionstream/restclient \ - --package-name restclient \ + --git-repo-id functionstream/$(ADMIN_CLIENT_DIR) \ + --package-name adminclient \ --global-property apiDocs,apis,models,supportingFiles - rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \ - restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test + rm -r $(addprefix $(ADMIN_CLIENT_DIR)/, $(FILES_TO_REMOVE)) proto: for PROTO_FILE in $$(find . -name '*.proto'); do \ diff --git a/restclient/.gitignore b/admin/client/.gitignore similarity index 100% rename from restclient/.gitignore rename to admin/client/.gitignore diff --git a/restclient/README.md b/admin/client/README.md similarity index 82% rename from restclient/README.md rename to admin/client/README.md index a7bb9fd9..85d8b9f4 100644 --- a/restclient/README.md +++ b/admin/client/README.md @@ -1,4 +1,4 @@ -# Go API client for restclient +# Go API client for adminclient Manage Function Stream Resources @@ -22,7 +22,7 @@ go get golang.org/x/net/context Put the package under your project folder and add the following in import: ```go -import restclient "github.com/functionstream/functionstream/restclient" +import adminclient "github.com/functionstream/functionstream/admin/client" ``` To use a proxy, set the environment variable `HTTP_PROXY`: @@ -37,18 +37,18 @@ Default configuration comes with `Servers` field that contains server objects as ### Select Server Configuration -For using other server than the one defined on index 0 set context value `restclient.ContextServerIndex` of type `int`. +For using other server than the one defined on index 0 set context value `adminclient.ContextServerIndex` of type `int`. ```go -ctx := context.WithValue(context.Background(), restclient.ContextServerIndex, 1) +ctx := context.WithValue(context.Background(), adminclient.ContextServerIndex, 1) ``` ### Templated Server URL -Templated server URL is formatted using default variables from configuration or from context value `restclient.ContextServerVariables` of type `map[string]string`. +Templated server URL is formatted using default variables from configuration or from context value `adminclient.ContextServerVariables` of type `map[string]string`. ```go -ctx := context.WithValue(context.Background(), restclient.ContextServerVariables, map[string]string{ +ctx := context.WithValue(context.Background(), adminclient.ContextServerVariables, map[string]string{ "basePath": "v2", }) ``` @@ -59,13 +59,13 @@ Note, enum values are always validated and all unused variables are silently ign Each operation can use different server URL defined using `OperationServers` map in the `Configuration`. An operation is uniquely identified by `"{classname}Service.{nickname}"` string. -Similar rules for overriding default operation server index and variables applies by using `restclient.ContextOperationServerIndices` and `restclient.ContextOperationServerVariables` context maps. +Similar rules for overriding default operation server index and variables applies by using `adminclient.ContextOperationServerIndices` and `adminclient.ContextOperationServerVariables` context maps. ```go -ctx := context.WithValue(context.Background(), restclient.ContextOperationServerIndices, map[string]int{ +ctx := context.WithValue(context.Background(), adminclient.ContextOperationServerIndices, map[string]int{ "{classname}Service.{nickname}": 2, }) -ctx = context.WithValue(context.Background(), restclient.ContextOperationServerVariables, map[string]map[string]string{ +ctx = context.WithValue(context.Background(), adminclient.ContextOperationServerVariables, map[string]map[string]string{ "{classname}Service.{nickname}": { "port": "8443", }, diff --git a/restclient/api_function.go b/admin/client/api_function.go similarity index 99% rename from restclient/api_function.go rename to admin/client/api_function.go index 5f15a5b3..af3bb119 100644 --- a/restclient/api_function.go +++ b/admin/client/api_function.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/api_http_tube.go b/admin/client/api_http_tube.go similarity index 99% rename from restclient/api_http_tube.go rename to admin/client/api_http_tube.go index 3829c8fd..4e8752ee 100644 --- a/restclient/api_http_tube.go +++ b/admin/client/api_http_tube.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/api_state.go b/admin/client/api_state.go similarity index 99% rename from restclient/api_state.go rename to admin/client/api_state.go index 8a7fbe23..09857746 100644 --- a/restclient/api_state.go +++ b/admin/client/api_state.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/api_status.go b/admin/client/api_status.go similarity index 99% rename from restclient/api_status.go rename to admin/client/api_status.go index c8e24806..36aa8692 100644 --- a/restclient/api_status.go +++ b/admin/client/api_status.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/api_tube.go b/admin/client/api_tube.go similarity index 99% rename from restclient/api_tube.go rename to admin/client/api_tube.go index da83da70..ab2c290a 100644 --- a/restclient/api_tube.go +++ b/admin/client/api_tube.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/client.go b/admin/client/client.go similarity index 99% rename from restclient/client.go rename to admin/client/client.go index 8e937949..f067ce93 100644 --- a/restclient/client.go +++ b/admin/client/client.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/configuration.go b/admin/client/configuration.go similarity index 99% rename from restclient/configuration.go rename to admin/client/configuration.go index bb340af5..1c0c86f0 100644 --- a/restclient/configuration.go +++ b/admin/client/configuration.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "context" diff --git a/restclient/docs/ModelFunction.md b/admin/client/docs/ModelFunction.md similarity index 75% rename from restclient/docs/ModelFunction.md rename to admin/client/docs/ModelFunction.md index 15a3caf5..bb63fb65 100644 --- a/restclient/docs/ModelFunction.md +++ b/admin/client/docs/ModelFunction.md @@ -5,19 +5,17 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **Config** | Pointer to **map[string]string** | | [optional] -**Inputs** | **[]string** | | **Name** | **string** | | -**Output** | **string** | | **Replicas** | **int32** | | **Runtime** | [**ModelRuntimeConfig**](ModelRuntimeConfig.md) | | **Sink** | Pointer to [**ModelTubeConfig**](ModelTubeConfig.md) | | [optional] -**Source** | Pointer to [**ModelTubeConfig**](ModelTubeConfig.md) | | [optional] +**Source** | Pointer to [**[]ModelTubeConfig**](ModelTubeConfig.md) | | [optional] ## Methods ### NewModelFunction -`func NewModelFunction(inputs []string, name string, output string, replicas int32, runtime ModelRuntimeConfig, ) *ModelFunction` +`func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig, ) *ModelFunction` NewModelFunction instantiates a new ModelFunction object This constructor will assign default values to properties that have it defined, @@ -57,26 +55,6 @@ SetConfig sets Config field to given value. HasConfig returns a boolean if a field has been set. -### GetInputs - -`func (o *ModelFunction) GetInputs() []string` - -GetInputs returns the Inputs field if non-nil, zero value otherwise. - -### GetInputsOk - -`func (o *ModelFunction) GetInputsOk() (*[]string, bool)` - -GetInputsOk returns a tuple with the Inputs field if it's non-nil, zero value otherwise -and a boolean to check if the value has been set. - -### SetInputs - -`func (o *ModelFunction) SetInputs(v []string)` - -SetInputs sets Inputs field to given value. - - ### GetName `func (o *ModelFunction) GetName() string` @@ -97,26 +75,6 @@ and a boolean to check if the value has been set. SetName sets Name field to given value. -### GetOutput - -`func (o *ModelFunction) GetOutput() string` - -GetOutput returns the Output field if non-nil, zero value otherwise. - -### GetOutputOk - -`func (o *ModelFunction) GetOutputOk() (*string, bool)` - -GetOutputOk returns a tuple with the Output field if it's non-nil, zero value otherwise -and a boolean to check if the value has been set. - -### SetOutput - -`func (o *ModelFunction) SetOutput(v string)` - -SetOutput sets Output field to given value. - - ### GetReplicas `func (o *ModelFunction) GetReplicas() int32` @@ -184,20 +142,20 @@ HasSink returns a boolean if a field has been set. ### GetSource -`func (o *ModelFunction) GetSource() ModelTubeConfig` +`func (o *ModelFunction) GetSource() []ModelTubeConfig` GetSource returns the Source field if non-nil, zero value otherwise. ### GetSourceOk -`func (o *ModelFunction) GetSourceOk() (*ModelTubeConfig, bool)` +`func (o *ModelFunction) GetSourceOk() (*[]ModelTubeConfig, bool)` GetSourceOk returns a tuple with the Source field if it's non-nil, zero value otherwise and a boolean to check if the value has been set. ### SetSource -`func (o *ModelFunction) SetSource(v ModelTubeConfig)` +`func (o *ModelFunction) SetSource(v []ModelTubeConfig)` SetSource sets Source field to given value. diff --git a/restclient/docs/ModelRuntimeConfig.md b/admin/client/docs/ModelRuntimeConfig.md similarity index 100% rename from restclient/docs/ModelRuntimeConfig.md rename to admin/client/docs/ModelRuntimeConfig.md diff --git a/restclient/docs/ModelTubeConfig.md b/admin/client/docs/ModelTubeConfig.md similarity index 100% rename from restclient/docs/ModelTubeConfig.md rename to admin/client/docs/ModelTubeConfig.md diff --git a/restclient/docs/RestfulspecSchemaType.md b/admin/client/docs/RestfulspecSchemaType.md similarity index 100% rename from restclient/docs/RestfulspecSchemaType.md rename to admin/client/docs/RestfulspecSchemaType.md diff --git a/restclient/model_model_function.go b/admin/client/model_model_function.go similarity index 80% rename from restclient/model_model_function.go rename to admin/client/model_model_function.go index 24a9d07f..490aec5f 100644 --- a/restclient/model_model_function.go +++ b/admin/client/model_model_function.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" @@ -22,13 +22,11 @@ var _ MappedNullable = &ModelFunction{} // ModelFunction struct for ModelFunction type ModelFunction struct { Config *map[string]string `json:"config,omitempty"` - Inputs []string `json:"inputs"` Name string `json:"name"` - Output string `json:"output"` Replicas int32 `json:"replicas"` Runtime ModelRuntimeConfig `json:"runtime"` Sink *ModelTubeConfig `json:"sink,omitempty"` - Source *ModelTubeConfig `json:"source,omitempty"` + Source []ModelTubeConfig `json:"source,omitempty"` } type _ModelFunction ModelFunction @@ -37,11 +35,9 @@ type _ModelFunction ModelFunction // This constructor will assign default values to properties that have it defined, // and makes sure properties required by API are set, but the set of arguments // will change when the set of required properties is changed -func NewModelFunction(inputs []string, name string, output string, replicas int32, runtime ModelRuntimeConfig) *ModelFunction { +func NewModelFunction(name string, replicas int32, runtime ModelRuntimeConfig) *ModelFunction { this := ModelFunction{} - this.Inputs = inputs this.Name = name - this.Output = output this.Replicas = replicas this.Runtime = runtime return &this @@ -87,30 +83,6 @@ func (o *ModelFunction) SetConfig(v map[string]string) { o.Config = &v } -// GetInputs returns the Inputs field value -func (o *ModelFunction) GetInputs() []string { - if o == nil { - var ret []string - return ret - } - - return o.Inputs -} - -// GetInputsOk returns a tuple with the Inputs field value -// and a boolean to check if the value has been set. -func (o *ModelFunction) GetInputsOk() ([]string, bool) { - if o == nil { - return nil, false - } - return o.Inputs, true -} - -// SetInputs sets field value -func (o *ModelFunction) SetInputs(v []string) { - o.Inputs = v -} - // GetName returns the Name field value func (o *ModelFunction) GetName() string { if o == nil { @@ -135,30 +107,6 @@ func (o *ModelFunction) SetName(v string) { o.Name = v } -// GetOutput returns the Output field value -func (o *ModelFunction) GetOutput() string { - if o == nil { - var ret string - return ret - } - - return o.Output -} - -// GetOutputOk returns a tuple with the Output field value -// and a boolean to check if the value has been set. -func (o *ModelFunction) GetOutputOk() (*string, bool) { - if o == nil { - return nil, false - } - return &o.Output, true -} - -// SetOutput sets field value -func (o *ModelFunction) SetOutput(v string) { - o.Output = v -} - // GetReplicas returns the Replicas field value func (o *ModelFunction) GetReplicas() int32 { if o == nil { @@ -240,17 +188,17 @@ func (o *ModelFunction) SetSink(v ModelTubeConfig) { } // GetSource returns the Source field value if set, zero value otherwise. -func (o *ModelFunction) GetSource() ModelTubeConfig { +func (o *ModelFunction) GetSource() []ModelTubeConfig { if o == nil || IsNil(o.Source) { - var ret ModelTubeConfig + var ret []ModelTubeConfig return ret } - return *o.Source + return o.Source } // GetSourceOk returns a tuple with the Source field value if set, nil otherwise // and a boolean to check if the value has been set. -func (o *ModelFunction) GetSourceOk() (*ModelTubeConfig, bool) { +func (o *ModelFunction) GetSourceOk() ([]ModelTubeConfig, bool) { if o == nil || IsNil(o.Source) { return nil, false } @@ -266,9 +214,9 @@ func (o *ModelFunction) HasSource() bool { return false } -// SetSource gets a reference to the given ModelTubeConfig and assigns it to the Source field. -func (o *ModelFunction) SetSource(v ModelTubeConfig) { - o.Source = &v +// SetSource gets a reference to the given []ModelTubeConfig and assigns it to the Source field. +func (o *ModelFunction) SetSource(v []ModelTubeConfig) { + o.Source = v } func (o ModelFunction) MarshalJSON() ([]byte, error) { @@ -284,9 +232,7 @@ func (o ModelFunction) ToMap() (map[string]interface{}, error) { if !IsNil(o.Config) { toSerialize["config"] = o.Config } - toSerialize["inputs"] = o.Inputs toSerialize["name"] = o.Name - toSerialize["output"] = o.Output toSerialize["replicas"] = o.Replicas toSerialize["runtime"] = o.Runtime if !IsNil(o.Sink) { @@ -303,9 +249,7 @@ func (o *ModelFunction) UnmarshalJSON(data []byte) (err error) { // by unmarshalling the object into a generic map with string keys and checking // that every required field exists as a key in the generic map. requiredProperties := []string{ - "inputs", "name", - "output", "replicas", "runtime", } diff --git a/restclient/model_model_runtime_config.go b/admin/client/model_model_runtime_config.go similarity index 99% rename from restclient/model_model_runtime_config.go rename to admin/client/model_model_runtime_config.go index c400ce18..cbaeae5d 100644 --- a/restclient/model_model_runtime_config.go +++ b/admin/client/model_model_runtime_config.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "encoding/json" diff --git a/restclient/model_model_tube_config.go b/admin/client/model_model_tube_config.go similarity index 99% rename from restclient/model_model_tube_config.go rename to admin/client/model_model_tube_config.go index c555a9cf..f3570a74 100644 --- a/restclient/model_model_tube_config.go +++ b/admin/client/model_model_tube_config.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "encoding/json" diff --git a/restclient/model_restfulspec_schema_type.go b/admin/client/model_restfulspec_schema_type.go similarity index 99% rename from restclient/model_restfulspec_schema_type.go rename to admin/client/model_restfulspec_schema_type.go index 6fe24624..9ec66446 100644 --- a/restclient/model_restfulspec_schema_type.go +++ b/admin/client/model_restfulspec_schema_type.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "bytes" diff --git a/restclient/response.go b/admin/client/response.go similarity index 98% rename from restclient/response.go rename to admin/client/response.go index bf098739..2e36f66d 100644 --- a/restclient/response.go +++ b/admin/client/response.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "net/http" diff --git a/restclient/utils.go b/admin/client/utils.go similarity index 99% rename from restclient/utils.go rename to admin/client/utils.go index 0c471c17..adf098e7 100644 --- a/restclient/utils.go +++ b/admin/client/utils.go @@ -8,7 +8,7 @@ API version: 1.0.0 // Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. -package restclient +package adminclient import ( "encoding/json" diff --git a/admin/utils/utils.go b/admin/utils/utils.go new file mode 100644 index 00000000..2358b5ac --- /dev/null +++ b/admin/utils/utils.go @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "fmt" + "github.com/functionstream/function-stream/admin/client" +) + +func MakeQueueSourceTubeConfig(subName string, topics ...string) []adminclient.ModelTubeConfig { + return []adminclient.ModelTubeConfig{ + { + Config: map[string]interface{}{ + "topicList": append([]string{}, topics...), + "subName": subName, + }, + }, + } +} + +func MakeQueueSinkTubeConfig(topic string) *adminclient.ModelTubeConfig { + return &adminclient.ModelTubeConfig{ + Config: map[string]interface{}{ + "topic": topic, + }, + } +} + +func GetInputTopics(f *adminclient.ModelFunction) ([]string, error) { + if len(f.Source) < 1 { + return nil, fmt.Errorf("function %s has no sources", f.Name) + } + config := f.Source[0].Config + if len(config) < 1 { + return nil, fmt.Errorf("source config for function %s is empty", f.Name) + } + if topicList, ok := config["topicList"].([]string); ok { + return topicList, nil + } + return nil, fmt.Errorf("source config for function %s has no topicList", f.Name) +} + +func GetOutputTopic(f *adminclient.ModelFunction) (string, error) { + if f.Sink == nil { + return "", fmt.Errorf("function %s has no sink", f.Name) + } + config := f.Sink.Config + if len(config) < 1 { + return "", fmt.Errorf("sink config for function %s is empty", f.Name) + } + if topic, ok := config["topic"].(string); ok { + return topic, nil + } + return "", fmt.Errorf("sink config for function %s has no topic", f.Name) +} diff --git a/apidocs.json b/apidocs.json index f8b4f6a7..6e1e8c2e 100644 --- a/apidocs.json +++ b/apidocs.json @@ -287,8 +287,6 @@ "required": [ "name", "runtime", - "inputs", - "output", "replicas" ], "properties": { @@ -298,18 +296,9 @@ "type": "string" } }, - "inputs": { - "type": "array", - "items": { - "type": "string" - } - }, "name": { "type": "string" }, - "output": { - "type": "string" - }, "replicas": { "type": "integer", "format": "int32" @@ -321,7 +310,10 @@ "$ref": "#/definitions/model.TubeConfig" }, "source": { - "$ref": "#/definitions/model.TubeConfig" + "type": "array", + "items": { + "$ref": "#/definitions/model.TubeConfig" + } } } }, diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 4c08d748..15679092 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -20,11 +20,12 @@ import ( "context" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/functionstream/function-stream/admin/client" + adminutils "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/perf" - "github.com/functionstream/function-stream/restclient" "github.com/functionstream/function-stream/server" "math/rand" "os" @@ -70,14 +71,14 @@ func BenchmarkStressForBasicFunc(b *testing.B) { pConfig := &perf.Config{ PulsarURL: "pulsar://localhost:6650", RequestRate: 200000.0, - Func: &restclient.ModelFunction{ - Runtime: restclient.ModelRuntimeConfig{ + Func: &adminclient.ModelFunction{ + Runtime: adminclient.ModelRuntimeConfig{ Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Inputs: []string{inputTopic}, - Output: outputTopic, + Source: adminutils.MakeQueueSourceTubeConfig("fs", inputTopic), + Sink: adminutils.MakeQueueSinkTubeConfig(outputTopic), Replicas: replicas, }, } @@ -127,14 +128,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { pConfig := &perf.Config{ RequestRate: 200000.0, - Func: &restclient.ModelFunction{ - Runtime: restclient.ModelRuntimeConfig{ + Func: &adminclient.ModelFunction{ + Runtime: adminclient.ModelRuntimeConfig{ Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Inputs: []string{inputTopic}, - Output: outputTopic, + Source: adminutils.MakeQueueSourceTubeConfig("fs", inputTopic), + Sink: adminutils.MakeQueueSinkTubeConfig(outputTopic), Replicas: replicas, }, QueueBuilder: func(ctx context.Context) (contube.TubeFactory, error) { diff --git a/cmd/client/consume/cmd.go b/cmd/client/consume/cmd.go index acc59512..0df1a2bc 100644 --- a/cmd/client/consume/cmd.go +++ b/cmd/client/consume/cmd.go @@ -18,8 +18,8 @@ package consume import ( "fmt" + "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" - "github.com/functionstream/function-stream/restclient" "github.com/spf13/cobra" "golang.org/x/net/context" "os" @@ -47,11 +47,11 @@ func init() { } func exec(_ *cobra.Command, _ []string) { - cfg := restclient.NewConfiguration() - cfg.Servers = []restclient.ServerConfiguration{{ + cfg := adminclient.NewConfiguration() + cfg.Servers = []adminclient.ServerConfiguration{{ URL: common.Config.ServiceAddr, }} - cli := restclient.NewAPIClient(cfg) + cli := adminclient.NewAPIClient(cfg) e, res, err := cli.TubeAPI.ConsumeMessage(context.Background(), config.name).Execute() if err != nil { diff --git a/cmd/client/create/cmd.go b/cmd/client/create/cmd.go index bbf177c0..5852b1dd 100644 --- a/cmd/client/create/cmd.go +++ b/cmd/client/create/cmd.go @@ -19,9 +19,10 @@ package create import ( "context" "fmt" + "github.com/functionstream/function-stream/admin/client" + "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/cmd/client/common" fs_cmmon "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/restclient" "github.com/spf13/cobra" "io" "os" @@ -58,18 +59,18 @@ func init() { } func exec(_ *cobra.Command, _ []string) { - cfg := restclient.NewConfiguration() - cfg.Servers = []restclient.ServerConfiguration{{ + cfg := adminclient.NewConfiguration() + cfg.Servers = []adminclient.ServerConfiguration{{ URL: common.Config.ServiceAddr, }} - cli := restclient.NewAPIClient(cfg) - f := restclient.ModelFunction{ + cli := adminclient.NewAPIClient(cfg) + f := adminclient.ModelFunction{ Name: config.name, - Runtime: restclient.ModelRuntimeConfig{Config: map[string]interface{}{ + Runtime: adminclient.ModelRuntimeConfig{Config: map[string]interface{}{ fs_cmmon.RuntimeArchiveConfigKey: config.archive, }}, - Inputs: config.inputs, - Output: config.output, + Source: utils.MakeQueueSourceTubeConfig("fs", config.inputs...), + Sink: utils.MakeQueueSinkTubeConfig(config.output), Replicas: config.replica, } diff --git a/cmd/client/delete/cmd.go b/cmd/client/delete/cmd.go index defd79b3..186001d5 100644 --- a/cmd/client/delete/cmd.go +++ b/cmd/client/delete/cmd.go @@ -17,8 +17,8 @@ package del import ( + "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" - "github.com/functionstream/function-stream/restclient" "github.com/spf13/cobra" ) @@ -44,11 +44,11 @@ func init() { } func exec(_ *cobra.Command, _ []string) { - cfg := restclient.NewConfiguration() - cfg.Servers = []restclient.ServerConfiguration{{ + cfg := adminclient.NewConfiguration() + cfg.Servers = []adminclient.ServerConfiguration{{ URL: common.Config.ServiceAddr, }} - _ = restclient.NewAPIClient(cfg) + _ = adminclient.NewAPIClient(cfg) //res, err := cli.DefaultAPI.ApiV1FunctionFunctionNameDelete(context.Background(), config.name).Execute() //if err != nil { diff --git a/cmd/client/list/cmd.go b/cmd/client/list/cmd.go index 9bf6275c..9ba5a0d1 100644 --- a/cmd/client/list/cmd.go +++ b/cmd/client/list/cmd.go @@ -19,8 +19,8 @@ package list import ( "context" "fmt" + "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" - "github.com/functionstream/function-stream/restclient" "github.com/spf13/cobra" "os" ) @@ -34,11 +34,11 @@ var Cmd = &cobra.Command{ } func exec(_ *cobra.Command, _ []string) { - cfg := restclient.NewConfiguration() - cfg.Servers = []restclient.ServerConfiguration{{ + cfg := adminclient.NewConfiguration() + cfg.Servers = []adminclient.ServerConfiguration{{ URL: common.Config.ServiceAddr, }} - cli := restclient.NewAPIClient(cfg) + cli := adminclient.NewAPIClient(cfg) list, res, err := cli.FunctionAPI.GetAllFunctions(context.Background()).Execute() if err != nil { diff --git a/cmd/client/produce/cmd.go b/cmd/client/produce/cmd.go index be4d36b1..ff47825c 100644 --- a/cmd/client/produce/cmd.go +++ b/cmd/client/produce/cmd.go @@ -18,8 +18,8 @@ package produce import ( "fmt" + "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" - "github.com/functionstream/function-stream/restclient" "github.com/spf13/cobra" "golang.org/x/net/context" "os" @@ -49,11 +49,11 @@ func init() { } func exec(_ *cobra.Command, _ []string) { - cfg := restclient.NewConfiguration() - cfg.Servers = []restclient.ServerConfiguration{{ + cfg := adminclient.NewConfiguration() + cfg.Servers = []adminclient.ServerConfiguration{{ URL: common.Config.ServiceAddr, }} - cli := restclient.NewAPIClient(cfg) + cli := adminclient.NewAPIClient(cfg) res, err := cli.TubeAPI.ProduceMessage(context.Background(), config.name).Body(config.content).Execute() if err != nil { diff --git a/common/model/function.go b/common/model/function.go index 9b48fcd0..d8312a69 100644 --- a/common/model/function.go +++ b/common/model/function.go @@ -19,9 +19,9 @@ package model import "github.com/functionstream/function-stream/fs/contube" type TubeConfig struct { - Config contube.ConfigMap `json:"config,omitempty" yaml:"config,omitempty"` - Type *string `json:"type,omitempty" yaml:"type,omitempty"` // Default to `default` + Type *string `json:"type,omitempty"` // Default to `default` Name *string `json:"name,omitempty"` + Config contube.ConfigMap `json:"config,omitempty"` } type ConfigMap map[string]interface{} @@ -32,14 +32,10 @@ type RuntimeConfig struct { } type Function struct { - Name string `json:"name" yaml:"name"` - Runtime *RuntimeConfig `json:"runtime" yaml:"runtime"` - // Deprecate - Source *TubeConfig `json:"source,omitempty" yaml:"source,omitempty"` - // Deprecate - Sink *TubeConfig `json:"sink,omitempty" yaml:"sink,omitempty"` - Inputs []string `json:"inputs" yaml:"inputs"` - Output string `json:"output" yaml:"output"` - Config map[string]string `json:"config,omitempty" yaml:"config,omitempty"` - Replicas int32 `json:"replicas" yaml:"replicas"` + Name string `json:"name"` + Runtime *RuntimeConfig `json:"runtime"` + Sources []*TubeConfig `json:"source,omitempty"` + Sink *TubeConfig `json:"sink,omitempty"` + Config map[string]string `json:"config,omitempty"` + Replicas int32 `json:"replicas"` } diff --git a/common/model/function_serde_test.go b/common/model/function_serde_test.go index 63e7ae71..6fac8088 100644 --- a/common/model/function_serde_test.go +++ b/common/model/function_serde_test.go @@ -29,10 +29,8 @@ func TestFunctionSerde(t *testing.T) { f := Function{ Name: "TestFunction", Runtime: &RuntimeConfig{Type: common.OptionalStr("runtime"), Config: map[string]interface{}{"key": "value"}}, - Source: &TubeConfig{Type: common.OptionalStr("source"), Config: map[string]interface{}{"key": "value"}}, + Sources: []*TubeConfig{{Type: common.OptionalStr("source"), Config: map[string]interface{}{"key": "value"}}}, Sink: &TubeConfig{Type: common.OptionalStr("sink"), Config: map[string]interface{}{"key": "value"}}, - Inputs: []string{"input1", "input2"}, - Output: "output", Config: map[string]string{"key": "value"}, Replicas: 2, } @@ -79,10 +77,8 @@ func TestFunctionSerdeWithNil(t *testing.T) { f := Function{ Name: "TestFunction", Runtime: nil, - Source: nil, + Sources: nil, Sink: nil, - Inputs: []string{"input1", "input2"}, - Output: "output", Config: map[string]string{"key": "value"}, Replicas: 2, } @@ -114,6 +110,8 @@ func TestFunctionSerdeWithNil(t *testing.T) { fmt.Println(string(data)) + f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML serialization + // YAML Deserialization err = yaml.Unmarshal(data, &f2) if err != nil { diff --git a/common/model/stream.go b/common/model/stream.go index a1c7701b..c8cbdfdf 100644 --- a/common/model/stream.go +++ b/common/model/stream.go @@ -17,6 +17,6 @@ package model type Stream struct { - Tubes map[string]*TubeConfig `json:"tubes,omitempty" yaml:"tubes,omitempty"` - Functions map[string]*Function `json:"functions,omitempty" yaml:"functions,omitempty"` + Tubes []*TubeConfig `json:"tubes,omitempty" yaml:"tubes,omitempty"` + Functions map[string]*Function `json:"functions,omitempty" yaml:"functions,omitempty"` } diff --git a/fs/api/instance.go b/fs/api/instance.go index 6ffa7920..e5d923fc 100644 --- a/fs/api/instance.go +++ b/fs/api/instance.go @@ -29,11 +29,11 @@ type FunctionInstance interface { Definition() *model.Function Index() int32 Stop() - Run(factory FunctionRuntimeFactory) + Run(factory FunctionRuntimeFactory, sources []<-chan contube.Record, sink chan<- contube.Record) WaitForReady() <-chan error Logger() *slog.Logger } type FunctionInstanceFactory interface { - NewFunctionInstance(f *model.Function, funcCtx FunctionContext, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance + NewFunctionInstance(f *model.Function, funcCtx FunctionContext, i int32, logger *slog.Logger) FunctionInstance } diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 50377055..3205fff7 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -18,6 +18,7 @@ package contube import ( "context" + "fmt" "github.com/pkg/errors" ) @@ -45,20 +46,33 @@ const ( SubNameKey = "subName" ) -func NewSourceQueueConfig(config ConfigMap) *SourceQueueConfig { +func NewSourceQueueConfig(config ConfigMap) (*SourceQueueConfig, error) { var result SourceQueueConfig - if topics, ok := config[TopicListKey].([]string); ok { - result.Topics = topics + // The list value type should be considered as interface{} + if topics, ok := config[TopicListKey].([]interface{}); ok { + var topicStrList []string + for _, v := range topics { + if topicStr, ok := v.(string); ok { + topicStrList = append(topicStrList, v.(string)) + } else { + return nil, fmt.Errorf("invalid topic in the %s: %s", TopicListKey, topicStr) + } + } + result.Topics = topicStrList } if subName, ok := config[SubNameKey].(string); ok { result.SubName = subName } - return &result + return &result, nil } func (c *SourceQueueConfig) ToConfigMap() ConfigMap { + topicListInterface := make([]interface{}, len(c.Topics)) + for i, v := range c.Topics { + topicListInterface[i] = v + } return ConfigMap{ - TopicListKey: c.Topics, + TopicListKey: topicListInterface, SubNameKey: c.SubName, } } diff --git a/fs/contube/memory.go b/fs/contube/memory.go index c3425c63..176b0812 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -78,7 +78,10 @@ func (f *MemoryQueueFactory) release(name string) { } func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { - config := NewSourceQueueConfig(configMap) + config, err := NewSourceQueueConfig(configMap) + if err != nil { + return nil, err + } result := make(chan Record) var wg sync.WaitGroup diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index be9ea1d4..f67fa277 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -54,7 +54,10 @@ type PulsarEventQueueFactory struct { } func (f *PulsarEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { - config := NewSourceQueueConfig(configMap) + config, err := NewSourceQueueConfig(configMap) + if err != nil { + return nil, err + } return f.newSourceChan(ctx, config) } diff --git a/fs/instance_impl.go b/fs/instance_impl.go index 85767a51..7382a5bb 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -18,26 +18,24 @@ package fs import ( "context" - "fmt" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/pkg/errors" "log/slog" + "reflect" ) type FunctionInstanceImpl struct { - ctx context.Context - funcCtx api.FunctionContext - cancelFunc context.CancelFunc - definition *model.Function - sourceFactory contube.SourceTubeFactory - sinkFactory contube.SinkTubeFactory - readyCh chan error - index int32 - parentLog *slog.Logger - log *slog.Logger + ctx context.Context + funcCtx api.FunctionContext + cancelFunc context.CancelFunc + definition *model.Function + readyCh chan error + index int32 + parentLog *slog.Logger + log *slog.Logger } type CtxKey string @@ -55,49 +53,29 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory { return &DefaultInstanceFactory{} } -func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32, logger *slog.Logger) api.FunctionInstance { +func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, index int32, logger *slog.Logger) api.FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) ctx = context.WithValue(ctx, CtxKeyFunctionName, definition.Name) ctx = context.WithValue(ctx, CtxKeyInstanceIndex, index) return &FunctionInstanceImpl{ - ctx: ctx, - funcCtx: funcCtx, - cancelFunc: cancelFunc, - definition: definition, - sourceFactory: sourceFactory, - sinkFactory: sinkFactory, - readyCh: make(chan error), - index: index, - parentLog: logger, - log: logger.With(slog.String("component", "function-instance")), + ctx: ctx, + funcCtx: funcCtx, + cancelFunc: cancelFunc, + definition: definition, + readyCh: make(chan error), + index: index, + parentLog: logger, + log: logger.With(slog.String("component", "function-instance")), } } -func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory) { +func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory, sources []<-chan contube.Record, sink chan<- contube.Record) { runtime, err := runtimeFactory.NewFunctionRuntime(instance) if err != nil { instance.readyCh <- errors.Wrap(err, "Error creating runtime") return } - getTubeConfig := func(config contube.ConfigMap, tubeConfig *model.TubeConfig) contube.ConfigMap { - if tubeConfig != nil && tubeConfig.Config != nil { - return contube.MergeConfig(config, tubeConfig.Config) - } - return config - } - sourceConfig := (&contube.SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)}).ToConfigMap() - sourceChan, err := instance.sourceFactory.NewSourceTube(instance.ctx, getTubeConfig(sourceConfig, instance.definition.Source)) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating source event queue") - return - } - sinkConfig := (&contube.SinkQueueConfig{Topic: instance.definition.Output}).ToConfigMap() - sinkChan, err := instance.sinkFactory.NewSinkTube(instance.ctx, getTubeConfig(sinkConfig, instance.definition.Sink)) - if err != nil { - instance.readyCh <- errors.Wrap(err, "Error creating sink event queue") - return - } - defer close(sinkChan) + defer close(sink) err = <-runtime.WaitForReady() if err != nil { instance.readyCh <- errors.Wrap(err, "Error waiting for runtime to be ready") @@ -107,26 +85,53 @@ func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFact close(instance.readyCh) defer instance.log.InfoContext(instance.ctx, "function instance has been stopped") logCounter := common.LogCounter() - for e := range sourceChan { - instance.log.DebugContext(instance.ctx, "calling process function", slog.Any("count", logCounter)) - output, err := runtime.Call(e) + channels := make([]reflect.SelectCase, len(sources)+1) + for i, s := range sources { + channels[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s)} + } + channels[len(sources)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(instance.ctx.Done())} + + for len(channels) > 0 { + // Use reflect.Select to select a channel from the slice + chosen, value, ok := reflect.Select(channels) + if !ok { + // The selected channel has been closed, remove it from the slice + channels = append(channels[:chosen], channels[chosen+1:]...) + continue + } + + // Convert the selected value to the type Record + record := value.Interface().(contube.Record) + instance.log.DebugContext(instance.ctx, "Calling process function", slog.Any("count", logCounter)) + + // Call the processing function + output, err := runtime.Call(record) if err != nil { if errors.Is(err, context.Canceled) { return } + // Log the error if there's an issue with the processing function instance.log.ErrorContext(instance.ctx, "Error calling process function", slog.Any("error", err)) return } + + // If the output is nil, continue with the next iteration if output == nil { - instance.log.DebugContext(instance.ctx, "output is nil") + instance.log.DebugContext(instance.ctx, "Output is nil") continue } + + // Try to send the output to the sink, but also listen to the context's Done channel select { - case sinkChan <- output: + case sink <- output: case <-instance.ctx.Done(): return } + // If the selected channel is the context's Done channel, exit the loop + if chosen == len(channels)-1 { + return + } } } diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index 79d10ab0..e4ee40a8 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -28,7 +28,7 @@ func TestFunctionInstanceContextSetting(t *testing.T) { Name: "test-function", } index := int32(1) - instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, nil, index, slog.Default()) + instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, index, slog.Default()) if instance == nil { t.Error("FunctionInstance should not be nil") diff --git a/fs/manager.go b/fs/manager.go index 1d78f7df..f038930c 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -131,7 +131,7 @@ func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) { }, nil } -func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) { +func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) { // TODO: Change input parameter to Type get := func(t string) (contube.TubeFactory, error) { factory, exist := fm.options.tubeFactoryMap[t] if !exist { @@ -183,17 +183,9 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { } funcCtx := fm.createFuncCtx(f) for i := int32(0); i < f.Replicas; i++ { - sourceFactory, err := fm.getTubeFactory(f.Source) - if err != nil { - return err - } - sinkFactory, err := fm.getTubeFactory(f.Sink) - if err != nil { - return err - } runtimeType := fm.getRuntimeType(f.Runtime) - instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, sourceFactory, sinkFactory, i, slog.With( + instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, i, slog.With( slog.String("name", f.Name), slog.Int("index", int(i)), slog.String("runtime", runtimeType), @@ -205,7 +197,28 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { if err != nil { return err } - go instance.Run(runtimeFactory) + sources := []<-chan contube.Record{} + for _, t := range f.Sources { + sourceFactory, err := fm.getTubeFactory(t) + if err != nil { + return nil + } + sourceChan, err := sourceFactory.NewSourceTube(instance.Context(), t.Config) + if err != nil { + return errors.Wrap(err, "Error creating source event queue") + } + sources = append(sources, sourceChan) + } + sinkFactory, err := fm.getTubeFactory(f.Sink) + if err != nil { + return nil + } + sink, err := sinkFactory.NewSinkTube(instance.Context(), f.Sink.Config) + if err != nil { + return errors.Wrap(err, "Error creating sink event queue") + } + + go instance.Run(runtimeFactory, sources, sink) select { case err := <-instance.WaitForReady(): if err != nil { diff --git a/fs/runtime/grpc/grpc_func_test.go b/fs/runtime/grpc/grpc_func_test.go index 92b94742..f5b51936 100644 --- a/fs/runtime/grpc/grpc_func_test.go +++ b/fs/runtime/grpc/grpc_func_test.go @@ -54,6 +54,8 @@ func TestFMWithGRPCRuntime(t *testing.T) { t.Fatal(err) } + inputTopic := "input" + outputTopic := "output" f := &model.Function{ Name: "test", Runtime: &model.RuntimeConfig{ @@ -62,8 +64,19 @@ func TestFMWithGRPCRuntime(t *testing.T) { "addr": addr, }, }, - Inputs: []string{"input"}, - Output: "output", + Sources: []*model.TubeConfig{ + { + Config: (&contube.SourceQueueConfig{ + Topics: []string{inputTopic}, + SubName: "test", + }).ToConfigMap(), + }, + }, + Sink: &model.TubeConfig{ + Config: (&contube.SinkQueueConfig{ + Topic: outputTopic, + }).ToConfigMap(), + }, Replicas: 1, } @@ -76,11 +89,11 @@ func TestFMWithGRPCRuntime(t *testing.T) { assert.Nil(t, err) event := contube.NewRecordImpl([]byte("hello"), func() {}) - err = fm.ProduceEvent(f.Inputs[0], event) + err = fm.ProduceEvent(inputTopic, event) if err != nil { t.Fatal(err) } - output, err := fm.ConsumeEvent(f.Output) + output, err := fm.ConsumeEvent(outputTopic) if err != nil { t.Fatal(err) } diff --git a/license-checker/license-checker.sh b/license-checker/license-checker.sh index ef945baf..706506c5 100755 --- a/license-checker/license-checker.sh +++ b/license-checker/license-checker.sh @@ -24,10 +24,10 @@ if [ ! -f "$LICENSE_CHECKER" ]; then export BINDIR=bin && curl -s https://raw.githubusercontent.com/lluissm/license-header-checker/master/install.sh | bash fi -$LICENSE_CHECKER -a -r -i bin,restclient,common/run.go,common/signal.go,fs/runtime/grpc/proto ./license-checker/license-header.txt . go +$LICENSE_CHECKER -a -r -i bin,admin/client,common/run.go,common/signal.go,fs/runtime/grpc/proto ./license-checker/license-header.txt . go $LICENSE_CHECKER -a -r ./license-checker/license-header.txt . proto -$LICENSE_CHECKER -a -r -i bin,restclient,.chglog ./license-checker/license-header-sh.txt . sh yaml yml -$LICENSE_CHECKER -a -r -i bin,restclient,.chglog,CHANGELOG.md ./license-checker/license-header-md.txt . md +$LICENSE_CHECKER -a -r -i bin,admin/client,.chglog ./license-checker/license-header-sh.txt . sh yaml yml +$LICENSE_CHECKER -a -r -i bin,admin/client,.chglog,CHANGELOG.md ./license-checker/license-header-md.txt . md if [[ -z $(git status -s) ]]; then echo "No license header issues found" diff --git a/perf/perf.go b/perf/perf.go index 4f130e24..7458852a 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -21,9 +21,10 @@ import ( "encoding/json" "fmt" "github.com/bmizerany/perks/quantile" + "github.com/functionstream/function-stream/admin/client" + "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs/contube" - "github.com/functionstream/function-stream/restclient" "golang.org/x/time/rate" "log/slog" "math/rand" @@ -38,7 +39,7 @@ type TubeBuilder func(ctx context.Context) (contube.TubeFactory, error) type Config struct { PulsarURL string RequestRate float64 - Func *restclient.ModelFunction + Func *adminclient.ModelFunction QueueBuilder TubeBuilder } @@ -82,18 +83,18 @@ func (p *perf) Run(ctx context.Context) { ) name := "perf-" + strconv.Itoa(rand.Int()) - var f restclient.ModelFunction + var f adminclient.ModelFunction if p.config.Func != nil { f = *p.config.Func } else { - f = restclient.ModelFunction{ - Runtime: restclient.ModelRuntimeConfig{ + f = adminclient.ModelFunction{ + Runtime: adminclient.ModelRuntimeConfig{ Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "./bin/example_basic.wasm", }, }, - Inputs: []string{"test-input-" + strconv.Itoa(rand.Int())}, - Output: "test-output-" + strconv.Itoa(rand.Int()), + Source: utils.MakeQueueSourceTubeConfig("test-sub", "test-input-"+strconv.Itoa(rand.Int())), + Sink: utils.MakeQueueSinkTubeConfig("test-output-" + strconv.Itoa(rand.Int())), } } f.Name = name @@ -107,8 +108,17 @@ func (p *perf) Run(ctx context.Context) { os.Exit(1) } + inputTopic, err := utils.GetInputTopics(&f) + if err != nil { + slog.Error( + "Failed to get input topics", + slog.Any("error", err), + ) + os.Exit(1) + + } p.input, err = queueFactory.NewSinkTube(ctx, (&contube.SinkQueueConfig{ - Topic: f.Inputs[0], + Topic: inputTopic[0], }).ToConfigMap()) if err != nil { slog.Error( @@ -118,20 +128,28 @@ func (p *perf) Run(ctx context.Context) { os.Exit(1) } + outputTopic, err := utils.GetOutputTopic(&f) + if err != nil { + slog.Error( + "Failed to get output topic", + slog.Any("error", err), + ) + os.Exit(1) + } p.output, err = queueFactory.NewSourceTube(ctx, (&contube.SourceQueueConfig{ - Topics: []string{f.Output}, + Topics: []string{outputTopic}, SubName: "perf", }).ToConfigMap()) if err != nil { slog.Error( - "Failed to create Source Perf Channel", + "Failed to create Sources Perf Channel", slog.Any("error", err), ) os.Exit(1) } - cfg := restclient.NewConfiguration() - cli := restclient.NewAPIClient(cfg) + cfg := adminclient.NewConfiguration() + cli := adminclient.NewAPIClient(cfg) res, err := cli.FunctionAPI.CreateFunction(context.Background()).Body(f).Execute() if err != nil { diff --git a/server/server_test.go b/server/server_test.go index ed6bbc0f..cc416843 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,12 +19,12 @@ package server import ( "context" "encoding/json" + "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" - "github.com/functionstream/function-stream/restclient" "github.com/functionstream/function-stream/tests" "github.com/stretchr/testify/assert" "math/rand" @@ -76,8 +76,19 @@ func TestStandaloneBasicFunction(t *testing.T) { common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Inputs: []string{inputTopic}, - Output: outputTopic, + Sources: []*model.TubeConfig{ + { + Config: (&contube.SourceQueueConfig{ + Topics: []string{inputTopic}, + SubName: "test", + }).ToConfigMap(), + }, + }, + Sink: &model.TubeConfig{ + Config: (&contube.SinkQueueConfig{ + Topic: outputTopic, + }).ToConfigMap(), + }, Name: "test-func", Replicas: 1, } @@ -129,14 +140,17 @@ func TestHttpTube(t *testing.T) { common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Source: &model.TubeConfig{ + Sources: []*model.TubeConfig{{ Type: common.OptionalStr(common.HttpTubeType), Config: map[string]interface{}{ contube.EndpointKey: endpoint, }, + }}, + Sink: &model.TubeConfig{ + Config: (&contube.SinkQueueConfig{ + Topic: "output", + }).ToConfigMap(), }, - Inputs: []string{}, - Output: "output", Name: "test-func", Replicas: 1, } @@ -153,13 +167,13 @@ func TestHttpTube(t *testing.T) { t.Fatal(err) } - cfg := restclient.NewConfiguration() + cfg := adminclient.NewConfiguration() cfg.Host = httpAddr - cli := restclient.NewAPIClient(cfg) + cli := adminclient.NewAPIClient(cfg) _, err = cli.HttpTubeAPI.TriggerHttpTubeEndpoint(ctx, endpoint).Body(string(jsonBytes)).Execute() assert.Nil(t, err) - event, err := s.Manager.ConsumeEvent(funcConf.Output) + event, err := s.Manager.ConsumeEvent("output") if err != nil { t.Error(err) return @@ -218,10 +232,23 @@ func TestStatefulFunction(t *testing.T) { defer cancel() s, httpAddr := startStandaloneSvr(t, ctx, WithFunctionManager(fs.WithDefaultRuntimeFactory(&MockRuntimeFactory{}))) + input := "input" + output := "output" funcConf := &model.Function{ - Name: "test-func", - Inputs: []string{"input"}, - Output: "output", + Name: "test-func", + Sources: []*model.TubeConfig{ + { + Config: (&contube.SourceQueueConfig{ + Topics: []string{input}, + SubName: "test", + }).ToConfigMap(), + }, + }, + Sink: &model.TubeConfig{ + Config: (&contube.SinkQueueConfig{ + Topic: "output", + }).ToConfigMap(), + }, Replicas: 1, } err := s.Manager.StartFunction(funcConf) @@ -229,18 +256,18 @@ func TestStatefulFunction(t *testing.T) { t.Fatal(err) } - cfg := restclient.NewConfiguration() + cfg := adminclient.NewConfiguration() cfg.Host = httpAddr - cli := restclient.NewAPIClient(cfg) + cli := adminclient.NewAPIClient(cfg) _, err = cli.StateAPI.SetState(ctx, "key").Body("hello").Execute() assert.Nil(t, err) - err = s.Manager.ProduceEvent(funcConf.Inputs[0], contube.NewRecordImpl(nil, func() { + err = s.Manager.ProduceEvent(input, contube.NewRecordImpl(nil, func() { })) assert.Nil(t, err) - _, err = s.Manager.ConsumeEvent(funcConf.Output) + _, err = s.Manager.ConsumeEvent(output) assert.Nil(t, err) result, _, err := cli.StateAPI.GetState(ctx, "key").Execute() diff --git a/tests/integration_test.go b/tests/integration_test.go index 8eeaef5e..b3de8cb9 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -20,8 +20,9 @@ import ( "context" "encoding/json" "github.com/apache/pulsar-client-go/pulsar" + "github.com/functionstream/function-stream/admin/client" + "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/restclient" "github.com/functionstream/function-stream/server" "io" "math/rand" @@ -46,8 +47,8 @@ func init() { func TestBasicFunction(t *testing.T) { - cfg := restclient.NewConfiguration() - cli := restclient.NewAPIClient(cfg) + cfg := adminclient.NewConfiguration() + cli := adminclient.NewAPIClient(cfg) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", @@ -57,27 +58,36 @@ func TestBasicFunction(t *testing.T) { } name := "func-" + strconv.Itoa(rand.Int()) - f := restclient.ModelFunction{ + inputTopic := "test-input-" + strconv.Itoa(rand.Int()) + outputTopic := "test-output-" + strconv.Itoa(rand.Int()) + f := adminclient.ModelFunction{ Name: name, - Runtime: restclient.ModelRuntimeConfig{ + Runtime: adminclient.ModelRuntimeConfig{ Config: map[string]interface{}{ common.RuntimeArchiveConfigKey: "../bin/example_basic.wasm", }, }, - Inputs: []string{"test-input-" + strconv.Itoa(rand.Int())}, - Output: "test-output-" + strconv.Itoa(rand.Int()), + Source: []adminclient.ModelTubeConfig{ + { + Config: map[string]interface{}{ + "topicList": []string{inputTopic}, + "subName": "fs", + }, + }, + }, + Sink: utils.MakeQueueSinkTubeConfig(outputTopic), Replicas: 1, } producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: f.Inputs[0], + Topic: inputTopic, }) if err != nil { t.Fatalf(err.Error()) } consumer, err := client.Subscribe(pulsar.ConsumerOptions{ - Topic: f.Output, + Topic: outputTopic, SubscriptionName: "test-sub", }) if err != nil { @@ -85,12 +95,12 @@ func TestBasicFunction(t *testing.T) { } res, err := cli.FunctionAPI.CreateFunction(context.Background()).Body(f).Execute() - if err != nil { - t.Fatalf("failed to create function: %v", err) - return + if err != nil && res == nil { + t.Errorf("failed to create function: %v", err) } if res.StatusCode != 200 { - t.Fatalf("expected 200, got %d", res.StatusCode) + body, _ := io.ReadAll(res.Body) + t.Fatalf("expected 200, got %d: %s", res.StatusCode, body) return }