mirror of
https://github.com/nexus-stc/hyperboria
synced 2024-12-26 03:25:49 +01:00
8472f27ec5
GitOrigin-RevId: ddf02e70d2827c048db49b687ebbcdcc67807ca6
24 lines
592 B
Python
24 lines
592 B
Python
import asyncio
|
|
from typing import Iterable
|
|
|
|
from aiokafka import AIOKafkaProducer
|
|
|
|
from .base import BaseSink
|
|
|
|
|
|
class KafkaSink(BaseSink):
    """Sink that publishes byte payloads to a single Kafka topic.

    Wraps an ``AIOKafkaProducer`` bound to the event loop that is current
    when the sink is constructed.
    """

    def __init__(self, kafka_hosts: Iterable[str], topic_name: str):
        """Create the producer and remember the destination topic.

        Args:
            kafka_hosts: Broker addresses, forwarded unchanged as the
                producer's ``bootstrap_servers`` argument.
            topic_name: Topic every payload passed to ``send`` is
                published to.
        """
        super().__init__()
        event_loop = asyncio.get_event_loop()
        self.producer = AIOKafkaProducer(loop=event_loop, bootstrap_servers=kafka_hosts)
        self.topic_name = topic_name
        # NOTE(review): `starts` is provided by BaseSink; presumably the
        # objects listed there are started with the sink -- confirm in base.
        self.starts.append(self.producer)

    async def send(self, data: bytes):
        """Publish ``data`` to the configured topic.

        Awaits the producer's ``send_and_wait`` call; its result is
        discarded.

        Args:
            data: Raw message value to publish.
        """
        await self.producer.send_and_wait(self.topic_name, data)
|