Commit 2a7dbc1

fix: handle FENCED_LEADER_EPOCH and undefined entries in MultipleErrors (#250)
* fix: handle FENCED_LEADER_EPOCH and undefined entries in MultipleErrors

  When Kafka broker leadership changes, the consumer receives FENCED_LEADER_EPOCH errors but never refreshes metadata because this error code was not in the hasStaleMetadata list. Additionally, runConcurrentCallbacks creates MultipleErrors with undefined entries for successful operations, causing findBy() to crash with a TypeError.

  - Add FENCED_LEADER_EPOCH to hasStaleMetadata in ProtocolError
  - Filter undefined entries from errors array in runConcurrentCallbacks
  - Add null guard in MultipleErrors.findBy() for defensive safety

  Signed-off-by: Tristan Burch <tristan@day.ai>

* fix: update callback tests to match filtered MultipleErrors behavior

  The errors array is now filtered with .filter(Boolean) to remove undefined entries, so test assertions need to expect only actual error entries.

  Signed-off-by: Tristan Burch <tristan@day.ai>

* fix: clear stale leader epochs on FENCED_LEADER_EPOCH during fetch

  When Kafka returns FENCED_LEADER_EPOCH during a fetch, the consumer refreshes metadata but the cached partition epochs remain stale. The next fetch sends the old epoch to the new leader, triggering another FENCED_LEADER_EPOCH in an infinite loop until retries are exhausted.

  Fix by wrapping the fetch retry callback to detect stale metadata errors: clear cached metadata and reset currentLeaderEpoch and lastFetchedEpoch to -1 so the retry uses fresh epoch data from the broker. Also clear partitionsEpochs on consumer group rejoin to prevent stale epochs from persisting across rebalances.

  Signed-off-by: Tristan Burch <tristan@day.ai>

* fix: narrow fetch retry to only clear epochs on FENCED_LEADER_EPOCH

  The fetchCallback wrapper used hasStaleMetadata to decide when to clear cached metadata and partition epochs. This matched UNKNOWN_TOPIC_OR_PARTITION in addition to FENCED_LEADER_EPOCH, causing deleted-topic errors to trigger a metadata refresh that wrapped the error as a non-retryable UserError instead of the expected ProtocolError. This broke the "should properly handle deleting topics in between" test.

  Narrow the check to only match FENCED_LEADER_EPOCH (the specific error this wrapper was introduced to fix) so other stale-metadata errors continue through the existing retry path in base.ts.

  Signed-off-by: Tristan Burch <tristan@day.ai>

* fix: inline fetch callback in consumer

  Signed-off-by: Tristan Burch <tristan@day.ai>

* fix: use metadata leader epoch instead of batch epoch for partitionsEpochs

  Use the latest leader epoch from metadata rather than the potentially stale epoch from the batch data when updating partitionsEpochs. This prevents unnecessary FENCED_LEADER_EPOCH retry cycles after leader elections.

  Signed-off-by: Tristan Burch <tristan@day.ai>

---------

Signed-off-by: Tristan Burch <tristan@day.ai>
1 parent d23ec64 commit 2a7dbc1

7 files changed

Lines changed: 139 additions & 8 deletions

src/apis/callbacks.ts

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ export function runConcurrentCallbacks<ReturnType> (
       remaining--

       if (remaining === 0) {
-        callback(hasErrors ? new MultipleErrors(errorMessage, errors) : null, results)
+        callback(hasErrors ? new MultipleErrors(errorMessage, errors.filter(Boolean) as Error[]) : null, results)
       }
     }
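
The effect of the `.filter(Boolean)` change can be illustrated in isolation. The following is a minimal sketch, assuming (as the commit message describes) that the errors array is index-aligned with the input collection and holds `undefined` for items that succeeded. The array contents are hypothetical and this is not the library's code.

```typescript
import { strictEqual } from 'node:assert'

// Hypothetical index-aligned errors array: slot i holds the error for item i,
// or undefined when item i succeeded (here only the middle item failed).
const rawErrors: Array<Error | undefined> = [undefined, new Error('failed: b'), undefined]

// filter(Boolean) drops the undefined slots. The cast mirrors the one used in the diff.
const compactErrors = rawErrors.filter(Boolean) as Error[]

// Only real errors remain, so consumers can read properties without a guard.
strictEqual(compactErrors.length, 1)
strictEqual(compactErrors[0]?.message, 'failed: b')

// Side effect: positions no longer match the input collection.
// The failing item was at index 1 and its error is now at index 0.
strictEqual(rawErrors[1], compactErrors[0])
```

The index shift is why the existing tests in test/clients/callbacks.test.ts had to change their expected lengths and positions.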

src/clients/consumer/consumer.ts

Lines changed: 16 additions & 1 deletion
@@ -789,7 +789,22 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
             options.topics,
             [],
             '',
-            retryCallback
+            (error, result) => {
+              if (error) {
+                const genericError = error as GenericError
+                if (genericError.findBy?.('apiId', 'FENCED_LEADER_EPOCH')) {
+                  this.clearMetadata()
+                  for (const topic of options.topics) {
+                    for (const partition of topic.partitions) {
+                      partition.currentLeaderEpoch = -1
+                      partition.lastFetchedEpoch = -1
+                    }
+                  }
+                }
+              }
+
+              retryCallback(error, result)
+            }
           )
         })
       })
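
The epoch reset performed by the inlined callback can be sketched on its own. This is an illustration under stated assumptions: the `FetchTopic` and `FetchPartition` interfaces and the `resetLeaderEpochs` helper are hypothetical names, and only the `currentLeaderEpoch` and `lastFetchedEpoch` fields come from the diff.

```typescript
import { strictEqual } from 'node:assert'

// Hypothetical request shapes; only the two epoch fields appear in the diff.
interface FetchPartition {
  currentLeaderEpoch: number
  lastFetchedEpoch: number
}

interface FetchTopic {
  partitions: FetchPartition[]
}

// Hypothetical helper: resets every cached epoch to -1 so that, per the commit
// message, the retry does not resend a stale epoch to the new leader.
function resetLeaderEpochs (topics: FetchTopic[]): void {
  for (const topic of topics) {
    for (const partition of topic.partitions) {
      partition.currentLeaderEpoch = -1
      partition.lastFetchedEpoch = -1
    }
  }
}

const stalePartition: FetchPartition = { currentLeaderEpoch: 4, lastFetchedEpoch: 4 }
const fetchTopics: FetchTopic[] = [{ partitions: [stalePartition] }]

resetLeaderEpochs(fetchTopics)
strictEqual(stalePartition.currentLeaderEpoch, -1)
strictEqual(stalePartition.lastFetchedEpoch, -1)
```

The objects are mutated in place, which matches the diff: the retry reuses the same `options.topics` structure, so the reset values are what the next fetch attempt sends.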

src/clients/consumer/messages-stream.ts

Lines changed: 2 additions & 1 deletion
@@ -211,6 +211,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
     // having some.
     this.#consumer.on('consumer:group:join', () => {
       this.#offsetsCommitted.clear()
+      this.#partitionsEpochs.clear()
       this.#scheduleRefreshOffsetsAndFetch()
     })

@@ -616,7 +617,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
           const firstOffset = batch.firstOffset
           const leaderEpoch = metadata.topics.get(topic)!.partitions[partition].leaderEpoch

-          this.#partitionsEpochs.set(`${topic}:${partition}`, batch.partitionLeaderEpoch)
+          this.#partitionsEpochs.set(`${topic}:${partition}`, leaderEpoch)

           // Track offsets
           if (batch === recordsBatches[recordsBatches.length - 1]) {
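
The two changes to the epoch cache can be modelled with a plain `Map`. This is a rough sketch, not the stream's implementation: `epochCache`, `recordEpoch`, and `onGroupJoin` are hypothetical stand-ins for the private `#partitionsEpochs` field and the handlers shown in the diff.

```typescript
import { strictEqual } from 'node:assert'

// Hypothetical stand-in for the private #partitionsEpochs map, keyed by "topic:partition".
const epochCache = new Map<string, number>()

// Stores the leader epoch taken from metadata, not the one carried by the record batch.
function recordEpoch (topic: string, partition: number, metadataLeaderEpoch: number): void {
  epochCache.set(`${topic}:${partition}`, metadataLeaderEpoch)
}

// On group (re)join, drop all cached epochs so none survive a rebalance.
function onGroupJoin (): void {
  epochCache.clear()
}

recordEpoch('orders', 0, 7)
strictEqual(epochCache.get('orders:0'), 7)

onGroupJoin()
strictEqual(epochCache.size, 0)
```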

src/errors.ts

Lines changed: 5 additions & 1 deletion
@@ -112,6 +112,10 @@ export class MultipleErrors extends AggregateError {
     }

     for (const error of this.errors) {
+      if (!error) {
+        continue
+      }
+
       if (error[property] === value) {
         return error as unknown as ErrorType
       }
@@ -162,7 +166,7 @@ export class ProtocolError extends GenericError {
       apiCode: code,
       serverErrorMessage,
       canRetry,
-      hasStaleMetadata: ['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_OR_FOLLOWER'].includes(id),
+      hasStaleMetadata: ['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_OR_FOLLOWER', 'FENCED_LEADER_EPOCH'].includes(id),
       needsRejoin: ['MEMBER_ID_REQUIRED', 'UNKNOWN_MEMBER_ID', 'REBALANCE_IN_PROGRESS'].includes(id),
       producerFenced: id === 'INVALID_PRODUCER_EPOCH',
       rebalanceInProgress: id === 'REBALANCE_IN_PROGRESS',
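
The null guard added to `findBy` can be shown with a standalone function. This is a simplified sketch: the free function `findByProperty` is hypothetical and covers only the loop visible in the hunk, so anything the real method does outside that loop is not reproduced here.

```typescript
import { strictEqual } from 'node:assert'

// Hypothetical standalone version of the guarded loop from the hunk.
function findByProperty (
  errors: Array<Error | undefined>,
  property: string,
  value: unknown
): Error | null {
  for (const error of errors) {
    // Without this guard, reading a property of undefined throws a TypeError.
    if (!error) {
      continue
    }

    if ((error as unknown as Record<string, unknown>)[property] === value) {
      return error
    }
  }

  return null
}

// An error carrying an apiId, surrounded by undefined slots.
const fenced = Object.assign(new Error('fenced'), { apiId: 'FENCED_LEADER_EPOCH' })
const sparse: Array<Error | undefined> = [undefined, fenced, undefined]

strictEqual(findByProperty(sparse, 'apiId', 'FENCED_LEADER_EPOCH'), fenced)
strictEqual(findByProperty(sparse, 'apiId', 'UNKNOWN_TOPIC_OR_PARTITION'), null)
```

With `runConcurrentCallbacks` now filtering its errors array, the guard is a second line of defence for callers that build `MultipleErrors` from a sparse array themselves.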

test/apis/callbacks.test.ts

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+import { deepStrictEqual, ok } from 'assert'
+import { test } from 'node:test'
+import { GenericError, MultipleErrors } from '../../src/errors.ts'
+import { runConcurrentCallbacks } from '../../src/apis/callbacks.ts'
+
+test('runConcurrentCallbacks - all operations succeed', (_, done) => {
+  const collection = ['a', 'b', 'c']
+
+  runConcurrentCallbacks<string, string>(
+    'should not error',
+    collection,
+    (item, cb) => {
+      cb(null, item.toUpperCase())
+    },
+    (error, results) => {
+      deepStrictEqual(error, null)
+      deepStrictEqual(results, ['A', 'B', 'C'])
+      done()
+    }
+  )
+})
+
+test('runConcurrentCallbacks - all operations fail', (_, done) => {
+  const collection = ['a', 'b']
+
+  runConcurrentCallbacks<string, string>(
+    'all failed',
+    collection,
+    (item, cb) => {
+      cb(new GenericError('PLT_KFK_USER', `failed: ${item}`))
+    },
+    (error, results) => {
+      ok(error)
+      ok(MultipleErrors.isMultipleErrors(error))
+      const multi = error as MultipleErrors
+      deepStrictEqual(multi.errors.length, 2)
+      deepStrictEqual(multi.errors[0].message, 'failed: a')
+      deepStrictEqual(multi.errors[1].message, 'failed: b')
+      done()
+    }
+  )
+})
+
+test('runConcurrentCallbacks - partial failures should not contain undefined in errors', (_, done) => {
+  const collection = ['a', 'b', 'c']
+
+  runConcurrentCallbacks<string, string>(
+    'partial failure',
+    collection,
+    (item, cb) => {
+      if (item === 'b') {
+        cb(new GenericError('PLT_KFK_USER', 'failed: b'))
+      } else {
+        cb(null, item.toUpperCase())
+      }
+    },
+    (error, results) => {
+      ok(error)
+      ok(MultipleErrors.isMultipleErrors(error))
+      const multi = error as MultipleErrors
+
+      // The errors array should only contain actual errors, no undefined entries
+      deepStrictEqual(multi.errors.length, 1)
+      deepStrictEqual(multi.errors[0].message, 'failed: b')
+
+      for (const e of multi.errors) {
+        ok(e !== undefined, 'errors array should not contain undefined entries')
+        ok(e instanceof Error, 'every entry should be an Error')
+      }
+
+      // Successful results should still be available
+      deepStrictEqual(results[0], 'A')
+      deepStrictEqual(results[2], 'C')
+      done()
+    }
+  )
+})
+
+test('runConcurrentCallbacks - empty collection', (_, done) => {
+  runConcurrentCallbacks<string, string>(
+    'empty',
+    [],
+    (_item, _cb) => {
+      throw new Error('should not be called')
+    },
+    (error, results) => {
+      deepStrictEqual(error, null)
+      deepStrictEqual(results, [])
+      done()
+    }
+  )
+})

test/clients/callbacks.test.ts

Lines changed: 4 additions & 4 deletions
@@ -134,8 +134,8 @@ test('runConcurrentCallbacks handles errors correctly', async () => {

       if (error instanceof MultipleErrors) {
         strictEqual(error.message, 'Test error message')
-        strictEqual(error.errors.length, 3)
-        strictEqual(error.errors[1]?.message, 'Test operation error')
+        strictEqual(error.errors.length, 1)
+        strictEqual(error.errors[0]?.message, 'Test operation error')
       }

       resolve()
@@ -185,9 +185,9 @@ test('runConcurrentCallbacks with multiple errors', async () => {

       if (error instanceof MultipleErrors) {
         strictEqual(error.message, 'Multiple errors occurred')
-        strictEqual(error.errors.length, 3)
+        strictEqual(error.errors.length, 2)
         strictEqual(error.errors[0]?.message, 'Error from error-1')
-        strictEqual(error.errors[2]?.message, 'Error from error-3')
+        strictEqual(error.errors[1]?.message, 'Error from error-3')
       }

       resolve()

test/errors.test.ts

Lines changed: 19 additions & 0 deletions
@@ -265,3 +265,22 @@ test('UserError', () => {
   deepStrictEqual(error.param, 'clientId')
   ok(GenericError.isGenericError(error))
 })
+
+test('ProtocolError FENCED_LEADER_EPOCH has hasStaleMetadata', () => {
+  const error = new ProtocolError('FENCED_LEADER_EPOCH')
+  deepStrictEqual(error.code, 'PLT_KFK_PROTOCOL')
+  deepStrictEqual(error.apiId, 'FENCED_LEADER_EPOCH')
+  deepStrictEqual(error.hasStaleMetadata, true)
+})
+
+test('MultipleErrors.findBy - handles undefined entries in errors array', () => {
+  const error1 = new GenericError('PLT_KFK_USER', 'error1', { foo: 'bar' })
+
+  // Simulate undefined entries as would happen with pre-allocated sparse arrays
+  const errors = [undefined, error1, undefined] as unknown as Error[]
+  const multiError = new MultipleErrors('with undefined entries', errors)
+
+  // findBy should not crash and should find the valid error
+  deepStrictEqual(multiError.findBy('foo', 'bar'), error1)
+  deepStrictEqual(multiError.findBy('unknown', 'value'), null)
+})
