diff --git a/src/System.IO.Pipelines/System.IO.Pipelines.sln b/src/System.IO.Pipelines/System.IO.Pipelines.sln index cc8c4cbdb892..8d8b0ee1d3eb 100644 --- a/src/System.IO.Pipelines/System.IO.Pipelines.sln +++ b/src/System.IO.Pipelines/System.IO.Pipelines.sln @@ -34,10 +34,10 @@ Global {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Debug|Any CPU.Build.0 = netcoreapp-Debug|Any CPU {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.ActiveCfg = netcoreapp-Release|Any CPU {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.Build.0 = netcoreapp-Release|Any CPU - {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU - {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU - {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU - {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netcoreapp-Debug|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netcoreapp-Debug|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netcoreapp-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netcoreapp-Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj b/src/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj index a3f7344af816..dfb93033bdb1 100644 --- a/src/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj +++ b/src/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj @@ -7,7 +7,7 @@ - net461;netcoreapp2.0;uap10.0.16299;$(AllXamarinFrameworks) + netcoreapp3.0;$(UAPvNextTFM);$(AllXamarinFrameworks) diff --git a/src/System.IO.Pipelines/ref/Configurations.props b/src/System.IO.Pipelines/ref/Configurations.props index 2870dced4592..361fc95318ff 100644 --- a/src/System.IO.Pipelines/ref/Configurations.props +++ b/src/System.IO.Pipelines/ref/Configurations.props @@ -1,6 +1,10 @@  + + netcoreapp; + + $(PackageConfigurations); netstandard; diff --git a/src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj b/src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj index f7335c97600f..7e974069d512 100644 --- a/src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj +++ b/src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj @@ -1,10 +1,7 @@ {9C524CA0-92FF-437B-B568-BCE8A794A69A} - netstandard-Debug;netstandard-Release - - netcoreapp2.0 + netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release @@ -16,4 +13,10 @@ - + + + + + + + \ No newline at end of file diff --git a/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj index c537773c3d29..fc6eea834871 100644 --- a/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -2,6 +2,7 @@ {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977} netcoreapp-Debug;netcoreapp-Debug;netcoreapp-Release;netcoreapp-Release;netstandard-Debug;netstandard-Release + $(DefineConstants);netstandard @@ -58,4 +59,7 @@ + + + \ No newline at end of file diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs index 340c13ed7a9f..3e29fb57b0c4 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs @@ -4,12 +4,14 @@ using System.Buffers; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; namespace System.IO.Pipelines { - internal class StreamPipeReader : PipeReader + internal class StreamPipeReader : PipeReader, IValueTaskSource { internal const int InitialSegmentPoolSize = 4; // 16K internal const int MaxSegmentPoolSize = 256; // 1MB @@ -28,11 +30,19 @@ internal class StreamPipeReader : PipeReader private BufferSegment _readTail; private long _bufferedBytes; private bool _examinedEverything; - private object _lock = new object(); + private readonly object _lock = new object(); // Mutable struct! Don't make this readonly private BufferSegmentStack _bufferSegmentPool; - private bool _leaveOpen; + private readonly bool _leaveOpen; + + // State for async reads + private volatile bool _readInProgress; + private readonly Action _onReadCompleted; + private ManualResetValueTaskSourceCore _readMrvts; + private ValueTaskAwaiter _readAwaiter; + private CancellationToken _readCancellation; + private CancellationTokenRegistration _readRegistration; /// /// Creates a new StreamPipeReader. @@ -53,6 +63,7 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options) _pool = options.Pool == MemoryPool.Shared ? null : options.Pool; _bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize); _leaveOpen = options.LeaveOpen; + _onReadCompleted = new Action(OnReadCompleted); } /// @@ -72,11 +83,7 @@ private CancellationTokenSource InternalTokenSource { lock (_lock) { - if (_internalTokenSource == null) - { - _internalTokenSource = new CancellationTokenSource(); - } - return _internalTokenSource; + return (_internalTokenSource ??= new CancellationTokenSource()); } } } @@ -188,31 +195,40 @@ public override void Complete(Exception exception = null) } /// - public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) { - // TODO ReadyAsync needs to throw if there are overlapping reads. - ThrowIfCompleted(); - - // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock) - CancellationTokenSource tokenSource = InternalTokenSource; - if (TryReadInternal(tokenSource, out ReadResult readResult)) + if (_readInProgress) { - return readResult; + // Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited + // so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited). + ThrowConcurrentReadsNotSupported(); } + _readInProgress = true; - if (_isStreamCompleted) + bool isAsync = false; + try { - return new ReadResult(buffer: default, isCanceled: false, isCompleted: true); - } - var reg = new CancellationTokenRegistration(); - if (cancellationToken.CanBeCanceled) - { - reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this); - } + ThrowIfCompleted(); + + // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock) + CancellationTokenSource tokenSource = InternalTokenSource; + if (TryReadInternal(tokenSource, out ReadResult readResult)) + { + return new ValueTask(readResult); + } + + if (_isStreamCompleted) + { + return new ValueTask(new ReadResult(buffer: default, isCanceled: false, isCompleted: true)); + } + + var reg = new CancellationTokenRegistration(); + if (cancellationToken.CanBeCanceled) + { + reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this); + } - using (reg) - { var isCanceled = false; try { @@ -220,7 +236,18 @@ public override async ValueTask ReadAsync(CancellationToken cancella Memory buffer = _readTail.AvailableMemory.Slice(_readTail.End); - int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false); + ValueTask resultTask = InnerStream.ReadAsync(buffer, tokenSource.Token); + int length; + if (resultTask.IsCompletedSuccessfully) + { + length = resultTask.Result; + } + else + { + isAsync = true; + // Need to go async + return CompleteReadAsync(resultTask, cancellationToken, reg); + } Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length); @@ -247,8 +274,27 @@ public override async ValueTask ReadAsync(CancellationToken cancella } } + finally + { + if (!isAsync) + { + reg.Dispose(); + } + } - return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted); + return new ValueTask(new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted)); + } + catch (Exception ex) + { + return new ValueTask(Task.FromException(ex)); + } + finally + { + if (!isAsync) + { + Debug.Assert(_readInProgress); + _readInProgress = false; + } } } @@ -270,6 +316,11 @@ private void ThrowIfCompleted() public override bool TryRead(out ReadResult result) { + if (_readInProgress) + { + ThrowConcurrentReadsNotSupported(); + } + ThrowIfCompleted(); return TryReadInternal(InternalTokenSource, out result); @@ -357,5 +408,106 @@ private void Cancel() { InternalTokenSource.Cancel(); } + + static void ThrowConcurrentReadsNotSupported() + { + throw new InvalidOperationException($"Concurrent reads are not supported; await the {nameof(ValueTask)} before starting next read."); + } + + private ValueTask CompleteReadAsync(ValueTask task, CancellationToken cancellationToken, CancellationTokenRegistration reg) + { + Debug.Assert(_readInProgress, "Read not in progress"); + + _readCancellation = cancellationToken; + _readRegistration = reg; + + _readAwaiter = task.GetAwaiter(); + + return new ValueTask(this, _readMrvts.Version); + } + + private void OnReadCompleted() + { + try + { + int length = _readAwaiter.GetResult(); + + Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length); + + _readTail.End += length; + _bufferedBytes += length; + + if (length == 0) + { + _isStreamCompleted = true; + } + + _readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: false, _isStreamCompleted)); + } + catch (OperationCanceledException oce) + { + // Get the source before clearing (and replacing) + CancellationTokenSource tokenSource = InternalTokenSource; + ClearCancellationToken(); + if (tokenSource.IsCancellationRequested && !_readCancellation.IsCancellationRequested) + { + // Catch cancellation and translate it into setting isCanceled = true + _readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: true, _isStreamCompleted)); + } + else + { + _readMrvts.SetException(oce); + } + } + catch (Exception ex) + { + _readMrvts.SetException(ex); + } + finally + { + _readRegistration.Dispose(); + _readRegistration = default; + } + } + + ReadResult IValueTaskSource.GetResult(short token) + { + ValidateReading(); + ReadResult result = _readMrvts.GetResult(token); + + _readCancellation = default; + _readAwaiter = default; + _readMrvts.Reset(); + + Debug.Assert(_readInProgress); + _readInProgress = false; + + return result; + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + => _readMrvts.GetStatus(token); + + void IValueTaskSource.OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) + { + ValidateReading(); + _readMrvts.OnCompleted(continuation, state, token, flags); + + _readAwaiter.UnsafeOnCompleted(_onReadCompleted); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ValidateReading() + { + if (!_readInProgress) + { + ThrowReadNotInProgress(); + } + + static void ThrowReadNotInProgress() + { + throw new InvalidOperationException("Read not in progress"); + } + } } } diff --git a/src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs b/src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs index c0ce8a36a3b9..3bf8a38d698f 100644 --- a/src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs +++ b/src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs @@ -120,6 +120,27 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes) pipe.Reader.Complete(); } + [Fact] + public void ConcurrentReadsThrow() + { + var pipe = new Pipe(); + var options = new StreamPipeReaderOptions(bufferSize: 4096); + PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options); + + ValueTask valueTask = reader.ReadAsync(); + + Assert.False(valueTask.IsCompleted); + + Assert.Throws(() => reader.ReadAsync()); + Assert.Throws(() => reader.TryRead(out _)); + + Assert.False(valueTask.IsCompleted); + + reader.Complete(); + + pipe.Reader.Complete(); + } + [Theory] [MemberData(nameof(ReadSettings))] public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, int minimumReadSize, int[] readBufferSizes)