QuickFIX|QuickFix Java 讲解(四)服务器的搭建与解析

本系列力求手把手教你怎样利用 QuickFix Java 搭建自己的 FIX 协议收法平台,以及其中的注意事项。
所有源码的地址(免费):
https://github.com/zongzhec/QuickFixPractise

这节我们讨论怎么搭建Acceptor端。
5. Acceptor端的搭建 Acceptor,也可以称作为 Server,就是一个集中处理的交易机器。客户端从各地发来交易或者行情请求后,由这些服务器端接收,并进一步送给决策端进行验证、决策,并由此服务器返回相应的结果。针对上节说到的常用消息类型,相对应的回复类型为:

  • 行情请求(35=V)——行情报价回复(35=W);
  • 新建订单(35=D)——订单回复(35=8)
  • 撤销订单(35=F)——订单回复(35=8)或者拒绝(35=9)
通过上节,我们知道客户端的搭建需要两个类,一个类负责初始化、启动和关停服务;另一个类负责服务,即收发消息。这里其实也一毛一样,就是名字和相关配置略有区别。第一个类起名叫FixAcceptor,第二个类叫FixAcceptorApplication。
结构框架如下:详细的在这里:https://github.com/zongzhec/QuickFixPractise/tree/master/FixAcceptor
QuickFIX|QuickFix Java 讲解(四)服务器的搭建与解析
文章图片


5.1. Property设置 设置跟客户端的差不多,但是一定要记住仔细区别客户端的设置,因为有的可以改,但是有的是固定格式甚至固定字段,改写之后会让程序找不到相应的配置。
#quickfix-server.properties [default] # 这些字段记得改成你的设置 FileStorePath=fileStore SocketAcceptAddress=10.176.125.79 SocketAcceptPort=10003 SenderCompID=QUICKFIX_ACCEPTOR# 以下字段可以不改 ConnectionType=acceptor HeartBtInt=30 #ReconnectInterval=10 FileLogPath=log DataDictionary=src/main/resources/FIX44.modified.xml ContinueInitializationOnError=Y BeginString=FIX.4.4 StartTime=00:00:00 EndTime=23:00:00 ResetOnLogon=Y ResetSeqNumFlag=Y MaxMessagesInResendRequest=1 SocketReuseAddress=Y UseDataDictionary=Y[session] TargetCompID=QUICKFIX_INITIATOR1[session] TargetCompID=QUICKFIX_INITIATOR2

以上设置中,ConnectionType=acceptor 是固定的,如果你是一不小心从Initiator那边复制的配置,一定要记得把这边的设置改掉,否则就是两个Initiator 启动之后找不到相应的Acceptor。
在这里你也同样可以配置多个[session],只要记住这里的TargetCompID 和Initiator 那边的SenderComID 对应即可。

5.2. FixAcceptor 和Initiator 一样,这个类也是程序的主入口(因此这意味着我们是要启动两个程序)。如果你又是不小心从Initiator那边复制过来的代码,记得把SocketInitiator initiator 改成 ThreadedSocketAcceptor acceptor,否则程序要么报错,说找不到server的setting (因为程序不认为你在配一个server),要么就是启动之后实际上是两个Initiator 在空跑。
package foo.zongzhe.quickfix.acceptor; import quickfix.*; public class FixAcceptor { private static ThreadedSocketAcceptor acceptor; private static SessionSettings settings; private static FixAcceptorApplication application; public static ThreadedSocketAcceptor getAcceptor() { return acceptor; }public FixAcceptor() { try { settings = new SessionSettings("src/main/resources/quickfix.properties"); } catch (ConfigError configError) { System.out.println("Warning: config error!" + configError); }application = new FixAcceptorApplication(); MessageStoreFactory storeFactory = new FileStoreFactory(settings); LogFactory logFactory = new FileLogFactory(settings); MessageFactory messageFactory = new DefaultMessageFactory(); // 不是quickfix.fix44.MessageFactory try { acceptor = new ThreadedSocketAcceptor(application, storeFactory, settings, logFactory, messageFactory); } catch (ConfigError configError) { System.out.println("Warning: config error! " + configError); } }private void startServer() { try { acceptor.start(); } catch (ConfigError configError) { configError.printStackTrace(); } }private void stopServer() { acceptor.stop(); }public static void main(String[] args) { FixAcceptor fixAcceptor = new FixAcceptor(); fixAcceptor.startServer(); // 启动一个Session,记得参考你的quickfix.properties设定 SessionID sessionID = new SessionID("FIX.4.4", "QUICKFIX_ACCEPTOR", "QUICKFIX_INITIATOR1"); while (true) { // 等消息就行了 } }}


5.3. FixAcceptorApplication 老样子,这里一方面要extends MessageCracker, 用来解析消息。另一方面要 implements Application,用来收法消息。我们主要需要注意的还是在extends MessageCracker的时候,重写你需要的消息方法,比如你要收取并解析NewOrderSingle ,那就要重写onMessage(NewOrderSingle message, SessionID sessionID)。
package foo.zongzhe.quickfix.acceptor; import quickfix.*; import quickfix.field.ExecID; import quickfix.field.Symbol; import quickfix.fix44.ExecutionReport; import quickfix.fix44.MarketDataRequest; import quickfix.fix44.MessageCracker; import quickfix.fix44.NewOrderSingle; import java.util.List; public class FixAcceptorApplication extends MessageCracker implements Application {/// 以下是Application的固定七件套 @Override public void onCreate(SessionID sessionId) { System.out.println("onCreate is called"); }@Override public void onLogon(SessionID sessionId) { System.out.println("onLogon is called"); }@Override public void onLogout(SessionID sessionId) { System.out.println("onLogout is called"); }@Override public void toAdmin(Message message, SessionID sessionId) { System.out.println("toAdmin is called"); }@Override public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { System.out.println("fromAdmin is called"); }@Override public void toApp(Message message, SessionID sessionId) throws DoNotSend { System.out.println("toApp is called"); }@Override public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType { System.out.println("fromApp is called: " + message); crack(message, sessionId); }// 以下是你可以自定义的消息接收器,来自MessageCracker @Override public void onMessage(MarketDataRequest message, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue { System.out.println("Received MarketDataRequest: " + message + ", sessionID: " + sessionID); // 收都收了,解析一下 System.out.println("MDReqID: " + message.getMDReqID().getValue()); // 解析重复组 MarketDataRequest.NoRelatedSym symGroup = new MarketDataRequest.NoRelatedSym(); List groupList = message.getGroups(symGroup.getFieldTag()); System.out.println("Group size: " + groupList.size()); // Group Index 从 1 开始 for (int groupIndex = 1; groupIndex <= groupList.size(); groupIndex++) { message.getGroup(groupIndex, symGroup); Symbol symbol = new Symbol(); symGroup.get(symbol); System.out.println("Symbol in group" + groupIndex + ": " + symbol.getValue()); } }@Override public void onMessage(NewOrderSingle message, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue { System.out.println("Received NewOrderSingle: " + message + ", sessionID: " + sessionID); // 收都收了,解析一下 System.out.println(String.format("clOrderID: %s, symbol: %s, side: %s", message.getClOrdID().getValue(), message.getSymbol().getValue(), message.getSide().getValue())); // 返还一个订单回复 ExecutionReport executionReport = new ExecutionReport(); executionReport.set(message.getClOrdID()); executionReport.set(new ExecID("mockedExecID")); executionReport.set(message.getSide()); executionReport.set(message.getSymbol()); try { Session.sendToTarget(executionReport, sessionID); } catch (SessionNotFound sessionNotFound) { sessionNotFound.printStackTrace(); } }}

你可能会问,我重写的onMessage 方法时什么时候调用的呢?
——答案在fromApp函数里面crack(message, sessionId)调用。
Initiator 和Acceptor 之间的传输,登入登出和心跳是Admin层面的,所以会进入“fromAdmin”,“toAdmin”方法。而消息的传输是在Appliocation层的,因此会进入“fromApp”,“toApp”方法。
因为我们这里是acceptor,是服务器,因此一上来肯定是收消息,所以我们要在“fromApp”里面调用crack 方法,来让QuickFIX 自动的去套用合适的解析方法。如果发来的是NewOrderSingle,它就会进入你重写了的onMessage方法。

问题1:QuickFIX怎么知道传过来的什么类型的消息呢?
回答:一切都在消息里面,35是消息类型,35=8就代表这是一个NewOrderSingle。

问题2:如果我没有重写对应的onMessage方法怎么办?
回答:那就正常接收,但是后面什么也不做。

问题3:一般建议重写哪些onMessage方法?
回答:除了上面说的三个基本的,和你实际业务需要的,我个人还建议你重写onMessage(Reject message, SessionID sessionID)和onMessage(BusinessMessageReject message, SessionID sessionID)。这两个是针对拒绝消息的处理。
一般如果你发的消息不对(字段确实或者数值有问题),对方会发送诸如35=3的拒绝消息,如果你不重写,它们就不会被处理,进而你就很难在茫茫的消息之海中发现是不是有问题。

5.4. 运行及结果 启动acceptor以后,你只能看到“onCreate is called”,因为暂时还没有initiator给它发消息。
QuickFIX|QuickFix Java 讲解(四)服务器的搭建与解析
文章图片

为什么看到两遍?因为在property里面我们配了两个session,就这么简单。

下节讲陆续启动Initiator 和Acceptor 之后,有什么现象,哪里看消息和log。下课。







【QuickFIX|QuickFix Java 讲解(四)服务器的搭建与解析】

    推荐阅读