Nett分布式分隔符解码器逻辑源码剖析
目录
- 分隔符解码器
- 我们看其中的一个构造方法
- 我们跟到重载decode方法中
- 我们看初始化该属性的构造方法
- 章节总结
分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包
同样继承了ByteToMessageDecoder并重写了decode方法
我们看其中的一个构造方法
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {this(maxFrameLength, true, delimiters); }
这里参数maxFrameLength代表最大长度, delimiters是个可变参数, 可以说可以支持多个分隔符进行解码
我们进入decode方法:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List
这里同样调用了其重载的decode方法并将解析好的数据添加到集合list中, 其父类就可以遍历out, 并将内容传播
我们跟到重载decode方法中
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {//行处理器(1)if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); }int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; //找到最小长度的分隔符(2)for (ByteBuf delim: delimiters) {//每个分隔符分隔的数据包长度int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) {minFrameLength = frameLength; minDelim = delim; }}//解码(3)//已经找到分隔符if (minDelim != null) {int minDelimLength = minDelim.capacity(); ByteBuf frame; //当前分隔符否处于丢弃模式if (discardingTooLongFrame) { //首先设置为非丢弃模式discardingTooLongFrame = false; //丢弃buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) {fail(tooLongFrameLength); }return null; }//处于非丢弃模式//当前找到的数据包, 大于允许的数据包if (minFrameLength > maxFrameLength) {//当前数据包+最小分隔符长度 全部丢弃buffer.skipBytes(minFrameLength + minDelimLength); //传递异常事件fail(minFrameLength); return null; }//如果是正常的长度//解析出来的数据包是否忽略分隔符if (stripDelimiter) {//如果不包含分隔符//截取frame = buffer.readRetainedSlice(minFrameLength); //跳过分隔符buffer.skipBytes(minDelimLength); } else {//截取包含分隔符的长度frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); }return frame; } else {//如果没有找到分隔符//非丢弃模式if (!discardingTooLongFrame) {//可读字节大于允许的解析出来的长度if (buffer.readableBytes() > maxFrameLength) {//将这个长度记录下tooLongFrameLength = buffer.readableBytes(); //跳过这段长度buffer.skipBytes(buffer.readableBytes()); //标记当前处于丢弃状态discardingTooLongFrame = true; if (failFast) {fail(tooLongFrameLength); }}} else {tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); }return null; }}
这里的方法也比较长, 这里也通过拆分进行剖析
(1). 行处理器
(2). 找到最小长度分隔符
(3). 解码
首先看第一步行处理器:
if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); }
这里首先判断成员变量lineBasedDecoder是否为空, 如果不为空则直接调用lineBasedDecoder的decode的方法进行解码, lineBasedDecoder实际上就是上一小节剖析的LineBasedFrameDecoder解码器
这个成员变量, 会在分隔符是\n和\r\n的时候进行初始化
我们看初始化该属性的构造方法
public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {//代码省略//如果是基于行的分隔if (isLineBased(delimiters) && !isSubclass()) {//初始化行处理器lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else {//代码省略}//代码省略}
这里isLineBased(delimiters)会判断是否是基于行的分隔, 跟到isLineBased方法中:
private static boolean isLineBased(final ByteBuf[] delimiters) {//分隔符长度不为2if (delimiters.length != 2) {return false; }//拿到第一个分隔符ByteBuf a = delimiters[0]; //拿到第二个分隔符ByteBuf b = delimiters[1]; if (a.capacity() < b.capacity()) {a = delimiters[1]; b = delimiters[0]; }//确保a是/r/n分隔符, 确保b是/n分隔符return a.capacity() == 2 && b.capacity() == 1&& a.getByte(0) == '\r' && a.getByte(1) == '\n'&& b.getByte(0) == '\n'; }
首先判断长度等于2, 直接返回false
然后拿到第一个分隔符a和第二个分隔符b, 然后判断a的第一个分隔符是不是\r, a的第二个分隔符是不是\n, b的第一个分隔符是不是\n, 如果都为true, 则条件成立
我们回到decode方法中, 看步骤2, 找到最小长度的分隔符:
这里最小长度的分隔符, 意思就是从读指针开始, 找到最近的分隔符
for (ByteBuf delim: delimiters) {//每个分隔符分隔的数据包长度int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) {minFrameLength = frameLength; minDelim = delim; }}
这里会遍历所有的分隔符, 然后找到每个分隔符到读指针到数据包长度
然后通过if判断, 找到长度最小的数据包的长度, 然后保存当前数据包的的分隔符, 如下图:
文章图片
6-4-1
这里假设A和B同为分隔符, A分隔符到读指针的长度小于B分隔符到读指针的长度, 这里会找到最小的分隔符A, 分隔符的最小长度, 就readIndex到A的长度
我们继续看第3步, 解码:
if (minDelim != null) 表示已经找到最小长度分隔符, 我们继续看if块中的逻辑:
int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) {fail(tooLongFrameLength); }return null; } if (minFrameLength > maxFrameLength) { buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); }return frame;
if (discardingTooLongFrame) 表示当前是否处于非丢弃模式, 如果是丢弃模式, 则进入if块
因为第一个不是丢弃模式, 所以这里先分析if块后面的逻辑
if (minFrameLength > maxFrameLength) 这里是判断当前找到的数据包长度大于最大长度, 这里的最大长度使我们创建解码器的时候设置的, 如果超过了最大长度, 就通过 buffer.skipBytes(minFrameLength + minDelimLength) 方式, 跳过数据包+分隔符的长度, 也就是将这部分数据进行完全丢弃
【Nett分布式分隔符解码器逻辑源码剖析】继续往下看, 如果长度不大最大允许长度, 则通过 if (stripDelimiter) 判断解析的出来的数据包是否包含分隔符, 如果不包含分隔符, 则截取数据包的长度之后, 跳过分隔符
我们再回头看 if (discardingTooLongFrame) 中的if块中的逻辑, 也就是丢弃模式:
首先将discardingTooLongFrame设置为false, 标记非丢弃模式
然后通过 buffer.skipBytes(minFrameLength + minDelimLength) 将数据包+分隔符长度的字节数跳过, 也就是进行丢弃, 之后再进行抛出异常
分析完成了找到分隔符之后的丢弃模式非丢弃模式的逻辑处理, 我们在分析没找到分隔符的逻辑处理, 也就是 if (minDelim != null) 中的else块:
if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true; if (failFast) {fail(tooLongFrameLength); }}} else {tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); }return null;
首先通过 if (!discardingTooLongFrame) 判断是否为非丢弃模式, 如果是, 则进入if块:
在if块中, 首先通过 if (buffer.readableBytes() > maxFrameLength) 判断当前可读字节数是否大于最大允许的长度, 如果大于最大允许的长度, 则将可读字节数设置到tooLongFrameLength的属性中, 代表丢弃的字节数
然后通过 buffer.skipBytes(buffer.readableBytes()) 将累计器中所有的可读字节进行丢弃
最后将discardingTooLongFrame设置为true, 也就是丢弃模式, 之后抛出异常
如果 if (!discardingTooLongFrame) 为false, 也就是当前处于丢弃模式, 则追加tooLongFrameLength也就是丢弃的字节数的长度, 并通过 buffer.skipBytes(buffer.readableBytes()) 将所有的字节继续进行丢弃
以上就是分隔符解码器的相关逻辑
章节总结 本章介绍了抽象解码器ByteToMessageDecoder, 和其他几个实现了ByteToMessageDecoder类的解码器, 这个几个解码器逻辑都比较简单, 同学们可以根据其中的思想剖析其他的比较复杂的解码器, 或者根据其规则实现自己的自定义解码器
更多关于Nett分布式分隔符解码器的资料请关注脚本之家其它相关文章!
推荐阅读
- 架构|浅析分布式、集群及高并发
- [源码解析]|[源码解析] TensorFlow 分布式环境(5) --- Session
- Netty分布式ByteBuf使用subPage级别内存分配剖析
- Netty分布式ByteBuf使用的回收逻辑剖析
- Netty分布式ByteBuf使用的底层实现方式源码解析
- java实现分布式项目搭建
- 研发管理|学习笔记(06):21天搞定分布式Python网络爬虫-爬虫前奏小结和作业
- python|21天python分布式爬虫--爬虫前奏3
- Citus|Citus 分布式 PostgreSQL 集群 - SQL Reference(创建和修改分布式表 DDL)
- 华为云GaussDB(用分布式计算和存算分离的集合力量打造云原生数据库)