@@ -217,6 +217,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
217217 logic =>
218218
219219 import Http2Demux .CompletionTimeout
220+ import Http2Demux .GoAwayGracePeriod
220221
221222 def wrapTrailingHeaders (headers : ParsedHeadersFrame ): Option [HttpEntity .ChunkStreamPart ] = stage.wrapTrailingHeaders(headers)
222223
@@ -233,6 +234,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
233234
234235 private val terminationPromise = Promise [Http .HttpTerminated ]()
235236 private var terminating : Boolean = false
237+ private var goAwayGracePeriodElapsed : Boolean = false
236238 private var lastIdBeforeTermination : Int = 0
237239 private val terminateCallback = getAsyncCallback[FiniteDuration ](triggerTermination)
238240 override def terminate (deadline : FiniteDuration )(implicit ex : ExecutionContext ): Future [Http .HttpTerminated ] = {
@@ -242,11 +244,24 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
242244 private def triggerTermination (deadline : FiniteDuration ): Unit =
243245 // check if we are already terminating, otherwise start termination
244246 if (! terminating) {
245- log.debug(s " Termination of this connection was triggered. Sending GOAWAY and waiting for open requests to complete for $CompletionTimeout. " )
246- terminating = true
247- pushGOAWAY(ErrorCode .NO_ERROR , " Voluntary connection close." )
248- lastIdBeforeTermination = lastStreamId()
249- completeIfDone()
247+ if (deadline == Duration .Zero ) {
248+ log.debug(" Termination of this connection was triggered. Sending GOAWAY and waiting for open requests to complete for {}." , CompletionTimeout )
249+ terminating = true
250+ goAwayGracePeriodElapsed = true
251+ pushGOAWAY(ErrorCode .NO_ERROR , " Voluntary connection close." )
252+ lastIdBeforeTermination = lastStreamId()
253+ completeIfDone()
254+ } else {
255+ log.debug(" Termination of this connection was triggered. Sending GOAWAY and waiting for open requests to complete for {}." , deadline)
256+ terminating = true
257+ // First GOAWAY per RFC 7540 §6.8: use last-stream-id = Int.MaxValue to signal graceful
258+ // shutdown intent. The client must stop initiating new streams, but streams already in
259+ // flight before the client receives this frame may still arrive. We accept those during
260+ // the grace period by keeping lastIdBeforeTermination at Int.MaxValue for now.
261+ multiplexer.pushControlFrame(GoAwayFrame (Int .MaxValue , ErrorCode .NO_ERROR , ByteString (" Voluntary connection close." )))
262+ lastIdBeforeTermination = Int .MaxValue
263+ scheduleOnce(GoAwayGracePeriod , http2Settings.goawayGracePeriod)
264+ }
250265 if (! isClosed(frameOut))
251266 scheduleOnce(CompletionTimeout , deadline)
252267 }
@@ -297,7 +312,6 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
297312 override def pushGOAWAY (errorCode : ErrorCode , debug : String ): Unit = {
298313 val frame = GoAwayFrame (lastStreamId(), errorCode, ByteString (debug))
299314 multiplexer.pushControlFrame(frame)
300- // FIXME: handle the connection closing according to the specification
301315 }
302316 private [this ] var allowReadingIncomingFrames : Boolean = true
303317 override def allowReadingIncomingFrames (allow : Boolean ): Unit = {
@@ -415,7 +429,10 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
415429 private def completeIfDone (): Unit = {
416430 val noMoreOutgoingStreams = (terminating || isClosed(substreamIn)) && activeStreamCount() == 0
417431 def allOutgoingDataFlushed = isClosed(frameOut) || multiplexer.hasFlushedAllData
418- if (noMoreOutgoingStreams && allOutgoingDataFlushed) {
432+ // When terminating via GOAWAY, delay closure until the grace period has elapsed so the
433+ // client has time to receive the GOAWAY frame before we tear down the TCP connection.
434+ val gracePeriodDone = ! terminating || goAwayGracePeriodElapsed
435+ if (noMoreOutgoingStreams && allOutgoingDataFlushed && gracePeriodDone) {
419436 log.debug(" Closing connection after all streams are done and all data has been flushed." )
420437 if (isServer)
421438 completeStage()
@@ -488,6 +505,18 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
488505 }
489506
490507 override protected def onTimer (timerKey : Any ): Unit = timerKey match {
508+ case GoAwayGracePeriod =>
509+ // Second GOAWAY per RFC 7540 §6.8: now that the grace period has elapsed, confirm the
510+ // actual last stream ID that was processed. The client must retry any streams above this
511+ // value. Only send if the connection is still open — it may have already been closed by
512+ // CompletionTimeout or a peer-initiated close during the grace period.
513+ if (! isClosed(frameOut)) {
514+ lastIdBeforeTermination = lastStreamId()
515+ pushGOAWAY(ErrorCode .NO_ERROR , " Voluntary connection close." )
516+ }
517+ goAwayGracePeriodElapsed = true
518+ completeIfDone()
519+
491520 case ConfigurablePing .Tick =>
492521 // don't do anything unless there are active streams
493522 if (activeStreamCount() > 0 ) {
@@ -524,4 +553,5 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
524553@ InternalApi
525554private [akka] object Http2Demux {
526555 case object CompletionTimeout
556+ case object GoAwayGracePeriod
527557}
0 commit comments