EpollRecvByteAllocatorHandle doesn't inform delegate of more data

Motivation:
EpollRecvByteAllocatorHandle intends to override the meaning of "maybe more data to read" which is a concept also used in all existing implementations of RecvByteBufAllocator$Handle but the interface doesn't support overriding. Because the interfaces lack the ability to propagate this computation EpollRecvByteAllocatorHandle attempts to implement a heuristic on top of the delegate which may lead to reading when we shouldn't or not reading data.

Modifications:
- Create a new interface ExtendedRecvByteBufAllocator and ExtendedHandle which allows the "maybe more data to read" between interfaces
- Deprecate RecvByteBufAllocator and change all existing implementations to extend ExtendedRecvByteBufAllocator
- transport-native-epoll should require ExtendedRecvByteBufAllocator so the "maybe more data to read" can be propagated to the ExtendedHandle

Result:
Fixes https://github.com/netty/netty/issues/6303.
This commit is contained in:
Scott Mitchell 2017-02-02 10:15:10 -08:00
parent 6765e9f99d
commit a1b5b5dcca
23 changed files with 600 additions and 88 deletions

View File

@ -0,0 +1,48 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
/**
* Represents a supplier of {@code boolean}-valued results which doesn't throw any checked exceptions.
*/
public interface UncheckedBooleanSupplier extends BooleanSupplier {
/**
* Gets a boolean value.
* @return a boolean value.
*/
@Override
boolean get();
/**
* A supplier which always returns {@code false} and never throws.
*/
UncheckedBooleanSupplier FALSE_SUPPLIER = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return false;
}
};
/**
* A supplier which always returns {@code true} and never throws.
*/
UncheckedBooleanSupplier TRUE_SUPPLIER = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return true;
}
};
}

View File

@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.UncheckedBooleanSupplier;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@ -161,8 +162,8 @@ public class SocketAutoReadTest extends AbstractSocketTest {
*/
private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
@Override
public Handle newHandle() {
return new Handle() {
public ExtendedHandle newHandle() {
return new ExtendedHandle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
@ -211,6 +212,11 @@ public class SocketAutoReadTest extends AbstractSocketTest {
return config.isAutoRead();
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead();
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.

View File

@ -0,0 +1,225 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.DuplexChannel;
import io.netty.util.UncheckedBooleanSupplier;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class SocketHalfClosedTest extends AbstractSocketTest {
@Test
public void testAllDataReadAfterHalfClosure() throws Throwable {
run();
}
public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testAllDataReadAfterHalfClosure(true, sb, cb);
testAllDataReadAfterHalfClosure(false, sb, cb);
}
public void testAllDataReadAfterHalfClosure(final boolean autoRead,
ServerBootstrap sb, Bootstrap cb) throws Throwable {
final int totalServerBytesWritten = 1024 * 16;
final int numReadsPerReadLoop = 2;
final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
final AtomicInteger clientReadCompletes = new AtomicInteger();
Channel serverChannel = null;
Channel clientChannel = null;
try {
cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
.option(ChannelOption.AUTO_READ, autoRead)
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
buf.writerIndex(buf.capacity());
ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
((DuplexChannel) future.channel()).shutdownOutput();
}
});
serverInitializedLatch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
private int bytesRead;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
bytesRead += buf.readableBytes();
buf.release();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt == ChannelInputShutdownEvent.INSTANCE) {
clientHalfClosedLatch.countDown();
} else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
clientReadCompletes.incrementAndGet();
if (bytesRead == totalServerBytesWritten) {
clientReadAllDataLatch.countDown();
}
if (!autoRead) {
ctx.read();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});
serverChannel = sb.bind().sync().channel();
clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
clientChannel.read();
serverInitializedLatch.await();
clientReadAllDataLatch.await();
clientHalfClosedLatch.await();
assertTrue("too many read complete events: " + clientReadCompletes.get(),
totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get());
} finally {
if (clientChannel != null) {
clientChannel.close().sync();
}
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}
/**
* Designed to read a single byte at a time to control the number of reads done at a fine granularity.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final int numReads;
TestNumReadsRecvByteBufAllocator(int numReads) {
this.numReads = numReads;
}
@Override
public ExtendedHandle newHandle() {
return new ExtendedHandle() {
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess(), guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
numMessagesRead = 0;
}
@Override
public void incMessagesRead(int numMessages) {
numMessagesRead += numMessages;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return numMessagesRead < numReads;
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return continueReading() && maybeMoreDataSupplier.get();
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.
}
};
}
}
}

View File

@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.UncheckedBooleanSupplier;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
@ -39,7 +40,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SocketReadPendingTest extends AbstractSocketTest {
@Test
@Test(timeout = 30000)
public void testReadPendingIsResetAfterEachRead() throws Throwable {
run();
}
@ -130,7 +131,7 @@ public class SocketReadPendingTest extends AbstractSocketTest {
}
/**
* Designed to keep reading as long as autoread is enabled.
* Designed to read a single byte at a time to control the number of reads done at a fine granularity.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final int numReads;
@ -139,8 +140,8 @@ public class SocketReadPendingTest extends AbstractSocketTest {
}
@Override
public Handle newHandle() {
return new Handle() {
public ExtendedHandle newHandle() {
return new ExtendedHandle() {
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@ -189,6 +190,11 @@ public class SocketReadPendingTest extends AbstractSocketTest {
return numMessagesRead < numReads;
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return continueReading();
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.

View File

@ -24,10 +24,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil;
@ -45,6 +45,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private final int readFlag;
private final Socket fileDescriptor;
protected int flags = Native.EPOLLET;
boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending;
protected volatile boolean active;
@ -107,6 +109,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
@Override
protected void doClose() throws Exception {
active = false;
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
// socket which has not even been connected yet. This has been observed to block during unit tests.
inputClosedSeenErrorOnRead = true;
try {
doDeregister();
} finally {
@ -148,10 +153,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
if (unsafe.maybeMoreDataToRead) {
unsafe.executeEpollInReadyRunnable();
unsafe.executeEpollInReadyRunnable(config());
}
}
final boolean shouldBreakEpollInReady(ChannelConfig config) {
return fileDescriptor.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
}
final boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof EpollSocketChannelConfig &&
((EpollSocketChannelConfig) config).isAllowHalfClosure();
}
final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise
if (isRegistered()) {
@ -186,12 +200,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
@Override
protected void doRegister() throws Exception {
EpollEventLoop loop = (EpollEventLoop) eventLoop();
// Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
// make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
((AbstractEpollUnsafe) unsafe()).epollInReadyRunnablePending = false;
loop.add(this);
epollInReadyRunnablePending = false;
((EpollEventLoop) eventLoop()).add(this);
}
@Override
@ -314,9 +327,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
boolean readPending;
boolean maybeMoreDataToRead;
boolean epollInReadyRunnablePending;
private EpollRecvByteAllocatorHandle allocHandle;
private Runnable epollInReadyRunnable;
private final Runnable epollInReadyRunnable = new Runnable() {
@Override
public void run() {
epollInReadyRunnablePending = false;
epollInReady();
}
};
/**
* Called once EPOLLIN event is ready to be processed
@ -326,7 +344,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
final void epollInBefore() { maybeMoreDataToRead = false; }
final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
@ -335,7 +353,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
} else if (readPending && maybeMoreDataToRead && !fd().isInputShutdown()) {
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
@ -343,24 +361,15 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable();
executeEpollInReadyRunnable(config);
}
}
final void executeEpollInReadyRunnable() {
if (epollInReadyRunnablePending) {
final void executeEpollInReadyRunnable(ChannelConfig config) {
if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
return;
}
epollInReadyRunnablePending = true;
if (epollInReadyRunnable == null) {
epollInReadyRunnable = new Runnable() {
@Override
public void run() {
epollInReadyRunnablePending = false;
epollInReady();
}
};
}
eventLoop().execute(epollInReadyRunnable);
}
@ -380,9 +389,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
clearEpollRdHup();
}
// epollInReady may call this, but we should ensure that it gets called.
shutdownInput();
shutdownInput(true);
}
/**
@ -400,12 +408,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
/**
* Shutdown the input side of the channel.
*/
void shutdownInput() {
void shutdownInput(boolean rdHup) {
if (!fd().isInputShutdown()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
if (isAllowHalfClosure(config())) {
try {
fd().shutdown(true, false);
clearEpollIn0();
} catch (IOException ignored) {
// We attempted to shutdown and failed, which means the input has already effectively been
// shutdown.
@ -421,6 +428,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} else {
close(voidPromise());
}
} else if (!rdHup) {
inputClosedSeenErrorOnRead = true;
pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
@ -432,7 +442,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
@Override
public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
allocHandle = newEpollHandle(super.recvBufAllocHandle());
allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
}
return allocHandle;
}
@ -441,8 +451,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
* Create a new {@link EpollRecvByteAllocatorHandle} instance.
* @param handle The handle to wrap with EPOLL specific logic.
*/
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorHandle(handle, config());
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
return new EpollRecvByteAllocatorHandle(handle);
}
@Override

View File

@ -107,11 +107,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
final ChannelConfig config = config();
if (shouldBreakEpollInReady(config)) {
clearEpollIn0();
return;
}
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

View File

@ -795,7 +795,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
class EpollStreamUnsafe extends AbstractEpollUnsafe {
// Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override
protected Executor prepareToClose() {
@ -816,7 +815,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
shutdownInput();
shutdownInput(false);
}
}
@ -963,17 +962,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
@Override
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, config());
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle);
}
@Override
void epollInReady() {
if (fd().isInputShutdown()) {
final ChannelConfig config = config();
if (shouldBreakEpollInReady(config)) {
clearEpollIn0();
return;
}
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
@ -1018,7 +1017,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (fd().isInputShutdown()) {
if (shouldBreakEpollInReady(config)) {
// We need to do this for two reasons:
//
// - If the input was shutdown in between (which may be the case when the user did it in the
@ -1038,7 +1037,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
pipeline.fireChannelReadComplete();
if (close) {
shutdownInput();
shutdownInput(false);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);

View File

@ -86,6 +86,10 @@ public class EpollChannelConfig extends DefaultChannelConfig {
@Override
public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
RecvByteBufAllocator.ExtendedHandle.class);
}
super.setRecvByteBufAllocator(allocator);
return this;
}

View File

@ -523,11 +523,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
DatagramChannelConfig config = config();
if (shouldBreakEpollInReady(config)) {
clearEpollIn0();
return;
}
DatagramChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

View File

@ -15,17 +15,26 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import io.netty.util.internal.ObjectUtil;
class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandle {
private final RecvByteBufAllocator.ExtendedHandle delegate;
private boolean isEdgeTriggered;
private final ChannelConfig config;
private boolean receivedRdHup;
private final UncheckedBooleanSupplier defaultMaybeMoreDataSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return maybeMoreDataToRead();
}
};
EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
super(handle);
this.config = config;
EpollRecvByteAllocatorHandle(RecvByteBufAllocator.ExtendedHandle handle) {
this.delegate = ObjectUtil.checkNotNull(handle, "handle");
}
final void receivedRdHup() {
@ -37,8 +46,16 @@ class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle
}
boolean maybeMoreDataToRead() {
// If EPOLLRDHUP has been received we must read until we get a read error.
return isEdgeTriggered && (lastBytesRead() > 0 || receivedRdHup);
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. It is expected that the
* {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not
* read, and will force a EPOLLIN ready event.
*/
return (isEdgeTriggered && lastBytesRead() > 0) ||
(!isEdgeTriggered && lastBytesRead() == attemptedBytesRead()) ||
receivedRdHup;
}
final void edgeTriggered(boolean edgeTriggered) {
@ -49,16 +66,59 @@ class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle
return isEdgeTriggered;
}
@Override
public final ByteBuf allocate(ByteBufAllocator alloc) {
return delegate.allocate(alloc);
}
@Override
public final int guess() {
return delegate.guess();
}
@Override
public final void reset(ChannelConfig config) {
delegate.reset(config);
}
@Override
public final void incMessagesRead(int numMessages) {
delegate.incMessagesRead(numMessages);
}
@Override
public final void lastBytesRead(int bytes) {
delegate.lastBytesRead(bytes);
}
@Override
public final int lastBytesRead() {
return delegate.lastBytesRead();
}
@Override
public final int attemptedBytesRead() {
return delegate.attemptedBytesRead();
}
@Override
public final void attemptedBytesRead(int bytes) {
delegate.attemptedBytesRead(bytes);
}
@Override
public final void readComplete() {
delegate.readComplete();
}
@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return delegate.continueReading(maybeMoreDataSupplier);
}
@Override
public final boolean continueReading() {
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. If auto read is on we force reading to
* continue to avoid a {@link StackOverflowError} between channelReadComplete and reading from the
* channel. It is expected that the {@link #EpollSocketChannel} implementations will track if we are in
* edgeTriggered mode and all data was not read, and will force a EPOLLIN ready event.
*/
return maybeMoreDataToRead() && config.isAutoRead() || super.continueReading();
// We must override the supplier which determines if there maybe more data to read.
return delegate.continueReading(defaultMaybeMoreDataSupplier);
}
}

View File

@ -15,12 +15,11 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle {
public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
super(handle, config);
public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.ExtendedHandle handle) {
super(handle);
}
@Override
@ -31,6 +30,6 @@ final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocator
*
* If EPOLLRDHUP has been received we must read until we get a read error.
*/
return isEdgeTriggered() && (lastBytesRead() == attemptedBytesRead() || isReceivedRdHup());
return lastBytesRead() == attemptedBytesRead() || isReceivedRdHup();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
import java.util.List;
public class EpollETSocketHalfClosed extends SocketHalfClosedTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
import java.util.List;
public class EpollLTSocketHalfClosed extends SocketHalfClosedTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -140,7 +140,7 @@ public class EpollSocketChannelConfigTest {
public void testGetOptionWhenClosed() {
ch.close().syncUninterruptibly();
try {
ch.config().getSoLinger();
ch.config().getSoLinger();
fail();
} catch (ChannelException e) {
assertTrue(e.getCause() instanceof ClosedChannelException);

View File

@ -175,6 +175,7 @@ public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufA
this.initial = initial;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);

View File

@ -29,8 +29,8 @@ import static io.netty.channel.ChannelOption.AUTO_READ;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
@ -307,7 +307,7 @@ public class DefaultChannelConfig implements ChannelConfig {
} else if (allocator == null) {
throw new NullPointerException("allocator");
}
rcvBufAllocator = allocator;
setRecvByteBufAllocator(allocator);
}
@Override

View File

@ -15,11 +15,12 @@
*/
package io.netty.channel;
import java.util.AbstractMap;
import java.util.Map.Entry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import java.util.AbstractMap;
import java.util.Map.Entry;
/**
* The {@link RecvByteBufAllocator} that yields a buffer size prediction based upon decrementing the value from
@ -29,11 +30,17 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA
private volatile int maxBytesPerRead;
private volatile int maxBytesPerIndividualRead;
private final class HandleImpl implements Handle {
private final class HandleImpl implements ExtendedHandle {
private int individualReadMax;
private int bytesToRead;
private int lastBytesRead;
private int attemptBytesRead;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptBytesRead == lastBytesRead;
}
};
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
@ -70,8 +77,13 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
// Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided.
return bytesToRead > 0 && attemptBytesRead == lastBytesRead;
return bytesToRead > 0 && maybeMoreDataSupplier.get();
}
@Override
@ -99,6 +111,7 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA
this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl();

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
/**
* Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
@ -50,13 +51,19 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa
/**
* Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
*/
public abstract class MaxMessageHandle implements Handle {
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
private int maxMessagePerRead;
private int totalMessages;
private int totalBytesRead;
private int attemptedBytesRead;
private int lastBytesRead;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
/**
* Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
@ -81,11 +88,8 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa
@Override
public final void lastBytesRead(int bytes) {
lastBytesRead = bytes;
// Ignore if bytes is negative, the interface contract states it will be detected externally after call.
// The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
totalBytesRead += bytes;
if (totalBytesRead < 0) {
totalBytesRead = Integer.MAX_VALUE;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
@ -96,10 +100,15 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
maybeMoreDataSupplier.get() &&
totalMessages < maxMessagePerRead &&
totalBytesRead < Integer.MAX_VALUE;
totalBytesRead > 0;
}
@Override
@ -117,7 +126,7 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa
}
protected final int totalBytesRead() {
return totalBytesRead;
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
}
}

View File

@ -48,6 +48,7 @@ public class FixedRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllo
this.bufferSize = bufferSize;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(bufferSize);

View File

@ -17,6 +17,9 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import io.netty.util.internal.UnstableApi;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
@ -24,13 +27,16 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull;
* not to waste its space.
*/
public interface RecvByteBufAllocator {
/**
* Creates a new handle. The handle provides the actual operations and keeps the internal information which is
* required for predicting an optimal buffer capacity.
*/
Handle newHandle();
/**
* @Deprecated Use {@link ExtendedHandle}.
*/
@Deprecated
interface Handle {
/**
* Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
@ -101,6 +107,16 @@ public interface RecvByteBufAllocator {
void readComplete();
}
@SuppressWarnings("deprecation")
@UnstableApi
interface ExtendedHandle extends Handle {
/**
* Same as {@link Handle#continueReading()} except "more data" is determined by the supplier parameter.
* @param maybeMoreDataSupplier A supplier that determines if there maybe more data to read.
*/
boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier);
}
/**
* A {@link Handle} which delegates all call to some other {@link Handle}.
*/

View File

@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -59,6 +60,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
*/
protected abstract ChannelFuture shutdownInput();
protected boolean isInputShutdown0() {
return false;
}
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
@ -72,15 +77,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
private void closeOnRead(ChannelPipeline pipeline) {
if (isOpen()) {
if (!isInputShutdown0()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
shutdownInput();
SelectionKey key = selectionKey();
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else {
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
/**
* User event that signifies the channel's input side is shutdown, and we tried to shut it down again. This typically
* indicates that there is no more data to read.
*/
public final class ChannelInputShutdownReadComplete {
public static final ChannelInputShutdownReadComplete INSTANCE = new ChannelInputShutdownReadComplete();
private ChannelInputShutdownReadComplete() {
}
}

View File

@ -182,6 +182,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return shutdownInput(newPromise());
}
@Override
protected boolean isInputShutdown0() {
return isInputShutdown();
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();