Skip to content
8 changes: 7 additions & 1 deletion src/StackExchange.Redis/ClientInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ public sealed class ClientInfo
public string? Name { get; private set; }

/// <summary>
/// Number of pattern matching subscriptions.
/// Number of pattern-matching subscriptions.
/// </summary>
public int PatternSubscriptionCount { get; private set; }

/// <summary>
/// Number of sharded subscriptions.
/// </summary>
public int ShardedSubscriptionCount { get; private set; }

/// <summary>
/// The port of the client.
/// </summary>
Expand Down Expand Up @@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
case "name": client.Name = value; break;
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
case "cmd": client.LastCommand = value; break;
case "flags":
Expand Down
13 changes: 8 additions & 5 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,10 +1670,10 @@ private void MatchResult(in RawResult result)
// invoke the handlers
RedisChannel channel;
if (items[0].IsEqual(message)) {
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
Trace("MESSAGE: " + channel);
} else {
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true);
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
Expand All @@ -1696,19 +1696,22 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
Expand Down
4 changes: 4 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1896,4 +1896,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void
7 changes: 4 additions & 3 deletions src/StackExchange.Redis/RawResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,21 @@ public bool MoveNext()
}
public ReadOnlySequence<byte> Current { get; private set; }
}
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded)
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
{
switch (Resp2TypeBulkString)
{
case ResultType.SimpleString:
case ResultType.BulkString:
if (channelPrefix == null)
{
return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode);
return new RedisChannel(GetBlob(), options);
}
if (StartsWith(channelPrefix))
{
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode);

return new RedisChannel(copy, options);
}
return default;
default:
Expand Down
48 changes: 30 additions & 18 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@
public readonly struct RedisChannel : IEquatable<RedisChannel>
{
internal readonly byte[]? Value;
internal readonly bool _isPatternBased;
internal readonly bool _isSharded;

internal readonly RedisChannelOptions Options;

[Flags]
internal enum RedisChannelOptions
{
None = 0,
Pattern = 1 << 0,
Sharded = 1 << 1,
}

internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
Expand All @@ -20,12 +30,17 @@
/// <summary>
/// Indicates whether this channel represents a wildcard pattern (see <c>PSUBSCRIBE</c>).
/// </summary>
public bool IsPattern => _isPatternBased;
public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;

/// <summary>
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
/// </summary>
public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;

/// <summary>
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
/// </summary>
public bool IsSharded => _isSharded;

Check failure on line 43 in src/StackExchange.Redis/RedisChannel.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

The type 'RedisChannel' already contains a definition for 'IsSharded'

Check failure on line 43 in src/StackExchange.Redis/RedisChannel.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

The type 'RedisChannel' already contains a definition for 'IsSharded'

Check failure on line 43 in src/StackExchange.Redis/RedisChannel.cs

View workflow job for this annotation

GitHub Actions / StackExchange.Redis (Ubuntu)

The type 'RedisChannel' already contains a definition for 'IsSharded'

internal bool IsNull => Value == null;

Expand Down Expand Up @@ -65,34 +80,31 @@
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { }
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { }

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { }

/// <summary>
/// Create a new redis channel from a buffer, explicitly controlling the sharding mode.
/// Create a new redis channel from a buffer, representing a sharded channel.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="isSharded">Whether the channel is sharded.</param>
public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {}
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the sharding mode.
/// Create a new redis channel from a string, representing a sharded channel.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="isSharded">Whether the channel is sharded.</param>
public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {}
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);

private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
_isPatternBased = isPatternBased;
_isSharded = isSharded;
Options = options;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
Expand Down Expand Up @@ -144,7 +156,7 @@
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded;
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
Expand Down Expand Up @@ -192,10 +204,10 @@
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded;
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0);
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -224,7 +236,7 @@
return this;
}
var copy = (byte[])Value.Clone(); // defensive array copy
return new RedisChannel(copy, _isPatternBased);
return new RedisChannel(copy, Options);
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,14 +1575,14 @@ public Task<LCSMatchResult> StringLongestCommonSubsequenceWithMatchesAsync(Redis
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

Expand Down
5 changes: 5 additions & 0 deletions src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ public RedisFeatures(Version version)
/// </summary>
public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0);

/// <summary>
/// Are <see href="https://redis.io/commands/ssubscribe/">SSUBSCRIBE</see> and <see href="https://redis.io/commands/spublish/">SPUBLISH</see> available?
/// </summary>
public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1);

/// <summary>
/// Are <see href="https://redis.io/commands/zpopmin/">ZPOPMIN</see> and <see href="https://redis.io/commands/zpopmax/">ZPOPMAX</see> available?
/// </summary>
Expand Down
35 changes: 22 additions & 13 deletions src/StackExchange.Redis/RedisSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,25 @@ public Subscription(CommandFlags flags)
/// </summary>
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
{
var isPattern = channel._isPatternBased;
var isSharded = channel._isSharded;
var isPattern = channel.IsPattern;
var isSharded = channel.IsSharded;
var command = action switch
{
SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE,
SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE,

SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE,
SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE,

SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE,
SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE,
_ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"),
SubscriptionAction.Subscribe => channel.Options switch
{
RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
_ => Unknown(action, channel.Options),
},
SubscriptionAction.Unsubscribe => channel.Options switch
{
RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE,
_ => Unknown(action, channel.Options),
},
_ => Unknown(action, channel.Options),
};

// TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica
Expand All @@ -207,6 +213,9 @@ internal Message GetMessage(RedisChannel channel, SubscriptionAction action, Com
return msg;
}

private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options)
=> throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'");

public void Add(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueue? queue)
{
if (handler != null)
Expand Down Expand Up @@ -374,14 +383,14 @@ private static void ThrowIfNull(in RedisChannel channel)
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

Expand Down
18 changes: 9 additions & 9 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static readonly ResultProcessor<long?>
public static readonly ResultProcessor<PersistResult[]> PersistResultArray = new PersistResultArrayProcessor();

public static readonly ResultProcessor<RedisChannel[]>
RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.PatternMode.Literal);
RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions.None);

public static readonly ResultProcessor<RedisKey>
RedisKey = new RedisKeyProcessor();
Expand Down Expand Up @@ -1504,20 +1504,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes

private sealed class RedisChannelArrayProcessor : ResultProcessor<RedisChannel[]>
{
private readonly RedisChannel.PatternMode mode;
public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
private readonly RedisChannel.RedisChannelOptions options;
public RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions options)
{
this.mode = mode;
this.options = options;
}

private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell
{
public readonly byte[]? Prefix;
public readonly RedisChannel.PatternMode Mode;
public ChannelState(byte[]? prefix, RedisChannel.PatternMode mode)
public readonly RedisChannel.RedisChannelOptions Options;
public ChannelState(byte[]? prefix, RedisChannel.RedisChannelOptions options)
{
Prefix = prefix;
Mode = mode;
Options = options;
}
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
Expand All @@ -1526,8 +1526,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
case ResultType.Array:
var final = result.ToArray(
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false),
new ChannelState(connection.ChannelPrefix, mode))!;
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Options),
new ChannelState(connection.ChannelPrefix, options))!;

SetResult(message, final);
return true;
Expand Down
Loading
Loading