diff --git a/pom.xml b/pom.xml
index cb1f5ab..2eac0c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,7 +180,7 @@
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
- 2.13.4
+ 2.14.0-rc1
jakarta.xml.bind
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index d45112d..8daf1c3 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -147,7 +147,7 @@ public class AtomixReactiveApi implements ReactiveApi {
.subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex));
}
- })).transform(ReactorUtils::subscribeOnce);
+ })).transform(ReactorUtils::subscribeOnceUntilUnsubscribe);
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
index 7860437..b9dcbff 100644
--- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
@@ -19,6 +19,7 @@ import it.tdlight.reactiveapi.Event.OnUpdateData;
import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import java.io.ByteArrayInputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.time.Duration;
@@ -28,7 +29,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import org.apache.kafka.common.errors.SerializationException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
}
}
- static @NotNull ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException {
+ static @NotNull ClientBoundEvent deserializeEvent(DataInput is) throws IOException {
var userId = is.readLong();
var dataVersion = is.readInt();
if (dataVersion != SERIAL_VERSION) {
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
index da307fe..08291c6 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
@@ -1,8 +1,6 @@
package it.tdlight.reactiveapi;
import java.lang.reflect.InvocationTargetException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
public class ChannelCodec {
public static final ChannelCodec CLIENT_BOUND_EVENT = new ChannelCodec(ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class);
diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java
index 4d1d34e..7c9f356 100644
--- a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java
+++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java
@@ -1,15 +1,24 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
-import org.apache.kafka.common.serialization.Deserializer;
+import java.io.DataInput;
+import java.io.IOException;
public class ClientBoundEventDeserializer implements Deserializer {
@Override
- public ClientBoundEvent deserialize(String topic, byte[] data) {
+ public ClientBoundEvent deserialize(byte[] data) {
if (data == null || data.length == 0) {
return null;
}
return LiveAtomixReactiveApiClient.deserializeEvent(data);
}
+
+ @Override
+ public ClientBoundEvent deserialize(int length, DataInput dataInput) throws IOException {
+ if (dataInput == null || length == 0) {
+ return null;
+ }
+ return LiveAtomixReactiveApiClient.deserializeEvent(dataInput);
+ }
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java
index 48d8d2d..264a58a 100644
--- a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java
+++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java
@@ -1,15 +1,24 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
-import org.apache.kafka.common.serialization.Serializer;
+import java.io.DataOutput;
+import java.io.IOException;
public class ClientBoundEventSerializer implements Serializer {
@Override
- public byte[] serialize(String topic, ClientBoundEvent data) {
+ public byte[] serialize(ClientBoundEvent data) {
if (data == null) {
return null;
}
return ReactiveApiPublisher.serializeEvent(data);
}
+
+ @Override
+ public void serialize(ClientBoundEvent data, DataOutput output) throws IOException {
+ if (data == null) {
+ return;
+ }
+ ReactiveApiPublisher.writeClientBoundEvent(data, output);
+ }
}
diff --git a/src/main/java/it/tdlight/reactiveapi/Deserializer.java b/src/main/java/it/tdlight/reactiveapi/Deserializer.java
new file mode 100644
index 0000000..18c9488
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/Deserializer.java
@@ -0,0 +1,22 @@
+package it.tdlight.reactiveapi;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public interface Deserializer {
+
+ default T deserialize(byte[] data) throws IOException {
+ var bais = new ByteArrayInputStream(data);
+ return deserialize(data.length, new DataInputStream(bais));
+ }
+
+ default T deserialize(int length, DataInput dataInput) throws IOException {
+ byte[] data = new byte[length];
+ dataInput.readFully(data);
+ return deserialize(data);
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 1f66219..13959f1 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -38,6 +38,7 @@ import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import it.tdlight.tdlight.ClientManager;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -51,7 +52,6 @@ import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import org.apache.kafka.common.errors.SerializationException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
@@ -431,7 +431,7 @@ public abstract class ReactiveApiPublisher {
}
}
- private static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutputStream dataOutputStream)
+ public static void writeClientBoundEvent(ClientBoundEvent clientBoundEvent, DataOutput dataOutputStream)
throws IOException {
dataOutputStream.writeLong(clientBoundEvent.userId());
dataOutputStream.writeInt(SERIAL_VERSION);
@@ -511,9 +511,13 @@ public abstract class ReactiveApiPublisher {
@Override
public void onNext(Object responseObj) {
- r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(),
- onRequestObj.requestId(),
- userId, responseObj));
+ try {
+ r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(),
+ onRequestObj.requestId(),
+ userId, responseObj));
+ } catch (Throwable ex) {
+ onError(ex);
+ }
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
index c94ebcb..e958259 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
@@ -1,14 +1,9 @@
package it.tdlight.reactiveapi;
-import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import org.jetbrains.annotations.NotNull;
@@ -36,6 +31,15 @@ public class ReactorUtils {
});
}
+ public static Flux subscribeOnceUntilUnsubscribe(Flux f) {
+ AtomicBoolean subscribed = new AtomicBoolean();
+ return f.doOnSubscribe(s -> {
+ if (!subscribed.compareAndSet(false, true)) {
+ throw new UnsupportedOperationException("Can't subscribe more than once!");
+ }
+ }).doFinally(s -> subscribed.set(false));
+ }
+
public static Mono subscribeOnce(Mono f) {
AtomicBoolean subscribed = new AtomicBoolean();
return f.doOnSubscribe(s -> {
@@ -45,6 +49,15 @@ public class ReactorUtils {
});
}
+ public static Mono subscribeOnceUntilUnsubscribe(Mono f) {
+ AtomicBoolean subscribed = new AtomicBoolean();
+ return f.doOnSubscribe(s -> {
+ if (!subscribed.compareAndSet(false, true)) {
+ throw new UnsupportedOperationException("Can't subscribe more than once!");
+ }
+ }).doFinally(s -> subscribed.set(false));
+ }
+
public static Flux createLastestSubscriptionFlux(Flux upstream, int maxBufferSize) {
return upstream.transform(parent -> {
AtomicReference subscriptionAtomicReference = new AtomicReference<>();
diff --git a/src/main/java/it/tdlight/reactiveapi/SerializationException.java b/src/main/java/it/tdlight/reactiveapi/SerializationException.java
new file mode 100644
index 0000000..a699876
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/SerializationException.java
@@ -0,0 +1,32 @@
+package it.tdlight.reactiveapi;
+
+/**
+ * Any exception during serialization in the producer
+ */
+public class SerializationException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SerializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SerializationException(String message) {
+ super(message);
+ }
+
+ public SerializationException(Throwable cause) {
+ super(cause);
+ }
+
+ public SerializationException() {
+ super();
+ }
+
+ /* avoid the expensive and useless stack trace for serialization exceptions */
+ @Override
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/it/tdlight/reactiveapi/Serializer.java b/src/main/java/it/tdlight/reactiveapi/Serializer.java
new file mode 100644
index 0000000..3fa7cfe
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/Serializer.java
@@ -0,0 +1,27 @@
+package it.tdlight.reactiveapi;
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import java.io.Closeable;
+import java.util.Map;
+
+public interface Serializer {
+
+ default byte[] serialize(T data) throws IOException {
+ try (var baos = new FastByteArrayOutputStream()) {
+ try (var daos = new DataOutputStream(baos)) {
+ serialize(data, daos);
+ baos.trim();
+ return baos.array;
+ }
+ }
+ }
+
+ default void serialize(T data, DataOutput output) throws IOException {
+ output.write(serialize(data));
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
index a82d820..c48f4f0 100644
--- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
@@ -51,7 +51,7 @@ public class TdlibChannelsSharedHost implements Closeable {
public TdlibChannelsSharedHost(Set allLanes, TdlibChannelsServers tdServersChannels) {
this.tdServersChannels = tdServersChannels;
this.responsesSub = Mono.defer(() -> tdServersChannels.response()
- .sendMessages(responses.asFlux().log("responses", Level.FINE)))
+ .sendMessages(responses.asFlux()/*.log("responses", Level.FINE)*/))
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.subscribeOn(Schedulers.parallel())
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java
index 8ba7d82..ad2e4d6 100644
--- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java
@@ -43,7 +43,7 @@ public class TdlibChannelsSharedReceive implements Closeable {
this.tdClientsChannels = tdClientsChannels;
this.responses = Flux
.defer(() -> tdClientsChannels.response().consumeMessages())
- .log("responses", Level.FINE)
+ //.log("responses", Level.FINE)
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.publish()
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java
index 2c8d53d..395ec5d 100644
--- a/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibDeserializer.java
@@ -5,25 +5,29 @@ import static it.tdlight.reactiveapi.Event.SERIAL_VERSION;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import java.io.ByteArrayInputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
public class TdlibDeserializer implements Deserializer