Skip to content

Commit 25990d0

Browse files
authored
firehose-retry-support (#424)
1 parent d62f6e8 commit 25990d0

4 files changed

Lines changed: 155 additions & 4 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.11",
3+
"version": "1.1.12",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/connectors/firehose.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */
22
import { FirehoseClient, PutRecordBatchCommand } from '@aws-sdk/client-firehose';
33
import { NodeHttpHandler } from '@smithy/node-http-handler';
4+
import { ConfiguredRetryStrategy } from '@smithy/util-retry';
45
import Promise from 'bluebird';
6+
57
import { omit, pick } from 'lodash';
8+
import {
9+
defaultRetryConfig, wait, getDelay, assertMaxRetries, defaultBackoffDelay,
10+
} from '../utils/retry';
611
import { defaultDebugLogger } from '../utils/log';
712

813
class Connector {
@@ -11,12 +16,14 @@ class Connector {
1116
pipelineId,
1217
deliveryStreamName = process.env.DELIVERY_STREAM_NAME,
1318
timeout = Number(process.env.FIREHOSE_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
19+
retryConfig = defaultRetryConfig,
1420
additionalClientOpts = {},
1521
...opt
1622
}) {
1723
this.debug = (msg) => debug('%j', msg);
1824
this.deliveryStreamName = deliveryStreamName || 'undefined';
1925
this.client = Connector.getClient(pipelineId, debug, timeout, additionalClientOpts);
26+
this.retryConfig = retryConfig;
2027
this.opt = opt;
2128
}
2229

@@ -33,6 +40,7 @@ class Connector {
3340
connectionTimeout: timeout,
3441
...addlRequestHandlerOpts,
3542
}),
43+
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
3644
logger: defaultDebugLogger(debug),
3745
...addlClientOpts,
3846
});
@@ -46,7 +54,23 @@ class Connector {
4654
...inputParams,
4755
};
4856

49-
return this._sendCommand(new PutRecordBatchCommand(params), ctx);
57+
return this._putRecordBatch(params, [], ctx);
58+
}
59+
60+
_putRecordBatch(params, attempts, ctx) {
61+
assertMaxRetries(attempts, this.retryConfig.maxRetries);
62+
63+
return wait(getDelay(this.retryConfig.retryWait, attempts.length))
64+
.then(() => this._sendCommand(new PutRecordBatchCommand(params))
65+
.tap(this.debug)
66+
.tapCatch(this.debug)
67+
.then((resp) => {
68+
if (resp.FailedPutCount > 0) {
69+
return this._putRecordBatch(unprocessed(params, resp), [...attempts, resp]);
70+
} else {
71+
return accumlate(attempts, resp);
72+
}
73+
}));
5074
}
5175

5276
_sendCommand(command, ctx) {
@@ -58,3 +82,17 @@ class Connector {
5882
}
5983

6084
export default Connector;
85+
86+
const unprocessed = (params, resp) => ({
87+
...params,
88+
Records: params.Records.filter((e, i) => resp.RequestResponses[i].ErrorCode),
89+
});
90+
91+
const accumlate = (attempts, resp) => attempts.reduceRight((a, c) => ({
92+
...a,
93+
RequestResponses: [
94+
...c.RequestResponses.filter((r) => !r.ErrorCode),
95+
...a.RequestResponses.filter((r) => !r.ErrorCode),
96+
],
97+
attempts: [...attempts, resp],
98+
}), resp);

test/unit/connectors/firehose.test.js

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,117 @@ describe('connectors/firehose.js', () => {
5151
});
5252
expect(data).to.deep.equal({});
5353
});
54+
55+
it('should retry', async () => {
56+
const responses = [
57+
{ RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }], FailedPutCount: 2 },
58+
{ RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }], FailedPutCount: 1 },
59+
{ RequestResponses: [{ SequenceNumber: '3' }], FailedPutCount: 0 },
60+
];
61+
62+
const spy = sinon.spy((_) => responses.shift());
63+
mockFirehose.on(PutRecordBatchCommand).callsFake(spy);
64+
65+
const inputParams = {
66+
Records: [
67+
{
68+
Data: Buffer.from(JSON.stringify({ type: 't1' })),
69+
},
70+
{
71+
Data: Buffer.from(JSON.stringify({ type: 't2' })),
72+
},
73+
{
74+
Data: Buffer.from(JSON.stringify({ type: 't3' })),
75+
},
76+
],
77+
};
78+
79+
const data = await new Connector({
80+
debug: debug('firehose'),
81+
deliveryStreamName: 'ds1',
82+
}).putRecordBatch(inputParams);
83+
84+
expect(spy).to.have.been.calledWith({
85+
DeliveryStreamName: 'ds1',
86+
Records: [inputParams.Records[0], inputParams.Records[1], inputParams.Records[2]],
87+
});
88+
expect(spy).to.have.been.calledWith({
89+
DeliveryStreamName: 'ds1',
90+
Records: [inputParams.Records[1], inputParams.Records[2]],
91+
});
92+
expect(spy).to.have.been.calledWith({
93+
DeliveryStreamName: 'ds1',
94+
Records: [inputParams.Records[2]],
95+
});
96+
97+
expect(data).to.deep.equal({
98+
RequestResponses: [{ SequenceNumber: '1' }, { SequenceNumber: '2' }, { SequenceNumber: '3' }],
99+
FailedPutCount: 0,
100+
attempts: [
101+
{
102+
RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }],
103+
FailedPutCount: 2,
104+
},
105+
{
106+
RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }],
107+
FailedPutCount: 1,
108+
},
109+
{
110+
RequestResponses: [{ SequenceNumber: '3' }],
111+
FailedPutCount: 0,
112+
},
113+
],
114+
});
115+
});
116+
117+
it('should throw on max retry', async () => {
118+
const responses = [
119+
{ RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }], FailedPutCount: 2 },
120+
{ RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }], FailedPutCount: 1 },
121+
];
122+
123+
const spy = sinon.spy((_) => responses.shift());
124+
mockFirehose.on(PutRecordBatchCommand).callsFake(spy);
125+
126+
const inputParams = {
127+
Records: [
128+
{
129+
Data: Buffer.from(JSON.stringify({ type: 't1' })),
130+
},
131+
{
132+
Data: Buffer.from(JSON.stringify({ type: 't2' })),
133+
},
134+
{
135+
Data: Buffer.from(JSON.stringify({ type: 't3' })),
136+
},
137+
],
138+
};
139+
140+
await new Connector({
141+
debug: debug('firehose'),
142+
deliveryStreamName: 'ds1',
143+
retryConfig: {
144+
maxRetries: 1,
145+
retryWait: 100,
146+
},
147+
}).putRecordBatch(inputParams)
148+
.then(() => {
149+
expect.fail('should have thrown');
150+
}).catch((err) => {
151+
expect(spy).to.have.been.calledWith({
152+
DeliveryStreamName: 'ds1',
153+
Records: [inputParams.Records[0], inputParams.Records[1], inputParams.Records[2]],
154+
});
155+
expect(spy).to.have.been.calledWith({
156+
DeliveryStreamName: 'ds1',
157+
Records: [inputParams.Records[1], inputParams.Records[2]],
158+
});
159+
expect(spy).to.not.have.been.calledWith({
160+
DeliveryStreamName: 'ds1',
161+
Records: [inputParams.Records[2]],
162+
});
163+
164+
expect(err.message).to.contain('Failed batch requests');
165+
});
166+
});
54167
});

0 commit comments

Comments
 (0)