outside-fetcher/__main__.py

91 lines
2.4 KiB
Python

import typing
import pyrogram
import aiohttp.web
import asyncio
import multiprocessing
import config
from async_worker.async_worker import AsyncTaskScheduler
from tasks.channel_history import ChannelHistoryReadTask
class ClientRefCount:
    """Wrap a pyrogram client together with a usage counter.

    Instances are ordered by ``count`` so that ``min()`` over a collection
    of them yields the client that has been handed out the fewest times.
    """

    # Quoted so the annotation is not evaluated when the class is defined.
    client: "pyrogram.Client"
    # Number of times get_client() has returned this client.
    count: int = 0

    def __init__(self, api_id: int, api_hash: str, name: str):
        """Create the underlying client; it is not started here.

        Args:
            api_id: Forwarded to ``pyrogram.Client`` (second positional arg).
            api_hash: Forwarded to ``pyrogram.Client`` (third positional arg).
            name: Forwarded to ``pyrogram.Client`` (first positional arg).
        """
        self.client = pyrogram.Client(name, api_id, api_hash)

    async def start(self):
        """Start the wrapped client."""
        await self.client.start()

    def get_client(self) -> "pyrogram.Client":
        """Return the wrapped client and record one more use of it."""
        self.count += 1
        return self.client

    def __lt__(self, other: "ClientRefCount"):
        """Return True when this client has been used fewer times.

        ``min()`` compares with ``<``. Without this method Python fell back
        to the other operand's ``__gt__``, which was inverted, so ``min()``
        picked the most-used client instead of the least-used one.
        """
        if not isinstance(other, ClientRefCount):
            return NotImplemented
        return self.count < other.count

    def __gt__(self, other: "ClientRefCount"):
        """Return True when this client has been used more times."""
        if not isinstance(other, ClientRefCount):
            return NotImplemented
        return self.count > other.count
class ClientRoundRobin:
    """Pool of clients that hands out the least-used one on each request."""

    # Quoted so the annotation is not evaluated when the class is defined.
    clients: "typing.List[ClientRefCount]"

    def __init__(self):
        """Start with an empty pool; clients are added via add_client()."""
        self.clients = []

    def get_client(self) -> "pyrogram.Client":
        """Return the client of the pool entry with the lowest use count.

        The entry's own ``get_client()`` is called, which bumps its counter.
        Ties are resolved in favour of the entry that was added first.

        Raises:
            ValueError: If no client has been added to the pool yet.
        """
        if not self.clients:
            raise ValueError("no clients registered in the pool")
        # Select on the counter explicitly instead of relying on the
        # entries' rich-comparison operators.
        least_used = min(self.clients, key=lambda entry: entry.count)
        return least_used.get_client()

    async def add_client(self, api_id: int, api_hash: str, name: str):
        """Create a client, start it, and add it to the pool.

        Args:
            api_id: Forwarded to ``ClientRefCount``.
            api_hash: Forwarded to ``ClientRefCount``.
            name: Forwarded to ``ClientRefCount``.
        """
        client = ClientRefCount(api_id, api_hash, name)
        # Start before publishing: if start() raises, the pool must not be
        # left holding a client that never came up.
        await client.start()
        self.clients.append(client)
class FetcherAPI:
    """HTTP endpoint that submits channel-history tasks to a scheduler.

    Each accepted username gets one ``ChannelHistoryReadTask`` bound to a
    client taken from the pool.
    """

    clients: ClientRoundRobin
    scheduler: AsyncTaskScheduler
    # Usernames for which a task has already been submitted.
    already_watching: typing.List[str]

    def __init__(self):
        """Create an empty client pool, a scheduler and the watch list."""
        self.clients = ClientRoundRobin()
        self.scheduler = AsyncTaskScheduler()
        self.already_watching = []

    async def setup(self):
        """Register all configured sessions, then serve HTTP and run workers.

        One client is added per entry of ``config.sessions``. Afterwards the
        web application and ``multiprocessing.cpu_count()`` invocations of
        ``scheduler.loop()`` are awaited together.
        """
        for name in config.sessions:
            await self.clients.add_client(config.app_id, config.app_hash, name)

        app = aiohttp.web.Application()
        app.add_routes(
            [aiohttp.web.get("/tasks/watch/add/{username}", self.add_channel)]
        )

        # NOTE(review): `_run_app` is a private aiohttp coroutine and may
        # change between releases; the documented alternative is
        # AppRunner + TCPSite. Left as-is to keep startup/shutdown behavior.
        await asyncio.gather(
            aiohttp.web._run_app(
                app, host=config.listen_host, port=config.listen_port
            ),
            *(
                self.scheduler.loop()
                for _ in range(multiprocessing.cpu_count())
            ),
        )

    async def add_channel(self, request):
        """Handle ``GET /tasks/watch/add/{username}``.

        Returns:
            A 400 response when the username is already being watched,
            otherwise a 200 response once the task has been submitted.

        Raises:
            Whatever client selection, task construction or submission
            raises; in that case the username is released again so a later
            request can retry.
        """
        user = request.match_info["username"]
        if user in self.already_watching:
            return aiohttp.web.Response(status=400)

        # Reserve the name before the first await so that a concurrent
        # request for the same user is rejected by the check above.
        self.already_watching.append(user)
        try:
            client = self.clients.get_client()
            task = ChannelHistoryReadTask(client, user, config.webhook)
            await self.scheduler.submit(task)
        except Exception:
            # Without this rollback a failed submission would leave the
            # user permanently marked as watched.
            self.already_watching.remove(user)
            raise
        return aiohttp.web.Response(status=200)
if __name__ == "__main__":
    # Script entry point: build the API object, then drive its setup()
    # coroutine to completion on the current event loop.
    # NOTE(review): calling asyncio.get_event_loop() with no running loop is
    # deprecated in recent Python versions; switching to asyncio.run() needs
    # confirmation that AsyncTaskScheduler does not bind to a loop when it
    # is constructed.
    api = FetcherAPI()
    asyncio.get_event_loop().run_until_complete(api.setup())