mirror of
https://github.com/DYefremov/DemonEditor.git
synced 2025-12-29 20:09:41 +01:00
472 lines
17 KiB
Python
472 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# The MIT License (MIT)
|
|
#
|
|
# Copyright (c) 2018-2023 Dmitriy Yefremov
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
#
|
|
# Author: Dmitriy Yefremov
|
|
#
|
|
|
|
|
|
""" Module for working with YouTube service. """
|
|
import gzip
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sys
|
|
from html.parser import HTMLParser
|
|
from json import JSONDecodeError
|
|
from urllib import parse
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen, urlretrieve
|
|
|
|
from app.commons import log
|
|
from app.settings import SEP
|
|
from app.ui.uicommons import show_notification
|
|
|
|
_TIMEOUT = 5
|
|
_HEADERS = {"User-Agent": "Mozilla/5.0 (Linux x86_64; rv:92.0) Gecko/20100101 Firefox/92.0",
|
|
"DNT": "1",
|
|
"Accept-Encoding": "gzip, deflate"}
|
|
_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=)([\w-]{11}).*")
|
|
_YT_LIST_PATTERN = re.compile(r"https://www.youtube.com/.+?(?:list=)([\w-]{18,})?.*")
|
|
_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*")
|
|
|
|
Quality = {137: "1080p", 136: "720p", 135: "480p", 134: "360p",
|
|
133: "240p", 160: "144p", 0: "0p", 18: "360p", 22: "720p"}
|
|
|
|
|
|
class YouTubeException(Exception):
|
|
pass
|
|
|
|
|
|
class YouTube:
|
|
""" Helper class for working with YouTube service. """
|
|
|
|
_YT_INSTANCE = None
|
|
_VIDEO_INFO_LINK = "https://youtube.com/get_video_info?video_id={}&hl=en"
|
|
|
|
VIDEO_LINK = "https://www.youtube.com/watch?v={}"
|
|
|
|
def __init__(self, settings, callback):
|
|
self._settings = settings
|
|
self._yt_dl = None
|
|
self._callback = callback
|
|
|
|
if self._settings.enable_yt_dl:
|
|
try:
|
|
self._yt_dl = YouTubeDL.get_instance(self._settings, callback=self._callback)
|
|
except YouTubeException:
|
|
pass # NOP
|
|
|
|
@classmethod
|
|
def get_instance(cls, settings, callback=log):
|
|
if not cls._YT_INSTANCE:
|
|
cls._YT_INSTANCE = YouTube(settings, callback)
|
|
return cls._YT_INSTANCE
|
|
|
|
@staticmethod
|
|
def is_yt_video_link(url):
|
|
return re.match(_YT_VIDEO_PATTERN, url)
|
|
|
|
@staticmethod
|
|
def get_yt_id(url):
|
|
""" Returns video id or None """
|
|
yt = re.search(_YT_PATTERN, url)
|
|
if yt:
|
|
return yt.group(1)
|
|
|
|
@staticmethod
|
|
def get_yt_list_id(url):
|
|
""" Returns playlist id or None """
|
|
yt = re.search(_YT_LIST_PATTERN, url)
|
|
if yt:
|
|
return yt.group(1)
|
|
|
|
def get_yt_link(self, video_id, url=None, skip_errors=False):
|
|
""" Getting link to YouTube video by id or URL.
|
|
|
|
Returns tuple from the video links dict and title.
|
|
"""
|
|
if self._settings.enable_yt_dl and url:
|
|
if not self._yt_dl:
|
|
self._yt_dl = YouTubeDL.get_instance(self._settings, self._callback)
|
|
if not self._yt_dl:
|
|
raise YouTubeException("youtube-dl initialization error.")
|
|
return self._yt_dl.get_yt_link(url, skip_errors)
|
|
|
|
return self.get_yt_link_by_id(video_id)
|
|
|
|
@staticmethod
|
|
def get_yt_link_by_id(video_id):
|
|
""" Getting link to YouTube video by id.
|
|
|
|
Returns tuple from the video links dict and title.
|
|
"""
|
|
info = InnerTube().player(video_id)
|
|
det = info.get("videoDetails", None)
|
|
title = det.get("title", None) if det else None
|
|
streaming_data = info.get("streamingData", None)
|
|
fmts = streaming_data.get("formats", None) if streaming_data else None
|
|
|
|
if fmts:
|
|
links = {Quality[i["itag"]]: i["url"] for i in fmts if i.get("itag", -1) in Quality and "url" in i}
|
|
|
|
if links and title:
|
|
return links, title.replace("+", " ")
|
|
|
|
cause = None
|
|
status = info.get("playabilityStatus", None)
|
|
if status:
|
|
cause = f"[{status.get('status', '')}] {status.get('reason', '')}"
|
|
|
|
log(f"{__class__.__name__}: Getting link to video with id '{video_id}' filed! Cause: {cause}")
|
|
|
|
return None, cause
|
|
|
|
def get_yt_playlist(self, list_id, url=None):
|
|
""" Returns tuple from the playlist header and list of tuples (title, video id). """
|
|
if self._settings.enable_yt_dl and url:
|
|
try:
|
|
if not self._yt_dl:
|
|
raise YouTubeException("youtube-dl is not initialized!")
|
|
|
|
self._yt_dl.update_options({"noplaylist": False, "extract_flat": True})
|
|
info = self._yt_dl.get_info(url, skip_errors=False)
|
|
if "url" in info:
|
|
info = self._yt_dl.get_info(info.get("url"), skip_errors=False)
|
|
|
|
return info.get("title", ""), [(e.get("title", ""), e.get("id", "")) for e in info.get("entries", [])]
|
|
finally:
|
|
# Restoring default options
|
|
if self._yt_dl:
|
|
self._yt_dl.update_options({"noplaylist": True, "extract_flat": False})
|
|
|
|
return PlayListParser.get_yt_playlist(list_id)
|
|
|
|
|
|
class InnerTube:
|
|
""" Object for interacting with the innertube API.
|
|
|
|
Based on InnerTube class from pytube [https://github.com/pytube/pytube] project!
|
|
"""
|
|
_BASE_URI = "https://www.youtube.com/youtubei/v1"
|
|
|
|
_DEFAULT_CLIENTS = {
|
|
"ANDROID": {
|
|
"context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20"}},
|
|
"api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8"
|
|
},
|
|
"ANDROID_EMBED": {
|
|
"context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20", "clientScreen": "EMBED"}},
|
|
"api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8"
|
|
}
|
|
}
|
|
|
|
def __init__(self, client="ANDROID"):
|
|
""" Initialize an InnerTube object.
|
|
|
|
@param client: Client to use for the object. Default to web because it returns the most playback types.
|
|
"""
|
|
self.context = self._DEFAULT_CLIENTS[client]["context"]
|
|
self.api_key = self._DEFAULT_CLIENTS[client]["api_key"]
|
|
|
|
@property
|
|
def base_data(self):
|
|
"""Return the base json data to transmit to the innertube API."""
|
|
return {"context": self.context}
|
|
|
|
@property
|
|
def base_params(self):
|
|
"""Return the base query parameters to transmit to the innertube API."""
|
|
return {"key": self.api_key, "contentCheckOk": True, "racyCheckOk": True}
|
|
|
|
def player(self, video_id):
|
|
""" Make a request to the player endpoint. Returns raw player info results. """
|
|
endpoint = f"{self._BASE_URI}/player"
|
|
query = {"videoId": video_id}
|
|
query.update(self.base_params)
|
|
return self._call_api(endpoint, query, self.base_data) or {}
|
|
|
|
@staticmethod
|
|
def _call_api(endpoint, query, data):
|
|
""" Make a request to a given endpoint with the provided query parameters and data."""
|
|
headers = {"Content-Type": "application/json", }
|
|
response = InnerTube._execute(f"{endpoint}?{parse.urlencode(query)}", "POST", headers=headers, data=data)
|
|
|
|
try:
|
|
resp = json.loads(response.read())
|
|
except JSONDecodeError as e:
|
|
log(f"{__class__.__name__}: Parsing response error: {e}")
|
|
else:
|
|
return resp
|
|
|
|
@staticmethod
|
|
def _execute(url, method=None, headers=None, data=None, timeout=_TIMEOUT):
|
|
base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
|
|
if headers:
|
|
base_headers.update(headers)
|
|
if data:
|
|
# Encoding data for request.
|
|
if not isinstance(data, bytes):
|
|
data = bytes(json.dumps(data), encoding="utf-8")
|
|
return urlopen(Request(url, headers=base_headers, method=method, data=data), timeout=timeout)
|
|
|
|
|
|
class PlayListParser(HTMLParser):
|
|
""" Very simple parser to handle YouTube playlist pages. """
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._is_header = False
|
|
self._header = ""
|
|
self._playlist = []
|
|
self._is_script = False
|
|
self._scr_start = ('var ytInitialData = ', 'window["ytInitialData"] = ')
|
|
|
|
def handle_starttag(self, tag, attrs):
|
|
if tag == "script":
|
|
self._is_script = True
|
|
|
|
def handle_data(self, data):
|
|
if self._is_script:
|
|
data = data.lstrip()
|
|
if data.startswith(self._scr_start):
|
|
data = data.split(";")[0]
|
|
for s in self._scr_start:
|
|
data = data.lstrip(s)
|
|
|
|
try:
|
|
resp = json.loads(data)
|
|
except JSONDecodeError as e:
|
|
log(f"{__class__.__name__}: Parsing data error: {e}")
|
|
else:
|
|
sb = resp.get("sidebar", None)
|
|
if sb:
|
|
for t in [t["runs"][0] for t in flat("title", sb) if "runs" in t]:
|
|
txt = t.get("text", None)
|
|
if txt:
|
|
self._header = txt
|
|
break
|
|
|
|
ct = resp.get("contents", None)
|
|
if ct:
|
|
for d in [(d.get("title", {}).get("runs", [{}])[0].get("text", ""),
|
|
d.get("videoId", "")) for d in flat("playlistVideoRenderer", ct)]:
|
|
self._playlist.append(d)
|
|
self._is_script = False
|
|
|
|
def error(self, message):
|
|
log(f"{__class__.__name__} Parsing error: {message}")
|
|
|
|
@property
|
|
def header(self):
|
|
return self._header
|
|
|
|
@property
|
|
def playlist(self):
|
|
return self._playlist
|
|
|
|
@staticmethod
|
|
def get_yt_playlist(play_list_id):
|
|
""" Getting YouTube playlist by id.
|
|
|
|
returns tuple from the playlist header and list of tuples (title, video id)
|
|
"""
|
|
request = Request(f"https://www.youtube.com/playlist?list={play_list_id}&hl=en", headers=_HEADERS)
|
|
|
|
with urlopen(request, timeout=_TIMEOUT) as resp:
|
|
data = gzip.decompress(resp.read()).decode("utf-8")
|
|
parser = PlayListParser()
|
|
parser.feed(data)
|
|
return parser.header, parser.playlist
|
|
|
|
|
|
class YouTubeDL:
|
|
""" Utility class [experimental] for working with youtube-dl.
|
|
|
|
[https://github.com/ytdl-org/youtube-dl]
|
|
"""
|
|
|
|
_DL_INSTANCE = None
|
|
_DownloadError = None
|
|
_LATEST_RELEASE_URL = "https://api.github.com/repos/ytdl-org/youtube-dl/releases/latest"
|
|
_OPTIONS = {"noplaylist": True, # Single video instead of a playlist [ignoring playlist in URL].
|
|
"extract_flat": False, # Do not resolve URLs, return the immediate result.
|
|
"quiet": True, # Do not print messages to stdout.
|
|
"simulate": True, # Do not download the video files.
|
|
"cookiefile": "cookies.txt"} # File name where cookies should be read from and dumped to.
|
|
|
|
def __init__(self, settings, callback):
|
|
self._path = f"{settings.default_data_path}tools{SEP}"
|
|
self._update = settings.enable_yt_dl_update
|
|
self._supported = {"22", "18"}
|
|
self._dl = None
|
|
self._callback = callback
|
|
self._download_exception = None
|
|
self._is_update_process = False
|
|
|
|
self.init()
|
|
|
|
@classmethod
|
|
def get_instance(cls, settings, callback=print):
|
|
if not cls._DL_INSTANCE:
|
|
cls._DL_INSTANCE = YouTubeDL(settings, callback)
|
|
return cls._DL_INSTANCE
|
|
|
|
def init(self):
|
|
if not os.path.isfile(f"{self._path}youtube_dl{SEP}version.py"):
|
|
self.get_latest_release()
|
|
|
|
if self._path not in sys.path:
|
|
sys.path.append(self._path)
|
|
|
|
self.init_dl()
|
|
|
|
def init_dl(self):
|
|
try:
|
|
import youtube_dl
|
|
except ModuleNotFoundError as e:
|
|
log(f"YouTubeDLHelper error: {e}")
|
|
raise YouTubeException(e)
|
|
except ImportError as e:
|
|
log(f"YouTubeDLHelper error: {e}")
|
|
else:
|
|
if self._path not in youtube_dl.__file__:
|
|
msg = "Another version of youtube-dl was found on your system!"
|
|
log(msg)
|
|
raise YouTubeException(msg)
|
|
|
|
if self._update:
|
|
if hasattr(youtube_dl.version, "__version__"):
|
|
l_ver = self.get_last_release_id()
|
|
cur_ver = youtube_dl.version.__version__
|
|
if l_ver and youtube_dl.version.__version__ < l_ver:
|
|
msg = f"youtube-dl has new release!\nCurrent: {cur_ver}. Last: {l_ver}."
|
|
show_notification(msg)
|
|
log(msg)
|
|
self._callback(msg, False)
|
|
self.get_latest_release()
|
|
|
|
self._DownloadError = youtube_dl.utils.DownloadError
|
|
self._dl = youtube_dl.YoutubeDL(self._OPTIONS)
|
|
msg = "youtube-dl initialized..."
|
|
show_notification(msg)
|
|
log(msg)
|
|
|
|
@staticmethod
|
|
def get_last_release_id():
|
|
""" Getting last release id. """
|
|
url = "https://api.github.com/repos/ytdl-org/youtube-dl/releases/latest"
|
|
try:
|
|
with urlopen(url, timeout=10) as resp:
|
|
return json.loads(resp.read().decode("utf-8")).get("tag_name", "0")
|
|
except URLError as e:
|
|
log(f"YouTubeDLHelper error [get last release id]: {e}")
|
|
|
|
def get_latest_release(self):
|
|
try:
|
|
self._is_update_process = True
|
|
log("Getting the last youtube-dl release...")
|
|
|
|
with urlopen(YouTubeDL._LATEST_RELEASE_URL, timeout=10) as resp:
|
|
r = json.loads(resp.read().decode("utf-8"))
|
|
zip_url = r.get("zipball_url", None)
|
|
if zip_url:
|
|
if os.path.isdir(self._path):
|
|
shutil.rmtree(self._path)
|
|
|
|
zip_file = self._path + "yt.zip"
|
|
os.makedirs(os.path.dirname(self._path), exist_ok=True)
|
|
f_name, headers = urlretrieve(zip_url, filename=zip_file)
|
|
|
|
import zipfile
|
|
|
|
with zipfile.ZipFile(f_name) as arch:
|
|
for info in arch.infolist():
|
|
pref, sep, f = info.filename.partition("/youtube_dl/")
|
|
if sep:
|
|
arch.extract(info.filename)
|
|
shutil.move(info.filename, f"{self._path}{sep}{f}")
|
|
shutil.rmtree(pref)
|
|
msg = "Getting the last youtube-dl release is done!"
|
|
show_notification(msg)
|
|
log(msg)
|
|
self._callback(msg, False)
|
|
|
|
if os.path.isfile(zip_file):
|
|
os.remove(zip_file)
|
|
return True
|
|
except URLError as e:
|
|
log(f"YouTubeDLHelper error: {e}")
|
|
raise YouTubeException(e)
|
|
finally:
|
|
self._is_update_process = False
|
|
|
|
def get_yt_link(self, url, skip_errors=False):
|
|
""" Returns tuple from the video links [dict] and title. """
|
|
if self._is_update_process:
|
|
self._callback("Update process. Please wait.", False)
|
|
return {}, ""
|
|
|
|
info = self.get_info(url, skip_errors)
|
|
fmts = info.get("formats", None)
|
|
if fmts:
|
|
return {Quality.get(int(fm["format_id"])): fm.get("url", "") for fm in fmts if
|
|
fm.get("format_id", "") in self._supported}, info.get("title", "")
|
|
|
|
return {}, info.get("title", "")
|
|
|
|
def get_info(self, url, skip_errors=False):
|
|
try:
|
|
return self._dl.extract_info(url, download=False)
|
|
except URLError as e:
|
|
log(f"YouTubeDLHelper error [get info]: {e}")
|
|
raise YouTubeException(e)
|
|
except self._DownloadError as e:
|
|
log(f"YouTubeDLHelper error [get info]: {e}")
|
|
if not skip_errors:
|
|
raise YouTubeException(e)
|
|
|
|
def update_options(self, options):
|
|
self._dl.params.update(options)
|
|
|
|
@property
|
|
def options(self):
|
|
return self._dl.params
|
|
|
|
|
|
def flat(key, d):
|
|
for k, v in d.items():
|
|
if k == key:
|
|
yield v
|
|
elif isinstance(v, dict):
|
|
yield from flat(key, v)
|
|
elif isinstance(v, list):
|
|
for el in v:
|
|
if isinstance(el, dict):
|
|
yield from flat(key, el)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pass
|