本系列力求手把手教你怎样利用 QuickFix Java 搭建自己的 FIX 协议收法平台,以及其中的注意事项。
所有源码的地址(免费):
https://github.com/zongzhec/QuickFixPractise
这节我们讨论怎么搭建Acceptor端。
5. Acceptor端的搭建 Acceptor,也可以称作为 Server,就是一个集中处理的交易机器。客户端从各地发来交易或者行情请求后,由这些服务器端接收,并进一步送给决策端进行验证、决策,并由此服务器返回相应的结果。针对上节说到的常用消息类型,相对应的回复类型为:
- 行情请求(35=V)——行情报价回复(35=W);
- 新建订单(35=D)——订单回复(35=8)
- 撤销订单(35=F)——订单回复(35=8)或者拒绝(35=9)
结构框架如下:详细的在这里:https://github.com/zongzhec/QuickFixPractise/tree/master/FixAcceptor
文章图片
5.1. Property设置 设置跟客户端的差不多,但是一定要记住仔细区别客户端的设置,因为有的可以改,但是有的是固定格式甚至固定字段,改写之后会让程序找不到相应的配置。
#quickfix-server.properties
[default]
# 这些字段记得改成你的设置
FileStorePath=fileStore
SocketAcceptAddress=10.176.125.79
SocketAcceptPort=10003
SenderCompID=QUICKFIX_ACCEPTOR# 以下字段可以不改
ConnectionType=acceptor
HeartBtInt=30
#ReconnectInterval=10
FileLogPath=log
DataDictionary=src/main/resources/FIX44.modified.xml
ContinueInitializationOnError=Y
BeginString=FIX.4.4
StartTime=00:00:00
EndTime=23:00:00
ResetOnLogon=Y
ResetSeqNumFlag=Y
MaxMessagesInResendRequest=1
SocketReuseAddress=Y
UseDataDictionary=Y[session]
TargetCompID=QUICKFIX_INITIATOR1[session]
TargetCompID=QUICKFIX_INITIATOR2
以上设置中,ConnectionType=acceptor 是固定的,如果你是一不小心从Initiator那边复制的配置,一定要记得把这边的设置改掉,否则就是两个Initiator 启动之后找不到相应的Acceptor。
在这里你也同样可以配置多个[session],只要记住这里的TargetCompID 和Initiator 那边的SenderComID 对应即可。
5.2. FixAcceptor 和Initiator 一样,这个类也是程序的主入口(因此这意味着我们是要启动两个程序)。如果你又是不小心从Initiator那边复制过来的代码,记得把SocketInitiator initiator 改成 ThreadedSocketAcceptor acceptor,否则程序要么报错,说找不到server的setting (因为程序不认为你在配一个server),要么就是启动之后实际上是两个Initiator 在空跑。
package foo.zongzhe.quickfix.acceptor;
import quickfix.*;
public class FixAcceptor {
private static ThreadedSocketAcceptor acceptor;
private static SessionSettings settings;
private static FixAcceptorApplication application;
public static ThreadedSocketAcceptor getAcceptor() {
return acceptor;
}public FixAcceptor() {
try {
settings = new SessionSettings("src/main/resources/quickfix.properties");
} catch (ConfigError configError) {
System.out.println("Warning: config error!" + configError);
}application = new FixAcceptorApplication();
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
MessageFactory messageFactory = new DefaultMessageFactory();
// 不是quickfix.fix44.MessageFactory
try {
acceptor = new ThreadedSocketAcceptor(application, storeFactory, settings, logFactory, messageFactory);
} catch (ConfigError configError) {
System.out.println("Warning: config error! " + configError);
}
}private void startServer() {
try {
acceptor.start();
} catch (ConfigError configError) {
configError.printStackTrace();
}
}private void stopServer() {
acceptor.stop();
}public static void main(String[] args) {
FixAcceptor fixAcceptor = new FixAcceptor();
fixAcceptor.startServer();
// 启动一个Session,记得参考你的quickfix.properties设定
SessionID sessionID = new SessionID("FIX.4.4", "QUICKFIX_ACCEPTOR", "QUICKFIX_INITIATOR1");
while (true) {
// 等消息就行了
}
}}
5.3. FixAcceptorApplication 老样子,这里一方面要extends MessageCracker, 用来解析消息。另一方面要 implements Application,用来收法消息。我们主要需要注意的还是在extends MessageCracker的时候,重写你需要的消息方法,比如你要收取并解析NewOrderSingle ,那就要重写onMessage(NewOrderSingle message, SessionID sessionID)。
package foo.zongzhe.quickfix.acceptor;
import quickfix.*;
import quickfix.field.ExecID;
import quickfix.field.Symbol;
import quickfix.fix44.ExecutionReport;
import quickfix.fix44.MarketDataRequest;
import quickfix.fix44.MessageCracker;
import quickfix.fix44.NewOrderSingle;
import java.util.List;
public class FixAcceptorApplication extends MessageCracker implements Application {/// 以下是Application的固定七件套
@Override
public void onCreate(SessionID sessionId) {
System.out.println("onCreate is called");
}@Override
public void onLogon(SessionID sessionId) {
System.out.println("onLogon is called");
}@Override
public void onLogout(SessionID sessionId) {
System.out.println("onLogout is called");
}@Override
public void toAdmin(Message message, SessionID sessionId) {
System.out.println("toAdmin is called");
}@Override
public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
System.out.println("fromAdmin is called");
}@Override
public void toApp(Message message, SessionID sessionId) throws DoNotSend {
System.out.println("toApp is called");
}@Override
public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
System.out.println("fromApp is called: " + message);
crack(message, sessionId);
}// 以下是你可以自定义的消息接收器,来自MessageCracker
@Override
public void onMessage(MarketDataRequest message, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
System.out.println("Received MarketDataRequest: " + message + ", sessionID: " + sessionID);
// 收都收了,解析一下
System.out.println("MDReqID: " + message.getMDReqID().getValue());
// 解析重复组
MarketDataRequest.NoRelatedSym symGroup = new MarketDataRequest.NoRelatedSym();
List groupList = message.getGroups(symGroup.getFieldTag());
System.out.println("Group size: " + groupList.size());
// Group Index 从 1 开始
for (int groupIndex = 1;
groupIndex <= groupList.size();
groupIndex++) {
message.getGroup(groupIndex, symGroup);
Symbol symbol = new Symbol();
symGroup.get(symbol);
System.out.println("Symbol in group" + groupIndex + ": " + symbol.getValue());
}
}@Override
public void onMessage(NewOrderSingle message, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
System.out.println("Received NewOrderSingle: " + message + ", sessionID: " + sessionID);
// 收都收了,解析一下
System.out.println(String.format("clOrderID: %s, symbol: %s, side: %s",
message.getClOrdID().getValue(),
message.getSymbol().getValue(),
message.getSide().getValue()));
// 返还一个订单回复
ExecutionReport executionReport = new ExecutionReport();
executionReport.set(message.getClOrdID());
executionReport.set(new ExecID("mockedExecID"));
executionReport.set(message.getSide());
executionReport.set(message.getSymbol());
try {
Session.sendToTarget(executionReport, sessionID);
} catch (SessionNotFound sessionNotFound) {
sessionNotFound.printStackTrace();
}
}}
你可能会问,我重写的onMessage 方法时什么时候调用的呢?
——答案在fromApp函数里面crack(message, sessionId)调用。
Initiator 和Acceptor 之间的传输,登入登出和心跳是Admin层面的,所以会进入“fromAdmin”,“toAdmin”方法。而消息的传输是在Appliocation层的,因此会进入“fromApp”,“toApp”方法。
因为我们这里是acceptor,是服务器,因此一上来肯定是收消息,所以我们要在“fromApp”里面调用crack 方法,来让QuickFIX 自动的去套用合适的解析方法。如果发来的是NewOrderSingle,它就会进入你重写了的onMessage方法。
问题1:QuickFIX怎么知道传过来的什么类型的消息呢?
回答:一切都在消息里面,35是消息类型,35=8就代表这是一个NewOrderSingle。
问题2:如果我没有重写对应的onMessage方法怎么办?
回答:那就正常接收,但是后面什么也不做。
问题3:一般建议重写哪些onMessage方法?
回答:除了上面说的三个基本的,和你实际业务需要的,我个人还建议你重写onMessage(Reject message, SessionID sessionID)和onMessage(BusinessMessageReject message, SessionID sessionID)。这两个是针对拒绝消息的处理。
一般如果你发的消息不对(字段确实或者数值有问题),对方会发送诸如35=3的拒绝消息,如果你不重写,它们就不会被处理,进而你就很难在茫茫的消息之海中发现是不是有问题。
5.4. 运行及结果 启动acceptor以后,你只能看到“onCreate is called”,因为暂时还没有initiator给它发消息。
文章图片
为什么看到两遍?因为在property里面我们配了两个session,就这么简单。
下节讲陆续启动Initiator 和Acceptor 之后,有什么现象,哪里看消息和log。下课。
【QuickFIX|QuickFix Java 讲解(四)服务器的搭建与解析】
推荐阅读
- QuickFIX|QuickFix Java 讲解(五)消息的收发与查看
- quickfix/N|quickfix/n 中文乱码
- 远程互动会议平台是什么()
- java框架|SpringBoot入门(九)数据访问
- Spring|Spring Boot(十一)--------整合Druid
- Java|SpringBoot2(五)(数据访问)
- 数据库|太强了!这款轻量级的数据库中间件完美解决了Spring Boot 中分库分表问题
- java|Springboot整合springcloud实现分布式服务 简单demo 完整示例
- 代码狂魔|工欲善其事必先利其器,IDEA必装插件!