"""
Fetch daily-archived OHLCV data from https://data.binance.vision/
Documentation can be found at https://github.com/binance/binance-public-data
"""

import asyncio
import logging
import zipfile
from datetime import date, timedelta
from io import BytesIO
from typing import Any

import aiohttp
import numpy as np
import pandas as pd
from pandas import DataFrame

from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.enums import CandleType
from freqtrade.misc import chunks
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now


logger = logging.getLogger(__name__)


class Http404(Exception):
    """Raised when the daily archive returns HTTP 404 (no data for that day)."""

    def __init__(self, msg, date, url):
        super().__init__(msg)
        self.date = date
        self.url = url


class BadHttpStatus(Exception):
    """Raised on any HTTP status other than 200/404."""


async def download_archive_ohlcv(
candle_type: CandleType,
pair: str,
timeframe: str,
*,
since_ms: int,
until_ms: int | None,
markets: dict[str, Any],
stop_on_404: bool = True,
) -> DataFrame:
"""
Fetch OHLCV data from https://data.binance.vision
The function makes its best effort to download data within the time range
[`since_ms`, `until_ms`] -- including `since_ms`, but excluding `until_ms`.
If `stop_one_404` is True, this returned DataFrame is guaranteed to start from `since_ms`
with no gaps in the data.
:candle_type: Currently only spot and futures are supported
:pair: symbol name in CCXT convention
:since_ms: the start timestamp of data, including itself
:until_ms: the end timestamp of data, excluding itself
:param until_ms: `None` indicates the timestamp of the latest available data
:markets: the CCXT markets dict, when it's None, the function will load the markets data
from a new `ccxt.binance` instance
:param stop_on_404: Stop to download the following data when a 404 returned
:return: the date range is between [since_ms, until_ms), return an empty DataFrame if no data
available in the time range
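
    Usage sketch (illustrative values only; `markets` is assumed to come from a ccxt
    binance instance, e.g. `exchange.load_markets()`):

        df = asyncio.run(
            download_archive_ohlcv(
                CandleType.SPOT,
                "BTC/USDT",
                "1h",
                since_ms=1698364800000,  # 2023-10-27 00:00:00 UTC
                until_ms=None,
                markets=markets,
            )
        )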
"""
try:
symbol = markets[pair]["id"]
start = dt_from_ts(since_ms)
end = dt_from_ts(until_ms) if until_ms else dt_now()
        # Use two days ago as the last available day, since the daily archives are
        # uploaded once a day with a delay of several hours.
last_available_date = dt_now() - timedelta(days=2)
end = min(end, last_available_date)
if start >= end:
return DataFrame()
df = await _download_archive_ohlcv(
symbol, pair, timeframe, candle_type, start, end, stop_on_404
)
logger.debug(
f"Downloaded data for {pair} from https://data.binance.vision with length {len(df)}."
)
except Exception as e:
logger.warning(
"An exception occurred during fast download from Binance, falling back to "
"the slower REST API, this can take more time.",
exc_info=e,
)
df = DataFrame()
if not df.empty:
# only return the data within the requested time range
return df.loc[(df["date"] >= start) & (df["date"] < end)]
else:
return df


def concat_safe(dfs: list[DataFrame | None]) -> DataFrame:
    """Concatenate the daily DataFrames; return an empty DataFrame if all are None."""
    if all(df is None for df in dfs):
        return DataFrame()
    else:
        return pd.concat(dfs)


async def _download_archive_ohlcv(
symbol: str,
pair: str,
timeframe: str,
candle_type: CandleType,
start: date,
end: date,
stop_on_404: bool,
) -> DataFrame:
    # Daily DataFrames; a `None` entry marks a day with missing data (when
    # `stop_on_404` is False).
    dfs: list[DataFrame | None] = []
    # The current day being processed, starting at 1.
    current_day = 0
    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        # The number of concurrent HTTP connections is throttled by the TCPConnector.
        for dates in chunks(list(date_range(start, end)), 1000):
tasks = [
asyncio.create_task(get_daily_ohlcv(symbol, timeframe, candle_type, date, session))
for date in dates
]
for task in tasks:
current_day += 1
try:
df = await task
except Http404 as e:
if stop_on_404:
logger.debug(f"Failed to download {e.url} due to 404.")
                        # A 404 on the very first day indicates the data is missing
                        # entirely on https://data.binance.vision, so log a warning
                        # with advice for the user.
                        # https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
if current_day == 1:
logger.warning(
f"Fast download is unavailable due to missing data: "
f"{e.url}. Falling back to the slower REST API, "
"which may take more time."
)
if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
logger.warning(
f"To avoid the delay, you can first download {pair} using "
"`--timerange <start date>-20200101`, and then download the "
"remaining data with `--timerange 20200101-<end date>`."
)
else:
                            logger.warning(
                                f"Binance fast download for {pair} stopped at {e.date} due to "
                                f"missing data: {e.url}. Falling back to the REST API for the "
                                "remaining data, which can take more time."
                            )
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
else:
dfs.append(None)
except Exception as e:
logger.warning(f"An exception raised: {e}")
# Directly return the existing data, do not allow the gap within the data
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
else:
dfs.append(df)
return concat_safe(dfs)


async def cancel_and_await_tasks(unawaited_tasks: list[asyncio.Task]):
    """Cancel the pending tasks and await them so they can finish cleanly."""
    logger.debug("Trying to cancel uncompleted download tasks.")
    for task in unawaited_tasks:
        task.cancel()
    await asyncio.gather(*unawaited_tasks, return_exceptions=True)
    logger.debug("All download tasks were awaited.")


def date_range(start: date, end: date):
    """Yield every date from `start` to `end`, inclusive."""
    day = start
    while day <= end:
        yield day
        day += timedelta(days=1)


def binance_vision_zip_name(symbol: str, timeframe: str, date: date) -> str:
    return f"{symbol}-{timeframe}-{date.strftime('%Y-%m-%d')}.zip"


def candle_type_to_url_segment(candle_type: CandleType) -> str:
    if candle_type == CandleType.SPOT:
        return "spot"
    elif candle_type == CandleType.FUTURES:
        return "futures/um"
    else:
        raise ValueError(f"Unsupported CandleType: {candle_type}")


def binance_vision_ohlcv_zip_url(
symbol: str, timeframe: str, candle_type: CandleType, date: date
) -> str:
"""
example urls:
https://data.binance.vision/data/spot/daily/klines/BTCUSDT/1s/BTCUSDT-1s-2023-10-27.zip
https://data.binance.vision/data/futures/um/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2023-10-27.zip
"""
asset_type_url_segment = candle_type_to_url_segment(candle_type)
url = (
f"https://data.binance.vision/data/{asset_type_url_segment}/daily/klines/{symbol}"
f"/{timeframe}/{binance_vision_zip_name(symbol, timeframe, date)}"
)
return url


def binance_vision_trades_zip_url(symbol: str, candle_type: CandleType, date: date) -> str:
"""
example urls:
https://data.binance.vision/data/spot/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip
https://data.binance.vision/data/futures/um/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip
"""
asset_type_url_segment = candle_type_to_url_segment(candle_type)
url = (
f"https://data.binance.vision/data/{asset_type_url_segment}/daily/aggTrades/{symbol}"
f"/{symbol}-aggTrades-{date.strftime('%Y-%m-%d')}.zip"
)
return url


async def get_daily_ohlcv(
symbol: str,
timeframe: str,
candle_type: CandleType,
date: date,
session: aiohttp.ClientSession,
retry_count: int = 3,
retry_delay: float = 0.0,
) -> DataFrame:
"""
Get daily OHLCV from https://data.binance.vision
See https://github.com/binance/binance-public-data
:symbol: binance symbol name, e.g. BTCUSDT
:timeframe: e.g. 1m, 1h
:candle_type: SPOT or FUTURES
:date: the returned DataFrame will cover the entire day of `date` in UTC
:session: an aiohttp.ClientSession instance
:retry_count: times to retry before returning the exceptions
:retry_delay: the time to wait before every retry
:return: A dataframe containing columns date,open,high,low,close,volume
"""
url = binance_vision_ohlcv_zip_url(symbol, timeframe, candle_type, date)
logger.debug(f"download data from binance: {url}")
retry = 0
while True:
if retry > 0:
sleep_secs = retry * retry_delay
            logger.debug(
                f"[{retry}/{retry_count}] Retrying download of {url} after {sleep_secs} seconds"
            )
await asyncio.sleep(sleep_secs)
try:
async with session.get(url) as resp:
if resp.status == 200:
content = await resp.read()
logger.debug(f"Successfully downloaded {url}")
with zipfile.ZipFile(BytesIO(content)) as zipf:
with zipf.open(zipf.namelist()[0]) as csvf:
                            # Some files include a header row while others do not:
                            # https://github.com/binance/binance-public-data/issues/283
first_byte = csvf.read(1)[0]
if chr(first_byte).isdigit():
header = None
else:
header = 0
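                            # Rewind so read_csv sees the full file, including the first byte.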
csvf.seek(0)
df = pd.read_csv(
csvf,
usecols=[0, 1, 2, 3, 4, 5],
names=["date", "open", "high", "low", "close", "volume"],
header=header,
)
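                            # Timestamps above 1e13 are in microseconds; normalize them
                            # to milliseconds before parsing.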
df["date"] = pd.to_datetime(
np.where(df["date"] > 1e13, df["date"] // 1000, df["date"]),
unit="ms",
utc=True,
)
return df
elif resp.status == 404:
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
else:
raise BadHttpStatus(f"{resp.status} - {resp.reason}")
except Exception as e:
retry += 1
if isinstance(e, Http404) or retry > retry_count:
logger.debug(f"Failed to get data from {url}: {e}")
raise


async def download_archive_trades(
candle_type: CandleType,
pair: str,
*,
since_ms: int,
until_ms: int | None,
markets: dict[str, Any],
stop_on_404: bool = True,
) -> tuple[str, list[list]]:
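    """
    Fetch daily-archived trades from https://data.binance.vision

    The counterpart of `download_archive_ohlcv` for trades: it downloads the daily
    archives covering [`since_ms`, `until_ms`) and returns an empty list on any
    failure so that the caller can fall back to the REST API.

    :param candle_type: Currently only spot and futures are supported
    :param pair: symbol name in CCXT convention
    :param since_ms: the start timestamp of the data, inclusive
    :param until_ms: the end timestamp of the data, exclusive; `None` means the latest
        available data
    :param markets: the CCXT markets dict, used to map `pair` to the exchange symbol
    :param stop_on_404: stop downloading subsequent days once a 404 is returned
    :return: a tuple of (pair, list of trades in DEFAULT_TRADES_COLUMNS format)
    """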
try:
symbol = markets[pair]["id"]
last_available_date = dt_now() - timedelta(days=2)
start = dt_from_ts(since_ms)
end = dt_from_ts(until_ms) if until_ms else dt_now()
end = min(end, last_available_date)
if start >= end:
return pair, []
result_list = await _download_archive_trades(
symbol, pair, candle_type, start, end, stop_on_404
)
return pair, result_list
except Exception as e:
logger.warning(
"An exception occurred during fast trades download from Binance, falling back to "
"the slower REST API, this can take a lot more time.",
exc_info=e,
)
return pair, []


def parse_trades_from_zip(csvf) -> list[list]:
    """Parse a daily aggTrades CSV into rows of DEFAULT_TRADES_COLUMNS."""
    # Spot files have no header row while futures files do:
    # https://github.com/binance/binance-public-data/issues/283
first_byte = csvf.read(1)[0]
if chr(first_byte).isdigit():
# spot
header = None
names = [
"id",
"price",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
"is_best_match",
]
else:
# futures
header = 0
names = [
"id",
"price",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
]
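    # Rewind so read_csv sees the full file, including the first byte.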
csvf.seek(0)
df = pd.read_csv(
csvf,
names=names,
header=header,
)
df.loc[:, "cost"] = df["price"] * df["amount"]
# Side is reversed intentionally
# based on ccxt parseTrade logic.
df.loc[:, "side"] = np.where(df["is_buyer_maker"], "sell", "buy")
df.loc[:, "type"] = None
    # Timestamps above 1e13 are in microseconds; convert them to milliseconds.
df.loc[:, "timestamp"] = np.where(
df["timestamp"] > 1e13,
df["timestamp"] // 1000,
df["timestamp"],
)
return df.loc[:, DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()


async def get_daily_trades(
symbol: str,
candle_type: CandleType,
date: date,
session: aiohttp.ClientSession,
retry_count: int = 3,
retry_delay: float = 0.0,
) -> list[list]:
"""
Get daily OHLCV from https://data.binance.vision
See https://github.com/binance/binance-public-data
:symbol: binance symbol name, e.g. BTCUSDT
:candle_type: SPOT or FUTURES
:date: the returned DataFrame will cover the entire day of `date` in UTC
:session: an aiohttp.ClientSession instance
:retry_count: times to retry before returning the exceptions
:retry_delay: the time to wait before every retry
:return: a list containing trades in DEFAULT_TRADES_COLUMNS format
"""
url = binance_vision_trades_zip_url(symbol, candle_type, date)
logger.debug(f"download trades data from binance: {url}")
retry = 0
while True:
if retry > 0:
sleep_secs = retry * retry_delay
            logger.debug(
                f"[{retry}/{retry_count}] Retrying download of {url} after {sleep_secs} seconds"
            )
await asyncio.sleep(sleep_secs)
try:
async with session.get(url) as resp:
if resp.status == 200:
content = await resp.read()
logger.debug(f"Successfully downloaded {url}")
with zipfile.ZipFile(BytesIO(content)) as zipf:
with zipf.open(zipf.namelist()[0]) as csvf:
return parse_trades_from_zip(csvf)
elif resp.status == 404:
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
else:
raise BadHttpStatus(f"{resp.status} - {resp.reason}")
except Exception as e:
            logger.info("Downloading daily trades raised: %s", e)
retry += 1
if isinstance(e, Http404) or retry > retry_count:
logger.debug(f"Failed to get data from {url}: {e}")
raise


async def _download_archive_trades(
symbol: str,
pair: str,
candle_type: CandleType,
start: date,
end: date,
stop_on_404: bool,
) -> list[list]:
    # Trades collected per day; days with missing data (404) are skipped when
    # `stop_on_404` is False.
    results: list[list] = []
    # The current day being processed, starting at 1.
    current_day = 0
    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        # The number of concurrent HTTP connections is throttled by the TCPConnector.
        for dates in chunks(list(date_range(start, end)), 30):
tasks = [
asyncio.create_task(get_daily_trades(symbol, candle_type, date, session))
for date in dates
]
for task in tasks:
current_day += 1
try:
result = await task
except Http404 as e:
if stop_on_404:
logger.debug(f"Failed to download {e.url} due to 404.")
                        # A 404 on the very first day indicates the data is missing
                        # entirely on https://data.binance.vision, so log a warning
                        # with advice for the user.
                        # https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
if current_day == 1:
logger.warning(
f"Fast download is unavailable due to missing data: "
f"{e.url}. Falling back to the slower REST API, "
"which may take more time."
)
if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
logger.warning(
f"To avoid the delay, you can first download {pair} using "
"`--timerange <start date>-20200101`, and then download the "
"remaining data with `--timerange 20200101-<end date>`."
)
else:
                            logger.warning(
                                f"Binance fast download for {pair} stopped at {e.date} due to "
                                f"missing data: {e.url}. Falling back to the REST API for the "
                                "remaining data, which can take more time."
                            )
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return results
except Exception as e:
logger.warning(f"An exception raised: {e}")
# Directly return the existing data, do not allow the gap within the data
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return results
else:
# Happy case
results.extend(result)
return results