diff --git a/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java index 2feba1c..8b000da 100644 --- a/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java @@ -63,10 +63,8 @@ public class SignalMessageCodec implements MessageCodec, Sig case 0x01: return SignalMessage.onNext(typeCodec.decodeFromWire(pos + 1, buffer)); case 0x02: - var size = buffer.getInt(pos + 1); - return SignalMessage.onDecodedError(new String(buffer.getBytes(pos + 2, pos + 2 + size), - StandardCharsets.UTF_8 - )); + var size = dis.readInt(); + return SignalMessage.onDecodedError(new String(dis.readNBytes(size), StandardCharsets.UTF_8)); case 0x03: return SignalMessage.onComplete(); default: diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 6f6f57b..3d06629 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class AsyncTdDirectImpl implements AsyncTdDirect { @@ -24,6 +25,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class); private final One td = Sinks.one(); + private final Scheduler tdScheduler = Schedulers.single(); private final String botAlias; @@ -43,7 +45,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } throw new IllegalStateException("TDLib client is destroyed"); } - }).publishOn(Schedulers.boundedElastic()).single()); + }).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler)); } else { return td.asMono().flatMap(td -> Mono.>create(sink -> { if (td != null) { @@ -58,7 +60,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } sink.error(new IllegalStateException("TDLib client is destroyed")); } - })).single(); + })).single().subscribeOn(tdScheduler); } } @@ -89,6 +91,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { ex -> logger.trace("Error when disposing td client", ex) ))).subscribe(); }); - }); + }).subscribeOn(tdScheduler); } }