import logging from copy import deepcopy from datetime import datetime, timedelta, timezone from typing import Any, Literal import numpy as np from pandas import DataFrame, Series, concat, to_datetime from freqtrade.constants import BACKTEST_BREAKDOWNS, DATETIME_PRINT_FORMAT from freqtrade.data.metrics import ( calculate_cagr, calculate_calmar, calculate_csum, calculate_expectancy, calculate_market_change, calculate_max_drawdown, calculate_sharpe, calculate_sortino, calculate_sqn, ) from freqtrade.ft_types import BacktestResultType, get_BacktestResultType_default from freqtrade.util import decimals_per_coin, fmt_coin, get_dry_run_wallet logger = logging.getLogger(__name__) def generate_trade_signal_candles( preprocessed_df: dict[str, DataFrame], bt_results: dict[str, Any], date_col: str ) -> dict[str, DataFrame]: signal_candles_only = {} for pair in preprocessed_df.keys(): signal_candles_only_df = DataFrame() pairdf = preprocessed_df[pair] resdf = bt_results["results"] pairresults = resdf.loc[(resdf["pair"] == pair)] if pairdf.shape[0] > 0: for t, v in pairresults.iterrows(): allinds = pairdf.loc[(pairdf["date"] < v[date_col])] signal_inds = allinds.iloc[[-1]] signal_candles_only_df = concat( [signal_candles_only_df.infer_objects(), signal_inds.infer_objects()] ) signal_candles_only[pair] = signal_candles_only_df return signal_candles_only def generate_rejected_signals( preprocessed_df: dict[str, DataFrame], rejected_dict: dict[str, DataFrame] ) -> dict[str, DataFrame]: rejected_candles_only = {} for pair, signals in rejected_dict.items(): rejected_signals_only_df = DataFrame() pairdf = preprocessed_df[pair] for t in signals: data_df_row = pairdf.loc[(pairdf["date"] == t[0])].copy() data_df_row["pair"] = pair data_df_row["enter_tag"] = t[1] rejected_signals_only_df = concat( [rejected_signals_only_df.infer_objects(), data_df_row.infer_objects()] ) rejected_candles_only[pair] = rejected_signals_only_df return rejected_candles_only def _generate_result_line( result: DataFrame, min_date: datetime, max_date: datetime, starting_balance: float, first_column: str | list[str], ) -> dict: """ Generate one result dict, with "first_column" as key. """ profit_sum = result["profit_ratio"].sum() # (end-capital - starting capital) / starting capital profit_total = result["profit_abs"].sum() / starting_balance backtest_days = (max_date - min_date).days or 1 final_balance = starting_balance + result["profit_abs"].sum() expectancy, expectancy_ratio = calculate_expectancy(result) winning_profit = result.loc[result["profit_abs"] > 0, "profit_abs"].sum() losing_profit = result.loc[result["profit_abs"] < 0, "profit_abs"].sum() profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0 try: drawdown = calculate_max_drawdown( result, value_col="profit_abs", starting_balance=starting_balance ) except ValueError: drawdown = None return { "key": first_column, "trades": len(result), "profit_mean": result["profit_ratio"].mean() if len(result) > 0 else 0.0, "profit_mean_pct": ( round(result["profit_ratio"].mean() * 100.0, 2) if len(result) > 0 else 0.0 ), "profit_sum": profit_sum, "profit_sum_pct": round(profit_sum * 100.0, 2), "profit_total_abs": result["profit_abs"].sum(), "profit_total": profit_total, "profit_total_pct": round(profit_total * 100.0, 2), "duration_avg": ( str(timedelta(minutes=round(result["trade_duration"].mean()))) if not result.empty else "0:00" ), # 'duration_max': str(timedelta( # minutes=round(result['trade_duration'].max())) # ) if not result.empty else '0:00', # 'duration_min': str(timedelta( # minutes=round(result['trade_duration'].min())) # ) if not result.empty else '0:00', "wins": len(result[result["profit_abs"] > 0]), "draws": len(result[result["profit_abs"] == 0]), "losses": len(result[result["profit_abs"] < 0]), "winrate": len(result[result["profit_abs"] > 0]) / len(result) if len(result) else 0.0, "cagr": calculate_cagr(backtest_days, starting_balance, final_balance), "expectancy": expectancy, "expectancy_ratio": expectancy_ratio, "sortino": calculate_sortino(result, min_date, max_date, starting_balance), "sharpe": calculate_sharpe(result, min_date, max_date, starting_balance), "calmar": calculate_calmar(result, min_date, max_date, starting_balance), "sqn": calculate_sqn(result, starting_balance), "profit_factor": profit_factor, "max_drawdown_account": drawdown.relative_account_drawdown if drawdown else 0.0, "max_drawdown_abs": drawdown.drawdown_abs if drawdown else 0.0, } def calculate_trade_volume(trades_dict: list[dict[str, Any]]) -> float: # Aggregate the total volume traded from orders.cost. # Orders is a nested dictionary within the trades list. return sum(sum(order["cost"] for order in trade.get("orders", [])) for trade in trades_dict) def generate_pair_metrics( # pairlist: list[str], stake_currency: str, starting_balance: float, results: DataFrame, min_date: datetime, max_date: datetime, skip_nan: bool = False, ) -> list[dict]: """ Generates and returns a list for the given backtest data and the results dataframe :param pairlist: Pairlist used :param stake_currency: stake-currency - used to correctly name headers :param starting_balance: Starting balance :param results: Dataframe containing the backtest results :param skip_nan: Print "left open" open trades :return: List of Dicts containing the metrics per pair """ tabular_data = [] for pair in pairlist: result = results[results["pair"] == pair] if skip_nan and result["profit_abs"].isnull().all(): continue tabular_data.append( _generate_result_line(result, min_date, max_date, starting_balance, pair) ) # Sort by total profit %: tabular_data = sorted(tabular_data, key=lambda k: k["profit_total_abs"], reverse=True) # Append Total tabular_data.append( _generate_result_line(results, min_date, max_date, starting_balance, "TOTAL") ) return tabular_data def generate_tag_metrics( tag_type: Literal["enter_tag", "exit_reason"] | list[Literal["enter_tag", "exit_reason"]], starting_balance: float, results: DataFrame, min_date: datetime, max_date: datetime, skip_nan: bool = False, ) -> list[dict]: """ Generates and returns a list of metrics for the given tag trades and the results dataframe :param starting_balance: Starting balance :param results: Dataframe containing the backtest results :param skip_nan: Print "left open" open trades :return: List of Dicts containing the metrics per pair """ tabular_data = [] if all( tag in results.columns for tag in (tag_type if isinstance(tag_type, list) else [tag_type]) ): for tags, group in results.groupby(tag_type): if skip_nan and group["profit_abs"].isnull().all(): continue tabular_data.append( _generate_result_line(group, min_date, max_date, starting_balance, tags) ) # Sort by total profit %: tabular_data = sorted(tabular_data, key=lambda k: k["profit_total_abs"], reverse=True) # Append Total tabular_data.append( _generate_result_line(results, min_date, max_date, starting_balance, "TOTAL") ) return tabular_data else: return [] def generate_strategy_comparison(bt_stats: dict) -> list[dict]: """ Generate summary per strategy :param bt_stats: Dict of containing results for all strategies :return: List of Dicts containing the metrics per Strategy """ tabular_data = [] for strategy, result in bt_stats.items(): tabular_data.append(deepcopy(result["results_per_pair"][-1])) # Update "key" to strategy (results_per_pair has it as "Total"). tabular_data[-1]["key"] = strategy tabular_data[-1]["max_drawdown_account"] = result["max_drawdown_account"] tabular_data[-1]["max_drawdown_abs"] = fmt_coin( result["max_drawdown_abs"], result["stake_currency"], False ) return tabular_data def _get_resample_from_period(period: str) -> str: if period == "day": return "1d" if period == "week": # Weekly defaulting to Monday. return "1W-MON" if period == "month": return "1ME" if period == "year": return "1YE" raise ValueError(f"Period {period} is not supported.") def generate_periodic_breakdown_stats( trade_list: list | DataFrame, period: str ) -> list[dict[str, Any]]: results = trade_list if not isinstance(trade_list, list) else DataFrame.from_records(trade_list) if len(results) == 0: return [] results["close_date"] = to_datetime(results["close_date"], utc=True) resample_period = _get_resample_from_period(period) resampled = results.resample(resample_period, on="close_date") stats = [] for name, day in resampled: profit_abs = day["profit_abs"].sum().round(10) wins = sum(day["profit_abs"] > 0) draws = sum(day["profit_abs"] == 0) losses = sum(day["profit_abs"] < 0) trades = wins + draws + losses winning_profit = day.loc[day["profit_abs"] > 0, "profit_abs"].sum() losing_profit = day.loc[day["profit_abs"] < 0, "profit_abs"].sum() profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0 stats.append( { "date": name.strftime("%d/%m/%Y"), "date_ts": int(name.to_pydatetime().timestamp() * 1000), "profit_abs": profit_abs, "wins": wins, "draws": draws, "losses": losses, "trades": trades, "profit_factor": round(profit_factor, 8), } ) return stats def generate_all_periodic_breakdown_stats(trade_list: list) -> dict[str, list]: result = {} for period in BACKTEST_BREAKDOWNS: result[period] = generate_periodic_breakdown_stats(trade_list, period) return result def calc_streak(dataframe: DataFrame) -> tuple[int, int]: """ Calculate consecutive win and loss streaks :param dataframe: Dataframe containing the trades dataframe, with profit_ratio column :return: Tuple containing consecutive wins and losses """ df = Series(np.where(dataframe["profit_ratio"] > 0, "win", "loss")).to_frame("result") df["streaks"] = df["result"].ne(df["result"].shift()).cumsum().rename("streaks") df["counter"] = df["streaks"].groupby(df["streaks"]).cumcount() + 1 res = df.groupby(df["result"]).max() # cons_wins = int(res.loc["win", "counter"]) if "win" in res.index else 0 cons_losses = int(res.loc["loss", "counter"]) if "loss" in res.index else 0 return cons_wins, cons_losses def generate_trading_stats(results: DataFrame) -> dict[str, Any]: """Generate overall trade statistics""" if len(results) == 0: return { "wins": 0, "losses": 0, "draws": 0, "winrate": 0, "holding_avg": timedelta(), "winner_holding_avg": timedelta(), "loser_holding_avg": timedelta(), "max_consecutive_wins": 0, "max_consecutive_losses": 0, } winning_trades = results.loc[results["profit_ratio"] > 0] draw_trades = results.loc[results["profit_ratio"] == 0] losing_trades = results.loc[results["profit_ratio"] < 0] holding_avg = ( timedelta(minutes=round(results["trade_duration"].mean())) if not results.empty else timedelta() ) winner_holding_avg = ( timedelta(minutes=round(winning_trades["trade_duration"].mean())) if not winning_trades.empty else timedelta() ) loser_holding_avg = ( timedelta(minutes=round(losing_trades["trade_duration"].mean())) if not losing_trades.empty else timedelta() ) winstreak, loss_streak = calc_streak(results) return { "wins": len(winning_trades), "losses": len(losing_trades), "draws": len(draw_trades), "winrate": len(winning_trades) / len(results) if len(results) else 0.0, "holding_avg": holding_avg, "holding_avg_s": holding_avg.total_seconds(), "winner_holding_avg": winner_holding_avg, "winner_holding_avg_s": winner_holding_avg.total_seconds(), "loser_holding_avg": loser_holding_avg, "loser_holding_avg_s": loser_holding_avg.total_seconds(), "max_consecutive_wins": winstreak, "max_consecutive_losses": loss_streak, } def generate_daily_stats(results: DataFrame) -> dict[str, Any]: """Generate daily statistics""" if len(results) == 0: return { "backtest_best_day": 0, "backtest_worst_day": 0, "backtest_best_day_abs": 0, "backtest_worst_day_abs": 0, "winning_days": 0, "draw_days": 0, "losing_days": 0, "daily_profit_list": [], } daily_profit_rel = results.resample("1d", on="close_date")["profit_ratio"].sum() daily_profit = results.resample("1d", on="close_date")["profit_abs"].sum().round(10) worst_rel = min(daily_profit_rel) best_rel = max(daily_profit_rel) worst = min(daily_profit) best = max(daily_profit) winning_days = sum(daily_profit > 0) draw_days = sum(daily_profit == 0) losing_days = sum(daily_profit < 0) daily_profit_list = [(str(idx.date()), val) for idx, val in daily_profit.items()] return { "backtest_best_day": best_rel, "backtest_worst_day": worst_rel, "backtest_best_day_abs": best, "backtest_worst_day_abs": worst, "winning_days": winning_days, "draw_days": draw_days, "losing_days": losing_days, "daily_profit": daily_profit_list, } def generate_strategy_stats( pairlist: list[str], strategy: str, content: dict[str, Any], min_date: datetime, max_date: datetime, market_change: float, is_hyperopt: bool = False, ) -> dict[str, Any]: """ :param pairlist: List of pairs to backtest :param strategy: Strategy name :param content: Backtest result data in the format: {'results: results, 'config: config}}. :param min_date: Backtest start date :param max_date: Backtest end date :param market_change: float indicating the market change :return: Dictionary containing results per strategy and a strategy summary. """ results: DataFrame = content["results"] if not isinstance(results, DataFrame): return {} config = content["config"] max_open_trades = min(config["max_open_trades"], len(pairlist)) start_balance = get_dry_run_wallet(config) stake_currency = config["stake_currency"] pair_results = generate_pair_metrics( pairlist, stake_currency=stake_currency, starting_balance=start_balance, results=results, min_date=min_date, max_date=max_date, skip_nan=False, ) enter_tag_stats = generate_tag_metrics( "enter_tag", starting_balance=start_balance, results=results, min_date=min_date, max_date=max_date, skip_nan=False, ) exit_reason_stats = generate_tag_metrics( "exit_reason", starting_balance=start_balance, results=results, min_date=min_date, max_date=max_date, skip_nan=False, ) mix_tag_stats = generate_tag_metrics( ["enter_tag", "exit_reason"], starting_balance=start_balance, results=results, min_date=min_date, max_date=max_date, skip_nan=False, ) left_open_results = generate_pair_metrics( pairlist, stake_currency=stake_currency, starting_balance=start_balance, results=results.loc[results["exit_reason"] == "force_exit"], min_date=min_date, max_date=max_date, skip_nan=True, ) daily_stats = generate_daily_stats(results) trade_stats = generate_trading_stats(results) periodic_breakdown = {} if not is_hyperopt: periodic_breakdown = {"periodic_breakdown": generate_all_periodic_breakdown_stats(results)} best_pair = ( max( [pair for pair in pair_results if pair["key"] != "TOTAL"], key=lambda x: x["profit_sum"] ) if len(pair_results) > 1 else None ) worst_pair = ( min( [pair for pair in pair_results if pair["key"] != "TOTAL"], key=lambda x: x["profit_sum"] ) if len(pair_results) > 1 else None ) winning_profit = results.loc[results["profit_abs"] > 0, "profit_abs"].sum() losing_profit = results.loc[results["profit_abs"] < 0, "profit_abs"].sum() profit_factor = winning_profit / abs(losing_profit) if losing_profit else 0.0 expectancy, expectancy_ratio = calculate_expectancy(results) backtest_days = (max_date - min_date).days or 1 trades_dict = results.to_dict(orient="records") strat_stats = { "trades": trades_dict, "locks": [lock.to_json() for lock in content["locks"]], "best_pair": best_pair, "worst_pair": worst_pair, "results_per_pair": pair_results, "results_per_enter_tag": enter_tag_stats, "exit_reason_summary": exit_reason_stats, "mix_tag_stats": mix_tag_stats, "left_open_trades": left_open_results, "total_trades": len(results), "trade_count_long": len(results.loc[~results["is_short"]]), "trade_count_short": len(results.loc[results["is_short"]]), "total_volume": calculate_trade_volume(trades_dict), "avg_stake_amount": results["stake_amount"].mean() if len(results) > 0 else 0, "profit_mean": results["profit_ratio"].mean() if len(results) > 0 else 0, "profit_median": results["profit_ratio"].median() if len(results) > 0 else 0, "profit_total": results["profit_abs"].sum() / start_balance, "profit_total_long": results.loc[~results["is_short"], "profit_abs"].sum() / start_balance, "profit_total_short": results.loc[results["is_short"], "profit_abs"].sum() / start_balance, "profit_total_abs": results["profit_abs"].sum(), "profit_total_long_abs": results.loc[~results["is_short"], "profit_abs"].sum(), "profit_total_short_abs": results.loc[results["is_short"], "profit_abs"].sum(), "cagr": calculate_cagr(backtest_days, start_balance, content["final_balance"]), "expectancy": expectancy, "expectancy_ratio": expectancy_ratio, "sortino": calculate_sortino(results, min_date, max_date, start_balance), "sharpe": calculate_sharpe(results, min_date, max_date, start_balance), "calmar": calculate_calmar(results, min_date, max_date, start_balance), "sqn": calculate_sqn(results, start_balance), "profit_factor": profit_factor, "backtest_start": min_date.strftime(DATETIME_PRINT_FORMAT), "backtest_start_ts": int(min_date.timestamp() * 1000), "backtest_end": max_date.strftime(DATETIME_PRINT_FORMAT), "backtest_end_ts": int(max_date.timestamp() * 1000), "backtest_days": backtest_days, "backtest_run_start_ts": content["backtest_start_time"], "backtest_run_end_ts": content["backtest_end_time"], "trades_per_day": round(len(results) / backtest_days, 2), "market_change": market_change, "pairlist": pairlist, "stake_amount": config["stake_amount"], "stake_currency": config["stake_currency"], "stake_currency_decimals": decimals_per_coin(config["stake_currency"]), "starting_balance": start_balance, "dry_run_wallet": start_balance, "final_balance": content["final_balance"], "rejected_signals": content["rejected_signals"], "timedout_entry_orders": content["timedout_entry_orders"], "timedout_exit_orders": content["timedout_exit_orders"], "canceled_trade_entries": content["canceled_trade_entries"], "canceled_entry_orders": content["canceled_entry_orders"], "replaced_entry_orders": content["replaced_entry_orders"], "max_open_trades": max_open_trades, "max_open_trades_setting": ( config["max_open_trades"] if config["max_open_trades"] != float("inf") else -1 ), "timeframe": config["timeframe"], "timeframe_detail": config.get("timeframe_detail", ""), "timerange": config.get("timerange", ""), "enable_protections": config.get("enable_protections", False), "strategy_name": strategy, # Parameters relevant for backtesting "stoploss": config["stoploss"], "trailing_stop": config.get("trailing_stop", False), "trailing_stop_positive": config.get("trailing_stop_positive"), "trailing_stop_positive_offset": config.get("trailing_stop_positive_offset", 0.0), "trailing_only_offset_is_reached": config.get("trailing_only_offset_is_reached", False), "use_custom_stoploss": config.get("use_custom_stoploss", False), "minimal_roi": config["minimal_roi"], "use_exit_signal": config["use_exit_signal"], "exit_profit_only": config["exit_profit_only"], "exit_profit_offset": config["exit_profit_offset"], "ignore_roi_if_entry_signal": config["ignore_roi_if_entry_signal"], "trading_mode": config["trading_mode"], "margin_mode": config["margin_mode"], **periodic_breakdown, **daily_stats, **trade_stats, } try: drawdown = calculate_max_drawdown( results, value_col="profit_abs", starting_balance=start_balance ) # max_relative_drawdown = Underwater underwater = calculate_max_drawdown( results, value_col="profit_abs", starting_balance=start_balance, relative=True ) strat_stats.update( { "max_drawdown_account": drawdown.relative_account_drawdown, "max_relative_drawdown": underwater.relative_account_drawdown, "max_drawdown_abs": drawdown.drawdown_abs, "drawdown_start": drawdown.high_date.strftime(DATETIME_PRINT_FORMAT), "drawdown_start_ts": drawdown.high_date.timestamp() * 1000, "drawdown_end": drawdown.low_date.strftime(DATETIME_PRINT_FORMAT), "drawdown_end_ts": drawdown.low_date.timestamp() * 1000, "max_drawdown_low": drawdown.low_value, "max_drawdown_high": drawdown.high_value, } ) csum_min, csum_max = calculate_csum(results, start_balance) strat_stats.update({"csum_min": csum_min, "csum_max": csum_max}) except ValueError: strat_stats.update( { "max_drawdown_account": 0.0, "max_relative_drawdown": 0.0, "max_drawdown_abs": 0.0, "max_drawdown_low": 0.0, "max_drawdown_high": 0.0, "drawdown_start": datetime(1970, 1, 1, tzinfo=timezone.utc), "drawdown_start_ts": 0, "drawdown_end": datetime(1970, 1, 1, tzinfo=timezone.utc), "drawdown_end_ts": 0, "csum_min": 0, "csum_max": 0, } ) return strat_stats def generate_backtest_stats( btdata: dict[str, DataFrame], all_results: dict[str, dict[str, DataFrame | dict]], min_date: datetime, max_date: datetime, ) -> BacktestResultType: """ :param btdata: Backtest data :param all_results: backtest result - dictionary in the form: { Strategy: {'results: results, 'config: config}}. :param min_date: Backtest start date :param max_date: Backtest end date :return: Dictionary containing results per strategy and a strategy summary. """ result: BacktestResultType = get_BacktestResultType_default() market_change = calculate_market_change(btdata, "close") metadata = {} pairlist = list(btdata.keys()) for strategy, content in all_results.items(): strat_stats = generate_strategy_stats( pairlist, strategy, content, min_date, max_date, market_change=market_change ) metadata[strategy] = { "run_id": content["run_id"], "backtest_start_time": content["backtest_start_time"], "timeframe": content["config"]["timeframe"], "timeframe_detail": content["config"].get("timeframe_detail", None), "backtest_start_ts": int(min_date.timestamp()), "backtest_end_ts": int(max_date.timestamp()), } result["strategy"][strategy] = strat_stats strategy_results = generate_strategy_comparison(bt_stats=result["strategy"]) result["metadata"] = metadata result["strategy_comparison"] = strategy_results return result