Skip to content

Commit 97f02a0

Browse files
Demogorgon314walkinggo
authored andcommitted
[improve][broker] PIP-380: Support-setting-up-specific-namespaces-to-skipping-the-load-shedding (apache#23549)
1 parent d7ad6a2 commit 97f02a0

9 files changed

Lines changed: 201 additions & 6 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2973,6 +2973,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
29732973
)
29742974
private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false;
29752975

2976+
@FieldContext(
2977+
dynamic = true,
2978+
category = CATEGORY_LOAD_BALANCER,
2979+
doc = "The namespaces to be excluded from load shedding"
2980+
)
2981+
private Set<String> loadBalancerSheddingExcludedNamespaces = new HashSet<>();
2982+
29762983
@FieldContext(
29772984
category = CATEGORY_LOAD_BALANCER,
29782985
doc = "Time to wait before fixing any stuck in-flight service unit states. "

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
8585
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
8686
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
87+
import org.apache.pulsar.broker.loadbalance.extensions.strategy.RoundRobinBrokerSelectionStrategy;
8788
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
8889
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
8990
import org.apache.pulsar.broker.namespace.LookupOptions;
@@ -161,6 +162,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
161162
@Getter
162163
private final BrokerSelectionStrategy brokerSelectionStrategy;
163164

165+
private final BrokerSelectionStrategy sheddingExcludedNamespaceSelectionStrategy;
166+
164167
@Getter
165168
private final List<BrokerFilter> brokerFilterPipeline;
166169

@@ -254,6 +257,7 @@ public ExtensibleLoadManagerImpl() {
254257
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
255258
this.brokerFilterPipeline.add(new BrokerVersionFilter());
256259
this.brokerSelectionStrategy = createBrokerSelectionStrategy();
260+
this.sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelectionStrategy();
257261
}
258262

259263
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
@@ -636,11 +640,33 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
636640
return Optional.empty();
637641
}
638642
Set<String> candidateBrokers = availableBrokerCandidates.keySet();
639-
return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
643+
return getBrokerSelectionStrategy(bundle).select(candidateBrokers, bundle, context);
640644
});
641645
});
642646
}
643647

648+
/**
649+
* For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership,
650+
* it can make the assignment more average because these will not automatically rebalance to
651+
* another broker unless manually unloaded it.
652+
*
653+
* @param bundle the bundle to assign
654+
* @return the broker selection strategy
655+
*/
656+
private BrokerSelectionStrategy getBrokerSelectionStrategy(ServiceUnitId bundle) {
657+
658+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
659+
660+
var namespace = NamespaceBundle.getBundleNamespace(bundle.toString());
661+
if (sheddingExcludedNamespaces.contains(namespace)) {
662+
if (debug(conf, log)) {
663+
log.info("Use round robin broker selector for {}", bundle);
664+
}
665+
return sheddingExcludedNamespaceSelectionStrategy;
666+
}
667+
return brokerSelectionStrategy;
668+
}
669+
644670
@Override
645671
public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
646672
return getOwnershipAsync(topic, bundleUnit)

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.Set;
2526
import lombok.EqualsAndHashCode;
2627
import lombok.Getter;
2728
import lombok.ToString;
@@ -68,8 +69,10 @@ public TopKBundles(PulsarService pulsar) {
6869
public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
6970
arr.clear();
7071
try {
72+
var conf = pulsar.getConfiguration();
7173
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
72-
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
74+
conf.isLoadBalancerSheddingBundlesWithPoliciesEnabled();
75+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
7376
for (var etr : bundleStats.entrySet()) {
7477
String bundle = etr.getKey();
7578
var stat = etr.getValue();
@@ -79,12 +82,16 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
7982
continue;
8083
}
8184
// TODO: do not filter system topic while shedding
82-
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
85+
String namespace = NamespaceBundle.getBundleNamespace(bundle);
86+
if (NamespaceService.isSystemServiceNamespace(namespace)) {
8387
continue;
8488
}
8589
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
8690
continue;
8791
}
92+
if (sheddingExcludedNamespaces.contains(namespace)) {
93+
continue;
94+
}
8895
arr.add(etr);
8996
}
9097
var topKBundlesLoadData = loadData.getTopBundlesLoadData();

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
493493
}
494494

495495
int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
496+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
496497
for (var e : maxBrokerTopBundlesLoadData) {
497498
String bundle = e.bundleName();
498499
if (channel != null && !channel.isOwner(bundle, maxBroker)) {
@@ -502,6 +503,14 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
502503
}
503504
continue;
504505
}
506+
final String namespaceName = NamespaceBundle.getBundleNamespace(bundle);
507+
if (sheddingExcludedNamespaces.contains(namespaceName)) {
508+
if (debugMode) {
509+
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
510+
+ " Bundle namespace has been found in sheddingExcludedNamespaces", bundle));
511+
}
512+
continue;
513+
}
505514
if (recentlyUnloadedBundles.containsKey(bundle)) {
506515
if (debugMode) {
507516
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.strategy;
20+
21+
import java.util.Optional;
22+
import java.util.Set;
23+
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
24+
import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
25+
import org.apache.pulsar.common.naming.ServiceUnitId;
26+
27+
/**
28+
* Simple Round Robin Broker Selection Strategy.
29+
*/
30+
public class RoundRobinBrokerSelectionStrategy implements BrokerSelectionStrategy {
31+
private final RoundRobinBrokerSelector selector = new RoundRobinBrokerSelector();
32+
33+
@Override
34+
public Optional<String> select(Set<String> brokers, ServiceUnitId bundle, LoadManagerContext context) {
35+
return selector.selectBroker(brokers, null, null, context.brokerConfiguration());
36+
}
37+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
151151
// Strategy used to determine where new topics should be placed.
152152
private ModularLoadManagerStrategy placementStrategy;
153153

154+
private ModularLoadManagerStrategy sheddingExcludedNamespaceSelectionStrategy;
155+
154156
// Policies used to determine which brokers are available for particular namespaces.
155157
private SimpleResourceAllocationPolicies policies;
156158

@@ -251,6 +253,7 @@ public void initialize(final PulsarService pulsar) {
251253
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
252254

253255
placementStrategy = ModularLoadManagerStrategy.create(conf);
256+
sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelector();
254257
policies = new SimpleResourceAllocationPolicies(pulsar);
255258
filterPipeline.add(new BrokerLoadManagerClassFilter());
256259
filterPipeline.add(new BrokerVersionFilter());
@@ -630,13 +633,21 @@ public synchronized void doLoadShedding() {
630633
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
631634
recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);
632635

636+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
633637
final Multimap<String, String> bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
634638

635639
bundlesToUnload.asMap().forEach((broker, bundles) -> {
636640
AtomicBoolean unloadBundleForBroker = new AtomicBoolean(false);
637641
bundles.forEach(bundle -> {
638642
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
639643
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
644+
if (sheddingExcludedNamespaces.contains(namespaceName)) {
645+
if (log.isDebugEnabled()) {
646+
log.debug("[{}] Skipping load shedding for namespace {}",
647+
loadSheddingStrategy.getClass().getSimpleName(), namespaceName);
648+
}
649+
return;
650+
}
640651
if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) {
641652
return;
642653
}
@@ -914,8 +925,22 @@ Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
914925
brokerTopicLoadingPredicate);
915926
}
916927

917-
// Choose a broker among the potentially smaller filtered list, when possible
918-
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
928+
Optional<String> broker;
929+
// For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership,
930+
// it can make the assignment more average because these will not automatically rebalance to
931+
// another broker unless manually unloaded it.
932+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
933+
String namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
934+
if (sheddingExcludedNamespaces.contains(namespaceNameFromBundleName)) {
935+
if (log.isDebugEnabled()) {
936+
log.debug("Use round robin broker selector for {}", bundle);
937+
}
938+
broker = sheddingExcludedNamespaceSelectionStrategy
939+
.selectBroker(brokerCandidateCache, data, loadData, conf);
940+
} else {
941+
// Choose a broker among the potentially smaller filtered list, when possible
942+
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
943+
}
919944
if (log.isDebugEnabled()) {
920945
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
921946
}
@@ -1122,7 +1147,15 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
11221147
*/
11231148
private int selectTopKBundle() {
11241149
bundleArr.clear();
1125-
bundleArr.addAll(loadData.getBundleData().entrySet());
1150+
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
1151+
for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
1152+
String bundle = entry.getKey();
1153+
String namespace = NamespaceBundle.getBundleNamespace(bundle);
1154+
if (sheddingExcludedNamespaces.contains(namespace)) {
1155+
continue;
1156+
}
1157+
bundleArr.add(entry);
1158+
}
11261159

11271160
int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
11281161
.getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,41 @@ public void testAssign() throws Exception {
205205
assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl());
206206
}
207207

208+
// Test that the load manager will use round-robin assignment
209+
// if the namespace is in loadBalancerSheddingExcludedNamespaces.
210+
@Test
211+
public void testSelectBrokerForSheddingExcludedNamespaces() throws Exception {
212+
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of(defaultTestNamespace));
213+
try {
214+
Pair<TopicName, NamespaceBundle> topicAndBundle =
215+
getBundleIsNotOwnByChangeEventTopic("test-topic" + UUID.randomUUID());
216+
NamespaceBundle bundle1 = topicAndBundle.getRight();
217+
Optional<BrokerLookupData> brokerLookupData1 = primaryLoadManager.assign(Optional.empty(), bundle1,
218+
LookupOptions.builder().build()).get();
219+
assertTrue(brokerLookupData1.isPresent());
220+
log.info("Assign the bundle1 {} to {}", bundle1, brokerLookupData1);
221+
222+
String webServiceUrl1 = brokerLookupData1.get().getWebServiceUrl();
223+
224+
Pair<TopicName, NamespaceBundle> topicAndBundle2 =
225+
getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());
226+
227+
while (topicAndBundle2.getRight().toString().equals(topicAndBundle.getRight().toString())
228+
|| primaryLoadManager.checkOwnershipAsync(Optional.empty(), topicAndBundle2.getRight()).get()) {
229+
topicAndBundle2 = getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());
230+
}
231+
NamespaceBundle bundle2 = topicAndBundle2.getRight();
232+
Optional<BrokerLookupData> brokerLookupData2 = primaryLoadManager.assign(Optional.empty(), bundle2,
233+
LookupOptions.builder().build()).get();
234+
assertTrue(brokerLookupData2.isPresent());
235+
log.info("Assign the bundle2 {} to {}", bundle2, brokerLookupData2);
236+
String webServiceUrl2 = brokerLookupData2.get().getWebServiceUrl();
237+
assertNotEquals(webServiceUrl1, webServiceUrl2);
238+
} finally {
239+
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of());
240+
}
241+
}
242+
208243
@Test
209244
public void testLookupOptions() throws Exception {
210245
Pair<TopicName, NamespaceBundle> topicAndBundle =

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.Optional;
3333
import java.util.Random;
34+
import java.util.Set;
3435
import org.apache.pulsar.broker.PulsarService;
3536
import org.apache.pulsar.broker.ServiceConfiguration;
3637
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
@@ -136,6 +137,27 @@ public void testSystemNamespace() {
136137
assertEquals(top0.bundleName(), bundle1);
137138
}
138139

140+
@Test
141+
public void testSheddingExcludedNamespaces() {
142+
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
143+
var topKBundles = new TopKBundles(pulsar);
144+
pulsar.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of("my-tenant/my-namespace2"));
145+
NamespaceBundleStats stats1 = new NamespaceBundleStats();
146+
stats1.msgRateIn = 500;
147+
bundleStats.put("my-tenant/my-namespace2/0x00000000_0x0FFFFFFF", stats1);
148+
149+
NamespaceBundleStats stats2 = new NamespaceBundleStats();
150+
stats2.msgRateIn = 10000;
151+
stats2.msgThroughputOut = 10;
152+
bundleStats.put(bundle1, stats2);
153+
154+
topKBundles.update(bundleStats, 2);
155+
156+
assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 1);
157+
var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0);
158+
assertEquals(top0.bundleName(), bundle1);
159+
}
160+
139161
@Test
140162
public void testZeroMsgThroughputBundleStats() {
141163
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,25 @@ public void testRecentlyUnloadedBundles() {
615615
assertEquals(counter.getLoadStd(), setupLoadStd);
616616
}
617617

618+
@Test
619+
public void testSheddingExcludedNamespaces() {
620+
UnloadCounter counter = new UnloadCounter();
621+
TransferShedder transferShedder = new TransferShedder(counter);
622+
var ctx = setupContext();
623+
ctx.brokerConfiguration().setLoadBalancerSheddingExcludedNamespaces(
624+
Set.of("my-tenant/my-namespaceE", "my-tenant/my-namespaceD"));
625+
626+
var res = transferShedder.findBundlesForUnloading(ctx, new HashMap<>(), Map.of());
627+
var expected = new HashSet<UnloadDecision>();
628+
expected.add(new UnloadDecision(new Unload("broker3:8080",
629+
"my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
630+
Optional.of("broker1:8080")),
631+
Success, Overloaded));
632+
assertEquals(res, expected);
633+
assertEquals(counter.getLoadAvg(), setupLoadAvg);
634+
assertEquals(counter.getLoadStd(), setupLoadStd);
635+
}
636+
618637
@Test
619638
public void testGetAvailableBrokersFailed() {
620639
UnloadCounter counter = new UnloadCounter();

0 commit comments

Comments
 (0)