"""
AsyncYT - A comprehensive async downloader library for any website
Uses yt-dlp and ffmpeg with automatic binary management
"""
import asyncio
from asyncio.subprocess import Process
import os
import platform
import shutil
import zipfile
from pathlib import Path
from typing import (
Any,
AsyncGenerator,
Dict,
List,
Optional,
)
import aiofiles
import aiohttp
import logging
from ._version import __version__
from .basemodels import *
from .enums import *
from .exceptions import AsyncYTBase
logger = logging.getLogger(__name__)
__all__ = ["BinaryManager"]
[docs]
class BinaryManager:
"""
Main Manager for managing binaries.
:param bin_dir: Directory for binary files (yt-dlp, ffmpeg).
:type bin_dir: Optional[str | Path]
"""
def __init__(self, bin_dir: Optional[str | Path] = None):
if isinstance(bin_dir, str):
bin_dir = Path(bin_dir)
if bin_dir and bin_dir.exists() and not bin_dir.is_dir():
raise ValueError(f"Path {bin_dir} not dir!")
self.bin_dir = bin_dir or Path.cwd() / "bin"
system = platform.system().lower()
if system == "darwin":
self.ytdlp_path = self.bin_dir / "yt-dlp_macos"
elif system == "windows":
self.ytdlp_path = self.bin_dir / "yt-dlp.exe"
else:
self.ytdlp_path = self.bin_dir / "yt-dlp"
self.ffmpeg_path = (
self.bin_dir / "ffmpeg.exe" if system == "windows" else "ffmpeg"
)
self.ffprobe_path = (
self.bin_dir / "ffprobe.exe" if system == "windows" else "ffprobe"
)
self._downloads: Dict[str, Process] = {}
[docs]
async def setup_binaries_generator(self) -> AsyncGenerator[SetupProgress, Any]:
"""
Download and setup yt-dlp and ffmpeg binaries, yielding SetupProgress.
:return: Async generator yielding SetupProgress objects.
:rtype: AsyncGenerator[SetupProgress, Any]
"""
self.bin_dir.mkdir(exist_ok=True)
# Setup yt-dlp
async for progress in self._setup_ytdlp():
yield progress
# Setup ffmpeg
async for progress in self._setup_ffmpeg():
yield progress
logger.info("All binaries are ready!")
[docs]
async def setup_binaries(self) -> None:
"""
Download and setup yt-dlp and ffmpeg binaries.
:return: None
"""
self.bin_dir.mkdir(exist_ok=True)
# Setup yt-dlp
async for _ in self._setup_ytdlp():
pass
# Setup ffmpeg
async for _ in self._setup_ffmpeg():
pass
logger.info("All binaries are ready!")
async def _setup_ytdlp(self) -> AsyncGenerator[SetupProgress, Any]:
"""
Ensure yt-dlp binary is available and up-to-date.
:return: Async generator yielding SetupProgress objects.
"""
system = platform.system().lower()
if system == "windows":
url = "https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp.exe"
elif system == "darwin":
url = (
"https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp_macos"
)
else:
url = "https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp"
if self.ytdlp_path.exists():
yield SetupProgress(
file="yt-dlp",
download_file_progress=DownloadFileProgress(
status=ProgressStatus.UPDATING,
downloaded_bytes=0,
total_bytes=0,
percentage=0,
),
)
try:
process = await asyncio.create_subprocess_exec(
str(self.ytdlp_path),
"-U",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
logger.warning(f"yt-dlp update failed: {stderr.decode().strip()}")
yield SetupProgress(
file="yt-dlp",
download_file_progress=DownloadFileProgress(
status=ProgressStatus.COMPLETED,
downloaded_bytes=0,
total_bytes=0,
percentage=100,
),
)
except Exception as e:
logger.warning(f"yt-dlp update encountered an error: {e}")
else:
logger.info("Downloading yt-dlp...")
async for progress in self._download_file(url, self.ytdlp_path):
yield SetupProgress(file="yt-dlp", download_file_progress=progress)
if system != "windows":
os.chmod(self.ytdlp_path, 0o755)
yield SetupProgress(
file="yt-dlp",
download_file_progress=DownloadFileProgress(
status=ProgressStatus.COMPLETED,
downloaded_bytes=0,
total_bytes=0,
percentage=100,
),
)
async def _setup_ffmpeg(self) -> AsyncGenerator[SetupProgress, Any]:
"""
Download ffmpeg binary.
:return: Async generator yielding SetupProgress objects.
:rtype: AsyncGenerator[SetupProgress, Any]
"""
system = platform.system().lower()
self.ffmpeg_path = self.bin_dir / "ffmpeg.exe"
self.ffprobe_path = self.bin_dir / "ffprobe.exe"
if system == "darwin":
self.ffmpeg_path = "ffmpeg"
self.ffprobe_path = "ffprobe"
yield SetupProgress(
file="ffmpeg",
download_file_progress=DownloadFileProgress(
status=ProgressStatus.DOWNLOADING,
downloaded_bytes=0,
total_bytes=0,
percentage=0,
),
)
if shutil.which("brew") is None:
raise AsyncYTBase(
"Homebrew is not installed. Install it from https://brew.sh"
)
process = await asyncio.create_subprocess_exec(
"brew",
"install",
"ffmpeg",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await process.communicate()
if process.returncode != 0:
raise AsyncYTBase(
f"Failed to install ffmpeg via brew: {stderr.decode().strip()}"
)
yield SetupProgress(
file="ffmpeg",
download_file_progress=DownloadFileProgress(
status=ProgressStatus.COMPLETED,
downloaded_bytes=0,
total_bytes=0,
percentage=100,
),
)
return
if not self.ffmpeg_path.exists() or not self.ffprobe_path.exists():
logger.info(f"Downloading ffmpeg for {system.capitalize()}...")
arch = "win64" if system == "windows" else "linux64"
url = f"https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-n7.1-latest-{arch}-gpl-7.1.zip"
temp_file = self.bin_dir / "ffmpeg.zip"
progress: DownloadFileProgress = DownloadFileProgress(
status=ProgressStatus.DOWNLOADING,
downloaded_bytes=0,
total_bytes=0,
percentage=0,
)
async for progress in self._download_file(url, temp_file):
yield SetupProgress(file="ffmpeg", download_file_progress=progress)
progress.status = ProgressStatus.EXTRACTING
yield SetupProgress(file="ffmpeg", download_file_progress=progress)
await self._extract_ffmpeg_windows(temp_file)
temp_file.unlink()
async def _extract_ffmpeg_windows(self, zip_path: Path) -> None:
"""
Extract only missing ffmpeg-related binaries from the Windows zip file.
:param zip_path: Path to the ffmpeg zip file.
:type zip_path: Path
:return: None
"""
with zipfile.ZipFile(zip_path, "r") as zip_ref:
for file_info in zip_ref.infolist():
filename = os.path.basename(file_info.filename)
if filename not in ["ffmpeg.exe", "ffprobe.exe"]:
continue
target_file = self.bin_dir / filename
if target_file.exists():
continue
with (
zip_ref.open(file_info) as source,
open(target_file, "wb") as target,
):
shutil.copyfileobj(source, target)
async def _download_file(
self, url: str, filepath: Path, max_retries: int = 5
) -> AsyncGenerator[DownloadFileProgress, Any]:
"""
Download a file asynchronously with retries, timeout, resume support, and file size verification.
:param url: URL to download from.
:type url: str
:param filepath: Path to save the file.
:type filepath: Path
:param max_retries: Maximum number of retries.
:type max_retries: int
:return: Async generator yielding DownloadFileProgress objects.
:rtype: AsyncGenerator[DownloadFileProgress, Any]
:raises AsyncYTBase: If download fails after max_retries.
"""
temp_filepath = filepath.with_suffix(filepath.suffix + ".part")
attempt = 0
backoff = 2
while attempt < max_retries:
try:
resume_pos = 0
if temp_filepath.exists():
resume_pos = temp_filepath.stat().st_size
headers = {}
if resume_pos > 0:
headers["Range"] = f"bytes={resume_pos}-"
timeout_obj = aiohttp.ClientTimeout(
total=None, sock_connect=30, sock_read=300
)
async with aiohttp.ClientSession(timeout=timeout_obj) as session:
async with session.get(url, headers=headers) as response:
if response.status in (200, 206):
mode = (
"ab"
if resume_pos > 0 and response.status == 206
else "wb"
)
async with aiofiles.open(temp_filepath, mode) as f:
downloaded = resume_pos
total = (
int(response.headers.get("Content-Length", 0))
+ resume_pos
if response.status == 206
else int(response.headers.get("Content-Length", 0))
)
chunk_size = 4096
async for chunk in response.content.iter_chunked(
chunk_size
):
await f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if total > 0:
percent = (downloaded / total) * 100
else:
percent = 0
yield DownloadFileProgress(
status=ProgressStatus.DOWNLOADING,
downloaded_bytes=downloaded,
total_bytes=total,
percentage=percent,
)
# Verify file size (only if we know the expected size)
if total > 0 and temp_filepath.stat().st_size != total:
raise AsyncYTBase(
f"Incomplete download for {filepath.name}: expected {total}, got {temp_filepath.stat().st_size}"
)
temp_filepath.rename(filepath)
return
else:
raise AsyncYTBase(
f"Failed to download {url}: {response.status}"
)
except asyncio.TimeoutError as e:
attempt += 1
wait = min(backoff**attempt, 60) # Cap wait time at 60 seconds
logger.warning(
f"Download attempt {attempt} timed out for {url}: {e}. Retrying in {wait}s..."
)
await asyncio.sleep(wait)
except Exception as e:
attempt += 1
wait = min(backoff**attempt, 60) # Cap wait time at 60 seconds
logger.warning(
f"Download attempt {attempt} failed for {url}: {e}. Retrying in {wait}s..."
)
await asyncio.sleep(wait)
raise AsyncYTBase(f"Failed to download {url} after {max_retries} attempts.")
[docs]
async def health_check(self) -> HealthResponse:
"""
Perform a health check on the required binaries (yt-dlp and ffmpeg).
:return: HealthResponse object with health status and binary availability.
:rtype: HealthResponse
:raises Exception: If an unexpected error occurs during the health check process.
"""
try:
# Check yt-dlp
ytdlp_available = False
if self.ytdlp_path and self.ytdlp_path.exists():
try:
process = await asyncio.create_subprocess_exec(
str(self.ytdlp_path),
"--version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await process.communicate()
ytdlp_available = process.returncode == 0
except Exception:
ytdlp_available = False
# Check ffmpeg
ffmpeg_available = False
if self.ffmpeg_path:
try:
ffmpeg_cmd = (
str(self.ffmpeg_path)
if self.ffmpeg_path != "ffmpeg"
else "ffmpeg"
)
process = await asyncio.create_subprocess_exec(
ffmpeg_cmd,
"-version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await process.communicate()
ffmpeg_available = process.returncode == 0
except Exception:
ffmpeg_available = False
status = "healthy" if (ytdlp_available and ffmpeg_available) else "degraded"
return HealthResponse(
status=status,
yt_dlp_available=ytdlp_available,
ffmpeg_available=ffmpeg_available,
binaries_path=str(self.bin_dir),
version=__version__,
)
except Exception as e:
return HealthResponse(
status="unhealthy",
yt_dlp_available=False,
ffmpeg_available=False,
error=str(e),
version=__version__,
)
def _get_format_selector(self, quality: "Quality") -> str:
"""Get yt-dlp format selector for quality. Use flexible format selection to let FFmpeg handle conversion."""
if quality == Quality.BEST:
return "bestvideo+bestaudio/best"
elif quality == Quality.WORST:
return "worst"
elif quality == Quality.AUDIO_ONLY:
return "bestaudio/best"
elif quality == Quality.VIDEO_ONLY:
return "bestvideo/best"
else:
height = quality.value.replace("p", "")
return f"bestvideo[height<={height}]+bestaudio/best[height<={height}]"
def _build_download_command(self, url: str, config: "DownloadConfig") -> List[str]:
"""
Build the yt-dlp command based on configuration.
:param url: URL to download.
:type url: str
:param config: Download configuration.
:type config: DownloadConfig
:return: List of command arguments for yt-dlp.
:rtype: List[str]
"""
cmd = [str(self.ytdlp_path)]
# Basic options
cmd.extend(
[
"--no-warnings",
"--progress",
"--newline",
"--no-playlist",
]
)
# Quality and format
selected_quality = (
Quality.AUDIO_ONLY if config.extract_audio else Quality(config.quality)
)
cmd.extend(["-f", self._get_format_selector(selected_quality)])
if config.extract_audio:
cmd.extend(["--extract-audio", "--audio-format", "mp3"])
# Output template - use safe template that preserves Unicode
if config.custom_filename:
cmd.extend(["-o", config.custom_filename])
else:
# Use sanitized version but keep Unicode
cmd.extend(["-o", "%(title).200B.%(ext)s"])
# Thumbnail handling
if config.embed_thumbnail or config.write_thumbnail:
cmd.extend(["--write-thumbnail", "--convert-thumbnails", "jpg"])
if config.embed_thumbnail:
cmd.append("--embed-thumbnail")
# Metadata and subtitles
if config.write_subs:
cmd.extend(["--write-subs", "--sub-lang", config.subtitle_lang])
if config.embed_subs:
cmd.append("--embed-subs")
if config.embed_metadata:
cmd.append("--add-metadata")
if config.write_info_json:
cmd.append("--write-info-json")
if config.write_live_chat:
cmd.append("--write-live-chat")
# Network options
if config.cookies_file:
cmd.extend(["--cookies", str(Path(config.cookies_file).resolve())])
if config.proxy:
cmd.extend(["--proxy", config.proxy])
if config.rate_limit:
cmd.extend(["--limit-rate", config.rate_limit])
# Retry options
cmd.extend(
[
"--retries",
str(config.retries),
"--fragment-retries",
str(config.fragment_retries),
]
)
# FFmpeg location
if self.ffmpeg_path:
cmd.extend(["--ffmpeg-location", str(self.ffmpeg_path)])
# Progress template for parsing
cmd.extend(
[
"--progress-template",
"download:PROGRESS|%(progress._percent_str)s|%(progress._downloaded_bytes_str)s|%(progress._total_bytes_str)s|%(progress._speed_str)s|%(progress._eta_str)s",
]
)
cmd.append("--restrict-filenames")
cmd.extend(["--print", "after_move:filepath"])
# Output metadata as JSON for reliable parsing
# cmd.append("--dump-single-json")
# Custom options
for key, value in config.custom_options.items():
if isinstance(value, bool) and value:
cmd.append(f"--{key}")
elif not isinstance(value, bool):
cmd.extend([f"--{key}", str(value)])
cmd.append(url)
return cmd
async def _read_process_output(self, process):
"""
Read process output line by line as UTF-8 (with replacement for bad chars).
:param process: The process to read output from.
:type process: asyncio.subprocess.Process
:return: Async generator yielding lines of output.
"""
assert process.stdout is not None, "Process must have stdout=PIPE"
try:
while True:
line = await process.stdout.readline()
if not line:
break
data = line.decode("utf-8", errors="replace").rstrip()
yield data
except ValueError as e:
if "chunk is longer than limit" in str(e):
# yt-dlp --dump-single-json workaround
data = await process.stdout.read()
data = data.decode("utf-8", errors="replace").rstrip()
yield data
else:
raise
def _parse_progress(self, line: str, progress: DownloadProgress) -> None:
"""
Parse progress information from yt-dlp output.
:param line: Output line from yt-dlp.
:type line: str
:param progress: DownloadProgress object to update.
:type progress: DownloadProgress
:return: None
"""
line = line.strip()
if "Destination:" in line:
# Extract title
progress.title = Path(line.split("Destination: ")[1]).stem
return
if line.startswith("PROGRESS|"):
try:
# Split the custom format: PROGRESS|percentage|downloaded|total|speed|eta
parts = line.split("|")
if len(parts) >= 6:
percentage_str = parts[1].replace("%", "").strip()
downloaded_str = parts[2].strip()
total_str = parts[3].strip()
speed_str = parts[4].strip()
eta_str = parts[5].strip()
# Parse percentage
if percentage_str and percentage_str != "N/A":
progress.percentage = float(percentage_str)
# Parse downloaded bytes
if downloaded_str and downloaded_str != "N/A":
progress.downloaded_bytes = self._parse_size(downloaded_str)
# Parse total bytes
if total_str and total_str != "N/A":
progress.total_bytes = self._parse_size(total_str)
# Parse speed
if speed_str and speed_str != "N/A":
progress.speed = speed_str
# Parse ETA
if eta_str and eta_str != "N/A":
progress.eta = self._parse_time(eta_str)
return
except (ValueError, IndexError) as e:
pass
def _parse_size(self, size_str: str) -> int:
"""
Parse size string (e.g., '10.5MiB', '1.2GB') to bytes.
:param size_str: Size string to parse.
:type size_str: str
:return: Size in bytes.
:rtype: int
"""
if not size_str:
return 0
size_str = size_str.strip().replace("~", "")
# Handle different size units
multipliers = {
"B": 1,
"KiB": 1024,
"KB": 1000,
"MiB": 1024**2,
"MB": 1000**2,
"GiB": 1024**3,
"GB": 1000**3,
"TiB": 1024**4,
"TB": 1000**4,
}
for unit, multiplier in multipliers.items():
if size_str.endswith(unit):
try:
number = float(size_str[: -len(unit)])
return int(number * multiplier)
except ValueError:
return 0
# If no unit, assume bytes
try:
return int(float(size_str))
except ValueError:
return 0
def _parse_time(self, time_str: str) -> int:
"""
Parse time string to seconds.
:param time_str: Time string to parse.
:type time_str: str
:return: Time in seconds.
:rtype: int
"""
try:
parts = time_str.split(":")
if len(parts) == 2:
return int(parts[0]) * 60 + int(parts[1])
elif len(parts) == 3:
return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
else:
return int(time_str)
except ValueError:
return 0