关于redis的订阅和发布功能,这里讲到比较好https://redisbook.readthedocs.io/en/latest/feature/pubsub.html
signalr 实际上就是使用了Redis内置的发布订阅功能来同步不同服务器上的客户端信息。
每个连接有一个connectionId,连接成功的时候就会订阅这个connectionId + 回调,
send的实质是会发布一下这个connectionId。
这样 即使是在其他进程上send ,也会通过redis的订阅机制,传信息给订阅者,然后执行回调操作。
这样就轻松实现了跨进程通讯。
具体可以参考源码:
csharp;gutter:true;
// Copyright (c) .NET Foundation. All rights reserved.</p>
<p>// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.</p>
<p>using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;</p>
<p>namespace Microsoft.AspNetCore.SignalR.Redis
{
public class RedisHubLifetimeManager : HubLifetimeManager, IDisposable where THub : Hub
{
private readonly HubConnectionStore _connections = new HubConnectionStore();
private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
private IConnectionMultiplexer _redisServerConnection;
private ISubscriber _bus;
private readonly ILogger _logger;
private readonly RedisOptions _options;
private readonly RedisChannels _channels;
private readonly string _serverName = GenerateServerName();
private readonly RedisProtocol _protocol;
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);</p>
<pre><code> private readonly AckHandler _ackHandler;
private int _internalId;
public RedisHubLifetimeManager(ILogger> logger,
IOptions options,
IHubProtocolResolver hubProtocolResolver)
{
_logger = logger;
_options = options.Value;
_ackHandler = new AckHandler();
_channels = new RedisChannels(typeof(THub).FullName);
_protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);
RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName);
_ = EnsureRedisServerConnection();
}
public override async Task OnConnectedAsync(HubConnectionContext connection)
{
await EnsureRedisServerConnection();
var feature = new RedisFeature();
connection.Features.Set(feature);
var connectionTask = Task.CompletedTask;
var userTask = Task.CompletedTask;
_connections.Add(connection);
connectionTask = SubscribeToConnection(connection);
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
userTask = SubscribeToUser(connection);
}
await Task.WhenAll(connectionTask, userTask);
}
public override Task OnDisconnectedAsync(HubConnectionContext connection)
{
_connections.Remove(connection);
var tasks = new List();
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Unsubscribe(_logger, connectionChannel);
tasks.Add(_bus.UnsubscribeAsync(connectionChannel));
var feature = connection.Features.Get();
var groupNames = feature.Groups;
if (groupNames != null)
{
// Copy the groups to an array here because they get removed from this collection
// in RemoveFromGroupAsync
foreach (var group in groupNames.ToArray())
{
// Use RemoveGroupAsyncCore because the connection is local and we don't want to
// accidentally go to other servers with our remove request.
tasks.Add(RemoveGroupAsyncCore(connection, group));
}
}
if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
tasks.Add(RemoveUserAsync(connection));
}
return Task.WhenAll(tasks);
}
public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.All, message);
}
public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList excludedConnectionIds, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.All, message);
}
public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
// If the connection is local we can skip sending the message through the bus since we require sticky connections.
// This also saves serializing and deserializing the message!
var connection = _connections[connectionId];
if (connection != null)
{
return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Connection(connectionId), message);
}
public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedConnectionIds, CancellationToken cancellationToken = default)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
return PublishAsync(_channels.Group(groupName), message);
}
public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
var message = _protocol.WriteInvocation(methodName, args);
return PublishAsync(_channels.User(userId), message);
}
public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return AddGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
}
public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
{
if (connectionId == null)
{
throw new ArgumentNullException(nameof(connectionId));
}
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}
var connection = _connections[connectionId];
if (connection != null)
{
// short circuit if connection is on this server
return RemoveGroupAsyncCore(connection, groupName);
}
return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
}
public override Task SendConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (connectionIds == null)
{
throw new ArgumentNullException(nameof(connectionIds));
}
var publishTasks = new List(connectionIds.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var connectionId in connectionIds)
{
publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
}
return Task.WhenAll(publishTasks);
}
public override Task SendGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (groupNames == null)
{
throw new ArgumentNullException(nameof(groupNames));
}
var publishTasks = new List(groupNames.Count);
var payload = _protocol.WriteInvocation(methodName, args);
foreach (var groupName in groupNames)
{
if (!string.IsNullOrEmpty(groupName))
{
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
}
}
return Task.WhenAll(publishTasks);
}
public override Task SendUsersAsync(IReadOnlyList userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (userIds.Count > 0)
{
var payload = _protocol.WriteInvocation(methodName, args);
var publishTasks = new List(userIds.Count);
foreach (var userId in userIds)
{
if (!string.IsNullOrEmpty(userId))
{
publishTasks.Add(PublishAsync(_channels.User(userId), payload));
}
}
return Task.WhenAll(publishTasks);
}
return Task.CompletedTask;
}
private async Task PublishAsync(string channel, byte[] payload)
{
await EnsureRedisServerConnection();
RedisLog.PublishToChannel(_logger, channel);
await _bus.PublishAsync(channel, payload);
}
private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var feature = connection.Features.Get();
var groupNames = feature.Groups;
lock (groupNames)
{
// Connection already in group
if (!groupNames.Add(groupName))
{
return Task.CompletedTask;
}
}
var groupChannel = _channels.Group(groupName);
return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync);
}
///
/// This takes because we want to remove the connection from the
/// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
///
private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
{
var groupChannel = _channels.Group(groupName);
await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
var feature = connection.Features.Get();
var groupNames = feature.Groups;
if (groupNames != null)
{
lock (groupNames)
{
groupNames.Remove(groupName);
}
}
}
private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
{
var id = Interlocked.Increment(ref _internalId);
var ack = _ackHandler.CreateAck(id);
// Send Add/Remove Group to other servers and wait for an ack or timeout
var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId));
await PublishAsync(_channels.GroupManagement, message);
await ack;
}
private Task RemoveUserAsync(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.RemoveSubscriptionAsync(userChannel, connection, channelName =>
{
RedisLog.Unsubscribe(_logger, channelName);
return _bus.UnsubscribeAsync(channelName);
});
}
public void Dispose()
{
_bus?.UnsubscribeAll();
_redisServerConnection?.Dispose();
_ackHandler.Dispose();
}
private Task SubscribeToAll()
{
RedisLog.Subscribing(_logger, _channels.All);
return _bus.SubscribeAsync(_channels.All, async (c, data) =>
{
try
{
RedisLog.ReceivedFromChannel(_logger, _channels.All);
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List(_connections.Count);
foreach (var connection in _connections)
{
if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId))
{
tasks.Add(connection.WriteAsync(invocation.Message).AsTask());
}
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private Task SubscribeToGroupManagementChannel()
{
return _bus.SubscribeAsync(_channels.GroupManagement, async (c, data) =>
{
try
{
var groupMessage = _protocol.ReadGroupCommand((byte[])data);
var connection = _connections[groupMessage.ConnectionId];
if (connection == null)
{
// user not on this server
return;
}
if (groupMessage.Action == GroupAction.Remove)
{
await RemoveGroupAsyncCore(connection, groupMessage.GroupName);
}
if (groupMessage.Action == GroupAction.Add)
{
await AddGroupAsyncCore(connection, groupMessage.GroupName);
}
// Send an ack to the server that sent the original command.
await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id));
}
catch (Exception ex)
{
RedisLog.InternalMessageFailed(_logger, ex);
}
});
}
private Task SubscribeToAckChannel()
{
// Create server specific channel in order to send an ack to a single server
return _bus.SubscribeAsync(_channels.Ack(_serverName), (c, data) =>
{
var ackId = _protocol.ReadAck((byte[])data);
_ackHandler.TriggerAck(ackId);
});
}
private Task SubscribeToConnection(HubConnectionContext connection)
{
var connectionChannel = _channels.Connection(connection.ConnectionId);
RedisLog.Subscribing(_logger, connectionChannel);
return _bus.SubscribeAsync(connectionChannel, async (c, data) =>
{
var invocation = _protocol.ReadInvocation((byte[])data);
await connection.WriteAsync(invocation.Message);
});
}
private Task SubscribeToUser(HubConnectionContext connection)
{
var userChannel = _channels.User(connection.UserIdentifier);
return _users.AddSubscriptionAsync(userChannel, connection, (channelName, subscriptions) =>
{
RedisLog.Subscribing(_logger, channelName);
return _bus.SubscribeAsync(channelName, async (c, data) =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List();
foreach (var userConnection in subscriptions)
{
tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
});
}
private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
{
RedisLog.Subscribing(_logger, groupChannel);
return _bus.SubscribeAsync(groupChannel, async (c, data) =>
{
try
{
var invocation = _protocol.ReadInvocation((byte[])data);
var tasks = new List();
foreach (var groupConnection in groupConnections)
{
if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true)
{
continue;
}
tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask());
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
RedisLog.FailedWritingMessage(_logger, ex);
}
});
}
private async Task EnsureRedisServerConnection()
{
if (_redisServerConnection == null)
{
await _connectionLock.WaitAsync();
try
{
if (_redisServerConnection == null)
{
var writer = new LoggerTextWriter(_logger);
_redisServerConnection = await _options.ConnectAsync(writer);
_bus = _redisServerConnection.GetSubscriber();
_redisServerConnection.ConnectionRestored += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionRestored(_logger);
};
_redisServerConnection.ConnectionFailed += (_, e) =>
{
// We use the subscription connection type
// Ignore messages from the interactive connection (avoids duplicates)
if (e.ConnectionType == ConnectionType.Interactive)
{
return;
}
RedisLog.ConnectionFailed(_logger, e.Exception);
};
if (_redisServerConnection.IsConnected)
{
RedisLog.Connected(_logger);
}
else
{
RedisLog.NotConnected(_logger);
}
await SubscribeToAll();
await SubscribeToGroupManagementChannel();
await SubscribeToAckChannel();
}
}
finally
{
_connectionLock.Release();
}
}
}
private static string GenerateServerName()
{
// Use the machine name for convenient diagnostics, but add a guid to make it unique.
// Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29
return $"{Environment.MachineName}_{Guid.NewGuid():N}";
}
private class LoggerTextWriter : TextWriter
{
private readonly ILogger _logger;
public LoggerTextWriter(ILogger logger)
{
_logger = logger;
}
public override Encoding Encoding => Encoding.UTF8;
public override void Write(char value)
{
}
public override void WriteLine(string value)
{
RedisLog.ConnectionMultiplexerMessage(_logger, value);
}
}
private interface IRedisFeature
{
HashSet Groups { get; }
}
private class RedisFeature : IRedisFeature
{
public HashSet Groups { get; } = new HashSet(StringComparer.OrdinalIgnoreCase);
}
}
</code></pre>
<p>}
Original: https://www.cnblogs.com/zendu/p/13898869.html
Author: zyz913614263
Title: SignalR 如何借助redis 实现跨进程通信
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529430/
转载文章受原作者版权保护。转载请注明原作者出处!