671 lines
25 KiB
Python
671 lines
25 KiB
Python
import logging
|
|
from copy import deepcopy
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Literal
|
|
|
|
import numpy as np
|
|
from pandas import DataFrame, Series, concat, to_datetime
|
|
|
|
from freqtrade.constants import BACKTEST_BREAKDOWNS, DATETIME_PRINT_FORMAT
|
|
from freqtrade.data.metrics import (
|
|
calculate_cagr,
|
|
calculate_calmar,
|
|
calculate_csum,
|
|
calculate_expectancy,
|
|
calculate_market_change,
|
|
calculate_max_drawdown,
|
|
calculate_sharpe,
|
|
calculate_sortino,
|
|
calculate_sqn,
|
|
)
|
|
from freqtrade.ft_types import BacktestResultType, get_BacktestResultType_default
|
|
from freqtrade.util import decimals_per_coin, fmt_coin, get_dry_run_wallet
|
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_trade_signal_candles(
|
|
preprocessed_df: dict[str, DataFrame], bt_results: dict[str, Any], date_col: str
|
|
) -> dict[str, DataFrame]:
|
|
signal_candles_only = {}
|
|
for pair in preprocessed_df.keys():
|
|
signal_candles_only_df = DataFrame()
|
|
|
|
pairdf = preprocessed_df[pair]
|
|
resdf = bt_results["results"]
|
|
pairresults = resdf.loc[(resdf["pair"] == pair)]
|
|
|
|
if pairdf.shape[0] > 0:
|
|
for t, v in pairresults.iterrows():
|
|
allinds = pairdf.loc[(pairdf["date"] < v[date_col])]
|
|
signal_inds = allinds.iloc[[-1]]
|
|
signal_candles_only_df = concat(
|
|
[signal_candles_only_df.infer_objects(), signal_inds.infer_objects()]
|
|
)
|
|
|
|
signal_candles_only[pair] = signal_candles_only_df
|
|
return signal_candles_only
|
|
|
|
|
|
def generate_rejected_signals(
    preprocessed_df: dict[str, DataFrame], rejected_dict: dict[str, DataFrame]
) -> dict[str, DataFrame]:
    """
    Collect, per pair, the candle rows matching each rejected signal.

    :param preprocessed_df: Dict of <pair: DataFrame>; each DataFrame needs a "date" column
    :param rejected_dict: Dict of <pair: signals>. Each signal is indexed as
        signal[0] (candle date) and signal[1] (enter tag).
        NOTE(review): the annotation says DataFrame, but the values are iterated
        as a sequence of (date, tag) entries - confirm against the caller.
    :return: Dict of <pair: DataFrame> holding the matching candle rows with
        "pair" and "enter_tag" columns added (empty DataFrame if no signals).
    """
    rejected_candles_only = {}
    for pair, signals in rejected_dict.items():
        pairdf = preprocessed_df[pair]

        rejected_rows = []
        for t in signals:
            data_df_row = pairdf.loc[pairdf["date"] == t[0]].copy()
            data_df_row["pair"] = pair
            data_df_row["enter_tag"] = t[1]
            rejected_rows.append(data_df_row.infer_objects())

        # Concatenate once per pair instead of growing a DataFrame row by row.
        rejected_candles_only[pair] = concat(rejected_rows) if rejected_rows else DataFrame()
    return rejected_candles_only
|
|
|
|
|
|
def _generate_result_line(
    result: DataFrame,
    min_date: datetime,
    max_date: datetime,
    starting_balance: float,
    first_column: str | list[str],
) -> dict:
    """
    Build one metrics row for a set of trades, labelled with `first_column`.

    :param result: Trades to summarise; the "profit_ratio", "profit_abs" and
        "trade_duration" columns are read here
    :param min_date: Start of the evaluated period
    :param max_date: End of the evaluated period
    :param starting_balance: Starting balance, base for the relative figures
    :param first_column: Label stored under "key" in the returned dict
    :return: Dict with trade counts, profit figures and risk metrics
    """
    abs_profits = result["profit_abs"]
    ratio_profits = result["profit_ratio"]
    trade_count = len(result)

    profit_sum = ratio_profits.sum()
    profit_abs_sum = abs_profits.sum()
    # (end-capital - starting capital) / starting capital
    profit_total = profit_abs_sum / starting_balance
    # A span shorter than one day is counted as one day.
    backtest_days = (max_date - min_date).days or 1
    final_balance = starting_balance + profit_abs_sum
    expectancy, expectancy_ratio = calculate_expectancy(result)

    is_win = abs_profits > 0
    is_loss = abs_profits < 0
    winning_profit = abs_profits[is_win].sum()
    losing_profit = abs_profits[is_loss].sum()
    # Without losses the factor is reported as 0.0.
    profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0

    try:
        drawdown = calculate_max_drawdown(
            result, value_col="profit_abs", starting_balance=starting_balance
        )
    except ValueError:
        drawdown = None

    if trade_count > 0:
        profit_mean = ratio_profits.mean()
        profit_mean_pct = round(profit_mean * 100.0, 2)
    else:
        profit_mean = 0.0
        profit_mean_pct = 0.0

    if result.empty:
        duration_avg = "0:00"
    else:
        # "trade_duration" is interpreted as minutes.
        duration_avg = str(timedelta(minutes=round(result["trade_duration"].mean())))

    win_count = len(result[is_win])
    draw_count = len(result[abs_profits == 0])
    loss_count = len(result[is_loss])

    return {
        "key": first_column,
        "trades": trade_count,
        "profit_mean": profit_mean,
        "profit_mean_pct": profit_mean_pct,
        "profit_sum": profit_sum,
        "profit_sum_pct": round(profit_sum * 100.0, 2),
        "profit_total_abs": profit_abs_sum,
        "profit_total": profit_total,
        "profit_total_pct": round(profit_total * 100.0, 2),
        "duration_avg": duration_avg,
        "wins": win_count,
        "draws": draw_count,
        "losses": loss_count,
        "winrate": win_count / trade_count if trade_count else 0.0,
        "cagr": calculate_cagr(backtest_days, starting_balance, final_balance),
        "expectancy": expectancy,
        "expectancy_ratio": expectancy_ratio,
        "sortino": calculate_sortino(result, min_date, max_date, starting_balance),
        "sharpe": calculate_sharpe(result, min_date, max_date, starting_balance),
        "calmar": calculate_calmar(result, min_date, max_date, starting_balance),
        "sqn": calculate_sqn(result, starting_balance),
        "profit_factor": profit_factor,
        "max_drawdown_account": drawdown.relative_account_drawdown if drawdown else 0.0,
        "max_drawdown_abs": drawdown.drawdown_abs if drawdown else 0.0,
    }
|
|
|
|
|
|
def calculate_trade_volume(trades_dict: list[dict[str, Any]]) -> float:
    """
    Sum the "cost" of every order across all trades.

    :param trades_dict: List of trade dicts; each may hold a list of order
        dicts under "orders" (a missing key counts as no orders)
    :return: Total traded volume (0 for an empty list)
    """
    total_volume = 0
    for trade in trades_dict:
        # Per-trade subtotal first, then added to the running total.
        trade_volume = sum(order["cost"] for order in trade.get("orders", []))
        total_volume = total_volume + trade_volume
    return total_volume
|
|
|
|
|
|
def generate_pair_metrics(
    pairlist: list[str],
    stake_currency: str,
    starting_balance: float,
    results: DataFrame,
    min_date: datetime,
    max_date: datetime,
    skip_nan: bool = False,
) -> list[dict]:
    """
    Build one metrics row per pair, followed by a "TOTAL" row over all trades.

    :param pairlist: Pairlist used
    :param stake_currency: stake-currency (accepted but not read in this function)
    :param starting_balance: Starting balance
    :param results: Dataframe containing the backtest results
    :param min_date: Start of the evaluated period
    :param max_date: End of the evaluated period
    :param skip_nan: If True, leave out pairs whose "profit_abs" values are all null
    :return: List of Dicts containing the metrics per pair, sorted by absolute
        profit (descending), with the "TOTAL" row last
    """
    pair_rows = []
    for pair in pairlist:
        pair_trades = results.loc[results["pair"] == pair]
        if not skip_nan or not pair_trades["profit_abs"].isnull().all():
            pair_rows.append(
                _generate_result_line(pair_trades, min_date, max_date, starting_balance, pair)
            )

    # Highest absolute profit first.
    pair_rows.sort(key=lambda row: row["profit_total_abs"], reverse=True)

    # The summary row always comes last.
    total_row = _generate_result_line(results, min_date, max_date, starting_balance, "TOTAL")
    pair_rows.append(total_row)

    return pair_rows
|
|
|
|
|
|
def generate_tag_metrics(
    tag_type: Literal["enter_tag", "exit_reason"] | list[Literal["enter_tag", "exit_reason"]],
    starting_balance: float,
    results: DataFrame,
    min_date: datetime,
    max_date: datetime,
    skip_nan: bool = False,
) -> list[dict]:
    """
    Build one metrics row per tag group, followed by a "TOTAL" row over all trades.

    :param tag_type: Column name, or list of column names, to group the trades by
    :param starting_balance: Starting balance
    :param results: Dataframe containing the backtest results
    :param min_date: Start of the evaluated period
    :param max_date: End of the evaluated period
    :param skip_nan: If True, leave out groups whose "profit_abs" values are all null
    :return: List of Dicts containing the metrics per tag group, sorted by
        absolute profit (descending) with the "TOTAL" row last.
        Empty list if a requested column is missing from `results`.
    """
    group_columns = tag_type if isinstance(tag_type, list) else [tag_type]
    if any(column not in results.columns for column in group_columns):
        return []

    tag_rows = []
    for tags, group in results.groupby(tag_type):
        if not skip_nan or not group["profit_abs"].isnull().all():
            tag_rows.append(
                _generate_result_line(group, min_date, max_date, starting_balance, tags)
            )

    # Highest absolute profit first.
    tag_rows.sort(key=lambda row: row["profit_total_abs"], reverse=True)

    # The summary row always comes last.
    total_row = _generate_result_line(results, min_date, max_date, starting_balance, "TOTAL")
    tag_rows.append(total_row)
    return tag_rows
|
|
|
|
|
|
def generate_strategy_comparison(bt_stats: dict) -> list[dict]:
    """
    Build one summary row per strategy.

    :param bt_stats: Dict of <Strategyname: stats dict>. Each stats dict is read for
        "results_per_pair", "max_drawdown_account", "max_drawdown_abs"
        and "stake_currency".
    :return: List of Dicts containing the metrics per Strategy
    """
    comparison_rows = []
    for strategy, result in bt_stats.items():
        # Start from a copy of the last per-pair row (the "TOTAL" row) ...
        row = deepcopy(result["results_per_pair"][-1])
        # ... and relabel it with the strategy name.
        row["key"] = strategy
        row["max_drawdown_account"] = result["max_drawdown_account"]
        row["max_drawdown_abs"] = fmt_coin(
            result["max_drawdown_abs"], result["stake_currency"], False
        )
        comparison_rows.append(row)
    return comparison_rows
|
|
|
|
|
|
def _get_resample_from_period(period: str) -> str:
|
|
if period == "day":
|
|
return "1d"
|
|
if period == "week":
|
|
# Weekly defaulting to Monday.
|
|
return "1W-MON"
|
|
if period == "month":
|
|
return "1ME"
|
|
if period == "year":
|
|
return "1YE"
|
|
raise ValueError(f"Period {period} is not supported.")
|
|
|
|
|
|
def generate_periodic_breakdown_stats(
|
|
trade_list: list | DataFrame, period: str
|
|
) -> list[dict[str, Any]]:
|
|
results = trade_list if not isinstance(trade_list, list) else DataFrame.from_records(trade_list)
|
|
if len(results) == 0:
|
|
return []
|
|
results["close_date"] = to_datetime(results["close_date"], utc=True)
|
|
resample_period = _get_resample_from_period(period)
|
|
resampled = results.resample(resample_period, on="close_date")
|
|
stats = []
|
|
for name, day in resampled:
|
|
profit_abs = day["profit_abs"].sum().round(10)
|
|
wins = sum(day["profit_abs"] > 0)
|
|
draws = sum(day["profit_abs"] == 0)
|
|
losses = sum(day["profit_abs"] < 0)
|
|
trades = wins + draws + losses
|
|
winning_profit = day.loc[day["profit_abs"] > 0, "profit_abs"].sum()
|
|
losing_profit = day.loc[day["profit_abs"] < 0, "profit_abs"].sum()
|
|
profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0
|
|
stats.append(
|
|
{
|
|
"date": name.strftime("%d/%m/%Y"),
|
|
"date_ts": int(name.to_pydatetime().timestamp() * 1000),
|
|
"profit_abs": profit_abs,
|
|
"wins": wins,
|
|
"draws": draws,
|
|
"losses": losses,
|
|
"trades": trades,
|
|
"profit_factor": round(profit_factor, 8),
|
|
}
|
|
)
|
|
return stats
|
|
|
|
|
|
def generate_all_periodic_breakdown_stats(trade_list: list) -> dict[str, list]:
    """
    Compute the periodic breakdown for every period listed in BACKTEST_BREAKDOWNS.

    :param trade_list: Trades, forwarded unchanged to generate_periodic_breakdown_stats
    :return: Dict of <period: list of per-period stats>
    """
    return {
        period: generate_periodic_breakdown_stats(trade_list, period)
        for period in BACKTEST_BREAKDOWNS
    }
|
|
|
|
|
|
def calc_streak(dataframe: DataFrame) -> tuple[int, int]:
    """
    Determine the longest runs of consecutive winning and losing trades.

    A trade is a "win" when its profit_ratio is above 0; every other trade
    (including a profit_ratio of exactly 0) is counted as a "loss".

    :param dataframe: Dataframe containing the trades dataframe, with profit_ratio column
    :return: Tuple containing consecutive wins and losses
    """
    outcome = Series(np.where(dataframe["profit_ratio"] > 0, "win", "loss"))
    # A new run starts whenever the outcome differs from the previous trade.
    run_id = outcome.ne(outcome.shift()).cumsum()
    # 1-based position of each trade inside its run.
    run_position = run_id.groupby(run_id).cumcount() + 1
    longest_run = run_position.groupby(outcome).max()

    cons_wins = 0
    cons_losses = 0
    if "win" in longest_run.index:
        cons_wins = int(longest_run.loc["win"])
    if "loss" in longest_run.index:
        cons_losses = int(longest_run.loc["loss"])
    return cons_wins, cons_losses
|
|
|
|
|
|
def _mean_holding_time(trades: DataFrame) -> timedelta:
    """Return the mean "trade_duration" (read as minutes) as a timedelta, zero if empty."""
    if trades.empty:
        return timedelta()
    return timedelta(minutes=round(trades["trade_duration"].mean()))


def generate_trading_stats(results: DataFrame) -> dict[str, Any]:
    """
    Generate overall trade statistics.

    Trades are classified by "profit_ratio": above 0 is a win, exactly 0 a
    draw, below 0 a loss.

    :param results: Dataframe containing the backtest results; the
        "profit_ratio" and "trade_duration" columns are read
    :return: Dict with win/draw/loss counts, winrate, average holding times
        (as timedelta and, under the "*_s" keys, in seconds) and the longest
        win/loss streaks. The same keys are returned for empty results.
    """
    if len(results) == 0:
        no_time = timedelta()
        return {
            "wins": 0,
            "losses": 0,
            "draws": 0,
            "winrate": 0,
            "holding_avg": no_time,
            # "*_s" keys mirror the non-empty result so consumers see one schema.
            "holding_avg_s": no_time.total_seconds(),
            "winner_holding_avg": no_time,
            "winner_holding_avg_s": no_time.total_seconds(),
            "loser_holding_avg": no_time,
            "loser_holding_avg_s": no_time.total_seconds(),
            "max_consecutive_wins": 0,
            "max_consecutive_losses": 0,
        }

    winning_trades = results.loc[results["profit_ratio"] > 0]
    draw_trades = results.loc[results["profit_ratio"] == 0]
    losing_trades = results.loc[results["profit_ratio"] < 0]

    holding_avg = _mean_holding_time(results)
    winner_holding_avg = _mean_holding_time(winning_trades)
    loser_holding_avg = _mean_holding_time(losing_trades)
    winstreak, loss_streak = calc_streak(results)

    return {
        "wins": len(winning_trades),
        "losses": len(losing_trades),
        "draws": len(draw_trades),
        # results is known to be non-empty here.
        "winrate": len(winning_trades) / len(results),
        "holding_avg": holding_avg,
        "holding_avg_s": holding_avg.total_seconds(),
        "winner_holding_avg": winner_holding_avg,
        "winner_holding_avg_s": winner_holding_avg.total_seconds(),
        "loser_holding_avg": loser_holding_avg,
        "loser_holding_avg_s": loser_holding_avg.total_seconds(),
        "max_consecutive_wins": winstreak,
        "max_consecutive_losses": loss_streak,
    }
|
|
|
|
|
|
def generate_daily_stats(results: DataFrame) -> dict[str, Any]:
    """
    Generate daily statistics.

    Trades are grouped into daily bins by "close_date"; relative figures sum
    "profit_ratio", absolute figures sum "profit_abs".

    :param results: Dataframe containing the backtest results; the
        "close_date", "profit_ratio" and "profit_abs" columns are read
    :return: Dict with best/worst day (relative and absolute), the number of
        winning/draw/losing days and "daily_profit", a list of
        (date string, absolute profit) tuples
    """
    if len(results) == 0:
        return {
            "backtest_best_day": 0,
            "backtest_worst_day": 0,
            "backtest_best_day_abs": 0,
            "backtest_worst_day_abs": 0,
            "winning_days": 0,
            "draw_days": 0,
            "losing_days": 0,
            # Same key as in the non-empty result.
            "daily_profit": [],
            # Previously the only key returned for empty results; kept so
            # existing readers of it keep working.
            "daily_profit_list": [],
        }

    daily = results.resample("1d", on="close_date")
    daily_profit_rel = daily["profit_ratio"].sum()
    daily_profit = daily["profit_abs"].sum().round(10)

    daily_profit_list = [(str(idx.date()), val) for idx, val in daily_profit.items()]

    return {
        "backtest_best_day": daily_profit_rel.max(),
        "backtest_worst_day": daily_profit_rel.min(),
        "backtest_best_day_abs": daily_profit.max(),
        "backtest_worst_day_abs": daily_profit.min(),
        "winning_days": int((daily_profit > 0).sum()),
        "draw_days": int((daily_profit == 0).sum()),
        "losing_days": int((daily_profit < 0).sum()),
        "daily_profit": daily_profit_list,
    }
|
|
|
|
|
|
def generate_strategy_stats(
    pairlist: list[str],
    strategy: str,
    content: dict[str, Any],
    min_date: datetime,
    max_date: datetime,
    market_change: float,
    is_hyperopt: bool = False,
) -> dict[str, Any]:
    """
    Assemble the complete statistics dictionary for one strategy's backtest run.

    :param pairlist: List of pairs to backtest
    :param strategy: Strategy name
    :param content: Backtest result data in the format:
                    {'results: results, 'config: config}}.
                    Further keys read here: "locks", "final_balance",
                    "backtest_start_time", "backtest_end_time", "rejected_signals",
                    "timedout_entry_orders", "timedout_exit_orders",
                    "canceled_trade_entries", "canceled_entry_orders",
                    "replaced_entry_orders".
    :param min_date: Backtest start date
    :param max_date: Backtest end date
    :param market_change: float indicating the market change
    :param is_hyperopt: If True, the periodic breakdown is not generated
    :return: Dictionary containing results per strategy and a strategy summary.
             Empty dict if content["results"] is not a DataFrame.
    """
    results: DataFrame = content["results"]
    if not isinstance(results, DataFrame):
        return {}
    config = content["config"]
    # Effective limit: never more than the number of pairs.
    max_open_trades = min(config["max_open_trades"], len(pairlist))
    start_balance = get_dry_run_wallet(config)
    stake_currency = config["stake_currency"]

    pair_results = generate_pair_metrics(
        pairlist,
        stake_currency=stake_currency,
        starting_balance=start_balance,
        results=results,
        min_date=min_date,
        max_date=max_date,
        skip_nan=False,
    )

    # Breakdowns by entry tag, by exit reason, and by the combination of both.
    enter_tag_stats = generate_tag_metrics(
        "enter_tag",
        starting_balance=start_balance,
        results=results,
        min_date=min_date,
        max_date=max_date,
        skip_nan=False,
    )
    exit_reason_stats = generate_tag_metrics(
        "exit_reason",
        starting_balance=start_balance,
        results=results,
        min_date=min_date,
        max_date=max_date,
        skip_nan=False,
    )
    mix_tag_stats = generate_tag_metrics(
        ["enter_tag", "exit_reason"],
        starting_balance=start_balance,
        results=results,
        min_date=min_date,
        max_date=max_date,
        skip_nan=False,
    )
    # Per-pair metrics restricted to trades whose exit_reason is "force_exit";
    # skip_nan=True leaves out pairs whose profit_abs values are all null.
    left_open_results = generate_pair_metrics(
        pairlist,
        stake_currency=stake_currency,
        starting_balance=start_balance,
        results=results.loc[results["exit_reason"] == "force_exit"],
        min_date=min_date,
        max_date=max_date,
        skip_nan=True,
    )

    daily_stats = generate_daily_stats(results)
    trade_stats = generate_trading_stats(results)

    # The periodic breakdown is only produced outside of hyperopt runs.
    periodic_breakdown = {}
    if not is_hyperopt:
        periodic_breakdown = {"periodic_breakdown": generate_all_periodic_breakdown_stats(results)}

    # pair_results always ends with the "TOTAL" row, so more than one entry
    # means there is at least one real per-pair row to choose from.
    best_pair = (
        max(
            [pair for pair in pair_results if pair["key"] != "TOTAL"], key=lambda x: x["profit_sum"]
        )
        if len(pair_results) > 1
        else None
    )
    worst_pair = (
        min(
            [pair for pair in pair_results if pair["key"] != "TOTAL"], key=lambda x: x["profit_sum"]
        )
        if len(pair_results) > 1
        else None
    )
    winning_profit = results.loc[results["profit_abs"] > 0, "profit_abs"].sum()
    losing_profit = results.loc[results["profit_abs"] < 0, "profit_abs"].sum()
    # Without losses the factor is reported as 0.0.
    profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0

    expectancy, expectancy_ratio = calculate_expectancy(results)
    # A span shorter than one day is counted as one day.
    backtest_days = (max_date - min_date).days or 1
    trades_dict = results.to_dict(orient="records")
    strat_stats = {
        "trades": trades_dict,
        "locks": [lock.to_json() for lock in content["locks"]],
        "best_pair": best_pair,
        "worst_pair": worst_pair,
        "results_per_pair": pair_results,
        "results_per_enter_tag": enter_tag_stats,
        "exit_reason_summary": exit_reason_stats,
        "mix_tag_stats": mix_tag_stats,
        "left_open_trades": left_open_results,
        "total_trades": len(results),
        "trade_count_long": len(results.loc[~results["is_short"]]),
        "trade_count_short": len(results.loc[results["is_short"]]),
        "total_volume": calculate_trade_volume(trades_dict),
        "avg_stake_amount": results["stake_amount"].mean() if len(results) > 0 else 0,
        "profit_mean": results["profit_ratio"].mean() if len(results) > 0 else 0,
        "profit_median": results["profit_ratio"].median() if len(results) > 0 else 0,
        "profit_total": results["profit_abs"].sum() / start_balance,
        "profit_total_long": results.loc[~results["is_short"], "profit_abs"].sum() / start_balance,
        "profit_total_short": results.loc[results["is_short"], "profit_abs"].sum() / start_balance,
        "profit_total_abs": results["profit_abs"].sum(),
        "profit_total_long_abs": results.loc[~results["is_short"], "profit_abs"].sum(),
        "profit_total_short_abs": results.loc[results["is_short"], "profit_abs"].sum(),
        "cagr": calculate_cagr(backtest_days, start_balance, content["final_balance"]),
        "expectancy": expectancy,
        "expectancy_ratio": expectancy_ratio,
        "sortino": calculate_sortino(results, min_date, max_date, start_balance),
        "sharpe": calculate_sharpe(results, min_date, max_date, start_balance),
        "calmar": calculate_calmar(results, min_date, max_date, start_balance),
        "sqn": calculate_sqn(results, start_balance),
        "profit_factor": profit_factor,
        "backtest_start": min_date.strftime(DATETIME_PRINT_FORMAT),
        # *_ts values below are in milliseconds.
        "backtest_start_ts": int(min_date.timestamp() * 1000),
        "backtest_end": max_date.strftime(DATETIME_PRINT_FORMAT),
        "backtest_end_ts": int(max_date.timestamp() * 1000),
        "backtest_days": backtest_days,
        "backtest_run_start_ts": content["backtest_start_time"],
        "backtest_run_end_ts": content["backtest_end_time"],
        "trades_per_day": round(len(results) / backtest_days, 2),
        "market_change": market_change,
        "pairlist": pairlist,
        "stake_amount": config["stake_amount"],
        "stake_currency": config["stake_currency"],
        "stake_currency_decimals": decimals_per_coin(config["stake_currency"]),
        "starting_balance": start_balance,
        "dry_run_wallet": start_balance,
        "final_balance": content["final_balance"],
        "rejected_signals": content["rejected_signals"],
        "timedout_entry_orders": content["timedout_entry_orders"],
        "timedout_exit_orders": content["timedout_exit_orders"],
        "canceled_trade_entries": content["canceled_trade_entries"],
        "canceled_entry_orders": content["canceled_entry_orders"],
        "replaced_entry_orders": content["replaced_entry_orders"],
        "max_open_trades": max_open_trades,
        # An unlimited (inf) setting is stored as -1.
        "max_open_trades_setting": (
            config["max_open_trades"] if config["max_open_trades"] != float("inf") else -1
        ),
        "timeframe": config["timeframe"],
        "timeframe_detail": config.get("timeframe_detail", ""),
        "timerange": config.get("timerange", ""),
        "enable_protections": config.get("enable_protections", False),
        "strategy_name": strategy,
        # Parameters relevant for backtesting
        "stoploss": config["stoploss"],
        "trailing_stop": config.get("trailing_stop", False),
        "trailing_stop_positive": config.get("trailing_stop_positive"),
        "trailing_stop_positive_offset": config.get("trailing_stop_positive_offset", 0.0),
        "trailing_only_offset_is_reached": config.get("trailing_only_offset_is_reached", False),
        "use_custom_stoploss": config.get("use_custom_stoploss", False),
        "minimal_roi": config["minimal_roi"],
        "use_exit_signal": config["use_exit_signal"],
        "exit_profit_only": config["exit_profit_only"],
        "exit_profit_offset": config["exit_profit_offset"],
        "ignore_roi_if_entry_signal": config["ignore_roi_if_entry_signal"],
        "trading_mode": config["trading_mode"],
        "margin_mode": config["margin_mode"],
        # Merged last: keys from these dicts override equally named keys above.
        **periodic_breakdown,
        **daily_stats,
        **trade_stats,
    }

    # Drawdown and cumulative-sum figures; a ValueError from either helper
    # switches to the zeroed fallback below.
    # NOTE(review): presumably raised when there are no trades - confirm in
    # freqtrade.data.metrics.
    try:
        drawdown = calculate_max_drawdown(
            results, value_col="profit_abs", starting_balance=start_balance
        )
        # max_relative_drawdown = Underwater
        underwater = calculate_max_drawdown(
            results, value_col="profit_abs", starting_balance=start_balance, relative=True
        )

        strat_stats.update(
            {
                "max_drawdown_account": drawdown.relative_account_drawdown,
                "max_relative_drawdown": underwater.relative_account_drawdown,
                "max_drawdown_abs": drawdown.drawdown_abs,
                "drawdown_start": drawdown.high_date.strftime(DATETIME_PRINT_FORMAT),
                "drawdown_start_ts": drawdown.high_date.timestamp() * 1000,
                "drawdown_end": drawdown.low_date.strftime(DATETIME_PRINT_FORMAT),
                "drawdown_end_ts": drawdown.low_date.timestamp() * 1000,
                "max_drawdown_low": drawdown.low_value,
                "max_drawdown_high": drawdown.high_value,
            }
        )

        csum_min, csum_max = calculate_csum(results, start_balance)
        strat_stats.update({"csum_min": csum_min, "csum_max": csum_max})

    except ValueError:
        # NOTE(review): the fallback stores datetime objects for
        # "drawdown_start"/"drawdown_end", while the success path stores
        # formatted strings.
        strat_stats.update(
            {
                "max_drawdown_account": 0.0,
                "max_relative_drawdown": 0.0,
                "max_drawdown_abs": 0.0,
                "max_drawdown_low": 0.0,
                "max_drawdown_high": 0.0,
                "drawdown_start": datetime(1970, 1, 1, tzinfo=timezone.utc),
                "drawdown_start_ts": 0,
                "drawdown_end": datetime(1970, 1, 1, tzinfo=timezone.utc),
                "drawdown_end_ts": 0,
                "csum_min": 0,
                "csum_max": 0,
            }
        )

    return strat_stats
|
|
|
|
|
|
def generate_backtest_stats(
    btdata: dict[str, DataFrame],
    all_results: dict[str, dict[str, DataFrame | dict]],
    min_date: datetime,
    max_date: datetime,
) -> BacktestResultType:
    """
    Build the full backtest result: per-strategy stats, metadata and comparison.

    :param btdata: Backtest data
    :param all_results: backtest result - dictionary in the form:
                     { Strategy: {'results: results, 'config: config}}.
    :param min_date: Backtest start date
    :param max_date: Backtest end date
    :return: Dictionary containing results per strategy and a strategy summary.
    """
    backtest_result: BacktestResultType = get_BacktestResultType_default()
    market_change = calculate_market_change(btdata, "close")
    pairlist = list(btdata.keys())

    run_metadata = {}
    for strategy, content in all_results.items():
        strategy_stats = generate_strategy_stats(
            pairlist, strategy, content, min_date, max_date, market_change=market_change
        )
        strategy_config = content["config"]
        run_metadata[strategy] = {
            "run_id": content["run_id"],
            "backtest_start_time": content["backtest_start_time"],
            "timeframe": strategy_config["timeframe"],
            "timeframe_detail": strategy_config.get("timeframe_detail", None),
            # Whole seconds here.
            "backtest_start_ts": int(min_date.timestamp()),
            "backtest_end_ts": int(max_date.timestamp()),
        }
        backtest_result["strategy"][strategy] = strategy_stats

    comparison = generate_strategy_comparison(bt_stats=backtest_result["strategy"])

    backtest_result["metadata"] = run_metadata
    backtest_result["strategy_comparison"] = comparison

    return backtest_result
|