Skip to content

Commit 2b26853

Browse files
authored
Merge b1c095d into 16530db
2 parents 16530db + b1c095d commit 2b26853

8 files changed

Lines changed: 60 additions & 67 deletions

File tree

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,18 @@ namespace NKqp {
77

88
using namespace NYql;
99

10-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
10+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NKikimr::TLocalPathId& pathId) {
1111
TStringBuilder message;
1212
message << "Transaction locks invalidated.";
1313

1414
TMaybe<TString> tableName;
15-
if (invalidatedLock) {
16-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
17-
auto table = txCtx.TableByIdMap.FindPtr(id);
18-
if (table) {
19-
tableName = *table;
20-
}
21-
}
22-
23-
if (tableName) {
24-
message << " Table: " << *tableName;
25-
}
15+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
16+
YQL_ENSURE(table);
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
18+
}
2619

27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
20+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
21+
return GetLocksInvalidatedIssue(txCtx, invalidatedLock.GetPathId());
2822
}
2923

3024
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/common/kqp_tx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ class TTransactionsCache {
432432
}
433433
};
434434

435+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NKikimr::TLocalPathId& pathId);
435436
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
436437
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
437438

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
208208
void Finalize() {
209209
YQL_ENSURE(!AlreadyReplied);
210210

211-
if (LocksBroken) {
212-
TString message = "Transaction locks invalidated.";
213-
214-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
215-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
216-
}
217-
218211
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
219212
Counters->TxProxyMon->ReportStatusOK->Inc();
220213

@@ -1167,30 +1160,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
11671160
YQL_ENSURE(shardState->State == TShardState::EState::Executing);
11681161
shardState->State = TShardState::EState::Finished;
11691162
Counters->TxProxyMon->TxResultAborted->Inc();
1170-
LocksBroken = true;
11711163

1172-
TMaybe<TString> tableName;
11731164
if (!res->Record.GetTxLocks().empty()) {
1174-
auto& lock = res->Record.GetTxLocks(0);
1175-
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
1176-
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
1177-
if (it != TasksGraph.GetStagesInfo().end()) {
1178-
tableName = it->second.Meta.TableConstInfo->Path;
1179-
}
1165+
ResponseEv->BrokenLockPathId = res->Record.GetTxLocks(0).GetPathId();
11801166
}
11811167

1182-
// Reply as soon as we know which table had locks invalidated
1183-
if (tableName) {
1184-
auto message = TStringBuilder()
1185-
<< "Transaction locks invalidated. Table: " << *tableName;
1186-
1187-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
1188-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
1189-
}
1190-
1191-
1192-
CheckExecutionComplete();
1193-
return;
1168+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
11941169
}
11951170
default:
11961171
{
@@ -1245,30 +1220,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12451220

12461221
Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
12471222

1248-
LocksBroken = true;
1249-
1250-
TMaybe<TString> tableName;
12511223
if (!res->Record.GetTxLocks().empty()) {
1252-
auto& lock = res->Record.GetTxLocks(0);
1253-
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
1254-
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
1255-
if (it != TasksGraph.GetStagesInfo().end()) {
1256-
tableName = it->second.Meta.TableConstInfo->Path;
1257-
}
1258-
}
1259-
1260-
// Reply as soon as we know which table had locks invalidated
1261-
if (tableName) {
1262-
auto message = TStringBuilder()
1263-
<< "Transaction locks invalidated. Table: " << *tableName;
1264-
1265-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
1266-
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
1224+
ResponseEv->BrokenLockPathId = res->Record.GetTxLocks(0).GetPathId();
12671225
}
12681226

1269-
// Receive more replies from other shards
1270-
CheckExecutionComplete();
1271-
return;
1227+
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
12721228
}
12731229
case NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED: {
12741230
YQL_ENSURE(false);
@@ -2806,7 +2762,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28062762
bool VolatileTx = false;
28072763
bool ImmediateTx = false;
28082764
bool TxPlanned = false;
2809-
bool LocksBroken = false;
28102765

28112766
TInstant FirstPrepareReply;
28122767
TInstant LastPrepareReply;

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TEvKqpExecuter {
2626

2727
NLWTrace::TOrbit Orbit;
2828
IKqpGateway::TKqpSnapshot Snapshot;
29+
std::optional<NKikimr::TLocalPathId> BrokenLockPathId;
2930
ui64 ResultRowsCount = 0;
3031
ui64 ResultRowsBytes = 0;
3132

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1751,7 +1751,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17511751
auto& response = *ResponseEv->Record.MutableResponse();
17521752

17531753
response.SetStatus(status);
1754-
response.MutableIssues()->Swap(issues);
1754+
if (issues) {
1755+
response.MutableIssues()->Swap(issues);
1756+
}
17551757

17561758
LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
17571759
<< ", to ActorId: " << Target);

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
325325

326326
for (const auto& info : tableInfos) {
327327
tableInfoMap.emplace(info.GetTableName(), &info);
328-
329-
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
330-
TableByIdMap.emplace(pathId, info.GetTableName());
328+
TableByIdMap.emplace(info.GetTableId().GetTableId(), info.GetTableName());
331329
}
332330

333331
for (const auto& op : operations) {
@@ -429,7 +427,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
429427
public:
430428
bool HasUncommittedChangesRead = false;
431429
THashMap<TString, TYdbOperations> TableOperations;
432-
THashMap<TKikimrPathId, TString> TableByIdMap;
430+
THashMap<NKikimr::TLocalPathId, TString> TableByIdMap;
433431
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
434432
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
435433
bool Readonly = false;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,6 +1458,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14581458

14591459
// Invalidate query cache on scheme/internal errors
14601460
switch (status) {
1461+
case Ydb::StatusIds::ABORTED: {
1462+
if (ev->BrokenLockPathId) {
1463+
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
1464+
}
1465+
break;
1466+
}
14611467
case Ydb::StatusIds::SCHEME_ERROR:
14621468
case Ydb::StatusIds::INTERNAL_ERROR:
14631469
InvalidateQuery();

ydb/core/kqp/ut/tx/kqp_locks_ut.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,42 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
204204
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
205205
CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
206206
}
207+
208+
Y_UNIT_TEST(TwoPhaseTx) {
209+
TKikimrRunner kikimr;
210+
auto db = kikimr.GetTableClient();
211+
212+
auto session1 = db.CreateSession().GetValueSync().GetSession();
213+
auto session2 = db.CreateSession().GetValueSync().GetSession();
214+
215+
auto result = session1.ExecuteDataQuery(Q_(R"(
216+
REPLACE INTO `/Root/Test` (Group, Name, Comment) VALUES (1U, "Paul", "Changed");
217+
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
218+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
219+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
220+
221+
auto tx1 = result.GetTransaction();
222+
UNIT_ASSERT(tx1);
223+
224+
result = session2.ExecuteDataQuery(Q_(R"(
225+
REPLACE INTO `/Root/Test` (Group, Name, Comment)
226+
VALUES (1U, "Paul", "Changed");
227+
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
228+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
229+
230+
result = session1.ExecuteDataQuery(Q_(R"(
231+
SELECT * FROM `KeyValue`;
232+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
233+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
234+
235+
auto commitResult = tx1->Commit().GetValueSync();
236+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
237+
commitResult.GetIssues().PrintTo(Cerr);
238+
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
239+
[] (const NYql::TIssue& issue) {
240+
return issue.GetMessage().Contains("/Root/Test");
241+
}), commitResult.GetIssues().ToString());
242+
}
207243
}
208244

209245
} // namespace NKqp

0 commit comments

Comments
 (0)