outside-fetcher/tasks/mtproto_task_abstraction.py
andrew (from workstation) eda2512f0f next_delay fix conversion
2019-11-05 19:56:24 +01:00

33 lines
802 B
Python

import abc
import typing
from async_worker import AsyncTask
from pyrogram.errors.exceptions import FloodWait, ChannelInvalid, ChannelPrivate, Unauthorized
class MtProtoTask(AsyncTask, abc.ABC):
@abc.abstractmethod
def setup(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
async def process(self) -> typing.Union[bool, int]:
raise NotImplementedError
async def _process(self) -> typing.Union[bool, int]:
try:
result = await self.process()
if result is False:
return False
return result * 1e9
except FloodWait as error:
return int(error.MESSAGE.split("_")[-1]) * 1e9
except (ChannelInvalid, ChannelPrivate, Unauthorized):
return False