Skip to content

Commit 5207b02

Browse files
committed
Introducing NATS stream provider
1 parent 4bbe176 commit 5207b02

7 files changed

Lines changed: 60 additions & 33 deletions

File tree

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
<PackageVersion Include="StackExchange.Redis" Version="2.8.16" />
7878
<PackageVersion Include="KubernetesClient" Version="15.0.1" />
7979
<PackageVersion Include="CassandraCSharpDriver" Version="3.20.1" />
80-
<PackageVersion Include="NATS.Net" Version="2.5.9" />
80+
<PackageVersion Include="NATS.Net" Version="2.6.4" />
8181
<!-- Test related packages -->
8282
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
8383
<PackageVersion Include="coverlet.msbuild" Version="6.0.0" />

Orleans.slnx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@
8383
<Folder Name="/src/Extensions/Cassandra/">
8484
<Project Path="src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj" />
8585
</Folder>
86+
<Folder Name="/src/Extensions/NATS/">
87+
<Project Path="src\Orleans.Streaming.NATS\Orleans.Streaming.NATS.csproj" Type="Classic C#" />
88+
</Folder>
8689
<Folder Name="/src/Extensions/Redis/">
8790
<Project Path="src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj" />
8891
<Project Path="src/Redis/Orleans.GrainDirectory.Redis/Orleans.GrainDirectory.Redis.csproj" />
@@ -114,11 +117,11 @@
114117
<Project Path="test/Grains/BenchmarkGrains/BenchmarkGrains.csproj" />
115118
</Folder>
116119
<Folder Name="/test/DistributedTests/">
117-
<File Path="test/DistributedTests/README.md" />
118120
<Project Path="test/DistributedTests/DistributedTests.Client/DistributedTests.Client.csproj" />
119121
<Project Path="test/DistributedTests/DistributedTests.Common/DistributedTests.Common.csproj" />
120122
<Project Path="test/DistributedTests/DistributedTests.Grains/DistributedTests.Grains.csproj" />
121123
<Project Path="test/DistributedTests/DistributedTests.Server/DistributedTests.Server.csproj" />
124+
<File Path="test/DistributedTests/README.md" />
122125
</Folder>
123126
<Folder Name="/test/Extensions/">
124127
<Project Path="test/Extensions/AWSUtils.Tests/AWSUtils.Tests.csproj" />
@@ -130,6 +133,7 @@
130133
<Project Path="test/Extensions/TesterAdoNet/Tester.AdoNet.csproj" />
131134
<Project Path="test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj" />
132135
<Project Path="test/Extensions/TesterZooKeeperUtils/Tester.ZooKeeperUtils.csproj" />
136+
<Project Path="test\Extensions\NATS.Tests\NATS.Tests.csproj" Type="Classic C#" />
133137
</Folder>
134138
<Folder Name="/test/Grains/">
135139
<Project Path="test/Grains/TestFSharp/TestFSharp.fsproj" />
@@ -155,4 +159,4 @@
155159
<Project Path="test/Transactions/Orleans.Transactions.Azure.Test/Orleans.Transactions.Azure.Test.csproj" />
156160
<Project Path="test/Transactions/Orleans.Transactions.Tests/Orleans.Transactions.Tests.csproj" />
157161
</Folder>
158-
</Solution>
162+
</Solution>

src/Orleans.Streaming.NATS/NatsOptions.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using NATS.Client.Core;
1+
using System.Text.Json;
2+
using NATS.Client.Core;
23

34
namespace Orleans.Streaming.NATS;
45

@@ -44,4 +45,9 @@ public class NatsOptions
4445
/// The number of connections used to send stream messages to NATS JetStream.
4546
/// </summary>
4647
public int ProducerCount { get; set; } = 8;
47-
}
48+
49+
/// <summary>
50+
/// System.Text.Json serializer options to be used by the NATS provider.
51+
/// </summary>
52+
public JsonSerializerOptions JsonSerializerOptions { get; set; } = default!;
53+
}

src/Orleans.Streaming.NATS/Providers/NatsAdapterFactory.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public NatsAdapterFactory(
4242
adapterCache = new SimpleQueueAdapterCache(cacheOptions, this.providerName, this.loggerFactory);
4343
}
4444

45-
4645
/// <summary> Init the factory.</summary>
4746
public virtual void Init()
4847
{
@@ -97,4 +96,4 @@ public static NatsAdapterFactory Create(IServiceProvider services, string name)
9796
factory.Init();
9897
return factory;
9998
}
100-
}
99+
}

src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
using NATS.Client.Core;
77
using NATS.Client.JetStream;
88
using NATS.Client.JetStream.Models;
9+
using NATS.Client.Serializers.Json;
910

1011
namespace Orleans.Streaming.NATS;
1112

1213
/// <summary>
13-
/// Wrapper around a NATS and JetStream APIs
14+
/// Wrapper around NATS and JetStream APIs
1415
/// </summary>
1516
internal sealed class NatsConnectionManager
1617
{
17-
private const string AckPayload = "+ACK";
18-
private static readonly NatsJsonContextSerializer<NatsStreamMessage> Serializer;
18+
private static readonly byte[] AckPayload = "+ACK"u8.ToArray();
1919
private readonly string _providerName;
2020
private readonly NatsOpts _natsClientOptions;
2121
private readonly NatsConnection _natsConnection;
@@ -25,20 +25,35 @@ internal sealed class NatsConnectionManager
2525
private readonly NatsJSContext[] _producerNatsContexts;
2626
private readonly NatsJSContext _natsContext;
2727

28-
static NatsConnectionManager()
29-
{
30-
Serializer = new NatsJsonContextSerializer<NatsStreamMessage>(NatsSerializerContext.Default);
31-
}
32-
3328
[GeneratedActivatorConstructor]
3429
public NatsConnectionManager(string providerName, ILoggerFactory loggerFactory, NatsOptions options)
3530
{
3631
this._providerName = providerName;
3732
this._loggerFactory = loggerFactory;
3833
this._logger = this._loggerFactory.CreateLogger<NatsConnectionManager>();
3934
this._options = options;
40-
this._natsClientOptions =
41-
this._options.NatsClientOptions ?? NatsOpts.Default with { Name = $"Orleans-{this._providerName}" };
35+
this._options.JsonSerializerOptions.TypeInfoResolverChain.Add(NatsSerializerContext.Default);
36+
if (this._options.NatsClientOptions is null)
37+
{
38+
this._options.NatsClientOptions = NatsOpts.Default with
39+
{
40+
Name = $"Orleans-{this._providerName}",
41+
SerializerRegistry =
42+
new NatsJsonContextOptionsSerializerRegistry(this._options.JsonSerializerOptions)
43+
};
44+
}
45+
else
46+
{
47+
this._options.NatsClientOptions = this._options.NatsClientOptions with
48+
{
49+
Name = string.IsNullOrWhiteSpace(this._options.NatsClientOptions.Name)
50+
? $"Orleans-{this._providerName}"
51+
: this._options.NatsClientOptions.Name,
52+
SerializerRegistry = new NatsJsonContextOptionsSerializerRegistry(this._options.JsonSerializerOptions)
53+
};
54+
}
55+
56+
this._natsClientOptions = this._options.NatsClientOptions;
4257
this._natsConnection = new NatsConnection(this._natsClientOptions);
4358
this._natsContext = new NatsJSContext(this._natsConnection);
4459

@@ -93,6 +108,7 @@ public async Task Initialize(CancellationToken cancellationToken = default)
93108
{
94109
var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
95110
{
111+
Retention = StreamConfigRetention.Workqueue,
96112
SubjectTransform = new SubjectTransform
97113
{
98114
Src = $"{this._providerName}.*.*",
@@ -140,7 +156,10 @@ public async Task EnqueueMessage(NatsStreamMessage message, CancellationToken ca
140156

141157
var context = this._producerNatsContexts[Math.Abs(id.GetHashCode()) % this._producerNatsContexts.Length];
142158

143-
var ack = await context.TryPublishAsync(subject, message, Serializer,
159+
var ack = await context.TryPublishAsync(
160+
subject,
161+
message,
162+
this._natsClientOptions.SerializerRegistry.GetSerializer<NatsStreamMessage>(),
144163
cancellationToken: cancellationToken);
145164

146165
if (ack.Success)
@@ -165,14 +184,15 @@ public NatsStreamConsumer CreateConsumer(uint partition) =>
165184
this._options.StreamName,
166185
partition,
167186
this._options.BatchSize,
168-
Serializer);
187+
this._natsClientOptions.SerializerRegistry.GetDeserializer<NatsStreamMessage>());
169188

170189
/// <summary>
171190
/// Acknowledge messages on a subject in a NATS JetStream stream
172191
/// </summary>
173192
/// <param name="subject">The ReplyTo subject</param>
174193
public async Task AcknowledgeMessages(string subject)
175194
{
176-
await this._natsConnection.PublishAsync(subject, AckPayload);
195+
await this._natsConnection
196+
.PublishAsync(subject, AckPayload, serializer: NatsRawSerializer<byte[]>.Default);
177197
}
178-
}
198+
}

src/Orleans.Streaming.NATS/Providers/NatsQueueAdapterReceiver.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,18 +130,16 @@ public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
130130

131131
if (messages.Count == 0) return;
132132

133-
var natsMessages = messages
134-
.Cast<NatsBatchContainer>().Select(b => b.ReplyTo).ToArray();
135-
136133
var tasks = new List<Task>();
137134

138-
foreach (var message in natsMessages)
135+
foreach (var message in messages)
139136
{
140-
if (message is null) continue;
141-
142-
tasks.Add(this._nats.AcknowledgeMessages(message));
137+
if (message is NatsBatchContainer natsMessage && !string.IsNullOrWhiteSpace(natsMessage.ReplyTo))
138+
{
139+
tasks.Add(this._nats.AcknowledgeMessages(natsMessage.ReplyTo));
140+
}
143141
}
144142

145-
await Task.WhenAll(tasks);
143+
await Task.WhenAll(tasks);
146144
}
147-
}
145+
}

src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ internal sealed class NatsStreamConsumer(
1919
string stream,
2020
uint partition,
2121
int batchSize,
22-
NatsJsonContextSerializer<NatsStreamMessage> serializer)
22+
INatsDeserialize<NatsStreamMessage> serializer)
2323
{
2424
private readonly ILogger _logger = loggerFactory.CreateLogger<NatsStreamConsumer>();
2525

2626
private readonly ConsumerConfig _config = new($"orleans-{provider}-{stream}-{partition}")
2727
{
2828
FilterSubject = $"{provider}.{partition}.>",
2929
MaxBatch = batchSize,
30-
DeliverPolicy = ConsumerConfigDeliverPolicy.LastPerSubject,
30+
DeliverPolicy = ConsumerConfigDeliverPolicy.All,
3131
MaxAckPending = batchSize
3232
};
3333

@@ -77,4 +77,4 @@ public async Task Initialize(CancellationToken cancellationToken = default)
7777

7878
this._consumer = consumer;
7979
}
80-
}
80+
}

0 commit comments

Comments
 (0)