今天我做了一些整合工作,把撮合、行情、状态、下单、定序整体在测试环境里跑了起来。那么这个项目算不算写完了呢?
我个人觉得并没有。
- 没有测试代码来验证整个工作流程确实生效。
- 现在行情只做了深度,并且只是记录了日志,这离一个完整的行情服务还有不小的距离,我们还有一些有趣的实践值得尝试
- 现在每次更新项目,我是纯手工处理,那么对于一个正式的Java项目,有没有一些自动化的措施可用呢?其实也是有的
缘起 动笔写这个系列,始于上半年我还在火币的时候,那时刚从医院出来,戴着一个略蒸汽朋克的骨骼支具,跟现在主持火币撮合引擎的张淞老师有过一些交流。他认为撮合是个很紧凑很典型的逻辑,很适合当作练习项目,建议我们几个都自己实现一个撮合引擎试试。
所以我夏天在家里的时候,就基于 Akka 和 Clojure 以及 Java ,做了一些实验,从头用新的技术架构实现交易所相关的各种逻辑,现在演示的这个教学项目,有一些来自于六七月份的实验项目,这也是项目代码中有些冗余内容,不够精简的原因之一,为了我们这个教程不至于太过发散,我去掉了夏天实现的一部分逻辑,例如撮合引擎对开盘收盘和集合竞价的支持,这里我们假设它总是7x24小时工作。
撮合本身,用张淞老师的话说,应该是几百行逻辑能够实现的,当然这个语境在于火币的撮合引擎是个纯 Clojure 项目,有完整的上下游服务支撑。我自己写的这个引擎呢,因为想要尝试各种感兴趣的技术,写起来就没有那么紧凑,特别是 Java 代码的表达能力,比 Clojure 还是差不少。但是通过 Java ,我们使我们的项目有了一个高质量的分布式框架支撑,Akka 给这个项目架构带来的潜力,又是纯 Clojure 项目不能比的。总的来说,我个人觉得这次实验非常有收获,很开心,这也是现在拿出来跟大家分享这些东西的动力。
撮合逻辑 在有了周边代码的支撑之后,撮合节点的逻辑就非常简单了,我们要在actor里维护这样一组内存状态:
- 交易对名 symbol
- 最后一个处理的订单标识 latest order id
- 卖单列表 asks
- 买单列表 bids
但是支持这样的逻辑会带来一些新的复杂性,例如盘口的某个方向已经被击穿,此时对向的市价单进入时,按什么样的价格撮合两个市价单?在有足够市场深度的前提下,我们就不需要考虑这种问题,每个市价单只会与对手方向的市价单进行撮合,这使得整个逻辑简单清晰。
有兴趣的同行也可以尝试自行实现这个逻辑。
相对来说限价单的撮合实现要清晰一些,它总是尝试在对手方向成交,剩下的部分称为 dash 的一部分。
取消订单的操作,我们也设计为一个订单,这个订单我们称之为取消单,那么很明显,它只需要尝试从盘口寻找匹配的限价单并拿走未交易的部分。
现在我们看一下关于市价买单(market bid)、市价卖单(market ask ),限价买单(limit bid),限价卖单(limit ask)以及取消单(cancel)的逻辑:
private Trade trade(MarketAsk order) throws Exception {
latestId = order.getId();
Trade re = createTrade(order);
re.setTakerCategory("market-ask");
while (order.getSurplus() > 0) {
Bid bid = bidList.getLast();
bid.trade(order).ifPresent(re::add);
if (bid.getSurplus() == 0) {
bidList.removeLast();
}
}
return re;
}private Trade trade(MarketBid order) throws Exception {
latestId = order.getId();
Trade re = createTrade(order);
re.setTakerCategory("market-bid");
while (order.getSurplus() > 0) {
Ask ask = askList.getLast();
ask.trade(order).ifPresent(re::add);
if (ask.getSurplus() == 0) {
askList.removeLast();
}
}return re;
}private Trade trade(LimitAsk order) throws Exception {
latestId = order.getId();
Trade re = createTrade(order);
re.setTakerCategory("limit-ask");
while (order.getSurplus() > 0 && !bidList.isEmpty()) {
Bid bid = bidList.getLast();
if (bid.getPrice().compareTo(order.getPrice()) < 0) {
break;
}bid.trade (order).ifPresent(re::add);
if (bid.getSurplus() == 0) {
bidList.removeLast();
}
}if (order.getSurplus() > 0) {
askList.addLast(Ask.from(order));
}return re;
}private Trade trade(LimitBid order) throws Exception {
latestId = order.getId();
Trade re = createTrade(order);
re.setTakerCategory("limit-bid");
while (order.getSurplus() > 0 && !askList.isEmpty()) {
Ask ask = askList.getLast();
if (ask.getPrice().compareTo(order.getPrice()) > 0) {
break;
}ask.trade(order).ifPresent(re::add);
if (ask.getSurplus() == 0) {
askList.removeLast();
}
}
if (order.getSurplus() > 0) {
bidList.addLast(Bid.from(order));
}return re;
}private Trade trade(Cancel order) throws Exception {
latestId = order.getId();
Trade re = createTrade(order);
re.setTakerCategory("cancel");
Predicate checker = make -> {
if (make.getOrderId() == order.getOrderId()) {
TradeItem item = new TradeItem();
item.setPrice(make.getPrice());
item.setMakerId(make.getOrderId());
return true;
} else {
return false;
}
};
askList.removeIf(checker);
bidList.removeIf(checker);
return re;
}
这个过程中都涉及了 create trade 方法,这个方法很简单:
private Trade createTrade(Order order) throws Exception {
Trade re = new Trade();
re.setId(getNextId());
re.setTakerId(order.getId());
re.setSymbol(order.getSymbol());
return re;
}
实际上,订单的消息响应调用是下面的逻辑:
...
.match(LimitAsk.class, order -> {
Trade t = trade(order);
postTrade(t);
})
.match(LimitBid.class, order -> {
Trade t = trade(order);
postTrade(t);
})
.match(MarketAsk.class, order -> {
Trade t = trade(order);
postTrade(t);
})
.match(MarketBid.class, order -> {
Trade t = trade(order);
postTrade(t);
})
.match(Cancel.class, order -> {
Trade t = trade(order);
postTrade(t);
})
...
因为处理完撮合后,总是要保存交易单、应答、输出行情和获取下一个订单,所以我把它们封装到了 postTrade 方法,至于为什么不在 trade 里面调用 post trade,应该说这是历史的进程,体现出了个人的不努力,也就是懒。其实封装进去也没有什么不妥,它只是在不断的代码修改提炼过程中走到了现在的状态而已。
等待逻辑 在这个设计中,每当一个订单处理完,撮合引擎向柜台节点发next order消息,如果有下一个订单,节点会立刻收到并继续处理,否则柜台会返回一个 OrderNoMore 消息,此时撮合节点会等一个很短的时刻再重新发送 next order:
...
.match(OrderNoMore.class, msg -> {
Home("not more new order. Request next after {} ms", 100);
context().system().scheduler().scheduleOnce(Duration.ofMillis(100), () -> {
this.peekActor.tell(nextMessage(), self());
}, context().dispatcher());
})
...
我们应该注意这样一个问题,就是如果用在实用场合,等待 100ms 可能过长了。这完全就是一个实验项目的作者拍脑袋想出来的,并没有什么深刻的自然规律。建议有实用需求的朋友自己完善这部分逻辑。
再就是这部分逻辑中其实没有处理 next order 发送失败的情况,而在分布式系统中这是非常正常的情况,我在自己家的无线网络中,从饭厅到客厅的 tcp 通信都会时不时丢包(严重的时候每十几分钟断几秒,这是我没有成为一个游戏主播/代练,仍然要尝试找到下一个开发工作的重要原因(此处是个笑话应有掌声))。在生产环境中我这些年对各家云服务厂商的使用体验来看,也必须要考虑通信失败的情况。最简单的策略是每次收到消息时刷新一个时间戳变量,并且在一个高频的定时任务中检查这个时间戳,如果超时就主动发送一个新的 next order 请求。后续的章节中我们应该会补完这部分,有兴趣的朋友也可以尝试自己实现一下。
启动逻辑 接下来我们讨论撮合引擎的一个比较有意思的逻辑:程序启动。
其实akka自己带了一个很棒的功能,就是 Akka Persistence [Akka Documentation] 。这个功能实现了 Akka Actor 的状态持久化和恢复。但是要掌握它本身就需要一些预备知识,在我们这个小项目中,从业务出发,把状态描述成业务类型和json进行管理,更为简单。在将来是否要改变状态管理方式,也在未定。
我们假定撮合节点启动时,处于loading状态,它先去向 status keeper 节点查询最近一次的状态,如果真的没有,那么状态管理节点负责构造一个初始状态返回给撮合。在完成加载之前,撮合节点拒绝接受其它工作,如果失败,它停下来等待人工干预:
...
private AbstractActor.Receive loading = ReceiveBuilder.create()
.match(StatusQuery.class, msg -> {
sender().tell(new Loading(), self());
})
.match(DashStatus.class, status -> {
Home("received status {}", status.getId());
this.askList.addAll(status.getAskList());
this.bidList.addAll(status.getBidList());
latestId = status.getLatestOrderId();
switch (status.getStatus()) {
case "trading":
this.getContext().become(this.trading);
var message = new NextOrder();
message.setSymbol(this.symbol);
message.setPositionId(this.latestId);
Home("load finished and into trading, begin match after 3 seconds");
context().system().scheduler().scheduleOnce(Duration.ofSeconds(3), () -> {
this.peekActor.tell(message, self());
}, context().dispatcher());
break;
default:
throw new LoadFailed(this.symbol, String.format("invalid status %s", status.getStatus()));
}
})
.matchAny(msg -> {
sender().tell(new InLoading(this.symbol), self());
}).build();
...
public Receive createReceive() {
statusActor.tell(new LoadStatus(), self());
return loading;
}
因为遗留代码的问题,这里进入加载逻辑的时机并不严谨,放在构造函数里,跟next order 请求串在一起,通过 stream 控制发送,可能效果更好,我们在后续的章节中也许会补完这个逻辑。
接下来按说我应该要去写测试逻辑,但是在这之前,可能我会先处理一些辅助工具的实现,例如 Java Function 、Runnable 等类型的 Clojure 封装。虽然它们并没有那么酷,但是我下个工作可能就不再接触数字货币,读者朋友们也可能来自各种不同的行业背景,通用代码工具可能对我们反而会长久的提供帮助。
=======================
【java|java 子节点查找父节点_现代化的 Java (二十)——撮合节点的实现】项目代码在: MarchLiu/market
推荐阅读
- java|java list集合合并_JAVA List合并集合
- Java|风雨java路之【基础篇】——看看Set集合那点儿猫腻
- SpringBoot|快速从零搭建一个SpringBoot Web项目
- java|Java集合框架————Map集合(1)
- Spring|从零搭建SpringBoot脚手架与SpringCloud生态
- 大工篇|Java集合——HashMap源码
- 数据库|SpringBoot脚手架工程快速搭建
- Java|【Java笔记】一网打尽Java中的集合知识
- 毕业设计|SpringMVC+Vue项目疫情社区管理系统