12.1 Client Request Encoding
This analysis assumes Dubbo uses netty4 as its transport framework.
The overall flow of client request encoding is as follows:
NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // create a buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
  -->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
    -->encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
      -->getSerialization(Channel channel) // obtain the Hessian2Serialization instance
        -->CodecSupport.getSerialization(URL url)
          -->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))

      -->byte[] header = new byte[16]
      -->Bytes.short2bytes(MAGIC, header) // set the first two bytes to the magic number: [-38, -69, 0, ..., 0]

      -->header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
         if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
         if (req.isEvent()) header[2] |= FLAG_EVENT;

      -->Bytes.long2bytes(req.getId(), header, 4);

      -->new Hessian2ObjectOutput(out)
      -->DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data)
      -->Bytes.int2bytes(len, header, 12); // set bytes 13~16 (an int is 32 bits, i.e. 4 bytes): request body length
      -->buffer.writeBytes(header); // write the header into the first 16 bytes of the buffer
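The overall flow is simple:
- Create a buffer
- Create a 16-byte header (byte[16]) and write into it the magic number, request flag, serialization ID, twoway/event flags, request ID, and request body length
- Serialize the request body and write the serialized bytes into the buffer starting at the 17th byte
- Then write the contents of the header into the first 16 bytes of the buffer
- Finally, send the buffer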
NettyClient (doOpen registers the encoder in the channel pipeline):
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}
NettyCodecAdapter:
final class NettyCodecAdapter {
    private final ChannelHandler encoder = new InternalEncoder();
    private final ChannelHandler decoder = new InternalDecoder();
    private final Codec2 codec;
    private final URL url;
    private final com.alibaba.dubbo.remoting.ChannelHandler handler;

    public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }

    public ChannelHandler getEncoder() {
        return encoder;
    }

    public ChannelHandler getDecoder() {
        return decoder;
    }

    private class InternalEncoder extends MessageToByteEncoder {
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
        }
    }

    private class InternalDecoder extends ByteToMessageDecoder {
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List
        ......
I. Creating the ChannelBuffer
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Here, out is:
ByteBuf buffer = SimpleLeakAwareByteBuf
  -->ByteBuf buf = PooledUnsafeDirectByteBuf
NettyBackedChannelBuffer:
private ByteBuf buffer;

public NettyBackedChannelBuffer(ByteBuf buffer) {
    Assert.notNull(buffer, "buffer == null");
    this.buffer = buffer;
}
The resulting buffer:
NettyBackedChannelBuffer
  -->ByteBuf buffer = SimpleLeakAwareByteBuf
    -->ByteBuf buf = PooledUnsafeDirectByteBuf
II. Obtaining the NettyChannel
Next, the io.netty.channel.Channel instance is obtained and wrapped in a NettyChannel.
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
private final Channel channel;

private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
    super(url, handler);
    if (channel == null) {
        throw new IllegalArgumentException("netty channel == null; ");
    }
    this.channel = channel;
}

static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) {
            ret = channelMap.putIfAbsent(ch, nettyChannel);
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}
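First, the ConcurrentMap cache channelMap is checked for a NettyChannel keyed by the io.netty.channel.Channel instance. If none is found, a new NettyChannel is created and, when the channel is active, placed in the cache with putIfAbsent.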
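The get-or-add logic above can be sketched in isolation. The following is a minimal illustration, not Dubbo's code: WrapperCacheSketch, Wrapper, and the use of a String in place of io.netty.channel.Channel are all hypothetical stand-ins, and the boolean active parameter plays the role of ch.isActive().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the get-or-add wrapper cache; not Dubbo's real classes. */
final class WrapperCacheSketch {
    /** Hypothetical wrapper type playing the role of NettyChannel. */
    static final class Wrapper {
        final String rawChannel;

        Wrapper(String rawChannel) {
            this.rawChannel = rawChannel;
        }
    }

    private static final ConcurrentMap<String, Wrapper> CACHE =
            new ConcurrentHashMap<String, Wrapper>();

    static Wrapper getOrAdd(String rawChannel, boolean active) {
        if (rawChannel == null) {
            return null;
        }
        Wrapper ret = CACHE.get(rawChannel);
        if (ret == null) {
            Wrapper fresh = new Wrapper(rawChannel);
            if (active) {
                // putIfAbsent returns the existing value, or null if ours was stored
                ret = CACHE.putIfAbsent(rawChannel, fresh);
            }
            if (ret == null) {
                ret = fresh;
            }
        }
        return ret;
    }
}
```

Two lookups for the same active key return the same wrapper, while an inactive key is never cached, so each call for it produces a new wrapper.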
The NettyChannel instance finally obtained looks like this:
-->Channel channel = NioSocketChannel
-->ChannelHandler handler = NettyClient
-->URL url = dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&codec=dubbo&default.client=netty4&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2204&register.ip=10.10.10.10&remote.timestamp=1514958356359&side=consumer&timeout=6000000&timestamp=1514959413199
III. Encoding
codec.encode(channel, buffer, msg)
Here, codec is:
Codec2 codec =
DubboCountCodec
  -->DubboCodec codec = new DubboCodec()
DubboCountCodec:
private DubboCodec codec = new DubboCodec();

public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}
Input parameters:
- channel: the NettyChannel object described above
- buffer: the NettyBackedChannelBuffer object described above
- msg: a Request object with the following properties:
long mId = 0
String mVersion = "2.0.0"
boolean mTwoWay = true
boolean mEvent = false
boolean mBroken = false
Object mData = RpcInvocation object
  -->String methodName = "sayHello"
  -->Class<?>[] parameterTypes = [java.lang.String]
  -->Object[] arguments = ["world"]
  -->Map<String, String> attachments = {
       "path" -> "com.alibaba.dubbo.demo.DemoService"
       "interface" -> "com.alibaba.dubbo.demo.DemoService"
       "version" -> "0.0.0"
       "timeout" -> "6000000"
     }
  -->Invoker<?> invoker = DubboInvoker object
ExchangeCodec:
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id.
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // set writerIndex to 0 + 16 so the request body bytes are written first
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData());
    }
    out.flushBuffer();
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);

    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
1. Obtain the serialization protocol via SPI
Serialization serialization = getSerialization(channel);
getSerialization is defined in AbstractCodec, the parent class of ExchangeCodec.
protected Serialization getSerialization(Channel channel) {
    return CodecSupport.getSerialization(channel.getUrl());
}
public static Serialization getSerialization(URL url) {
    return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
            url.getParameter("serialization", "hessian2"));
}
The resulting Serialization serialization is a Hessian2Serialization object:
public class Hessian2Serialization implements Serialization {
    public static final byte ID = 2;

    public byte getContentTypeId() {
        return ID;
    }

    public String getContentType() {
        return "x-application/hessian2";
    }

    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new Hessian2ObjectOutput(out);
    }

    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new Hessian2ObjectInput(is);
    }
}
Note: the ID of the hessian2 serialization is 2. This ID is written into the protocol header and sent to the server, which uses it to look up the matching serialization and deserialize the request body.
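To make the note concrete, here is a small sketch of how the third header byte can carry both the flags and the serialization ID, and how a receiver could read them back. The constant values (0x80, 0x40, 0x20, and the 0x1f mask) are what I believe ExchangeCodec defines, but they do not appear in the text above, so treat them as assumptions; the class and method names are hypothetical.

```java
/** Hypothetical helper showing how flags and serialization ID share one byte. */
final class FlagByteSketch {
    // Assumed values; they are not quoted anywhere in this article.
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Packs the flags into the high bits and the serialization ID into the low 5 bits. */
    static byte pack(byte serializationId, boolean twoWay, boolean event) {
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        return flag;
    }

    /** Receiver side: mask off the flag bits to recover the serialization ID. */
    static int serializationId(byte flag) {
        return flag & SERIALIZATION_MASK;
    }

    static boolean isRequest(byte flag) {
        return (flag & FLAG_REQUEST) != 0;
    }

    static boolean isTwoWay(byte flag) {
        return (flag & FLAG_TWOWAY) != 0;
    }

    static boolean isEvent(byte flag) {
        return (flag & FLAG_EVENT) != 0;
    }
}
```

Under these assumed constants, a two-way, non-event hessian2 request packs to 0xC2, and masking with 0x1f yields 2 again.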
2. Create the 16-byte header array
byte[] header = new byte[16];
Then fill bytes 1~2 with the magic number; fill byte 3 with the request flag, the serialization ID (2 here), and the twoway and/or event flag; fill bytes 5~12 with the request ID (long == 64 bit == 8 byte).
// set magic number.
Bytes.short2bytes(MAGIC, header);

// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;

// set request id.
Bytes.long2bytes(req.getId(), header, 4);
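Bytes.short2bytes and Bytes.long2bytes are not shown in this article. Judging from the magic number bytes [-38, -69] quoted earlier, they write the most significant byte first; the sketch below re-implements that behaviour under this assumption. BigEndianSketch and partialHeader are hypothetical names, and 0xdabb is the magic value inferred from those two bytes.

```java
/** Hypothetical big-endian writers mirroring what Bytes.short2bytes/long2bytes appear to do. */
final class BigEndianSketch {
    /** Writes a 16-bit value, high byte first. */
    static void short2bytes(short v, byte[] b, int off) {
        b[off] = (byte) (v >>> 8);
        b[off + 1] = (byte) v;
    }

    /** Writes a 64-bit value, high byte first. */
    static void long2bytes(long v, byte[] b, int off) {
        for (int i = 0; i < 8; i++) {
            b[off + i] = (byte) (v >>> (56 - 8 * i));
        }
    }

    /** Builds a header with only the magic number and request ID filled in. */
    static byte[] partialHeader(long requestId) {
        byte[] header = new byte[16];
        short2bytes((short) 0xdabb, header, 0); // bytes 1~2
        long2bytes(requestId, header, 4);       // bytes 5~12
        return header;
    }
}
```

With a request ID of 1, only the last byte of the ID field (index 11) is non-zero, which is an easy way to confirm the byte order when inspecting a captured header.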
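3. Serialize the request body
First, set the buffer's writerIndex:
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // set writerIndex to 0 + 16 so the request body bytes are written first
The buffer's current writerIndex (the writable position) is saved first. The header byte array will be written into the range from that position to "that position + 15" (for example, [0, 15]), and the request body bytes will be written from "that position + 16" onward (for example, [16, x)).
The buffer's writerIndex is then set to the current position + 16, because the request body is serialized and written into the buffer first, and only afterwards is the header written.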
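The reserve-then-backfill idea described above can be tried out with a plain java.nio.ByteBuffer. This is only a sketch: BackfillSketch is a hypothetical class, position() stands in for ChannelBuffer's writerIndex, and the body is an arbitrary byte array rather than Hessian output.

```java
import java.nio.ByteBuffer;

/** Hypothetical demo: skip the header slot, write the body, then go back and fill the header. */
final class BackfillSketch {
    static final int HEADER_LENGTH = 16;

    static ByteBuffer encode(byte[] body) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length);

        int savedWriteIndex = buffer.position();
        buffer.position(savedWriteIndex + HEADER_LENGTH); // reserve room for the header
        buffer.put(body);                                 // body goes in first
        int len = buffer.position() - savedWriteIndex - HEADER_LENGTH;

        byte[] header = new byte[HEADER_LENGTH];
        ByteBuffer.wrap(header).putInt(12, len);          // body length in the last 4 bytes

        buffer.position(savedWriteIndex);                 // jump back to the reserved slot
        buffer.put(header);
        buffer.position(savedWriteIndex + HEADER_LENGTH + len); // ready for the next write
        buffer.flip();                                    // switch to reading for the caller
        return buffer;
    }
}
```

The body length is only known after serialization, which is why the header slot is reserved first and filled in last; the alternative would be serializing into a temporary array and copying it.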
Serialize the request body:
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
    encodeEventData(channel, out, req.getData());
} else {
    encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
First, a ChannelBufferOutputStream object is created (it extends the abstract class java.io.OutputStream):
private final ChannelBuffer buffer;
private final int startIndex;

public ChannelBufferOutputStream(ChannelBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    this.buffer = buffer;
    startIndex = buffer.writerIndex();
}
buffer is the NettyBackedChannelBuffer object described above; startIndex == 16
Next, the ObjectOutput object is obtained:
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
    return new Hessian2ObjectOutput(out);
}
private final Hessian2Output mH2o;

public Hessian2ObjectOutput(OutputStream os) {
    mH2o = new Hessian2Output(os);
    mH2o.setSerializerFactory(Hessian2SerializerFactory.SERIALIZER_FACTORY);
}
public final static int SIZE = 4096;
private final byte[] _buffer = new byte[SIZE];
protected OutputStream _os;

public Hessian2Output(OutputStream os) {
    _os = os;
}
The resulting ObjectOutput object:
Hessian2ObjectOutput
  -->Hessian2Output mH2o
    -->byte[] _buffer = new byte[4096]
    -->OutputStream _os = the ChannelBufferOutputStream object above
    -->SerializerFactory _serializerFactory = Hessian2SerializerFactory instance
Finally, DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data) is executed; this method is where the request body is actually serialized.
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    out.writeObject(inv.getAttachments());
}
Here, channel is the NettyChannel instance above; out is the Hessian2ObjectOutput instance above; data is the data property of the Request object:
Object mData = RpcInvocation object
  -->String methodName = "sayHello"
  -->Class<?>[] parameterTypes = [java.lang.String]
  -->Object[] arguments = ["world"]
  -->Map<String, String> attachments = {
       "path" -> "com.alibaba.dubbo.demo.DemoService"
       "interface" -> "com.alibaba.dubbo.demo.DemoService"
       "version" -> "0.0.0"
       "timeout" -> "6000000"
     }
  -->Invoker<?> invoker = DubboInvoker object
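From DubboCodec.encodeRequestData we can see that, of the RpcInvocation object in the Request body, only the following are serialized (the dubbo version, path, and version written first are themselves looked up in the attachments):
- methodName: the method name
- parameterTypes: the parameter types
- arguments: the argument values
- attachments: the additional parameters
First, let's look at:
Hessian2ObjectOutput.writeUTF(String v)
  -->Hessian2Output.writeString(String value)
    -->printString(String v, int strOffset, int length)
Through this method, the passed-in v is stored in the ObjectOutput object's byte[] _buffer = new byte[4096] array.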
Hessian2Output:
/**
 * Writes any object to the output stream.
 */
public void writeObject(Object object)
        throws IOException {
    if (object == null) {
        writeNull();
        return;
    }

    Serializer serializer = findSerializerFactory().getSerializer(object.getClass());
    serializer.writeObject(object, this);
}

public final SerializerFactory findSerializerFactory() {
    SerializerFactory factory = _serializerFactory;
    if (factory == null)
        _serializerFactory = factory = new SerializerFactory();
    return factory;
}

SerializerFactory:
private static HashMap _staticSerializerMap;
private HashMap _cachedSerializerMap;
/**
 * Returns the serializer for a class.
 * @param cl the class of the object that needs to be serialized.
 * @return a serializer object for the serialization.
 */
public Serializer getSerializer(Class cl)
        throws HessianProtocolException {
    Serializer serializer;

    serializer = (Serializer) _staticSerializerMap.get(cl);
    if (serializer != null)
        return serializer;

    if (_cachedSerializerMap != null) {
        synchronized (_cachedSerializerMap) {
            serializer = (Serializer) _cachedSerializerMap.get(cl);
        }

        if (serializer != null)
            return serializer;
    }

    ......

    if (serializer != null) {

    }
    .......
    else if (Map.class.isAssignableFrom(cl)) {
        if (_mapSerializer == null)
            _mapSerializer = new MapSerializer();

        serializer = _mapSerializer;
    }
    ......
    if (serializer == null)
        serializer = getDefaultSerializer(cl);

    if (_cachedSerializerMap == null)
        _cachedSerializerMap = new HashMap(8);

    synchronized (_cachedSerializerMap) {
        _cachedSerializerMap.put(cl, serializer);
    }

    return serializer;
}
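out.writeObject(Object object):
First the _serializerFactory is obtained; here it is the Hessian2SerializerFactory instance. Its getSerializer(Class cl) method lives in the parent class SerializerFactory. The serializer lookup works as follows: first look for a serializer for the type in _staticSerializerMap (which is pre-populated at startup with serializers for a set of types; see com.alibaba.com.caucho.hessian.io.SerializerFactory) and return it if found. Otherwise look for the serializer in the _cachedSerializerMap cache. If it is not there either, create a serializer based on the type (new MapSerializer() in this case, with getDefaultSerializer(cl) as the fallback), put it into _cachedSerializerMap, and return the newly created serializer.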
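The lookup order described above (static map, then cache, then create and cache) can be shown with a toy version. Everything here is hypothetical: serializers are represented by plain strings, and SerializerLookupSketch is not part of Hessian.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical three-step lookup: static table, per-instance cache, then create and cache. */
final class SerializerLookupSketch {
    // Step 1 source: entries known up front (illustrative names only).
    private static final Map<Class<?>, String> STATIC_MAP = new HashMap<Class<?>, String>();

    static {
        STATIC_MAP.put(String.class, "StringSerializer");
        STATIC_MAP.put(Integer.class, "IntegerSerializer");
    }

    // Step 2 source: serializers created on demand.
    private final Map<Class<?>, String> cached = new HashMap<Class<?>, String>();

    /** Counts how many serializers were created, to make cache hits observable. */
    int created = 0;

    synchronized String getSerializer(Class<?> cl) {
        String serializer = STATIC_MAP.get(cl);
        if (serializer != null) {
            return serializer;
        }
        serializer = cached.get(cl);
        if (serializer != null) {
            return serializer;
        }
        // Step 3: pick by type, falling back to a default.
        if (Map.class.isAssignableFrom(cl)) {
            serializer = "MapSerializer";
        } else {
            serializer = "DefaultSerializer";
        }
        created++;
        cached.put(cl, serializer);
        return serializer;
    }
}
```

Asking twice for HashMap.class creates the map serializer only once; the second call is served from the cache.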
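Finally, MapSerializer.writeObject(Object obj, AbstractHessianOutput out) is called to perform the serialization.
After DubboCodec.encodeRequestData finishes, all of the information has been written into the ObjectOutput object's byte[] _buffer = new byte[4096] array.
Note:
- If the byte count exceeds 4096 while data is being written into _buffer, a flushBuffer() is performed first to copy the data in _buffer into the PooledUnsafeDirectByteBuf, after which writing into _buffer continues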
Hessian2ObjectOutput
public void flushBuffer() throws IOException {
    mH2o.flushBuffer();
}

Hessian2Output
public final void flushBuffer()
        throws IOException {
    int offset = _offset;

    if (!_isStreaming && offset > 0) {
        _offset = 0;
        _os.write(_buffer, 0, offset);
    } else if (_isStreaming && offset > 3) {
        int len = offset - 3;
        _buffer[0] = 'p';
        _buffer[1] = (byte) (len >> 8);
        _buffer[2] = (byte) len;
        _offset = 3;
        _os.write(_buffer, 0, offset);
    }
}
This in turn executes ChannelBufferOutputStream.write(byte[] b, int off, int len):
@Override
public void write(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
        return;
    }
    buffer.writeBytes(b, off, len);
}
ChannelBuffer:
/**
 * Transfers the specified source array's data to this buffer starting at
 * the current {@code writerIndex} and increases the {@code writerIndex} by
 * the number of the transferred bytes (= {@code length}).
 *
 * @param index  the first index of the source
 * @param length the number of bytes to transfer
 */
void writeBytes(byte[] src, int index, int length);
This simply transfers the data in the ObjectOutput object's byte[] _buffer = new byte[4096] array into buf (for the underlying copy, see unsafe.copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes)).
NettyBackedChannelBuffer
  -->ByteBuf buffer = SimpleLeakAwareByteBuf
    -->ByteBuf buf = PooledUnsafeDirectByteBuf
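The flush-when-full behaviour is easy to reproduce with a tiny buffer. The sketch below is an assumption-laden model rather than Hessian2Output itself: SmallBufferOutputSketch is hypothetical, the buffer size is passed in (the text above gives 4096 for Hessian), and a ByteArrayOutputStream stands in for the ChannelBufferOutputStream.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical model of a fixed-size staging buffer that spills into a sink when full. */
final class SmallBufferOutputSketch {
    private final byte[] buffer;
    private final ByteArrayOutputStream sink;
    private int offset;

    /** Number of non-empty flushes, to make the spill points observable. */
    int flushCount;

    SmallBufferOutputSketch(ByteArrayOutputStream sink, int size) {
        this.sink = sink;
        this.buffer = new byte[size];
    }

    void write(int b) {
        if (offset >= buffer.length) {
            flushBuffer(); // staging buffer is full: spill before writing more
        }
        buffer[offset++] = (byte) b;
    }

    void flushBuffer() {
        if (offset > 0) {
            sink.write(buffer, 0, offset);
            offset = 0;
            flushCount++;
        }
    }
}
```

Writing 10 bytes through a 4-byte buffer spills twice on its own and leaves 2 bytes staged, which is why encodeRequest still calls out.flushBuffer() explicitly before measuring the written length.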
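4. Write the header into the buffer
int len = bos.writtenBytes(); // compute the request body length
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12); // write the body length into bytes 13~16 of the header (int = 4 bytes)

// write
buffer.writerIndex(savedWriteIndex); // move the buffer's writerIndex back to where this write started
buffer.writeBytes(header); // write the header array into the buffer
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // set the buffer's writerIndex, ready for the next write
At this point the whole encoding is complete. The ByteBuf holding the data is then transmitted over the network by netty itself.
Let's look at the final structure of the request's byte[] header: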
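- bytes 1~2: magic number
- byte 3: request flag, serialization ID, twoway and/or event flag
- byte 4: not written by encodeRequest, so it stays 0
- bytes 5~12: request ID
- bytes 13~16: request body length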
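As a cross-check of this layout, the sketch below reads the fields back out of a 16-byte header using java.nio.ByteBuffer, whose default byte order is big-endian. HeaderParseSketch and its Header holder are hypothetical names, and this is not how Dubbo's decoder is written.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader for the 16-byte request header layout listed above. */
final class HeaderParseSketch {
    static final class Header {
        final int magic;       // bytes 1~2, as an unsigned 16-bit value
        final int flag;        // byte 3, as an unsigned 8-bit value
        final long requestId;  // bytes 5~12
        final int bodyLength;  // bytes 13~16

        Header(int magic, int flag, long requestId, int bodyLength) {
            this.magic = magic;
            this.flag = flag;
            this.requestId = requestId;
            this.bodyLength = bodyLength;
        }
    }

    static Header parse(byte[] header) {
        if (header.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + header.length);
        }
        ByteBuffer b = ByteBuffer.wrap(header); // big-endian by default
        return new Header(
                b.getShort(0) & 0xffff,
                b.get(2) & 0xff,
                b.getLong(4),
                b.getInt(12));
    }
}
```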
protected static void checkPayload(Channel channel, long size) throws IOException {
    int payload = Constants.DEFAULT_PAYLOAD;
    if (channel != null && channel.getUrl() != null) {
        payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD); // 8M
    }
    if (payload > 0 && size > payload) {
        ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
        logger.error(e);
        throw e;
    }
}
Dubbo enforces a limit here: if the request body is longer than the payload limit (8M by default, overridable through the Constants.PAYLOAD_KEY URL parameter), an exception is thrown immediately.
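The rule that checkPayload applies can be isolated as a single predicate. In this sketch the class and method names are hypothetical, and the 8 * 1024 * 1024 default is an assumption based on the 8M figure above.

```java
/** Hypothetical predicate mirroring the size check in checkPayload. */
final class PayloadCheckSketch {
    // Assumed default, derived from the "8M" comment in the code above.
    static final int DEFAULT_PAYLOAD = 8 * 1024 * 1024;

    /** Returns true when the body is too large; a non-positive limit disables the check. */
    static boolean exceeds(int configuredPayload, long size) {
        return configuredPayload > 0 && size > configuredPayload;
    }
}
```

One detail worth noticing in the original condition is payload > 0: a limit of zero or a negative value turns the check off entirely instead of rejecting everything.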