# -*- coding: utf-8 -*-
"""
Object-oriented toolbox module that wraps get_daily and provides some more utilities
"""
import sys
import datetime as dt
import numpy as np
import pandas as pd
from collections import deque
from functools import wraps, lru_cache
import logging
from xalpha.cons import (
opendate,
yesterday,
next_onday,
last_onday,
scale_dict,
tz_bj,
holidays,
)
from xalpha.universal import (
get_rt,
get_bar,
_convert_code,
_inverse_convert_code,
fetch_backend,
save_backend,
)
import xalpha.universal as xu ## 为了 set_backend 可以动态改变此模块的 get_daily
from xalpha.exceptions import ParserFailure, DateMismatch, NonAccurate
thismodule = sys.modules[__name__]
logger = logging.getLogger(__name__)
def _set_holdings(module):
    """
    Copy the holdings-related data tables from ``module`` onto this module.

    Each name in the fixed list below is read from ``module`` and bound as an
    attribute of this module; a name that ``module`` does not define is bound
    to an empty dict instead.

    :param module: mod. Object exposing the data tables as attributes.
    :return: None.
    """
    table_names = (
        "no_trading_days",
        "holdings",
        "currency_info",
        "market_info",
        "futures_info",
        "alt_info",
        "gap_info",
    )
    for attr in table_names:
        table = getattr(module, attr, {})
        setattr(thismodule, attr, table)
def set_holdings(module=None):
    """
    Load an external holdings.py data module used for fund net value prediction.

    When no module is given, ``xalpha.holdings`` is tried first and
    ``xalpha.cons`` is used as the fallback if that import fails.

    :param module: mod. import holdings
    :return: None.
    """
    if module:
        _set_holdings(module)
        print("external holdings.py is loaded")
        return

    try:
        from xalpha import holdings

        _set_holdings(holdings)
        print("holdings.py is found and loaded within xalpha dir")
    except ImportError:
        # Stay silent here: a "not found" message may confuse general users.
        from xalpha import cons

        _set_holdings(cons)
set_holdings()
def _set_display_notebook():
    """
    Initialize DataTables mode for pandas DataFrame representation.

    Injects the DataTables script/stylesheet configuration into the notebook
    front end and installs a ``_repr_javascript_`` hook on ``pd.DataFrame``
    that renders the frame's HTML table and turns it into a DataTable.

    :return: None.
    """
    # ``IPython.display`` is the public import location; importing ``display``
    # from ``IPython.core.display`` is deprecated.
    from IPython.display import Javascript, display

    display(
        Javascript(
            """
        require.config({
            paths: {
                DT: '//cdn.datatables.net/1.10.20/js/jquery.dataTables.min',
            }
        });
        $('head').append('<link rel="stylesheet" type="text/css" href="//cdn.datatables.net/1.10.20/css/jquery.dataTables.min.css">');
        $('head').append('<style> td, th {text-align: center;}</style>')
    """
        )
    )

    def _repr_datatable_(self):
        # create table DOM
        script = f"$(element).html(`{self.to_html(index=False)}`);\n"
        # execute jQuery to turn table into DataTable
        script += """
            require(["DT"], function(DT) {$(document).ready( () => {
                // Turn existing table into datatable
                $(element).find("table.dataframe").DataTable();
            })
            });
        """
        return script

    pd.DataFrame._repr_javascript_ = _repr_datatable_
def set_display(env=""):
    """
    Switch the display mode of DataFrame; only effective in Jupyter Notebook.

    :param env: str, default "". With "notebook", "jupyter" or "ipython" the
        DataTables representation is installed on pd.DataFrame; with an empty
        value that representation hook is removed again if present.
    :return: None.
    :raises ParserFailure: if ``env`` is any other non-empty value.
    """
    if env:
        if env not in ("notebook", "jupyter", "ipython"):
            raise ParserFailure("unknown env %s" % env)
        _set_display_notebook()
        return

    # Empty env: drop the javascript repr hook, ignoring the case where it
    # was never installed.
    try:
        delattr(pd.DataFrame, "_repr_javascript_")
    except AttributeError:
        pass
class PEBHistory:
    """
    Wrapper class for the historical PE and PB of an index.
    """

    # code -> (display name, default start date of the valuation history)
    indexs = {
        "000016.XSHG": ("上证50", "2012-01-01"),
        "000300.XSHG": ("沪深300", "2012-01-01"),
        "000905.XSHG": ("中证500", "2012-01-01"),
        "000922.XSHG": ("中证红利", "2012-01-01"),
        "399006.XSHE": ("创业板指", "2012-01-01"),
        "000992.XSHG": ("全指金融", "2012-01-01"),
        "000991.XSHG": ("全指医药", "2012-01-01"),
        "399932.XSHE": ("中证消费", "2012-01-01"),
        "000831.XSHG": ("500低波", "2013-01-01"),
        "000827.XSHG": ("中证环保", "2013-01-01"),
        "000978.XSHG": ("医药100", "2012-01-01"),
        "399324.XSHE": ("深证红利", "2012-01-01"),
        "399971.XSHE": ("中证传媒", "2014-07-01"),
        "000807.XSHG": ("食品饮料", "2013-01-01"),
        "000931.XSHG": ("中证可选", "2012-01-01"),
        "399812.XSHE": ("养老产业", "2016-01-01"),
        "000852.XSHG": ("中证1000", "2015-01-01"),
    }
    # Indexes supported by the JoinQuant data source: https://www.joinquant.com/indexData

    def __init__(self, code, start=None, end=None):
        """
        :param code: str. Either of the form 399971.XSHE or SH000931.
        :param start: Optional[str]. %Y-%m-%d, first day of the valuation history.
        :param end: Don't use, only for debug.
        """
        yesterday_str = (dt.datetime.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")
        if len(code.split(".")) == 2:
            self.code = code
            self.scode = _convert_code(code)
        else:
            self.scode = code
            self.code = _inverse_convert_code(self.scode)
        if self.code in self.indexs:
            self.name = self.indexs[self.code][0]
            if not start:
                start = self.indexs[self.code][1]
        else:
            try:
                self.name = get_rt(self.scode)["name"]
            except Exception:
                # Best effort only: fall back to the code as display name, but
                # do not swallow KeyboardInterrupt/SystemExit as a bare except would.
                self.name = self.scode
            if not start:
                start = "2012-01-01"  # may be problematic: the index may have no data yet
        self.start = start
        if not end:
            end = yesterday_str
        self.df = xu.get_daily("peb-" + self.scode, start=self.start, end=end)
        self.ratio = None  # realtime price / last close, filled lazily by fluctuation()
        self.title = "指数"
        self._gen_percentile()

    def _gen_percentile(self):
        """
        Compute the 0%, 10%, ..., 100% percentiles of the pe and pb columns of
        ``self.df`` (NaN ignored, rounded to 3 digits) into ``self.pep`` / ``self.pbp``.
        """
        self.pep = [
            round(i, 3) for i in np.nanpercentile(self.df.pe, np.arange(0, 110, 10))
        ]
        try:
            self.pbp = [
                round(i, 3) for i in np.nanpercentile(self.df.pb, np.arange(0, 110, 10))
            ]
        except TypeError:
            # Retry with missing values replaced by 1 when the raw pb column
            # cannot be processed by nanpercentile.
            df = self.df.fillna(1)
            self.pbp = [
                round(i, 3) for i in np.nanpercentile(df.pb, np.arange(0, 110, 10))
            ]

    def percentile(self):
        """
        Print the values of the historical PE and PB deciles.

        :return: None.
        """
        print("PE 历史分位:\n")
        print(*zip(np.arange(0, 110, 10), self.pep), sep="\n")
        print("\nPB 历史分位:\n")
        print(*zip(np.arange(0, 110, 10), self.pbp), sep="\n")

    def v(self, y="pe"):
        """
        Visualize the pe or pb history.

        :param y: Optional[str]. "pe" (default) or "pb"
        :return: the object returned by ``DataFrame.plot``.
        """
        return self.df.plot(x="date", y=y)

    def fluctuation(self):
        """
        Ratio between the realtime price and the close on the last day of the
        valuation history; computed once and cached in ``self.ratio``.

        :return: float.
        """
        if not self.ratio:
            d = self.df.iloc[-1]["date"]
            oprice = xu.get_daily(
                code=self.scode, end=d.strftime("%Y%m%d"), prev=20
            ).iloc[-1]["close"]
            nprice = get_rt(self.scode)["current"]
            self.ratio = nprice / oprice
        return self.ratio

    def current(self, y="pe"):
        """
        Return the realtime estimate of the absolute pe or pb value.

        :param y: Optional[str]. "pe" (default) or "pb"
        :return: float. NaN if the latest value cannot be multiplied/rounded.
        """
        try:
            return round(self.df.iloc[-1][y] * self.fluctuation(), 3)
        except TypeError:
            return np.nan

    def current_percentile(self, y="pe"):
        """
        Return the realtime estimate of the historical percentile of pe or pb.

        :param y: Optional[str]. "pe" (default) or "pb"
        :return: float. Percentage of history rows strictly below the current value.
        """
        df = self.df
        d = len(df)
        u = len(df[df[y] < self.current(y)])
        return round(u / d * 100, 2)

    def summary(self, return_tuple=False):
        """
        Print the full analysis of the current valuation.

        :param return_tuple: bool, default False. If True, also return
            ((pe, pe percentile, pe distance to minimum in %), (same for pb)).
        :return: Optional[tuple].
        """
        rows = []
        for y, deciles in (("pe", self.pep), ("pb", self.pbp)):
            cur = self.current(y)  # evaluate once instead of three times per metric
            rows.append(
                (
                    cur,
                    self.current_percentile(y),
                    max(round((cur - deciles[0]) / cur * 100, 1), 0),
                )
            )
        result = tuple(rows)
        print("%s%s估值情况\n" % (self.title, self.name))
        if dt.datetime.strptime(self.start, "%Y-%m-%d") > dt.datetime(2015, 1, 1):
            print("(历史数据较少,仅供参考)\n")
        # self.percentile()
        print("现在 PE 绝对值 %s, 相对分位 %s%%,距离最低点 %s %%\n" % result[0])
        print("现在 PB 绝对值 %s, 相对分位 %s%%,距离最低点 %s %%\n" % result[1])
        if return_tuple:
            return result
class StockPEBHistory(PEBHistory):
    """
    PE/PB history whose data is fetched with the "peb-" prefix and whose
    realtime ratio is fixed at 1.
    """

    def __init__(self, code, start=None, end=None):
        """
        :param code: str. Code used both for the "peb-" daily data and for the
            realtime name lookup.
        :param start: Optional[str]. %Y-%m-%d, defaults to "2012-01-01".
        :param end: Optional[str]. %Y-%m-%d, defaults to yesterday.
        """
        self.code = self.scode = code
        if not end:
            one_day = dt.timedelta(days=1)
            end = (dt.datetime.now() - one_day).strftime("%Y-%m-%d")
        self.start = start if start else "2012-01-01"
        self.df = xu.get_daily("peb-" + code, start=self.start, end=end)
        self.name = get_rt(code)["name"]
        # ratio preset to 1, so fluctuation() never queries realtime prices
        self.ratio = 1
        self.title = "个股"
        self._gen_percentile()
class SWPEBHistory(PEBHistory):
    """
    List of Shenwan first-level industry indexes:
    https://www.hysec.com/hyzq/hy/detail/detail.jsp?menu=4&classid=00000003001200130002&firClassid=000300120013&twoClassid=0003001200130002&threeClassid=0003001200130002&infoId=3046547
    Second- and third-level industry indexes are supported as well.
    """

    index1 = [
        "801740",
        "801020",
        "801110",
        "801200",
        "801160",
        "801010",
        "801120",
        "801230",
        "801750",
        "801050",
        "801890",
        "801170",
        "801710",
        "801130",
        "801180",
        "801760",
        "801040",
        "801780",
        "801880",
        "801140",
        "801720",
        "801080",
        "801790",
        "801030",
        "801730",
        "801210",
        "801770",
        "801150",
    ]

    def __init__(self, code, start=None, end=None):
        """
        :param code: str. Shenwan industry index code, eg 801180.
        :param start: Optional[str]. %Y-%m-%d, defaults to "2012-01-01".
        :param end: Optional[str]. %Y-%m-%d, defaults to yesterday.
        """
        self.code = self.scode = code
        if not end:
            one_day = dt.timedelta(days=1)
            end = (dt.datetime.now() - one_day).strftime("%Y-%m-%d")
        self.start = start if start else "2012-01-01"
        self.df = xu.get_daily("sw-" + code, start=self.start, end=end)
        # the display name is taken from the first row of the fetched data
        first_row = self.df.iloc[0]
        self.name = first_row["name"]
        # ratio preset to 1, so fluctuation() never queries realtime prices
        self.ratio = 1
        self.title = "申万行业指数"
        self._gen_percentile()
class Compare:
    """
    Compare different financial products, normalized to a common starting point.
    """

    def __init__(
        self, *codes, start="20200101", end=None, col="close", normalize=True
    ):
        """
        :param codes: Union[str, tuple]. Same format as :func:`xalpha.universal.get_daily`;
            when a currency conversion is needed pass a tuple whose second element
            is like "USD".
        :param start: %Y%m%d
        :param end: %Y%m%d, default yesterday. Resolved at call time, so a
            long-running process does not keep using the day the module was imported.
        :param col: str, default close. The column to be compared.
        :param normalize: bool, default True. Whether to normalize the compared
            prices by their value at the starting point.
        """
        if end is None:
            end = yesterday()
        totdf = pd.DataFrame()
        codelist = []
        for c in codes:
            if isinstance(c, tuple):
                code = c[0]
                currency = c[1]
            else:
                code = c
                currency = "CNY"  # no exchange-rate adjustment for this product
            codelist.append(code)
            df = xu.get_daily(code, start=start, end=end)
            df = df[df.date.isin(opendate)]
            currency_code = _get_currency_code(currency)
            if currency_code:
                # convert into CNY: multiply the product column by the
                # exchange-rate column of the same date
                cdf = xu.get_daily(currency_code, start=start, end=end)
                cdf = cdf[cdf["date"].isin(opendate)]
                df = df.merge(right=cdf, on="date", suffixes=("_x", "_y"))
                df[col] = df[col + "_x"] * df[col + "_y"]
            if normalize:
                df[code] = df[col] / df.iloc[0][col]
            else:
                df[code] = df[col]
            df = df.reset_index()
            df = df[["date", code]]
            if "date" not in totdf.columns:
                totdf = df
            else:
                totdf = totdf.merge(on="date", right=df)
        self.totdf = totdf
        self.codes = codelist

    def v(self):
        """
        Visualize the daily lines.

        :return: the object returned by ``DataFrame.plot``.
        """
        return self.totdf.plot(x="date", y=self.codes)

    def corr(self):
        """
        Correlation matrix of the daily percentage changes of the compared columns.

        :return: pd.DataFrame
        """
        return self.totdf.iloc[:, 1:].pct_change().corr()
#########################
# netvalue prediction #
#########################
@lru_cache(maxsize=512)
def get_currency(code):
    """
    Get the pricing currency of a product from its code.

    :param code: str.
    :return: str. Currency label; "CNY" is the fallback when the lookup fails.
    """
    # The cache is mandatory: when backtesting a table, codes missing from
    # the info tables would otherwise be disastrous.
    # Only works for HKD JPY USD GBP CNY EUR, not very general when the data
    # sources get more diverse.
    try:
        if code in currency_info:
            return currency_info[code]
        if code.startswith(("F", "M")) and code[1:].isdigit():
            return "CNY"
        if code.startswith("FT-") and len(code.split(":")) > 2:
            # be careful! FT-ABC:IOM has no currency information!
            return code.split(":")[-1]
        if code.startswith("HK") and code[2:].isdigit():
            return "HKD"
        quoted = get_rt(code)["currency"]
        if quoted is None:
            quoted = "CNY"
        elif quoted == "JPY":
            quoted = "100JPY"
    except (TypeError, AttributeError, ValueError):
        logger.warning("set currency of %s as default CNY" % code)
        quoted = "CNY"
    return quoted
[docs]def _get_currency_code(c):
if c == "CNY":
return # None
if c == "JPY":
return "100JPY/CNY"
zjjl = [
"USD",
"EUR",
"100JPY",
"HKD",
"GBP",
"AUD",
"NZD",
"SGD",
"CHF",
"CAD",
"MYR",
"RUB",
"ZAR",
"KRW",
"AED",
"SAR",
"HUF",
"PLN",
"DKK",
"SEK",
"NOK",
"TRY",
"MXN",
"THB",
]
if c in zjjl:
return c + "/CNY"
return "currencies/" + c.lower() + "-cny"
@lru_cache(maxsize=512)
def get_currency_code(code):
    """
    Return the exchange-rate code (against CNY) for the pricing currency of ``code``.

    :param code: str.
    :return: Optional[str]. None when the product is priced in CNY.
    """
    return _get_currency_code(get_currency(code))
@lru_cache(maxsize=512)
def get_market(code):
    """
    Very rough way of getting the trading market from a product code.

    :param code: str.
    :return: str. Market label; "CN" is the fallback when the lookup fails.
    """
    # currency label -> market label, used when no market is reported
    trans = {
        "USD": "US",
        "GBP": "UK",
        "HKD": "HK",
        "CNY": "CN",
        "CHF": "CH",
        "JPY": "JP",
        # get_currency reports yen as "100JPY"; without this entry such
        # products would get the market label "100JPY" instead of "JP".
        "100JPY": "JP",
        "EUR": "DE",
        "AUD": "AU",
        "INR": "IN",
        "SGD": "SG",
    }
    try:
        if code in market_info:
            return market_info[code]
        elif code.startswith("CNY/") or code.endswith("/CNY"):
            return "CM"  # china money: marker of the central parity rate market
        elif code.startswith("HK") and code[2:].isdigit():
            return "HK"
        market = get_rt(code)["market"]
        if market is None:
            # no market reported: derive it from the pricing currency
            market = get_currency(code)
            market = trans.get(market, market)
    except (TypeError, AttributeError, ValueError, IndexError):
        market = "CN"
    return market
@lru_cache(maxsize=512)
def get_alt(code):
    """
    Look for an equivalent substitute product after fetching ``code`` failed.

    :param code: str.
    :return: Optional[str]. Substitute code, or None if there is none.
    """
    if code in alt_info:
        return alt_info[code]
    if len(code[1:].split("/")) == 2:
        # use the investing app source in place of the web source
        return "INA-" + code
    if code.startswith("SP") and code[2:].isdigit():
        # use the China-region S&P source in place of the US source
        return "SPC" + code[2:]
    return None
def _is_on(code, date):
    """
    Tell whether the daily data of ``code`` contains a row dated ``date``.

    :param code: str. Product whose daily data is fetched (20 rows up to ``date``).
    :param date: date compared against the "date" column.
    :return: bool.
    """
    recent = xu.get_daily(code, prev=20, end=date)
    same_day = recent[recent["date"] == date]
    return len(same_day) != 0
def is_on(date, market="CN", no_trading_days=None):
    """
    Roughly decide whether ``date`` is a trading day of the given market.
    When checking the current day there is still a risk that the data is not
    updated in time, and a risk that historical data was filled in by investing.

    :param date: str in %Y%m%d, %Y-%m-%d or %Y/%m/%d form; date/datetime
        objects are accepted as well and converted to %Y-%m-%d.
    :param market: str. CN, JP, HK, US, UK, CH, DE
    :param no_trading_days: Optional[Dict[str, List[str]]]. Extra closed days
        (%Y-%m-%d) keyed by market.
    :return: bool.
    """
    if isinstance(date, dt.date):
        # dt.datetime is a subclass of dt.date; normalize so that the string
        # handling below (and callers passing datetime objects) both work
        date = date.strftime("%Y-%m-%d")
    date_obj = dt.datetime.strptime(date.replace("-", "").replace("/", ""), "%Y%m%d")
    if date_obj.weekday() in [5, 6]:  # closed on weekends
        # Note that some Middle East markets open on Sunday; not relevant for now
        return False
    date_dash = date_obj.strftime("%Y-%m-%d")
    if no_trading_days:
        if date_dash in no_trading_days.get(market, []):
            return False
    if date_dash in holidays.get(market, []):
        return False
    logger.warning(
        "determine whether %s is holiday in %s market, but may be wrong, be careful!"
        % (date_dash, market)
    )
    if market in ["CN", "CHN", "CNY", "RMB", "CHINA", "CM"]:
        # the central parity rate is not updated on domestic holidays
        return date_dash in opendate
    elif market in ["JP", "JAPAN", "JPY", "100JPY"]:
        code = "indices/japan-ni225"
    elif market in ["US", "NY", "USD", "NASDAQ"]:
        code = "indices/us-spx-500"
    elif market in ["GBP", "UK", "GB"]:
        code = "indices/uk-100"
    elif market in ["GER", "EUR", "DE"]:
        # whether this can represent Europe is debatable; also beware of
        # USD-priced products on European markets
        code = "indices/germany-30"
    elif market in ["CHF", "SWI", "CH"]:
        code = "indices/switzerland-20"
    elif market in ["HK"]:
        code = "indices/hang-sen-40"
    else:
        logger.warning(
            "unknown oversea market %s, assuming %s is not a holiday"
            % (market, date_dash)
        )
        return True
    # fall back to checking whether the reference index has data on that day
    return _is_on(code, date)
def daily_increment(code, date, lastday=None, _check=False):
    """
    Ratio of the close of a single product on ``date`` (or, if there is no data
    on ``date``, on the latest earlier day with data) to the close on the
    previous row or on ``lastday``.
    ``lastday`` may be missing from the data but must not be too far from ``date``.

    :param code: str.
    :param date: str. %Y%m%d, %Y-%m-%d or %Y/%m/%d.
    :param lastday: Optional[str]. With the default None, the change against
        the previous data row is returned.
    :param _check: bool. If true, the data must be updated up to ``date``
        unless every missing day is a closed day.
    :return: float. 1 means unchanged.
    :raises DateMismatch: with ``_check``, if the data is older than an open day.
    """
    try:
        tds = xu.get_daily(code=code, end=date, prev=30)
    except Exception:
        # Only a broad catch is possible: fetching may fail with any exception.
        code = get_alt(code)
        if code:
            tds = xu.get_daily(code=code, end=date, prev=30)
        else:
            raise  # no substitute available: re-raise the original failure
    tds = tds[tds["date"] <= date]
    if _check:
        date = date.replace("-", "").replace("/", "")
        date_obj = dt.datetime.strptime(date, "%Y%m%d")
        while tds.iloc[-1]["date"] < date_obj:
            # in case data is not up to date
            # but markets such as Japan (and probably the US) may have
            # inconsistent closing days
            if not is_on(
                date_obj.strftime("%Y%m%d"),
                get_market(code),
                no_trading_days=no_trading_days,
            ) or (date_obj.strftime("%Y-%m-%d") in gap_info.get(code, [])):
                # report the day actually being examined, which moves back
                # one day per iteration
                print("%s is closed on %s" % (code, date_obj.strftime("%Y%m%d")))
                if not lastday:
                    # no change on that day; the case where _check and
                    # lastday coincide is not considered for now
                    return 1
                date_obj -= dt.timedelta(days=1)
            else:
                raise DateMismatch(
                    code,
                    reason="%s has no data newer than %s"
                    % (code, date_obj.strftime("%Y-%m-%d")),
                )
    if not lastday:
        ratio = tds.iloc[-1]["close"] / tds.iloc[-2]["close"]
    else:
        tds2 = tds[tds["date"] <= lastday]
        # The case where the source has no data even up to lastday is not
        # handled; it is very unlikely.
        ratio = tds.iloc[-1]["close"] / tds2.iloc[-1]["close"]
    return ratio
[docs]def _smooth_pos(r, e, o):
"""
单日仓位估计的平滑函数
:param r: 实际涨幅
:param e: 满仓估计涨幅
:param o: 昨日仓位估计
:return:
"""
pos = r / e
if pos <= 0:
return o
if pos > 1:
pos = 1
elif pos < 0.5:
pos = pos ** 0.6
if abs(r) < 0.6:
pos = (pos + (3 - 5 * abs(r)) * o) / (4 - 5 * abs(r))
return pos
def error_catcher(f):
    """
    Decorator that transparently converts DateMismatch into NonAccurate.

    The reported code is taken from the first positional argument: its
    ``code`` attribute when it has one (the decorated methods receive ``self``
    there), otherwise the argument itself.

    :param f: callable to wrap.
    :return: wrapped callable with the same signature and metadata.
    :raises NonAccurate: in place of any DateMismatch raised by ``f``.
    """

    @wraps(f)
    def wrapper(*args, **kws):
        try:
            return f(*args, **kws)
        except DateMismatch as e:
            subject = args[0] if args else getattr(e, "code", None)
            code = getattr(subject, "code", subject)
            error_msg = e.reason
            error_msg += ", therefore %s cannot predict correctly" % code
            raise NonAccurate(code=code, reason=error_msg) from e

    return wrapper
def evaluate_fluctuation(hdict, date, lastday=None, _check=None):
    """
    Percentage change of the portfolio ``hdict``, with everything converted
    into CNY. The part of the portfolio not covered by ``hdict`` (100 minus
    the sum of the weights) is treated as unchanged.

    :param hdict: Dict[str, float]. Product code -> weight in percent.
    :param date: str. Forwarded to :func:`daily_increment`.
    :param lastday: Optional[str]. Forwarded to :func:`daily_increment`.
    :param _check: Forwarded to :func:`daily_increment`.
    :return: float. Change in percent.
    """
    weighted = 0
    allocated = 0
    for asset, weight in hdict.items():
        change = daily_increment(asset, date, lastday, _check)
        fx_code = get_currency_code(asset)
        # products priced in CNY have no exchange-rate code: factor of 1
        fx_change = daily_increment(fx_code, date, lastday, _check) if fx_code else 1
        weighted += change * weight / 100 * fx_change
        allocated += weight
    weighted += (100 - allocated) / 100
    return (weighted - 1) * 100
class RTPredict:
    """
    Realtime premium of exchange-traded ETF/LOF funds, non-QDII type.
    """

    def __init__(self, code, t0dict=None):
        """
        :param code: str. Exchange-traded fund code; the fund code "F..." is
            built from it by dropping the first two characters.
        :param t0dict: Union[Dict[str, float], str]. Holdings (code -> weight in
            percent) used for the realtime estimate; a single str means 100% of
            that product. Looked up in holdings when empty.
        :raises ValueError: if no t0dict is given and none is found in holdings.
        """
        self.code = code
        self.fcode = "F" + code[2:]
        if not t0dict:
            t0dict = holdings.get(code[2:], None)
        if not t0dict:
            raise ValueError("Please provide t0dict for prediction")
        if isinstance(t0dict, str):
            t0dict = {t0dict: 100}
        self.t0dict = t0dict
        self.t1value_cache = None
        # initialised here so that both attributes exist before get_t0 runs
        self.t1_type = "未计算"
        self.t0_delta = None
        self.now = dt.datetime.now(tz=tz_bj).replace(tzinfo=None)
        self.today = self.now.replace(hour=0, minute=0, second=0, microsecond=0)

    def get_t1(self, return_date=True):
        """
        Get the latest published net value of the fund (cached after the first call).

        :param return_date: bool, default True. Return (value, date) instead of value.
        :return: float, or tuple (float, str).
        """
        if not self.t1value_cache:
            last_r = get_rt(self.fcode)
            last_value, last_date = last_r["current"], last_r["time"]
            self.t1value_cache = (last_value, last_date)
        if return_date:
            return self.t1value_cache
        else:
            return self.t1value_cache[0]

    def get_t0(self, return_date=True, percent=False):
        """
        Realtime estimate of today's net value.

        :param return_date: bool, default True. Return (value, %Y-%m-%d) instead of value.
        :param percent: bool, default False. If True, use the reported daily
            percentage change of every holding; otherwise use the ratio of its
            current price to its close on the date of the last net value.
        :return: float, or tuple (float, str).
        :raises DateMismatch: if the net value is not updated to the previous
            trading day.
        """
        last_value, last_date = self.get_t1()
        last_date_obj = dt.datetime.strptime(last_date, "%Y-%m-%d")
        cday = last_onday(self.today)
        while last_date_obj < cday:  # yesterday's net value is not updated yet
            # Could some funds have gaps?
            # .get: a fund without any gap entry simply has no gap days
            if cday.strftime("%Y-%m-%d") not in gap_info.get(self.fcode, []):
                self.t1_type = "昨日未出"
                raise DateMismatch(
                    self.code,
                    reason="%s netvalue has not been updated to yesterday" % self.code,
                )
            else:
                cday = last_onday(cday)
        # reaching this point without error means the data source is up to date
        if last_date_obj >= self.today:  # today's value is out, no need to predict
            print(
                "no need to predict net value since it has been out for %s" % self.code
            )
            self.t1_type = "今日已出"
            if not return_date:
                return last_value
            else:
                return last_value, last_date
        t = 0  # total weight of the listed holdings
        n = 0  # estimated growth factor of the net value
        today_str = self.today.strftime("%Y%m%d")
        for k, v in self.t0dict.items():
            w = v
            t += w
            r = get_rt(k)
            # k should support get_rt, investing pid doesn't support this!
            if percent:
                c = w / 100 * (1 + r["percent"] / 100)  # use the daily change directly
            else:
                df = xu.get_daily(k)
                basev = df[df["date"] <= last_date].iloc[-1]["close"]
                c = w / 100 * r["current"] / basev
            currency_code = get_currency_code(k)
            if currency_code:
                c = c * daily_increment(currency_code, today_str)
            n += c
        n += (100 - t) / 100  # the remaining part is treated as unchanged
        t0value = n * last_value
        self.t0_delta = n
        if not return_date:
            return t0value
        else:
            return t0value, self.today.strftime("%Y-%m-%d")

    def get_t0_rate(self, percent=False, return_date=True):
        """
        Realtime premium (in percent) of the market price over the estimated net value.

        :param percent: bool, default False. Forwarded to :meth:`get_t0`.
        :param return_date: bool, default True. Return (rate, %Y-%m-%d) instead of rate.
        :return: float, or tuple (float, str).
        """
        iopv = self.get_t0(percent=percent, return_date=False)
        rtv = get_rt(self.code)["current"]
        r = (rtv / iopv - 1) * 100
        if return_date:
            return r, self.today.strftime("%Y-%m-%d")
        else:
            return r
class QDIIPredict:
    """
    Net value prediction for QDII funds whose shares are confirmed at T+2.

    .. warning::
        This class is strongly coupled to real time and uses caches aggressively,
        so an object must not be used "overnight"; declare a new object every day.
    """

    def __init__(
        self, code, t1dict=None, t0dict=None, positions=False, fetch=False, save=False
    ):
        """
        :param code: str. Exchange-traded fund code, eg SH501018.
        :param t1dict: Dict[str, float]. Holdings used to predict the T-1 net
            value; looked up in holdings when empty.
        :param t0dict: Dict[str, float]. Holdings used to predict the realtime
            T net value; looked up in holdings when empty.
        :param positions: bool, default False. Whether the position floats;
            a fixed position is used by default.
        :param fetch: bool, default False. Load t1 values and positions from the
            backend first.
        :param save: bool, default False. Save computed t1 values to the backend.
        :raises ValueError: if no t1dict is given and none is found in holdings.
        """
        self.code = code
        self.fcode = "F" + code[2:]
        self.fetch = fetch
        self.save = save
        if not t1dict:
            self.t1dict = holdings.get(code[2:], None)
            if not self.t1dict:
                raise ValueError("Please provide t1dict for prediction")
        else:
            self.t1dict = t1dict
        if not t0dict:
            self.t0dict = holdings.get(code[2:] + "rt", None)
        else:
            self.t0dict = t0dict
        self.position_cache = {}
        self.t1value_cache = {}
        self.t2value_cache = None
        # the realtime t0 value is naturally not cached
        self.positions = positions
        self.position_zero = sum([v for _, v in self.t1dict.items()])
        self.now = dt.datetime.now(tz=tz_bj).replace(tzinfo=None)
        self.today = self.now.replace(hour=0, minute=0, second=0, microsecond=0)
        self.t1_type = "未计算"
        self.bar_cache = {}
        self.t0_delta = None
        self.t1_delta = None
        # Reading the two deltas above directly is not recommended: after a
        # manual set they may still be None.
        if fetch:
            df = fetch_backend("t1-" + code)
            if df is not None:
                df["date"] = pd.to_datetime(df["date"])
                for i, r in df.iterrows():
                    self.set_t1(float(r["t1"]), r["date"].strftime("%Y-%m-%d"))
                    self.set_position(float(r["pos"]), r["date"].strftime("%Y-%m-%d"))
            else:  # nodf
                emptydf = pd.DataFrame({"date": [], "t1": [], "pos": []})
                save_backend("t1-" + code, emptydf, header=True)

    def _datekey(self, date=None):
        """Return the %Y%m%d cache key for ``date``; None means last_onday(today)."""
        if date is None:
            return last_onday(self.today).strftime("%Y%m%d")
        return date.replace("/", "").replace("-", "")

    def set_t1(self, value, date=None):
        """
        Set the T-1 net value of the fund. Sometimes only the realtime value is
        wanted, so t1 need not be recomputed and can be set beforehand.

        :param value: float.
        :param date: Optional[str]. %Y-%m-%d, %Y/%m/%d or %Y%m%d; default None
            means last_onday(today).
        :return: None.
        """
        datekey = self._datekey(date)
        if datekey in self.t1value_cache:
            logger.debug("t-1 value already exists, rewriting...")
        self.t1value_cache[datekey] = value
        self.t1_type = "已计算"

    def set_t2(self, value, date=None):
        """
        Manually set the t2 net value.

        :param value: float.
        :param date: Optional[str]. %Y-%m-%d; default is two ``last_onday``
            steps back from today.
        :return: None.
        """
        if not date:
            date = last_onday(last_onday(self.today)).strftime("%Y-%m-%d")
        self.t2value_cache = (value, date)

    def get_t2(self, return_date=True):
        """
        Return the latest published net value of the fund. This is strictly the
        latest published one, not necessarily that of two trading days ago: it
        may be newer, while an older one leads to DateMismatch in get_t1.

        :param return_date: bool, default True.
        :return: if return_date is True, tuple (value, %Y-%m-%d)
        """
        if not self.t2value_cache:
            last_r = get_rt(self.fcode)
            last_value, last_date = last_r["current"], last_r["time"]
            self.t2value_cache = (last_value, last_date)
        if return_date:
            return self.t2value_cache
        else:
            return self.t2value_cache[0]

    @error_catcher
    def get_t1(self, date=None, return_date=True):
        """
        Predict the net value of ``date`` from the net value of the day before
        and the overseas market data of ``date``. Results are cached
        automatically and not recomputed.

        :param date: str. %Y-%m-%d. When predicting yesterday's value today,
            keep the default None.
        :param return_date: bool, default True. Return tuple, the second
            element being the date as %Y-%m-%d.
        :return: float, (str).
        :raises NonAccurate: raised when the overseas data is not updated in
            time; callers may catch and handle it.
        """
        datekey = self._datekey(date)
        if datekey not in self.t1value_cache:
            logger.debug("no cache for t1 value, computing from beginning")
            if self.positions:
                current_pos = self.get_position(datekey, return_date=False)
                hdict = scale_dict(self.t1dict.copy(), aim=current_pos * 100)
            else:
                current_pos = sum([v for _, v in self.t1dict.items()]) / 100
                hdict = self.t1dict.copy()

            if date is None:  # predicting the net value of the last trading day
                yesterday_str = datekey
                last_value, last_date = self.get_t2()
                last_date_obj = dt.datetime.strptime(last_date, "%Y-%m-%d")
                cday = last_onday(last_onday(self.today))
                while last_date_obj < cday:
                    # the net value of the day before yesterday is not out yet
                    # Do some QDII funds skip the net value on days when A
                    # shares trade but the US market is closed?
                    cday_str = cday.strftime("%Y-%m-%d")
                    # is_on expects a date string; .get avoids a KeyError for
                    # funds without any gap entry
                    if (cday_str not in gap_info.get(self.fcode, [])) and is_on(
                        cday_str, "US", no_trading_days
                    ):
                        # Loose check: if the US market is closed that day, the
                        # fund data is assumed to be absent rather than late.
                        self.t1_type = "前日未出"
                        raise DateMismatch(
                            self.code,
                            reason="%s netvalue has not been updated to the day before yesterday"
                            % self.code,
                        )
                    else:
                        cday = last_onday(cday)
                # reaching this point without error means the source is up to date
                if last_date_obj >= last_onday(self.today):
                    # yesterday's value is already out, no need to predict
                    print(
                        "no need to predict t-1 value since it has been out for %s"
                        % self.code
                    )
                    self.t1_type = "昨日已出"
                    # add to the cache instead of replacing it, so values of
                    # other dates (e.g. fetched from the backend) are kept
                    self.t1value_cache[last_date.replace("-", "")] = last_value
                    if not return_date:
                        return last_value
                    else:
                        return last_value, last_date
            else:
                yesterday_str = datekey
                fund_price = xu.get_daily(self.fcode)  # net values of the domestic fund
                fund_last = fund_price[fund_price["date"] < date].iloc[-1]
                # Realtime updates should pass date=None; otherwise nothing
                # verifies that this row is from exactly one trading day before.
                # What is computed here is the prediction based on the latest
                # row strictly before ``date``.
                last_value = fund_last["close"]
                last_date = fund_last["date"].strftime("%Y-%m-%d")
            self.t1_delta = (
                1
                + evaluate_fluctuation(
                    hdict, yesterday_str, lastday=last_date, _check=True
                )
                / 100
            )
            net = last_value * self.t1_delta
            self.t1value_cache[datekey] = net
            self.t1_type = "已计算"
            if self.save:
                df = pd.DataFrame(
                    {
                        "date": [datekey[:4] + "-" + datekey[4:6] + "-" + datekey[6:8]],
                        "t1": [net],
                        "pos": [current_pos],
                    }
                )
                save_backend("t1-" + self.code, df)
        if not return_date:
            return self.t1value_cache[datekey]
        else:
            return (
                self.t1value_cache[datekey],
                datekey[:4] + "-" + datekey[4:6] + "-" + datekey[6:8],
            )

    def get_t1_rate(self, date=None, return_date=True):
        """
        Premium (in percent) of the realtime market price over the t1 net value.

        :param date: Optional[str]. Forwarded to :meth:`get_t1`.
        :param return_date: bool, default True. Return (rate, date) instead of rate.
        :return: float, or tuple (float, str).
        """
        t1v, d = self.get_t1(date=date, return_date=True)
        cp = get_rt(self.code)["current"]
        r = (cp / t1v - 1) * 100
        if return_date:
            return r, d
        else:
            return r

    def get_t0_rate(self, percent=False, return_date=True):
        """
        Premium (in percent) of the realtime market price over the realtime
        net value estimate.

        :param percent: bool, default False. Forwarded to :meth:`get_t0`.
        :param return_date: bool, default True. Return (rate, date) instead of rate.
        :return: float, or tuple (float, str).
        """
        t0v, d = self.get_t0(percent=percent, return_date=True)
        cp = get_rt(self.code)["current"]
        r = (cp / t0v - 1) * 100
        if return_date:
            return r, d
        else:
            return r

    def _base_value(self, code, shift):
        """
        Reference close used as the base of the realtime estimate.

        :param code: str. Reference product.
        :param shift: Optional[number]. If falsy, the daily close up to
            last_onday(today) is used; otherwise the hourly bar up to
            (last_onday(today) + 1 day + ``shift`` hours).
        :return: close value of the selected row.
        """
        if not shift:
            funddf = xu.get_daily(code)  # daily line of the spot index
            return funddf[funddf["date"] <= last_onday(self.today)].iloc[-1][
                "close"
            ]  # dates are in local time
            # TODO: check it is indeed date of last_on(today)
        else:
            if code not in self.bar_cache:
                funddf = get_bar(code, prev=168, interval="3600")  # hourly bars
                # note that prev may still be too small for very long domestic holidays
                if self.now.hour > 6:
                    # only after yesterday's US close is the reference hourly
                    # line regular, and only then is it cached
                    self.bar_cache[code] = funddf
            else:
                funddf = self.bar_cache[code]
            refdate = last_onday(self.today) + dt.timedelta(days=1)  # aligned to Beijing time
            return funddf[funddf["date"] <= refdate + dt.timedelta(hours=shift)].iloc[
                -1
            ][
                "close"
            ]  # times are Beijing time; hourly bars need this manual cache, daily
            # lines do not because they come with a transparent cache

    def get_t0(self, percent=False, return_date=True):
        """
        Realtime estimate of today's net value. Between midnight and the US
        market close (morning), this interface is not guaranteed to be
        self-consistent or usable.

        :param percent: bool, default False. Two estimation schemes exist. If
            True, today's percentage change of each t0dict holding is used; if
            False, the ratio of the holding's current price to yesterday's
            close of its reference index is used. percent=True is not recommended.
        :param return_date: bool, default True. Return tuple, the second
            element being the date as %Y-%m-%d.
        :return: float
        :raises ValueError: if no t0dict is available.
        """
        if not self.t0dict:
            raise ValueError("Please provide t0dict for prediction")
        t1value = self.get_t1(date=None, return_date=False)
        t = 0  # total weight of the listed holdings
        n = 0  # estimated growth factor of the net value
        today_str = self.today.strftime("%Y%m%d")
        for k, v in self.t0dict.items():
            if not isinstance(v, dict):
                v = {"weight": v}
            if len(k.split("~")) > 1 and k.split("~")[-1].isdigit():
                # workaround so that the same product can appear several
                # times in the holdings
                k = k.split("~")[0]
            w = v["weight"]
            shift = v.get("time", None)
            base = v.get("base", None)
            t += w
            r = get_rt(
                k
            )  # k should support get_rt, investing pid doesn't support this!
            if percent:
                c = w / 100 * (1 + r["percent"] / 100)  # use the daily change directly
            else:
                if k in futures_info and not base:
                    kf = futures_info[k]
                elif not base:
                    # presumably k ends with "-futures" (8 characters), which
                    # is stripped here -- TODO confirm
                    kf = k[:-8]
                else:
                    kf = base
                try:
                    basev = self._base_value(kf, shift)
                except Exception:
                    kf = get_alt(kf)
                    if not kf:
                        raise  # no substitute: re-raise the original failure
                    else:
                        basev = self._base_value(kf, shift)
                c = w / 100 * r["current"] / basev
            currency_code = get_currency_code(k)
            if currency_code:
                c = c * daily_increment(currency_code, today_str)
                # TODO: the central parity rate may not be updated yet, but
                # not checking it for realtime data is not a big problem
            n += c
        n += (100 - t) / 100  # the remaining part is treated as unchanged
        t0value = n * t1value
        self.t0_delta = n
        if not return_date:
            return t0value
        else:
            return t0value, self.today.strftime("%Y-%m-%d")

    def set_position(self, value, date=None):
        """
        Manually set the position estimate of a day.

        :param value: float. Position as stored in the cache (see :meth:`get_position`).
        :param date: Optional[str]. Default None means last_onday(today).
        :return: None.
        """
        self.position_cache[self._datekey(date)] = value

    @error_catcher
    def get_position(self, date=None, refresh=False, return_date=True, **kws):
        """
        Estimate the position needed for the prediction of ``date``, based on
        the net values before ``date``.

        :param date: str. %Y-%m-%d
        :param refresh: bool, default False. If True, ignore the cache and
            recompute the position.
        :param return_date: bool, default True. Return tuple, the second
            element being the date as %Y-%m-%d.
        :param kws: optional hyper-parameters of the estimate: window (number
            of days used), decay (weight decay of the weighted average) and
            smooth (per-day smoothing function). The defaults work well.
        :return: float. Fraction of the fund that is invested, as used by
            get_t1 (which multiplies it by 100 when scaling the holdings).
        """
        # ``date or None`` keeps treating an empty date like a missing one
        date = self._datekey(date or None)
        if date not in self.position_cache or refresh:
            fdict = scale_dict(self.t1dict.copy(), aim=100)
            window = kws.get("window", 4)
            decay = kws.get("decay", 0.8)
            smooth = kws.get("smooth", _smooth_pos)
            d = dt.datetime.strptime(date, "%Y%m%d")
            posl = [sum([v for _, v in self.t1dict.items()]) / 100]
            # step back ``window`` days, then walk forward over window - 1
            # days, re-estimating the position day by day
            for _ in range(window):
                d = last_onday(d)
            for _ in range(window - 1):
                d = next_onday(d)
                pred = evaluate_fluctuation(
                    fdict,
                    d.strftime("%Y-%m-%d"),
                    lastday=last_onday(d).strftime("%Y-%m-%d"),
                )
                real = evaluate_fluctuation(
                    {self.fcode: 100},
                    d.strftime("%Y-%m-%d"),
                    lastday=last_onday(d).strftime("%Y-%m-%d"),
                )
                posl.append(smooth(real, pred, posl[-1]))
            # weighted average, the latest day having the largest weight
            current_pos = sum(
                [decay ** i * posl[window - i - 1] for i in range(window)]
            ) / sum([decay ** i for i in range(window)])
            self.position_cache[date] = current_pos
        if not return_date:
            return self.position_cache[date]
        else:
            return (
                self.position_cache[date],
                date[:4] + "-" + date[4:6] + "-" + date[6:8],
            )

    def benchmark_test(self, start, end, **kws):
        """
        Backtest this net value prediction model.

        :param start: str. Start date.
        :param end: str. End date.
        :param kws: optional hyper-parameters of the position estimate
            (window, decay, pos, smooth).
        :return: pd.DataFrame. Column real is the real change, est the
            estimated change and diff their difference.
        """
        compare_data = {
            "date": [],
        }
        window = kws.get("window", 4)
        decay = kws.get("decay", 0.8)
        c = kws.get("pos", self.position_zero)
        smooth = kws.get("smooth", _smooth_pos)
        real_holdings = {self.fcode: 100}
        full_holdings = scale_dict(self.t1dict.copy(), aim=100)
        compare_data["est"] = []
        compare_data["real"] = []
        compare_data["estpos3"] = []
        compare_data["estpos1"] = []
        fq = deque([c / 100] * window, maxlen=window)
        current_pos = c / 100
        dl = pd.Series(pd.date_range(start=start, end=end))
        dl = dl[dl.isin(opendate)]
        for j, d in enumerate(dl):
            if j == 0:
                continue  # the first day has no previous day to compare with
            dstr = d.strftime("%Y%m%d")
            lstdstr = dl.iloc[j - 1].strftime("%Y%m%d")
            compare_data["date"].append(d)
            fullestf = evaluate_fluctuation(full_holdings, dstr, lstdstr)
            realf = evaluate_fluctuation(real_holdings, dstr, lstdstr)
            estf = fullestf * current_pos
            compare_data["est"].append(estf)
            compare_data["estpos3"].append(current_pos)
            compare_data["estpos1"].append(fq[-1])
            compare_data["real"].append(realf)
            pos = smooth(realf, fullestf, fq[-1])
            fq.append(pos)
            fq[0] = c / 100  # mimic the stateless position analysis used in practice
            if self.positions:
                current_pos = sum(
                    [decay ** i * fq[window - i - 1] for i in range(window)]
                ) / sum([decay ** i for i in range(window)])
                if current_pos > 1:
                    current_pos = 1

        cpdf = pd.DataFrame(compare_data)
        cpdf["diff"] = cpdf["real"] - cpdf["est"]
        self.cpdf = cpdf
        return cpdf

    def analyse(self):
        """
        Print a quantitative analysis of the backtest result stored by
        :meth:`benchmark_test`.

        :return: None
        """
        print("净值预测回测分析:\n")
        self.analyse_deviate(self.cpdf, "diff")
        self.analyse_percentile(self.cpdf, "diff")
        self.analyse_ud(self.cpdf, "real", "diff")

    @staticmethod
    def analyse_ud(cpdf, col1, col2):
        """
        Print the share of days in each up/down versus over/under-estimate class.

        :param cpdf: pd.DataFrame, with col1 as real netvalue and col2 as prediction difference
        :param col1: str.
        :param col2: str.
        :return: None.
        """
        uu, ud, dd, du, count = 0, 0, 0, 0, 0
        # uu: real change is up and real - est > 0 (predicted rise too small)
        # ud: predicted rise too large
        # du: predicted fall too large
        # dd: predicted fall too small
        for i, row in cpdf.iterrows():
            if row[col1] >= 0 and row[col2] > 0:
                uu += 1
            elif row[col1] >= 0 >= row[col2]:
                ud += 1
            elif row[col1] < 0 < row[col2]:
                du += 1
            else:
                dd += 1
            count += 1
        print(
            "\n涨跌偏差分析:",
            "\n预测涨的比实际少: ",
            round(uu / count, 2),
            "\n预测涨的比实际多: ",
            round(ud / count, 2),
            "\n预测跌的比实际多: ",
            round(du / count, 2),
            "\n预测跌的比实际少: ",
            round(dd / count, 2),
        )

    @staticmethod
    def analyse_percentile(cpdf, col):
        """
        Print the 1/5/25/50/75/95/99 percentiles of column ``col``.

        :param cpdf: pd.DataFrame.
        :param col: str.
        :return: None.
        """
        percentile = [1, 5, 25, 50, 75, 95, 99]
        r = [round(d, 3) for d in np.percentile(list(cpdf[col]), percentile)]
        print(
            "\n预测偏差分位:",
            "\n1% 分位: ",
            r[0],
            "\n5% 分位: ",
            r[1],
            "\n25% 分位: ",
            r[2],
            "\n50% 分位: ",
            r[3],
            "\n75% 分位: ",
            r[4],
            "\n95% 分位: ",
            r[5],
            "\n99% 分位: ",
            r[6],
        )

    @staticmethod
    def analyse_deviate(cpdf, col):
        """
        Print the mean absolute value and the root mean square of column ``col``.

        :param cpdf: pd.DataFrame.
        :param col: str.
        :return: None.
        """
        values = np.array(cpdf[col])
        d1, d2 = np.mean(np.abs(values)), np.sqrt(np.mean(values ** 2))
        print("\n平均偏离: ", d1, "\n标准差偏离: ", d2)