Skip to content

Commit feb2562

Browse files
huizhilujiajunwang
authored and committed
Add instance capacity gauge (#557)
We need to monitor instance utilization in order to understand what the instance capacity is. Change list: - Change the instance monitor to update capacity - Change getAttribute to throw AttributeNotFoundException in DynamicMBeanProvider - Combine the max usage and instance capacity updates into one method in the cluster status monitor - Add a unit test
1 parent a1557df commit feb2562

7 files changed

Lines changed: 224 additions & 103 deletions

File tree

helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.util.Collection;
2323
import java.util.Collections;
24-
import java.util.HashMap;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.stream.Collectors;
@@ -249,24 +248,24 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
249248
// Only use the resources in ideal states to parse all replicas.
250249
Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
251250
Map<String, Resource> resourceToMonitorMap = resourceMap.entrySet().stream()
252-
.filter(resourceName -> idealStateMap.containsKey(resourceName))
251+
.filter(idealStateMap::containsKey)
253252
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
254253

255254
Map<String, ResourceAssignment> currentStateAssignment =
256255
currentStateOutput.getAssignment(resourceToMonitorMap.keySet());
257256
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
258257
dataProvider, resourceToMonitorMap, currentStateAssignment);
259258

260-
Map<String, Double> maxUsageMap = new HashMap<>();
261259
for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
262260
String instanceName = node.getInstanceName();
261+
// There is no new usage adding to this node, so an empty map is passed in.
263262
double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
264-
maxUsageMap.put(instanceName, usage);
263+
clusterStatusMonitor
264+
.updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
265265
}
266-
267-
clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
268266
} catch (Exception ex) {
269-
LOG.error("Failed to report instance capacity metrics.", ex);
267+
LOG.error("Failed to report instance capacity metrics. Exception message: {}",
268+
ex.getMessage());
270269
}
271270

272271
return null;

helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -367,24 +367,25 @@ public void increaseMessageReceived(List<Message> messages) {
367367
}
368368

369369
/**
370-
* Update max capacity usage for per instance. Before calling this API, we assume the instance
371-
* monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
372-
* max usage update will fail.
370+
* Updates the capacity status of an instance, including max usage and capacity of each
371+
* capacity key. Before calling this API, we assume the instance monitors are already registered
372+
* in ReadClusterDataStage. If the monitor is not registered, this instance capacity status update
373+
* will fail.
373374
*
374-
* @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
375+
* @param instanceName This instance name
376+
* @param maxUsage Max capacity usage of this instance
377+
* @param capacityMap A map of this instance capacity, {capacity key: capacity value}
375378
*/
376-
public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
377-
synchronized (_instanceMonitorMap) {
378-
for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
379-
InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
380-
if (monitor == null) {
381-
LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
382-
entry.getKey());
383-
continue;
384-
}
385-
monitor.updateMaxCapacityUsage(entry.getValue());
386-
}
379+
public void updateInstanceCapacityStatus(String instanceName, double maxUsage,
380+
Map<String, Integer> capacityMap) {
381+
InstanceMonitor monitor = _instanceMonitorMap.get(instanceName);
382+
if (monitor == null) {
383+
LOG.warn("Failed to update instance capacity status because instance monitor is not found, "
384+
+ "instance: {}.", instanceName);
385+
return;
387386
}
387+
monitor.updateMaxCapacityUsage(maxUsage);
388+
monitor.updateCapacity(capacityMap);
388389
}
389390

390391
/**

helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import java.util.concurrent.ConcurrentHashMap;
2627
import javax.management.JMException;
2728
import javax.management.ObjectName;
2829

@@ -41,17 +42,17 @@ public class InstanceMonitor extends DynamicMBeanProvider {
4142
/**
4243
* Metric names for instance capacity.
4344
*/
44-
public enum InstanceMonitorMetrics {
45+
public enum InstanceMonitorMetric {
4546
// TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
4647
TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
4748
ENABLED_STATUS_GAUGE("Enabled"),
4849
ONLINE_STATUS_GAUGE("Online"),
4950
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
5051
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
5152

52-
private String metricName;
53+
private final String metricName;
5354

54-
InstanceMonitorMetrics(String name) {
55+
InstanceMonitorMetric(String name) {
5556
metricName = name;
5657
}
5758

@@ -75,6 +76,9 @@ public String metricName() {
7576
private SimpleDynamicMetric<Long> _onlineStatusGauge;
7677
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
7778

79+
// A map of dynamic capacity Gauges. The map's keys could change.
80+
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
81+
7882
/**
7983
* Initialize the bean
8084
* @param clusterName the cluster to monitor
@@ -85,26 +89,41 @@ public InstanceMonitor(String clusterName, String participantName, ObjectName ob
8589
_participantName = participantName;
8690
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
8791
_initObjectName = objectName;
92+
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
8893

8994
createMetrics();
9095
}
9196

9297
private void createMetrics() {
9398
_totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
94-
InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
99+
InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
95100

96101
_disabledPartitionsGauge =
97-
new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
102+
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
98103
0L);
99104
_enabledStatusGauge =
100-
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
105+
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
101106
_onlineStatusGauge =
102-
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
107+
new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
103108
_maxCapacityUsageGauge =
104-
new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
109+
new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
105110
0.0d);
106111
}
107112

113+
private List<DynamicMetric<?, ?>> buildAttributeList() {
114+
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
115+
_totalMessagedReceivedCounter,
116+
_disabledPartitionsGauge,
117+
_enabledStatusGauge,
118+
_onlineStatusGauge,
119+
_maxCapacityUsageGauge
120+
);
121+
122+
attributeList.addAll(_dynamicCapacityMetricsMap.values());
123+
124+
return attributeList;
125+
}
126+
108127
@Override
109128
public String getSensorName() {
110129
return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
@@ -183,33 +202,58 @@ public synchronized void increaseMessageCount(long messageReceived) {
183202
}
184203

185204
/**
186-
* Update max capacity usage for this instance.
205+
* Updates max capacity usage for this instance.
187206
* @param maxUsage max capacity usage of this instance
188207
*/
189208
public synchronized void updateMaxCapacityUsage(double maxUsage) {
190209
_maxCapacityUsageGauge.updateValue(maxUsage);
191210
}
192211

193212
/**
194-
* Get max capacity usage of this instance.
213+
* Gets max capacity usage of this instance.
195214
* @return Max capacity usage of this instance.
196215
*/
197216
protected synchronized double getMaxCapacityUsageGauge() {
198217
return _maxCapacityUsageGauge.getValue();
199218
}
200219

201-
@Override
202-
public DynamicMBeanProvider register()
203-
throws JMException {
204-
List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
205-
_totalMessagedReceivedCounter,
206-
_disabledPartitionsGauge,
207-
_enabledStatusGauge,
208-
_onlineStatusGauge,
209-
_maxCapacityUsageGauge
210-
);
220+
/**
221+
* Updates instance capacity metrics.
222+
* @param capacity A map of instance capacity.
223+
*/
224+
public void updateCapacity(Map<String, Integer> capacity) {
225+
synchronized (_dynamicCapacityMetricsMap) {
226+
// If capacity keys don't have any change, we just update the metric values.
227+
if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
228+
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
229+
_dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
230+
}
231+
return;
232+
}
211233

212-
doRegister(attributeList, _initObjectName);
234+
// If capacity keys have any changes, we need to retain the capacity metrics.
235+
// Make sure capacity metrics map has the same capacity keys.
236+
// And update metrics values.
237+
_dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
238+
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
239+
String capacityName = entry.getKey();
240+
if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
241+
_dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
242+
} else {
243+
_dynamicCapacityMetricsMap.put(capacityName,
244+
new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
245+
}
246+
}
247+
}
248+
249+
// Update MBean's all attributes.
250+
updateAttributesInfo(buildAttributeList(),
251+
"Instance monitor for instance: " + getInstanceName());
252+
}
253+
254+
@Override
255+
public DynamicMBeanProvider register() throws JMException {
256+
doRegister(buildAttributeList(), _initObjectName);
213257

214258
return this;
215259
}

0 commit comments

Comments
 (0)