Skip to content

Commit b5f010b

Browse files
committed
Merge branch 'release/3.x'
2 parents af7b8f7 + a23c454 commit b5f010b

67 files changed

Lines changed: 1332 additions & 851 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.idea/vcs.xml

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/HttpPlainText.kt

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
2-
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3-
*/
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
44

55
package io.ktor.client.plugins
66

@@ -10,12 +10,11 @@ import io.ktor.client.plugins.api.*
1010
import io.ktor.client.request.*
1111
import io.ktor.http.*
1212
import io.ktor.http.content.*
13-
import io.ktor.util.*
1413
import io.ktor.util.logging.*
1514
import io.ktor.utils.io.*
1615
import io.ktor.utils.io.charsets.*
1716
import io.ktor.utils.io.core.*
18-
import kotlin.math.*
17+
import kotlin.math.roundToInt
1918

2019
private val LOGGER = KtorSimpleLogger("io.ktor.client.plugins.HttpPlainText")
2120

@@ -68,7 +67,7 @@ public class HttpPlainTextConfig {
6867
* [HttpClient] plugin that encodes [String] request bodies to [TextContent]
6968
* and processes the response body as [String].
7069
*
71-
* To configure charsets set following properties in [HttpPlainText.Config].
70+
* To configure charsets, use properties in [HttpPlainTextConfig].
7271
*
7372
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.plugins.HttpPlainText)
7473
*/
@@ -81,24 +80,28 @@ public val HttpPlainText: ClientPlugin<HttpPlainTextConfig> =
8180
.filter { !pluginConfig.charsetQuality.containsKey(it) }
8281
.sortedBy { it.name }
8382

84-
val acceptCharsetHeader = buildString {
85-
withoutQuality.forEach {
86-
if (isNotEmpty()) append(",")
87-
append(it.name)
88-
}
89-
90-
withQuality.forEach { (charset, quality) ->
91-
if (isNotEmpty()) append(",")
92-
93-
check(quality in 0.0..1.0)
94-
95-
val truncatedQuality = (100 * quality).roundToInt() / 100.0
96-
append("${charset.name};q=$truncatedQuality")
97-
}
98-
99-
if (isEmpty()) {
100-
append(responseCharsetFallback.name)
83+
// RFC-9110 assumes UTF-8 as the default encoding. The Accept-Charset header is deprecated,
84+
// so we shouldn't add it if the user didn't specify any charset configuration
85+
val hasRegisteredCharsets = pluginConfig.charsets.any { it != Charsets.UTF_8 } ||
86+
pluginConfig.charsetQuality.keys.any { it != Charsets.UTF_8 }
87+
val acceptCharsetHeader = if (hasRegisteredCharsets) {
88+
buildString {
89+
withoutQuality.forEach {
90+
if (isNotEmpty()) append(",")
91+
append(it.name)
92+
}
93+
94+
withQuality.forEach { (charset, quality) ->
95+
if (isNotEmpty()) append(",")
96+
97+
check(quality in 0.0..1.0)
98+
99+
val truncatedQuality = (100 * quality).roundToInt() / 100.0
100+
append("${charset.name};q=$truncatedQuality")
101+
}
101102
}
103+
} else {
104+
null
102105
}
103106

104107
val requestCharset = pluginConfig.sendCharset
@@ -122,14 +125,14 @@ public val HttpPlainText: ClientPlugin<HttpPlainTextConfig> =
122125
return body.readText(charset = actualCharset)
123126
}
124127

125-
fun addCharsetHeaders(context: HttpRequestBuilder) {
126-
if (context.headers[HttpHeaders.AcceptCharset] != null) return
127-
LOGGER.trace("Adding Accept-Charset=$acceptCharsetHeader to ${context.url}")
128-
context.headers[HttpHeaders.AcceptCharset] = acceptCharsetHeader
128+
fun HttpRequestBuilder.addAcceptCharsetHeader(value: String?) {
129+
if (value == null || headers[HttpHeaders.AcceptCharset] != null) return
130+
LOGGER.trace("Adding Accept-Charset=$value to $url")
131+
headers[HttpHeaders.AcceptCharset] = value
129132
}
130133

131134
on(RenderRequestHook) { request, content ->
132-
addCharsetHeaders(request)
135+
request.addAcceptCharsetHeader(acceptCharsetHeader)
133136

134137
if (content !is String) return@on null
135138

ktor-client/ktor-client-core/common/src/io/ktor/client/utils/CoroutineUtils.kt

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
/*
2-
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3-
*/
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
44

55
package io.ktor.client.utils
66

77
import io.ktor.utils.io.*
8-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CoroutineDispatcher
9+
import kotlinx.coroutines.DelicateCoroutinesApi
10+
import kotlinx.coroutines.Dispatchers
911

1012
/**
1113
* Creates [CoroutineDispatcher] for the client with fixed [threadCount] and specified [dispatcherName].
@@ -16,6 +18,14 @@ import kotlinx.coroutines.*
1618
* @param threadCount the number of threads for the new [CoroutineDispatcher].
1719
* @param dispatcherName the name of the new [CoroutineDispatcher].
1820
*/
21+
@OptIn(DelicateCoroutinesApi::class)
22+
@Deprecated(
23+
"Use Dispatchers.IO.limitedParallelism instead",
24+
ReplaceWith(
25+
"Dispatchers.IO.limitedParallelism(threadCount)",
26+
"kotlinx.coroutines.Dispatchers",
27+
)
28+
)
1929
@InternalAPI
2030
public expect fun Dispatchers.clientDispatcher(
2131
threadCount: Int,
Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
/*
2-
* Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package io.ktor.client.utils
66

7-
import io.ktor.util.*
87
import io.ktor.utils.io.*
9-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CoroutineDispatcher
9+
import kotlinx.coroutines.DelicateCoroutinesApi
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.newFixedThreadPoolContext
1012

1113
/**
1214
* Creates [CoroutineDispatcher] for client with fixed [threadCount] and specified [dispatcherName].
1315
*
1416
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.utils.clientDispatcher)
1517
*/
18+
@OptIn(DelicateCoroutinesApi::class)
1619
@InternalAPI
1720
public actual fun Dispatchers.clientDispatcher(
1821
threadCount: Int,
1922
dispatcherName: String
20-
): CoroutineDispatcher = Dispatchers.createFixedThreadDispatcher(dispatcherName, threadCount)
23+
): CoroutineDispatcher = newFixedThreadPoolContext(threadCount, dispatcherName)

ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlClientEngine.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package io.ktor.client.engine.curl
@@ -51,7 +51,12 @@ internal class CurlClientEngine(
5151
val responseBody: Any = if (data.isUpgradeRequest()) {
5252
val wsConfig = data.attributes[WEBSOCKETS_KEY]
5353
val websocket = responseBody as CurlWebSocketResponseBody
54-
CurlWebSocketSession(websocket, callContext, wsConfig.channelsConfig.outgoing)
54+
CurlWebSocketSession(
55+
websocket,
56+
callContext,
57+
wsConfig.channelsConfig.outgoing,
58+
curlProcessor,
59+
)
5560
} else {
5661
val httpResponse = responseBody as CurlHttpResponseBody
5762
data.attributes.getOrNull(ResponseAdapterAttributeKey)
Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
/*
2-
* Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package io.ktor.client.engine.curl
66

7+
import io.ktor.client.engine.curl.CurlTask.*
78
import io.ktor.client.engine.curl.internal.*
8-
import io.ktor.util.*
9-
import io.ktor.utils.io.*
109
import kotlinx.atomicfu.atomic
1110
import kotlinx.cinterop.ExperimentalForeignApi
1211
import kotlinx.cinterop.IntVar
@@ -17,21 +16,17 @@ import kotlinx.coroutines.channels.Channel
1716
import kotlin.coroutines.CoroutineContext
1817
import kotlin.coroutines.cancellation.CancellationException
1918

20-
internal class RequestContainer(
21-
val requestData: CurlRequestData,
22-
val completionHandler: CompletableDeferred<CurlSuccess>
23-
)
24-
19+
@OptIn(ExperimentalForeignApi::class)
2520
internal class CurlProcessor(coroutineContext: CoroutineContext) {
26-
@OptIn(InternalAPI::class)
27-
private val curlDispatcher: CloseableCoroutineDispatcher =
28-
Dispatchers.createFixedThreadDispatcher("curl-dispatcher", 1)
21+
22+
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
23+
private val curlDispatcher = newSingleThreadContext("curl-dispatcher")
2924

3025
private var curlApi: CurlMultiApiHandler? by atomic(null)
3126
private val closed = atomic(false)
3227

3328
private val curlScope = CoroutineScope(coroutineContext + curlDispatcher)
34-
private val requestQueue: Channel<RequestContainer> = Channel(Channel.UNLIMITED)
29+
private val taskQueue: Channel<CurlTask> = Channel(Channel.UNLIMITED)
3530

3631
init {
3732
val init = curlScope.launch {
@@ -49,50 +44,65 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
4944

5045
suspend fun executeRequest(request: CurlRequestData): CurlSuccess {
5146
val result = CompletableDeferred<CurlSuccess>()
52-
requestQueue.send(RequestContainer(request, result))
47+
taskQueue.send(SendRequest(request, result))
5348
curlApi!!.wakeup()
5449
return result.await()
5550
}
5651

57-
@OptIn(DelicateCoroutinesApi::class, ExperimentalForeignApi::class)
58-
private fun runEventLoop(): Job = curlScope.launch {
52+
suspend fun sendWebSocketFrame(websocket: CurlWebSocketResponseBody, flags: Int, data: ByteArray) {
53+
val result = Job()
54+
taskQueue.send(SendWebSocketFrame(websocket, flags, data, result))
55+
curlApi!!.wakeup()
56+
result.join()
57+
}
58+
59+
@OptIn(DelicateCoroutinesApi::class)
60+
private fun runEventLoop(): Job = curlScope.launch(CoroutineName("curl-processor-loop")) {
5961
memScoped {
6062
val transfersRunning = alloc<IntVar>()
6163
val api = curlApi!!
62-
while (!requestQueue.isClosedForReceive) {
63-
drainRequestQueue(api)
64+
while (!taskQueue.isClosedForReceive) {
65+
drainTaskQueue(api)
6466
api.perform(transfersRunning)
6567
}
6668
}
6769
}
6870

69-
@OptIn(ExperimentalForeignApi::class)
70-
private suspend fun drainRequestQueue(api: CurlMultiApiHandler) {
71+
private suspend fun drainTaskQueue(api: CurlMultiApiHandler) {
7172
while (true) {
72-
val container = if (api.hasHandlers()) {
73-
requestQueue.tryReceive()
73+
val task = if (api.hasHandlers()) {
74+
taskQueue.tryReceive()
7475
} else {
75-
requestQueue.receiveCatching()
76+
taskQueue.receiveCatching()
7677
}.getOrNull() ?: break
7778

78-
val requestHandler = api.scheduleRequest(container.requestData, container.completionHandler)
79-
80-
val requestCleaner = container.requestData.executionContext.invokeOnCompletion { cause ->
81-
if (cause == null) return@invokeOnCompletion
82-
cancelRequest(requestHandler, cause)
79+
when (task) {
80+
is SendRequest -> handleSendRequest(api, task)
81+
is SendWebSocketFrame ->
82+
api.sendWebSocketFrame(task.websocket, task.flags, task.data, task.completionHandler)
8383
}
84+
}
85+
}
8486

85-
container.completionHandler.invokeOnCompletion {
86-
requestCleaner.dispose()
87-
}
87+
private fun handleSendRequest(api: CurlMultiApiHandler, task: SendRequest) {
88+
val (requestData, completionHandler) = task
89+
val requestHandler = api.scheduleRequest(requestData, completionHandler)
90+
91+
val requestCleaner = requestData.executionContext.invokeOnCompletion { cause ->
92+
if (cause == null) return@invokeOnCompletion
93+
cancelRequest(requestHandler, cause)
94+
}
95+
96+
completionHandler.invokeOnCompletion {
97+
requestCleaner.dispose()
8898
}
8999
}
90100

91101
@OptIn(DelicateCoroutinesApi::class)
92102
fun close() {
93103
if (!closed.compareAndSet(false, true)) return
94104

95-
requestQueue.close()
105+
taskQueue.close()
96106
curlApi!!.wakeup()
97107

98108
GlobalScope.launch(curlDispatcher) {
@@ -103,10 +113,24 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
103113
}
104114
}
105115

106-
@OptIn(ExperimentalForeignApi::class)
107116
private fun cancelRequest(easyHandle: EasyHandle, cause: Throwable) {
108117
curlScope.launch {
109118
curlApi!!.cancelRequest(easyHandle, cause)
110119
}
111120
}
112121
}
122+
123+
private sealed interface CurlTask {
124+
125+
data class SendRequest(
126+
val requestData: CurlRequestData,
127+
val completionHandler: CompletableDeferred<CurlSuccess>,
128+
) : CurlTask
129+
130+
class SendWebSocketFrame(
131+
val websocket: CurlWebSocketResponseBody,
132+
val flags: Int,
133+
val data: ByteArray,
134+
val completionHandler: CompletableJob,
135+
) : CurlTask
136+
}

0 commit comments

Comments
 (0)