mirror of
https://github.com/nexus-stc/hyperboria
synced 2025-02-16 03:56:47 +01:00
- [nexus] Remove outdated protos - [nexus] Development - [nexus] Development - [nexus] Development - [nexus] Development - [nexus] Development - [nexus] Refactor views - [nexus] Update aiosumma - [nexus] Add tags - [nexus] Development - [nexus] Update repository - [nexus] Update repository - [nexus] Update dependencies - [nexus] Update dependencies - [nexus] Fixes for MetaAPI - [nexus] Support for new queries - [nexus] Adopt new versions of search - [nexus] Improving Nexus - [nexus] Various fixes - [nexus] Add profile - [nexus] Fixes for ingestion - [nexus] Refactorings and bugfixes - [idm] Add profile methods - [nexus] Fix stalled nexus-meta bugs - [nexus] Various bugfixes - [nexus] Restore IDM API functionality GitOrigin-RevId: a0842345a6dde5b321279ab5510a50c0def0e71a
49 lines
1.4 KiB
Python
49 lines
1.4 KiB
Python
from typing import (
|
|
Any,
|
|
AsyncIterable,
|
|
Iterable,
|
|
Union,
|
|
)
|
|
|
|
from aiokit import AioRootThing
|
|
from izihawa_utils.importlib import import_object
|
|
|
|
from ..sinks.base import BaseSink
|
|
|
|
|
|
class BaseJob(AioRootThing):
|
|
name = None
|
|
|
|
def __init__(self, actions: Iterable[dict], sinks: Iterable[Union[dict, BaseSink]]):
|
|
super().__init__()
|
|
real_sinks = []
|
|
for sink in sinks:
|
|
if isinstance(sink, BaseSink):
|
|
real_sinks.append(sink)
|
|
else:
|
|
real_sinks.append(import_object(sink['class'])(**sink.get('kwargs', {})))
|
|
self.sinks = real_sinks
|
|
|
|
real_actions = []
|
|
for action in actions:
|
|
real_actions.append(import_object(action['class'])(**action.get('kwargs', {})))
|
|
self.actions = real_actions
|
|
|
|
self.starts.extend(self.sinks)
|
|
self.starts.extend(self.actions)
|
|
|
|
async def iterator(self) -> AsyncIterable[Any]:
|
|
raise NotImplementedError()
|
|
|
|
async def action_iterator(self) -> AsyncIterable[Any]:
|
|
async for item in self.iterator():
|
|
processed_item = item
|
|
for action in self.actions:
|
|
processed_item = await action.do(processed_item)
|
|
yield processed_item
|
|
|
|
async def start(self):
|
|
async for data in self.action_iterator():
|
|
for sink in self.sinks:
|
|
await sink.send(data)
|