Netty之非阻塞处理

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

同步I/O : 需要进程去真正的去操作I/O;

异步I/O:内核在I/O操作完成后再通知应用进程操作结果。

怎么去理解同步和异步?

  • 同步:比如服务端发送数据给客户端,客户端中的处理器(继承一个 入站处理器即可),可以去重写 channelRead0 方法,那么该方法触发的时候,其实必须得服务器有消息发过来,客户端才能去读写,两者必须是有 先后顺序,这就是所谓的 同步
  • 异步:客户端在服务端发送数据来之前就已经返回数据给了用户,但客户端已经告诉服务端数据到了要通过订阅的方式(大名鼎鼎的 观察者模式),文章最后已经附上传送门,理解设计模式

比如上一篇关于 NettyAttributeKeyAttributeMap的原理和使用,这里不妨讲讲它的缺点

使用流程

Step1 使用 AttributeKey 设置 key 值和 k-v 对,为 channel 获取 值做准备

创建一个处理器 NettyClientHandler 继承 SimpleChannelInboundHandler<rpcresponse></rpcresponse>,它已经实现了 入站处理器相关的功能,只要重写它的 channelRead0 方法即可

public class NettyClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
AttributeKey key = AttributeKey.valueOf(msg.getRequestId());
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
}

记得将该 处理器 加入到 客户端 bootStraphandler()方法中,需要 通过默认的 初始化器 new ChannelInitializer<socketchannel>()</socketchannel>(也是一个处理器)去初始化处理器链,我是通过匿名内部类去重写 initChannel 方法的,最后 addLast() 刚刚自己写的处理器即可。

创建服务器和客户端,这里不再赘述,这篇文章对刚入门的帮助不大,可到文章最后取经拿服务端和客户端。

Step2 使用 channel 的 attr 方法,获取 k-v 值

客户端这里 NettyClient 通过用户调用 sendRequest() 方法,去向服务端发送信息,返回值是服务端发回的消息,我们都知道,信息都是在处理器获取的,也就是在 channelRead0方法中,所以我们要在 sendRequest()方法中,获取服务端传来的值,通过下面代码获取

@Override
public Object sendRequest(RpcRequest rpcRequest) throws RpcException {
// 通过 host 和 port 获取 channel
//省略
// 写入 channel 让 服务端 去 读 request
channel.writeAndFlush(rpcRequest);
// 获取 k-v 对
RpcResponse rpcResponse = channel.attr(key).get();
}

相信你们当中有一部分发觉了异样, sendRequest()方法和 channelRead0()不会同步,就是说你发送数据后,会立马执行到 获取 k-v 的代码,不能 &#x963B;&#x585E;住等待 channelRead0()方法把 k-vset 进去

最后测试到,客户端拿不到值,总是为 null

那怎么保持使用异步操作,并且可以顺利拿到值呢?

那么就得通过 future来实现,就是先返回值,但值还是没有的,后面让用户自己用 future的方法 get阻塞拿值,说白了,还是要去同步,只是同步由 CPU转到了 &#x7528;&#x6237;自己手中,慢慢品

CompletableFuture 使用方法

CompletableFuture resultFuture = new CompletableFuture<>();
/**complete 执行结束后,状态发生改变,则 说明 值已经传到了,complete 是 (被观察者)
通知类的通知方法,通知 观察者 ,get 方法将 不再阻塞,可以获取到值
*/
resultFuture .complete(msg);
/**获取 正确结果,get 是阻塞操作,所以 先把 resultFuture 作为 返回值 返回,再 get
获取值
*/
RpcResponse rpcResponse = resultFuture.get();
// 获取 错误结果, 抛 异常 处理
resultFuture.completeExceptionally(future.cause());

所以我们要做的就是在 channelRead0()中 做 complete(),最后 用户直接 get得到数据即可,只要把 sendRequest()方法的返回类型改为 CompletableFuture 就可以了。

简单来说就是通过使用这个 CompletableFuture,让 response不至于返回后是null,因为我们自己 new了一个 CompletableFuture类,这个类会被通知,并把结果告知给它

需要注意的是,在 客户端的 sendRequest()方法拿到的 CompletableFuture<rpcresponse></rpcresponse> 和在 channelRead0()拿到的必须为同一个,可以设计成 &#x5355;&#x4F8B;&#x6A21;&#x5F0F;,这里是很泛化的单例,通用

public class SingleFactory {

private static Map objectMap = new HashMap<>();

private SingleFactory() {}

/**
* 使用 双重 校验锁 实现 单例模式
* @param clazz
* @param
* @return
*/
public static  T getInstance(Class clazz) {
Object instance = objectMap.get(clazz);
if (instance == null) {
synchronized (clazz) {
if (instance == null) {
try {
instance = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
return clazz.cast(instance);
}

}

下面这样实现是因为涉及到多个客户端并发访问同一个服务器,设计的原因如下:

  • 如果是同一个客户端要采用发起多个线程去请求服务端,设计时如果多个线程的 rpcRequest请求 id一样,那么要考虑线程安全
  • 如果是不同客户端发起请求服务端,又要保证线程之间对 CompleteFuture是线程安全的,确保性能,不能用让所有线程共享同一个 CompleteFuture,这样通知会变为不定向,不可用,因此考虑使用 map暂时缓存所有 CompleteFuture,更加高效
public class UnprocessedRequests {

/**
* k - request id
* v - 可将来获取 的 response
*/
private static ConcurrentMap> unprocessedResponseFutures = new ConcurrentHashMap<>();

/**
* @param requestId 请求体的 requestId 字段
* @param future 经过 CompletableFuture 包装过的 响应体
*/
public void put(String requestId, CompletableFuture future) {
System.out.println("put" + future);
unprocessedResponseFutures.put(requestId, future);
}

/**
* 移除 CompletableFuture
* @param requestId 请求体的 requestId 字段
*/
public void remove(String requestId) {
unprocessedResponseFutures.remove(requestId);
}

public void complete(RpcResponse rpcResponse) {
CompletableFuture completableFuture = unprocessedResponseFutures.remove(rpcResponse.getRequestId());
completableFuture.complete(rpcResponse);
System.out.println("remove" + completableFuture);
}
}

传送门:

Original: https://www.cnblogs.com/fyphome/p/16082667.html
Author: 延年有余
Title: Netty之非阻塞处理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/581002/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • iOS macOS 回到主线程的三种方式

    简单说将代码同步到主线程执行的三种方法如下:// 1.NSThread [self performSelectorOnMainThread:@selector(updateUI) …

    Java 2023年5月29日
    098
  • mybatis返回自增主键踩坑记

    背景 MyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的持久层框架。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集。MyBatis 可…

    Java 2023年5月30日
    082
  • 数据分表Mybatis Plus动态表名最优方案的探索

    一、应用场景 大家在使用Mybatis进行开发的时候,经常会遇到一种情况:按照月份month将数据放在不同的表里面,查询数据的时候需要跟不同的月份month去查询不同的表。 但是我…

    Java 2023年6月15日
    090
  • 使用Redis+SpringBoot实现定时任务测试

    Redis实现定时任务是基于对RedisKey值的监控 具体代码实现: 建一个SpringBoot项目 引入依赖 "1.0" encoding="UT…

    Java 2023年6月13日
    072
  • (WebFlux)001、如何自定义注解实现功能

    一、背景 最近在项目又在压测,但基于Http请求类型的校验过多,已有想法把Http请求换成Spring中的WebClient,但是由于不是原配(SpringWebFlux + We…

    Java 2023年6月15日
    082
  • Javaweb面试

    一:cookie和session的区别?1.session和cookie都是会话跟踪技术2.session是保存在服务器端的技术,而cookie是保存在客户端的技术3.cookie…

    Java 2023年6月5日
    095
  • SpringBoot整合Redis和SpringBoot(SpringCache)整合Redis

    参考博客: https://blog.csdn.net/lingerlan510/article/details/121906813 https://blog.csdn.net/u…

    Java 2023年6月6日
    070
  • 怎么关闭电脑系统提示声音

    1、在电脑桌面的空白处,点击鼠标右键,在跳转的选项中点击”个性化”。 2、页面进入”个性化”设置中。 3、点击页面下方的&#8221…

    Java 2023年6月5日
    0131
  • beanFactory 设计模式 Bean 生命周期的胡言乱语,哈哈

    写在前面的话 适用读者:有一定经验的,本文不适合初学者,因为可能不能理解我在说什么 文章思路:不会一开始就像别的博客文章那样,Bean 的生命周期,源码解读(给你贴一大堆的源码)。…

    Java 2023年6月5日
    076
  • XenServer 5.5 断电重启虚拟机磁盘丢失的修复

    1.现象 公司云平台使用的是XenServer 5.5,版本比较老了。最近几天因为机房改造,导致云环境断电,重启之后发现有2台机器无法ping到,所以再次重启,登录修复网卡,最后发…

    Java 2023年5月30日
    071
  • eclipse导入依赖后不生效问题

    在父程component中引入子工程entity,导入依赖之后不生效,无法导入包 解决办法: 右键Maven—–>update project Ori…

    Java 2023年6月9日
    077
  • MySQL-InnoDB-MVCC多版本并发控制

    一、MySQL可重复读级别下,因为MVCC引起的BUG,下图1为相应的Java代码,其中事务1的生命周期最长,循环开启的事务2、3、4。。。与事务1并行 ,数据的读取只会成功一次,…

    Java 2023年6月16日
    049
  • 好玩Python——PIL项目实训(三)——gif

    1 # -*- coding: utf-8 -*- 2 """ 3 Created on Tue Apr 14 01:55:48 2020 4 5 @…

    Java 2023年6月6日
    080
  • day03_3_流程控制练习题

    流程控制练习题 一、编程题 1、实现一个课程名称和课程代号的转换器:输入下表中的课程代号,输出课程的名称。用户可以循环进行输入,如果输入0就退出系统。(使用switch +whil…

    Java 2023年6月8日
    078
  • 西门子PLC数据读取 Observer设计模式

    当我听到这个需求的时候,我差点爆粗口(实际上可能已经爆了,不过我忘了)。 需求刚开始是: C#连接PLC Modbus读取值。 我用C#写完了,觉得太简单了,还弄了个窗体。 接着是…

    Java 2023年6月9日
    077
  • Java动态脚本Groovy读取配置文件

    前言:请各大网友尊重本人原创知识分享,谨记本人博客: 南国以南i 核心涉及: @Value:作用是通过注解将常量、配置文件中的值、其他bean的属性值注入到变量中,作为变量的初始值…

    Java 2023年6月5日
    082
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球