217 lines
8.2 KiB
Python
Raw Normal View History

import functools
import json
import os
import urllib.error
from ..utils import (
PostProcessingError,
RetryManager,
_configuration_args,
deprecation_warning,
encodeFilename,
network_exceptions,
sanitized_Request,
)
class PostProcessorMetaClass(type):
@staticmethod
def run_wrapper(func):
@functools.wraps(func)
def run(self, info, *args, **kwargs):
info_copy = self._copy_infodict(info)
self._hook_progress({'status': 'started'}, info_copy)
ret = func(self, info, *args, **kwargs)
if ret is not None:
_, info = ret
self._hook_progress({'status': 'finished'}, info_copy)
return ret
return run
def __new__(cls, name, bases, attrs):
if 'run' in attrs:
attrs['run'] = cls.run_wrapper(attrs['run'])
return type.__new__(cls, name, bases, attrs)
class PostProcessor(metaclass=PostProcessorMetaClass):
"""Post Processor class.
PostProcessor objects can be added to downloaders with their
add_post_processor() method. When the downloader has finished a
successful download, it will take its internal chain of PostProcessors
and start calling the run() method on each one of them, first with
an initial argument and then with the returned value of the previous
PostProcessor.
PostProcessor objects follow a "mutual registration" process similar
to InfoExtractor objects.
Optionally PostProcessor can use a list of additional command-line arguments
with self._configuration_args.
"""
_downloader = None
def __init__(self, downloader=None):
self._progress_hooks = []
self.add_progress_hook(self.report_progress)
self.set_downloader(downloader)
self.PP_NAME = self.pp_key()
@classmethod
def pp_key(cls):
name = cls.__name__[:-2]
return name[6:] if name[:6].lower() == 'ffmpeg' else name
def to_screen(self, text, prefix=True, *args, **kwargs):
if self._downloader:
tag = '[%s] ' % self.PP_NAME if prefix else ''
return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)
def report_warning(self, text, *args, **kwargs):
if self._downloader:
return self._downloader.report_warning(text, *args, **kwargs)
def deprecation_warning(self, msg):
warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
return warn(msg, stacklevel=1)
def deprecated_feature(self, msg):
2021-11-29 23:16:06 +05:30
if self._downloader:
return self._downloader.deprecated_feature(msg)
return deprecation_warning(msg, stacklevel=1)
2021-11-29 23:16:06 +05:30
def report_error(self, text, *args, **kwargs):
self.deprecation_warning('"yt_dlp.postprocessor.PostProcessor.report_error" is deprecated. '
'raise "yt_dlp.utils.PostProcessingError" instead')
if self._downloader:
return self._downloader.report_error(text, *args, **kwargs)
2021-05-14 13:15:29 +05:30
def write_debug(self, text, *args, **kwargs):
if self._downloader:
return self._downloader.write_debug(text, *args, **kwargs)
2022-05-01 04:58:26 +05:30
def _delete_downloaded_files(self, *files_to_delete, **kwargs):
if self._downloader:
return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
for filename in set(filter(None, files_to_delete)):
os.remove(filename)
2022-05-01 04:58:26 +05:30
def get_param(self, name, default=None, *args, **kwargs):
if self._downloader:
return self._downloader.params.get(name, default, *args, **kwargs)
return default
def set_downloader(self, downloader):
"""Sets the downloader for this PP."""
self._downloader = downloader
for ph in getattr(downloader, '_postprocessor_hooks', []):
self.add_progress_hook(ph)
def _copy_infodict(self, info_dict):
return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
@staticmethod
2022-02-18 23:16:16 +05:30
def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
allowed = {'video': video, 'audio': audio, 'images': images}
def decorator(func):
@functools.wraps(func)
def wrapper(self, info):
2022-02-18 23:16:16 +05:30
if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
return [], info
format_type = (
'video' if info.get('vcodec') != 'none'
else 'audio' if info.get('acodec') != 'none'
else 'images')
if allowed[format_type]:
return func(self, info)
else:
self.to_screen('Skipping %s' % format_type)
return [], info
return wrapper
return decorator
def run(self, information):
"""Run the PostProcessor.
The "information" argument is a dictionary like the ones
composed by InfoExtractors. The only difference is that this
one has an extra field called "filepath" that points to the
downloaded file.
This method returns a tuple, the first element is a list of the files
that can be deleted, and the second of which is the updated
information.
In addition, this method may raise a PostProcessingError
exception if post processing fails.
"""
return [], information # by default, keep file and do nothing
def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
try:
os.utime(encodeFilename(path), (atime, mtime))
except Exception:
self.report_warning(errnote)
def _configuration_args(self, exe, *args, **kwargs):
return _configuration_args(
self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)
def _hook_progress(self, status, info_dict):
if not self._progress_hooks:
return
status.update({
'info_dict': info_dict,
'postprocessor': self.pp_key(),
})
for ph in self._progress_hooks:
ph(status)
def add_progress_hook(self, ph):
# See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface
self._progress_hooks.append(ph)
def report_progress(self, s):
s['_default_template'] = '%(postprocessor)s %(status)s' % s
if not self._downloader:
return
progress_dict = s.copy()
progress_dict.pop('info_dict')
progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
progress_template = self.get_param('progress_template', {})
tmpl = progress_template.get('postprocess')
if tmpl:
self._downloader.to_screen(
self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False)
self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict))
def _retry_download(self, err, count, retries):
# While this is not an extractor, it behaves similar to one and
# so obey extractor_retries and "--retry-sleep extractor"
RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
def _download_json(self, url, *, expected_http_errors=(404,)):
self.write_debug(f'{self.PP_NAME} query: {url}')
for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
try:
rsp = self._downloader.urlopen(sanitized_Request(url))
except network_exceptions as e:
if isinstance(e, urllib.error.HTTPError) and e.code in expected_http_errors:
return None
retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
continue
return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
2022-06-06 21:49:57 +05:30
class AudioConversionError(PostProcessingError): # Deprecated
pass