diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java index 671b7ba..240794a 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java @@ -4,6 +4,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class KafkaClientBoundConsumer extends KafkaConsumer { + private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; private final String lane; private final String name; @@ -11,15 +12,15 @@ public class KafkaClientBoundConsumer extends KafkaConsumer { super(kafkaParameters); this.lane = lane; if (lane.isBlank()) { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + this.name = CODEC.getKafkaName(); } else { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + this.name = CODEC.getKafkaName() + "-" + lane; } } @Override public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.CLIENT_BOUND_EVENT; + return CODEC; } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java index bc05555..fe20e06 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java @@ -4,20 +4,22 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class KafkaClientBoundProducer extends KafkaProducer { + private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; + private final String name; public KafkaClientBoundProducer(KafkaParameters kafkaParameters, String lane) { super(kafkaParameters); if (lane.isBlank()) { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + this.name = CODEC.getKafkaName(); } else { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + this.name = CODEC.getKafkaName() + "-" + lane; } } @Override public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.CLIENT_BOUND_EVENT; + return CODEC; } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index 970a816..028cc58 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -44,7 +44,7 @@ public abstract class KafkaProducer { var channelName = getChannelName(); return eventsFlux .>map(event -> - SenderRecord.create(new ProducerRecord<>(channelName, event), null)) + SenderRecord.create(new ProducerRecord<>("tdlib." + channelName, event), null)) .log("produce-messages-" + channelName, Level.FINEST, SignalType.REQUEST,