Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ private void checkIfStarted() {
public boolean allEventSourcesAreHealthy() {
checkIfStarted();
return registeredControllers.stream()
.filter(rc -> !rc.getControllerHealthInfo().unhealthyEventSources().isEmpty())
.findFirst()
.isEmpty();
.noneMatch(rc -> rc.getControllerHealthInfo().hasUnhealthyEventSources());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package io.javaoperatorsdk.operator.health;

import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
Expand All @@ -25,30 +28,49 @@
@SuppressWarnings("rawtypes")
public class ControllerHealthInfo {

private static final Predicate<EventSource> UNHEALTHY = e -> e.getStatus() == Status.UNHEALTHY;
private static final Predicate<EventSource> INFORMER =
e -> e instanceof InformerWrappingEventSourceHealthIndicator;
private static final Predicate<EventSource> UNHEALTHY_INFORMER =
e -> INFORMER.test(e) && e.getStatus() == Status.UNHEALTHY;
private static final Collector<EventSource, ?, Map<String, EventSourceHealthIndicator>>
NAME_TO_ES_MAP = Collectors.toMap(EventSource::name, e -> e);
private static final Collector<
EventSource, ?, Map<String, InformerWrappingEventSourceHealthIndicator>>
NAME_TO_ES_HEALTH_MAP =
Collectors.toMap(EventSource::name, e -> (InformerWrappingEventSourceHealthIndicator) e);
private final EventSourceManager<?> eventSourceManager;

public ControllerHealthInfo(EventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}

public Map<String, EventSourceHealthIndicator> eventSourceHealthIndicators() {
return eventSourceManager.allEventSources().stream()
.collect(Collectors.toMap(EventSource::name, e -> e));
return eventSourceManager.allEventSourcesStream().collect(NAME_TO_ES_MAP);
}

/**
* Whether the associated {@link io.javaoperatorsdk.operator.processing.Controller} has unhealthy
* event sources.
*
* @return {@code true} if any of the associated controller is unhealthy, {@code false} otherwise
* @since 5.3.0
*/
public boolean hasUnhealthyEventSources() {
return filteredEventSources(UNHEALTHY).findAny().isPresent();
}

public Map<String, EventSourceHealthIndicator> unhealthyEventSources() {
return eventSourceManager.allEventSources().stream()
.filter(e -> e.getStatus() == Status.UNHEALTHY)
.collect(Collectors.toMap(EventSource::name, e -> e));
return filteredEventSources(UNHEALTHY).collect(NAME_TO_ES_MAP);
}

private Stream<EventSource> filteredEventSources(Predicate<EventSource> filter) {
return eventSourceManager.allEventSourcesStream().filter(filter);
}

public Map<String, InformerWrappingEventSourceHealthIndicator>
informerEventSourceHealthIndicators() {
return eventSourceManager.allEventSources().stream()
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
.collect(
Collectors.toMap(
EventSource::name, e -> (InformerWrappingEventSourceHealthIndicator) e));
return filteredEventSources(INFORMER).collect(NAME_TO_ES_HEALTH_MAP);
}

/**
Expand All @@ -58,11 +80,6 @@ public Map<String, EventSourceHealthIndicator> unhealthyEventSources() {
*/
public Map<String, InformerWrappingEventSourceHealthIndicator>
unhealthyInformerEventSourceHealthIndicators() {
return eventSourceManager.allEventSources().stream()
.filter(e -> e.getStatus() == Status.UNHEALTHY)
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
.collect(
Collectors.toMap(
EventSource::name, e -> (InformerWrappingEventSourceHealthIndicator) e));
return filteredEventSources(UNHEALTHY_INFORMER).collect(NAME_TO_ES_HEALTH_MAP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,5 @@ public interface InformerHealthIndicator extends EventSourceHealthIndicator {

boolean isRunning();

@Override
Status getStatus();

String getTargetNamespace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,12 @@ public Set<EventSource<?, P>> getRegisteredEventSources() {

@SuppressWarnings("rawtypes")
public List<EventSource> allEventSources() {
return eventSources.allEventSources().toList();
return allEventSourcesStream().toList();
}

@SuppressWarnings("rawtypes")
public Stream<EventSource> allEventSourcesStream() {
return eventSources.allEventSources();
}

@SuppressWarnings("unused")
Expand Down