Clean code
This commit is contained in:
@ -0,0 +1,149 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.core.Atomix;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
import it.tdlight.reactiveapi.Event.OnPasswordRequested;
import it.tdlight.reactiveapi.Event.OnUpdateData;
import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient {
protected final ClusterEventService eventService;
protected final long userId;
private final Mono<Long> liveId;
public BaseAtomixReactiveApiClient(Atomix atomix, long userId) {
this.eventService = atomix.getEventService();
this.userId = userId;
this.liveId = resolveLiveId();
public final <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return liveId
.flatMap(liveId -> Mono
.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),
Duration.between(, timeout)
.onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else if (ex instanceof TimeoutException) {
return new TdError(408, "Request Timeout", ex);
} else {
return ex;
.handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
} else {
//noinspection unchecked
|||| item);
protected abstract Mono<Long> resolveLiveId();
public final long getUserId() {
return userId;
public final boolean isPullMode() {
return true;
static TdApi.Object deserializeResponse(byte[] bytes) {
try {
if (bytes == null || bytes.length == 0) {
return null;
return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
} catch (IOException ex) {
throw new SerializationException(ex);
static byte[] serializeRequest(Request<?> request) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
return byteArrayOutputStream.toByteArray();
} catch (UnsupportedOperationException | IOException ex) {
throw new SerializationException(ex);
static ClientBoundEvent deserializeEvent(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
return deserializeEvent(is);
} catch (IOException ex) {
throw new SerializationException(ex);
static List<ClientBoundEvent> deserializeEvents(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
var len = is.readInt();
var result = new ArrayList<ClientBoundEvent>(len);
for (int i = 0; i < len; i++) {
return result;
} catch (IOException ex) {
throw new SerializationException(ex);
static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException {
var liveId = is.readLong();
var userId = is.readLong();
return switch (is.readByte()) {
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is));
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is));
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong());
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF());
case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF());
case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF());
default -> throw new IllegalStateException("Unexpected value: " + is.readByte());
@ -1,39 +1,31 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable {
public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient implements AutoCloseable {
private static final long LIVE_ID_UNSET = -1L;
private static final long LIVE_ID_FAILED = -2L;
private final ReactiveApi api;
private final ClusterEventService eventService;
private final AtomicLong liveId = new AtomicLong(LIVE_ID_UNSET);
private final Disposable liveIdSubscription;
private final long userId;
private final Flux<ClientBoundEvent> clientBoundEvents;
private final Flux<Long> liveIdChange;
private final Mono<Long> liveIdResolution;
private volatile boolean closed;
DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) {
super(api.getAtomix(), userId);
this.api = api;
this.eventService = api.getAtomix().getEventService();
this.userId = userId;
clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId)
@ -47,7 +39,6 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
this.liveIdSubscription = liveIdChange.subscribeOn(Schedulers.parallel()).subscribe(liveId::set);
this.liveIdResolution = this.resolveLiveId();
@ -56,33 +47,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return liveIdResolution
.flatMap(liveId -> Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),
Duration.between(, timeout)
)).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else if (ex instanceof TimeoutException) {
return new TdError(408, "Request Timeout", ex);
} else {
return ex;
.<T>handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
} else {
//noinspection unchecked
|||| item);
private Mono<Long> resolveLiveId() {
protected Mono<Long> resolveLiveId() {
return Mono
.flatMap(liveId -> {
@ -103,16 +68,6 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
+ " is not found on the cluster, no live id has been associated with it locally");
public long getUserId() {
return userId;
public boolean isPullMode() {
return true;
public Flux<Long> liveIdChange() {
return liveIdChange;
@ -1,45 +1,23 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.core.Atomix;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
import it.tdlight.reactiveapi.Event.OnPasswordRequested;
import it.tdlight.reactiveapi.Event.OnUpdateData;
import it.tdlight.reactiveapi.Event.OnUpdateError;
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
private final ClusterEventService eventService;
private final long liveId;
private final long userId;
public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
private final Flux<ClientBoundEvent> clientBoundEvents;
private final Mono<Long> liveId;
LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId, String subGroupId) {
this.eventService = atomix.getEventService();
this.liveId = liveId;
this.userId = userId;
LiveAtomixReactiveApiClient(Atomix atomix,
KafkaConsumer kafkaConsumer,
long liveId,
long userId,
String subGroupId) {
super(atomix, userId);
this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share();
this.liveId = Mono.just(liveId);
@ -48,104 +26,8 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return Mono
.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),
Duration.between(, timeout)
.subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else if (ex instanceof TimeoutException) {
return new TdError(408, "Request Timeout", ex);
} else {
return ex;
.handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
} else {
//noinspection unchecked
|||| item);
public Mono<Long> resolveLiveId() {
return liveId;
public long getUserId() {
return userId;
public boolean isPullMode() {
return true;
static TdApi.Object deserializeResponse(byte[] bytes) {
try {
if (bytes == null || bytes.length == 0) {
return null;
return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
} catch (IOException ex) {
throw new SerializationException(ex);
static byte[] serializeRequest(Request<?> request) {
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
return byteArrayOutputStream.toByteArray();
} catch (UnsupportedOperationException | IOException ex) {
throw new SerializationException(ex);
static ClientBoundEvent deserializeEvent(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
return deserializeEvent(is);
} catch (IOException ex) {
throw new SerializationException(ex);
static List<ClientBoundEvent> deserializeEvents(byte[] bytes) {
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
try (var is = new DataInputStream(byteArrayInputStream)) {
var len = is.readInt();
var result = new ArrayList<ClientBoundEvent>(len);
for (int i = 0; i < len; i++) {
return result;
} catch (IOException ex) {
throw new SerializationException(ex);
static ClientBoundEvent deserializeEvent(DataInputStream is) throws IOException {
var liveId = is.readLong();
var userId = is.readLong();
return switch (is.readByte()) {
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(is));
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(is));
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, is.readLong());
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, is.readUTF());
case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, is.readUTF());
case 0x06 -> new OnPasswordRequested(liveId, userId, is.readUTF(), is.readBoolean(), is.readUTF());
default -> throw new IllegalStateException("Unexpected value: " + is.readByte());
Reference in New Issue
Block a user