mirror of
https://github.com/nexus-stc/hyperboria
synced 2025-01-25 18:07:34 +01:00
Merge pull request #18 from the-superpirate/master
- feat: Open Hub API
This commit is contained in:
commit
baa4f6e5f4
@ -3,9 +3,9 @@
|
||||
## Content
|
||||
|
||||
- β
[`actions`](actions) - shared code for ingesting data from external APIs (LibGen/CrossrefAPI)
|
||||
- π `bot` - telegram bot for Summa
|
||||
- β
[`bot`](bot) - telegram bot for Summa
|
||||
- β
[`cognitron`](cognitron) - bundled app for IPFS, search server and web frontend
|
||||
- π `hub` - downloading & sending
|
||||
- β
[`hub`](hub) - downloading & sending
|
||||
- β
[`ingest`](ingest) - retrieving metadata from external APIs and putting it onto Kafka
|
||||
- π `meta_api` - rescoring and merging API for Summa backends
|
||||
- β
[`models`](models) - shared Protobuf models
|
||||
@ -13,4 +13,4 @@
|
||||
- β
[`pipe`](pipe) - processing pipeline based on Kafka
|
||||
- β
[`pylon`](pylon) - smart client for downloading files from the Internet/IPFS
|
||||
- β
[`translations`](translations) - text translations used in `bot` and `hub`
|
||||
- π `views` - shared views for [`models`](models)
|
||||
- β
[`views`](views) - shared views for [`models`](models)
|
||||
|
72
nexus/hub/BUILD.bazel
Normal file
72
nexus/hub/BUILD.bazel
Normal file
@ -0,0 +1,72 @@
|
||||
load("@io_bazel_rules_docker//python3:image.bzl", "py3_image")
|
||||
load("@io_bazel_rules_docker//container:container.bzl", "container_push")
|
||||
load("@pip_modules//:requirements.bzl", "requirement")
|
||||
|
||||
alias(
|
||||
name = "binary",
|
||||
actual = ":image.binary",
|
||||
)
|
||||
|
||||
py3_image(
|
||||
name = "image",
|
||||
srcs = glob(
|
||||
["**/*.py"],
|
||||
exclude = ["proto/**"],
|
||||
),
|
||||
base = "//images/production:base-python-image",
|
||||
data = [
|
||||
"configs/base.yaml",
|
||||
"configs/development.yaml",
|
||||
"configs/logging.yaml",
|
||||
"configs/production.yaml",
|
||||
"configs/testing.yaml",
|
||||
],
|
||||
main = "main.py",
|
||||
srcs_version = "PY3ONLY",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
requirement("aiodns"),
|
||||
requirement("aiohttp"),
|
||||
requirement("aiohttp_socks"),
|
||||
requirement("aioipfs"),
|
||||
requirement("cchardet"),
|
||||
requirement("orjson"),
|
||||
requirement("prometheus-client"),
|
||||
requirement("psycopg2-binary"),
|
||||
requirement("python-socks"),
|
||||
requirement("tenacity"),
|
||||
requirement("uvloop"),
|
||||
"//idm/api2/proto:idm_proto_py",
|
||||
requirement("giogrobid"),
|
||||
"//library/aiogrpctools",
|
||||
requirement("aiokit"),
|
||||
"//library/aiopostgres",
|
||||
"//library/configurator",
|
||||
"//library/telegram",
|
||||
"//nexus/hub/proto:hub_grpc_py",
|
||||
"//nexus/hub/proto:hub_proto_py",
|
||||
"//nexus/meta_api/aioclient",
|
||||
"//nexus/models/proto:models_proto_py",
|
||||
"//nexus/pylon",
|
||||
"//nexus/views/telegram",
|
||||
],
|
||||
)
|
||||
|
||||
container_push(
|
||||
name = "push-latest",
|
||||
format = "Docker",
|
||||
image = ":image",
|
||||
registry = "registry.example.com",
|
||||
repository = "nexus-hub",
|
||||
tag = "latest",
|
||||
)
|
||||
|
||||
container_push(
|
||||
name = "push-testing",
|
||||
format = "Docker",
|
||||
image = ":image",
|
||||
registry = "registry.example.com",
|
||||
repository = "nexus-hub",
|
||||
tag = "testing",
|
||||
)
|
||||
|
66
nexus/hub/README.md
Normal file
66
nexus/hub/README.md
Normal file
@ -0,0 +1,66 @@
|
||||
# Nexus Search: Hub API
|
||||
|
||||
`Hub` is a daemon responsible for retrieving files and sending them to users. This version has cut `configs`
|
||||
subdirectory due to hard reliance of configs on the network infrastructure you are using.
|
||||
You have to write your own configs taking example below into account.
|
||||
|
||||
The bot requires two other essential parts:
|
||||
- Postgres Database
|
||||
- IPFS Daemon
|
||||
|
||||
or their substitutions
|
||||
|
||||
## Sample `configs/base.yaml`
|
||||
|
||||
```yaml
|
||||
---
|
||||
|
||||
application:
|
||||
# Look at the special Postgres `sharience` table to retrieve user-sent files
|
||||
is_sharience_enabled: true
|
||||
maintenance_picture_url:
|
||||
# Used in logging
|
||||
service_name: nexus-hub
|
||||
# Store file hashes into operation log
|
||||
should_store_hashes: true
|
||||
database:
|
||||
database: nexus
|
||||
host:
|
||||
password: '{{ DATABASE_PASSWORD }}'
|
||||
username: '{{ DATABASE_USERNAME }}'
|
||||
grobid:
|
||||
url:
|
||||
grpc:
|
||||
# Listen address
|
||||
address: 0.0.0.0
|
||||
# Listen port
|
||||
port: 9090
|
||||
ipfs:
|
||||
address:
|
||||
port: 4001
|
||||
log_path: '/var/log/nexus-hub/{{ ENV_TYPE }}'
|
||||
meta_api:
|
||||
url:
|
||||
pylon:
|
||||
# Proxy used in `pylon` retriever to download files
|
||||
proxy: socks5://127.0.0.1:9050
|
||||
# Proxy used in `pylon` retriever to get metadata
|
||||
resolve_proxy: socks5://127.0.0.1:9050
|
||||
telegram:
|
||||
# Telegram App Hash from https://my.telegram.org/
|
||||
app_hash: '{{ APP_HASH }}'
|
||||
# Telegram App ID from https://my.telegram.org/
|
||||
app_id: 00000
|
||||
# External bot name shown in messages to users
|
||||
bot_external_name: libgen_scihub_bot
|
||||
# Internal bot name used in logging
|
||||
bot_name: nexus-bot
|
||||
bot_token: '{{ BOT_TOKEN }}'
|
||||
# Telethon database for keeping cache
|
||||
database:
|
||||
session_id: nexus-hub
|
||||
# Frequency of updating downloading progress
|
||||
progress_throttle_seconds: 5
|
||||
# Send files using stored telegram_file_id
|
||||
should_use_telegram_file_id: true
|
||||
```
|
0
nexus/hub/__init__.py
Normal file
0
nexus/hub/__init__.py
Normal file
13
nexus/hub/configs/__init__.py
Normal file
13
nexus/hub/configs/__init__.py
Normal file
@ -0,0 +1,13 @@
|
||||
from izihawa_utils import env
|
||||
from library.configurator import Configurator
|
||||
|
||||
|
||||
def get_config():
|
||||
return Configurator([
|
||||
'nexus/hub/configs/base.yaml',
|
||||
'nexus/hub/configs/%s.yaml?' % env.type,
|
||||
'nexus/hub/configs/logging.yaml',
|
||||
])
|
||||
|
||||
|
||||
config = get_config()
|
13
nexus/hub/exceptions.py
Normal file
13
nexus/hub/exceptions.py
Normal file
@ -0,0 +1,13 @@
|
||||
from izihawa_utils.exceptions import BaseError
|
||||
|
||||
|
||||
class FileTooBigError(BaseError):
|
||||
code = 'file_too_big_error'
|
||||
|
||||
|
||||
class UnavailableMetadataError(BaseError):
|
||||
code = 'unavailable_metadata_error'
|
||||
|
||||
|
||||
class UnparsableDoiError(BaseError):
|
||||
code = 'unparsable_doi_error'
|
8
nexus/hub/fancy_names.py
Normal file
8
nexus/hub/fancy_names.py
Normal file
@ -0,0 +1,8 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
fancy_names = {
|
||||
}
|
||||
|
||||
|
||||
def get_fancy_name(url):
|
||||
return fancy_names.get(urlparse(url).netloc.lower(), 'Saturn Rings')
|
68
nexus/hub/main.py
Normal file
68
nexus/hub/main.py
Normal file
@ -0,0 +1,68 @@
|
||||
import asyncio
|
||||
|
||||
import uvloop
|
||||
from library.aiogrpctools import AioGrpcServer
|
||||
from library.aiopostgres import AioPostgresPoolHolder
|
||||
from library.configurator import Configurator
|
||||
from library.logging import configure_logging
|
||||
from library.telegram.base import BaseTelegramClient
|
||||
from nexus.hub.configs import get_config
|
||||
from nexus.hub.services.delivery import DeliveryService
|
||||
from nexus.hub.services.submitter import SubmitterService
|
||||
|
||||
|
||||
class GrpcServer(AioGrpcServer):
|
||||
def __init__(self, config: Configurator):
|
||||
super().__init__(address=config['grpc']['address'], port=config['grpc']['port'])
|
||||
self.pool_holder = AioPostgresPoolHolder(
|
||||
dsn=f'dbname={config["database"]["database"]} '
|
||||
f'user={config["database"]["username"]} '
|
||||
f'password={config["database"]["password"]} '
|
||||
f'host={config["database"]["host"]}',
|
||||
timeout=30,
|
||||
pool_recycle=60,
|
||||
maxsize=4,
|
||||
)
|
||||
self.telegram_client = BaseTelegramClient(
|
||||
app_id=config['telegram']['app_id'],
|
||||
app_hash=config['telegram']['app_hash'],
|
||||
bot_token=config['telegram']['bot_token'],
|
||||
database=config['telegram'].get('database'),
|
||||
mtproxy=config['telegram'].get('mtproxy'),
|
||||
)
|
||||
self.delivery_service = DeliveryService(
|
||||
server=self.server,
|
||||
service_name=config['application']['service_name'],
|
||||
bot_external_name=config['telegram']['bot_external_name'],
|
||||
ipfs_config=config['ipfs'],
|
||||
is_sharience_enabled=config['application']['is_sharience_enabled'],
|
||||
maintenance_picture_url=config['application'].get('maintenance_picture_url', ''),
|
||||
pool_holder=self.pool_holder,
|
||||
pylon_config=config['pylon'],
|
||||
should_store_hashes=config['application']['should_store_hashes'],
|
||||
should_use_telegram_file_id=config['telegram']['should_use_telegram_file_id'],
|
||||
telegram_client=self.telegram_client,
|
||||
)
|
||||
self.submitter_service = SubmitterService(
|
||||
server=self.server,
|
||||
service_name=config['application']['service_name'],
|
||||
bot_external_name=config['telegram']['bot_external_name'],
|
||||
grobid_config=config['grobid'],
|
||||
ipfs_config=config['ipfs'],
|
||||
meta_api_config=config['meta_api'],
|
||||
telegram_client=self.telegram_client,
|
||||
)
|
||||
self.waits.append(self.pool_holder)
|
||||
self.starts.extend([self.telegram_client, self.delivery_service, self.submitter_service])
|
||||
|
||||
|
||||
def main():
|
||||
config = get_config()
|
||||
configure_logging(config)
|
||||
uvloop.install()
|
||||
grpc_server = GrpcServer(config)
|
||||
asyncio.get_event_loop().run_until_complete(grpc_server.start_and_wait())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
82
nexus/hub/services/base.py
Normal file
82
nexus/hub/services/base.py
Normal file
@ -0,0 +1,82 @@
|
||||
import asyncio
|
||||
|
||||
from aioipfs import AsyncIPFS
|
||||
from library.aiogrpctools.base import BaseService
|
||||
from nexus.views.telegram.common import vote_button
|
||||
from telethon.errors import rpcerrorlist
|
||||
from telethon.tl.types import DocumentAttributeFilename
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
)
|
||||
|
||||
|
||||
def is_group_or_channel(chat_id: int):
|
||||
return chat_id < 0
|
||||
|
||||
|
||||
class BaseHubService(BaseService):
|
||||
def __init__(self, service_name: str, bot_external_name: str, ipfs_config: dict, telegram_client):
|
||||
super().__init__(service_name=service_name)
|
||||
self.bot_external_name = bot_external_name
|
||||
self.ipfs_client = AsyncIPFS(host=ipfs_config['address'], port=ipfs_config['port'])
|
||||
self.telegram_client = telegram_client
|
||||
|
||||
async def get_ipfs_hashes(self, file):
|
||||
return list(map(
|
||||
lambda x: x['Hash'],
|
||||
await asyncio.gather(
|
||||
self.ipfs_client.add_bytes(file, cid_version=1, hash='blake2b-256', only_hash=True),
|
||||
self.ipfs_client.add_bytes(file, cid_version=0, hash='sha2-256', only_hash=True),
|
||||
)
|
||||
))
|
||||
|
||||
@retry(
|
||||
reraise=True,
|
||||
stop=stop_after_attempt(3),
|
||||
retry=retry_if_exception_type((rpcerrorlist.TimeoutError, ValueError)),
|
||||
)
|
||||
async def send_file(
|
||||
self,
|
||||
document_view,
|
||||
file,
|
||||
request_context,
|
||||
session_id,
|
||||
document_id=None,
|
||||
voting=True,
|
||||
progress_callback=None,
|
||||
):
|
||||
if document_id is None:
|
||||
document_id = document_view.id
|
||||
buttons = None
|
||||
if voting:
|
||||
buttons = [
|
||||
vote_button(
|
||||
case='broken',
|
||||
document_id=document_id,
|
||||
language=request_context.chat.language,
|
||||
session_id=session_id,
|
||||
),
|
||||
vote_button(
|
||||
case='ok',
|
||||
document_id=document_id,
|
||||
language=request_context.chat.language,
|
||||
session_id=session_id,
|
||||
),
|
||||
]
|
||||
message = await self.telegram_client.send_file(
|
||||
attributes=[DocumentAttributeFilename(document_view.get_filename())],
|
||||
buttons=buttons,
|
||||
caption=f"{document_view.generate_body(language=request_context.chat.language, limit=512)}\n"
|
||||
f"@{self.bot_external_name}",
|
||||
entity=request_context.chat.id,
|
||||
file=file,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
request_context.statbox(
|
||||
action='sent',
|
||||
document_id=document_id,
|
||||
voting=voting,
|
||||
)
|
||||
return message
|
385
nexus/hub/services/delivery.py
Normal file
385
nexus/hub/services/delivery.py
Normal file
@ -0,0 +1,385 @@
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
import time
|
||||
|
||||
from grpc import (
|
||||
Server,
|
||||
ServicerContext,
|
||||
)
|
||||
from izihawa_utils.common import filter_none
|
||||
from izihawa_utils.pb_to_json import MessageToDict
|
||||
from library.aiogrpctools.base import aiogrpc_request_wrapper
|
||||
from library.telegram.base import (
|
||||
BaseTelegramClient,
|
||||
RequestContext,
|
||||
)
|
||||
from library.telegram.utils import safe_execution
|
||||
from nexus.hub.fancy_names import get_fancy_name
|
||||
from nexus.hub.proto.delivery_service_pb2 import \
|
||||
StartDeliveryRequest as StartDeliveryRequestPb
|
||||
from nexus.hub.proto.delivery_service_pb2 import \
|
||||
StartDeliveryResponse as StartDeliveryResponsePb
|
||||
from nexus.hub.proto.delivery_service_pb2_grpc import (
|
||||
DeliveryServicer,
|
||||
add_DeliveryServicer_to_server,
|
||||
)
|
||||
from nexus.hub.user_manager import UserManager
|
||||
from nexus.models.proto.operation_pb2 import \
|
||||
DocumentOperation as DocumentOperationPb
|
||||
from nexus.models.proto.operation_pb2 import UpdateDocument as UpdateDocumentPb
|
||||
from nexus.models.proto.typed_document_pb2 import \
|
||||
TypedDocument as TypedDocumentPb
|
||||
from nexus.pylon.client import PylonClient
|
||||
from nexus.pylon.exceptions import DownloadError
|
||||
from nexus.pylon.proto.file_pb2 import FileResponse as FileResponsePb
|
||||
from nexus.translations import t
|
||||
from nexus.views.telegram import parse_typed_document_to_view
|
||||
from nexus.views.telegram.common import close_button
|
||||
from nexus.views.telegram.progress_bar import (
|
||||
ProgressBar,
|
||||
ProgressBarLostMessageError,
|
||||
)
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from .base import (
|
||||
BaseHubService,
|
||||
is_group_or_channel,
|
||||
)
|
||||
|
||||
downloads_gauge = Gauge('downloads_total', documentation='Currently downloading files')
|
||||
|
||||
|
||||
async def operation_log(document_operation_pb):
|
||||
logging.getLogger('operation').info(msg=MessageToDict(document_operation_pb))
|
||||
|
||||
|
||||
class DownloadTask:
|
||||
def __init__(
|
||||
self,
|
||||
delivery_service,
|
||||
request_context,
|
||||
document_view,
|
||||
session_id: str,
|
||||
):
|
||||
self.delivery_service = delivery_service
|
||||
self.request_context = request_context
|
||||
self.document_view = document_view
|
||||
self.session_id = session_id
|
||||
self.task = None
|
||||
|
||||
async def schedule(self):
|
||||
self.task = asyncio.create_task(
|
||||
self.download_task(
|
||||
request_context=self.request_context,
|
||||
document_view=self.document_view
|
||||
)
|
||||
)
|
||||
|
||||
self.delivery_service.user_manager.add_task(self.request_context.chat.id, self.document_view.id)
|
||||
self.delivery_service.downloadings.add(self)
|
||||
|
||||
self.task.add_done_callback(self.done_callback)
|
||||
|
||||
def done_callback(self, f):
|
||||
self.delivery_service.downloadings.remove(self)
|
||||
self.delivery_service.user_manager.remove_task(
|
||||
self.request_context.chat.id,
|
||||
self.document_view.id,
|
||||
)
|
||||
|
||||
async def download_task(self, request_context: RequestContext, document_view):
|
||||
throttle_secs = 2.0
|
||||
|
||||
async def _on_fail():
|
||||
await self.delivery_service.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t('MAINTENANCE', language=request_context.chat.language).format(
|
||||
maintenance_picture_url=self.delivery_service.maintenance_picture_url
|
||||
),
|
||||
buttons=[close_button()]
|
||||
)
|
||||
async with safe_execution(
|
||||
request_context=request_context,
|
||||
on_fail=_on_fail,
|
||||
):
|
||||
progress_bar_download = ProgressBar(
|
||||
telegram_client=self.delivery_service.telegram_client,
|
||||
request_context=request_context,
|
||||
banner=t("LOOKING_AT", language=request_context.chat.language),
|
||||
header=f'β¬οΈ {document_view.get_filename()}',
|
||||
tail_text=t('TRANSMITTED_FROM', language=request_context.chat.language),
|
||||
throttle_secs=throttle_secs,
|
||||
)
|
||||
downloads_gauge.inc()
|
||||
start_time = time.time()
|
||||
try:
|
||||
file = await self.download(
|
||||
document_view=document_view,
|
||||
progress_bar=progress_bar_download,
|
||||
)
|
||||
if not file:
|
||||
request_context.statbox(
|
||||
action='missed',
|
||||
duration=time.time() - start_time,
|
||||
document_id=document_view.id,
|
||||
)
|
||||
is_served_from_sharience = False
|
||||
if self.delivery_service.is_sharience_enabled:
|
||||
is_served_from_sharience = await self.try_sharience(
|
||||
request_context=request_context,
|
||||
document_view=document_view,
|
||||
)
|
||||
if not is_served_from_sharience:
|
||||
request_context.statbox(
|
||||
action='not_found',
|
||||
document_id=document_view.id,
|
||||
duration=time.time() - start_time,
|
||||
)
|
||||
await self.respond_not_found(
|
||||
request_context=request_context,
|
||||
document_view=document_view,
|
||||
)
|
||||
return
|
||||
else:
|
||||
request_context.statbox(
|
||||
action='downloaded',
|
||||
duration=time.time() - start_time,
|
||||
document_id=document_view.id,
|
||||
len=len(file),
|
||||
)
|
||||
|
||||
progress_bar_upload = ProgressBar(
|
||||
telegram_client=self.delivery_service.telegram_client,
|
||||
request_context=request_context,
|
||||
message=progress_bar_download.message,
|
||||
banner=t("LOOKING_AT", language=request_context.chat.language),
|
||||
header=f'β¬οΈ {document_view.get_filename()}',
|
||||
tail_text=t('UPLOADED_TO_TELEGRAM', language=request_context.chat.language),
|
||||
throttle_secs=throttle_secs
|
||||
)
|
||||
|
||||
uploaded_message = await self.delivery_service.send_file(
|
||||
document_view=self.document_view,
|
||||
file=file,
|
||||
progress_callback=progress_bar_upload.callback,
|
||||
request_context=self.request_context,
|
||||
session_id=self.session_id,
|
||||
voting=not is_group_or_channel(self.request_context.chat.id),
|
||||
)
|
||||
request_context.statbox(
|
||||
action='uploaded',
|
||||
duration=time.time() - start_time,
|
||||
document_id=document_view.id,
|
||||
)
|
||||
if self.delivery_service.should_store_hashes:
|
||||
asyncio.create_task(self.store_hashes(
|
||||
document_view=document_view,
|
||||
telegram_file_id=uploaded_message.file.id,
|
||||
file=file,
|
||||
))
|
||||
except DownloadError:
|
||||
await self.external_cancel()
|
||||
except ProgressBarLostMessageError:
|
||||
self.request_context.statbox(
|
||||
action='user_canceled',
|
||||
duration=time.time() - start_time,
|
||||
document_id=document_view.id,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
downloads_gauge.dec()
|
||||
messages = filter_none([progress_bar_download.message])
|
||||
await self.delivery_service.telegram_client.delete_messages(request_context.chat.id, messages)
|
||||
|
||||
async def process_resp(self, resp, progress_bar, collected, filesize):
|
||||
progress_bar.set_source(get_fancy_name(resp.source))
|
||||
if resp.HasField('status'):
|
||||
if resp.status == FileResponsePb.Status.RESOLVING:
|
||||
await progress_bar.show_banner()
|
||||
if resp.status == FileResponsePb.Status.BEGIN_TRANSMISSION:
|
||||
collected.clear()
|
||||
elif resp.HasField('chunk'):
|
||||
collected.extend(resp.chunk.content)
|
||||
await progress_bar.callback(len(collected), filesize)
|
||||
|
||||
async def respond_not_found(self, request_context: RequestContext, document_view):
|
||||
return await self.delivery_service.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t("SOURCES_UNAVAILABLE", language=request_context.chat.language).format(
|
||||
document=document_view.get_robust_title()
|
||||
),
|
||||
buttons=[close_button()]
|
||||
)
|
||||
|
||||
async def try_sharience(self, request_context, document_view):
|
||||
if document_view.doi:
|
||||
request_context.statbox(action='try_sharience', doi=document_view.doi)
|
||||
pg_data = await self.delivery_service.pool_holder.execute(
|
||||
'''
|
||||
select sh.id, sh.telegram_file_id as vote_sum
|
||||
from sharience as sh
|
||||
left join votes as v
|
||||
on sh.id = v.document_id
|
||||
group by sh.id
|
||||
having coalesce(sum(v.value), 0) > -1
|
||||
and sh.parent_id = %s
|
||||
order by coalesce(sum(v.value), 0) desc;
|
||||
''', (document_view.id,), fetch=True)
|
||||
for document_id, telegram_file_id in pg_data:
|
||||
return await self.delivery_service.send_file(
|
||||
document_id=document_id,
|
||||
document_view=self.document_view,
|
||||
file=telegram_file_id,
|
||||
request_context=self.request_context,
|
||||
session_id=self.session_id,
|
||||
voting=True,
|
||||
)
|
||||
|
||||
async def download(self, document_view, progress_bar):
|
||||
collected = bytearray()
|
||||
if document_view.doi:
|
||||
try:
|
||||
async for resp in self.delivery_service.pylon_client.by_doi(
|
||||
doi=document_view.doi,
|
||||
md5=document_view.md5,
|
||||
):
|
||||
await self.process_resp(
|
||||
resp=resp,
|
||||
progress_bar=progress_bar,
|
||||
collected=collected,
|
||||
filesize=document_view.filesize,
|
||||
)
|
||||
return bytes(collected)
|
||||
except DownloadError:
|
||||
pass
|
||||
if document_view.md5:
|
||||
try:
|
||||
async for resp in self.delivery_service.pylon_client.by_md5(md5=document_view.md5):
|
||||
await self.process_resp(
|
||||
resp=resp,
|
||||
progress_bar=progress_bar,
|
||||
collected=collected,
|
||||
filesize=document_view.filesize,
|
||||
)
|
||||
return bytes(collected)
|
||||
except DownloadError:
|
||||
pass
|
||||
|
||||
async def external_cancel(self):
|
||||
self.task.cancel()
|
||||
self.request_context.statbox(action='externally_canceled')
|
||||
await self.delivery_service.telegram_client.send_message(
|
||||
self.request_context.chat.id,
|
||||
t("DOWNLOAD_CANCELED", language=self.request_context.chat.language).format(
|
||||
document=self.document_view.get_robust_title()
|
||||
),
|
||||
buttons=[close_button()]
|
||||
)
|
||||
|
||||
async def store_hashes(self, document_view, telegram_file_id, file):
|
||||
document_pb = document_view.document_pb
|
||||
document_pb.telegram_file_id = telegram_file_id
|
||||
document_pb.filesize = len(file)
|
||||
if not document_pb.md5:
|
||||
document_pb.md5 = hashlib.md5(file).hexdigest()
|
||||
del document_pb.ipfs_multihashes[:]
|
||||
document_pb.ipfs_multihashes.extend(await self.delivery_service.get_ipfs_hashes(file=file))
|
||||
|
||||
document_operation_pb = DocumentOperationPb(
|
||||
update_document=UpdateDocumentPb(
|
||||
fields=['filesize', 'ipfs_multihashes', 'md5', 'telegram_file_id'],
|
||||
typed_document=TypedDocumentPb(**{document_view.schema: document_pb}),
|
||||
),
|
||||
)
|
||||
await operation_log(document_operation_pb)
|
||||
|
||||
|
||||
class DeliveryService(DeliveryServicer, BaseHubService):
|
||||
def __init__(
|
||||
self,
|
||||
server: Server,
|
||||
service_name: str,
|
||||
bot_external_name: str,
|
||||
ipfs_config: dict,
|
||||
is_sharience_enabled: bool,
|
||||
maintenance_picture_url: str,
|
||||
pool_holder,
|
||||
pylon_config: dict,
|
||||
should_store_hashes: bool,
|
||||
should_use_telegram_file_id: bool,
|
||||
telegram_client: BaseTelegramClient,
|
||||
):
|
||||
super().__init__(
|
||||
service_name=service_name,
|
||||
bot_external_name=bot_external_name,
|
||||
ipfs_config=ipfs_config,
|
||||
telegram_client=telegram_client,
|
||||
)
|
||||
self.downloadings = set()
|
||||
self.is_sharience_enabled = is_sharience_enabled
|
||||
self.maintenance_picture_url = maintenance_picture_url
|
||||
self.pool_holder = pool_holder
|
||||
self.pylon_client = PylonClient(
|
||||
proxy=pylon_config['proxy'],
|
||||
resolve_proxy=pylon_config['resolve_proxy'],
|
||||
)
|
||||
self.server = server
|
||||
self.should_store_hashes = should_store_hashes
|
||||
self.should_use_telegram_file_id = should_use_telegram_file_id
|
||||
self.user_manager = UserManager()
|
||||
self.waits.extend([self.pylon_client])
|
||||
|
||||
async def start(self):
|
||||
add_DeliveryServicer_to_server(self, self.server)
|
||||
|
||||
async def stop(self):
|
||||
for download in set(self.downloadings):
|
||||
await download.external_cancel()
|
||||
await asyncio.gather(*map(lambda x: x.task, self.downloadings))
|
||||
await self.ipfs_client.close()
|
||||
|
||||
@aiogrpc_request_wrapper(log=False)
|
||||
async def start_delivery(
|
||||
self,
|
||||
request: StartDeliveryRequestPb,
|
||||
context: ServicerContext,
|
||||
metadata: dict,
|
||||
) -> StartDeliveryResponsePb:
|
||||
request_context = RequestContext(
|
||||
bot_name=self.service_name,
|
||||
chat=request.chat,
|
||||
request_id=metadata.get('request-id'),
|
||||
)
|
||||
request_context.add_default_fields(
|
||||
mode='start_delivery',
|
||||
session_id=metadata.get('session-id'),
|
||||
**self.get_default_service_fields(),
|
||||
)
|
||||
document_view = parse_typed_document_to_view(request.typed_document)
|
||||
cache_hit = self.should_use_telegram_file_id and document_view.telegram_file_id
|
||||
if cache_hit:
|
||||
try:
|
||||
await self.send_file(
|
||||
document_view=document_view,
|
||||
file=document_view.telegram_file_id,
|
||||
session_id=metadata.get('session-id'),
|
||||
request_context=request_context,
|
||||
voting=not is_group_or_channel(request_context.chat.id),
|
||||
)
|
||||
request_context.statbox(action='cache_hit', document_id=document_view.id)
|
||||
except ValueError:
|
||||
cache_hit = False
|
||||
if not cache_hit:
|
||||
if self.user_manager.has_task(request.chat.id, document_view.id):
|
||||
return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.ALREADY_DOWNLOADING)
|
||||
if self.user_manager.hit_limits(request.chat.id):
|
||||
return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.TOO_MANY_DOWNLOADS)
|
||||
await DownloadTask(
|
||||
delivery_service=self,
|
||||
document_view=document_view,
|
||||
request_context=request_context,
|
||||
session_id=metadata.get('session-id'),
|
||||
).schedule()
|
||||
return StartDeliveryResponsePb(status=StartDeliveryResponsePb.Status.OK)
|
188
nexus/hub/services/submitter.py
Normal file
188
nexus/hub/services/submitter.py
Normal file
@ -0,0 +1,188 @@
|
||||
import hashlib
|
||||
import logging
|
||||
import time
|
||||
|
||||
from aiogrobid import GrobidClient
|
||||
from aiogrobid.exceptions import BadRequestError
|
||||
from grpc import (
|
||||
Server,
|
||||
ServicerContext,
|
||||
)
|
||||
from izihawa_utils.pb_to_json import MessageToDict
|
||||
from library.aiogrpctools.base import aiogrpc_request_wrapper
|
||||
from library.telegram.base import (
|
||||
BaseTelegramClient,
|
||||
RequestContext,
|
||||
)
|
||||
from nexus.hub.exceptions import (
|
||||
FileTooBigError,
|
||||
UnavailableMetadataError,
|
||||
UnparsableDoiError,
|
||||
)
|
||||
from nexus.hub.proto.submitter_service_pb2 import \
|
||||
SubmitRequest as SubmitRequestPb
|
||||
from nexus.hub.proto.submitter_service_pb2 import \
|
||||
SubmitResponse as SubmitResponsePb
|
||||
from nexus.hub.proto.submitter_service_pb2_grpc import (
|
||||
SubmitterServicer,
|
||||
add_SubmitterServicer_to_server,
|
||||
)
|
||||
from nexus.hub.user_manager import UserManager
|
||||
from nexus.meta_api.aioclient import MetaApiGrpcClient
|
||||
from nexus.models.proto.operation_pb2 import \
|
||||
DocumentOperation as DocumentOperationPb
|
||||
from nexus.models.proto.operation_pb2 import UpdateDocument as UpdateDocumentPb
|
||||
from nexus.models.proto.sharience_pb2 import Sharience as ShariencePb
|
||||
from nexus.models.proto.typed_document_pb2 import \
|
||||
TypedDocument as TypedDocumentPb
|
||||
from nexus.translations import t
|
||||
from nexus.views.telegram.common import close_button
|
||||
from nexus.views.telegram.scimag import ScimagView
|
||||
from telethon.extensions import BinaryReader
|
||||
|
||||
from .base import BaseHubService
|
||||
|
||||
|
||||
async def operation_log(document_operation_pb):
|
||||
logging.getLogger('operation').info(msg=MessageToDict(document_operation_pb))
|
||||
|
||||
|
||||
class SubmitterService(SubmitterServicer, BaseHubService):
|
||||
def __init__(
|
||||
self,
|
||||
server: Server,
|
||||
service_name: str,
|
||||
bot_external_name: str,
|
||||
grobid_config: dict,
|
||||
ipfs_config: dict,
|
||||
meta_api_config: dict,
|
||||
telegram_client: BaseTelegramClient,
|
||||
):
|
||||
super().__init__(
|
||||
service_name=service_name,
|
||||
bot_external_name=bot_external_name,
|
||||
ipfs_config=ipfs_config,
|
||||
telegram_client=telegram_client,
|
||||
)
|
||||
self.server = server
|
||||
self.grobid_client = GrobidClient(base_url=grobid_config['url'])
|
||||
self.meta_api_client = MetaApiGrpcClient(base_url=meta_api_config['url'])
|
||||
self.telegram_client = telegram_client
|
||||
self.bot_external_name = bot_external_name
|
||||
self.user_manager = UserManager()
|
||||
self.waits.extend([self.grobid_client, self.meta_api_client])
|
||||
|
||||
async def start(self):
|
||||
add_SubmitterServicer_to_server(self, self.server)
|
||||
|
||||
async def stop(self):
|
||||
await self.ipfs_client.close()
|
||||
|
||||
@aiogrpc_request_wrapper()
|
||||
async def submit(
|
||||
self,
|
||||
request: SubmitRequestPb,
|
||||
context: ServicerContext,
|
||||
metadata: dict,
|
||||
) -> SubmitResponsePb:
|
||||
session_id = metadata.get('session-id')
|
||||
request_context = RequestContext(
|
||||
bot_name=self.service_name,
|
||||
chat=request.chat,
|
||||
request_id=metadata.get('request-id'),
|
||||
)
|
||||
request_context.add_default_fields(
|
||||
mode='submit',
|
||||
session_id=metadata.get('session-id'),
|
||||
**self.get_default_service_fields(),
|
||||
)
|
||||
|
||||
document = BinaryReader(request.telegram_document).tgread_object()
|
||||
if document.size > 20 * 1024 * 1024:
|
||||
request_context.error_log(FileTooBigError(size=document.size))
|
||||
await self.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t('FILE_TOO_BIG_ERROR', language=request_context.chat.language),
|
||||
buttons=[close_button()],
|
||||
)
|
||||
return SubmitResponsePb()
|
||||
processing_message = await self.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t("PROCESSING_PAPER", language=request_context.chat.language).format(
|
||||
filename=document.attributes[0].file_name,
|
||||
),
|
||||
)
|
||||
try:
|
||||
|
||||
file = await self.telegram_client.download_document(document=document, file=bytes)
|
||||
|
||||
try:
|
||||
processed_document = await self.grobid_client.process_fulltext_document(pdf_file=file)
|
||||
except BadRequestError as e:
|
||||
request_context.error_log(e)
|
||||
await self.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t('UNPARSABLE_DOCUMENT_ERROR', language=request_context.chat.language),
|
||||
buttons=[close_button()],
|
||||
)
|
||||
return SubmitResponsePb()
|
||||
|
||||
if not processed_document.get('doi'):
|
||||
request_context.error_log(UnparsableDoiError())
|
||||
await self.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t('UNPARSABLE_DOI_ERROR', language=request_context.chat.language),
|
||||
buttons=[close_button()],
|
||||
)
|
||||
return SubmitResponsePb()
|
||||
|
||||
search_response_pb = await self.meta_api_client.search(
|
||||
schemas=('scimag',),
|
||||
query=processed_document['doi'],
|
||||
page=0,
|
||||
page_size=1,
|
||||
request_id=request_context.request_id,
|
||||
session_id=session_id,
|
||||
user_id=request_context.chat.id,
|
||||
language=request_context.chat.language,
|
||||
)
|
||||
|
||||
if len(search_response_pb.scored_documents) == 0:
|
||||
request_context.error_log(UnavailableMetadataError(doi=processed_document['doi']))
|
||||
await self.telegram_client.send_message(
|
||||
request_context.chat.id,
|
||||
t(
|
||||
'UNAVAILABLE_METADATA_ERROR',
|
||||
language=request_context.chat.language
|
||||
).format(doi=processed_document['doi']),
|
||||
buttons=[close_button()],
|
||||
)
|
||||
return SubmitResponsePb()
|
||||
|
||||
document_view = ScimagView(search_response_pb.scored_documents[0].typed_document.scimag)
|
||||
finally:
|
||||
await processing_message.delete()
|
||||
|
||||
uploaded_message = await self.send_file(
|
||||
document_view=document_view,
|
||||
file=file,
|
||||
request_context=request_context,
|
||||
session_id=session_id,
|
||||
voting=False,
|
||||
)
|
||||
|
||||
document_operation_pb = DocumentOperationPb(
|
||||
update_document=UpdateDocumentPb(
|
||||
typed_document=TypedDocumentPb(sharience=ShariencePb(
|
||||
parent_id=document_view.id,
|
||||
uploader_id=request_context.chat.id,
|
||||
updated_at=int(time.time()),
|
||||
md5=hashlib.md5(file).hexdigest(),
|
||||
filesize=document.size,
|
||||
ipfs_multihashes=await self.get_ipfs_hashes(file=file),
|
||||
telegram_file_id=uploaded_message.file.id,
|
||||
)),
|
||||
),
|
||||
)
|
||||
await operation_log(document_operation_pb)
|
||||
return SubmitResponsePb()
|
3
nexus/hub/user_manager/__init__.py
Normal file
3
nexus/hub/user_manager/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .user_manager import UserManager
|
||||
|
||||
__all__ = ['UserManager']
|
22
nexus/hub/user_manager/user_manager.py
Normal file
22
nexus/hub/user_manager/user_manager.py
Normal file
@ -0,0 +1,22 @@
|
||||
class UserManager:
|
||||
def __init__(self):
|
||||
self.last_widget = {}
|
||||
self.tasks = set()
|
||||
self.limits = {}
|
||||
|
||||
def add_task(self, user_id, id):
|
||||
self.tasks.add((user_id, id))
|
||||
self.limits[user_id] = self.limits.get(user_id, 0) + 1
|
||||
|
||||
def remove_task(self, user_id, id):
|
||||
try:
|
||||
self.tasks.remove((user_id, id))
|
||||
self.limits[user_id] = self.limits.get(user_id, 1) - 1
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def has_task(self, user_id, id):
|
||||
return (user_id, id) in self.tasks
|
||||
|
||||
def hit_limits(self, user_id):
|
||||
return self.limits.get(user_id, 0) >= 3
|
Loadingβ¦
x
Reference in New Issue
Block a user