Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,23 @@ private async Task WriteOutputAsync(Stream stream)
}
else if (buffer.IsSingleSegment)
{
#if NETCOREAPP2_1
await stream.WriteAsync(buffer.First);
#else
var array = buffer.First.GetArray();
await stream.WriteAsync(array.Array, array.Offset, array.Count);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we know ReadOnlyMemory<byte>.GetArray() succeeds, what's the benefit of calling Stream.WriteAsync(ReadOnlyMemory<byte>)?

Copy link
Copy Markdown
Member Author

@davidfowl davidfowl Feb 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the Stream implementation we're calling into doesn't call TryGetArray and wants to pin the memory then it's free here (avoids the GCHandle overhead). It also avoids the conversion from Memory<T> -> T[] (in theory) in a bunch of places. So for example, when writing through SSL Stream we can avoid the extra pinning required to call into SChannel and OpenSSL.

#endif
}
else
{
foreach (var memory in buffer)
{
#if NETCOREAPP2_1
await stream.WriteAsync(memory);
#else
var array = memory.GetArray();
await stream.WriteAsync(array.Array, array.Offset, array.Count);
#endif
}
}
}
Expand Down Expand Up @@ -125,10 +133,14 @@ private async Task ReadInputAsync(Stream stream)

var outputBuffer = Input.Writer.GetMemory(MinAllocBufferSize);

var array = outputBuffer.GetArray();
try
{
#if NETCOREAPP2_1
var bytesRead = await stream.ReadAsync(outputBuffer);
#else
var array = outputBuffer.GetArray();
var bytesRead = await stream.ReadAsync(array.Array, array.Offset, array.Count);
#endif
Input.Writer.Advance(bytesRead);

if (bytesRead == 0)
Expand Down
50 changes: 42 additions & 8 deletions src/Kestrel.Core/Adapter/Internal/LoggingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,34 @@ public override Task FlushAsync(CancellationToken cancellationToken)
public override int Read(byte[] buffer, int offset, int count)
{
int read = _inner.Read(buffer, offset, count);
Log("Read", read, buffer, offset);
Log("Read", new ReadOnlySpan<byte>(buffer, offset, read));
return read;
}

#if NETCOREAPP2_1
public override int Read(Span<byte> destination)
{
int read = _inner.Read(destination);
Log("Read", destination.Slice(0, read));
return read;
}
#endif

public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await _inner.ReadAsync(buffer, offset, count, cancellationToken);
Log("ReadAsync", read, buffer, offset);
Log("ReadAsync", new ReadOnlySpan<byte>(buffer, offset, read));
return read;
}

#if NETCOREAPP2_1
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
int read = await _inner.ReadAsync(destination, cancellationToken);
Log("ReadAsync", destination.Span.Slice(0, read));
return read;
}
#endif

public override long Seek(long offset, SeekOrigin origin)
{
Expand All @@ -102,29 +120,45 @@ public override void SetLength(long value)

public override void Write(byte[] buffer, int offset, int count)
{
Log("Write", count, buffer, offset);
Log("Write", new ReadOnlySpan<byte>(buffer, offset, count));
_inner.Write(buffer, offset, count);
}

#if NETCOREAPP2_1
public override void Write(ReadOnlySpan<byte> source)
{
Log("Write", source);
_inner.Write(source);
}
#endif

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Log("WriteAsync", count, buffer, offset);
Log("WriteAsync", new ReadOnlySpan<byte>(buffer, offset, count));
return _inner.WriteAsync(buffer, offset, count, cancellationToken);
}

private void Log(string method, int count, byte[] buffer, int offset)
#if NETCOREAPP2_1
public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
Log("WriteAsync", source.Span);
return _inner.WriteAsync(source, cancellationToken);
}
#endif

private void Log(string method, ReadOnlySpan<byte> buffer)
{
var builder = new StringBuilder($"{method}[{count}] ");
var builder = new StringBuilder($"{method}[{buffer.Length}] ");

// Write the hex
for (int i = offset; i < offset + count; i++)
for (int i = 0; i < buffer.Length; i++)
{
builder.Append(buffer[i].ToString("X2"));
builder.Append(" ");
}
builder.AppendLine();
// Write the bytes as if they were ASCII
for (int i = offset; i < offset + count; i++)
for (int i = 0; i < buffer.Length; i++)
{
builder.Append((char)buffer[i]);
}
Expand Down
29 changes: 22 additions & 7 deletions src/Kestrel.Core/Adapter/Internal/RawStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,43 @@ public override int Read(byte[] buffer, int offset, int count)
{
// ValueTask uses .GetAwaiter().GetResult() if necessary
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
return ReadAsync(new ArraySegment<byte>(buffer, offset, count)).Result;
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).Result;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsync(new ArraySegment<byte>(buffer, offset, count));
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).AsTask();
}

#if NETCOREAPP2_1
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination);
}
#endif

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (buffer != null)
{
_output.Write(new ReadOnlySpan<byte>(buffer, offset, count));
}

await _output.FlushAsync(token);
await _output.FlushAsync(cancellationToken);
}

#if NETCOREAPP2_1
public override async Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
_output.Write(source.Span);
await _output.FlushAsync(cancellationToken);
}
#endif

public override void Flush()
{
Expand All @@ -94,7 +109,7 @@ public override Task FlushAsync(CancellationToken cancellationToken)
return WriteAsync(null, 0, 0, cancellationToken);
}

private async Task<int> ReadAsync(ArraySegment<byte> buffer)
private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination)
{
while (true)
{
Expand All @@ -105,9 +120,9 @@ private async Task<int> ReadAsync(ArraySegment<byte> buffer)
if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
var count = (int) Math.Min(readableBuffer.Length, buffer.Count);
var count = (int) Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count);
readableBuffer.CopyTo(buffer);
readableBuffer.CopyTo(destination.Span);
return count;
}
else if (result.IsCompleted)
Expand Down
18 changes: 9 additions & 9 deletions src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
public class Http1OutputProducer : IHttpOutputProducer
{
private static readonly ArraySegment<byte> _continueBytes = new ArraySegment<byte>(Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n"));
private static readonly ReadOnlyMemory<byte> _continueBytes = new ReadOnlyMemory<byte>(Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n"));
private static readonly byte[] _bytesHttpVersion11 = Encoding.ASCII.GetBytes("HTTP/1.1 ");
private static readonly byte[] _bytesEndHeaders = Encoding.ASCII.GetBytes("\r\n\r\n");
private static readonly ArraySegment<byte> _endChunkedResponseBytes = new ArraySegment<byte>(Encoding.ASCII.GetBytes("0\r\n\r\n"));
private static readonly ReadOnlyMemory<byte> _endChunkedResponseBytes = new ReadOnlyMemory<byte>(Encoding.ASCII.GetBytes("0\r\n\r\n"));

private readonly string _connectionId;
private readonly ITimeoutControl _timeoutControl;
Expand Down Expand Up @@ -53,7 +53,7 @@ public Http1OutputProducer(
_flushCompleted = OnFlushCompleted;
}

public Task WriteDataAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
Expand All @@ -65,7 +65,7 @@ public Http1OutputProducer(

public Task WriteStreamSuffixAsync(CancellationToken cancellationToken)
{
return WriteAsync(_endChunkedResponseBytes, cancellationToken);
return WriteAsync(_endChunkedResponseBytes.Span, cancellationToken);
}

public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -160,11 +160,11 @@ public void Abort(Exception error)

public Task Write100ContinueAsync(CancellationToken cancellationToken)
{
return WriteAsync(_continueBytes, default(CancellationToken));
return WriteAsync(_continueBytes.Span, default(CancellationToken));
}

private Task WriteAsync(
ArraySegment<byte> buffer,
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken)
{
var writableBuffer = default(PipeWriter);
Expand All @@ -178,10 +178,10 @@ private Task WriteAsync(

writableBuffer = _pipeWriter;
var writer = OutputWriter.Create(writableBuffer);
if (buffer.Count > 0)
if (buffer.Length > 0)
{
writer.Write(new ReadOnlySpan<byte>(buffer.Array, buffer.Offset, buffer.Count));
bytesWritten += buffer.Count;
writer.Write(buffer);
bytesWritten += buffer.Length;
}

writableBuffer.Commit();
Expand Down
28 changes: 14 additions & 14 deletions src/Kestrel.Core/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract partial class HttpProtocol : IHttpResponseControl
private static readonly byte[] _bytesConnectionKeepAlive = Encoding.ASCII.GetBytes("\r\nConnection: keep-alive");
private static readonly byte[] _bytesTransferEncodingChunked = Encoding.ASCII.GetBytes("\r\nTransfer-Encoding: chunked");
private static readonly byte[] _bytesServer = Encoding.ASCII.GetBytes("\r\nServer: " + Constants.ServerName);
private static readonly Action<PipeWriter, ArraySegment<byte>> _writeChunk = WriteChunk;
private static readonly Action<PipeWriter, ReadOnlyMemory<byte>> _writeChunk = WriteChunk;

private readonly object _onStartingSync = new Object();
private readonly object _onCompletedSync = new Object();
Expand Down Expand Up @@ -762,14 +762,14 @@ private async Task FlushAsyncAwaited(Task initializeTask, CancellationToken canc
await Output.FlushAsync(cancellationToken);
}

public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken = default(CancellationToken))
public Task WriteAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default(CancellationToken))
{
// For the first write, ensure headers are flushed if WriteDataAsync isn't called.
var firstWrite = !HasResponseStarted;

if (firstWrite)
{
var initializeTask = InitializeResponseAsync(data.Count);
var initializeTask = InitializeResponseAsync(data.Length);
// If return is Task.CompletedTask no awaiting is required
if (!ReferenceEquals(initializeTask, Task.CompletedTask))
{
Expand All @@ -778,14 +778,14 @@ private async Task FlushAsyncAwaited(Task initializeTask, CancellationToken canc
}
else
{
VerifyAndUpdateWrite(data.Count);
VerifyAndUpdateWrite(data.Length);
}

if (_canHaveBody)
{
if (_autoChunk)
{
if (data.Count == 0)
if (data.Length == 0)
{
return !firstWrite ? Task.CompletedTask : FlushAsync(cancellationToken);
}
Expand All @@ -794,7 +794,7 @@ private async Task FlushAsyncAwaited(Task initializeTask, CancellationToken canc
else
{
CheckLastWrite();
return Output.WriteDataAsync(data, cancellationToken: cancellationToken);
return Output.WriteDataAsync(data.Span, cancellationToken: cancellationToken);
}
}
else
Expand All @@ -804,7 +804,7 @@ private async Task FlushAsyncAwaited(Task initializeTask, CancellationToken canc
}
}

public async Task WriteAsyncAwaited(Task initializeTask, ArraySegment<byte> data, CancellationToken cancellationToken)
public async Task WriteAsyncAwaited(Task initializeTask, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
await initializeTask;

Expand All @@ -814,7 +814,7 @@ public async Task WriteAsyncAwaited(Task initializeTask, ArraySegment<byte> data
{
if (_autoChunk)
{
if (data.Count == 0)
if (data.Length == 0)
{
await FlushAsync(cancellationToken);
return;
Expand All @@ -825,7 +825,7 @@ public async Task WriteAsyncAwaited(Task initializeTask, ArraySegment<byte> data
else
{
CheckLastWrite();
await Output.WriteDataAsync(data, cancellationToken: cancellationToken);
await Output.WriteDataAsync(data.Span, cancellationToken: cancellationToken);
}
}
else
Expand Down Expand Up @@ -892,18 +892,18 @@ protected void VerifyResponseContentLength()
}
}

private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
private Task WriteChunkedAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
{
return Output.WriteAsync(_writeChunk, data);
}

private static void WriteChunk(PipeWriter writableBuffer, ArraySegment<byte> buffer)
private static void WriteChunk(PipeWriter writableBuffer, ReadOnlyMemory<byte> buffer)
{
var writer = OutputWriter.Create(writableBuffer);
if (buffer.Count > 0)
if (buffer.Length > 0)
{
ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Count);
writer.Write(new ReadOnlySpan<byte>(buffer.Array, buffer.Offset, buffer.Count));
ChunkWriter.WriteBeginChunkBytes(ref writer, buffer.Length);
writer.Write(buffer.Span);
ChunkWriter.WriteEndChunkBytes(ref writer);
}
}
Expand Down
Loading