人生难得几回搏,此时不搏待何时。这篇文章主要讲述Netty之非阻塞处理相关的知识,希望能为你提供帮助。
Netty
是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO
程序。
一、异步模型同步I/O : 需要进程去真正的去操作I/O;
异步I/O:内核在I/O操作完成后再通知应用进程操作结果。
怎么去理解同步和异步?
同步:
比如服务端发送数据给客户端,客户端中的处理器(继承一个入站处理器
即可),可以去重写channelRead0
方法,那么该方法触发的时候,其实必须得服务器有消息发过来,客户端才能去读写,两者必须是有先后
顺序,这就是所谓的同步
。- 异步:客户端在服务端发送数据来之前就已经返回数据给了用户,但客户端已经告诉服务端数据到了要通过订阅的方式(大名鼎鼎的
观察者模式
),文章最后已经附上传送门,理解设计模式
Netty
的AttributeKey
和AttributeMap
的原理和使用,这里不妨讲讲它的缺点二、异步模型存在的问题使用流程
创建一个处理器
NettyClientHandler
继承 SimpleChannelInboundHandler&
lt;
RpcResponse&
gt;
,它已经实现了 入站处理器相关的功能,只要重写它的 channelRead0
方法即可public class NettyClientHandler extends SimpleChannelInboundHandler<
RpcResponse>
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception
try
AttributeKey<
RpcResponse>
key = AttributeKey.valueOf(msg.getRequestId());
ctx.channel().attr(key).set(msg);
ctx.channel().close();
finally
ReferenceCountUtil.release(msg);
记得将该 处理器 加入到 客户端
bootStrap
的 handler()
方法中,需要 通过默认的 初始化器new ChannelInitializer&
lt;
SocketChannel&
gt;
()
(也是一个处理器)去初始化处理器链,我是通过匿名内部类去重写 initChannel
方法的,最后addLast()
刚刚自己写的处理器即可。创建服务器和客户端,这里不再赘述,这篇文章对刚入门的帮助不大,可到文章最后取经拿服务端和客户端。
最后测试到,客户端拿不到值,总是为
null
那怎么保持使用异步操作,并且可以顺利拿到值呢?
那么就得通过
future
来实现,就是先返回值,但值还是没有的,后面让用户自己用future
的方法get
阻塞拿值,说白了,还是要去同步,只是同步由CPU
转到了用户
自己手中,慢慢品三、使用CompletableFuture 解决异步问题
CompletableFuture
使用方法CompletableFuture<
RpcResponse>
resultFuture = new CompletableFuture<
>
();
/**complete 执行结束后,状态发生改变,则 说明 值已经传到了,complete 是 (被观察者)
通知类的通知方法,通知 观察者 ,get 方法将 不再阻塞,可以获取到值
*/
resultFuture .complete(msg);
/**获取 正确结果,get 是阻塞操作,所以 先把 resultFuture 作为 返回值 返回,再 get
获取值
*/
RpcResponse rpcResponse = resultFuture.get();
// 获取 错误结果, 抛 异常 处理
resultFuture.completeExceptionally(future.cause());
所以我们要做的就是在
channelRead0()
中 做 complete()
,最后 用户直接 get
得到数据即可,只要把sendRequest()
方法的返回类型改为CompletableFuture
就可以了。简单来说就是通过使用这个
CompletableFuture
,让 response
不至于返回后是null,因为我们自己new
了一个CompletableFuture
类,这个类会被通知,并把结果告知给它需要注意的是,在 客户端的
sendRequest()
方法拿到的 CompletableFuture&
lt;
RpcResponse&
gt;
和在channelRead0()
拿到的必须为同一个,可以设计成单例模式
,这里是很泛化的单例,通用public class SingleFactory private static Map<
Class, Object>
objectMap = new HashMap<
>
();
private SingleFactory() /**
* 使用 双重 校验锁 实现 单例模式
* @param clazz
* @param <
T>
* @return
*/
public static <
T>
T getInstance(Class<
T>
clazz)
Object instance = objectMap.get(clazz);
if (instance == null)
synchronized (clazz)
if (instance == null)
try
instance = clazz.newInstance();
catch (InstantiationException | IllegalAccessException e)
throw new RuntimeException(e.getMessage(), e);
return clazz.cast(instance);
下面这样实现是因为涉及到多个客户端并发访问同一个服务器,设计的原因如下:
- 如果是同一个客户端要采用发起多个线程去请求服务端,设计时如果多个线程的
rpcRequest
请求id
一样,那么要考虑线程安全 - 如果是不同客户端发起请求服务端,又要保证线程之间对
CompleteFuture
是线程安全的,确保性能,不能用让所有线程共享同一个CompleteFuture
,这样通知会变为不定向,不可用,因此考虑使用map
暂时缓存所有CompleteFuture
,更加高效
public class UnprocessedRequests /**
* k - request id
* v - 可将来获取 的 response
*/
private static ConcurrentMap<
String, CompletableFuture<
RpcResponse>
>
unprocessedResponseFutures = new ConcurrentHashMap<
>
();
/**
* @param requestId 请求体的 requestId 字段
* @param future 经过 CompletableFuture 包装过的 响应体
*/
public void put(String requestId, CompletableFuture<
RpcResponse>
future)
System.out.println("put" + future);
unprocessedResponseFutures.put(requestId, future);
/**
* 移除 CompletableFuture<
RpcResponse>
* @param requestId 请求体的 requestId 字段
*/
public void remove(String requestId)
unprocessedResponseFutures.remove(requestId);
public void complete(RpcResponse rpcResponse)
CompletableFuture<
RpcResponse>
completableFuture = unprocessedResponseFutures.remove(rpcResponse.getRequestId());
completableFuture.complete(rpcResponse);
System.out.println("remove" + completableFuture);
传送门:
设计模式:https://gitee.com/fyphome/git-res/tree/master/design-patterns
或者:https://github.com/Fyupeng/java/tree/main/design_patterns
服务端和客户端的实现:https://github.com/Fyupeng/java/tree/main/NettyPro/src/main/java/com/fyp/netty/groupchat
?
四、结束语【Netty之非阻塞处理】评论区可留言,可私信,可互相交流学习,共同进步,欢迎各位给出意见或评价,本人致力于做到优质文章,希望能有幸拜读各位的建议!
个人博客园:https://www.cnblogs.com/fyphome/
推荐阅读
- 死磕Lambda表达式(Lambda的使用)
- 信息安全深透测试与工房操作系统弱点扫描
- 云计算-10-Docker网络使用
- CentOS vim编辑器
- 数仓建模—OneID
- Keepalived+LVS实战案例( 单主架构实现WEB负载均衡及可用)
- Tars | 第2篇 TarsJava SpingBoot启动与负载均衡源码初探 #yyds干货盘点#
- 盘点微软对Windows8 Beta关键技巧改进
- 无法更新到windows8.1的原因与处理办法