Use buffers

This commit is contained in:
Andrea Cavalli 2021-01-27 00:58:07 +01:00
parent 20862694d2
commit 40ec712cf0
11 changed files with 202 additions and 189 deletions

View File

@ -1,14 +1,15 @@
package it.tdlight.tdlibsession.td.middle;
import java.util.Arrays;
import io.vertx.reactivex.core.buffer.Buffer;
import java.util.Objects;
import java.util.StringJoiner;
public final class EndSessionMessage {
private final int id;
private final byte[] binlog;
private final Buffer binlog;
public EndSessionMessage(int id, byte[] binlog) {
public EndSessionMessage(int id, Buffer binlog) {
this.id = id;
this.binlog = binlog;
}
@ -17,20 +18,20 @@ public final class EndSessionMessage {
return id;
}
public byte[] binlog() {
public Buffer binlog() {
return binlog;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
if (o == null || getClass() != o.getClass()) {
return false;
}
var that = (EndSessionMessage) obj;
return this.id == that.id && Arrays.equals(this.binlog, that.binlog);
EndSessionMessage that = (EndSessionMessage) o;
return id == that.id && Objects.equals(binlog, that.binlog);
}
@Override
@ -40,6 +41,9 @@ public final class EndSessionMessage {
@Override
public String toString() {
return "EndSessionMessage[" + "id=" + id + ", " + "binlog=" + Arrays.hashCode(binlog) + ']';
return new StringJoiner(", ", EndSessionMessage.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("binlog=" + binlog)
.toString();
}
}

View File

@ -2,10 +2,7 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
import it.tdlight.utils.BufferUtils;
public class EndSessionMessageCodec implements MessageCodec<EndSessionMessage, EndSessionMessage> {
@ -18,22 +15,15 @@ public class EndSessionMessageCodec implements MessageCodec<EndSessionMessage, E
@Override
public void encodeToWire(Buffer buffer, EndSessionMessage t) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
dos.writeInt(t.id());
dos.writeInt(t.binlog().length);
dos.write(t.binlog());
}
}
BufferUtils.encode(buffer, os -> {
os.writeInt(t.id());
BufferUtils.writeBuf(os, t.binlog());
});
}
@Override
public EndSessionMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new EndSessionMessage(dis.readInt(), dis.readNBytes(dis.readInt()));
}
}
return BufferUtils.decode(pos, buffer, is -> new EndSessionMessage(is.readInt(), BufferUtils.rxReadBuf(is)));
}
@Override

View File

@ -1,7 +1,7 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.json.JsonObject;
import java.util.Arrays;
import io.vertx.reactivex.core.buffer.Buffer;
import java.util.Objects;
import java.util.StringJoiner;
@ -9,11 +9,11 @@ public final class StartSessionMessage {
private final int id;
private final String alias;
private final byte[] binlog;
private final Buffer binlog;
private final long binlogDate;
private final JsonObject implementationDetails;
public StartSessionMessage(int id, String alias, byte[] binlog, long binlogDate, JsonObject implementationDetails) {
public StartSessionMessage(int id, String alias, Buffer binlog, long binlogDate, JsonObject implementationDetails) {
this.id = id;
this.alias = alias;
this.binlog = binlog;
@ -29,7 +29,7 @@ public final class StartSessionMessage {
return alias;
}
public byte[] binlog() {
public Buffer binlog() {
return binlog;
}
@ -49,32 +49,15 @@ public final class StartSessionMessage {
if (o == null || getClass() != o.getClass()) {
return false;
}
StartSessionMessage that = (StartSessionMessage) o;
if (id != that.id) {
return false;
}
if (binlogDate != that.binlogDate) {
return false;
}
if (!Objects.equals(alias, that.alias)) {
return false;
}
if (!Arrays.equals(binlog, that.binlog)) {
return false;
}
return Objects.equals(implementationDetails, that.implementationDetails);
return id == that.id && binlogDate == that.binlogDate && Objects.equals(alias, that.alias) && Objects.equals(binlog,
that.binlog
) && Objects.equals(implementationDetails, that.implementationDetails);
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (alias != null ? alias.hashCode() : 0);
result = 31 * result + Arrays.hashCode(binlog);
result = 31 * result + (int) (binlogDate ^ (binlogDate >>> 32));
result = 31 * result + (implementationDetails != null ? implementationDetails.hashCode() : 0);
return result;
return Objects.hash(id, alias, binlog, binlogDate, implementationDetails);
}
@Override
@ -82,7 +65,7 @@ public final class StartSessionMessage {
return new StringJoiner(", ", StartSessionMessage.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("alias='" + alias + "'")
.add("binlog=" + Arrays.toString(binlog))
.add("binlog=" + binlog)
.add("binlogDate=" + binlogDate)
.add("implementationDetails=" + implementationDetails)
.toString();

View File

@ -3,10 +3,8 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
import it.tdlight.utils.BufferUtils;
import org.warp.commonutils.serialization.UTFUtils;
public class StartSessionMessageCodec implements MessageCodec<StartSessionMessage, StartSessionMessage> {
@ -19,30 +17,23 @@ public class StartSessionMessageCodec implements MessageCodec<StartSessionMessag
@Override
public void encodeToWire(Buffer buffer, StartSessionMessage t) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
dos.writeInt(t.id());
dos.writeUTF(t.alias());
dos.writeInt(t.binlog().length);
dos.write(t.binlog());
dos.writeLong(t.binlogDate());
dos.writeUTF(t.implementationDetails().toString());
}
}
BufferUtils.encode(buffer, os -> {
os.writeInt(t.id());
UTFUtils.writeUTF(os, t.alias());
BufferUtils.writeBuf(os, t.binlog());
os.writeLong(t.binlogDate());
UTFUtils.writeUTF(os, t.implementationDetails().toString());
});
}
@Override
public StartSessionMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new StartSessionMessage(dis.readInt(),
dis.readUTF(),
dis.readNBytes(dis.readInt()),
dis.readLong(),
new JsonObject(dis.readUTF())
);
}
}
return BufferUtils.decode(pos, buffer, is -> new StartSessionMessage(is.readInt(),
UTFUtils.readUTF(is),
BufferUtils.rxReadBuf(is),
is.readLong(),
new JsonObject(UTFUtils.readUTF(is))
));
}
@Override

View File

@ -4,11 +4,7 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.utils.VertxBufferInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
import it.tdlight.utils.BufferUtils;
public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
@ -18,28 +14,17 @@ public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject,
@Override
public void encodeToWire(Buffer buffer, ExecuteObject t) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
dos.writeBoolean(t.isExecuteDirectly());
t.getRequest().serialize(dos);
}
bos.trim();
buffer.appendBytes(bos.array);
} catch (IOException ex) {
ex.printStackTrace();
}
BufferUtils.encode(buffer, os -> {
os.writeBoolean(t.isExecuteDirectly());
t.getRequest().serialize(os);
});
}
@Override
public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new ExecuteObject(dis.readBoolean(), (Function) TdApi.Deserializer.deserialize(dis));
}
} catch (IOException ex) {
ex.printStackTrace();
}
return null;
return BufferUtils.decode(pos, buffer, is -> {
return new ExecuteObject(is.readBoolean(), (Function) TdApi.Deserializer.deserialize(is));
});
}
@Override

View File

@ -1,13 +1,11 @@
package it.tdlight.tdlibsession.td.middle;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.utils.VertxBufferInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
public class TdMessageCodec<T extends TdApi.Object> implements MessageCodec<T, T> {
@ -22,28 +20,21 @@ public class TdMessageCodec<T extends TdApi.Object> implements MessageCodec<T, T
@Override
public void encodeToWire(Buffer buffer, T t) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
t.serialize(dos);
}
bos.trim();
buffer.appendBytes(bos.array);
try (var os = new ByteBufOutputStream(buffer.getByteBuf())) {
t.serialize(os);
} catch (IOException ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
@Override
public T decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
//noinspection unchecked
return (T) TdApi.Deserializer.deserialize(dis);
}
try (var is = new ByteBufInputStream(buffer.getByteBuf(), pos)) {
//noinspection unchecked
return (T) TdApi.Deserializer.deserialize(is);
} catch (IOException ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
return null;
}
@Override

View File

@ -4,13 +4,8 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import it.tdlight.utils.BufferUtils;
import java.util.ArrayList;
import java.util.Collections;
import org.warp.commonutils.stream.SafeDataInputStream;
public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdResultList> {
@ -20,44 +15,35 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public void encodeToWire(Buffer buffer, TdResultList ts) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new DataOutputStream(bos)) {
if (ts.succeeded()) {
dos.writeBoolean(true);
var t = ts.value();
dos.writeInt(t.size());
for (TdApi.Object t1 : t) {
t1.serialize(dos);
}
} else {
dos.writeBoolean(false);
ts.error().serialize(dos);
BufferUtils.encode(buffer, os -> {
if (ts.succeeded()) {
os.writeBoolean(true);
var t = ts.value();
os.writeInt(t.size());
for (TdApi.Object t1 : t) {
t1.serialize(os);
}
} else {
os.writeBoolean(false);
ts.error().serialize(os);
}
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
@Override
public TdResultList decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
if (dis.readBoolean()) {
var size = dis.readInt();
ArrayList<TdApi.Object> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
list.add((TdApi.Object) TdApi.Deserializer.deserialize(dis));
}
return new TdResultList(list);
} else {
return new TdResultList((Error) TdApi.Deserializer.deserialize(dis));
return BufferUtils.decode(pos, buffer, is -> {
if (is.readBoolean()) {
var size = is.readInt();
ArrayList<TdApi.Object> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
list.add((TdApi.Object) TdApi.Deserializer.deserialize(is));
}
return new TdResultList(list);
} else {
return new TdResultList((Error) TdApi.Deserializer.deserialize(is));
}
} catch (IOException | UnsupportedOperationException ex) {
ex.printStackTrace();
return new TdResultList(Collections.emptyList());
}
});
}
@Override

View File

@ -3,11 +3,7 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
import it.tdlight.utils.BufferUtils;
@SuppressWarnings("rawtypes")
public class TdResultMessageCodec implements MessageCodec<TdResultMessage, TdResultMessage> {
@ -21,35 +17,26 @@ public class TdResultMessageCodec implements MessageCodec<TdResultMessage, TdRes
@Override
public void encodeToWire(Buffer buffer, TdResultMessage t) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
if (t.value != null) {
dos.writeBoolean(true);
t.value.serialize(dos.asDataOutputStream());
} else {
dos.writeBoolean(false);
t.cause.serialize(dos.asDataOutputStream());
}
BufferUtils.encode(buffer, os -> {
if (t.value != null) {
os.writeBoolean(true);
t.value.serialize(os);
} else {
os.writeBoolean(false);
t.cause.serialize(os);
}
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
@Override
public TdResultMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
if (dis.readBoolean()) {
return new TdResultMessage(TdApi.Deserializer.deserialize(dis), null);
} else {
return new TdResultMessage(null, (TdApi.Error) TdApi.Deserializer.deserialize(dis));
}
return BufferUtils.decode(pos, buffer, is -> {
if (is.readBoolean()) {
return new TdResultMessage(TdApi.Deserializer.deserialize(is), null);
} else {
return new TdResultMessage(null, (TdApi.Error) TdApi.Deserializer.deserialize(is));
}
} catch (IOException ex) {
ex.printStackTrace();
}
return null;
});
}
@Override

View File

@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.middle.client;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
@ -99,7 +100,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog"));
}
private Mono<Void> saveBinlog(byte[] data) {
private Mono<Void> saveBinlog(Buffer data) {
return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data));
}
@ -116,7 +117,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return MonoUtils
.emitValue(this.binlog, binlog)
.then(binlog.getLastModifiedTime())
.zipWith(binlog.readFullyBytes())
.zipWith(binlog.readFully().map(Buffer::getDelegate))
.single()
.flatMap(tuple -> {
var binlogLastModifiedTime = tuple.getT1();
@ -124,7 +125,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
var msg = new StartSessionMessage(this.botId,
this.botAlias,
binlogData,
Buffer.newInstance(binlogData),
binlogLastModifiedTime,
implementationDetails
);
@ -262,7 +263,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib"))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length)))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.publishOn(Schedulers.boundedElastic())

View File

@ -44,13 +44,13 @@ public class BinlogUtils {
.publishOn(Schedulers.boundedElastic());
}
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, Buffer data) {
return binlog.overwrite(data);
}
public static Mono<Void> chooseBinlog(FileSystem vertxFilesystem,
Path binlogPath,
byte[] remoteBinlog,
Buffer remoteBinlog,
long remoteBinlogDate) {
var path = binlogPath.toString();
return retrieveBinlog(vertxFilesystem, binlogPath)
@ -64,7 +64,7 @@ public class BinlogUtils {
.doOnNext(v -> logger.info("Using local binlog: " + binlogPath))
.map(Tuple2::getT1)
.switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> logger.info("Using remote binlog. Overwriting " + binlogPath)))
.then(vertxFilesystem.rxWriteFile(path, Buffer.buffer(remoteBinlog)).as(MonoUtils::toMono))
.then(vertxFilesystem.rxWriteFile(path, remoteBinlog.copy()).as(MonoUtils::toMono))
.then(retrieveBinlog(vertxFilesystem, binlogPath))
)
.single()
@ -119,7 +119,8 @@ public class BinlogUtils {
})
.flatMapSequential(req -> BinlogUtils
.retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId))
.flatMap(BinlogAsyncFile::readFullyBytes)
.flatMap(BinlogAsyncFile::readFully)
.map(Buffer::copy)
.single()
.map(binlog -> Tuples.of(req, binlog))
)

View File

@ -0,0 +1,94 @@
package it.tdlight.utils;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import java.io.IOException;
import org.apache.commons.lang3.SerializationException;
public class BufferUtils {
private static final int CHUNK_SIZE = 8192;
public static void writeBuf(ByteBufOutputStream os, io.vertx.reactivex.core.buffer.Buffer dataToWrite)
throws IOException {
var len = dataToWrite.length();
os.writeInt(len);
byte[] part = new byte[CHUNK_SIZE];
for (int i = 0; i < len; i += CHUNK_SIZE) {
var end = Math.min(i + CHUNK_SIZE, len);
dataToWrite.getBytes(i, end, part, 0);
os.write(part, 0, end - i);
}
}
public static void writeBuf(ByteBufOutputStream os, io.vertx.core.buffer.Buffer dataToWrite) throws IOException {
var len = dataToWrite.length();
os.writeInt(len);
byte[] part = new byte[CHUNK_SIZE];
for (int i = 0; i < len; i += CHUNK_SIZE) {
var end = Math.min(i + CHUNK_SIZE, len);
dataToWrite.getBytes(i, end, part, 0);
os.write(part, 0, end - i);
}
}
public static io.vertx.core.buffer.Buffer readBuf(ByteBufInputStream is) throws IOException {
int len = is.readInt();
Buffer buf = Buffer.buffer(len);
byte[] part = new byte[1024];
int readPart = 0;
for (int i = 0; i < len; i += 1024) {
var lenx = (Math.min(i + 1024, len)) - i;
if (lenx > 0) {
readPart = is.readNBytes(part, 0, lenx);
buf.appendBytes(part, 0, readPart);
}
}
return buf;
}
public static io.vertx.reactivex.core.buffer.Buffer rxReadBuf(ByteBufInputStream is) throws IOException {
int len = is.readInt();
io.vertx.reactivex.core.buffer.Buffer buf = io.vertx.reactivex.core.buffer.Buffer.buffer(len);
byte[] part = new byte[1024];
int readPart = 0;
for (int i = 0; i < len; i += 1024) {
var lenx = (Math.min(i + 1024, len)) - i;
if (lenx > 0) {
readPart = is.readNBytes(part, 0, lenx);
buf.appendBytes(part, 0, readPart);
}
}
return buf;
}
public interface Writer {
void write(ByteBufOutputStream os) throws IOException;
}
public interface Reader<T> {
T read(ByteBufInputStream is) throws IOException;
}
public static void encode(Buffer buffer, Writer writer) {
try (var os = new ByteBufOutputStream(((BufferImpl) buffer).byteBuf())) {
writer.write(os);
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
public static <T> T decode(int pos, Buffer buffer, Reader<T> reader) {
try (var is = new ByteBufInputStream(buffer.slice(pos, buffer.length()).getByteBuf())) {
return reader.read(is);
} catch (IOException ex) {
throw new SerializationException(ex);
}
}
}