From 797808114cfffb0cea7d320e2878e5263c56996b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 21 Jan 2022 22:25:47 +0100 Subject: [PATCH] Clean code --- .../BaseAtomixReactiveApiClient.java | 149 ++++++++++++++++++ .../DynamicAtomixReactiveApiClient.java | 53 +------ .../LiveAtomixReactiveApiClient.java | 140 ++-------------- 3 files changed, 164 insertions(+), 178 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java new file mode 100644 index 0000000..ea4a54c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -0,0 +1,149 @@ +package it.tdlight.reactiveapi; + +import io.atomix.cluster.messaging.ClusterEventService; +import io.atomix.cluster.messaging.MessagingException; +import io.atomix.core.Atomix; +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; +import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; +import it.tdlight.reactiveapi.Event.OnPasswordRequested; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import it.tdlight.reactiveapi.Event.OnUpdateError; +import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; +import it.tdlight.reactiveapi.Event.Request; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient { + + protected final ClusterEventService eventService; + protected final long userId; + private final Mono liveId; + + public BaseAtomixReactiveApiClient(Atomix atomix, long userId) { + this.eventService = atomix.getEventService(); + this.userId = userId; + this.liveId = resolveLiveId(); + } + + @Override + public final Mono request(TdApi.Function request, Instant timeout) { + return liveId + .flatMap(liveId -> Mono + .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", + new Request<>(liveId, request, timeout), + LiveAtomixReactiveApiClient::serializeRequest, + LiveAtomixReactiveApiClient::deserializeResponse, + Duration.between(Instant.now(), timeout) + )) + .subscribeOn(Schedulers.boundedElastic()) + .onErrorMap(ex -> { + if (ex instanceof MessagingException.NoRemoteHandler) { + return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); + } else if (ex instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); + } else { + return ex; + } + }) + ) + .handle((item, sink) -> { + if (item instanceof TdApi.Error error) { + sink.error(new TdError(error.code, error.message)); + } else { + //noinspection unchecked + sink.next((T) item); + } + }); + } + + protected abstract Mono resolveLiveId(); + + @Override + public final long getUserId() { + return userId; + } + + @Override + public final boolean isPullMode() { + return true; + } + + + static TdApi.Object deserializeResponse(byte[] bytes) { + try { + if (bytes == null || bytes.length == 0) { + return null; + } + return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + static byte[] serializeRequest(Request request) { + try (var byteArrayOutputStream = new ByteArrayOutputStream()) { + try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + dataOutputStream.writeLong(request.liveId()); + dataOutputStream.writeLong(request.timeout().toEpochMilli()); + request.request().serialize(dataOutputStream); + dataOutputStream.flush(); + return byteArrayOutputStream.toByteArray(); + } + } catch (UnsupportedOperationException | IOException ex) { + throw new SerializationException(ex); + } + } + + static ClientBoundEvent deserializeEvent(byte[] bytes) { + try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { + try (var is = new DataInputStream(byteArrayInputStream)) { + return deserializeEvent(is); + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + static List deserializeEvents(byte[] bytes) { + try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { + try (var is = new DataInputStream(byteArrayInputStream)) { + var len = is.readInt(); + var result = new ArrayList(len); + for (int i = 0; i < len; i++) { + result.add(deserializeEvent(is)); + } + return result; + } + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException { + var liveId = is.readLong(); + var userId = is.readLong(); + return switch (is.readByte()) { + case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is)); + case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is)); + case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong()); + case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF()); + case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF()); + case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF()); + default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); + }; + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 9700206..e41d537 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -1,39 +1,31 @@ package it.tdlight.reactiveapi; -import io.atomix.cluster.messaging.ClusterEventService; -import io.atomix.cluster.messaging.MessagingException; -import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; -import java.time.Instant; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable { +public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient implements AutoCloseable { private static final long LIVE_ID_UNSET = -1L; private static final long LIVE_ID_FAILED = -2L; private final ReactiveApi api; - private final ClusterEventService eventService; private final AtomicLong liveId = new AtomicLong(LIVE_ID_UNSET); private final Disposable liveIdSubscription; private final long userId; private final Flux clientBoundEvents; private final Flux liveIdChange; - private final Mono liveIdResolution; private volatile boolean closed; DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) { + super(api.getAtomix(), userId); this.api = api; - this.eventService = api.getAtomix().getEventService(); this.userId = userId; clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId) @@ -47,42 +39,15 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl .distinctUntilChanged(); this.liveIdSubscription = liveIdChange.subscribeOn(Schedulers.parallel()).subscribe(liveId::set); - this.liveIdResolution = this.resolveLiveId(); } @Override public Flux clientBoundEvents() { return clientBoundEvents; } - - @Override - public Mono request(TdApi.Function request, Instant timeout) { - return liveIdResolution - .flatMap(liveId -> Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", - new Request<>(liveId, request, timeout), - LiveAtomixReactiveApiClient::serializeRequest, - LiveAtomixReactiveApiClient::deserializeResponse, - Duration.between(Instant.now(), timeout) - )).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> { - if (ex instanceof MessagingException.NoRemoteHandler) { - return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); - } else if (ex instanceof TimeoutException) { - return new TdError(408, "Request Timeout", ex); - } else { - return ex; - } - })) - .handle((item, sink) -> { - if (item instanceof TdApi.Error error) { - sink.error(new TdError(error.code, error.message)); - } else { - //noinspection unchecked - sink.next((T) item); - } - }); - } - private Mono resolveLiveId() { + @Override + protected Mono resolveLiveId() { return Mono .fromSupplier(this.liveId::get) .flatMap(liveId -> { @@ -103,16 +68,6 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl + " is not found on the cluster, no live id has been associated with it locally"); } - @Override - public long getUserId() { - return userId; - } - - @Override - public boolean isPullMode() { - return true; - } - public Flux liveIdChange() { return liveIdChange; } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index d40de1a..556139c 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -1,151 +1,33 @@ package it.tdlight.reactiveapi; -import io.atomix.cluster.messaging.ClusterEventService; -import io.atomix.cluster.messaging.MessagingException; import io.atomix.core.Atomix; -import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; -import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested; -import it.tdlight.reactiveapi.Event.OnPasswordRequested; -import it.tdlight.reactiveapi.Event.OnUpdateData; -import it.tdlight.reactiveapi.Event.OnUpdateError; -import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; -import it.tdlight.reactiveapi.Event.Request; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; -import org.apache.commons.lang3.SerializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -public class LiveAtomixReactiveApiClient implements ReactiveApiClient { - - private final ClusterEventService eventService; - private final long liveId; - private final long userId; +public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { private final Flux clientBoundEvents; + private final Mono liveId; - LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId, String subGroupId) { - this.eventService = atomix.getEventService(); - this.liveId = liveId; - this.userId = userId; + LiveAtomixReactiveApiClient(Atomix atomix, + KafkaConsumer kafkaConsumer, + long liveId, + long userId, + String subGroupId) { + super(atomix, userId); this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share(); + this.liveId = Mono.just(liveId); } @Override public Flux clientBoundEvents() { return clientBoundEvents; } - - @Override - public Mono request(TdApi.Function request, Instant timeout) { - return Mono - .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", - new Request<>(liveId, request, timeout), - LiveAtomixReactiveApiClient::serializeRequest, - LiveAtomixReactiveApiClient::deserializeResponse, - Duration.between(Instant.now(), timeout) - )) - .subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> { - if (ex instanceof MessagingException.NoRemoteHandler) { - return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); - } else if (ex instanceof TimeoutException) { - return new TdError(408, "Request Timeout", ex); - } else { - return ex; - } - }) - .handle((item, sink) -> { - if (item instanceof TdApi.Error error) { - sink.error(new TdError(error.code, error.message)); - } else { - //noinspection unchecked - sink.next((T) item); - } - }); - } @Override - public long getUserId() { - return userId; + public Mono resolveLiveId() { + return liveId; } - @Override - public boolean isPullMode() { - return true; - } - - static TdApi.Object deserializeResponse(byte[] bytes) { - try { - if (bytes == null || bytes.length == 0) { - return null; - } - return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); - } catch (IOException ex) { - throw new SerializationException(ex); - } - } - - static byte[] serializeRequest(Request request) { - try (var byteArrayOutputStream = new ByteArrayOutputStream()) { - try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { - dataOutputStream.writeLong(request.liveId()); - dataOutputStream.writeLong(request.timeout().toEpochMilli()); - request.request().serialize(dataOutputStream); - dataOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); - } - } catch (UnsupportedOperationException | IOException ex) { - throw new SerializationException(ex); - } - } - - static ClientBoundEvent deserializeEvent(byte[] bytes) { - try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { - try (var is = new DataInputStream(byteArrayInputStream)) { - return deserializeEvent(is); - } - } catch (IOException ex) { - throw new SerializationException(ex); - } - } - - static List deserializeEvents(byte[] bytes) { - try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { - try (var is = new DataInputStream(byteArrayInputStream)) { - var len = is.readInt(); - var result = new ArrayList(len); - for (int i = 0; i < len; i++) { - result.add(deserializeEvent(is)); - } - return result; - } - } catch (IOException ex) { - throw new SerializationException(ex); - } - } - - static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException { - var liveId = is.readLong(); - var userId = is.readLong(); - return switch (is.readByte()) { - case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is)); - case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is)); - case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong()); - case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF()); - case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF()); - case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF()); - default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); - }; - } }