Skip to content

Commit 8b612c6

Browse files
authored
Merge pull request #1003 from adamhathcock/adam/async-lzma
async lzma
2 parents d90b610 + f7b3525 commit 8b612c6

18 files changed

Lines changed: 1574 additions & 179 deletions

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<PackageVersion Include="xunit" Version="2.9.3" />
1515
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.5" />
1616
<PackageVersion Include="ZstdSharp.Port" Version="0.8.6" />
17+
<PackageVersion Include="Microsoft.NET.ILLink.Tasks" Version="8.0.21" />
1718
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
1819
<PackageVersion Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
1920
</ItemGroup>

src/SharpCompress/Compressors/LZMA/AesDecoderStream.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System.IO;
33
using System.Security.Cryptography;
44
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57
using SharpCompress.Compressors.LZMA.Utilites;
68
using SharpCompress.IO;
79

@@ -283,5 +285,70 @@ private int HandleUnderflow(byte[] buffer, int offset, int count)
283285
return count;
284286
}
285287

288+
public override async Task<int> ReadAsync(
289+
byte[] buffer,
290+
int offset,
291+
int count,
292+
CancellationToken cancellationToken = default
293+
)
294+
{
295+
if (count == 0 || mWritten == mLimit)
296+
{
297+
return 0;
298+
}
299+
300+
if (mUnderflow > 0)
301+
{
302+
return HandleUnderflow(buffer, offset, count);
303+
}
304+
305+
// Need at least 16 bytes to proceed.
306+
if (mEnding - mOffset < 16)
307+
{
308+
Buffer.BlockCopy(mBuffer, mOffset, mBuffer, 0, mEnding - mOffset);
309+
mEnding -= mOffset;
310+
mOffset = 0;
311+
312+
do
313+
{
314+
cancellationToken.ThrowIfCancellationRequested();
315+
var read = await mStream
316+
.ReadAsync(mBuffer, mEnding, mBuffer.Length - mEnding, cancellationToken)
317+
.ConfigureAwait(false);
318+
if (read == 0)
319+
{
320+
// We are not done decoding and have less than 16 bytes.
321+
throw new EndOfStreamException();
322+
}
323+
324+
mEnding += read;
325+
} while (mEnding - mOffset < 16);
326+
}
327+
328+
// We shouldn't return more data than we are limited to.
329+
if (count > mLimit - mWritten)
330+
{
331+
count = (int)(mLimit - mWritten);
332+
}
333+
334+
// We cannot transform less than 16 bytes into the target buffer,
335+
// but we also cannot return zero, so we need to handle this.
336+
if (count < 16)
337+
{
338+
return HandleUnderflow(buffer, offset, count);
339+
}
340+
341+
if (count > mEnding - mOffset)
342+
{
343+
count = mEnding - mOffset;
344+
}
345+
346+
// Otherwise we transform directly into the target buffer.
347+
var processed = mDecoder.TransformBlock(mBuffer, mOffset, count & ~15, buffer, offset);
348+
mOffset += processed;
349+
mWritten += processed;
350+
return processed;
351+
}
352+
286353
#endregion
287354
}

src/SharpCompress/Compressors/LZMA/Bcj2DecoderStream.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46
using SharpCompress.IO;
57

68
namespace SharpCompress.Compressors.LZMA;
@@ -191,6 +193,18 @@ public override int Read(byte[] buffer, int offset, int count)
191193
return count;
192194
}
193195

196+
public override Task<int> ReadAsync(
197+
byte[] buffer,
198+
int offset,
199+
int count,
200+
CancellationToken cancellationToken = default
201+
)
202+
{
203+
cancellationToken.ThrowIfCancellationRequested();
204+
// Bcj2DecoderStream uses complex state machine with multiple streams
205+
return Task.FromResult(Read(buffer, offset, count));
206+
}
207+
194208
public override int ReadByte()
195209
{
196210
if (_mFinished)

src/SharpCompress/Compressors/LZMA/LZ/LzOutWindow.cs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
using System;
44
using System.Buffers;
55
using System.IO;
6+
using System.Threading;
7+
using System.Threading.Tasks;
68

79
namespace SharpCompress.Compressors.LZMA.LZ;
810

@@ -85,6 +87,12 @@ public void ReleaseStream()
8587
_stream = null;
8688
}
8789

90+
public async Task ReleaseStreamAsync(CancellationToken cancellationToken = default)
91+
{
92+
await FlushAsync(cancellationToken).ConfigureAwait(false);
93+
_stream = null;
94+
}
95+
8896
private void Flush()
8997
{
9098
if (_stream is null)
@@ -104,6 +112,27 @@ private void Flush()
104112
_streamPos = _pos;
105113
}
106114

115+
private async Task FlushAsync(CancellationToken cancellationToken = default)
116+
{
117+
if (_stream is null)
118+
{
119+
return;
120+
}
121+
var size = _pos - _streamPos;
122+
if (size == 0)
123+
{
124+
return;
125+
}
126+
await _stream
127+
.WriteAsync(_buffer, _streamPos, size, cancellationToken)
128+
.ConfigureAwait(false);
129+
if (_pos >= _windowSize)
130+
{
131+
_pos = 0;
132+
}
133+
_streamPos = _pos;
134+
}
135+
107136
public void CopyPending()
108137
{
109138
if (_pendingLen < 1)
@@ -124,6 +153,26 @@ public void CopyPending()
124153
_pendingLen = rem;
125154
}
126155

156+
public async Task CopyPendingAsync(CancellationToken cancellationToken = default)
157+
{
158+
if (_pendingLen < 1)
159+
{
160+
return;
161+
}
162+
var rem = _pendingLen;
163+
var pos = (_pendingDist < _pos ? _pos : _pos + _windowSize) - _pendingDist - 1;
164+
while (rem > 0 && HasSpace)
165+
{
166+
if (pos >= _windowSize)
167+
{
168+
pos = 0;
169+
}
170+
await PutByteAsync(_buffer[pos++], cancellationToken).ConfigureAwait(false);
171+
rem--;
172+
}
173+
_pendingLen = rem;
174+
}
175+
127176
public void CopyBlock(int distance, int len)
128177
{
129178
var rem = len;
@@ -157,6 +206,43 @@ public void CopyBlock(int distance, int len)
157206
_pendingDist = distance;
158207
}
159208

209+
public async Task CopyBlockAsync(
210+
int distance,
211+
int len,
212+
CancellationToken cancellationToken = default
213+
)
214+
{
215+
var rem = len;
216+
var pos = (distance < _pos ? _pos : _pos + _windowSize) - distance - 1;
217+
var targetSize = HasSpace ? (int)Math.Min(rem, _limit - _total) : 0;
218+
var sizeUntilWindowEnd = Math.Min(_windowSize - _pos, _windowSize - pos);
219+
var sizeUntilOverlap = Math.Abs(pos - _pos);
220+
var fastSize = Math.Min(Math.Min(sizeUntilWindowEnd, sizeUntilOverlap), targetSize);
221+
if (fastSize >= 2)
222+
{
223+
_buffer.AsSpan(pos, fastSize).CopyTo(_buffer.AsSpan(_pos, fastSize));
224+
_pos += fastSize;
225+
pos += fastSize;
226+
_total += fastSize;
227+
if (_pos >= _windowSize)
228+
{
229+
await FlushAsync(cancellationToken).ConfigureAwait(false);
230+
}
231+
rem -= fastSize;
232+
}
233+
while (rem > 0 && HasSpace)
234+
{
235+
if (pos >= _windowSize)
236+
{
237+
pos = 0;
238+
}
239+
await PutByteAsync(_buffer[pos++], cancellationToken).ConfigureAwait(false);
240+
rem--;
241+
}
242+
_pendingLen = rem;
243+
_pendingDist = distance;
244+
}
245+
160246
public void PutByte(byte b)
161247
{
162248
_buffer[_pos++] = b;
@@ -167,6 +253,16 @@ public void PutByte(byte b)
167253
}
168254
}
169255

256+
public async Task PutByteAsync(byte b, CancellationToken cancellationToken = default)
257+
{
258+
_buffer[_pos++] = b;
259+
_total++;
260+
if (_pos >= _windowSize)
261+
{
262+
await FlushAsync(cancellationToken).ConfigureAwait(false);
263+
}
264+
}
265+
170266
public byte GetByte(int distance)
171267
{
172268
var pos = _pos - distance - 1;
@@ -207,6 +303,44 @@ public int CopyStream(Stream stream, int len)
207303
return len - size;
208304
}
209305

306+
public async Task<int> CopyStreamAsync(
307+
Stream stream,
308+
int len,
309+
CancellationToken cancellationToken = default
310+
)
311+
{
312+
var size = len;
313+
while (size > 0 && _pos < _windowSize && _total < _limit)
314+
{
315+
cancellationToken.ThrowIfCancellationRequested();
316+
317+
var curSize = _windowSize - _pos;
318+
if (curSize > _limit - _total)
319+
{
320+
curSize = (int)(_limit - _total);
321+
}
322+
if (curSize > size)
323+
{
324+
curSize = size;
325+
}
326+
var numReadBytes = await stream
327+
.ReadAsync(_buffer, _pos, curSize, cancellationToken)
328+
.ConfigureAwait(false);
329+
if (numReadBytes == 0)
330+
{
331+
throw new DataErrorException();
332+
}
333+
size -= numReadBytes;
334+
_pos += numReadBytes;
335+
_total += numReadBytes;
336+
if (_pos >= _windowSize)
337+
{
338+
await FlushAsync(cancellationToken).ConfigureAwait(false);
339+
}
340+
}
341+
return len - size;
342+
}
343+
210344
public void SetLimit(long size) => _limit = _total + size;
211345

212346
public bool HasSpace => _pos < _windowSize && _total < _limit;

src/SharpCompress/Compressors/LZMA/LZipStream.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Buffers.Binary;
33
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46
using SharpCompress.Common;
57
using SharpCompress.Crypto;
68
using SharpCompress.IO;
@@ -157,6 +159,11 @@ public override int Read(byte[] buffer, int offset, int count) =>
157159

158160
#if !NETFRAMEWORK && !NETSTANDARD2_0
159161

162+
public override ValueTask<int> ReadAsync(
163+
Memory<byte> buffer,
164+
CancellationToken cancellationToken = default
165+
) => _stream.ReadAsync(buffer, cancellationToken);
166+
160167
public override int Read(Span<byte> buffer) => _stream.Read(buffer);
161168

162169
public override void Write(ReadOnlySpan<byte> buffer)
@@ -179,6 +186,25 @@ public override void WriteByte(byte value)
179186
++_writeCount;
180187
}
181188

189+
public override Task<int> ReadAsync(
190+
byte[] buffer,
191+
int offset,
192+
int count,
193+
CancellationToken cancellationToken = default
194+
) => _stream.ReadAsync(buffer, offset, count, cancellationToken);
195+
196+
public override async Task WriteAsync(
197+
byte[] buffer,
198+
int offset,
199+
int count,
200+
CancellationToken cancellationToken
201+
)
202+
{
203+
cancellationToken.ThrowIfCancellationRequested();
204+
await _stream.WriteAsync(buffer, offset, count, cancellationToken);
205+
_writeCount += count;
206+
}
207+
182208
#endregion
183209

184210
/// <summary>

0 commit comments

Comments
 (0)