139 lines
6.6 KiB
Python
139 lines
6.6 KiB
Python
import warnings
|
||
warnings.filterwarnings("ignore", category=UserWarning, module="pandas_ta")
|
||
|
||
from freqtrade.strategy import IStrategy
|
||
from pandas import DataFrame
|
||
import pandas_ta as ta
|
||
|
||
class FreqaiPrimer(IStrategy):
|
||
# 策略参数
|
||
minimal_roi = {
|
||
"0": 0.04, # 4% ROI (10 分钟内)
|
||
"60": 0.02, # 2% ROI (1 小时)
|
||
"180": 0.01, # 1% ROI (3 小时)
|
||
"360": 0.0 # 0% ROI (6 小时)
|
||
}
|
||
|
||
stoploss = -0.015 # 初始止损 -1.5%
|
||
trailing_stop = True
|
||
trailing_stop_positive = 0.008 # 价格上涨 0.8% 后开始追踪
|
||
trailing_stop_positive_offset = 0.01 # 追踪止损偏移量 1%
|
||
|
||
timeframe = "3m" # 主时间框架为 3 分钟
|
||
can_short = False # 禁用做空
|
||
|
||
# 自定义指标参数
|
||
bb_length = 20
|
||
bb_std = 2.0
|
||
rsi_length = 14
|
||
rsi_overbought = 65 # 收紧超买阈值到 65
|
||
rsi_oversold = 30 # 收紧超卖阈值到 30
|
||
|
||
def informative_pairs(self):
|
||
pairs = self.dp.current_whitelist()
|
||
return [(pair, '15m') for pair in pairs] + [(pair, '1h') for pair in pairs]
|
||
|
||
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# 计算 3m 周期的指标
|
||
bb_3m = ta.bbands(dataframe['close'], length=self.bb_length, std=self.bb_std)
|
||
dataframe['bb_lower_3m'] = bb_3m[f'BBL_{self.bb_length}_{self.bb_std}']
|
||
dataframe['bb_upper_3m'] = bb_3m[f'BBU_{self.bb_length}_{self.bb_std}']
|
||
dataframe['rsi_3m'] = ta.rsi(dataframe['close'], length=self.rsi_length)
|
||
|
||
# 成交量过滤
|
||
dataframe['volume_ma'] = dataframe['volume'].rolling(20).mean()
|
||
|
||
# 计算 ATR 用于动态止损和退出
|
||
dataframe['atr'] = ta.atr(dataframe['high'], dataframe['low'], dataframe['close'], length=14)
|
||
|
||
# 获取 15m 数据
|
||
df_15m = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='15m')
|
||
bb_15m = ta.bbands(df_15m['close'], length=self.bb_length, std=self.bb_std)
|
||
df_15m['bb_lower_15m'] = bb_15m[f'BBL_{self.bb_length}_{self.bb_std}']
|
||
df_15m['bb_upper_15m'] = bb_15m[f'BBU_{self.bb_length}_{self.bb_std}']
|
||
df_15m['rsi_15m'] = ta.rsi(df_15m['close'], length=self.rsi_length)
|
||
|
||
# 手动合并 15m 数据
|
||
df_15m = df_15m[['date', 'bb_lower_15m', 'bb_upper_15m', 'rsi_15m']]
|
||
df_15m = df_15m.rename(columns={'date': 'date_15m'})
|
||
dataframe = dataframe.merge(df_15m, how='left', left_on='date', right_on='date_15m')
|
||
dataframe = dataframe.fillna(method='ffill')
|
||
|
||
# 获取 1h 数据
|
||
df_1h = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
|
||
bb_1h = ta.bbands(df_1h['close'], length=self.bb_length, std=self.bb_std)
|
||
df_1h['bb_lower_1h'] = bb_1h[f'BBL_{self.bb_length}_{self.bb_std}']
|
||
df_1h['bb_upper_1h'] = bb_1h[f'BBU_{self.bb_length}_{self.bb_std}']
|
||
df_1h['rsi_1h'] = ta.rsi(df_1h['close'], length=self.rsi_length)
|
||
|
||
# 手动合并 1h 数据
|
||
df_1h = df_1h[['date', 'bb_lower_1h', 'bb_upper_1h', 'rsi_1h']]
|
||
df_1h = df_1h.rename(columns={'date': 'date_1h'})
|
||
dataframe = dataframe.merge(df_1h, how='left', left_on='date', right_on='date_1h')
|
||
dataframe = dataframe.fillna(method='ffill')
|
||
|
||
# K线形态:看涨吞没
|
||
dataframe['bullish_engulfing'] = (
|
||
(dataframe['close'].shift(1) < dataframe['open'].shift(1)) &
|
||
(dataframe['close'] > dataframe['open']) &
|
||
(dataframe['close'] > dataframe['open'].shift(1)) &
|
||
(dataframe['open'] < dataframe['close'].shift(1))
|
||
)
|
||
|
||
# 调试:打印指标值(最后 5 行)
|
||
print(f"Pair: {metadata['pair']}, Last 5 rows:")
|
||
print(dataframe[['date', 'close', 'bb_lower_3m', 'rsi_3m', 'rsi_15m', 'rsi_1h', 'bullish_engulfing', 'volume', 'volume_ma']].tail(5))
|
||
|
||
return dataframe
|
||
|
||
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# 做多条件(收紧条件以减少交易量)
|
||
conditions = [
|
||
(dataframe['close'] <= dataframe['bb_lower_3m']), # 严格要求价格低于 3m 布林带下轨
|
||
(dataframe['rsi_3m'] < self.rsi_oversold), # 3m RSI < 30
|
||
(dataframe['rsi_15m'] < self.rsi_oversold), # 15m RSI < 30
|
||
(dataframe['close'] <= dataframe['bb_lower_15m']), # 价格低于 15m 布林带下轨
|
||
(dataframe['bullish_engulfing']), # 强制要求看涨吞没
|
||
(dataframe['volume'] > dataframe['volume_ma'] * 1.2) # 成交量高于均量的 1.2 倍
|
||
]
|
||
|
||
dataframe.loc[conditions[0] & conditions[1] & conditions[2] & conditions[3] & conditions[4] & conditions[5], 'enter_long'] = 1
|
||
|
||
# 调试:检查每个条件的触发情况
|
||
print(f"Pair: {metadata['pair']}, Entry condition checks:")
|
||
print(f" - Close <= bb_lower_3m: {(dataframe['close'] <= dataframe['bb_lower_3m']).sum()} candles")
|
||
print(f" - RSI_3m < {self.rsi_oversold}: {(dataframe['rsi_3m'] < self.rsi_oversold).sum()} candles")
|
||
print(f" - RSI_15m < {self.rsi_oversold}: {(dataframe['rsi_15m'] < self.rsi_oversold).sum()} candles")
|
||
print(f" - Close <= bb_lower_15m: {(dataframe['close'] <= dataframe['bb_lower_15m']).sum()} candles")
|
||
print(f" - Bullish Engulfing: {(dataframe['bullish_engulfing']).sum()} candles")
|
||
print(f" - Volume > Volume_MA * 1.2: {(dataframe['volume'] > dataframe['volume_ma'] * 1.2).sum()} candles")
|
||
if dataframe['enter_long'].sum() > 0:
|
||
print(f"Entry signals found at:")
|
||
print(dataframe[dataframe['enter_long'] == 1][['date', 'close', 'rsi_3m', 'rsi_15m', 'bullish_engulfing']])
|
||
else:
|
||
print("No entry signals generated.")
|
||
|
||
return dataframe
|
||
|
||
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# 做多退出(添加 ATR 动态退出)
|
||
dataframe.loc[
|
||
(
|
||
(dataframe['close'] >= dataframe['bb_upper_3m']) |
|
||
(dataframe['rsi_3m'] > self.rsi_overbought) |
|
||
(dataframe['close'] > dataframe['open'] + dataframe['atr'] * 2) # 获利 2 倍 ATR
|
||
),
|
||
'exit_long'] = 1
|
||
return dataframe
|
||
|
||
def custom_stoploss(self, pair: str, trade: 'Trade', current_time, current_rate: float,
|
||
current_profit: float, **kwargs) -> float:
|
||
# 动态止损基于 ATR
|
||
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
|
||
last_candle = dataframe.iloc[-1]
|
||
atr = last_candle['atr']
|
||
if atr > 0:
|
||
return -1.5 * atr / current_rate
|
||
return self.stoploss
|
||
|