-
Notifications
You must be signed in to change notification settings - Fork 153
Description
I'm trying to trace a content retrieval request via ipfs get ... with the actual bitswap interactions.
This is tricky though because a bitswap client session creates a new disparate context. Then I also found this comment for the Close() method which only cancels the session context:
// Session lifecycle is independent of the context used to create requests -
// canceling a request context does not close the session. When context-based
// automation is desired, Client.NewSession(ctx) uses context.AfterFunc to
// automatically close sessions when the context is canceled.
func (s *Session) Close() {
s.cancel()
}@gammazero I saw that this comment is from you. Is the meaning descriptive or normative? Is it just describing how it's working or stating how it's supposed to work?
Looking at the session lifecycle (checking where new sessions are created and closed) I can see that they are always tied to the context that instantiated the session. This to me indicates that a separate context for the session isn't needed which would enable proper context propagation and improve the tracing. As a nice byproduct, the retrievalState wouldn't need to be passed manually through the stack.
New sessions are created here:
Lines 442 to 483 in 0f1ebac
| // This method creates a temporary internal session that is automatically cleaned up | |
| // when the operation completes. Any retrieval.State attached to the context will be | |
| // preserved and used for diagnostic tracking throughout the retrieval process. | |
| func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) { | |
| ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys)))) | |
| defer span.End() | |
| session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx)) | |
| blocksChan, err := session.GetBlocks(ctx, keys) | |
| if err != nil { | |
| session.Close() | |
| return nil, err | |
| } | |
| out := make(chan blocks.Block) | |
| go func() { | |
| defer func() { | |
| close(out) | |
| session.Close() | |
| }() | |
| ctxDone := ctx.Done() | |
| for { | |
| select { | |
| case blk, ok := <-blocksChan: | |
| if !ok { | |
| return | |
| } | |
| select { | |
| case out <- blk: | |
| case <-ctxDone: | |
| return | |
| } | |
| case <-ctxDone: | |
| return | |
| } | |
| } | |
| }() | |
| return out, nil | |
| } |
and here:
Lines 702 to 725 in 0f1ebac
| // NewSession generates a new Bitswap session. You should use this, rather | |
| // that calling Client.GetBlocks, any time you intend to do several related | |
| // block requests in a row. The session returned will have it's own GetBlocks | |
| // method, but the session will use the fact that the requests are related to | |
| // be more efficient in its requests to peers. If you are using a session | |
| // from blockservice, it will create a bitswap session automatically. | |
| // | |
| // The context is used for: | |
| // - Automatic session cleanup: when the context is canceled, the session will be closed | |
| // - Retrieval state tracking: any retrieval.State attached to the context will be | |
| // preserved in the session for diagnostic tracking of provider discovery and data retrieval | |
| // | |
| // The session's lifetime is controlled by the context and requires that the | |
| // context be canceled for the session to be closed. | |
| func (bs *Client) NewSession(ctx context.Context) exchange.Fetcher { | |
| ctx, span := internal.StartSpan(ctx, "NewSession") | |
| defer span.End() | |
| session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx)) | |
| context.AfterFunc(ctx, func() { | |
| session.Close() | |
| }) | |
| return session | |
| } |
Both calls tie the new session to the incoming context.
The same argument then actually trickles down to the sessionWantSender as its lifecycle is tied to the sessions':
boxo/bitswap/client/internal/session/sessionwantsender.go
Lines 117 to 142 in 0f1ebac
| func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, canceller SessionWantsCanceller, | |
| bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn, havesReceivedGauge bspm.Gauge, | |
| ) sessionWantSender { | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| sws := sessionWantSender{ | |
| ctx: ctx, | |
| shutdown: cancel, | |
| closed: make(chan struct{}), | |
| sessionID: sid, | |
| changes: chanqueue.New(chanqueue.WithBaseCapacity[change](changesBufferSize)), | |
| wants: make(map[cid.Cid]*wantInfo), | |
| peerConsecutiveDontHaves: make(map[peer.ID]int), | |
| peerRspTrkr: newPeerResponseTracker(), | |
| pm: pm, | |
| spm: spm, | |
| canceller: canceller, | |
| bpm: bpm, | |
| onSend: onSend, | |
| onPeersExhausted: onPeersExhausted, | |
| havesReceivedGauge: havesReceivedGauge, | |
| } | |
| return sws | |
| } |