"""
AsyncYT - A comprehensive async downloader library for any website
Uses yt-dlp and ffmpeg with automatic binary management
"""
import asyncio
from json import loads
import os
import re
from pathlib import Path
import shutil
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union, overload
from collections.abc import Callable as Callable2
import logging
import warnings
import tempfile
from .enums import ProgressStatus
from .exceptions import *
from .basemodels import *
from .utils import (
call_callback,
clean_youtube_url,
get_id,
get_unique_filename,
get_unique_path,
)
from .binaries import AsyncFFmpeg
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Names exported by `from <module> import *`; "Downloader" is the deprecated
# alias bound at the bottom of this file.
__all__ = ["AsyncYT", "Downloader"]
class AsyncYT(AsyncFFmpeg):
    """
    AsyncYT: Asynchronous YouTube Downloader and Searcher

    This class provides asynchronous methods for downloading YouTube videos,
    playlists, and searching for videos using yt-dlp and FFmpeg. It supports
    progress tracking, flexible configuration, and API-friendly response
    formats.

    :param bin_dir: Path to the directory containing yt-dlp and FFmpeg binaries.
    :type bin_dir: Optional[str | Path]
    """

    # Lower-case file extensions accepted when scanning yt-dlp's output lines
    # for the path of the final media file. Kept at class level so the tuple
    # is built once instead of once per output line.
    _OUTPUT_FILE_EXTENSIONS = (
        # Audio
        ".mp3",
        ".m4a",
        ".wav",
        ".flac",
        ".ogg",
        ".opus",
        ".aac",
        # Video
        ".mp4",
        ".webm",
        ".mkv",
        ".avi",
        ".flv",
        ".mov",
    )

    def __init__(self, bin_dir: Optional[str | Path] = None):
        """
        Initialize the AsyncYT instance.

        :param bin_dir: Directory path for binary files (yt-dlp, FFmpeg).
        :type bin_dir: Optional[str | Path]
        """
        super().__init__(setup_only_ffmpeg=False, bin_dir=bin_dir)

    async def get_video_info(self, url: str) -> VideoInfo:
        """
        Asynchronously retrieve video information from a given URL using yt-dlp.

        :param url: The URL of the video to retrieve information for.
        :type url: str
        :return: VideoInfo object containing the video's metadata.
        :rtype: VideoInfo
        :raises YtdlpGetInfoError: If yt-dlp fails to retrieve video information.
        """
        url = clean_youtube_url(url)
        cmd = [str(self.ytdlp_path), "--dump-json", "--no-warnings", url]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            # errors="replace": never let an undecodable byte in the error
            # output hide the real yt-dlp failure behind a UnicodeDecodeError.
            raise YtdlpGetInfoError(
                url, process.returncode, stderr.decode(errors="replace")
            )
        data = loads(stdout.decode())
        return VideoInfo.from_dict(data)

    async def _search(self, query: str, max_results: int = 10) -> List[VideoInfo]:
        """
        Search for videos by query.

        :param query: Search query string.
        :type query: str
        :param max_results: Maximum number of results to return.
        :type max_results: int
        :return: List of VideoInfo objects.
        :rtype: List[VideoInfo]
        :raises YtdlpSearchError: If yt-dlp search fails.
        """
        search_url = f"ytsearch{max_results}:{query}"
        cmd = [str(self.ytdlp_path), "--dump-json", "--no-warnings", search_url]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise YtdlpSearchError(
                query, process.returncode, stderr.decode(errors="replace")
            )
        # yt-dlp prints one JSON document per line; skip blank lines
        # (including lines that only hold a stray "\r").
        stripped_lines = (raw.strip() for raw in stdout.decode().split("\n"))
        return [VideoInfo.from_dict(loads(line)) for line in stripped_lines if line]

    def _get_config(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[
        str,
        Optional[DownloadConfig],
        Optional[Callable[[DownloadProgress], Union[None, Awaitable[None]]]],
    ]:
        """
        Parse and validate download configuration arguments.

        Keyword arguments are read first (``request`` overrides ``url`` and
        ``config``), then positional arguments are applied in order and
        override earlier values. A keyword passed as ``None`` is treated as
        "not provided". Positional values of an unrecognized type and unknown
        keywords are ignored.

        :param args: Positional arguments (url, config, progress_callback, DownloadRequest).
        :param kwargs: Keyword arguments (url, config, progress_callback, request).
        :return: Tuple of (url, config, progress_callback)
        :rtype: Tuple[str, Optional[DownloadConfig], Optional[Callable]]
        :raises TypeError: If arguments are invalid.
        """
        url: Optional[str] = None
        config: Optional[DownloadConfig] = None
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None

        kw_url = kwargs.get("url")
        if kw_url is not None:
            if not isinstance(kw_url, str):
                raise TypeError("url must be str!")
            url = kw_url

        kw_config = kwargs.get("config")
        if kw_config is not None:
            if not isinstance(kw_config, DownloadConfig):
                raise TypeError("config must be DownloadConfig!")
            config = kw_config

        kw_callback = kwargs.get("progress_callback")
        if kw_callback is not None:
            if not callable(kw_callback):
                raise TypeError("progress_callback must be callable!")
            progress_callback = kw_callback

        kw_request = kwargs.get("request")
        if kw_request is not None:
            if not isinstance(kw_request, DownloadRequest):
                raise TypeError("request must be DownloadRequest!")
            url = kw_request.url
            config = kw_request.config

        for arg in args:
            if isinstance(arg, str):
                url = arg
            elif isinstance(arg, DownloadConfig):
                config = arg
            elif isinstance(arg, DownloadRequest):
                url = arg.url
                config = arg.config
            elif callable(arg):
                progress_callback = arg

        if not url:
            raise TypeError("url is a must!")
        return (url, config, progress_callback)

    @staticmethod
    def _move_downloaded_files(
        temp_path: Path, output_dir: Path, overwrite: bool
    ) -> None:
        """
        Move every entry of ``temp_path`` into ``output_dir``.

        When the target name already exists and ``overwrite`` is false, a
        unique name is obtained from ``get_unique_path``. A failed move is
        logged and does not abort the remaining moves.
        """
        for item in temp_path.iterdir():
            destination = output_dir / item.name
            if destination.exists() and not overwrite:
                destination = Path(get_unique_path(output_dir, item.name))
            try:
                shutil.move(str(item), str(destination))
            except Exception as e:
                logger.error("Failed to move %s to %s: %s", item, destination, e)

    @overload
    async def download(
        self,
        url: str,
        config: Optional[DownloadConfig] = None,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> str: ...

    @overload
    async def download(
        self,
        request: DownloadRequest,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> str: ...

    async def download(self, *args, **kwargs) -> str:
        """
        Asynchronously download media from a given URL using yt-dlp, track progress, and process the output file.

        yt-dlp runs inside a temporary directory; after post-processing, the
        produced files are moved to the configured output directory. The
        temporary directory is removed and the caller's ``config`` values
        that were adjusted for the run (``output_path``, ``embed_thumbnail``,
        ``write_thumbnail``) are restored whether the download succeeds,
        fails or is cancelled.

        :param args: url (str) or request (DownloadRequest)
        :param kwargs: config (Optional[DownloadConfig]), progress_callback (Optional[Callable])
        :return: The filename of the output File.
        :rtype: str
        :raises TypeError: If the arguments are invalid or no URL is given.
        :raises DownloadAlreadyExistsError: If a download with the same ID is already in progress.
        :raises YtdlpDownloadError: If yt-dlp returns a non-zero exit code.
        :raises Exception: If the output file cannot be determined from yt-dlp output.
        :raises DownloadGotCanceledError: If the download is cancelled.
        :raises FileNotFoundError: If FFmpeg wasn't installed.
        """
        url, config, progress_callback = self._get_config(*args, **kwargs)
        if not config:
            config = DownloadConfig()
        url = clean_youtube_url(url)
        download_id = get_id(url, config)
        if download_id in self._downloads:
            raise DownloadAlreadyExistsError(download_id)
        # Fail fast: check for FFmpeg before creating directories or touching
        # the caller's config.
        if not self.ffmpeg_path:
            raise FileNotFoundError("FFmpeg isn't installed")

        # Ensure output directory exists
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        original_embed_thumbnail = config.embed_thumbnail
        original_write_thumbnail = config.write_thumbnail

        # Create progress tracker
        progress = DownloadProgress(url=url, percentage=0, id=download_id)
        output_file: Optional[str] = None
        process: Optional[asyncio.subprocess.Process] = None

        temp_dir = tempfile.TemporaryDirectory()
        temp_path = Path(temp_dir.name)
        logger.debug(temp_path)
        try:
            # yt-dlp writes into the temporary directory.
            config.output_path = str(temp_path.absolute())
            config.ffmpeg_config.ffmpeg_path = str(self.ffmpeg_path)
            if config.embed_thumbnail:
                # For the yt-dlp run, have the thumbnail written to disk
                # instead of embedded; the flags are restored before the
                # post-processing step below.
                config.embed_thumbnail = False
                config.write_thumbnail = True

            # Build yt-dlp command
            cmd = await self._build_download_command(url, config)

            # Execute download
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                cwd=temp_path,
            )
            self._downloads[download_id] = process
            output: List[str] = []

            # Monitor yt-dlp progress
            async for line in self._read_process_output(process):
                line = line.strip()
                output.append(line)
                if not line:
                    continue
                old_percentage = progress.percentage
                self._parse_progress(line, progress)
                if progress_callback and progress.percentage > old_percentage:
                    await call_callback(progress_callback, progress)
                # Robust output filename extraction: accept the first line
                # that is an existing absolute path with a known extension.
                if (
                    not output_file
                    and line.lower().endswith(self._OUTPUT_FILE_EXTENSIONS)
                    and os.path.isabs(line)
                    and os.path.exists(line)
                ):
                    output_file = line

            returncode = await process.wait()
            if returncode != 0:
                raise YtdlpDownloadError(
                    url=url, output=output, cmd=cmd, error_code=returncode
                )
            if not output_file:
                raise Exception("Could not determine output file from yt-dlp")

            if progress_callback:
                progress.status = ProgressStatus.DOWNLOADED
                progress.percentage = 100.0
                await call_callback(progress_callback, progress)

            config.embed_thumbnail = original_embed_thumbnail
            config.write_thumbnail = original_write_thumbnail
            result = await self.process(
                output_file,
                config.ffmpeg_config,
                config,
                progress_callback,
                progress,
                url,
            )
            config.output_path = str(output_dir)
            self._move_downloaded_files(
                temp_path, output_dir, config.ffmpeg_config.overwrite
            )
            return result
        except asyncio.CancelledError:
            if process is not None:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process has already exited; nothing left to kill.
                    pass
                await process.wait()
            raise DownloadGotCanceledError(download_id)
        finally:
            # Only drop the registry entry that belongs to this call.
            if process is not None and self._downloads.get(download_id) is process:
                del self._downloads[download_id]
            # Hand the caller's config back in a usable state on every path.
            config.output_path = str(output_dir)
            config.embed_thumbnail = original_embed_thumbnail
            config.write_thumbnail = original_write_thumbnail
            try:
                temp_dir.cleanup()
            except OSError as e:
                # A leftover temp directory must not mask the real outcome.
                logger.warning(
                    "Failed to clean up temporary directory %s: %s", temp_path, e
                )

    async def cancel(self, download_id: str) -> None:
        """
        Cancel the downloading with download_id.

        :param download_id: The ID of the download to cancel.
        :type download_id: str
        :raises DownloadNotFoundError: If the download ID is not found.
        """
        process = self._downloads.pop(download_id, None)
        if process is None:
            raise DownloadNotFoundError(download_id)
        try:
            process.kill()
        except ProcessLookupError:
            # The process finished between registration and cancellation.
            pass
        await process.wait()

    @overload
    async def download_with_response(
        self,
        url: str,
        config: Optional[DownloadConfig] = None,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> DownloadResponse: ...

    @overload
    async def download_with_response(
        self,
        request: DownloadRequest,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> DownloadResponse: ...

    async def download_with_response(self, *args, **kwargs) -> DownloadResponse:
        """
        Download with API-friendly response format.

        Video metadata is fetched first; the file is then downloaded and
        renamed after the video title (characters ``\\ / : " * ? < > |`` are
        replaced by ``_``).

        :param args: url (str) or request (DownloadRequest)
        :param kwargs: config (Optional[DownloadConfig]), progress_callback (Optional[Callable])
        :return: DownloadResponse object with metadata and error info.
        :rtype: DownloadResponse
        :raises TypeError: If the arguments are invalid or no URL is given.
        :raises AsyncYTBase: Library errors raised by the download step are re-raised.
        """
        # Argument parsing happens outside the try block: the failure
        # responses below need the download id, which only exists once the
        # arguments have been parsed successfully.
        url, config, progress_callback = self._get_config(*args, **kwargs)
        config = config or DownloadConfig()
        # download() derives its id from the cleaned URL; do the same here so
        # the id in the response matches the one reported in progress updates.
        download_id = get_id(clean_youtube_url(url), config)
        try:
            # Get video info first
            try:
                video_info = await self.get_video_info(url)
            except YtdlpGetInfoError as e:
                return DownloadResponse(
                    success=False,
                    message="Failed to get video information",
                    error=f"error code: {e.error_code}\nOutput: {e.output}",
                    id=download_id,
                )
            except Exception as e:
                return DownloadResponse(
                    success=False,
                    message="Failed to get video information",
                    error=str(e),
                    id=download_id,
                )

            # Download the video
            filename = await self.download(url, config, progress_callback)
            file = Path(filename)
            title = re.sub(r'[\\/:"*?<>|]', "_", video_info.title)
            new_file = get_unique_filename(file, title)
            file = file.rename(new_file)
            return DownloadResponse(
                success=True,
                message="Download completed successfully",
                filename=str(file.absolute()),
                video_info=video_info,
                id=download_id,
            )
        except AsyncYTBase:
            raise
        except Exception as e:
            return DownloadResponse(
                success=False, message="Download failed", error=str(e), id=download_id
            )

    @overload
    async def search(
        self, query: str, max_results: Optional[int] = None
    ) -> "SearchResponse": ...

    @overload
    async def search(self, *, request: "SearchRequest") -> "SearchResponse": ...

    async def search(
        self,
        query: Optional[str] = None,
        max_results: Optional[int] = None,
        *,
        request: Optional["SearchRequest"] = None,
    ) -> SearchResponse:
        """
        Perform an asynchronous search operation.

        :param query: The search query string. Required if `request` is not provided.
        :type query: Optional[str]
        :param max_results: Maximum number of results to return. Defaults to 10.
        :type max_results: Optional[int]
        :param request: Optional SearchRequest object containing search parameters.
        :type request: Optional[SearchRequest]
        :return: SearchResponse object with results and status.
        :rtype: SearchResponse
        :raises TypeError: If both `request` and either `query` or `max_results` are provided, or if neither is provided.
        """
        if request is not None:
            if query is not None or max_results is not None:
                raise TypeError(
                    "If you provide request, you cannot provide query, or max_results."
                )
            query = request.query
            max_results = request.max_results
        elif query is None:
            raise TypeError("You must provide query when request is not given.")

        if max_results is None:
            max_results = 10

        try:
            results = await self._search(query, max_results)  # type: ignore
            return SearchResponse(
                success=True,
                message=f"Found {len(results)} results",
                results=results,
                total_results=len(results),
            )
        except Exception as e:
            # Search failures are reported in the response, not raised.
            return SearchResponse(success=False, message="Search failed", error=str(e))

    @overload
    async def download_playlist(
        self,
        url: str,
        config: Optional[DownloadConfig] = None,
        max_videos: Optional[int] = None,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> PlaylistResponse: ...

    @overload
    async def download_playlist(
        self,
        *,
        request: PlaylistRequest,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
    ) -> PlaylistResponse: ...

    async def download_playlist(
        self,
        url: Optional[str] = None,
        config: Optional[DownloadConfig] = None,
        max_videos: Optional[int] = None,
        progress_callback: Optional[
            Callable[[DownloadProgress], Union[None, Awaitable[None]]]
        ] = None,
        request: Optional[PlaylistRequest] = None,
    ) -> PlaylistResponse:
        """
        Asynchronously download videos from a YouTube playlist.

        You can provide either a `request` object containing all parameters, or specify `url`, `config`, and `max_videos` individually. If `request` is provided, you must not provide `url`, `config`, or `max_videos`.

        Entries are downloaded one after another. A failing entry is recorded
        in ``failed_downloads`` and does not stop the remaining entries.

        :param url: The URL of the playlist to download. Required if `request` is not given.
        :type url: Optional[str]
        :param config: Download configuration options.
        :type config: Optional[DownloadConfig]
        :param max_videos: Maximum number of videos to download from the playlist. Defaults to 100.
        :type max_videos: Optional[int]
        :param progress_callback: Optional callback (sync or async) to report overall playlist progress.
        :type progress_callback: Optional[Callable[[DownloadProgress], Union[None, Awaitable[None]]]]
        :param request: An object containing all playlist download parameters.
        :type request: Optional[PlaylistRequest]
        :return: PlaylistResponse object with download results.
        :rtype: PlaylistResponse
        :raises TypeError: If both `request` and any of `url`, `config`, or `max_videos` are provided, or if neither is provided.
        """
        if request is not None:
            if url is not None or config is not None or max_videos is not None:
                raise TypeError(
                    "If you provide request, you cannot provide url, config, or max_videos."
                )
            url = request.url
            config = request.config
            max_videos = request.max_videos
        elif url is None:
            raise TypeError("You must provide url when request is not given.")

        if not max_videos:
            max_videos = 100
        if not url:
            # Reached for an empty URL string.
            raise TypeError("the URL is must.")

        try:
            config = config or DownloadConfig()
            playlist_id = get_id(url, config)

            # Get playlist info
            playlist_info = await self.get_playlist_info(url)
            selected_entries = playlist_info["entries"][:max_videos]
            total_videos = min(len(playlist_info["entries"]), max_videos)
            downloaded_files = []
            failed_downloads = []

            for i, video_entry in enumerate(selected_entries):
                try:
                    if progress_callback:
                        overall_progress = DownloadProgress(
                            url=url,
                            title=f"Playlist item {i+1}/{total_videos}",
                            percentage=(i / total_videos) * 100,
                            id=playlist_id,
                        )
                        # Use call_callback, as download() does, so an async
                        # callback is actually awaited.
                        await call_callback(progress_callback, overall_progress)
                    # Fall back to "url" when "webpage_url" is missing
                    # (assumed to hold the entry's address in flat-playlist
                    # output - TODO confirm against the yt-dlp version in use).
                    video_url = video_entry.get("webpage_url") or video_entry["url"]
                    filename = await self.download(video_url, config)
                    downloaded_files.append(filename)
                except Exception as e:
                    failed_downloads.append(
                        f"{video_entry.get('title', 'Unknown')}: {str(e)}"
                    )

            return PlaylistResponse(
                success=True,
                message=f"Downloaded {len(downloaded_files)} out of {total_videos} videos",
                downloaded_files=downloaded_files,
                failed_downloads=failed_downloads,
                total_videos=total_videos,
                successful_downloads=len(downloaded_files),
            )
        except Exception as e:
            return PlaylistResponse(
                success=False,
                message="Playlist download failed",
                error=str(e),
                total_videos=0,
                successful_downloads=0,
            )

    async def get_playlist_info(self, url: str) -> Dict[str, Any]:
        """
        Asynchronously retrieve information about a YouTube playlist using yt-dlp.

        :param url: The URL of the YouTube playlist.
        :type url: str
        :return: Dictionary with the keys ``"entries"`` (one parsed JSON object per playlist item) and ``"title"``.
        :rtype: Dict[str, Any]
        :raises YtdlpPlaylistGetInfoError: If the yt-dlp process fails to retrieve playlist information.
        """
        cmd = [
            str(self.ytdlp_path),
            "--dump-json",
            "--flat-playlist",
            "--no-warnings",
            url,
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise YtdlpPlaylistGetInfoError(
                url, process.returncode, stderr.decode(errors="replace")
            )
        # One JSON document per non-blank output line.
        stripped_lines = (raw.strip() for raw in stdout.decode().split("\n"))
        entries = [loads(line) for line in stripped_lines if line]
        if entries:
            title = entries[0].get("playlist_title", "Unknown Playlist")
        else:
            title = "Empty Playlist"
        return {"entries": entries, "title": title}
# TODO: also add basemodel for just the playlist downloading and the get_playlist_info
class DeprecatedDownloader(AsyncYT):
    """
    Backward-compatibility subclass of :class:`AsyncYT`.

    .. deprecated::
        Use :class:`AsyncYT` instead. This class will be removed in a future release.
    """

    def __init__(self, bin_dir: Optional[str | Path] = None):
        """
        Emit a :class:`DeprecationWarning`, then initialize as :class:`AsyncYT` does.

        .. deprecated::
            Use :class:`AsyncYT` instead. This class will be removed in a future release.

        :param bin_dir: Forwarded unchanged to the parent initializer.
        :type bin_dir: Optional[str | Path]
        """
        message = (
            "Downloader is deprecated and will be removed in a future release. "
            "Please use AsyncYT instead."
        )
        # stacklevel=2 attributes the warning to the code that instantiates
        # the class rather than to this method.
        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
        super().__init__(bin_dir)


# Old public name, kept so existing imports of ``Downloader`` keep working.
Downloader = DeprecatedDownloader