Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions fs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ func (fm *FunctionManager) getRuntimeFactory(runtimeConfig *model.RuntimeConfig)

func (fm *FunctionManager) StartFunction(f *model.Function) error {
fm.functionsLock.Lock()
defer fm.functionsLock.Unlock() // TODO: narrow the lock scope
if _, exist := fm.functions[f.Name]; exist {
fm.functionsLock.Unlock()
return common.ErrorFunctionExists
}
fm.functions[f.Name] = make([]api.FunctionInstance, f.Replicas)
fm.functionsLock.Unlock()
if f.Replicas <= 0 {
return errors.New("replicas should be greater than 0")
}
Expand All @@ -154,31 +155,37 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
}

instance := fm.options.instanceFactory.NewFunctionInstance(f, sourceFactory, sinkFactory, i)
fm.functionsLock.Lock()
fm.functions[f.Name][i] = instance
fm.functionsLock.Unlock()
runtimeFactory, err := fm.getRuntimeFactory(f.Runtime)
if err != nil {
return err
}
go instance.Run(runtimeFactory)
if err := <-instance.WaitForReady(); err != nil {
select {
case <-instance.WaitForReady():
if err != nil {
slog.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", err.Error()))
instance.Stop()
return err
}
instance.Stop()
return err
case <-instance.Context().Done():
slog.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", "context cancelled"))
return errors.New("context cancelled")
}
}
return nil
}

func (fm *FunctionManager) DeleteFunction(name string) error {
fm.functionsLock.Lock()
defer fm.functionsLock.Unlock()
instances, exist := fm.functions[name]
if !exist {
return common.ErrorFunctionNotFound
}
delete(fm.functions, name)
fm.functionsLock.Unlock()
for _, instance := range instances {
instance.Stop()
}
Expand Down
23 changes: 21 additions & 2 deletions fs/runtime/grpc/grpc_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"log/slog"
"net"
"sync"
"sync/atomic"
)

// TODO: Replace with FunctionInstane after the function instance abstraction is finishedf
type GRPCFuncRuntime struct {
Name string
ctx context.Context
Expand All @@ -41,6 +41,13 @@ type GRPCFuncRuntime struct {
stopFunc func()
}

type Status int32

const (
NotReady Status = iota
Ready
)

// FSSReconcileServer is the struct that implements the FSServiceServer interface.
type FSSReconcileServer struct {
proto.UnimplementedFSReconcileServer
Expand All @@ -50,6 +57,7 @@ type FSSReconcileServer struct {
reconcile chan *proto.FunctionStatus
functions map[string]*GRPCFuncRuntime
functionsMu sync.Mutex
status int32
}

func NewFSReconcile(ctx context.Context) *FSSReconcileServer {
Expand All @@ -69,6 +77,14 @@ func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer)
s.connected.Do(func() {
close(s.readyCh)
})
if Status(atomic.LoadInt32(&s.status)) == Ready {
return fmt.Errorf("there is already a reconcile stream")
}
slog.InfoContext(s.ctx, "grpc reconcile stream opened")
defer func() {
atomic.StoreInt32(&s.status, int32(NotReady))
slog.InfoContext(s.ctx, "grpc reconcile stream closed")
}()
errCh := make(chan error)
go func() {
for {
Expand Down Expand Up @@ -135,7 +151,7 @@ func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) (
s.functions[name] = runtime
}
s.reconcile <- runtime.status
slog.InfoContext(runtime.ctx, "Creating function", slog.Any("name", name))
slog.InfoContext(runtime.ctx, "Creating function runtime", slog.Any("name", name))
return runtime, nil
}

Expand Down Expand Up @@ -212,6 +228,9 @@ func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error
}
slog.InfoContext(stream.Context(), "Start processing events", slog.Any("name", md["name"]))
instance.readyCh <- err
defer func() {
instance.Stop()
}()
errCh := make(chan error)
go func() {
for {
Expand Down
2 changes: 0 additions & 2 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ components:
type: integer
format: int32
required:
- source
- sink
- inputs
- output
- replicas
16 changes: 13 additions & 3 deletions restclient/docs/Function.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Name** | Pointer to **string** | | [optional]
**Runtime** | Pointer to [**FunctionRuntime**](FunctionRuntime.md) | | [optional]
**Source** | [**FunctionSource**](FunctionSource.md) | |
**Sink** | [**FunctionSource**](FunctionSource.md) | |
**Source** | Pointer to [**FunctionSource**](FunctionSource.md) | | [optional]
**Sink** | Pointer to [**FunctionSource**](FunctionSource.md) | | [optional]
**Inputs** | **[]string** | |
**Output** | **string** | |
**Config** | Pointer to **map[string]string** | | [optional]
Expand All @@ -17,7 +17,7 @@ Name | Type | Description | Notes

### NewFunction

`func NewFunction(source FunctionSource, sink FunctionSource, inputs []string, output string, replicas int32, ) *Function`
`func NewFunction(inputs []string, output string, replicas int32, ) *Function`

NewFunction instantiates a new Function object
This constructor will assign default values to properties that have it defined,
Expand Down Expand Up @@ -101,6 +101,11 @@ and a boolean to check if the value has been set.

SetSource sets Source field to given value.

### HasSource

`func (o *Function) HasSource() bool`

HasSource returns a boolean if a field has been set.

### GetSink

Expand All @@ -121,6 +126,11 @@ and a boolean to check if the value has been set.

SetSink sets Sink field to given value.

### HasSink

`func (o *Function) HasSink() bool`

HasSink returns a boolean if a field has been set.

### GetInputs

Expand Down
70 changes: 43 additions & 27 deletions restclient/model_function.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (s *Server) startRESTHandlers() error {
slog.Info("Starting function", slog.Any("name", functionName))
err = s.options.manager.StartFunction(f)
if err != nil {
slog.ErrorContext(r.Context(), "failed to start function", slog.Any("name", functionName), slog.Any("error", err.Error()))
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -175,6 +176,7 @@ func (s *Server) startRESTHandlers() error {

err := s.options.manager.DeleteFunction(functionName)
if errors.Is(err, common.ErrorFunctionNotFound) {
slog.ErrorContext(r.Context(), "failed to delete functions: function not found", slog.Any("name", functionName))
http.Error(w, err.Error(), http.StatusNotFound)
return
}
Expand Down Expand Up @@ -203,6 +205,7 @@ func (s *Server) startRESTHandlers() error {
}
err = s.options.manager.ProduceEvent(queueName, contube.NewRecordImpl(content, func() {}))
if err != nil {
slog.ErrorContext(r.Context(), "Error when producing event", slog.Any("error", err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down