Skip to content

Commit 2db1b59

Browse files
authored
Implement master connection closure functionality (#11)
* Implement master connection closure functionality ... and update README with new examples. Add tests for connection closure in test-websocket-multiplexer.js. https://linear.app/zetra/issue/HYX-774/ws-multiplexer-dynamic-upstream-switch * Refactor master message handling and add validation for injection and close messages
1 parent 4c0d6a0 commit 2db1b59

File tree

3 files changed

+206
-31
lines changed

3 files changed

+206
-31
lines changed

README.md

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,13 @@ The multiplexer sends several types of messages to the master control:
130130
```javascript
131131
{
132132
type: 'connection',
133-
event: 'client-connected', // or 'client-disconnected', 'upstream-connected', 'upstream-disconnected'
133+
event: 'client-connected', // or 'client-disconnected', 'upstream-connected', 'upstream-disconnected', 'connection-closed-by-master'
134134
connectionId: '/path',
135135
ip: '127.0.0.1', // Only for client-connected
136136
headers: { ... }, // Only for client-connected
137137
code: 1000, // Only for disconnection events
138-
reason: 'Normal closure' // Only for disconnection events
138+
reason: 'Normal closure', // Only for disconnection events
139+
timestamp: '2024-01-01T12:00:00.000Z' // Only for connection-closed-by-master
139140
}
140141
```
141142

@@ -277,6 +278,24 @@ master.send(JSON.stringify({
277278
}));
278279
```
279280

281+
### Example: Closing Connections
282+
283+
You can cleanly close specific connections through the master control:
284+
285+
```javascript
286+
// Close a specific connection (both client and upstream)
287+
master.send(JSON.stringify({
288+
type: 'close',
289+
connectionId: '/chat',
290+
reason: 'Connection terminated by administrator' // Optional reason
291+
}));
292+
```
293+
294+
When a connection is closed via master control:
295+
- Both client and upstream connections are closed cleanly with code 1000
296+
- Message queues for the connection are cleared
297+
- A notification is sent to all root master connections with event type `connection-closed-by-master`
298+
280299
## Use Cases
281300

282301
- Debugging WebSocket applications

test-websocket-multiplexer.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,53 @@ class TestRunner {
620620
assert.equal(disconnectData.event, 'client-disconnected');
621621
assert.equal(disconnectData.connectionId, CONFIG.TEST_PATH);
622622

623+
// Test 8: Master connection close functionality
624+
console.log('Test: Master connection close functionality');
625+
626+
// Create a new client for the close test
627+
const clientForClose = new WebSocketClient(
628+
`ws://localhost:${this.config.PROXY_PORT}${this.config.TEST_PATH}`,
629+
testHeaders
630+
);
631+
await clientForClose.connect();
632+
633+
// Wait for connection to be established and consume the client-connected notification
634+
await wait(500);
635+
const clientConnectedMsg = await masterControl.receiveMessage();
636+
const clientConnectedData = JSON.parse(clientConnectedMsg);
637+
assert.equal(clientConnectedData.type, 'connection');
638+
assert.equal(clientConnectedData.event, 'client-connected');
639+
assert.equal(clientForClose.ws.readyState, WebSocket.OPEN, 'Connection should be open');
640+
641+
// Send close command via master
642+
const closeReason = 'Test closure from master';
643+
await masterControl.send(
644+
JSON.stringify({
645+
type: 'close',
646+
connectionId: this.config.TEST_PATH,
647+
reason: closeReason,
648+
})
649+
);
650+
651+
// Verify the connection-closed-by-master notification
652+
const closedByMasterMsg = await masterControl.receiveMessage();
653+
const closedByMasterData = JSON.parse(closedByMasterMsg);
654+
assert.equal(closedByMasterData.type, 'connection');
655+
assert.equal(closedByMasterData.event, 'connection-closed-by-master');
656+
assert.equal(closedByMasterData.connectionId, CONFIG.TEST_PATH);
657+
assert.equal(closedByMasterData.reason, closeReason);
658+
assert(closedByMasterData.timestamp);
659+
660+
// Try to send a message - it should fail because the connection is closed
661+
try {
662+
await clientForClose.send('This should fail');
663+
assert.fail('Message should not be sent, connection should be closed');
664+
} catch (error) {
665+
console.log('Connection properly closed, message send failed as expected: ' + error.message);
666+
}
667+
668+
assert.equal(clientForClose.ws.readyState, WebSocket.CLOSED, 'Connection should be closed');
669+
623670
// Cleanup
624671
masterControl.close();
625672
masterClientMonitor.close();

websocket-multiplex.js

Lines changed: 138 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ function sendMessage(ws, message, source, target) {
165165

166166
/**
167167
* Handles message injection from master to specified targets
168-
* @param {{ target: string, message: string | Buffer }} data - The message data containing target and message
168+
* @param {InjectionMessage} data - The message data containing target and message
169169
*/
170170
function handleMasterInjection(data) {
171171
if (data.target === 'all-clients') {
@@ -209,41 +209,150 @@ function handleMasterInjection(data) {
209209
}
210210
}
211211

212+
/**
213+
* Handles connection closure requests from master
214+
* @param {CloseMessage} data - The close data containing connection ID and optional reason
215+
*/
216+
function handleMasterClose(data) {
217+
const connectionId = data.connectionId;
218+
const reason = data.reason || 'Closed by websocket-multiplex master control';
219+
220+
logger.info(`Master requesting to close connection: ${connectionId}`);
221+
222+
const client = connections.clients.get(connectionId);
223+
const upstream = connections.upstreams.get(connectionId);
224+
225+
if (!client && !upstream) {
226+
logger.warn(`Connection ${connectionId} not found for closure`);
227+
return;
228+
}
229+
230+
// Close client connection if it exists
231+
if (client && client.ws.readyState === WebSocket.OPEN) {
232+
logger.info(`Closing client connection for ${connectionId}`);
233+
client.ws.close(1000, reason);
234+
}
235+
236+
// Close upstream connection if it exists
237+
if (upstream && upstream.ws.readyState === WebSocket.OPEN) {
238+
logger.info(`Closing upstream connection for ${connectionId}`);
239+
upstream.ws.close(1000, reason);
240+
}
241+
242+
// Clean up message queue
243+
connections.messageQueues.delete(connectionId);
244+
245+
// Notify root masters about the forced closure
246+
notifyRootMasters('connection', 'connection-closed-by-master', connectionId, {
247+
reason,
248+
timestamp: new Date().toISOString(),
249+
});
250+
}
251+
252+
/**
253+
* @typedef {Object} InjectionMessage
254+
* @property {'inject'} type - Message type
255+
* @property {string} target - Target for injection ('all-clients', 'all-upstreams', 'client:...', 'upstream:...')
256+
* @property {string | Buffer} message - Message to inject
257+
*/
258+
259+
/**
260+
* @typedef {Object} CloseMessage
261+
* @property {'close'} type - Message type
262+
* @property {string} connectionId - Connection ID to close
263+
* @property {string} [reason] - Optional reason for closure
264+
*/
265+
266+
/**
267+
* Validates an injection message
268+
* @param {any} data - Data to validate
269+
* @returns {data is InjectionMessage} True if valid injection message
270+
*/
271+
function validateInjectionMessage(data) {
272+
return (
273+
data &&
274+
typeof data === 'object' &&
275+
data.type === 'inject' &&
276+
typeof data.target === 'string' &&
277+
data.target.length > 0 &&
278+
(typeof data.message === 'string' || Buffer.isBuffer(data.message))
279+
);
280+
}
281+
282+
/**
283+
* Validates a close message
284+
* @param {any} data - Data to validate
285+
* @returns {data is CloseMessage} True if valid close message
286+
*/
287+
function validateCloseMessage(data) {
288+
return (
289+
data &&
290+
typeof data === 'object' &&
291+
data.type === 'close' &&
292+
typeof data.connectionId === 'string' &&
293+
data.connectionId.length > 0
294+
);
295+
}
296+
297+
/**
298+
* Handles messages from root master connections
299+
* @param {string} message - The message received
300+
* @throws {Error} When message is invalid
301+
*/
302+
function handleRootMasterMessage(message) {
303+
const data = JSON.parse(message);
304+
logger.debug(
305+
`multiplexer <- master client: root ${JSON.stringify(data)}`
306+
);
307+
308+
if (validateInjectionMessage(data)) return handleMasterInjection(data);
309+
else if (validateCloseMessage(data)) return handleMasterClose(data);
310+
else throw new Error(`Invalid master message: unsupported type '${data?.type}' or missing required fields. Message: ${JSON.stringify(data)}`);
311+
}
312+
313+
/**
314+
* Handles messages from master connection with type = 'client'
315+
* @param {string} targetPath - The target client path
316+
* @param {string} message - The message to forward
317+
*/
318+
function handleMasterMessageForClient(targetPath, message) {
319+
const client = connections.clients.get(targetPath);
320+
if (client?.connected) {
321+
sendMessage(client.ws, message, 'master', `client:${targetPath}`);
322+
}
323+
}
324+
325+
/**
326+
* Handles messages from master connection with type = 'upstream'
327+
* @param {string} targetPath - The target upstream path
328+
* @param {string} message - The message to forward
329+
*/
330+
function handleMasterMessageForUpstream(targetPath, message) {
331+
const upstream = connections.upstreams.get(targetPath);
332+
if (upstream?.connected) {
333+
sendMessage(upstream.ws, message, 'master', `upstream:${targetPath}`);
334+
}
335+
}
336+
212337
/**
213338
* Handles messages received from the master connection
214339
* @param { MasterConnection } masterConnection - The master WebSocket connection
215340
* @param {string} message - The message received
216341
*/
217342
function handleMasterMessage(masterConnection, message) {
218-
const { path, type, targetPath, ws } = masterConnection;
219-
if (type === 'root') {
220-
try {
221-
/** @type {{ type: string, target: string, message: string }} */
222-
const data = JSON.parse(message);
223-
const target = data.target;
224-
const contents = data.message;
225-
logger.debug(
226-
`multiplexer <- master client: ${type} ${target} ${contents}`
227-
);
228-
229-
if (data.type === 'inject') {
230-
handleMasterInjection(data);
231-
}
232-
} catch (error) {
233-
logger.error('Error processing master message:', error);
234-
}
235-
} else {
236-
if (type === 'client') {
237-
const client = connections.clients.get(targetPath);
238-
if (client?.connected) {
239-
sendMessage(client.ws, message, 'master', `client:${targetPath}`);
240-
}
241-
} else if (type === 'upstream') {
242-
const upstream = connections.upstreams.get(targetPath);
243-
if (upstream?.connected) {
244-
sendMessage(upstream.ws, message, 'master', `upstream:${targetPath}`);
245-
}
343+
const { type, targetPath } = masterConnection;
344+
345+
try {
346+
switch (type) {
347+
case 'root':
348+
return handleRootMasterMessage(message);
349+
case 'client':
350+
return handleMasterMessageForClient(targetPath, message);
351+
case 'upstream':
352+
return handleMasterMessageForUpstream(targetPath, message);
246353
}
354+
} catch (error) {
355+
logger.error('Error processing master message:', error);
247356
}
248357
}
249358

0 commit comments

Comments
 (0)