diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java index 7eda9d97bb..7cb54c1365 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java @@ -44,6 +44,8 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator { buf = new AdvancedLeakAwareByteBuf(buf, leak); } break; + default: + break; } return buf; } diff --git a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java index 407c8bee83..c9a7c8bc56 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java @@ -16,7 +16,6 @@ package io.netty.buffer; -import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache; import io.netty.util.Recycler; import io.netty.util.internal.PlatformDependent; diff --git a/buffer/src/test/java/io/netty/buffer/SlicedByteBufTest.java b/buffer/src/test/java/io/netty/buffer/SlicedByteBufTest.java index 5922f8a92a..e303176310 100644 --- a/buffer/src/test/java/io/netty/buffer/SlicedByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/SlicedByteBufTest.java @@ -15,7 +15,6 @@ */ package io.netty.buffer; -import io.netty.util.IllegalReferenceCountException; import org.junit.Test; import java.io.IOException; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index a543749aa2..f5166ff743 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -23,6 +23,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -34,7 +35,7 @@ import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel { - private static final ChannelMetadata DATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false); private final int readFlag; private final FileDescriptor fileDescriptor; protected int flags = Native.EPOLLET; @@ -93,7 +94,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override public ChannelMetadata metadata() { - return DATA; + return METADATA; } @Override @@ -230,6 +231,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected final int doReadBytes(ByteBuf byteBuf) throws Exception { int writerIndex = byteBuf.writerIndex(); int localReadAmount; + unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); if (byteBuf.hasMemoryAddress()) { localReadAmount = Native.readAddress( fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); @@ -294,6 +296,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected boolean readPending; + private EpollRecvByteAllocatorHandle allocHandle; /** * Called once EPOLLIN event is ready to be processed @@ -307,6 +310,20 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // NOOP } + @Override + public EpollRecvByteAllocatorHandle recvBufAllocHandle() { + if (allocHandle == null) { + allocHandle = newEpollHandle(super.recvBufAllocHandle()); + } + return allocHandle; + } + + /** + * Create a new {@EpollRecvByteAllocatorHandle} instance. + * @param handle The handle to wrap with EPOLL specific logic. + */ + protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle); + @Override protected void flush0() { // Flush immediately only when there's no pending flush. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index db66538500..eaeded0d16 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -17,18 +17,20 @@ package io.netty.channel.epoll; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.channel.unix.FileDescriptor; import java.net.InetSocketAddress; import java.net.SocketAddress; - public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); protected AbstractEpollServerChannel(int fd) { super(fd, Native.EPOLLIN); @@ -38,6 +40,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0); } + @Override + public ChannelMetadata metadata() { + return METADATA; + } + @Override protected boolean isCompatible(EventLoop loop) { return loop instanceof EpollEventLoop; @@ -77,6 +84,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im channelPromise.setFailure(new UnsupportedOperationException()); } + @Override + protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { + return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET)); + } + @Override void epollInReady() { assert eventLoop().inEventLoop(); @@ -90,13 +102,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im } final ChannelPipeline pipeline = pipeline(); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); + Throwable exception = null; try { try { - // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. - final int maxMessagesPerRead = edgeTriggered - ? Integer.MAX_VALUE : config.getMaxMessagesPerRead(); - int messages = 0; do { int socketFd = Native.accept(fd().intValue(), acceptedAddress); if (socketFd == -1) { @@ -104,26 +115,23 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im break; } readPending = false; + allocHandle.incMessagesRead(1); try { int len = acceptedAddress[0]; pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len)); } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); - } finally { - if (!edgeTriggered && !config.isAutoRead()) { - // This is not using EPOLLET so we can stop reading - // ASAP as we will get notified again later with - // pending data - break; + if (edgeTriggered) { // We must keep reading if ET is enabled + pipeline.fireExceptionCaught(t); + } else { + throw t; } } - } while (++ messages < maxMessagesPerRead); + } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 98774cb819..6708d04276 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -48,8 +49,6 @@ import java.util.Queue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import static io.netty.util.internal.ObjectUtil.checkNotNull; - public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private static final String EXPECTED_TYPES = @@ -595,9 +594,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } class EpollStreamUnsafe extends AbstractEpollUnsafe { - - private RecvByteBufAllocator.Handle allocHandle; - private void closeOnRead(ChannelPipeline pipeline) { inputShutdown = true; if (isOpen()) { @@ -619,6 +615,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { byteBuf.release(); } } + recvBufAllocHandle().readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { @@ -770,7 +767,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { void epollRdHupReady() { if (isActive()) { // If it is still active, we need to call epollInReady as otherwise we may miss to - // read pending data from the underyling file descriptor. + // read pending data from the underlying file descriptor. // See https://github.com/netty/netty/issues/3709 epollInReady(); } else { @@ -778,6 +775,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } + @Override + protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { + return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET)); + } + @Override void epollInReady() { final ChannelConfig config = config(); @@ -791,84 +793,69 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); - RecvByteBufAllocator.Handle allocHandle = this.allocHandle; - if (allocHandle == null) { - this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); - } + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { - // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. - final int maxMessagesPerRead = edgeTriggered - ? Integer.MAX_VALUE : config.getMaxMessagesPerRead(); - int messages = 0; - int totalReadAmount = 0; do { - SpliceInTask spliceTask = spliceQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - spliceQueue.remove(); + try { + SpliceInTask spliceTask = spliceQueue.peek(); + if (spliceTask != null) { + if (spliceTask.spliceIn(allocHandle)) { + // We need to check if it is still active as if not we removed all SpliceTasks in + // doClose(...) + if (isActive()) { + spliceQueue.remove(); + } + continue; + } else { + break; } - continue; - } else { + } + + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocHandle.allocate(allocator); + allocHandle.lastBytesRead(doReadBytes(byteBuf)); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + close = allocHandle.lastBytesRead() < 0; break; } + readPending = false; + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + } catch (Throwable t) { + if (edgeTriggered) { // We must keep reading if ET is enabled + if (byteBuf != null) { + byteBuf.release(); + byteBuf = null; + } + pipeline.fireExceptionCaught(t); + } else { + // byteBuf is release in outer exception handling if necessary. + throw t; + } } + } while (allocHandle.continueReading()); - // we use a direct buffer here as the native implementations only be able - // to handle direct buffers. - byteBuf = allocHandle.allocate(allocator); - int writable = byteBuf.writableBytes(); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount <= 0) { - // not was read release the buffer - byteBuf.release(); - close = localReadAmount < 0; - break; - } - readPending = false; - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - - if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { - allocHandle.record(totalReadAmount); - - // Avoid overflow. - totalReadAmount = localReadAmount; - } else { - totalReadAmount += localReadAmount; - } - - if (localReadAmount < writable) { - // Read less than what the buffer can hold, - // which might mean we drained the recv buffer completely. - break; - } - if (!edgeTriggered && !config.isAutoRead()) { - // This is not using EPOLLET so we can stop reading - // ASAP as we will get notified again later with - // pending data - break; - } - } while (++ messages < maxMessagesPerRead); - + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); - allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); - close = false; } } catch (Throwable t) { boolean closed = handleReadException(pipeline, byteBuf, t, close); if (!closed) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket - eventLoop().execute(new Runnable() { + eventLoop().execute(new OneTimeTask() { @Override public void run() { epollInReady(); @@ -919,8 +906,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { length -= localSplicedIn; } - // record the number of bytes we spliced before - handle.record(splicedIn); return splicedIn; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java index 9429315f24..4bda3dd5b0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -65,6 +65,7 @@ public class EpollChannelConfig extends DefaultChannelConfig { } @Override + @Deprecated public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 2eb36bf848..3681024e3a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.ChannelFuture; @@ -473,7 +474,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { - private final List readBuf = new ArrayList(); @Override @@ -511,11 +511,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } } + @Override + protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { + return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET)); + } + @Override void epollInReady() { assert eventLoop().inEventLoop(); DatagramChannelConfig config = config(); - boolean edgeTriggered = isFlagSet(Native.EPOLLET); + boolean edgeTriggered = isFlagSet(Native.EPOLLET); if (!readPending && !edgeTriggered && !config.isAutoRead()) { // ChannelConfig.setAutoRead(false) was called in the meantime @@ -523,65 +528,64 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return; } - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); + Throwable exception = null; try { - // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. - final int maxMessagesPerRead = edgeTriggered - ? Integer.MAX_VALUE : config.getMaxMessagesPerRead(); - int messages = 0; do { ByteBuf data = null; try { - data = allocHandle.allocate(config.getAllocator()); - int writerIndex = data.writerIndex(); - DatagramSocketAddress remoteAddress; + data = allocHandle.allocate(allocator); + allocHandle.attemptedBytesRead(data.writableBytes()); + final DatagramSocketAddress remoteAddress; if (data.hasMemoryAddress()) { // has a memory address so use optimized call remoteAddress = Native.recvFromAddress( - fd().intValue(), data.memoryAddress(), writerIndex, data.capacity()); + fd().intValue(), data.memoryAddress(), data.writerIndex(), data.capacity()); } else { - ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes()); + ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); remoteAddress = Native.recvFrom( fd().intValue(), nioData, nioData.position(), nioData.limit()); } if (remoteAddress == null) { + data.release(); + data = null; break; } - int readBytes = remoteAddress.receivedAmount; - data.writerIndex(data.writerIndex() + readBytes); - allocHandle.record(readBytes); + allocHandle.incMessagesRead(1); + allocHandle.lastBytesRead(remoteAddress.receivedAmount); + data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()); readPending = false; readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); data = null; } catch (Throwable t) { - // We do not break from the loop here and remember the last exception, - // because we need to consume everything from the socket used with epoll ET. - exception = t; - } finally { if (data != null) { data.release(); + data = null; } - if (!edgeTriggered && !config.isAutoRead()) { - // This is not using EPOLLET so we can stop reading - // ASAP as we will get notified again later with - // pending data + if (edgeTriggered) { + // We do not break from the loop here and remember the last exception, + // because we need to consume everything from the socket used with epoll ET. + pipeline.fireExceptionCaught(t); + } else { + exception = t; break; } } - } while (++ messages < maxMessagesPerRead); + } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } - readBuf.clear(); + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index b714a6d0c0..d9f24696dc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -178,6 +178,7 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme } @Override + @Deprecated public EpollDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 76917d19cb..fb94e2a85b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.DomainSocketChannel; import io.netty.channel.unix.FileDescriptor; +import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; @@ -140,12 +141,10 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i } final ChannelPipeline pipeline = pipeline(); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); try { - // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. - final int maxMessagesPerRead = edgeTriggered - ? Integer.MAX_VALUE : config.getMaxMessagesPerRead(); - int messages = 0; do { int socketFd = Native.recvFd(fd().intValue()); if (socketFd == 0) { @@ -155,32 +154,30 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i close(voidPromise()); return; } - readPending = false; + readPending = false; + allocHandle.incMessagesRead(1); try { pipeline.fireChannelRead(new FileDescriptor(socketFd)); } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); - } finally { - if (!edgeTriggered && !config.isAutoRead()) { - // This is not using EPOLLET so we can stop reading - // ASAP as we will get notified again later with - // pending data - break; + // If ET is enabled we need to consume everything from the socket + if (edgeTriggered) { + pipeline.fireExceptionCaught(t); + } else { + throw t; } } - } while (++ messages < maxMessagesPerRead); + } while (allocHandle.continueReading()); + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); - } catch (Throwable t) { + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket - eventLoop().execute(new Runnable() { + eventLoop().execute(new OneTimeTask() { @Override public void run() { epollInReady(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java index b5fb3bfa48..c049f70d16 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java @@ -58,7 +58,9 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig return true; } + @Override + @Deprecated public EpollDomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/codec-dns/src/test/java/io/netty/handler/codec/dns/package-info.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java similarity index 53% rename from codec-dns/src/test/java/io/netty/handler/codec/dns/package-info.java rename to transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java index 35930c2162..6373683bba 100644 --- a/codec-dns/src/test/java/io/netty/handler/codec/dns/package-info.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2015 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,9 +13,19 @@ * License for the specific language governing permissions and limitations * under the License. */ +package io.netty.channel.epoll; -/** - * A DNS client that queries a server and checks if query information and - * responses are valid to ensure codec is correct. - */ -package io.netty.handler.codec.dns; +import io.netty.channel.RecvByteBufAllocator; + +abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle { + private final boolean isEdgeTriggered; + + public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { + super(handle); + this.isEdgeTriggered = isEdgeTriggered; + } + + public final boolean isEdgeTriggered() { + return isEdgeTriggered; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java new file mode 100644 index 0000000000..9ea5539458 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.RecvByteBufAllocator; + +/** + * Respects termination conditions for EPOLL message (aka packet) based protocols. + */ +final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHandle { + public EpollRecvByteAllocatorMessageHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { + super(handle, isEdgeTriggered); + } + + @Override + public boolean continueReading() { + /** + * If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For + * packet oriented descriptors must read until we get a EAGAIN + * (see Q9 in epoll man). + */ + return isEdgeTriggered() ? true : super.continueReading(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java new file mode 100644 index 0000000000..1fc3bf1e96 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java @@ -0,0 +1,38 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.RecvByteBufAllocator; + +/** + * EPOLL must read until no more data is available while in edge triggered mode. This class will always continue reading + * unless the last read did not fill up the available buffer space. + */ +final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle { + public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { + super(handle, isEdgeTriggered); + } + + @Override + public boolean continueReading() { + /** + * if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. + * For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce + * a full buffer (see Q9 in epoll man). + */ + return isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java index b24e2632f6..0f795851e8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java @@ -110,6 +110,7 @@ public class EpollServerChannelConfig extends EpollChannelConfig { } @Override + @Deprecated public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index e6888eb55e..2138187908 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -97,6 +97,7 @@ public final class EpollServerSocketChannelConfig extends EpollServerChannelConf } @Override + @Deprecated public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java index cc3a128d6b..bae7af6484 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -315,6 +315,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement } @Override + @Deprecated public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketAddress.java b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketAddress.java index 9f581c94da..496c6a358f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketAddress.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketAddress.java @@ -23,6 +23,7 @@ import java.net.SocketAddress; * Unix Domain Socket. */ public final class DomainSocketAddress extends SocketAddress { + private static final long serialVersionUID = -6934618000832236893L; private final String socketPath; public DomainSocketAddress(String socketPath) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannelConfig.java index 9931c5fb31..dcdacd860e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannelConfig.java @@ -26,6 +26,7 @@ import io.netty.channel.RecvByteBufAllocator; public interface DomainSocketChannelConfig extends ChannelConfig { @Override + @Deprecated DomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/DefaultRxtxChannelConfig.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/DefaultRxtxChannelConfig.java index 16b09cb475..b9ace86e76 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/DefaultRxtxChannelConfig.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/DefaultRxtxChannelConfig.java @@ -23,7 +23,14 @@ import io.netty.channel.RecvByteBufAllocator; import java.util.Map; -import static io.netty.channel.rxtx.RxtxChannelOption.*; +import static io.netty.channel.rxtx.RxtxChannelOption.BAUD_RATE; +import static io.netty.channel.rxtx.RxtxChannelOption.DATA_BITS; +import static io.netty.channel.rxtx.RxtxChannelOption.DTR; +import static io.netty.channel.rxtx.RxtxChannelOption.PARITY_BIT; +import static io.netty.channel.rxtx.RxtxChannelOption.READ_TIMEOUT; +import static io.netty.channel.rxtx.RxtxChannelOption.RTS; +import static io.netty.channel.rxtx.RxtxChannelOption.STOP_BITS; +import static io.netty.channel.rxtx.RxtxChannelOption.WAIT_TIME; /** * Default configuration class for RXTX device connections. @@ -205,6 +212,7 @@ final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements Rxt } @Override + @Deprecated public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelConfig.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelConfig.java index 9321198a25..6fc63d5bf8 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelConfig.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelConfig.java @@ -274,6 +274,7 @@ public interface RxtxChannelConfig extends ChannelConfig { RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java index 79f540a833..39b685d1d0 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java @@ -15,7 +15,6 @@ */ package io.netty.channel.sctp; - import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpStandardSocketOptions; import io.netty.buffer.ByteBufAllocator; @@ -29,8 +28,10 @@ import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.util.Map; -import static io.netty.channel.ChannelOption.*; -import static io.netty.channel.sctp.SctpChannelOption.*; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_SNDBUF; +import static io.netty.channel.sctp.SctpChannelOption.SCTP_INIT_MAXSTREAMS; +import static io.netty.channel.sctp.SctpChannelOption.SCTP_NODELAY; /** * The default {@link SctpChannelConfig} implementation for SCTP. @@ -180,6 +181,7 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc } @Override + @Deprecated public SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java index cc13cc9033..e89d71858e 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java @@ -157,6 +157,7 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme } @Override + @Deprecated public SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelConfig.java index d0b23c8613..e4e41a974c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelConfig.java @@ -102,6 +102,7 @@ public interface SctpChannelConfig extends ChannelConfig { SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelOption.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelOption.java index 7e171e72ca..506e42ae50 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelOption.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelOption.java @@ -20,8 +20,6 @@ import io.netty.channel.ChannelOption; import java.net.SocketAddress; -import static io.netty.channel.ChannelOption.*; - /** * Option for configuring the SCTP transport */ diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelConfig.java index 20a7669edb..1d3c94b00e 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelConfig.java @@ -94,6 +94,7 @@ public interface SctpServerChannelConfig extends ChannelConfig { SctpServerChannelConfig setInitMaxStreams(InitMaxStreams initMaxStreams); @Override + @Deprecated SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 7dfdc7a044..ae14b96047 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -274,15 +274,16 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett if (messageInfo == null) { return 0; } - buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.position() - pos))); + + allocHandle.lastBytesRead(data.position() - pos); + buf.add(new SctpMessage(messageInfo, + buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead()))); free = false; return 1; } catch (Throwable cause) { PlatformDependent.throwException(cause); return -1; } finally { - int bytesRead = buffer.readableBytes(); - allocHandle.record(bytesRead); if (free) { buffer.release(); } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 1a0d50b8de..5e1fc2f3bf 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -46,7 +46,7 @@ import java.util.Set; */ public class NioSctpServerChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpServerChannel { - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static SctpServerChannel newSocket() { try { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 8abedfa308..d0a9c10d2e 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -19,6 +19,7 @@ import com.sun.nio.sctp.Association; import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -184,8 +185,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel Set reableKeys = readSelector.selectedKeys(); try { - for (SelectionKey ignored : reableKeys) { - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + for (@SuppressWarnings("unused") SelectionKey ignored : reableKeys) { + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); ByteBuf buffer = allocHandle.allocate(config().getAllocator()); boolean free = true; @@ -197,14 +198,14 @@ public class OioSctpChannel extends AbstractOioMessageChannel } data.flip(); - msgs.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining()))); + allocHandle.lastBytesRead(data.remaining()); + msgs.add(new SctpMessage(messageInfo, + buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead()))); free = false; - readMessages ++; + ++readMessages; } catch (Throwable cause) { PlatformDependent.throwException(cause); } finally { - int bytesRead = buffer.readableBytes(); - allocHandle.record(bytesRead); if (free) { buffer.release(); } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index fc592b5c9f..b396a9c780 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -53,7 +53,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioSctpServerChannel.class); - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static SctpServerChannel newServerSocket() { try { diff --git a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java index 4867dfcd2b..9681cb1fc4 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java @@ -27,8 +27,14 @@ import io.netty.channel.RecvByteBufAllocator; import java.io.IOException; import java.util.Map; -import static io.netty.channel.ChannelOption.*; -import static io.netty.channel.udt.UdtChannelOption.*; +import static io.netty.channel.ChannelOption.SO_LINGER; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; +import static io.netty.channel.ChannelOption.SO_SNDBUF; +import static io.netty.channel.udt.UdtChannelOption.PROTOCOL_RECEIVE_BUFFER_SIZE; +import static io.netty.channel.udt.UdtChannelOption.PROTOCOL_SEND_BUFFER_SIZE; +import static io.netty.channel.udt.UdtChannelOption.SYSTEM_RECEIVE_BUFFER_SIZE; +import static io.netty.channel.udt.UdtChannelOption.SYSTEM_SEND_BUFFER_SIZE; /** * The default {@link UdtChannelConfig} implementation. @@ -241,6 +247,7 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements } @Override + @Deprecated public UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java index 6d5e81d697..8dc9680512 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java @@ -24,7 +24,7 @@ import io.netty.channel.RecvByteBufAllocator; import java.io.IOException; import java.util.Map; -import static io.netty.channel.ChannelOption.*; +import static io.netty.channel.ChannelOption.SO_BACKLOG; /** * The default {@link UdtServerChannelConfig} implementation. @@ -143,6 +143,7 @@ public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig } @Override + @Deprecated public UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java index 00c3ef5b21..6b1369bcdf 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java @@ -116,6 +116,7 @@ public interface UdtChannelConfig extends ChannelConfig { UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelOption.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelOption.java index 41a92a9c0f..a8deef35ec 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelOption.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelOption.java @@ -18,8 +18,6 @@ package io.netty.channel.udt; import com.barchart.udt.OptionUDT; import io.netty.channel.ChannelOption; -import static io.netty.channel.ChannelOption.*; - /** * Options for the UDT transport */ diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java index e7fbf8cc81..af938635dd 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java @@ -50,6 +50,7 @@ public interface UdtServerChannelConfig extends UdtChannelConfig { UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index 6b8471be8c..06b1bc6e18 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -43,7 +43,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im protected static final InternalLogger logger = InternalLoggerFactory.getInstance(NioUdtAcceptorChannel.class); - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private final UdtServerChannelConfig config; diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java index de47a8b94b..20adae249b 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java @@ -17,11 +17,13 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.SocketChannelUDT; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.FileRegion; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.udt.DefaultUdtChannelConfig; import io.netty.channel.udt.UdtChannel; @@ -42,7 +44,7 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioUdtByteConnectorChannel.class); - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private final UdtChannelConfig config; @@ -136,7 +138,9 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement @Override protected int doReadBytes(final ByteBuf byteBuf) throws Exception { - return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.attemptedBytesRead(byteBuf.writableBytes()); + return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index c6fc05e56e..126ce5fe62 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -29,8 +29,7 @@ import java.net.SocketAddress; * */ public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel { - - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); /** * Creates a new instance. diff --git a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java index 03dc710422..59df52414d 100644 --- a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java @@ -15,9 +15,6 @@ */ package io.netty.channel; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; - import java.util.ArrayList; import java.util.List; @@ -31,7 +28,7 @@ import java.util.List; * amount of the allocated buffer two times consecutively. Otherwise, it keeps * returning the same prediction. */ -public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { +public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; @@ -84,14 +81,14 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { } } - private static final class HandleImpl implements Handle { + private final class HandleImpl extends MaxMessageHandle { private final int minIndex; private final int maxIndex; private int index; private int nextReceiveBufferSize; private boolean decreaseNow; - HandleImpl(int minIndex, int maxIndex, int initial) { + public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; @@ -99,18 +96,12 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { nextReceiveBufferSize = SIZE_TABLE[index]; } - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(nextReceiveBufferSize); - } - @Override public int guess() { return nextReceiveBufferSize; } - @Override - public void record(int actualReadBytes) { + private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = Math.max(index - INDEX_DECREMENT, minIndex); @@ -125,6 +116,11 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { decreaseNow = false; } } + + @Override + public void readComplete() { + record(totalBytesRead()); + } } private final int minIndex; diff --git a/transport/src/main/java/io/netty/channel/ChannelConfig.java b/transport/src/main/java/io/netty/channel/ChannelConfig.java index 8f6d02cbc1..2798a7c357 100644 --- a/transport/src/main/java/io/netty/channel/ChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/ChannelConfig.java @@ -119,16 +119,22 @@ public interface ChannelConfig { ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); /** + * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} + *

* Returns the maximum number of messages to read per read loop. * a {@link ChannelHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. */ + @Deprecated int getMaxMessagesPerRead(); /** + * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} + *

* Sets the maximum number of messages to read per read loop. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. */ + @Deprecated ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); /** @@ -165,14 +171,12 @@ public interface ChannelConfig { ChannelConfig setAllocator(ByteBufAllocator allocator); /** - * Returns {@link RecvByteBufAllocator} which is used for the channel - * to allocate receive buffers. + * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. */ - RecvByteBufAllocator getRecvByteBufAllocator(); + T getRecvByteBufAllocator(); /** - * Set the {@link ByteBufAllocator} which is used for the channel - * to allocate receive buffers. + * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. */ ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index 44ab96e0da..d54b65b159 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -20,7 +20,6 @@ import io.netty.util.internal.InternalThreadLocalMap; import java.net.SocketAddress; import java.util.Map; -import java.util.WeakHashMap; /** * Skelton implementation of a {@link ChannelHandler}. diff --git a/transport/src/main/java/io/netty/channel/ChannelMetadata.java b/transport/src/main/java/io/netty/channel/ChannelMetadata.java index 628ad43fdd..de84d50964 100644 --- a/transport/src/main/java/io/netty/channel/ChannelMetadata.java +++ b/transport/src/main/java/io/netty/channel/ChannelMetadata.java @@ -23,6 +23,7 @@ import java.net.SocketAddress; public final class ChannelMetadata { private final boolean hasDisconnect; + private final int minMaxMessagesPerRead; /** * Create a new instance @@ -32,7 +33,24 @@ public final class ChannelMetadata { * again, such as UDP/IP. */ public ChannelMetadata(boolean hasDisconnect) { + this(hasDisconnect, 1); + } + + /** + * Create a new instance + * + * @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation + * that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} + * again, such as UDP/IP. + * @param minMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum + * value enforced for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}. + */ + public ChannelMetadata(boolean hasDisconnect, int minMaxMessagesPerRead) { + if (minMaxMessagesPerRead <= 0) { + throw new IllegalArgumentException("minMaxMessagesPerRead: " + minMaxMessagesPerRead + " (expected > 0)"); + } this.hasDisconnect = hasDisconnect; + this.minMaxMessagesPerRead = minMaxMessagesPerRead; } /** @@ -43,4 +61,12 @@ public final class ChannelMetadata { public boolean hasDisconnect() { return hasDisconnect; } + + /** + * If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum value enforced for + * {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. + */ + public int minMaxMessagesPerRead() { + return minMaxMessagesPerRead; + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 9a75dd6c69..fe67e7b11e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -76,6 +76,9 @@ public final class ChannelOption extends AbstractConstant> { public static final ChannelOption MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR"); public static final ChannelOption CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS"); + /** + * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} + */ public static final ChannelOption MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ"); public static final ChannelOption WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT"); public static final ChannelOption WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK"); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index abc70f1dbf..bc9629f5bb 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -16,16 +16,24 @@ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.SocketChannelConfig; import io.netty.util.internal.PlatformDependent; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.IdentityHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import static io.netty.channel.ChannelOption.*; +import static io.netty.channel.ChannelOption.ALLOCATOR; +import static io.netty.channel.ChannelOption.AUTO_CLOSE; +import static io.netty.channel.ChannelOption.AUTO_READ; +import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; +import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; +import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; +import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK; +import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK; +import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT; /** * The default {@link SocketChannelConfig} implementation. @@ -55,7 +63,6 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR; private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; - private volatile int maxMessagesPerRead; private volatile int writeSpinCount = 16; private volatile int autoRead = 1; private volatile int writeBufferHighWaterMark = 64 * 1024; @@ -66,16 +73,6 @@ public class DefaultChannelConfig implements ChannelConfig { throw new NullPointerException("channel"); } this.channel = channel; - - if (channel instanceof ServerChannel || channel instanceof AbstractNioByteChannel) { - // Server channels: Accept as many incoming connections as possible. - // NIO byte channels: Implemented to reduce unnecessary system calls even if it's > 1. - // See https://github.com/netty/netty/issues/2079 - // TODO: Add some property to ChannelMetadata so we can remove the ugly instanceof - maxMessagesPerRead = 16; - } else { - maxMessagesPerRead = 1; - } } @Override @@ -203,18 +200,41 @@ public class DefaultChannelConfig implements ChannelConfig { return this; } + /** + * {@inheritDoc} + *

+ * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type + * {@link MaxMessagesRecvByteBufAllocator}. + */ @Override + @Deprecated public int getMaxMessagesPerRead() { - return maxMessagesPerRead; + try { + MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator(); + return allocator.maxMessagesPerRead(); + } catch (ClassCastException e) { + throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " + + "MaxMessagesRecvByteBufAllocator", e); + } } + /** + * {@inheritDoc} + *

+ * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type + * {@link MaxMessagesRecvByteBufAllocator}. + */ @Override + @Deprecated public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { - if (maxMessagesPerRead <= 0) { - throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)"); + try { + MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator(); + allocator.maxMessagesPerRead(maxMessagesPerRead); + return this; + } catch (ClassCastException e) { + throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " + + "MaxMessagesRecvByteBufAllocator", e); } - this.maxMessagesPerRead = maxMessagesPerRead; - return this; } @Override @@ -246,16 +266,44 @@ public class DefaultChannelConfig implements ChannelConfig { return this; } + @SuppressWarnings("unchecked") @Override - public RecvByteBufAllocator getRecvByteBufAllocator() { - return rcvBufAllocator; + public T getRecvByteBufAllocator() { + return (T) rcvBufAllocator; } + /** + * {@inheritDoc} + *

+ * This method enforces the {@link ChannelMetadata#minMaxMessagesPerRead()}. If these predetermined limits + * are not appropriate for your use case consider extending the channel and overriding {@link Channel#metadata()}, + * or use {@link #setRecvByteBufAllocator(RecvByteBufAllocator, ChannelMetadata)}. + */ @Override public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + return setRecvByteBufAllocator(allocator, channel.metadata()); + } + + /** + * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. + * @param allocator the allocator to set. + * @param metadata Used to determine the {@link ChannelMetadata#minMaxMessagesPerRead()} if {@code allocator} + * is of type {@link MaxMessagesRecvByteBufAllocator}. + * @return this + */ + public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator == null) { throw new NullPointerException("allocator"); } + if (allocator instanceof MaxMessagesRecvByteBufAllocator) { + if (metadata == null) { + throw new NullPointerException("metadata"); + } + MaxMessagesRecvByteBufAllocator maxMsgAllocator = (MaxMessagesRecvByteBufAllocator) allocator; + if (maxMsgAllocator.maxMessagesPerRead() < metadata.minMaxMessagesPerRead()) { + maxMsgAllocator.maxMessagesPerRead(metadata.minMaxMessagesPerRead()); + } + } rcvBufAllocator = allocator; return this; } diff --git a/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java new file mode 100644 index 0000000000..430b5f14b0 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java @@ -0,0 +1,190 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.AbstractMap; +import java.util.Map.Entry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +/** + * The {@link RecvByteBufAllocator} that yields a buffer size prediction based upon decrementing the value from + * the max bytes per read. + */ +public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufAllocator { + private volatile int maxBytesPerRead; + private volatile int maxBytesPerIndividualRead; + + private final class HandleImpl implements Handle { + private int individualReadMax; + private int bytesToRead; + private int lastBytesRead; + private int attemptBytesRead; + + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess()); + } + + @Override + public int guess() { + return Math.min(individualReadMax, bytesToRead); + } + + @Override + public void reset(ChannelConfig config) { + bytesToRead = maxBytesPerRead(); + individualReadMax = maxBytesPerIndividualRead(); + } + + @Override + public void incMessagesRead(int amt) { + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + // Ignore if bytes is negative, the interface contract states it will be detected externally after call. + // The value may be "invalid" after this point, but it doesn't matter because reading will be stopped. + bytesToRead -= bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public boolean continueReading() { + // Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided. + return bytesToRead > 0 && attemptBytesRead == lastBytesRead; + } + + @Override + public void readComplete() { + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptBytesRead; + } + } + + public DefaultMaxBytesRecvByteBufAllocator() { + this(64 * 1024, 64 * 1024); + } + + public DefaultMaxBytesRecvByteBufAllocator(int maxBytesPerRead, int maxBytesPerIndividualRead) { + checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead); + this.maxBytesPerRead = maxBytesPerRead; + this.maxBytesPerIndividualRead = maxBytesPerIndividualRead; + } + + @Override + public Handle newHandle() { + return new HandleImpl(); + } + + @Override + public int maxBytesPerRead() { + return maxBytesPerRead; + } + + @Override + public DefaultMaxBytesRecvByteBufAllocator maxBytesPerRead(int maxBytesPerRead) { + if (maxBytesPerRead <= 0) { + throw new IllegalArgumentException("maxBytesPerRead: " + maxBytesPerRead + " (expected: > 0)"); + } + // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b). + // Write operations must be synchronized, but independent read operations can just be volatile. + synchronized (this) { + final int maxBytesPerIndividualRead = maxBytesPerIndividualRead(); + if (maxBytesPerRead < maxBytesPerIndividualRead) { + throw new IllegalArgumentException( + "maxBytesPerRead cannot be less than " + + "maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead); + } + + this.maxBytesPerRead = maxBytesPerRead; + } + return this; + } + + @Override + public int maxBytesPerIndividualRead() { + return maxBytesPerIndividualRead; + } + + @Override + public DefaultMaxBytesRecvByteBufAllocator maxBytesPerIndividualRead(int maxBytesPerIndividualRead) { + if (maxBytesPerIndividualRead <= 0) { + throw new IllegalArgumentException( + "maxBytesPerIndividualRead: " + maxBytesPerIndividualRead + " (expected: > 0)"); + } + // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b). + // Write operations must be synchronized, but independent read operations can just be volatile. + synchronized (this) { + final int maxBytesPerRead = maxBytesPerRead(); + if (maxBytesPerIndividualRead > maxBytesPerRead) { + throw new IllegalArgumentException( + "maxBytesPerIndividualRead cannot be greater than " + + "maxBytesPerRead (" + maxBytesPerRead + "): " + maxBytesPerIndividualRead); + } + + this.maxBytesPerIndividualRead = maxBytesPerIndividualRead; + } + return this; + } + + @Override + public synchronized Entry maxBytesPerReadPair() { + return new AbstractMap.SimpleEntry(maxBytesPerRead, maxBytesPerIndividualRead); + } + + private void checkMaxBytesPerReadPair(int maxBytesPerRead, int maxBytesPerIndividualRead) { + if (maxBytesPerRead <= 0) { + throw new IllegalArgumentException("maxBytesPerRead: " + maxBytesPerRead + " (expected: > 0)"); + } + if (maxBytesPerIndividualRead <= 0) { + throw new IllegalArgumentException( + "maxBytesPerIndividualRead: " + maxBytesPerIndividualRead + " (expected: > 0)"); + } + if (maxBytesPerRead < maxBytesPerIndividualRead) { + throw new IllegalArgumentException( + "maxBytesPerRead cannot be less than " + + "maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead); + } + } + + @Override + public DefaultMaxBytesRecvByteBufAllocator maxBytesPerReadPair(int maxBytesPerRead, + int maxBytesPerIndividualRead) { + checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead); + // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b). + // Write operations must be synchronized, but independent read operations can just be volatile. + synchronized (this) { + this.maxBytesPerRead = maxBytesPerRead; + this.maxBytesPerIndividualRead = maxBytesPerIndividualRead; + } + return this; + } +} diff --git a/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java new file mode 100644 index 0000000000..28d89e2909 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java @@ -0,0 +1,123 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +/** + * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()} + * and also prevents overflow. + */ +public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator { + private volatile int maxMessagesPerRead; + + public DefaultMaxMessagesRecvByteBufAllocator() { + this(1); + } + + public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) { + maxMessagesPerRead(maxMessagesPerRead); + } + + @Override + public int maxMessagesPerRead() { + return maxMessagesPerRead; + } + + @Override + public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) { + if (maxMessagesPerRead <= 0) { + throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)"); + } + this.maxMessagesPerRead = maxMessagesPerRead; + return this; + } + + /** + * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}. + */ + public abstract class MaxMessageHandle implements Handle { + private ChannelConfig config; + private int maxMessagePerRead; + private int totalMessages; + private int totalBytesRead; + private int attemptedBytesRead; + private int lastBytesRead; + + /** + * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used. + */ + @Override + public void reset(ChannelConfig config) { + this.config = config; + maxMessagePerRead = maxMessagesPerRead(); + totalMessages = totalBytesRead = 0; + } + + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess()); + } + + @Override + public final void incMessagesRead(int amt) { + totalMessages += amt; + } + + @Override + public final void lastBytesRead(int bytes) { + lastBytesRead = bytes; + // Ignore if bytes is negative, the interface contract states it will be detected externally after call. + // The value may be "invalid" after this point, but it doesn't matter because reading will be stopped. + totalBytesRead += bytes; + if (totalBytesRead < 0) { + totalBytesRead = Integer.MAX_VALUE; + } + } + + @Override + public final int lastBytesRead() { + return lastBytesRead; + } + + @Override + public boolean continueReading() { + return config.isAutoRead() && + attemptedBytesRead == lastBytesRead && + totalMessages < maxMessagePerRead && + totalBytesRead < Integer.MAX_VALUE; + } + + @Override + public void readComplete() { + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + protected final int totalBytesRead() { + return totalBytesRead; + } + } +} diff --git a/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java index b4087bae66..839089e230 100644 --- a/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java @@ -15,36 +15,23 @@ */ package io.netty.channel; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; - - /** * The {@link RecvByteBufAllocator} that always yields the same buffer * size prediction. This predictor ignores the feed back from the I/O thread. */ -public class FixedRecvByteBufAllocator implements RecvByteBufAllocator { - - private static final class HandleImpl implements Handle { +public class FixedRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { + private final class HandleImpl extends MaxMessageHandle { private final int bufferSize; - HandleImpl(int bufferSize) { + public HandleImpl(int bufferSize) { this.bufferSize = bufferSize; } - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(bufferSize); - } - @Override public int guess() { return bufferSize; } - - @Override - public void record(int actualReadBytes) { } } private final Handle handle; diff --git a/transport/src/main/java/io/netty/channel/MaxBytesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/MaxBytesRecvByteBufAllocator.java new file mode 100644 index 0000000000..f21cdd1b2b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/MaxBytesRecvByteBufAllocator.java @@ -0,0 +1,65 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.Map.Entry; + +/** + * {@link RecvByteBufAllocator} that limits a read operation based upon a maximum value per individual read + * and a maximum amount when a read operation is attempted by the event loop. + */ +public interface MaxBytesRecvByteBufAllocator extends RecvByteBufAllocator { + /** + * Returns the maximum number of bytes to read per read loop. + * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + */ + int maxBytesPerRead(); + + /** + * Sets the maximum number of bytes to read per read loop. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + */ + MaxBytesRecvByteBufAllocator maxBytesPerRead(int maxBytesPerRead); + + /** + * Returns the maximum number of bytes to read per individual read operation. + * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + */ + int maxBytesPerIndividualRead(); + + /** + * Sets the maximum number of bytes to read per individual read operation. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + */ + MaxBytesRecvByteBufAllocator maxBytesPerIndividualRead(int maxBytesPerIndividualRead); + + /** + * Atomic way to get the maximum number of bytes to read for a read loop and per individual read operation. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + * @return The Key is from {@link #maxBytesPerRead()}. The Value is from {@link #maxBytesPerIndividualRead()} + */ + Entry maxBytesPerReadPair(); + + /** + * Sets the maximum number of bytes to read for a read loop and per individual read operation. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes. + * @param maxBytesPerRead {@see #setMaxBytesPerRead(int)} + * @param maxBytesPerIndividualRead {@see #setMaxBytesPerIndividualRead(int)} + */ + MaxBytesRecvByteBufAllocator maxBytesPerReadPair(int maxBytesPerRead, int maxBytesPerIndividualRead); +} diff --git a/transport/src/main/java/io/netty/channel/MaxMessagesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/MaxMessagesRecvByteBufAllocator.java new file mode 100644 index 0000000000..c4ed288ac5 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/MaxMessagesRecvByteBufAllocator.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +/** + * {@link RecvByteBufAllocator} that limits the number of read operations that will be attempted when a read operation + * is attempted by the event loop. + */ +public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator { + /** + * Returns the maximum number of messages to read per read loop. + * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. + */ + int maxMessagesPerRead(); + + /** + * Sets the maximum number of messages to read per read loop. + * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. + */ + MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead); +} diff --git a/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java index cbf3fb3ebb..24b02dc126 100644 --- a/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough @@ -44,11 +45,122 @@ public interface RecvByteBufAllocator { int guess(); /** - * Records the the actual number of read bytes in the previous read operation so that the allocator allocates - * the buffer with potentially more correct capacity. - * - * @param actualReadBytes the actual number of read bytes in the previous read operation + * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next + * read loop. + *

+ * This may be used by {@link #continueReading()} to determine if the read operation should complete. + *

+ * This is only ever a hint and may be ignored by the implementation. + * @param config The channel configuration which may impact this object's behavior. */ - void record(int actualReadBytes); + void reset(ChannelConfig config); + + /** + * Increment the number of messages that have been read for the current read loop. + * @param numMessages The amount to increment by. + */ + void incMessagesRead(int numMessages); + + /** + * Set the bytes that have been read for the last read operation. + * This may be used to increment the number of bytes that have been read. + * @param bytes The number of bytes from the previous read operation. This may be negative if an read error + * occurs. If a negative value is seen it is expected to be return on the next call to + * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally + * to this class and is not required to be enforced in {@link #continueReading()}. + */ + void lastBytesRead(int bytes); + + /** + * Get the amount of bytes for the previous read operation. + * @return The amount of bytes for the previous read operation. + */ + int lastBytesRead(); + + /** + * Set how many bytes the read operation will (or did) attempt to read. + * @param bytes How many bytes the read operation will (or did) attempt to read. + */ + void attemptedBytesRead(int bytes); + + /** + * Get how many bytes the read operation will (or did) attempt to read. + * @return How many bytes the read operation will (or did) attempt to read. + */ + int attemptedBytesRead(); + + /** + * Determine if the current read loop should should continue. + * @param totalMessages The total number of messages read so far by this read loop. + * @param lastReadBytes The number of bytes read from the previous read operation. + * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete. + */ + boolean continueReading(); + + /** + * The read has completed. + */ + void readComplete(); + } + + /** + * A {@link Handle} which delegates all call to some other {@link Handle}. + */ + class DelegatingHandle implements Handle { + private final Handle delegate; + + public DelegatingHandle(Handle delegate) { + this.delegate = checkNotNull(delegate, "delegate"); + } + + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return delegate.allocate(alloc); + } + + @Override + public int guess() { + return delegate.guess(); + } + + @Override + public void reset(ChannelConfig config) { + delegate.reset(config); + } + + @Override + public void incMessagesRead(int numMessages) { + delegate.incMessagesRead(numMessages); + } + + @Override + public void lastBytesRead(int bytes) { + delegate.lastBytesRead(bytes); + } + + @Override + public int lastBytesRead() { + return delegate.lastBytesRead(); + } + + @Override + public boolean continueReading() { + return delegate.continueReading(); + } + + @Override + public int attemptedBytesRead() { + return delegate.attemptedBytesRead(); + } + + @Override + public void attemptedBytesRead(int bytes) { + delegate.attemptedBytesRead(bytes); + } + + @Override + public void readComplete() { + delegate.readComplete(); + } } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index dc05768767..74dd5d5f32 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -282,6 +282,8 @@ public class LocalChannel extends AbstractChannel { throw new NotYetConnectedException(); case CLOSED: throw new ClosedChannelException(); + case CONNECTED: + break; } final LocalChannel peer = this.peer; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 80bf96cc65..bfdb4e11f5 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -72,8 +72,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } - private void handleReadException(ChannelPipeline pipeline, - ByteBuf byteBuf, Throwable cause, boolean close) { + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, + RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { setReadPending(false); @@ -82,6 +82,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { byteBuf.release(); } } + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { @@ -100,62 +101,39 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); - RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); + final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); ByteBuf byteBuf = null; - int messages = 0; - boolean close = false; try { - int totalReadAmount = 0; - boolean readPendingReset = false; + boolean needReadPendingReset = true; do { byteBuf = allocHandle.allocate(allocator); - int writable = byteBuf.writableBytes(); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount <= 0) { - // not was read release the buffer + allocHandle.lastBytesRead(doReadBytes(byteBuf)); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; - close = localReadAmount < 0; break; } - if (!readPendingReset) { - readPendingReset = true; + + allocHandle.incMessagesRead(1); + if (needReadPendingReset) { + needReadPendingReset = false; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; + } while (allocHandle.continueReading()); - if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { - // Avoid overflow. - totalReadAmount = Integer.MAX_VALUE; - break; - } - - totalReadAmount += localReadAmount; - - // stop reading - if (!config.isAutoRead()) { - break; - } - - if (localReadAmount < writable) { - // Read less than what the buffer can hold, - // which might mean we drained the recv buffer completely. - break; - } - } while (++ messages < maxMessagesPerRead); - + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); - allocHandle.record(totalReadAmount); - if (close) { + if (allocHandle.lastBytesRead() < 0) { closeOnRead(pipeline); - close = false; } } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, close); + handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index c101872fb3..d9b6dbbc28 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -19,6 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import java.io.IOException; @@ -59,13 +60,16 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { return; } - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final ChannelPipeline pipeline = pipeline(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config); + boolean closed = false; Throwable exception = null; try { try { - for (;;) { + boolean needReadPendingReset = true; + do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; @@ -75,25 +79,22 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { break; } - // stop reading and remove op - if (!config.isAutoRead()) { - break; + allocHandle.incMessagesRead(localRead); + if (needReadPendingReset) { + needReadPendingReset = false; + setReadPending(false); } - - if (readBuf.size() >= maxMessagesPerRead) { - break; - } - } + } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } - setReadPending(false); + int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } - readBuf.clear(); + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { @@ -107,6 +108,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } if (closed) { + setInputShutdown(); if (isOpen()) { close(voidPromise()); } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index b65753636d..a5c438337f 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.oio; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelMetadata; @@ -73,47 +74,83 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { return false; } + void setInputShutdown() { + inputShutdown = true; + } + + private void closeOnRead(ChannelPipeline pipeline) { + setInputShutdown(); + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + unsafe().close(unsafe().voidPromise()); + } + } + } + + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, + RecvByteBufAllocator.Handle allocHandle) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + setReadPending(false); + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + } + } + @Override protected void doRead() { - if (checkInputShutdown()) { + final ChannelConfig config = config(); + if (isInputShutdown() || !config.isAutoRead() && !isReadPending()) { + // ChannelConfig.setAutoRead(false) was called in the meantime return; } - final ChannelConfig config = config(); + // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. + setReadPending(false); + final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config); - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - - ByteBuf byteBuf = allocHandle.allocate(alloc()); - - boolean closed = false; + ByteBuf byteBuf = null; boolean read = false; - Throwable exception = null; - int localReadAmount = 0; try { - int totalReadAmount = 0; - - for (;;) { - localReadAmount = doReadBytes(byteBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; + byteBuf = allocHandle.allocate(allocator); + do { + allocHandle.lastBytesRead(doReadBytes(byteBuf)); + if (allocHandle.lastBytesRead() <= 0) { + if (!read) { // nothing was read. release the buffer. + byteBuf.release(); + byteBuf = null; + } + break; } + read = true; final int available = available(); if (available <= 0) { break; } + // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline. if (!byteBuf.isWritable()) { final int capacity = byteBuf.capacity(); final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { - if (read) { - read = false; - pipeline.fireChannelRead(byteBuf); - byteBuf = alloc().buffer(); - } + allocHandle.incMessagesRead(1); + read = false; + pipeline.fireChannelRead(byteBuf); + byteBuf = allocHandle.allocate(allocator); } else { final int writerIndex = byteBuf.writerIndex(); if (writerIndex + available > maxCapacity) { @@ -123,55 +160,23 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } } + } while (allocHandle.continueReading()); - if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { - // Avoid overflow. - totalReadAmount = Integer.MAX_VALUE; - break; - } - - totalReadAmount += localReadAmount; - - if (!config.isAutoRead()) { - // stop reading until next Channel.read() call - // See https://github.com/netty/netty/issues/1363 - break; - } - } - allocHandle.record(totalReadAmount); - - } catch (Throwable t) { - exception = t; - } finally { if (read) { pipeline.fireChannelRead(byteBuf); - } else { - // nothing read into the buffer so release it - byteBuf.release(); + byteBuf = null; } + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); - if (exception != null) { - if (exception instanceof IOException) { - closed = true; - pipeline().fireExceptionCaught(exception); - } else { - pipeline.fireExceptionCaught(exception); - unsafe().close(voidPromise()); - } - } - if (closed) { - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - unsafe().close(unsafe().voidPromise()); - } - } + if (allocHandle.lastBytesRead() < 0) { + closeOnRead(pipeline); } - if (localReadAmount == 0 && isActive()) { + } catch (Throwable t) { + handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); + } finally { + if (allocHandle.lastBytesRead() == 0 && isActive()) { // If the read amount was 0 and the channel is still active we need to trigger a new read() // as otherwise we will never try to read again and the user will never know. // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 1c7619714b..553eb3156a 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -35,12 +35,6 @@ public abstract class AbstractOioChannel extends AbstractChannel { private final Runnable readTask = new Runnable() { @Override public void run() { - if (!isReadPending() && !config().isAutoRead()) { - // ChannelConfig.setAutoRead(false) was called in the meantime so just return - return; - } - - setReadPending(false); doRead(); } }; diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index f10ee86611..e9a30cd456 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -18,6 +18,7 @@ package io.netty.channel.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; +import io.netty.channel.RecvByteBufAllocator; import java.io.IOException; import java.util.ArrayList; @@ -37,17 +38,23 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { @Override protected void doRead() { final ChannelConfig config = config(); - final ChannelPipeline pipeline = pipeline(); - boolean closed = false; - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); + if (!config.isAutoRead() && !isReadPending()) { + // ChannelConfig.setAutoRead(false) was called in the meantime + return; + } + // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. + setReadPending(false); + final ChannelPipeline pipeline = pipeline(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config); + + boolean closed = false; Throwable exception = null; - int localRead = 0; - int totalRead = 0; try { - for (;;) { + do { // Perform a read. - localRead = doReadMessages(readBuf); + int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } @@ -56,24 +63,18 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { break; } - // Notify with the received messages and clear the buffer. - int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); - } - readBuf.clear(); - - // Do not read beyond maxMessagesPerRead. - // Do not continue reading if autoRead has been turned off. - totalRead += localRead; - if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) { - break; - } - } + allocHandle.incMessagesRead(localRead); + } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { @@ -81,14 +82,14 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { closed = true; } - pipeline().fireExceptionCaught(exception); + pipeline.fireExceptionCaught(exception); } if (closed) { if (isOpen()) { unsafe().close(unsafe().voidPromise()); } - } else if (localRead == 0 && isActive()) { + } else if (allocHandle.lastBytesRead() == 0 && isActive()) { // If the read amount was 0 and the channel is still active we need to trigger a new read() // as otherwise we will never try to read again and the user will never know. // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index ff83ae588a..758acb236d 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -18,6 +18,7 @@ package io.netty.channel.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.FileRegion; +import io.netty.channel.RecvByteBufAllocator; import java.io.EOFException; import java.io.IOException; @@ -103,8 +104,9 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { @Override protected int doReadBytes(ByteBuf buf) throws Exception { - int length = Math.max(1, Math.min(available(), buf.maxWritableBytes())); - return buf.writeBytes(is, length); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.attemptedBytesRead(Math.max(1, Math.min(available(), buf.maxWritableBytes()))); + return buf.writeBytes(is, allocHandle.attemptedBytesRead()); } @Override diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java index d9e5d014f2..5d76aaf3cc 100644 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java @@ -20,7 +20,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.Closeable; -import java.io.IOException; /** * Allows to acquire and release {@link Channel} and so act as a pool of these. diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java index 478819461d..a136b16ae5 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java @@ -157,6 +157,7 @@ public interface DatagramChannelConfig extends ChannelConfig { DatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface); @Override + @Deprecated DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index 1d77ee7334..a0cde10ee1 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -373,6 +373,7 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement } @Override + @Deprecated public DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java index 6d04a97a8c..066618300a 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java @@ -27,7 +27,9 @@ import java.net.ServerSocket; import java.net.SocketException; import java.util.Map; -import static io.netty.channel.ChannelOption.*; +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; /** * The default {@link ServerSocketChannelConfig} implementation. @@ -152,6 +154,7 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig } @Override + @Deprecated public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java index 88f86162ef..0c171becaa 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java @@ -286,6 +286,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig } @Override + @Deprecated public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java index 20dea0e7c2..1bf8efc697 100644 --- a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java @@ -88,6 +88,7 @@ public interface ServerSocketChannelConfig extends ChannelConfig { ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java index 5eb69d91fc..3caa1de34e 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java @@ -161,6 +161,7 @@ public interface SocketChannelConfig extends ChannelConfig { SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index f43cf189f7..7b65e16dae 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -232,6 +232,7 @@ public final class NioDatagramChannel RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); ByteBuf data = allocHandle.allocate(config.getAllocator()); + allocHandle.attemptedBytesRead(data.writableBytes()); boolean free = true; try { ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); @@ -241,11 +242,9 @@ public final class NioDatagramChannel return 0; } - int readBytes = nioData.position() - pos; - data.writerIndex(data.writerIndex() + readBytes); - allocHandle.record(readBytes); - - buf.add(new DatagramPacket(data, localAddress(), remoteAddress)); + allocHandle.lastBytesRead(nioData.position() - pos); + buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()), + localAddress(), remoteAddress)); free = false; return 1; } catch (Throwable cause) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 3281d193bb..a7e6fa6e10 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -41,7 +41,7 @@ import java.util.List; public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 9c817bbbbb..9077de1e51 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; @@ -46,7 +47,7 @@ import java.util.concurrent.Executor; */ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static SocketChannel newSocket(SelectorProvider provider) { @@ -238,7 +239,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { - return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.attemptedBytesRead(byteBuf.writableBytes()); + return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java index 49edd6cef4..1cbfac2567 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java @@ -121,6 +121,7 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan } @Override + @Deprecated public OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java index 92de3da08e..66b7c84f4f 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java @@ -149,6 +149,7 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im } @Override + @Deprecated public OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 69d729eede..a5065fe66e 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -200,7 +200,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel @Override protected int doReadMessages(List buf) throws Exception { DatagramChannelConfig config = config(); - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess()); boolean free = true; @@ -210,9 +210,8 @@ public class OioDatagramChannel extends AbstractOioMessageChannel InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress(); - int readBytes = tmpPacket.getLength(); - allocHandle.record(readBytes); - buf.add(new DatagramPacket(data.writerIndex(readBytes), localAddress(), remoteAddr)); + allocHandle.lastBytesRead(tmpPacket.getLength()); + buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr)); free = false; return 1; } catch (SocketTimeoutException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 9574459d33..65943e81d9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -44,7 +44,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioServerSocketChannel.class); - private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static ServerSocket newServerSocket() { try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannelConfig.java index 697c6d2da3..e471992f59 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannelConfig.java @@ -67,6 +67,7 @@ public interface OioServerSocketChannelConfig extends ServerSocketChannelConfig OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannelConfig.java index 8bea404a51..1e032d3c06 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannelConfig.java @@ -82,6 +82,7 @@ public interface OioSocketChannelConfig extends SocketChannelConfig { OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); @Override + @Deprecated OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); @Override