[#2375] [#2404] Fix bug in respecting ChannelConfig.setAutoRead(false) and also fix Channel.read() for OIO
Motivation: At the moment ChanneConfig.setAutoRead(false) only is guaranteer to not have an extra channelRead(...) triggered when used from within the channelRead(...) or channelReadComplete(...) method. This is not the correct behaviour as it should also work from other methods that are triggered from within the EventLoop. For example a valid use case is to have it called from within a ChannelFutureListener, which currently not work as expected. Beside this there is another bug which is kind of related. Currently Channel.read() will not work as expected for OIO as we will stop try to read even if nothing could be read there after one read operation on the socket (when the SO_TIMEOUT kicks in). Modifications: Implement the logic the right way for the NIO/OIO/SCTP and native transport, specific to the transport implementation. Also correctly handle Channel.read() for OIO transport by trigger a new read if SO_TIMEOUT was catched. Result: It is now also possible to use ChannelConfig.setAutoRead(false) from other methods that are called from within the EventLoop and have direct effect.
This commit is contained in:
parent
8683e1ef3e
commit
541abb8515
@ -0,0 +1,205 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.testsuite.transport.socket;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class SocketAutoReadTest extends AbstractSocketTest {
|
||||
private static final Random random = new Random();
|
||||
static final byte[] data = new byte[1024];
|
||||
|
||||
static {
|
||||
random.nextBytes(data);
|
||||
}
|
||||
|
||||
// See https://github.com/netty/netty/pull/2375
|
||||
@Test(timeout = 30000)
|
||||
public void testAutoReadDisableOutsideChannelRead() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testAutoReadDisableOutsideChannelRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
TestHandler sh = new TestHandler() {
|
||||
private boolean allBytesReceived;
|
||||
@Override
|
||||
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
assertFalse(allBytesReceived);
|
||||
ctx.writeAndFlush(msg);
|
||||
ctx.channel().eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
allBytesReceived = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
sb.childHandler(sh);
|
||||
|
||||
TestHandler ch = new TestHandler();
|
||||
cb.handler(ch);
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.connect().sync().channel();
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
|
||||
cc.close().sync();
|
||||
sc.close().sync();
|
||||
|
||||
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
if (sh.exception.get() != null) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
// See https://github.com/netty/netty/pull/2375
|
||||
@Test(timeout = 30000)
|
||||
public void testAutoReadDisableOutsideChannelReadManualRead() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testAutoReadDisableOutsideChannelReadManualRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
|
||||
ServerTestHandler sh = new ServerTestHandler();
|
||||
sb.childHandler(sh);
|
||||
|
||||
TestHandler ch = new TestHandler();
|
||||
cb.handler(ch);
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.connect().sync().channel();
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
|
||||
Thread.sleep(500);
|
||||
sh.await();
|
||||
cc.close().sync();
|
||||
sc.close().sync();
|
||||
|
||||
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
if (sh.exception.get() != null) {
|
||||
throw sh.exception.get();
|
||||
}
|
||||
if (ch.exception.get() != null) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ServerTestHandler extends TestHandler {
|
||||
enum State {
|
||||
AUTO_READ,
|
||||
SCHEDULED,
|
||||
BYTES_RECEIVED,
|
||||
READ_SCHEDULED
|
||||
}
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private State state = State.AUTO_READ;
|
||||
|
||||
@Override
|
||||
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
ctx.writeAndFlush(msg);
|
||||
switch (state) {
|
||||
case READ_SCHEDULED:
|
||||
latch.countDown();
|
||||
break;
|
||||
case AUTO_READ:
|
||||
state = State.SCHEDULED;
|
||||
ctx.channel().eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
state = State.BYTES_RECEIVED;
|
||||
ctx.channel().eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
state = State.READ_SCHEDULED;
|
||||
ctx.channel().read();
|
||||
}
|
||||
}, 2, TimeUnit.SECONDS);
|
||||
}
|
||||
});
|
||||
break;
|
||||
case BYTES_RECEIVED:
|
||||
// Once the state is BYTES_RECEIVED we should not receive anymore data.
|
||||
fail();
|
||||
break;
|
||||
case SCHEDULED:
|
||||
// nothing to do
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public void await() throws InterruptedException {
|
||||
latch.await();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHandler extends ChannelInboundHandlerAdapter {
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx,
|
||||
Throwable cause) throws Exception {
|
||||
if (exception.compareAndSet(null, cause)) {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
}
|
||||
}
|
@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.UnresolvedAddressException;
|
||||
@ -103,10 +104,22 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
}
|
||||
}
|
||||
|
||||
protected final void clearEpollIn() {
|
||||
if ((flags & readFlag) != 0) {
|
||||
flags &= ~readFlag;
|
||||
((EpollEventLoop) eventLoop()).modify(this);
|
||||
final void clearEpollIn() {
|
||||
final EventLoop loop = eventLoop();
|
||||
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
|
||||
if (loop.inEventLoop()) {
|
||||
unsafe.clearEpollIn0();
|
||||
} else {
|
||||
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
|
||||
loop.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!config().isAutoRead() && !unsafe.readPending) {
|
||||
// Still no read triggered so clear it now
|
||||
unsafe.clearEpollIn0();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -183,5 +196,12 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
private boolean isFlushPending() {
|
||||
return (flags & Native.EPOLLOUT) != 0;
|
||||
}
|
||||
|
||||
protected final void clearEpollIn0() {
|
||||
if ((flags & readFlag) != 0) {
|
||||
flags &= ~readFlag;
|
||||
((EpollEventLoop) eventLoop()).modify(AbstractEpollChannel.this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -302,7 +302,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
}
|
||||
|
||||
if (remoteAddress == null) {
|
||||
remoteAddress = this.remote;
|
||||
remoteAddress = remote;
|
||||
if (remoteAddress == null) {
|
||||
throw new NotYetConnectedException();
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
|
||||
|
||||
EpollDatagramChannelConfig(EpollDatagramChannel channel) {
|
||||
super(channel);
|
||||
this.datagramChannel = channel;
|
||||
datagramChannel = channel;
|
||||
setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR);
|
||||
}
|
||||
|
||||
@ -272,4 +272,9 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
|
||||
public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
|
||||
throw new UnsupportedOperationException("Multicast not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
datagramChannel.clearEpollIn();
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
clearEpollIn();
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,4 +197,8 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
channel.clearEpollIn();
|
||||
}
|
||||
}
|
||||
|
@ -355,7 +355,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
inputShutdown = true;
|
||||
if (isOpen()) {
|
||||
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
|
||||
clearEpollIn();
|
||||
clearEpollIn0();
|
||||
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
} else {
|
||||
close(voidPromise());
|
||||
@ -657,7 +657,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
clearEpollIn();
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -271,4 +271,9 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig
|
||||
super.setMessageSizeEstimator(estimator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
channel.clearEpollIn();
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
super(parent, eventLoop, sctpChannel, SelectionKey.OP_READ);
|
||||
try {
|
||||
sctpChannel.configureBlocking(false);
|
||||
config = new DefaultSctpChannelConfig(this, sctpChannel);
|
||||
config = new NioSctpChannelConfig(this, sctpChannel);
|
||||
notificationHandler = new SctpNotificationHandler(this);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
@ -374,11 +374,11 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
private static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
private static final Recycler<NioSctpChannelOutboundBuffer> RECYCLER =
|
||||
new Recycler<NioSctpChannelOutboundBuffer>() {
|
||||
@Override
|
||||
protected NioSctpChannelOutboundBuffer newObject(Handle<NioSctpChannelOutboundBuffer> handle) {
|
||||
return new NioSctpChannelOutboundBuffer(handle);
|
||||
}
|
||||
};
|
||||
@Override
|
||||
protected NioSctpChannelOutboundBuffer newObject(Handle<NioSctpChannelOutboundBuffer> handle) {
|
||||
return new NioSctpChannelOutboundBuffer(handle);
|
||||
}
|
||||
};
|
||||
|
||||
static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) {
|
||||
NioSctpChannelOutboundBuffer buffer = RECYCLER.get();
|
||||
@ -403,4 +403,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
|
||||
private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
|
||||
private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
|
||||
super(channel, javaChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel
|
||||
*/
|
||||
public NioSctpServerChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
|
||||
super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
|
||||
config = new DefaultSctpServerChannelConfig(this, javaChannel());
|
||||
config = new NioSctpServerChannelConfig(this, javaChannel());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -223,4 +223,15 @@ public class NioSctpServerChannel extends AbstractNioMessageServerChannel
|
||||
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
|
||||
private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) {
|
||||
super(channel, javaChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
||||
ch.register(writeSelector, SelectionKey.OP_WRITE);
|
||||
ch.register(connectSelector, SelectionKey.OP_CONNECT);
|
||||
|
||||
config = new DefaultSctpChannelConfig(this, ch);
|
||||
config = new OioSctpChannelConfig(this, ch);
|
||||
notificationHandler = new SctpNotificationHandler(this);
|
||||
success = true;
|
||||
} catch (Exception e) {
|
||||
@ -449,4 +449,15 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
|
||||
private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
|
||||
super(channel, javaChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel
|
||||
sch.configureBlocking(false);
|
||||
selector = Selector.open();
|
||||
sch.register(selector, SelectionKey.OP_ACCEPT);
|
||||
config = new DefaultSctpServerChannelConfig(this, sch);
|
||||
config = new OioSctpServerChannelConfig(this, sch);
|
||||
success = true;
|
||||
} catch (Exception e) {
|
||||
throw new ChannelException("failed to initialize a sctp server channel", e);
|
||||
@ -291,4 +291,15 @@ public class OioSctpServerChannel extends AbstractOioMessageServerChannel
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
|
||||
private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
|
||||
super(channel, javaChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -258,10 +258,18 @@ public class DefaultChannelConfig implements ChannelConfig {
|
||||
this.autoRead = autoRead;
|
||||
if (autoRead && !oldAutoRead) {
|
||||
channel.read();
|
||||
} else if (!autoRead && oldAutoRead) {
|
||||
autoReadCleared();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
|
||||
* {@code true} before.
|
||||
*/
|
||||
protected void autoReadCleared() { }
|
||||
|
||||
@Override
|
||||
public int getWriteBufferHighWaterMark() {
|
||||
return writeBufferHighWaterMark;
|
||||
|
@ -73,7 +73,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
ByteBuf byteBuf, Throwable cause, boolean close) {
|
||||
if (byteBuf != null) {
|
||||
if (byteBuf.isReadable()) {
|
||||
readPending = false;
|
||||
setReadPending(false);
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
} else {
|
||||
byteBuf.release();
|
||||
@ -89,6 +89,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
@Override
|
||||
public void read() {
|
||||
final ChannelConfig config = config();
|
||||
if (!config.isAutoRead() && !isReadPending()) {
|
||||
// ChannelConfig.setAutoRead(false) was called in the meantime
|
||||
removeReadOp();
|
||||
return;
|
||||
}
|
||||
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
final ByteBufAllocator allocator = config.getAllocator();
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
@ -103,6 +109,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
try {
|
||||
int byteBufCapacity = allocHandle.guess();
|
||||
int totalReadAmount = 0;
|
||||
boolean readPendingReset = false;
|
||||
do {
|
||||
byteBuf = allocator.ioBuffer(byteBufCapacity);
|
||||
int writable = byteBuf.writableBytes();
|
||||
@ -113,7 +120,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
close = localReadAmount < 0;
|
||||
break;
|
||||
}
|
||||
readPending = false;
|
||||
if (!readPendingReset) {
|
||||
readPendingReset = true;
|
||||
setReadPending(false);
|
||||
}
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
|
||||
@ -153,7 +163,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
if (!config.isAutoRead() && !isReadPending()) {
|
||||
removeReadOp();
|
||||
}
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
protected final int readInterestOp;
|
||||
private volatile SelectionKey selectionKey;
|
||||
private volatile boolean inputShutdown;
|
||||
private volatile boolean readPending;
|
||||
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent
|
||||
@ -111,6 +112,14 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
return selectionKey;
|
||||
}
|
||||
|
||||
protected boolean isReadPending() {
|
||||
return readPending;
|
||||
}
|
||||
|
||||
protected void setReadPending(boolean readPending) {
|
||||
this.readPending = readPending;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if the input of this {@link Channel} is shutdown
|
||||
*/
|
||||
@ -149,8 +158,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
|
||||
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
|
||||
|
||||
protected boolean readPending;
|
||||
|
||||
protected final void removeReadOp() {
|
||||
SelectionKey key = selectionKey();
|
||||
// Check first if the key is still valid as it may be canceled as part of the deregistration
|
||||
|
@ -51,6 +51,11 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
final ChannelConfig config = config();
|
||||
if (!config.isAutoRead() && !isReadPending()) {
|
||||
// ChannelConfig.setAutoRead(false) was called in the meantime
|
||||
removeReadOp();
|
||||
return;
|
||||
}
|
||||
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
@ -80,7 +85,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
}
|
||||
readPending = false;
|
||||
setReadPending(false);
|
||||
int size = readBuf.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
pipeline.fireChannelRead(readBuf.get(i));
|
||||
@ -111,7 +116,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
if (!config.isAutoRead() && !isReadPending()) {
|
||||
removeReadOp();
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.oio;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
@ -70,7 +71,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
if (checkInputShutdown()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ChannelConfig config = config();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
|
||||
// TODO: calculate size as in 3.x
|
||||
@ -78,9 +79,10 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
boolean closed = false;
|
||||
boolean read = false;
|
||||
Throwable exception = null;
|
||||
int localReadAmount = 0;
|
||||
try {
|
||||
for (;;) {
|
||||
int localReadAmount = doReadBytes(byteBuf);
|
||||
localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount > 0) {
|
||||
read = true;
|
||||
} else if (localReadAmount < 0) {
|
||||
@ -110,7 +112,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!config().isAutoRead()) {
|
||||
if (!config.isAutoRead()) {
|
||||
// stop reading until next Channel.read() call
|
||||
// See https://github.com/netty/netty/issues/1363
|
||||
break;
|
||||
@ -147,6 +149,15 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (localReadAmount == 0 && isActive()) {
|
||||
// If the read amount was 0 and the channel is still active we need to trigger a new read()
|
||||
// as otherwise we will never try to read again and the user will never know.
|
||||
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
|
||||
// able to process the rest of the tasks in the queue first.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2404
|
||||
read();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,12 +31,17 @@ public abstract class AbstractOioChannel extends AbstractChannel {
|
||||
|
||||
protected static final int SO_TIMEOUT = 1000;
|
||||
|
||||
private boolean readInProgress;
|
||||
private volatile boolean readPending;
|
||||
|
||||
private final Runnable readTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
readInProgress = false;
|
||||
if (!isReadPending() && !config().isAutoRead()) {
|
||||
// ChannelConfig.setAutoRead(false) was called in the meantime so just return
|
||||
return;
|
||||
}
|
||||
|
||||
setReadPending(false);
|
||||
doRead();
|
||||
}
|
||||
};
|
||||
@ -91,13 +96,21 @@ public abstract class AbstractOioChannel extends AbstractChannel {
|
||||
|
||||
@Override
|
||||
protected void doBeginRead() throws Exception {
|
||||
if (readInProgress) {
|
||||
if (isReadPending()) {
|
||||
return;
|
||||
}
|
||||
|
||||
readInProgress = true;
|
||||
setReadPending(true);
|
||||
eventLoop().execute(readTask);
|
||||
}
|
||||
|
||||
protected abstract void doRead();
|
||||
|
||||
protected boolean isReadPending() {
|
||||
return readPending;
|
||||
}
|
||||
|
||||
protected void setReadPending(boolean readPending) {
|
||||
this.readPending = readPending;
|
||||
}
|
||||
}
|
||||
|
@ -37,15 +37,16 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
|
||||
|
||||
@Override
|
||||
protected void doRead() {
|
||||
final ChannelConfig config = config();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
boolean closed = false;
|
||||
final ChannelConfig config = config();
|
||||
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||
|
||||
Throwable exception = null;
|
||||
int localRead = 0;
|
||||
try {
|
||||
for (;;) {
|
||||
int localRead = doReadMessages(readBuf);
|
||||
localRead = doReadMessages(readBuf);
|
||||
if (localRead == 0) {
|
||||
break;
|
||||
}
|
||||
@ -81,6 +82,14 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
|
||||
if (isOpen()) {
|
||||
unsafe().close(unsafe().voidPromise());
|
||||
}
|
||||
} else if (localRead == 0 && isActive()) {
|
||||
// If the read amount was 0 and the channel is still active we need to trigger a new read()
|
||||
// as otherwise we will never try to read again and the user will never know.
|
||||
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
|
||||
// able to process the rest of the tasks in the queue first.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2404
|
||||
read();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -541,4 +541,9 @@ public final class NioDatagramChannel
|
||||
protected ChannelOutboundBuffer newOutboundBuffer() {
|
||||
return NioDatagramChannelOutboundBuffer.newInstance(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setReadPending(boolean readPending) {
|
||||
super.setReadPending(readPending);
|
||||
}
|
||||
}
|
||||
|
@ -161,6 +161,17 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatagramChannelConfig setAutoRead(boolean autoRead) {
|
||||
super.setAutoRead(autoRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
((NioDatagramChannel) channel).setReadPending(false);
|
||||
}
|
||||
|
||||
private Object getOption0(Object option) {
|
||||
if (PlatformDependent.javaVersion() < 7) {
|
||||
throw new UnsupportedOperationException();
|
||||
|
@ -28,6 +28,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
@ -83,7 +84,7 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel
|
||||
*/
|
||||
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup, ServerSocketChannel channel) {
|
||||
super(null, eventLoop, childGroup, channel, SelectionKey.OP_ACCEPT);
|
||||
config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
|
||||
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -179,4 +180,15 @@ public class NioServerSocketChannel extends AbstractNioMessageServerChannel
|
||||
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
|
||||
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import io.netty.util.internal.OneTimeTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
@ -91,7 +92,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
*/
|
||||
public NioSocketChannel(Channel parent, EventLoop eventLoop, SocketChannel socket) {
|
||||
super(parent, eventLoop, socket);
|
||||
config = new DefaultSocketChannelConfig(this, socket.socket());
|
||||
config = new NioSocketChannelConfig(this, socket.socket());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -321,4 +322,15 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
protected ChannelOutboundBuffer newOutboundBuffer() {
|
||||
return NioSocketChannelOutboundBuffer.newInstance(this);
|
||||
}
|
||||
|
||||
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
|
||||
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
setReadPending(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,10 +35,15 @@ import static io.netty.channel.ChannelOption.*;
|
||||
public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChannelConfig implements
|
||||
OioServerSocketChannelConfig {
|
||||
|
||||
@Deprecated
|
||||
public DefaultOioServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
DefaultOioServerSocketChannelConfig(OioServerSocketChannel channel, ServerSocket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(
|
||||
@ -145,6 +150,13 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
if (channel instanceof OioServerSocketChannel) {
|
||||
((OioServerSocketChannel) channel).setReadPending(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
|
@ -33,10 +33,15 @@ import static io.netty.channel.ChannelOption.*;
|
||||
* Default {@link OioSocketChannelConfig} implementation
|
||||
*/
|
||||
public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig implements OioSocketChannelConfig {
|
||||
@Deprecated
|
||||
public DefaultOioSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
DefaultOioSocketChannelConfig(OioSocketChannel channel, Socket javaSocket) {
|
||||
super(channel, javaSocket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(
|
||||
@ -173,6 +178,13 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
if (channel instanceof OioSocketChannel) {
|
||||
((OioSocketChannel) channel).setReadPending(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
|
@ -194,4 +194,9 @@ public class OioServerSocketChannel extends AbstractOioMessageServerChannel impl
|
||||
protected void doDisconnect() throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setReadPending(boolean readPending) {
|
||||
super.setReadPending(readPending);
|
||||
}
|
||||
}
|
||||
|
@ -232,4 +232,9 @@ public class OioSocketChannel extends OioByteStreamChannel
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setReadPending(boolean readPending) {
|
||||
super.setReadPending(readPending);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user