How SignalR uses Redis for cross-process communication

For background on Redis publish/subscribe, this write-up explains it well: https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

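SignalR simply uses Redis's built-in publish/subscribe feature to relay messages between servers, so that a message sent on one server can reach clients connected to another.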
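For orientation, turning this on in an ASP.NET Core app is typically a single registration call. The fragment below is a minimal sketch, assuming the `Microsoft.AspNetCore.SignalR.Redis` package (the namespace used by `RedisHubLifetimeManager`) and its `AddRedis` extension; later releases ship `Microsoft.AspNetCore.SignalR.StackExchangeRedis` with `AddStackExchangeRedis` instead. `ChatHub`, the `/chat` route, and the connection string are placeholders, not taken from the original post.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical hub, used only for illustration.
public class ChatHub : Hub { }

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Assumption: a Redis server is reachable at localhost:6379.
        // Every app instance must point at the same Redis server.
        services.AddSignalR().AddRedis("localhost:6379");
    }

    public void Configure(IApplicationBuilder app)
    {
        app.UseSignalR(routes => routes.MapHub<ChatHub>("/chat"));
    }
}
```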

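Each connection has a connectionId. When the connection is established, the server that accepted it subscribes to a Redis channel derived from that connectionId and registers a callback.
Sending to a connection boils down to publishing to that connection's channel.
So even when send is called in a different process, Redis pub/sub delivers the message to the subscriber, whose callback then writes it to the client. (As the source shows, when the target connection is local to the sending server, the message is written directly and Redis is skipped.)
That is all it takes to get cross-process communication.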
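The flow described above can be sketched without a Redis server. The example below is only an in-memory stand-in: the `bus` dictionary plays the role of Redis, and `Subscribe`, `Publish`, `OnConnected`, `SendConnection`, and the `connection:` channel prefix are hypothetical names chosen for illustration, not SignalR's or StackExchange.Redis's API.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the Redis pub/sub bus (hypothetical; a real backplane
// would go through a Redis client instead of this dictionary).
var bus = new Dictionary<string, List<Action<string>>>();

void Subscribe(string channel, Action<string> callback)
{
    if (!bus.TryGetValue(channel, out var handlers))
    {
        handlers = new List<Action<string>>();
        bus[channel] = handlers;
    }
    handlers.Add(callback);
}

int Publish(string channel, string payload)
{
    if (!bus.TryGetValue(channel, out var handlers))
    {
        return 0; // nobody subscribed, so the message is simply dropped
    }
    foreach (var handler in handlers)
    {
        handler(payload);
    }
    return handlers.Count;
}

// Messages written to each simulated client, keyed by connection ID.
var delivered = new Dictionary<string, List<string>>();

// "Server A" accepts the connection and subscribes to its channel;
// the callback stands in for writing to the client.
void OnConnected(string connectionId)
{
    delivered[connectionId] = new List<string>();
    Subscribe("connection:" + connectionId, payload => delivered[connectionId].Add(payload));
}

// "Server B" does not hold the connection; it only publishes to the channel.
int SendConnection(string connectionId, string payload) =>
    Publish("connection:" + connectionId, payload);

OnConnected("conn-1");
var receivers = SendConnection("conn-1", "hello from server B");
Console.WriteLine($"{receivers} subscriber(s) got: {delivered["conn-1"][0]}");
```

The sender never needs to know which server owns the connection; the channel name alone routes the message, which is the property the Redis backplane relies on.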

See the source for details:

```csharp
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;

namespace Microsoft.AspNetCore.SignalR.Redis
{
    public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable where THub : Hub
    {
        private readonly HubConnectionStore _connections = new HubConnectionStore();
        private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
        private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
        private IConnectionMultiplexer _redisServerConnection;
        private ISubscriber _bus;
        private readonly ILogger _logger;
        private readonly RedisOptions _options;
        private readonly RedisChannels _channels;
        private readonly string _serverName = GenerateServerName();
        private readonly RedisProtocol _protocol;
        private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);

        private readonly AckHandler _ackHandler;
        private int _internalId;

        public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
                                       IOptions<RedisOptions> options,
                                       IHubProtocolResolver hubProtocolResolver)
        {
            _logger = logger;
            _options = options.Value;
            _ackHandler = new AckHandler();
            _channels = new RedisChannels(typeof(THub).FullName);
            _protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);

            RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName);
            _ = EnsureRedisServerConnection();
        }

        public override async Task OnConnectedAsync(HubConnectionContext connection)
        {
            await EnsureRedisServerConnection();
            var feature = new RedisFeature();
            connection.Features.Set<IRedisFeature>(feature);

            var connectionTask = Task.CompletedTask;
            var userTask = Task.CompletedTask;

            _connections.Add(connection);

            connectionTask = SubscribeToConnection(connection);

            if (!string.IsNullOrEmpty(connection.UserIdentifier))
            {
                userTask = SubscribeToUser(connection);
            }

            await Task.WhenAll(connectionTask, userTask);
        }

        public override Task OnDisconnectedAsync(HubConnectionContext connection)
        {
            _connections.Remove(connection);

            var tasks = new List<Task>();

            var connectionChannel = _channels.Connection(connection.ConnectionId);
            RedisLog.Unsubscribe(_logger, connectionChannel);
            tasks.Add(_bus.UnsubscribeAsync(connectionChannel));

            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;

            if (groupNames != null)
            {
                // Copy the groups to an array here because they get removed from this collection
                // in RemoveFromGroupAsync
                foreach (var group in groupNames.ToArray())
                {
                    // Use RemoveGroupAsyncCore because the connection is local and we don't want to
                    // accidentally go to other servers with our remove request.
                    tasks.Add(RemoveGroupAsyncCore(connection, group));
                }
            }

            if (!string.IsNullOrEmpty(connection.UserIdentifier))
            {
                tasks.Add(RemoveUserAsync(connection));
            }

            return Task.WhenAll(tasks);
        }

        public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.All, message);
        }

        public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
            return PublishAsync(_channels.All, message);
        }

        public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            // If the connection is local we can skip sending the message through the bus since we require sticky connections.
            // This also saves serializing and deserializing the message!
            var connection = _connections[connectionId];
            if (connection != null)
            {
                return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
            }

            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.Connection(connectionId), message);
        }

        public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.Group(groupName), message);
        }

        public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
        {
            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
            return PublishAsync(_channels.Group(groupName), message);
        }

        public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.User(userId), message);
        }

        public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var connection = _connections[connectionId];
            if (connection != null)
            {
                // short circuit if connection is on this server
                return AddGroupAsyncCore(connection, groupName);
            }

            return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
        }

        public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var connection = _connections[connectionId];
            if (connection != null)
            {
                // short circuit if connection is on this server
                return RemoveGroupAsyncCore(connection, groupName);
            }

            return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
        }

        public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (connectionIds == null)
            {
                throw new ArgumentNullException(nameof(connectionIds));
            }

            var publishTasks = new List<Task>(connectionIds.Count);
            var payload = _protocol.WriteInvocation(methodName, args);

            foreach (var connectionId in connectionIds)
            {
                publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
            }

            return Task.WhenAll(publishTasks);
        }

        public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (groupNames == null)
            {
                throw new ArgumentNullException(nameof(groupNames));
            }

            var publishTasks = new List<Task>(groupNames.Count);
            var payload = _protocol.WriteInvocation(methodName, args);

            foreach (var groupName in groupNames)
            {
                if (!string.IsNullOrEmpty(groupName))
                {
                    publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
                }
            }

            return Task.WhenAll(publishTasks);
        }

        public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (userIds.Count > 0)
            {
                var payload = _protocol.WriteInvocation(methodName, args);
                var publishTasks = new List<Task>(userIds.Count);
                foreach (var userId in userIds)
                {
                    if (!string.IsNullOrEmpty(userId))
                    {
                        publishTasks.Add(PublishAsync(_channels.User(userId), payload));
                    }
                }

                return Task.WhenAll(publishTasks);
            }

            return Task.CompletedTask;
        }

        private async Task PublishAsync(string channel, byte[] payload)
        {
            await EnsureRedisServerConnection();
            RedisLog.PublishToChannel(_logger, channel);
            await _bus.PublishAsync(channel, payload);
        }

        private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
        {
            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;

            lock (groupNames)
            {
                // Connection already in group
                if (!groupNames.Add(groupName))
                {
                    return Task.CompletedTask;
                }
            }

            var groupChannel = _channels.Group(groupName);
            return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync);
        }

        /// <summary>
        /// This takes <see cref="HubConnectionContext"/> because we want to remove the connection from the
        /// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
        /// </summary>
        private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
        {
            var groupChannel = _channels.Group(groupName);

            await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName =>
            {
                RedisLog.Unsubscribe(_logger, channelName);
                return _bus.UnsubscribeAsync(channelName);
            });

            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;
            if (groupNames != null)
            {
                lock (groupNames)
                {
                    groupNames.Remove(groupName);
                }
            }
        }

        private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
        {
            var id = Interlocked.Increment(ref _internalId);
            var ack = _ackHandler.CreateAck(id);
            // Send Add/Remove Group to other servers and wait for an ack or timeout
            var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId));
            await PublishAsync(_channels.GroupManagement, message);

            await ack;
        }

        private Task RemoveUserAsync(HubConnectionContext connection)
        {
            var userChannel = _channels.User(connection.UserIdentifier);

            return _users.RemoveSubscriptionAsync(userChannel, connection, channelName =>
            {
                RedisLog.Unsubscribe(_logger, channelName);
                return _bus.UnsubscribeAsync(channelName);
            });
        }

        public void Dispose()
        {
            _bus?.UnsubscribeAll();
            _redisServerConnection?.Dispose();
            _ackHandler.Dispose();
        }

        private Task SubscribeToAll()
        {
            RedisLog.Subscribing(_logger, _channels.All);
            return _bus.SubscribeAsync(_channels.All, async (c, data) =>
            {
                try
                {
                    RedisLog.ReceivedFromChannel(_logger, _channels.All);

                    var invocation = _protocol.ReadInvocation((byte[])data);

                    var tasks = new List<Task>(_connections.Count);

                    foreach (var connection in _connections)
                    {
                        if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId))
                        {
                            tasks.Add(connection.WriteAsync(invocation.Message).AsTask());
                        }
                    }

                    await Task.WhenAll(tasks);
                }
                catch (Exception ex)
                {
                    RedisLog.FailedWritingMessage(_logger, ex);
                }
            });
        }

        private Task SubscribeToGroupManagementChannel()
        {
            return _bus.SubscribeAsync(_channels.GroupManagement, async (c, data) =>
            {
                try
                {
                    var groupMessage = _protocol.ReadGroupCommand((byte[])data);

                    var connection = _connections[groupMessage.ConnectionId];
                    if (connection == null)
                    {
                        // user not on this server
                        return;
                    }

                    if (groupMessage.Action == GroupAction.Remove)
                    {
                        await RemoveGroupAsyncCore(connection, groupMessage.GroupName);
                    }

                    if (groupMessage.Action == GroupAction.Add)
                    {
                        await AddGroupAsyncCore(connection, groupMessage.GroupName);
                    }

                    // Send an ack to the server that sent the original command.
                    await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id));
                }
                catch (Exception ex)
                {
                    RedisLog.InternalMessageFailed(_logger, ex);
                }
            });
        }

        private Task SubscribeToAckChannel()
        {
            // Create server specific channel in order to send an ack to a single server
            return _bus.SubscribeAsync(_channels.Ack(_serverName), (c, data) =>
            {
                var ackId = _protocol.ReadAck((byte[])data);

                _ackHandler.TriggerAck(ackId);
            });
        }

        private Task SubscribeToConnection(HubConnectionContext connection)
        {
            var connectionChannel = _channels.Connection(connection.ConnectionId);

            RedisLog.Subscribing(_logger, connectionChannel);
            return _bus.SubscribeAsync(connectionChannel, async (c, data) =>
            {
                var invocation = _protocol.ReadInvocation((byte[])data);
                await connection.WriteAsync(invocation.Message);
            });
        }

        private Task SubscribeToUser(HubConnectionContext connection)
        {
            var userChannel = _channels.User(connection.UserIdentifier);

            return _users.AddSubscriptionAsync(userChannel, connection, (channelName, subscriptions) =>
            {
                RedisLog.Subscribing(_logger, channelName);
                return _bus.SubscribeAsync(channelName, async (c, data) =>
                {
                    try
                    {
                        var invocation = _protocol.ReadInvocation((byte[])data);

                        var tasks = new List<Task>();
                        foreach (var userConnection in subscriptions)
                        {
                            tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask());
                        }

                        await Task.WhenAll(tasks);
                    }
                    catch (Exception ex)
                    {
                        RedisLog.FailedWritingMessage(_logger, ex);
                    }
                });
            });
        }

        private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
        {
            RedisLog.Subscribing(_logger, groupChannel);
            return _bus.SubscribeAsync(groupChannel, async (c, data) =>
            {
                try
                {
                    var invocation = _protocol.ReadInvocation((byte[])data);

                    var tasks = new List<Task>();
                    foreach (var groupConnection in groupConnections)
                    {
                        if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true)
                        {
                            continue;
                        }

                        tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask());
                    }

                    await Task.WhenAll(tasks);
                }
                catch (Exception ex)
                {
                    RedisLog.FailedWritingMessage(_logger, ex);
                }
            });
        }

        private async Task EnsureRedisServerConnection()
        {
            if (_redisServerConnection == null)
            {
                await _connectionLock.WaitAsync();
                try
                {
                    if (_redisServerConnection == null)
                    {
                        var writer = new LoggerTextWriter(_logger);
                        _redisServerConnection = await _options.ConnectAsync(writer);
                        _bus = _redisServerConnection.GetSubscriber();

                        _redisServerConnection.ConnectionRestored += (_, e) =>
                        {
                            // We use the subscription connection type
                            // Ignore messages from the interactive connection (avoids duplicates)
                            if (e.ConnectionType == ConnectionType.Interactive)
                            {
                                return;
                            }

                            RedisLog.ConnectionRestored(_logger);
                        };

                        _redisServerConnection.ConnectionFailed += (_, e) =>
                        {
                            // We use the subscription connection type
                            // Ignore messages from the interactive connection (avoids duplicates)
                            if (e.ConnectionType == ConnectionType.Interactive)
                            {
                                return;
                            }

                            RedisLog.ConnectionFailed(_logger, e.Exception);
                        };

                        if (_redisServerConnection.IsConnected)
                        {
                            RedisLog.Connected(_logger);
                        }
                        else
                        {
                            RedisLog.NotConnected(_logger);
                        }

                        await SubscribeToAll();
                        await SubscribeToGroupManagementChannel();
                        await SubscribeToAckChannel();
                    }
                }
                finally
                {
                    _connectionLock.Release();
                }
            }
        }

        private static string GenerateServerName()
        {
            // Use the machine name for convenient diagnostics, but add a guid to make it unique.
            // Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29
            return $"{Environment.MachineName}_{Guid.NewGuid():N}";
        }

        private class LoggerTextWriter : TextWriter
        {
            private readonly ILogger _logger;

            public LoggerTextWriter(ILogger logger)
            {
                _logger = logger;
            }

            public override Encoding Encoding => Encoding.UTF8;

            public override void Write(char value)
            {
            }

            public override void WriteLine(string value)
            {
                RedisLog.ConnectionMultiplexerMessage(_logger, value);
            }
        }

        private interface IRedisFeature
        {
            HashSet<string> Groups { get; }
        }

        private class RedisFeature : IRedisFeature
        {
            public HashSet<string> Groups { get; } = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        }
    }
}
```

Original: https://www.cnblogs.com/zendu/p/13898869.html
Author: zyz913614263
Title: How SignalR uses Redis for cross-process communication
