First running prototype 🎉

Motivation:
missing eventLoop completionQueue logic

Modification:
-mapping between channel and eventloop
-added new prototype blocking example

Result:
First running prototype
This commit is contained in:
Josef Grieb 2020-07-20 22:43:40 +02:00
parent 339d5f1565
commit 8c9b874a2d
33 changed files with 697 additions and 325 deletions

View File

@ -471,18 +471,6 @@ public final class PlatformDependent {
return new ConcurrentHashMap<K, V>(map);
}
public static void loadFence() {
PlatformDependent0.loadFence();
}
public static void storeFence() {
PlatformDependent0.storeFence();
}
public static void fullFence() {
PlatformDependent0.fullFence();
}
/**
* Try to deallocate the specified direct {@link ByteBuffer}. Please note this method does nothing if
* the current platform does not support this operation or the specified buffer is not a direct buffer.
@ -523,6 +511,14 @@ public final class PlatformDependent {
return PlatformDependent0.getInt(address);
}
public static int getIntVolatalile(long address) {
return PlatformDependent0.getIntVolatile(address);
}
public static void putIntOrdered(long adddress, int newValue) {
PlatformDependent0.putIntOrdered(adddress, newValue);
}
public static long getLong(long address) {
return PlatformDependent0.getLong(address);
}

View File

@ -529,6 +529,14 @@ final class PlatformDependent0 {
return UNSAFE.getInt(address);
}
static int getIntVolatile(long address) {
return UNSAFE.getIntVolatile(null, address);
}
static void putIntOrdered(long adddress, int newValue) {
UNSAFE.putOrderedInt(null, adddress, newValue);
}
static long getLong(long address) {
return UNSAFE.getLong(address);
}

View File

@ -43,6 +43,13 @@
<artifactId>netty-buffer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<version>${project.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport</artifactId>

View File

@ -0,0 +1,65 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.uring;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.uring.IOUringEventLoopGroup;
import io.netty.channel.uring.IOUringServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
//temporary prototype example
public class EchoIOUringServer {
private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
public static void main(String []args) {
EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(IOUringServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.uring;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
//temporary prototype example
@Sharable
public class EchoIOUringServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

View File

@ -1,5 +1,6 @@
#!/bin/bash -e
EXAMPLE_MAP=(
'uring:io.netty.example.uring.EchoIOUringServer'
'discard-client:io.netty.example.discard.DiscardClient'
'discard-server:io.netty.example.discard.DiscardServer'
'echo-client:io.netty.example.echo.EchoClient'

View File

@ -1,9 +1,27 @@
/* SPDX-License-Identifier: MIT */
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include <linux/io_uring.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
#include "netty_unix_jni.h"
#include "netty_unix_socket.h"
#include "netty_unix_util.h"
#ifndef LIB_TEST
#define LIB_TEST

View File

@ -1,10 +1,20 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#define _GNU_SOURCE // RTLD_DEFAULT
#include "io_uring.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
#include "netty_unix_jni.h"
#include "netty_unix_socket.h"
#include "netty_unix_util.h"
#include <dlfcn.h>
#include <errno.h>
#include <fcntl.h>

View File

@ -1,6 +1,17 @@
/* SPDX-License-Identifier: MIT */
/*
* Will go away once libc support is there
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include "syscall.h"
#include <signal.h>

View File

@ -1,4 +1,18 @@
/* SPDX-License-Identifier: MIT */
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include <linux/io_uring.h>
#include <signal.h>
#ifndef LIBURING_SYSCALL_H

View File

@ -24,8 +24,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
@ -36,17 +38,26 @@ import java.nio.channels.UnresolvedAddressException;
import static io.netty.util.internal.ObjectUtil.*;
public abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel {
private volatile SocketAddress local;
abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket;
protected volatile boolean active;
boolean uringInReadyPending;
private volatile SocketAddress local;
private volatile SocketAddress remote;
AbstractIOUringChannel(final Channel parent, LinuxSocket fd) {
super(parent);
this.socket = checkNotNull(fd, "fd");
this.active = true;
if (active) {
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.local = fd.localAddress();
this.remote = fd.remoteAddress();
}
}
public boolean isOpen() {
@ -63,6 +74,11 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
return METADATA;
}
@Override
public FileDescriptor fd() {
return socket;
}
@Override
protected abstract AbstractUringUnsafe newUnsafe();
@ -86,6 +102,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
event.setAbstractIOUringChannel(this);
submissionQueue.add(eventId, EventType.READ, socket.getFd(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity());
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit();
}
}
@ -128,6 +146,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
@Override
protected void doClose() throws Exception {
socket.close();
}
// Channel/ChannelHandlerContext.read() was called
@ -140,9 +159,15 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
}
}
public void executeReadEvent() {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
unsafe.executeUringReadOperator();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (in.size() == 1) {
//Todo write until there is nothing left in the buffer
if (in.size() >= 1) {
Object msg = in.current();
if (msg instanceof ByteBuf) {
doWriteBytes((ByteBuf) msg);
@ -150,7 +175,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
}
}
protected final void doWriteBytes(ByteBuf buf) throws Exception {
protected final void doWriteBytes(ByteBuf buf) {
if (buf.hasMemoryAddress()) {
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
@ -161,6 +186,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
event.setAbstractIOUringChannel(this);
submissionQueue.add(eventId, EventType.WRITE, socket.getFd(), buf.memoryAddress(), buf.readerIndex(),
buf.writerIndex());
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit();
}
}
@ -174,26 +201,23 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
}
};
/**
* Create a new {@link } instance.
*
* @param handle The handle to wrap with EPOLL specific logic.
*/
IOUringRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) {
return new IOUringRecvByteAllocatorHandle(handle);
}
@Override
public IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
allocHandle = newIOUringHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
}
return allocHandle;
}
//Todo
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
promise.setFailure(new Exception());
}
final void executeUringReadOperator() {
@ -217,7 +241,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
}
@Override
public void doBind(final SocketAddress localAddress) throws Exception {
public void doBind(final SocketAddress local) throws Exception {
if (local instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) local);
}
@ -236,12 +260,12 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
@Override
protected SocketAddress localAddress0() {
return null;
return local;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
return remote;
}
public LinuxSocket getSocket() {

View File

@ -16,17 +16,13 @@
package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor;
import java.net.SocketAddress;
public abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
private volatile SocketAddress local;
abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
AbstractIOUringServerChannel(int fd) {
super(null, new LinuxSocket(fd));
@ -41,31 +37,16 @@ public abstract class AbstractIOUringServerChannel extends AbstractIOUringChanne
return new UringServerChannelUnsafe();
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public FileDescriptor fd() {
return null;
}
public AbstractIOUringChannel getChannel() {
return this;
}
abstract Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception;
abstract Channel newChildChannel(int fd) throws Exception;
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
private final byte[] acceptedAddress = new byte[26];
@ -87,9 +68,11 @@ public abstract class AbstractIOUringServerChannel extends AbstractIOUringChanne
event.setOp(EventType.ACCEPT);
event.setAbstractIOUringChannel(getChannel());
//todo get network addresses
//Todo get network addresses
submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().getFd(), 0, 0, 0);
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit();
}
}
}

View File

@ -17,7 +17,7 @@ package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
public class Event {
final class Event {
private long id;
private ByteBuf readBuffer;
@ -56,4 +56,3 @@ public class Event {
this.op = op;
}
}

View File

@ -15,7 +15,7 @@
*/
package io.netty.channel.uring;
public enum EventType {
enum EventType {
ACCEPT(13),
READ(22),
WRITE(23);

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.Channel;

View File

@ -21,13 +21,13 @@ public class IOUringCompletionQueue {
//these offsets are used to access specific properties
//CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162)
private final int CQE_USER_DATA_FIELD = 0;
private final int CQE_RES_FIELD = 8;
private final int CQE_FLAGS_FIELD = 12;
private static final int CQE_USER_DATA_FIELD = 0;
private static final int CQE_RES_FIELD = 8;
private static final int CQE_FLAGS_FIELD = 12;
private final int CQE_SIZE = 16;
private static final int CQE_SIZE = 16;
private final int IORING_ENTER_GETEVENTS = 1;
private static final int IORING_ENTER_GETEVENTS = 1;
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
private final long kHeadAddress;
@ -55,12 +55,10 @@ public class IOUringCompletionQueue {
this.ringFd = ringFd;
}
private IOUringCqe peek() {
public IOUringCqe peek() {
long cqe = 0;
long head = toUnsignedLong(PlatformDependent.getInt(kHeadAddress));
long head = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress));
//aquire memory barrier https://openjdk.java.net/jeps/171
PlatformDependent.loadFence();
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress));
cqe = index * CQE_SIZE + completionQueueArrayAddress;
@ -70,8 +68,7 @@ public class IOUringCompletionQueue {
long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD));
//Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
PlatformDependent.storeFence();
PlatformDependent.putInt(kHeadAddress, (int) (head + 1));
PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1));
return new IOUringCqe(eventId, res, flags);
}

View File

@ -15,12 +15,12 @@
*/
package io.netty.channel.uring;
public class IOUringCqe {
class IOUringCqe {
private final long eventId;
private final int res;
private final long flags;
public IOUringCqe(long eventId, int res, long flags) {
IOUringCqe(long eventId, int res, long flags) {
this.eventId = eventId;
this.res = res;
this.flags = flags;

View File

@ -15,23 +15,22 @@
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import java.util.HashMap;
import java.util.concurrent.Executor;
import static io.netty.channel.unix.Errors.*;
class IOUringEventLoop extends SingleThreadEventLoop {
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
// events should be unique to identify which event type that was
private long eventIdCounter;
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
private final RingBuffer ringBuffer;
private RingBuffer ringBuffer;
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
@ -40,6 +39,7 @@ class IOUringEventLoop extends SingleThreadEventLoop {
public long incrementEventIdCounter() {
long eventId = eventIdCounter;
System.out.println(" incrementEventIdCounter EventId: " + eventId);
eventIdCounter++;
return eventId;
}
@ -50,26 +50,104 @@ class IOUringEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
//Todo
for (;;) {
// wait until an event has finished
//final long cqe = Native.ioUringWaitCqe(io_uring);
//final Event event = events.get(Native.ioUringGetEventId(cqe));
//final int ret = Native.ioUringGetRes(cqe);
// switch (event.getOp()) {
// case ACCEPT:
// // serverChannel is necessary to call newChildchannel
// // create a new accept event
// break;
// case READ:
// // need to save the Bytebuf before I execute the read operation
// // fireChannelRead(byteBuf)
// break;
// case WRITE:
// // you have to store Bytebuf to continue writing
// break;
// }
// processing Tasks
final IOUringCompletionQueue ioUringCompletionQueue = ringBuffer.getIoUringCompletionQueue();
final IOUringCqe ioUringCqe = ioUringCompletionQueue.peek(); // or waiting
if (ioUringCqe != null) {
final Event event = events.get(ioUringCqe.getEventId());
System.out.println("Completion EventId: " + ioUringCqe.getEventId());
if (event != null) {
switch (event.getOp()) {
case ACCEPT:
System.out.println("EventLoop Accept Res: " + ioUringCqe.getRes());
if (ioUringCqe.getRes() != -1 && ioUringCqe.getRes() != ERRNO_EAGAIN_NEGATIVE &&
ioUringCqe.getRes() != ERRNO_EWOULDBLOCK_NEGATIVE) {
AbstractIOUringServerChannel abstractIOUringServerChannel =
(AbstractIOUringServerChannel) event.getAbstractIOUringChannel();
System.out.println("EventLoop Fd: " + abstractIOUringServerChannel.getSocket().getFd());
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
allocHandle.lastBytesRead(ioUringCqe.getRes());
if (allocHandle.lastBytesRead() != -1) {
allocHandle.incMessagesRead(1);
try {
pipeline.fireChannelRead(abstractIOUringServerChannel
.newChildChannel(allocHandle.lastBytesRead()));
} catch (Exception e) {
e.printStackTrace();
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
long eventId = incrementEventIdCounter();
event.setId(eventId);
ringBuffer.getIoUringSubmissionQueue()
.add(eventId, EventType.ACCEPT, event.getAbstractIOUringChannel().getSocket().getFd(),
0,
0,
0);
addNewEvent(event);
ringBuffer.getIoUringSubmissionQueue().submit();
break;
case READ:
System.out.println("Eventlloop Read Res: " + ioUringCqe.getRes());
System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd());
ByteBuf byteBuf = event.getReadBuffer();
int localReadAmount = ioUringCqe.getRes();
if (localReadAmount > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
}
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
allocHandle.lastBytesRead(localReadAmount);
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
break;
}
allocHandle.incMessagesRead(1);
//readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
event.getAbstractIOUringChannel().executeReadEvent();
break;
case WRITE:
System.out.println("Eventloop Write Res: " + ioUringCqe.getRes());
System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd());
//remove bytes
int localFlushAmount = ioUringCqe.getRes();
if (localFlushAmount > 0) {
event.getAbstractIOUringChannel().unsafe().outboundBuffer().removeBytes(localFlushAmount);
}
break;
}
} else {
System.out.println("Event is null!!!! ");
}
}
//run tasks
if (hasTasks()) {
runAllTasks();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -27,31 +27,41 @@ import io.netty.util.concurrent.RejectedExecutionHandlers;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
public class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads and the default {@link ThreadFactory}.
*/
public IOUringEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
public IOUringEventLoopGroup(int nThreads) {
this(nThreads, (ThreadFactory) null);
}
/**
* Create a new instance using the default number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, 0);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, (ThreadFactory) null, selectStrategyFactory);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, 0);
@ -61,17 +71,33 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory,
SelectStrategyFactory selectStrategyFactory) {
this(nThreads, threadFactory, 0, selectStrategyFactory);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #IOUringEventLoopGroup(int)} or {@link #IOUringEventLoopGroup(int, ThreadFactory)}
*/
@Deprecated
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #IOUringEventLoopGroup(int)}, {@link #IOUringEventLoopGroup(int, ThreadFactory)}, or
* {@link #IOUringEventLoopGroup(int, SelectStrategyFactory)}
*/
@Deprecated
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce,
SelectStrategyFactory selectStrategyFactory) {
@ -101,6 +127,9 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
selectStrategyFactory, rejectedExecutionHandler, queueFactory);
}
/**
* @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts.
*/
@Deprecated
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
@ -108,9 +137,14 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
}
}
//Todo
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null;
// return new IOUringEventLoop(this, executor, (Integer) args[0],
// ((SelectStrategyFactory) args[1]).newSelectStrategy(),
// (RejectedExecutionHandler) args[2], queueFactory);
return new IOUringEventLoop(this, executor, false);
}
}

View File

@ -21,7 +21,7 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
public class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle
final class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle
implements RecvByteBufAllocator.ExtendedHandle {
private final PreferredDirectByteBufAllocator preferredDirectByteBufAllocator =
new PreferredDirectByteBufAllocator();
@ -37,19 +37,19 @@ public class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.Delegat
}
@Override
public final ByteBuf allocate(ByteBufAllocator alloc) {
public ByteBuf allocate(ByteBufAllocator alloc) {
// We need to ensure we always allocate a direct ByteBuf as we can only use a direct buffer to read via JNI.
preferredDirectByteBufAllocator.updateAllocator(alloc);
return delegate().allocate(preferredDirectByteBufAllocator);
}
@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
}
@Override
public final boolean continueReading() {
public boolean continueReading() {
return false;
}
}

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBufAllocator;

View File

@ -17,6 +17,7 @@ package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import java.net.InetSocketAddress;
@ -26,29 +27,17 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp
private final IOUringServerSocketChannelConfig config;
public IOUringServerSocketChannel() {
super(Socket.newSocketStream().getFd());
super(Socket.newSocketStreamBlocking().getFd());
this.config = new IOUringServerSocketChannelConfig(this);
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);
}
@Override
public IOUringServerSocketChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return false;
}
@Override
Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception {
Channel newChildChannel(int fd) throws Exception {
return new IOUringSocketChannel(this, new LinuxSocket(fd));
}
@ -66,4 +55,16 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);
socket.listen(500);
active = true;
}
@Override
public FileDescriptor fd() {
return super.fd();
}
}

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.socket.ServerSocketChannelConfig;

View File

@ -21,7 +21,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
@ -43,11 +45,6 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
return (ServerSocketChannel) super.parent();
}
@Override
public IOUringSocketChannelConfig config() {
return config;
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new AbstractUringUnsafe() {
@ -67,7 +64,8 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
public IOUringSocketChannelConfig config() {
return config;
}
@Override
@ -117,17 +115,17 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
@Override
public FileDescriptor fd() {
return null;
return super.fd();
}
@Override
protected SocketAddress localAddress0() {
return null;
return super.localAddress0();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
return super.remoteAddress0();
}
@Override

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBufAllocator;
@ -20,7 +35,6 @@ public class IOUringSocketChannelConfig extends IOUringChannelConfig implements
super(channel);
}
@Override
public int getReceiveBufferSize() {
try {

View File

@ -19,21 +19,21 @@ import io.netty.util.internal.PlatformDependent;
public class IOUringSubmissionQueue {
private final int SQE_SIZE = 64;
private final int INT_SIZE = 4;
private static final int SQE_SIZE = 64;
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
//these offsets are used to access specific properties
//SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21
private final int SQE_OP_CODE_FIELD = 0;
private final int SQE_FLAGS_FIELD = 1;
private final int SQE_IOPRIO_FIELD = 2; // u16
private final int SQE_FD_FIELD = 4; // s32
private final int SQE_OFFSET_FIELD = 8;
private final int SQE_ADDRESS_FIELD = 16;
private final int SQE_LEN_FIELD = 24;
private final int SQE_RW_FLAGS_FIELD = 28;
private final int SQE_USER_DATA_FIELD = 32;
private final int SQE_PAD_FIELD = 40;
private static final int SQE_OP_CODE_FIELD = 0;
private static final int SQE_FLAGS_FIELD = 1;
private static final int SQE_IOPRIO_FIELD = 2; // u16
private static final int SQE_FD_FIELD = 4; // s32
private static final int SQE_OFFSET_FIELD = 8;
private static final int SQE_ADDRESS_FIELD = 16;
private static final int SQE_LEN_FIELD = 24;
private static final int SQE_RW_FLAGS_FIELD = 28;
private static final int SQE_USER_DATA_FIELD = 32;
private static final int SQE_PAD_FIELD = 40;
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
private final long kHeadAddress;
@ -54,7 +54,8 @@ public class IOUringSubmissionQueue {
private final int ringFd;
public IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize,
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
long submissionQueueArrayAddress, int ringSize,
long ringAddress, int ringFd) {
this.kHeadAddress = kHeadAddress;
this.kTailAddress = kTailAddress;
@ -102,21 +103,28 @@ public class IOUringSubmissionQueue {
}
System.out.println("OPField: " + PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD));
System.out.println("UserDataField: " + PlatformDependent.getByte(sqe + SQE_USER_DATA_FIELD));
System.out.println("UserDataField: " + PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
System.out.println("BufferAddress: " + PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
System.out.println("Length: " + PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
System.out.println("Offset: " + PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
}
//Todo ring buffer errors for example if submission queue is full
public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) {
long sqe = getSqe();
if (sqe == 0) {
return false;
}
System.out.println("fd " + fd);
System.out.println("BufferAddress + pos: " + (bufferAddress + pos));
System.out.println("limit + pos " + (limit - pos));
setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0);
return true;
}
private int flushSqe() {
long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
long kHead = toUnsignedLong(PlatformDependent.getInt(kHeadAddress));
long kHead = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress));
long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
System.out.println("Ktail: " + kTail);
@ -139,10 +147,7 @@ public class IOUringSubmissionQueue {
toSubmit--;
}
//release memory barrier
PlatformDependent.storeFence();
PlatformDependent.putInt(kTailAddress, (int) kTail);
PlatformDependent.putIntOrdered(kTailAddress, (int) kTail);
return (int) (kTail - kHead);
}

View File

@ -25,18 +25,7 @@ public class LinuxSocket extends Socket {
this.fd = fd;
}
//Todo
// public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
// return Native.ioUringRead(ring, fd, eventId, bufferAddress, pos, limit);
// }
// public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
// return Native.ioUringWrite(ring, fd, eventId, bufferAddress, pos, limit);
// }
// public int acceptEvent(long ring, long eventId, byte[] addr) {
// return Native.ioUringAccept(ring, eventId, addr);
// }
public int getFd() {
return fd;
}
}

View File

@ -25,7 +25,6 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Locale;
public final class Native {
@ -33,6 +32,7 @@ public final class Native {
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
static {
loadNativeLibrary();
Socket.initialize();
}
public static RingBuffer createRingBuffer(int ringSize) {

View File

@ -15,13 +15,12 @@
*/
package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
public class RingBuffer {
class RingBuffer {
private final IOUringSubmissionQueue ioUringSubmissionQueue;
private final IOUringCompletionQueue ioUringCompletionQueue;
public RingBuffer(IOUringSubmissionQueue ioUringSubmissionQueue, IOUringCompletionQueue ioUringCompletionQueue) {
RingBuffer(IOUringSubmissionQueue ioUringSubmissionQueue, IOUringCompletionQueue ioUringCompletionQueue) {
this.ioUringSubmissionQueue = ioUringSubmissionQueue;
this.ioUringCompletionQueue = ioUringCompletionQueue;
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.uring;
import io.netty.channel.unix.Socket;
import org.junit.Test;
import java.io.FileInputStream;

View File

@ -69,6 +69,11 @@ static int nettyNonBlockingSocket(int domain, int type, int protocol) {
#endif
}
//only temporary
static int nettyBlockingSocket(int domain, int type, int protocol) {
return socket(domain, type, protocol);
}
int netty_unix_socket_ipAddressLength(const struct sockaddr_storage* addr) {
if (addr->ss_family == AF_INET) {
return 4;
@ -613,6 +618,17 @@ static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz, jbool
return _socket(env, clazz, domain, SOCK_STREAM);
}
//only temporary
static jint netty_unix_socket_newSocketStreamFd_blocking(JNIEnv* env, jclass clazz, jboolean ipv6) {
int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET;
int fd = nettyBlockingSocket(domain, SOCK_STREAM, 0);
if (fd == -1) {
return -errno;
}
return fd;
}
static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) {
int fd = nettyNonBlockingSocket(PF_UNIX, SOCK_STREAM, 0);
if (fd == -1) {
@ -979,6 +995,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress },
{ "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd },
{ "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd },
{ "newSocketStreamFdBlocking", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd_blocking }, //temporary
{ "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd },
{ "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo },
{ "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress },

View File

@ -397,6 +397,11 @@ public class Socket extends FileDescriptor {
return new Socket(newSocketStream0());
}
public static Socket newSocketStreamBlocking() {
System.out.println("newSocketStreamBlocking");
return new Socket(newSocketStreamBlocking(isIPv6Preferred()));
}
public static Socket newSocketDgram() {
return new Socket(newSocketDgram0());
}
@ -423,6 +428,15 @@ public class Socket extends FileDescriptor {
return res;
}
//only temporary
protected static int newSocketStreamBlocking(boolean ipv6) {
int res = newSocketStreamFdBlocking(ipv6);
if (res < 0) {
throw new ChannelException(newIOException("newSocketStream", res));
}
return res;
}
protected static int newSocketDgram0() {
return newSocketDgram0(isIPv6Preferred());
}
@ -471,6 +485,7 @@ public class Socket extends FileDescriptor {
private static native int sendFd(int socketFd, int fd);
private static native int newSocketStreamFd(boolean ipv6);
private static native int newSocketStreamFdBlocking(boolean ipv6);
private static native int newSocketDgramFd(boolean ipv6);
private static native int newSocketDomainFd();