『Backtrader』でFXのバックテストをする!:Python
【 今回やること! 】
- Pythonのライブラリの『Backtrader』を使って、FXのバックテストを行います。
- プログラムの作成と実行は『Jupyter Notebook』で行います。
『Jupyter Notebook』を含むPythonの開発環境の作成は、下記の記事で説明しています。
- プログラムの作成と実行は『Jupyter Notebook』で行います。
下記の記事で少し触れたのですが、"最初に使おうとしたライブラリ"が『Backtrader』です。
『Backtrader』に興味を持ったのは『OANDA API』に対応しているという情報を見たからです。『OANDA API』は、テストではなく実際にFXの自動売買システムを作るためのライブラリです。まだ売買システムも出来ていないので先を見すぎですが、どうせならと思いました。
※今回はバックテストに関する部分だけで『OANDA API』に絡む部分は触れていません。
ですが『Backtrader』ドキュメントを見るとハードルが高そうだったので、結局先にシンプルそうな『Backtesting.py』を試しました。今回『Backtrader』のドキュメントを読み返すと、共通点があるので以前より理解しやすくなっていました。最初難しく感じたのは、私がバックテストの流れも何も分かっていなかったせいなので、ある程度知識がある方なら最初から『Backtrader』で大丈夫だと思います。
『Backtrader』は『Backtesting.py』に比べるとインジケータの種類が多いところが良いです。インジケータとは為替データに様々な計算を加えて、売買のタイミングの目安となる数値を算出するものです。
また『Backtesting.py』は取引量を設定出来ませんが、『Backtrader』は設定できます。
英語ですがコミュニティサイトがあるのも『Backtrader』の良いところです。私はGoogle翻訳頼みですが参考になります。
『Jupyter Notebook』で『Backtrader』を使ってバックテストをする方法を説明します。
目次
『Backtrader』の公式サイト
今回の作業の前提
『Backtrader』のインストール
- 新規または既存の『Notebook』を開きます。
- 『コードセル』に下記のコードを貼り付けて実行します。
!pip install backtrader[plotting]
ライブラリがインストールされます。
ここから『Jupyter Notebook』で作業
バックテストプログラム
コード
このプログラムでは、ヒストリカルデータの読み込み→ストラテジーの設定→バックテストの設定→実行→結果出力というバックテストの一連の流れを行っています。
- 『コードセル』に下記のコードを貼り付けて実行します。
%%time # ↑セルの処理時間を計算 セルの最初に単独で書く import os # ディレクトリ作成、パス結合、ファイル削除 import pandas as pd # pandasデータフレームを使用 dir_name = 'hst_20191128' # ヒストリカルデータのフォルダ名 input_csv = os.path.join(os.getcwd(), dir_name, 'USDJPY_20190930.csv') # csvファイルのフルパス df = pd.read_csv(input_csv) # csvファイルをPandasデータフレームに読み込む #日時列をdatatime型にしてインデックスにして、元の列は削除する df = df.set_index(pd.to_datetime(df['DateTime'])).drop('DateTime', axis=1) import backtrader as bt # Backtrader import backtrader.feeds as btfeed # データ変換 data = btfeed.PandasData(dataname=df) # PandasのデータをBacktraderの形式に変換する class myStrategy(bt.Strategy): # ストラテジー n1 = 10 # 終値のSMA(単純移動平均)の期間 n2 = 30 # 終値のSMA(単純移動平均)の期間 def log(self, txt, dt=None, doprint=False): # ログ出力用のメソッド if doprint: print('{0:%Y-%m-%d %H:%M:%S}, {1}'.format( dt or self.datas[0].datetime.datetime(0), txt )) def __init__(self): # 事前処理 self.sma1 = bt.indicators.SMA(self.data.close, period=self.n1) # SMA(単純移動平均)のインジケータを追加 self.sma2 = bt.indicators.SMA(self.data.close, period=self.n2) # SMA(単純移動平均)のインジケータを追加 self.crossover = bt.indicators.CrossOver(self.sma1, self.sma2) # sma1がsma2を上回った時に1、下回ったときに-1を返す def next(self): # 行ごとに呼び出される if self.crossover > 0: # SMA1がSMA2を上回った場合 if self.position: # ポジションを持っている場合 self.close() # ポジションをクローズする self.buy() # 買い発注 elif self.crossover < 0: # SMA1がSMA2を下回った場合 if self.position: # ポジションを持っている場合 self.close() # ポジションをクローズする self.sell() # 売り発注 def notify_order(self, order): # 注文のステータスの変更を通知する if order.status in [order.Submitted, order.Accepted]: # 注文の状態が送信済or受理済の場合 return # 何もしない if order.status in [order.Completed]: # 注文の状態が完了済の場合 if order.isbuy(): # 買い注文の場合 self.log('買い約定, 取引量:{0:.2f}, 価格:{1:.2f}, 取引額:{2:.2f}, 手数料:{3:.2f}'.format( order.executed.size, # 取引量 order.executed.price, # 価格 order.executed.value, # 取引額 order.executed.comm # 手数料 ), dt=bt.num2date(order.executed.dt), # 約定の日時をdatetime型に変換 doprint=True # Trueの場合出力 ) elif order.issell(): # 売り注文の場合 self.log('売り約定, 取引量:{0:.2f}, 価格:{1:.2f}, 取引額:{2:.2f}, 手数料:{3:.2f}'.format( order.executed.size, # 取引量 order.executed.price, # 価格 order.executed.value, # 取引額 order.executed.comm # 手数料 ), dt=bt.num2date(order.executed.dt), # 約定の日時をdatetime型に変換 doprint=True # Trueの場合ログを出力する ) # 注文の状態がキャンセル済・マージンコール(証拠金不足)・拒否済の場合 elif order.status in [order.Canceled, order.Margin, order.Rejected]: self.log('注文 キャンセル・マージンコール(証拠金不足)・拒否', doprint=True ) def notify_trade(self, trade): # 取引の開始/更新/終了を通知する if trade.isclosed: # トレードが完了した場合 self.log('取引損益, 総額:{0:.2f}, 純額:{1:.2f}'.format( trade.pnl, # 損益 trade.pnlcomm # 手数料を差し引いた損益 ), doprint=True # Trueの場合ログを出力する ) # バックテストの設定 cerebro = bt.Cerebro() # Cerebroエンジンをインスタンス化 cerebro.addstrategy(myStrategy) # ストラテジーを追加 data = btfeed.PandasData(dataname=df) # Cerebro形式にデータを変換 cerebro.adddata(data) # データをCerebroエンジンに追加 cerebro.broker.setcash(10000.0) # 所持金を設定 cerebro.broker.setcommission(commission=0.0005) # 手数料(スプレッド)を0.05%に設定 cerebro.addsizer(bt.sizers.PercentSizer, percents=50) # デフォルト(buy/sellで取引量を設定していない時)の取引量を所持金に対する割合で指定する startcash = cerebro.broker.getvalue() # 開始時の所持金 cerebro.broker.set_coc(True) # 発注時の終値で約定する # 解析の設定 import backtrader.analyzers as btanalyzers # バックテストの解析用ライブラリ cerebro.addanalyzer(btanalyzers.DrawDown, _name='myDrawDown') # ドローダウン cerebro.addanalyzer(btanalyzers.SQN, _name='mySQN') # SQN cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='myTradeAnalyzer') # トレードの勝敗等の結果 thestrats = cerebro.run() # バックテストを実行 thestrat = thestrats[0] # 解析結果の取得 # 評価値の表示 print('Start :{0:%Y/%m/%d %H:%M:%S}'.format( # ヒストリカルデータの開始日時 pd.to_datetime(df.index.values[0]) )) print('End :{0:%Y/%m/%d %H:%M:%S}'.format (# ヒストリカルデータの開始日時 pd.to_datetime(df.index.values[-1]) )) print('Duration :{0}'.format( # ヒストリカルデータの期間の長さ pd.to_datetime(df.index.values[-1]) - pd.to_datetime(df.index.values[0]) )) print('Exposure[%] :{0:.2f}'.format( # ポジションを持っていた期間の割合(ポジションを持っていた期間÷全期間×100) thestrat.analyzers.myTradeAnalyzer.get_analysis().len.total / len(df) * 100 )) print('Equity Final[$] :{0:.2f}'.format( # 所持金の最終値(closedした取引) startcash + thestrat.analyzers.myTradeAnalyzer.get_analysis().pnl.net.total )) print('Return[%] :{0:.2f}'.format( # 利益率=損益÷開始時所持金×100 thestrat.analyzers.myTradeAnalyzer.get_analysis().pnl.net.total / startcash * 100 )) print('Buy & Hold Return[%] :{0:.2f}'.format( # ((終了時の終値 - 開始時の終値)÷ 開始時の終値)の絶対値×100 (df['Close'][-1] - df['Close'][0]) / df['Close'][0] * 100 )) print('Max. Drawdown[%] :{0:.2f}'.format( # 最大下落率 thestrat.analyzers.myDrawDown.get_analysis().max.drawdown )) print('Max. Drawdown Duration:{0}'.format( # 最大下落期間 pd.Timedelta(minutes=thestrat.analyzers.myDrawDown.get_analysis().max.len) )) print('Trades :{0}'.format( # 取引回数 thestrat.analyzers.myTradeAnalyzer.get_analysis().total.closed )) winrate = ( # 勝率 thestrat.analyzers.myTradeAnalyzer.get_analysis().won.total / thestrat.analyzers.myTradeAnalyzer.get_analysis().total.closed ) lostrate = ( # 敗率 thestrat.analyzers.myTradeAnalyzer.get_analysis().lost.total / thestrat.analyzers.myTradeAnalyzer.get_analysis().total.closed ) print('Win Rate[%] :{0:.2f}'.format( # 勝率=勝ち取引回数÷全取引回数×100 winrate * 100 )) print('Best Trade[%] :{0:.2f}'.format( # 1回の取引での利益の最大値÷所持金×100 thestrat.analyzers.myTradeAnalyzer.get_analysis().won.pnl.max / startcash * 100 )) print('Worst Trade[%] :{0:.2f}'.format( # 1回の取引での損失の最大値÷所持金×100 thestrat.analyzers.myTradeAnalyzer.get_analysis().lost.pnl.max / startcash * 100 )) print('Avg. Trade[%] :{0:.2f}'.format( # 損益の平均値÷所持金×100 thestrat.analyzers.myTradeAnalyzer.get_analysis().pnl.net.average / startcash * 100 )) print('Max. Trade Duration :{0}'.format( # 1回の取引での最長期間 pd.Timedelta(minutes=thestrat.analyzers.myTradeAnalyzer.get_analysis().len.max) )) print('Avg. Trade Duration :{0}'.format( # 1回の取引での平均期間 pd.Timedelta(minutes=thestrat.analyzers.myTradeAnalyzer.get_analysis().len.average) )) print('Expectancy[%] :{0:.2f}'.format( # 期待値=平均利益×勝率+平均損失×敗率 thestrat.analyzers.myTradeAnalyzer.get_analysis().won.pnl.average * winrate + thestrat.analyzers.myTradeAnalyzer.get_analysis().lost.pnl.average * lostrate )) print('SQN :{0:.2f}'.format( # SQN システムの評価値 thestrat.analyzers.mySQN.get_analysis().sqn )) # グラフの設定 %matplotlib inline # ↑グラフをNotebook内に描画する import matplotlib.pylab as pylab # グラフ描画用ライブラリ pylab.rcParams['figure.figsize'] = 12, 8 # グラフのサイズ cerebro.plot( style = 'candle', # ロウソク表示にする barup = 'green', barupfill = False, # 陽線の色、塗りつぶし設定 bardown = 'red', bardownfill = False, # 陰線の色、塗りつぶし設定 fmt_x_data = '%Y-%m-%d %H:%M:%S' # 時間軸のマウスオーバー時の表示フォーマット )
- 下記のような実行結果とグラフが表示されます。
2019-09-30 00:58:00, 売り約定, 取引量:-46.31, 価格:107.97, 取引額:-5000.00, 手数料:2.50 2019-09-30 02:03:00, 買い約定, 取引量:46.31, 価格:107.90, 取引額:-5000.00, 手数料:2.50 2019-09-30 02:03:00, 買い約定, 取引量:46.31, 価格:107.90, 取引額:4996.80, 手数料:2.50 2019-09-30 02:04:00, 取引損益, 総額:3.20, 純額:-1.80 (省略) Start :2019/09/30 00:01:00 # ヒストリカルデータの開始日時 End :2019/09/30 23:59:00 # ヒストリカルデータの終了日 Duration :0 days 23:58:00 # ヒストリカルデータの期間の長さ Exposure[%] :92.70 # ポジションを持っていた期間の割合(ポジションを持っていた期間÷全期間×100 Equity Final[$] :9721.88 # 所持金の最終値 Return[%] :-2.78 # 利益率=損益÷開始時所持金×10 Buy & Hold Return[%] :0.10 # ((終了時の終値 - 開始時の終値)÷ 開始時の終値)の絶対値×100 Max. Drawdown[%] :2.84 # 最大下落率 Max. Drawdown Duration:0 days 22:19:00 # 最大下落期間 Trades :54 # 取引回数 Win Rate[%] :1.85 # 勝率=勝ち取引回数÷全取引回数×100 Best Trade[%] :0.01 # 1回の取引の利益の最大値÷所持金×100 Worst Trade[%] :-0.10 # 1回の取引の損失の最大値÷所持金×100 Avg. Trade[%] :-0.05 # 損益の平均値÷所持金×100 Max. Trade Duration :0 days 01:50:00 # 1回の取引の最長期間 Avg. Trade Duration :0 days 00:24:42.222222 # 1回の取引の平均期間 Expectancy[%] :-5.15 # 期待値=平均利益×勝率+平均損失×敗率 SQN :-20.96 # SQN システムの評価値 # SQN値によって下記のように分類されます。 # 1.6-1.9:平均以下 # 2.0-2.4:平均 # 2.5-2.9:良い # 3.0-5.0:素晴らしい # 5.1-6.9:最高 # 7.0~:聖杯?

補足
取引量
売買の際の取引量は『buy(size=n)/sell(size=n)』で設定します。
『buy()/sell()』で設定されていない場合は『cerebro.addsizer()』の設定で取引量が決まります。
- 『buy()』で10通貨買います。
self.buy(size=10) # 10通貨買う
- 売り買いの注文時の取引量を所持金に対する割合で設定します。
cerebro.addsizer(bt.sizers.PercentSizer, percents=50) # 取引量を所持金に対する割合で指定
取引量を整数にする場合は『bt.sizers.PercentSizeInt』を設定します。
『percents=200』にすると所持金の2倍になります。
- 売り買いの注文時の取引量を数値で指定します。
cerebro.addsizer(bt.sizers.FixedSize, stake=50) # 取引量を50通貨に設定
バックテストの解析
アナライザーを設定→『cerebro.run()』でバックテスト実行→『cerebro.run()』の戻り値から結果を取得する、という手順で解析します。
設定できるアナライザーはドキュメントの下記のページを参照して下さい。
発注した時間軸の終値(Close)で約定する
通常では『buy()/sell()』で発注した次の時間軸の始値(Open)で約定されます。
下記の設定をすることで、発注した時間軸の終値(Close)で約定されます。
cerebro.broker.set_coc(True) # 発注時の終値で約定する
現在の時間軸の値
ストラテジーの『next()』で現在の値は下記のように取得します。
def next(self): # 行ごとに呼び出される print('現在の始値:{0:.2f}'.format(self.data.open[0])) print('現在の終値:{0:.2f}'.format(self.data.close[0]))
あとがき
『Backtesting.py』を使った時のプログラムに近づけようと思ったのですが、複雑になりそうな部分はある程度省略しました。
今回も最初は『Google Colaboratory』で動かそうと思ったのですがグラフが表示できなかったため、『Jupyter Notebook』に変更しました。
