diff --git a/src/StackExchange.Redis/ClientInfo.cs b/src/StackExchange.Redis/ClientInfo.cs index f04058495..c5ce0d0bf 100644 --- a/src/StackExchange.Redis/ClientInfo.cs +++ b/src/StackExchange.Redis/ClientInfo.cs @@ -129,10 +129,15 @@ public sealed class ClientInfo public string? Name { get; private set; } /// - /// Number of pattern matching subscriptions. + /// Number of pattern-matching subscriptions. /// public int PatternSubscriptionCount { get; private set; } + /// + /// Number of sharded subscriptions. + /// + public int ShardedSubscriptionCount { get; private set; } + /// /// The port of the client. /// @@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[ case "name": client.Name = value; break; case "sub": client.SubscriptionCount = Format.ParseInt32(value); break; case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break; + case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break; case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break; case "cmd": client.LastCommand = value; break; case "flags": diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 5e0dbbf60..954738910 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1670,10 +1670,10 @@ private void MatchResult(in RawResult result) // invoke the handlers RedisChannel channel; if (items[0].IsEqual(message)) { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); Trace("MESSAGE: " + channel); } else { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); Trace("SMESSAGE: " + channel); } if (!channel.IsNull) @@ -1696,19 +1696,22 @@ private void MatchResult(in RawResult result) { _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + Trace("PMESSAGE: " + channel); if (!channel.IsNull) { if (TryGetPubSubPayload(items[3], out var payload)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payloads); } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 3b42e204e..a797b641a 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1896,4 +1896,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult! StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void +StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool +static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel +StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void diff --git a/src/StackExchange.Redis/RawResult.cs b/src/StackExchange.Redis/RawResult.cs index dd3ce9920..55c44652b 100644 --- a/src/StackExchange.Redis/RawResult.cs +++ b/src/StackExchange.Redis/RawResult.cs @@ -161,7 +161,7 @@ public bool MoveNext() } public ReadOnlySequence Current { get; private set; } } - internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded) + internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options) { switch (Resp2TypeBulkString) { @@ -169,12 +169,13 @@ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.Pattern case ResultType.BulkString: if (channelPrefix == null) { - return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode); + return new RedisChannel(GetBlob(), options); } if (StartsWith(channelPrefix)) { byte[] copy = Payload.Slice(channelPrefix.Length).ToArray(); - return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode); + + return new RedisChannel(copy, options); } return default; default: diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 9c0cadbf4..d93651604 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -9,8 +9,18 @@ namespace StackExchange.Redis public readonly struct RedisChannel : IEquatable { internal readonly byte[]? Value; - internal readonly bool _isPatternBased; - internal readonly bool _isSharded; + + internal readonly RedisChannelOptions Options; + + [Flags] + internal enum RedisChannelOptions + { + None = 0, + Pattern = 1 << 0, + Sharded = 1 << 1, + } + + internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH; /// /// Indicates whether the channel-name is either null or a zero-length value. @@ -20,7 +30,12 @@ namespace StackExchange.Redis /// /// Indicates whether this channel represents a wildcard pattern (see PSUBSCRIBE). /// - public bool IsPattern => _isPatternBased; + public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0; + + /// + /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) + /// + public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0; /// /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) @@ -65,34 +80,31 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { } + public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode. /// /// The string name of the channel to create. /// The mode for name matching. - public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } + public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } /// - /// Create a new redis channel from a buffer, explicitly controlling the sharding mode. + /// Create a new redis channel from a buffer, representing a sharded channel. /// /// The name of the channel to create. - /// Whether the channel is sharded. - public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {} + public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded); /// - /// Create a new redis channel from a string, explicitly controlling the sharding mode. + /// Create a new redis channel from a string, representing a sharded channel. /// /// The string name of the channel to create. - /// Whether the channel is sharded. - public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {} + public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded); - private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) + internal RedisChannel(byte[]? value, RedisChannelOptions options) { Value = value; - _isPatternBased = isPatternBased; - _isSharded = isSharded; + Options = options; } private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch @@ -144,7 +156,7 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, RedisChannel y) => - x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded; + x.Options == y.Options && RedisValue.Equals(x.Value, y.Value); /// /// Indicate whether two channel names are equal. @@ -192,10 +204,10 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) /// Indicate whether two channel names are equal. /// /// The to compare to. - public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded; + public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value); /// - public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0); + public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options; /// /// Obtains a string representation of the channel name. @@ -224,7 +236,7 @@ internal RedisChannel Clone() return this; } var copy = (byte[])Value.Clone(); // defensive array copy - return new RedisChannel(copy, _isPatternBased); + return new RedisChannel(copy, Options); } /// diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 7468bdb64..716176662 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1575,14 +1575,14 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index 225516433..faba07e68 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -186,6 +186,11 @@ public RedisFeatures(Version version) /// public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0); + /// + /// Are SSUBSCRIBE and SPUBLISH available? + /// + public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1); + /// /// Are ZPOPMIN and ZPOPMAX available? /// diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 2b2076e03..b641baf05 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -182,19 +182,25 @@ public Subscription(CommandFlags flags) /// internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) { - var isPattern = channel._isPatternBased; - var isSharded = channel._isSharded; + var isPattern = channel.IsPattern; + var isSharded = channel.IsSharded; var command = action switch { - SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE, - SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE, - - SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE, - SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE, - - SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE, - SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE, - _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"), + SubscriptionAction.Subscribe => channel.Options switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + SubscriptionAction.Unsubscribe => channel.Options switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + _ => Unknown(action, channel.Options), }; // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica @@ -207,6 +213,9 @@ internal Message GetMessage(RedisChannel channel, SubscriptionAction action, Com return msg; } + private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options) + => throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'"); + public void Add(Action? handler, ChannelMessageQueue? queue) { if (handler != null) @@ -374,14 +383,14 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 88e4a00ef..15619a4bb 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -73,7 +73,7 @@ public static readonly ResultProcessor public static readonly ResultProcessor PersistResultArray = new PersistResultArrayProcessor(); public static readonly ResultProcessor - RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.PatternMode.Literal); + RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions.None); public static readonly ResultProcessor RedisKey = new RedisKeyProcessor(); @@ -1504,20 +1504,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes private sealed class RedisChannelArrayProcessor : ResultProcessor { - private readonly RedisChannel.PatternMode mode; - public RedisChannelArrayProcessor(RedisChannel.PatternMode mode) + private readonly RedisChannel.RedisChannelOptions options; + public RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions options) { - this.mode = mode; + this.options = options; } private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell { public readonly byte[]? Prefix; - public readonly RedisChannel.PatternMode Mode; - public ChannelState(byte[]? prefix, RedisChannel.PatternMode mode) + public readonly RedisChannel.RedisChannelOptions Options; + public ChannelState(byte[]? prefix, RedisChannel.RedisChannelOptions options) { Prefix = prefix; - Mode = mode; + Options = options; } } protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) @@ -1526,8 +1526,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { case ResultType.Array: var final = result.ToArray( - (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), - new ChannelState(connection.ChannelPrefix, mode))!; + (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Options), + new ChannelState(connection.ChannelPrefix, options))!; SetResult(message, final); return true; diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 742ce51bb..34890a89a 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -1,11 +1,11 @@ -using System; +using StackExchange.Redis.Profiling; +using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; -using StackExchange.Redis.Profiling; using Xunit; using Xunit.Abstractions; @@ -746,4 +746,62 @@ public void ConnectIncludesSubscriber() Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState); } } + + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ClusterPubSub(bool sharded) + { + var guid = Guid.NewGuid().ToString(); + var channel = sharded ? RedisChannel.Sharded(guid) : RedisChannel.Literal(guid); + using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false); + Assert.True(conn.IsConnected); + + var pubsub = conn.GetSubscriber(); + List<(RedisChannel, RedisValue)> received = new(); + var queue = await pubsub.SubscribeAsync(channel); + _ = Task.Run(async () => + { // use queue API to have control over order + await foreach (var item in queue) + { + lock (received) + { + received.Add((item.Channel, item.Message)); + } + } + }); + + var db = conn.GetDatabase(); + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + for (int i = 0; i < 10; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + await pubsub.UnsubscribeAsync(channel); + + (RedisChannel Channel, RedisValue Value)[] snap; + lock (received) + { + snap = received.ToArray(); // in case of concurrency + } + Log("items received: {0}", snap.Length); + Assert.Equal(10, snap.Length); + // separate log and validate loop here simplifies debugging (ask me how I know!) + for (int i = 0; i < 10; i++) + { + var pair = snap[i]; + Log("element {0}: {1}/{2}", i, pair.Channel, pair.Value); + } + for (int i = 0; i < 10; i++) + { + var pair = snap[i]; + Assert.Equal(channel, pair.Channel); + Assert.Equal(i, pair.Value); + } + } } diff --git a/toys/StackExchange.Redis.Server/RedisRequest.cs b/toys/StackExchange.Redis.Server/RedisRequest.cs index 283076905..36d133bab 100644 --- a/toys/StackExchange.Redis.Server/RedisRequest.cs +++ b/toys/StackExchange.Redis.Server/RedisRequest.cs @@ -45,8 +45,8 @@ public int GetInt32(int index) public RedisKey GetKey(int index) => _inner[index].AsRedisKey(); - public RedisChannel GetChannel(int index, RedisChannel.PatternMode mode) - => _inner[index].AsRedisChannel(null, mode, false); + internal RedisChannel GetChannel(int index, RedisChannel.RedisChannelOptions options) + => _inner[index].AsRedisChannel(null, options); internal bool TryGetCommandBytes(int i, out CommandBytes command) { diff --git a/toys/StackExchange.Redis.Server/RedisServer.cs b/toys/StackExchange.Redis.Server/RedisServer.cs index 63efbfd1b..52728fd44 100644 --- a/toys/StackExchange.Redis.Server/RedisServer.cs +++ b/toys/StackExchange.Redis.Server/RedisServer.cs @@ -479,7 +479,7 @@ private TypedRedisValue SubscribeImpl(RedisClient client, RedisRequest request) int index = 0; request.TryGetCommandBytes(0, out var cmd); var cmdString = TypedRedisValue.BulkString(cmd.ToArray()); - var mode = cmd[0] == (byte)'p' ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + var mode = cmd[0] == (byte)'p' ? RedisChannel.RedisChannelOptions.Pattern : RedisChannel.RedisChannelOptions.None; for (int i = 1; i < request.Count; i++) { var channel = request.GetChannel(i, mode);