Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions a2a/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func (s *OAuth2SecurityScheme) UnmarshalJSON(b []byte) error {
return fmt.Errorf("unknown OAuth flow type: %s, available: %v", name, keys)
}
}
s.Description = scheme.Description
s.Oauth2MetadataURL = scheme.Oauth2MetadataURL
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions a2a/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func TestSecuritySchemeJSONCodec(t *testing.T) {
"name3": MutualTLSSecurityScheme{Description: "optional"},
"name4": HTTPAuthSecurityScheme{Scheme: "Bearer", BearerFormat: "JWT"},
"name5": OAuth2SecurityScheme{
Oauth2MetadataURL: "https://test.com",
Description: "test",
Flows: PasswordOAuthFlow{
TokenURL: "url",
Scopes: map[string]string{"email": "read user emails"},
Expand All @@ -88,21 +90,21 @@ func TestSecuritySchemeJSONCodec(t *testing.T) {
`"name2":{"openIdConnect":{"openIdConnectUrl":"url"}}`,
`"name3":{"mutualTLS":{"description":"optional"}}`,
`"name4":{"http":{"bearerFormat":"JWT","scheme":"Bearer"}}`,
`"name5":{"oauth2":{"flows":{"password":{"scopes":{"email":"read user emails"},"tokenUrl":"url"}}}}`,
`"name5":{"oauth2":{"oauth2MetadataUrl": "https://test.com", "description": "test","flows":{"password":{"scopes":{"email":"read user emails"},"tokenUrl":"url"}}}}`,
}
wantJSON := fmt.Sprintf("{%s}", strings.Join(entriesJSON, ","))

var decodedJSON NamedSecuritySchemes
mustUnmarshal(t, []byte(wantJSON), &decodedJSON)
if !reflect.DeepEqual(decodedJSON, schemes) {
t.Fatalf("Unmarshal() failed:\nwant %v\ngot: %s", schemes, decodedJSON)
t.Fatalf("Unmarshal() failed:\nwant %+v\ngot: %s", schemes, decodedJSON)
}

encodedSchemes := mustMarshal(t, &schemes)
var decodedBack NamedSecuritySchemes
mustUnmarshal(t, []byte(encodedSchemes), &decodedBack)
if !reflect.DeepEqual(decodedJSON, decodedBack) {
t.Fatalf("Decoding back failed:\nwant %v\ngot: %s", decodedJSON, decodedBack)
t.Fatalf("Decoding back failed:\nwant %+v\ngot: %s", decodedJSON, decodedBack)
}
}

Expand Down
5 changes: 0 additions & 5 deletions a2aclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ type interceptBeforeResult[Req any, Resp any] struct {
earlyErr error
}

// AddCallInterceptor allows to attach a [CallInterceptor] to the client after creation.
func (c *Client) AddCallInterceptor(ci CallInterceptor) {
c.interceptors = append(c.interceptors, ci)
}

// A2A protocol methods

// GetTask implements the 'GetTask' protocol method.
Expand Down
4 changes: 1 addition & 3 deletions a2aclient/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"iter"
"net/http"
"time"

"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/internal/jsonrpc"
Expand All @@ -48,7 +47,6 @@ func WithJSONRPCTransport(client *http.Client) FactoryOption {
}

// NewJSONRPCTransport creates a new JSON-RPC transport for A2A protocol communication.
// By default, an HTTP client will use a 3-minute timeout.
// For production deployments, provide a client with appropriate timeout, retry policy,
// and connection pooling configured for your requirements.
//
Expand All @@ -63,7 +61,7 @@ func NewJSONRPCTransport(url string, client *http.Client) Transport {
}

if t.httpClient == nil {
t.httpClient = &http.Client{Timeout: 3 * time.Minute}
t.httpClient = &http.Client{Timeout: defaultRequestTimeout}
}

return t
Expand Down
5 changes: 1 addition & 4 deletions a2aclient/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type RESTTransport struct {
}

// NewRESTTransport creates a new REST Transport for A2A protocol and communication
// By default, an HTTP client with 5-second timeout is used.
// For production deployments, provide a client with appropriate timeout, retry policy,
// and connection pooling configured for your requirements.
func NewRESTTransport(u *url.URL, client *http.Client) Transport {
Expand All @@ -48,9 +47,7 @@ func NewRESTTransport(u *url.URL, client *http.Client) Transport {
}

if t.httpClient == nil {
t.httpClient = &http.Client{
Timeout: 5 * time.Second,
}
t.httpClient = &http.Client{Timeout: defaultRequestTimeout}
}
return t
}
Expand Down
3 changes: 3 additions & 0 deletions a2aclient/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"context"
"errors"
"iter"
"time"

"github.com/a2aproject/a2a-go/v2/a2a"
)

const defaultRequestTimeout = 3 * time.Minute

// Transport defines a transport-agnostic interface for making A2A requests.
// Implementations are a translation layer between a2a core types and wire formats.
type Transport interface {
Expand Down
56 changes: 34 additions & 22 deletions a2acompat/a2av0/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/base64"
"fmt"
"maps"
"strings"

"github.com/a2aproject/a2a-go/v2/a2a"
Expand Down Expand Up @@ -307,14 +308,12 @@ func ToV1Part(p a2alegacy.Part) *a2a.Part {
}
case a2alegacy.DataPart:
var val any = c.Data
if compat, ok := c.Metadata["data_part_compat"].(bool); ok && compat {
metadata := maps.Clone(c.Metadata)
if compat, ok := metadata["data_part_compat"].(bool); ok && compat {
val = c.Data["value"]
delete(c.Metadata, "data_part_compat")
}
return &a2a.Part{
Content: a2a.Data{Value: val},
Metadata: c.Metadata,
delete(metadata, "data_part_compat")
}
return &a2a.Part{Content: a2a.Data{Value: val}, Metadata: metadata}
case a2alegacy.FilePart:
switch f := c.File.(type) {
case a2alegacy.FileBytes:
Expand Down Expand Up @@ -368,18 +367,16 @@ func FromV1Part(p *a2a.Part) a2alegacy.Part {
}
case a2a.Data:
val := c.Value
metadata := maps.Clone(p.Metadata)
// Compatibility mode: wrap non-map values to avoid crashing old clients that expect a map.
if _, ok := val.(map[string]any); !ok {
val = map[string]any{"value": val}
if p.Metadata == nil {
p.Metadata = make(map[string]any)
if metadata == nil {
metadata = make(map[string]any)
}
p.Metadata["data_part_compat"] = true
}
return a2alegacy.DataPart{
Data: val.(map[string]any),
Metadata: p.Metadata,
metadata["data_part_compat"] = true
}
return a2alegacy.DataPart{Data: val.(map[string]any), Metadata: metadata}
case a2a.Raw:
return a2alegacy.FilePart{
File: a2alegacy.FileBytes{
Expand Down Expand Up @@ -852,20 +849,35 @@ func FromV1AgentCard(card *a2a.AgentCard) *a2alegacy.AgentCard {
// Simplified conversion, focusing on common fields.
// For full conversion, more complex mapping of interfaces/security is needed.
res := &a2alegacy.AgentCard{
DefaultInputModes: card.DefaultInputModes,
DefaultOutputModes: card.DefaultOutputModes,
Description: card.Description,
DocumentationURL: card.DocumentationURL,
IconURL: card.IconURL,
Name: card.Name,
Provider: (*a2alegacy.AgentProvider)(card.Provider),
Signatures: make([]a2alegacy.AgentCardSignature, len(card.Signatures)),
Version: card.Version,
DefaultInputModes: card.DefaultInputModes,
DefaultOutputModes: card.DefaultOutputModes,
Description: card.Description,
DocumentationURL: card.DocumentationURL,
IconURL: card.IconURL,
Name: card.Name,
Provider: (*a2alegacy.AgentProvider)(card.Provider),
Signatures: make([]a2alegacy.AgentCardSignature, len(card.Signatures)),
Version: card.Version,
SupportsAuthenticatedExtendedCard: card.Capabilities.ExtendedAgentCard,
Capabilities: a2alegacy.AgentCapabilities{
PushNotifications: card.Capabilities.PushNotifications,
Streaming: card.Capabilities.Streaming,
},
}
if len(card.Skills) > 0 {
res.Skills = make([]a2alegacy.AgentSkill, len(card.Skills))
for i, s := range card.Skills {
res.Skills[i] = a2alegacy.AgentSkill{
Description: s.Description,
Examples: s.Examples,
ID: s.ID,
InputModes: s.InputModes,
Name: s.Name,
OutputModes: s.OutputModes,
Tags: s.Tags,
}
}
}
if len(card.Capabilities.Extensions) > 0 {
res.Capabilities.Extensions = make([]a2alegacy.AgentExtension, len(card.Capabilities.Extensions))
for i, e := range card.Capabilities.Extensions {
Expand Down
14 changes: 8 additions & 6 deletions a2acompat/a2av0/jsonrpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ func (h *jsonrpcHandler) handleRequest(ctx context.Context, rw http.ResponseWrit
return
}

if result != nil {
resp := jsonrpc.ServerResponse{JSONRPC: jsonrpc.Version, ID: req.ID, Result: result}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
log.Error(ctx, "failed to encode response", err)
}
resp := jsonrpc.ServerResponse{JSONRPC: jsonrpc.Version, ID: req.ID, Result: result}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
log.Error(ctx, "failed to encode response", err)
}
}

Expand Down Expand Up @@ -295,7 +293,11 @@ func (h *jsonrpcHandler) onSendMessage(ctx context.Context, raw json.RawMessage)
if err != nil || compEvent == nil {
return nil, err
}
return compEvent.(a2alegacy.SendMessageResult), nil
result, ok := compEvent.(a2alegacy.SendMessageResult)
if !ok {
return nil, fmt.Errorf("unexpected event type %T: %w", compEvent, a2a.ErrInvalidAgentResponse)
}
return result, nil
}

func (h *jsonrpcHandler) onResubscribeToTask(ctx context.Context, raw json.RawMessage) iter.Seq2[a2a.Event, error] {
Expand Down
7 changes: 6 additions & 1 deletion a2acompat/a2av0/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/a2aproject/a2a-go/v2/a2aclient"
"github.com/a2aproject/a2a-go/v2/a2asrv"
"github.com/a2aproject/a2a-go/v2/a2asrv/taskstore"
"github.com/a2aproject/a2a-go/v2/log"
)

// NewServerInterceptor adapts a legacy server call interceptor to the v1 interceptor interface.
Expand Down Expand Up @@ -322,7 +323,11 @@ func fromV1Payload(payload any) any {
case *a2a.ListTasksResponse:
return FromV1ListTasksResponse(v)
case a2a.Event:
legacy, _ := FromV1Event(v)
legacy, err := FromV1Event(v)
if err != nil {
log.Warn(context.Background(), "failed to convert v1 event to legacy", "error", err)
Comment thread
yarolegovich marked this conversation as resolved.
return payload
}
return legacy
default:
return payload
Expand Down
2 changes: 1 addition & 1 deletion a2agrpc/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *grpcTransport) ListTasks(ctx context.Context, params a2aclient.ServiceP

pbResp, err := c.client.ListTasks(withGRPCMetadata(ctx, params), pbReq)
if err != nil {
return nil, err
return nil, grpcutil.FromGRPCError(err)
}

return pbconv.FromProtoListTasksResponse(pbResp)
Expand Down
5 changes: 4 additions & 1 deletion a2apb/v0/pbconv/to_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pbconv

import (
"fmt"
"maps"
"slices"

"github.com/a2aproject/a2a-go/a2apb"
Expand Down Expand Up @@ -339,7 +340,9 @@ func toProtoDataPart(part *a2a.Part) (*a2apb.Part, error) {
// Version 0.3 clients expect a map, so we wrap non-map values.
m := map[string]any{"value": dataContent.Value}
if part.Metadata == nil {
part.Metadata = make(map[string]any)
part.Metadata = map[string]any{}
} else {
part.Metadata = maps.Clone(part.Metadata)
}
part.Metadata["data_part_compat"] = true
s, err = toProtoMap(m)
Expand Down
7 changes: 0 additions & 7 deletions a2asrv/agentexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ type processor struct {
pushSender push.Sender
execCtx *ExecutorContext
store taskstore.Store

processedCount int
}

var _ taskexec.Processor = (*processor)(nil)
Expand Down Expand Up @@ -407,11 +405,6 @@ func (p *processor) Process(ctx context.Context, event a2a.Event) (*taskexec.Pro
// ProcessError implements taskexec.ProcessError interface method.
// Here we can try handling producer or queue error by moving the task to failed state and making it the execution result.
func (p *processor) ProcessError(ctx context.Context, cause error) (a2a.SendMessageResult, error) {
if p.execCtx.StoredTask == nil && p.processedCount == 0 {
// there was no task in the store, don't create one in failed state and allow to error to be propagated to the client
return nil, cause
}

versioned, err := p.updateManager.SetTaskFailed(ctx, nil, cause)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions a2asrv/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (h *defaultRequestHandler) SendMessage(ctx context.Context, req *a2a.SendMe
return res, nil
}

if lastEvent == nil {
return nil, fmt.Errorf("execution finished without producing any events: %w", a2a.ErrInvalidAgentResponse)
}

task, err := h.taskStore.Get(ctx, lastEvent.TaskInfo().TaskID)
if err != nil {
return nil, fmt.Errorf("failed to load result after execution finished: %w", err)
Expand Down
12 changes: 5 additions & 7 deletions a2asrv/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,10 @@ func (h *jsonrpcHandler) handleRequest(ctx context.Context, rw http.ResponseWrit
return
}

if result != nil {
rw.Header().Set("Content-Type", "application/json")
resp := jsonrpc.ServerResponse{JSONRPC: jsonrpc.Version, ID: req.ID, Result: result}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
log.Error(ctx, "failed to encode response", err)
}
rw.Header().Set("Content-Type", "application/json")
resp := jsonrpc.ServerResponse{JSONRPC: jsonrpc.Version, ID: req.ID, Result: result}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
log.Error(ctx, "failed to encode response", err)
}
}

Expand All @@ -151,7 +149,7 @@ func (h *jsonrpcHandler) handleStreamingRequest(ctx context.Context, rw http.Res

sseWriter.WriteHeaders()

sseChan, panicChan := make(chan []byte), make(chan error)
sseChan, panicChan := make(chan []byte), make(chan error, 1)
requestCtx, cancelExecCtx := context.WithCancel(ctx)
defer cancelExecCtx()
go func() {
Expand Down
Loading
Loading