Optimize decoding

This commit is contained in:
Andrea Cavalli 2021-01-23 01:41:42 +01:00
parent b51fcbbf90
commit 94ad523a75
5 changed files with 179 additions and 9 deletions

View File

@ -5,8 +5,8 @@ import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.tdlibsession.td.TdResult;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -21,7 +21,7 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public void encodeToWire(Buffer buffer, TdResultList ts) {
try (var bos = new FastByteArrayOutputStream()) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new DataOutputStream(bos)) {
var t = ts.getValues();
dos.writeInt(t.size());
@ -35,8 +35,6 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
}
}
}
bos.trim();
buffer.appendBytes(bos.array);
} catch (IOException ex) {
ex.printStackTrace();
}
@ -44,10 +42,10 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public TdResultList decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var fis = new VertxBufferInputStream(buffer, pos, buffer.length())) {
try (var dis = new DataInputStream(fis)) {
var size = dis.readInt();
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>();
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
if (dis.readBoolean()) {
list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis)));

View File

@ -75,7 +75,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
@Override
public Flux<TdApi.Object> receive() {
return td
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 1000))
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 100))
.takeUntilOther(closeRequest.asMono())
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))

View File

@ -66,7 +66,7 @@ public class AsyncTdMiddleEventBusServer {
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000);
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100);
}
public Mono<AsyncTdMiddleEventBusServer> start(String botAddress, String botAlias, boolean local) {

View File

@ -0,0 +1,113 @@
package it.tdlight.utils;
import io.vertx.core.buffer.Buffer;
import it.unimi.dsi.fastutil.io.MeasurableInputStream;
import it.unimi.dsi.fastutil.io.RepositionableStream;
public class VertxBufferInputStream extends MeasurableInputStream implements RepositionableStream {
private final Buffer buffer;
/** The first valid entry. */
public int offset;
/** The number of valid bytes in {@link #buffer} starting from {@link #offset}. */
public int length;
/** The current position as a distance from {@link #offset}. */
private int position;
/** The current mark as a position, or -1 if no mark exists. */
private int mark;
/** Creates a new buffer input stream using a given buffer fragment.
*
* @param buffer the backing buffer.
* @param offset the first valid entry of the buffer.
* @param length the number of valid bytes.
*/
public VertxBufferInputStream(final Buffer buffer, final int offset, final int length) {
this.buffer = buffer;
this.offset = offset;
this.length = length;
}
/** Creates a new buffer input stream using a given buffer.
*
* @param buffer the backing buffer.
*/
public VertxBufferInputStream(final Buffer buffer) {
this(buffer, 0, buffer.length());
}
@Override
public boolean markSupported() {
return true;
}
@Override
public void reset() {
position = mark;
}
/** Closing a fast byte buffer input stream has no effect. */
@Override
public void close() {}
@Override
public void mark(final int dummy) {
mark = position;
}
@Override
public int available() {
return length - position;
}
@Override
public long skip(long n) {
if (n <= length - position) {
position += (int)n;
return n;
}
n = length - position;
position = length;
return n;
}
@Override
public int read() {
if (length == position) return -1;
return buffer.getByte(offset + position++) & 0xFF;
}
/** Reads bytes from this byte-buffer input stream as
* specified in {@link java.io.InputStream#read(byte[], int, int)}.
* Note that the implementation given in {@link java.io.ByteArrayInputStream#read(byte[], int, int)}
* will return -1 on a zero-length read at EOF, contrarily to the specification. We won't.
*/
@Override
public int read(final byte b[], final int offset, final int length) {
if (this.length == this.position) return length == 0 ? 0 : -1;
final int n = Math.min(length, this.length - this.position);
buffer.getBytes(this.offset + this.position, this.offset + this.position + n, b, offset);
this.position += n;
return n;
}
@Override
public long position() {
return position;
}
@Override
public void position(final long newPosition) {
position = (int)Math.min(newPosition, length);
}
@Override
public long length() {
return length;
}
}

View File

@ -0,0 +1,59 @@
package it.tdlight.utils;
import io.vertx.core.buffer.Buffer;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import it.unimi.dsi.fastutil.io.MeasurableOutputStream;
import it.unimi.dsi.fastutil.io.RepositionableStream;
public class VertxBufferOutputStream extends MeasurableOutputStream implements RepositionableStream {
/** The buffer backing the output stream. */
public Buffer buffer;
/** Creates a new buffer output stream with an initial capacity of 0 bytes. */
public VertxBufferOutputStream() {
this(0);
}
/** Creates a new buffer output stream with a given initial capacity.
*
* @param initialCapacity the initial length of the backing buffer.
*/
public VertxBufferOutputStream(final int initialCapacity) {
buffer = Buffer.buffer(initialCapacity);
}
/** Creates a new buffer output stream wrapping a given byte buffer.
*
* @param a the byte buffer to wrap.
*/
public VertxBufferOutputStream(final Buffer a) {
buffer = a;
}
@Override
public void write(final int b) {
buffer.appendByte((byte) b);
}
@Override
public void write(final byte[] b, final int off, final int len) {
ByteArrays.ensureOffsetLength(b, off, len);
buffer.appendBytes(b, off, len);
}
@Override
public void position(long newPosition) {
throw new UnsupportedOperationException("Can't change position of a vertx buffer output stream");
}
@Override
public long position() {
return this.length();
}
@Override
public long length() {
return buffer.length();
}
}