319 views
4

用到的原型

Aberration 策略 (难度:初级)

完成后代码如下

此处完全照搬ringo老师的代码,改编如有错误之处请删帖,我怕误人

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = "Ringo"
 '''
Aberration策略 (难度:初级)
参考: https://www.shinnytech.com/blog/aberration/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''
 from tqsdk import TqApi, TargetPosTask
from tqsdk.ta import BOLL
from contextlib import closing
 # 使用BOLL指标计算中轨、上轨和下轨,其中26为周期N  ,2为参数p
def boll_line(klines):
    boll = BOLL(klines, 26, 2)
    midline = boll["mid"].iloc[-1]
    topline = boll["top"].iloc[-1]
    bottomline = boll["bottom"].iloc[-1]
    print("策略运行,中轨:%.2f,上轨为:%.2f,下轨为:%.2f" % (midline, topline, bottomline))
    return midline, topline, bottomline
 async def demo(SYMBOL):
    quote = api.get_quote(SYMBOL)
    klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)
    position = api.get_position(SYMBOL)
    target_pos = TargetPosTask(api, SYMBOL)
     async with api.register_update_notify([quote, klines]) as update_chan:
        # 确保收到了所有订阅的数据
        while not api.is_serial_ready(klines):
            await update_chan.recv()
         midline, topline, bottomline = boll_line(klines)
        async for _ in update_chan:
            # 每次生成新的K线时重新计算BOLL指标
            if api.is_changing(klines.iloc[-1], "datetime"):
                midline, topline, bottomline = boll_line(klines)
             # 每次最新价发生变化时进行判断
            if api.is_changing(quote, "last_price"):
                # 判断开仓条件
                if position.pos_long == 0 and position.pos_short == 0:
                    # 如果最新价大于上轨,K线上穿上轨,开多仓
                    if quote.last_price > topline:
                        print("K线上穿上轨,开多仓")
                        target_pos.set_target_volume(20)
                    # 如果最新价小于轨,K线下穿下轨,开空仓
                    elif quote.last_price < bottomline:
                        print("K线下穿下轨,开空仓")
                        target_pos.set_target_volume(-20)
                    else:
                        print("当前最新价%.2f,未穿上轨或下轨,不开仓" % quote.last_price)
                 # 在多头情况下,空仓条件
                elif position.pos_long > 0:
                    # 如果最新价低于中线,多头清仓离场
                    if quote.last_price < midline:
                        print("最新价低于中线,多头清仓离场")
                        target_pos.set_target_volume(0)
                    else:
                        print("当前多仓,未穿越中线,仓位无变化")
                 # 在空头情况下,空仓条件
                elif position.pos_short > 0:
                    # 如果最新价高于中线,空头清仓离场
                    if quote.last_price > midline:
                        print("最新价高于中线,空头清仓离场")
                        target_pos.set_target_volume(0)
                    else:
                        print("当前空仓,未穿越中线,仓位无变化")
 # 设置合约代码
list_SYMBOL = ["SHFE.cu2012", "DCE.m2105"]
api = TqApi(auth="信易账户,账户密码")
 # 创建异步任务
for x in list_SYMBOL:
    api.create_task(demo(x))
 # 关闭api,释放相应资源
with closing(api):
    while True:
        api.wait_update()

我觉得:

  1. 通常不需要import asyncio
  2. 并不是所有函数都要加async,通常需要等待(阻塞)的才加。
  3. 第一次获得K需要api.is_serial_ready(),用K计算前万万注意啊
  4. while True: api.wait_update()用async for _ in update_chan替换。for前的async不能写掉,它是一个死循环,完事儿用break退出
  5. 一个协程任务里,大概估计通常只需要一个update_chan实例就够了,如果函数中要用api.wait_update(),直接把update_chan传进去搞事情。例如这样

async def A(update_chan):
    #这是一个A函数
    #。。。
    async for _ in update_chan:
        #灯灯灯,一顿操作
        break
 #然后在协程函数里调用它
async def demo(SYMBOL):
    #省略若干行
    #调用A()
    await A(update_chan)

Posted new comment

谢谢分享!

感谢分享~

0

谢谢分享~

Answered question