Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes:
### Fixed

- `gateway`: Fixed duplicate peer IDs appearing in retrieval timeout error messages
- `bitswap/client`: fix tracing by using context to pass trace and retrieval state to session [#1059](https://github.com/ipfs/boxo/pull/1059)

### Security

Expand Down
23 changes: 13 additions & 10 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
}

sessionFactory := func(
sessctx context.Context,
sessmgr bssession.SessionManager,
id uint64,
spm bssession.SessionPeerManager,
Expand All @@ -309,7 +310,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
provSearchDelay time.Duration,
rebroadcastDelay time.Duration,
self peer.ID,
retrievalState *retrieval.State,
) bssm.Session {
// careful when bs.pqm is nil. Since we are type-casting it
// into routing.ContentDiscovery when passing it, it will become
Expand All @@ -321,7 +321,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
} else if providerFinder != nil {
sessionProvFinder = providerFinder
}
return bssession.New(sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge, retrievalState)
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge)
}
sessionPeerManagerFactory := func(id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network)
Expand Down Expand Up @@ -446,19 +446,26 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()

session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx))
	// Temporary session closed independently of cancellation ctx.
sessCtx, cancelSession := context.WithCancel(context.Background())

// Preserve retrieval.State from the original request context if present
if retrievalState := retrieval.StateFromContext(ctx); retrievalState != nil {
sessCtx = context.WithValue(sessCtx, retrieval.ContextKey, retrievalState)
}
session := bs.sm.NewSession(sessCtx, bs.provSearchDelay, bs.rebroadcastDelay)
Comment on lines +449 to +456
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gammazero do you remember why this is separate session? do we detach if from trace on purpose?

IIUC without passing trace provider discovery operations within this temporary session won't be traced, making debugging harder? Or is this decreasing the noise?

Copy link
Contributor Author

@gammazero gammazero Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that a separate context was used for the session, so that the context passed into GetBlocks would only cancel the call to session.GetBlocks and not cancel the session itself. This allows the goroutine created here to cancel/close the session when finished reading blocks.

The purpose of doing this is so that the caller does not have to explicitly cancel the context passed into this function, which otherwise is required to prevent leaking the session created here, since otherwise there is no way to cancel/close the session.

Looking at this code, it introduces a defect in that it prevents the propagation of tracing state carried by the context passed into GetBlocks. This can be fixed by making the passed in context be the parent of the session context, or by simply returning the blocks channel and depending on the caller to cancel the passed in context.

@lidel See #1060 for fix.


blocksChan, err := session.GetBlocks(ctx, keys)
if err != nil {
session.Close()
cancelSession()
return nil, err
}

out := make(chan blocks.Block)
go func() {
defer func() {
close(out)
session.Close()
cancelSession()
}()

ctxDone := ctx.Done()
Expand Down Expand Up @@ -717,9 +724,5 @@ func (bs *Client) NewSession(ctx context.Context) exchange.Fetcher {
ctx, span := internal.StartSpan(ctx, "NewSession")
defer span.End()

session := bs.sm.NewSession(bs.provSearchDelay, bs.rebroadcastDelay, retrieval.StateFromContext(ctx))
context.AfterFunc(ctx, func() {
session.Close()
})
return session
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
32 changes: 10 additions & 22 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
notifications "github.com/ipfs/boxo/bitswap/client/internal/notifications"
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
"github.com/ipfs/boxo/retrieval"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -133,20 +132,19 @@ type Session struct {
self peer.ID
}

// New creates a new bitswap session.
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
//
// The session maintains its own internal context for operations and is not
// tied to any external context lifecycle. The caller MUST call Close() when
// the session is no longer needed to ensure proper cleanup. When sessions are
// created via Client.NewSession(ctx), automatic cleanup via context.AfterFunc
// is provided.
// The caller MUST call Close() or cancel the context when the session is no
// longer needed to ensure proper cleanup.
//
// The retrievalState parameter, if provided, enables diagnostic tracking of
// the retrieval process. It is attached to the session's internal context and
// used to track provider discovery, connection attempts, and data retrieval
// phases. This is particularly useful for debugging timeout errors and
// understanding retrieval performance.
func New(
ctx context.Context,
sm SessionManager,
id uint64,
sprm SessionPeerManager,
Expand All @@ -159,14 +157,8 @@ func New(
periodicSearchDelay time.Duration,
self peer.ID,
havesReceivedGauge bspm.Gauge,
retrievalState *retrieval.State,
) *Session {
ctx, cancel := context.WithCancel(context.Background())

// If a retrieval state is passed in, then keep it in the context.
if retrievalState != nil {
ctx = context.WithValue(ctx, retrieval.ContextKey, retrievalState)
}
ctx, cancel := context.WithCancel(ctx)

s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
Expand Down Expand Up @@ -198,14 +190,10 @@ func (s *Session) ID() uint64 {
return s.id
}

// Close terminates the session and cleans up its resources. This method MUST
// be called when the session is no longer needed to avoid resource leaks.
// After calling Close, the session should not be used anymore.
//
// Session lifecycle is independent of the context used to create requests -
// canceling a request context does not close the session. When context-based
// automation is desired, Client.NewSession(ctx) uses context.AfterFunc to
// automatically close sessions when the context is canceled.
// Close terminates the session and cleans up its resources. This method must
// be called, or the context used to create the session must be canceled, when
// the session is no longer needed to avoid resource leaks. After calling
// Close, the session should not be used anymore.
func (s *Session) Close() {
s.cancel()
}
Expand Down
47 changes: 27 additions & 20 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func (pm *fakePeerManager) BroadcastWantHaves(cids []cid.Cid) {
func (pm *fakePeerManager) SendCancels(cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -164,15 +167,15 @@ func TestSessionGetBlocks(t *testing.T) {
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr(sim)
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)
defer session.Close()
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
}

_, err := session.GetBlocks(context.Background(), cids)
_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

// Wait for initial want request
Expand Down Expand Up @@ -234,6 +237,9 @@ func TestSessionGetBlocks(t *testing.T) {
}

func TestSessionFindMorePeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -243,7 +249,7 @@ func TestSessionFindMorePeers(t *testing.T) {
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr(sim)
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)
defer session.Close()
session.SetBaseTickDelay(200 * time.Microsecond)
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
Expand All @@ -252,9 +258,6 @@ func TestSessionFindMorePeers(t *testing.T) {
cids = append(cids, block.Cid())
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

Expand Down Expand Up @@ -306,6 +309,8 @@ func TestSessionFindMorePeers(t *testing.T) {
}

func TestSessionOnPeersExhausted(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -316,17 +321,14 @@ func TestSessionOnPeersExhausted(t *testing.T) {
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr(sim)
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)
defer session.Close()
blks := random.BlocksOfSize(broadcastLiveWantsLimit+5, blockSize)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

Expand All @@ -347,6 +349,8 @@ func TestSessionOnPeersExhausted(t *testing.T) {
}

func TestSessionFailingToGetFirstBlock(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -356,17 +360,14 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr(sim)
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, 100*time.Millisecond, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, 100*time.Millisecond, "", nil)
defer session.Close()
blks := random.BlocksOfSize(4, blockSize)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

Expand Down Expand Up @@ -461,7 +462,9 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
sm := newMockSessionMgr(sim)

// Create a new session with its own context
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer sesscancel()
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)
defer session.Close()

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
Expand All @@ -475,8 +478,8 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
getBlocksCh, err := session.GetBlocks(getctx, []cid.Cid{blks[0].Cid()})
require.NoError(t, err, "error getting blocks")

// Cancel the session
session.Close()
// Cancel the session context
sesscancel()

// Expect the GetBlocks() channel to be closed
select {
Expand All @@ -493,6 +496,8 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
}

func TestSessionOnShutdownCalled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -504,7 +509,7 @@ func TestSessionOnShutdownCalled(t *testing.T) {
sm := newMockSessionMgr(sim)

// Create a new session
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)

// Shutdown the session
session.Close()
Expand All @@ -516,6 +521,8 @@ func TestSessionOnShutdownCalled(t *testing.T) {
}

func TestSessionReceiveMessageAfterClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -526,13 +533,13 @@ func TestSessionReceiveMessageAfterClose(t *testing.T) {
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr(sim)
session := New(sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil, nil)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, time.Minute, "", nil)
defer session.Close()

blks := random.BlocksOfSize(2, blockSize)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

_, err := session.GetBlocks(context.Background(), cids)
_, err := session.GetBlocks(ctx, cids)
require.NoError(t, err, "error getting blocks")

// Wait for initial want request
Expand Down
24 changes: 7 additions & 17 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@ import (
bssession "github.com/ipfs/boxo/bitswap/client/internal/session"
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
exchange "github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/retrieval"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Session represents a bitswap session managed by the SessionManager.
// Sessions have their own lifecycle independent of request contexts and
// must be explicitly closed via the Close() method when no longer needed.
type Session interface {
exchange.Fetcher
ID() uint64
Expand All @@ -31,6 +28,7 @@ type Session interface {

// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(
ctx context.Context,
sm bssession.SessionManager,
id uint64,
sprm bssession.SessionPeerManager,
Expand All @@ -40,8 +38,7 @@ type SessionFactory func(
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay time.Duration,
self peer.ID,
retrievalState *retrieval.State) Session
self peer.ID) Session

// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(id uint64) bssession.SessionPeerManager
Expand Down Expand Up @@ -85,23 +82,16 @@ func New(sessionFactory SessionFactory, sessionInterestManager *bssim.SessionInt

// NewSession initializes a session and adds to the session manager.
//
// The session is created with its own internal context and lifecycle management.
// The retrievalState parameter, if provided, will be attached to the session's
// internal context to enable diagnostic tracking of the retrieval process. This
// includes tracking provider discovery attempts, peer connections, and data
// retrieval phases, which is particularly useful for debugging timeout errors.
//
// The returned Session must be closed via its Close() method when no longer needed.
// Note: When sessions are created via Client.NewSession(ctx), automatic cleanup
// via context.AfterFunc is provided.
func (sm *SessionManager) NewSession(provSearchDelay, rebroadcastDelay time.Duration, retrievalState *retrieval.State) Session {
// The returned Session must be closed via its Close() method, or by canceling
// the context, when no longer needed.
func (sm *SessionManager) NewSession(ctx context.Context, provSearchDelay, rebroadcastDelay time.Duration) Session {
id := sm.GetNextSessionID()

_, span := internal.StartSpan(context.Background(), "SessionManager.NewSession", trace.WithAttributes(attribute.String("ID", strconv.FormatUint(id, 10))))
ctx, span := internal.StartSpan(ctx, "SessionManager.NewSession", trace.WithAttributes(attribute.String("ID", strconv.FormatUint(id, 10))))
defer span.End()

pm := sm.peerManagerFactory(id)
session := sm.sessionFactory(sm, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self, retrievalState)
session := sm.sessionFactory(ctx, sm, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)

sm.sessLk.Lock()
if sm.sessions != nil { // check if SessionManager was shutdown
Expand Down
Loading
Loading