mirror of
https://github.com/ytdl-org/youtube-dl
synced 2024-11-09 09:57:03 +01:00
21792b88b7
* pass YoutubeDL (FileDownloader) to FFmpegPostProcessor constructor * consolidate path search in FFmpegPostProcessor * make availability of FFmpegFD depend on existence of FFmpegPostProcessor * detect ffmpeg executable on instantiation of FFmpegFD * resolves #32735
273 lines
8.9 KiB
Python
273 lines
8.9 KiB
Python
#!/usr/bin/env python
|
|
# coding: utf-8
|
|
from __future__ import unicode_literals
|
|
|
|
# Allow direct execution
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
import unittest
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from test.helper import (
|
|
FakeLogger,
|
|
FakeYDL,
|
|
http_server_port,
|
|
try_rm,
|
|
)
|
|
from youtube_dl import YoutubeDL
|
|
from youtube_dl.compat import (
|
|
compat_contextlib_suppress,
|
|
compat_http_cookiejar_Cookie,
|
|
compat_http_server,
|
|
compat_kwargs,
|
|
)
|
|
from youtube_dl.utils import (
|
|
encodeFilename,
|
|
join_nonempty,
|
|
)
|
|
from youtube_dl.downloader.external import (
|
|
Aria2cFD,
|
|
Aria2pFD,
|
|
AxelFD,
|
|
CurlFD,
|
|
FFmpegFD,
|
|
HttpieFD,
|
|
WgetFD,
|
|
)
|
|
from youtube_dl.postprocessor import (
|
|
FFmpegPostProcessor,
|
|
)
|
|
import threading
|
|
|
|
# Number of bytes the test HTTP server sends for a full (non-range) response;
# downloaded files are expected to have exactly this size.
TEST_SIZE = 10 * 1024

# Keyword arguments describing a single cookie; passed to cookiejar_Cookie()
# by the tests below to populate a downloader's cookie jar.
# NOTE(review): 'domain' starts with a dot while 'domain_initial_dot' is
# False -- confirm this mismatch is intentional.
TEST_COOKIE = {
    'version': 0,
    'name': 'test',
    'value': 'ytdlp',
    'port': None,
    'port_specified': False,
    'domain': '.example.com',
    'domain_specified': True,
    'domain_initial_dot': False,
    'path': '/',
    'path_specified': True,
    'secure': False,
    'expires': None,
    'discard': False,
    'comment': None,
    'comment_url': None,
    'rest': {},
}

# The cookie's name and value joined with '=' (presumably 'test=ytdlp' --
# depends on join_nonempty(), which is defined in youtube_dl.utils).
TEST_COOKIE_VALUE = join_nonempty('name', 'value', delim='=', from_dict=TEST_COOKIE)

# Minimal info dict handed to the downloaders' command builders.
TEST_INFO = {'url': 'http://www.example.com/'}
|
|
|
|
|
|
def cookiejar_Cookie(**cookie_args):
    # Build a cookie object from keyword arguments, passing them through
    # compat_kwargs() before handing them to the compat Cookie constructor.
    normalised_args = compat_kwargs(cookie_args)
    return compat_http_cookiejar_Cookie(**normalised_args)
|
|
|
|
|
|
def ifExternalFDAvailable(externalFD):
    # Decorator factory: the decorated test is skipped unless
    # externalFD.available() is truthy; the skip reason names the
    # downloader's executable as reported by externalFD.get_basename().
    is_available = externalFD.available()
    skip_reason = externalFD.get_basename() + ' not found'
    return unittest.skipUnless(is_available, skip_reason)
|
|
|
|
|
|
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
    """Request handler serving a dummy body of '#' bytes for the download tests.

    The request path selects whether a Content-Range and/or Content-Length
    header is included in the response.
    """

    def log_message(self, format, *args):
        """Discard log messages."""

    def send_content_range(self, total=None):
        # Emit a Content-Range header when the request carries a
        # 'bytes=<start>-<end>' Range header, and return the number of
        # bytes to send: the length of that range, or `total` otherwise.
        requested = self.headers.get('Range')
        bounds = re.match(r'bytes=(\d+)-(\d+)', requested) if requested else None
        if not bounds:
            return total

        first = int(bounds.group(1))
        last = int(bounds.group(2))
        header_value = 'bytes %d-%d' % (first, last)
        if total:
            header_value += '/%d' % total
        self.send_header('Content-Range', header_value)
        return last - first + 1

    def serve(self, range=True, content_length=True):
        # Send a 200 response with a body of '#' bytes; `range` and
        # `content_length` control whether the corresponding headers
        # (and range-based sizing) are applied.
        self.send_response(200)
        self.send_header('Content-Type', 'video/mp4')
        body_len = self.send_content_range(TEST_SIZE) if range else TEST_SIZE
        if content_length:
            self.send_header('Content-Length', body_len)
        self.end_headers()
        self.wfile.write(b'#' * body_len)

    def do_GET(self):
        # Map each known path to the (range, content_length) flags for serve()
        variants = {
            '/regular': (True, True),
            '/no-content-length': (True, False),
            '/no-range': (False, True),
            '/no-range-no-content-length': (False, False),
        }
        flags = variants.get(self.path)
        assert flags is not None, 'unrecognised server path'
        with_range, with_length = flags
        self.serve(range=with_range, content_length=with_length)
|
|
|
|
|
|
@ifExternalFDAvailable(Aria2pFD)
class TestAria2pFD(unittest.TestCase):
    """Download tests for the 'aria2p' external downloader.

    Each test starts a local HTTP server (HTTPTestRequestHandler) and an
    `aria2c --enable-rpc` daemon, downloads every server endpoint through
    YoutubeDL and checks the size of the resulting file.
    """

    def setUp(self):
        # Serve on an OS-assigned port from a background daemon thread
        self.httpd = compat_http_server.HTTPServer(
            ('127.0.0.1', 0), HTTPTestRequestHandler)
        self.port = http_server_port(self.httpd)
        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()

    def tearDown(self):
        # Stop the serve_forever() loop and release the listening socket so
        # that servers do not accumulate across tests
        self.httpd.shutdown()
        self.httpd.server_close()
        self.server_thread.join()

    def download(self, params, ep):
        """Download endpoint `ep` of the test server using options `params`.

        `params` is copied, so the caller's dict is left unmodified. The
        test is skipped if the aria2c RPC daemon is no longer running.
        """
        filename = 'testfile.mp4'
        params = dict(params)
        params['logger'] = FakeLogger()
        params['outtmpl'] = filename

        with subprocess.Popen(
            ['aria2c', '--enable-rpc'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        ) as process:
            try:
                # poll() returns None only while the process is running; an
                # exit status of 0 also means that the daemon has gone away
                if process.poll() is not None:
                    self.skipTest('aria2c RPC daemon exited prematurely')
                ydl = YoutubeDL(params)
                try_rm(encodeFilename(filename))
                try:
                    self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)
                    self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
                finally:
                    # Do not leave the output file behind on failure
                    try_rm(encodeFilename(filename))
            finally:
                # Leaving the `with` block waits for the process to end, so
                # the daemon must be killed even when an assertion failed
                process.kill()

    def download_all(self, params):
        """Run download() for every endpoint offered by the test server."""
        for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
            self.download(params, ep)

    def test_regular(self):
        self.download_all({'external_downloader': 'aria2p'})

    def test_chunked(self):
        self.download_all({
            'external_downloader': 'aria2p',
            'http_chunk_size': 1000,
        })
|
|
|
|
|
|
@ifExternalFDAvailable(HttpieFD)
class TestHttpieFD(unittest.TestCase):
    """Checks the command line built by HttpieFD."""

    def test_make_cmd(self):
        plain_cmd = ['http', '--download', '--output', 'test', 'http://www.example.com/']
        with FakeYDL() as ydl:
            fd = HttpieFD(ydl, {})
            self.assertEqual(fd._make_cmd('test', TEST_INFO), plain_cmd)

            # With a cookie in the jar, a Cookie header item is appended
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO),
                plain_cmd + ['Cookie:' + TEST_COOKIE_VALUE])
|
|
|
|
|
|
@ifExternalFDAvailable(AxelFD)
class TestAxelFD(unittest.TestCase):
    """Checks the command line built by AxelFD."""

    def test_make_cmd(self):
        url = 'http://www.example.com/'
        with FakeYDL() as ydl:
            fd = AxelFD(ydl, {})
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO),
                ['axel', '-o', 'test', '--', url])

            # With a cookie in the jar, a Cookie header is passed and
            # redirects are disabled
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            cookie_header = 'Cookie: ' + TEST_COOKIE_VALUE
            self.assertEqual(
                fd._make_cmd('test', TEST_INFO),
                ['axel', '-o', 'test', '-H', cookie_header,
                 '--max-redirect=0', '--', url])
|
|
|
|
|
|
@ifExternalFDAvailable(WgetFD)
class TestWgetFD(unittest.TestCase):
    """Checks the command line built by WgetFD."""

    def test_make_cmd(self):
        with FakeYDL() as ydl:
            fd = WgetFD(ydl, {})
            cmd_without_cookie = fd._make_cmd('test', TEST_INFO)
            self.assertNotIn('--load-cookies', cmd_without_cookie)

            # With a cookie in the jar, the --load-cookies option appears
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            cmd_with_cookie = fd._make_cmd('test', TEST_INFO)
            self.assertIn('--load-cookies', cmd_with_cookie)
|
|
|
|
|
|
@ifExternalFDAvailable(CurlFD)
class TestCurlFD(unittest.TestCase):
    """Checks the command line built by CurlFD."""

    def test_make_cmd(self):
        with FakeYDL() as ydl:
            fd = CurlFD(ydl, {})
            cmd_without_cookie = fd._make_cmd('test', TEST_INFO)
            self.assertNotIn('--cookie', cmd_without_cookie)

            # With a cookie in the jar, both the --cookie option and the
            # cookie's name=value pair appear in the command
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            first_cmd = fd._make_cmd('test', TEST_INFO)
            self.assertIn('--cookie', first_cmd)
            second_cmd = fd._make_cmd('test', TEST_INFO)
            self.assertIn(TEST_COOKIE_VALUE, second_cmd)
|
|
|
|
|
|
@ifExternalFDAvailable(Aria2cFD)
class TestAria2cFD(unittest.TestCase):
    """Checks the cookie handling of the command line built by Aria2cFD."""

    def test_make_cmd(self):
        with FakeYDL() as ydl:
            fd = Aria2cFD(ydl, {})

            # Without cookies, building the command must not set
            # the _cookies_tempfile attribute
            fd._make_cmd('test', TEST_INFO)
            has_tempfile = hasattr(fd, '_cookies_tempfile')
            self.assertFalse(has_tempfile)

            # With a cookie in the jar, the temp file is passed to aria2c
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            cmd = fd._make_cmd('test', TEST_INFO)
            expected_option = '--load-cookies=%s' % fd._cookies_tempfile
            self.assertIn(expected_option, cmd)
|
|
|
|
|
|
# Handle delegated availability
|
|
def ifFFmpegFDAvailable(externalFD):
    # Decorator factory: skip the decorated test unless an
    # FFmpegPostProcessor reports itself as available.

    # Called first so that anything it raises propagates to the caller;
    # the decorator it returns is deliberately discarded.
    ifExternalFDAvailable(externalFD)

    # Assume unavailable unless the post-processor says otherwise; any
    # exception while querying it leaves the value at False.
    avail = False
    with compat_contextlib_suppress(Exception):
        avail = FFmpegPostProcessor(downloader=None).available

    skip_reason = externalFD.get_basename() + ' not found'
    return unittest.skipUnless(avail, skip_reason)
|
|
|
|
|
|
@ifFFmpegFDAvailable(FFmpegFD)
class TestFFmpegFD(unittest.TestCase):
    """Checks the ffmpeg command line assembled by FFmpegFD."""

    # Most recent command captured by _test_cmd()
    _args = []

    def _test_cmd(self, args):
        # Installed as the downloader's _debug_cmd hook to record the command
        self._args = args

    def test_make_cmd(self):
        input_and_output = [
            '-i', 'http://www.example.com/',
            '-c', 'copy', '-f', 'mp4', 'file:test']

        with FakeYDL() as ydl:
            fd = FFmpegFD(ydl, {})
            fd._debug_cmd = self._test_cmd
            info = dict(TEST_INFO)
            info['ext'] = 'mp4'

            fd._call_downloader('test', info)
            self.assertEqual(self._args, ['ffmpeg', '-y'] + input_and_output)

            # With a cookie in the jar, a -cookies option precedes the input
            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
            fd._call_downloader('test', info)
            cookie_option = [
                '-cookies', TEST_COOKIE_VALUE + '; path=/; domain=.example.com;\r\n']
            self.assertEqual(
                self._args,
                ['ffmpeg', '-y'] + cookie_option + input_and_output)
|
|
|
|
|
|
# Run the test suite when this file is executed directly
if __name__ == '__main__':
    unittest.main()
|