Skip to content

Latest commit

 

History

History
308 lines (239 loc) · 10.6 KB

File metadata and controls

308 lines (239 loc) · 10.6 KB

【backtrader 源码解析 21】chainer 和 rollover 两个源代码解析(枯燥,仅供参考)

原文:https://yunjinqi.blog.csdn.net/article/details/124461126

chainer.py

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from datetime import datetime

import backtrader as bt
from backtrader.utils.py3 import range

# 创建一个 chainer 的元类
class MetaChainer(bt.DataBase.__class__):
    '''Metaclass of ``Chainer``: hooks into the instance creation to take
    over the timeframe/compression of the 1st data feed passed'''

    def __init__(cls, name, bases, dct):
        '''Class has already been created ... only the initialization of the
        parent metaclass is run'''
        super(MetaChainer, cls).__init__(name, bases, dct)

    def donew(cls, *args, **kwargs):
        '''Create the instance and copy timeframe/compression from the 1st
        positional argument (if any)

        Returns the tuple ``(instance, args, kwargs)`` as delivered by the
        parent ``donew``
        '''
        # The parent creates the object and puts the params in place
        instance, args, kwargs = super(MetaChainer, cls).donew(
            *args, **kwargs)

        # Without positional arguments there is nothing to copy from
        if not args:
            return instance, args, kwargs

        first = args[0]
        instance.p.timeframe = first._timeframe
        instance.p.compression = first._compression
        return instance, args, kwargs

# Chainer: delivers several data feeds one after another as a single data feed
class Chainer(bt.with_metaclass(MetaChainer, bt.DataBase)):
    '''Class that chains datas

    The data feeds are passed as positional arguments and are consumed in
    that order: once the current one delivers no more bars, the next one is
    taken.
    '''

    def islive(self):
        '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
        should be deactivated'''
        return True

    def __init__(self, *args):
        '''Keep the data feeds to be chained'''
        self._args = args

    def start(self):
        '''Start all chained data feeds and select the 1st one as active'''
        super(Chainer, self).start()
        for feed in self._args:
            feed.setenvironment(self._env)
            feed._start()

        # Work on a copy, to pop from it while ``self._args`` stays complete
        # (``stop`` iterates over it)
        self._ds = list(self._args)
        if self._ds:
            self._d = self._ds.pop(0)
        else:
            self._d = None

        # Datetime of the last delivered bar. Starting with the minimum
        # makes sure that the 1st bar is always accepted
        self._lastdt = datetime.min

    def stop(self):
        '''Stop this data feed and all chained data feeds'''
        super(Chainer, self).stop()
        for feed in self._args:
            feed.stop()

    def get_notifications(self):
        '''Return the notifications of the active data feed or an empty list
        if no data feed is active'''
        if self._d is None:
            return []

        return self._d.get_notifications()

    def _gettz(self):
        '''To be overridden by subclasses which may auto-calculate the
        timezone'''
        if not self._args:
            return bt.utils.date.Localizer(self.p.tz)

        # The timezone is the one from the 1st chained data feed
        return self._args[0]._gettz()

    def _load(self):
        '''Deliver the next bar of the chain

        Returns ``True`` if a bar has been copied to the lines and ``False``
        if no data feed is left
        '''
        while True:
            feed = self._d
            if feed is None:
                # No data feed to return from
                return False

            if not feed.next():  # no values from current data source
                if self._ds:
                    self._d = self._ds.pop(0)
                else:
                    self._d = None
                continue

            # Cannot deliver a date equal or less than an already delivered
            # one: such bars are skipped
            stamp = feed.datetime.datetime()
            if stamp <= self._lastdt:
                continue

            self._lastdt = stamp

            # Copy the current value of each line of the active data feed
            for idx in range(feed.size()):
                self.lines[idx][0] = feed.lines[idx][0]

            return True

rollover.py

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from datetime import datetime

import backtrader as bt

# rollover 元类
class MetaRollOver(bt.DataBase.__class__):
    '''Metaclass of ``RollOver``: hooks into the instance creation to take
    over the timeframe/compression of the 1st data feed passed'''

    def __init__(cls, name, bases, dct):
        '''Class has already been created ... only the initialization of the
        parent metaclass is run'''
        super(MetaRollOver, cls).__init__(name, bases, dct)

    def donew(cls, *args, **kwargs):
        '''Create the instance and copy timeframe/compression from the 1st
        positional argument (if any)

        Returns the tuple ``(instance, args, kwargs)`` as delivered by the
        parent ``donew``
        '''
        # The parent creates the object and puts the params in place
        instance, args, kwargs = super(MetaRollOver, cls).donew(
            *args, **kwargs)

        # Without positional arguments there is nothing to copy from
        if not args:
            return instance, args, kwargs

        first = args[0]
        instance.p.timeframe = first._timeframe
        instance.p.compression = first._compression
        return instance, args, kwargs

class RollOver(bt.with_metaclass(MetaRollOver, bt.DataBase)):
    '''Class that rolls over to the next future when a condition is met

    The futures are passed as positional arguments and are taken in that
    order. Independently of the params below, the next future is also taken
    when the active one delivers no more bars.

    Params:

        - ``checkdate`` (default: ``None``)

          This must be a *callable* with the following signature::

            checkdate(dt, d):

          Where:

            - ``dt`` is a ``datetime.datetime`` object
            - ``d`` is the current data feed for the active future

          Expected Return Values:

            - ``True``: as long as the callable returns this, a switchover can
              happen to the next future

              If a commodity expires on the 3rd Friday of March,
              ``checkdate`` could return ``True`` for the entire week in
              which the expiration takes place.

            - ``False``: the expiration cannot take place

          If ``None`` this will evaluate to ``False`` (no roll over)
          internally

        - ``checkcondition`` (default: ``None``)

          **Note**: This will only be called if ``checkdate`` has returned
          ``True``

          If ``None`` this will evaluate to ``True`` (execute roll over)
          internally

          Else this must be a *callable* with this signature::

            checkcondition(d0, d1)

          Where:

            - ``d0`` is the current data feed for the active future
            - ``d1`` is the data feed for the next expiration

          Expected Return Values:

            - ``True``: roll-over to the next future

              Following with the example from ``checkdate``, this could say
              that the roll-over can only happen if the *volume* from ``d0``
              is already less than the volume from ``d1``

            - ``False``: the expiration cannot take place
    '''

    params = (
        # ('rolls', []),  # array of futures to roll over
        ('checkdate', None),  # callable
        ('checkcondition', None),  # callable
    )

    def islive(self):
        '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
        should be deactivated'''
        return True

    def __init__(self, *args):
        '''Keep the futures (data feeds) to roll over'''
        self._rolls = args

    def start(self):
        '''Start all futures and select the 1st one as the active one'''
        super(RollOver, self).start()
        for d in self._rolls:
            d.setenvironment(self._env)
            d._start()

        # put the references in a separate list to have pops. The copy is
        # needed: ``self._rolls`` is the tuple of arguments and has to stay
        # complete, because ``stop`` iterates over it
        self._ds = list(self._rolls)
        # Active future
        self._d = self._ds.pop(0) if self._ds else None
        # Future which has been rolled away from (expired)
        self._dexp = None
        # Last seen datetime of each pending future. Starting with the
        # minimum forces a synchronization on the 1st comparison
        self._dts = [datetime.min] * len(self._ds)

    def stop(self):
        '''Stop this data feed and all futures'''
        super(RollOver, self).stop()
        for d in self._rolls:
            d.stop()

    def _gettz(self):
        '''To be overridden by subclasses which may auto-calculate the
        timezone'''
        if self._rolls:
            # The timezone is the one from the 1st future
            return self._rolls[0]._gettz()
        return bt.utils.date.Localizer(self.p.tz)

    def _checkdate(self, dt, d):
        '''Return the result of the ``checkdate`` param for datetime ``dt``
        and active future ``d`` or ``False`` if the param is ``None``'''
        if self.p.checkdate is not None:
            return self.p.checkdate(dt, d)

        return False

    def _checkcondition(self, d0, d1):
        '''Return the result of the ``checkcondition`` param for the active
        future ``d0`` and the next future ``d1`` or ``True`` if the param is
        ``None``'''
        if self.p.checkcondition is not None:
            return self.p.checkcondition(d0, d1)

        return True

    def _syncpending(self, dt0):
        '''Move the pending futures forward until they have reached ``dt0``

        A pending future which runs out of bars before reaching ``dt0`` is
        removed from the pending list: its datetime can no longer advance
        (waiting for it would loop forever) and it cannot deliver any bar to
        roll over to.
        '''
        exhausted = []
        for i, (d, dt) in enumerate(zip(self._ds, self._dts)):
            while dt < dt0:
                ret = d.next()
                if ret is None:  # no values yet, more will come
                    continue
                if ret is False:  # no more values from this future
                    exhausted.append(i)
                    break
                # Remember how far this future has been moved
                self._dts[i] = dt = d.datetime.datetime()

        # Remove from the end to keep the remaining indices valid
        for i in reversed(exhausted):
            del self._ds[i]
            del self._dts[i]

    def _load(self):
        '''Deliver the next bar of the active future, rolling over to the
        next future if the conditions are met

        Returns ``True`` if a bar has been copied to the lines and ``False``
        if no future is left
        '''
        while self._d is not None:
            _next = self._d.next()
            if _next is None:  # no values yet, more will come
                continue
            if _next is False:  # no values from current data src
                # Take the next pending future (if any) as the active one
                if self._ds:
                    self._d = self._ds.pop(0)
                    self._dts.pop(0)
                else:
                    self._d = None
                continue

            dt0 = self._d.datetime.datetime()  # current dt for active data

            # Synchronize other datas using dt0
            self._syncpending(dt0)

            # Move expired future as much as needed
            # NOTE(review): as written the loop is only left when ``next``
            # returns a false value, i.e.: the expired future is always
            # consumed completely and ``self._dexp`` is ``None`` afterwards.
            # Stopping at ``dt0`` may have been the intention - confirm
            # before changing, because it would alter when roll-overs happen
            while self._dexp is not None:
                if not self._dexp.next():
                    self._dexp = None
                    break

                if self._dexp.datetime.datetime() < dt0:
                    continue

            if self._dexp is None and self._checkdate(dt0, self._d):
                # rule has been met ... check other factors only if 2 datas
                # still there
                if self._ds and self._checkcondition(self._d, self._ds[0]):
                    # Time to switch to next data
                    self._dexp = self._d
                    self._d = self._ds.pop(0)
                    self._dts.pop(0)

            # Copy the current bar of the active future to the own lines
            self.lines.datetime[0] = self._d.lines.datetime[0]
            self.lines.open[0] = self._d.lines.open[0]
            self.lines.high[0] = self._d.lines.high[0]
            self.lines.low[0] = self._d.lines.low[0]
            self.lines.close[0] = self._d.lines.close[0]
            self.lines.volume[0] = self._d.lines.volume[0]
            self.lines.openinterest[0] = self._d.lines.openinterest[0]
            return True

        # Out of the loop -> self._d is None, no data feed to return from
        return False