1717package contube
1818
1919import (
20+ "github.com/functionstream/function-stream/common"
2021 "io"
21- "log/slog"
2222 "net/http"
2323 "sync"
2424 "sync/atomic"
@@ -30,9 +30,10 @@ import (
3030type state int
3131
3232const (
33- EndpointKey = "endpoint"
33+ EndpointKey = "endpoint"
34+ IncludeMetadata = "includeMetadata"
3435
35- stateReady state = iota
36+ stateReady state = iota // TODO: Why do we need this? Maybe we can remove it.
3637 stateClosed state = iota
3738)
3839
4344)
4445
4546type EndpointHandler func (ctx context.Context , endpoint string , payload []byte ) error
47+ type HttpHandler func (w http.ResponseWriter , r * http.Request , payload []byte ) Record
48+
49+ func DefaultHttpHandler (_ http.ResponseWriter , _ * http.Request , payload []byte ) Record {
50+ return NewRecordImpl (payload , func () {})
51+ }
4652
4753type endpointHandler struct {
4854 ctx context.Context
@@ -55,32 +61,42 @@ type HttpTubeFactory struct {
5561 ctx context.Context
5662 mu sync.RWMutex
5763 endpoints map [string ]* endpointHandler
64+ handler HttpHandler
5865}
5966
6067func NewHttpTubeFactory (ctx context.Context ) * HttpTubeFactory {
68+ return NewHttpTubeFactoryWithIntercept (ctx , DefaultHttpHandler )
69+ }
70+
71+ func NewHttpTubeFactoryWithIntercept (ctx context.Context , handler HttpHandler ) * HttpTubeFactory {
6172 return & HttpTubeFactory {
6273 ctx : ctx ,
6374 endpoints : make (map [string ]* endpointHandler ),
75+ handler : handler ,
6476 }
6577}
6678
6779type httpSourceTubeConfig struct {
6880 Endpoint string `json:"endpoint" validate:"required"`
6981}
7082
71- func (f * HttpTubeFactory ) Handle ( ctx context. Context , endpoint string , payload [] byte ) error {
83+ func (f * HttpTubeFactory ) getEndpoint ( endpoint string ) ( * endpointHandler , bool ) {
7284 f .mu .RLock ()
85+ defer f .mu .RUnlock ()
7386 e , ok := f .endpoints [endpoint ]
87+ return e , ok
88+ }
89+
90+ func (f * HttpTubeFactory ) Handle (ctx context.Context , endpoint string , record Record ) error {
91+ e , ok := f .getEndpoint (endpoint )
7492 if ! ok {
75- f .mu .RUnlock ()
7693 return ErrEndpointNotFound
7794 }
78- f .mu .RUnlock ()
7995 if e .s .Load () == stateClosed {
8096 return ErrEndpointClosed
8197 }
8298 select {
83- case e .c <- NewRecordImpl ( payload , func () {}) :
99+ case e .c <- record :
84100 return nil
85101 case <- ctx .Done ():
86102 return ctx .Err ()
@@ -125,28 +141,33 @@ func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Re
125141}
126142
127143func (f * HttpTubeFactory ) GetHandleFunc (getEndpoint func (r * http.Request ) (string , error ),
128- logger * slog .Logger ) func (http.ResponseWriter , * http.Request ) {
144+ logger * common .Logger ) func (http.ResponseWriter , * http.Request ) {
129145 return func (w http.ResponseWriter , r * http.Request ) {
130146 endpoint , err := getEndpoint (r )
131147 if err != nil {
132- logger .Error ("Failed to get endpoint" , "error" , err )
148+ logger .Error (err , "Failed to get endpoint" )
133149 http .Error (w , errors .Wrap (err , "Failed to get endpoint" ).Error (), http .StatusBadRequest )
134150 return
135151 }
136- log := logger .With (slog .String ("endpoint" , endpoint ), slog .String ("component" , "http-tube" ))
137- log .Info ("Handle records from http request" )
152+ log := logger .SubLogger ("endpoint" , endpoint , "component" , "http-tube" )
153+ if log .DebugEnabled () {
154+ log .Debug ("Handle records from http request" )
155+ }
138156 content , err := io .ReadAll (r .Body )
139157 if err != nil {
140- log .Error ("Failed to read body" , "error" , err )
158+ log .Error (err , "Failed to read body" )
141159 http .Error (w , errors .Wrap (err , "Failed to read body" ).Error (), http .StatusBadRequest )
142160 return
143161 }
144- err = f .Handle (r .Context (), endpoint , content )
162+ record := f .handler (w , r , content )
163+ err = f .Handle (r .Context (), endpoint , record )
145164 if err != nil {
146- log .Error ("Failed to handle record" , "error" , err )
165+ log .Error (err , "Failed to handle record" )
147166 http .Error (w , err .Error (), http .StatusInternalServerError )
148167 return
149168 }
150- log .Info ("Handled records from http request" )
169+ if log .DebugEnabled () {
170+ log .Debug ("Handled records from http request" )
171+ }
151172 }
152173}
0 commit comments