diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java index 470b18d..281c349 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessage.java @@ -1,14 +1,15 @@ package it.tdlight.tdlibsession.td.middle; -import java.util.Arrays; +import io.vertx.reactivex.core.buffer.Buffer; import java.util.Objects; +import java.util.StringJoiner; public final class EndSessionMessage { private final int id; - private final byte[] binlog; + private final Buffer binlog; - public EndSessionMessage(int id, byte[] binlog) { + public EndSessionMessage(int id, Buffer binlog) { this.id = id; this.binlog = binlog; } @@ -17,20 +18,20 @@ public final class EndSessionMessage { return id; } - public byte[] binlog() { + public Buffer binlog() { return binlog; } @Override - public boolean equals(Object obj) { - if (obj == this) { + public boolean equals(Object o) { + if (this == o) { return true; } - if (obj == null || obj.getClass() != this.getClass()) { + if (o == null || getClass() != o.getClass()) { return false; } - var that = (EndSessionMessage) obj; - return this.id == that.id && Arrays.equals(this.binlog, that.binlog); + EndSessionMessage that = (EndSessionMessage) o; + return id == that.id && Objects.equals(binlog, that.binlog); } @Override @@ -40,6 +41,9 @@ public final class EndSessionMessage { @Override public String toString() { - return "EndSessionMessage[" + "id=" + id + ", " + "binlog=" + Arrays.hashCode(binlog) + ']'; + return new StringJoiner(", ", EndSessionMessage.class.getSimpleName() + "[", "]") + .add("id=" + id) + .add("binlog=" + binlog) + .toString(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessageCodec.java index 86bfb1a..1f72ab1 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/EndSessionMessageCodec.java @@ -2,10 +2,7 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; -import it.tdlight.utils.VertxBufferInputStream; -import it.tdlight.utils.VertxBufferOutputStream; -import org.warp.commonutils.stream.SafeDataInputStream; -import org.warp.commonutils.stream.SafeDataOutputStream; +import it.tdlight.utils.BufferUtils; public class EndSessionMessageCodec implements MessageCodec { @@ -18,22 +15,15 @@ public class EndSessionMessageCodec implements MessageCodec { + os.writeInt(t.id()); + BufferUtils.writeBuf(os, t.binlog()); + }); } @Override public EndSessionMessage decodeFromWire(int pos, Buffer buffer) { - try (var fis = new VertxBufferInputStream(buffer, pos)) { - try (var dis = new SafeDataInputStream(fis)) { - return new EndSessionMessage(dis.readInt(), dis.readNBytes(dis.readInt())); - } - } + return BufferUtils.decode(pos, buffer, is -> new EndSessionMessage(is.readInt(), BufferUtils.rxReadBuf(is))); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java index bfe3d5d..75687fe 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessage.java @@ -1,7 +1,7 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.json.JsonObject; -import java.util.Arrays; +import io.vertx.reactivex.core.buffer.Buffer; import java.util.Objects; import java.util.StringJoiner; @@ -9,11 +9,11 @@ public final class StartSessionMessage { private final int id; private final String alias; - private final byte[] binlog; + private final Buffer binlog; private final long binlogDate; private final JsonObject implementationDetails; - public StartSessionMessage(int id, String alias, byte[] binlog, long binlogDate, JsonObject implementationDetails) { + public StartSessionMessage(int id, String alias, Buffer binlog, long binlogDate, JsonObject implementationDetails) { this.id = id; this.alias = alias; this.binlog = binlog; @@ -29,7 +29,7 @@ public final class StartSessionMessage { return alias; } - public byte[] binlog() { + public Buffer binlog() { return binlog; } @@ -49,32 +49,15 @@ public final class StartSessionMessage { if (o == null || getClass() != o.getClass()) { return false; } - StartSessionMessage that = (StartSessionMessage) o; - - if (id != that.id) { - return false; - } - if (binlogDate != that.binlogDate) { - return false; - } - if (!Objects.equals(alias, that.alias)) { - return false; - } - if (!Arrays.equals(binlog, that.binlog)) { - return false; - } - return Objects.equals(implementationDetails, that.implementationDetails); + return id == that.id && binlogDate == that.binlogDate && Objects.equals(alias, that.alias) && Objects.equals(binlog, + that.binlog + ) && Objects.equals(implementationDetails, that.implementationDetails); } @Override public int hashCode() { - int result = id; - result = 31 * result + (alias != null ? alias.hashCode() : 0); - result = 31 * result + Arrays.hashCode(binlog); - result = 31 * result + (int) (binlogDate ^ (binlogDate >>> 32)); - result = 31 * result + (implementationDetails != null ? implementationDetails.hashCode() : 0); - return result; + return Objects.hash(id, alias, binlog, binlogDate, implementationDetails); } @Override @@ -82,7 +65,7 @@ public final class StartSessionMessage { return new StringJoiner(", ", StartSessionMessage.class.getSimpleName() + "[", "]") .add("id=" + id) .add("alias='" + alias + "'") - .add("binlog=" + Arrays.toString(binlog)) + .add("binlog=" + binlog) .add("binlogDate=" + binlogDate) .add("implementationDetails=" + implementationDetails) .toString(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java index ca8d3b4..b1f48f3 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/StartSessionMessageCodec.java @@ -3,10 +3,8 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.json.JsonObject; -import it.tdlight.utils.VertxBufferInputStream; -import it.tdlight.utils.VertxBufferOutputStream; -import org.warp.commonutils.stream.SafeDataInputStream; -import org.warp.commonutils.stream.SafeDataOutputStream; +import it.tdlight.utils.BufferUtils; +import org.warp.commonutils.serialization.UTFUtils; public class StartSessionMessageCodec implements MessageCodec { @@ -19,30 +17,23 @@ public class StartSessionMessageCodec implements MessageCodec { + os.writeInt(t.id()); + UTFUtils.writeUTF(os, t.alias()); + BufferUtils.writeBuf(os, t.binlog()); + os.writeLong(t.binlogDate()); + UTFUtils.writeUTF(os, t.implementationDetails().toString()); + }); } @Override public StartSessionMessage decodeFromWire(int pos, Buffer buffer) { - try (var fis = new VertxBufferInputStream(buffer, pos)) { - try (var dis = new SafeDataInputStream(fis)) { - return new StartSessionMessage(dis.readInt(), - dis.readUTF(), - dis.readNBytes(dis.readInt()), - dis.readLong(), - new JsonObject(dis.readUTF()) - ); - } - } + return BufferUtils.decode(pos, buffer, is -> new StartSessionMessage(is.readInt(), + UTFUtils.readUTF(is), + BufferUtils.rxReadBuf(is), + is.readLong(), + new JsonObject(UTFUtils.readUTF(is)) + )); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java index 1b27afe..9eefe01 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java @@ -4,11 +4,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; -import it.tdlight.utils.VertxBufferInputStream; -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import org.warp.commonutils.stream.SafeDataInputStream; +import it.tdlight.utils.BufferUtils; public class TdExecuteObjectMessageCodec implements MessageCodec { @@ -18,28 +14,17 @@ public class TdExecuteObjectMessageCodec implements MessageCodec { + os.writeBoolean(t.isExecuteDirectly()); + t.getRequest().serialize(os); + }); } @Override public ExecuteObject decodeFromWire(int pos, Buffer buffer) { - try (var fis = new VertxBufferInputStream(buffer, pos)) { - try (var dis = new SafeDataInputStream(fis)) { - return new ExecuteObject(dis.readBoolean(), (Function) TdApi.Deserializer.deserialize(dis)); - } - } catch (IOException ex) { - ex.printStackTrace(); - } - return null; + return BufferUtils.decode(pos, buffer, is -> { + return new ExecuteObject(is.readBoolean(), (Function) TdApi.Deserializer.deserialize(is)); + }); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java index 8483d50..01fffce 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdMessageCodec.java @@ -1,13 +1,11 @@ package it.tdlight.tdlibsession.td.middle; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; -import it.tdlight.utils.VertxBufferInputStream; -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import org.warp.commonutils.stream.SafeDataInputStream; public class TdMessageCodec implements MessageCodec { @@ -22,28 +20,21 @@ public class TdMessageCodec implements MessageCodec { @@ -20,44 +15,35 @@ public class TdResultListMessageCodec implements MessageCodec { + if (ts.succeeded()) { + os.writeBoolean(true); + var t = ts.value(); + os.writeInt(t.size()); + for (TdApi.Object t1 : t) { + t1.serialize(os); } + } else { + os.writeBoolean(false); + ts.error().serialize(os); } - } catch (IOException ex) { - ex.printStackTrace(); - } + }); } @Override public TdResultList decodeFromWire(int pos, Buffer buffer) { - try (var fis = new VertxBufferInputStream(buffer, pos)) { - try (var dis = new SafeDataInputStream(fis)) { - if (dis.readBoolean()) { - var size = dis.readInt(); - ArrayList list = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - list.add((TdApi.Object) TdApi.Deserializer.deserialize(dis)); - } - return new TdResultList(list); - } else { - return new TdResultList((Error) TdApi.Deserializer.deserialize(dis)); + return BufferUtils.decode(pos, buffer, is -> { + if (is.readBoolean()) { + var size = is.readInt(); + ArrayList list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add((TdApi.Object) TdApi.Deserializer.deserialize(is)); } + return new TdResultList(list); + } else { + return new TdResultList((Error) TdApi.Deserializer.deserialize(is)); } - } catch (IOException | UnsupportedOperationException ex) { - ex.printStackTrace(); - return new TdResultList(Collections.emptyList()); - } + }); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java index 5cf568b..de9457d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java @@ -3,11 +3,7 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; -import it.tdlight.utils.VertxBufferInputStream; -import it.tdlight.utils.VertxBufferOutputStream; -import java.io.IOException; -import org.warp.commonutils.stream.SafeDataInputStream; -import org.warp.commonutils.stream.SafeDataOutputStream; +import it.tdlight.utils.BufferUtils; @SuppressWarnings("rawtypes") public class TdResultMessageCodec implements MessageCodec { @@ -21,35 +17,26 @@ public class TdResultMessageCodec implements MessageCodec { + if (t.value != null) { + os.writeBoolean(true); + t.value.serialize(os); + } else { + os.writeBoolean(false); + t.cause.serialize(os); } - } catch (IOException ex) { - ex.printStackTrace(); - } + }); } @Override public TdResultMessage decodeFromWire(int pos, Buffer buffer) { - try (var fis = new VertxBufferInputStream(buffer, pos)) { - try (var dis = new SafeDataInputStream(fis)) { - if (dis.readBoolean()) { - return new TdResultMessage(TdApi.Deserializer.deserialize(dis), null); - } else { - return new TdResultMessage(null, (TdApi.Error) TdApi.Deserializer.deserialize(dis)); - } + return BufferUtils.decode(pos, buffer, is -> { + if (is.readBoolean()) { + return new TdResultMessage(TdApi.Deserializer.deserialize(is), null); + } else { + return new TdResultMessage(null, (TdApi.Error) TdApi.Deserializer.deserialize(is)); } - } catch (IOException ex) { - ex.printStackTrace(); - } - return null; + }); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 06cff25..47b7963 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.middle.client; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Function; @@ -99,7 +100,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog")); } - private Mono saveBinlog(byte[] data) { + private Mono saveBinlog(Buffer data) { return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); } @@ -116,7 +117,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return MonoUtils .emitValue(this.binlog, binlog) .then(binlog.getLastModifiedTime()) - .zipWith(binlog.readFullyBytes()) + .zipWith(binlog.readFully().map(Buffer::getDelegate)) .single() .flatMap(tuple -> { var binlogLastModifiedTime = tuple.getT1(); @@ -124,7 +125,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { var msg = new StartSessionMessage(this.botId, this.botAlias, - binlogData, + Buffer.newInstance(binlogData), binlogLastModifiedTime, implementationDetails ); @@ -262,7 +263,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic())) - .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length))) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) .publishOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index f8e6229..149d2a4 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -44,13 +44,13 @@ public class BinlogUtils { .publishOn(Schedulers.boundedElastic()); } - public static Mono saveBinlog(BinlogAsyncFile binlog, byte[] data) { + public static Mono saveBinlog(BinlogAsyncFile binlog, Buffer data) { return binlog.overwrite(data); } public static Mono chooseBinlog(FileSystem vertxFilesystem, Path binlogPath, - byte[] remoteBinlog, + Buffer remoteBinlog, long remoteBinlogDate) { var path = binlogPath.toString(); return retrieveBinlog(vertxFilesystem, binlogPath) @@ -64,7 +64,7 @@ public class BinlogUtils { .doOnNext(v -> logger.info("Using local binlog: " + binlogPath)) .map(Tuple2::getT1) .switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> logger.info("Using remote binlog. Overwriting " + binlogPath))) - .then(vertxFilesystem.rxWriteFile(path, Buffer.buffer(remoteBinlog)).as(MonoUtils::toMono)) + .then(vertxFilesystem.rxWriteFile(path, remoteBinlog.copy()).as(MonoUtils::toMono)) .then(retrieveBinlog(vertxFilesystem, binlogPath)) ) .single() @@ -119,7 +119,8 @@ public class BinlogUtils { }) .flatMapSequential(req -> BinlogUtils .retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId)) - .flatMap(BinlogAsyncFile::readFullyBytes) + .flatMap(BinlogAsyncFile::readFully) + .map(Buffer::copy) .single() .map(binlog -> Tuples.of(req, binlog)) ) diff --git a/src/main/java/it/tdlight/utils/BufferUtils.java b/src/main/java/it/tdlight/utils/BufferUtils.java new file mode 100644 index 0000000..d0bdd16 --- /dev/null +++ b/src/main/java/it/tdlight/utils/BufferUtils.java @@ -0,0 +1,94 @@ +package it.tdlight.utils; + +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; +import java.io.IOException; +import org.apache.commons.lang3.SerializationException; + +public class BufferUtils { + + private static final int CHUNK_SIZE = 8192; + + public static void writeBuf(ByteBufOutputStream os, io.vertx.reactivex.core.buffer.Buffer dataToWrite) + throws IOException { + var len = dataToWrite.length(); + os.writeInt(len); + byte[] part = new byte[CHUNK_SIZE]; + for (int i = 0; i < len; i += CHUNK_SIZE) { + var end = Math.min(i + CHUNK_SIZE, len); + dataToWrite.getBytes(i, end, part, 0); + os.write(part, 0, end - i); + } + } + + public static void writeBuf(ByteBufOutputStream os, io.vertx.core.buffer.Buffer dataToWrite) throws IOException { + var len = dataToWrite.length(); + os.writeInt(len); + byte[] part = new byte[CHUNK_SIZE]; + for (int i = 0; i < len; i += CHUNK_SIZE) { + var end = Math.min(i + CHUNK_SIZE, len); + dataToWrite.getBytes(i, end, part, 0); + os.write(part, 0, end - i); + } + } + + public static io.vertx.core.buffer.Buffer readBuf(ByteBufInputStream is) throws IOException { + int len = is.readInt(); + Buffer buf = Buffer.buffer(len); + byte[] part = new byte[1024]; + int readPart = 0; + for (int i = 0; i < len; i += 1024) { + var lenx = (Math.min(i + 1024, len)) - i; + if (lenx > 0) { + readPart = is.readNBytes(part, 0, lenx); + buf.appendBytes(part, 0, readPart); + } + } + return buf; + } + + public static io.vertx.reactivex.core.buffer.Buffer rxReadBuf(ByteBufInputStream is) throws IOException { + int len = is.readInt(); + io.vertx.reactivex.core.buffer.Buffer buf = io.vertx.reactivex.core.buffer.Buffer.buffer(len); + byte[] part = new byte[1024]; + int readPart = 0; + for (int i = 0; i < len; i += 1024) { + var lenx = (Math.min(i + 1024, len)) - i; + if (lenx > 0) { + readPart = is.readNBytes(part, 0, lenx); + buf.appendBytes(part, 0, readPart); + } + } + return buf; + } + + public interface Writer { + + void write(ByteBufOutputStream os) throws IOException; + } + + public interface Reader { + + T read(ByteBufInputStream is) throws IOException; + } + + public static void encode(Buffer buffer, Writer writer) { + try (var os = new ByteBufOutputStream(((BufferImpl) buffer).byteBuf())) { + writer.write(os); + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + + public static T decode(int pos, Buffer buffer, Reader reader) { + try (var is = new ByteBufInputStream(buffer.slice(pos, buffer.length()).getByteBuf())) { + return reader.read(is); + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + +}