VWAP算法(成交量加权平均价格)(难度:高级)

Table of Contents

VWAP介绍

根据各个算法交易中算法的主动程度不同,可以把算法交易分为被动型算法交易、主动型算法交易、综合型算法交易三大类。而成交量加权平均价格(VWAP)属于被动型算法交易,它也是在日常算法交易中应用最为广泛的交易策略算法,国外机构有一半的算法交易量是采取VWAP策略及其衍生算法交易完成的。

成交量加权平均价格(VWAP)算法就是将大额报单按照规定期限内预测的交易量分布比例拆分成多个小额订单进行委托交易。即先将交易时间分成等时长的时间片,然后根据历史各时间片的交易情况来预测当日内各时间片交易量在全天交易量的占比;最后根据预测的交易占比拆分总交易量到各个时间片执行交易。

其基本思路是从历史交易模式出发,统计归纳历史成交时间、成交量、价格分布等的规则,并将这些规则应用于之后的交易。原则是每一个时间单元完成交易的总量占这段计划交易时段内市场总交易的比例恒定,这一算法交易实现的成交价格的比较基准为一段时间内的市场成交量加权均价。

该策略不是最优的算法策略,将大额报单分拆成较小的报单在一段时间内分开交易,主要是为了降低大额报单对市场短时间的冲击成本,并达到隐藏交易行为的目的, 并不寻求最小化所有成本。理论上,在没有额外的信息,也没有针对价格趋势预测的情况下,VWAP是最优的算法交易策略。

VWAP代码实现

本VWAP策略代码使用Pandas及天勤程序量化软件实现预测拆单计算和交易。

Pandas文档:http://pandas.pydata.org/pandas-docs/stable/index.html

免费模拟、实盘天勤量化软件:https://www.shinnytech.com/tianqin/

本策略代码实现与标准VWAP策略的定义在交易时间上有所不同,标准VWAP策略按照日内交易量分布预测,日内交易量分布即某一时间区间内的交易量占整个交易日交易量的比例。本代码实现将交易时段设置为一个交易日内可选的的任何时段,长度可手动修改设置。使策略的运用更加灵活。

另外,因为成交量占比是小数,成交量为整数,则每个成交量预测值为占比预测值乘以预设的目标手数后进行四舍五入得到,这可能会导致此时所有时间单元的成交量预测值的总和与目标手数不同。初始解决方案为把误差的手数修改到成交量最大的时间单元上,但它并非是一个确定能将误差减小的方法,因此选择另一种解决方案。此解决方案为,设置剩余手数初值为总目标手数,剩余占比值初值为1,循环所有时间单元的占比预测值:当前时间单元的成交量预测值为:剩余手数*(当前时间单元占比预测值/剩余占比值),然后将总占比值减去当前时间单元的占比值,将剩余成交量减去当前时间单元的成交量预测值。

本VWAP策略的实现可分为以下4个步骤:

  • 设定时间单元的长度、目标交易手数、用于计算成交量预测占比的样本长度(即过去交易日的天数)、计划交易时段的起止时间等参数;
  • 将过去每个交易日划分为若干等长时间单元,取出每个交易日的所有时间单元中包含在计划交易时段内的所有时间单元;
  • 计算每个选定时间单元的成交量在此交易时段内总成交量的占比,然后将所有交易日中处于同一时间的时间单元的占比求平均值(即为预测的每个时间单元的成交量占比值),用每个时间单元的占比预测值计算其成交量预测值;
  • 到达交易时段的起始时间即按照预测值进行等时长下单。

天勤内策略源代码:

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
Volume Weighted Average Price策略
参考: https://www.shinnytech.com/blog/vwap
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

import datetime
from tqsdk import TqApi, TargetPosTask

TIME_CELL = 5*60  # 等时长下单的时间单元, 单位: 秒
TARGET_VOLUME = 300  # 目标交易手数 (>0: 多头, <0: 空头)
SYMBOL = "DCE.jd1909"  # 交易合约代码
HISTORY_DAY_LENGTH = 20  # 使用多少天的历史数据用来计算每个时间单元的下单手数
START_HOUR, START_MINUTE = 9, 35  # 计划交易时段起始时间点
END_HOUR, END_MINUTE = 10, 50  # 计划交易时段终点时间点


api = TqApi()
print("策略开始运行")
# 根据 HISTORY_DAY_LENGTH 推算出需要订阅的历史数据长度, 需要注意history_day_length与time_cell的比例关系以避免超过订阅限制
time_slot_start = datetime.time(START_HOUR, START_MINUTE)  # 计划交易时段起始时间点
time_slot_end = datetime.time(END_HOUR, END_MINUTE)  # 计划交易时段终点时间点
klines = api.get_kline_serial(SYMBOL, TIME_CELL, data_length=int(10*60*60/TIME_CELL*HISTORY_DAY_LENGTH))
target_pos = TargetPosTask(api, SYMBOL)
position = api.get_position(SYMBOL)  # 持仓信息

def get_kline_time(kline_datetime):
    """获取k线的时间(不包含日期)"""
    kline_time = datetime.datetime.fromtimestamp(kline_datetime//1000000000).time()  # 每根k线的时间
    return kline_time

def get_market_day(kline_datetime):
    """获取k线所对应的交易日"""
    kline_dt = datetime.datetime.fromtimestamp(kline_datetime//1000000000)  # 每根k线的日期和时间
    if kline_dt.hour >= 18:  # 当天18点以后: 移到下一个交易日
        kline_dt = kline_dt + datetime.timedelta(days=1)
    while kline_dt.weekday() >= 5:  # 是周六或周日,移到周一
        kline_dt = kline_dt + datetime.timedelta(days=1)
    return kline_dt.date()

# 添加辅助列: time及date, 分别为K线时间的时:分:秒和其所属的交易日
klines["time"] = klines.datetime.apply(lambda x: get_kline_time(x))
klines["date"] = klines.datetime.apply(lambda x: get_market_day(x))

# 获取在预设交易时间段内的所有K线, 即时间位于 time_slot_start 到 time_slot_end 之间的数据
if time_slot_end > time_slot_start:  # 判断是否类似 23:00:00 开始, 01:00:00 结束这样跨天的情况
    klines = klines[(klines["time"] >= time_slot_start) & (klines["time"] <= time_slot_end)]
else:
    klines = klines[(klines["time"] >= time_slot_start) | (klines["time"] <= time_slot_end)]

# 由于可能有节假日导致部分天并没有填满整个预设交易时间段
# 因此去除缺失部分交易时段的日期(即剩下的每个日期都包含预设的交易时间段内所需的全部时间单元)
date_cnt = klines["date"].value_counts()
max_num = date_cnt.max()  # 所有日期中最完整的交易时段长度
need_date = date_cnt[date_cnt == max_num].sort_index().index[-HISTORY_DAY_LENGTH - 1:-1]  # 获取今天以前的预设数目个交易日的日期
df = klines[klines["date"].isin(need_date)]  # 最终用来计算的k线数据

# 计算每个时间单元的成交量占比, 并使用算数平均计算出预测值
datetime_grouped = df.groupby(['date', 'time'])['volume'].sum()  # 将K线的volume按照date、time建立多重索引分组
# 计算每个交易日内的预设交易时间段内的成交量总和(level=0: 表示按第一级索引"data"来分组)后,将每根k线的成交量除以所在交易日内的总成交量,计算其所占比例
volume_percent = datetime_grouped / datetime_grouped.groupby(level=0).sum()
predicted_percent = volume_percent.groupby(level=1).mean()  # 将历史上相同时间单元的成交量占比使用算数平均计算出预测值
print("各时间单元成交量占比: %s" % predicted_percent)

# 计算每个时间单元的成交量预测值
predicted_volume = {}  # 记录每个时间单元需调整的持仓量
percentage_left = 1  # 剩余比例
volume_left = TARGET_VOLUME  # 剩余手数
for index, value in predicted_percent.items():
    volume = round(volume_left*(value/percentage_left))
    predicted_volume[index] = volume
    percentage_left -= value
    volume_left -= volume
print("各时间单元应下单手数: %s" % predicted_volume)


# 交易
current_volume = 0  # 记录已调整持仓量
while True:
    api.wait_update()
    # 新产生一根K线并且在计划交易时间段内: 调整目标持仓量
    if api.is_changing(klines.iloc[-1], "datetime"):
        t = datetime.datetime.fromtimestamp(klines.iloc[-1]["datetime"]//1000000000).time()
        if t in predicted_volume:
            current_volume += predicted_volume[t]
            print("到达下一时间单元,调整持仓为: %d" % current_volume)
            target_pos.set_target_volume(current_volume)
    # 用持仓信息判断是否完成所有目标交易手数
    if api.is_changing(position, "volume_long") or api.is_changing(position, "volume_short"):
        if position["volume_long"] - position["volume_short"] == TARGET_VOLUME:
            break

api.close()

点击了解天勤量化程序软件