Skip to content

Commit 50dce30

Browse files
Merge ded3a4e into 3a59d95
2 parents 3a59d95 + ded3a4e commit 50dce30

12 files changed

Lines changed: 442 additions & 34 deletions

File tree

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() {
323323
const auto& operations = GetTopicOperations();
324324

325325
TMaybe<TString> consumer;
326-
if (operations.HasConsumer())
326+
if (operations.HasConsumer()) {
327327
consumer = operations.GetConsumer();
328+
}
329+
330+
TMaybe<ui32> supportivePartition;
331+
if (operations.HasSupportivePartition()) {
332+
supportivePartition = operations.GetSupportivePartition();
333+
}
328334

329335
TopicOperations = NTopic::TTopicOperations();
330336

@@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
334340

335341
for (auto& partition : topic.partitions()) {
336342
if (partition.partition_offsets().empty()) {
337-
TopicOperations.AddOperation(path, partition.partition_id());
343+
TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition);
338344
} else {
339345
for (auto& range : partition.partition_offsets()) {
340346
YQL_ENSURE(consumer.Defined());

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
7878
Operations_[consumer].AddOperation(consumer, range);
7979
}
8080

81-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
81+
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
82+
TMaybe<ui32> supportivePartition)
8283
{
8384
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
8485
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,6 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
8889
Partition_ = partition;
8990
}
9091

92+
SupportivePartition_ = supportivePartition;
93+
9194
HasWriteOperations_ = true;
9295
}
9396

@@ -112,6 +115,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
112115
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
113116
o->SetPartitionId(*Partition_);
114117
o->SetPath(*Topic_);
118+
if (SupportivePartition_.Defined()) {
119+
o->SetSupportivePartition(*SupportivePartition_);
120+
}
115121
}
116122
}
117123

@@ -127,6 +133,10 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
127133
TabletId_ = rhs.TabletId_;
128134
}
129135

136+
if (!SupportivePartition_.Defined() || (*SupportivePartition_ != Max<ui32>())) {
137+
SupportivePartition_ = rhs.SupportivePartition_;
138+
}
139+
130140
for (auto& [key, value] : rhs.Operations_) {
131141
Operations_[key].Merge(value);
132142
}
@@ -240,10 +250,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
240250
HasReadOperations_ = true;
241251
}
242252

243-
void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
253+
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
254+
TMaybe<ui32> supportivePartition)
244255
{
245256
TTopicPartition key{topic, partition};
246-
Operations_[key].AddOperation(topic, partition);
257+
Operations_[key].AddOperation(topic, partition, supportivePartition);
247258
HasWriteOperations_ = true;
248259
}
249260

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class TTopicPartitionOperations {
4949
void AddOperation(const TString& topic, ui32 partition,
5050
const TString& consumer,
5151
const Ydb::Topic::OffsetsRange& range);
52-
void AddOperation(const TString& topic, ui32 partition);
52+
void AddOperation(const TString& topic, ui32 partition,
53+
TMaybe<ui32> supportivePartition);
5354

5455
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
5556

@@ -67,6 +68,7 @@ class TTopicPartitionOperations {
6768
THashMap<TString, TConsumerOperations> Operations_;
6869
bool HasWriteOperations_ = false;
6970
TMaybe<ui64> TabletId_;
71+
TMaybe<ui32> SupportivePartition_;
7072
};
7173

7274
struct TTopicPartition {
@@ -98,7 +100,8 @@ class TTopicOperations {
98100
void AddOperation(const TString& topic, ui32 partition,
99101
const TString& consumer,
100102
const Ydb::Topic::OffsetsRange& range);
101-
void AddOperation(const TString& topic, ui32 partition);
103+
void AddOperation(const TString& topic, ui32 partition,
104+
TMaybe<ui32> supportivePartition);
102105

103106
void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
104107
TMaybe<TString> consumer);

ydb/core/persqueue/partition.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3040,13 +3040,19 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30403040
Send(ev->Sender, response.Release());
30413041
}
30423042

3043-
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
3043+
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
30443044
{
3045+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
3046+
"Delete supportive partition " << Partition);
3047+
Y_ABORT_UNLESS(IsSupportive());
3048+
30453049
PendingEvents.emplace_back(ev->ReleaseBase().Release());
30463050
}
30473051

30483052
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
30493053
{
3054+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
3055+
"Delete supportive partition " << Partition);
30503056
Y_ABORT_UNLESS(IsSupportive());
30513057
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
30523058

ydb/core/persqueue/partition_write.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
4141
r->SetOwnerCookie(cookie);
4242
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
4343
r->SetSeqNo(seqNo);
44+
if (IsSupportive()) {
45+
r->SetSupportivePartition(Partition.InternalPartitionId);
46+
}
4447

4548
ctx.Send(Tablet, response.Release());
4649
}

0 commit comments

Comments
 (0)