hyperboria/idm/api/services/chat_manager.py

113 lines
4.1 KiB
Python
Raw Normal View History

from grpc import StatusCode
from idm.api.proto.chat_manager_service_pb2 import Chat as ChatPb
from idm.api.proto.chat_manager_service_pb2 import Chats as ChatsPb
from idm.api.proto.chat_manager_service_pb2_grpc import (
ChatManagerServicer,
add_ChatManagerServicer_to_server,
)
from library.aiogrpctools.base import (
BaseService,
aiogrpc_request_wrapper,
)
from pypika import (
PostgreSQLQuery,
Table,
)
class ChatManagerService(ChatManagerServicer, BaseService):
chats_table = Table('chats')
def __init__(self, server, service_name, pool_holder, admin_log_reader):
super().__init__(service_name=service_name)
self.server = server
self.pool_holder = pool_holder
self.admin_log_reader = admin_log_reader
async def start(self):
add_ChatManagerServicer_to_server(self, self.server)
def enrich_chat(self, chat_pb: ChatPb):
chat_pb.is_subscribed = self.admin_log_reader.is_subscribed(chat_pb.chat_id)
return chat_pb
@aiogrpc_request_wrapper()
async def create_chat(self, request, context, metadata):
chat = ChatPb(
chat_id=request.chat_id,
language=request.language,
username=request.username,
is_system_messaging_enabled=True,
is_discovery_enabled=True,
)
query = (
PostgreSQLQuery
.into(self.chats_table)
.columns(
self.chats_table.chat_id,
self.chats_table.language,
self.chats_table.username,
self.chats_table.is_system_messaging_enabled,
self.chats_table.is_discovery_enabled,
)
.insert(
chat.chat_id,
chat.language,
chat.username,
chat.is_system_messaging_enabled,
chat.is_discovery_enabled,
)
.on_conflict('chat_id')
.do_nothing()
).get_sql()
async with self.pool_holder.pool.acquire() as session:
await session.execute(query)
return await self._get_chat(session=session, chat_id=request.chat_id, context=context)
async def _get_chat(self, session, chat_id, context):
query = (
PostgreSQLQuery
.from_(self.chats_table)
.select('*')
.where(self.chats_table.chat_id == chat_id)
).get_sql()
result = await session.execute(query)
chat = await result.fetchone()
if chat is None:
await context.abort(StatusCode.NOT_FOUND, 'not_found')
return self.enrich_chat(ChatPb(**chat))
@aiogrpc_request_wrapper()
async def get_chat(self, request, context, metadata):
async with self.pool_holder.pool.acquire() as session:
return await self._get_chat(session=session, chat_id=request.chat_id, context=context)
@aiogrpc_request_wrapper()
async def list_chats(self, request, context, metadata):
query = (
PostgreSQLQuery
.from_(self.chats_table)
.select('*')
.where(self.chats_table.ban_until > request.banned_at_moment)
.limit(10)
).get_sql()
async with self.pool_holder.pool.acquire() as session:
results = await session.execute(query)
chats = await results.fetchall()
return ChatsPb(
chats=list(map(lambda x: self.enrich_chat(ChatPb(**x)), chats))
)
@aiogrpc_request_wrapper()
async def update_chat(self, request, context, metadata):
query = PostgreSQLQuery.update(self.chats_table)
for field in request.DESCRIPTOR.fields:
if field.containing_oneof and request.HasField(field.name):
field_value = getattr(request, field.name)
query = query.set(field.name, field_value)
query = query.where(self.chats_table.chat_id == request.chat_id).returning('*').get_sql()
async with self.pool_holder.pool.acquire() as session:
result = await session.execute(query)
chat = await result.fetchone()
return self.enrich_chat(ChatPb(**chat))