# Source code for asyncyt.utils

import inspect
from pathlib import Path
import hashlib
from typing import TYPE_CHECKING
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

if TYPE_CHECKING:
    from .basemodels import DownloadConfig

# Public API of this module: the names exported by a star-import.
__all__ = [
    "call_callback",
    "get_unique_filename",
    "get_id",
    "get_unique_path",
    "clean_youtube_url",
]


async def call_callback(callback, *args, **kwargs):
    """
    Call a callback, supporting both coroutine and regular functions.

    The callback is invoked first and its result is awaited only if it is
    awaitable. This also covers callables that
    ``inspect.iscoroutinefunction`` does not recognise, such as objects with
    an ``async def __call__`` or plain functions that return a coroutine.

    :param callback: The callback function to call.
    :param args: Positional arguments for the callback.
    :param kwargs: Keyword arguments for the callback.
    :return: ``None``; the callback's result is discarded.
    """
    outcome = callback(*args, **kwargs)
    # Decide on the returned object, not on the callable's declared type, so
    # that no coroutine is left un-awaited.
    if inspect.isawaitable(outcome):
        await outcome
def get_unique_filename(file: Path, title: str) -> Path:
    """
    Generate a unique filename in the same directory, avoiding overwrites.

    The result keeps the directory and suffix of ``file`` and uses ``title``
    as the stem. If that path already exists, `` (1)``, `` (2)``, ... is
    appended to the title until an unused path is found.

    :param file: Original file path.
    :type file: Path
    :param title: Desired title for the file.
    :type title: str
    :return: Unique file path.
    :rtype: Path
    :raises ValueError: If ``title`` is empty or is not a valid file name
        (for example, it contains a path separator).
    """
    if not title:
        raise ValueError("title must not be empty")

    extension = file.suffix
    # Build the name by concatenation. ``with_suffix`` would treat anything
    # after the last dot in the title (e.g. "Video v1.5") as a suffix and
    # replace it, truncating the title.
    candidate = file.with_name(f"{title}{extension}")
    counter = 1
    while candidate.exists():
        candidate = file.with_name(f"{title} ({counter}){extension}")
        counter += 1
    return candidate
def get_id(url: str, config: "DownloadConfig"):
    """
    Derive a download identifier from a URL and its configuration.

    The URL is concatenated with the JSON dump of ``config`` and the
    resulting text is hashed with SHA-256.

    :param url: Download URL.
    :type url: str
    :param config: Download configuration.
    :type config: DownloadConfig
    :return: SHA256 hash string.
    :rtype: str
    """
    payload = "".join((url, config.model_dump_json()))
    hasher = hashlib.sha256()
    hasher.update(payload.encode())
    return hasher.hexdigest()
def get_unique_path(dir: Path, name: str) -> Path:
    """
    Return a path inside ``dir`` that does not exist yet.

    ``dir / name`` is returned as-is when it is free. Otherwise a counter
    starting at 2 is inserted between the stem and the suffix until an
    unused path is found.

    :param dir: The dir of the file
    :type dir: Path
    :param name: the Original File name
    :type name: str
    :return: The Unique Path for that dir, for example `Videos/Unique Video (2).mp4`
    :rtype: Path
    """
    target = dir / name
    stem, suffix = target.stem, target.suffix
    index = 1
    while target.exists():
        # First collision yields "(2)", the next "(3)", and so on.
        index += 1
        target = dir / f"{stem} ({index}){suffix}"
    return target
def clean_youtube_url(url: str) -> str:
    """
    Clean any YouTube URL (watch, youtu.be, shorts, embed) into its core form.

    Short links, Shorts URLs and embed URLs are rewritten to
    ``https://www.youtube.com/watch?v=<id>``. Standard ``/watch`` URLs on
    ``youtube.com`` / ``www.youtube.com`` keep their scheme, host and
    fragment but have every query parameter except ``v`` removed. Any other
    URL, or one without a video ID in its path, is returned unchanged.

    :param url: The youtube URL
    :type url: str
    :return: Cleaned YouTube URL.
    :rtype: str
    """
    parsed = urlparse(url)
    # ``hostname`` is lower-cased and excludes any port or credentials.
    host = parsed.hostname or ""
    # Match the domain exactly or as a parent domain. A substring test would
    # also accept lookalikes such as "notyoutube.com" or
    # "youtube.com.evil.example".
    is_youtube = host == "youtube.com" or host.endswith(".youtube.com")

    video_id = ""
    if host == "youtu.be":
        # Short link: the ID is the first path segment; ignore a trailing
        # slash or any extra segments.
        video_id = parsed.path.lstrip("/").split("/", 1)[0]
    elif is_youtube and parsed.path.startswith(("/shorts/", "/embed/")):
        # "/shorts/<id>" or "/embed/<id>": the ID is the second segment.
        video_id = parsed.path.split("/")[2]

    if video_id:
        params = {"v": video_id}
        return f"https://www.youtube.com/watch?{urlencode(params)}"

    # Standard URL
    if host in ("www.youtube.com", "youtube.com") and parsed.path == "/watch":
        query = parse_qs(parsed.query)
        params = {}
        if "v" in query:
            params["v"] = query["v"][0]
        # Without a "v" parameter the query string ends up empty.
        cleaned = parsed._replace(query=urlencode(params, doseq=True))
        return urlunparse(cleaned)

    return url