Skip to content

Commit dca255b

Browse files
fix/updates to use context passed in New function for context cancellation (#1081)
* fix/updates to use context passed in New function for context cancellation
* updated test to fix CI
* fix context

Co-authored-by: guillaumemichel <guillaume@michel.id>
1 parent 4304f67 commit dca255b

8 files changed

Lines changed: 27 additions & 26 deletions

File tree

dht.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
195195
return nil, err
196196
}
197197

198-
dht, err := makeDHT(h, cfg)
198+
dht, err := makeDHT(ctx, h, cfg)
199199
if err != nil {
200200
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
201201
}
@@ -280,7 +280,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
280280
return dht
281281
}
282282

283-
func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
283+
func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
284284
var protocols, serverProtocols []protocol.ID
285285

286286
v1proto := cfg.ProtocolPrefix + kad1
@@ -352,20 +352,20 @@ func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
352352
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
353353
}
354354

355+
// create a tagged context derived from the original context
356+
// the DHT context should be done when the process is closed
357+
dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(ctx))
358+
355359
// rt refresh manager
356360
dht.rtRefreshManager, err = makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
357361
if err != nil {
358362
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
359363
}
360364

361-
// create a tagged context derived from the original context
362-
// the DHT context should be done when the process is closed
363-
dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(context.Background()))
364-
365365
if cfg.ProviderStore != nil {
366366
dht.providerStore = cfg.ProviderStore
367367
} else {
368-
dht.providerStore, err = providers.NewProviderManager(h.ID(), dht.peerstore, cfg.Datastore)
368+
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
369369
if err != nil {
370370
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
371371
}
@@ -402,6 +402,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
402402
}
403403

404404
r, err := rtrefresh.NewRtRefreshManager(
405+
dht.ctx,
405406
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
406407
keyGenFnc,
407408
queryFnc,

fullrt/dht.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
184184
ctx, cancel := context.WithCancel(context.Background())
185185

186186
self := h.ID()
187-
pm, err := providers.NewProviderManager(self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
187+
pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
188188
if err != nil {
189189
cancel()
190190
return nil, err

internal/metrics/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,6 @@ func RecordMessageSendErr(ctx context.Context) {
171171
sentMessageErrors.Add(ctx, 1, attrSetOpt)
172172
}
173173

174-
func RecordNetworkSize(ns int64) {
175-
networkSize.Record(context.Background(), int64(ns))
174+
func RecordNetworkSize(ctx context.Context, ns int64) {
175+
networkSize.Record(ctx, int64(ns))
176176
}

lookup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
4848
}
4949

5050
if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
51-
metrics.RecordNetworkSize(int64(ns))
51+
metrics.RecordNetworkSize(dht.ctx, int64(ns))
5252
}
5353

5454
// Reset the refresh timer for this key's bucket since we've just

lookup_optim.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ func (dht *IpfsDHT) optimisticProvide(outerCtx context.Context, keyMH multihash.
114114
return errors.New("can't lookup empty key")
115115
}
116116

117-
// initialize new context for all putProvider operations.
117+
// use dht.ctx for all putProvider operations.
118118
// We don't want to give the outer context to the put operations as we return early before all
119119
// put operations have finished to avoid the long tail of the latency distribution. If we
120120
// provided the outer context the put operations may be cancelled depending on what happens
121121
// with the context on the user side.
122-
putCtx, putCtxCancel := context.WithTimeout(context.Background(), time.Minute)
122+
putCtx, putCtxCancel := context.WithTimeout(dht.ctx, time.Minute)
123123

124124
es, err := dht.newOptimisticState(putCtx, key)
125125
if err != nil {
@@ -177,7 +177,7 @@ func (dht *IpfsDHT) optimisticProvide(outerCtx context.Context, keyMH multihash.
177177
}
178178

179179
if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
180-
metrics.RecordNetworkSize(int64(ns))
180+
metrics.RecordNetworkSize(dht.ctx, int64(ns))
181181
}
182182

183183
// refresh the cpl for this key as the query was successful

providers/providers_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type getProv struct {
117117
}
118118

119119
// NewProviderManager constructor
120-
func NewProviderManager(local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
120+
func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
121121
pm := new(ProviderManager)
122122
pm.self = local
123123
pm.getprovs = make(chan *getProv)
@@ -133,7 +133,7 @@ func NewProviderManager(local peer.ID, ps peerstore.Peerstore, dstore ds.Batchin
133133
if err := pm.applyOptions(opts...); err != nil {
134134
return nil, err
135135
}
136-
pm.ctx, pm.cancel = context.WithCancel(context.Background())
136+
pm.ctx, pm.cancel = context.WithCancel(ctx)
137137
pm.run()
138138
return pm, nil
139139
}

providers/providers_manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestProviderManager(t *testing.T) {
3131
if err != nil {
3232
t.Fatal(err)
3333
}
34-
p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
34+
p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
3535
if err != nil {
3636
t.Fatal(err)
3737
}
@@ -77,7 +77,7 @@ func TestProvidersDatastore(t *testing.T) {
7777
t.Fatal(err)
7878
}
7979

80-
p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
80+
p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
8181
if err != nil {
8282
t.Fatal(err)
8383
}
@@ -164,7 +164,7 @@ func TestProvidesExpire(t *testing.T) {
164164
if err != nil {
165165
t.Fatal(err)
166166
}
167-
p, err := NewProviderManager(mid, ps, ds)
167+
p, err := NewProviderManager(ctx, mid, ps, ds)
168168
if err != nil {
169169
t.Fatal(err)
170170
}
@@ -278,7 +278,7 @@ func TestLargeProvidersSet(t *testing.T) {
278278
t.Fatal(err)
279279
}
280280

281-
p, err := NewProviderManager(mid, ps, dstore)
281+
p, err := NewProviderManager(ctx, mid, ps, dstore)
282282
if err != nil {
283283
t.Fatal(err)
284284
}
@@ -318,7 +318,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
318318
t.Fatal(err)
319319
}
320320

321-
pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
321+
pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
322322
if err != nil {
323323
t.Fatal(err)
324324
}
@@ -347,7 +347,7 @@ func TestWriteUpdatesCache(t *testing.T) {
347347
t.Fatal(err)
348348
}
349349

350-
pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
350+
pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
351351
if err != nil {
352352
t.Fatal(err)
353353
}

rtrefresh/rt_refresh_manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,16 @@ type RtRefreshManager struct {
5757
refreshDoneCh chan struct{} // write to this channel after every refresh
5858
}
5959

60-
func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
60+
func NewRtRefreshManager(ctx context.Context, h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
6161
refreshKeyGenFnc func(cpl uint) (string, error),
6262
refreshQueryFnc func(ctx context.Context, key string) error,
6363
refreshPingFnc func(ctx context.Context, p peer.ID) error,
6464
refreshQueryTimeout time.Duration,
6565
refreshInterval time.Duration,
6666
successfulOutboundQueryGracePeriod time.Duration,
67-
refreshDoneCh chan struct{}) (*RtRefreshManager, error) {
68-
69-
ctx, cancel := context.WithCancel(context.Background())
67+
refreshDoneCh chan struct{},
68+
) (*RtRefreshManager, error) {
69+
ctx, cancel := context.WithCancel(ctx)
7070
return &RtRefreshManager{
7171
ctx: ctx,
7272
cancel: cancel,

0 commit comments

Comments (0)