Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -249,24 +248,24 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
// Only use the resources in ideal states to parse all replicas.
Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
Map<String, Resource> resourceToMonitorMap = resourceMap.entrySet().stream()
.filter(resourceName -> idealStateMap.containsKey(resourceName))
.filter(idealStateMap::containsKey)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, ResourceAssignment> currentStateAssignment =
currentStateOutput.getAssignment(resourceToMonitorMap.keySet());
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
dataProvider, resourceToMonitorMap, currentStateAssignment);

Map<String, Double> maxUsageMap = new HashMap<>();
for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
String instanceName = node.getInstanceName();
// There is no new usage adding to this node, so an empty map is passed in.
double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
Comment thread
huizhilu marked this conversation as resolved.
maxUsageMap.put(instanceName, usage);
clusterStatusMonitor
.updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
}

clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
} catch (Exception ex) {
LOG.error("Failed to report instance capacity metrics.", ex);
LOG.error("Failed to report instance capacity metrics. Exception message: {}",
ex.getMessage());
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,25 @@ public void increaseMessageReceived(List<Message> messages) {
}

/**
* Update max capacity usage for per instance. Before calling this API, we assume the instance
* monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
* max usage update will fail.
* Updates instance capacity status for per instance, including max usage and capacity of each
* capacity key. Before calling this API, we assume the instance monitors are already registered
* in ReadClusterDataStage. If the monitor is not registered, this instance capacity status update
* will fail.
*
* @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
* @param instanceName This instance name
* @param maxUsage Max capacity usage of this instance
* @param capacityMap A map of this instance capacity, {capacity key: capacity value}
*/
public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
synchronized (_instanceMonitorMap) {
for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
if (monitor == null) {
LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
entry.getKey());
continue;
}
monitor.updateMaxCapacityUsage(entry.getValue());
}
public void updateInstanceCapacityStatus(String instanceName, double maxUsage,
Comment thread
huizhilu marked this conversation as resolved.
Map<String, Integer> capacityMap) {
InstanceMonitor monitor = _instanceMonitorMap.get(instanceName);
if (monitor == null) {
LOG.warn("Failed to update instance capacity status because instance monitor is not found, "
+ "instance: {}.", instanceName);
return;
}
monitor.updateMaxCapacityUsage(maxUsage);
monitor.updateCapacity(capacityMap);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import javax.management.ObjectName;

Expand All @@ -41,17 +42,17 @@ public class InstanceMonitor extends DynamicMBeanProvider {
/**
* Metric names for instance capacity.
*/
public enum InstanceMonitorMetrics {
public enum InstanceMonitorMetric {
Comment thread
huizhilu marked this conversation as resolved.
// TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");

private String metricName;
private final String metricName;

InstanceMonitorMetrics(String name) {
InstanceMonitorMetric(String name) {
metricName = name;
}

Expand All @@ -75,6 +76,9 @@ public String metricName() {
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;

// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;

/**
* Initialize the bean
* @param clusterName the cluster to monitor
Expand All @@ -85,26 +89,41 @@ public InstanceMonitor(String clusterName, String participantName, ObjectName ob
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();

createMetrics();
}

private void createMetrics() {
_totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);

_disabledPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_enabledStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
_maxCapacityUsageGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
0.0d);
}

private List<DynamicMetric<?, ?>> buildAttributeList() {
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge
);

attributeList.addAll(_dynamicCapacityMetricsMap.values());

return attributeList;
}

@Override
public String getSensorName() {
return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
Expand Down Expand Up @@ -183,33 +202,58 @@ public synchronized void increaseMessageCount(long messageReceived) {
}

/**
* Update max capacity usage for this instance.
* Updates max capacity usage for this instance.
* @param maxUsage max capacity usage of this instance
*/
public synchronized void updateMaxCapacityUsage(double maxUsage) {
_maxCapacityUsageGauge.updateValue(maxUsage);
}

/**
* Get max capacity usage of this instance.
* Gets max capacity usage of this instance.
* @return Max capacity usage of this instance.
*/
protected synchronized double getMaxCapacityUsageGauge() {
return _maxCapacityUsageGauge.getValue();
Comment thread
huizhilu marked this conversation as resolved.
}

@Override
public DynamicMBeanProvider register()
throws JMException {
List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge
);
/**
* Updates instance capacity metrics.
* @param capacity A map of instance capacity.
*/
public void updateCapacity(Map<String, Integer> capacity) {
synchronized (_dynamicCapacityMetricsMap) {
// If capacity keys don't have any change, we just update the metric values.
if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
_dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
}
return;
}

doRegister(attributeList, _initObjectName);
// If capacity keys have any changes, we need to retain the capacity metrics.
// Make sure capacity metrics map has the same capacity keys.
// And update metrics values.
_dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
String capacityName = entry.getKey();
if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
_dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
} else {
_dynamicCapacityMetricsMap.put(capacityName,
new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
}
}
}

// Update MBean's all attributes.
updateAttributesInfo(buildAttributeList(),
"Instance monitor for instance: " + getInstanceName());
}

@Override
public DynamicMBeanProvider register() throws JMException {
doRegister(buildAttributeList(), _initObjectName);

return this;
}
Expand Down
Loading