bitswap/session: context-aware lifecycle #1052

@dennis-tra

I'm trying to correlate a content retrieval request made via ipfs get ... with the actual bitswap interactions.

This is tricky, though, because a bitswap client session creates a new, separate context. I also found this comment on the Close() method, which only cancels the session context:

// Session lifecycle is independent of the context used to create requests -
// canceling a request context does not close the session. When context-based
// automation is desired, Client.NewSession(ctx) uses context.AfterFunc to
// automatically close sessions when the context is canceled.
func (s *Session) Close() {
	s.cancel()
}

@gammazero I saw that this comment is from you. Is the meaning descriptive or normative? Is it just describing how it's working or stating how it's supposed to work?

Looking at the session lifecycle (where new sessions are created and closed), I can see that sessions are always tied to the context that instantiated them. To me this indicates that a separate context for the session isn't needed. Deriving the session context from the creating context instead would enable proper context propagation and improve tracing. As a nice byproduct, the retrievalState wouldn't need to be passed manually through the stack.

New sessions are created here:

// This method creates a temporary internal session that is automatically cleaned up
// when the operation completes. Any retrieval.State attached to the context will be
// preserved and used for diagnostic tracking throughout the retrieval process.
func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
	defer span.End()
	session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx))
	blocksChan, err := session.GetBlocks(ctx, keys)
	if err != nil {
		session.Close()
		return nil, err
	}
	out := make(chan blocks.Block)
	go func() {
		defer func() {
			close(out)
			session.Close()
		}()
		ctxDone := ctx.Done()
		for {
			select {
			case blk, ok := <-blocksChan:
				if !ok {
					return
				}
				select {
				case out <- blk:
				case <-ctxDone:
					return
				}
			case <-ctxDone:
				return
			}
		}
	}()
	return out, nil
}

and here:

// NewSession generates a new Bitswap session. You should use this, rather
// than calling Client.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have its own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from blockservice, it will create a bitswap session automatically.
//
// The context is used for:
// - Automatic session cleanup: when the context is canceled, the session will be closed
// - Retrieval state tracking: any retrieval.State attached to the context will be
// preserved in the session for diagnostic tracking of provider discovery and data retrieval
//
// The session's lifetime is controlled by the context and requires that the
// context be canceled for the session to be closed.
func (bs *Client) NewSession(ctx context.Context) exchange.Fetcher {
	ctx, span := internal.StartSpan(ctx, "NewSession")
	defer span.End()
	session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx))
	context.AfterFunc(ctx, func() {
		session.Close()
	})
	return session
}

Both calls tie the new session to the incoming context.

The same argument then applies to the sessionWantSender, since its lifecycle is tied to the session's:

func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, canceller SessionWantsCanceller,
	bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn, havesReceivedGauge bspm.Gauge,
) sessionWantSender {
	ctx, cancel := context.WithCancel(context.Background())
	sws := sessionWantSender{
		ctx:                      ctx,
		shutdown:                 cancel,
		closed:                   make(chan struct{}),
		sessionID:                sid,
		changes:                  chanqueue.New(chanqueue.WithBaseCapacity[change](changesBufferSize)),
		wants:                    make(map[cid.Cid]*wantInfo),
		peerConsecutiveDontHaves: make(map[peer.ID]int),
		peerRspTrkr:              newPeerResponseTracker(),
		pm:                       pm,
		spm:                      spm,
		canceller:                canceller,
		bpm:                      bpm,
		onSend:                   onSend,
		onPeersExhausted:         onPeersExhausted,
		havesReceivedGauge:       havesReceivedGauge,
	}
	return sws
}

Labels

need/maintainers-input: Needs input from the current maintainer(s)
need/triage: Needs initial labeling and prioritization