Use socket non-blocking instead of blocking

Motivation:

non-blocking sockets are more efficient

Modification:
-use socket non blocking
-some PR cleanups

Result:
probably better performance
This commit is contained in:
Josef Grieb 2020-07-27 20:41:26 +02:00
parent eb1c8e4991
commit 97b4537ab1
11 changed files with 84 additions and 31 deletions

View File

@ -511,7 +511,7 @@ public final class PlatformDependent {
return PlatformDependent0.getInt(address); return PlatformDependent0.getInt(address);
} }
public static int getIntVolatalile(long address) { public static int getIntVolatile(long address) {
return PlatformDependent0.getIntVolatile(address); return PlatformDependent0.getIntVolatile(address);
} }

View File

@ -605,18 +605,6 @@ final class PlatformDependent0 {
UNSAFE.putObject(o, offset, x); UNSAFE.putObject(o, offset, x);
} }
static void loadFence() {
UNSAFE.loadFence();
}
static void storeFence() {
UNSAFE.storeFence();
}
static void fullFence() {
UNSAFE.fullFence();
}
static void copyMemory(long srcAddr, long dstAddr, long length) { static void copyMemory(long srcAddr, long dstAddr, long length) {
// Manual safe-point polling is only needed prior Java9: // Manual safe-point polling is only needed prior Java9:
// See https://bugs.openjdk.java.net/browse/JDK-8149596 // See https://bugs.openjdk.java.net/browse/JDK-8149596

View File

@ -64,4 +64,4 @@ struct io_uring {
int ring_fd; int ring_fd;
}; };
#endif #endif

View File

@ -294,4 +294,4 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
//unload //unload
return NETTY_JNI_VERSION; return NETTY_JNI_VERSION;
} }

View File

@ -32,6 +32,7 @@ import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnresolvedAddressException;
@ -164,6 +165,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
unsafe.executeUringReadOperator(); unsafe.executeUringReadOperator();
} }
//Channel/ChannelHandlerContext.write
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//Todo write until there is nothing left in the buffer //Todo write until there is nothing left in the buffer
@ -240,6 +242,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
throw new UnsupportedOperationException("unsupported message type"); throw new UnsupportedOperationException("unsupported message type");
} }
@Override
protected void doRegister() throws Exception {
((IOUringEventLoop) eventLoop()).add(this);
}
@Override
protected void doDeregister() throws Exception {
((IOUringEventLoop) eventLoop()).remove(this);
}
@Override @Override
public void doBind(final SocketAddress local) throws Exception { public void doBind(final SocketAddress local) throws Exception {
if (local instanceof InetSocketAddress) { if (local instanceof InetSocketAddress) {

View File

@ -21,6 +21,8 @@ final class Event {
private long id; private long id;
private ByteBuf readBuffer; private ByteBuf readBuffer;
//Todo use fd instead
private AbstractIOUringChannel abstractIOUringChannel; private AbstractIOUringChannel abstractIOUringChannel;
private EventType op; private EventType op;

View File

@ -57,7 +57,7 @@ final class IOUringCompletionQueue {
public IOUringCqe peek() { public IOUringCqe peek() {
long cqe = 0; long cqe = 0;
long head = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress));

View File

@ -19,8 +19,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.LongObjectHashMap; import io.netty.util.collection.LongObjectHashMap;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.netty.channel.unix.Errors.*; import static io.netty.channel.unix.Errors.*;
@ -30,6 +33,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
// events should be unique to identify which event type that was // events should be unique to identify which event type that was
private long eventIdCounter; private long eventIdCounter;
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>(); private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
private RingBuffer ringBuffer; private RingBuffer ringBuffer;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
@ -44,6 +48,38 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
return eventId; return eventId;
} }
public void add(AbstractIOUringChannel ch) {
System.out.println("Add Channel: " + ch.socket.intValue());
int fd = ch.socket.intValue();
channels.put(fd, ch);
}
public void remove(AbstractIOUringChannel ch) {
System.out.println("Remove Channel: " + ch.socket.intValue());
int fd = ch.socket.intValue();
AbstractIOUringChannel old = channels.remove(fd);
if (old != null && old != ch) {
// The Channel mapping was already replaced due FD reuse, put back the stored Channel.
channels.put(fd, old);
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen();
}
}
private void closeAll() {
System.out.println("CloseAll IOUringEvenloop");
// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractIOUringChannel[] localChannels = channels.values().toArray(new AbstractIOUringChannel[0]);
for (AbstractIOUringChannel ch : localChannels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
public void addNewEvent(Event event) { public void addNewEvent(Event event) {
events.put(event.getId(), event); events.put(event.getId(), event);
} }
@ -73,7 +109,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
allocHandle.lastBytesRead(ioUringCqe.getRes()); allocHandle.lastBytesRead(ioUringCqe.getRes());
if (allocHandle.lastBytesRead() != -1) { if (allocHandle.lastBytesRead() > 0) {
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
try { try {
pipeline.fireChannelRead(abstractIOUringServerChannel pipeline.fireChannelRead(abstractIOUringServerChannel
@ -96,8 +132,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
ringBuffer.getIoUringSubmissionQueue().submit(); ringBuffer.getIoUringSubmissionQueue().submit();
break; break;
case READ: case READ:
System.out.println("Eventlloop Read Res: " + ioUringCqe.getRes()); System.out.println("EventLoop Read Res: " + ioUringCqe.getRes());
System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd());
ByteBuf byteBuf = event.getReadBuffer(); ByteBuf byteBuf = event.getReadBuffer();
int localReadAmount = ioUringCqe.getRes(); int localReadAmount = ioUringCqe.getRes();
if (localReadAmount > 0) { if (localReadAmount > 0) {
@ -126,8 +162,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
event.getAbstractIOUringChannel().executeReadEvent(); event.getAbstractIOUringChannel().executeReadEvent();
break; break;
case WRITE: case WRITE:
System.out.println("Eventloop Write Res: " + ioUringCqe.getRes()); System.out.println("EventLoop Write Res: " + ioUringCqe.getRes());
System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd());
System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop());
//remove bytes //remove bytes
int localFlushAmount = ioUringCqe.getRes(); int localFlushAmount = ioUringCqe.getRes();
if (localFlushAmount > 0) { if (localFlushAmount > 0) {
@ -135,14 +172,23 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
} }
break; break;
} }
} else {
System.out.println("Event is null!!!! ");
} }
} }
//run tasks //run tasks
if (hasTasks()) { if (hasTasks()) {
runAllTasks(); runAllTasks();
} }
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
System.out.println("Exception error " + t);
}
try { try {
Thread.sleep(10); Thread.sleep(10);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -27,7 +27,7 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann
private final IOUringServerSocketChannelConfig config; private final IOUringServerSocketChannelConfig config;
public IOUringServerSocketChannel() { public IOUringServerSocketChannel() {
super(Socket.newSocketStreamBlocking().getFd()); super(Socket.newSocketStream().intValue());
this.config = new IOUringServerSocketChannelConfig(this); this.config = new IOUringServerSocketChannelConfig(this);
} }

View File

@ -53,6 +53,9 @@ final class IOUringSubmissionQueue {
private final long ringAddress; private final long ringAddress;
private final int ringFd; private final int ringFd;
private static final int SOCK_NONBLOCK = 2048;
private static final int SOCK_CLOEXEC = 524288;
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
long fFlagsAdress, long kDroppedAddress, long arrayAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress,
long submissionQueueArrayAddress, int ringSize, long submissionQueueArrayAddress, int ringSize,
@ -92,9 +95,16 @@ final class IOUringSubmissionQueue {
PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId);
//c union set Rw-Flags or accept_flags
if (type != EventType.ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
// pad field array -> all fields should be zero // pad field array -> all fields should be zero
long offsetIndex = 0; long offsetIndex = 0;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
@ -124,7 +134,7 @@ final class IOUringSubmissionQueue {
private int flushSqe() { private int flushSqe() {
long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
long kHead = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
System.out.println("Ktail: " + kTail); System.out.println("Ktail: " + kTail);

View File

@ -232,10 +232,5 @@ public class FileDescriptor {
private static native int read(int fd, ByteBuffer buf, int pos, int limit); private static native int read(int fd, ByteBuffer buf, int pos, int limit);
private static native int readAddress(int fd, long address, int pos, int limit); private static native int readAddress(int fd, long address, int pos, int limit);
//only temporary
public int getFd() {
return fd;
}
private static native long newPipe(); private static native long newPipe();
} }