@@ -2,7 +2,6 @@ package offline
22
33import (
44 "bufio"
5- "context"
65 "crypto/md5"
76 "encoding/json"
87 "fmt"
@@ -37,132 +36,104 @@ func ServeHandler(handlerInstance *HandlerInstance, r *mux.Router) {
3736
3837 defer np .Close ()
3938
40- for method , path := range handlerInstance .handlerConfig .Http {
41- go r .HandleFunc (path , func (w http.ResponseWriter , r * http.Request ) {
42- handlerExecutionMutex .Lock ()
43- defer handlerExecutionMutex .Unlock ()
44-
45- ctx , cancel := context .WithTimeout (r .Context (), 30 * time .Second )
46- defer cancel ()
39+ handleRequestFunc := func (w http.ResponseWriter , r * http.Request , code string ) {
40+ handlerExecutionMutex .Lock ()
41+ defer handlerExecutionMutex .Unlock ()
4742
48- code := generateHttpHandlerRuntimeCode ( handlerInstance , r )
43+ np . Execute ( code )
4944
50- np .Execute (code )
45+ outputChannel := make (chan HandlerOutput , 1 )
46+ errorChannel := make (chan error , 1 )
5147
52- outputChannel := make (chan HandlerOutput , 1 )
48+ fmt .Printf ("%s %s (%s) \n " , r .Method , r .URL .Path , handlerInstance .handlerConfig .Name )
49+ start := time .Now ()
5350
54- fmt .Printf ("%s %s (%s) \n " , r .Method , r .URL .Path , handlerInstance .handlerConfig .Name )
55- start := time .Now ()
51+ go func () {
52+ if err := processOutputStream (np , outputChannel ); err != nil {
53+ errorChannel <- err
54+ }
55+ }()
5656
57- go processOutputStream (np , ctx , outputChannel )
58- go processErrorStream (np , ctx )
57+ // Start error processing with done channel
58+ go processErrorStream (np )
59+ sendResult (start , w , outputChannel )
60+ }
5961
60- sendResult (start , ctx , w , outputChannel )
62+ for method , path := range handlerInstance .handlerConfig .Http {
63+ r .HandleFunc (path , func (w http.ResponseWriter , r * http.Request ) {
64+ code := generateHttpHandlerRuntimeCode (handlerInstance , r )
65+ handleRequestFunc (w , r , code )
6166 }).Methods (method )
6267 }
6368
6469 for range handlerInstance .handlerConfig .Sqs {
65- go r .HandleFunc (fmt .Sprintf ("/_sqs/%s" , handlerInstance .handlerConfig .Name ), func (w http.ResponseWriter , r * http.Request ) {
66- handlerExecutionMutex .Lock ()
67- defer handlerExecutionMutex .Unlock ()
68-
69- ctx , cancel := context .WithTimeout (r .Context (), 30 * time .Second )
70- defer cancel ()
71-
70+ r .HandleFunc (fmt .Sprintf ("/_sqs/%s" , handlerInstance .handlerConfig .Name ), func (w http.ResponseWriter , r * http.Request ) {
7271 code := generateSqsHandlerRuntimeCode (handlerInstance , r )
73-
74- np .Execute (code )
75-
76- outputChannel := make (chan HandlerOutput , 1 )
77-
78- fmt .Printf ("%s %s (%s) \n " , r .Method , r .URL .Path , handlerInstance .handlerConfig .Name )
79- start := time .Now ()
80-
81- go processOutputStream (np , ctx , outputChannel )
82- go processErrorStream (np , ctx )
83-
84- sendResult (start , ctx , w , outputChannel )
72+ handleRequestFunc (w , r , code )
8573 }).Methods ("POST" )
8674 }
8775
8876 np .cmd .Wait ()
8977}
9078
91- func sendResult (startTime time.Time , ctx context.Context , w http.ResponseWriter , outputChannel chan HandlerOutput ) {
92- select {
93- case parsed := <- outputChannel :
94- if parsed .err != nil {
95- fmt .Println (parsed .err )
96- w .WriteHeader (500 )
97- w .Write ([]byte {})
98- return
99- }
100-
101- // Set response headers
102- for k , header := range parsed .handlerResult .Headers {
103- w .Header ().Set (k , header )
104- }
105-
106- // Write status code
107- w .WriteHeader (int (parsed .handlerResult .StatusCode ))
108-
109- // Write the body
110- w .Write ([]byte (parsed .handlerResult .Body ))
111- fmt .Printf ("Completed in %.dms\n \n " , time .Since (startTime ).Milliseconds ())
112- case <- ctx .Done ():
113- // Handle timeout
114- w .WriteHeader (http .StatusGatewayTimeout )
79+ func sendResult (startTime time.Time , w http.ResponseWriter , outputChannel chan HandlerOutput ) {
80+ parsed := <- outputChannel
81+ if parsed .err != nil {
82+ fmt .Println (parsed .err )
83+ w .WriteHeader (500 )
11584 w .Write ([]byte {})
116-
117- fmt .Printf ("Request timed out\n " )
118- fmt .Printf ("Completed in %.dms\n \n " , time .Since (startTime ).Milliseconds ())
11985 return
12086 }
87+
88+ // Set response headers
89+ for k , header := range parsed .handlerResult .Headers {
90+ w .Header ().Set (k , header )
91+ }
92+
93+ // Write status code
94+ w .WriteHeader (int (parsed .handlerResult .StatusCode ))
95+
96+ // Write the body
97+ w .Write ([]byte (parsed .handlerResult .Body ))
98+ fmt .Printf ("Completed in %.dms\n \n " , time .Since (startTime ).Milliseconds ())
12199}
122100
123- func processOutputStream (np * NodeProcess , ctx context. Context , resultChan chan <- HandlerOutput ) {
101+ func processOutputStream (np * NodeProcess , resultChan chan <- HandlerOutput ) error {
124102 scanner := bufio .NewReader (np .stdout )
125103
126104 for {
127- select {
128- case <- ctx .Done ():
129- return
130- default :
131- line , _ := scanner .ReadString ('\n' )
105+ line , err := scanner .ReadString ('\n' )
132106
133- if strings .HasPrefix (line , "TERRABLE_RESULT_START" ) {
134- extractedResult , err := extractResult (line )
135-
136- resultChan <- HandlerOutput {
137- handlerResult : extractedResult ,
138- err : err ,
139- }
140-
141- return
107+ if err != nil {
108+ if err == io .EOF {
109+ return nil
142110 }
111+ return err
112+ }
143113
144- if strings .HasPrefix (line , "CODE_EXECUTION_COMPLETE" ) {
145- continue
114+ if strings .HasPrefix (line , "TERRABLE_RESULT_START" ) {
115+ extractedResult , err := extractResult (line )
116+ resultChan <- HandlerOutput {
117+ handlerResult : extractedResult ,
118+ err : err ,
146119 }
120+ return nil
121+ }
147122
148- fmt .Println (line )
123+ if strings .HasPrefix (line , "CODE_EXECUTION_COMPLETE" ) {
124+ continue
149125 }
126+
127+ fmt .Println (line )
150128 }
151129}
152130
153- func processErrorStream (np * NodeProcess , ctx context. Context ) {
131+ func processErrorStream (np * NodeProcess ) {
154132 scanner := bufio .NewReader (np .stderr )
155133 errorColour := color .New (color .FgHiRed ).SprintFunc ()
156134
157- for {
158- select {
159- case <- ctx .Done ():
160- return
161- default :
162- line , _ := scanner .ReadString ('\n' )
163- fmt .Println (errorColour (line ))
164- }
165- }
135+ line , _ := scanner .ReadString ('\n' )
136+ fmt .Println (errorColour (line ))
166137}
167138
168139func generateHttpHandlerRuntimeCode (handler * HandlerInstance , r * http.Request ) string {
@@ -222,7 +193,7 @@ func generateHttpHandlerRuntimeCode(handler *HandlerInstance, r *http.Request) s
222193
223194 eventInputJSON , _ := json .Marshal (eventInput )
224195 envVars := generateEnvVars (handler )
225- return generateJSCode (string (envVars ), handler .GetExecutionPath (), string (eventInputJSON ))
196+ return generateJSCode (string (envVars ), handler .GetExecutionPath (), string (eventInputJSON ), handler . handlerConfig . Timeout )
226197}
227198
228199func generateEnvVars (handler * HandlerInstance ) string {
@@ -274,10 +245,10 @@ func generateSqsHandlerRuntimeCode(handler *HandlerInstance, r *http.Request) st
274245
275246 eventInputJSON , _ := json .Marshal (eventInput )
276247 envVars := generateEnvVars (handler )
277- return generateJSCode (string (envVars ), handler .GetExecutionPath (), string (eventInputJSON ))
248+ return generateJSCode (string (envVars ), handler .GetExecutionPath (), string (eventInputJSON ), handler . handlerConfig . Timeout )
278249}
279250
280- func generateJSCode (envVars , executionPath , eventInputJSON string ) string {
251+ func generateJSCode (envVars , executionPath , eventInputJSON string , timeoutSeconds int ) string {
281252 return fmt .Sprintf (`
282253 const env = %s;
283254 process.env = {};
@@ -290,6 +261,7 @@ func generateJSCode(envVars, executionPath, eventInputJSON string) string {
290261 var transpiledFunction = require('%s');
291262
292263 var eventInput = %s;
264+ const endTime = Date.now() + (%d * 1000);
293265
294266 // Create a fake context object
295267 const context = {
@@ -300,11 +272,22 @@ func generateJSCode(envVars, executionPath, eventInputJSON string) string {
300272 awsRequestId: "local-" + Date.now(),
301273 logGroupName: "local-group",
302274 logStreamName: "local-stream",
303- getRemainingTimeInMillis: () => 30000,
275+ getRemainingTimeInMillis: () => {
276+ const remaining = endTime - Date.now();
277+ return remaining > 0 ? remaining : 0;
278+ },
304279 callbackWaitsForEmptyEventLoop: true
305280 };
306281
307- new Promise((resolve, reject) => {
282+ // Create a timeout promise
283+ const timeoutPromise = new Promise((resolve) => {
284+ setTimeout(() => {
285+ resolve({ statusCode: 504 })
286+ }, %d * 1000);
287+ });
288+
289+ // Main execution promise
290+ const executionPromise = new Promise((resolve, reject) => {
308291 const callback = (error, result) => {
309292 if (error) {
310293 reject(error);
@@ -322,19 +305,22 @@ func generateJSCode(envVars, executionPath, eventInputJSON string) string {
322305 } else {
323306 resolve(handlerResult);
324307 }
325- })
308+ });
309+
310+ // Race between execution and timeout
311+ Promise.race([executionPromise, timeoutPromise])
326312 .then(result => {
327313 console.log("TERRABLE_RESULT_START:" + JSON.stringify({ statusCode: 200, ...result }) + ":TERRABLE_RESULT_END");
328314 })
329315 .catch(error => {
330316 console.error(error);
331317 console.log("TERRABLE_RESULT_START:" + JSON.stringify({
332- statusCode: 500,
318+ statusCode: error.message.includes('timed out') ? 408 : 500,
333319 headers: {
334320 "Content-Type": "application/json",
335321 },
336322 body: JSON.stringify({
337- message: "Internal server error",
323+ message: error.message.includes('timed out') ? "Function timed out" : "Internal server error",
338324 errorMessage: error.message,
339325 errorType: error.name,
340326 stackTrace: error.stack
@@ -344,7 +330,7 @@ func generateJSCode(envVars, executionPath, eventInputJSON string) string {
344330 .finally(() => {
345331 complete();
346332 });
347- ` , envVars , executionPath , executionPath , eventInputJSON )
333+ ` , envVars , executionPath , executionPath , eventInputJSON , timeoutSeconds , timeoutSeconds )
348334}
349335
350336func extractResult (output string ) (* handlerResult , error ) {
0 commit comments