Skip to content

Commit 6763ae1

Browse files
committed
The consumer's generation number is not stored in the transaction (ydb-platform#9590) (ydb-platform#9619)
1 parent b8d484e commit 6763ae1

3 files changed

Lines changed: 214 additions & 22 deletions

File tree

ydb/core/persqueue/pq_impl.cpp

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
715715
{
716716
Config = newConfig;
717717

718+
PQ_LOG_D("Apply new config " << Config.ShortDebugString());
719+
718720
ui32 cacheSize = CACHE_SIZE;
719721
if (Config.HasCacheSize()) {
720722
cacheSize = Config.GetCacheSize();
@@ -1630,6 +1632,32 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16301632
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
16311633
}
16321634

1635+
void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
1636+
{
1637+
Y_ABORT_UNLESS(cfg.HasVersion());
1638+
const int curConfigVersion = cfg.GetVersion();
1639+
1640+
// set rr generation for provided read rules
1641+
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1642+
for (const auto& c : Config.GetConsumers()) {
1643+
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1644+
}
1645+
1646+
for (auto& c : *cfg.MutableConsumers()) {
1647+
auto it = existed.find(c.GetName());
1648+
ui64 generation = 0;
1649+
if (it != existed.end() && it->second.first == c.GetVersion()) {
1650+
generation = it->second.second;
1651+
} else {
1652+
generation = curConfigVersion;
1653+
}
1654+
c.SetGeneration(generation);
1655+
if (ReadRuleCompatible()) {
1656+
cfg.AddReadRuleGenerations(generation);
1657+
}
1658+
}
1659+
}
1660+
16331661
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16341662
{
16351663
const auto& record = ev->GetRecord();
@@ -1642,7 +1670,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
16421670
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();
16431671

16441672
Y_ABORT_UNLESS(cfg.HasVersion());
1645-
int curConfigVersion = cfg.GetVersion();
1673+
const int curConfigVersion = cfg.GetVersion();
16461674

16471675
if (curConfigVersion == oldConfigVersion) { //already applied
16481676
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
@@ -1741,27 +1769,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
17411769

17421770
Migrate(cfg);
17431771

1744-
// set rr generation for provided read rules
1745-
{
1746-
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1747-
for (const auto& c : Config.GetConsumers()) {
1748-
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1749-
}
1750-
1751-
for (auto& c : *cfg.MutableConsumers()) {
1752-
auto it = existed.find(c.GetName());
1753-
ui64 generation = 0;
1754-
if (it != existed.end() && it->second.first == c.GetVersion()) {
1755-
generation = it->second.second;
1756-
} else {
1757-
generation = curConfigVersion;
1758-
}
1759-
c.SetGeneration(generation);
1760-
if (ReadRuleCompatible()) {
1761-
cfg.AddReadRuleGenerations(generation);
1762-
}
1763-
}
1764-
}
1772+
UpdateReadRuleGenerations(cfg);
17651773

17661774
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
17671775
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
@@ -3727,6 +3735,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37273735
tx.OnProposeTransaction(event, GetAllowedStep(),
37283736
TabletID());
37293737

3738+
if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
3739+
UpdateReadRuleGenerations(tx.TabletConfig);
3740+
}
3741+
37303742
if (tx.WriteId.Defined()) {
37313743
const TWriteId& writeId = *tx.WriteId;
37323744
Y_ABORT_UNLESS(TxWrites.contains(writeId),

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
530530

531531
bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
532532
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
533+
534+
void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
533535
};
534536

535537

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
44
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
55
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
6+
#include <ydb/core/cms/console/console.h>
67
#include <ydb/core/keyvalue/keyvalue_events.h>
78
#include <ydb/core/persqueue/key.h>
89
#include <ydb/core/persqueue/blob.h>
@@ -37,8 +38,14 @@ class TFixture : public NUnitTest::TBaseFixture {
3738
void Write(const TString& message, NTable::TTransaction* tx = nullptr);
3839
};
3940

41+
struct TFeatureFlags {
42+
bool EnablePQConfigTransactionsAtSchemeShard = true;
43+
};
44+
4045
void SetUp(NUnitTest::TTestContext&) override;
4146

47+
void NotifySchemeShard(const TFeatureFlags& flags);
48+
4249
NTable::TSession CreateTableSession();
4350
NTable::TTransaction BeginTx(NTable::TSession& session);
4451
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -62,6 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6269
std::optional<size_t> maxPartitionCount = std::nullopt);
6370
void DescribeTopic(const TString& path);
6471

72+
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
73+
6574
void WriteToTopicWithInvalidTxId(bool invalidTxId);
6675

6776
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -95,6 +104,8 @@ class TFixture : public NUnitTest::TBaseFixture {
95104
NYdb::EStatus status);
96105
void CloseTopicWriteSession(const TString& topicPath,
97106
const TString& messageGroupId);
107+
void CloseTopicReadSession(const TString& topicPath,
108+
const TString& consumerName);
98109

99110
enum EEndOfTransaction {
100111
Commit,
@@ -173,6 +184,8 @@ class TFixture : public NUnitTest::TBaseFixture {
173184
ui64 tabletId,
174185
const NPQ::TWriteId& writeId);
175186

187+
ui64 GetSchemeShardTabletId(const TActorId& actorId);
188+
176189
std::unique_ptr<TTopicSdkTestSetup> Setup;
177190
std::unique_ptr<TDriver> Driver;
178191

@@ -190,11 +203,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
190203
{
191204
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
192205
settings.SetEnableTopicServiceTx(true);
206+
193207
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
194208

195209
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
196210
}
197211

212+
void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
213+
{
214+
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
215+
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
216+
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);
217+
218+
auto& runtime = Setup->GetRuntime();
219+
auto actorId = runtime.AllocateEdgeActor();
220+
221+
ui64 ssId = GetSchemeShardTabletId(actorId);
222+
223+
runtime.SendToPipe(ssId, actorId, request.release());
224+
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
225+
}
226+
198227
NTable::TSession TFixture::CreateTableSession()
199228
{
200229
NTable::TTableClient client(GetDriver());
@@ -321,6 +350,20 @@ void TFixture::CreateTopic(const TString& path,
321350
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
322351
}
323352

353+
void TFixture::AddConsumer(const TString& path,
354+
const TVector<TString>& consumers)
355+
{
356+
NTopic::TTopicClient client(GetDriver());
357+
NTopic::TAlterTopicSettings settings;
358+
359+
for (const auto& consumer : consumers) {
360+
settings.BeginAddConsumer(consumer);
361+
}
362+
363+
auto result = client.AlterTopic(path, settings).GetValueSync();
364+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
365+
}
366+
324367
void TFixture::DescribeTopic(const TString& path)
325368
{
326369
Setup->DescribeTopic(path);
@@ -643,6 +686,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
643686
TopicWriteSessions.erase(key);
644687
}
645688

689+
void TFixture::CloseTopicReadSession(const TString& topicPath,
690+
const TString& consumerName)
691+
{
692+
Y_UNUSED(consumerName);
693+
TopicReadSessions.erase(topicPath);
694+
}
695+
646696
void TFixture::WriteToTopic(const TString& topicPath,
647697
const TString& messageGroupId,
648698
const TString& message,
@@ -761,6 +811,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
761811
UNIT_ASSERT(context.AckCount <= context.WriteCount);
762812
}
763813

814+
ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
815+
{
816+
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
817+
navigate->DatabaseName = "/Root";
818+
819+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
820+
entry.Path = SplitPath("/Root");
821+
entry.SyncVersion = true;
822+
entry.ShowPrivatePath = true;
823+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
824+
825+
navigate->ResultSet.push_back(std::move(entry));
826+
//navigate->UserToken = "root@builtin";
827+
navigate->Cookie = 12345;
828+
829+
auto& runtime = Setup->GetRuntime();
830+
831+
runtime.Send(MakeSchemeCacheID(), actorId,
832+
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
833+
0,
834+
true);
835+
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
836+
837+
UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
838+
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);
839+
840+
auto& front = response->Request->ResultSet.front();
841+
842+
return front.Self->Info.GetSchemeshardId();
843+
}
844+
764845
ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
765846
{
766847
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -1914,3 +1995,100 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
19141995
}
19151996

19161997
}
1998+
1999+
Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
2000+
{
2001+
WriteMessagesInTx(1, 0);
2002+
WriteMessagesInTx(0, 1);
2003+
}
2004+
2005+
Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
2006+
{
2007+
WriteMessagesInTx(1, 0);
2008+
WriteMessagesInTx(1, 1);
2009+
}
2010+
2011+
Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
2012+
{
2013+
WriteMessagesInTx(0, 1);
2014+
WriteMessagesInTx(1, 0);
2015+
}
2016+
2017+
Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
2018+
{
2019+
WriteMessagesInTx(0, 1);
2020+
WriteMessagesInTx(0, 1);
2021+
}
2022+
2023+
Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
2024+
{
2025+
WriteMessagesInTx(0, 1);
2026+
WriteMessagesInTx(1, 1);
2027+
}
2028+
2029+
Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
2030+
{
2031+
WriteMessagesInTx(1, 1);
2032+
WriteMessagesInTx(1, 0);
2033+
}
2034+
2035+
Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
2036+
{
2037+
WriteMessagesInTx(1, 1);
2038+
WriteMessagesInTx(0, 1);
2039+
}
2040+
2041+
Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
2042+
{
2043+
WriteMessagesInTx(1, 1);
2044+
WriteMessagesInTx(1, 1);
2045+
}
2046+
2047+
2048+
Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
2049+
{
2050+
WriteMessagesInTx(2, 202);
2051+
WriteMessagesInTx(2, 200);
2052+
WriteMessagesInTx(0, 1);
2053+
WriteMessagesInTx(4, 0);
2054+
WriteMessagesInTx(0, 1);
2055+
}
2056+
2057+
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2058+
{
2059+
// There was a server
2060+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});
2061+
2062+
// Users have created their own topic on it
2063+
CreateTopic(TEST_TOPIC);
2064+
2065+
// And they wrote their messages into it
2066+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
2067+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
2068+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");
2069+
2070+
// And he had a consumer
2071+
AddConsumer(TEST_TOPIC, {"consumer-1"});
2072+
2073+
// We read messages from the topic and committed offsets
2074+
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2075+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
2076+
CloseTopicReadSession(TEST_TOPIC, "consumer-1");
2077+
2078+
// And then the Logbroker team turned on the feature flag
2079+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});
2080+
2081+
// Users continued to write to the topic
2082+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");
2083+
2084+
// Users have added new consumers
2085+
AddConsumer(TEST_TOPIC, {"consumer-2"});
2086+
2087+
// And they wanted to continue reading their messages
2088+
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2089+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
2090+
}
2091+
2092+
}
2093+
2094+
}

0 commit comments

Comments
 (0)