java|java 子节点查找父节点_现代化的 Java (二十)——撮合节点的实现

今天我做了一些整合工作,把撮合、行情、状态、下单、定序整体在测试环境里跑了起来。那么这个项目算不算写完了呢?
我个人觉得并没有。

  • 没有测试代码来验证整个工作流程确实生效。
  • 现在行情只做了深度,并且只是记录了日志,这离一个完整的行情服务还有不小的距离,我们还有一些有趣的实践值得尝试
  • 现在每次更新项目,我是纯手工处理,那么对于一个正式的Java项目,有没有一些自动化的措施可用呢?其实也是有的
不过现在这个时间,代码都已经可以假装自己长大成人一般的运行起来。可以花几篇讨论一下撮合和行情的逻辑了。甚至我想对于熟悉 Akka 和 Clojure 的工程师,抢在我前面完成那些工作,也是很正常的。
缘起 动笔写这个系列,始于上半年我还在火币的时候,那时刚从医院出来,戴着一个略蒸汽朋克的骨骼支具,跟现在主持火币撮合引擎的张淞老师有过一些交流。他认为撮合是个很紧凑很典型的逻辑,很适合当作练习项目,建议我们几个都自己实现一个撮合引擎试试。
所以我夏天在家里的时候,就基于 Akka 和 Clojure 以及 Java ,做了一些实验,从头用新的技术架构实现交易所相关的各种逻辑,现在演示的这个教学项目,有一些来自于六七月份的实验项目,这也是项目代码中有些冗余内容,不够精简的原因之一,为了我们这个教程不至于太过发散,我去掉了夏天实现的一部分逻辑,例如撮合引擎对开盘收盘和集合竞价的支持,这里我们假设它总是7x24小时工作。
撮合本身,用张淞老师的话说,应该是几百行逻辑能够实现的,当然这个语境在于火币的撮合引擎是个纯 Clojure 项目,有完整的上下游服务支撑。我自己写的这个引擎呢,因为想要尝试各种感兴趣的技术,写起来就没有那么紧凑,特别是 Java 代码的表达能力,比 Clojure 还是差不少。但是通过 Java ,我们使我们的项目有了一个高质量的分布式框架支撑,Akka 给这个项目架构带来的潜力,又是纯 Clojure 项目不能比的。总的来说,我个人觉得这次实验非常有收获,很开心,这也是现在拿出来跟大家分享这些东西的动力。
撮合逻辑 在有了周边代码的支撑之后,撮合节点的逻辑就非常简单了,我们要在actor里维护这样一组内存状态:
  • 交易对名 symbol
  • 最后一个处理的订单标识 latest order id
  • 卖单列表 asks
  • 买单列表 bids
当然我们前面的章节也说过,这里我们对交易做了简化,假设总是有足够的限价单 ,使得市价单永远都会被立刻完全成交,实际上虽然“盘口击穿”确实是交易所极力避免的事情,但是并非没有出现这种情况的可能,特别是数字加密货币市场本身就很小。如果从程序上能够处理这种情况,我们应该加入两个列表,专用于维护未完成的市价单,并且在新的订单进入时,优先(按下单时序)撮合。
但是支持这样的逻辑会带来一些新的复杂性,例如盘口的某个方向已经被击穿,此时对向的市价单进入时,按什么样的价格撮合两个市价单?在有足够市场深度的前提下,我们就不需要考虑这种问题,每个市价单只会与对手方向的市价单进行撮合,这使得整个逻辑简单清晰。
有兴趣的同行也可以尝试自行实现这个逻辑。
相对来说限价单的撮合实现要清晰一些,它总是尝试在对手方向成交,剩下的部分称为 dash 的一部分。
取消订单的操作,我们也设计为一个订单,这个订单我们称之为取消单,那么很明显,它只需要尝试从盘口寻找匹配的限价单并拿走未交易的部分。
现在我们看一下关于市价买单(market bid)、市价卖单(market ask ),限价买单(limit bid),限价卖单(limit ask)以及取消单(cancel)的逻辑:
private Trade trade(MarketAsk order) throws Exception { latestId = order.getId(); Trade re = createTrade(order); re.setTakerCategory("market-ask"); while (order.getSurplus() > 0) { Bid bid = bidList.getLast(); bid.trade(order).ifPresent(re::add); if (bid.getSurplus() == 0) { bidList.removeLast(); } } return re; }private Trade trade(MarketBid order) throws Exception { latestId = order.getId(); Trade re = createTrade(order); re.setTakerCategory("market-bid"); while (order.getSurplus() > 0) { Ask ask = askList.getLast(); ask.trade(order).ifPresent(re::add); if (ask.getSurplus() == 0) { askList.removeLast(); } }return re; }private Trade trade(LimitAsk order) throws Exception { latestId = order.getId(); Trade re = createTrade(order); re.setTakerCategory("limit-ask"); while (order.getSurplus() > 0 && !bidList.isEmpty()) { Bid bid = bidList.getLast(); if (bid.getPrice().compareTo(order.getPrice()) < 0) { break; }bid.trade (order).ifPresent(re::add); if (bid.getSurplus() == 0) { bidList.removeLast(); } }if (order.getSurplus() > 0) { askList.addLast(Ask.from(order)); }return re; }private Trade trade(LimitBid order) throws Exception { latestId = order.getId(); Trade re = createTrade(order); re.setTakerCategory("limit-bid"); while (order.getSurplus() > 0 && !askList.isEmpty()) { Ask ask = askList.getLast(); if (ask.getPrice().compareTo(order.getPrice()) > 0) { break; }ask.trade(order).ifPresent(re::add); if (ask.getSurplus() == 0) { askList.removeLast(); } } if (order.getSurplus() > 0) { bidList.addLast(Bid.from(order)); }return re; }private Trade trade(Cancel order) throws Exception { latestId = order.getId(); Trade re = createTrade(order); re.setTakerCategory("cancel"); Predicate checker = make -> { if (make.getOrderId() == order.getOrderId()) { TradeItem item = new TradeItem(); item.setPrice(make.getPrice()); item.setMakerId(make.getOrderId()); return true; } else { return false; } }; askList.removeIf(checker); bidList.removeIf(checker); return re; }

这个过程中都涉及了 create trade 方法,这个方法很简单:
private Trade createTrade(Order order) throws Exception { Trade re = new Trade(); re.setId(getNextId()); re.setTakerId(order.getId()); re.setSymbol(order.getSymbol()); return re; }

实际上,订单的消息响应调用是下面的逻辑:
... .match(LimitAsk.class, order -> { Trade t = trade(order); postTrade(t); }) .match(LimitBid.class, order -> { Trade t = trade(order); postTrade(t); }) .match(MarketAsk.class, order -> { Trade t = trade(order); postTrade(t); }) .match(MarketBid.class, order -> { Trade t = trade(order); postTrade(t); }) .match(Cancel.class, order -> { Trade t = trade(order); postTrade(t); }) ...

因为处理完撮合后,总是要保存交易单、应答、输出行情和获取下一个订单,所以我把它们封装到了 postTrade 方法,至于为什么不在 trade 里面调用 post trade,应该说这是历史的进程,体现出了个人的不努力,也就是懒。其实封装进去也没有什么不妥,它只是在不断的代码修改提炼过程中走到了现在的状态而已。
等待逻辑 在这个设计中,每当一个订单处理完,撮合引擎向柜台节点发next order消息,如果有下一个订单,节点会立刻收到并继续处理,否则柜台会返回一个 OrderNoMore 消息,此时撮合节点会等一个很短的时刻再重新发送 next order:
... .match(OrderNoMore.class, msg -> { Home("not more new order. Request next after {} ms", 100); context().system().scheduler().scheduleOnce(Duration.ofMillis(100), () -> { this.peekActor.tell(nextMessage(), self()); }, context().dispatcher()); }) ...

我们应该注意这样一个问题,就是如果用在实用场合,等待 100ms 可能过长了。这完全就是一个实验项目的作者拍脑袋想出来的,并没有什么深刻的自然规律。建议有实用需求的朋友自己完善这部分逻辑。
再就是这部分逻辑中其实没有处理 next order 发送失败的情况,而在分布式系统中这是非常正常的情况,我在自己家的无线网络中,从饭厅到客厅的 tcp 通信都会时不时丢包(严重的时候每十几分钟断几秒,这是我没有成为一个游戏主播/代练,仍然要尝试找到下一个开发工作的重要原因(此处是个笑话应有掌声))。在生产环境中我这些年对各家云服务厂商的使用体验来看,也必须要考虑通信失败的情况。最简单的策略是每次收到消息时刷新一个时间戳变量,并且在一个高频的定时任务中检查这个时间戳,如果超时就主动发送一个新的 next order 请求。后续的章节中我们应该会补完这部分,有兴趣的朋友也可以尝试自己实现一下。
启动逻辑 接下来我们讨论撮合引擎的一个比较有意思的逻辑:程序启动。
其实akka自己带了一个很棒的功能,就是 Akka Persistence [Akka Documentation] 。这个功能实现了 Akka Actor 的状态持久化和恢复。但是要掌握它本身就需要一些预备知识,在我们这个小项目中,从业务出发,把状态描述成业务类型和json进行管理,更为简单。在将来是否要改变状态管理方式,也在未定。
我们假定撮合节点启动时,处于loading状态,它先去向 status keeper 节点查询最近一次的状态,如果真的没有,那么状态管理节点负责构造一个初始状态返回给撮合。在完成加载之前,撮合节点拒绝接受其它工作,如果失败,它停下来等待人工干预:
... private AbstractActor.Receive loading = ReceiveBuilder.create() .match(StatusQuery.class, msg -> { sender().tell(new Loading(), self()); }) .match(DashStatus.class, status -> { Home("received status {}", status.getId()); this.askList.addAll(status.getAskList()); this.bidList.addAll(status.getBidList()); latestId = status.getLatestOrderId(); switch (status.getStatus()) { case "trading": this.getContext().become(this.trading); var message = new NextOrder(); message.setSymbol(this.symbol); message.setPositionId(this.latestId); Home("load finished and into trading, begin match after 3 seconds"); context().system().scheduler().scheduleOnce(Duration.ofSeconds(3), () -> { this.peekActor.tell(message, self()); }, context().dispatcher()); break; default: throw new LoadFailed(this.symbol, String.format("invalid status %s", status.getStatus())); } }) .matchAny(msg -> { sender().tell(new InLoading(this.symbol), self()); }).build(); ... public Receive createReceive() { statusActor.tell(new LoadStatus(), self()); return loading; }

因为遗留代码的问题,这里进入加载逻辑的时机并不严谨,放在构造函数里,跟next order 请求串在一起,通过 stream 控制发送,可能效果更好,我们在后续的章节中也许会补完这个逻辑。
接下来按说我应该要去写测试逻辑,但是在这之前,可能我会先处理一些辅助工具的实现,例如 Java Function 、Runnable 等类型的 Clojure 封装。虽然它们并没有那么酷,但是我下个工作可能就不再接触数字货币,读者朋友们也可能来自各种不同的行业背景,通用代码工具可能对我们反而会长久的提供帮助。
=======================
【java|java 子节点查找父节点_现代化的 Java (二十)——撮合节点的实现】项目代码在: MarchLiu/market

    推荐阅读