Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions jdbc/src/test/java/org/apache/iotdb/jdbc/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.jdbc;

import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.SessionTimeoutException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;

Expand Down Expand Up @@ -118,6 +119,16 @@ public void testVerifySuccess() {
fail();
}

try {
TSStatus errorStatus = new TSStatus(TSStatusCode.SESSION_TIMEOUT.getStatusCode());
RpcUtils.verifySuccess(errorStatus);
fail();
} catch (SessionTimeoutException e) {
assertTrue(true);
} catch (Exception e) {
fail();
}

try {
TSStatus errorStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
RpcUtils.verifySuccess(errorStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ public static SessionTimeoutManager getInstance() {
return SessionTimeoutManagerHelper.INSTANCE;
}

public boolean isSessionExits(long sessionId) {
Comment thread
HTHou marked this conversation as resolved.
Outdated
Comment thread
suchenglong marked this conversation as resolved.
Outdated
if (SESSION_TIMEOUT == 0) {
return true;
}
return sessionIdToLastActiveTime.containsKey(sessionId);
}

private static class SessionTimeoutManagerHelper {

private static final SessionTimeoutManager INSTANCE = new SessionTimeoutManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,26 @@ public boolean checkLogin(long sessionId) {
boolean isLoggedIn = currSessionId != null && currSessionId == sessionId;
if (!isLoggedIn) {
LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
return false;
} else {
SessionTimeoutManager.getInstance().refresh(sessionId);
}
return isLoggedIn;
}

/**
* Check whether current session is timeout.
*
* @param sessionId Session id.
* @return true: If session timeout; false: If not session timeout.
*/
public boolean checkSessionTimeout(long sessionId) {
Comment thread
HTHou marked this conversation as resolved.
if (!SessionTimeoutManager.getInstance().isSessionExits(sessionId)) {
return true;
}
return false;
}

public boolean checkAuthorization(PhysicalPlan plan, String username) throws AuthException {
if (!plan.isAuthenticationRequired()) {
return true;
Expand Down Expand Up @@ -230,7 +244,9 @@ public TSStatus closeOperation(
TSStatusCode.NOT_LOGIN_ERROR,
"Log in failed. Either you are not authorized or the session has timed out.");
}

if (checkSessionTimeout(sessionId)) {
return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"{}: receive close operation from Session {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ public TSStatus writePoints(TSWritePointsReq req) {
if (!serviceProvider.checkLogin(req.sessionId)) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getInfluxDBStatus(
TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
}
List<TSStatus> tsStatusList = new ArrayList<>();
int executeCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
for (Point point :
Expand Down Expand Up @@ -119,6 +122,10 @@ public TSStatus createDatabase(TSCreateDatabaseReq req) throws TException {
if (!serviceProvider.checkLogin(req.sessionId)) {
return getNotLoggedInStatus();
}
if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getInfluxDBStatus(
TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
}
try {
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return resp.setStatus(getNotLoggedInStatus());
}
if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return resp.setStatus(getSessionTimeoutStatus());
}

TSStatus status;
try {
Expand Down Expand Up @@ -509,6 +512,9 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}
if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}

InsertRowsPlan insertRowsPlan;
int index = 0;
Expand Down Expand Up @@ -615,6 +621,9 @@ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getSessionTimeoutStatus());
}

long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
Expand Down Expand Up @@ -653,7 +662,9 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getSessionTimeoutStatus());
}
long startTime = System.currentTimeMillis();
String statement = req.getStatement();
PhysicalPlan physicalPlan =
Expand Down Expand Up @@ -689,7 +700,9 @@ public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getSessionTimeoutStatus());
}
long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
serviceProvider
Expand Down Expand Up @@ -735,7 +748,9 @@ public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getSessionTimeoutStatus());
}
long startTime = System.currentTimeMillis();
PhysicalPlan physicalPlan =
serviceProvider
Expand Down Expand Up @@ -1003,7 +1018,9 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSFetchResultsResp(getSessionTimeoutStatus());
}
if (!SESSION_MANAGER.hasDataset(req.queryId)) {
return RpcUtils.getTSFetchResultsResp(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
Expand Down Expand Up @@ -1069,7 +1086,9 @@ public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
if (!serviceProvider.checkLogin(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return RpcUtils.getTSExecuteStatementResp(getSessionTimeoutStatus());
}
try {
PhysicalPlan physicalPlan =
serviceProvider
Expand Down Expand Up @@ -1189,7 +1208,9 @@ public TSStatus insertRecords(TSInsertRecordsReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
Expand Down Expand Up @@ -1261,7 +1282,9 @@ public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
Expand Down Expand Up @@ -1307,7 +1330,9 @@ public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceR
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
Expand Down Expand Up @@ -1365,7 +1390,9 @@ public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
Expand Down Expand Up @@ -1479,7 +1506,9 @@ public TSStatus insertRecord(TSInsertRecordReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
SESSION_MANAGER.getCurrSessionId(),
Expand Down Expand Up @@ -1514,7 +1543,9 @@ public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
SESSION_MANAGER.getCurrSessionId(),
Expand Down Expand Up @@ -1550,7 +1581,9 @@ public TSStatus deleteData(TSDeleteDataReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
DeletePlan plan = new DeletePlan();
plan.setDeleteStartTime(req.getStartTime());
plan.setDeleteEndTime(req.getEndTime());
Expand All @@ -1577,7 +1610,9 @@ public TSStatus insertTablet(TSInsertTabletReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size));
Expand Down Expand Up @@ -1613,7 +1648,9 @@ public TSStatus insertTablets(TSInsertTabletsReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
return insertTabletsInternally(req);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode());
Expand Down Expand Up @@ -1714,7 +1751,9 @@ public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
Expand Down Expand Up @@ -1746,7 +1785,9 @@ public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
// if measurements.size() == 1, convert to create timeseries
if (req.measurements.size() == 1) {
return createTimeseries(
Expand Down Expand Up @@ -1804,7 +1845,9 @@ public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
Expand Down Expand Up @@ -1918,7 +1961,9 @@ public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TExce
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create schema template {}",
Expand Down Expand Up @@ -2022,7 +2067,9 @@ public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set device template {}.{}",
Expand All @@ -2045,7 +2092,9 @@ public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TExcept
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} unset schema template {}.{}",
Expand Down Expand Up @@ -2093,7 +2142,9 @@ public TSStatus setUsingTemplate(TSSetUsingTemplateReq req) throws TException {
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create timeseries of schema template on path {}",
Expand All @@ -2115,7 +2166,9 @@ public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TExceptio
if (!serviceProvider.checkLogin(req.getSessionId())) {
return getNotLoggedInStatus();
}

if (serviceProvider.checkSessionTimeout(req.getSessionId())) {
return getSessionTimeoutStatus();
}
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} drop schema template {}.",
Expand All @@ -2128,6 +2181,10 @@ public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TExceptio
return status != null ? status : executeNonQueryPlan(plan);
}

private TSStatus getSessionTimeoutStatus() {
return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
}

@Override
public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
PhysicalPlan physicalPlan;
Expand Down
4 changes: 4 additions & 0 deletions service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public static void verifySuccess(TSStatus status) throws StatementExecutionExcep
if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
return;
}
if (status.getCode() == TSStatusCode.SESSION_TIMEOUT.getStatusCode()) {
throw new SessionTimeoutException(status);
}

if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StatementExecutionException(status);
}
Expand Down
Loading