Skip to content

Latest commit

 

History

History
513 lines (417 loc) · 18 KB

File metadata and controls

513 lines (417 loc) · 18 KB

【backtrader 源码解析 33】analyzer.py 源码注释(枯燥,仅供参考)

原文:https://yunjinqi.blog.csdn.net/article/details/124577079

analyzer.py 源码解析

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import calendar
from collections import OrderedDict
import datetime
import pprint as pp

import backtrader as bt
from backtrader import TimeFrame
from backtrader.utils.py3 import MAXINT, with_metaclass

# Metaclass for analyzers
class MetaAnalyzer(bt.MetaParams):
    '''Metaclass that wires a newly created analyzer into its surroundings.

    ``donew`` looks up the owners of the analyzer (strategy, parent analyzer
    and observer), exposes the data feeds and their lines as attributes and
    calls ``create_analysis``. ``dopostinit`` registers the analyzer with its
    parent analyzer when there is one.
    '''

    def donew(cls, *args, **kwargs):
        '''Intercept the strategy parameter

        Returns the ``(_obj, args, kwargs)`` tuple expected by the rest of
        the creation chain.
        '''
        # Create the object and set the params in place
        _obj, args, kwargs = super(MetaAnalyzer, cls).donew(*args, **kwargs)

        # Sub-analyzers add themselves here through ``_register``
        _obj._children = []

        # NOTE(review): the ``_obj`` name and the direct ``findowner`` calls
        # are kept on purpose; ``findowner`` presumably inspects the calling
        # frames to locate owners - confirm before refactoring.
        # Owning strategy (the lookup is expected to give None if not found)
        owner = bt.metabase.findowner(_obj, bt.Strategy)
        _obj.strategy = owner
        # Enclosing analyzer, if this one is created inside another analyzer
        _obj._parent = bt.metabase.findowner(_obj, Analyzer)

        # Register with a master observer if created inside one
        observer = bt.metabase.findowner(_obj, bt.Observer)
        if observer is not None:
            observer._register_analyzer(_obj)

        # The analyzer works on the data feeds of its strategy
        _obj.datas = owner.datas
        feeds = _obj.datas

        if feeds:
            # Aliases for the first data feed: ``data`` plus one attribute
            # per line, by alias (``data_<alias>``) and by index
            # (``data_<index>``)
            first = feeds[0]
            _obj.data = first
            for idx, line in enumerate(first.lines):
                alias = first._getlinealias(idx)
                if alias:
                    setattr(_obj, 'data_%s' % alias, line)
                setattr(_obj, 'data_%d' % idx, line)

            # Same for every feed, prefixed with its position: ``dataN``,
            # ``dataN_<alias>`` and ``dataN_<index>``
            for pos, feed in enumerate(feeds):
                setattr(_obj, 'data%d' % pos, feed)
                for idx, line in enumerate(feed.lines):
                    alias = feed._getlinealias(idx)
                    if alias:
                        setattr(_obj, 'data%d_%s' % (pos, alias), line)
                    setattr(_obj, 'data%d_%d' % (pos, idx), line)

        # Give the analyzer the chance to build its result containers
        _obj.create_analysis()

        # Return to the normal chain
        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        '''Register the analyzer with its parent analyzer, if it has one.

        Returns the ``(_obj, args, kwargs)`` tuple expected by the rest of
        the creation chain.
        '''
        chained = super(MetaAnalyzer, cls).dopostinit(_obj, *args, **kwargs)
        _obj, args, kwargs = chained

        parent = _obj._parent
        if parent is not None:
            parent._register(_obj)

        # Return to the normal chain
        return _obj, args, kwargs

# Analyzer base class
class Analyzer(with_metaclass(MetaAnalyzer, object)):
    '''Analyzer base class. All analyzers are subclass of this one

    An Analyzer instance operates in the frame of a strategy and provides an
    analysis for that strategy.

    Automagically set member attributes:

      - ``self.strategy`` (giving access to the *strategy* and anything
        accessible from it)

      - ``self.datas[x]`` giving access to the array of data feeds present in
        the system, which could also be accessed via the strategy reference

      - ``self.data``, giving access to ``self.datas[0]``

      - ``self.dataX`` -> ``self.datas[X]``

      - ``self.dataX_Y`` -> ``self.datas[X].lines[Y]``

      - ``self.dataX_name`` -> the line of ``self.datas[X]`` whose alias is
        ``name``

      - ``self.data_name`` -> the line of ``self.datas[0]`` whose alias is
        ``name``

      - ``self.data_Y`` -> ``self.datas[0].lines[Y]``

    This is not a *Lines* object, but the methods and operation follow the same
    design

      - ``__init__`` during instantiation and initial setup

      - ``start`` / ``stop`` to signal the begin and end of operations

      - ``prenext`` / ``nextstart`` / ``next`` family of methods that follow
        the calls made to the same methods in the strategy

      - ``notify_trade`` / ``notify_order`` / ``notify_cashvalue`` /
        ``notify_fund`` which receive the same notifications as the equivalent
        methods of the strategy

    The mode of operation is open and no pattern is preferred. As such the
    analysis can be generated with the ``next`` calls, at the end of operations
    during ``stop`` and even with a single method like ``notify_trade``

    The important thing is to override ``get_analysis`` to return a *dict-like*
    object containing the results of the analysis (the actual format is
    implementation dependent)

    Every underscore-prefixed driver method (``_start``, ``_next``, ...)
    first forwards the call to the registered child analyzers and then
    invokes the public hook of the same name on this analyzer.
    '''
    # NOTE(review): presumably tells writers whether the results of this
    # analyzer go into the csv output - confirm against the writer code
    csv = True

    def __len__(self):
        '''Support for invoking ``len`` on analyzers by actually returning the
        current length of the strategy the analyzer operates on'''
        return len(self.strategy)

    def _register(self, child):
        '''Add ``child`` to the list of sub-analyzers driven by this one'''
        self._children.append(child)

    def _prenext(self):
        '''Forward ``_prenext`` to the children, then call ``prenext``'''
        for child in self._children:
            child._prenext()

        self.prenext()

    def _notify_cashvalue(self, cash, value):
        '''Forward the cash/value notification to the children, then call
        ``notify_cashvalue``'''
        for child in self._children:
            child._notify_cashvalue(cash, value)

        self.notify_cashvalue(cash, value)

    def _notify_fund(self, cash, value, fundvalue, shares):
        '''Forward the fund notification to the children, then call
        ``notify_fund``'''
        for child in self._children:
            child._notify_fund(cash, value, fundvalue, shares)

        self.notify_fund(cash, value, fundvalue, shares)

    def _notify_trade(self, trade):
        '''Forward the trade notification to the children, then call
        ``notify_trade``'''
        for child in self._children:
            child._notify_trade(trade)

        self.notify_trade(trade)

    def _notify_order(self, order):
        '''Forward the order notification to the children, then call
        ``notify_order``'''
        for child in self._children:
            child._notify_order(order)

        self.notify_order(order)

    def _nextstart(self):
        '''Forward ``_nextstart`` to the children, then call ``nextstart``'''
        for child in self._children:
            child._nextstart()

        self.nextstart()

    def _next(self):
        '''Forward ``_next`` to the children, then call ``next``'''
        for child in self._children:
            child._next()

        self.next()

    def _start(self):
        '''Forward ``_start`` to the children, then call ``start``'''
        for child in self._children:
            child._start()

        self.start()

    def _stop(self):
        '''Forward ``_stop`` to the children, then call ``stop``'''
        for child in self._children:
            child._stop()

        self.stop()

    def notify_cashvalue(self, cash, value):
        '''Receives the cash/value notification before each next cycle'''
        pass

    def notify_fund(self, cash, value, fundvalue, shares):
        '''Receives the current cash, value, fundvalue and fund shares'''
        pass

    def notify_order(self, order):
        '''Receives order notifications before each next cycle'''
        pass

    def notify_trade(self, trade):
        '''Receives trade notifications before each next cycle'''
        pass

    def next(self):
        '''Invoked for each next invocation of the strategy, once the minimum
        period of the strategy has been reached'''
        pass

    def prenext(self):
        '''Invoked for each prenext invocation of the strategy, until the minimum
        period of the strategy has been reached

        The default behavior for an analyzer is to invoke ``next``
        '''
        self.next()

    def nextstart(self):
        '''Invoked exactly once for the nextstart invocation of the strategy,
        when the minimum period has been first reached

        The default behavior is to invoke ``next``
        '''
        self.next()

    def start(self):
        '''Invoked to indicate the start of operations, giving the analyzer
        time to set up needed things'''
        pass

    def stop(self):
        '''Invoked to indicate the end of operations, giving the analyzer
        time to shut down needed things'''
        pass

    def create_analysis(self):
        '''Meant to be overridden by subclasses. Gives a chance to create the
        structures that hold the analysis.

        The default behaviour is to create a ``OrderedDict`` named ``rets``
        '''
        self.rets = OrderedDict()

    def get_analysis(self):
        '''Returns a *dict-like* object with the results of the analysis

        The keys and format of analysis results in the dictionary is
        implementation dependent.

        It is not even enforced that the result is a *dict-like object*, just
        the convention

        The default implementation returns the default OrderedDict ``rets``
        created by the default ``create_analysis`` method

        '''
        return self.rets

    def print(self, *args, **kwargs):
        '''Prints the results returned by ``get_analysis`` via a standard
        ``Writerfile`` object, which defaults to writing things to standard
        output

        ``args`` and ``kwargs`` are passed unchanged to ``bt.WriterFile``.
        The results are written as a one-entry dict keyed by the class name
        of the analyzer.
        '''
        writer = bt.WriterFile(*args, **kwargs)
        writer.start()
        try:
            writer.writedict({self.__class__.__name__: self.get_analysis()})
        finally:
            # Always stop the writer, even if collecting or writing the
            # analysis fails, so a started writer is never left open
            writer.stop()

    def pprint(self, *args, **kwargs):
        '''Prints the results returned by ``get_analysis`` using the pretty
        print Python module (*pprint*)

        ``args`` and ``kwargs`` are passed unchanged to ``pprint.pprint``.
        '''
        pp.pprint(self.get_analysis(), *args, **kwargs)

# Metaclass for the timeframe based analyzers
class MetaTimeFrameAnalyzerBase(Analyzer.__class__):
    '''Metaclass that keeps the legacy ``_on_dt_over`` method name working.'''

    def __new__(meta, name, bases, dct):
        '''Create the class, exposing a ``_on_dt_over`` definition found in
        the class body under the name ``on_dt_over``.'''
        # Hack to support original method name
        try:
            legacy_hook = dct.pop('_on_dt_over')
        except KeyError:
            pass  # the class body does not use the old name
        else:
            dct['on_dt_over'] = legacy_hook  # rename method

        parent = super(MetaTimeFrameAnalyzerBase, meta)
        return parent.__new__(meta, name, bases, dct)

# Base class for the timeframe based analyzers
class TimeFrameAnalyzerBase(with_metaclass(MetaTimeFrameAnalyzerBase,
                                           Analyzer)):
    '''Analyzer base class which detects when a timeframe period is over.

    On each ``_prenext`` / ``_nextstart`` / ``_next`` call the current
    datetime of the strategy is turned into a comparison key. When that key
    is greater than the stored one, ``on_dt_over`` is invoked before the
    regular ``prenext`` / ``nextstart`` / ``next`` hook.

    Params:

      - ``timeframe`` (default: ``None``): if falsy, ``_timeframe`` of
        ``self.data`` is used

      - ``compression`` (default: ``None``): if falsy, ``_compression`` of
        ``self.data`` is used

      - ``_doprenext`` (default: ``True``): whether ``prenext`` is invoked
        from ``_prenext``. If ``False``, ``on_dt_over`` is always invoked
        from ``_nextstart``

    Attributes maintained by ``_dt_over``:

      - ``dtcmp`` / ``dtkey``: comparison value and key of the current period

      - ``dtcmp1`` / ``dtkey1``: the values of the previous period
    '''
    params = (
        ('timeframe', None),
        ('compression', None),
        ('_doprenext', True),
    )

    def _start(self):
        '''Resolve timeframe/compression and seed ``dtcmp`` / ``dtkey``'''
        # Override to add specific attributes
        self.timeframe = self.p.timeframe or self.data._timeframe
        self.compression = self.p.compression or self.data._compression

        # Seed with the smallest datetime so the first real one is "over"
        self.dtcmp, self.dtkey = self._get_dt_cmpkey(datetime.datetime.min)
        super(TimeFrameAnalyzerBase, self)._start()

    def _prenext(self):
        '''Forward to the children, check the period and call ``prenext``
        unless disabled with ``_doprenext``'''
        for child in self._children:
            child._prenext()

        if self._dt_over():
            self.on_dt_over()

        if self.p._doprenext:
            self.prenext()

    def _nextstart(self):
        '''Forward to the children, check the period and call ``nextstart``'''
        for child in self._children:
            child._nextstart()

        if self._dt_over() or not self.p._doprenext:  # exec if no prenext
            self.on_dt_over()

        self.nextstart()

    def _next(self):
        '''Forward to the children, check the period and call ``next``'''
        for child in self._children:
            child._next()

        if self._dt_over():
            self.on_dt_over()

        self.next()

    def on_dt_over(self):
        '''Hook invoked when a new period has been detected. Does nothing by
        default'''
        pass

    def _dt_over(self):
        '''Check whether the current datetime belongs to a new period

        Returns ``True`` (after moving the stored values to ``dtkey1`` /
        ``dtcmp1`` and storing the new ones) if no comparison value is stored
        or the new one is greater, ``False`` otherwise.
        '''
        if self.timeframe == TimeFrame.NoTimeFrame:
            # A single period for the entire run: use the largest values
            dtcmp, dtkey = MAXINT, datetime.datetime.max
        else:
            # With >= 1.9.x the system datetime is in the strategy
            dt = self.strategy.datetime.datetime()
            dtcmp, dtkey = self._get_dt_cmpkey(dt)

        if self.dtcmp is None or dtcmp > self.dtcmp:
            # Keep the previous values and store the new ones
            self.dtkey, self.dtkey1 = dtkey, self.dtkey
            self.dtcmp, self.dtcmp1 = dtcmp, self.dtcmp
            return True

        return False

    def _get_dt_cmpkey(self, dt):
        '''Calculate the comparison value and the key of the period of ``dt``

        Returns a ``(dtcmp, dtkey)`` tuple. ``dtcmp`` is only meant to be
        compared against values calculated for the same timeframe. Both
        values are ``None`` for ``TimeFrame.NoTimeFrame``.
        '''
        if self.timeframe == TimeFrame.NoTimeFrame:
            return None, None

        if self.timeframe == TimeFrame.Years:
            dtcmp = dt.year
            # NOTE(review): a ``date`` here, a ``datetime`` in all the other
            # branches. Left untouched because changing the type would
            # change the keys seen by users of yearly results.
            dtkey = datetime.date(dt.year, 12, 31)

        elif self.timeframe == TimeFrame.Months:
            dtcmp = dt.year * 100 + dt.month
            # Key: last day of the month
            _, lastday = calendar.monthrange(dt.year, dt.month)
            dtkey = datetime.datetime(dt.year, dt.month, lastday)

        elif self.timeframe == TimeFrame.Weeks:
            isoyear, isoweek, isoweekday = dt.isocalendar()
            # ISO week numbers go up to 53, the multiplier only has to be
            # larger than that to keep the values ordered across years
            dtcmp = isoyear * 1000 + isoweek
            # Key: the Sunday which closes the ISO week
            sunday = dt + datetime.timedelta(days=7 - isoweekday)
            dtkey = datetime.datetime(sunday.year, sunday.month, sunday.day)

        elif self.timeframe == TimeFrame.Days:
            dtcmp = dt.year * 10000 + dt.month * 100 + dt.day
            dtkey = datetime.datetime(dt.year, dt.month, dt.day)

        else:
            # Anything else is handled as an intraday timeframe
            dtcmp, dtkey = self._get_subday_cmpkey(dt)

        return dtcmp, dtkey

    def _get_subday_cmpkey(self, dt):
        '''Calculate the comparison value and key for intraday timeframes

        The time of ``dt`` is moved to the end of the ``compression`` sized
        slot it belongs to, minus one unit of the timeframe (for minutes,
        seconds and microseconds).

        Returns a ``(dtcmp, dtkey)`` tuple, both being the same ``datetime``.
        '''
        # Calculate intraday position (in minutes)
        point = dt.hour * 60 + dt.minute

        # Below minutes: position in seconds
        if self.timeframe < TimeFrame.Minutes:
            point = point * 60 + dt.second

        # Below seconds: position in microseconds. The factor is an integer
        # on purpose: a float (1e6) makes every derived field a float, which
        # ``datetime.replace`` rejects
        if self.timeframe < TimeFrame.Seconds:
            point = point * 1000000 + dt.microsecond

        # Apply compression to update point position (comp 5 -> 200 // 5)
        point = point // self.compression

        # Move to next boundary
        point += 1

        # Restore point to the timeframe units by de-applying compression
        point *= self.compression

        # Get hours, minutes, seconds and microseconds. The conditions mirror
        # the ones used above, so the position is always decoded in the unit
        # it was encoded in and the fields are defined for any timeframe.
        # NOTE(review): timeframes below microseconds are therefore decoded
        # as microseconds instead of failing - confirm this is wanted.
        if self.timeframe < TimeFrame.Seconds:
            ph, rest = divmod(point, 60 * 60 * 1000000)
            pm, rest = divmod(rest, 60 * 1000000)
            ps, pus = divmod(rest, 1000000)
        elif self.timeframe < TimeFrame.Minutes:
            ph, rest = divmod(point, 60 * 60)
            pm, ps = divmod(rest, 60)
            pus = 0
        else:
            ph, pm = divmod(point, 60)
            ps = 0
            pus = 0

        extradays = 0
        if ph > 23:  # went over midnight:
            extradays = ph // 24
            ph %= 24

        # moving 1 minor unit to the left to be in the boundary (the
        # comparisons are booleans, i.e. 1 for the active timeframe, else 0)
        tadjust = datetime.timedelta(
            minutes=self.timeframe == TimeFrame.Minutes,
            seconds=self.timeframe == TimeFrame.Seconds,
            microseconds=self.timeframe == TimeFrame.MicroSeconds)

        # Add extra day if present
        if extradays:
            dt += datetime.timedelta(days=extradays)

        # Replace intraday parts with the calculated ones and update it
        dtcmp = dt.replace(hour=ph, minute=pm, second=ps, microsecond=pus)
        dtcmp -= tadjust
        dtkey = dtcmp

        return dtcmp, dtkey