Merge pull request #30 from normanmaurer/handle_complete_cleanup

Call handle.readComplete() before fireChannlReadComplete() and also c…
This commit is contained in:
Josef Grieb 2020-09-04 06:43:20 +02:00 committed by GitHub
commit 9e13c5cfd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 10 additions and 69 deletions

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.DuplexChannel;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -51,6 +50,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
super(parent, fd, remote); super(parent, fd, remote);
} }
@Override
protected AbstractUringUnsafe newUnsafe() {
return new IOUringStreamUnsafe();
}
@Override @Override
public ChannelFuture shutdown() { public ChannelFuture shutdown() {
return shutdown(newPromise()); return shutdown(newPromise());
@ -198,7 +202,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
} }
} }
class IOUringStreamUnsafe extends AbstractUringUnsafe { private final class IOUringStreamUnsafe extends AbstractUringUnsafe {
// Overridden here just to be able to access this method from AbstractEpollStreamChannel // Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override @Override
@ -267,6 +271,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
scheduleRead(); scheduleRead();
} else { } else {
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later. // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} }
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -17,7 +17,6 @@ package io.netty.channel.uring;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -56,9 +55,4 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann
socket.listen(config.getBacklog()); socket.listen(config.getBacklog());
active = true; active = true;
} }
@Override
public FileDescriptor fd() {
return super.fd();
}
} }

View File

@ -15,33 +15,14 @@
*/ */
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.uring.AbstractIOUringStreamChannel.IOUringStreamUnsafe;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
public final class IOUringSocketChannel extends AbstractIOUringStreamChannel implements SocketChannel { public final class IOUringSocketChannel extends AbstractIOUringStreamChannel implements SocketChannel {
private final IOUringSocketChannelConfig config; private final IOUringSocketChannelConfig config;
//private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
public IOUringSocketChannel() { public IOUringSocketChannel() {
super(null, LinuxSocket.newSocketStream(), false); super(null, LinuxSocket.newSocketStream(), false);
@ -53,35 +34,16 @@ public final class IOUringSocketChannel extends AbstractIOUringStreamChannel imp
this.config = new IOUringSocketChannelConfig(this); this.config = new IOUringSocketChannelConfig(this);
} }
IOUringSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
this.config = new IOUringSocketChannelConfig(this);
// if (parent instanceof IOUringSocketChannel) {
// tcpMd5SigAddresses = ((IOUringSocketChannel) parent).tcpMd5SigAddresses();
// }
}
@Override @Override
public ServerSocketChannel parent() { public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();
} }
@Override
protected AbstractUringUnsafe newUnsafe() {
return new IOUringStreamUnsafe();
}
@Override @Override
public IOUringSocketChannelConfig config() { public IOUringSocketChannelConfig config() {
return config; return config;
} }
@Override
public FileDescriptor fd() {
return super.fd();
}
@Override @Override
public InetSocketAddress remoteAddress() { public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress(); return (InetSocketAddress) super.remoteAddress();

View File

@ -27,24 +27,19 @@ import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.*;
import static io.netty.channel.unix.Limits.*;
public class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { public final class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
private volatile boolean allowHalfClosure; private volatile boolean allowHalfClosure;
private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
public IOUringSocketChannelConfig(Channel channel) { IOUringSocketChannelConfig(Channel channel) {
super(channel); super(channel);
if (PlatformDependent.canEnableTcpNoDelayByDefault()) { if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
setTcpNoDelay(true); setTcpNoDelay(true);
} }
calculateMaxBytesPerGatheringWrite();
} }
@Override @Override
@ -333,7 +328,6 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements
public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) { public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try { try {
((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); ((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
calculateMaxBytesPerGatheringWrite();
return this; return this;
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
@ -641,16 +635,4 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements
super.setMessageSizeEstimator(estimator); super.setMessageSizeEstimator(estimator);
return this; return this;
} }
final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
}
private void calculateMaxBytesPerGatheringWrite() {
// Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
int newSendBufferSize = getSendBufferSize() << 1;
if (newSendBufferSize > 0) {
setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
}
}
} }

View File

@ -83,8 +83,7 @@ final class Native {
} }
public static RingBuffer createRingBuffer() { public static RingBuffer createRingBuffer() {
//Todo throw Exception if it's null return createRingBuffer(DEFAULT_RING_SIZE);
return ioUringSetup(DEFAULT_RING_SIZE);
} }
private static native RingBuffer ioUringSetup(int entries); private static native RingBuffer ioUringSetup(int entries);

View File

@ -37,5 +37,4 @@ final class RingBuffer {
getIoUringSubmissionQueue().release(); getIoUringSubmissionQueue().release();
Native.ioUringExit(this); Native.ioUringExit(this);
} }
} }