[ie] Add extractor impersonate API (#9474)

Authored by: bashonly, Grub4K, pukkandan
This commit is contained in:
bashonly 2024-03-30 18:18:07 -05:00 committed by GitHub
parent 0df63cce69
commit 50c2935231
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 52 additions and 11 deletions

View File

@ -37,6 +37,7 @@
IncompleteRead,
network_exceptions,
)
from ..networking.impersonate import ImpersonateTarget
from ..utils import (
IDENTITY,
JSON_LD_RE,
@ -818,7 +819,7 @@ def __can_accept_status_code(err, expected_status):
else:
return err.status in variadic(expected_status)
def _create_request(self, url_or_request, data=None, headers=None, query=None):
def _create_request(self, url_or_request, data=None, headers=None, query=None, extensions=None):
if isinstance(url_or_request, urllib.request.Request):
self._downloader.deprecation_warning(
'Passing a urllib.request.Request to _create_request() is deprecated. '
@ -827,10 +828,11 @@ def _create_request(self, url_or_request, data=None, headers=None, query=None):
elif not isinstance(url_or_request, Request):
url_or_request = Request(url_or_request)
url_or_request.update(data=data, headers=headers, query=query)
url_or_request.update(data=data, headers=headers, query=query, extensions=extensions)
return url_or_request
def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None, headers=None, query=None, expected_status=None):
def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None,
headers=None, query=None, expected_status=None, impersonate=None, require_impersonation=False):
"""
Return the response handle.
@ -861,8 +863,31 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa
headers = (headers or {}).copy()
headers.setdefault('X-Forwarded-For', self._x_forwarded_for_ip)
extensions = {}
if impersonate in (True, ''):
impersonate = ImpersonateTarget()
requested_targets = [
t if isinstance(t, ImpersonateTarget) else ImpersonateTarget.from_str(t)
for t in variadic(impersonate)
] if impersonate else []
available_target = next(filter(self._downloader._impersonate_target_available, requested_targets), None)
if available_target:
extensions['impersonate'] = available_target
elif requested_targets:
message = 'The extractor is attempting impersonation, but '
message += (
'no impersonate target is available' if not str(impersonate)
else f'none of these impersonate targets are available: "{", ".join(map(str, requested_targets))}"')
info_msg = ('see https://github.com/yt-dlp/yt-dlp#impersonation '
'for information on installing the required dependencies')
if require_impersonation:
raise ExtractorError(f'{message}; {info_msg}', expected=True)
self.report_warning(f'{message}; if you encounter errors, then {info_msg}', only_once=True)
try:
return self._downloader.urlopen(self._create_request(url_or_request, data, headers, query))
return self._downloader.urlopen(self._create_request(url_or_request, data, headers, query, extensions))
except network_exceptions as err:
if isinstance(err, HTTPError):
if self.__can_accept_status_code(err, expected_status):
@ -881,13 +906,14 @@ def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fa
return False
def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True,
encoding=None, data=None, headers={}, query={}, expected_status=None):
encoding=None, data=None, headers={}, query={}, expected_status=None,
impersonate=None, require_impersonation=False):
"""
Return a tuple (page content as string, URL handle).
Arguments:
url_or_request -- plain text URL as a string or
a urllib.request.Request object
a yt_dlp.networking.Request object
video_id -- Video/playlist/item identifier (string)
Keyword arguments:
@ -912,13 +938,22 @@ def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=
returning True if it should be accepted
Note that this argument does not affect success status codes (2xx)
which are always accepted.
impersonate -- the impersonate target. Can be any of the following entities:
- an instance of yt_dlp.networking.impersonate.ImpersonateTarget
- a string in the format of CLIENT[:OS]
- a list or a tuple of CLIENT[:OS] strings or ImpersonateTarget instances
- a boolean value; True means any impersonate target is sufficient
require_impersonation -- flag to toggle whether the request should raise an error
if impersonation is not possible (bool, default: False)
"""
# Strip hashes from the URL (#1038)
if isinstance(url_or_request, str):
url_or_request = url_or_request.partition('#')[0]
urlh = self._request_webpage(url_or_request, video_id, note, errnote, fatal, data=data, headers=headers, query=query, expected_status=expected_status)
urlh = self._request_webpage(url_or_request, video_id, note, errnote, fatal, data=data,
headers=headers, query=query, expected_status=expected_status,
impersonate=impersonate, require_impersonation=require_impersonation)
if urlh is False:
assert not fatal
return False
@ -1047,17 +1082,20 @@ def parse(ie, content, *args, errnote=errnote, **kwargs):
return getattr(ie, parser)(content, *args, **kwargs)
def download_handle(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None,
impersonate=None, require_impersonation=False):
res = self._download_webpage_handle(
url_or_request, video_id, note=note, errnote=errnote, fatal=fatal, encoding=encoding,
data=data, headers=headers, query=query, expected_status=expected_status)
data=data, headers=headers, query=query, expected_status=expected_status,
impersonate=impersonate, require_impersonation=require_impersonation)
if res is False:
return res
content, urlh = res
return parse(self, content, video_id, transform_source=transform_source, fatal=fatal, errnote=errnote), urlh
def download_content(self, url_or_request, video_id, note=note, errnote=errnote, transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None,
impersonate=None, require_impersonation=False):
if self.get_param('load_pages'):
url_or_request = self._create_request(url_or_request, data, headers, query)
filename = self._request_dump_filename(url_or_request.url, video_id)
@ -1080,6 +1118,8 @@ def download_content(self, url_or_request, video_id, note=note, errnote=errnote,
'headers': headers,
'query': query,
'expected_status': expected_status,
'impersonate': impersonate,
'require_impersonation': require_impersonation,
}
if parser is None:
kwargs.pop('transform_source')

View File

@ -463,9 +463,10 @@ def headers(self, new_headers: Mapping):
else:
raise TypeError('headers must be a mapping')
def update(self, url=None, data=None, headers=None, query=None):
def update(self, url=None, data=None, headers=None, query=None, extensions=None):
self.data = data if data is not None else self.data
self.headers.update(headers or {})
self.extensions.update(extensions or {})
self.url = update_url_query(url or self.url, query or {})
def copy(self):