Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 75 additions & 65 deletions fs/runtime/grpc/grpc_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"io"
"log/slog"
"net"
"sync"
"sync/atomic"
)

type GRPCFuncRuntime struct {
Name string
ctx context.Context
status *proto.FunctionStatus
readyCh chan error
input chan string
output chan string
stopFunc func()
log *slog.Logger
api.FunctionRuntime
Name string
ctx context.Context
status *proto.FunctionStatus
readyOnce sync.Once
readyCh chan error
input chan string
output chan string
stopFunc func()
log *slog.Logger
}

type Status int32
Expand Down Expand Up @@ -77,7 +78,7 @@ func (s *FSSReconcileServer) WaitForReady() <-chan struct{} {
return s.readyCh
}

func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer) error {
func (s *FSSReconcileServer) Reconcile(req *proto.ConnectRequest, stream proto.FSReconcile_ReconcileServer) error {
s.connected.Do(func() {
close(s.readyCh)
})
Expand All @@ -89,29 +90,20 @@ func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer)
atomic.StoreInt32(&s.status, int32(NotReady))
s.log.InfoContext(s.ctx, "grpc reconcile stream closed")
}()
errCh := make(chan error)
go func() {
for {
newStatus, err := stream.Recv()
s.log.DebugContext(s.ctx, "received status update", slog.Any("status", newStatus))
if err == io.EOF {
return
}
if err != nil {
errCh <- err
return
}
s.functionsMu.Lock()
instance, ok := s.functions[newStatus.Name]
if !ok {
s.functionsMu.Unlock()
s.log.ErrorContext(s.ctx, "receive non-exist function status update", slog.Any("name", newStatus.Name))
continue
}
s.functionsMu.Unlock()
instance.Update(newStatus)
// Sync all exiting function status to the newly connected reconcile client
s.functionsMu.Lock()
statusList := make([]*proto.FunctionStatus, 0, len(s.functions))
for _, v := range s.functions {
statusList = append(statusList, v.status)
}
s.functionsMu.Unlock()
for _, v := range statusList {
err := stream.Send(v)
if err != nil {
s.log.ErrorContext(stream.Context(), "failed to send status update", slog.Any("status", v))
// Continue to send the next status update.
}
}()
}
for {
select {
case status := <-s.reconcile:
Expand All @@ -125,18 +117,39 @@ func (s *FSSReconcileServer) Reconcile(stream proto.FSReconcile_ReconcileServer)
return nil
case <-s.ctx.Done():
return nil
case e := <-errCh:
return e
}
}

}

func (s *FSSReconcileServer) UpdateStatus(ctx context.Context, newStatus *proto.FunctionStatus) (*proto.Response, error) {
s.log.DebugContext(s.ctx, "received status update", slog.Any("status", newStatus))
s.functionsMu.Lock()
instance, ok := s.functions[newStatus.Name]
if !ok {
s.functionsMu.Unlock()
s.log.ErrorContext(s.ctx, "receive non-exist function status update", slog.Any("name", newStatus.Name))
return &proto.Response{
Status: proto.Response_ERROR,
Message: common.OptionalStr("function not found"),
}, nil
}
s.functionsMu.Unlock()
instance.Update(newStatus)
return &proto.Response{
Status: proto.Response_OK,
}, nil
}

func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) {
name := instance.Definition().Name
log := instance.Logger().With(
slog.String("component", "grpc-runtime"),
)
go func() {
<-instance.Context().Done()
s.removeFunction(name)
}()
runtime := &GRPCFuncRuntime{
Name: name,
readyCh: make(chan error),
Expand All @@ -147,7 +160,7 @@ func (s *FSSReconcileServer) NewFunctionRuntime(instance api.FunctionInstance) (
Status: proto.FunctionStatus_CREATING,
},
ctx: instance.Context(),
stopFunc: func() {
stopFunc: func() { // TODO: remove it, we should use instance.ctx to control the lifecycle
s.removeFunction(name)
},
log: log,
Expand Down Expand Up @@ -228,43 +241,22 @@ func NewFunctionServerImpl(s *FSSReconcileServer) *FunctionServerImpl {
}
}

func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error {
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return fmt.Errorf("failed to get metadata")
}
instance, err := f.reconcileSvr.getFunc(md["name"][0])
func (f *FunctionServerImpl) Process(req *proto.FunctionProcessRequest, stream proto.Function_ProcessServer) error {
runtime, err := f.reconcileSvr.getFunc(req.Name)
if err != nil {
return err
}
log := instance.log
log := runtime.log
log.InfoContext(stream.Context(), "Start processing events using GRPC")
instance.readyCh <- err
defer func() {
instance.Stop()
}()
runtime.readyOnce.Do(func() {
runtime.readyCh <- err
})
errCh := make(chan error)
go func() {
defer log.InfoContext(stream.Context(), "Stop processing events using GRPC")
logCounter := common.LogCounter()
for {
event, err := stream.Recv()
log.DebugContext(stream.Context(), "received event", slog.Any("count", logCounter))
if err == io.EOF {
return
}
if err != nil {
errCh <- err
return
}
instance.output <- event.Payload
}
}()

logCounter := common.LogCounter()
for {
select {
case payload := <-instance.input:
case payload := <-runtime.input:
log.DebugContext(stream.Context(), "sending event", slog.Any("count", logCounter))
err := stream.Send(&proto.Event{Payload: payload})
if err != nil {
Expand All @@ -273,15 +265,33 @@ func (f *FunctionServerImpl) Process(stream proto.Function_ProcessServer) error
}
case <-stream.Context().Done():
return nil
case <-instance.ctx.Done():
case <-runtime.ctx.Done():
return nil
case e := <-errCh:
log.ErrorContext(stream.Context(), "error processing event", slog.Any("error", e))
return e
}
}
}

func (f *FunctionServerImpl) Output(ctx context.Context, e *proto.Event) (*proto.Response, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("failed to get metadata")
}
if _, ok := md["name"]; !ok || len(md["name"]) == 0 {
return nil, fmt.Errorf("the metadata doesn't contain the function name")
}
runtime, err := f.reconcileSvr.getFunc(md["name"][0])
if err != nil {
return nil, err
}
runtime.log.DebugContext(ctx, "received event")
runtime.output <- e.Payload
return &proto.Response{
Status: proto.Response_OK,
}, nil
}

func StartGRPCServer(f *FSSReconcileServer, addr string) (*grpc.Server, error) {
lis, err := net.Listen("tcp", addr)
if err != nil {
Expand Down
17 changes: 13 additions & 4 deletions fs/runtime/grpc/mock_grpc_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func StartMockGRPCFunc(t *testing.T, addr string) {
}
client := proto.NewFSReconcileClient(conn)

stream, err := client.Reconcile(context.Background())
stream, err := client.Reconcile(context.Background(), &proto.ConnectRequest{})
if err != nil {
t.Errorf("failed to get process stream: %v", err)
return
Expand Down Expand Up @@ -70,14 +70,19 @@ func StartMockGRPCFunc(t *testing.T, addr string) {
}
t.Logf("client received status: %v", s)
s.Status = proto.FunctionStatus_RUNNING
err = stream.Send(s)
res, err := client.UpdateStatus(context.Background(), s)
if err != nil {
t.Errorf("failed to send: %v", err)
return
}
if res.GetStatus() != proto.Response_OK {
t.Errorf("failed to update status: %s", res.GetMessage())
}
go func() {
ctx := metadata.AppendToOutgoingContext(context.Background(), "name", s.Name)
processStream, err := funcCli.Process(ctx)
processStream, err := funcCli.Process(ctx, &proto.FunctionProcessRequest{
Name: s.Name,
})
if err != nil {
t.Errorf("failed to get process stream: %v", err)
return
Expand All @@ -90,11 +95,15 @@ func StartMockGRPCFunc(t *testing.T, addr string) {
}
t.Logf("client received event: %v", event)
event.Payload += "!"
err = processStream.Send(event)
res, err := funcCli.Output(ctx, event)
if err != nil {
t.Errorf("failed to send event: %v", err)
return
}
if res.GetStatus() != proto.Response_OK {
t.Errorf("failed to send event: %s", res.GetMessage())
return
}
}
}()
}
Expand Down
Loading