Safe, async-first parallel operators with bounded concurrency, retries, cancellation, and streaming backpressure for I/O-heavy workloads.
✅ Rivulet.Core
Core parallel processing operators with bounded concurrency, retry policies, and error handling
Key Features:
- Bounded concurrency control
- Retry policies with exponential backoff
- Circuit breaker pattern
- Error handling modes (StopOnFirstError, CollectAndContinue)
- Ordered and unordered output
Production-ready observability with EventSource metrics, structured logging, and health checks
Key Features:
- EventSource-based metrics (ETW, EventPipe)
- Multiple export formats
- Health monitoring
- Throughput and error rate tracking
- Zero allocation in hot paths
OpenTelemetry integration for distributed tracing and W3C Trace Context propagation
Key Features:
- W3C Trace Context propagation
- OpenTelemetry Metrics and Traces
- Correlation across distributed systems
- Integration with Jaeger/Zipkin/OTLP exporters
ASP.NET Core integration with background services, dependency injection, and configuration binding
Key Features:
- Dependency injection integration
- Configuration binding (appsettings.json)
- Background services
- Health checks
- Graceful shutdown support
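The wiring described above might look like the following in a minimal app. This is a sketch under assumptions: the configuration section name ("Rivulet") and the hosted-service type are illustrative, not taken from this README.

```csharp
// Program.cs - illustrative wiring; the section name and worker type are assumptions
var builder = WebApplication.CreateBuilder(args);

// Bind ParallelOptionsRivulet defaults from appsettings.json ("Rivulet" section)
builder.Services.Configure<ParallelOptionsRivulet>(
    builder.Configuration.GetSection("Rivulet"));

// A background service consumes the bound options and stops cleanly
// when the host's shutdown CancellationToken fires
builder.Services.AddHostedService<IngestWorker>();

var app = builder.Build();
app.MapHealthChecks("/health");
app.Run();
```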
Testing utilities for deterministic tests with time control, chaos injection, and concurrency verification
Key Features:
- Fast deterministic tests
- Fault injection testing
- Concurrency verification
- No actual delays needed
- Integration with xUnit/NUnit/MSTest
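A deterministic test built on these ideas might look like the sketch below. It uses only the Core `SelectParallelAsync` API shown elsewhere in this README plus xUnit; the fault is injected by hand, since the chaos-injection API itself is not documented here, and the shape of the result collection (`Count`) is an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class RetryTests
{
    [Fact]
    public async Task Transient_Faults_Are_Retried()
    {
        var attempts = 0;

        // Hand-rolled fault injection: fail the first two attempts, then succeed
        var results = await new[] { 1, 2, 3 }.SelectParallelAsync(
            async (x, ct) =>
            {
                if (Interlocked.Increment(ref attempts) <= 2)
                    throw new TimeoutException();
                return x * 2;
            },
            new ParallelOptionsRivulet
            {
                MaxDegreeOfParallelism = 1, // deterministic ordering for the test
                MaxRetries = 3,
                IsTransient = ex => ex is TimeoutException
            });

        Assert.Equal(3, results.Count); // assumes the result collection exposes Count
    }
}
```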
✅ Rivulet.Http
Parallel HTTP operations with HttpClientFactory integration and connection pooling awareness
Key Features:
- HttpClientFactory integration
- Connection pooling awareness
- Transient error handling (timeouts, 5xx responses)
- Bounded concurrency to avoid overwhelming servers
- Progress reporting for downloads
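The same pattern is reachable with the Core operators alone; this sketch relies only on the `SelectParallelAsync` API documented in this README, with the `httpClientFactory` instance and URL list assumed from context. Rivulet.Http's own helpers (factory wiring, download progress) are not shown here.

```csharp
// Bounded-parallel downloads using the Core operators
var client = httpClientFactory.CreateClient("downloads");

var bodies = await urls.SelectParallelAsync(
    async (url, ct) =>
    {
        using var resp = await client.GetAsync(url, ct);
        resp.EnsureSuccessStatusCode();
        return await resp.Content.ReadAsStringAsync(ct);
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 16, // stay below the HTTP connection pool limit
        MaxRetries = 3,
        IsTransient = ex => ex is HttpRequestException or TaskCanceledException
    });
```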
✅ Rivulet.IO
Parallel file operations with safe directory processing and file transformations
Key Features:
- Safe concurrent file access
- Directory tree processing
- File pattern matching (glob patterns)
- Progress reporting
- Atomic write operations
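A directory-tree transform along these lines can be sketched with the Core `ForEachParallelAsync` operator; the temp-file rename mimics the atomic-write behavior listed above. Rivulet.IO's dedicated glob/progress helpers are not shown in this README, so this is an illustration, not the package's API.

```csharp
// Parallel file transform built from the Core operators
var files = Directory.EnumerateFiles(rootDir, "*.log", SearchOption.AllDirectories);

await files.ForEachParallelAsync(
    async (path, ct) =>
    {
        var text = await File.ReadAllTextAsync(path, ct);

        // Write to a temp file, then replace - readers never see a partial file
        var tmp = path + ".tmp";
        await File.WriteAllTextAsync(tmp, text.ToUpperInvariant(), ct);
        File.Move(tmp, path, overwrite: true);
    },
    new ParallelOptionsRivulet { MaxDegreeOfParallelism = 8 });
```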
✅ Rivulet.Sql
Provider-agnostic parallel SQL operations with connection pooling awareness
Key Features:
- Works with any ADO.NET provider
- Connection pooling awareness
- Transaction support
- Parameterized queries
- Respects database connection pool limits
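For illustration, the "respect the pool limit" idea looks like this with plain ADO.NET (`Microsoft.Data.SqlClient` assumed) plus the Core operators; Rivulet.Sql's own helpers may differ.

```csharp
// Parallel parameterized inserts; concurrency kept well below the pool size
await rows.ForEachParallelAsync(
    async (row, ct) =>
    {
        await using var conn = new SqlConnection(connectionString);
        await conn.OpenAsync(ct);

        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "INSERT INTO Items (Id, Name) VALUES (@id, @name)";
        cmd.Parameters.AddWithValue("@id", row.Id);
        cmd.Parameters.AddWithValue("@name", row.Name);
        await cmd.ExecuteNonQueryAsync(ct);
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 8, // default SQL Server pool size is 100
        MaxRetries = 3,
        IsTransient = ex => ex is SqlException
    });
```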
SQL Server optimizations with SqlBulkCopy integration (10-100x faster bulk inserts)
Key Features:
- SqlBulkCopy integration (10-100x faster)
- Batch size optimization
- Table-valued parameters
- Progress reporting
- Automatic table creation
PostgreSQL optimizations with COPY command integration (10-100x faster bulk operations)
Key Features:
- COPY command integration (10-100x faster)
- Binary and text format support
- Progress reporting
- Automatic table creation
MySQL optimizations with LOAD DATA INFILE integration using MySqlBulkLoader
Key Features:
- MySqlBulkLoader integration (10-100x faster)
- Local and remote file loading
- Progress reporting
- Automatic table creation
Polly v8 integration with hedging, result-based retry, and resilience pipeline composition
Key Features:
- Polly v8 ResiliencePipeline integration
- Hedging pattern support
- Result-based retry policies
- Policy composition
- Fallback strategies
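Composing a Polly v8 pipeline uses Polly's real `ResiliencePipelineBuilder` API; how Rivulet.Polly attaches the pipeline to `ParallelOptionsRivulet` is not shown in this README, so the sketch below simply wraps the work item manually.

```csharp
// Polly v8 pipeline: retry with exponential backoff, then a 10 s timeout
var pipeline = new ResiliencePipelineBuilder()
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        BackoffType = DelayBackoffType.Exponential
    })
    .AddTimeout(TimeSpan.FromSeconds(10))
    .Build();

var results = await urls.SelectParallelAsync(
    async (url, ct) => await pipeline.ExecuteAsync(
        async token => await httpClient.GetStringAsync(url, token), ct),
    new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 });
```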
🚧 Rivulet.Csv
Parallel CSV parsing and writing for Rivulet with CsvHelper integration, bounded concurrency, and batching support for high-throughput data processing
Key Features:
- CsvHelper integration for robust CSV parsing
- Multi-type operations (2-5 generic type parameters)
- Memory-efficient streaming with IAsyncEnumerable
- Per-file CSV configuration (delimiters, culture, class maps)
- Progress tracking and lifecycle callbacks
- Error handling modes (FailFast, CollectAndContinue, BestEffort)
- Circuit breaker and retry support
- Ordered and unordered output options
🚧 Rivulet.Pipeline
Multi-stage pipeline composition for Rivulet with fluent API, per-stage concurrency, backpressure management between stages, and streaming support
Key Features:
- Fluent builder API with type-safe stage chaining
- Per-stage concurrency configuration via StageOptions
- Backpressure management using System.Threading.Channels
- Reuses Core components (TokenBucket, ParallelOptionsRivulet)
- Pipeline lifecycle callbacks (start, complete, stage events)
- Per-stage metrics tracking (items in/out, timing)
- Retry policies, circuit breaker, and error modes per stage
- Cancellation support propagated through all stages
- Streaming execution with IAsyncEnumerable
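A pipeline built from the features above might read like the sketch below. The exact method names (`Pipeline.Create`, `AddStage`, `ExecuteAsync`) are assumptions inferred from the feature list, not the package's confirmed API.

```csharp
// Hypothetical shape of the fluent pipeline API
var pipeline = Pipeline.Create<string>()
    .AddStage<string, byte[]>(
        async (url, ct) => await DownloadAsync(url, ct),
        new StageOptions { MaxDegreeOfParallelism = 16 }) // I/O-heavy stage
    .AddStage<byte[], Record>(
        async (bytes, ct) => Parse(bytes),
        new StageOptions { MaxDegreeOfParallelism = 4 })  // CPU-bound stage
    .Build();

await foreach (var record in pipeline.ExecuteAsync(urls, cancellationToken))
{
    // Records stream out as stages complete, with backpressure between stages
}
```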
- Async-first (`ValueTask`), works with `IEnumerable<T>` and `IAsyncEnumerable<T>`
- Bounded concurrency with backpressure (Channels)
- Retry policy with transient detection and configurable backoff strategies (Exponential, ExponentialJitter, DecorrelatedJitter, Linear, LinearJitter)
- Per-item timeouts, cancellation, lifecycle hooks
- Flexible error modes: FailFast, CollectAndContinue, BestEffort
- Ordered output mode for sequence-sensitive operations
```csharp
var results = await urls.SelectParallelAsync(
    async (url, ct) =>
    {
        using var resp = await http.GetAsync(url, ct);
        resp.EnsureSuccessStatusCode();
        return (url, (int)resp.StatusCode);
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        MaxRetries = 3,
        IsTransient = ex => ex is HttpRequestException or TaskCanceledException,
        ErrorMode = ErrorMode.CollectAndContinue
    });
```

```csharp
await foreach (var r in source.SelectParallelStreamAsync(
    async (x, ct) => await ComputeAsync(x, ct),
    new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 }))
{
    // consume incrementally
}
```

Maintain input order when sequence matters:
```csharp
// Results returned in same order as input, despite parallel processing
var results = await items.SelectParallelAsync(
    async (item, ct) => await ProcessAsync(item, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        OrderedOutput = true // Ensures results match input order
    });

// Streaming with ordered output
await foreach (var result in source.SelectParallelStreamAsync(
    async (x, ct) => await TransformAsync(x, ct),
    new ParallelOptionsRivulet { OrderedOutput = true }))
{
    // Results arrive in input order
}
```

Choose from multiple backoff strategies to optimize retry behavior:
```csharp
// Exponential backoff with jitter - recommended for rate-limited APIs
// Reduces thundering herd by randomizing retry delays
var results = await requests.SelectParallelAsync(
    async (req, ct) => await apiClient.SendAsync(req, ct),
    new ParallelOptionsRivulet
    {
        MaxRetries = 4,
        BaseDelay = TimeSpan.FromMilliseconds(100),
        BackoffStrategy = BackoffStrategy.ExponentialJitter, // Random(0, BaseDelay * 2^attempt)
        IsTransient = ex => ex is HttpRequestException
    });
```

```csharp
// Decorrelated jitter - best for preventing synchronization across multiple clients
var results = await tasks.SelectParallelAsync(
    async (task, ct) => await ProcessAsync(task, ct),
    new ParallelOptionsRivulet
    {
        MaxRetries = 3,
        BackoffStrategy = BackoffStrategy.DecorrelatedJitter, // Random based on previous delay
        IsTransient = ex => ex is TimeoutException
    });
```

```csharp
// Linear backoff - gentler, predictable increase
var results = await items.SelectParallelAsync(
    async (item, ct) => await SaveAsync(item, ct),
    new ParallelOptionsRivulet
    {
        MaxRetries = 5,
        BaseDelay = TimeSpan.FromSeconds(1),
        BackoffStrategy = BackoffStrategy.Linear, // BaseDelay * attempt
        IsTransient = ex => ex is InvalidOperationException
    });
```

Available strategies:
- Exponential (default): `BaseDelay * 2^(attempt-1)` - Predictable exponential growth
- ExponentialJitter: `Random(0, BaseDelay * 2^(attempt-1))` - Reduces thundering herd
- DecorrelatedJitter: `Random(BaseDelay, PreviousDelay * 3)` - Prevents client synchronization
- Linear: `BaseDelay * attempt` - Gentler, linear growth
- LinearJitter: `Random(0, BaseDelay * attempt)` - Linear with randomization
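For concreteness, the formulas above can be evaluated directly. This is an illustrative sketch of the delay math, not the library's internal code:

```csharp
using System;

// Illustrative delay computation per strategy; not the library's implementation
static TimeSpan NextDelay(string strategy, TimeSpan baseDelay, int attempt,
                          TimeSpan previousDelay, Random rng)
{
    double ms = baseDelay.TotalMilliseconds;
    return strategy switch
    {
        // BaseDelay * 2^(attempt-1)
        "Exponential" => TimeSpan.FromMilliseconds(ms * Math.Pow(2, attempt - 1)),
        // Random(0, BaseDelay * 2^(attempt-1))
        "ExponentialJitter" => TimeSpan.FromMilliseconds(
            rng.NextDouble() * ms * Math.Pow(2, attempt - 1)),
        // Random(BaseDelay, PreviousDelay * 3)
        "DecorrelatedJitter" => TimeSpan.FromMilliseconds(
            ms + rng.NextDouble() * Math.Max(0, previousDelay.TotalMilliseconds * 3 - ms)),
        // BaseDelay * attempt
        "Linear" => TimeSpan.FromMilliseconds(ms * attempt),
        // Random(0, BaseDelay * attempt)
        "LinearJitter" => TimeSpan.FromMilliseconds(rng.NextDouble() * ms * attempt),
        _ => baseDelay
    };
}

// Exponential, BaseDelay = 100 ms: attempts 1..4 give 100, 200, 400, 800 ms
Console.WriteLine(NextDelay("Exponential", TimeSpan.FromMilliseconds(100), 4,
                            TimeSpan.Zero, new Random()).TotalMilliseconds); // 800
```

Note how the jittered variants never exceed their deterministic counterparts, which is what spreads retries out across clients.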
Track progress with real-time metrics for long-running operations:
```csharp
// Monitor ETL job progress with ETA
var records = await database.GetRecordsAsync().SelectParallelAsync(
    async (record, ct) => await TransformAndLoadAsync(record, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 20,
        Progress = new ProgressOptions
        {
            ReportInterval = TimeSpan.FromSeconds(5),
            OnProgress = progress =>
            {
                Console.WriteLine($"Progress: {progress.ItemsCompleted}/{progress.TotalItems}");
                Console.WriteLine($"Rate: {progress.ItemsPerSecond:F1} items/sec");
                Console.WriteLine($"ETA: {progress.EstimatedTimeRemaining}");
                Console.WriteLine($"Errors: {progress.ErrorCount}");
                return ValueTask.CompletedTask;
            }
        }
    });
```

```csharp
// Streaming progress (total unknown)
await foreach (var result in stream.SelectParallelStreamAsync(
    async (item, ct) => await ProcessAsync(item, ct),
    new ParallelOptionsRivulet
    {
        Progress = new ProgressOptions
        {
            ReportInterval = TimeSpan.FromSeconds(10),
            OnProgress = progress =>
            {
                // No ETA or percent for streams - total is unknown
                Console.WriteLine($"Processed: {progress.ItemsCompleted}");
                Console.WriteLine($"Rate: {progress.ItemsPerSecond:F1} items/sec");
                return ValueTask.CompletedTask;
            }
        }
    }))
{
    // Process results as they arrive
}
```

Progress metrics:
- ItemsStarted: Total items that began processing
- ItemsCompleted: Successfully completed items
- TotalItems: Total count (known for arrays/lists, null for streams)
- ErrorCount: Failed items across all retries
- Elapsed: Time since operation started
- ItemsPerSecond: Processing rate
- EstimatedTimeRemaining: ETA (when total is known)
- PercentComplete: 0-100% (when total is known)
Process items in batches for bulk operations like database inserts, batch API calls, or file operations:
```csharp
// Bulk database inserts - batch 100 records at a time
var results = await records.BatchParallelAsync(
    batchSize: 100,
    async (batch, ct) =>
    {
        // Insert entire batch in a single database call
        await db.BulkInsertAsync(batch, ct);
        return batch.Count;
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 4, // Process 4 batches in parallel
        MaxRetries = 3,
        IsTransient = ex => ex is SqlException
    });
```

```csharp
// Batch API calls with timeout - flush partial batches after delay
var apiResults = await items.BatchParallelAsync(
    batchSize: 50,
    async (batch, ct) =>
    {
        // Call API with batch of items
        return await apiClient.ProcessBatchAsync(batch, ct);
    },
    batchTimeout: TimeSpan.FromSeconds(2) // Flush batch after 2 seconds even if not full
);
```

```csharp
// Streaming batches from async source
await foreach (var result in dataStream.BatchParallelStreamAsync(
    batchSize: 100,
    async (batch, ct) =>
    {
        await ProcessBatchAsync(batch, ct);
        return batch.Count;
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 8,
        OrderedOutput = true, // Maintain batch order
        Progress = new ProgressOptions
        {
            ReportInterval = TimeSpan.FromSeconds(5),
            OnProgress = progress =>
            {
                Console.WriteLine($"Batches processed: {progress.ItemsCompleted}");
                return ValueTask.CompletedTask;
            }
        }
    }))
{
    // Process batch results as they complete
    Console.WriteLine($"Batch completed with {result} items");
}
```

Key Features:
- Size-based batching: Groups items into batches of specified size
- Timeout-based flushing: Optional timeout to flush incomplete batches (async streams only)
- Parallel batch processing: Process multiple batches concurrently with bounded parallelism
- All existing features: Works with retries, error handling, progress tracking, ordered output
- Efficient for bulk operations: Reduces API calls, database round-trips, and I/O overhead
Use Cases:
- Bulk database inserts/updates/deletes
- Batch API calls to external services
- File processing in chunks
- Message queue batch processing
- ETL pipelines with staged operations
Monitor parallel operations with built-in metrics via .NET EventCounters and optional callbacks for custom monitoring systems:
```csharp
// Zero-cost monitoring with EventCounters (always enabled)
// Monitor with: dotnet-counters monitor --process-id <PID> --counters Rivulet.Core
var results = await items.SelectParallelAsync(ProcessAsync, options);
```

```csharp
// Custom metrics callback for Prometheus, DataDog, Application Insights
var options = new ParallelOptionsRivulet
{
    MaxDegreeOfParallelism = 32,
    Metrics = new MetricsOptions
    {
        SampleInterval = TimeSpan.FromSeconds(10),
        OnMetricsSample = async snapshot =>
        {
            // Export to your monitoring system
            await prometheus.RecordMetrics(new
            {
                active_workers = snapshot.ActiveWorkers,
                items_completed = snapshot.ItemsCompleted,
                throughput = snapshot.ItemsPerSecond,
                error_rate = snapshot.ErrorRate,
                total_retries = snapshot.TotalRetries
            });
        }
    }
};

var results = await urls.SelectParallelAsync(
    async (url, ct) => await httpClient.GetAsync(url, ct),
    options);
```

Available Metrics:
- ActiveWorkers: Current number of active worker tasks
- QueueDepth: Items waiting in the input channel queue
- ItemsStarted: Total items that began processing
- ItemsCompleted: Total items completed successfully
- TotalRetries: Cumulative retry attempts across all items
- TotalFailures: Total failed items (after all retries)
- ThrottleEvents: Backpressure events when queue is full
- ItemsPerSecond: Current throughput rate
- ErrorRate: Failure rate (TotalFailures / ItemsStarted)
- Elapsed: Time since operation started
EventCounters (zero-cost monitoring):
```shell
# Monitor in real-time with dotnet-counters
dotnet-counters monitor --process-id <PID> --counters Rivulet.Core

# Available counters:
# - items-started
# - items-completed
# - total-retries
# - total-failures
# - throttle-events
# - drain-events
```

Key Features:
- Zero-cost when not monitored: EventCounters have minimal overhead
- Thread-safe: Uses lock-free Interlocked operations
- Callback isolation: Exceptions in callbacks don't break operations
- Integrates with all operators: SelectParallelAsync, SelectParallelStreamAsync, ForEachParallelAsync, BatchParallel*
Use Cases:
- Production monitoring and alerting
- Performance tuning and capacity planning
- Debugging throughput issues
- SLA compliance verification
- Auto-scaling triggers
Control the maximum rate of operations using the token bucket algorithm, perfect for respecting API rate limits or smoothing traffic bursts:
```csharp
// Limit to 100 requests/sec with burst capacity of 200
var results = await apiUrls.SelectParallelAsync(
    async (url, ct) => await httpClient.GetAsync(url, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        RateLimit = new RateLimitOptions
        {
            TokensPerSecond = 100, // Sustained rate
            BurstCapacity = 200    // Allow brief bursts
        }
    });
```

```csharp
// Heavy operations consume more tokens
var results = await heavyTasks.SelectParallelAsync(
    async (task, ct) => await ProcessHeavyAsync(task, ct),
    new ParallelOptionsRivulet
    {
        RateLimit = new RateLimitOptions
        {
            TokensPerSecond = 50,
            BurstCapacity = 50,
            TokensPerOperation = 5 // Each operation costs 5 tokens
        }
    });
```

Key Features:
- Token bucket algorithm: Allows controlled bursts while maintaining average rate
- Configurable rates: Set tokens per second and burst capacity
- Weighted operations: Different operations can consume different token amounts
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
- Combines with retries: Rate limiting applies to retry attempts too
Use Cases:
- Respect API rate limits (e.g., 1000 requests/hour)
- Smooth traffic bursts to downstream services
- Prevent resource exhaustion
- Control database connection usage
- Implement fair resource sharing
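The token bucket behind `RateLimitOptions` can be illustrated with a minimal single-threaded sketch (the library's actual implementation is concurrent and is not shown in this README):

```csharp
using System;

// Minimal token bucket sketch: refills at ratePerSecond, capped at capacity.
// Single-threaded illustration only.
double capacity = 200, ratePerSecond = 100;
double tokens = capacity;      // start full: permits an initial burst
DateTime last = DateTime.UtcNow;

bool TryAcquire(double cost, DateTime now)
{
    // Refill proportionally to elapsed time, capped at capacity
    tokens = Math.Min(capacity, tokens + (now - last).TotalSeconds * ratePerSecond);
    last = now;
    if (tokens < cost) return false; // caller must wait or back off
    tokens -= cost;
    return true;
}

var start = last;
Console.WriteLine(TryAcquire(150, start));               // True  (burst within capacity)
Console.WriteLine(TryAcquire(100, start));               // False (only 50 tokens left)
Console.WriteLine(TryAcquire(100, start.AddSeconds(1))); // True  (100 tokens refilled)
```

The burst capacity is simply the bucket size: a full bucket admits a spike immediately, while the refill rate enforces the sustained average.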
Protect your application from cascading failures when a downstream service is unhealthy. The circuit breaker monitors for failures and automatically fails fast when a threshold is reached, giving the failing service time to recover.
```csharp
// Protect against a flaky API - open after 5 consecutive failures
var results = await urls.SelectParallelAsync(
    async (url, ct) => await httpClient.GetAsync(url, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        CircuitBreaker = new CircuitBreakerOptions
        {
            FailureThreshold = 5,                   // Open after 5 consecutive failures
            SuccessThreshold = 2,                   // Close after 2 consecutive successes in HalfOpen
            OpenTimeout = TimeSpan.FromSeconds(30), // Test recovery after 30 seconds
            OnStateChange = async (from, to) =>
            {
                Console.WriteLine($"Circuit {from} → {to}");
                await LogCircuitStateChangeAsync(from, to);
            }
        }
    });
```

```csharp
// Time-based failure tracking (percentage within window)
var results = await requests.SelectParallelAsync(
    async (req, ct) => await apiClient.SendAsync(req, ct),
    new ParallelOptionsRivulet
    {
        CircuitBreaker = new CircuitBreakerOptions
        {
            FailureThreshold = 10,                       // Open if 10 failures occur...
            SamplingDuration = TimeSpan.FromSeconds(60), // ...within 60 seconds
            OpenTimeout = TimeSpan.FromMinutes(5)
        }
    });
```

Circuit States:
- Closed: Normal operation. Operations execute normally. Failures are tracked.
- Open: Failure threshold exceeded. Operations fail immediately with `CircuitBreakerOpenException` without executing. Prevents cascading failures.
- HalfOpen: After `OpenTimeout` expires, the circuit allows limited operations to test recovery. Success transitions to Closed. Failure transitions back to Open.
Key Features:
- Fail-fast protection: Prevents overwhelming failing services
- Automatic recovery testing: Transitions to HalfOpen after timeout to probe health
- Flexible failure tracking: Consecutive failures or time-window based (with `SamplingDuration`)
- State change callbacks: Monitor circuit transitions for alerting/logging
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
Use Cases:
- Protecting downstream microservices from overload
- Preventing cascading failures in distributed systems
- Graceful degradation when dependencies are unhealthy
- Reducing latency by failing fast instead of waiting for timeouts
Automatically adjust parallelism based on real-time performance metrics. Instead of using a fixed MaxDegreeOfParallelism, adaptive concurrency dynamically scales workers up when performance is good and scales down when latency increases or errors occur.
```csharp
// Auto-scale between 1-32 workers based on latency and success rate
var results = await urls.SelectParallelAsync(
    async (url, ct) => await httpClient.GetAsync(url, ct),
    new ParallelOptionsRivulet
    {
        AdaptiveConcurrency = new AdaptiveConcurrencyOptions
        {
            MinConcurrency = 1,     // Lower bound
            MaxConcurrency = 32,    // Upper bound
            InitialConcurrency = 8, // Starting point (optional)
            TargetLatency = TimeSpan.FromMilliseconds(100), // Target p50 latency
            MinSuccessRate = 0.95,  // 95% success rate threshold
            SampleInterval = TimeSpan.FromSeconds(1), // How often to adjust
            OnConcurrencyChange = async (old, @new) =>
            {
                Console.WriteLine($"Concurrency: {old} → {@new}");
                await metricsClient.RecordGaugeAsync("concurrency", @new);
            }
        }
    });
```

```csharp
// Different adjustment strategies
var results = await tasks.SelectParallelAsync(
    async (task, ct) => await ProcessAsync(task, ct),
    new ParallelOptionsRivulet
    {
        AdaptiveConcurrency = new AdaptiveConcurrencyOptions
        {
            MinConcurrency = 2,
            MaxConcurrency = 64,
            IncreaseStrategy = AdaptiveConcurrencyStrategy.Aggressive, // Faster increase
            DecreaseStrategy = AdaptiveConcurrencyStrategy.Gradual,    // Slower decrease
            MinSuccessRate = 0.90
        }
    });
```

How It Works: Uses the AIMD (Additive Increase, Multiplicative Decrease) algorithm, similar to TCP congestion control:
- Increase: When success rate is high and latency is acceptable, add workers gradually (AIMD: +1, Aggressive: +10%)
- Decrease: When latency exceeds target or success rate drops, reduce workers sharply (AIMD/Aggressive: -50%, Gradual: -25%)
- Samples performance every `SampleInterval` and adjusts within `[MinConcurrency, MaxConcurrency]` bounds
Adjustment Strategies:
- AIMD (default): Additive Increase (+1), Multiplicative Decrease (-50%) - Like TCP
- Aggressive: Faster increase (+10%), same decrease (-50%) - For rapidly changing workloads
- Gradual: Same increase (+1), gentler decrease (-25%) - For stable workloads
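The strategy table above reduces to a small decision rule. This is an illustrative sketch of the adjustment math, not the library's actual implementation:

```csharp
using System;

// Illustrative AIMD-style adjustment following the strategy table above
static int Adjust(int current, int min, int max, double successRate,
                  TimeSpan p50, TimeSpan target, string strategy,
                  double minSuccessRate = 0.95)
{
    int next;
    if (successRate < minSuccessRate || p50 > target)
    {
        // Multiplicative decrease: -50% (AIMD/Aggressive) or -25% (Gradual)
        next = (int)(current * (strategy == "Gradual" ? 0.75 : 0.5));
    }
    else
    {
        // Additive increase: +1 (AIMD/Gradual) or +10% (Aggressive)
        next = strategy == "Aggressive" ? current + Math.Max(1, current / 10)
                                        : current + 1;
    }
    return Math.Clamp(next, min, max);
}

var ok = TimeSpan.FromMilliseconds(50);
var target = TimeSpan.FromMilliseconds(100);
Console.WriteLine(Adjust(8, 1, 32, 0.99, ok, target, "AIMD"));    // 9 (healthy: +1)
Console.WriteLine(Adjust(8, 1, 32, 0.80, ok, target, "AIMD"));    // 4 (errors: -50%)
Console.WriteLine(Adjust(8, 1, 32, 0.80, ok, target, "Gradual")); // 6 (errors: -25%)
```

Slow additive growth with sharp multiplicative cuts is what lets the controller probe for capacity without oscillating wildly, the same trade-off TCP makes.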
Key Features:
- Self-tuning: Automatically finds optimal concurrency for current load
- Latency-aware: Reduces workers when operations are too slow
- Error-aware: Scales down when success rate drops below threshold
- Bounded: Always stays within configured min/max limits
- Observable: Callbacks for monitoring concurrency changes
- Works with all operators: SelectParallel*, ForEachParallel*, BatchParallel*
Use Cases:
- Variable load scenarios where optimal concurrency changes over time
- Auto-scaling to match downstream service capacity
- Preventing overload when downstream services slow down
- Maximizing throughput without manual tuning
- Handling unpredictable workload patterns
Rivulet.Diagnostics extends the core library with production-ready observability features for comprehensive monitoring and health checks.
- EventListener Wrappers: Console, File, and Structured JSON logging
- Metrics Aggregation: Time-window statistics with min/max/avg/current values
- Prometheus Export: Export metrics in Prometheus text format
- Health Check Integration: Microsoft.Extensions.Diagnostics.HealthChecks support
- Fluent Builder API: Easy configuration with DiagnosticsBuilder
Console Listener - Development and debugging
```csharp
using var listener = new RivuletConsoleListener();
await urls.SelectParallelAsync(ProcessAsync, options);
// Console output:
// [2025-01-15 10:30:45] Items Started: 100.00
// [2025-01-15 10:30:46] Items Completed: 100.00
```

File Listener with Rotation - Production logging

```csharp
using var listener = new RivuletFileListener(
    "metrics.log",
    maxFileSizeBytes: 10 * 1024 * 1024 // 10 MB
);
```

Structured JSON Logging - Log aggregation (ELK, Splunk, Azure Monitor)

```csharp
using var listener = new RivuletStructuredLogListener("metrics.json");

// Or a custom action for your logging system
using var listener = new RivuletStructuredLogListener(json =>
{
    logger.LogInformation(json);
});
```

Metrics Aggregation - Time-window statistics

```csharp
using var aggregator = new MetricsAggregator(TimeSpan.FromSeconds(10));
aggregator.OnAggregation += metrics =>
{
    foreach (var metric in metrics)
    {
        Console.WriteLine($"{metric.DisplayName}:");
        Console.WriteLine($"  Min: {metric.Min:F2}, Max: {metric.Max:F2}");
        Console.WriteLine($"  Avg: {metric.Average:F2}, Current: {metric.Current:F2}");
    }
};
```

Prometheus Export - Scraping endpoint

```csharp
using var exporter = new PrometheusExporter();

// In your ASP.NET Core app
app.MapGet("/metrics", () => exporter.Export());
// Output:
// # HELP rivulet_items_started Total number of items started
// # TYPE rivulet_items_started gauge
// rivulet_items_started 1000.00
```

Health Check Integration - ASP.NET Core health checks

```csharp
// Startup/Program.cs
builder.Services.AddHealthChecks()
    .AddCheck<RivuletHealthCheck>("rivulet", tags: new[] { "ready" });

builder.Services.Configure<RivuletHealthCheckOptions>(options =>
{
    options.ErrorRateThreshold = 0.1; // 10% error rate
    options.FailureCountThreshold = 100;
});

app.MapHealthChecks("/health");
```

Fluent Builder - Configure multiple listeners

```csharp
using var diagnostics = new DiagnosticsBuilder()
    .AddConsoleListener()
    .AddFileListener("metrics.log")
    .AddStructuredLogListener("metrics.json")
    .AddMetricsAggregator(TimeSpan.FromSeconds(10), metrics =>
    {
        // Handle aggregated metrics
    })
    .AddPrometheusExporter(out var exporter)
    .Build();

// All listeners capture metrics simultaneously
await urls.SelectParallelAsync(ProcessAsync, options);

// Export Prometheus metrics
var prometheusText = exporter.Export();
```

See the Rivulet.Diagnostics README for complete documentation.
Rivulet.Diagnostics.OpenTelemetry provides industry-standard observability through OpenTelemetry integration with distributed tracing, metrics export, and comprehensive telemetry.
- Distributed Tracing: Automatic activity creation with parent-child relationships
- Metrics Export: Bridge EventCounters to OpenTelemetry Meters
- Retry Tracking: Record retry attempts as activity events
- Circuit Breaker Events: Track circuit state changes in traces
- Adaptive Concurrency: Monitor concurrency adjustments
- Multi-Platform Support: Export to Jaeger, Zipkin, Azure Monitor, DataDog, and more
1. Configure OpenTelemetry
```csharp
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
using Rivulet.Diagnostics.OpenTelemetry;

// At application startup
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyService"))
    .AddSource(RivuletActivitySource.SourceName)
    .AddJaegerExporter()
    .Build();

using var meterProvider = Sdk.CreateMeterProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyService"))
    .AddMeter(RivuletMetricsExporter.MeterName)
    .AddPrometheusExporter()
    .Build();

// Create metrics exporter
using var metricsExporter = new RivuletMetricsExporter();
```

2. Use with Rivulet Operations

```csharp
var options = new ParallelOptionsRivulet
{
    MaxDegreeOfParallelism = 32,
    MaxRetries = 3,
    IsTransient = ex => ex is HttpRequestException
}.WithOpenTelemetryTracing("FetchUrls");

var results = await urls.SelectParallelAsync(
    async (url, ct) => await httpClient.GetAsync(url, ct),
    options);
```

Activity Hierarchy
```
Rivulet.FetchUrls [Root Activity]
├── Rivulet.FetchUrls.Item [Item 0] - Status: Ok
├── Rivulet.FetchUrls.Item [Item 1] - Retry attempt 1 - Status: Ok
└── Rivulet.FetchUrls.Item [Item 2] - Error - Status: Error
```
Export to Azure Monitor
```csharp
using Azure.Monitor.OpenTelemetry.Exporter;

var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource(RivuletActivitySource.SourceName)
    .AddAzureMonitorTraceExporter(options =>
    {
        options.ConnectionString = "InstrumentationKey=...";
    })
    .Build();
```

See the Rivulet.Diagnostics.OpenTelemetry README for complete documentation.
Rivulet.Core includes comprehensive benchmarks using BenchmarkDotNet to measure performance across .NET 8.0 and .NET 9.0. The benchmarks help validate performance characteristics, identify regressions, and guide optimization efforts.
```shell
# Run all benchmarks
cd tests\Rivulet.Benchmarks
dotnet run -c Release

# Run specific benchmark suite
dotnet run -c Release -- --filter "*CoreOperatorsBenchmarks*"

# Quick run with fewer iterations
dotnet run -c Release -- --job short

# Export results to multiple formats
dotnet run -c Release -- --exporters json,html,markdown
```

Measures performance of core parallel operators:
- `SelectParallelAsync` (CPU-bound and I/O-bound workloads)
- `SelectParallelStreamAsync` (streaming results)
- `ForEachParallelAsync` (side effects)
- Comparison with sequential processing and unbounded `Task.WhenAll`
Configuration: 1,000 items with various MaxDegreeOfParallelism settings
Evaluates batch processing performance with different batch sizes (100, 500, 1000):
- `BatchParallelAsync` performance characteristics
- `BatchParallelStreamAsync` streaming behavior
- Optimal batch sizing analysis
Configuration: 10,000 items, MaxDegreeOfParallelism = 4
Quantifies the overhead of error handling and retry mechanisms:
- Retry policy overhead with transient failures (10% failure rate)
- Different error modes (FailFast, BestEffort, CollectAndContinue)
- Backoff strategy performance (Exponential, ExponentialJitter)
Configuration: 500 items with simulated failures
Measures the performance cost of production-grade features:
- Circuit breaker overhead
- Rate limiting (token bucket) overhead
- Adaptive concurrency overhead
- Progress tracking overhead
- Metrics tracking overhead
- Combined feature overhead
Configuration: 500 items to isolate feature-specific costs
Analyzes how performance scales with different MaxDegreeOfParallelism values (1, 2, 4, 8, 16, 32, 64, 128) to help identify optimal concurrency levels for various workload types.
Configuration: 1,000 items with 1ms I/O simulation per item
Based on benchmark runs on modern hardware:
- I/O-Bound Operations: 10-30x faster than sequential processing with optimal parallelism
- Memory Efficiency: ~60-80% less allocation than unbounded `Task.WhenAll` for large workloads
- Advanced Features Overhead: <5-10% overhead when features are not actively triggered
- Optimal Parallelism: Typically 16-64 for I/O-bound, 2-8 for CPU-bound (varies by hardware)
- .NET 9.0 Performance: Generally 5-15% faster than .NET 8.0 due to runtime improvements
```
BenchmarkDotNet v0.14.0, Windows
Intel Core, 16 cores

| Method                | Runtime | Mean     | Allocated |
|---------------------- |-------- |---------:|----------:|
| SelectParallelAsync   | .NET 8  | 498.3 ms | 1.05 MB   |
| SelectParallelAsync   | .NET 9  | 474.1 ms | 0.92 MB   | <- 5% faster!
| Sequential Processing | .NET 8  | 1004 ms  | 0.51 MB   |
| Task.WhenAll          | .NET 8  | 45.2 ms  | 4.82 MB   | <- Unbounded!

// 1000 items, 1ms I/O delay each, MaxDegreeOfParallelism = 32
// SelectParallelAsync achieves ~20x speedup with controlled memory usage
```
Key Insights:
- Rivulet provides near-optimal performance while maintaining bounded concurrency
- Memory usage is significantly lower than unbounded parallelism
- .NET 9.0 shows measurable improvements in both speed and memory
- Advanced features add minimal overhead when not actively engaged
- Mean: Average execution time across iterations
- Allocated: Total memory allocated per operation (lower is better)
- Gen0/Gen1/Gen2: Garbage collection counts
- Baseline: Reference implementation for comparison (usually marked with `*`)
When adding new benchmarks:
- Focus each benchmark on measuring one specific aspect
- Include a baseline for meaningful comparison
- Use realistic workload sizes (avoid micro-benchmarks)
- Add descriptive names and documentation
- Test on both .NET 8.0 and .NET 9.0
See tests/Rivulet.Benchmarks/README.md for detailed documentation.
See the full Roadmap for detailed plans.
- Rivulet.Http ✅ - Parallel HTTP operations with HttpClientFactory integration
- Rivulet.IO ✅ - Parallel file operations, directory processing
- Rivulet.Sql ✅ - Provider-agnostic parallel SQL operations
- Rivulet.Sql.SqlServer ✅ - SqlBulkCopy integration (10-100x faster bulk inserts)
- Rivulet.Sql.PostgreSql ✅ - COPY command integration (10-100x faster)
- Rivulet.Sql.MySql ✅ - LOAD DATA INFILE with MySqlBulkLoader (10-100x faster)
- Rivulet.Polly ✅ - Polly v8 integration, hedging, result-based retry
- Rivulet.Json 🚧 - Parallel JSON processing, deserialization, JsonPath queries
- Rivulet.Azure.Storage - Blob Storage parallel operations
- Rivulet.Aws.S3 - S3 parallel operations
- Rivulet.EntityFramework - EF Core parallel queries, multi-tenant support
- Rivulet.Csv 🚧 - Parallel CSV parsing and writing
- Pipeline Composition API - Multi-stage processing with different concurrency per stage
The repository includes PowerShell scripts to streamline development and release workflows.
Build, restore, and test the solution locally.
```powershell
# Debug build with tests (default)
.\Build.ps1

# Release build with tests
.\Build.ps1 -Configuration Release

# Skip tests
.\Build.ps1 -SkipTests
```

Build and inspect NuGet packages locally before releasing.
```powershell
# Build all packages with test version
.\NugetPackage.ps1

# Build specific package
.\NugetPackage.ps1 -Project Core
.\NugetPackage.ps1 -Project Diagnostics

# Build with specific version
.\NugetPackage.ps1 -Version "1.2.3" -Project All
```

Creates packages in `./test-packages` and extracts contents to `./test-extract` for verification.
Generate high-quality commit messages using AI (Claude, Gemini, or OpenAI).
```powershell
# Quick setup - set API key for your preferred provider
$env:ANTHROPIC_API_KEY = "your-key"  # For Claude
$env:GOOGLE_API_KEY = "your-key"     # For Gemini
$env:OPENAI_API_KEY = "your-key"     # For OpenAI

# Auto-detect provider from environment
.\SmartCommit.ps1

# Or specify provider explicitly
.\SmartCommit.ps1 -Provider Claude
.\SmartCommit.ps1 -Provider Gemini
.\SmartCommit.ps1 -Provider OpenAI
```

Advanced: Create `.smartcommit.config.json` (see `.smartcommit.config.example.json`) to configure:
- Default provider
- API keys (alternative to environment variables)
- Model versions (claude-3-5-sonnet, gemini-2.0-flash, gpt-4o, etc.)
This script:
- Analyzes your staged changes using git diff
- Calls your chosen AI provider to generate a meaningful commit message
- Shows the suggested message and allows you to:
- [y] Accept and commit
- [r] Request revision with feedback (e.g., "make it shorter", "add more detail")
- [n] Cancel
- Iteratively refines the message based on your feedback
- Commits changes when you accept
Get API keys:
- Claude: console.anthropic.com
- Gemini: aistudio.google.com/apikey
- OpenAI: platform.openai.com/api-keys
Create release branch, tag, and trigger automated publishing.
```powershell
# Create release for version 1.0.0 (creates branch release/1.0.x, tag v1.0.0)
.\Release.ps1 -Version "1.0.0"

# Create patch release 1.0.1 (uses existing branch release/1.0.x, tag v1.0.1)
.\Release.ps1 -Version "1.0.1"

# Create pre-release (creates branch release/2.0.x, tag v2.0.0-beta)
.\Release.ps1 -Version "2.0.0-beta"
```

Branching Strategy:
- Branches: `release/{major}.{minor}.x` (e.g., `release/1.0.x` for all 1.0.* versions)
- Tags: `v{full.version}` (e.g., `v1.0.0`, `v1.0.1`, `v1.0.2`)
- Master branch for active development
- Patch releases reuse the same release branch
This script:
- Creates/switches to the `release/{major}.{minor}.x` branch
- Displays release information (commit details, author, version, repository)
- Asks for confirmation (y/Y) before proceeding
- Creates git tag `v{version}` and pushes it to GitHub
- Triggers the release workflow that builds, tests, and publishes to NuGet.org
The confirmation step shows:
- Version and tag information
- Current commit hash, author, date, and message
- List of actions that will be performed
- Allows you to cancel before any changes are pushed
- Contributing Guide
- Roadmap - Future plans
- Security Policy - Vulnerability reporting
- Code of Conduct
This project is licensed under the MIT License - see the LICENSE file for details.
Built with ❤️ using:
- .NET 8.0 and .NET 9.0
- System.Threading.Channels for backpressure
- xUnit for testing
- BenchmarkDotNet for performance validation