Skip to content

Commit f7bf886

Browse files
authored
Add management of the lifecycle of all Sessions (#3483)
1 parent 624b975 commit f7bf886

10 files changed

Lines changed: 289 additions & 34 deletions

File tree

docs/UserGuide/IoTDB-SQL-Language/Maintenance-Command.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,15 @@ IoTDB> CREATE SNAPSHOT FOR SCHEMA
5959
```
6060

6161

62-
## Kill Query
62+
## Timeout
6363

64-
When using IoTDB, you may encounter the following situations: you have entered a query statement, but can not get the result for a long time, as this query contains too much data or some other reasons, and have to wait until the query ends.
65-
Since version 0.12, IoTDB has provided two solutions for queries with long execution time: query timeout and query abort.
64+
IoTDB supports session and query level timeout.
65+
66+
### Session timeout
67+
68+
Session timeout controls when idle sessions are closed. An idle session is one that had not initiated any query or non-query operations for a period of time.
69+
70+
Session timeout is disabled by default and can be set using the `session_timeout_threshold` parameter in IoTDB configuration file.
6671

6772
### Query timeout
6873

@@ -73,7 +78,7 @@ IoTDB> select * from root;
7378
Msg: 701 Current query is time out, please check your statement or modify timeout parameter.
7479
```
7580

76-
The default timeout of the system is 60000 ms,which can be customized in the configuration file through the `query_timeout_threshold` parameter.
81+
The default timeout of a query is 60000 ms,which can be customized in the configuration file through the `query_timeout_threshold` parameter.
7782

7883
If you use JDBC or Session, we also support setting a timeout for a single query(Unit: ms):
7984

docs/zh/UserGuide/IoTDB-SQL-Language/Maintenance-Command.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,15 @@ IoTDB> CLEAR CACHE
5656
IoTDB> CREATE SNAPSHOT FOR SCHEMA
5757
```
5858

59-
## 中止查询
59+
## 超时
6060

61-
当使用 IoTDB 时,您可能会遇到以下情形:输入了一个查询,但是由于其包含的数据量过大或是其他原因,导致长时间无法返回结果,但是迫于生产环境无法中止该命令,只能被迫等待
61+
IoTDB 支持 Session 超时和查询超时
6262

63-
从 0.12 版本开始,IoTDB 对执行时间过长的查询给出了两种解决方案:查询超时和查询中止。
63+
### Session 超时
64+
65+
Session 超时控制何时关闭空闲 Session。空闲 Session 指在一段时间内没有发起任何操作的 Session。
66+
67+
Session 超时默认未开启。可以在配置文件中通过 `session_timeout_threshold` 参数进行配置。
6468

6569
### 查询超时
6670

server/src/assembly/resources/conf/iotdb-engine.properties

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,12 @@ timestamp_precision=ms
462462
# Datatype: int
463463
# merge_write_throughput_mb_per_sec=8
464464

465+
# The maximum session idle time. unit: ms
466+
# Idle sessions are the ones that performs neither query or non-query operations for a period of time
467+
# Set to 0 to disable session timeout
468+
# Datatype: int
469+
# session_timeout_threshold=0
470+
465471
# The max executing time of query. unit: ms
466472
# Datatype: int
467473
# query_timeout_threshold=60000

server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,9 @@ public class IoTDBConfig {
407407
/** the max executing time of query in ms. Unit: millisecond */
408408
private int queryTimeoutThreshold = 60000;
409409

410+
/** the max time to live of a session in ms. Unit: millisecond */
411+
private int sessionTimeoutThreshold = 0;
412+
410413
/** Replace implementation class of JDBC service */
411414
private String rpcImplClassName = TSServiceImpl.class.getName();
412415

@@ -1217,6 +1220,14 @@ public void setQueryTimeoutThreshold(int queryTimeoutThreshold) {
12171220
this.queryTimeoutThreshold = queryTimeoutThreshold;
12181221
}
12191222

1223+
public int getSessionTimeoutThreshold() {
1224+
return sessionTimeoutThreshold;
1225+
}
1226+
1227+
public void setSessionTimeoutThreshold(int sessionTimeoutThreshold) {
1228+
this.sessionTimeoutThreshold = sessionTimeoutThreshold;
1229+
}
1230+
12201231
public boolean isReadOnly() {
12211232
return readOnly;
12221233
}

server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,12 @@ private void loadProps() {
360360
properties.getProperty(
361361
"query_timeout_threshold", Integer.toString(conf.getQueryTimeoutThreshold()))));
362362

363+
conf.setSessionTimeoutThreshold(
364+
Integer.parseInt(
365+
properties.getProperty(
366+
"session_timeout_threshold",
367+
Integer.toString(conf.getSessionTimeoutThreshold()))));
368+
363369
conf.setSyncEnable(
364370
Boolean.parseBoolean(
365371
properties.getProperty("is_sync_enable", Boolean.toString(conf.isSyncEnable()))));

server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public AtomicBoolean unRegisterQuery(long queryId) {
9494
if (scheduledFuture != null) {
9595
scheduledFuture.cancel(false);
9696
}
97+
SessionTimeoutManager.getInstance()
98+
.refresh(SessionManager.getInstance().getSessionIdByQueryId(queryId));
9799
return null;
98100
});
99101
return successRemoved;

server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,20 @@ public boolean releaseSessionResource(long sessionId) {
7777
return sessionIdToUsername.remove(sessionId) != null;
7878
}
7979

80+
public long getSessionIdByQueryId(long queryId) {
81+
// TODO: make this more efficient with a queryId -> sessionId map
82+
for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
83+
if (statementToQueries.getValue().contains(queryId)) {
84+
for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
85+
if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
86+
return sessionToStatements.getKey();
87+
}
88+
}
89+
}
90+
}
91+
return -1;
92+
}
93+
8094
public long requestStatementId(long sessionId) {
8195
long statementId = statementIdGenerator.incrementAndGet();
8296
sessionIdToStatementId
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iotdb.db.query.control;
20+
21+
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
22+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
32+
public class SessionTimeoutManager {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(SessionTimeoutManager.class);
35+
private static final long MINIMUM_CLEANUP_PERIOD = 2000;
36+
private static final long SESSION_TIMEOUT =
37+
IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
38+
39+
private Map<Long, Long> sessionIdToLastActiveTime;
40+
private ScheduledExecutorService executorService;
41+
42+
private SessionTimeoutManager() {
43+
if (SESSION_TIMEOUT == 0) {
44+
return;
45+
}
46+
47+
this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
48+
this.executorService =
49+
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
50+
51+
executorService.scheduleAtFixedRate(
52+
() -> {
53+
LOGGER.info("cleaning up expired sessions");
54+
cleanup();
55+
},
56+
0,
57+
Math.max(MINIMUM_CLEANUP_PERIOD, SESSION_TIMEOUT / 5),
58+
TimeUnit.MILLISECONDS);
59+
}
60+
61+
public void register(long id) {
62+
if (SESSION_TIMEOUT == 0) {
63+
return;
64+
}
65+
66+
sessionIdToLastActiveTime.put(id, System.currentTimeMillis());
67+
}
68+
69+
public boolean unregister(long id) {
70+
if (SESSION_TIMEOUT == 0) {
71+
return SessionManager.getInstance().releaseSessionResource(id);
72+
}
73+
74+
if (SessionManager.getInstance().releaseSessionResource(id)) {
75+
return sessionIdToLastActiveTime.remove(id) != null;
76+
}
77+
78+
return false;
79+
}
80+
81+
public void refresh(long id) {
82+
if (SESSION_TIMEOUT == 0) {
83+
return;
84+
}
85+
86+
sessionIdToLastActiveTime.computeIfPresent(id, (k, v) -> System.currentTimeMillis());
87+
}
88+
89+
private void cleanup() {
90+
long currentTime = System.currentTimeMillis();
91+
sessionIdToLastActiveTime.entrySet().stream()
92+
.filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
93+
.forEach(
94+
entry -> {
95+
if (unregister(entry.getKey())) {
96+
LOGGER.debug(
97+
String.format(
98+
"session-%s timed out in %d ms",
99+
entry.getKey(), currentTime - entry.getValue()));
100+
}
101+
});
102+
}
103+
104+
public static SessionTimeoutManager getInstance() {
105+
return SessionTimeoutManagerHelper.INSTANCE;
106+
}
107+
108+
private static class SessionTimeoutManagerHelper {
109+
110+
private static final SessionTimeoutManager INSTANCE = new SessionTimeoutManager();
111+
112+
private SessionTimeoutManagerHelper() {}
113+
}
114+
}

0 commit comments

Comments
 (0)