mirror of
https://github.com/nexus-stc/hyperboria
synced 2025-01-02 06:55:49 +01:00
48 lines
1.4 KiB
Python
48 lines
1.4 KiB
Python
|
from typing import (
|
||
|
Any,
|
||
|
AsyncIterable,
|
||
|
Iterable,
|
||
|
)
|
||
|
|
||
|
from aiokit import AioRootThing
|
||
|
from izihawa_utils.importlib import import_object
|
||
|
|
||
|
from ..sinks.base import BaseSink
|
||
|
|
||
|
|
||
|
class BaseJob(AioRootThing):
    """Base class for jobs that stream items through actions into sinks.

    A subclass provides the items by overriding :meth:`iterator`. Every
    item is passed through each configured action in order, and the final
    result is sent to each configured sink in order.
    """

    # Class-level job name; ``None`` here, expected to be set by subclasses.
    name = None

    def __init__(self, actions: Iterable[dict], sinks: Iterable[dict]):
        """Instantiate the configured actions and sinks.

        Args:
            actions: Specs of the form ``{'class': <import path>,
                'kwargs': {...}}``. ``'kwargs'`` is optional and defaults
                to no keyword arguments. The class is resolved with
                ``import_object`` and called with the kwargs.
            sinks: Specs of the same form as ``actions``. An element that
                is already a ``BaseSink`` instance is used as-is.
        """
        super().__init__()

        # Sinks are built before actions, matching the original order of
        # construction side effects.
        self.sinks = [
            sink if isinstance(sink, BaseSink)
            else import_object(sink['class'])(**sink.get('kwargs', {}))
            for sink in sinks
        ]
        self.actions = [
            import_object(action['class'])(**action.get('kwargs', {}))
            for action in actions
        ]

        # `waits` comes from the base class; presumably it ties the
        # lifecycle of these objects to this job - confirm against aiokit.
        self.waits.extend(self.sinks)
        self.waits.extend(self.actions)

    async def iterator(self) -> AsyncIterable[Any]:
        """Yield the raw items of the job; subclasses must override this.

        Raises:
            NotImplementedError: On first iteration, if not overridden.
        """
        raise NotImplementedError()
        # Intentionally unreachable: the mere presence of `yield` turns this
        # stub into an async generator. Without it, `self.iterator()` returns
        # a coroutine and `async for` in `action_iterator` fails with a
        # confusing TypeError instead of the NotImplementedError above.
        yield

    async def action_iterator(self) -> AsyncIterable[Any]:
        """Yield each item of :meth:`iterator` after applying all actions.

        Actions are awaited one after another in the configured order; the
        result of each ``action.do`` call is the input of the next one.
        """
        async for item in self.iterator():
            processed_item = item
            for action in self.actions:
                processed_item = await action.do(processed_item)
            yield processed_item

    async def start(self):
        """Run the job: send every processed item to each sink in order."""
        async for data in self.action_iterator():
            for sink in self.sinks:
                await sink.send(data)
|