From 2b2e690da4702d8bc8b1bf99ec6faac7808c8b5f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 22 Sep 2022 16:26:55 +0200 Subject: [PATCH] Code cleanup --- .../it/tdlight/reactiveapi/KafkaClientBoundConsumer.java | 7 ++++--- .../it/tdlight/reactiveapi/KafkaClientBoundProducer.java | 8 +++++--- src/main/java/it/tdlight/reactiveapi/KafkaProducer.java | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java index 671b7ba..240794a 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java @@ -4,6 +4,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class KafkaClientBoundConsumer extends KafkaConsumer { + private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; private final String lane; private final String name; @@ -11,15 +12,15 @@ public class KafkaClientBoundConsumer extends KafkaConsumer { super(kafkaParameters); this.lane = lane; if (lane.isBlank()) { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + this.name = CODEC.getKafkaName(); } else { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + this.name = CODEC.getKafkaName() + "-" + lane; } } @Override public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.CLIENT_BOUND_EVENT; + return CODEC; } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java index bc05555..fe20e06 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java @@ -4,20 +4,22 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class KafkaClientBoundProducer extends KafkaProducer { + private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; + private final String name; public KafkaClientBoundProducer(KafkaParameters kafkaParameters, String lane) { super(kafkaParameters); if (lane.isBlank()) { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + this.name = CODEC.getKafkaName(); } else { - this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + this.name = CODEC.getKafkaName() + "-" + lane; } } @Override public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.CLIENT_BOUND_EVENT; + return CODEC; } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index 970a816..028cc58 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -44,7 +44,7 @@ public abstract class KafkaProducer { var channelName = getChannelName(); return eventsFlux .>map(event -> - SenderRecord.create(new ProducerRecord<>(channelName, event), null)) + SenderRecord.create(new ProducerRecord<>("tdlib." + channelName, event), null)) .log("produce-messages-" + channelName, Level.FINEST, SignalType.REQUEST,