Skip to content

Commit 6ec78be

Browse files
manastPavelPashov
andauthored
feat: add timeout blocking commands (#2052)
* fix: add blockingTimeout option for blocking commands * docs: update readme with blockingTimeout * fix: resolve blocking commands with null on timeout instead of hanging - Blocking commands (BLPOP, BRPOP, BZPOPMIN, XREAD, etc.) now resolve with `null` when their timeout expires without a response, matching Redis behavior and preventing indefinite hangs during undetectable network failures - Added `blockingTimeout` option as a safety net for commands that block forever (timeout=0 / BLOCK 0) - Automatically extracts and tracks timeout from command arguments, adding a 100ms grace period for finite timeouts - Protects commands in both offline queue and command queue with absolute deadline tracking - Simplified setBlockingTimeout API to resolve with null (no callback, no error rejection) * refactor(command): move blocking timeout extraction to Command class * fix: blocking timeout reamde and add more unit tests * test: add more unit and functional tests * feat: add configurable blockingTimeoutGrace option --------- Co-authored-by: Pavel Pashov <pavel.pashov@redis.com>
1 parent a523f3a commit 6ec78be

7 files changed

Lines changed: 841 additions & 13 deletions

File tree

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,20 @@ const redis = new Redis({
798798

799799
Set maxRetriesPerRequest to `null` to disable this behavior, and every command will wait forever until the connection is alive again (which is the default behavior before ioredis v4).
800800

801+
### Blocking Command Timeout
802+
803+
ioredis can apply a client-side timeout to blocking commands (such as `blpop`, `brpop`, `bzpopmin`, `bzmpop`, `blmpop`, `xread`, `xreadgroup`, etc.). This protects against scenarios where the TCP connection becomes a zombie (e.g., due to a silent network failure like a Docker network disconnect) and Redis never replies.
804+
805+
For commands with a finite timeout (e.g., `blpop("key", 5)`), ioredis automatically sets a client-side deadline based on the command's timeout plus a small grace period. If no reply arrives before the deadline, the command resolves with `null`—the same value Redis returns when a blocking command times out normally.
806+
807+
For commands that intentionally block forever (e.g., `timeout = 0` or `BLOCK 0`), you can provide a safety net via the optional `blockingTimeout` option (milliseconds):
808+
809+
```javascript
810+
const redis = new Redis({
811+
blockingTimeout: 30000, // Resolve with null after 30 seconds when timeout=0/BLOCK 0
812+
});
813+
```
814+
801815
### Reconnect on Error
802816

803817
Besides auto-reconnect when the connection is closed, ioredis supports reconnecting on certain Redis errors using the `reconnectOnError` option. Here's an example that will reconnect when receiving `READONLY` error:

lib/Command.ts

Lines changed: 154 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import {
99
convertObjectToArray,
1010
} from "./utils";
1111
import { Callback, Respondable, CommandParameter } from "./types";
12+
import {
13+
parseBlockOption,
14+
parseSecondsArgument,
15+
} from "./utils/argumentParsers";
1216

1317
export type ArgumentType =
1418
| string
@@ -59,6 +63,32 @@ export interface CommandNameFlags {
5963
HANDSHAKE_COMMANDS: ["auth", "select", "client", "readonly", "info"];
6064
// Commands that should not trigger a reconnection when errors occur
6165
IGNORE_RECONNECT_ON_ERROR: ["client"];
66+
// Commands that block
67+
BLOCKING_COMMANDS: [
68+
"blpop",
69+
"brpop",
70+
"brpoplpush",
71+
"blmove",
72+
"bzpopmin",
73+
"bzpopmax",
74+
"bzmpop",
75+
"blmpop",
76+
"xread",
77+
"xreadgroup"
78+
];
79+
// Commands that have timeout as the last argument
80+
LAST_ARG_TIMEOUT_COMMANDS: [
81+
"blpop",
82+
"brpop",
83+
"brpoplpush",
84+
"blmove",
85+
"bzpopmin",
86+
"bzpopmax"
87+
];
88+
// Commands that have timeout as the first argument
89+
FIRST_ARG_TIMEOUT_COMMANDS: ["bzmpop", "blmpop"];
90+
// Commands that have BLOCK option
91+
BLOCK_OPTION_COMMANDS: ["xread", "xreadgroup"];
6292
}
6393

6494
/**
@@ -101,6 +131,28 @@ export default class Command implements Respondable {
101131
WILL_DISCONNECT: ["quit"],
102132
HANDSHAKE_COMMANDS: ["auth", "select", "client", "readonly", "info"],
103133
IGNORE_RECONNECT_ON_ERROR: ["client"],
134+
BLOCKING_COMMANDS: [
135+
"blpop",
136+
"brpop",
137+
"brpoplpush",
138+
"blmove",
139+
"bzpopmin",
140+
"bzpopmax",
141+
"bzmpop",
142+
"blmpop",
143+
"xread",
144+
"xreadgroup",
145+
],
146+
LAST_ARG_TIMEOUT_COMMANDS: [
147+
"blpop",
148+
"brpop",
149+
"brpoplpush",
150+
"blmove",
151+
"bzpopmin",
152+
"bzpopmax",
153+
],
154+
FIRST_ARG_TIMEOUT_COMMANDS: ["bzmpop", "blmpop"],
155+
BLOCK_OPTION_COMMANDS: ["xread", "xreadgroup"],
104156
};
105157

106158
private static flagMap?: FlagMap;
@@ -165,6 +217,8 @@ export default class Command implements Respondable {
165217
private callback: Callback;
166218
private transformed = false;
167219
private _commandTimeoutTimer?: NodeJS.Timeout;
220+
private _blockingTimeoutTimer?: NodeJS.Timeout;
221+
private _blockingDeadline?: number;
168222

169223
private slot?: number | null;
170224
private keys?: Array<string | Buffer>;
@@ -327,6 +381,98 @@ export default class Command implements Respondable {
327381
}
328382
}
329383

384+
/**
385+
* Set a timeout for blocking commands.
386+
* When the timeout expires, the command resolves with null (matching Redis behavior).
387+
* This handles the case of undetectable network failures (e.g., docker network disconnect)
388+
* where the TCP connection becomes a zombie and no close event fires.
389+
*/
390+
setBlockingTimeout(ms: number) {
391+
if (ms <= 0) {
392+
return;
393+
}
394+
395+
// Clear existing timer if any (can happen when command moves from offline to command queue)
396+
if (this._blockingTimeoutTimer) {
397+
clearTimeout(this._blockingTimeoutTimer);
398+
this._blockingTimeoutTimer = undefined;
399+
}
400+
401+
const now = Date.now();
402+
403+
// First call: establish absolute deadline
404+
if (this._blockingDeadline === undefined) {
405+
this._blockingDeadline = now + ms;
406+
}
407+
408+
// Check if we've already exceeded the deadline
409+
const remaining = this._blockingDeadline - now;
410+
if (remaining <= 0) {
411+
// Resolve with null to indicate timeout (same as Redis behavior)
412+
this.resolve(null);
413+
return;
414+
}
415+
416+
this._blockingTimeoutTimer = setTimeout(() => {
417+
if (this.isResolved) {
418+
this._blockingTimeoutTimer = undefined;
419+
return;
420+
}
421+
422+
this._blockingTimeoutTimer = undefined;
423+
424+
// Timeout expired - resolve with null (same as Redis behavior when blocking command times out)
425+
this.resolve(null);
426+
}, remaining);
427+
}
428+
429+
/**
430+
* Extract the blocking timeout from the command arguments.
431+
*
432+
* @returns The timeout in seconds, null for indefinite blocking (timeout of 0),
433+
* or undefined if this is not a blocking command
434+
*/
435+
extractBlockingTimeout(): number | null | undefined {
436+
const args = this.args;
437+
438+
if (!args || args.length === 0) {
439+
return undefined;
440+
}
441+
442+
const name = this.name.toLowerCase();
443+
444+
if (Command.checkFlag("LAST_ARG_TIMEOUT_COMMANDS", name)) {
445+
return parseSecondsArgument(args[args.length - 1]);
446+
}
447+
448+
if (Command.checkFlag("FIRST_ARG_TIMEOUT_COMMANDS", name)) {
449+
return parseSecondsArgument(args[0]);
450+
}
451+
452+
if (Command.checkFlag("BLOCK_OPTION_COMMANDS", name)) {
453+
return parseBlockOption(args);
454+
}
455+
456+
return undefined;
457+
}
458+
459+
/**
460+
* Clear the command and blocking timers
461+
*/
462+
private _clearTimers() {
463+
const existingTimer = this._commandTimeoutTimer;
464+
if (existingTimer) {
465+
clearTimeout(existingTimer);
466+
delete this._commandTimeoutTimer;
467+
}
468+
469+
const blockingTimer = this._blockingTimeoutTimer;
470+
if (blockingTimer) {
471+
clearTimeout(blockingTimer);
472+
delete this._blockingTimeoutTimer;
473+
}
474+
}
475+
330476
private initPromise() {
331477
const promise = new Promise((resolve, reject) => {
332478
if (!this.transformed) {
@@ -339,13 +485,14 @@ export default class Command implements Respondable {
339485
}
340486

341487
this.resolve = this._convertValue(resolve);
342-
if (this.errorStack) {
343-
this.reject = (err) => {
488+
this.reject = (err: Error) => {
489+
this._clearTimers();
490+
if (this.errorStack) {
344491
reject(optimizeErrorStack(err, this.errorStack.stack, __dirname));
345-
};
346-
} else {
347-
this.reject = reject;
348-
}
492+
} else {
493+
reject(err);
494+
}
495+
};
349496
});
350497

351498
this.promise = asCallback(promise, this.callback);
@@ -379,12 +526,7 @@ export default class Command implements Respondable {
379526
private _convertValue(resolve: Function): (result: any) => void {
380527
return (value) => {
381528
try {
382-
const existingTimer = this._commandTimeoutTimer;
383-
if (existingTimer) {
384-
clearTimeout(existingTimer);
385-
delete this._commandTimeoutTimer;
386-
}
387-
529+
this._clearTimers();
388530
resolve(this.transformReply(value));
389531
this.isResolved = true;
390532
} catch (err) {

lib/Redis.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,8 @@ class Redis extends Commander implements DataHandledable {
445445
command.setTimeout(this.options.commandTimeout);
446446
}
447447

448+
const blockingTimeout = this.getBlockingTimeoutInMs(command);
449+
448450
let writable =
449451
this.status === "ready" ||
450452
(!stream &&
@@ -495,6 +497,18 @@ class Redis extends Commander implements DataHandledable {
495497
stream: stream,
496498
select: this.condition.select,
497499
});
500+
501+
// For blocking commands, set a timeout while queued to ensure they don't wait forever
502+
// if connection never becomes ready (e.g., docker network disconnect scenario)
503+
// Use blockingTimeout if configured, otherwise fall back to the command's own timeout
504+
if (Command.checkFlag("BLOCKING_COMMANDS", command.name)) {
505+
const offlineTimeout =
506+
this.getConfiguredBlockingTimeout() ?? blockingTimeout;
507+
508+
if (offlineTimeout !== undefined) {
509+
command.setBlockingTimeout(offlineTimeout);
510+
}
511+
}
498512
} else {
499513
// @ts-expect-error
500514
if (debug.enabled) {
@@ -523,6 +537,10 @@ class Redis extends Commander implements DataHandledable {
523537
select: this.condition.select,
524538
});
525539

540+
if (blockingTimeout !== undefined) {
541+
command.setBlockingTimeout(blockingTimeout);
542+
}
543+
526544
if (Command.checkFlag("WILL_DISCONNECT", command.name)) {
527545
this.manuallyClosing = true;
528546
}
@@ -544,6 +562,40 @@ class Redis extends Commander implements DataHandledable {
544562
return command.promise;
545563
}
546564

565+
private getBlockingTimeoutInMs(command: Command): number | undefined {
566+
if (!Command.checkFlag("BLOCKING_COMMANDS", command.name)) {
567+
return undefined;
568+
}
569+
570+
const timeout = command.extractBlockingTimeout();
571+
if (typeof timeout === "number") {
572+
if (timeout > 0) {
573+
// Finite timeout from command args - add grace period
574+
return timeout + (this.options.blockingTimeoutGrace ?? DEFAULT_REDIS_OPTIONS.blockingTimeoutGrace);
575+
}
576+
// Command has timeout=0 (block forever), use blockingTimeout option as safety net
577+
return this.getConfiguredBlockingTimeout();
578+
}
579+
580+
if (timeout === null) {
581+
// No BLOCK option found (e.g., XREAD without BLOCK), use blockingTimeout as safety net
582+
return this.getConfiguredBlockingTimeout();
583+
}
584+
585+
return undefined;
586+
}
587+
588+
private getConfiguredBlockingTimeout(): number | undefined {
589+
if (
590+
typeof this.options.blockingTimeout === "number" &&
591+
this.options.blockingTimeout > 0
592+
) {
593+
return this.options.blockingTimeout;
594+
}
595+
596+
return undefined;
597+
}
598+
547599
private setSocketTimeout() {
548600
this.socketTimeoutTimer = setTimeout(() => {
549601
this.stream.destroy(new Error(`Socket timeout. Expecting data, but didn't receive any in ${this.options.socketTimeout}ms.`));

lib/redis/RedisOptions.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ export interface CommonRedisOptions extends CommanderOptions {
1515
*/
1616
commandTimeout?: number;
1717

18+
/**
19+
* Timeout (ms) for blocking commands with timeout=0 / BLOCK 0.
20+
* When exceeded, the command resolves with null.
21+
*/
22+
blockingTimeout?: number;
23+
24+
/**
25+
* Grace period (ms) added to blocking command timeouts to account for network latency.
26+
* @default 100
27+
*/
28+
blockingTimeoutGrace?: number;
29+
1830
/**
1931
* If the socket does not receive data within a set number of milliseconds:
2032
* 1. the socket is considered "dead" and will be destroyed
@@ -262,4 +274,5 @@ export const DEFAULT_REDIS_OPTIONS: RedisOptions = {
262274
enableAutoPipelining: false,
263275
autoPipeliningIgnoredCommands: [],
264276
sentinelMaxConnections: 10,
277+
blockingTimeoutGrace: 100,
265278
};

0 commit comments

Comments
 (0)