Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 208 additions & 36 deletions src/SharpCompress/IO/BufferedSubStream.cs
Original file line number Diff line number Diff line change
@@ -1,83 +1,255 @@
using System;
#nullable disable

using System;
using System.IO;

namespace SharpCompress.IO
{
internal class BufferedSubStream : NonDisposingStream
{
private long position;
private int cacheOffset;
private int cacheLength;
private readonly byte[] cache;
private const int DEFAULT_BUFFER_SIZE = 32768;
private readonly long startIndexInBaseStream;
private readonly long endIndexInBaseStream;
private long positionInBaseStream;
private readonly long length;
private readonly Stream baseStream;
private bool canRead;
private bool disposedValue;

private readonly int bufferSize;
private int bufferOffset;
private int bufferLength;
private byte[] buffer;

public BufferedSubStream(Stream stream) : this(stream, stream.Position, stream.Length - stream.Position, DEFAULT_BUFFER_SIZE)
{
}

public BufferedSubStream(Stream stream, long origin, long bytesToRead) : base(stream, throwOnDispose: false)
public BufferedSubStream(Stream stream, int bufferSize) : this(stream, stream.Position, stream.Length - stream.Position, bufferSize)
{
position = origin;
BytesLeftToRead = bytesToRead;
cache = new byte[32 << 10];
}

private long BytesLeftToRead { get; set; }
public BufferedSubStream(Stream stream, long startIndex, long length) : this(stream, startIndex, length, DEFAULT_BUFFER_SIZE)
{
}

public override bool CanRead => true;
public BufferedSubStream(Stream stream, long startIndex, long length, int bufferSize) : base(stream, false)
{
if (stream == null)
throw new ArgumentNullException("stream");

if (!stream.CanRead)
throw new NotSupportedException("A stream that can be read is required");

if (!stream.CanSeek)
throw new NotSupportedException("A stream that supports seeking is required");

this.endIndexInBaseStream = startIndex + length;
if (this.endIndexInBaseStream > stream.Length)
throw new ArgumentException("length");

this.baseStream = stream;
this.startIndexInBaseStream = startIndex;
this.positionInBaseStream = startIndex;
this.length = length;
this.bufferOffset = 0;
this.bufferLength = 0;
this.buffer = null;
this.bufferSize = bufferSize <= 4096 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.canRead = true;
this.disposedValue = false;
}

public override bool CanSeek => false;
public Stream BaseStream
{
get
{
ThrowIfDisposed();
return this.baseStream;
}
}

public override bool CanWrite => false;
public override long Length
{
get
{
ThrowIfDisposed();
return this.length;
}
}

public override void Flush()
public override long Position
{
throw new NotSupportedException();
get
{
ThrowIfDisposed();
return positionInBaseStream - startIndexInBaseStream - bufferLength + bufferOffset;
}
set
{
ThrowIfDisposed();
if (value == Position)
return;
Seek(value, SeekOrigin.Begin);
}
}

public override long Length => BytesLeftToRead;
public override bool CanRead => canRead && baseStream.CanRead;

public override bool CanSeek => true;

public override bool CanWrite => false;

private void ThrowIfDisposed()
{
if (disposedValue)
throw new ObjectDisposedException(GetType().ToString());
}

public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
private void ThrowIfCantRead()
{
if (!CanRead)
throw new NotSupportedException("This stream does not support reading");
}

public override int Read(byte[] buffer, int offset, int count)
public override int Read(byte[] array, int offset, int count)
{
if (count > BytesLeftToRead)
if (array == null)
throw new ArgumentNullException("array");
if (offset < 0)
throw new ArgumentOutOfRangeException("offset", "offset < 0");
if (count < 0)
throw new ArgumentOutOfRangeException("count", "count < 0");
if (array.Length - offset < count)
throw new ArgumentException("The size of the array is not enough.");

ThrowIfDisposed();
ThrowIfCantRead();

if (count == 0)
return 0;

int bytesFromBuffer = ReadFromBuffer(array, offset, count);

if (bytesFromBuffer == count)
return bytesFromBuffer;

int alreadySatisfied = bytesFromBuffer;
if (bytesFromBuffer > 0)
{
count = (int)BytesLeftToRead;
count -= bytesFromBuffer;
offset += bytesFromBuffer;
}

if (count > 0)
bufferOffset = bufferLength = 0;
lock (baseStream)
{
if (cacheLength == 0)
{
cacheOffset = 0;
Stream.Position = position;
cacheLength = Stream.Read(cache, 0, cache.Length);
position += cacheLength;
}
long remaining = endIndexInBaseStream - positionInBaseStream;
if (remaining <= 0)
return 0;

if (count > remaining)
count = (int)remaining;

if (count > cacheLength)
if (baseStream.Position != positionInBaseStream)
baseStream.Seek(positionInBaseStream, SeekOrigin.Begin);

if (count >= this.bufferSize)
{
count = cacheLength;
int read = baseStream.Read(array, offset, count);
this.positionInBaseStream += read;
return read + alreadySatisfied;
}

Buffer.BlockCopy(cache, cacheOffset, buffer, offset, count);
cacheOffset += count;
cacheLength -= count;
BytesLeftToRead -= count;
if (buffer == null)
buffer = new byte[bufferSize];

bufferLength = baseStream.Read(buffer, 0, (int)Math.Min(remaining, bufferSize));
if (bufferLength < 0)
throw new EndOfStreamException();
this.positionInBaseStream += bufferLength;
}

return count;
bytesFromBuffer = ReadFromBuffer(array, offset, count);
return bytesFromBuffer + alreadySatisfied;
}

private int ReadFromBuffer(byte[] array, int offset, int count)
{
int readBytes = bufferLength - bufferOffset;
if (readBytes == 0)
return 0;

if (readBytes > count)
readBytes = count;

Buffer.BlockCopy(buffer, bufferOffset, array, offset, readBytes);
bufferOffset += readBytes;
return readBytes;
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
ThrowIfDisposed();
long newPos = 0;
switch (origin)
{
case SeekOrigin.Begin:
newPos = offset;
break;
case SeekOrigin.Current:
newPos = this.Position + offset;
break;
case SeekOrigin.End:
newPos = this.length - offset;
break;
}

long newPosInBaseStream = startIndexInBaseStream + newPos;
if (positionInBaseStream - bufferLength <= newPosInBaseStream && newPosInBaseStream <= positionInBaseStream)
{
bufferOffset = (int)(newPosInBaseStream - (positionInBaseStream - bufferLength));
}
else
{
bufferLength = 0;
bufferOffset = 0;
this.positionInBaseStream = newPosInBaseStream;
}

return newPos;
}

public override void SetLength(long value)
{
ThrowIfDisposed();
throw new NotSupportedException();
}

public override void Write(byte[] buffer, int offset, int count)
{
ThrowIfDisposed();
throw new NotSupportedException();
}

public override void Flush()
{
ThrowIfDisposed();
throw new NotSupportedException();
}

protected override void Dispose(bool disposing)
{
if (disposing && !disposedValue)
{
this.canRead = false;
this.disposedValue = true;
if (this.buffer != null)
this.buffer = null;
this.bufferLength = 0;
this.bufferOffset = 0;
}
base.Dispose(disposing);
}
}
}
Loading