编程语言_JAVA|Netty实战-如何在web项目中对接第三方tcp协议

公司有一个专门用于对接第三方停车,crm,erp等等业务系统的项目由我负责,十几个场子全都是基于Http实现的,突然有一天实施告诉我有一个基于TCP协议实现的停车系统要接入。坦白讲,最开始我是拒绝的,因为我很难接受要引入一个这么难以维护,并且很大程度上会破坏原有系统边界的玩意,架不住人家就是强势,我就是不换车厂,你爱接不接,没办法,只能硬着头皮想办法上
1.坑爹的需求
停车这玩意跟crmerp还不一样,通常的这类第三方系统是由我们主动发起调用。而停车的核心业务除了我们要主动调用的查费,缴费之外,还有需要第三方主动发起调用的车辆出入场推送。好吧,如果是做一个基于此的停车系统,并且对外提供这些方法,用tcp来实现这些根本不算什么,本来就是全双工协议嘛。可问题是我们是要去调用的第三方,我们所有的业务都是基于Http请求响应来实现的。而换成tcp长连接后就会带来很多问题,比如,同一个信道,我一个请求发过去了,怎么让返回的数据和我的请求对应上?你可能会说,简单,加一个请求ID不就行了?native!前面说了,人家就是强势,我什么也不改,你爱接不接。而且就算这个问题解决了,系统的边界控制不住,后面别的维护的同事接收也会很难维护。
2.办法总比困难多
说了这么多,活还是要干的。我的思路很简单,既然原有的系统引入tcp协议很难维护,那干脆新搭一个系统,把它包成http协议再提供给我们系统不就行了,没有请求ID,我们就自己根据协议数据生成请求ID呗。
最终我给出的方案是基于nettyspringboot搭建一个中间服务,接受来自目标车厂的出入场下行数据并转换为http调用至我方标准停车出入场接口,同时提供标准查费,缴费接口给我方系统。
3.最终实现
我也是第一次用netty,以前就看过一本netty实战,而且全公司也没第二个人用过,所以很多东西都只能自己摸索,基础的东西就不赘述了,网上挺多的。这里只讲几个关键的实现
1.解码器 大多数自定义tcp协议都有几个必不可少的元素,一是开头标志位,二是数据包长度,基于此就能从一个tcp信道中的数据流里解析出你想要的数据,具体参照下面的代码,碰到标志位先判断包长度,长度不够就写入缓冲区等到下次读取,长度够了就按位按协议解析数据即可

@Component public class ProtocolDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) { Protocol protocol = new Protocol(); int a = byteBuf.readableBytes(); if(byteBuf.readableBytes() >= Constant.headLength) { int beginReader; while (true) { // 获取包头开始的index beginReader = byteBuf.readerIndex(); // 标记包头开始的index byteBuf.markReaderIndex(); if(messageStart(byteBuf)){ break; } if (byteBuf.readableBytes() < Constant.headLength) { return; } } byte method = byteBuf.readByte(); //设置方法 protocol.setMethod(MethodEnum.getByRequestCode(method)); int bagLength = byteBuf.readInt(); if (byteBuf.readableBytes() < bagLength - 7) { // 还原读指针 byteBuf.readerIndex(beginReader); return; } byteBuf.skipBytes(32); //读取数据 int dataLength = byteBuf.readInt(); byte[] dataByte = new byte[dataLength]; byteBuf.readBytes(dataByte); String dataString = new String(dataByte, StandardCharsets.UTF_8); JSONObject data = https://www.it610.com/article/JSONObject.parseObject(dataString); if(method == MethodEnum.QUERY_FEE.getRequestCode() || method == MethodEnum.NOTIFY_PAY.getRequestCode()){ JSONArray jsonArray = data.getJSONArray("data"); JSONObject json = jsonArray.getJSONObject(0); String flowNo = json.getString("inserialno"); protocol.setFlowNo(flowNo); } //设置数据 protocol.setData(data); list.add(protocol); } }private boolean messageStart(ByteBuf byteBuffer){ if(byteBuffer.readByte() == 0x24){ byteBuffer.markReaderIndex(); return byteBuffer.readByte() == 0x24; } return false; } }
2.编码器 编码器没什么可说的,按位写入协议数据即可
public class ProtocolEncoder extends MessageToByteEncoder { ParkingConfig parkingConfig; public ProtocolEncoder(ParkingConfig config){ this.parkingConfig = config; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Protocol protocol, ByteBuf out) throws Exception { String data = https://www.it610.com/article/protocol.getData().toJSONString(); int textLength = data.getBytes().length; //包头标识 out.writeBytes(Constant.beginBytes); //协议指令码 if(protocol.isUp()) { out.writeByte(protocol.getMethod().getRequestCode()); }else{ out.writeByte(protocol.getMethod().getResponseCode()); } //包长 out.writeInt(textLength + Constant.headLength); //授权码 out.writeCharSequence(parkingConfig.getTcpKey(), StandardCharsets.UTF_8); //文本长度 out.writeInt(textLength); //文本内容 out.writeCharSequence(data, StandardCharsets.UTF_8); //校验码 byte[] req = new byte[out.readableBytes()]; out.writeBytes(CrcUtils.setParamCRC(req)); //包尾标识 out.writeByte(0X0D); out.writeByte(0X0D); } }

3.TCP-HTTP 最大的问题是如何将请求和响应对应上,并且将其包装为HTTP协议,我们知道HTTP协议是基于请求-响应模型实现的,要在TCP长连接中实现这一点,就必须要把响应数据从信道中拿出来返回给请求的线程,并且要能和请求对应上。前面说了,三方系统并未提供请求ID,但基于已有的协议数据我们可以手动生成请求ID。首先第三方协议的上行下行协议码是一一对应的,简单来说,查费上行协议码是0x00,那下行协议码就是0x01,这个对应关系不会改变,我们定义一个协议枚举,在这个枚举中添加上行,下行协议码,并提供根据协议码获取协议的方法即可。同时停车业务中少不了车牌这个元素,我们在设定同一个车牌只能在一个车厂停车的边界后,可以用协议码 + 车牌来标识请求并且将响应数据匹配到请求数据中。代码如下
public class RequestBlockUtils {private static ConcurrentHashMap responseMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap requestMap = new ConcurrentHashMap<>(); private static Lock lock = new ReentrantLock(); public static int queueRemain(){ return requestMap.size(); }public static void putRequest(Protocol requestProtocol){ String requestUid = getRequestIdByRequest(requestProtocol); String responseUid = getResponseIdByRequest(requestProtocol); requestMap.put(requestUid, responseUid); }public static void putResponse(Protocol responseProtocol){ String requestId = getRequestIdByResponse(responseProtocol); lock.lock(); if(requestMap.containsKey(requestId)) { String responseId = getResponseIdByResponse(responseProtocol); responseMap.put(responseId, responseProtocol); } lock.unlock(); }public static Protocol pullResponseWithTimeOut(Protocol requestProtocol, int timeOut){ String requestUid = getRequestIdByRequest(requestProtocol); if(!requestMap.containsKey(requestUid)){ return null; } String responseUid = requestMap.get(requestUid); long startTime = new Date().getTime(); while(true){ if(new Date().getTime() - startTime >= timeOut * 1000 || responseMap.containsKey(responseUid)){ break; } } lock.lock(); if(responseMap.containsKey(responseUid)){ return responseMap.remove(responseUid); } requestMap.remove(requestUid); lock.unlock(); throw new RuntimeException("获取响应结果超时param:" + requestProtocol.getData().toJSONString()); }private static String getRequestIdByRequest(Protocol requestProtocol){ return requestProtocol.getFlowNo() + "$Method$" + requestProtocol.getMethod().getRequestCode(); }private static String getResponseIdByRequest(Protocol requestProtocol){ return requestProtocol.getFlowNo() + "$Method$" + requestProtocol.getMethod().getResponseCode(); }private static String getRequestIdByResponse(Protocol responseProtocol){ return responseProtocol.getFlowNo() + "$Method$" + responseProtocol.getMethod().getRequestCode(); }private static String getResponseIdByResponse(Protocol responseProtocol){ return responseProtocol.getFlowNo() + "$Method$" + responseProtocol.getMethod().getResponseCode(); } }

Protocol.flowNo这里放入的是车牌,我们用两个Map来保存请求,响应关系的映射。在发起请求时,首先根据协议的上行下行协议码+车牌生成请求-响应ID映射并且放入Map同时调用pullResponseWithTimeOut阻塞一段时间尝试获取相应,当信道中有响应数据时会根据响应数据生成相应ID并将结果放入mappullResponseWithTimeOut方法如果未超时就会返回数据,如果已超时,直接抛出异常,注意这里需要加锁,否则可能会出现超时的响应数据未清除的问题
4.加个重试功能 【编程语言_JAVA|Netty实战-如何在web项目中对接第三方tcp协议】第三方的这种项目通常是部署在机房的,网络波动是很正常的事情,不可能每次都再去重启一次服务,所以我们这里做一个简单短线自动重连功能
@Component public class NettyClient {private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); @Autowired private ParkingConfig parkingConfig; @Autowired private WjlService wjlService; @Autowired private Store store; private Channel channel; Bootstrap b; private boolean start = false; public void start() throws Exception { if(start){ throw new RuntimeException("cannot start client that has started"); } start = true; logger.info("开始初始化客户端"); NioEventLoopGroup group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) // 注册线程池 .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(parkingConfig.getTcpIp(), parkingConfig.getTcpPort())) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)); pipeline.addLast("decoder", new ProtocolDecoder()); pipeline.addLast("encoder", new ProtocolEncoder(parkingConfig)); pipeline.addLast(new MethodHandler(parkingConfig, wjlService, store)); pipeline.addLast(new ExceptionHandler()); pipeline.addLast(new HeartHandler()); } }); logger.info("客户端初始化初始化完成"); connect(); }public Protocol request(Protocol requestProtocol){ logger.info("接收到第三方请求数据:{},目前待处理请求数量:{}", requestProtocol.getData(), RequestBlockUtils.queueRemain()); RequestBlockUtils.putRequest(requestProtocol); channel.writeAndFlush(requestProtocol); return RequestBlockUtils.pullResponseWithTimeOut(requestProtocol, 5); }private void connect() throws InterruptedException { if(channel != null && channel.isActive()){ return; } ChannelFuture cf; while(true) { try { logger.info("开始连接服务器...."); cf = b.connect().sync(); if(cf.isSuccess()){ channel = cf.channel(); break; } }catch (Exception e){ logger.info("服务器连接失败,reason:{},将再十秒后开始重连接", e.getMessage()); Thread.sleep(10000); } } logger.info("服务器连接成功......."); cf.channel().closeFuture().sync(); }class HeartHandler extends ChannelInboundHandlerAdapter{ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("120秒未收到服务端心跳--------------"); if (evt instanceof IdleStateEvent){ channel.close().sync(); }else { super.userEventTriggered(ctx,evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); ctx.channel().close().sync(); NettyClient.this.connect(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error(cause.getMessage()); } } }

    推荐阅读