From 43a4d0e17fe587300c405ca6a5e1231ace95f0e5 Mon Sep 17 00:00:00 2001 From: the-superpirate Date: Fri, 9 Apr 2021 15:27:58 +0300 Subject: [PATCH] - feat: Open Hub API GitOrigin-RevId: dd347635b52b69451a50cca3163fbfbcefb8561e --- nexus/README.md | 6 +- nexus/hub/BUILD.bazel | 72 +++++ nexus/hub/README.md | 66 +++++ nexus/hub/__init__.py | 0 nexus/hub/configs/__init__.py | 13 + nexus/hub/exceptions.py | 13 + nexus/hub/fancy_names.py | 8 + nexus/hub/main.py | 68 +++++ nexus/hub/services/base.py | 82 ++++++ nexus/hub/services/delivery.py | 385 +++++++++++++++++++++++++ nexus/hub/services/submitter.py | 188 ++++++++++++ nexus/hub/user_manager/__init__.py | 3 + nexus/hub/user_manager/user_manager.py | 22 ++ 13 files changed, 923 insertions(+), 3 deletions(-) create mode 100644 nexus/hub/BUILD.bazel create mode 100644 nexus/hub/README.md create mode 100644 nexus/hub/__init__.py create mode 100644 nexus/hub/configs/__init__.py create mode 100644 nexus/hub/exceptions.py create mode 100644 nexus/hub/fancy_names.py create mode 100644 nexus/hub/main.py create mode 100644 nexus/hub/services/base.py create mode 100644 nexus/hub/services/delivery.py create mode 100644 nexus/hub/services/submitter.py create mode 100644 nexus/hub/user_manager/__init__.py create mode 100644 nexus/hub/user_manager/user_manager.py diff --git a/nexus/README.md b/nexus/README.md index 54882e9..d7d44a8 100644 --- a/nexus/README.md +++ b/nexus/README.md @@ -3,9 +3,9 @@ ## Content - ✅ [`actions`](actions) - shared code for ingesting data from external APIs (LibGen/CrossrefAPI) -- 🛑 `bot` - telegram bot for Summa +- ✅ [`bot`](bot) - telegram bot for Summa - ✅ [`cognitron`](cognitron) - bundled app for IPFS, search server and web frontend -- 🛑 `hub` - downloading & sending +- ✅ [`hub`](hub) - downloading & sending - ✅ [`ingest`](ingest) - retrieving metadata from external APIs and putting it onto Kafka - 🛑 `meta_api` - rescoring and merging API for Summa backends - ✅ [`models`](models) - shared Protobuf models @@ -13,4 +13,4 @@ - ✅ [`pipe`](pipe) - processing pipeline based on Kafka - ✅ [`pylon`](pylon) - smart client for downloading files from the Internet/IPFS - ✅ [`translations`](translations) - text translations used in `bot` and `hub` -- 🛑 `views` - shared views for [`models`](models) +- ✅ [`views`](views) - shared views for [`models`](models) diff --git a/nexus/hub/BUILD.bazel b/nexus/hub/BUILD.bazel new file mode 100644 index 0000000..37c4ce5 --- /dev/null +++ b/nexus/hub/BUILD.bazel @@ -0,0 +1,72 @@ +load("@io_bazel_rules_docker//python3:image.bzl", "py3_image") +load("@io_bazel_rules_docker//container:container.bzl", "container_push") +load("@pip_modules//:requirements.bzl", "requirement") + +alias( + name = "binary", + actual = ":image.binary", +) + +py3_image( + name = "image", + srcs = glob( + ["**/*.py"], + exclude = ["proto/**"], + ), + base = "//images/production:base-python-image", + data = [ + "configs/base.yaml", + "configs/development.yaml", + "configs/logging.yaml", + "configs/production.yaml", + "configs/testing.yaml", + ], + main = "main.py", + srcs_version = "PY3ONLY", + visibility = ["//visibility:public"], + deps = [ + requirement("aiodns"), + requirement("aiohttp"), + requirement("aiohttp_socks"), + requirement("aioipfs"), + requirement("cchardet"), + requirement("orjson"), + requirement("prometheus-client"), + requirement("psycopg2-binary"), + requirement("python-socks"), + requirement("tenacity"), + requirement("uvloop"), + "//idm/api2/proto:idm_proto_py", + requirement("giogrobid"), + "//library/aiogrpctools", + requirement("aiokit"), + "//library/aiopostgres", + "//library/configurator", + "//library/telegram", + "//nexus/hub/proto:hub_grpc_py", + "//nexus/hub/proto:hub_proto_py", + "//nexus/meta_api/aioclient", + "//nexus/models/proto:models_proto_py", + "//nexus/pylon", + "//nexus/views/telegram", + ], +) + +container_push( + name = "push-latest", + format = "Docker", + image = ":image", + registry = "registry.example.com", + repository = "nexus-hub", + tag = "latest", +) + +container_push( + name = "push-testing", + format = "Docker", + image = ":image", + registry = "registry.example.com", + repository = "nexus-hub", + tag = "testing", +) + diff --git a/nexus/hub/README.md b/nexus/hub/README.md new file mode 100644 index 0000000..971b5fa --- /dev/null +++ b/nexus/hub/README.md @@ -0,0 +1,66 @@ +# Nexus Search: Hub API + +`Hub` is a daemon responsible for retrieving files and sending them to users. This version has cut `configs` +subdirectory due to hard reliance of configs on the network infrastructure you are using. +You have to write your own configs taking example below into account. + +The bot requires two other essential parts: +- Postgres Database +- IPFS Daemon + +or their substitutions + +## Sample `configs/base.yaml` + +```yaml +--- + +application: + # Look at the special Postgres `sharience` table to retrieve user-sent files + is_sharience_enabled: true + maintenance_picture_url: + # Used in logging + service_name: nexus-hub + # Store file hashes into operation log + should_store_hashes: true +database: + database: nexus + host: + password: '{{ DATABASE_PASSWORD }}' + username: '{{ DATABASE_USERNAME }}' +grobid: + url: +grpc: + # Listen address + address: 0.0.0.0 + # Listen port + port: 9090 +ipfs: + address: + port: 4001 +log_path: '/var/log/nexus-hub/{{ ENV_TYPE }}' +meta_api: + url: +pylon: + # Proxy used in `pylon` retriever to download files + proxy: socks5://127.0.0.1:9050 + # Proxy used in `pylon` retriever to get metadata + resolve_proxy: socks5://127.0.0.1:9050 +telegram: + # Telegram App Hash from https://my.telegram.org/ + app_hash: '{{ APP_HASH }}' + # Telegram App ID from https://my.telegram.org/ + app_id: 00000 + # External bot name shown in messages to users + bot_external_name: libgen_scihub_bot + # Internal bot name used in logging + bot_name: nexus-bot + bot_token: '{{ BOT_TOKEN }}' + # Telethon database for keeping cache + database: + session_id: nexus-hub + # Frequency of updating downloading progress + progress_throttle_seconds: 5 + # Send files using stored telegram_file_id + should_use_telegram_file_id: true +``` \ No newline at end of file diff --git a/nexus/hub/__init__.py b/nexus/hub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/nexus/hub/configs/__init__.py b/nexus/hub/configs/__init__.py new file mode 100644 index 0000000..5a9bbd6 --- /dev/null +++ b/nexus/hub/configs/__init__.py @@ -0,0 +1,13 @@ +from izihawa_utils import env +from library.configurator import Configurator + + +def get_config(): + return Configurator([ + 'nexus/hub/configs/base.yaml', + 'nexus/hub/configs/%s.yaml?' % env.type, + 'nexus/hub/configs/logging.yaml', + ]) + + +config = get_config() diff --git a/nexus/hub/exceptions.py b/nexus/hub/exceptions.py new file mode 100644 index 0000000..7f7ccec --- /dev/null +++ b/nexus/hub/exceptions.py @@ -0,0 +1,13 @@ +from izihawa_utils.exceptions import BaseError + + +class FileTooBigError(BaseError): + code = 'file_too_big_error' + + +class UnavailableMetadataError(BaseError): + code = 'unavailable_metadata_error' + + +class UnparsableDoiError(BaseError): + code = 'unparsable_doi_error' diff --git a/nexus/hub/fancy_names.py b/nexus/hub/fancy_names.py new file mode 100644 index 0000000..0c94233 --- /dev/null +++ b/nexus/hub/fancy_names.py @@ -0,0 +1,8 @@ +from urllib.parse import urlparse + +fancy_names = { +} + + +def get_fancy_name(url): + return fancy_names.get(urlparse(url).netloc.lower(), 'Saturn Rings') diff --git a/nexus/hub/main.py b/nexus/hub/main.py new file mode 100644 index 0000000..e53d233 --- /dev/null +++ b/nexus/hub/main.py @@ -0,0 +1,68 @@ +import asyncio + +import uvloop +from library.aiogrpctools import AioGrpcServer +from library.aiopostgres import AioPostgresPoolHolder +from library.configurator import Configurator +from library.logging import configure_logging +from library.telegram.base import BaseTelegramClient +from nexus.hub.configs import get_config +from nexus.hub.services.delivery import DeliveryService +from nexus.hub.services.submitter import SubmitterService + + +class GrpcServer(AioGrpcServer): + def __init__(self, config: Configurator): + super().__init__(address=config['grpc']['address'], port=config['grpc']['port']) + self.pool_holder = AioPostgresPoolHolder( + dsn=f'dbname={config["database"]["database"]} ' + f'user={config["database"]["username"]} ' + f'password={config["database"]["password"]} ' + f'host={config["database"]["host"]}', + timeout=30, + pool_recycle=60, + maxsize=4, + ) + self.telegram_client = BaseTelegramClient( + app_id=config['telegram']['app_id'], + app_hash=config['telegram']['app_hash'], + bot_token=config['telegram']['bot_token'], + database=config['telegram'].get('database'), + mtproxy=config['telegram'].get('mtproxy'), + ) + self.delivery_service = DeliveryService( + server=self.server, + service_name=config['application']['service_name'], + bot_external_name=config['telegram']['bot_external_name'], + ipfs_config=config['ipfs'], + is_sharience_enabled=config['application']['is_sharience_enabled'], + maintenance_picture_url=config['application'].get('maintenance_picture_url', ''), + pool_holder=self.pool_holder, + pylon_config=config['pylon'], + should_store_hashes=config['application']['should_store_hashes'], + should_use_telegram_file_id=config['telegram']['should_use_telegram_file_id'], + telegram_client=self.telegram_client, + ) + self.submitter_service = SubmitterService( + server=self.server, + service_name=config['application']['service_name'], + bot_external_name=config['telegram']['bot_external_name'], + grobid_config=config['grobid'], + ipfs_config=config['ipfs'], + meta_api_config=config['meta_api'], + telegram_client=self.telegram_client, + ) + self.waits.append(self.pool_holder) + self.starts.extend([self.telegram_client, self.delivery_service, self.submitter_service]) + + +def main(): + config = get_config() + configure_logging(config) + uvloop.install() + grpc_server = GrpcServer(config) + asyncio.get_event_loop().run_until_complete(grpc_server.start_and_wait()) + + +if __name__ == '__main__': + main() diff --git a/nexus/hub/services/base.py b/nexus/hub/services/base.py new file mode 100644 index 0000000..afab074 --- /dev/null +++ b/nexus/hub/services/base.py @@ -0,0 +1,82 @@ +import asyncio + +from aioipfs import AsyncIPFS +from library.aiogrpctools.base import BaseService +from nexus.views.telegram.common import vote_button +from telethon.errors import rpcerrorlist +from telethon.tl.types import DocumentAttributeFilename +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, +) + + +def is_group_or_channel(chat_id: int): + return chat_id < 0 + + +class BaseHubService(BaseService): + def __init__(self, service_name: str, bot_external_name: str, ipfs_config: dict, telegram_client): + super().__init__(service_name=service_name) + self.bot_external_name = bot_external_name + self.ipfs_client = AsyncIPFS(host=ipfs_config['address'], port=ipfs_config['port']) + self.telegram_client = telegram_client + + async def get_ipfs_hashes(self, file): + return list(map( + lambda x: x['Hash'], + await asyncio.gather( + self.ipfs_client.add_bytes(file, cid_version=1, hash='blake2b-256', only_hash=True), + self.ipfs_client.add_bytes(file, cid_version=0, hash='sha2-256', only_hash=True), + ) + )) + + @retry( + reraise=True, + stop=stop_after_attempt(3), + retry=retry_if_exception_type((rpcerrorlist.TimeoutError, ValueError)), + ) + async def send_file( + self, + document_view, + file, + request_context, + session_id, + document_id=None, + voting=True, + progress_callback=None, + ): + if document_id is None: + document_id = document_view.id + buttons = None + if voting: + buttons = [ + vote_button( + case='broken', + document_id=document_id, + language=request_context.chat.language, + session_id=session_id, + ), + vote_button( + case='ok', + document_id=document_id, + language=request_context.chat.language, + session_id=session_id, + ), + ] + message = await self.telegram_client.send_file( + attributes=[DocumentAttributeFilename(document_view.get_filename())], + buttons=buttons, + caption=f"{document_view.generate_body(language=request_context.chat.language, limit=512)}\n" + f"@{self.bot_external_name}", + entity=request_context.chat.id, + file=file, + progress_callback=progress_callback + ) + request_context.statbox( + action='sent', + document_id=document_id, + voting=voting, + ) + return message diff --git a/nexus/hub/services/delivery.py b/nexus/hub/services/delivery.py new file mode 100644 index 0000000..4c616bf --- /dev/null +++ b/nexus/hub/services/delivery.py @@ -0,0 +1,385 @@ +import asyncio +import hashlib +import logging +import time + +from grpc import ( + Server, + ServicerContext, +) +from izihawa_utils.common import filter_none +from izihawa_utils.pb_to_json import MessageToDict +from library.aiogrpctools.base import aiogrpc_request_wrapper +from library.telegram.base import ( + BaseTelegramClient, + RequestContext, +) +from library.telegram.utils import safe_execution +from nexus.hub.fancy_names import get_fancy_name +from nexus.hub.proto.delivery_service_pb2 import \ + StartDeliveryRequest as StartDeliveryRequestPb +from nexus.hub.proto.delivery_service_pb2 import \ + StartDeliveryResponse as StartDeliveryResponsePb +from nexus.hub.proto.delivery_service_pb2_grpc import ( + DeliveryServicer, + add_DeliveryServicer_to_server, +) +from nexus.hub.user_manager import UserManager +from nexus.models.proto.operation_pb2 import \ + DocumentOperation as DocumentOperationPb +from nexus.models.proto.operation_pb2 import UpdateDocument as UpdateDocumentPb +from nexus.models.proto.typed_document_pb2 import \ + TypedDocument as TypedDocumentPb +from nexus.pylon.client import PylonClient +from nexus.pylon.exceptions import DownloadError +from nexus.pylon.proto.file_pb2 import FileResponse as FileResponsePb +from nexus.translations import t +from nexus.views.telegram import parse_typed_document_to_view +from nexus.views.telegram.common import close_button +from nexus.views.telegram.progress_bar import ( + ProgressBar, + ProgressBarLostMessageError, +) +from prometheus_client import Gauge + +from .base import ( + BaseHubService, + is_group_or_channel, +) + +downloads_gauge = Gauge('downloads_total', documentation='Currently downloading files') + + +async def operation_log(document_operation_pb): + logging.getLogger('operation').info(msg=MessageToDict(document_operation_pb)) + + +class DownloadTask: + def __init__( + self, + delivery_service, + request_context, + document_view, + session_id: str, + ): + self.delivery_service = delivery_service + self.request_context = request_context + self.document_view = document_view + self.session_id = session_id + self.task = None + + async def schedule(self): + self.task = asyncio.create_task( + self.download_task( + request_context=self.request_context, + document_view=self.document_view + ) + ) + + self.delivery_service.user_manager.add_task(self.request_context.chat.id, self.document_view.id) + self.delivery_service.downloadings.add(self) + + self.task.add_done_callback(self.done_callback) + + def done_callback(self, f): + self.delivery_service.downloadings.remove(self) + self.delivery_service.user_manager.remove_task( + self.request_context.chat.id, + self.document_view.id, + ) + + async def download_task(self, request_context: RequestContext, document_view): + throttle_secs = 2.0 + + async def _on_fail(): + await self.delivery_service.telegram_client.send_message( + request_context.chat.id, + t('MAINTENANCE', language=request_context.chat.language).format( + maintenance_picture_url=self.delivery_service.maintenance_picture_url + ), + buttons=[close_button()] + ) + async with safe_execution( + request_context=request_context, + on_fail=_on_fail, + ): + progress_bar_download = ProgressBar( + telegram_client=self.delivery_service.telegram_client, + request_context=request_context, + banner=t("LOOKING_AT", language=request_context.chat.language), + header=f'⬇️ {document_view.get_filename()}', + tail_text=t('TRANSMITTED_FROM', language=request_context.chat.language), + throttle_secs=throttle_secs, + ) + downloads_gauge.inc() + start_time = time.time() + try: + file = await self.download( + document_view=document_view, + progress_bar=progress_bar_download, + ) + if not file: + request_context.statbox( + action='missed', + duration=time.time() - start_time, + document_id=document_view.id, + ) + is_served_from_sharience = False + if self.delivery_service.is_sharience_enabled: + is_served_from_sharience = await self.try_sharience( + request_context=request_context, + document_view=document_view, + ) + if not is_served_from_sharience: + request_context.statbox( + action='not_found', + document_id=document_view.id, + duration=time.time() - start_time, + ) + await self.respond_not_found( + request_context=request_context, + document_view=document_view, + ) + return + else: + request_context.statbox( + action='downloaded', + duration=time.time() - start_time, + document_id=document_view.id, + len=len(file), + ) + + progress_bar_upload = ProgressBar( + telegram_client=self.delivery_service.telegram_client, + request_context=request_context, + message=progress_bar_download.message, + banner=t("LOOKING_AT", language=request_context.chat.language), + header=f'⬇️ {document_view.get_filename()}', + tail_text=t('UPLOADED_TO_TELEGRAM', language=request_context.chat.language), + throttle_secs=throttle_secs + ) + + uploaded_message = await self.delivery_service.send_file( + document_view=self.document_view, + file=file, + progress_callback=progress_bar_upload.callback, + request_context=self.request_context, + session_id=self.session_id, + voting=not is_group_or_channel(self.request_context.chat.id), + ) + request_context.statbox( + action='uploaded', + duration=time.time() - start_time, + document_id=document_view.id, + ) + if self.delivery_service.should_store_hashes: + asyncio.create_task(self.store_hashes( + document_view=document_view, + telegram_file_id=uploaded_message.file.id, + file=file, + )) + except DownloadError: + await self.external_cancel() + except ProgressBarLostMessageError: + self.request_context.statbox( + action='user_canceled', + duration=time.time() - start_time, + document_id=document_view.id, + ) + except asyncio.CancelledError: + pass + finally: + downloads_gauge.dec() + messages = filter_none([progress_bar_download.message]) + await self.delivery_service.telegram_client.delete_messages(request_context.chat.id, messages) + + async def process_resp(self, resp, progress_bar, collected, filesize): + progress_bar.set_source(get_fancy_name(resp.source)) + if resp.HasField('status'): + if resp.status == FileResponsePb.Status.RESOLVING: + await progress_bar.show_banner() + if resp.status == FileResponsePb.Status.BEGIN_TRANSMISSION: + collected.clear() + elif resp.HasField('chunk'): + collected.extend(resp.chunk.content) + await progress_bar.callback(len(collected), filesize) + + async def respond_not_found(self, request_context: RequestContext, document_view): + return await self.delivery_service.telegram_client.send_message( + request_context.chat.id, + t("SOURCES_UNAVAILABLE", language=request_context.chat.language).format( + document=document_view.get_robust_title() + ), + buttons=[close_button()] + ) + + async def try_sharience(self, request_context, document_view): + if document_view.doi: + request_context.statbox(action='try_sharience', doi=document_view.doi) + pg_data = await self.delivery_service.pool_holder.execute( + ''' + select sh.id, sh.telegram_file_id as vote_sum + from sharience as sh + left join votes as v + on sh.id = v.document_id + group by sh.id + having coalesce(sum(v.value), 0) > -1 + and sh.parent_id = %s + order by coalesce(sum(v.value), 0) desc; + ''', (document_view.id,), fetch=True) + for document_id, telegram_file_id in pg_data: + return await self.delivery_service.send_file( + document_id=document_id, + document_view=self.document_view, + file=telegram_file_id, + request_context=self.request_context, + session_id=self.session_id, + voting=True, + ) + + async def download(self, document_view, progress_bar): + collected = bytearray() + if document_view.doi: + try: + async for resp in self.delivery_service.pylon_client.by_doi( + doi=document_view.doi, + md5=document_view.md5, + ): + await self.process_resp( + resp=resp, + progress_bar=progress_bar, + collected=collected, + filesize=document_view.filesize, + ) + return bytes(collected) + except DownloadError: + pass + if document_view.md5: + try: + async for resp in self.delivery_service.pylon_client.by_md5(md5=document_view.md5): + await self.process_resp( + resp=resp, + progress_bar=progress_bar, + collected=collected, + filesize=document_view.filesize, + ) + return bytes(collected) + except DownloadError: + pass + + async def external_cancel(self): + self.task.cancel() + self.request_context.statbox(action='externally_canceled') + await self.delivery_service.telegram_client.send_message( + self.request_context.chat.id, + t("DOWNLOAD_CANCELED", language=self.request_context.chat.language).format( + document=self.document_view.get_robust_title() + ), + buttons=[close_button()] + ) + + async def store_hashes(self, document_view, telegram_file_id, file): + document_pb = document_view.document_pb + document_pb.telegram_file_id = telegram_file_id + document_pb.filesize = len(file) + if not document_pb.md5: + document_pb.md5 = hashlib.md5(file).hexdigest() + del document_pb.ipfs_multihashes[:] + document_pb.ipfs_multihashes.extend(await self.delivery_service.get_ipfs_hashes(file=file)) + + document_operation_pb = DocumentOperationPb( + update_document=UpdateDocumentPb( + fields=['filesize', 'ipfs_multihashes', 'md5', 'telegram_file_id'], + typed_document=TypedDocumentPb(**{document_view.schema: document_pb}), + ), + ) + await operation_log(document_operation_pb) + + +class DeliveryService(DeliveryServicer, BaseHubService): + def __init__( + self, + server: Server, + service_name: str, + bot_external_name: str, + ipfs_config: dict, + is_sharience_enabled: bool, + maintenance_picture_url: str, + pool_holder, + pylon_config: dict, + should_store_hashes: bool, + should_use_telegram_file_id: bool, + telegram_client: BaseTelegramClient, + ): + super().__init__( + service_name=service_name, + bot_external_name=bot_external_name, + ipfs_config=ipfs_config, + telegram_client=telegram_client, + ) + self.downloadings = set() + self.is_sharience_enabled = is_sharience_enabled + self.maintenance_picture_url = maintenance_picture_url + self.pool_holder = pool_holder + self.pylon_client = PylonClient( + proxy=pylon_config['proxy'], + resolve_proxy=pylon_config['resolve_proxy'], + ) + self.server = server + self.should_store_hashes = should_store_hashes + self.should_use_telegram_file_id = should_use_telegram_file_id + self.user_manager = UserManager() + self.waits.extend([self.pylon_client]) + + async def start(self): + add_DeliveryServicer_to_server(self, self.server) + + async def stop(self): + for download in set(self.downloadings): + await download.external_cancel() + await asyncio.gather(*map(lambda x: x.task, self.downloadings)) + await self.ipfs_client.close() + + @aiogrpc_request_wrapper(log=False) + async def start_delivery( + self, + request: StartDeliveryRequestPb, + context: ServicerContext, + metadata: dict, + ) -> StartDeliveryResponsePb: + request_context = RequestContext( + bot_name=self.service_name, + chat=request.chat, + request_id=metadata.get('request-id'), + ) + request_context.add_default_fields( + mode='start_delivery', + session_id=metadata.get('session-id'), + **self.get_default_service_fields(), + ) + document_view = parse_typed_document_to_view(request.typed_document) + cache_hit = self.should_use_telegram_file_id and document_view.telegram_file_id + if cache_hit: + try: + await self.send_file( + document_view=document_view, + file=document_view.telegram_file_id, + session_id=metadata.get('session-id'), + request_context=request_context, + voting=not is_group_or_channel(request_context.chat.id), + ) + request_context.statbox(action='cache_hit', document_id=document_view.id) + except ValueError: + cache_hit = False + if not cache_hit: + if self.user_manager.has_task(request.chat.id, document_view.id): + return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.ALREADY_DOWNLOADING) + if self.user_manager.hit_limits(request.chat.id): + return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.TOO_MANY_DOWNLOADS) + await DownloadTask( + delivery_service=self, + document_view=document_view, + request_context=request_context, + session_id=metadata.get('session-id'), + ).schedule() + return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.OK) diff --git a/nexus/hub/services/submitter.py b/nexus/hub/services/submitter.py new file mode 100644 index 0000000..556fa5c --- /dev/null +++ b/nexus/hub/services/submitter.py @@ -0,0 +1,188 @@ +import hashlib +import logging +import time + +from aiogrobid import GrobidClient +from aiogrobid.exceptions import BadRequestError +from grpc import ( + Server, + ServicerContext, +) +from izihawa_utils.pb_to_json import MessageToDict +from library.aiogrpctools.base import aiogrpc_request_wrapper +from library.telegram.base import ( + BaseTelegramClient, + RequestContext, +) +from nexus.hub.exceptions import ( + FileTooBigError, + UnavailableMetadataError, + UnparsableDoiError, +) +from nexus.hub.proto.submitter_service_pb2 import \ + SubmitRequest as SubmitRequestPb +from nexus.hub.proto.submitter_service_pb2 import \ + SubmitResponse as SubmitResponsePb +from nexus.hub.proto.submitter_service_pb2_grpc import ( + SubmitterServicer, + add_SubmitterServicer_to_server, +) +from nexus.hub.user_manager import UserManager +from nexus.meta_api.aioclient import MetaApiGrpcClient +from nexus.models.proto.operation_pb2 import \ + DocumentOperation as DocumentOperationPb +from nexus.models.proto.operation_pb2 import UpdateDocument as UpdateDocumentPb +from nexus.models.proto.sharience_pb2 import Sharience as ShariencePb +from nexus.models.proto.typed_document_pb2 import \ + TypedDocument as TypedDocumentPb +from nexus.translations import t +from nexus.views.telegram.common import close_button +from nexus.views.telegram.scimag import ScimagView +from telethon.extensions import BinaryReader + +from .base import BaseHubService + + +async def operation_log(document_operation_pb): + logging.getLogger('operation').info(msg=MessageToDict(document_operation_pb)) + + +class SubmitterService(SubmitterServicer, BaseHubService): + def __init__( + self, + server: Server, + service_name: str, + bot_external_name: str, + grobid_config: dict, + ipfs_config: dict, + meta_api_config: dict, + telegram_client: BaseTelegramClient, + ): + super().__init__( + service_name=service_name, + bot_external_name=bot_external_name, + ipfs_config=ipfs_config, + telegram_client=telegram_client, + ) + self.server = server + self.grobid_client = GrobidClient(base_url=grobid_config['url']) + self.meta_api_client = MetaApiGrpcClient(base_url=meta_api_config['url']) + self.telegram_client = telegram_client + self.bot_external_name = bot_external_name + self.user_manager = UserManager() + self.waits.extend([self.grobid_client, self.meta_api_client]) + + async def start(self): + add_SubmitterServicer_to_server(self, self.server) + + async def stop(self): + await self.ipfs_client.close() + + @aiogrpc_request_wrapper() + async def submit( + self, + request: SubmitRequestPb, + context: ServicerContext, + metadata: dict, + ) -> SubmitResponsePb: + session_id = metadata.get('session-id') + request_context = RequestContext( + bot_name=self.service_name, + chat=request.chat, + request_id=metadata.get('request-id'), + ) + request_context.add_default_fields( + mode='submit', + session_id=metadata.get('session-id'), + **self.get_default_service_fields(), + ) + + document = BinaryReader(request.telegram_document).tgread_object() + if document.size > 20 * 1024 * 1024: + request_context.error_log(FileTooBigError(size=document.size)) + await self.telegram_client.send_message( + request_context.chat.id, + t('FILE_TOO_BIG_ERROR', language=request_context.chat.language), + buttons=[close_button()], + ) + return SubmitResponsePb() + processing_message = await self.telegram_client.send_message( + request_context.chat.id, + t("PROCESSING_PAPER", language=request_context.chat.language).format( + filename=document.attributes[0].file_name, + ), + ) + try: + + file = await self.telegram_client.download_document(document=document, file=bytes) + + try: + processed_document = await self.grobid_client.process_fulltext_document(pdf_file=file) + except BadRequestError as e: + request_context.error_log(e) + await self.telegram_client.send_message( + request_context.chat.id, + t('UNPARSABLE_DOCUMENT_ERROR', language=request_context.chat.language), + buttons=[close_button()], + ) + return SubmitResponsePb() + + if not processed_document.get('doi'): + request_context.error_log(UnparsableDoiError()) + await self.telegram_client.send_message( + request_context.chat.id, + t('UNPARSABLE_DOI_ERROR', language=request_context.chat.language), + buttons=[close_button()], + ) + return SubmitResponsePb() + + search_response_pb = await self.meta_api_client.search( + schemas=('scimag',), + query=processed_document['doi'], + page=0, + page_size=1, + request_id=request_context.request_id, + session_id=session_id, + user_id=request_context.chat.id, + language=request_context.chat.language, + ) + + if len(search_response_pb.scored_documents) == 0: + request_context.error_log(UnavailableMetadataError(doi=processed_document['doi'])) + await self.telegram_client.send_message( + request_context.chat.id, + t( + 'UNAVAILABLE_METADATA_ERROR', + language=request_context.chat.language + ).format(doi=processed_document['doi']), + buttons=[close_button()], + ) + return SubmitResponsePb() + + document_view = ScimagView(search_response_pb.scored_documents[0].typed_document.scimag) + finally: + await processing_message.delete() + + uploaded_message = await self.send_file( + document_view=document_view, + file=file, + request_context=request_context, + session_id=session_id, + voting=False, + ) + + document_operation_pb = DocumentOperationPb( + update_document=UpdateDocumentPb( + typed_document=TypedDocumentPb(sharience=ShariencePb( + parent_id=document_view.id, + uploader_id=request_context.chat.id, + updated_at=int(time.time()), + md5=hashlib.md5(file).hexdigest(), + filesize=document.size, + ipfs_multihashes=await self.get_ipfs_hashes(file=file), + telegram_file_id=uploaded_message.file.id, + )), + ), + ) + await operation_log(document_operation_pb) + return SubmitResponsePb() diff --git a/nexus/hub/user_manager/__init__.py b/nexus/hub/user_manager/__init__.py new file mode 100644 index 0000000..c46ca64 --- /dev/null +++ b/nexus/hub/user_manager/__init__.py @@ -0,0 +1,3 @@ +from .user_manager import UserManager + +__all__ = ['UserManager'] diff --git a/nexus/hub/user_manager/user_manager.py b/nexus/hub/user_manager/user_manager.py new file mode 100644 index 0000000..ec56189 --- /dev/null +++ b/nexus/hub/user_manager/user_manager.py @@ -0,0 +1,22 @@ +class UserManager: + def __init__(self): + self.last_widget = {} + self.tasks = set() + self.limits = {} + + def add_task(self, user_id, id): + self.tasks.add((user_id, id)) + self.limits[user_id] = self.limits.get(user_id, 0) + 1 + + def remove_task(self, user_id, id): + try: + self.tasks.remove((user_id, id)) + self.limits[user_id] = self.limits.get(user_id, 1) - 1 + except ValueError: + pass + + def has_task(self, user_id, id): + return (user_id, id) in self.tasks + + def hit_limits(self, user_id): + return self.limits.get(user_id, 0) >= 3