交互
下载xtquant
https://dict.thinktrader.net/nativeApi/start_now.html?id=3WR14v
第三方库
qmt python=3.6.8(可能需要再次确认qmt中的python版本)
注意:安装三方库前,请备份 QMT安装目录\bin.x64\ 目录下的 DLLs 和 Lib 这两个文件夹,以便在安装三方库引起系统错误后,可以恢复系统默认的库文件。
安装指定python 版本的库
python -m pip install pytdx --target=D:\qmt\bin.x64\Lib\site-packages
在策略里建立服务器
#coding:gbk
import json
import socket
import http.server
from urllib.parse import urlparse, parse_qs
class logic:
@staticmethod
def index(query_params):
return {
'iopv':get_etf_iopv("510050.SH")
}
class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
parsed_url = urlparse(self.path)
path = parsed_url.path
query_params = parse_qs(parsed_url.query)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
router=path[1:]
if(hasattr(logic,router)):
response=getattr(logic,router)
try:
response=response(query_params)
response={'code':0,'data':response}
except Exception as err:
print('发生错误:',err)
response={'code':1,'data':str(err)}
else:
response={'code':404,'data':None}
response = json.dumps(response)
self.wfile.write(response.encode('utf-8'))
def check_port_in_use(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
def run(server_class=http.server.HTTPServer, handler_class=SimpleHTTPRequestHandler, port=8080):
if check_port_in_use(port):
raise RuntimeError(f'Port {port} is already in use.')
server_address = ('', port)
httpd = server_class(server_address, handler_class)
print(f"Starting server on port {port}...")
httpd.serve_forever()
run(port=8085)