# analyzer.py -- annotated source code walkthrough
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import calendar
from collections import OrderedDict
import datetime
import pprint as pp
import backtrader as bt
from backtrader import TimeFrame
from backtrader.utils.py3 import MAXINT, with_metaclass
# Metaclass for analyzers
class MetaAnalyzer(bt.MetaParams):
    '''Metaclass that wires a newly created analyzer to its owners and
    publishes the strategy's data feeds (and their lines) as attributes of
    the analyzer instance.
    '''

    def donew(cls, *args, **kwargs):
        '''
        Intercept the strategy parameter

        Creates the instance through the parent ``donew``, links it to the
        owning strategy / analyzer / observer, installs the ``data*``
        aliases and finally calls ``create_analysis`` on the instance.

        Returns the ``(_obj, args, kwargs)`` triple expected by the chain.
        '''
        # Create the object and set the params in place
        _obj, args, kwargs = super(MetaAnalyzer, cls).donew(*args, **kwargs)

        # Sub-analyzers register themselves here (see ``dopostinit``)
        _obj._children = []

        # ``findowner`` looks for an owner of ``_obj`` that is an instance of
        # the given class; the ``None`` checks below cover "nothing found"
        find_owner = bt.metabase.findowner

        owning_strategy = find_owner(_obj, bt.Strategy)
        _obj.strategy = owning_strategy

        # Enclosing analyzer, if this one was created inside another analyzer
        _obj._parent = find_owner(_obj, Analyzer)

        # Register with a master observer if created inside one
        owning_observer = find_owner(_obj, bt.Observer)
        if owning_observer is not None:
            owning_observer._register_analyzer(_obj)

        # The analyzer shares the data feeds of its strategy
        # NOTE(review): this raises AttributeError if no owning strategy was
        # found -- confirm analyzers are always created inside a strategy
        _obj.datas = owning_strategy.datas

        # For each data add aliases: for first data: data and data0
        if _obj.datas:
            first_feed = _obj.datas[0]
            _obj.data = first_feed

            # Lines of the first feed: ``data_<alias>`` and ``data_<index>``
            for line_idx, line in enumerate(first_feed.lines):
                alias = first_feed._getlinealias(line_idx)
                if alias:
                    setattr(_obj, 'data_%s' % alias, line)
                setattr(_obj, 'data_%d' % line_idx, line)

            # Every feed: ``data<N>``, ``data<N>_<alias>``, ``data<N>_<index>``
            for feed_idx, feed in enumerate(_obj.datas):
                setattr(_obj, 'data%d' % feed_idx, feed)

                for line_idx, line in enumerate(feed.lines):
                    alias = feed._getlinealias(line_idx)
                    if alias:
                        setattr(_obj, 'data%d_%s' % (feed_idx, alias), line)
                    setattr(_obj, 'data%d_%d' % (feed_idx, line_idx), line)

        # Let the instance build the structures holding its results
        _obj.create_analysis()

        # Return to the normal chain
        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        '''Register the analyzer with its parent analyzer (if it has one)
        once the parent ``dopostinit`` has run.

        Returns the ``(_obj, args, kwargs)`` triple expected by the chain.
        '''
        _obj, args, kwargs = super(MetaAnalyzer, cls).dopostinit(
            _obj, *args, **kwargs)

        parent = _obj._parent
        if parent is not None:
            parent._register(_obj)

        # Return to the normal chain
        return _obj, args, kwargs
# Analyzer base class
class Analyzer(with_metaclass(MetaAnalyzer, object)):
    '''Analyzer base class. All analyzers are subclasses of this one

    An Analyzer instance operates in the frame of a strategy and provides an
    analysis for that strategy.

    Automagically set member attributes:

      - ``self.strategy`` (giving access to the *strategy* and anything
        accessible from it)

      - ``self.datas[x]`` giving access to the array of data feeds present in
        the system, which could also be accessed via the strategy reference

      - ``self.data``, giving access to ``self.datas[0]``

      - ``self.dataX`` -> ``self.datas[X]``

      - ``self.dataX_Y`` -> ``self.datas[X].lines[Y]``

      - ``self.dataX_name`` -> the line of ``self.datas[X]`` whose alias is
        ``name``

      - ``self.data_name`` -> the line of ``self.datas[0]`` whose alias is
        ``name``

      - ``self.data_Y`` -> ``self.datas[0].lines[Y]``

    This is not a *Lines* object, but the methods and operation follow the same
    design

      - ``__init__`` during instantiation and initial setup

      - ``start`` / ``stop`` to signal the begin and end of operations

      - ``prenext`` / ``nextstart`` / ``next`` family of methods that follow
        the calls made to the same methods in the strategy

      - ``notify_trade`` / ``notify_order`` / ``notify_cashvalue`` /
        ``notify_fund`` which receive the same notifications as the equivalent
        methods of the strategy

    The mode of operation is open and no pattern is preferred. As such the
    analysis can be generated with the ``next`` calls, at the end of operations
    during ``stop`` and even with a single method like ``notify_trade``

    The important thing is to override ``get_analysis`` to return a *dict-like*
    object containing the results of the analysis (the actual format is
    implementation dependent)
    '''
    # NOTE(review): presumably tells writers to include this analyzer's
    # results in CSV output -- confirm against the writer implementation
    csv = True

    def __len__(self):
        '''Support for invoking ``len`` on analyzers by actually returning the
        current length of the strategy the analyzer operates on'''
        return len(self.strategy)

    def _register(self, child):
        '''Add ``child`` to the list of sub-analyzers driven by this one'''
        self._children.append(child)

    # The ``_xxx`` methods below all follow the same pattern: forward the
    # event to every child first and then invoke the public hook on self

    def _prenext(self):
        for sub in self._children:
            sub._prenext()

        self.prenext()

    def _notify_cashvalue(self, cash, value):
        for sub in self._children:
            sub._notify_cashvalue(cash, value)

        self.notify_cashvalue(cash, value)

    def _notify_fund(self, cash, value, fundvalue, shares):
        for sub in self._children:
            sub._notify_fund(cash, value, fundvalue, shares)

        self.notify_fund(cash, value, fundvalue, shares)

    def _notify_trade(self, trade):
        for sub in self._children:
            sub._notify_trade(trade)

        self.notify_trade(trade)

    def _notify_order(self, order):
        for sub in self._children:
            sub._notify_order(order)

        self.notify_order(order)

    def _nextstart(self):
        for sub in self._children:
            sub._nextstart()

        self.nextstart()

    def _next(self):
        for sub in self._children:
            sub._next()

        self.next()

    def _start(self):
        for sub in self._children:
            sub._start()

        self.start()

    def _stop(self):
        for sub in self._children:
            sub._stop()

        self.stop()

    def notify_cashvalue(self, cash, value):
        '''Receives the cash/value notification before each next cycle'''
        pass

    def notify_fund(self, cash, value, fundvalue, shares):
        '''Receives the current cash, value, fundvalue and fund shares'''
        pass

    def notify_order(self, order):
        '''Receives order notifications before each next cycle'''
        pass

    def notify_trade(self, trade):
        '''Receives trade notifications before each next cycle'''
        pass

    def next(self):
        '''Invoked for each next invocation of the strategy, once the minimum
        period of the strategy has been reached'''
        pass

    def prenext(self):
        '''Invoked for each prenext invocation of the strategy, until the
        minimum period of the strategy has been reached

        The default behavior for an analyzer is to invoke ``next``
        '''
        self.next()

    def nextstart(self):
        '''Invoked exactly once for the nextstart invocation of the strategy,
        when the minimum period has been first reached

        The default behavior is to invoke ``next``
        '''
        self.next()

    def start(self):
        '''Invoked to indicate the start of operations, giving the analyzer
        time to set up needed things'''
        pass

    def stop(self):
        '''Invoked to indicate the end of operations, giving the analyzer
        time to shut down needed things'''
        pass

    def create_analysis(self):
        '''Meant to be overridden by subclasses. Gives a chance to create the
        structures that hold the analysis.

        The default behaviour is to create an ``OrderedDict`` named ``rets``
        '''
        self.rets = OrderedDict()

    def get_analysis(self):
        '''Returns a *dict-like* object with the results of the analysis

        The keys and format of analysis results in the dictionary is
        implementation dependent.

        It is not even enforced that the result is a *dict-like object*, just
        the convention

        The default implementation returns the default OrderedDict ``rets``
        created by the default ``create_analysis`` method
        '''
        return self.rets

    def print(self, *args, **kwargs):
        '''Prints the results returned by ``get_analysis`` via a standard
        ``Writerfile`` object, which defaults to writing things to standard
        output

        ``args`` and ``kwargs`` are passed to the ``WriterFile`` constructor
        '''
        out = bt.WriterFile(*args, **kwargs)
        out.start()
        # Single-entry dict: the analysis stored under the analyzer class name
        out.writedict({self.__class__.__name__: self.get_analysis()})
        out.stop()

    def pprint(self, *args, **kwargs):
        '''Prints the results returned by ``get_analysis`` using the pretty
        print Python module (*pprint*)

        ``args`` and ``kwargs`` are passed on to ``pprint.pprint``
        '''
        analysis = self.get_analysis()
        pp.pprint(analysis, *args, **kwargs)
# Metaclass for timeframe-based analyzers
class MetaTimeFrameAnalyzerBase(Analyzer.__class__):
    '''Metaclass that lets subclasses keep using the legacy method name
    ``_on_dt_over`` by storing it under ``on_dt_over`` at class creation.
    '''

    def __new__(meta, name, bases, dct):
        # Hack to support original method name
        try:
            legacy_hook = dct.pop('_on_dt_over')
        except KeyError:
            pass  # class body does not define the legacy name
        else:
            dct['on_dt_over'] = legacy_hook  # rename method

        return super(MetaTimeFrameAnalyzerBase, meta).__new__(
            meta, name, bases, dct)
# Base class for timeframe-based analyzers
class TimeFrameAnalyzerBase(with_metaclass(MetaTimeFrameAnalyzerBase,
                                           Analyzer)):
    '''Analyzer base class which detects when the strategy datetime moves
    into a new timeframe period and then calls ``on_dt_over``.

    Params:

      - ``timeframe`` (default: ``None``): if ``None`` (or otherwise falsy)
        the ``_timeframe`` of ``self.data`` is used

      - ``compression`` (default: ``None``): if ``None`` (or otherwise
        falsy) the ``_compression`` of ``self.data`` is used

      - ``_doprenext`` (default: ``True``): if ``False``, ``prenext`` is not
        invoked during the prenext phase and ``on_dt_over`` is always
        invoked in ``_nextstart``

    Attributes maintained by the period detection:

      - ``dtcmp`` / ``dtkey``: comparison value and key of the current period
      - ``dtcmp1`` / ``dtkey1``: the same for the previous period (only set
        once a period change has been detected)
    '''
    params = (
        ('timeframe', None),
        ('compression', None),
        ('_doprenext', True),
    )

    def _start(self):
        '''Resolve timeframe/compression and seed the period tracking'''
        # Override to add specific attributes
        self.timeframe = self.p.timeframe or self.data._timeframe
        self.compression = self.p.compression or self.data._compression

        # Seed with the smallest datetime so that the first real datetime
        # is seen as a new period
        self.dtcmp, self.dtkey = self._get_dt_cmpkey(datetime.datetime.min)
        super(TimeFrameAnalyzerBase, self)._start()

    def _prenext(self):
        for child in self._children:
            child._prenext()

        if self._dt_over():
            self.on_dt_over()

        if self.p._doprenext:
            self.prenext()

    def _nextstart(self):
        for child in self._children:
            child._nextstart()

        if self._dt_over() or not self.p._doprenext:  # exec if no prenext
            self.on_dt_over()

        self.nextstart()

    def _next(self):
        for child in self._children:
            child._next()

        if self._dt_over():
            self.on_dt_over()

        self.next()

    def on_dt_over(self):
        '''Hook invoked when a new timeframe period has been detected.
        Does nothing by default; meant to be overridden'''
        pass

    def _dt_over(self):
        '''Check whether the strategy datetime has entered a new period.

        On a change the current ``dtkey``/``dtcmp`` are moved to
        ``dtkey1``/``dtcmp1`` and replaced with the new values.

        Returns ``True`` if a new period started, ``False`` otherwise.
        '''
        if self.timeframe == TimeFrame.NoTimeFrame:
            # A single period covering everything: the first call reports a
            # change (``self.dtcmp`` is ``None``), later calls never do
            dtcmp, dtkey = MAXINT, datetime.datetime.max
        else:
            # With >= 1.9.x the system datetime is in the strategy
            dt = self.strategy.datetime.datetime()
            dtcmp, dtkey = self._get_dt_cmpkey(dt)

        if self.dtcmp is None or dtcmp > self.dtcmp:
            self.dtkey, self.dtkey1 = dtkey, self.dtkey
            self.dtcmp, self.dtcmp1 = dtcmp, self.dtcmp
            return True

        return False

    def _get_dt_cmpkey(self, dt):
        '''Return ``(dtcmp, dtkey)`` for the period which contains ``dt``.

        ``dtcmp`` is a value that grows from one period to the next and is
        used to detect period changes. ``dtkey`` marks the end of the period:

          - ``NoTimeFrame``: ``(None, None)``
          - ``Years``: ``datetime.date`` for December 31st (note: a ``date``,
            the other timeframes produce ``datetime`` instances)
          - ``Months``: last day of the month
          - ``Weeks``: Sunday of the ISO week
          - ``Days``: the day itself
          - anything else is delegated to ``_get_subday_cmpkey``
        '''
        if self.timeframe == TimeFrame.NoTimeFrame:
            return None, None

        if self.timeframe == TimeFrame.Years:
            dtcmp = dt.year
            dtkey = datetime.date(dt.year, 12, 31)

        elif self.timeframe == TimeFrame.Months:
            dtcmp = dt.year * 100 + dt.month
            # monthrange returns (weekday of first day, number of days)
            _, lastday = calendar.monthrange(dt.year, dt.month)
            dtkey = datetime.datetime(dt.year, dt.month, lastday)

        elif self.timeframe == TimeFrame.Weeks:
            isoyear, isoweek, isoweekday = dt.isocalendar()
            # The annotated source replaced an earlier ``isoyear * 100`` with
            # ``isoyear * 1000``. ISO week numbers never exceed 53, so either
            # multiplier orders the values by (isoyear, isoweek)
            dtcmp = isoyear * 1000 + isoweek
            # isoweekday is 1 (Monday) .. 7 (Sunday): move forward to Sunday
            sunday = dt + datetime.timedelta(days=7 - isoweekday)
            dtkey = datetime.datetime(sunday.year, sunday.month, sunday.day)

        elif self.timeframe == TimeFrame.Days:
            dtcmp = dt.year * 10000 + dt.month * 100 + dt.day
            dtkey = datetime.datetime(dt.year, dt.month, dt.day)

        else:
            dtcmp, dtkey = self._get_subday_cmpkey(dt)

        return dtcmp, dtkey

    def _get_subday_cmpkey(self, dt):
        '''Return ``(dtcmp, dtkey)`` for intraday timeframes.

        Both values are the same ``datetime``: the end of the compressed
        period containing ``dt``, moved back by one unit of the timeframe.
        Example: ``Minutes`` with compression 5 and ``dt`` at 10:02 give
        10:04 (boundary 10:05 minus one minute).
        '''
        # Integer on purpose: with a float such as ``1e6`` the hour, minute,
        # second and microsecond values below become floats and
        # ``dt.replace`` rejects them with a TypeError
        usecs_per_sec = 1000000

        # Calculate intraday position
        point = dt.hour * 60 + dt.minute  # minutes since midnight

        if self.timeframe < TimeFrame.Minutes:
            point = point * 60 + dt.second  # now in seconds

        if self.timeframe < TimeFrame.Seconds:
            point = point * usecs_per_sec + dt.microsecond  # now in microsecs

        # Apply compression to update point position (comp 5 -> 200 // 5)
        point = point // self.compression

        # Move to next boundary
        point += 1

        # Restore point to the timeframe units by de-applying compression
        point *= self.compression

        # Get hours, minutes, seconds and microseconds
        # NOTE(review): a timeframe other than Minutes/Seconds/MicroSeconds
        # reaching this point matches no branch and leaves ph/pm/ps/pus
        # unbound -- confirm whether that can happen
        if self.timeframe == TimeFrame.Minutes:
            ph, pm = divmod(point, 60)
            ps = 0
            pus = 0
        elif self.timeframe == TimeFrame.Seconds:
            ph, pm = divmod(point, 60 * 60)
            pm, ps = divmod(pm, 60)
            pus = 0
        elif self.timeframe == TimeFrame.MicroSeconds:
            ph, pm = divmod(point, 60 * 60 * usecs_per_sec)
            pm, psec = divmod(pm, 60 * usecs_per_sec)
            ps, pus = divmod(psec, usecs_per_sec)

        extradays = 0
        if ph > 23:  # went over midnight:
            extradays = ph // 24
            ph %= 24

        # moving 1 minor unit to the left to be in the boundary
        # (the comparisons are booleans, i.e. 0 or 1 of the matching unit)
        tadjust = datetime.timedelta(
            minutes=self.timeframe == TimeFrame.Minutes,
            seconds=self.timeframe == TimeFrame.Seconds,
            microseconds=self.timeframe == TimeFrame.MicroSeconds)

        # Add extra day if present
        if extradays:
            dt += datetime.timedelta(days=extradays)

        # Replace intraday parts with the calculated ones and update it
        dtcmp = dt.replace(hour=ph, minute=pm, second=ps, microsecond=pus)
        dtcmp -= tadjust
        dtkey = dtcmp

        return dtcmp, dtkey