Enterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration.
- EventListener Wrappers: Console, File, and Structured JSON logging
- Metrics Aggregation: Time-window based metric aggregation with statistics
- Prometheus Export: Export metrics in Prometheus text format
- Health Check Integration: Microsoft.Extensions.Diagnostics.HealthChecks support
- Fluent Builder API: Easy configuration with DiagnosticsBuilder
- RivuletEventSource - EventSource-based metrics
- RivuletConsoleListener - Real-time console metrics
- RivuletFileListener - File-based logging with rotation
- RivuletStructuredLogListener - JSON structured logging
- PrometheusExporter - Prometheus text format export
- RivuletHealthCheck - Health check integration
- MetricsAggregator - Statistical analysis
dotnet add package Rivulet.Diagnosticsusing Rivulet.Diagnostics;
using var listener = new RivuletConsoleListener();
await Enumerable.Range(1, 100)
.ToAsyncEnumerable()
.SelectParallelAsync(async x =>
{
await ProcessAsync(x);
return x;
}, new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10
});using var listener = new RivuletFileListener(
"metrics.log",
maxFileSizeBytes: 10 * 1024 * 1024 // 10MB
);
// Your parallel operations...using var listener = new RivuletStructuredLogListener("metrics.json");
// Or with custom action
using var listener = new RivuletStructuredLogListener(json =>
{
// Send to your logging system
logger.LogInformation(json);
});using var aggregator = new MetricsAggregator(TimeSpan.FromSeconds(10));
aggregator.OnAggregation += metrics =>
{
foreach (var metric in metrics)
{
Console.WriteLine($"{metric.DisplayName}:");
Console.WriteLine($" Min: {metric.Min:F2}");
Console.WriteLine($" Max: {metric.Max:F2}");
Console.WriteLine($" Avg: {metric.Average:F2}");
Console.WriteLine($" Current: {metric.Current:F2}");
}
};
// Your parallel operations...using var exporter = new PrometheusExporter();
// Your parallel operations...
// Export to Prometheus format
var prometheusText = exporter.Export();
await File.WriteAllTextAsync("metrics.prom", prometheusText);
// Or get as dictionary
var metrics = exporter.ExportDictionary();// In Program.cs or Startup.cs
// Step 1: Register PrometheusExporter (required dependency for health check)
builder.Services.AddSingleton<PrometheusExporter>();
// Step 2: Register health check
builder.Services.AddHealthChecks()
.AddCheck<RivuletHealthCheck>("rivulet", tags: new[] { "ready" });
// Step 3: Configure thresholds (optional)
builder.Services.Configure<RivuletHealthCheckOptions>(options =>
{
options.ErrorRateThreshold = 0.1; // 10% error rate triggers degraded status
options.FailureCountThreshold = 1000; // 1000 failures triggers unhealthy status
});
// Step 4: Add health check endpoint
app.MapHealthChecks("/health");
// Health check will return:
// - Healthy: Error rate below threshold and failures below threshold
// - Degraded: Error rate exceeds threshold
// - Unhealthy: Failure count exceeds threshold or unable to collect metricsusing var diagnostics = new DiagnosticsBuilder()
.AddConsoleListener()
.AddFileListener("metrics.log")
.AddStructuredLogListener("metrics.json")
.AddMetricsAggregator(TimeSpan.FromSeconds(10), metrics =>
{
// Handle aggregated metrics
})
.AddPrometheusExporter(out var exporter)
.Build();
// Your parallel operations...
// Export Prometheus metrics
var prometheusText = exporter.Export();Rivulet.Diagnostics exposes the following metrics from Rivulet.Core:
- items-started: Total number of items that have started processing
- items-completed: Total number of items that have completed processing
- total-retries: Total number of retry attempts
- total-failures: Total number of failed items
- throttle-events: Number of throttle events (backpressure)
- drain-events: Number of drain events
// When creating RivuletHealthCheck manually (not recommended, use DI instead)
var exporter = new PrometheusExporter();
var healthCheck = new RivuletHealthCheck(exporter, new RivuletHealthCheckOptions
{
ErrorRateThreshold = 0.05, // 5% error rate triggers degraded
FailureCountThreshold = 50 // 50 failures triggers unhealthy
});
// Recommended: Use dependency injection (shown above in Health Check Integration)RivuletHealthCheckOptions properties:
-
ErrorRateThreshold(double, default: 0.1)- Error rate (0.0 to 1.0) above which health check reports degraded status
- Calculated as: total_failures / items_started
- Example: 0.1 = 10% error rate
-
FailureCountThreshold(long, default: 1000)- Absolute failure count above which health check reports unhealthy status
- Useful for catching high-volume failure scenarios
using var console = new RivuletConsoleListener();
using var file = new RivuletFileListener("metrics.log");
using var structured = new RivuletStructuredLogListener("metrics.json");
using var aggregator = new MetricsAggregator(TimeSpan.FromSeconds(5));
// All listeners will receive metrics simultaneouslyusing var listener = new RivuletStructuredLogListener(json =>
{
telemetryClient.TrackEvent("RivuletMetrics", new Dictionary<string, string>
{
["metrics"] = json
});
});using var listener = new RivuletStructuredLogListener("metrics.json");
// Configure Filebeat to ship metrics.json to Elasticsearchusing var exporter = new PrometheusExporter();
// Expose metrics endpoint
app.MapGet("/metrics", () => exporter.Export());- Zero-cost when not listening: EventCounters have zero overhead when no listeners are attached
- Minimal allocation: Uses polling counters to avoid allocations
- Thread-safe: All listeners are thread-safe and can be used concurrently
- .NET 8.0 or .NET 9.0
- Rivulet.Core 1.2.0+
- Microsoft.Extensions.Diagnostics.HealthChecks 9.0.0+ (for health checks)
- .NET 8.0
- .NET 9.0
- GitHub Repository: https://github.com/Jeffeek/Rivulet
- Report Issues: https://github.com/Jeffeek/Rivulet/issues
- License: MIT
MIT License - see LICENSE file for details