"""
core.py
-------
AsyncYT core downloader. Contains the main :class:`AsyncYT` class with methods for video info retrieval, single-video download, playlist download, and search. Also includes internal helper functions and the
``_FfmpegProgressParser`` class for parsing FFmpeg progress output. Exceptions are defined in :mod:`exceptions.py` and data models in :mod:`basemodels.py`. Binary management is handled by :mod:`binaries.py`.
"""
from __future__ import annotations
import asyncio
import logging
import os
import platform
import re
import shutil
import signal
import tempfile
import warnings
from json import loads
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union, overload
from collections.abc import Callable as CallableABC
from .basemodels import (
DownloadConfig,
DownloadProgress,
DownloadRequest,
DownloadResponse,
PlaylistConfig,
PlaylistDownloadProgress,
PlaylistInfo,
PlaylistItemResult,
PlaylistRequest,
PlaylistResponse,
PlaylistVideoInfo,
SearchRequest,
SearchResponse,
VideoInfo,
)
from .builder import build_download_command
from .enums import PlaylistStatus, ProgressStatus
from .exceptions import (
AsyncYTBase,
DownloadAlreadyExistsError,
DownloadGotCanceledError,
DownloadNotFoundError,
PlaylistCancelledError,
PlaylistDownloadError,
YtdlpDownloadError,
YtdlpGetInfoError,
YtdlpPlaylistGetInfoError,
YtdlpSearchError,
)
from .utils import (
call_callback,
clean_youtube_url,
get_id,
get_unique_filename,
get_unique_path,
)
from .binaries import BinaryManager
logger = logging.getLogger(__name__)
__all__ = ["AsyncYT"]
_IS_WINDOWS = platform.system().lower() == "windows"
# [download] 45.3% of ~ 50.00MiB at 3.20MiB/s ETA 00:08
_RE_DOWNLOAD = re.compile(
r"\[download\]\s+"
r"(?P<pct>[\d.]+)%"
r"(?:\s+of\s+~?\s*(?P<total>[\d.]+\s*\S+))?"
r"(?:\s+at\s+(?P<speed>[\d.]+\s*\S+/s))?"
r"(?:\s+ETA\s+(?P<eta>[\d:]+))?"
)
_RE_MERGER = re.compile(r"\[Merger\]", re.IGNORECASE)
_RE_CONVERTOR = re.compile(r"\[VideoConvertor\]|\[ExtractAudio\]", re.IGNORECASE)
_RE_REMUXER = re.compile(r"\[VideoRemuxer\]", re.IGNORECASE)
_RE_DESTINATION = re.compile(
r"(?:\[download\] Destination:|Merging formats into)\s+(.+)"
)
# FFmpeg -progress key=value line (e.g. "out_time=00:00:05.123456")
_RE_FFMPEG_KV = re.compile(r"^(?P<key>[a-zA-Z_]+)=(?P<value>.+)$")
def _parse_eta(eta_str: str) -> int:
parts = eta_str.split(":")
try:
if len(parts) == 3:
return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(float(parts[2]))
if len(parts) == 2:
return int(parts[0]) * 60 + int(float(parts[1]))
return int(float(parts[0]))
except (ValueError, IndexError):
return 0
def _out_time_to_seconds(t: str) -> float:
"""Convert 'HH:MM:SS.ffffff' or 'SS.ffffff' to float seconds."""
try:
t = t.strip()
parts = t.split(":")
if len(parts) == 3:
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
if len(parts) == 2:
return float(parts[0]) * 60 + float(parts[1])
return float(parts[0])
except (ValueError, IndexError):
return 0.0
def _bytes_to_human(n: int) -> str:
for unit in ("B", "KiB", "MiB", "GiB"):
if n < 1024:
return f"{n:.1f}{unit}"
n //= 1024
return f"{n:.1f}TiB"
class _FfmpegProgressParser:
"""
Stateful parser for FFmpeg ``-progress pipe:1`` key=value blocks.
FFmpeg emits groups of key=value pairs terminated by
``progress=continue`` or ``progress=end``. We accumulate the block
and flush it once we see the terminator.
"""
def __init__(self, progress: DownloadProgress, total_duration: float):
self.progress = progress
self.total_duration = total_duration
self._block: Dict[str, str] = {}
def feed(self, line: str) -> bool:
"""
Feed one line. Returns True if ``progress`` was updated.
``line`` should already be stripped.
"""
m = _RE_FFMPEG_KV.match(line)
if not m:
return False
key = m.group("key")
value = m.group("value").strip()
if key == "progress":
changed = self._flush()
self._block = {}
if value == "end":
self.progress.encoding_percentage = 100.0
self.progress.status = ProgressStatus.ENCODING
return True
return changed
self._block[key] = value
return False
def _flush(self) -> bool:
b = self._block
if not b:
return False
logger.debug(b)
p = self.progress
p.status = ProgressStatus.ENCODING
changed = False
if "frame" in b:
try:
new = int(b["frame"])
if new != p.encoding_frame: # ← only if changed
p.encoding_frame = new
changed = True
except ValueError:
pass
if "fps" in b:
try:
val = float(b["fps"])
if val > 0 and val != p.encoding_fps: # ← only if changed
p.encoding_fps = val
changed = True
except ValueError:
pass
if "bitrate" in b:
new = b["bitrate"]
if new != p.encoding_bitrate: # ← only if changed
p.encoding_bitrate = new
changed = True
if "total_size" in b:
try:
size_bytes = int(b["total_size"])
if size_bytes > 0:
human = _bytes_to_human(size_bytes)
if human != p.encoding_size: # ← only if changed
p.encoding_size = human
changed = True
except ValueError:
pass
if "speed" in b:
raw = b["speed"].strip()
if raw and raw != "N/A" and raw != p.encoding_speed: # ← only if changed
p.encoding_speed = raw
changed = True
if "out_time" in b:
raw_time = b["out_time"].strip()
if not raw_time.startswith("-"):
if raw_time != p.encoding_time:
p.encoding_time = raw_time
changed = True # ← always fire when time moves
elapsed = _out_time_to_seconds(raw_time)
if self.total_duration > 0:
pct = min(round((elapsed / self.total_duration) * 100, 2), 100.0)
if pct != p.encoding_percentage:
p.encoding_percentage = pct
changed = True
return changed
def _update_download_progress(
line: str,
progress: DownloadProgress,
ffmpeg_parser: _FfmpegProgressParser,
) -> bool:
"""
Parse one line of yt-dlp (or embedded FFmpeg) output.
Returns True if ``progress`` changed and the callback should fire.
"""
stripped = line.strip()
# --- FFmpeg -progress pipe:1 key=value lines ---
# These are interleaved with yt-dlp output when using --external-downloader ffmpeg
if _RE_FFMPEG_KV.match(stripped):
return ffmpeg_parser.feed(stripped)
# --- Phase change markers ---
if _RE_MERGER.search(line):
if progress.status != ProgressStatus.MERGING:
progress.status = ProgressStatus.MERGING
return True
return False
if _RE_CONVERTOR.search(line):
if progress.status != ProgressStatus.ENCODING:
progress.status = ProgressStatus.ENCODING
progress.encoding_percentage = 0.0
return True
return False
if _RE_REMUXER.search(line):
if progress.status != ProgressStatus.REMUXING:
progress.status = ProgressStatus.REMUXING
return True
return False
# --- Title from destination line ---
dm = _RE_DESTINATION.search(line)
if dm:
progress.title = Path(dm.group(1).strip()).stem
return False
# --- yt-dlp download progress line ---
m = _RE_DOWNLOAD.search(line)
if m:
old_pct = progress.percentage
progress.percentage = min(float(m.group("pct")), 100.0)
if m.group("speed"):
progress.speed = m.group("speed").strip()
if m.group("eta"):
progress.eta = _parse_eta(m.group("eta"))
progress.status = ProgressStatus.DOWNLOADING
return progress.percentage != old_pct
return False
async def _kill_process(process: asyncio.subprocess.Process) -> None:
"""
Terminate a process and all its children.
On POSIX, we kill the entire process group.
On Windows, we use ``taskkill /F /T`` to recursively kill the tree.
"""
pid = process.pid
try:
if _IS_WINDOWS:
kill_cmd = ["taskkill", "/F", "/T", "/PID", str(pid)]
kill_proc = await asyncio.create_subprocess_exec(
*kill_cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await kill_proc.wait()
else:
if hasattr(os, "killpg"):
try:
pgid = os.getpgid(pid) # type: ignore
os.killpg(pgid, signal.SIGKILL) # type: ignore
except ProcessLookupError:
pass
except Exception as e:
logger.warning("Failed to kill process %s: %s", pid, e)
finally:
try:
await process.wait()
except Exception:
pass
async def _read_process_output(process: asyncio.subprocess.Process):
"""Async generator yielding decoded stdout lines."""
assert process.stdout is not None
while True:
line = await process.stdout.readline()
if not line:
break
yield line.decode("utf-8", errors="replace")
[docs]
class AsyncYT(BinaryManager):
"""
AsyncYT: Asynchronous media downloader powered by yt-dlp.
Supports single-video downloads, audio extraction, YouTube search,
and native playlist downloads — all with real-time progress callbacks.
FFmpeg encoding/download progress is captured via
``--external-downloader ffmpeg -progress pipe:1`` so you receive
accurate frame, fps, speed, bitrate, size, and percentage updates.
:param bin_dir: Directory containing yt-dlp (and optionally ffmpeg) binaries.
"""
def __init__(self, bin_dir=None):
super().__init__(bin_dir=bin_dir)
# Maps download_id → asyncio subprocess (single videos)
self._downloads: Dict[str, asyncio.subprocess.Process] = {}
# Maps playlist_id → asyncio.Event (set to request cancellation)
self._playlist_cancel_events: Dict[str, asyncio.Event] = {}
[docs]
async def get_video_info(self, url: str) -> VideoInfo:
"""
Retrieve video metadata from *url* using yt-dlp.
:param url: Video URL.
:return: :class:`VideoInfo` with title, duration, thumbnail, etc.
:raises YtdlpGetInfoError: If yt-dlp returns a non-zero exit code.
"""
url = clean_youtube_url(url)
cmd = [str(self.ytdlp_path), "--dump-json", "--no-warnings", url]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise YtdlpGetInfoError(url, process.returncode, stderr.decode())
return VideoInfo.from_dict(loads(stdout.decode()))
[docs]
async def get_playlist_info(
self,
url: str,
max_videos: Optional[int] = None,
) -> PlaylistInfo:
"""
Fetch full playlist metadata, including per-video thumbnails.
Uses yt-dlp's ``--flat-playlist`` so it is fast — no individual
video pages are fetched. Thumbnails come from the flat data that
YouTube/yt-dlp provides in the playlist manifest.
:param url: Playlist URL.
:param max_videos: Limit to this many entries (None = all).
:return: :class:`PlaylistInfo` with all :class:`PlaylistVideoInfo` entries.
:raises YtdlpPlaylistGetInfoError: If yt-dlp fails.
"""
cmd = [
str(self.ytdlp_path),
"--dump-json",
"--flat-playlist",
"--no-warnings",
url,
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise YtdlpPlaylistGetInfoError(url, process.returncode, stderr.decode())
raw_lines = stdout.decode().strip().splitlines()
raw_entries = [loads(line) for line in raw_lines if line.strip()]
return PlaylistInfo.from_ytdlp(
raw_entries, playlist_url=url, max_videos=max_videos
)
async def _search(self, query: str, max_results: int = 10) -> List[VideoInfo]:
"""Internal YouTube search via yt-dlp."""
search_url = f"ytsearch{max_results}:{query}"
cmd = [
str(self.ytdlp_path),
"--dump-json",
"--no-warnings",
"--match-filter",
"live_status = 'not_live' & duration > 0",
search_url,
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise YtdlpSearchError(query, process.returncode, stderr.decode())
return [
VideoInfo.from_dict(loads(line))
for line in stdout.decode().strip().splitlines()
if line.strip()
]
def _get_config(self, *args, **kwargs):
"""
Parse positional / keyword arguments into ``(url, config, callback, finalize)``.
Accepts:
- ``(url: str, config: DownloadConfig, callback, finalize: bool)`` positionally
- ``url=``, ``config=``, ``progress_callback=``, ``finalize=`` as kwargs
- A :class:`DownloadRequest` as first positional arg or ``request=`` kwarg
"""
url: Optional[str] = None
config: Optional[DownloadConfig] = None
progress_callback = None
finalize: bool = True
if "url" in kwargs:
url = kwargs["url"]
if "config" in kwargs:
config = kwargs["config"]
if "progress_callback" in kwargs:
progress_callback = kwargs["progress_callback"]
if "finalize" in kwargs:
finalize = kwargs["finalize"]
if "request" in kwargs:
req = kwargs["request"]
url = req.url
config = req.config
for arg in args:
if isinstance(arg, str):
url = arg
elif isinstance(arg, DownloadConfig):
config = arg
elif isinstance(arg, bool):
finalize = arg
elif isinstance(arg, CallableABC):
progress_callback = arg
elif isinstance(arg, DownloadRequest):
url = arg.url
config = arg.config
if not url:
raise TypeError("url is required")
return url, config, progress_callback, finalize
[docs]
async def finalize_download(
self,
temp_dir: Union[tempfile.TemporaryDirectory, Path],
output_dir: Path,
config: DownloadConfig,
) -> List[Path]:
"""
Move processed files from *temp_dir* to *output_dir*.
:param temp_dir: Temporary directory (cleaned up afterwards).
:param output_dir: Final destination directory.
:param config: Download config (used for overwrite setting).
:return: List of moved :class:`Path` objects.
"""
moved: List[Path] = []
td = (
Path(temp_dir.name)
if isinstance(temp_dir, tempfile.TemporaryDirectory)
else temp_dir
)
try:
output_dir.mkdir(parents=True, exist_ok=True)
for item in td.iterdir():
if item.is_dir():
continue
dest = output_dir / item.name
overwrite = config.encoding.overwrite if config.encoding else False
if dest.exists():
destination = (
dest
if overwrite
else Path(get_unique_path(output_dir, item.name))
)
else:
destination = dest
try:
await asyncio.to_thread(shutil.move, str(item), str(destination))
logger.debug("Moved %s → %s", item, destination)
moved.append(destination)
except Exception as e:
logger.error("Failed to move %s → %s: %s", item, destination, e)
raise
finally:
try:
if isinstance(temp_dir, Path):
if temp_dir.exists():
await asyncio.to_thread(shutil.rmtree, temp_dir)
else:
await asyncio.to_thread(temp_dir.cleanup)
except Exception as e:
logger.warning("Failed to clean temp dir: %s", e)
return moved
@overload
async def download(
self,
url: str,
config: Optional[DownloadConfig] = None,
progress_callback: Optional[
Callable[[DownloadProgress], Union[None, Awaitable[None]]]
] = None,
finalize: bool = True,
) -> Path: ...
@overload
async def download(
self,
request: DownloadRequest,
progress_callback: Optional[
Callable[[DownloadProgress], Union[None, Awaitable[None]]]
] = None,
finalize: bool = True,
) -> Path: ...
[docs]
async def download(self, *args, **kwargs) -> Path:
"""
Download a single video (or audio) from *url*.
:param url: Video URL **or** a :class:`DownloadRequest`.
:param config: Optional :class:`DownloadConfig`.
:param progress_callback: Async or sync callable receiving :class:`DownloadProgress`.
:param finalize: Move output from temp dir to ``config.output_path``.
:return: :class:`Path` to the downloaded file.
:raises DownloadAlreadyExistsError: Same download already running.
:raises YtdlpDownloadError: yt-dlp returned a non-zero exit code.
:raises DownloadGotCanceledError: :meth:`cancel` was called.
:raises FileNotFoundError: FFmpeg not found, or no output file produced.
"""
url, config, progress_callback, finalize = self._get_config(*args, **kwargs)
config = config or DownloadConfig()
url = clean_youtube_url(url)
id_ = get_id(url, config)
if id_ in self._downloads:
raise DownloadAlreadyExistsError(id_)
output_dir = Path(config.output_path).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
if not self.ffmpeg_path:
raise FileNotFoundError("FFmpeg is not installed / not found")
# Temp dir — yt-dlp writes here first; we move on completion
temp_dir = tempfile.TemporaryDirectory(delete=False)
temp_path = Path(temp_dir.name)
config_for_run = config.model_copy(update={"output_path": str(temp_path)})
# Pre-fetch duration for encoding_percentage calculation
total_duration = 0.0
try:
info = await self.get_video_info(url)
total_duration = float(info.duration or 0)
except Exception as e:
logger.warning(
"Could not fetch video duration (percentage will be unavailable): %s", e
)
cmd = build_download_command(
ytdlp_path=str(self.ytdlp_path),
ffmpeg_path=str(self.ffmpeg_path),
url=url,
config=config_for_run,
)
logger.debug("yt-dlp command: %s", " ".join(cmd))
progress = DownloadProgress(url=url, percentage=0.0, id=id_)
ffmpeg_parser = _FfmpegProgressParser(progress, total_duration)
last_pct = -1.0
last_enc_pct = -1.0
output: List[str] = []
try:
# On POSIX, start_new_session=True puts the child in its own
# process group so we can killpg() the whole tree on cancel.
kwargs_proc: Dict[str, Any] = dict(
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=temp_path,
)
if not _IS_WINDOWS:
kwargs_proc["start_new_session"] = True
process = await asyncio.create_subprocess_exec(*cmd, **kwargs_proc)
self._downloads[id_] = process
async for line in _read_process_output(process):
output.append(line.rstrip())
if not line.strip():
continue
changed = _update_download_progress(
line.rstrip(), progress, ffmpeg_parser
)
if progress_callback and changed:
should_fire = False
if progress.status == ProgressStatus.ENCODING:
if progress.encoding_percentage != last_enc_pct:
last_enc_pct = progress.encoding_percentage
should_fire = True
else:
if progress.percentage != last_pct:
last_pct = progress.percentage
should_fire = True
if should_fire:
await call_callback(progress_callback, progress)
returncode = await process.wait()
if returncode != 0:
raise YtdlpDownloadError(
url=url, output=output, cmd=cmd, error_code=returncode
)
# Completion
progress.status = ProgressStatus.COMPLETED
progress.percentage = 100.0
progress.encoding_percentage = 100.0
if progress_callback:
await call_callback(progress_callback, progress)
if finalize:
moved = await self.finalize_download(temp_dir, output_dir, config)
if moved:
logger.info("Download completed: %s", moved[0])
return moved[0]
raise FileNotFoundError("No output file found after processing")
files = [f for f in temp_path.iterdir() if f.is_file()]
if files:
return files[0]
raise FileNotFoundError("No output file found in temp dir")
except asyncio.CancelledError:
proc = self._downloads.get(id_)
if proc:
await _kill_process(proc)
raise DownloadGotCanceledError(id_)
except Exception:
raise
finally:
self._downloads.pop(id_, None)
[docs]
async def cancel(self, download_id: str) -> None:
"""
Cancel a running single-video download.
Kills the yt-dlp process **and** any child FFmpeg process.
:param download_id: The ID returned by :func:`get_id`.
:raises DownloadNotFoundError: If no download with that ID is running.
"""
process = self._downloads.pop(download_id, None)
if not process:
raise DownloadNotFoundError(download_id)
await _kill_process(process)
[docs]
async def cancel_playlist(self, playlist_id: str) -> None:
"""
Request cancellation of a running playlist download.
The playlist loop checks the cancel event between each video.
The currently-downloading video is also cancelled immediately.
:param playlist_id: The playlist ID returned by :func:`get_id`.
:raises DownloadNotFoundError: If no playlist with that ID is running.
"""
event = self._playlist_cancel_events.get(playlist_id)
if not event:
raise DownloadNotFoundError(playlist_id)
event.set()
[docs]
async def download_with_response(self, *args, **kwargs) -> DownloadResponse:
"""
Download with an API-friendly :class:`DownloadResponse` return value.
Accepts the same arguments as :meth:`download`.
:return: :class:`DownloadResponse` with metadata and optional error.
"""
id_: Optional[str] = None
try:
url, config, progress_callback, finalize = self._get_config(*args, **kwargs)
config = config or DownloadConfig()
id_ = get_id(url, config)
try:
video_info = await self.get_video_info(url)
except YtdlpGetInfoError as e:
return DownloadResponse(
success=False,
message="Failed to get video information",
error=f"error code: {e.error_code}\nOutput: {e.output}",
id=id_,
)
except Exception as e:
return DownloadResponse(
success=False,
message="Failed to get video information",
error=str(e),
id=id_,
)
filename = await self.download(url, config, progress_callback, finalize)
file = Path(filename)
new_file = get_unique_filename(
file, re.sub(r'[\\/:"*?<>|]', "_", video_info.title)
)
file = file.rename(new_file)
return DownloadResponse(
success=True,
message="Download completed successfully",
filename=str(file.absolute()),
video_info=video_info,
id=id_,
)
except AsyncYTBase:
raise
except Exception as e:
return DownloadResponse(
success=False,
message="Download failed",
error=str(e),
id=id_ or "",
)
[docs]
async def search(
self,
query: Optional[str] = None,
max_results: Optional[int] = None,
*,
request: Optional[SearchRequest] = None,
) -> SearchResponse:
"""
Search YouTube for videos.
:param query: Search query (required when *request* is not given).
:param max_results: Maximum results (default 10, max 50).
:param request: Optional :class:`SearchRequest` (mutually exclusive with *query*).
:return: :class:`SearchResponse` with a list of :class:`VideoInfo` objects.
"""
if request is not None:
if query is not None or max_results is not None:
raise TypeError("Provide request OR (query + max_results), not both.")
else:
if query is None:
raise TypeError("query is required when request is not given.")
if request:
query = request.query
max_results = request.max_results
max_results = max_results or 10
try:
results = await self._search(query, max_results) # type: ignore[arg-type]
return SearchResponse(
success=True,
message=f"Found {len(results)} results",
results=results,
total_results=len(results),
)
except Exception as e:
return SearchResponse(success=False, message="Search failed", error=str(e))
[docs]
async def get_playlist(
self,
url: str,
max_videos: Optional[int] = None,
) -> PlaylistInfo:
"""
Fetch playlist metadata without downloading any videos.
:param url: Playlist URL.
:param max_videos: Limit entries returned (None = all).
:return: :class:`PlaylistInfo` with per-video :class:`PlaylistVideoInfo` entries
(each containing thumbnail URLs).
"""
return await self.get_playlist_info(url, max_videos=max_videos)
[docs]
async def download_playlist(
self,
url: Optional[str] = None,
playlist_config: Optional[PlaylistConfig] = None,
progress_callback: Optional[
Callable[[PlaylistDownloadProgress], Union[None, Awaitable[None]]]
] = None,
*,
request: Optional[PlaylistRequest] = None,
) -> PlaylistResponse:
"""
Download all (or a subset of) videos from a playlist.
Supports concurrent downloads via ``PlaylistConfig.concurrency``.
Progress is reported through *progress_callback* with a
:class:`PlaylistDownloadProgress` that includes both the overall
playlist state and the current video's :class:`DownloadProgress`.
:param url: Playlist URL (required when *request* is not given).
:param playlist_config: Playlist-level configuration.
:param progress_callback: Async or sync callable receiving
:class:`PlaylistDownloadProgress` updates.
:param request: Optional :class:`PlaylistRequest`.
:return: :class:`PlaylistResponse` with per-item results and aggregated stats.
:raises TypeError: If conflicting arguments are supplied.
"""
if request is not None:
if url is not None or playlist_config is not None:
raise TypeError("Provide request OR (url + playlist_config), not both.")
else:
if url is None:
raise TypeError("url is required when request is not given.")
if request:
url = request.url
playlist_config = request.playlist_config
playlist_config = playlist_config or PlaylistConfig()
item_config = playlist_config.item_config or DownloadConfig()
playlist_id = get_id(url, item_config) # type: ignore[arg-type]
# Create cancellation event
cancel_event = asyncio.Event()
self._playlist_cancel_events[playlist_id] = cancel_event
# Live progress object
pl_progress = PlaylistDownloadProgress(
playlist_id=playlist_id,
status=PlaylistStatus.FETCHING_INFO,
)
async def _emit() -> None:
if progress_callback:
await call_callback(progress_callback, pl_progress)
await _emit()
try:
# --- Fetch playlist info ---
try:
# Always fetch the full playlist so video_indices and video_ids
# can be resolved correctly against the complete entry list.
# max_videos pre-limiting is applied later in the range-based path.
playlist_info = await self.get_playlist_info(url, max_videos=None) # type: ignore[arg-type]
except YtdlpPlaylistGetInfoError as exc:
raise PlaylistDownloadError(url, str(exc)) from exc # type: ignore[arg-type]
# Apply entry filtering — priority order:
# 1. video_indices + video_ids (explicit selection, union of both)
# 2. start_index / end_index / max_videos (range-based)
entries = playlist_info.entries
use_explicit = bool(
playlist_config.video_indices or playlist_config.video_ids
)
if use_explicit:
wanted_positions: set[int] = set()
wanted_ids: set[str] = set()
if playlist_config.video_indices:
wanted_positions = set(playlist_config.video_indices)
if playlist_config.video_ids:
wanted_ids = set(playlist_config.video_ids)
filtered: list[PlaylistVideoInfo] = []
for entry in entries:
in_positions = (
entry.playlist_index is not None
and entry.playlist_index in wanted_positions
)
in_ids = bool(entry.id and entry.id in wanted_ids)
if in_positions or in_ids:
filtered.append(entry)
# Warn about any requested indices / ids that matched nothing
matched_positions = {
e.playlist_index for e in filtered if e.playlist_index is not None
}
matched_ids = {e.id for e in filtered if e.id}
missing_pos = wanted_positions - matched_positions
missing_ids = wanted_ids - matched_ids
if missing_pos:
logger.warning(
"video_indices not found in playlist (playlist has %d entries): %s",
len(entries),
sorted(missing_pos),
)
if missing_ids:
logger.warning(
"video_ids not found in playlist: %s",
sorted(missing_ids),
)
entries = filtered
else:
# Range-based selection (original behaviour)
start = playlist_config.start_index - 1 # convert to 0-based
end = playlist_config.end_index # None = all
entries = entries[start:end]
if playlist_config.max_videos:
entries = entries[: playlist_config.max_videos]
if playlist_config.reverse:
entries = list(reversed(entries))
pl_progress.playlist_info = playlist_info
pl_progress.total_videos = len(entries)
pl_progress.status = PlaylistStatus.DOWNLOADING
await _emit()
results: List[PlaylistItemResult] = []
downloaded_files: List[str] = []
semaphore = asyncio.Semaphore(playlist_config.concurrency)
async def _download_one(entry: PlaylistVideoInfo) -> PlaylistItemResult:
"""Download a single playlist entry, respecting the semaphore."""
async with semaphore:
if cancel_event.is_set():
return PlaylistItemResult(
index=entry.playlist_index or 0,
video_info=entry,
success=False,
error="Cancelled",
)
# Per-video progress callback — update pl_progress and re-emit
async def _video_cb(vp: DownloadProgress) -> None:
pl_progress.current_video_progress = vp
if progress_callback:
await call_callback(progress_callback, pl_progress)
pl_progress.current_index = entry.playlist_index or 0
pl_progress.current_video = entry
pl_progress.current_video_progress = None
await _emit()
try:
filepath = await self.download(
entry.url,
item_config,
_video_cb,
)
pl_progress.completed_videos += 1
pl_progress._recalculate_percentage()
result = PlaylistItemResult(
index=entry.playlist_index or 0,
video_info=entry,
success=True,
filepath=str(filepath),
)
except DownloadGotCanceledError:
pl_progress.failed_videos += 1
pl_progress._recalculate_percentage()
result = PlaylistItemResult(
index=entry.playlist_index or 0,
video_info=entry,
success=False,
error="Cancelled",
)
except Exception as exc:
logger.warning(
"Playlist item %s failed: %s",
entry.url,
exc,
)
pl_progress.failed_videos += 1
pl_progress._recalculate_percentage()
result = PlaylistItemResult(
index=entry.playlist_index or 0,
video_info=entry,
success=False,
error=str(exc),
)
if not playlist_config.skip_on_error:
raise
pl_progress.results.append(result)
await _emit()
return result
# --- Run downloads ---
if playlist_config.concurrency == 1:
# Sequential — simpler, friendlier for rate limits
for entry in entries:
if cancel_event.is_set():
break
result = await _download_one(entry)
results.append(result)
if result.filepath:
downloaded_files.append(result.filepath)
else:
# Concurrent
tasks = [asyncio.create_task(_download_one(e)) for e in entries]
try:
done_results = await asyncio.gather(*tasks, return_exceptions=True)
for r in done_results:
if isinstance(r, PlaylistItemResult):
results.append(r)
if r.filepath:
downloaded_files.append(r.filepath)
elif (
isinstance(r, Exception)
and not playlist_config.skip_on_error
):
raise r
except asyncio.CancelledError:
for t in tasks:
t.cancel()
raise
# Detect if cancelled
was_cancelled = cancel_event.is_set()
success = pl_progress.completed_videos > 0
pl_progress.status = (
PlaylistStatus.CANCELLED
if was_cancelled
else PlaylistStatus.COMPLETED if success else PlaylistStatus.FAILED
)
pl_progress.overall_percentage = (
round(pl_progress.completed_videos / len(entries) * 100, 1)
if entries
else 100.0
)
await _emit()
if was_cancelled:
raise PlaylistCancelledError(
playlist_id,
pl_progress.completed_videos,
pl_progress.total_videos,
)
return PlaylistResponse(
success=success,
message=(
f"Downloaded {pl_progress.completed_videos} of "
f"{pl_progress.total_videos} videos"
),
playlist_info=playlist_info,
results=results,
total_videos=pl_progress.total_videos,
successful_downloads=pl_progress.completed_videos,
failed_downloads=pl_progress.failed_videos,
downloaded_files=downloaded_files,
)
except PlaylistCancelledError:
raise
except PlaylistDownloadError:
raise
except asyncio.CancelledError:
raise PlaylistCancelledError(
playlist_id,
pl_progress.completed_videos,
pl_progress.total_videos,
)
except Exception as exc:
pl_progress.status = PlaylistStatus.FAILED
if progress_callback:
await call_callback(progress_callback, pl_progress)
return PlaylistResponse(
success=False,
message="Playlist download failed",
error=str(exc),
total_videos=pl_progress.total_videos,
successful_downloads=pl_progress.completed_videos,
failed_downloads=pl_progress.failed_videos,
)
finally:
self._playlist_cancel_events.pop(playlist_id, None)