Skip to content

Commit 08efb09

Browse files
authored
Merge pull request #208 from buildersoftio/v3/bug/207
Refactor IStream to single generic parameter interface
2 parents 5b5fe0e + c4bafa9 commit 08efb09

17 files changed

Lines changed: 59 additions & 42 deletions

src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors
1414
public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipelineBehavior<TCommand, TResult>
1515
where TCommand : ICommand<TResult>
1616
{
17-
private readonly IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> _stream;
17+
private readonly IStream<CommandExecutionEvent<TCommand, TResult>> _stream;
1818
private readonly bool _emitBeforeExecution;
1919
private readonly bool _emitAfterExecution;
2020

@@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipeline
2525
/// <param name="emitBeforeExecution">If true, emit an event before command execution.</param>
2626
/// <param name="emitAfterExecution">If true, emit an event after command execution.</param>
2727
public StreamEmittingCommandBehavior(
28-
IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> stream,
28+
IStream<CommandExecutionEvent<TCommand, TResult>> stream,
2929
bool emitBeforeExecution = false,
3030
bool emitAfterExecution = true)
3131
{

src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors
1818
public class StreamEmittingNotificationBehavior<TNotification> : INotificationPipelineBehavior<TNotification>
1919
where TNotification : INotification
2020
{
21-
private readonly IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> _stream;
21+
private readonly IStream<NotificationEvent<TNotification>> _stream;
2222
private readonly bool _emitBeforeHandling;
2323
private readonly bool _emitAfterHandling;
2424

@@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior<TNotification> : INotificationPi
2929
/// <param name="emitBeforeHandling">If true, emit an event before notification handling.</param>
3030
/// <param name="emitAfterHandling">If true, emit an event after notification handling.</param>
3131
public StreamEmittingNotificationBehavior(
32-
IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> stream,
32+
IStream<NotificationEvent<TNotification>> stream,
3333
bool emitBeforeHandling = false,
3434
bool emitAfterHandling = true)
3535
{

src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions
2222
/// <returns>The service collection for chaining.</returns>
2323
public static IServiceCollection AddStreamEmittingNotificationHandler<TNotification>(
2424
this IServiceCollection services,
25-
Func<IServiceProvider, IStream<TNotification, TNotification>> streamFactory,
25+
Func<IServiceProvider, IStream<TNotification>> streamFactory,
2626
Action<TNotification, Exception> errorHandler = null)
2727
where TNotification : INotification
2828
{
@@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandler<TNotificat
4747
/// <returns>The service collection for chaining.</returns>
4848
public static IServiceCollection AddTransformingStreamNotificationHandler<TNotification, TStreamInput>(
4949
this IServiceCollection services,
50-
Func<IServiceProvider, IStream<TStreamInput, TStreamInput>> streamFactory,
50+
Func<IServiceProvider, IStream<TStreamInput>> streamFactory,
5151
Func<TNotification, TStreamInput> transformer,
5252
Action<TNotification, Exception> errorHandler = null)
5353
where TNotification : INotification

src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ namespace Cortex.Streams.Mediator.Handlers
1717
public abstract class StreamBackedStreamQueryHandler<TQuery, TResult> : IStreamQueryHandler<TQuery, TResult>
1818
where TQuery : IStreamQuery<TResult>
1919
{
20-
private readonly IStream<TResult, TResult> _stream;
20+
private readonly IStream<TResult> _stream;
2121
private readonly int _channelCapacity;
2222

2323
/// <summary>
2424
/// Initializes a new instance of the <see cref="StreamBackedStreamQueryHandler{TQuery, TResult}"/> class.
2525
/// </summary>
2626
/// <param name="stream">The Cortex Stream to read data from.</param>
2727
/// <param name="channelCapacity">The capacity of the internal channel buffer. Default is 100.</param>
28-
protected StreamBackedStreamQueryHandler(IStream<TResult, TResult> stream, int channelCapacity = 100)
28+
protected StreamBackedStreamQueryHandler(IStream<TResult> stream, int channelCapacity = 100)
2929
{
3030
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
3131
_channelCapacity = channelCapacity;

src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers
1313
public class StreamEmittingNotificationHandler<TNotification> : INotificationHandler<TNotification>
1414
where TNotification : INotification
1515
{
16-
private readonly IStream<TNotification, TNotification> _stream;
16+
private readonly IStream<TNotification> _stream;
1717
private readonly Action<TNotification, Exception> _errorHandler;
1818

1919
/// <summary>
@@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler<TNotification> : INotificationHan
2222
/// <param name="stream">The stream to emit notifications to.</param>
2323
/// <param name="errorHandler">Optional handler for errors during emission.</param>
2424
public StreamEmittingNotificationHandler(
25-
IStream<TNotification, TNotification> stream,
25+
IStream<TNotification> stream,
2626
Action<TNotification, Exception> errorHandler = null)
2727
{
2828
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
@@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat
6262
public class TransformingStreamNotificationHandler<TNotification, TStreamInput> : INotificationHandler<TNotification>
6363
where TNotification : INotification
6464
{
65-
private readonly IStream<TStreamInput, TStreamInput> _stream;
65+
private readonly IStream<TStreamInput> _stream;
6666
private readonly Func<TNotification, TStreamInput> _transformer;
6767
private readonly Action<TNotification, Exception> _errorHandler;
6868

@@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler<TNotification, TStreamInput>
7373
/// <param name="transformer">A function to transform notifications into stream input.</param>
7474
/// <param name="errorHandler">Optional handler for errors during emission.</param>
7575
public TransformingStreamNotificationHandler(
76-
IStream<TStreamInput, TStreamInput> stream,
76+
IStream<TStreamInput> stream,
7777
Func<TNotification, TStreamInput> transformer,
7878
Action<TNotification, Exception> errorHandler = null)
7979
{
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Cortex.Streams.Abstractions
2+
{
3+
/// <summary>
4+
/// Provides information about a branch in the stream processing pipeline.
5+
/// This is a non-generic interface that allows accessing branch metadata
6+
/// without exposing internal type parameters.
7+
/// </summary>
8+
public interface IBranchInfo
9+
{
10+
/// <summary>
11+
/// Gets the name of the branch.
12+
/// </summary>
13+
string BranchName { get; }
14+
}
15+
}

src/Cortex.Streams/Abstractions/IFanOutBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,6 @@ public interface IFanOutBuilder<TIn, TCurrent>
126126
/// </summary>
127127
/// <returns>The built stream instance ready to be started.</returns>
128128
/// <exception cref="InvalidOperationException">Thrown when no sinks have been configured.</exception>
129-
IStream<TIn, TCurrent> Build();
129+
IStream<TIn> Build();
130130
}
131131
}

src/Cortex.Streams/Abstractions/ISinkBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
/// Provides a method to build the stream after adding a sink.
55
/// </summary>
66
/// <typeparam name="TIn">The type of the initial input to the stream.</typeparam>
7-
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
7+
/// <typeparam name="TCurrent">The current type of data in the stream (internal use only).</typeparam>
88
public interface ISinkBuilder<TIn, TCurrent>
99
{
1010
/// <summary>
1111
/// Builds the stream and returns a stream instance that can be started and stopped.
1212
/// </summary>
1313
/// <returns>A stream instance.</returns>
14-
IStream<TIn, TCurrent> Build();
14+
IStream<TIn> Build();
1515
}
1616
}

src/Cortex.Streams/Abstractions/IStream.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
using Cortex.States;
2-
using Cortex.Streams.Operators;
2+
using Cortex.Streams.Abstractions;
33
using Cortex.Streams.Performance;
44
using System.Collections.Generic;
55
using System.Threading;
66
using System.Threading.Tasks;
77

88
namespace Cortex.Streams
99
{
10-
public interface IStream<TIn, TCurrent>
10+
public interface IStream<TIn>
1111
{
1212
/// <summary>
1313
/// Start the stream processing.
@@ -66,7 +66,7 @@ public interface IStream<TIn, TCurrent>
6666

6767
StreamStatuses GetStatus();
6868

69-
IReadOnlyDictionary<string, BranchOperator<TCurrent>> GetBranches();
69+
IReadOnlyDictionary<string, IBranchInfo> GetBranches();
7070

7171
TStateStore GetStateStoreByName<TStateStore>(string name) where TStateStore : IDataStore;
7272
IEnumerable<TStateStore> GetStateStoresByType<TStateStore>() where TStateStore : IDataStore;

src/Cortex.Streams/Abstractions/IStreamBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public interface IStreamBuilder<TIn, TCurrent>
107107
/// Builds the stream
108108
/// </summary>
109109
/// <returns></returns>
110-
IStream<TIn, TCurrent> Build();
110+
IStream<TIn> Build();
111111

112112

113113
/// <summary>

0 commit comments

Comments
 (0)