feat: add MCP injection support to responses streaming interceptor#143
feat: add MCP injection support to responses streaming interceptor#143
Conversation
db0be87 to
2ca985c
Compare
2ca985c to
ba4e96d
Compare
| // Set responseID to the first response.id that is set. | ||
| if responseID == "" && ev.Response.ID != "" { | ||
| responseID = ev.Response.ID | ||
| for shouldLoop { |
There was a problem hiding this comment.
Do we need any security breaker for the maximum number of tools?
There was a problem hiding this comment.
I assume something like maximum number of inner loop iterations? I don't think it is necessary.
If I understand MPC correctly there is no such limit when using it in responses API, by default provider will ask for approval but it can be disabled per tool: https://platform.openai.com/docs/guides/tools-connectors-mcp#approvals
So infinite MCP tool call loop in theory could happen when using provider directly with approval disabled.
I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.
There was a problem hiding this comment.
I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.
SGTM 👍
intercept/responses/streaming.go
Outdated
| respCopy = responseCopier{} | ||
| opts := i.requestOptions(&respCopy) | ||
| stream := i.newStream(ctx, srv, opts) | ||
| defer stream.Close() |
There was a problem hiding this comment.
so this defer will be called after leaving this function, but most likely you want to call it after iteration
There was a problem hiding this comment.
Good catch. Added stream.Close() after the iteration (stream can be safely closed multiple times). Though about moving stream processing into separate function but code seemed less clear to me.
There was a problem hiding this comment.
Watch out for defer in loops: https://www.jetbrains.com/help/inspectopedia/GoDeferInLoop.html
There was a problem hiding this comment.
Good point, although I don't think this loop should iterate much.
This is annoying 😅
I really want to have a defer just in case (very easy to forget in future code changes) but there are too many references to local variables to cleanly move it to separate function.
Would scoping stream processing it into a local func be ok?
| pending := i.getPendingInjectedToolCalls(response) | ||
| shouldLoop, err = i.handleInnerAgenticLoop(ctx, pending, response) | ||
| if err != nil { | ||
| i.sendCustomErr(ctx, w, http.StatusInternalServerError, err) |
There was a problem hiding this comment.
Does it mean that if one tool returns an error, we failed the entire prompt?
There was a problem hiding this comment.
Depends what "returns an error" means.
- if the tool was called and MCP tool returned an error it will be forwarded as a tool call result (see *SingleInjectedToolError fixtures / test cases in responses_integration_test.go)
- if there was an error in code executing the MCP tool call eg: empty response or re-marshalling error we fail the prompt.
Thanks to this comment I've found re-marshaling error was ignored. Will add prepareRequestForAgenticLoop error being returned in handleInnerAgenticLoop.
| } | ||
| t.Cleanup(func() { _ = ln.Close() }) | ||
|
|
||
| go func() { |
There was a problem hiding this comment.
nit: shouldn't we wait for graceful shutdown of this goroutine rather than leaving it on its own?
There was a problem hiding this comment.
Added wait to cleanup.
dannykopping
left a comment
There was a problem hiding this comment.
Great work @pawbana!
intercept/responses/streaming.go
Outdated
| respCopy = responseCopier{} | ||
| opts := i.requestOptions(&respCopy) | ||
| stream := i.newStream(ctx, srv, opts) | ||
| defer stream.Close() |
There was a problem hiding this comment.
Watch out for defer in loops: https://www.jetbrains.com/help/inspectopedia/GoDeferInLoop.html
intercept/responses/streaming.go
Outdated
| // so events can be forwarded as soon as received. | ||
| // Otherwise loop could iterate so only last response will be forwarded. | ||
| // This is needed to keep consistency between response.id and response.previous_response_id fields. | ||
| if i.mcpProxy == nil { |
There was a problem hiding this comment.
Nit: this doesn't really match the comment, or at least it could be more explicit.
Currently it requires knowledge about how this relates to tool injection.
i.e.
"If no MCP proxy is provided then no tools are injected, ..."
intercept/responses/streaming.go
Outdated
| err = fmt.Errorf("failed to relay chunk: %w", err) | ||
| return err | ||
| streamErr = stream.Err() | ||
| stream.Close() |
There was a problem hiding this comment.
This error is not handled.
I've been meaning to add a linter for a while, gonna do it now.
| // Append newly added items to reqPayload field | ||
| // New items are appended to limit Input re-marshaling. | ||
| // See responsesInterceptionBase.requestOptions for more details about marshaling issues. | ||
| for j := originalInputSize; j < len(i.req.Input.OfInputItemList); j++ { |

Adds suppport to MCP tool injection in responses streaming interceptor.
Fixes: #89