SignalR 如何借助redis 实现跨进程通信

关于redis的订阅和发布功能,这里讲到比较好https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

signalr 实际上就是使用了Redis内置的发布订阅功能来同步不同服务器上的客户端信息。

每个连接有一个connectionId,连接成功的时候就会订阅这个connectionId + 回调,
send的实质是会发布一下这个connectionId。
这样 即使是在其他进程上send ,也会通过redis的订阅机制,传信息给订阅者,然后执行回调操作。
这样就轻松实现了跨进程通讯。

具体可以参考源码:

csharp;gutter:true; // Copyright (c) .NET Foundation. All rights reserved.</p> <p>// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.</p> <p>using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.AspNetCore.SignalR.Redis.Internal; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis;</p> <p>namespace Microsoft.AspNetCore.SignalR.Redis { public class RedisHubLifetimeManager : HubLifetimeManager, IDisposable where THub : Hub { private readonly HubConnectionStore _connections = new HubConnectionStore(); private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager(); private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager(); private IConnectionMultiplexer _redisServerConnection; private ISubscriber _bus; private readonly ILogger _logger; private readonly RedisOptions _options; private readonly RedisChannels _channels; private readonly string _serverName = GenerateServerName(); private readonly RedisProtocol _protocol; private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);</p> <pre><code> private readonly AckHandler _ackHandler; private int _internalId; public RedisHubLifetimeManager(ILogger> logger, IOptions options, IHubProtocolResolver hubProtocolResolver) { _logger = logger; _options = options.Value; _ackHandler = new AckHandler(); _channels = new RedisChannels(typeof(THub).FullName); _protocol = new RedisProtocol(hubProtocolResolver.AllProtocols); RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName); _ = EnsureRedisServerConnection(); } public override async Task OnConnectedAsync(HubConnectionContext connection) { await EnsureRedisServerConnection(); var feature = new RedisFeature(); connection.Features.Set(feature); var connectionTask = Task.CompletedTask; var userTask = Task.CompletedTask; _connections.Add(connection); connectionTask = SubscribeToConnection(connection); if (!string.IsNullOrEmpty(connection.UserIdentifier)) { userTask = SubscribeToUser(connection); } await Task.WhenAll(connectionTask, userTask); } public override Task OnDisconnectedAsync(HubConnectionContext connection) { _connections.Remove(connection); var tasks = new List(); var connectionChannel = _channels.Connection(connection.ConnectionId); RedisLog.Unsubscribe(_logger, connectionChannel); tasks.Add(_bus.UnsubscribeAsync(connectionChannel)); var feature = connection.Features.Get(); var groupNames = feature.Groups; if (groupNames != null) { // Copy the groups to an array here because they get removed from this collection // in RemoveFromGroupAsync foreach (var group in groupNames.ToArray()) { // Use RemoveGroupAsyncCore because the connection is local and we don't want to // accidentally go to other servers with our remove request. tasks.Add(RemoveGroupAsyncCore(connection, group)); } } if (!string.IsNullOrEmpty(connection.UserIdentifier)) { tasks.Add(RemoveUserAsync(connection)); } return Task.WhenAll(tasks); } public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default) { var message = _protocol.WriteInvocation(methodName, args); return PublishAsync(_channels.All, message); } public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList excludedConnectionIds, CancellationToken cancellationToken = default) { var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds); return PublishAsync(_channels.All, message); } public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default) { if (connectionId == null) { throw new ArgumentNullException(nameof(connectionId)); } // If the connection is local we can skip sending the message through the bus since we require sticky connections. // This also saves serializing and deserializing the message! var connection = _connections[connectionId]; if (connection != null) { return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask(); } var message = _protocol.WriteInvocation(methodName, args); return PublishAsync(_channels.Connection(connectionId), message); } public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default) { if (groupName == null) { throw new ArgumentNullException(nameof(groupName)); } var message = _protocol.WriteInvocation(methodName, args); return PublishAsync(_channels.Group(groupName), message); } public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedConnectionIds, CancellationToken cancellationToken = default) { if (groupName == null) { throw new ArgumentNullException(nameof(groupName)); } var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds); return PublishAsync(_channels.Group(groupName), message); } public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default) { var message = _protocol.WriteInvocation(methodName, args); return PublishAsync(_channels.User(userId), message); } public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default) { if (connectionId == null) { throw new ArgumentNullException(nameof(connectionId)); } if (groupName == null) { throw new ArgumentNullException(nameof(groupName)); } var connection = _connections[connectionId]; if (connection != null) { // short circuit if connection is on this server return AddGroupAsyncCore(connection, groupName); } return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add); } public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default) { if (connectionId == null) { throw new ArgumentNullException(nameof(connectionId)); } if (groupName == null) { throw new ArgumentNullException(nameof(groupName)); } var connection = _connections[connectionId]; if (connection != null) { // short circuit if connection is on this server return RemoveGroupAsyncCore(connection, groupName); } return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove); } public override Task SendConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default) { if (connectionIds == null) { throw new ArgumentNullException(nameof(connectionIds)); } var publishTasks = new List(connectionIds.Count); var payload = _protocol.WriteInvocation(methodName, args); foreach (var connectionId in connectionIds) { publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload)); } return Task.WhenAll(publishTasks); } public override Task SendGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args, CancellationToken cancellationToken = default) { if (groupNames == null) { throw new ArgumentNullException(nameof(groupNames)); } var publishTasks = new List(groupNames.Count); var payload = _protocol.WriteInvocation(methodName, args); foreach (var groupName in groupNames) { if (!string.IsNullOrEmpty(groupName)) { publishTasks.Add(PublishAsync(_channels.Group(groupName), payload)); } } return Task.WhenAll(publishTasks); } public override Task SendUsersAsync(IReadOnlyList userIds, string methodName, object[] args, CancellationToken cancellationToken = default) { if (userIds.Count > 0) { var payload = _protocol.WriteInvocation(methodName, args); var publishTasks = new List(userIds.Count); foreach (var userId in userIds) { if (!string.IsNullOrEmpty(userId)) { publishTasks.Add(PublishAsync(_channels.User(userId), payload)); } } return Task.WhenAll(publishTasks); } return Task.CompletedTask; } private async Task PublishAsync(string channel, byte[] payload) { await EnsureRedisServerConnection(); RedisLog.PublishToChannel(_logger, channel); await _bus.PublishAsync(channel, payload); } private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName) { var feature = connection.Features.Get(); var groupNames = feature.Groups; lock (groupNames) { // Connection already in group if (!groupNames.Add(groupName)) { return Task.CompletedTask; } } var groupChannel = _channels.Group(groupName); return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync); } /// /// This takes because we want to remove the connection from the /// _connections list in OnDisconnectedAsync and still be able to remove groups with this method. /// private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName) { var groupChannel = _channels.Group(groupName); await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName => { RedisLog.Unsubscribe(_logger, channelName); return _bus.UnsubscribeAsync(channelName); }); var feature = connection.Features.Get(); var groupNames = feature.Groups; if (groupNames != null) { lock (groupNames) { groupNames.Remove(groupName); } } } private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action) { var id = Interlocked.Increment(ref _internalId); var ack = _ackHandler.CreateAck(id); // Send Add/Remove Group to other servers and wait for an ack or timeout var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId)); await PublishAsync(_channels.GroupManagement, message); await ack; } private Task RemoveUserAsync(HubConnectionContext connection) { var userChannel = _channels.User(connection.UserIdentifier); return _users.RemoveSubscriptionAsync(userChannel, connection, channelName => { RedisLog.Unsubscribe(_logger, channelName); return _bus.UnsubscribeAsync(channelName); }); } public void Dispose() { _bus?.UnsubscribeAll(); _redisServerConnection?.Dispose(); _ackHandler.Dispose(); } private Task SubscribeToAll() { RedisLog.Subscribing(_logger, _channels.All); return _bus.SubscribeAsync(_channels.All, async (c, data) => { try { RedisLog.ReceivedFromChannel(_logger, _channels.All); var invocation = _protocol.ReadInvocation((byte[])data); var tasks = new List(_connections.Count); foreach (var connection in _connections) { if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId)) { tasks.Add(connection.WriteAsync(invocation.Message).AsTask()); } } await Task.WhenAll(tasks); } catch (Exception ex) { RedisLog.FailedWritingMessage(_logger, ex); } }); } private Task SubscribeToGroupManagementChannel() { return _bus.SubscribeAsync(_channels.GroupManagement, async (c, data) => { try { var groupMessage = _protocol.ReadGroupCommand((byte[])data); var connection = _connections[groupMessage.ConnectionId]; if (connection == null) { // user not on this server return; } if (groupMessage.Action == GroupAction.Remove) { await RemoveGroupAsyncCore(connection, groupMessage.GroupName); } if (groupMessage.Action == GroupAction.Add) { await AddGroupAsyncCore(connection, groupMessage.GroupName); } // Send an ack to the server that sent the original command. await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id)); } catch (Exception ex) { RedisLog.InternalMessageFailed(_logger, ex); } }); } private Task SubscribeToAckChannel() { // Create server specific channel in order to send an ack to a single server return _bus.SubscribeAsync(_channels.Ack(_serverName), (c, data) => { var ackId = _protocol.ReadAck((byte[])data); _ackHandler.TriggerAck(ackId); }); } private Task SubscribeToConnection(HubConnectionContext connection) { var connectionChannel = _channels.Connection(connection.ConnectionId); RedisLog.Subscribing(_logger, connectionChannel); return _bus.SubscribeAsync(connectionChannel, async (c, data) => { var invocation = _protocol.ReadInvocation((byte[])data); await connection.WriteAsync(invocation.Message); }); } private Task SubscribeToUser(HubConnectionContext connection) { var userChannel = _channels.User(connection.UserIdentifier); return _users.AddSubscriptionAsync(userChannel, connection, (channelName, subscriptions) => { RedisLog.Subscribing(_logger, channelName); return _bus.SubscribeAsync(channelName, async (c, data) => { try { var invocation = _protocol.ReadInvocation((byte[])data); var tasks = new List(); foreach (var userConnection in subscriptions) { tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask()); } await Task.WhenAll(tasks); } catch (Exception ex) { RedisLog.FailedWritingMessage(_logger, ex); } }); }); } private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections) { RedisLog.Subscribing(_logger, groupChannel); return _bus.SubscribeAsync(groupChannel, async (c, data) => { try { var invocation = _protocol.ReadInvocation((byte[])data); var tasks = new List(); foreach (var groupConnection in groupConnections) { if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true) { continue; } tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask()); } await Task.WhenAll(tasks); } catch (Exception ex) { RedisLog.FailedWritingMessage(_logger, ex); } }); } private async Task EnsureRedisServerConnection() { if (_redisServerConnection == null) { await _connectionLock.WaitAsync(); try { if (_redisServerConnection == null) { var writer = new LoggerTextWriter(_logger); _redisServerConnection = await _options.ConnectAsync(writer); _bus = _redisServerConnection.GetSubscriber(); _redisServerConnection.ConnectionRestored += (_, e) => { // We use the subscription connection type // Ignore messages from the interactive connection (avoids duplicates) if (e.ConnectionType == ConnectionType.Interactive) { return; } RedisLog.ConnectionRestored(_logger); }; _redisServerConnection.ConnectionFailed += (_, e) => { // We use the subscription connection type // Ignore messages from the interactive connection (avoids duplicates) if (e.ConnectionType == ConnectionType.Interactive) { return; } RedisLog.ConnectionFailed(_logger, e.Exception); }; if (_redisServerConnection.IsConnected) { RedisLog.Connected(_logger); } else { RedisLog.NotConnected(_logger); } await SubscribeToAll(); await SubscribeToGroupManagementChannel(); await SubscribeToAckChannel(); } } finally { _connectionLock.Release(); } } } private static string GenerateServerName() { // Use the machine name for convenient diagnostics, but add a guid to make it unique. // Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29 return $"{Environment.MachineName}_{Guid.NewGuid():N}"; } private class LoggerTextWriter : TextWriter { private readonly ILogger _logger; public LoggerTextWriter(ILogger logger) { _logger = logger; } public override Encoding Encoding => Encoding.UTF8; public override void Write(char value) { } public override void WriteLine(string value) { RedisLog.ConnectionMultiplexerMessage(_logger, value); } } private interface IRedisFeature { HashSet Groups { get; } } private class RedisFeature : IRedisFeature { public HashSet Groups { get; } = new HashSet(StringComparer.OrdinalIgnoreCase); } } </code></pre> <p>}

Original: https://www.cnblogs.com/zendu/p/13898869.html
Author: zyz913614263
Title: SignalR 如何借助redis 实现跨进程通信

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529430/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • .NET使用StackExchange.Redis

    csharp;gutter:true; using StackExchange.Redis; using System; using System.Collections.Gene…

    Linux 2023年5月28日
    096
  • Linux lsof命令的使用示例

    Linux命令中,lsof代表 Li St Open Files,用于查看所有被打开的文件,同时显示打开文件相对应的进程。Linux/Unix把一切都看做文件(pipes,sock…

    Linux 2023年6月13日
    092
  • 【电台谈心】2022罗翔老师回答毕业生的4个问题

    罗翔老师:谢谢这位同学的问题。我回想起自己找工作的经历,其实也是 一地鸡毛。本科毕业的时候,也没有多少招聘会。我只有一个单位,一个单位地去找。少数几家单位让你进去了,聊了聊又觉得你…

    Linux 2023年6月13日
    0120
  • bochs(2.6.11)配置安装

    下载:https://bochs.sourceforge.io/ 建议下载2.6.11,下文一开始安装的2.7,但运行时有无法解决的错误。但是大致安装过程一致。 linux 提前安…

    Linux 2023年5月27日
    0139
  • 解决报错 Microsoft Visual C++ 14.0 is required

    环境:Surface Windows 10 专业版 问题:安装 Python3 的第三方库 py7zr 时不成功。而报错的是另外一个依赖库 pycryptodomex distut…

    Linux 2023年6月14日
    0116
  • freePBR的UE4材质合集

    我手动下载了freepbr.com上的所有ue4材质,放到百度云上分享给大家。 freePBR的UE4材质合集 想开个新坑了。但工欲善其事必先利其器。于是我手动下载了freepbr…

    Linux 2023年6月6日
    0101
  • Golang 实现 Redis(11): RDB 文件格式

    RDB 文件使用二进制方式存储 Redis 内存中的数据,具有体积小、加载快的优点。本文主要介绍 RDB 文件的结构和编码方式,并借此探讨二进制编解码和文件处理方式,希望对您有所帮…

    Linux 2023年5月28日
    0101
  • Linux、Windows下Redis的安装即Redis的基本使用详解

    前言 什么是Redis Redis是一个基于 内存的key-value结构数据库。Redis 是互联网技术领域使用最为广泛的存储中间件,它是「 Remote Dictionary …

    Linux 2023年6月6日
    0121
  • prometheus监控redis集群

    【1】利用 redis_exporter 监控 redis 集群 (1.0)redis_exporter 以前都是用傻办法,一个实例一个采集器; redis_exporter 支持…

    Linux 2023年5月28日
    0108
  • mysql通过mysqldump命令重做从库详细操作步骤

    备份主库所有数据,并将dump.sql文件拷贝到从库/tmp目录 mysqldump -uroot -p –set-gtid-purged=OFF –single-transa…

    Linux 2023年6月8日
    079
  • 尤娜故事-迷雾-springboot扮酷小技巧

    前情回顾 从前,有一个简单的通道系统叫尤娜…… 尤娜系统的第一次飞行中换引擎的架构垂直拆分改造 四种常用的微服务架构拆分方式 尤娜,我去面试了 正文 我回到…

    Linux 2023年6月14日
    086
  • Docker存储卷

    Docker存储卷 1、COW机制 Docker镜像由多个只读层叠加而成,启动容器时,Docker会加载只读镜像层并在镜像栈顶部添加一个读写层。 如果运行中的容器修改了现有的一个已…

    Linux 2023年6月7日
    090
  • 机器学习学习笔记之三:朴素贝叶斯

    条件概率和贝叶斯公式 (p(x|y)) 表示在 (y) 发生的条件下 (x) 发生的概率。 条件概率公式:已知 (p(x)) 和 (p(y)),以及(x), (y)同时发生的概率(…

    Linux 2023年6月14日
    068
  • 服务器监控 -TOP命令详解(下)

    用jmeter做性能测试的时候,top命令是最简单扼要的查看服务器CPU和内存占用情况的命令。如下图就是TOP命令展示出来的结果,下面对结果进行详细解说。 Mem 物理内存: to…

    Linux 2023年6月8日
    091
  • 学习一下 JVM (三) — 了解一下 垃圾回收

    一、简单了解几个概念 1、什么是垃圾(Garbage)?什么是垃圾回收(Garbage Collection,简称 GC)? (1)什么是垃圾(Garbage)?这里的垃圾 指的是…

    Linux 2023年6月11日
    0103
  • python openpyxl UserWarning: Workbook contains no default style, apply openpyxl‘s default

    告警解释 这是一个告警,这个excel文件没有设置默认的样式。一般这种没有默认样式的excel文档是由java程序生成的,不是像windows系统日常使用中通过右键点击创建的exc…

    Linux 2023年6月8日
    0100
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球