From 94ad523a75d119d44b0b6af1bf887a18bf5f603f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 23 Jan 2021 01:41:42 +0100 Subject: [PATCH] Optimize decoding --- .../td/middle/TdResultListMessageCodec.java | 12 +- .../td/middle/direct/AsyncTdMiddleDirect.java | 2 +- .../server/AsyncTdMiddleEventBusServer.java | 2 +- .../tdlight/utils/VertxBufferInputStream.java | 113 ++++++++++++++++++ .../utils/VertxBufferOutputStream.java | 59 +++++++++ 5 files changed, 179 insertions(+), 9 deletions(-) create mode 100644 src/main/java/it/tdlight/utils/VertxBufferInputStream.java create mode 100644 src/main/java/it/tdlight/utils/VertxBufferOutputStream.java diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java index 8ad11ea..5df74f3 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java @@ -5,8 +5,8 @@ import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; import it.tdlight.tdlibsession.td.TdResult; -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import it.tdlight.utils.VertxBufferInputStream; +import it.tdlight.utils.VertxBufferOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -21,7 +21,7 @@ public class TdResultListMessageCodec implements MessageCodec> list = new ArrayList<>(); + ArrayList> list = new ArrayList<>(size); for (int i = 0; i < size; i++) { if (dis.readBoolean()) { list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis))); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index dee4bbb..d4dd070 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -75,7 +75,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd @Override public Flux receive() { return td - .receive(new AsyncTdDirectOptions(WAIT_DURATION, 1000)) + .receive(new AsyncTdDirectOptions(WAIT_DURATION, 100)) .takeUntilOther(closeRequest.asMono()) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 7744196..31db49b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -66,7 +66,7 @@ public class AsyncTdMiddleEventBusServer { @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { this.cluster = clusterManager; - this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000); + this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100); } public Mono start(String botAddress, String botAlias, boolean local) { diff --git a/src/main/java/it/tdlight/utils/VertxBufferInputStream.java b/src/main/java/it/tdlight/utils/VertxBufferInputStream.java new file mode 100644 index 0000000..dc84d1d --- /dev/null +++ b/src/main/java/it/tdlight/utils/VertxBufferInputStream.java @@ -0,0 +1,113 @@ +package it.tdlight.utils; + +import io.vertx.core.buffer.Buffer; +import it.unimi.dsi.fastutil.io.MeasurableInputStream; +import it.unimi.dsi.fastutil.io.RepositionableStream; + +public class VertxBufferInputStream extends MeasurableInputStream implements RepositionableStream { + + private final Buffer buffer; + + /** The first valid entry. */ + public int offset; + + /** The number of valid bytes in {@link #buffer} starting from {@link #offset}. */ + public int length; + + /** The current position as a distance from {@link #offset}. */ + private int position; + + /** The current mark as a position, or -1 if no mark exists. */ + private int mark; + + /** Creates a new buffer input stream using a given buffer fragment. + * + * @param buffer the backing buffer. + * @param offset the first valid entry of the buffer. + * @param length the number of valid bytes. + */ + public VertxBufferInputStream(final Buffer buffer, final int offset, final int length) { + this.buffer = buffer; + this.offset = offset; + this.length = length; + } + + /** Creates a new buffer input stream using a given buffer. + * + * @param buffer the backing buffer. + */ + public VertxBufferInputStream(final Buffer buffer) { + this(buffer, 0, buffer.length()); + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() { + position = mark; + } + + /** Closing a fast byte buffer input stream has no effect. */ + @Override + public void close() {} + + @Override + public void mark(final int dummy) { + mark = position; + } + + @Override + public int available() { + return length - position; + } + + @Override + public long skip(long n) { + if (n <= length - position) { + position += (int)n; + return n; + } + n = length - position; + position = length; + return n; + } + + @Override + public int read() { + if (length == position) return -1; + return buffer.getByte(offset + position++) & 0xFF; + } + + /** Reads bytes from this byte-buffer input stream as + * specified in {@link java.io.InputStream#read(byte[], int, int)}. + * Note that the implementation given in {@link java.io.ByteArrayInputStream#read(byte[], int, int)} + * will return -1 on a zero-length read at EOF, contrarily to the specification. We won't. + */ + + @Override + public int read(final byte b[], final int offset, final int length) { + if (this.length == this.position) return length == 0 ? 0 : -1; + final int n = Math.min(length, this.length - this.position); + buffer.getBytes(this.offset + this.position, this.offset + this.position + n, b, offset); + this.position += n; + return n; + } + + @Override + public long position() { + return position; + } + + @Override + public void position(final long newPosition) { + position = (int)Math.min(newPosition, length); + } + + @Override + public long length() { + return length; + } +} diff --git a/src/main/java/it/tdlight/utils/VertxBufferOutputStream.java b/src/main/java/it/tdlight/utils/VertxBufferOutputStream.java new file mode 100644 index 0000000..019556b --- /dev/null +++ b/src/main/java/it/tdlight/utils/VertxBufferOutputStream.java @@ -0,0 +1,59 @@ +package it.tdlight.utils; + +import io.vertx.core.buffer.Buffer; +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import it.unimi.dsi.fastutil.io.MeasurableOutputStream; +import it.unimi.dsi.fastutil.io.RepositionableStream; + +public class VertxBufferOutputStream extends MeasurableOutputStream implements RepositionableStream { + + /** The buffer backing the output stream. */ + public Buffer buffer; + + /** Creates a new buffer output stream with an initial capacity of 0 bytes. */ + public VertxBufferOutputStream() { + this(0); + } + + /** Creates a new buffer output stream with a given initial capacity. + * + * @param initialCapacity the initial length of the backing buffer. + */ + public VertxBufferOutputStream(final int initialCapacity) { + buffer = Buffer.buffer(initialCapacity); + } + + /** Creates a new buffer output stream wrapping a given byte buffer. + * + * @param a the byte buffer to wrap. + */ + public VertxBufferOutputStream(final Buffer a) { + buffer = a; + } + + @Override + public void write(final int b) { + buffer.appendByte((byte) b); + } + + @Override + public void write(final byte[] b, final int off, final int len) { + ByteArrays.ensureOffsetLength(b, off, len); + buffer.appendBytes(b, off, len); + } + + @Override + public void position(long newPosition) { + throw new UnsupportedOperationException("Can't change position of a vertx buffer output stream"); + } + + @Override + public long position() { + return this.length(); + } + + @Override + public long length() { + return buffer.length(); + } +}