hyperboria/library/telegram/utils.py
the-superpirate cca9e8be47 - feat: Added missing library
- feat: Set up delays for Crossref API

GitOrigin-RevId: 3448e1c2a9fdfca2f8bf95d37e1d62cc5a221577
2021-04-09 14:32:30 +03:00

46 lines
1.1 KiB
Python

import logging
import traceback
from contextlib import asynccontextmanager
from typing import (
Awaitable,
Callable,
Optional,
)
from telethon import (
errors,
events,
)
from .base import RequestContext
@asynccontextmanager
async def safe_execution(
    request_context: RequestContext,
    on_fail: Optional[Callable[[], Awaitable]] = None,
):
    """Run the wrapped ``async with`` body, logging errors instead of raising.

    ``events.StopPropagation`` is the only exception re-raised to the caller.
    Every other ``Exception`` coming out of the body is reported through
    ``request_context.error_log`` and then suppressed:

    * a fixed set of telethon ``errors`` is logged with
      ``level=logging.WARNING`` and nothing else happens;
    * anything else has its traceback printed, is logged without an
      explicit level, and then ``on_fail`` is awaited when one was given.

    Exceptions raised while handling the above (for example by ``on_fail``)
    are themselves logged and suppressed, again except for
    ``events.StopPropagation``.

    Args:
        request_context: object whose ``error_log`` method receives the
            caught exceptions.
        on_fail: optional zero-argument callable returning an awaitable,
            awaited after an unexpected error has been logged.
    """
    try:
        try:
            yield
        except events.StopPropagation:
            # Must reach the caller untouched, never logged.
            raise
        except Exception as error:
            if isinstance(
                error,
                (
                    errors.UserIsBlockedError,
                    errors.QueryIdInvalidError,
                    errors.MessageDeleteForbiddenError,
                    errors.MessageIdInvalidError,
                    errors.MessageNotModifiedError,
                    errors.ChatAdminRequiredError,
                ),
            ):
                # These errors are only reported at WARNING level;
                # no traceback is printed and on_fail is not invoked.
                request_context.error_log(error, level=logging.WARNING)
            else:
                traceback.print_exc()
                request_context.error_log(error)
                if on_fail:
                    await on_fail()
    except events.StopPropagation:
        # A handler above (e.g. on_fail) may raise StopPropagation itself.
        raise
    except Exception as handler_error:
        # Failures inside the handlers above are logged, not propagated.
        request_context.error_log(handler_error)