94 lines
2.4 KiB
Python
94 lines
2.4 KiB
Python
|
import typing
|
||
|
import pyrogram
|
||
|
import aiohttp.web
|
||
|
import asyncio
|
||
|
import multiprocessing
|
||
|
import config
|
||
|
|
||
|
|
||
|
from async_worker.async_worker import AsyncTaskScheduler
|
||
|
from tasks.channel_history import ChannelHistoryReadTask
|
||
|
|
||
|
|
||
|
class ClientRefCount:
    """Pair a pyrogram client with a count of how often it was handed out.

    Instances order by ``count``, so ``min()`` over a collection of them
    yields the least-used client.
    """

    client: "pyrogram.Client"
    # Class-level default; the first ``+=`` in get_client() creates the
    # per-instance attribute.
    count: int = 0

    def __init__(self, api_id: int, api_hash: str, name: str):
        """Build the wrapped client; it is not started here.

        Args:
            api_id: Forwarded to ``pyrogram.Client`` as its second argument.
            api_hash: Forwarded to ``pyrogram.Client`` as its third argument.
            name: Forwarded to ``pyrogram.Client`` as its first argument.
        """
        self.client = pyrogram.Client(name, api_id, api_hash)

    async def start(self):
        """Await the wrapped client's ``start()``."""
        await self.client.start()

    def get_client(self) -> "pyrogram.Client":
        """Return the wrapped client and record one more use of it."""
        self.count += 1
        return self.client

    def __lt__(self, other: 'ClientRefCount'):
        """Return True when this client has been handed out fewer times."""
        # min()/sorted() compare with ``<``.  Without an explicit __lt__ Python
        # falls back to the reflected ``other.__gt__(self)``, so the ordering
        # must be spelled out here rather than implied by __gt__ alone.
        if not isinstance(other, ClientRefCount):
            return NotImplemented
        return self.count < other.count

    def __gt__(self, other: 'ClientRefCount'):
        """Return True when this client has been handed out more times."""
        if not isinstance(other, ClientRefCount):
            return NotImplemented
        return self.count > other.count
|
||
|
|
||
|
|
||
|
class ClientRoundRobin:
    """Pool of clients that hands out the least-used one on each request."""

    clients: "typing.List[ClientRefCount]"

    def __init__(self):
        """Start with an empty pool; clients are added via add_client()."""
        self.clients = []

    def get_client(self) -> "pyrogram.Client":
        """Return the client that has been handed out the fewest times.

        The chosen entry's use count is incremented by its own
        ``get_client()``.  Ties go to the entry that was added first.

        Raises:
            ValueError: If no client has been added to the pool yet.
        """
        if not self.clients:
            raise ValueError("no clients have been added to the pool")
        # Select on the counter explicitly so the choice does not depend on
        # how the entries implement their comparison operators.
        least_used = min(self.clients, key=lambda entry: entry.count)
        return least_used.get_client()

    async def add_client(self, api_id: int, api_hash: str, name: str):
        """Create a client, start it, and then add it to the pool.

        Args:
            api_id: Forwarded to ``ClientRefCount``.
            api_hash: Forwarded to ``ClientRefCount``.
            name: Forwarded to ``ClientRefCount``.
        """
        entry = ClientRefCount(api_id, api_hash, name)
        # Start before publishing: if start() raises, or while it is still
        # awaiting, the entry must not be selectable by get_client().
        await entry.start()
        self.clients.append(entry)
|
||
|
|
||
|
|
||
|
class FetcherAPI:
    """HTTP front end that schedules channel-history tasks on pooled clients."""

    clients: ClientRoundRobin
    scheduler: AsyncTaskScheduler
    # Usernames for which a watch request is in progress or was submitted.
    already_watching: typing.List[str]

    def __init__(self):
        """Create an empty client pool, a task scheduler, and the watch list."""
        self.clients = ClientRoundRobin()
        self.scheduler = AsyncTaskScheduler()
        self.already_watching = []

    async def setup(self):
        """Start one client per configured session, then serve until done.

        Awaits the aiohttp application and the scheduler loops together, so
        this coroutine does not return while they are running.
        """
        for name in config.sessions:
            await self.clients.add_client(config.app_id, config.app_hash, name)

        app = aiohttp.web.Application()
        app.add_routes([aiohttp.web.get("/tasks/watch/add/{username}", self.add_channel)])

        # NOTE(review): ``_run_app`` is an underscore-prefixed aiohttp helper;
        # the documented way to run an app inside an existing loop is
        # AppRunner + TCPSite.  Kept as-is to preserve behavior.
        #
        # One scheduler loop coroutine is started per CPU core; all of them
        # are awaited on this same event loop.
        await asyncio.gather(
            aiohttp.web._run_app(app, host=config.listen_host, port=config.listen_port),
            *(
                self.scheduler.loop()
                for _ in range(multiprocessing.cpu_count())
            )
        )

    async def add_channel(self, request):
        """Handle ``GET /tasks/watch/add/{username}``.

        Returns:
            A 400 response if the username is already in the watch list,
            otherwise a 200 response once the task has been submitted.

        Raises:
            Whatever client selection, peer resolution, or task submission
            raises; in that case the username is removed from the watch list
            again so the request can be retried.
        """
        user = request.match_info["username"]

        if user in self.already_watching:
            return aiohttp.web.Response(status=400)

        # Reserve the name before the first await so a concurrent request for
        # the same username is rejected instead of scheduling a second task.
        self.already_watching.append(user)
        try:
            client = self.clients.get_client()
            task = ChannelHistoryReadTask()

            peer = await client.resolve_peer(user)
            task.setup(client, peer, config.webhook)

            await self.scheduler.submit(task)
        except BaseException:
            # Roll back the reservation (also on cancellation); otherwise a
            # failed request would block this username with 400 forever.
            self.already_watching.remove(user)
            raise

        return aiohttp.web.Response(status=200)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Script entry point: build the service, then drive its setup coroutine
    # on the current event loop until it completes.
    api = FetcherAPI()
    asyncio.get_event_loop().run_until_complete(api.setup())
|