SignalR 如何借助redis 实现跨进程通信

关于 Redis 的订阅和发布功能,这篇文章讲得比较好:https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

SignalR 实际上就是使用了 Redis 内置的发布/订阅功能,在不同服务器之间同步发往客户端的消息。

每个连接都有一个 connectionId。连接成功时,服务器会在 Redis 上订阅以该 connectionId 命名的频道,并注册一个回调;
send 的实质就是向这个 connectionId 对应的频道发布一条消息。
这样,即使是在其他进程上 send,消息也会通过 Redis 的订阅机制传给订阅者,然后由订阅者执行回调,把消息写给对应的客户端。
这样就轻松实现了跨进程通讯。

具体可以参考源码:

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Redis.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;

namespace Microsoft.AspNetCore.SignalR.Redis
{
    /// <summary>
    /// A <see cref="HubLifetimeManager{THub}"/> that delivers hub invocations through Redis
    /// publish/subscribe channels. Every send is published to a channel (all, connection, group
    /// or user); each server subscribes to the channels of the connections it holds locally and
    /// writes received invocations to those connections.
    /// </summary>
    /// <typeparam name="THub">The hub type; its full name is used to build the channel names.</typeparam>
    public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable where THub : Hub
    {
        // Connections that are attached to this server instance.
        private readonly HubConnectionStore _connections = new HubConnectionStore();
        private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
        private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
        private IConnectionMultiplexer _redisServerConnection;
        private ISubscriber _bus;
        private readonly ILogger _logger;
        private readonly RedisOptions _options;
        private readonly RedisChannels _channels;
        private readonly string _serverName = GenerateServerName();
        private readonly RedisProtocol _protocol;

        // Serializes the one-time connection setup in EnsureRedisServerConnection.
        private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);

        private readonly AckHandler _ackHandler;

        // Source of ids for group commands sent to other servers (incremented atomically).
        private int _internalId;

        /// <summary>
        /// Creates the manager and starts connecting to Redis in the background.
        /// </summary>
        /// <param name="logger">Logger used for all Redis diagnostics.</param>
        /// <param name="options">Redis options; <c>Value</c> supplies the configuration and the connect callback.</param>
        /// <param name="hubProtocolResolver">Supplies the hub protocols handed to <see cref="RedisProtocol"/>.</param>
        public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
                                       IOptions<RedisOptions> options,
                                       IHubProtocolResolver hubProtocolResolver)
        {
            _logger = logger;
            _options = options.Value;
            _ackHandler = new AckHandler();
            _channels = new RedisChannels(typeof(THub).FullName);
            _protocol = new RedisProtocol(hubProtocolResolver.AllProtocols);

            RedisLog.ConnectingToEndpoints(_logger, options.Value.Configuration.EndPoints, _serverName);

            // Fire and forget: the task is discarded here; every later operation awaits
            // EnsureRedisServerConnection itself before touching _bus.
            _ = EnsureRedisServerConnection();
        }

        /// <summary>
        /// Registers a new local connection and subscribes to its connection channel and,
        /// when it has a user identifier, to its user channel.
        /// </summary>
        /// <param name="connection">The connection that was just established on this server.</param>
        public override async Task OnConnectedAsync(HubConnectionContext connection)
        {
            await EnsureRedisServerConnection();

            // The feature carries the set of group names this connection belongs to.
            var feature = new RedisFeature();
            connection.Features.Set<IRedisFeature>(feature);

            var connectionTask = Task.CompletedTask;
            var userTask = Task.CompletedTask;

            _connections.Add(connection);

            connectionTask = SubscribeToConnection(connection);

            if (!string.IsNullOrEmpty(connection.UserIdentifier))
            {
                userTask = SubscribeToUser(connection);
            }

            await Task.WhenAll(connectionTask, userTask);
        }

        /// <summary>
        /// Removes a local connection and drops its connection, group and user subscriptions.
        /// </summary>
        /// <param name="connection">The connection that was closed.</param>
        public override Task OnDisconnectedAsync(HubConnectionContext connection)
        {
            _connections.Remove(connection);

            var tasks = new List<Task>();

            var connectionChannel = _channels.Connection(connection.ConnectionId);
            RedisLog.Unsubscribe(_logger, connectionChannel);
            tasks.Add(_bus.UnsubscribeAsync(connectionChannel));

            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;

            if (groupNames != null)
            {
                // Copy the groups to an array here because they get removed from this collection
                // in RemoveFromGroupAsync
                foreach (var group in groupNames.ToArray())
                {
                    // Use RemoveGroupAsyncCore because the connection is local and we don't want to
                    // accidentally go to other servers with our remove request.
                    tasks.Add(RemoveGroupAsyncCore(connection, group));
                }
            }

            if (!string.IsNullOrEmpty(connection.UserIdentifier))
            {
                tasks.Add(RemoveUserAsync(connection));
            }

            return Task.WhenAll(tasks);
        }

        /// <summary>Publishes an invocation to the channel shared by all servers.</summary>
        public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.All, message);
        }

        /// <summary>
        /// Publishes an invocation to the "all" channel; the excluded connection ids travel with
        /// the message and are filtered out by the receiving subscriber.
        /// </summary>
        public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
            return PublishAsync(_channels.All, message);
        }

        /// <summary>
        /// Sends an invocation to one connection: directly when it is local, otherwise by
        /// publishing to that connection's channel.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="connectionId"/> is null.</exception>
        public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            // If the connection is local we can skip sending the message through the bus since we require sticky connections.
            // This also saves serializing and deserializing the message!
            var connection = _connections[connectionId];
            if (connection != null)
            {
                return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
            }

            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.Connection(connectionId), message);
        }

        /// <summary>Publishes an invocation to the channel of the given group.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="groupName"/> is null.</exception>
        public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.Group(groupName), message);
        }

        /// <summary>
        /// Publishes an invocation to the channel of the given group, carrying the list of
        /// connection ids the receiving subscriber must skip.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="groupName"/> is null.</exception>
        public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
        {
            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var message = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
            return PublishAsync(_channels.Group(groupName), message);
        }

        /// <summary>Publishes an invocation to the channel of the given user.</summary>
        public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            var message = _protocol.WriteInvocation(methodName, args);
            return PublishAsync(_channels.User(userId), message);
        }

        /// <summary>
        /// Adds a connection to a group. A local connection is handled in-process; otherwise a
        /// group command is published and this call waits for the owning server's ack.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connectionId"/> or <paramref name="groupName"/> is null.
        /// </exception>
        public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var connection = _connections[connectionId];
            if (connection != null)
            {
                // short circuit if connection is on this server
                return AddGroupAsyncCore(connection, groupName);
            }

            return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
        }

        /// <summary>
        /// Removes a connection from a group. A local connection is handled in-process; otherwise
        /// a group command is published and this call waits for the owning server's ack.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connectionId"/> or <paramref name="groupName"/> is null.
        /// </exception>
        public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
        {
            if (connectionId == null)
            {
                throw new ArgumentNullException(nameof(connectionId));
            }

            if (groupName == null)
            {
                throw new ArgumentNullException(nameof(groupName));
            }

            var connection = _connections[connectionId];
            if (connection != null)
            {
                // short circuit if connection is on this server
                return RemoveGroupAsyncCore(connection, groupName);
            }

            return SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
        }

        /// <summary>
        /// Publishes the same serialized invocation to the channel of every listed connection.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="connectionIds"/> is null.</exception>
        public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (connectionIds == null)
            {
                throw new ArgumentNullException(nameof(connectionIds));
            }

            var publishTasks = new List<Task>(connectionIds.Count);

            // Serialize once and reuse the payload for every channel.
            var payload = _protocol.WriteInvocation(methodName, args);

            foreach (var connectionId in connectionIds)
            {
                publishTasks.Add(PublishAsync(_channels.Connection(connectionId), payload));
            }

            return Task.WhenAll(publishTasks);
        }

        /// <summary>
        /// Publishes the same serialized invocation to the channel of every listed group;
        /// null or empty group names are skipped.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="groupNames"/> is null.</exception>
        public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (groupNames == null)
            {
                throw new ArgumentNullException(nameof(groupNames));
            }

            var publishTasks = new List<Task>(groupNames.Count);
            var payload = _protocol.WriteInvocation(methodName, args);

            foreach (var groupName in groupNames)
            {
                if (!string.IsNullOrEmpty(groupName))
                {
                    publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
                }
            }

            return Task.WhenAll(publishTasks);
        }

        /// <summary>
        /// Publishes the same serialized invocation to the channel of every listed user;
        /// null or empty user ids are skipped. Completes immediately for an empty list.
        /// </summary>
        public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
        {
            if (userIds.Count > 0)
            {
                var payload = _protocol.WriteInvocation(methodName, args);
                var publishTasks = new List<Task>(userIds.Count);
                foreach (var userId in userIds)
                {
                    if (!string.IsNullOrEmpty(userId))
                    {
                        publishTasks.Add(PublishAsync(_channels.User(userId), payload));
                    }
                }

                return Task.WhenAll(publishTasks);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Makes sure the Redis connection exists, then publishes <paramref name="payload"/>
        /// to <paramref name="channel"/>.
        /// </summary>
        private async Task PublishAsync(string channel, byte[] payload)
        {
            await EnsureRedisServerConnection();
            RedisLog.PublishToChannel(_logger, channel);
            await _bus.PublishAsync(channel, payload);
        }

        /// <summary>
        /// Records the group on the connection's feature and registers the connection with the
        /// group's subscription. Does nothing when the connection is already in the group.
        /// </summary>
        private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
        {
            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;

            lock (groupNames)
            {
                // Connection already in group
                if (!groupNames.Add(groupName))
                {
                    return Task.CompletedTask;
                }
            }

            var groupChannel = _channels.Group(groupName);
            return _groups.AddSubscriptionAsync(groupChannel, connection, SubscribeToGroupAsync);
        }

        /// <summary>
        /// This takes <see cref="HubConnectionContext"/> because we want to remove the connection from the
        /// _connections list in OnDisconnectedAsync and still be able to remove groups with this method.
        /// </summary>
        private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
        {
            var groupChannel = _channels.Group(groupName);

            await _groups.RemoveSubscriptionAsync(groupChannel, connection, channelName =>
            {
                RedisLog.Unsubscribe(_logger, channelName);
                return _bus.UnsubscribeAsync(channelName);
            });

            var feature = connection.Features.Get<IRedisFeature>();
            var groupNames = feature.Groups;
            if (groupNames != null)
            {
                lock (groupNames)
                {
                    groupNames.Remove(groupName);
                }
            }
        }

        /// <summary>
        /// Publishes an add/remove group command for a connection that is not on this server and
        /// waits on the ack created for that command.
        /// </summary>
        private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
        {
            var id = Interlocked.Increment(ref _internalId);
            var ack = _ackHandler.CreateAck(id);

            // Send Add/Remove Group to other servers and wait for an ack or timeout
            var message = _protocol.WriteGroupCommand(new RedisGroupCommand(id, _serverName, action, groupName, connectionId));
            await PublishAsync(_channels.GroupManagement, message);

            await ack;
        }

        /// <summary>Removes the connection from the subscription of its user channel.</summary>
        private Task RemoveUserAsync(HubConnectionContext connection)
        {
            var userChannel = _channels.User(connection.UserIdentifier);

            return _users.RemoveSubscriptionAsync(userChannel, connection, channelName =>
            {
                RedisLog.Unsubscribe(_logger, channelName);
                return _bus.UnsubscribeAsync(channelName);
            });
        }

        /// <summary>
        /// Unsubscribes from every channel and disposes the Redis connection and the ack handler.
        /// </summary>
        public void Dispose()
        {
            // _bus and _redisServerConnection stay null until the first successful connect.
            _bus?.UnsubscribeAll();
            _redisServerConnection?.Dispose();
            _ackHandler.Dispose();
        }

        /// <summary>
        /// Subscribes to the "all" channel; each received invocation is written to every local
        /// connection that is not in the message's excluded list.
        /// </summary>
        private Task SubscribeToAll()
        {
            RedisLog.Subscribing(_logger, _channels.All);
            return _bus.SubscribeAsync(_channels.All, async (c, data) =>
            {
                try
                {
                    RedisLog.ReceivedFromChannel(_logger, _channels.All);

                    var invocation = _protocol.ReadInvocation((byte[])data);

                    var tasks = new List<Task>(_connections.Count);

                    foreach (var connection in _connections)
                    {
                        if (invocation.ExcludedConnectionIds == null || !invocation.ExcludedConnectionIds.Contains(connection.ConnectionId))
                        {
                            tasks.Add(connection.WriteAsync(invocation.Message).AsTask());
                        }
                    }

                    await Task.WhenAll(tasks);
                }
                catch (Exception ex)
                {
                    RedisLog.FailedWritingMessage(_logger, ex);
                }
            });
        }

        /// <summary>
        /// Subscribes to the group-management channel; applies add/remove commands that target a
        /// local connection and acknowledges them to the server that sent the command.
        /// </summary>
        private Task SubscribeToGroupManagementChannel()
        {
            return _bus.SubscribeAsync(_channels.GroupManagement, async (c, data) =>
            {
                try
                {
                    var groupMessage = _protocol.ReadGroupCommand((byte[])data);

                    var connection = _connections[groupMessage.ConnectionId];
                    if (connection == null)
                    {
                        // user not on this server
                        return;
                    }

                    if (groupMessage.Action == GroupAction.Remove)
                    {
                        await RemoveGroupAsyncCore(connection, groupMessage.GroupName);
                    }

                    if (groupMessage.Action == GroupAction.Add)
                    {
                        await AddGroupAsyncCore(connection, groupMessage.GroupName);
                    }

                    // Send an ack to the server that sent the original command.
                    await PublishAsync(_channels.Ack(groupMessage.ServerName), _protocol.WriteAck(groupMessage.Id));
                }
                catch (Exception ex)
                {
                    RedisLog.InternalMessageFailed(_logger, ex);
                }
            });
        }

        /// <summary>
        /// Subscribes to this server's own ack channel and triggers the matching pending ack.
        /// </summary>
        private Task SubscribeToAckChannel()
        {
            // Create server specific channel in order to send an ack to a single server
            return _bus.SubscribeAsync(_channels.Ack(_serverName), (c, data) =>
            {
                var ackId = _protocol.ReadAck((byte[])data);

                _ackHandler.TriggerAck(ackId);
            });
        }

        /// <summary>
        /// Subscribes to the channel named after the connection id; every message received on it
        /// is written to that connection.
        /// </summary>
        private Task SubscribeToConnection(HubConnectionContext connection)
        {
            var connectionChannel = _channels.Connection(connection.ConnectionId);

            RedisLog.Subscribing(_logger, connectionChannel);
            return _bus.SubscribeAsync(connectionChannel, async (c, data) =>
            {
                var invocation = _protocol.ReadInvocation((byte[])data);
                await connection.WriteAsync(invocation.Message);
            });
        }

        /// <summary>
        /// Adds the connection to the subscription of its user channel; messages received on that
        /// channel are written to every connection in the subscription.
        /// </summary>
        private Task SubscribeToUser(HubConnectionContext connection)
        {
            var userChannel = _channels.User(connection.UserIdentifier);

            return _users.AddSubscriptionAsync(userChannel, connection, (channelName, subscriptions) =>
            {
                RedisLog.Subscribing(_logger, channelName);
                return _bus.SubscribeAsync(channelName, async (c, data) =>
                {
                    try
                    {
                        var invocation = _protocol.ReadInvocation((byte[])data);

                        var tasks = new List<Task>();
                        foreach (var userConnection in subscriptions)
                        {
                            tasks.Add(userConnection.WriteAsync(invocation.Message).AsTask());
                        }

                        await Task.WhenAll(tasks);
                    }
                    catch (Exception ex)
                    {
                        RedisLog.FailedWritingMessage(_logger, ex);
                    }
                });
            });
        }

        /// <summary>
        /// Subscribes to a group channel; messages received on it are written to every connection
        /// in <paramref name="groupConnections"/> that is not in the message's excluded list.
        /// </summary>
        private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
        {
            RedisLog.Subscribing(_logger, groupChannel);
            return _bus.SubscribeAsync(groupChannel, async (c, data) =>
            {
                try
                {
                    var invocation = _protocol.ReadInvocation((byte[])data);

                    var tasks = new List<Task>();
                    foreach (var groupConnection in groupConnections)
                    {
                        if (invocation.ExcludedConnectionIds?.Contains(groupConnection.ConnectionId) == true)
                        {
                            continue;
                        }

                        tasks.Add(groupConnection.WriteAsync(invocation.Message).AsTask());
                    }

                    await Task.WhenAll(tasks);
                }
                catch (Exception ex)
                {
                    RedisLog.FailedWritingMessage(_logger, ex);
                }
            });
        }

        /// <summary>
        /// Connects to Redis on first use, wires the connection event logging and subscribes to
        /// the "all", group-management and ack channels. Later calls return without reconnecting.
        /// </summary>
        private async Task EnsureRedisServerConnection()
        {
            if (_redisServerConnection == null)
            {
                await _connectionLock.WaitAsync();
                try
                {
                    // Re-check after acquiring the semaphore: another caller may have connected meanwhile.
                    if (_redisServerConnection == null)
                    {
                        var writer = new LoggerTextWriter(_logger);
                        _redisServerConnection = await _options.ConnectAsync(writer);
                        _bus = _redisServerConnection.GetSubscriber();

                        _redisServerConnection.ConnectionRestored += (_, e) =>
                        {
                            // We use the subscription connection type
                            // Ignore messages from the interactive connection (avoids duplicates)
                            if (e.ConnectionType == ConnectionType.Interactive)
                            {
                                return;
                            }

                            RedisLog.ConnectionRestored(_logger);
                        };

                        _redisServerConnection.ConnectionFailed += (_, e) =>
                        {
                            // We use the subscription connection type
                            // Ignore messages from the interactive connection (avoids duplicates)
                            if (e.ConnectionType == ConnectionType.Interactive)
                            {
                                return;
                            }

                            RedisLog.ConnectionFailed(_logger, e.Exception);
                        };

                        if (_redisServerConnection.IsConnected)
                        {
                            RedisLog.Connected(_logger);
                        }
                        else
                        {
                            RedisLog.NotConnected(_logger);
                        }

                        await SubscribeToAll();
                        await SubscribeToGroupManagementChannel();
                        await SubscribeToAckChannel();
                    }
                }
                finally
                {
                    _connectionLock.Release();
                }
            }
        }

        /// <summary>Builds the name that identifies this server instance on the ack channel.</summary>
        private static string GenerateServerName()
        {
            // Use the machine name for convenient diagnostics, but add a guid to make it unique.
            // Example: MyServerName_02db60e5fab243b890a847fa5c4dcb29
            return $"{Environment.MachineName}_{Guid.NewGuid():N}";
        }

        /// <summary>
        /// <see cref="TextWriter"/> handed to the Redis connect call; forwards each written line
        /// to the logger and ignores single-character writes.
        /// </summary>
        private class LoggerTextWriter : TextWriter
        {
            private readonly ILogger _logger;

            public LoggerTextWriter(ILogger logger)
            {
                _logger = logger;
            }

            public override Encoding Encoding => Encoding.UTF8;

            // Intentionally a no-op: only whole lines are logged.
            public override void Write(char value)
            {
            }

            public override void WriteLine(string value)
            {
                RedisLog.ConnectionMultiplexerMessage(_logger, value);
            }
        }

        /// <summary>Connection feature exposing the group names a connection belongs to.</summary>
        private interface IRedisFeature
        {
            HashSet<string> Groups { get; }
        }

        private class RedisFeature : IRedisFeature
        {
            // Group names are compared case-insensitively.
            public HashSet<string> Groups { get; } = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        }
    }
}

Original: https://www.cnblogs.com/zendu/p/13898869.html
Author: zyz913614263
Title: SignalR 如何借助redis 实现跨进程通信

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529430/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Linux基础入门笔记

    Linux内核最初只是由芬兰人林纳斯·托瓦兹(Linus Torvalds)在赫尔辛基大学上学时出于个人爱好而编写的。 Linux是一套免费使用和自由传播的类Unix操作系统,是一…

    Linux 2023年6月14日
    059
  • Centos7安装Docker

    一、docker运行流程 举个例子你想使用MySQL镜像,那么执行docker pull 下载镜像的时候 首先它会在本地仓库进行运行,如果本地仓库有你想要的MySQL镜像 那么它会…

    Linux 2023年6月6日
    079
  • shell编程-杨辉三角简单实现

    shell编程-杨辉三角问题: 概述:中国古代数学家在数学的许多重要领域中处于遥遥领先的地位。中国古代数学史曾经有自己光辉灿烂的篇章,而杨辉三角的发现就是十分精彩的一页。杨辉三角形…

    Linux 2023年6月7日
    097
  • 6.18(反射和注解—>反射机制的作用)

    反射 获取Class对象三种方式第一种方式:类.class第二种方式:对象.getClass()第三种方式:Class.forName 在…

    Linux 2023年6月7日
    0110
  • String为什么不是基本数据类型

    java虚拟机处理基础类型与引用类型的方式是不一样的,对于基本类型,java虚拟机会为其分配数据类型实际占用的内存空间,对于引用类型变量,他仅仅是一个指向堆区中某个实例的指针。 O…

    Linux 2023年6月7日
    092
  • jenkins 设置钉钉机器人+jenkins调用shell脚本使用钉钉机器人自定义发消息并通知指定人

    两种钉钉通知方式,一种是使用安装的钉钉插件来通知,但是这个不好定义通知内容,没办法控制发送条件,只要配置了,不管构建参数(分支,渠道,配置),都会发通知,第二种是使用脚本的方式来通…

    Linux 2023年5月28日
    084
  • Docker最常用的镜像命令和容器命令

    一、镜像相关命令 官方文档:https://docs.docker.com/referenc 1.1查看镜像 [root@localhost ~]# docker images R…

    Linux 2023年5月27日
    095
  • Spring 对Controller异常的统一处理

    对于Controller的异常处理,分为两种,一种是对已知的异常处理,一种是未知的异常处理 1、定义自定义异常类 /** * @author hzc * */ public cla…

    Linux 2023年6月7日
    0102
  • 缓冲区溢出二:从缓冲区溢出到获取反弹shell实例

    一、说明 之前写过一篇”缓冲区溢出一:函数调用过程中的堆栈变化及缓冲区溢出利用原理“,道理讲得还可以,但现在看还是需要一个示例来讲解从攻击角度如何实现返回地…

    Linux 2023年5月28日
    0106
  • 前端开发:如何正确地跨端

    导读:面对多种多样的跨端诉求,有哪些跨端方案?跨端的本质是什么?作为业务技术开发者,应该怎么做?本文分享阿里巴巴ICBU技术部在跨端开发上的一些思考,介绍了当前主流的跨端方案,以及…

    Linux 2023年6月8日
    073
  • 【微服务】- 服务调用-OpenFeign

    服务调用 – OpenFeign 😄生命不息,写作不止🔥 继续踏上学习之路,学之分享笔记👊 总有一天我也能像各位大佬一样🏆 一个有梦有戏的人 @怒放吧德德🌝分享学习心得…

    Linux 2023年6月6日
    085
  • 【MQTT】在Linux下sqlite3的使用

    安装sqlite3 #下载 wget https: #解压 tar -xzvf sqlite-autoconf-3310100.tar.gz sqlite3库函数 1. 打开/创建…

    Linux 2023年6月13日
    073
  • redis后台启动

    打开redis.conf文件 把daemonize设置为yes posted @2021-11-25 15:30 HongMaJu 阅读(73 ) 评论() 编辑 Original…

    Linux 2023年5月28日
    084
  • redis主从复制

    Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 特性: 运行在内存中的数据集工作方式 支持多种数据结构 提供不同级别的磁盘持…

    Linux 2023年5月28日
    077
  • Redis从入门到精通:初级篇

    原文链接:http://www.cnblogs.com/xrq730/p/8890896.html,转载请注明出处,谢谢 Redis从入门到精通:初级篇 平时陆陆续续看了不少Red…

    Linux 2023年5月28日
    082
  • Ubuntu更换镜像源

    当修改 sources.list文件时,我们需要将下面任意一个镜像源的代码 复制粘贴到该文件中。 阿里源 阿里镜像源 deb http://mirrors.aliyun.com/u…

    Linux 2023年6月14日
    085
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球