mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-01-11 21:15:53 +01:00
110 lines
4.6 KiB
Python
110 lines
4.6 KiB
Python
from hashlib import sha256
|
|
import itertools
|
|
import json
|
|
import re
|
|
import time
|
|
|
|
from .ffmpeg import FFmpegPostProcessor
|
|
from ..compat import compat_urllib_parse_urlencode, compat_HTTPError
|
|
from ..utils import PostProcessingError, network_exceptions, sanitized_Request
|
|
|
|
|
|
class SponsorBlockPP(FFmpegPostProcessor):
|
|
|
|
EXTRACTORS = {
|
|
'Youtube': 'YouTube',
|
|
}
|
|
CATEGORIES = {
|
|
'sponsor': 'Sponsor',
|
|
'intro': 'Intermission/Intro Animation',
|
|
'outro': 'Endcards/Credits',
|
|
'selfpromo': 'Unpaid/Self Promotion',
|
|
'interaction': 'Interaction Reminder',
|
|
'preview': 'Preview/Recap',
|
|
'music_offtopic': 'Non-Music Section'
|
|
}
|
|
|
|
def __init__(self, downloader, categories=None, api='https://sponsor.ajay.app'):
|
|
FFmpegPostProcessor.__init__(self, downloader)
|
|
self._categories = tuple(categories or self.CATEGORIES.keys())
|
|
self._API_URL = api if re.match('^https?://', api) else 'https://' + api
|
|
|
|
def run(self, info):
|
|
extractor = info['extractor_key']
|
|
if extractor not in self.EXTRACTORS:
|
|
self.to_screen(f'SponsorBlock is not supported for {extractor}')
|
|
return [], info
|
|
|
|
self.to_screen('Fetching SponsorBlock segments')
|
|
info['sponsorblock_chapters'] = self._get_sponsor_chapters(info, info['duration'])
|
|
return [], info
|
|
|
|
def _get_sponsor_chapters(self, info, duration):
|
|
segments = self._get_sponsor_segments(info['id'], self.EXTRACTORS[info['extractor_key']])
|
|
|
|
def duration_filter(s):
|
|
start_end = s['segment']
|
|
# Ignore milliseconds difference at the start.
|
|
if start_end[0] <= 1:
|
|
start_end[0] = 0
|
|
# Ignore milliseconds difference at the end.
|
|
# Never allow the segment to exceed the video.
|
|
if duration and duration - start_end[1] <= 1:
|
|
start_end[1] = duration
|
|
# SponsorBlock duration may be absent or it may deviate from the real one.
|
|
return s['videoDuration'] == 0 or not duration or abs(duration - s['videoDuration']) <= 1
|
|
|
|
duration_match = [s for s in segments if duration_filter(s)]
|
|
if len(duration_match) != len(segments):
|
|
self.report_warning('Some SponsorBlock segments are from a video of different duration, maybe from an old version of this video')
|
|
|
|
def to_chapter(s):
|
|
(start, end), cat = s['segment'], s['category']
|
|
return {
|
|
'start_time': start,
|
|
'end_time': end,
|
|
'category': cat,
|
|
'title': self.CATEGORIES[cat],
|
|
'_categories': [(cat, start, end)]
|
|
}
|
|
|
|
sponsor_chapters = [to_chapter(s) for s in duration_match]
|
|
if not sponsor_chapters:
|
|
self.to_screen('No segments were found in the SponsorBlock database')
|
|
else:
|
|
self.to_screen(f'Found {len(sponsor_chapters)} segments in the SponsorBlock database')
|
|
return sponsor_chapters
|
|
|
|
def _get_sponsor_segments(self, video_id, service):
|
|
hash = sha256(video_id.encode('ascii')).hexdigest()
|
|
# SponsorBlock API recommends using first 4 hash characters.
|
|
url = f'{self._API_URL}/api/skipSegments/{hash[:4]}?' + compat_urllib_parse_urlencode({
|
|
'service': service,
|
|
'categories': json.dumps(self._categories),
|
|
})
|
|
self.write_debug(f'SponsorBlock query: {url}')
|
|
for d in self._get_json(url):
|
|
if d['videoID'] == video_id:
|
|
return d['segments']
|
|
return []
|
|
|
|
def _get_json(self, url):
|
|
# While this is not an extractor, it behaves similar to one and
|
|
# so obey extractor_retries and sleep_interval_requests
|
|
max_retries = self.get_param('extractor_retries', 3)
|
|
sleep_interval = self.get_param('sleep_interval_requests') or 0
|
|
for retries in itertools.count():
|
|
try:
|
|
rsp = self._downloader.urlopen(sanitized_Request(url))
|
|
return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))
|
|
except network_exceptions as e:
|
|
if isinstance(e, compat_HTTPError) and e.code == 404:
|
|
return []
|
|
if retries < max_retries:
|
|
self.report_warning(f'{e}. Retrying...')
|
|
if sleep_interval > 0:
|
|
self.to_screen(f'Sleeping {sleep_interval} seconds ...')
|
|
time.sleep(sleep_interval)
|
|
continue
|
|
raise PostProcessingError(f'Unable to communicate with SponsorBlock API: {e}')
|