import asyncio
import multiprocessing
import typing

import pyrogram
import aiohttp.web

import config
from async_worker.async_worker import AsyncTaskScheduler
from tasks.channel_history import ChannelHistoryReadTask


class ClientRefCount:
    """A pyrogram client paired with a count of how often it was handed out.

    Instances order by ``count`` so that ``min()`` over a collection yields
    the client that has been handed out the fewest times.
    """

    client: pyrogram.Client
    # Number of times ``get_client`` has returned this client.
    count: int = 0

    def __init__(self, api_id: int, api_hash: str, name: str):
        self.client = pyrogram.Client(name, api_id, api_hash)

    async def start(self):
        """Start the wrapped pyrogram client."""
        await self.client.start()

    def get_client(self) -> pyrogram.Client:
        """Return the wrapped client and record one more use of it."""
        self.count += 1
        return self.client

    def __lt__(self, other: 'ClientRefCount'):
        # ``min()`` compares with ``<``.  Previously only an inverted
        # ``__gt__`` existed, so the reflected comparison never let a
        # less-used client replace the first one.
        return self.count < other.count

    def __gt__(self, other: 'ClientRefCount'):
        return self.count > other.count


class ClientRoundRobin:
    """Pool of clients that hands out the least-used one on each request."""

    clients: typing.List[ClientRefCount]

    def __init__(self):
        self.clients = []

    def get_client(self) -> pyrogram.Client:
        """Return the pooled client with the lowest use count.

        Raises:
            ValueError: if no client has been added yet (raised by ``min``).
        """
        return min(self.clients).get_client()

    async def add_client(self, api_id: int, api_hash: str, name: str):
        """Create a client, start it, and add it to the pool."""
        client = ClientRefCount(api_id, api_hash, name)
        # Start before pooling so a client whose start() raised is never
        # handed out by get_client().
        await client.start()
        self.clients.append(client)


class FetcherAPI:
    """HTTP front-end that schedules channel-history tasks on pooled clients."""

    clients: ClientRoundRobin
    scheduler: AsyncTaskScheduler
    # Usernames for which a task has already been submitted.
    already_watching: typing.List[str]

    def __init__(self):
        self.clients = ClientRoundRobin()
        self.scheduler = AsyncTaskScheduler()
        self.already_watching = []

    async def setup(self):
        """Start every configured client, then run the web app and workers.

        One client is added per name in ``config.sessions``.  The aiohttp
        application and ``multiprocessing.cpu_count()`` scheduler loops are
        then awaited together.
        """
        for name in config.sessions:
            await self.clients.add_client(config.app_id, config.app_hash, name)

        app = aiohttp.web.Application()
        app.add_routes([
            aiohttp.web.get("/tasks/watch/add/{username}", self.add_channel),
        ])

        # NOTE(review): ``_run_app`` is a private aiohttp helper; consider the
        # public AppRunner/TCPSite API once shutdown semantics are confirmed.
        await asyncio.gather(
            aiohttp.web._run_app(
                app, host=config.listen_host, port=config.listen_port
            ),
            *(
                self.scheduler.loop()
                for _ in range(multiprocessing.cpu_count())
            )
        )

    async def add_channel(self, request):
        """Handle ``GET /tasks/watch/add/{username}``.

        Responds 400 when the username was already submitted; otherwise
        records it, submits a ``ChannelHistoryReadTask`` bound to the
        least-used client, and responds 200.
        """
        user = request.match_info["username"]
        if user in self.already_watching:
            return aiohttp.web.Response(status=400)

        # Record before the first await so a concurrent request for the same
        # username is rejected.
        self.already_watching.append(user)
        client = self.clients.get_client()
        task = ChannelHistoryReadTask(client, user, config.webhook)
        await self.scheduler.submit(task)
        return aiohttp.web.Response(status=200)


if __name__ == "__main__":
    api = FetcherAPI()
    # NOTE(review): get_event_loop() is kept rather than asyncio.run() because
    # objects built in FetcherAPI() may bind to the current loop — confirm
    # before switching.
    _loop = asyncio.get_event_loop()
    _loop.run_until_complete(api.setup())