Skip to content

Commit c4bafa9

Browse files
committed
Refactor IStream to single generic parameter interface
Refactored IStream and related classes to use a single generic parameter (IStream<TIn>), removing the internal TCurrent type from the public API. Updated all builder interfaces, handlers, and usages to match the new signature. Introduced IBranchInfo for branch metadata, and updated GetBranches() to return IBranchInfo instances. This simplifies the stream abstraction and improves API encapsulation.
1 parent 5b5fe0e commit c4bafa9

17 files changed

Lines changed: 59 additions & 42 deletions

src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors
1414
public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipelineBehavior<TCommand, TResult>
1515
where TCommand : ICommand<TResult>
1616
{
17-
private readonly IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> _stream;
17+
private readonly IStream<CommandExecutionEvent<TCommand, TResult>> _stream;
1818
private readonly bool _emitBeforeExecution;
1919
private readonly bool _emitAfterExecution;
2020

@@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipeline
2525
/// <param name="emitBeforeExecution">If true, emit an event before command execution.</param>
2626
/// <param name="emitAfterExecution">If true, emit an event after command execution.</param>
2727
public StreamEmittingCommandBehavior(
28-
IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> stream,
28+
IStream<CommandExecutionEvent<TCommand, TResult>> stream,
2929
bool emitBeforeExecution = false,
3030
bool emitAfterExecution = true)
3131
{

src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors
1818
public class StreamEmittingNotificationBehavior<TNotification> : INotificationPipelineBehavior<TNotification>
1919
where TNotification : INotification
2020
{
21-
private readonly IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> _stream;
21+
private readonly IStream<NotificationEvent<TNotification>> _stream;
2222
private readonly bool _emitBeforeHandling;
2323
private readonly bool _emitAfterHandling;
2424

@@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior<TNotification> : INotificationPi
2929
/// <param name="emitBeforeHandling">If true, emit an event before notification handling.</param>
3030
/// <param name="emitAfterHandling">If true, emit an event after notification handling.</param>
3131
public StreamEmittingNotificationBehavior(
32-
IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> stream,
32+
IStream<NotificationEvent<TNotification>> stream,
3333
bool emitBeforeHandling = false,
3434
bool emitAfterHandling = true)
3535
{

src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions
2222
/// <returns>The service collection for chaining.</returns>
2323
public static IServiceCollection AddStreamEmittingNotificationHandler<TNotification>(
2424
this IServiceCollection services,
25-
Func<IServiceProvider, IStream<TNotification, TNotification>> streamFactory,
25+
Func<IServiceProvider, IStream<TNotification>> streamFactory,
2626
Action<TNotification, Exception> errorHandler = null)
2727
where TNotification : INotification
2828
{
@@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandler<TNotificat
4747
/// <returns>The service collection for chaining.</returns>
4848
public static IServiceCollection AddTransformingStreamNotificationHandler<TNotification, TStreamInput>(
4949
this IServiceCollection services,
50-
Func<IServiceProvider, IStream<TStreamInput, TStreamInput>> streamFactory,
50+
Func<IServiceProvider, IStream<TStreamInput>> streamFactory,
5151
Func<TNotification, TStreamInput> transformer,
5252
Action<TNotification, Exception> errorHandler = null)
5353
where TNotification : INotification

src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ namespace Cortex.Streams.Mediator.Handlers
1717
public abstract class StreamBackedStreamQueryHandler<TQuery, TResult> : IStreamQueryHandler<TQuery, TResult>
1818
where TQuery : IStreamQuery<TResult>
1919
{
20-
private readonly IStream<TResult, TResult> _stream;
20+
private readonly IStream<TResult> _stream;
2121
private readonly int _channelCapacity;
2222

2323
/// <summary>
2424
/// Initializes a new instance of the <see cref="StreamBackedStreamQueryHandler{TQuery, TResult}"/> class.
2525
/// </summary>
2626
/// <param name="stream">The Cortex Stream to read data from.</param>
2727
/// <param name="channelCapacity">The capacity of the internal channel buffer. Default is 100.</param>
28-
protected StreamBackedStreamQueryHandler(IStream<TResult, TResult> stream, int channelCapacity = 100)
28+
protected StreamBackedStreamQueryHandler(IStream<TResult> stream, int channelCapacity = 100)
2929
{
3030
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
3131
_channelCapacity = channelCapacity;

src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers
1313
public class StreamEmittingNotificationHandler<TNotification> : INotificationHandler<TNotification>
1414
where TNotification : INotification
1515
{
16-
private readonly IStream<TNotification, TNotification> _stream;
16+
private readonly IStream<TNotification> _stream;
1717
private readonly Action<TNotification, Exception> _errorHandler;
1818

1919
/// <summary>
@@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler<TNotification> : INotificationHan
2222
/// <param name="stream">The stream to emit notifications to.</param>
2323
/// <param name="errorHandler">Optional handler for errors during emission.</param>
2424
public StreamEmittingNotificationHandler(
25-
IStream<TNotification, TNotification> stream,
25+
IStream<TNotification> stream,
2626
Action<TNotification, Exception> errorHandler = null)
2727
{
2828
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
@@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat
6262
public class TransformingStreamNotificationHandler<TNotification, TStreamInput> : INotificationHandler<TNotification>
6363
where TNotification : INotification
6464
{
65-
private readonly IStream<TStreamInput, TStreamInput> _stream;
65+
private readonly IStream<TStreamInput> _stream;
6666
private readonly Func<TNotification, TStreamInput> _transformer;
6767
private readonly Action<TNotification, Exception> _errorHandler;
6868

@@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler<TNotification, TStreamInput>
7373
/// <param name="transformer">A function to transform notifications into stream input.</param>
7474
/// <param name="errorHandler">Optional handler for errors during emission.</param>
7575
public TransformingStreamNotificationHandler(
76-
IStream<TStreamInput, TStreamInput> stream,
76+
IStream<TStreamInput> stream,
7777
Func<TNotification, TStreamInput> transformer,
7878
Action<TNotification, Exception> errorHandler = null)
7979
{
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Cortex.Streams.Abstractions
2+
{
3+
/// <summary>
4+
/// Provides information about a branch in the stream processing pipeline.
5+
/// This is a non-generic interface that allows accessing branch metadata
6+
/// without exposing internal type parameters.
7+
/// </summary>
8+
public interface IBranchInfo
9+
{
10+
/// <summary>
11+
/// Gets the name of the branch.
12+
/// </summary>
13+
string BranchName { get; }
14+
}
15+
}

src/Cortex.Streams/Abstractions/IFanOutBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,6 @@ public interface IFanOutBuilder<TIn, TCurrent>
126126
/// </summary>
127127
/// <returns>The built stream instance ready to be started.</returns>
128128
/// <exception cref="InvalidOperationException">Thrown when no sinks have been configured.</exception>
129-
IStream<TIn, TCurrent> Build();
129+
IStream<TIn> Build();
130130
}
131131
}

src/Cortex.Streams/Abstractions/ISinkBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
/// Provides a method to build the stream after adding a sink.
55
/// </summary>
66
/// <typeparam name="TIn">The type of the initial input to the stream.</typeparam>
7-
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
7+
/// <typeparam name="TCurrent">The current type of data in the stream (internal use only).</typeparam>
88
public interface ISinkBuilder<TIn, TCurrent>
99
{
1010
/// <summary>
1111
/// Builds the stream and returns a stream instance that can be started and stopped.
1212
/// </summary>
1313
/// <returns>A stream instance.</returns>
14-
IStream<TIn, TCurrent> Build();
14+
IStream<TIn> Build();
1515
}
1616
}

src/Cortex.Streams/Abstractions/IStream.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
using Cortex.States;
2-
using Cortex.Streams.Operators;
2+
using Cortex.Streams.Abstractions;
33
using Cortex.Streams.Performance;
44
using System.Collections.Generic;
55
using System.Threading;
66
using System.Threading.Tasks;
77

88
namespace Cortex.Streams
99
{
10-
public interface IStream<TIn, TCurrent>
10+
public interface IStream<TIn>
1111
{
1212
/// <summary>
1313
/// Start the stream processing.
@@ -66,7 +66,7 @@ public interface IStream<TIn, TCurrent>
6666

6767
StreamStatuses GetStatus();
6868

69-
IReadOnlyDictionary<string, BranchOperator<TCurrent>> GetBranches();
69+
IReadOnlyDictionary<string, IBranchInfo> GetBranches();
7070

7171
TStateStore GetStateStoreByName<TStateStore>(string name) where TStateStore : IDataStore;
7272
IEnumerable<TStateStore> GetStateStoresByType<TStateStore>() where TStateStore : IDataStore;

src/Cortex.Streams/Abstractions/IStreamBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public interface IStreamBuilder<TIn, TCurrent>
107107
/// Builds the stream
108108
/// </summary>
109109
/// <returns></returns>
110-
IStream<TIn, TCurrent> Build();
110+
IStream<TIn> Build();
111111

112112

113113
/// <summary>

0 commit comments

Comments
 (0)