mirror of
https://github.com/nexus-stc/hyperboria
synced 2025-01-10 02:35:53 +01:00
24 lines
592 B
Python
24 lines
592 B
Python
|
import asyncio
|
||
|
from typing import Iterable
|
||
|
|
||
|
from aiokafka import AIOKafkaProducer
|
||
|
|
||
|
from .base import BaseSink
|
||
|
|
||
|
|
||
|
class KafkaSink(BaseSink):
|
||
|
def __init__(self, kafka_hosts: Iterable[str], topic_name: str):
|
||
|
super().__init__()
|
||
|
self.producer = AIOKafkaProducer(
|
||
|
loop=asyncio.get_event_loop(),
|
||
|
bootstrap_servers=kafka_hosts,
|
||
|
)
|
||
|
self.topic_name = topic_name
|
||
|
self.starts.append(self.producer)
|
||
|
|
||
|
async def send(self, data: bytes):
|
||
|
await self.producer.send_and_wait(
|
||
|
self.topic_name,
|
||
|
data,
|
||
|
)
|