"""No longer used and new code should not use. Exists only for API compat."""
import asyncio
import atexit
import platform
import struct
import sys
import urllib.error
import urllib.parse
import urllib.request
import zlib

from ._utils import Popen, decode_base_n, preferredencoding
from .traversal import traverse_obj
from ..dependencies import certifi, websockets
from ..networking._helper import make_ssl_context
from ..networking._urllib import HTTPHandler

# isort: split
from .networking import escape_rfc3986  # noqa: F401
from .networking import normalize_url as escape_url  # noqa: F401
from .networking import random_user_agent, std_headers  # noqa: F401
from ..cookies import YoutubeDLCookieJar  # noqa: F401
from ..networking._urllib import PUTRequest  # noqa: F401
from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest  # noqa: F401
from ..networking._urllib import ProxyHandler as PerRequestProxyHandler  # noqa: F401
from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler  # noqa: F401
from ..networking._urllib import (  # noqa: F401
    make_socks_conn_class,
    update_Request,
)
from ..networking.exceptions import HTTPError, network_exceptions  # noqa: F401

has_certifi = bool(certifi)
has_websockets = bool(websockets)


class WebSocketsWrapper:
    """Wraps websockets module to use in non-async scopes"""
    pool = None

    def __init__(self, url, headers=None, connect=True, **ws_kwargs):
        self.loop = asyncio.new_event_loop()
        # XXX: "loop" is deprecated
        self.conn = websockets.connect(
            url, extra_headers=headers, ping_interval=None,
            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'), **ws_kwargs)
        if connect:
            self.__enter__()
        atexit.register(self.__exit__, None, None, None)

    def __enter__(self):
        if not self.pool:
            self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
        return self

    def send(self, *args):
        self.run_with_loop(self.pool.send(*args), self.loop)

    def recv(self, *args):
        return self.run_with_loop(self.pool.recv(*args), self.loop)

    def __exit__(self, type, value, traceback):
        try:
            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
        finally:
            self.loop.close()
            self._cancel_all_tasks(self.loop)

    # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
    # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class
    @staticmethod
    def run_with_loop(main, loop):
        if not asyncio.iscoroutine(main):
            raise ValueError(f'a coroutine was expected, got {main!r}')

        try:
            return loop.run_until_complete(main)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())

    @staticmethod
    def _cancel_all_tasks(loop):
        to_cancel = asyncio.all_tasks(loop)

        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # XXX: "loop" is removed in Python 3.10+
        loop.run_until_complete(
            asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': task.exception(),
                    'task': task,
                })


def load_plugins(name, suffix, namespace):
    from ..plugins import load_plugins
    ret = load_plugins(name, suffix)
    namespace.update(ret)
    return ret


def traverse_dict(dictn, keys, casesense=True):
    return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)


def decode_base(value, digits):
    return decode_base_n(value, table=digits)


def platform_name():
    """ Returns the platform name as a str """
    return platform.platform()


def get_subprocess_encoding():
    if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
        # For subprocess calls, encode with locale encoding
        # Refer to http://stackoverflow.com/a/9951851/35070
        encoding = preferredencoding()
    else:
        encoding = sys.getfilesystemencoding()
    if encoding is None:
        encoding = 'utf-8'
    return encoding


# UNUSED
# Based on png2str() written by @gdkchan and improved by @yokrysty
# Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
def decode_png(png_data):
    # Reference: https://www.w3.org/TR/PNG/
    header = png_data[8:]

    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR':
        raise OSError('Not a valid PNG file.')

    int_map = {1: '>B', 2: '>H', 4: '>I'}
    unpack_integer = lambda x: struct.unpack(int_map[len(x)], x)[0]

    chunks = []

    while header:
        length = unpack_integer(header[:4])
        header = header[4:]

        chunk_type = header[:4]
        header = header[4:]

        chunk_data = header[:length]
        header = header[length:]

        header = header[4:]  # Skip CRC

        chunks.append({
            'type': chunk_type,
            'length': length,
            'data': chunk_data
        })

    ihdr = chunks[0]['data']

    width = unpack_integer(ihdr[:4])
    height = unpack_integer(ihdr[4:8])

    idat = b''

    for chunk in chunks:
        if chunk['type'] == b'IDAT':
            idat += chunk['data']

    if not idat:
        raise OSError('Unable to read PNG data.')

    decompressed_data = bytearray(zlib.decompress(idat))

    stride = width * 3
    pixels = []

    def _get_pixel(idx):
        x = idx % stride
        y = idx // stride
        return pixels[y][x]

    for y in range(height):
        basePos = y * (1 + stride)
        filter_type = decompressed_data[basePos]

        current_row = []

        pixels.append(current_row)

        for x in range(stride):
            color = decompressed_data[1 + basePos + x]
            basex = y * stride + x
            left = 0
            up = 0

            if x > 2:
                left = _get_pixel(basex - 3)
            if y > 0:
                up = _get_pixel(basex - stride)

            if filter_type == 1:  # Sub
                color = (color + left) & 0xff
            elif filter_type == 2:  # Up
                color = (color + up) & 0xff
            elif filter_type == 3:  # Average
                color = (color + ((left + up) >> 1)) & 0xff
            elif filter_type == 4:  # Paeth
                a = left
                b = up
                c = 0

                if x > 2 and y > 0:
                    c = _get_pixel(basex - stride - 3)

                p = a + b - c

                pa = abs(p - a)
                pb = abs(p - b)
                pc = abs(p - c)

                if pa <= pb and pa <= pc:
                    color = (color + a) & 0xff
                elif pb <= pc:
                    color = (color + b) & 0xff
                else:
                    color = (color + c) & 0xff

            current_row.append(color)

    return width, height, pixels


def register_socks_protocols():
    # "Register" SOCKS protocols
    # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
    # URLs with protocols not in urlparse.uses_netloc are not handled correctly
    for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
        if scheme not in urllib.parse.uses_netloc:
            urllib.parse.uses_netloc.append(scheme)


def handle_youtubedl_headers(headers):
    filtered_headers = headers

    if 'Youtubedl-no-compression' in filtered_headers:
        filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
        del filtered_headers['Youtubedl-no-compression']

    return filtered_headers


def request_to_url(req):
    if isinstance(req, urllib.request.Request):
        return req.get_full_url()
    else:
        return req


def sanitized_Request(url, *args, **kwargs):
    from ..utils import extract_basic_auth, sanitize_url
    url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
    if auth_header is not None:
        headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
        headers['Authorization'] = auth_header
    return urllib.request.Request(url, *args, **kwargs)


class YoutubeDLHandler(HTTPHandler):
    def __init__(self, params, *args, **kwargs):
        self._params = params
        super().__init__(*args, **kwargs)


YoutubeDLHTTPSHandler = YoutubeDLHandler


class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    def __init__(self, cookiejar=None):
        urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)

    def http_response(self, request, response):
        return urllib.request.HTTPCookieProcessor.http_response(self, request, response)

    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response


def make_HTTPS_handler(params, **kwargs):
    return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
        verify=not params.get('nocheckcertificate'),
        client_certificate=params.get('client_certificate'),
        client_certificate_key=params.get('client_certificate_key'),
        client_certificate_password=params.get('client_certificate_password'),
        legacy_support=params.get('legacyserverconnect'),
        use_certifi='no-certifi' not in params.get('compat_opts', []),
    ), **kwargs)


def process_communicate_or_kill(p, *args, **kwargs):
    return Popen.communicate_or_kill(p, *args, **kwargs)