Add writev operation

Motivation:

writev which allows to write data into multiple buffers

Modification:

-Added iovec array pool to manage iov memory
-flush override to make sure that write is not called

Result:

performance is much better
This commit is contained in:
Josef Grieb 2020-08-29 21:22:15 +02:00
parent 9a5449a790
commit 37944ccffd
6 changed files with 220 additions and 10 deletions

View File

@ -90,6 +90,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private volatile SocketAddress local;
private volatile SocketAddress remote;
//to release it
private long iovecMemoryAddress;
AbstractIOUringChannel(final Channel parent, LinuxSocket socket) {
super(parent);
this.socket = checkNotNull(socket, "fd");
@ -278,15 +281,38 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
logger.trace("IOUring doWrite message size: {}", in.size());
if (!writeScheduled && in.size() >= 1) {
Object msg = in.current();
if (msg instanceof ByteBuf) {
doWriteBytes((ByteBuf) msg);
if (writeScheduled) {
return;
}
int msgCount = in.size();
if (msgCount > 1 && in.current() instanceof ByteBuf) {
doWriteMultiple(in);
//Object msg = in.current();
//doWriteSingle((ByteBuf) msg);
} else if(msgCount == 1) {
Object msg = in.current();
doWriteSingle((ByteBuf) msg);
}
}
protected final void doWriteBytes(ByteBuf buf) {
private void doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool();
iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress();
if (iovecMemoryAddress != -1) {
in.forEachFlushedMessage(iovecArray);
if (iovecArray.count() > 0) {
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
submissionQueue().submit();
}
}
//Todo error handling
}
protected final void doWriteSingle(ByteBuf buf) {
if (buf.hasMemoryAddress()) {
//link poll<link>write operation
addPollOut();
@ -311,6 +337,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
abstract class AbstractUringUnsafe extends AbstractUnsafe {
private IOUringRecvByteAllocatorHandle allocHandle;
@Override
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (!writeScheduled) {
super.flush0();
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
@ -411,12 +447,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
}
}
}
}
void writeComplete(int res) {
writeScheduled = false;
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (iovecMemoryAddress != 0) {
((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress);
}
if (res > 0) {
channelOutboundBuffer.removeBytes(res);
try {

View File

@ -28,12 +28,10 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
AbstractIOUringServerChannel(int fd) {
super(null, new LinuxSocket(fd));
System.out.println("Server Socket fd: " + fd);
}
AbstractIOUringServerChannel(LinuxSocket fd) {
super(null, fd);
System.out.println("Server Socket fd: " + fd);
}
@Override

View File

@ -21,6 +21,7 @@ import io.netty.util.internal.SystemPropertyUtil;
final class IOUring {
private static final Throwable UNAVAILABILITY_CAUSE;
static final int OP_WRITEV = 2;
static final int IO_POLL = 6;
static final int IO_TIMEOUT = 11;
static final int OP_ACCEPT = 13;

View File

@ -57,6 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
private long prevDeadlineNanos = NONE;
private boolean pendingWakeup;
private IovecArrayPool iovecArrayPool;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
@ -64,6 +65,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
ringBuffer = Native.createRingBuffer(ringSize);
eventfd = Native.newEventFd();
logger.trace("New EventLoop: {}", this.toString());
iovecArrayPool = new IovecArrayPool();
}
@Override
@ -193,7 +195,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
}
((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res);
break;
case IOUring.OP_WRITEV:
case IOUring.OP_WRITE:
AbstractIOUringChannel writeChannel = channels.get(fd);
if (writeChannel == null) {
@ -209,7 +211,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res);
}
break;
case IOUring.IO_TIMEOUT:
if (res == ETIME) {
prevDeadlineNanos = NONE;
@ -285,6 +286,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
e.printStackTrace();
}
ringBuffer.close();
iovecArrayPool.release();
}
public RingBuffer getRingBuffer() {
@ -298,4 +300,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
Native.eventFdWrite(eventfd.intValue(), 1L);
}
}
public IovecArrayPool getIovecArrayPool() {
return iovecArrayPool;
}
}

View File

@ -244,6 +244,16 @@ final class IOUringSubmissionQueue {
return true;
}
public boolean addWritev(int fd, long iovecArrayAddress, int length) {
long sqe = getSqe();
if (sqe == 0) {
return false;
}
setData(sqe, (byte) IOUring.OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
return true;
}
private int flushSqe() {
long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));

View File

@ -0,0 +1,157 @@
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.util.Stack;
import static io.netty.channel.unix.Limits.*;
final class IovecArrayPool implements MessageProcessor {
private static final int ADDRESS_SIZE = Buffer.addressSize();
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
//Todo configurable
private static int poolSize = 40;
//Todo IOVEC entries shoule be lower IOVEMAX
private static final int IOV_ENTRIES = 500;
private static final int IOVEC_ARRAY_SIZE = IOV_SIZE * IOV_ENTRIES;
private static final int CAPACITY = IOVEC_ARRAY_SIZE * poolSize;
private final Stack<Long> remainingIovec;
private long maxBytes = SSIZE_MAX;
private int count;
private long size;
private long currentIovecMemoryAddress;
private final ByteBuffer iovecArrayMemory;
private final long iovecArrayMemoryAddress;
public IovecArrayPool() {
//setup array
remainingIovec = new Stack<Long>();
iovecArrayMemory = Buffer.allocateDirectWithNativeOrder(CAPACITY);
iovecArrayMemoryAddress = Buffer.memoryAddress(iovecArrayMemory);
for (long i = 0; i < poolSize; i++) {
remainingIovec.push(i);
}
}
//Todo better naming
public long createNewIovecMemoryAddress() {
//clear
size = 0;
count = 0;
if (remainingIovec.empty()) {
//Todo allocate new Memory
return -1;
}
long index = remainingIovec.pop();
currentIovecMemoryAddress = index * IOVEC_ARRAY_SIZE + iovecArrayMemoryAddress;
return currentIovecMemoryAddress;
}
//Todo error handling
public void releaseIovec(long iovecAddress) {
long index = (iovecAddress - iovecArrayMemoryAddress) / IOVEC_ARRAY_SIZE;
remainingIovec.push(index);
}
private boolean add(ByteBuf buf, int offset, int len) {
if (count == IOV_ENTRIES) {
// No more room!
return false;
} else if (buf.nioBufferCount() == 1) {
if (len == 0) {
return true;
}
if (buf.hasMemoryAddress()) {
return add(buf.memoryAddress() + offset, len);
} else {
return false;
}
} else {
ByteBuffer[] buffers = buf.nioBuffers(offset, len);
for (ByteBuffer nioBuffer : buffers) {
final int remaining = nioBuffer.remaining();
if (remaining != 0 &&
(!add(Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), remaining) || count ==
IOV_ENTRIES)) {
return false;
}
}
return true;
}
}
private boolean add(long addr, int len) {
assert addr != 0;
// If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we
// will attempt to write some data and make progress.
if (maxBytes - len < size && count > 0) {
// If the size + len will overflow SSIZE_MAX we stop populate the IovArray. This is done as linux
// not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will
// return 'EINVAL', which will raise an IOException.
//
// See also:
// - http://linux.die.net/man/2/writev
return false;
}
final int baseOffset = idx(count);
final int lengthOffset = baseOffset + ADDRESS_SIZE;
size += len;
++count;
if (ADDRESS_SIZE == 8) {
// 64bit
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putLong(baseOffset + currentIovecMemoryAddress, addr);
PlatformDependent.putLong(lengthOffset + currentIovecMemoryAddress, len);
}
} else {
assert ADDRESS_SIZE == 4;
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putInt(baseOffset + currentIovecMemoryAddress, (int) addr);
PlatformDependent.putInt(lengthOffset + currentIovecMemoryAddress, len);
}
}
return true;
}
@Override
public boolean processMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
return add(buffer, buffer.readerIndex(), buffer.readableBytes());
}
return false;
}
public int count() {
return count;
}
private static int idx(int index) {
return IOV_SIZE * index;
}
public void release() {
Buffer.free(iovecArrayMemory);
}
}