Compare commits

...

21 Commits

Author SHA1 Message Date
DEVENU
de513f86d9
Merge ac410f1354 into fbc66e3ab3 2024-10-17 05:57:29 +02:00
bashonly
fbc66e3ab3
[utils] Popen: Reset PyInstaller environment (#11258)
- Forces spawning independent subprocesses for exes bundled with PyInstaller>=6.10
- Fixes regression introduced in fb8b7f226d
- Ref: https://pyinstaller.org/en/v6.10.0/CHANGES.html#incompatible-changes

Closes #11259
Authored by: bashonly, Grub4K

Co-authored-by: Simon Sawicki <contact@grub4k.xyz>
2024-10-16 03:53:53 +00:00
bashonly
64d84d75ca
[build] Use macos-13 image for macOS builds (#11236)
Authored by: bashonly
2024-10-15 07:07:42 +00:00
bashonly
dcfeea4dd5
[ie/adobepass] Use newer user-agent for provider redirect request (#11250)
Closes #10848
Authored by: bashonly
2024-10-14 22:19:26 +00:00
Mozi
cba7868502
[ie/reddit] Detect and raise when login is required (#11202)
Closes #10924
Authored by: pzhlkj6612
2024-10-13 06:27:01 +00:00
Simon Sawicki
d710a6ca7c
Add extractor helpers (#10653)
Authored by: Grub4K
2024-10-13 05:14:32 +02:00
Simon Sawicki
85b87c991a
[utils] sanitize_path: Reimplement function (#11198)
Authored by: Grub4K
2024-10-13 04:10:12 +02:00
Simon Sawicki
16eb28026a
[test] Allow running tests explicitly (#11203)
Authored by: Grub4K
2024-10-13 04:01:26 +02:00
Simon Sawicki
1a830394a2
[build] make_lazy_extractors: Force running without plugins (#11205)
Authored by: Grub4K
2024-10-13 03:50:31 +02:00
Simon Sawicki
edfd095b19
[ie/generic] Impersonate browser by default (#11206)
Also adds `impersonate` extractor arg

Authored by: Grub4K
2024-10-13 03:42:43 +02:00
Simon Sawicki
c5f0f58efd
[cookies] Fix compatibility for Python <=3.9 in traceback
Authored by: Grub4K
2024-10-13 03:38:09 +02:00
DEVENU
ac410f1354
Update amazonminitv.py 2024-08-15 21:27:30 +05:30
DEVENU
b981449466
Merge branch 'yt-dlp:master' into master 2024-08-15 20:51:28 +05:30
DEVENU
ce502038fb
Merge branch 'yt-dlp:master' into master 2024-08-05 16:11:10 +05:30
DEVENU
b3dbc3deec
Update amazonminitv.py 2024-07-23 21:52:09 +05:30
DEVENU
53a8806ced
Update amazonminitv.py 2024-07-23 19:26:09 +05:30
DEVENU
696b1c7dc4
Update amazonminitv.py 2024-07-22 20:15:32 +05:30
DEVENU
ae08e3b3a0
Update amazonminitv.py [code fix] 2024-07-22 20:10:29 +05:30
DEVENU
7f36e69091
Update amazonminitv.py 2024-07-22 19:58:47 +05:30
DEVENU
7814598369
[ie/Amazonminitv] extractor fix 2024-07-21 12:08:56 +05:30
DEVENU
7121179075
[ie/Amazonminitv] extractor fix 2024-07-21 12:08:10 +05:30
16 changed files with 406 additions and 260 deletions

View File

@ -240,7 +240,7 @@ jobs:
permissions: permissions:
contents: read contents: read
actions: write # For cleaning up cache actions: write # For cleaning up cache
runs-on: macos-12 runs-on: macos-13
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@ -346,7 +346,7 @@ jobs:
macos_legacy: macos_legacy:
needs: process needs: process
if: inputs.macos_legacy if: inputs.macos_legacy
runs-on: macos-12 runs-on: macos-13
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@ -278,7 +278,7 @@ ### Related scripts
* **`devscripts/update-version.py`** - Update the version number based on the current date. * **`devscripts/update-version.py`** - Update the version number based on the current date.
* **`devscripts/set-variant.py`** - Set the build variant of the executable. * **`devscripts/set-variant.py`** - Set the build variant of the executable.
* **`devscripts/make_changelog.py`** - Create a markdown changelog using short commit messages and update `CONTRIBUTORS` file. * **`devscripts/make_changelog.py`** - Create a markdown changelog using short commit messages and update `CONTRIBUTORS` file.
* **`devscripts/make_lazy_extractors.py`** - Create lazy extractors. Running this before building the binaries (any variant) will improve their startup performance. Set the environment variable `YTDLP_NO_LAZY_EXTRACTORS=1` if you wish to forcefully disable lazy extractor loading. * **`devscripts/make_lazy_extractors.py`** - Create lazy extractors. Running this before building the binaries (any variant) will improve their startup performance. Set the environment variable `YTDLP_NO_LAZY_EXTRACTORS` to something nonempty to forcefully disable lazy extractor loading.
Note: See their `--help` for more info. Note: See their `--help` for more info.
@ -1795,6 +1795,7 @@ #### generic
* `key_query`: Passthrough the master m3u8 URL query to its HLS AES-128 decryption key URI if no value is provided, or else apply the query string given as `key_query=VALUE`. Note that this will have no effect if the key URI is provided via the `hls_key` extractor-arg. Does not apply to ffmpeg * `key_query`: Passthrough the master m3u8 URL query to its HLS AES-128 decryption key URI if no value is provided, or else apply the query string given as `key_query=VALUE`. Note that this will have no effect if the key URI is provided via the `hls_key` extractor-arg. Does not apply to ffmpeg
* `hls_key`: An HLS AES-128 key URI *or* key (as hex), and optionally the IV (as hex), in the form of `(URI|KEY)[,IV]`; e.g. `generic:hls_key=ABCDEF1234567980,0xFEDCBA0987654321`. Passing any of these values will force usage of the native HLS downloader and override the corresponding values found in the m3u8 playlist * `hls_key`: An HLS AES-128 key URI *or* key (as hex), and optionally the IV (as hex), in the form of `(URI|KEY)[,IV]`; e.g. `generic:hls_key=ABCDEF1234567980,0xFEDCBA0987654321`. Passing any of these values will force usage of the native HLS downloader and override the corresponding values found in the m3u8 playlist
* `is_live`: Bypass live HLS detection and manually set `live_status` - a value of `false` will set `not_live`, any other value (or no value) will set `is_live` * `is_live`: Bypass live HLS detection and manually set `live_status` - a value of `false` will set `not_live`, any other value (or no value) will set `is_live`
* `impersonate`: Target(s) to try and impersonate with the initial webpage request; e.g. `safari,chrome-110`. By default any available target will be used. Use `false` to disable impersonation
#### funimation #### funimation
* `language`: Audio languages to extract, e.g. `funimation:language=english,japanese` * `language`: Audio languages to extract, e.g. `funimation:language=english,japanese`
@ -1897,6 +1898,7 @@ # PLUGINS
myplugin.py myplugin.py
yt-dlp looks for these `yt_dlp_plugins` namespace folders in many locations (see below) and loads in plugins from **all** of them. yt-dlp looks for these `yt_dlp_plugins` namespace folders in many locations (see below) and loads in plugins from **all** of them.
Set the environment variable `YTDLP_NO_PLUGINS` to something nonempty to disable loading plugins entirely.
See the [wiki for some known plugins](https://github.com/yt-dlp/yt-dlp/wiki/Plugins) See the [wiki for some known plugins](https://github.com/yt-dlp/yt-dlp/wiki/Plugins)

View File

@ -2,7 +2,6 @@
# Allow direct execution # Allow direct execution
import os import os
import shutil
import sys import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -34,18 +33,14 @@ class {name}({bases}):
def main(): def main():
os.environ['YTDLP_NO_PLUGINS'] = 'true'
os.environ['YTDLP_NO_LAZY_EXTRACTORS'] = 'true'
lazy_extractors_filename = get_filename_args(default_outfile='yt_dlp/extractor/lazy_extractors.py') lazy_extractors_filename = get_filename_args(default_outfile='yt_dlp/extractor/lazy_extractors.py')
if os.path.exists(lazy_extractors_filename):
os.remove(lazy_extractors_filename)
_ALL_CLASSES = get_all_ies() # Must be before import from yt_dlp.extractor.extractors import _ALL_CLASSES
import yt_dlp.plugins
from yt_dlp.extractor.common import InfoExtractor, SearchInfoExtractor from yt_dlp.extractor.common import InfoExtractor, SearchInfoExtractor
# Filter out plugins
_ALL_CLASSES = [cls for cls in _ALL_CLASSES if not cls.__module__.startswith(f'{yt_dlp.plugins.PACKAGE_NAME}.')]
DummyInfoExtractor = type('InfoExtractor', (InfoExtractor,), {'IE_NAME': NO_ATTR}) DummyInfoExtractor = type('InfoExtractor', (InfoExtractor,), {'IE_NAME': NO_ATTR})
module_src = '\n'.join(( module_src = '\n'.join((
MODULE_TEMPLATE, MODULE_TEMPLATE,
@ -58,20 +53,6 @@ def main():
write_file(lazy_extractors_filename, f'{module_src}\n') write_file(lazy_extractors_filename, f'{module_src}\n')
def get_all_ies():
PLUGINS_DIRNAME = 'ytdlp_plugins'
BLOCKED_DIRNAME = f'{PLUGINS_DIRNAME}_blocked'
if os.path.exists(PLUGINS_DIRNAME):
# os.rename cannot be used, e.g. in Docker. See https://github.com/yt-dlp/yt-dlp/pull/4958
shutil.move(PLUGINS_DIRNAME, BLOCKED_DIRNAME)
try:
from yt_dlp.extractor.extractors import _ALL_CLASSES
finally:
if os.path.exists(BLOCKED_DIRNAME):
shutil.move(BLOCKED_DIRNAME, PLUGINS_DIRNAME)
return _ALL_CLASSES
def extra_ie_code(ie, base=None): def extra_ie_code(ie, base=None):
for var in STATIC_CLASS_PROPERTIES: for var in STATIC_CLASS_PROPERTIES:
val = getattr(ie, var) val = getattr(ie, var)

View File

@ -16,7 +16,7 @@
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description='Run selected yt-dlp tests') parser = argparse.ArgumentParser(description='Run selected yt-dlp tests')
parser.add_argument( parser.add_argument(
'test', help='a extractor tests, or one of "core" or "download"', nargs='*') 'test', help='an extractor test, test path, or one of "core" or "download"', nargs='*')
parser.add_argument( parser.add_argument(
'-k', help='run a test matching EXPRESSION. Same as "pytest -k"', metavar='EXPRESSION') '-k', help='run a test matching EXPRESSION. Same as "pytest -k"', metavar='EXPRESSION')
parser.add_argument( parser.add_argument(
@ -27,7 +27,6 @@ def parse_args():
def run_tests(*tests, pattern=None, ci=False): def run_tests(*tests, pattern=None, ci=False):
run_core = 'core' in tests or (not pattern and not tests) run_core = 'core' in tests or (not pattern and not tests)
run_download = 'download' in tests run_download = 'download' in tests
tests = list(map(fix_test_name, tests))
pytest_args = args.pytest_args or os.getenv('HATCH_TEST_ARGS', '') pytest_args = args.pytest_args or os.getenv('HATCH_TEST_ARGS', '')
arguments = ['pytest', '-Werror', '--tb=short', *shlex.split(pytest_args)] arguments = ['pytest', '-Werror', '--tb=short', *shlex.split(pytest_args)]
@ -41,7 +40,9 @@ def run_tests(*tests, pattern=None, ci=False):
arguments.extend(['-m', 'download']) arguments.extend(['-m', 'download'])
else: else:
arguments.extend( arguments.extend(
f'test/test_download.py::TestDownload::test_{test}' for test in tests) test if '/' in test
else f'test/test_download.py::TestDownload::test_{fix_test_name(test)}'
for test in tests)
print(f'Running {arguments}', flush=True) print(f'Running {arguments}', flush=True)
try: try:

View File

@ -4,8 +4,18 @@
import pytest import pytest
from yt_dlp.utils import dict_get, int_or_none, str_or_none from yt_dlp.utils import (
from yt_dlp.utils.traversal import traverse_obj ExtractorError,
determine_ext,
dict_get,
int_or_none,
str_or_none,
)
from yt_dlp.utils.traversal import (
traverse_obj,
require,
subs_list_to_dict,
)
_TEST_DATA = { _TEST_DATA = {
100: 100, 100: 100,
@ -420,6 +430,71 @@ def test_traversal_morsel(self):
assert traverse_obj(morsel, [(None,), any]) == morsel, \ assert traverse_obj(morsel, [(None,), any]) == morsel, \
'Morsel should not be implicitly changed to dict on usage' 'Morsel should not be implicitly changed to dict on usage'
def test_traversal_filter(self):
data = [None, False, True, 0, 1, 0.0, 1.1, '', 'str', {}, {0: 0}, [], [1]]
assert traverse_obj(data, [..., filter]) == [True, 1, 1.1, 'str', {0: 0}, [1]], \
'`filter` should filter falsy values'
class TestTraversalHelpers:
def test_traversal_require(self):
with pytest.raises(ExtractorError):
traverse_obj(_TEST_DATA, ['None', {require('value')}])
assert traverse_obj(_TEST_DATA, ['str', {require('value')}]) == 'str', \
'`require` should pass through non `None` values'
def test_subs_list_to_dict(self):
assert traverse_obj([
{'name': 'de', 'url': 'https://example.com/subs/de.vtt'},
{'name': 'en', 'url': 'https://example.com/subs/en1.ass'},
{'name': 'en', 'url': 'https://example.com/subs/en2.ass'},
], [..., {
'id': 'name',
'url': 'url',
}, all, {subs_list_to_dict}]) == {
'de': [{'url': 'https://example.com/subs/de.vtt'}],
'en': [
{'url': 'https://example.com/subs/en1.ass'},
{'url': 'https://example.com/subs/en2.ass'},
],
}, 'function should build subtitle dict from list of subtitles'
assert traverse_obj([
{'name': 'de', 'url': 'https://example.com/subs/de.ass'},
{'name': 'de'},
{'name': 'en', 'content': 'content'},
{'url': 'https://example.com/subs/en'},
], [..., {
'id': 'name',
'data': 'content',
'url': 'url',
}, all, {subs_list_to_dict}]) == {
'de': [{'url': 'https://example.com/subs/de.ass'}],
'en': [{'data': 'content'}],
}, 'subs with mandatory items missing should be filtered'
assert traverse_obj([
{'url': 'https://example.com/subs/de.ass', 'name': 'de'},
{'url': 'https://example.com/subs/en', 'name': 'en'},
], [..., {
'id': 'name',
'ext': ['url', {lambda x: determine_ext(x, default_ext=None)}],
'url': 'url',
}, all, {subs_list_to_dict(ext='ext')}]) == {
'de': [{'url': 'https://example.com/subs/de.ass', 'ext': 'ass'}],
'en': [{'url': 'https://example.com/subs/en', 'ext': 'ext'}],
}, '`ext` should set default ext but leave existing value untouched'
assert traverse_obj([
{'name': 'en', 'url': 'https://example.com/subs/en2', 'prio': True},
{'name': 'en', 'url': 'https://example.com/subs/en1', 'prio': False},
], [..., {
'id': 'name',
'quality': ['prio', {int}],
'url': 'url',
}, all, {subs_list_to_dict(ext='ext')}]) == {'en': [
{'url': 'https://example.com/subs/en1', 'ext': 'ext'},
{'url': 'https://example.com/subs/en2', 'ext': 'ext'},
]}, '`quality` key should sort subtitle list accordingly'
class TestDictGet: class TestDictGet:
def test_dict_get(self): def test_dict_get(self):

View File

@ -221,9 +221,10 @@ def test_sanitize_ids(self):
self.assertEqual(sanitize_filename('N0Y__7-UOdI', is_id=True), 'N0Y__7-UOdI') self.assertEqual(sanitize_filename('N0Y__7-UOdI', is_id=True), 'N0Y__7-UOdI')
def test_sanitize_path(self): def test_sanitize_path(self):
if sys.platform != 'win32': with unittest.mock.patch('sys.platform', 'win32'):
return self._test_sanitize_path()
def _test_sanitize_path(self):
self.assertEqual(sanitize_path('abc'), 'abc') self.assertEqual(sanitize_path('abc'), 'abc')
self.assertEqual(sanitize_path('abc/def'), 'abc\\def') self.assertEqual(sanitize_path('abc/def'), 'abc\\def')
self.assertEqual(sanitize_path('abc\\def'), 'abc\\def') self.assertEqual(sanitize_path('abc\\def'), 'abc\\def')
@ -256,6 +257,11 @@ def test_sanitize_path(self):
self.assertEqual(sanitize_path('./abc'), 'abc') self.assertEqual(sanitize_path('./abc'), 'abc')
self.assertEqual(sanitize_path('./../abc'), '..\\abc') self.assertEqual(sanitize_path('./../abc'), '..\\abc')
self.assertEqual(sanitize_path('\\abc'), '\\abc')
self.assertEqual(sanitize_path('C:abc'), 'C:abc')
self.assertEqual(sanitize_path('C:abc\\..\\'), 'C:..')
self.assertEqual(sanitize_path('C:\\abc:%(title)s.%(ext)s'), 'C:\\abc#%(title)s.%(ext)s')
def test_sanitize_url(self): def test_sanitize_url(self):
self.assertEqual(sanitize_url('//foo.bar'), 'http://foo.bar') self.assertEqual(sanitize_url('//foo.bar'), 'http://foo.bar')
self.assertEqual(sanitize_url('httpss://foo.bar'), 'https://foo.bar') self.assertEqual(sanitize_url('httpss://foo.bar'), 'https://foo.bar')

View File

@ -4070,6 +4070,10 @@ def get_encoding(stream):
write_debug(f'Proxy map: {self.proxies}') write_debug(f'Proxy map: {self.proxies}')
write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}') write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')
if os.environ.get('YTDLP_NO_PLUGINS'):
write_debug('Plugins are forcibly disabled')
return
for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items(): for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
display_list = ['{}{}'.format( display_list = ['{}{}'.format(
klass.__name__, '' if klass.__name__ == name else f' as {name}') klass.__name__, '' if klass.__name__ == name else f' as {name}')
@ -4120,7 +4124,8 @@ def cookiejar(self):
self.params.get('cookiefile'), self.params.get('cookiesfrombrowser'), self) self.params.get('cookiefile'), self.params.get('cookiesfrombrowser'), self)
except CookieLoadError as error: except CookieLoadError as error:
cause = error.__context__ cause = error.__context__
self.report_error(str(cause), tb=''.join(traceback.format_exception(cause))) # compat: <=py3.9: `traceback.format_exception` has a different signature
self.report_error(str(cause), tb=''.join(traceback.format_exception(None, cause, cause.__traceback__)))
raise raise
@property @property

View File

@ -118,7 +118,6 @@
from .amazonminitv import ( from .amazonminitv import (
AmazonMiniTVIE, AmazonMiniTVIE,
AmazonMiniTVSeasonIE, AmazonMiniTVSeasonIE,
AmazonMiniTVSeriesIE,
) )
from .amcnetworks import AMCNetworksIE from .amcnetworks import AMCNetworksIE
from .americastestkitchen import ( from .americastestkitchen import (

View File

@ -1355,6 +1355,7 @@
class AdobePassIE(InfoExtractor): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor class AdobePassIE(InfoExtractor): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
_SERVICE_PROVIDER_TEMPLATE = 'https://sp.auth.adobe.com/adobe-services/%s' _SERVICE_PROVIDER_TEMPLATE = 'https://sp.auth.adobe.com/adobe-services/%s'
_USER_AGENT = 'Mozilla/5.0 (X11; Linux i686; rv:47.0) Gecko/20100101 Firefox/47.0' _USER_AGENT = 'Mozilla/5.0 (X11; Linux i686; rv:47.0) Gecko/20100101 Firefox/47.0'
_MODERN_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; rv:131.0) Gecko/20100101 Firefox/131.0'
_MVPD_CACHE = 'ap-mvpd' _MVPD_CACHE = 'ap-mvpd'
_DOWNLOADING_LOGIN_PAGE = 'Downloading Provider Login Page' _DOWNLOADING_LOGIN_PAGE = 'Downloading Provider Login Page'
@ -1454,7 +1455,11 @@ def extract_redirect_url(html, url=None, fatal=False):
'no_iframe': 'false', 'no_iframe': 'false',
'domain_name': 'adobe.com', 'domain_name': 'adobe.com',
'redirect_url': url, 'redirect_url': url,
}) }, headers={
# yt-dlp's default user-agent is usually too old for Comcast_SSO
# See: https://github.com/yt-dlp/yt-dlp/issues/10848
'User-Agent': self._MODERN_USER_AGENT,
} if mso_id == 'Comcast_SSO' else None)
elif not self._cookies_passed: elif not self._cookies_passed:
raise_mvpd_required() raise_mvpd_required()

View File

@ -1,5 +1,3 @@
import json
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import ExtractorError, int_or_none, traverse_obj, try_get from ..utils import ExtractorError, int_or_none, traverse_obj, try_get
@ -9,35 +7,28 @@ def _real_initialize(self):
self._download_webpage( self._download_webpage(
'https://www.amazon.in/minitv', None, 'https://www.amazon.in/minitv', None,
note='Fetching guest session cookies') note='Fetching guest session cookies')
AmazonMiniTVBaseIE.session_id = self._get_cookies('https://www.amazon.in')['session-id'].value AmazonMiniTVBaseIE.urtk = self._get_cookies('https://www.amazon.in')['urtk'].value
def _call_api(self, asin, data=None, note=None): def _call_api(self, asin, data=None, note=None):
device = {'clientId': 'ATVIN', 'deviceLocale': 'en_GB'} query = {
'contentId': asin,
}
if data: if data:
data['variables'].update({ query.update(data)
'contentType': 'VOD',
'sessionIdToken': self.session_id,
**device,
})
resp = self._download_json( resp = self._download_json(
f'https://www.amazon.in/minitv/api/web/{"graphql" if data else "prs"}', 'https://www.amazon.in/minitv-pr/api/web/page/title',
asin, note=note, headers={ asin, note=note, headers={
'Content-Type': 'application/json', 'Content-Type': 'application/json',
'accounttype': 'NEW_GUEST_ACCOUNT',
'currentpageurl': '/', 'currentpageurl': '/',
'currentplatform': 'dWeb', 'currentplatform': 'dWeb',
}, data=json.dumps(data).encode() if data else None, }, data=None,
query=None if data else { query=query)
'deviceType': 'A1WMMUXPCUJL4N',
'contentId': asin,
**device,
})
if resp.get('errors'): if resp.get('errors'):
raise ExtractorError(f'MiniTV said: {resp["errors"][0]["message"]}') raise ExtractorError(f'MiniTV said: {resp["errors"][0]["message"]}')
elif not data: return resp
return resp
return resp['data'][data['operationName']]
class AmazonMiniTVIE(AmazonMiniTVBaseIE): class AmazonMiniTVIE(AmazonMiniTVBaseIE):
@ -89,104 +80,46 @@ class AmazonMiniTVIE(AmazonMiniTVBaseIE):
'only_matching': True, 'only_matching': True,
}] }]
_GRAPHQL_QUERY_CONTENT = '''
query content($sessionIdToken: String!, $deviceLocale: String, $contentId: ID!, $contentType: ContentType!, $clientId: String) {
content(
applicationContextInput: {deviceLocale: $deviceLocale, sessionIdToken: $sessionIdToken, clientId: $clientId}
contentId: $contentId
contentType: $contentType
) {
contentId
name
... on Episode {
contentId
vodType
name
images
description {
synopsis
contentLengthInSeconds
}
publicReleaseDateUTC
audioTracks
seasonId
seriesId
seriesName
seasonNumber
episodeNumber
timecode {
endCreditsTime
}
}
... on MovieContent {
contentId
vodType
name
description {
synopsis
contentLengthInSeconds
}
images
publicReleaseDateUTC
audioTracks
}
}
}'''
def _real_extract(self, url): def _real_extract(self, url):
asin = f'amzn1.dv.gti.{self._match_id(url)}' asin = f'amzn1.dv.gti.{self._match_id(url)}'
prs = self._call_api(asin, note='Downloading playback info') prs = self._call_api(asin, note='Downloading playback info')
playback_info = traverse_obj(prs, ('widgets', 0, 'data', 'playbackAssets', 'manifestData'))
title_info = traverse_obj(prs, ('widgets', 0, 'data', 'contentDetails'))
title_info_ = traverse_obj(prs, ('metaData', 'contentDetails'))
formats, subtitles = [], {} formats, subtitles = [], {}
for type_, asset in prs['playbackAssets'].items(): for mpd in playback_info:
if not traverse_obj(asset, 'manifestUrl'): mpd_fmts, mpd_subs = self._extract_mpd_formats_and_subtitles(
continue mpd['manifestURL'], asin, mpd_id=mpd['codec'], fatal=False)
if type_ == 'hls': formats.extend(mpd_fmts)
m3u8_fmts, m3u8_subs = self._extract_m3u8_formats_and_subtitles( subtitles = self._merge_subtitles(subtitles, mpd_subs)
asset['manifestUrl'], asin, ext='mp4', entry_protocol='m3u8_native',
m3u8_id=type_, fatal=False)
formats.extend(m3u8_fmts)
subtitles = self._merge_subtitles(subtitles, m3u8_subs)
elif type_ == 'dash':
mpd_fmts, mpd_subs = self._extract_mpd_formats_and_subtitles(
asset['manifestUrl'], asin, mpd_id=type_, fatal=False)
formats.extend(mpd_fmts)
subtitles = self._merge_subtitles(subtitles, mpd_subs)
else:
self.report_warning(f'Unknown asset type: {type_}')
title_info = self._call_api( credits_time = try_get(title_info, lambda x: x['skipData']['INTRO']['endTime'])
asin, note='Downloading title info', data={ is_episode = title_info_.get('vodType') == 'EPISODE'
'operationName': 'content',
'variables': {'contentId': asin},
'query': self._GRAPHQL_QUERY_CONTENT,
})
credits_time = try_get(title_info, lambda x: x['timecode']['endCreditsTime'] / 1000)
is_episode = title_info.get('vodType') == 'EPISODE'
return { return {
'id': asin, 'id': asin,
'title': title_info.get('name'), 'title': title_info_.get('name'),
'formats': formats, 'formats': formats,
'subtitles': subtitles, 'subtitles': subtitles,
'language': traverse_obj(title_info, ('audioTracks', 0)), 'language': traverse_obj(title_info, ('audioTracks', 0)),
'thumbnails': [{ 'thumbnails': [{
'id': type_, 'id': 'imageSrc',
'url': url, 'url': title_info_.get('imageSrc'),
} for type_, url in (title_info.get('images') or {}).items()], }] if title_info_.get('imageSrc') else [],
'description': traverse_obj(title_info, ('description', 'synopsis')), 'description': traverse_obj(title_info_, ('synopsis')),
'release_timestamp': int_or_none(try_get(title_info, lambda x: x['publicReleaseDateUTC'] / 1000)), 'release_timestamp': int_or_none(try_get(title_info_, lambda x: x['publicReleaseDateUTC'] / 1000)),
'duration': traverse_obj(title_info, ('description', 'contentLengthInSeconds')), 'duration': traverse_obj(title_info_, ('contentLengthInSeconds')),
'chapters': [{ 'chapters': [{
'start_time': credits_time, 'start_time': credits_time,
'title': 'End Credits', 'title': 'End Credits',
}] if credits_time else [], }] if credits_time else [],
'series': title_info.get('seriesName'), 'series': title_info_.get('seasonName') if is_episode else None,
'series_id': title_info.get('seriesId'), 'series_id': title_info.get('seriesId') if is_episode else None,
'season_number': title_info.get('seasonNumber'), 'season_number': title_info.get('seasonNumber') if is_episode else None,
'season_id': title_info.get('seasonId'), 'season_id': title_info.get('seasonId') if is_episode else None,
'episode': title_info.get('name') if is_episode else None, 'episode': title_info.get('name') if is_episode else None,
'episode_number': title_info.get('episodeNumber'), 'episode_number': title_info.get('episodeNumber') if is_episode else None,
'episode_id': asin if is_episode else None, 'episode_id': asin if is_episode else None,
} }
@ -206,88 +139,17 @@ class AmazonMiniTVSeasonIE(AmazonMiniTVBaseIE):
'only_matching': True, 'only_matching': True,
}] }]
_GRAPHQL_QUERY = '''
query getEpisodes($sessionIdToken: String!, $clientId: String, $episodeOrSeasonId: ID!, $deviceLocale: String) {
getEpisodes(
applicationContextInput: {sessionIdToken: $sessionIdToken, deviceLocale: $deviceLocale, clientId: $clientId}
episodeOrSeasonId: $episodeOrSeasonId
) {
episodes {
... on Episode {
contentId
name
images
seriesName
seasonId
seriesId
seasonNumber
episodeNumber
description {
synopsis
contentLengthInSeconds
}
publicReleaseDateUTC
}
}
}
}
'''
def _entries(self, asin): def _entries(self, asin):
season_info = self._call_api( season_info = self._call_api(
asin, note='Downloading season info', data={ asin, note='Downloading season info',
'operationName': 'getEpisodes', data={'cursor': '8e0cefec-e190-46ba-854d-1f3ca7978b4a:::'},
'variables': {'episodeOrSeasonId': asin}, )
'query': self._GRAPHQL_QUERY,
})
for episode in season_info['episodes']: for season in season_info['widgets'][0]['data']['options']:
yield self.url_result( if season['active']:
f'amazonminitv:{episode["contentId"]}', AmazonMiniTVIE, episode['contentId']) for episode in season['value']['data']['widgets'][0]['data']['widgets']:
yield self.url_result(
def _real_extract(self, url): f'amazonminitv:{episode["data"]["contentId"]}', AmazonMiniTVIE, episode['data']['contentId'])
asin = f'amzn1.dv.gti.{self._match_id(url)}'
return self.playlist_result(self._entries(asin), asin)
class AmazonMiniTVSeriesIE(AmazonMiniTVBaseIE):
IE_NAME = 'amazonminitv:series'
_VALID_URL = r'amazonminitv:series:(?:amzn1\.dv\.gti\.)?(?P<id>[a-f0-9-]+)'
IE_DESC = 'Amazon MiniTV Series, "minitv:series:" prefix'
_TESTS = [{
'url': 'amazonminitv:series:amzn1.dv.gti.56521d46-b040-4fd5-872e-3e70476a04b0',
'playlist_mincount': 3,
'info_dict': {
'id': 'amzn1.dv.gti.56521d46-b040-4fd5-872e-3e70476a04b0',
},
}, {
'url': 'amazonminitv:series:56521d46-b040-4fd5-872e-3e70476a04b0',
'only_matching': True,
}]
_GRAPHQL_QUERY = '''
query getSeasons($sessionIdToken: String!, $deviceLocale: String, $episodeOrSeasonOrSeriesId: ID!, $clientId: String) {
getSeasons(
applicationContextInput: {deviceLocale: $deviceLocale, sessionIdToken: $sessionIdToken, clientId: $clientId}
episodeOrSeasonOrSeriesId: $episodeOrSeasonOrSeriesId
) {
seasons {
seasonId
}
}
}
'''
def _entries(self, asin):
season_info = self._call_api(
asin, note='Downloading series info', data={
'operationName': 'getSeasons',
'variables': {'episodeOrSeasonOrSeriesId': asin},
'query': self._GRAPHQL_QUERY,
})
for season in season_info['seasons']:
yield self.url_result(f'amazonminitv:season:{season["seasonId"]}', AmazonMiniTVSeasonIE, season['seasonId'])
def _real_extract(self, url): def _real_extract(self, url):
asin = f'amzn1.dv.gti.{self._match_id(url)}' asin = f'amzn1.dv.gti.{self._match_id(url)}'

View File

@ -573,13 +573,13 @@ class InfoExtractor:
def _login_hint(self, method=NO_DEFAULT, netrc=None): def _login_hint(self, method=NO_DEFAULT, netrc=None):
password_hint = f'--username and --password, --netrc-cmd, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials' password_hint = f'--username and --password, --netrc-cmd, or --netrc ({netrc or self._NETRC_MACHINE}) to provide account credentials'
cookies_hint = 'See https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp for how to manually pass cookies'
return { return {
None: '', None: '',
'any': f'Use --cookies, --cookies-from-browser, {password_hint}', 'any': f'Use --cookies, --cookies-from-browser, {password_hint}. {cookies_hint}',
'password': f'Use {password_hint}', 'password': f'Use {password_hint}',
'cookies': ( 'cookies': f'Use --cookies-from-browser or --cookies for the authentication. {cookies_hint}',
'Use --cookies-from-browser or --cookies for the authentication. ' 'session_cookies': f'Use --cookies for the authentication (--cookies-from-browser might not work). {cookies_hint}',
'See https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp for how to manually pass cookies'),
}[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies'] }[method if method is not NO_DEFAULT else 'any' if self.supports_login() else 'cookies']
def __init__(self, downloader=None): def __init__(self, downloader=None):

View File

@ -8,6 +8,7 @@
from .commonprotocols import RtmpIE from .commonprotocols import RtmpIE
from .youtube import YoutubeIE from .youtube import YoutubeIE
from ..compat import compat_etree_fromstring from ..compat import compat_etree_fromstring
from ..networking.impersonate import ImpersonateTarget
from ..utils import ( from ..utils import (
KNOWN_EXTENSIONS, KNOWN_EXTENSIONS,
MEDIA_EXTENSIONS, MEDIA_EXTENSIONS,
@ -2373,6 +2374,12 @@ def _real_extract(self, url):
else: else:
video_id = self._generic_id(url) video_id = self._generic_id(url)
# Try to impersonate a web-browser by default if possible
# Skip impersonation if not available to omit the warning
impersonate = self._configuration_arg('impersonate', [''])
if 'false' in impersonate or not self._downloader._impersonate_target_available(ImpersonateTarget()):
impersonate = None
# Some webservers may serve compressed content of rather big size (e.g. gzipped flac) # Some webservers may serve compressed content of rather big size (e.g. gzipped flac)
# making it impossible to download only chunk of the file (yet we need only 512kB to # making it impossible to download only chunk of the file (yet we need only 512kB to
# test whether it's HTML or not). According to yt-dlp default Accept-Encoding # test whether it's HTML or not). According to yt-dlp default Accept-Encoding
@ -2384,7 +2391,7 @@ def _real_extract(self, url):
full_response = self._request_webpage(url, video_id, headers=filter_dict({ full_response = self._request_webpage(url, video_id, headers=filter_dict({
'Accept-Encoding': 'identity', 'Accept-Encoding': 'identity',
'Referer': smuggled_data.get('referer'), 'Referer': smuggled_data.get('referer'),
})) }), impersonate=impersonate)
new_url = full_response.url new_url = full_response.url
if new_url != extract_basic_auth(url)[0]: if new_url != extract_basic_auth(url)[0]:
self.report_following_redirect(new_url) self.report_following_redirect(new_url)

View File

@ -1,3 +1,4 @@
import json
import urllib.parse import urllib.parse
from .common import InfoExtractor from .common import InfoExtractor
@ -17,7 +18,7 @@
class RedditIE(InfoExtractor): class RedditIE(InfoExtractor):
_NETRC_MACHINE = 'reddit' _NETRC_MACHINE = 'reddit'
_VALID_URL = r'https?://(?P<host>(?:\w+\.)?reddit(?:media)?\.com)/(?P<slug>(?:(?:r|user)/[^/]+/)?comments/(?P<id>[^/?#&]+))' _VALID_URL = r'https?://(?:\w+\.)?reddit(?:media)?\.com/(?P<slug>(?:(?:r|user)/[^/]+/)?comments/(?P<id>[^/?#&]+))'
_TESTS = [{ _TESTS = [{
'url': 'https://www.reddit.com/r/videos/comments/6rrwyj/that_small_heart_attack/', 'url': 'https://www.reddit.com/r/videos/comments/6rrwyj/that_small_heart_attack/',
'info_dict': { 'info_dict': {
@ -251,15 +252,15 @@ def _get_subtitles(self, video_id):
return {'en': [{'url': caption_url}]} return {'en': [{'url': caption_url}]}
def _real_extract(self, url): def _real_extract(self, url):
host, slug, video_id = self._match_valid_url(url).group('host', 'slug', 'id') slug, video_id = self._match_valid_url(url).group('slug', 'id')
data = self._download_json( try:
f'https://{host}/{slug}/.json', video_id, fatal=False, expected_status=403)
if not data:
fallback_host = 'old.reddit.com' if host != 'old.reddit.com' else 'www.reddit.com'
self.to_screen(f'{host} request failed, retrying with {fallback_host}')
data = self._download_json( data = self._download_json(
f'https://{fallback_host}/{slug}/.json', video_id, expected_status=403) f'https://www.reddit.com/{slug}/.json', video_id, expected_status=403)
except ExtractorError as e:
if isinstance(e.cause, json.JSONDecodeError):
self.raise_login_required('Account authentication is required')
raise
if traverse_obj(data, 'error') == 403: if traverse_obj(data, 'error') == 403:
reason = data.get('reason') reason = data.get('reason')

View File

@ -5,6 +5,7 @@
import importlib.util import importlib.util
import inspect import inspect
import itertools import itertools
import os
import pkgutil import pkgutil
import sys import sys
import traceback import traceback
@ -137,6 +138,8 @@ def load_module(module, module_name, suffix):
def load_plugins(name, suffix): def load_plugins(name, suffix):
classes = {} classes = {}
if os.environ.get('YTDLP_NO_PLUGINS'):
return classes
for finder, module_name, _ in iter_modules(name): for finder, module_name, _ in iter_modules(name):
if any(x.startswith('_') for x in module_name.split('.')): if any(x.startswith('_') for x in module_name.split('.')):

View File

@ -664,31 +664,51 @@ def replace_insane(char):
return result return result
def _sanitize_path_parts(parts):
sanitized_parts = []
for part in parts:
if not part or part == '.':
continue
elif part == '..':
if sanitized_parts and sanitized_parts[-1] != '..':
sanitized_parts.pop()
sanitized_parts.append('..')
continue
# Replace invalid segments with `#`
# - trailing dots and spaces (`asdf...` => `asdf..#`)
# - invalid chars (`<>` => `##`)
sanitized_part = re.sub(r'[/<>:"\|\\?\*]|[\s.]$', '#', part)
sanitized_parts.append(sanitized_part)
return sanitized_parts
def sanitize_path(s, force=False): def sanitize_path(s, force=False):
"""Sanitizes and normalizes path on Windows""" """Sanitizes and normalizes path on Windows"""
# XXX: this handles drive relative paths (c:sth) incorrectly if sys.platform != 'win32':
if sys.platform == 'win32': if not force:
force = False return s
drive_or_unc, _ = os.path.splitdrive(s) root = '/' if s.startswith('/') else ''
elif force: return root + '/'.join(_sanitize_path_parts(s.split('/')))
drive_or_unc = ''
else:
return s
norm_path = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep) normed = s.replace('/', '\\')
if drive_or_unc:
norm_path.pop(0) if normed.startswith('\\\\'):
sanitized_path = [ # UNC path (`\\SERVER\SHARE`) or device path (`\\.`, `\\?`)
path_part if path_part in ['.', '..'] else re.sub(r'(?:[/<>:"\|\\?\*]|[\s.]$)', '#', path_part) parts = normed.split('\\')
for path_part in norm_path] root = '\\'.join(parts[:4]) + '\\'
if drive_or_unc: parts = parts[4:]
sanitized_path.insert(0, drive_or_unc + os.path.sep) elif normed[1:2] == ':':
elif force and s and s[0] == os.path.sep: # absolute path or drive relative path
sanitized_path.insert(0, os.path.sep) offset = 3 if normed[2:3] == '\\' else 2
# TODO: Fix behavioral differences <3.12 root = normed[:offset]
# The workaround using `normpath` only superficially passes tests parts = normed[offset:].split('\\')
# Ref: https://github.com/python/cpython/pull/100351 else:
return os.path.normpath(os.path.join(*sanitized_path)) # relative/drive root relative path
root = '\\' if normed[:1] == '\\' else ''
parts = normed.split('\\')
return root + '\\'.join(_sanitize_path_parts(parts))
def sanitize_url(url, *, scheme='http'): def sanitize_url(url, *, scheme='http'):
@ -804,14 +824,18 @@ class Popen(subprocess.Popen):
_startupinfo = None _startupinfo = None
@staticmethod @staticmethod
def _fix_pyinstaller_ld_path(env): def _fix_pyinstaller_issues(env):
"""Restore LD_LIBRARY_PATH when using PyInstaller
Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
https://github.com/yt-dlp/yt-dlp/issues/4573
"""
if not hasattr(sys, '_MEIPASS'): if not hasattr(sys, '_MEIPASS'):
return return
# Force spawning independent subprocesses for exes bundled with PyInstaller>=6.10
# Ref: https://pyinstaller.org/en/v6.10.0/CHANGES.html#incompatible-changes
# https://github.com/yt-dlp/yt-dlp/issues/11259
env['PYINSTALLER_RESET_ENVIRONMENT'] = '1'
# Restore LD_LIBRARY_PATH when using PyInstaller
# Ref: https://pyinstaller.org/en/v6.10.0/runtime-information.html#ld-library-path-libpath-considerations
# https://github.com/yt-dlp/yt-dlp/issues/4573
def _fix(key): def _fix(key):
orig = env.get(f'{key}_ORIG') orig = env.get(f'{key}_ORIG')
if orig is None: if orig is None:
@ -825,7 +849,7 @@ def _fix(key):
def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs): def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs):
if env is None: if env is None:
env = os.environ.copy() env = os.environ.copy()
self._fix_pyinstaller_ld_path(env) self._fix_pyinstaller_issues(env)
self.__text_mode = kwargs.get('encoding') or kwargs.get('errors') or text or kwargs.get('universal_newlines') self.__text_mode = kwargs.get('encoding') or kwargs.get('errors') or text or kwargs.get('universal_newlines')
if text is True: if text is True:
@ -1964,11 +1988,30 @@ def urljoin(base, path):
return urllib.parse.urljoin(base, path) return urllib.parse.urljoin(base, path)
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1): def partial_application(func):
sig = inspect.signature(func)
@functools.wraps(func)
def wrapped(*args, **kwargs):
try:
sig.bind(*args, **kwargs)
except TypeError:
return functools.partial(func, *args, **kwargs)
else:
return func(*args, **kwargs)
return wrapped
@partial_application
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1, base=None):
if get_attr and v is not None: if get_attr and v is not None:
v = getattr(v, get_attr, None) v = getattr(v, get_attr, None)
if invscale == 1 and scale < 1:
invscale = int(1 / scale)
scale = 1
try: try:
return int(v) * invscale // scale return (int(v) if base is None else int(v, base=base)) * invscale // scale
except (ValueError, TypeError, OverflowError): except (ValueError, TypeError, OverflowError):
return default return default
@ -1986,9 +2029,13 @@ def str_to_int(int_str):
return int_or_none(int_str) return int_or_none(int_str)
@partial_application
def float_or_none(v, scale=1, invscale=1, default=None): def float_or_none(v, scale=1, invscale=1, default=None):
if v is None: if v is None:
return default return default
if invscale == 1 and scale < 1:
invscale = int(1 / scale)
scale = 1
try: try:
return float(v) * invscale / scale return float(v) * invscale / scale
except (ValueError, TypeError): except (ValueError, TypeError):

View File

@ -1,18 +1,35 @@
from __future__ import annotations
import collections
import collections.abc import collections.abc
import contextlib import contextlib
import functools
import http.cookies import http.cookies
import inspect import inspect
import itertools import itertools
import re import re
import typing
import xml.etree.ElementTree import xml.etree.ElementTree
from ._utils import ( from ._utils import (
IDENTITY, IDENTITY,
NO_DEFAULT, NO_DEFAULT,
ExtractorError,
LazyList, LazyList,
deprecation_warning, deprecation_warning,
get_elements_html_by_class,
get_elements_html_by_attribute,
get_elements_by_attribute,
get_element_html_by_attribute,
get_element_by_attribute,
get_element_html_by_id,
get_element_by_id,
get_element_html_by_class,
get_elements_by_class,
get_element_text_and_html_by_tag,
is_iterable_like, is_iterable_like,
try_call, try_call,
url_or_none,
variadic, variadic,
) )
@ -54,6 +71,7 @@ def traverse_obj(
Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`. Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
- `any`-builtin: Take the first matching object and return it, resetting branching. - `any`-builtin: Take the first matching object and return it, resetting branching.
- `all`-builtin: Take all matching objects and return them as a list, resetting branching. - `all`-builtin: Take all matching objects and return them as a list, resetting branching.
- `filter`-builtin: Return the value if it is truthy, `None` otherwise.
`tuple`, `list`, and `dict` all support nested paths and branches. `tuple`, `list`, and `dict` all support nested paths and branches.
@ -247,6 +265,10 @@ def apply_path(start_obj, path, test_type):
objs = (list(filtered_objs),) objs = (list(filtered_objs),)
continue continue
if key is filter:
objs = filter(None, objs)
continue
if __debug__ and callable(key): if __debug__ and callable(key):
# Verify function signature # Verify function signature
inspect.signature(key).bind(None, None) inspect.signature(key).bind(None, None)
@ -277,13 +299,143 @@ def _traverse_obj(obj, path, allow_empty, test_type):
return results[0] if results else {} if allow_empty and is_dict else None return results[0] if results else {} if allow_empty and is_dict else None
for index, path in enumerate(paths, 1): for index, path in enumerate(paths, 1):
result = _traverse_obj(obj, path, index == len(paths), True) is_last = index == len(paths)
if result is not None: try:
return result result = _traverse_obj(obj, path, is_last, True)
if result is not None:
return result
except _RequiredError as e:
if is_last:
# Reraise to get cleaner stack trace
raise ExtractorError(e.orig_msg, expected=e.expected) from None
return None if default is NO_DEFAULT else default return None if default is NO_DEFAULT else default
def value(value, /):
return lambda _: value
def require(name, /, *, expected=False):
def func(value):
if value is None:
raise _RequiredError(f'Unable to extract {name}', expected=expected)
return value
return func
class _RequiredError(ExtractorError):
pass
@typing.overload
def subs_list_to_dict(*, ext: str | None = None) -> collections.abc.Callable[[list[dict]], dict[str, list[dict]]]: ...
@typing.overload
def subs_list_to_dict(subs: list[dict] | None, /, *, ext: str | None = None) -> dict[str, list[dict]]: ...
def subs_list_to_dict(subs: list[dict] | None = None, /, *, ext=None):
"""
Convert subtitles from a traversal into a subtitle dict.
The path should have an `all` immediately before this function.
Arguments:
`ext` The default value for `ext` in the subtitle dict
In the dict you can set the following additional items:
`id` The subtitle id to sort the dict into
`quality` The sort order for each subtitle
"""
if subs is None:
return functools.partial(subs_list_to_dict, ext=ext)
result = collections.defaultdict(list)
for sub in subs:
if not url_or_none(sub.get('url')) and not sub.get('data'):
continue
sub_id = sub.pop('id', None)
if sub_id is None:
continue
if ext is not None and not sub.get('ext'):
sub['ext'] = ext
result[sub_id].append(sub)
result = dict(result)
for subs in result.values():
subs.sort(key=lambda x: x.pop('quality', 0) or 0)
return result
@typing.overload
def find_element(*, attr: str, value: str, tag: str | None = None, html=False): ...
@typing.overload
def find_element(*, cls: str, html=False): ...
@typing.overload
def find_element(*, id: str, tag: str | None = None, html=False): ...
@typing.overload
def find_element(*, tag: str, html=False): ...
def find_element(*, tag=None, id=None, cls=None, attr=None, value=None, html=False):
# deliberately using `id=` and `cls=` for ease of readability
assert tag or id or cls or (attr and value), 'One of tag, id, cls or (attr AND value) is required'
if not tag:
tag = r'[\w:.-]+'
if attr and value:
assert not cls, 'Cannot match both attr and cls'
assert not id, 'Cannot match both attr and id'
func = get_element_html_by_attribute if html else get_element_by_attribute
return functools.partial(func, attr, value, tag=tag)
elif cls:
assert not id, 'Cannot match both cls and id'
assert tag is None, 'Cannot match both cls and tag'
func = get_element_html_by_class if html else get_elements_by_class
return functools.partial(func, cls)
elif id:
func = get_element_html_by_id if html else get_element_by_id
return functools.partial(func, id, tag=tag)
index = int(bool(html))
return lambda html: get_element_text_and_html_by_tag(tag, html)[index]
@typing.overload
def find_elements(*, cls: str, html=False): ...
@typing.overload
def find_elements(*, attr: str, value: str, tag: str | None = None, html=False): ...
def find_elements(*, tag=None, cls=None, attr=None, value=None, html=False):
# deliberately using `cls=` for ease of readability
assert cls or (attr and value), 'One of cls or (attr AND value) is required'
if attr and value:
assert not cls, 'Cannot match both attr and cls'
func = get_elements_html_by_attribute if html else get_elements_by_attribute
return functools.partial(func, attr, value, tag=tag or r'[\w:.-]+')
assert not tag, 'Cannot match both cls and tag'
func = get_elements_html_by_class if html else get_elements_by_class
return functools.partial(func, cls)
def get_first(obj, *paths, **kwargs): def get_first(obj, *paths, **kwargs):
return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False) return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)