This commit is contained in:
andrew (from workstation) 2019-11-04 15:08:33 +01:00
parent f72a45c6a8
commit ebeff15466

View File

@ -1,5 +1,6 @@
import pyrogram import pyrogram
import aiohttp import aiohttp
import json
from async_worker.async_worker import AsyncTask from async_worker.async_worker import AsyncTask
from pyrogram.api.types import ChannelMessagesFilterEmpty from pyrogram.api.types import ChannelMessagesFilterEmpty
@ -9,17 +10,69 @@ from pyrogram.api.functions.updates.get_channel_difference import GetChannelDiff
from pyrogram.client.types.messages_and_media import Message as MessagePyrogram from pyrogram.client.types.messages_and_media import Message as MessagePyrogram
class JsonSerializerFixes:
@staticmethod
def user(obj):
obj.type = "private"
return obj
@staticmethod
def user_type(obj):
obj.type = "channel" if obj.id < 0 else "private"
return obj
class JsonSerializer:
fixes = {
"from_user": {
"new_name": "from",
"patch": JsonSerializerFixes.user
},
"user": {
"new_name": "user",
"patch": JsonSerializerFixes.user_type
}
}
@staticmethod
def default(obj):
if isinstance(obj, bytes):
return repr(obj)
cls = JsonSerializer
result = {}
for name in filter(lambda x: not x.startswith("_"), obj.__dict__):
value = getattr(obj, name)
if value is None:
continue
if name in cls.fixes:
value = cls.fixes[name]["patch"](value)
name = cls.fixes[name]["new_name"]
result[name] = value
return result
class ChannelHistoryReadTask(AsyncTask): class ChannelHistoryReadTask(AsyncTask):
channel: pyrogram.Chat channel: pyrogram.Chat
client: pyrogram.Client client: pyrogram.Client
pts: int pts: int
webhook: str webhook: str
http: aiohttp.ClientSession
seq_no: int = 0 # global mutable
def setup(self, client: pyrogram.Client, channel: pyrogram.Chat, webhook: str): def setup(self, client: pyrogram.Client, channel: pyrogram.Chat, webhook: str):
self.client = client self.client = client
self.channel = channel self.channel = channel
self.pts = False self.pts = False
self.webhook = webhook self.webhook = webhook
self.http = aiohttp.ClientSession()
async def process(self): async def process(self):
response = await self.client.send( response = await self.client.send(
@ -37,13 +90,16 @@ class ChannelHistoryReadTask(AsyncTask):
users = {i.id: i for i in response.users} users = {i.id: i for i in response.users}
chats = {i.id: i for i in response.chats} chats = {i.id: i for i in response.chats}
http = aiohttp.ClientSession()
for message in response.new_messages: for message in response.new_messages:
message = await MessagePyrogram._parse(self.client, message, users, chats) message = await MessagePyrogram._parse(self.client, message, users, chats)
await http.post(self.webhook, data=bytes(str(message), "utf8")) message = {"update_id": 1, "message": message}
await http.close() data = json.dumps(message, default=JsonSerializer.default, ensure_ascii=True)
response = await self.http.post(self.webhook, data=bytes(data, "utf8"))
await response.read()
response.close()
if not response.final: if not response.final:
return 1 return 1