Skip to content

Commit a62dd2a

Browse files
committed
fix: socket back pressure memory leak
Fixes: #434
1 parent 6a22361 commit a62dd2a

2 files changed

Lines changed: 63 additions & 4 deletions

File tree

lib/core/client.js

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ class Parser extends HTTPParser {
555555

556556
try {
557557
if (request.onBody(chunk, offset, length) === false) {
558-
socket.pause()
558+
socket[kPause]()
559559
}
560560
} catch (err) {
561561
util.destroy(socket, err)
@@ -630,7 +630,7 @@ class Parser extends HTTPParser {
630630
util.destroy(socket, new InformationalError('reset'))
631631
}
632632
} else {
633-
socket.resume()
633+
socket[kResume]()
634634
resume(client)
635635
}
636636
}
@@ -779,8 +779,8 @@ function connect (client) {
779779
parser.consume(socket._handle._externalStream)
780780
}
781781

782-
socket[kPause] = socket.pause.bind(socket)
783-
socket[kResume] = socket.resume.bind(socket)
782+
socket[kPause] = socketPause.bind(socket)
783+
socket[kResume] = socketResume.bind(socket)
784784
socket[kError] = null
785785
socket[kParser] = parser
786786
socket[kClient] = client
@@ -794,6 +794,20 @@ function connect (client) {
794794
.on('close', onSocketClose)
795795
}
796796

797+
function socketPause () {
798+
if (this._handle && this._handle.reading) {
799+
this._handle.reading = false
800+
this._handle.readStop()
801+
}
802+
}
803+
804+
function socketResume () {
805+
if (this._handle && !this._handle.reading) {
806+
this._handle.reading = true
807+
this._handle.readStart()
808+
}
809+
}
810+
797811
function emitDrain (client) {
798812
client[kNeedDrain] = 0
799813
client.emit('drain')

test/socket-back-pressure.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
'use strict'
2+
3+
const { Client } = require('..')
4+
const { createServer } = require('http')
5+
const { Readable } = require('stream')
6+
const { test } = require('tap')
7+
8+
test('socket back-pressure', (t) => {
9+
t.plan(2)
10+
11+
const server = createServer()
12+
13+
let body
14+
15+
server.on('request', (req, res) => {
16+
let bytesWritten = 0
17+
const buf = Buffer.allocUnsafe(16384)
18+
new Readable({
19+
read () {
20+
bytesWritten += buf.length
21+
this.push(buf)
22+
if (bytesWritten >= 1e6) {
23+
this.push(null)
24+
}
25+
}
26+
}).on('end', () => {
27+
t.ok(body._readableState.length < body._readableState.highWaterMark)
28+
}).pipe(res)
29+
})
30+
31+
server.listen(0, () => {
32+
const client = new Client(`http://localhost:${server.address().port}`, {
33+
pipelining: 1
34+
})
35+
36+
client.request({ path: '/', method: 'GET', opaque: 'asd' }, (err, data) => {
37+
t.error(err)
38+
body = data.body
39+
.resume()
40+
.on('data', () => {
41+
data.body.pause()
42+
})
43+
})
44+
})
45+
})

0 commit comments

Comments
 (0)