基础函数使用实例
本案例演示如何使用WindPy接口提取数据。代码示例如下:
from WindPy import w
w.start()
# 命令如何写可以用命令生成器来辅助完成
# 定义打印输出函数,用来展示数据使用
def printpy(outdata):
if outdata.ErrorCode!=0:
print('error code:'+str(outdata.ErrorCode)+'\n')
return()
for i in range(0,len(outdata.Data[0])):
strTemp=''
if len(outdata.Times)>1:
strTemp=str(outdata.Times[i])+' '
for k in range(0, len(outdata.Fields)):
strTemp=strTemp+str(outdata.Data[k][i])+' '
print(strTemp)
# 通过wsd来提取时间序列数据,比如取开高低收成交量,成交额数据
print('\n\n'+'-----通过wsd来提取时间序列数据,比如取开高低收成交量,成交额数据-----'+'\n')
wsddata1=w.wsd("000001.SZ", "open,high,low,close,volume,amt", "2015-11-22", "2015-12-22", "Fill=Previous")
printpy(wsddata1)
# 通过wsd来提取各个报告期财务数据
print('\n\n'+'-----通过wsd来提取各个报告期财务数据-----'+'\n')
wsddata2=w.wsd("600000.SH", "tot_oper_rev,tot_oper_cost,opprofit,net_profit_is", "2008-01-01", "2015-12-22", "rptType=1;Period=Q;Days=Alldays;Fill=Previous")
printpy(wsddata2)
# 通过wss来取截面数据
print('\n\n'+'-----通过wss来取截面数据-----'+'\n')
wssdata=w.wss("600000.SH,600007.SH,600016.SH", "ev,total_shares","tradeDate=20151222;industryType=1")
printpy(wssdata)
# 通过wst来取日内成交数据,最新7日内数据
print('\n\n'+'-----通过wst来取日内成交数据-----'+'\n')
wstdata=w.wst("IF.CFE", "last,volume", "2019-4-2 09:00:00", "2019-4-2 14:04:45")
printpy(wstdata)
# 通过wsi来取日内分钟数据,三年内数据
print('\n\n'+'-----通过wsi来取日内分钟数据-----'+'\n')
wsidata=w.wsi("IF.CFE", "open,high,low,close,volume,amt", "2018-12-22 09:00:00", "2018-12-22 14:06:15")
printpy(wsidata)
# 通过wset来取数据集数据
print('\n\n'+'-----通过wset来取数据集数据,获取沪深300指数权重-----'+'\n')
wsetdata=w.wset("IndexConstituent","date=20151222;windcode=000300.SH;field=date,wind_code,i_weight")
printpy(wsetdata)
输出DataFrame格式
本案例演示如何以DataFrame格式输出数据。在测试以下案例前,请确保已经安装了Pandas。
案例1. 输出WindData对象并转化为Pandas格式
# 案例1. 输出WindData对象并转化为Pandas格式
'''本案例以WSD接口为例,演示如何将输出的WindData转化为DataFrame。
WSD接口返回的WindData对象有以下几个字段:
ErrorCode
Codes
Fields
Times
Data
其中,Data为返回的数据,Fields和Times分别为获取的指标名称和日期序列。
'''
from WindPy import w
import pandas as pd
import datetime
w.start()
# 取数据的命令如何写可以用命令生成器来辅助完成
wsd_data=w.wsd("000001.SZ", "open,high,low,close", "2015-12-10", "2015-12-22", "Fill=Previous")
if wsd_data.ErrorCode == 0:
#演示如何将api返回的数据装入Pandas的Series
open=pd.Series(wsd_data.Data[0])
high=pd.Series(wsd_data.Data[1])
low=pd.Series(wsd_data.Data[2])
close=pd.Series(wsd_data.Data[3])
print('open:/n',open)
print('high:/n',high)
print('low:/n',low)
print('close:/n',close)
#演示如何将api返回的数据装入Pandas的DataFrame
fm=pd.DataFrame(wsd_data.Data,index=wsd_data.Fields,columns=wsd_data.Times)
fm=fm.T #将矩阵转置
print('fm:/n',fm)
else:
print("Error Code:", wsd_data.ErrorCode)
print("Error Message:", wsd_data.Data[0][0])
案例2. 量化接口直接输出DataFrame
# 案例2. 量化接口直接输出DataFrame
'''
通过在参数表中增加usedf=True,WindPy支持直接输出DataFrame格式。此时,WindPy函数返回错误码和提取的数据。其中数据的格式为DataFrame。
以下以WSD为例,展示如何使用WindPy函数以DataFrame格式提取数据:
'''
from WindPy import w
import pandas as pd
import datetime
w.start()
error_code, wsd_data=w.wsd("000001.SZ", "open,high,low,close", "2015-12-10", "2015-12-22", "Fill=Previous", usedf=True)
if error_code == 0:
print(wsd_data)
else:
print("Error Code:", error_code)
print("Error Message:", wsd_data.iloc[0, 0])
订阅实时行情
案例1. 订阅实时行情,并存储到硬盘中
# 案例1. 订阅实时行情,并存储到硬盘中
'''本案例演示如何通过WSQ函数订阅实时行情数据,并存储到硬盘中。示例代码分为两个部分,一部分是用WSQ订阅所需的行情指标,另一部分定义了回调函数,用于处理实时推送的行情数据。在运行示例代码后,程序会一致运行。如果需要停止运行,可以输入"q"结束订阅并保存文件。
以下为示例Python代码:
'''
from WindPy import w
import os
def myCallback(indata: w.WindData):
"""Callback function for WSQ
params
------
indata: WindData, accepts the received market quotation data. WindData has the following fields:
.ErrorCode: error code, if it is 0, the code runs successfully
.StateCode: state code. No need to process it.
.RequestID: save the request ID of WSQ request
.Codes: wind code of the securities
.Fields: fields for the received data
.Times: local time rather than the corresponding time for the recieved data
.Data: market quotation data
"""
print(indata)
if indata.ErrorCode!=0:
print('error code:'+str(indata.ErrorCode)+'\n')
return()
global begintime
lastvalue =""
for k in range(0,len(indata.Fields)):
if(indata.Fields[k] == "RT_TIME"):
begintime = indata.Data[k][0]
if(indata.Fields[k] == "RT_LAST"):
lastvalue = str(indata.Data[k][0])
string = str(begintime) + " " + lastvalue +"\n"
pf.writelines(string)
print(string)
pf.flush()
start_ret = w.start()
if start_ret.ErrorCode != 0:
print("Start failed")
print("Error Code:", start_ret.ErrorCode)
print("Error Message:", start_ret.Data[0])
else:
# Open a file to write.
pf = open('pywsqdataif.data', 'w')
# Subscribe market quotation data
wsq_ret = w.wsq("CN.SG","rt_time,rt_last",func=myCallback)
if wsq_ret.ErrorCode != 0:
print("Error Code:", wsq_ret.ErrorCode)
ext = ''
while ext != 'q':
ext = input('Enter "q" to exit')
w.cancelRequest(0)
pf.close()
案例2. 订阅实时行情,并在界面中展示
# 案例2. 订阅实时行情,并在界面中展示
'''本案例将演示如何通过WSQ订阅实时行情,同时通过PyQt模块在界面中展示所订阅的两个品种的现价、盘口报价、成交量及部分订阅指标差值的信息。
详细的代码可以从本案例的附件中下载。以下只展示主脚本的代码:
'''
# quotedlg.py
from PyQt5.Qt import *
from PyQt5.QtCore import pyqtSlot as Slot
from WindPy import w
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import globaldef
import ui_quote
import wsq
w.start()
MAC = True
try:
from PyQt5.QtGui import qt_mac_set_native_menubar
except ImportError:
MAC = False
class QuoteDlg(QDialog, ui_quote.Ui_Dialog):
def __init__(self, parent=None):
super(QuoteDlg, self).__init__(parent)
self.setupUi(self)
self.sec1Edit.setFocus()
self.setWindowTitle("Wind API Demo---WSQ Subscrib")
self.updateUi()
self.initGraph()
def initGraph(self):
self.scene = QGraphicsScene()
self.dr = Figure_Canvas()
self.scene.addWidget(self.dr)
self.graphicsView.setScene(self.scene)
@Slot()
def on_subscribeButton_clicked(self):
self.subscribeButton.setEnabled(False)
self.cancelButton.setEnabled(True)
self.textBrowser.clear()
globaldef.secID = []
globaldef.indID = []
globaldef.secID.extend([self.sec1Edit.text().upper(), self.sec2Edit.text().upper()])
globaldef.indID.extend(['rt_time'.upper(), 'rt_bid1'.upper(), 'rt_ask1'.upper(),
'rt_bsize1'.upper(), 'rt_asize1'.upper(), 'rt_last'.upper()])
self.qThread = wsq.feeder()
self.qThread.start()
self.qThread.update_data.connect(self.handle_display)
self.qThread.update_data.connect(self.handle_graphic)
def handle_display(self, data):
# Update UI
self.last1Edit.setText('{0:.4f}'.format(data[0][5]))
self.last2Edit.setText('{0:.4f}'.format(data[1][5]))
self.bidvol1Edit.setText('{0:.4f}'.format(data[0][3]))
self.bidvol2Edit.setText('{0:.4f}'.format(data[1][3]))
self.bid1Edit.setText('{0:.4f}'.format(data[0][1]))
self.bid2Edit.setText('{0:.4f}'.format(data[1][1]))
self.ask1Edit.setText('{0:.4f}'.format(data[0][2]))
self.ask2Edit.setText('{0:.4f}'.format(data[1][2]))
self.askvol1Edit.setText('{0:.4f}'.format(data[0][4]))
self.askvol2Edit.setText('{0:.4f}'.format(data[1][4]))
self.spread1Edit.setText('{0:.4f}'.format(globaldef.spreadBid))
self.spread2Edit.setText('{0:.4f}'.format(globaldef.spreadAsk))
self.textBrowser.append("<b>%s</b> | Spd_Bid:<b>%s</b> | Spd_Ask:<b>%s</b>|"
% (str(int(data[0][0])).zfill(6), '{0:.4f}'.format(globaldef.spreadBid),
'{0:.4f}'.format(globaldef.spreadAsk)))
def handle_graphic(self, data):
self.dr.plot()
@Slot()
def on_cancelButton_clicked(self):
self.subscribeButton.setEnabled(True)
self.cancelButton.setEnabled(False)
self.qThread.finished()
@Slot(str)
def on_sec1Edit_textEdited(self, text):
self.updateUi()
@Slot(str)
def on_sec2Edit_textEdited(self, text):
self.updateUi()
def updateUi(self):
enable = bool(self.sec1Edit.text()) and bool(self.sec2Edit.text())
self.subscribeButton.setEnabled(enable)
self.cancelButton.setEnabled(enable)
class Figure_Canvas(FigureCanvas):
"""
Derived from class FigureCanvas, so that this class is both a Qwidget of PyQt5 and a FigureCanvas of matplotlib.
This is a key step to link PyQt5 and matplotlib
"""
def __init__(self, parent=None, width=7.4, height=5, dpi=100):
# Create an Figure. Note that this is figure of matplotlib rather a figure of matplotlib.pyplot
fig = Figure(figsize=(width, height), dpi=dpi)
FigureCanvas.__init__(self, fig) # Initialize the parent class
self.setParent(parent)
# Call method add_subplot of figure, which is similar to method subplot of matplotlib.pyplot
self.axes1 = fig.add_subplot(311)
self.axes2 = fig.add_subplot(312)
self.axes3 = fig.add_subplot(313)
def plot(self):
self.axes1.clear()
self.axes1.plot(globaldef.plotTime, globaldef.plotLast, color='k', alpha=0.9, linewidth=0.5)
self.axes1.xaxis.set_visible(False)
self.axes1.set_title("Real Time Spread_Last Trend Graph", fontsize=10)
self.axes2.clear()
self.axes2.plot(globaldef.plotTime, globaldef.plotBid, color='k', alpha=0.9, linewidth=0.5)
self.axes2.xaxis.set_visible(False)
self.axes2.set_title("Real Time Spread_Bid Trend Graph", fontsize=10)
self.axes3.clear()
self.axes3.plot(globaldef.plotTime, globaldef.plotAsk, color='k', alpha=0.9, linewidth=0.5)
self.axes3.set_title("Real Time Spread_Ask Trend Graph", fontsize=10)
self.draw()
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
form = QuoteDlg()
form.show()
app.exec_()
案例3. 订阅实时行情,并放入分线程中
案例3. 订阅实时行情,并放入分线程中
本案例将演示如何订阅实时行情,并把数据接受模块放到一个分线程里。示例代码如下:
import threading
from WindPy import w
w.start()
#define the callback function
def myCallback(indata):
if indata.ErrorCode!=0:
print('error code:'+str(indata.ErrorCode)+'\n')
return()
lastvalue =""
for k in range(0,len(indata.Fields)):
if(indata.Fields[k] == "RT_LAST"):
lastvalue = str(indata.Data[k][0])
string = lastvalue +"\n"
print(string)
class feeder(threading.Thread):
def __init__(self,threadID,name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
def run(self):
w.start()
w.wsq("IF.CFE","rt_time,rt_last",func=myCallback)
#to subscribe IF.CFE
thread1 =feeder(1,"feder-1")
thread1.start()
ext = ''
while ext != 'q':
ext = input('Enter "q" to exit\n')
获取板块数据
本专题以沪深300指数为例,演示如何通过Wind API获取板块或指数成分的历史数据并把获取的数据保存在本地。
案例1. 获取沪深300指数最新成分股并保存为JSON文件
案例1. 获取沪深300指数最新成分股并保存为JSON文件
本案例通过调用Wind API WSET函数获取沪深300指数的最新成分股代码,并保存为JSON文件。
import json
from datetime import datetime
from WindPy import w
def check_api_error(func):
"""
A decorator for WindPy functions, so that if a function fails to retrieve data, it will print the error code and the relevant message before exiting
:param func: WindPy function that returns a WindData object
:return: wrapper
"""
def wrapper(*args, **kwargs):
data = func(*args, **kwargs)
if data.ErrorCode != 0:
print(data)
exit()
return data
return wrapper
# Decorate w.start
w.start = check_api_error(w.start)
# Decorate w.wset
w.wset = check_api_error(w.wset)
# Start WindPy connection
w.start()
# Get the date of today and convert it to a string with format YYYYMMDD
today = datetime.strftime(datetime.today(), "%Y%m%d")
# Retrieve the wind codes of the constituent
stock_codes = w.wset("sectorconstituent", "date=" + today + ";windcode=000300.SH;field=wind_code")
# Save the data in json
with open('HS300Constituent.json', mode='w') as f:
json.dump(stock_codes.Data[0], f)
案例2. 获取沪深300成分股历史数据并存入数据库中
案例2. 获取沪深300成分股历史数据并存入数据库中
本案例演示如何通过Wind API获取沪深300指数的成分股及其历史数据,并借助Pandas的to_sql函数批量导入SQLite数据库。如需使用其他数据库,用户可自行参考sqlalchemy的文档进行设置。示例代码如下:
# 本案例仅作参考,目的在于帮助WindPy的用户熟悉WindPy接口的使用方法。用户需根据实际需求自行编写、测试脚本
# 以下将演示如何下载沪深300成分股的历史数据并存入数据库
import sqlite3
import time
import pandas as pd
from WindPy import w
from sqlalchemy import create_engine
class WSDLoader:
def __init__(self, start_date, end_date, db_engine):
self._start_date = start_date
self._end_date = end_date
self._db_engine = db_engine
@property
def current_time(self):
return time.strftime('[%Y-%m-%d %H:%M:%S]', time.localtime(time.time()))
def __error_logger(self, wind_code, status, info=None):
"""
Log the errors occuring when retriving or saving data
:param wind_code: str, wind code of the present security
:param status: status parameters, e.g. the ErrorCode returned by Wind API
:return: None
"""
error_log = pd.DataFrame(index=[wind_code])
error_log.loc[wind_code, 'start_date'] = self._start_date
error_log.loc[wind_code, 'end_date'] = self._end_date
error_log.loc[wind_code, 'status'] = status
error_log.loc[wind_code, 'table'] = 'stock_daily_data'
error_log.loc[
wind_code, 'args'] = 'Symbol: ' + wind_code + ' From ' + self._start_date + ' To ' + self._end_date
error_log.loc[wind_code, 'error_info'] = info
error_log.loc[wind_code, 'created_date'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
error_log.to_sql('stock_error_log', self._db_engine, if_exists='append')
def fetch_historical_data(self, wind_codes, sleep_time=5):
"""
Retrieve the WSD data of specified windcodes
:param wind_codes: List[str], the windcodes of specified securities
:param sleep_time: number, the sleep time for the loop when an error occurs
:return: None
"""
print(self.current_time, ": Start to Download A-Share Stocks")
start_date = self._start_date
end_date = self._end_date
db_engine = self._db_engine
w.start()
for wind_code in wind_codes:
print(self.current_time, ": {0}".format(wind_code))
# The code can be generated using Code Generator. To get data in format DataFrame, add usedf=True in the parameter list
error_code, data = w.wsd(wind_code,
"windcode,trade_code,open,high,low,close,pre_close,volume,amt",
start_date,
end_date,
usedf=True)
# Check if Wind API runs successfully. If error_code is not 0, it indicators an error occurs
if error_code != 0:
# Output log
self.__error_logger(wind_code, '{0}'.format(int(error_code)))
# Print error message
print(self.current_time, ":data %s : ErrorCode :%s" % (wind_code, error_code))
print(data)
# Pause the loop for the specified time when an error occurs
time.sleep(sleep_time)
# Skip the present iteration
continue
try:
# Save the data into the database
data.to_sql('stock_daily_data', db_engine, if_exists='append')
except Exception as e:
self.__error_logger(wind_code, None)
print(self.current_time, ": SQL Exception :%s" % e)
print(self.current_time, ": Downloading A-Share Stock Finished .")
def get_windcodes(self, trade_date=None):
"""
Retrieve the windcodes of CSI300 (沪深300) constituents
:param trade_date: the date to retrieve the windcodes of the constituents
:return: Error code or a list of windcodes
"""
w.start()
if trade_date is None:
trade_date = self._end_date
# Retrieve the windcodes of CSI300 constituents.
# Users can use Sector Constituents and Index Constituents of WSET to retrieve the constituents of a sector or an index
stock_codes = w.wset("sectorconstituent", "date=" + trade_date + ";windcode=000300.SH;field=wind_code")
if stock_codes.ErrorCode != 0:
# Return the error code when an error occurs
return stock_codes.ErrorCode
else:
# Return a list of windcodes if the data is achieved
return stock_codes.Data[0]
@staticmethod
def fetchall_data(wind_code):
"""
Fetch data from SQLite database
:param str, wind_code:
:return: None
"""
conn = sqlite3.connect('example.db')
c = conn.cursor()
c.execute("SELECT * FROM stock_daily_data WHERE WINDCODE = ?", [(wind_code)])
data = c.fetchall()
if len(data) > 0:
for row in c.fetchall():
# Print the retrieved data
print(row)
else:
print("No data found!")
conn.close()
@staticmethod
def fetchall_log():
"""
Retrieve the error log
:return: None
"""
conn = sqlite3.connect('example.db')
c = conn.cursor()
c.execute("SELECT * FROM stock_error_log")
for row in c.fetchall():
# Print error log
print(row)
conn.close()
def main(start_date, end_date):
"""
The main function
:param start_date: str, set the start date, format: YYYYMMDD
:param end_date: str,set the end date, format: YYYYMMDD
:return: None
"""
# The demonstration uses SQLite as an example. If you need to use another database, please refer to the documentation of sqlalchemy
db_engine = create_engine('sqlite:///example.db')
loader = WSDLoader(start_date, end_date, db_engine)
wind_codes = loader.get_windcodes()
if type(wind_codes) is not int:
loader.fetch_historical_data(wind_codes)
else:
print('ErrorCode:', wind_codes)
if __name__ == '__main__':
start = '20140101'
end = '20151231'
main(start, end)
WSDLoader.fetchall_data('002573.SZ')
WSDLoader.fetchall_log()
案例3. 获取沪深300成分股数据并保存为CSV文件
案例3. 获取沪深300成分股数据并保存为CSV文件
本案例演示如何获取沪深300成分股数据并把获取的数据输出为DataFrame格式,然后借助Pandas的to_csv函数把获取到的数据保存为CSV文件。
from WindPy import w
from datetime import datetime
import pandas as pd
w.start()
# Download the latest constituents of HS300 index
today = datetime.today().strftime('%Y-%m-%d')
windcodes = w.wset("sectorconstituent","date={0};windcode=000300.SH;field=wind_code".format(today))
assert windcodes.ErrorCode == 0, 'Download historical constituents, ErrorCode: {0}'.format(windcodes.ErrorCode)
# Fetch the data of the last 12 months and return a dataframe
dataset = pd.DataFrame(columns=['WINDCODE', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME', 'AMT'])
for windcode in windcodes.Data[0]:
errorcode, data = w.wsd(windcode, "open,high,low,close,volume,amt", "-12M", today, "industryType=2;industryStandard=1", usedf=True)
if errorcode != 0:
print(windcode, ":ErrorCode:", errorcode)
continue
data.loc[:, 'WINDCODE'] = windcode
dataset = dataset.append(data, sort=False)
dataset.index.name = 'DATE'
dataset.to_csv('HS300 Data.csv')
案例4. 获取沪深300成分股数据并保存如数据库
案例4. 获取沪深300成分股数据并保存如数据库
本案例演示如何获取一段时期内所有的沪深300指数的成分股,并借助Wind API的日期宏获取其自上市以来到选定日期的所有历史数据。获取的数据按照日期和WindCode逐行导入数据库中。本案例采用SQLite数据库进行演示。如需使用其他数据库,用户可以自行替换。示例代码如下:
from WindPy import w
from datetime import datetime
import sqlite3
# Connect to the database
conn = sqlite3.connect("example.db")
c = conn.cursor()
# Create an empty table
table_names = c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'")
c.execute("DROP TABLE IF EXISTS stockprice")
c.execute("""
CREATE TABLE stockprice (
windcode VARCHAR(20) NOT NULL,
tradedate VARCHAR(50),
openprice FLOAT,
highprice FLOAT,
lowprice FLOAT,
closeprice FLOAT,
volume FLOAT,
amt FLOAT
)
""")
sql = "INSERT INTO stockprice VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
w.start()
# Download the changes in the constituents and hisotrical constituents
windcodes_change = w.wset("indexhistory","startdate=2015-01-01;enddate=2020-01-01;windcode=000300.SH;field=tradedate,tradecode")
assert windcodes_change.ErrorCode == 0, 'Download constituents changes, ErrorCode: {0}'.format(windcodes_change.ErrorCode)
windcodes_2015 = w.wset("sectorconstituent","date=2015-01-01;windcode=000300.SH;field=wind_code")
assert windcodes_2015.ErrorCode == 0, 'Download historical constituents, ErrorCode: {0}'.format(windcodes_2015.ErrorCode)
windcodes = list(set(windcodes_change.Data[1] + windcodes_2015.Data[0]))
# Fetch data and save in the database
for windcode in windcodes:
data = w.wsd(windcode, "open,high,low,close,volume,amt", "IPO", "2020-01-01", "industryType=2;industryStandard=1")
if data.ErrorCode != 0:
print(windcode, ":ErrorCode:", data.ErrorCode)
continue
for i, date in enumerate(data.Times):
sqlrow = list()
for j, field in enumerate(data.Fields):
sqlrow.append(data.Data[j][i])
c.execute(sql, [(windcode), (date)] + sqlrow)
conn.commit()
conn.close()
数据可视化
案例1. 绘制收盘价日期序列
案例1. 绘制收盘价日期序列
from WindPy import *
import numpy as np
import pandas as pd
from datetime import datetime
import matplotlib.pylab as plt
w.start()
errorcode, df = w.wsd("002739.SZ",
"close",
"2016-05-17", "2020-05-18",
"TradingCalendar=SZSE;Fill=Previous",
usedf=True)
plt.figure(figsize=(10, 7))
df['CLOSE'].plot()
plt.ylabel('Price', fontsize=14)
plt.xlabel('Date', fontsize=14)
plt.show()
案例2. 绘制K线图,并叠加成交量的折线图
案例2. 绘制K线图,并叠加成交量的折线图
from WindPy import *
import numpy as np
import pandas as pd
import talib as ta
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import datetime
from matplotlib.dates import date2num
def mkt_plot(quotes, field, sec): # quotes:行情数据-Dateframe类型 sec:标题
fig = plt.figure(figsize=(11,5))
ax1 = fig.add_axes([0, 1, 1, 1])
ax1.set_title(sec, fontsize=15)
ax1.grid(True, axis='y')
ax1.set_xlim(-1, len(quotes)+1)
for i in range(len(quotes)):
close_price,open_price = quotes['CLOSE'].iloc[i], quotes['OPEN'].iloc[i]
high_price, low_price = quotes['HIGH'].iloc[i], quotes['LOW'].iloc[i]
trade_date = quotes.index[i]
if close_price > open_price:#画阳线
ax1.add_patch(patches.Rectangle((i-0.2, open_price), 0.4, close_price-open_price, fill=False, color='r'))
ax1.plot([i, i], [low_price, open_price], 'r')
ax1.plot([i, i], [close_price, high_price], 'r')
else:#画阴线
ax1.add_patch(patches.Rectangle((i-0.2, open_price), 0.4, close_price-open_price, color='g'))
ax1.plot([i, i], [low_price, high_price], color='g')
ax1.set_ylabel("Price", fontsize=15)
ax1.set_xlabel("Date", fontsize=15)
#设置x轴标签
ax1.set_xticks(range(0,len(quotes),5))#位置
ax1.set_xticklabels([(quotes.index[i]).strftime('%Y-%m-%d') for i in ax1.get_xticks()] , rotation=20)#标签内容
ax2 = ax1.twinx()
ax2.plot(range(len(quotes)), quotes[field.upper()])
ax2.set_ylabel(field.capitalize(), fontsize=15)
return fig
w.start()
errorcode, data = w.wsd(
"600028.SH",
"open,high,low,close,volume","ED-1M" ,
(datetime.date.today()-datetime.timedelta(days=1)).strftime('%Y-%m-%d'),
"Fill=Previous",
usedf=True
)
mkt_plot(data, 'volume', '中国石化 600028')
plt.show()
设置Windows计划任务
本案例以Wind Python接口为例,介绍如何设置Windows计划任务。通过设定Windows计划任务,用户可以定时调用Wind API并进行数据处理。其他语言的Wind API程序也可通过类似的步骤设置Windows计划任务。
Windows计划任务的设置步骤以及批处理脚本参考附件中的readme.docx和wind_test.bat。测试用Python脚本如下:
from WindPy import w
import time
import os
def start():
start_data = w.start()
print()
if start_data.ErrorCode == 0:
print("Wind API started successfully")
else:
print("Error message:")
print(start_data)
time.sleep(5)
data = w.wss("600000.SH", "sec_name,bsharename,sec_englishname,exchange_cn,exch_eng")
time.sleep(5)
print()
if data.ErrorCode == 0:
print("WSS ran successfully")
print("The data is:")
print(data)
else:
print("WSS failed to run")
print("Error message:")
print(data)
w.stop()
os.system('PAUSE')
if __name__ == '__main__':
start()
在设置Windows计划任务时,用户需根据系统环境调整本案例参考文件中的部分参数,并进行测试。测试可分以下三个步骤进行:
1.测试Python脚本并确保其能够正常运行并获取到数据
2.测试bat批处理脚本并确保其能够运行
3.设置Windows计划任务并测试
策略回测
使用量化接口wupf函数,结合终端PMS功能模块实现三种调仓方式下的策略回测。
flow_backtest.py 流水上传 holding_backtest.py 持仓上传 weight_backtest.py 权重上传
# -*- coding:utf-8 -*-
# Author:OpenAPISupport@wind.com.cn
# Editdate:2017-11-08
from WindPy import *
import time
w.start()
class wupf_flow(object):
def __init__(self,PortfoName,windAccount,windCode,originalCaptial,beginDate,endDate):
reloadResult=self.reloadPortfo(PortfoName,windAccount)
if reloadResult.ErrorCode!=0:
self.throwError(reloadResult.Data[0][0])
self.strategy(windCode,beginDate,endDate,originalCaptial,PortfoName,windAccount)
#执行回测策略
def strategy(self,windCode,beginDate,endDate,originalCaptial,PortfoName,windAccount):
#接口获取回测数据
closePrice=w.wsd(windCode, "close", beginDate, endDate, "Fill=Previous") #[beginDate,endDate]收盘价
ma10=w.wsd(windCode, "MA", beginDate, endDate, "MA_N=10","PriceAdj=F") #[beginDate,endDate]5日均线
ma30=w.wsd(windCode, "MA", beginDate, endDate, "MA_N=30","PriceAdj=F") #[beginDate,endDate]10日均线
tradeStatus=w.wsd(windCode, "trade_status", beginDate, endDate, "") #[beginDate,endDate]交易状态
#设置期初组合现金
wupfCash=w.wupf(PortfoName, beginDate, "CNY", originalCaptial, "1","Direction=Short;Method=BuySell;CreditTrading=No;Owner="+windAccount+";type=flow")
if wupfCash.ErrorCode!=0:
self.throwError(wupfCash.Data[0][0])
time.sleep(0.5)
buyFlag=1
for i in range(len(ma10.Data[0])):
#10日均线与30日均线黄金交叉且30日均线向上运行, 买入
if i>0 and buyFlag==1 and (ma10.Data[0][i-1] < ma30.Data[0][i-1] and ma10.Data[0][i] > ma30.Data[0][i] and ma30.Data[0][i-1] < ma30.Data[0][i]) and tradeStatus.Data[0][i]==u"交易":
buyFlag=0
wupfSecurity=w.wupf("PMStest",closePrice.Times[i].strftime("%Y%m%d"), windCode, "100", str(closePrice.Data[0][i]),"Direction=Long;Method=BuySell;CreditTrading=No;Owner="+windAccount+";type=flow")
time.sleep(0.5)
if wupfSecurity.ErrorCode!=0:
self.throwError(wupfSecurity.Data[0][0])
print closePrice.Times[i].strftime("%Y%m%d")+"_buy"
#10日均线与30日均线死亡交叉, 卖出
elif i>0 and buyFlag==0 and (ma10.Data[0][i-1] > ma30.Data[0][i-1] and ma10.Data[0][i] < ma30.Data[0][i]) and tradeStatus.Data[0][i]==u"交易":
buyFlag=1
wpfPosition=w.wpf("PMStest", "Position","view=PMS;date="+closePrice.Times[i].strftime("%Y%m%d")+";sectorcode=101;displaymode=1")
time.sleep(0.5)
if wpfPosition.ErrorCode!=0:
self.throwError(wpfPosition.Data[0][0])
wupfSecurity=w.wupf("PMStest",closePrice.Times[i].strftime("%Y%m%d"), windCode, "-"+str(wpfPosition.Data[3][0]), str(closePrice.Data[0][i]),"Direction=Long;Method=BuySell;CreditTrading=No;Owner="+windAccount+";type=flow")
time.sleep(0.5)
if wupfSecurity.ErrorCode!=0:
self.throwError(wupfSecurity.Data[0][0])
print closePrice.Times[i].strftime("%Y%m%d")+"_sell"
#重置回测组合
def reloadPortfo(self,PortfoName,windAccount):
result=w.wupf(PortfoName, "", "", "", "","Owner="+windAccount+";reset=true")
time.sleep(0.5)
return result
#抛出错误信息
def throwError(self,Message):
raise Exception(Message)
if __name__=="__main__":
wupf_flow("PMStest", "W0817573", "300008.SZ","10000","20150101","20171031")
交易模块多账户下单
本案例为批量下单GUI。用户可以登录两个账号并在沪深市场模拟下单。
第1步:用户登录自己的股票账户,返回登录号(logonID),可参照例子中的登录函数。如果登陆成功,对应的LogonID图标后的"Off"将自动变成LogonID。
第2步:填写GUI中的登录号(LogonID)、股票代码、买卖方向、委托价格和委托数量, 并下单。
源代码和UI文件可从本页面的附件下载。代码如下:
import sys
from PyQt5 import QtCore,QtGui,uic,QtWidgets
from WindPy import w
w.start()
qtCreatorFile="sample.ui" # The name of the used UI file
Ui_MainWindow,QtBaseClass = uic.loadUiType(qtCreatorFile)
global accountNumber
accountNumber=0
class MyApp(QtWidgets.QMainWindow,Ui_MainWindow):
def __init__(self):
QtWidgets.QMainWindow.__init__(self)
Ui_MainWindow.__init__(self)
self.setupUi(self)
# Relate events with methods
self.accountLogin.clicked.connect(self.login)
self.torder.clicked.connect(self.torderProcess)
self.quit.clicked.connect(self.logout)
# Log on with the entered accounts
def login(self):
global accountNumber
msg = list()
for k in range(2):
accountlabelName="account_label"+str(k)
print(self.account_label0.text())
temtText = eval("self."+accountlabelName+".text()")
if temtText!="":
windLogin=w.tlogon('0000','0',str(temtText),'123456','SHSZ')
print(windLogin)
if windLogin.ErrorCode == 0:
exec('self.LogonID_label'+str(k)+'.setText(str(windLogin.Data[0][0]))')
# Save ErrorMsg
if windLogin.ErrorCode == 0:
windLogin.Data[4][0] = 'Succeeded!'
msg.append(windLogin.Data[4][0])
accountNumber=k+1
# Join saved ErrorMsgs to display
msg = '\n'.join(msg)
self.logon_msg_browser.setText(msg)
# Process order when the submit button is clicked
def torderProcess(self):
tmp=[]
for j in range(5):
tmp.append("")
table_info = []
for i in range(5):
table_info.append(tmp)
msg = list()
for i in range(5):
if(len(eval('self.label{0}_1.text()'.format(i)))!=0):
logon_id = eval('self.label{0}_0.text()'.format(i))
windcode = eval('self.label{0}_1.text()'.format(i))
trade_side = eval('self.label{0}_2.text()'.format(i))
price = eval('self.label{0}_3.text()'.format(i))
amount = eval('self.label{0}_4.text()'.format(i))
# Wind code, trading direction, order price, amount
torderResult=w.torder(windcode,trade_side,price,amount,'OrderType=LMT;LogonID='+logon_id)
# Save ErrorMsg
msg.append(torderResult.Data[8][0])
# Join saved ErrorMsgs to dispaly
msg = '\n'.join(msg)
self.order_msg_browser_2.setText(msg)
# Log out the accounts when the logout button is clicked
def logout(self):
global accountNumber
if accountNumber!=0:
for i in range(1,accountNumber):
w.tlogout(i)
print(i)
self.close()
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
window = MyApp()
window.show()
sys.exit(app.exec_())
版权声明:本文为qq_40844276原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。