Skip to content

Commit e8cf47f

Browse files
xBaankxBaank
andauthored
Optimistic choking (#25)
* wip * wip * wip * wip * wip * wip * Reduce capacity --------- Co-authored-by: xBaank <you@example.com> Co-authored-by: xBaank <xbaank@users.noreply.github.com>
1 parent f8f213f commit e8cf47f

15 files changed

Lines changed: 349 additions & 158 deletions

Netorrent.Tests/Fakes/FakePeerConnection.cs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,51 @@
1+
using Netorrent.Extensions;
12
using Netorrent.P2P;
23
using Netorrent.P2P.Download;
34
using Netorrent.P2P.Measurement;
45
using Netorrent.P2P.Messages;
56
using R3;
67

7-
internal class FakePeerConnection : IPeerConnection
8+
internal class FakePeerConnection(Bitfield myBitfield) : IPeerConnection
89
{
910
public Subject<Block> SentBlocks = new();
10-
public ReactiveProperty<bool> AmChoking => new(true);
11+
public SynchronizedReactiveProperty<bool> AmChoking { get; } = new(true);
1112

12-
public ReactiveProperty<bool> AmInterested => new(false);
13+
public SynchronizedReactiveProperty<bool> AmInterested { get; } = new(false);
1314

14-
public ReactiveProperty<bool> PeerChoking => new(true);
15+
public SynchronizedReactiveProperty<bool> PeerChoking { get; } = new(true);
1516

16-
public ReactiveProperty<bool> PeerInterested => new(false);
17+
public SynchronizedReactiveProperty<bool> PeerInterested { get; } = new(false);
1718

1819
public TimeSpan ConnectionDuration => throw new NotImplementedException();
1920

20-
public SpeedTracker DownloadSpeedTracker => throw new NotImplementedException();
21+
public SpeedTracker DownloadTracker => new();
2122

22-
public SpeedTracker UploadSpeedTracker => throw new NotImplementedException();
23+
public SpeedTracker UploadTracker => new();
2324

2425
public PeerEndpoint PeerEndpoint => throw new NotImplementedException();
2526

2627
public int RequestedBlocksCount => throw new NotImplementedException();
2728

2829
public int UploadRequestedBlocksCount => _uploadRequestedCount;
2930

30-
public Bitfield MyBitField => throw new NotImplementedException();
31+
public Bitfield MyBitField => myBitfield;
3132

3233
public Bitfield? PeerBitField => throw new NotImplementedException();
3334

3435
public PeerRequestWindow PeerRequestWindow => throw new NotImplementedException();
3536

37+
ReadOnlyReactiveProperty<bool> IPeerConnection.AmChoking => AmChoking;
38+
39+
ReadOnlyReactiveProperty<bool> IPeerConnection.AmInterested => AmInterested;
40+
41+
ReadOnlyReactiveProperty<bool> IPeerConnection.PeerChoking => PeerChoking;
42+
43+
ReadOnlyReactiveProperty<bool> IPeerConnection.PeerInterested => PeerInterested;
44+
45+
public TimeSpan TimeSinceReceivedBlock => 0.Seconds;
46+
47+
public TimeSpan TimeSinceSentBlock => 0.Seconds;
48+
3649
private int _uploadRequestedCount = 0;
3750

3851
public int DecrementRequestedBlock()
@@ -72,9 +85,13 @@ public bool TrySendRequest(RequestBlock nextBlock)
7285
throw new NotImplementedException();
7386
}
7487

75-
public bool TrySendUnchoked()
88+
public async ValueTask UnchokeAsync(CancellationToken cancellationToken)
7689
{
7790
AmChoking.Value = false;
78-
return true;
91+
}
92+
93+
public async ValueTask ChokeAsync(CancellationToken cancellationToken)
94+
{
95+
AmChoking.Value = true;
7996
}
8097
}

Netorrent.Tests/Fakes/FakeUploadScheduler.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ namespace Netorrent.Tests.Fakes;
66

77
internal class FakeUploadScheduler : IUploadScheduler
88
{
9+
public async ValueTask AddPeerAsync(
10+
IPeerConnection peerConnection,
11+
CancellationToken cancellationToken
12+
) { }
13+
914
public ValueTask AddRequestAsync(RequestBlock request, CancellationToken cancellationToken)
1015
{
1116
throw new NotImplementedException();
@@ -18,18 +23,10 @@ public void CancelRequest(RequestBlock request)
1823

1924
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
2025

21-
public ValueTask FreeSlotAsync(
26+
public async ValueTask RemovePeerAsync(
2227
IPeerConnection peerConnection,
2328
CancellationToken cancellationToken
24-
) => ValueTask.CompletedTask;
25-
26-
public ValueTask RequestSlotAsync(
27-
IPeerConnection peerConnection,
28-
CancellationToken cancellationToken
29-
)
30-
{
31-
throw new NotImplementedException();
32-
}
29+
) { }
3330

3431
public Task StartAsync(CancellationToken cancellationToken) =>
3532
Task.Delay(-1, cancellationToken);

Netorrent.Tests/P2P/PeerConectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public async Task Should_Send_Unchoke(CancellationToken cancellationToken)
5555

5656
_ = peerConnection.StartAsync(cancellationToken);
5757
var statesTask = peerConnection.AmChoking.Take(2).ToListAsync(cancellationToken);
58-
peerConnection.TrySendUnchoked();
58+
await peerConnection.UnchokeAsync(cancellationToken);
5959
var states = await statesTask;
6060

6161
states.First().ShouldBeTrue();

Netorrent.Tests/P2P/UploadSchedulerTests.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,27 @@ public class UploadSchedulerTests
1515
public async Task Should_Upload_To_Peer(CancellationToken cancellationToken)
1616
{
1717
var logger = NullLogger.Instance;
18-
await using var peerConnection = new FakePeerConnection();
18+
var bitfield = new Bitfield(5, true);
19+
await using var peerConnection = new FakePeerConnection(bitfield);
1920
await using var uploadScheduler = new UploadScheduler(
2021
new FakePieceStorage(),
21-
new Bitfield(5, true),
22+
bitfield,
2223
new TransferStatistics(10),
2324
logger
2425
);
2526
var requestBlock = new RequestBlock(0, 0, 0);
26-
var amChokingTask = peerConnection.AmChoking.FirstAsync(cancellationToken);
27+
var amChokingTask = peerConnection.AmChoking.FirstAsync(i => i == false, cancellationToken);
2728
var blockTask = peerConnection.SentBlocks.FirstAsync(cancellationToken);
29+
peerConnection.PeerInterested.Value = true;
2830
requestBlock.RequestedFrom.Add(peerConnection);
29-
await uploadScheduler.RequestSlotAsync(peerConnection, cancellationToken);
30-
await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken);
31+
await uploadScheduler.AddPeerAsync(peerConnection, cancellationToken);
3132
var uploadTask = uploadScheduler.StartAsync(cancellationToken);
33+
var chokeState = await amChokingTask;
34+
await uploadScheduler.AddRequestAsync(requestBlock, cancellationToken);
3235
var block = await blockTask;
3336
block.Index.ShouldBe(0);
3437
block.Begin.ShouldBe(0);
3538
block.Payload.Length.ShouldBe(0);
36-
(await amChokingTask).ShouldBe(true);
39+
chokeState.ShouldBe(false);
3740
}
3841
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Netorrent.Other;
2+
3+
namespace Netorrent.Extensions;
4+
5+
public static class SemaphoreSlimExtensions
6+
{
7+
extension(SemaphoreSlim semaphoreSlim)
8+
{
9+
public async Task<SemaphoreSlimDisposable> LockAsync(CancellationToken cancellationToken)
10+
{
11+
await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
12+
return new SemaphoreSlimDisposable(semaphoreSlim);
13+
}
14+
}
15+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace Netorrent.Other;
2+
3+
public readonly struct SemaphoreSlimDisposable(SemaphoreSlim semaphoreSlim) : IDisposable
4+
{
5+
public void Dispose() => semaphoreSlim.Release();
6+
}

Netorrent/P2P/Download/PeerRequestWindow.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ private void CalculateWindow(long bytesPerSecond)
2020
public Timer StartSampling(TimeSpan period, SpeedTracker speedTracker)
2121
{
2222
return new Timer(
23-
_ => CalculateWindow((long)speedTracker.CurrentBps.Bps),
23+
_ => CalculateWindow((long)speedTracker.Speed.Bps),
2424
null,
2525
TimeSpan.Zero,
2626
period

Netorrent/P2P/Download/RequestScheduler.cs

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ ILogger logger
1818
) : IRequestScheduler
1919
{
2020
const int MinPeersForRarity = 6;
21-
const int MinPeers = 6;
22-
const int MaxPeers = 10;
2321

2422
private readonly Channel<Block> _receiveBlocksChannel = Channel.CreateBounded<Block>(
2523
new BoundedChannelOptions(256) { SingleWriter = false, SingleReader = true }
@@ -30,10 +28,8 @@ ILogger logger
3028
);
3129

3230
private readonly HashSet<IPeerConnection> _activePeers = [];
33-
private readonly List<IPeerConnection> _interestedPeers = [];
3431
private readonly Dictionary<int, PieceBuffer> _pieceBuffers = [];
3532
private readonly Lock _activePeersLock = new();
36-
private int _maxCurrentPeers = MinPeers;
3733
private CancellationTokenSource? _cts;
3834
private Task? _runningTask;
3935
private bool _disposed;
@@ -183,7 +179,7 @@ private async Task WarmupAsync(CancellationToken cancellationToken)
183179
lock (_activePeersLock)
184180
{
185181
minPeersReady = _activePeers.Count(i =>
186-
i.AmInterested.Value && !i.PeerChoking.Value
182+
i.AmInterested.CurrentValue && !i.PeerChoking.CurrentValue
187183
);
188184
}
189185

@@ -236,25 +232,13 @@ public async ValueTask RequestSlotAsync(
236232
CancellationToken cancellationToken
237233
)
238234
{
239-
bool shouldEnqueue;
240-
241235
lock (_activePeersLock)
242236
{
243-
if (_activePeers.Count >= _maxCurrentPeers)
244-
{
245-
_interestedPeers.Add(peerConnection);
246-
return;
247-
}
248237
_activePeers.Add(peerConnection);
249-
shouldEnqueue = true;
250-
}
251-
252-
if (shouldEnqueue)
253-
{
254-
await _slotsChannel
255-
.Writer.WriteAsync(peerConnection, cancellationToken)
256-
.ConfigureAwait(false);
257238
}
239+
await _slotsChannel
240+
.Writer.WriteAsync(peerConnection, cancellationToken)
241+
.ConfigureAwait(false);
258242
}
259243

260244
public async ValueTask FreeSlotAsync(
@@ -266,15 +250,7 @@ CancellationToken cancellationToken
266250

267251
lock (_activePeersLock)
268252
{
269-
_interestedPeers.Remove(peerConnection);
270253
_activePeers.Remove(peerConnection);
271-
272-
if (_interestedPeers.Count > 0)
273-
{
274-
nextPeer = _interestedPeers[0];
275-
_interestedPeers.RemoveAt(0);
276-
_activePeers.Add(nextPeer);
277-
}
278254
}
279255

280256
if (nextPeer is not null)

Netorrent/P2P/IPeerConnection.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@ namespace Netorrent.P2P;
77

88
internal interface IPeerConnection : IAsyncDisposable
99
{
10-
ReactiveProperty<bool> AmChoking { get; }
11-
ReactiveProperty<bool> AmInterested { get; }
12-
ReactiveProperty<bool> PeerChoking { get; }
13-
ReactiveProperty<bool> PeerInterested { get; }
10+
ReadOnlyReactiveProperty<bool> AmChoking { get; }
11+
ReadOnlyReactiveProperty<bool> AmInterested { get; }
12+
ReadOnlyReactiveProperty<bool> PeerChoking { get; }
13+
ReadOnlyReactiveProperty<bool> PeerInterested { get; }
1414
TimeSpan ConnectionDuration { get; }
15-
SpeedTracker DownloadSpeedTracker { get; }
16-
SpeedTracker UploadSpeedTracker { get; }
15+
SpeedTracker DownloadTracker { get; }
16+
SpeedTracker UploadTracker { get; }
1717
Bitfield MyBitField { get; }
1818
Bitfield? PeerBitField { get; }
1919
PeerEndpoint PeerEndpoint { get; }
2020
PeerRequestWindow PeerRequestWindow { get; }
2121
int RequestedBlocksCount { get; }
2222
int UploadRequestedBlocksCount { get; }
23+
TimeSpan TimeSinceReceivedBlock { get; }
24+
TimeSpan TimeSinceSentBlock { get; }
2325

2426
int DecrementRequestedBlock();
2527
int DecrementUploadRequested();
@@ -28,6 +30,7 @@ internal interface IPeerConnection : IAsyncDisposable
2830
bool TrySendBlock(Block block);
2931
bool TrySendCancel(RequestBlock request);
3032
bool TrySendRequest(RequestBlock nextBlock);
31-
bool TrySendUnchoked();
33+
ValueTask UnchokeAsync(CancellationToken cancellationToken);
34+
ValueTask ChokeAsync(CancellationToken cancellationToken);
3235
Task StartAsync(CancellationToken cancellationToken);
3336
}

Netorrent/P2P/Measurement/DownloadSpeed.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
/// Mbps, and Gbps. The struct supports arithmetic operations and implicit conversion from a double value representing
88
/// bits per second.</remarks>
99
/// <param name="bps">The download speed in bits per second (bps).</param>
10-
public readonly struct DownloadSpeed(double bps)
10+
public readonly record struct DownloadSpeed(double Bps)
1111
{
12-
public double Bps => bps;
13-
public double Kbps => bps / 1_000d;
14-
public double Mbps => bps / 1_000_000d;
15-
public double Gbps => bps / 1_000_000_000d;
12+
public double Kbps => Bps / 1_000d;
13+
public double Mbps => Bps / 1_000_000d;
14+
public double Gbps => Bps / 1_000_000_000d;
1615

1716
public static implicit operator DownloadSpeed(double bps) => new(bps);
1817

@@ -22,14 +21,18 @@ public readonly struct DownloadSpeed(double bps)
2221
public static DownloadSpeed operator -(DownloadSpeed left, DownloadSpeed right) =>
2322
new(left.Bps - right.Bps);
2423

24+
public static bool operator >(DownloadSpeed left, DownloadSpeed right) => left.Bps > right.Bps;
25+
26+
public static bool operator <(DownloadSpeed left, DownloadSpeed right) => left.Bps < right.Bps;
27+
2528
public override string ToString()
2629
{
27-
if (bps >= 1_000_000_000)
30+
if (Bps >= 1_000_000_000)
2831
return $"{Gbps:F2}/Gbps";
29-
if (bps >= 1_000_000)
32+
if (Bps >= 1_000_000)
3033
return $"{Mbps:F2}/Mbps";
31-
if (bps >= 1_000)
34+
if (Bps >= 1_000)
3235
return $"{Kbps:F2}/Kbps";
33-
return $"{bps:F2}/bps";
36+
return $"{Bps:F2}/bps";
3437
}
3538
}

0 commit comments

Comments
 (0)