Skip to content

Commit 02597a4

Browse files
authored
Topics alter fix to 24-3-9-hotfix (#9753)
1 parent 614aeb3 commit 02597a4

5 files changed

Lines changed: 198 additions & 14 deletions

File tree

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
727727
}
728728
}
729729

730+
Y_UNIT_TEST(ControlPlane_BackCompatibility) {
731+
auto topicName = "back-compatibility-test";
732+
733+
TTopicSdkTestSetup setup = CreateSetup();
734+
TTopicClient client = setup.MakeClient();
735+
736+
{
737+
TCreateTopicSettings createSettings;
738+
createSettings
739+
.BeginConfigurePartitioningSettings()
740+
.MinActivePartitions(3)
741+
.EndConfigurePartitioningSettings();
742+
client.CreateTopic(topicName, createSettings).Wait();
743+
}
744+
745+
{
746+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
747+
748+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3);
749+
}
750+
751+
{
752+
TAlterTopicSettings alterSettings;
753+
alterSettings
754+
.BeginAlterPartitioningSettings()
755+
.MinActivePartitions(5)
756+
.EndAlterTopicPartitioningSettings();
757+
client.AlterTopic(topicName, alterSettings).Wait();
758+
}
759+
760+
{
761+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
762+
763+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5);
764+
}
765+
}
766+
730767
Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) {
731768
auto topicName = "autoscalit-topic";
732769

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -430,14 +430,7 @@ namespace NKikimr::NDataStreams::V1 {
430430
Y_UNUSED(selfInfo);
431431

432432
TString error;
433-
if (!GetProtoRequest()->has_partitioning_settings()) {
434-
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
435-
{
436-
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
437-
}
438433

439-
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
440-
}
441434
switch (GetProtoRequest()->retention_case()) {
442435
case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours:
443436
groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds(
@@ -479,7 +472,19 @@ namespace NKikimr::NDataStreams::V1 {
479472
}
480473
}
481474

482-
if (GetProtoRequest()->has_partitioning_settings()) {
475+
if (!GetProtoRequest()->has_partitioning_settings() ||
476+
(GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() &&
477+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) ||
478+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) {
479+
480+
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
481+
{
482+
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
483+
}
484+
485+
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
486+
487+
} else {
483488
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
484489
if (!r.empty()) {
485490
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);

ydb/services/datastreams/datastreams_ut.cpp

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2706,7 +2706,6 @@ Y_UNIT_TEST_SUITE(DataStreams) {
27062706

27072707
TString streamName = "test-topic";
27082708
TString streamName2 = "test-topic-2";
2709-
27102709
{
27112710
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
27122711
auto settings = NYdb::NTopic::TCreateTopicSettings()
@@ -2792,6 +2791,127 @@ Y_UNIT_TEST_SUITE(DataStreams) {
27922791
UNIT_ASSERT_VALUES_EQUAL(description.shards(4).parent_shard_id(), "shard-000001");
27932792
}
27942793

2794+
auto streamForAlterTest = "stream-alter-test";
2795+
{
2796+
auto result = testServer.DataStreamsClient->CreateStream(streamForAlterTest,
2797+
NYDS_V1::TCreateStreamSettings()
2798+
.ShardCount(3)
2799+
).ExtractValueSync();
2800+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2801+
if (result.GetStatus() != EStatus::SUCCESS) {
2802+
result.GetIssues().PrintTo(Cerr);
2803+
}
2804+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2805+
}
2806+
2807+
{
2808+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2809+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2810+
Cerr << result.GetIssues().ToString() << "\n";
2811+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2812+
2813+
auto& d = result.GetResult().stream_description();
2814+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 3);
2815+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2816+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2817+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2818+
2819+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2820+
}
2821+
2822+
{
2823+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2824+
NYDS_V1::TUpdateStreamSettings()
2825+
.TargetShardCount(5)
2826+
.BeginConfigurePartitioningSettings()
2827+
.BeginConfigureAutoPartitioningSettings()
2828+
.Strategy(NYdb::NDataStreams::V1::EAutoPartitioningStrategy::Disabled)
2829+
.EndConfigureAutoPartitioningSettings()
2830+
.EndConfigurePartitioningSettings()
2831+
).ExtractValueSync();
2832+
2833+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2834+
if (result.GetStatus() != EStatus::SUCCESS) {
2835+
result.GetIssues().PrintTo(Cerr);
2836+
}
2837+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2838+
}
2839+
2840+
{
2841+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2842+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2843+
Cerr << result.GetIssues().ToString() << "\n";
2844+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2845+
2846+
auto& d = result.GetResult().stream_description();
2847+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 5);
2848+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2849+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2850+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2851+
2852+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2853+
}
2854+
2855+
{
2856+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2857+
NYDS_V1::TUpdateStreamSettings()
2858+
.TargetShardCount(10)
2859+
).ExtractValueSync();
2860+
2861+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2862+
if (result.GetStatus() != EStatus::SUCCESS) {
2863+
result.GetIssues().PrintTo(Cerr);
2864+
}
2865+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2866+
}
2867+
2868+
{
2869+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2870+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2871+
Cerr << result.GetIssues().ToString() << "\n";
2872+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2873+
2874+
auto& d = result.GetResult().stream_description();
2875+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10);
2876+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2877+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2878+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2879+
2880+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2881+
}
2882+
2883+
{
2884+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2885+
NYDS_V1::TUpdateStreamSettings()
2886+
.TargetShardCount(15)
2887+
.BeginConfigurePartitioningSettings()
2888+
.BeginConfigureAutoPartitioningSettings()
2889+
.EndConfigureAutoPartitioningSettings()
2890+
.EndConfigurePartitioningSettings()
2891+
).ExtractValueSync();
2892+
2893+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2894+
if (result.GetStatus() != EStatus::SUCCESS) {
2895+
result.GetIssues().PrintTo(Cerr);
2896+
}
2897+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2898+
}
2899+
2900+
{
2901+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2902+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2903+
Cerr << result.GetIssues().ToString() << "\n";
2904+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2905+
2906+
auto& d = result.GetResult().stream_description();
2907+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15);
2908+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2909+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2910+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2911+
2912+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2913+
}
2914+
27952915
{
27962916
auto result = testServer.DataStreamsClient->CreateStream(streamName2,
27972917
NYDS_V1::TCreateStreamSettings()

ydb/services/lib/actors/pq_schema_actor.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,29 @@ namespace NKikimr::NGRpcProxy::V1 {
12551255
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
12561256
NPQ::Migrate(*pqTabletConfig);
12571257
auto partConfig = pqTabletConfig->MutablePartitionConfig();
1258-
auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge();
1258+
1259+
auto needHandleAutoPartitioning = false;
1260+
if (appData->FeatureFlags.GetEnableTopicSplitMerge()) {
1261+
1262+
auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() &&
1263+
request.alter_partitioning_settings().has_alter_auto_partitioning_settings() &&
1264+
request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy();
1265+
1266+
auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() &&
1267+
pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() &&
1268+
pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType();
1269+
1270+
if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) {
1271+
needHandleAutoPartitioning = true;
1272+
} else if (reqHasAutoPartitioningStrategyChange) {
1273+
auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy();
1274+
needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED ||
1275+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP ||
1276+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
1277+
}
1278+
1279+
}
1280+
12591281

12601282
if (request.has_set_retention_storage_mb()) {
12611283
CHECK_CDC;
@@ -1269,12 +1291,12 @@ namespace NKikimr::NGRpcProxy::V1 {
12691291
if (settings.has_set_min_active_partitions()) {
12701292
auto minParts = IfEqualThenDefault<i64>(settings.set_min_active_partitions(), 0L, 1L);
12711293
pqDescr.SetTotalGroupCount(minParts);
1272-
if (splitMergeFeatureEnabled) {
1294+
if (needHandleAutoPartitioning) {
12731295
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
12741296
}
12751297
}
12761298

1277-
if (splitMergeFeatureEnabled) {
1299+
if (needHandleAutoPartitioning) {
12781300
if (settings.has_set_max_active_partitions()) {
12791301
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
12801302
}
@@ -1310,7 +1332,7 @@ namespace NKikimr::NGRpcProxy::V1 {
13101332
}
13111333
}
13121334

1313-
if (splitMergeFeatureEnabled) {
1335+
if (needHandleAutoPartitioning) {
13141336
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
13151337
if (code) return code->YdbCode;
13161338
}

ydb/services/persqueue_v1/actors/schema_actors.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
10941094
}
10951095

10961096
const auto &config = pqDescr.GetPQTabletConfig();
1097-
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) {
1097+
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
10981098
Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
10991099
} else {
11001100
Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());

0 commit comments

Comments
 (0)