myTestFreqAI/freqtrade/rpc/api_server/api_background_tasks.py
Ubuntu 17199e9a44
Some checks failed
Pre-commit auto-update / auto-update (push) Has been cancelled
first add
2025-04-21 21:11:51 +08:00

46 lines
1.4 KiB
Python

import logging
from fastapi import APIRouter
from fastapi.exceptions import HTTPException
from freqtrade.rpc.api_server.api_schemas import BackgroundTaskStatus
from freqtrade.rpc.api_server.webserver_bgwork import ApiBG
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Private API, protected by authentication and webserver_mode dependency
# NOTE(review): no dependencies are attached to the router on this line -
# presumably they are supplied where the router is included into the app; confirm there.
router = APIRouter()
@router.get("/background", response_model=list[BackgroundTaskStatus], tags=["webserver"])
def background_job_list():
    """
    Report the status of every background job held in ``ApiBG.jobs``.

    :return: A list with one status dict per job, in the iteration order of
        ``ApiBG.jobs``. The route declares ``list[BackgroundTaskStatus]`` as its
        response model.
    """
    job_statuses = []
    for job_id, job_info in ApiBG.jobs.items():
        # "category", "status" and "is_running" are read by index and therefore
        # must be present; the remaining fields are optional and default to None.
        job_statuses.append(
            {
                "job_id": job_id,
                "job_category": job_info["category"],
                "status": job_info["status"],
                "running": job_info["is_running"],
                "progress": job_info.get("progress"),
                "progress_tasks": job_info.get("progress_tasks"),
                "error": job_info.get("error"),
            }
        )
    return job_statuses
@router.get("/background/{jobid}", response_model=BackgroundTaskStatus, tags=["webserver"])
def background_job(jobid: str):
    """
    Report the status of a single background job.

    :param jobid: Key of the job in ``ApiBG.jobs``, taken from the URL path.
    :return: Status dict for the job. The route declares ``BackgroundTaskStatus``
        as its response model.
    :raises HTTPException: With status 404 when ``ApiBG.jobs`` has no entry for
        ``jobid`` (a falsy entry is treated the same way).
    """
    job_info = ApiBG.jobs.get(jobid)
    if not job_info:
        raise HTTPException(status_code=404, detail="Job not found.")

    # "category", "status" and "is_running" are read by index and therefore
    # must be present; the remaining fields are optional and default to None.
    job_status = {
        "job_id": jobid,
        "job_category": job_info["category"],
        "status": job_info["status"],
        "running": job_info["is_running"],
        "progress": job_info.get("progress"),
        "progress_tasks": job_info.get("progress_tasks"),
        "error": job_info.get("error"),
    }
    return job_status