This commit is contained in:
Andrea Cavalli 2021-09-23 23:45:41 +02:00
parent 1a73a5a33f
commit be1ca997a7

View File

@ -619,7 +619,17 @@ public class LLUtils {
public static Buffer compositeBuffer(BufferAllocator alloc,
@NotNull Send<Buffer> buffer1,
@NotNull Send<Buffer> buffer2) {
return CompositeBuffer.compose(alloc, buffer1, buffer2);
var b1 = buffer1.receive();
try (var b2 = buffer2.receive()) {
if (b1.writerOffset() < b1.capacity() || b2.writerOffset() < b2.capacity()) {
b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
b1.writerOffset(b1.writerOffset() + b2.readableBytes());
return b1;
} else {
return CompositeBuffer.compose(alloc, b1.send(), b2.send());
}
}
}
@NotNull
@ -627,7 +637,25 @@ public class LLUtils {
@NotNull Send<Buffer> buffer1,
@NotNull Send<Buffer> buffer2,
@NotNull Send<Buffer> buffer3) {
return CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3);
var b1 = buffer1.receive();
try (var b2 = buffer2.receive()) {
try (var b3 = buffer3.receive()) {
if (b1.writerOffset() < b1.capacity()
|| b2.writerOffset() < b2.capacity()
|| b3.writerOffset() < b3.capacity()) {
b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
b1.writerOffset(b1.writerOffset() + b2.readableBytes());
b1.ensureWritable(b3.readableBytes(), b3.readableBytes(), true);
b3.copyInto(b3.readerOffset(), b1, b1.writerOffset(), b3.readableBytes());
b1.writerOffset(b1.writerOffset() + b3.readableBytes());
return b1;
} else {
return CompositeBuffer.compose(alloc, b1.send(), b2.send(), b3.send());
}
}
}
}
public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {