@@ -23,6 +23,7 @@ package fs2
2323package concurrent
2424
2525import cats .effect ._
26+ import cats .effect .Resource .ExitCase
2627import cats .effect .implicits ._
2728import cats .syntax .all ._
2829import scala .collection .immutable .LongMap
@@ -208,7 +209,8 @@ object Topic {
208209 }
209210
210211 def publish : Pipe [F , A , Nothing ] = { in =>
211- in.onFinalize(close.void)
212+ in
213+ .onFinalizeCase(closeWithExitCase(_).void)
212214 .evalMap(publish1)
213215 .takeWhile(_.isRight)
214216 .drain
@@ -223,13 +225,24 @@ object Topic {
223225 def subscribers : Stream [F , Int ] = subscriberCount.discrete
224226
225227 def close : F [Either [Topic .Closed , Unit ]] =
228+ closeWithExitCase(ExitCase .Succeeded )
229+
230+ def closeWithExitCase (exitCase : ExitCase ): F [Either [Closed , Unit ]] =
226231 signalClosure
227232 .complete(())
228233 .flatMap { completedNow =>
229234 val result = if (completedNow) Topic .rightUnit else Topic .closed
230235
231236 state.get
232- .flatMap { case (subs, _) => foreach(subs)(_.close.void) }
237+ .flatMap { case (subs, _) =>
238+ foreach(subs)(channel =>
239+ exitCase match {
240+ case ExitCase .Succeeded => channel.close.void
241+ case ExitCase .Errored (e) => channel.raiseError(e).void
242+ case ExitCase .Canceled => channel.cancel.void
243+ }
244+ )
245+ }
233246 .as(result)
234247 }
235248 .uncancelable
0 commit comments