outside-fetcher/tasks/webhook.py
2019-11-05 19:48:14 +01:00

25 lines
607 B
Python

import typing
import aiohttp
from async_worker import OneLoopAsyncTask
HEADERS = [("Content-Type", "application/json")]
class WebHookDataForward(OneLoopAsyncTask):
_webhook: str
_data: typing.Union[str, bytes]
_http: aiohttp.ClientSession
async def process(self) -> typing.NoReturn:
res = await self._http.post(self._webhook, data=self._data, headers=HEADERS)
await res.read()
res.close()
def setup(self, webhook: str, data: typing.Union[str, bytes]):
self._http = aiohttp.ClientSession()
self._webhook = webhook
self._data = data