Skip to content

Commit 943796a

Browse files
committed
implement websocket server
1 parent c519eb6 commit 943796a

3 files changed

Lines changed: 375 additions & 15 deletions

File tree

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
'use strict'
2+
3+
const http = require('node:http')
4+
const crypto = require('node:crypto')
5+
const stream = require('node:stream')
6+
7+
const uid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
8+
9+
class ws {
10+
/**
11+
* @param {number} opcode
12+
* @param {Uint8Array} data
13+
*/
14+
static createFrame (opcode, data) {
15+
const length = data.length
16+
17+
let payloadLength = length
18+
let offset = 2
19+
20+
if (length > 65535) {
21+
offset += 8
22+
payloadLength = 127
23+
} else if (length > 125) {
24+
offset += 2
25+
payloadLength = 126
26+
}
27+
28+
const frame = Buffer.allocUnsafe(length + offset)
29+
30+
frame[0] = 0x80 | opcode
31+
32+
frame[1] = payloadLength
33+
34+
if (payloadLength === 126) {
35+
frame.writeUInt16BE(length, 2)
36+
} else if (payloadLength === 127) {
37+
frame[2] = frame[3] = 0
38+
frame.writeUIntBE(length, 4, 6)
39+
}
40+
41+
if (length !== offset) {
42+
frame.set(data, offset)
43+
}
44+
45+
return frame
46+
}
47+
48+
/**
49+
* @param {Uint8Array} buffer
50+
*/
51+
static getHeadLength (buffer) {
52+
if (buffer.length < 2) {
53+
return null
54+
}
55+
const payloadLength = buffer[1] & 0x7f
56+
return (
57+
2 +
58+
(payloadLength === 126 ? 2 : payloadLength === 127 ? 8 : 0) +
59+
((buffer[1] & 0x80) === 0x80 ? 4 : 0)
60+
)
61+
}
62+
63+
/**
64+
* @param {Uint8Array} buffer
65+
*/
66+
static parseFrame (buffer) {
67+
if (buffer.length < 2) {
68+
return null
69+
}
70+
71+
const fin = (buffer[0] & 0x80) === 0x80
72+
const opcode = buffer[0] & 0x0f
73+
const masked = (buffer[1] & 0x80) === 0x80
74+
const payloadLength = buffer[1] & 0x7f
75+
const offset =
76+
6 + (payloadLength === 126 ? 2 : payloadLength === 127 ? 8 : 0)
77+
if (!fin || !masked) {
78+
throw new Error('Invalid frame')
79+
}
80+
81+
if (buffer.length < offset) {
82+
return null
83+
}
84+
85+
let length = 0
86+
if (payloadLength < 126) {
87+
length = payloadLength
88+
} else if (payloadLength === 126) {
89+
length |= buffer[2] << 8
90+
length |= buffer[3] << 0
91+
} else if (payloadLength === 127) {
92+
length |= buffer[2] << 56
93+
length |= buffer[3] << 48
94+
length |= buffer[4] << 40
95+
length |= buffer[5] << 32
96+
length |= buffer[6] << 24
97+
length |= buffer[7] << 16
98+
length |= buffer[8] << 8
99+
length |= buffer[9] << 0
100+
}
101+
102+
return {
103+
opcode,
104+
length,
105+
offset,
106+
maskKey: buffer.subarray(offset - 4, offset),
107+
complete: buffer.length >= offset + length
108+
}
109+
}
110+
111+
static Stream = class extends stream.Writable {
112+
#head = null
113+
#receivedLength = 0
114+
115+
_write (chunk, _encoding, callback) {
116+
if (this.parseBody) {
117+
if (this.#head === null) {
118+
this.#head = chunk
119+
} else {
120+
this.#head = Buffer.concat([this.#head, chunk])
121+
}
122+
const head = this.#head
123+
const parsed = ws.parseFrame(head)
124+
if (parsed !== null) {
125+
if (parsed.complete) {
126+
const buffer = head.subarray(
127+
parsed.offset,
128+
parsed.offset + parsed.length
129+
)
130+
if (head.length > parsed.offset + parsed.length) {
131+
this.#head = head.subarray(
132+
parsed.offset + parsed.length,
133+
head.length
134+
)
135+
} else {
136+
this.#head = null
137+
}
138+
if (
139+
parsed.opcode === ws.opcode.TEXT ||
140+
parsed.opcode === ws.opcode.BINARY
141+
) {
142+
this.onData({
143+
maskKey: parsed.maskKey,
144+
buffer,
145+
isBinary: parsed.opcode === ws.opcode.BINARY
146+
})
147+
} else if (parsed.opcode === ws.opcode.CLOSE) {
148+
this.onClose()
149+
} else {
150+
throw new Error('Unsupported frame opcode')
151+
}
152+
}
153+
}
154+
} else {
155+
let merged = false
156+
if (this.#head === null) {
157+
this.#head = chunk
158+
} else if (this.#head.length < 2) {
159+
this.#head = Buffer.concat([this.#head, chunk])
160+
merged = true
161+
} else {
162+
this.#receivedLength += chunk.length
163+
}
164+
const head = this.#head
165+
const size = ws.getHeadLength(head)
166+
if (size !== null) {
167+
const parsed = ws.parseFrame(head)
168+
if (parsed !== null) {
169+
const length = head.length + this.#receivedLength
170+
if (length >= parsed.offset + parsed.length) {
171+
if (length !== parsed.offset + parsed.length) {
172+
const start = length - (parsed.offset + parsed.length)
173+
if (chunk.length < start) {
174+
if (merged) throw new Error('fatal error')
175+
this.#head = Buffer.concat([this.#head, chunk]).subarray(
176+
start
177+
)
178+
} else {
179+
this.#head = chunk.subarray(start)
180+
}
181+
} else {
182+
this.#head = null
183+
}
184+
this.#receivedLength = 0
185+
if (
186+
parsed.opcode === ws.opcode.TEXT ||
187+
parsed.opcode === ws.opcode.BINARY
188+
) {
189+
this.onData({})
190+
} else if (parsed.opcode === ws.opcode.CLOSE) {
191+
this.onClose()
192+
} else {
193+
throw new Error('Unsupported frame opcode')
194+
}
195+
}
196+
}
197+
}
198+
}
199+
callback()
200+
}
201+
202+
parseBody = true
203+
204+
/**
205+
* @type {(...args: any[]) => void}
206+
*/
207+
onData
208+
/**
209+
* @type {(...args: any[]) => void}
210+
*/
211+
onClose
212+
}
213+
214+
/**
215+
* @param {Uint8Array} buffer
216+
* @param {Uint8Array} mask
217+
* @returns {Uint8Array}
218+
*/
219+
static unmask (buffer, mask) {
220+
const length = buffer.length
221+
const fixedLength = length - (length & 3)
222+
for (let i = 0; i < fixedLength; i += 4) {
223+
buffer[i] ^= mask[0]
224+
buffer[i + 1] ^= mask[1]
225+
buffer[i + 2] ^= mask[2]
226+
buffer[i + 3] ^= mask[3]
227+
}
228+
for (let i = fixedLength; i < length; ++i) {
229+
buffer[i] ^= mask[i & 3]
230+
}
231+
return buffer
232+
}
233+
234+
static opcode = {
235+
CONTINUATION: 0x0,
236+
TEXT: 0x1,
237+
BINARY: 0x2,
238+
CLOSE: 0x8,
239+
PING: 0x9,
240+
PONG: 0xa
241+
}
242+
243+
static Controller = class {
244+
#socket
245+
constructor (socket) {
246+
this.#socket = socket
247+
}
248+
249+
write (buffer, isBinary) {
250+
return this.writeFrame(
251+
ws.createFrame(isBinary ? ws.opcode.BINARY : ws.opcode.TEXT, buffer)
252+
)
253+
}
254+
255+
writeFrame (frame) {
256+
if (this.#socket.writable) {
257+
return new Promise((resolve, reject) => {
258+
this.#socket.write(frame, (err) => {
259+
if (err) {
260+
reject(err)
261+
} else {
262+
resolve()
263+
}
264+
})
265+
})
266+
}
267+
}
268+
269+
async close () {
270+
if (this.#socket.writable) {
271+
await this.writeFrame(ws.createFrame(ws.opcode.CLOSE))
272+
this.#socket.end()
273+
}
274+
}
275+
276+
/** @type {((...args: any[]) => void) | null} */
277+
onMessage
278+
}
279+
}
280+
281+
/**
282+
* @param {{ onConnection: (ctrl: InstanceType<ws.controller>) => void; parseBody: boolean; }} param0
283+
*/
284+
function setup ({ onConnection, parseBody }) {
285+
const server = http.createServer((_req, res) => {
286+
res.end('')
287+
})
288+
289+
server.on('upgrade', (req, socket, _head) => {
290+
const key = crypto
291+
.createHash('sha1')
292+
.update(`${req.headers['sec-websocket-key']}${uid}`)
293+
.digest('base64')
294+
295+
socket.cork()
296+
socket.write('HTTP/1.1 101 Switching Protocols\r\n')
297+
socket.write('Upgrade: websocket\r\n')
298+
socket.write('Connection: Upgrade\r\n')
299+
socket.write('Sec-WebSocket-Version: 13\r\n')
300+
socket.write(`Sec-WebSocket-Accept: ${key}\r\n`)
301+
socket.write('\r\n')
302+
socket.uncork()
303+
304+
const stream = new ws.Stream()
305+
const controller = new ws.Controller(socket)
306+
307+
stream.onData = (data) => {
308+
if (typeof controller.onMessage === 'function') {
309+
controller.onMessage(data)
310+
}
311+
}
312+
313+
stream.parseBody = !!(parseBody ?? true)
314+
315+
stream.onClose = () => {
316+
socket.end()
317+
}
318+
319+
stream.on('drain', () => {
320+
socket.resume()
321+
})
322+
323+
socket.on('data', (buffer) => {
324+
if (!stream.write(buffer)) {
325+
socket.pause()
326+
}
327+
})
328+
329+
socket.on('error', (err) => {
330+
stream.destroy(err)
331+
})
332+
333+
socket.on('close', () => {
334+
stream.destroy()
335+
})
336+
337+
onConnection(controller)
338+
})
339+
340+
return server
341+
}
342+
343+
module.exports = { setup, ws }

benchmarks/websocket/server/simple.mjs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import cluster from 'node:cluster'
2-
import { WebSocketServer } from 'ws'
2+
import { setup, ws } from '../../_util/websocket-simple-server.js'
33
import { cpus } from 'node:os'
44

55
if (cluster.isPrimary) {
@@ -8,23 +8,19 @@ if (cluster.isPrimary) {
88
cluster.fork()
99
}
1010
} else {
11-
const server = new WebSocketServer({
12-
maxPayload: 600 * 1024 * 1024,
13-
perMessageDeflate: false,
14-
clientTracking: false,
15-
port: 5001
16-
})
17-
18-
// Workaround for https://github.com/nodejs/undici/issues/3202
19-
const emptyBuffer = Buffer.allocUnsafe(1)
11+
const emptyFrame = ws.createFrame(ws.opcode.BINARY, Buffer.allocUnsafe(0))
2012

21-
server.on('connection', (socket) => {
22-
socket.on('message', (_data, _isBinary) => {
23-
socket.send(emptyBuffer)
24-
// socket.close();
25-
})
13+
const server = setup({
14+
onConnection (ctrl) {
15+
ctrl.onMessage = () => {
16+
ctrl.writeFrame(emptyFrame)
17+
}
18+
},
19+
parseBody: false
2620
})
2721

22+
server.listen(5001)
23+
2824
cluster.on('exit', () => {
2925
server.close()
3026
})

0 commit comments

Comments
 (0)