From 3bed3052d0837fe48863a139f4e814b407dc75fa Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 7 Oct 2022 16:03:51 +0200 Subject: [PATCH] Remove required kafka dependency, zero-copy deserialization --- pom.xml | 2 +- .../reactiveapi/AtomixReactiveApi.java | 2 +- .../BaseAtomixReactiveApiClient.java | 4 +- .../it/tdlight/reactiveapi/ChannelCodec.java | 2 - .../ClientBoundEventDeserializer.java | 13 ++++- .../ClientBoundEventSerializer.java | 13 ++++- .../it/tdlight/reactiveapi/Deserializer.java | 22 +++++++++ .../reactiveapi/ReactiveApiPublisher.java | 14 ++++-- .../it/tdlight/reactiveapi/ReactorUtils.java | 23 +++++++-- .../reactiveapi/SerializationException.java | 32 +++++++++++++ .../it/tdlight/reactiveapi/Serializer.java | 27 +++++++++++ .../reactiveapi/TdlibChannelsSharedHost.java | 2 +- .../TdlibChannelsSharedReceive.java | 2 +- .../reactiveapi/TdlibDeserializer.java | 14 ++++-- .../reactiveapi/TdlibRequestDeserializer.java | 47 ++++++++++--------- .../reactiveapi/TdlibRequestSerializer.java | 45 ++++++++++-------- .../TdlibResponseDeserializer.java | 32 +++++++------ .../reactiveapi/TdlibResponseSerializer.java | 32 +++++++------ .../tdlight/reactiveapi/TdlibSerializer.java | 18 ++++--- .../java/it/tdlight/reactiveapi/UtfCodec.java | 16 ++----- .../reactiveapi/kafka/KafkaConsumer.java | 5 +- .../reactiveapi/kafka/KafkaDeserializer.java | 39 +++++++++++++++ .../reactiveapi/kafka/KafkaProducer.java | 3 +- .../reactiveapi/kafka/KafkaSerializer.java | 38 +++++++++++++++ .../rsocket/ConsumerConnection.java | 2 +- .../reactiveapi/rsocket/MyRSocketClient.java | 4 +- .../reactiveapi/rsocket/MyRSocketServer.java | 4 +- .../reactiveapi/rsocket/RSocketUtils.java | 33 ++++++++++--- src/main/java/module-info.java | 2 +- 29 files changed, 362 insertions(+), 130 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/Deserializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/SerializationException.java create mode 100644 src/main/java/it/tdlight/reactiveapi/Serializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/kafka/KafkaDeserializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/kafka/KafkaSerializer.java diff --git a/pom.xml b/pom.xml index cb1f5ab..2eac0c1 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,7 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml - 2.13.4 + 2.14.0-rc1 jakarta.xml.bind diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index d45112d..8daf1c3 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -147,7 +147,7 @@ public class AtomixReactiveApi implements ReactiveApi { .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex)); } - })).transform(ReactorUtils::subscribeOnce); + })).transform(ReactorUtils::subscribeOnceUntilUnsubscribe); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 7860437..b9dcbff 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -19,6 +19,7 @@ import it.tdlight.reactiveapi.Event.OnUpdateData; import it.tdlight.reactiveapi.Event.OnUpdateError; import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.time.Duration; @@ -28,7 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +117,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { } } - static @NotNull ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException { + static @NotNull ClientBoundEvent deserializeEvent(DataInput is) throws IOException { var userId = is.readLong(); var dataVersion = is.readInt(); if (dataVersion != SERIAL_VERSION) { diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java index da307fe..08291c6 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java @@ -1,8 +1,6 @@ package it.tdlight.reactiveapi; import java.lang.reflect.InvocationTargetException; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; public class ChannelCodec { public static final ChannelCodec CLIENT_BOUND_EVENT = new ChannelCodec(ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class); diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java index 4d1d34e..7c9f356 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java @@ -1,15 +1,24 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import org.apache.kafka.common.serialization.Deserializer; +import java.io.DataInput; +import java.io.IOException; public class ClientBoundEventDeserializer implements Deserializer { @Override - public ClientBoundEvent deserialize(String topic, byte[] data) { + public ClientBoundEvent deserialize(byte[] data) { if (data == null || data.length == 0) { return null; } return LiveAtomixReactiveApiClient.deserializeEvent(data); } + + @Override + public ClientBoundEvent deserialize(int length, DataInput dataInput) throws IOException { + if (dataInput == null || length == 0) { + return null; + } + return LiveAtomixReactiveApiClient.deserializeEvent(dataInput); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java index 48d8d2d..264a58a 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java @@ -1,15 +1,24 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import org.apache.kafka.common.serialization.Serializer; +import java.io.DataOutput; +import java.io.IOException; public class ClientBoundEventSerializer implements Serializer { @Override - public byte[] serialize(String topic, ClientBoundEvent data) { + public byte[] serialize(ClientBoundEvent data) { if (data == null) { return null; } return ReactiveApiPublisher.serializeEvent(data); } + + @Override + public void serialize(ClientBoundEvent data, DataOutput output) throws IOException { + if (data == null) { + return; + } + ReactiveApiPublisher.writeClientBoundEvent(data, output); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/Deserializer.java b/src/main/java/it/tdlight/reactiveapi/Deserializer.java new file mode 100644 index 0000000..18c9488 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/Deserializer.java @@ -0,0 +1,22 @@ +package it.tdlight.reactiveapi; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map; + +public interface Deserializer { + + default T deserialize(byte[] data) throws IOException { + var bais = new ByteArrayInputStream(data); + return deserialize(data.length, new DataInputStream(bais)); + } + + default T deserialize(int length, DataInput dataInput) throws IOException { + byte[] data = new byte[length]; + dataInput.readFully(data); + return deserialize(data); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 1f66219..13959f1 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -38,6 +38,7 @@ import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed; import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; import it.tdlight.tdlight.ClientManager; import java.io.ByteArrayOutputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -51,7 +52,6 @@ import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; @@ -431,7 +431,7 @@ public abstract class ReactiveApiPublisher { } } - private static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutputStream dataOutputStream) + public static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutput dataOutputStream) throws IOException { dataOutputStream.writeLong(clientBoundEvent.userId()); dataOutputStream.writeInt(SERIAL_VERSION); @@ -511,9 +511,13 @@ public abstract class ReactiveApiPublisher { @Override public void onNext(Object responseObj) { - r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(), - onRequestObj.requestId(), - userId, responseObj)); + try { + r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(), + onRequestObj.requestId(), + userId, responseObj)); + } catch (Throwable ex) { + onError(ex); + } } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java index c94ebcb..e958259 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -1,14 +1,9 @@ package it.tdlight.reactiveapi; -import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; -import java.util.List; -import java.util.Objects; import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import org.jetbrains.annotations.NotNull; @@ -36,6 +31,15 @@ public class ReactorUtils { }); } + public static Flux subscribeOnceUntilUnsubscribe(Flux f) { + AtomicBoolean subscribed = new AtomicBoolean(); + return f.doOnSubscribe(s -> { + if (!subscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Can't subscribe more than once!"); + } + }).doFinally(s -> subscribed.set(false)); + } + public static Mono subscribeOnce(Mono f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { @@ -45,6 +49,15 @@ public class ReactorUtils { }); } + public static Mono subscribeOnceUntilUnsubscribe(Mono f) { + AtomicBoolean subscribed = new AtomicBoolean(); + return f.doOnSubscribe(s -> { + if (!subscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Can't subscribe more than once!"); + } + }).doFinally(s -> subscribed.set(false)); + } + public static Flux createLastestSubscriptionFlux(Flux upstream, int maxBufferSize) { return upstream.transform(parent -> { AtomicReference subscriptionAtomicReference = new AtomicReference<>(); diff --git a/src/main/java/it/tdlight/reactiveapi/SerializationException.java b/src/main/java/it/tdlight/reactiveapi/SerializationException.java new file mode 100644 index 0000000..a699876 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/SerializationException.java @@ -0,0 +1,32 @@ +package it.tdlight.reactiveapi; + +/** + * Any exception during serialization in the producer + */ +public class SerializationException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + + public SerializationException(String message) { + super(message); + } + + public SerializationException(Throwable cause) { + super(cause); + } + + public SerializationException() { + super(); + } + + /* avoid the expensive and useless stack trace for serialization exceptions */ + @Override + public Throwable fillInStackTrace() { + return this; + } + +} \ No newline at end of file diff --git a/src/main/java/it/tdlight/reactiveapi/Serializer.java b/src/main/java/it/tdlight/reactiveapi/Serializer.java new file mode 100644 index 0000000..3fa7cfe --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/Serializer.java @@ -0,0 +1,27 @@ +package it.tdlight.reactiveapi; + +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import java.io.Closeable; +import java.util.Map; + +public interface Serializer { + + default byte[] serialize(T data) throws IOException { + try (var baos = new FastByteArrayOutputStream()) { + try (var daos = new DataOutputStream(baos)) { + serialize(data, daos); + baos.trim(); + return baos.array; + } + } + } + + default void serialize(T data, DataOutput output) throws IOException { + output.write(serialize(data)); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index a82d820..c48f4f0 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -51,7 +51,7 @@ public class TdlibChannelsSharedHost implements Closeable { public TdlibChannelsSharedHost(Set allLanes, TdlibChannelsServers tdServersChannels) { this.tdServersChannels = tdServersChannels; this.responsesSub = Mono.defer(() -> tdServersChannels.response() - .sendMessages(responses.asFlux().log("responses", Level.FINE))) + .sendMessages(responses.asFlux()/*.log("responses", Level.FINE)*/)) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java index 8ba7d82..ad2e4d6 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java @@ -43,7 +43,7 @@ public class TdlibChannelsSharedReceive implements Closeable { this.tdClientsChannels = tdClientsChannels; this.responses = Flux .defer(() -> tdClientsChannels.response().consumeMessages()) - .log("responses", Level.FINE) + //.log("responses", Level.FINE) .repeatWhen(REPEAT_STRATEGY) .retryWhen(RETRY_STRATEGY) .publish() diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java index 2c8d53d..395ec5d 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java @@ -5,25 +5,29 @@ import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; public class TdlibDeserializer implements Deserializer { @Override - public Object deserialize(String topic, byte[] data) { + public Object deserialize(byte[] data) { if (data.length == 0) { return null; } var bais = new ByteArrayInputStream(data); var dais = new DataInputStream(bais); + return deserialize(-1, dais); + } + + @Override + public Object deserialize(int length, DataInput dataInput) { try { - if (dais.readInt() != SERIAL_VERSION) { + if (dataInput.readInt() != SERIAL_VERSION) { return new TdApi.Error(400, "Conflicting protocol version"); } - return TdApi.Deserializer.deserialize(dais); + return TdApi.Deserializer.deserialize(dataInput); } catch (IOException e) { throw new SerializationException("Failed to deserialize TDLib object", e); } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java index cf53703..720b7b1 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java @@ -8,45 +8,48 @@ import it.tdlight.reactiveapi.Event.OnRequest.InvalidRequest; import it.tdlight.reactiveapi.Event.OnRequest.Request; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; public class TdlibRequestDeserializer implements Deserializer> { @Override - public OnRequest deserialize(String topic, byte[] data) { + public OnRequest deserialize(byte[] data) { if (data.length == 0) { return null; } try { var bais = new ByteArrayInputStream(data); var dais = new DataInputStream(bais); - var userId = dais.readLong(); - var clientId = dais.readLong(); - var requestId = dais.readLong(); - if (dais.readInt() != SERIAL_VERSION) { - // Deprecated request - return new InvalidRequest<>(userId, clientId, requestId); - } else { - long millis = dais.readLong(); - Instant timeout; - if (millis == -1) { - timeout = Instant.ofEpochMilli(Long.MAX_VALUE); - } else { - timeout = Instant.ofEpochMilli(millis); - } - @SuppressWarnings("unchecked") - TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dais); - return new Request<>(userId, clientId, requestId, request, timeout); - } + return deserialize(-1, dais); } catch (UnsupportedOperationException | IOException e) { throw new SerializationException(e); } } + + @Override + public OnRequest deserialize(int length, DataInput dataInput) throws IOException { + var userId = dataInput.readLong(); + var clientId = dataInput.readLong(); + var requestId = dataInput.readLong(); + if (dataInput.readInt() != SERIAL_VERSION) { + // Deprecated request + return new InvalidRequest<>(userId, clientId, requestId); + } else { + long millis = dataInput.readLong(); + Instant timeout; + if (millis == -1) { + timeout = Instant.ofEpochMilli(Long.MAX_VALUE); + } else { + timeout = Instant.ofEpochMilli(millis); + } + @SuppressWarnings("unchecked") + TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dataInput); + return new Request<>(userId, clientId, requestId, request, timeout); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java index a6f1704..1d2d2c5 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java @@ -6,41 +6,25 @@ import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnRequest.Request; import java.io.ByteArrayOutputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; public class TdlibRequestSerializer implements Serializer> { private static final Instant INFINITE_TIMEOUT = Instant.now().plus(100_000, ChronoUnit.DAYS); @Override - public byte[] serialize(String topic, OnRequest data) { + public byte[] serialize(OnRequest data) { try { if (data == null) { return new byte[0]; } else { try(var baos = new ByteArrayOutputStream()) { try (var daos = new DataOutputStream(baos)) { - daos.writeLong(data.userId()); - daos.writeLong(data.clientId()); - daos.writeLong(data.requestId()); - daos.writeInt(SERIAL_VERSION); - if (data instanceof OnRequest.Request request) { - if (request.timeout() == Instant.MAX || request.timeout().compareTo(INFINITE_TIMEOUT) >= 0) { - daos.writeLong(-1); - } else { - daos.writeLong(request.timeout().toEpochMilli()); - } - request.request().serialize(daos); - } else if (data instanceof OnRequest.InvalidRequest) { - daos.writeLong(-2); - } else { - throw new SerializationException("Unknown request type: " + daos.getClass()); - } + serialize(data, daos); daos.flush(); return baos.toByteArray(); } @@ -50,4 +34,27 @@ public class TdlibRequestSerializer implements Serialize throw new SerializationException("Failed to serialize TDLib object", e); } } + + @Override + public void serialize(OnRequest data, DataOutput dataOutput) throws IOException { + if (data == null) { + return; + } + dataOutput.writeLong(data.userId()); + dataOutput.writeLong(data.clientId()); + dataOutput.writeLong(data.requestId()); + dataOutput.writeInt(SERIAL_VERSION); + if (data instanceof OnRequest.Request request) { + if (request.timeout() == Instant.MAX || request.timeout().compareTo(INFINITE_TIMEOUT) >= 0) { + dataOutput.writeLong(-1); + } else { + dataOutput.writeLong(request.timeout().toEpochMilli()); + } + request.request().serialize(dataOutput); + } else if (data instanceof OnRequest.InvalidRequest) { + dataOutput.writeLong(-2); + } else { + throw new SerializationException("Unknown request type: " + dataOutput.getClass()); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibResponseDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibResponseDeserializer.java index 8a43ddd..0361a44 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibResponseDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibResponseDeserializer.java @@ -7,35 +7,39 @@ import it.tdlight.reactiveapi.Event.OnResponse; import it.tdlight.reactiveapi.Event.OnResponse.InvalidResponse; import it.tdlight.reactiveapi.Event.OnResponse.Response; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.time.Instant; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; public class TdlibResponseDeserializer implements Deserializer> { @Override - public OnResponse deserialize(String topic, byte[] data) { + public OnResponse deserialize(byte[] data) { if (data.length == 0) { return null; } try { var bais = new ByteArrayInputStream(data); var dais = new DataInputStream(bais); - var clientId = dais.readLong(); - var requestId = dais.readLong(); - var userId = dais.readLong(); - if (dais.readInt() != SERIAL_VERSION) { - // Deprecated response - return new InvalidResponse<>(clientId, requestId, userId); - } else { - @SuppressWarnings("unchecked") - T response = (T) TdApi.Deserializer.deserialize(dais); - return new Response<>(clientId, requestId, userId, response); - } + return deserialize(-1, dais); } catch (UnsupportedOperationException | IOException e) { throw new SerializationException(e); } } + + @Override + public OnResponse deserialize(int length, DataInput dataInput) throws IOException { + var clientId = dataInput.readLong(); + var requestId = dataInput.readLong(); + var userId = dataInput.readLong(); + if (dataInput.readInt() != SERIAL_VERSION) { + // Deprecated response + return new InvalidResponse<>(clientId, requestId, userId); + } else { + @SuppressWarnings("unchecked") + T response = (T) TdApi.Deserializer.deserialize(dataInput); + return new Response<>(clientId, requestId, userId, response); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibResponseSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibResponseSerializer.java index 741c0e7..3f02d48 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibResponseSerializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibResponseSerializer.java @@ -6,34 +6,23 @@ import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.OnResponse; import it.tdlight.reactiveapi.Event.OnResponse.Response; import java.io.ByteArrayOutputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; public class TdlibResponseSerializer implements Serializer> { @Override - public byte[] serialize(String topic, OnResponse data) { + public byte[] serialize(OnResponse data) { try { if (data == null) { return new byte[0]; } else { try(var baos = new ByteArrayOutputStream()) { try (var daos = new DataOutputStream(baos)) { - daos.writeLong(data.clientId()); - daos.writeLong(data.requestId()); - daos.writeLong(data.userId()); - daos.writeInt(SERIAL_VERSION); - if (data instanceof Response response) { - response.response().serialize(daos); - } else if (data instanceof OnResponse.InvalidResponse) { - daos.writeLong(-2); - } else { - throw new SerializationException("Unknown response type: " + daos.getClass()); - } + serialize(data, daos); daos.flush(); return baos.toByteArray(); } @@ -43,4 +32,19 @@ public class TdlibResponseSerializer implements Serializ throw new SerializationException("Failed to serialize TDLib object", e); } } + + @Override + public void serialize(OnResponse data, DataOutput dataOutput) throws IOException { + dataOutput.writeLong(data.clientId()); + dataOutput.writeLong(data.requestId()); + dataOutput.writeLong(data.userId()); + dataOutput.writeInt(SERIAL_VERSION); + if (data instanceof Response response) { + response.response().serialize(dataOutput); + } else if (data instanceof OnResponse.InvalidResponse) { + dataOutput.writeLong(-2); + } else { + throw new SerializationException("Unknown response type: " + dataOutput.getClass()); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibSerializer.java index 9636226..cc8d433 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibSerializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibSerializer.java @@ -3,27 +3,25 @@ package it.tdlight.reactiveapi; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; public class TdlibSerializer implements Serializer { @Override - public byte[] serialize(String topic, TdApi.Object data) { + public byte[] serialize(TdApi.Object data) { try { if (data == null) { return new byte[0]; } else { try(var baos = new ByteArrayOutputStream()) { try (var daos = new DataOutputStream(baos)) { - daos.writeInt(SERIAL_VERSION); - data.serialize(daos); + serialize(data, daos); daos.flush(); return baos.toByteArray(); } @@ -34,4 +32,12 @@ public class TdlibSerializer implements Serializer { } } + @Override + public void serialize(Object data, DataOutput output) throws IOException { + if (data == null) { + return; + } + output.writeInt(SERIAL_VERSION); + data.serialize(output); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/UtfCodec.java b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java index 5485655..4d25ef1 100644 --- a/src/main/java/it/tdlight/reactiveapi/UtfCodec.java +++ b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java @@ -1,28 +1,20 @@ package it.tdlight.reactiveapi; +import java.io.DataInput; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; public class UtfCodec implements Serializer, Deserializer { @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public String deserialize(String topic, byte[] data) { + public String deserialize(byte[] data) { return new String(data, StandardCharsets.UTF_8); } @Override - public byte[] serialize(String topic, String data) { + public byte[] serialize(String data) { return data.getBytes(StandardCharsets.UTF_8); } - @Override - public void close() { - } } diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java index 9d8daf7..5456409 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java @@ -60,7 +60,8 @@ public final class KafkaConsumer implements EventConsumer { props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelCodec().getDeserializerClass()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaDeserializer.class); + props.put("custom.deserializer.class", getChannelCodec().getDeserializerClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis())); if (!isQuickResponse()) { @@ -141,6 +142,6 @@ public final class KafkaConsumer implements EventConsumer { }) .transform(this::retryIfCleanup) .transform(this::retryIfCommitFailed) - .transform(ReactorUtils::subscribeOnce); + .transform(ReactorUtils::subscribeOnceUntilUnsubscribe); } } diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaDeserializer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaDeserializer.java new file mode 100644 index 0000000..f02b127 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaDeserializer.java @@ -0,0 +1,39 @@ +package it.tdlight.reactiveapi.kafka; + +import it.tdlight.reactiveapi.Deserializer; +import it.tdlight.reactiveapi.SerializationException; +import it.tdlight.reactiveapi.Serializer; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; + +public class KafkaDeserializer implements Deserializer, org.apache.kafka.common.serialization.Deserializer { + + private Deserializer deserializer; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + var clazz = (Class) configs.get("custom.deserializer.class"); + try { + this.deserializer = (Deserializer) clazz.getConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + return deserializer.deserialize(data); + } catch (IOException e) { + throw new SerializationException(e); + } + } + + @Override + public T deserialize(byte[] data) throws IOException { + return deserializer.deserialize(data); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java index f76e403..4da8354 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java @@ -37,7 +37,8 @@ public final class KafkaProducer implements EventProducer { props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getChannelCodec().getSerializerClass()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class); + props.put("custom.serializer.class", getChannelCodec().getSerializerClass()); SenderOptions senderOptions = SenderOptions.create(props); sender = KafkaSender.create(senderOptions.maxInFlight(1024)); diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaSerializer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaSerializer.java new file mode 100644 index 0000000..57695b2 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaSerializer.java @@ -0,0 +1,38 @@ +package it.tdlight.reactiveapi.kafka; + +import it.tdlight.reactiveapi.Serializer; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import org.apache.kafka.common.errors.SerializationException; + +public class KafkaSerializer implements Serializer, org.apache.kafka.common.serialization.Serializer { + + private Serializer serializer; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + var clazz = (Class) configs.get("custom.serializer.class"); + try { + this.serializer = (Serializer) clazz.getConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] serialize(String topic, T data) { + try { + return serializer.serialize(data); + } catch (IOException e) { + throw new SerializationException(e); + } + } + + @Override + public byte[] serialize(T data) throws IOException { + return serializer.serialize(data); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java index 0fc58b6..ad1a21e 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -1,10 +1,10 @@ package it.tdlight.reactiveapi.rsocket; import io.rsocket.Payload; +import it.tdlight.reactiveapi.Deserializer; import it.tdlight.reactiveapi.Timestamped; import java.time.Duration; import java.util.Optional; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java index 10b44ff..1b41400 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java @@ -10,16 +10,16 @@ import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.Deserializer; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Serializer; import it.tdlight.reactiveapi.SimpleEventProducer; import it.tdlight.reactiveapi.Timestamped; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java index 39d2e4d..18ab1f9 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -11,14 +11,14 @@ import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.Deserializer; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Serializer; import it.tdlight.reactiveapi.Timestamped; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java index 51d200d..bc5c14f 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java @@ -1,23 +1,42 @@ package it.tdlight.reactiveapi.rsocket; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import io.rsocket.Payload; import io.rsocket.util.DefaultPayload; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import it.tdlight.reactiveapi.Deserializer; +import it.tdlight.reactiveapi.Serializer; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import reactor.core.publisher.Flux; public class RSocketUtils { public static Flux deserialize(Flux payloadFlux, Deserializer deserializer) { return payloadFlux.map(payload -> { - var slice = payload.sliceData(); - byte[] bytes = new byte[slice.readableBytes()]; - slice.readBytes(bytes, 0, bytes.length); - return deserializer.deserialize(null, bytes); + try { + try (var bis = new ByteBufInputStream(payload.sliceData())) { + return deserializer.deserialize(payload.data().readableBytes(), bis); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } }); } public static Flux serialize(Flux flux, Serializer serializer) { - return flux.map(element -> DefaultPayload.create(serializer.serialize(null, element))); + return flux.map(element -> { + var buf = ByteBufAllocator.DEFAULT.ioBuffer(); + try (var baos = new ByteBufOutputStream(buf)) { + serializer.serialize(element, baos); + return DefaultPayload.create(baos.buffer().retain()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + buf.release(); + } + }); } } diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 11b01c7..137f2ef 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -11,7 +11,7 @@ module tdlib.reactive.api { requires tdlight.api; requires com.google.common; requires java.logging; - requires kafka.clients; + requires static kafka.clients; requires org.apache.logging.log4j; requires reactor.kafka; requires com.fasterxml.jackson.databind;