Call handle.readComplete() before fireChannlReadComplete() and also cleanup some code

This commit is contained in:
Norman Maurer 2020-09-03 18:40:44 +02:00
parent 1440b4fa0c
commit 0631824dcd
6 changed files with 10 additions and 69 deletions

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DuplexChannel;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -51,6 +50,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
super(parent, fd, remote);
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new IOUringStreamUnsafe();
}
@Override
public ChannelFuture shutdown() {
return shutdown(newPromise());
@ -198,7 +202,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
}
}
class IOUringStreamUnsafe extends AbstractUringUnsafe {
private final class IOUringStreamUnsafe extends AbstractUringUnsafe {
// Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override
@ -272,6 +276,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
scheduleRead();
} else {
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
} catch (Throwable t) {

View File

@ -17,7 +17,6 @@ package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -56,9 +55,4 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann
socket.listen(config.getBacklog());
active = true;
}
@Override
public FileDescriptor fd() {
return super.fd();
}
}

View File

@ -15,33 +15,14 @@
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.uring.AbstractIOUringStreamChannel.IOUringStreamUnsafe;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
public final class IOUringSocketChannel extends AbstractIOUringStreamChannel implements SocketChannel {
private final IOUringSocketChannelConfig config;
//private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
public IOUringSocketChannel() {
super(null, LinuxSocket.newSocketStream(), false);
@ -53,35 +34,16 @@ public final class IOUringSocketChannel extends AbstractIOUringStreamChannel imp
this.config = new IOUringSocketChannelConfig(this);
}
IOUringSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
this.config = new IOUringSocketChannelConfig(this);
// if (parent instanceof IOUringSocketChannel) {
// tcpMd5SigAddresses = ((IOUringSocketChannel) parent).tcpMd5SigAddresses();
// }
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new IOUringStreamUnsafe();
}
@Override
public IOUringSocketChannelConfig config() {
return config;
}
@Override
public FileDescriptor fd() {
return super.fd();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();

View File

@ -27,24 +27,19 @@ import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
import static io.netty.channel.unix.Limits.*;
public class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
public final class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
private volatile boolean allowHalfClosure;
private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
public IOUringSocketChannelConfig(Channel channel) {
IOUringSocketChannelConfig(Channel channel) {
super(channel);
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
setTcpNoDelay(true);
}
calculateMaxBytesPerGatheringWrite();
}
@Override
@ -333,7 +328,6 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements
public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try {
((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
calculateMaxBytesPerGatheringWrite();
return this;
} catch (IOException e) {
throw new ChannelException(e);
@ -641,16 +635,4 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements
super.setMessageSizeEstimator(estimator);
return this;
}
final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
}
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
}
}
}

View File

@ -83,8 +83,7 @@ final class Native {
}
public static RingBuffer createRingBuffer() {
//Todo throw Exception if it's null
return ioUringSetup(DEFAULT_RING_SIZE);
return createRingBuffer(DEFAULT_RING_SIZE);
}
private static native RingBuffer ioUringSetup(int entries);

View File

@ -37,5 +37,4 @@ final class RingBuffer {
getIoUringSubmissionQueue().release();
Native.ioUringExit(this);
}
}