CTP开发(2)行情模块的开发
我在做CTP开发之前,也参考了不少其他的资料,发现他们都是把行情和交易做在同一个工程里的。我呢之前也做过期货相关的交易平台,感觉这种把行情和交易做在一起的方法缺乏可扩展性。比如我开了多个CTP账户,要同时交易,这种做在一起的方法就很难实现;另外,如果我还要接入其他的交易所,该怎么接?
下面是我的软件架构图:
软件架构图
为了便于说明CTP行情、交易的开发,这个架构图是最初始版的。以后会在这个基础上进行深化,开发成可以实战的可扩展的交易平台。
理论上,1个CTP账号可以在6个地方进行多点登录;另外在实战中,有的客户会有多个CTP账号同时进行量化交易。所以,这个软件架构图是能满足多CTP账号同时交易的要求的。
1、类文件说明
(1)CTPMdSpi.h、CTPMdSpi.cpp
继承CThostFtdcMdSpi类,里面实现所有的回调函数,是最重要的类。
(2)MarketL1Core.h、MarketL1Core.cpp
主类,里面包含着程序调用的主要逻辑关系。
(3)Main.cpp
写main()函数的类。
2、代码说明
(1)CTPMdSpi.h
#pragma once
#include <string>
#include <vector>
#include "ctp/ThostFtdcMdApi.h"
class CTPMdSpi : public CThostFtdcMdSpi
{
public:
CTPMdSpi() = default;
~CTPMdSpi() = default;
///当客户端与交易后台建立起通信连接时(还未登录前),该方法被调用。
virtual void OnFrontConnected();
///当客户端与交易后台通信连接断开时,该方法被调用。当发生这个情况后,API会自动重新连接,客户端可不做处理。
///@param nReason 错误原因
/// 0x1001 网络读失败
/// 0x1002 网络写失败
/// 0x2001 接收心跳超时
/// 0x2002 发送心跳失败
/// 0x2003 收到错误报文
virtual void OnFrontDisconnected(int nReason);
///心跳超时警告。当长时间未收到报文时,该方法被调用。
///@param nTimeLapse 距离上次接收报文的时间
virtual void OnHeartBeatWarning(int nTimeLapse);
///登录请求响应
virtual void OnRspUserLogin(CThostFtdcRspUserLoginField *pRspUserLogin,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///登出请求响应
virtual void OnRspUserLogout(CThostFtdcUserLogoutField *pUserLogout,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///请求查询组播合约响应
virtual void OnRspQryMulticastInstrument(CThostFtdcMulticastInstrumentField *pMulticastInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///错误应答
virtual void OnRspError(CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
///订阅行情应答
virtual void OnRspSubMarketData(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///取消订阅行情应答
virtual void OnRspUnSubMarketData(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///订阅询价应答
virtual void OnRspSubForQuoteRsp(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///取消订阅询价应答
virtual void OnRspUnSubForQuoteRsp(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast);
///深度行情通知
virtual void OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketData);
///询价通知
virtual void OnRtnForQuoteRsp(CThostFtdcForQuoteRspField *pForQuoteRsp);
private:
void ReqUserLogin();
void SubscribeMarketData();
bool IsErrorRspInfo(CThostFtdcRspInfoField *pRspInfo);
private:
static int iRequestID;
};
(2)CTPMdSpi.cpp
#include <iostream>
#include <fstream>
#include <string.h>
#include <chrono>
#include <thread>
#include "CTPMdSpi.h"
using namespace std;
int CTPMdSpi::iRequestID = 0;
extern CThostFtdcMdApi *g_pMdApi;
extern std::string g_brokerId;
extern std::string g_userId;
extern std::string g_password;
extern std::vector<std::string> *g_pInstrumentIds;
//----------------------------- private methods ------------------------------
void CTPMdSpi::ReqUserLogin(){
std::cout<<"ReqUserLogin g_brokerId:"<<g_brokerId<<" ,g_userId:"<<g_userId<<", g_password"<<g_password<<std::endl;
CThostFtdcReqUserLoginField req;
memset(&req, 0, sizeof(req));
strcpy(req.BrokerID, g_brokerId.c_str());
strcpy(req.UserID, g_userId.c_str());
strcpy(req.Password, g_password.c_str());
int iResult = g_pMdApi->ReqUserLogin(&req, ++iRequestID);
cerr << "--->>> Req user Lgoin: " << ((iResult == 0) ? "Success" : "Fail") << endl;
}
void CTPMdSpi::SubscribeMarketData(){
int len = g_pInstrumentIds->size();
cout<<"SubscribeMarketData len:"<<len<<endl;
char** ppInstrumentID = new char*[len];
int index = 0;
vector<string>::iterator it;
for (it = g_pInstrumentIds->begin(); it != g_pInstrumentIds->end(); it++){
*(ppInstrumentID + index) = (char*)it->c_str();
std::cout<<"InstrumentID index["<<index<<"] : "<<*(ppInstrumentID + index)<<endl;
index ++;
}
int iResult0 = g_pMdApi->UnSubscribeMarketData(ppInstrumentID, len);
cerr << "--->>> UnSubscribeMarketData: " << ((iResult0 == 0) ? "Success" : "Fail") << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
int iResult = g_pMdApi->SubscribeMarketData(ppInstrumentID, len);
cerr << "--->>> SubscribeMarketData: " << ((iResult == 0) ? "Success" : "Fail") << endl;
delete[] ppInstrumentID;
}
bool CTPMdSpi::IsErrorRspInfo(CThostFtdcRspInfoField *pRspInfo){
// 如果ErrorID != 0, 说明收到了错误的响应
bool bResult = ((pRspInfo) && (pRspInfo->ErrorID != 0));
if (bResult)
cerr << "--->>> ErrorID=" << pRspInfo->ErrorID << ", ErrorMsg=" << pRspInfo->ErrorMsg << endl;
return bResult;
}
//----------------------------- public methods -------------------------------
///当客户端与交易后台建立起通信连接时(还未登录前),该方法被调用。
void CTPMdSpi::OnFrontConnected(){
std::cout << "=====FrontConnected Success=====" << std::endl;
ReqUserLogin();
}
///当客户端与交易后台通信连接断开时,该方法被调用。当发生这个情况后,API会自动重新连接,客户端可不做处理。
///@param nReason 错误原因
/// 0x1001 网络读失败
/// 0x1002 网络写失败
/// 0x2001 接收心跳超时
/// 0x2002 发送心跳失败
/// 0x2003 收到错误报文
void CTPMdSpi::OnFrontDisconnected(int nReason){
std::cerr << "=====FrontDisconnected=====" << std::endl;
std::cerr << "ErrorCode: " << nReason << std::endl;
}
///心跳超时警告。当长时间未收到报文时,该方法被调用。
///@param nTimeLapse 距离上次接收报文的时间
void CTPMdSpi::OnHeartBeatWarning(int nTimeLapse){
std::cerr << "=====HeartBeat Timeout=====" << std::endl;
std::cerr << "Timeout lap: " << nTimeLapse << std::endl;
}
///登录请求响应
void CTPMdSpi::OnRspUserLogin(CThostFtdcRspUserLoginField *pRspUserLogin,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
if (bIsLast && !IsErrorRspInfo(pRspInfo)){
std::cout << "=====OnRspUserLogin success=====" << std::endl;
std::cout << "TradingDay: " << pRspUserLogin->TradingDay << std::endl;
std::cout << "LoginTime: " << pRspUserLogin->LoginTime << std::endl;
std::cout << "BrokerID: " << pRspUserLogin->BrokerID << std::endl;
std::cout << "UserID: " << pRspUserLogin->UserID << std::endl;
///获取当前交易日
cerr << "--->>> GetTradingDay: " << g_pMdApi->GetTradingDay() << endl;
// 请求订阅行情
SubscribeMarketData();
}
}
///登出请求响应
void CTPMdSpi::OnRspUserLogout(CThostFtdcUserLogoutField *pUserLogout,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
}
///请求查询组播合约响应
void CTPMdSpi::OnRspQryMulticastInstrument(CThostFtdcMulticastInstrumentField *pMulticastInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
}
///错误应答
void CTPMdSpi::OnRspError(CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast){
IsErrorRspInfo(pRspInfo);
}
///订阅行情应答
void CTPMdSpi::OnRspSubMarketData(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
if (bIsLast && !IsErrorRspInfo(pRspInfo)){
cout<< "--->>> OnRspSubMarketData: " <<pSpecificInstrument->InstrumentID<<" Success"<<endl;
char filePath[100] = { '\0' };
sprintf(filePath, ".//sql_data//%s_market_data.csv", pSpecificInstrument->InstrumentID);
std::ofstream outFile;
outFile.open(filePath, std::ios::out); // 新开文件
outFile << "InstrumentID" << ","
<< "UpdateTime" << ","
<< "LastPrice" << ","
<< "Volume" << ","
<< "BidPrice1" << ","
<< "BidVolume1" << ","
<< "AskPrice1" << ","
<< "AskVolume1" << ","
<< "OpenInterest" << ","
<< "Turnover"
<< std::endl;
outFile.close();
}
}
///取消订阅行情应答
void CTPMdSpi::OnRspUnSubMarketData(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
}
///订阅询价应答
void CTPMdSpi::OnRspSubForQuoteRsp(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
}
///取消订阅询价应答
void CTPMdSpi::OnRspUnSubForQuoteRsp(CThostFtdcSpecificInstrumentField *pSpecificInstrument,
CThostFtdcRspInfoField *pRspInfo,
int nRequestID,
bool bIsLast){
}
///深度行情通知
void CTPMdSpi::OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketData){
// 生成5档盘口和成交数据
std::cout << "=====OnRtnDepthMarketData=====" << std::endl;
std::cout << "TradingDay: " << pDepthMarketData->TradingDay << std::endl;
std::cout << "ExchangeID: " << pDepthMarketData->ExchangeID << std::endl;
std::cout << "InstrumentID: " << pDepthMarketData->InstrumentID << std::endl;
std::cout << "ExchangeInstID: " << pDepthMarketData->ExchangeInstID << std::endl; //合约在交易所的代码
std::cout << "LastPrice: " << pDepthMarketData->LastPrice << std::endl;
std::cout << "Volume: " << pDepthMarketData->Volume << std::endl;
//生成Tick数据
// 将行情写入到csv文件中
// 如果只获取某一个合约行情,可以逐tick地存入文件或数据库
char filePath[100] = { '\0' };
sprintf(filePath, ".//sql_data//%s_market_data.csv", pDepthMarketData->InstrumentID);
//sprintf(filePath, "%s_market_data.csv", pDepthMarketData->InstrumentID);
std::ofstream outFile;
outFile.open(filePath, std::ios::app); // 文件追加写入
outFile << pDepthMarketData->InstrumentID << ","
<< pDepthMarketData->UpdateTime << "." << pDepthMarketData->UpdateMillisec << ","
<< pDepthMarketData->LastPrice << ","
<< pDepthMarketData->Volume << ","
<< pDepthMarketData->BidPrice1 << ","
<< pDepthMarketData->BidVolume1 << ","
<< pDepthMarketData->AskPrice1 << ","
<< pDepthMarketData->AskVolume1 << ","
<< pDepthMarketData->OpenInterest << ","
<< pDepthMarketData->Turnover << std::endl;
outFile.close();
}
///询价通知
void CTPMdSpi::OnRtnForQuoteRsp(CThostFtdcForQuoteRspField *pForQuoteRsp){
}
(3)MarketL1Core.h
#pragma once
#include <string>
#include <vector>
#include "CTPMdSpi.h"
class MarketL1Core
{
public:
MarketL1Core() = default;
~MarketL1Core();
bool start();
bool stop();
//启动客户端监听
bool startListenClient();
private:
bool checkConfig();
//连接上手服务器(CTP)
bool connectUpServer();
private:
std::string serviceName;
std::string serviceIp;
int servicePort;
std::string CTP_Address1; //主地址
std::string CTP_Address2; //备地址
std::vector<std::string> instrumentIds;
CThostFtdcMdSpi *pMdUserSpi = new CTPMdSpi;
};
(4)MarketL1Core.cpp
#include <iostream>
#include "MarketL1Core.h"
#include "com/xini_file.h"
#include "Util.h"
using namespace std;
CThostFtdcMdApi *g_pMdApi;
std::string g_brokerId;
std::string g_userId;
std::string g_password;
std::vector<std::string> *g_pInstrumentIds;
//---------------------------------- private methods --------------------------------
bool MarketL1Core::checkConfig(){
CTP_Address1 = "tcp://218.202.237.33:10213";
CTP_Address2 = "";
g_brokerId = "9999";
g_userId = "xxx"; //填入你自己申请到的账号、密码
g_password = "xxx"; //
string ids = "sc2302|sc2403|bu2306|IO2303-C-3900|ag2302|WH301"; //需要订阅的行情
instrumentIds = Util::split(ids, "|");
g_pInstrumentIds = &instrumentIds;
cout<<"CTP_Address1:"<<CTP_Address1<<", instrumentIds:"<<ids<<endl;
}
//连接上手服务器(CTP)
bool MarketL1Core::connectUpServer(){
// 初始化UserApi
g_pMdApi = CThostFtdcMdApi::CreateFtdcMdApi("./market_data/", true);
g_pMdApi->RegisterSpi(pMdUserSpi); // 注册事件类
g_pMdApi->RegisterFront((char*)CTP_Address1.c_str()); // connect 优先行情地址
//pMdApi->RegisterFront(); // connect 备用行情地址,1B断开,自动连接2B地址
g_pMdApi->Init();
std::cout<<"Version:"<<g_pMdApi->GetApiVersion()<<std::endl;
}
//---------------------------------- public methods ---------------------------------
MarketL1Core::~MarketL1Core(){
if(g_pMdApi){
g_pMdApi->Join();
g_pMdApi->Release();
}
if(pMdUserSpi){
delete pMdUserSpi;
pMdUserSpi = nullptr;
}
}
bool MarketL1Core::start(){
// 初始化log
// 读取配置
checkConfig();
connectUpServer();
}
bool MarketL1Core::stop(){
}
(5)split()方法
class Util
{
public:
static std::vector<std::string> split(std::string str, std::string pattern)
{
std::string::size_type pos;
std::vector<std::string> result;
str += pattern;//扩展字符串以方便操作
int size = str.size();
for (int i = 0; i < size; i++)
{
pos = str.find(pattern, i);
if (pos < size)
{
std::string s = str.substr(i, pos - i);
result.push_back(s);
i = pos + pattern.size() - 1;
}
}
return result;
}
};
(6)main()方法
#include "MarketL1Core.h"
int main(int argc,char *argv[])
{
MarketL1Core core;
core.start();
}
好了,基本步骤就这些。最后在Unix操作系统上make通过后,就可以运行了!