Skip to content

Commit f7a5d01

Browse files
committed
JS: Introduce helper functions for event listers
1 parent 245774a commit f7a5d01

5 files changed

Lines changed: 110 additions & 66 deletions

File tree

ktor-client/ktor-client-core/js/src/io/ktor/client/engine/js/JsClientEngine.kt

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.ktor.client.engine.js
66

7-
import io.ktor.client.FetchOptions
7+
import io.ktor.client.*
88
import io.ktor.client.engine.*
99
import io.ktor.client.engine.js.compatibility.*
1010
import io.ktor.client.plugins.*
@@ -132,26 +132,16 @@ internal class JsClientEngine(
132132
private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation ->
133133
if (continuation.isCancelled) return@suspendCancellableCoroutine
134134

135-
lateinit var eventListener: (Event) -> Unit
136-
eventListener = { event: Event ->
137-
removeEventListener("open", callback = eventListener)
138-
removeEventListener("error", callback = eventListener)
135+
val disposable = addOneTimeEventListener("open", "error") { event ->
139136
when (event.type) {
140137
"open" -> continuation.resume(this)
141-
"error" -> {
142-
continuation.resumeWithException(WebSocketException(event.asString()))
143-
}
138+
"error" -> continuation.resumeWithException(WebSocketException(event.asString()))
144139
}
145140
}
146141

147-
addEventListener("open", callback = eventListener)
148-
addEventListener("error", callback = eventListener)
149-
150-
continuation.invokeOnCancellation {
151-
removeEventListener("open", callback = eventListener)
152-
removeEventListener("error", callback = eventListener)
153-
154-
if (it != null) {
142+
continuation.invokeOnCancellation { cause ->
143+
disposable.dispose()
144+
if (cause != null) {
155145
this@awaitConnection.close()
156146
}
157147
}

ktor-client/ktor-client-core/js/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.ktor.client.plugins.websocket
66

7+
import io.ktor.client.utils.*
78
import io.ktor.utils.io.*
89
import io.ktor.utils.io.core.*
910
import io.ktor.websocket.*
@@ -14,10 +15,7 @@ import kotlinx.coroutines.channels.SendChannel
1415
import kotlinx.coroutines.channels.consumeEach
1516
import org.khronos.webgl.ArrayBuffer
1617
import org.khronos.webgl.Int8Array
17-
import org.w3c.dom.ARRAYBUFFER
18-
import org.w3c.dom.BinaryType
19-
import org.w3c.dom.MessageEvent
20-
import org.w3c.dom.WebSocket
18+
import org.w3c.dom.*
2119
import kotlin.coroutines.CoroutineContext
2220

2321
@OptIn(InternalAPI::class)
@@ -59,12 +57,11 @@ internal class JsWebSocketSession(
5957
if (websocket.readyState == WebSocket.OPEN) {
6058
return block()
6159
}
62-
websocket.addEventListener("open", callback = { block() })
60+
websocket.addOneTimeEventListener("open") { block() }
6361
}
6462

6563
init {
66-
val onMessage: (org.w3c.dom.events.Event) -> Unit = { e ->
67-
val event = e.unsafeCast<MessageEvent>()
64+
val onMessage = websocket.addEventListener<MessageEvent>("message") { event ->
6865
val frame: Frame = when (val data = event.data) {
6966
is ArrayBuffer -> Frame.Binary(true, Int8Array(data).unsafeCast<ByteArray>())
7067
is String -> Frame.Text(data)
@@ -77,21 +74,19 @@ internal class JsWebSocketSession(
7774
_incoming.trySend(frame)
7875
}
7976

80-
val onError: (org.w3c.dom.events.Event) -> Unit = { e ->
81-
val cause = WebSocketException("$e")
77+
val onError = websocket.addEventListener<ErrorEvent>("error") { event ->
78+
val cause = WebSocketException("$event")
8279
_closeReason.completeExceptionally(cause)
8380
_incoming.close(cause)
8481
_outgoing.cancel()
8582
}
8683

87-
lateinit var onClose: (dynamic) -> Unit
88-
onClose = { e ->
89-
val reason = CloseReason(e.code as Short, e.reason as String)
84+
websocket.addOneTimeEventListener<CloseEvent>("close") { event ->
85+
val reason = CloseReason(event.code, event.reason)
9086
_closeReason.complete(reason)
9187
_incoming.trySend(Frame.Close(reason))
9288
_incoming.close()
9389
_outgoing.cancel()
94-
websocket.removeEventListener("close", callback = onClose)
9590
}
9691

9792
// we must not throw exceptions before this
@@ -106,14 +101,11 @@ internal class JsWebSocketSession(
106101
websocket.close(CloseReason.Codes.NORMAL.code, "Client failed")
107102
}
108103
}
109-
websocket.removeEventListener("message", callback = onMessage)
110-
websocket.removeEventListener("error", callback = onError)
104+
onMessage.dispose()
105+
onError.dispose()
111106
}
112107

113108
websocket.binaryType = BinaryType.ARRAYBUFFER
114-
websocket.addEventListener("message", callback = onMessage)
115-
websocket.addEventListener("error", callback = onError)
116-
websocket.addEventListener("close", callback = onClose)
117109

118110
launch {
119111
_outgoing.consumeEach {

ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/engine/js/WasmJsClientEngine.kt

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.ktor.client.engine.js
66

7-
import io.ktor.client.FetchOptions
7+
import io.ktor.client.*
88
import io.ktor.client.engine.*
99
import io.ktor.client.engine.js.compatibility.*
1010
import io.ktor.client.plugins.*
@@ -16,7 +16,6 @@ import io.ktor.http.*
1616
import io.ktor.util.*
1717
import io.ktor.util.date.*
1818
import io.ktor.utils.io.*
19-
import io.ktor.websocket.ChannelConfig
2019
import kotlinx.coroutines.*
2120
import org.w3c.dom.WebSocket
2221
import org.w3c.dom.events.Event
@@ -141,26 +140,15 @@ internal class JsClientEngine(
141140
private suspend fun WebSocket.awaitConnection(): WebSocket = suspendCancellableCoroutine { continuation ->
142141
if (continuation.isCancelled) return@suspendCancellableCoroutine
143142

144-
lateinit var eventListener: (JsAny) -> Unit
145-
eventListener = { it: JsAny ->
146-
val event: Event = it.unsafeCast()
147-
removeEventListener("open", callback = eventListener)
148-
removeEventListener("error", callback = eventListener)
143+
val disposable = addOneTimeEventListener("open", "error") { event ->
149144
when (event.type) {
150145
"open" -> continuation.resume(this)
151-
"error" -> {
152-
continuation.resumeWithException(WebSocketException(eventAsString(event)))
153-
}
146+
"error" -> continuation.resumeWithException(WebSocketException(eventAsString(event)))
154147
}
155148
}
156149

157-
addEventListener("open", callback = eventListener)
158-
addEventListener("error", callback = eventListener)
159-
160150
continuation.invokeOnCancellation {
161-
removeEventListener("open", callback = eventListener)
162-
removeEventListener("error", callback = eventListener)
163-
151+
disposable.dispose()
164152
if (it != null) {
165153
this@awaitConnection.close()
166154
}

ktor-client/ktor-client-core/wasmJs/src/io/ktor/client/plugins/websocket/JsWebSocketSession.kt

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,11 @@ internal class JsWebSocketSession(
6464
if (websocket.readyState == WebSocket.OPEN) {
6565
return block()
6666
}
67-
websocket.addEventListener("open", callback = { _: JsAny -> block() })
67+
websocket.addOneTimeEventListener("open") { block() }
6868
}
6969

7070
init {
71-
val onMessage: (JsAny) -> Unit = { e ->
72-
val event = e.unsafeCast<MessageEvent>()
73-
71+
val onMessage = websocket.addEventListener<MessageEvent>("message") { event ->
7472
val data = event.data
7573
if (data == null) {
7674
val error = IllegalStateException("Empty message - no data for: ${event.type}")
@@ -95,22 +93,19 @@ internal class JsWebSocketSession(
9593
_incoming.trySend(frame)
9694
}
9795

98-
val onError: (JsAny) -> Unit = { e ->
99-
val cause = WebSocketException("$e")
96+
val onError = websocket.addEventListener<CloseEvent>("close") { event ->
97+
val cause = WebSocketException("$event")
10098
_closeReason.completeExceptionally(cause)
10199
_incoming.close(cause)
102100
_outgoing.cancel()
103101
}
104102

105-
lateinit var onClose: (JsAny) -> Unit
106-
onClose = { e ->
107-
val closeEvent = e.unsafeCast<CloseEvent>()
108-
val reason = CloseReason(closeEvent.code, closeEvent.reason)
103+
websocket.addOneTimeEventListener<CloseEvent>("close") { event ->
104+
val reason = CloseReason(event.code, event.reason)
109105
_closeReason.complete(reason)
110106
_incoming.trySend(Frame.Close(reason))
111107
_incoming.close()
112108
_outgoing.cancel()
113-
websocket.removeEventListener("close", callback = onClose)
114109
}
115110

116111
coroutineContext[Job]?.invokeOnCompletion { cause ->
@@ -123,14 +118,11 @@ internal class JsWebSocketSession(
123118
websocket.close(CloseReason.Codes.NORMAL.code, "Client failed")
124119
}
125120
}
126-
websocket.removeEventListener("message", callback = onMessage)
127-
websocket.removeEventListener("error", callback = onError)
121+
onMessage.dispose()
122+
onError.dispose()
128123
}
129124

130125
websocket.binaryType = BinaryType.ARRAYBUFFER
131-
websocket.addEventListener("message", callback = onMessage)
132-
websocket.addEventListener("error", callback = onError)
133-
websocket.addEventListener("close", callback = onClose)
134126

135127
launch {
136128
_outgoing.consumeEach {
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.client.utils
6+
7+
import kotlinx.coroutines.DisposableHandle
8+
import org.w3c.dom.events.Event
9+
import org.w3c.dom.events.EventTarget
10+
import kotlin.js.unsafeCast
11+
12+
/**
13+
* Registers event listeners for the given [events] and returns a [DisposableHandle] to remove them.
14+
*
15+
* @param events event names to listen for
16+
* @param listener callback invoked when any of the events fire
17+
* @return a handle to remove the listeners
18+
*/
19+
internal inline fun EventTarget.addEventListener(
20+
vararg events: String,
21+
crossinline listener: (Event) -> Unit
22+
): DisposableHandle {
23+
val callback = { event: Event -> listener(event) }
24+
events.forEach { addEventListener(it, callback) }
25+
return DisposableHandle { events.forEach { removeEventListener(it, callback) } }
26+
}
27+
28+
/**
29+
* Typed variant of [addEventListener] that unsafe-casts the event to [T] before invoking [listener].
30+
*
31+
* Use this when the event type is known statically. The cast is unchecked because JS event objects
32+
* from some runtimes (e.g. the `ws` npm package) are plain objects that do not satisfy Kotlin
33+
* `instanceof` checks for DOM event subclasses.
34+
*
35+
* @param events event names to listen for
36+
* @param listener callback invoked with the event cast to [T]
37+
* @return a handle to remove the listeners
38+
*/
39+
internal inline fun <reified T : Event> EventTarget.addEventListener(
40+
vararg events: String,
41+
crossinline listener: (T) -> Unit
42+
): DisposableHandle = addEventListener(*events) {
43+
@Suppress("RemoveExplicitTypeArguments")
44+
listener(it.unsafeCast<T>())
45+
}
46+
47+
/**
48+
* Registers a one-time event listener that removes itself after the first event fires.
49+
* The listener is registered for all specified [events] and fires once for whichever event occurs first.
50+
*
51+
* @param events event names to listen for
52+
* @param listener callback invoked when one of the events fires
53+
* @return a handle to remove the listener before it fires
54+
*/
55+
internal inline fun EventTarget.addOneTimeEventListener(
56+
vararg events: String,
57+
crossinline listener: (Event) -> Unit
58+
): DisposableHandle {
59+
lateinit var disposable: DisposableHandle
60+
disposable = addEventListener(*events) { event ->
61+
disposable.dispose()
62+
listener(event)
63+
}
64+
return disposable
65+
}
66+
67+
/**
68+
* Typed variant of [addOneTimeEventListener] that unsafe-casts the event to [T] before invoking [listener].
69+
*
70+
* See [addEventListener] for why an unsafe cast is used instead of a checked cast.
71+
*
72+
* @param events event names to listen for
73+
* @param listener callback invoked with the event cast to [T]
74+
* @return a handle to remove the listener before it fires
75+
*/
76+
internal inline fun <reified T : Event> EventTarget.addOneTimeEventListener(
77+
vararg events: String,
78+
crossinline listener: (T) -> Unit
79+
): DisposableHandle = addOneTimeEventListener(*events) {
80+
@Suppress("RemoveExplicitTypeArguments")
81+
listener(it.unsafeCast<T>())
82+
}

0 commit comments

Comments
 (0)