Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<Compile Include="System\IO\Pipelines\BufferSegment.cs" />
<Compile Include="System\IO\Pipelines\CompletionData.cs" />
<Compile Include="System\IO\Pipelines\FlushResult.cs" />
<Compile Include="System\IO\Pipelines\Gate.cs" />
<Compile Include="System\IO\Pipelines\InlineScheduler.cs" />
<Compile Include="System\IO\Pipelines\IDuplexPipe.cs" />
<Compile Include="System\IO\Pipelines\BufferSegmentStack.cs" />
Expand Down
51 changes: 51 additions & 0 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Gate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace System.IO.Pipelines
{
internal class Gate : IValueTaskSource
{
private readonly static Action<object> s_cancelledEvent = (o) => ((Gate)o).OnCancel();

private ManualResetValueTaskSourceCore<VoidResult> _mrvts;
private CancellationTokenRegistration _cancellationRegistration;
private CancellationToken _cancellationToken;

public ValueTask WaitAsync(CancellationToken cancellationToken)
{
_cancellationRegistration = cancellationToken.UnsafeRegister(s_cancelledEvent, this);
_cancellationToken = cancellationToken;

return new ValueTask(this, _mrvts.Version);
}

public void Release()
=> _mrvts.SetResult(default);

private void OnCancel()
=> _mrvts.SetException(new OperationCanceledException(_cancellationToken));

public void GetResult(short token)
{
_mrvts.GetResult(token);
_mrvts.Reset();
_cancellationRegistration.Dispose();
_cancellationToken = default;
}

public ValueTaskSourceStatus GetStatus(short token)
=> _mrvts.GetStatus(token);

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _mrvts.OnCompleted(continuation, state, token, flags);

private struct VoidResult
{
}
}
}
237 changes: 191 additions & 46 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace System.IO.Pipelines
{
Expand All @@ -28,11 +29,13 @@ internal class StreamPipeReader : PipeReader
private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private object _lock = new object();
private readonly object _lock = new object();

// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
private bool _leaveOpen;
private readonly bool _leaveOpen;
private ValueTaskAsyncReader _asyncReader;
private volatile bool _readInProgress;

/// <summary>
/// Creates a new StreamPipeReader.
Expand Down Expand Up @@ -171,6 +174,7 @@ public override void Complete(Exception exception = null)
}

_isReaderCompleted = true;
_asyncReader?.Dispose();

BufferSegment segment = _readHead;
while (segment != null)
Expand All @@ -193,70 +197,107 @@ public override void OnWriterCompleted(Action<Exception, object> callback, objec
}

/// <inheritdoc />
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
bool isAsync = false;
if (_readInProgress)
{
return readResult;
// Throw if there are overlapping reads.
ThrowConcurrentReadsNotSupported();
}
_readInProgress = true;

if (_isStreamCompleted)
try
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
}
ThrowIfCompleted();

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}

using (reg)
{
var isCanceled = false;
try
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
{
AllocateReadTail();

Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);

int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

_readTail.End += length;
_bufferedBytes += length;
return new ValueTask<ReadResult>(readResult);
}

if (length == 0)
{
_isStreamCompleted = true;
}
if (_isStreamCompleted)
{
return new ValueTask<ReadResult>(new ReadResult(buffer: default, isCanceled: false, isCompleted: true));
}
catch (OperationCanceledException)

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
ClearCancellationToken();
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}

if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
using (reg)
{
try
{
// Catch cancellation and translate it into setting isCanceled = true
isCanceled = true;
AllocateReadTail();

Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);

ValueTask<int> resultTask = InnerStream.ReadAsync(buffer, tokenSource.Token);
int length;
if (resultTask.IsCompletedSuccessfully)
{
length = resultTask.Result;
}
else
{
isAsync = true;
ValueTaskAsyncReader asyncReader = (_asyncReader ??= new ValueTaskAsyncReader(this));
return asyncReader.AwaitTask(resultTask, tokenSource, cancellationToken);
}

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

_readTail.End += length;
_bufferedBytes += length;

if (length == 0)
{
_isStreamCompleted = true;
}

return new ValueTask<ReadResult>(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: false, _isStreamCompleted));
}
else
catch (OperationCanceledException oce)
{
throw;
ClearCancellationToken();

if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
// Catch cancellation and translate it into setting isCanceled = true
return new ValueTask<ReadResult>(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: true, _isStreamCompleted));
}
else
{
return new ValueTask<ReadResult>(Task.FromException<ReadResult>(oce));
}
}

}

return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
}
catch (Exception ex)
{
return new ValueTask<ReadResult>(Task.FromException<ReadResult>(ex));
}
finally
{
if (!isAsync)
{
Debug.Assert(_readInProgress);
_readInProgress = false;
}
}
}

static void ThrowConcurrentReadsNotSupported()
{
throw new InvalidOperationException("Concurrent reads are not supported");
}

private void ClearCancellationToken()
{
lock (_lock)
Expand All @@ -275,6 +316,12 @@ private void ThrowIfCompleted()

public override bool TryRead(out ReadResult result)
{
if (_readInProgress)
{
// Throw if there are overlapping reads.
ThrowConcurrentReadsNotSupported();
}

ThrowIfCompleted();

return TryReadInternal(InternalTokenSource, out result);
Expand Down Expand Up @@ -362,5 +409,103 @@ private void Cancel()
{
InternalTokenSource.Cancel();
}

private class ValueTaskAsyncReader : IValueTaskSource<ReadResult>, IDisposable
{
private readonly StreamPipeReader _reader;
private readonly Task _readTask;
private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();
private readonly Gate _gate = new Gate();
private ManualResetValueTaskSourceCore<ReadResult> _mrvts;

private ValueTask<int> _valueTask;
private CancellationToken _cancellationToken;
private CancellationTokenSource _tokenSource;

public ValueTaskAsyncReader(StreamPipeReader reader)
{
_reader = reader;
_readTask = ReadAsync();
}

public ValueTask<ReadResult> AwaitTask(ValueTask<int> valueTask, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
{
_valueTask = valueTask;
_cancellationToken = cancellationToken;
_tokenSource = tokenSource;
_gate.Release();

return new ValueTask<ReadResult>(this, _mrvts.Version);
}

private async Task ReadAsync()
{
while (!_disposeCts.IsCancellationRequested)
{
try
{
await _gate.WaitAsync(_disposeCts.Token).ConfigureAwait(false);

int length = await _valueTask.ConfigureAwait(false);
_cancellationToken.ThrowIfCancellationRequested();

Debug.Assert(length + _reader._readTail.End <= _reader._readTail.AvailableMemory.Length);

_reader._readTail.End += length;
_reader._bufferedBytes += length;

if (length == 0)
{
_reader._isStreamCompleted = true;
}

_mrvts.SetResult(new ReadResult(_reader.GetCurrentReadOnlySequence(), isCanceled: false, _reader._isStreamCompleted));
}
catch (OperationCanceledException oce)
{
_reader.ClearCancellationToken();

if (_tokenSource.IsCancellationRequested && !_cancellationToken.IsCancellationRequested)
{
// Catch cancellation and translate it into setting isCanceled = true
_mrvts.SetResult(new ReadResult(_reader.GetCurrentReadOnlySequence(), isCanceled: true, _reader._isStreamCompleted));
}
else
{
_mrvts.SetException(oce);
}
}
catch (Exception ex)
{
_mrvts.SetException(ex);
}
}

_disposeCts.Dispose();
}

ReadResult IValueTaskSource<ReadResult>.GetResult(short token)
{
ReadResult result = _mrvts.GetResult(token);

_valueTask = default;
_cancellationToken = default;
_tokenSource = null;
_mrvts.Reset();

Debug.Assert(_reader._readInProgress);
_reader._readInProgress = false;

return result;
}

ValueTaskSourceStatus IValueTaskSource<ReadResult>.GetStatus(short token)
=> _mrvts.GetStatus(token);

void IValueTaskSource<ReadResult>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _mrvts.OnCompleted(continuation, state, token, flags);

public void Dispose() => _disposeCts.Cancel();
}
}
}