Native .NET client library for the Streamline streaming platform.
dotnet add package Streamline.Client

using Streamline.Client;
// Create client
var client = new StreamlineClient("localhost:9092");
// Produce a message
var metadata = await client.ProduceAsync("my-topic", "key", "Hello, Streamline!");
Console.WriteLine($"Produced to partition {metadata.Partition} at offset {metadata.Offset}");

using var producer = client.CreateProducer<string, string>();
await producer.BeginTransactionAsync();
try
{
await producer.SendAsync("orders", "k1", "v1");
await producer.SendAsync("orders", "k2", "v2");
await producer.CommitTransactionAsync();
}
catch
{
await producer.AbortTransactionAsync();
throw;
}

Note: Transactions use client-side buffering. Messages are collected and sent as a batch on commit, providing all-or-nothing delivery at the client level.
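
The buffering behavior described in the note can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not the SDK's implementation: `pending`, `delivered`, `Send`, `Commit`, and `Abort` are hypothetical names, and a plain list stands in for the broker.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the broker (assumption): it only ever sees what a commit hands over.
var delivered = new List<(string Topic, string Key, string Value)>();

// Client-side buffer holding the messages of the transaction in progress.
var pending = new List<(string Topic, string Key, string Value)>();

// Buffer a message locally; nothing reaches the broker yet.
void Send(string topic, string key, string value) => pending.Add((topic, key, value));

// Hand the whole buffer to the broker as one batch, then reset it.
void Commit()
{
    delivered.AddRange(pending);
    pending.Clear();
}

// Drop the buffer; the broker never saw these messages.
void Abort() => pending.Clear();

// Transaction 1: committed, so both messages are delivered together.
Send("orders", "k1", "v1");
Send("orders", "k2", "v2");
Commit();

// Transaction 2: aborted, so its message is discarded.
Send("orders", "k3", "v3");
Abort();

Console.WriteLine($"Delivered {delivered.Count} messages, {pending.Count} pending");
// prints "Delivered 2 messages, 0 pending"
```

Because the guarantee in this model lives entirely in the client, it says nothing about what happens if the batch itself is only partly accepted by the server.
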
using Streamline.Client;
var client = new StreamlineClient("localhost:9092");
await using var consumer = client.CreateConsumer<string, string>("my-topic", "my-group");
await consumer.SubscribeAsync();
await foreach (var record in consumer.ConsumeAsync())
{
Console.WriteLine($"Received: {record.Value} at offset {record.Offset}");
}

// In Program.cs or Startup.cs
services.AddStreamline(options =>
{
options.BootstrapServers = "localhost:9092";
options.Producer.BatchSize = 16384;
options.Consumer.GroupId = "my-app";
});
// In your service
public class EventService
{
private readonly IStreamlineClient _client;
public EventService(IStreamlineClient client)
{
_client = client;
}
public async Task PublishEventAsync(string topic, Event evt)
{
await _client.ProduceAsync(topic, evt.Id, evt);
}
}

- Async/await support throughout
- `IAsyncEnumerable` for consuming messages
- Admin client (topic management, consumer groups, SQL queries via HTTP REST API)
- SQL query support via `QueryClient`
- Dependency injection integration (`AddStreamline()`, `AddStreamlineAdmin()`)
- Connection pooling
- Automatic reconnection
- Compression support (LZ4, Zstd, Snappy, Gzip)
- TLS/mTLS support with configurable certificates
- SASL authentication (PLAIN, SCRAM-SHA-256/512)
- Schema Registry integration
- Testcontainers integration for seamless integration testing
- OpenTelemetry-compatible distributed tracing via `System.Diagnostics.ActivitySource`

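The SDK provides distributed tracing through the standard .NET ActivitySource API,
which is built into the runtime and compatible with the OpenTelemetry .NET SDK. No
additional NuGet packages are required to emit activities; exporting them, as in the
configuration that follows, requires the OpenTelemetry hosting and exporter packages.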
// In Program.cs
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource(StreamlineActivitySource.SourceName) // "Streamline.Client"
.AddOtlpExporter());

using Streamline.Client.Telemetry;
// Produce with tracing
using var produceActivity = StreamlineActivitySource.StartProduce("orders");
var headers = new Headers();
StreamlineActivitySource.InjectContext(headers); // propagate trace context
await producer.SendAsync("orders", key, value, headers);
// Consume with tracing
using var consumeActivity = StreamlineActivitySource.StartConsume("events");
var records = await consumer.PollAsync(TimeSpan.FromMilliseconds(100));
consumeActivity?.SetTag("messaging.batch.message_count", records.Count); // numeric tag, not a string
// Process individual records with context extraction
foreach (var record in records)
{
var parentCtx = StreamlineActivitySource.ExtractContext(record.Headers);
using var processActivity = StreamlineActivitySource.StartProcess(
record.Topic, record.Partition, record.Offset, parentCtx);
ProcessRecord(record);
}

| Attribute | Value |
|---|---|
| Activity name | {topic} {operation} (e.g., "orders produce") |
| `messaging.system` | `streamline` |
| `messaging.destination.name` | Topic name |
| `messaging.operation` | produce, consume, or process |
| Activity kind | Producer for produce, Consumer for consume |

When no listener is registered for the source, StartActivity returns null, so
tracing adds negligible overhead. Trace context is propagated via W3C Trace Context headers.
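
The null-when-unobserved behavior comes from the .NET `ActivitySource` API itself and can be checked without the SDK. The sketch below uses only `System.Diagnostics`; the source name `Sketch.Tracing` is a hypothetical placeholder, and an `ActivityListener` stands in for the OpenTelemetry SDK, which is an assumption made to keep the example dependency-free.

```csharp
using System;
using System.Diagnostics;

// Hypothetical source name for this sketch; the SDK registers its own.
using var source = new ActivitySource("Sketch.Tracing");

// No listener is registered yet, so no Activity object is allocated.
var unobserved = source.StartActivity("orders produce", ActivityKind.Producer);
Console.WriteLine($"Without listener: {(unobserved is null ? "null" : "activity")}");

// Register a listener that records everything from this source.
// In a real application the OpenTelemetry SDK plays this role.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Sketch.Tracing",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllData
};
ActivitySource.AddActivityListener(listener);

using var observed = source.StartActivity("orders produce", ActivityKind.Producer);
observed?.SetTag("messaging.system", "streamline");
observed?.SetTag("messaging.destination.name", "orders");

// With the W3C ID format (the default on .NET 5 and later), Activity.Id has the
// same shape as the value carried in a `traceparent` header.
Console.WriteLine($"With listener: {observed?.Id}");
```

The null-conditional calls (`observed?.SetTag`) are what keep instrumented code safe when nothing is listening.
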
var options = new StreamlineOptions
{
BootstrapServers = "localhost:9092",
ConnectionPoolSize = 4,
ConnectTimeout = TimeSpan.FromSeconds(30),
RequestTimeout = TimeSpan.FromSeconds(30)
};
var client = new StreamlineClient(options);

var producerOptions = new ProducerOptions
{
BatchSize = 16384,
LingerMs = 1,
CompressionType = CompressionType.Lz4,
Retries = 3
};
var producer = client.CreateProducer<string, string>(producerOptions);

var consumerOptions = new ConsumerOptions
{
GroupId = "my-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
MaxPollRecords = 500
};
var consumer = client.CreateConsumer<string, string>("my-topic", consumerOptions);

- .NET 8.0 or later
- Streamline server 0.2.0 or later
Use the Streamline.TestContainers package for fully automated integration testing with Docker:
dotnet add package Streamline.TestContainers

using Streamline.TestContainers;
using Xunit;
public class StreamlineFixture : IAsyncLifetime
{
public StreamlineContainer Container { get; private set; } = null!;
public async Task InitializeAsync()
{
Container = new StreamlineBuilder()
.WithDebugLogging()
.Build();
await Container.StartAsync();
}
public async Task DisposeAsync()
{
await Container.DisposeAsync();
}
}
[Collection("Streamline")]
public class MyIntegrationTests
{
private readonly StreamlineFixture _fixture;
public MyIntegrationTests(StreamlineFixture fixture) => _fixture = fixture;
[Fact]
public async Task Should_produce_and_consume()
{
await using var client = _fixture.Container.CreateClient();
await client.ProduceAsync("my-topic", "key", "Hello!");
}
}

See the testcontainers README for full documentation.
Start a local Streamline server:
docker compose -f docker-compose.test.yml up -d

Run tests:

dotnet test

| Method | Description |
|---|---|
| `new StreamlineClient(bootstrapServers)` | Create a new client |
| `new StreamlineClient(options, logger)` | Create with configuration |
| `await client.ProduceAsync(topic, key, value)` | Send a message |
| `await client.ProduceAsync(topic, key, value, headers)` | Send with headers |
| `client.CreateProducer<TKey, TValue>()` | Create a producer |
| `client.CreateConsumer<TKey, TValue>(topic, groupId)` | Create a consumer |
| `await client.IsHealthyAsync()` | Check health status |

| Method | Description |
|---|---|
| `await producer.SendAsync(topic, key, value)` | Send a message |
| `await producer.SendAsync(topic, key, value, headers)` | Send with headers |
| `await producer.FlushAsync()` | Flush buffered messages |

| Method | Description |
|---|---|
| `await consumer.SubscribeAsync()` | Subscribe to topic |
| `consumer.ConsumeAsync()` | Get IAsyncEnumerable of messages |
| `await consumer.PollAsync(timeout)` | Poll for records |
| `await consumer.CommitAsync()` | Commit offsets |
| `await consumer.SeekToBeginningAsync()` | Seek to start |
| `await consumer.SeekToEndAsync()` | Seek to end |
| `await consumer.SeekAsync(partition, offset)` | Seek to specific offset |

var admin = client.CreateAdmin("http://localhost:9094");
// Cluster overview
var cluster = await admin.GetClusterInfoAsync();
Console.WriteLine($"Cluster: {cluster.ClusterId}, Brokers: {cluster.Brokers.Count}");
// Consumer group lag monitoring
var lag = await admin.GetConsumerGroupLagAsync("my-group");
Console.WriteLine($"Total lag: {lag.TotalLag}");
foreach (var p in lag.Partitions)
Console.WriteLine($" {p.Topic}:{p.Partition} lag={p.Lag}");
// Message inspection
var messages = await admin.InspectMessagesAsync("events", partition: 0, limit: 10);
foreach (var m in messages)
Console.WriteLine($"offset={m.Offset} value={m.Value}");
// Latest messages
var latest = await admin.LatestMessagesAsync("events", count: 5);
// Server metrics
var metrics = await admin.MetricsHistoryAsync();
// Basic operations
await admin.CreateTopicAsync("events", partitions: 3);
var topics = await admin.ListTopicsAsync();
var info = await admin.GetServerInfoAsync();
var healthy = await admin.IsHealthyAsync();

using Streamline;
try
{
await client.ProduceAsync("my-topic", "key", "value");
}
catch (TopicNotFoundException ex)
{
Console.WriteLine($"Topic not found: {ex.Message}");
Console.WriteLine($"Hint: {ex.Hint}");
}
catch (StreamlineException ex) when (ex.Retryable)
{
Console.WriteLine($"Retryable error: {ex.Message}");
}
catch (StreamlineException ex)
{
Console.WriteLine($"Fatal error: {ex.Message}");
}

| Property | Default | Description |
|---|---|---|
| `BootstrapServers` | `"localhost:9092"` | Comma-separated broker addresses |
| `ConnectionPoolSize` | `4` | Number of connections per broker |
| `ConnectTimeout` | `10s` | Connection timeout (TimeSpan) |
| `RequestTimeout` | `30s` | Request timeout (TimeSpan) |

| Property | Default | Description |
|---|---|---|
| `BatchSize` | `16384` | Maximum batch size in bytes |
| `LingerMs` | `0` | Batch linger time in milliseconds |
| `CompressionType` | `None` | Compression: None, Gzip, Snappy, Lz4, Zstd |
| `Retries` | `3` | Retries on transient failures |
| `Acks` | `Leader` | Acknowledgment: None, Leader, All |
| `EnableIdempotence` | `false` | Enable exactly-once semantics |

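
How `BatchSize` and `LingerMs` interact is not spelled out here. In producers of this style a batch is usually sent when it is full or when its oldest message has waited for the linger time, whichever comes first. The sketch below encodes that rule as an assumption about the SDK, not a description of its internals; `ShouldFlush` is a hypothetical helper, and the 5 ms linger is illustrative rather than the default.

```csharp
using System;

// BatchSize default from the table; the linger value is illustrative, not the default.
const int BatchSizeBytes = 16384;
var linger = TimeSpan.FromMilliseconds(5);

// Assumed flush rule: send a non-empty batch once it is full
// or once its oldest message has waited for the linger time.
bool ShouldFlush(int bufferedBytes, TimeSpan oldestMessageAge) =>
    bufferedBytes > 0 &&
    (bufferedBytes >= BatchSizeBytes || oldestMessageAge >= linger);

Console.WriteLine(ShouldFlush(100, TimeSpan.FromMilliseconds(1)));  // False: small and young
Console.WriteLine(ShouldFlush(100, TimeSpan.FromMilliseconds(5)));  // True: linger elapsed
Console.WriteLine(ShouldFlush(16384, TimeSpan.Zero));               // True: batch is full
```

Under this rule a linger of 0 sends every message as soon as it is buffered, while a larger value trades a little latency for fuller batches.
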
| Property | Default | Description |
|---|---|---|
| `GroupId` | (required) | Consumer group identifier |
| `AutoOffsetReset` | `Latest` | Start position: Earliest, Latest |
| `EnableAutoCommit` | `true` | Automatically commit offsets |
| `MaxPollRecords` | `500` | Maximum records per poll |
| `SessionTimeoutMs` | `30000` | Session timeout in milliseconds |

| Property | Default | Description |
|---|---|---|
| `SecurityProtocol` | `Plaintext` | Protocol: Plaintext, Ssl, SaslPlaintext, SaslSsl |
| `SaslMechanism` | — | SASL mechanism: Plain, ScramSha256, ScramSha512 |
| `SslCaLocation` | — | Path to CA certificate |

Protect your application from cascading failures when the Streamline server is unresponsive:
using Streamline.Client;
var breaker = new CircuitBreaker(new CircuitBreakerOptions
{
FailureThreshold = 5, // Open after 5 consecutive failures
SuccessThreshold = 2, // Close after 2 half-open successes
OpenTimeout = TimeSpan.FromSeconds(30),
});
breaker.OnStateChange += (from, to) =>
logger.LogInformation("Circuit: {From} → {To}", from, to);
// Wrap async producer calls
var metadata = await breaker.ExecuteAsync(async () =>
await producer.SendAsync("events", "user-1", eventPayload)
);

When the circuit is open, ExecuteAsync throws a retryable StreamlineException. See the Circuit Breaker guide for details.
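
The three options map onto a small state machine: closed, open, and half-open. The following is a minimal synchronous sketch of those transitions, not the SDK's `CircuitBreaker`. The `Execute` helper, the string state names, and the explicit clock argument are all assumptions made for illustration, and `InvalidOperationException` stands in for the retryable `StreamlineException`.

```csharp
using System;

// Hypothetical settings mirroring the option names in the snippet above.
const int FailureThreshold = 5;              // consecutive failures before opening
const int SuccessThreshold = 2;              // half-open successes before closing
var openTimeout = TimeSpan.FromSeconds(30);  // how long the circuit stays open

var state = "Closed";                        // "Closed", "Open", or "HalfOpen"
var failures = 0;
var successes = 0;
var openedAt = DateTimeOffset.MinValue;

// Runs `action` through the breaker. The clock is passed in so the sketch is deterministic.
T Execute<T>(Func<T> action, DateTimeOffset now)
{
    if (state == "Open")
    {
        // Fail fast until the open timeout elapses, then let trial calls through.
        if (now - openedAt < openTimeout)
            throw new InvalidOperationException("Circuit is open");
        state = "HalfOpen";
        successes = 0;
    }

    try
    {
        var result = action();
        if (state == "HalfOpen" && ++successes >= SuccessThreshold)
            state = "Closed";
        if (state == "Closed")
            failures = 0; // any success resets the consecutive-failure count
        return result;
    }
    catch
    {
        // A half-open failure reopens at once; otherwise count consecutive failures.
        if (state == "HalfOpen" || ++failures >= FailureThreshold)
        {
            state = "Open";
            openedAt = now;
            failures = 0;
        }
        throw;
    }
}

var start = DateTimeOffset.UnixEpoch;
int Failing() => throw new TimeoutException("broker unresponsive");
int Healthy() => 1;

// Five consecutive failures open the circuit.
for (var i = 0; i < FailureThreshold; i++)
{
    try { Execute<int>(Failing, start); } catch (TimeoutException) { }
}
Console.WriteLine(state); // Open

// After the timeout, two successful trial calls close it again.
Execute<int>(Healthy, start.AddSeconds(31));
Console.WriteLine(state); // HalfOpen
Execute<int>(Healthy, start.AddSeconds(32));
Console.WriteLine(state); // Closed
```

Passing the clock in keeps the transitions testable without sleeping; a production breaker would also need to be thread-safe, which this sketch is not.
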
The examples/ directory contains runnable examples:
| Example | Description |
|---|---|
| BasicUsage.cs | Produce, consume, and admin operations |
| QueryUsage | SQL analytics with the embedded query engine |
| SchemaRegistryUsage | Schema registration and validation |
| CircuitBreakerUsage | Resilient production with circuit breaker |
| SecurityUsage | TLS and SASL authentication |
Run any example:
dotnet run --project examples/QueryUsage
⚠️ Experimental — These features require Streamline server 0.3.0+ with moonshot feature flags enabled.
Query topics by meaning instead of offset. Requires a topic created with semantic.embed=true.
var results = await client.SearchAsync("logs.app", "payment failure", k: 10);
foreach (var hit in results)
{
Console.WriteLine($"[p{hit.Partition}] offset={hit.Offset} score={hit.Score:F2}");
}

Verify cryptographic provenance attestations attached to records by data contracts.
using Streamline.Client;
var verifier = new StreamlineVerifier(publicKeyBytes);
var result = verifier.Verify(record);
Console.WriteLine($"Verified: {result.Verified}, Producer: {result.ProducerId}");

Use Streamline as persistent memory for AI agents via the MCP protocol.
using Streamline.Client;
var memory = new MemoryClient("http://localhost:9094/mcp/v1");
await memory.RememberAsync("user prefers dark mode", tags: new[] { "preferences" });
var results = await memory.RecallAsync("user preferences", k: 5);

Create topic branches for replay, A/B testing, or counterfactual analysis.
var branch = await admin.CreateBranchAsync("events", "experiment-v2");
await using var consumer = client.CreateConsumer<string, string>(branch.Topic, "branch-group");
await foreach (var record in consumer.ConsumeAsync())
{
Process(record);
}

Contributions are welcome! Please see the organization contributing guide for guidelines.
Apache 2.0
To report a security vulnerability, please email security@streamline.dev. Do not open a public issue.
See the Security Policy for details.