Skip to content

Commit f3d7c05

Browse files
committed
[s] address comments
1 parent a39057f commit f3d7c05

2 files changed

Lines changed: 39 additions & 19 deletions

File tree

lib/s3middleware/azureHelpers/SubStreamInterface.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
const stream = require('stream');
22

3+
class SubStream extends stream.PassThrough {
4+
constructor(options) {
5+
super(options);
6+
7+
this.on('stopStreamingToAzure', function stopStreamingToAzure() {
8+
this._abortStreaming();
9+
});
10+
}
11+
12+
_abortStreaming() {
13+
this.push(null);
14+
this.end();
15+
}
16+
}
17+
318
/**
419
* Interface for streaming subparts.
520
* @class SubStreamInterface
@@ -14,7 +29,7 @@ class SubStreamInterface {
1429
this._totalLengthCounter = 0;
1530
this._lengthCounter = 0;
1631
this._subPartIndex = 0;
17-
this._currentStream = new stream.PassThrough();
32+
this._currentStream = new SubStream();
1833
this._streamingAborted = false;
1934
}
2035

@@ -55,13 +70,8 @@ class SubStreamInterface {
5570
this._streamingAborted = true;
5671
if (piper) {
5772
piper.unpipe();
58-
piper.push(null);
5973
}
60-
this._sourceStream.pause();
61-
this._sourceStream.push(null);
62-
this._sourceStream.end();
63-
this._currentStream.push(null);
64-
this._currentStream.end();
74+
this._currentStream.emit('stopStreamingToAzure');
6575
}
6676

6777
/**
@@ -102,7 +112,7 @@ class SubStreamInterface {
102112
this._totalLengthCounter += this._lengthCounter;
103113
this._lengthCounter = 0;
104114
this._subPartIndex++;
105-
this._currentStream = new stream.PassThrough();
115+
this._currentStream = new SubStream();
106116
this.resumeStreaming();
107117
return {
108118
nextStream: this._currentStream,

tests/unit/s3middleware/azureHelpers/SubStreamingInterface.js

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ const SubStreamInterface =
44
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');
55

66
describe('s3middleware SubStreamInterface.stopStreaming()', () => {
7-
const emittedFinished = {
8-
sourceStream: false,
9-
currentStream: false,
7+
const eventsEmitted = {
8+
sourceStreamUnpiped: false,
9+
currentStreamStopStreamingToAzure: false,
10+
currentStreamEnded: false,
11+
};
12+
const expectedSequence = {
13+
sourceStreamUnpiped: 0,
14+
currentStreamStopStreamingToAzure: 1,
15+
currentStreamEnded: 2,
1016
};
1117
const data = Buffer.alloc(100);
1218
let dataMarker = 0;
19+
let eventSequence = 0;
1320
const mockRequest = new stream.Readable({
1421
read: () => {
1522
if (dataMarker >= data.length) {
@@ -22,18 +29,21 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => {
2229
});
2330
const sourceStream = new stream.PassThrough();
2431
const subStreamInterface = new SubStreamInterface(sourceStream);
25-
sourceStream.on('finish', () => {
26-
emittedFinished.sourceStream = true;
32+
sourceStream.on('unpipe', () => {
33+
eventsEmitted.sourceStreamUnpiped = eventSequence++;
34+
});
35+
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
36+
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
2737
});
2838
subStreamInterface._currentStream.on('finish', () => {
29-
emittedFinished.currentStream = true;
39+
eventsEmitted.currentStreamEnded = eventSequence++;
3040
});
31-
it('should stop streaming data and end all streams', done => {
41+
it('should stop streaming data and end current stream', done => {
3242
sourceStream.on('data', chunk => {
3343
const currentLength = subStreamInterface.getLengthCounter();
3444
if (currentLength === 10) {
35-
Object.keys(emittedFinished).forEach(key => {
36-
assert.strictEqual(emittedFinished[key], false);
45+
Object.keys(eventsEmitted).forEach(key => {
46+
assert.strictEqual(eventsEmitted[key], false);
3747
});
3848
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
3949
return subStreamInterface.stopStreaming(mockRequest);
@@ -42,8 +52,8 @@ describe('s3middleware SubStreamInterface.stopStreaming()', () => {
4252
});
4353
mockRequest.pipe(sourceStream);
4454
setTimeout(() => {
45-
Object.keys(emittedFinished).forEach(key => {
46-
assert.strictEqual(emittedFinished[key], true);
55+
Object.keys(eventsEmitted).forEach(key => {
56+
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
4757
});
4858
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
4959
assert.strictEqual(mockRequest._readableState.pipesCount, 0);

0 commit comments

Comments
 (0)