maxBytesPerRead channel configuration

Motiviation:
The current read loops don't fascilitate reading a maximum amount of bytes. This capability is useful to have more fine grain control over how much data is injested.

Modifications:
- Add a setMaxBytesPerRead(int) and getMaxBytesPerRead() to ChannelConfig
- Add a setMaxBytesPerIndividualRead(int) and getMaxBytesPerIndividualRead to ChannelConfig
- Add methods to RecvByteBufAllocator so that a pluggable scheme can be used to control the behavior of the read loop.
- Modify read loop for all transport types to respect the new RecvByteBufAllocator API

Result:
The ability to control how many bytes are read for each read operation/loop, and a more extensible read loop.
This commit is contained in:
Scott Mitchell 2015-06-04 18:36:55 -07:00
parent a57c16ea39
commit a48e5c7347
73 changed files with 1096 additions and 382 deletions

View File

@ -44,6 +44,8 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
buf = new AdvancedLeakAwareByteBuf(buf, leak); buf = new AdvancedLeakAwareByteBuf(buf, leak);
} }
break; break;
default:
break;
} }
return buf; return buf;
} }

View File

@ -16,7 +16,6 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.IllegalReferenceCountException;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;

View File

@ -23,6 +23,7 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
@ -34,7 +35,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnresolvedAddressException;
abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel { abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final int readFlag; private final int readFlag;
private final FileDescriptor fileDescriptor; private final FileDescriptor fileDescriptor;
protected int flags = Native.EPOLLET; protected int flags = Native.EPOLLET;
@ -93,7 +94,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
@Override @Override
public ChannelMetadata metadata() { public ChannelMetadata metadata() {
return DATA; return METADATA;
} }
@Override @Override
@ -230,6 +231,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected final int doReadBytes(ByteBuf byteBuf) throws Exception { protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
int writerIndex = byteBuf.writerIndex(); int writerIndex = byteBuf.writerIndex();
int localReadAmount; int localReadAmount;
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
if (byteBuf.hasMemoryAddress()) { if (byteBuf.hasMemoryAddress()) {
localReadAmount = Native.readAddress( localReadAmount = Native.readAddress(
fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
@ -294,6 +296,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending; protected boolean readPending;
private EpollRecvByteAllocatorHandle allocHandle;
/** /**
* Called once EPOLLIN event is ready to be processed * Called once EPOLLIN event is ready to be processed
@ -307,6 +310,20 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// NOOP // NOOP
} }
@Override
public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
allocHandle = newEpollHandle(super.recvBufAllocHandle());
}
return allocHandle;
}
/**
* Create a new {@EpollRecvByteAllocatorHandle} instance.
* @param handle The handle to wrap with EPOLL specific logic.
*/
protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle);
@Override @Override
protected void flush0() { protected void flush0() {
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.

View File

@ -17,18 +17,20 @@ package io.netty.channel.epoll;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
protected AbstractEpollServerChannel(int fd) { protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLIN); super(fd, Native.EPOLLIN);
@ -38,6 +40,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0); super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
} }
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override @Override
protected boolean isCompatible(EventLoop loop) { protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop; return loop instanceof EpollEventLoop;
@ -77,6 +84,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
channelPromise.setFailure(new UnsupportedOperationException()); channelPromise.setFailure(new UnsupportedOperationException());
} }
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
}
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
@ -90,13 +102,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
} }
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null; Throwable exception = null;
try { try {
try { try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
do { do {
int socketFd = Native.accept(fd().intValue(), acceptedAddress); int socketFd = Native.accept(fd().intValue(), acceptedAddress);
if (socketFd == -1) { if (socketFd == -1) {
@ -104,26 +115,23 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
break; break;
} }
readPending = false; readPending = false;
allocHandle.incMessagesRead(1);
try { try {
int len = acceptedAddress[0]; int len = acceptedAddress[0];
pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len)); pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
} catch (Throwable t) { } catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket if (edgeTriggered) { // We must keep reading if ET is enabled
pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t);
pipeline.fireExceptionCaught(t); } else {
} finally { throw t;
if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
} }
} while (++ messages < maxMessagesPerRead); } while (allocHandle.continueReading());
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
} }
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) { if (exception != null) {

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
@ -48,8 +49,6 @@ import java.util.Queue;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private static final String EXPECTED_TYPES = private static final String EXPECTED_TYPES =
@ -595,9 +594,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
class EpollStreamUnsafe extends AbstractEpollUnsafe { class EpollStreamUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
private void closeOnRead(ChannelPipeline pipeline) { private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true; inputShutdown = true;
if (isOpen()) { if (isOpen()) {
@ -619,6 +615,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
byteBuf.release(); byteBuf.release();
} }
} }
recvBufAllocHandle().readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
@ -770,7 +767,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
void epollRdHupReady() { void epollRdHupReady() {
if (isActive()) { if (isActive()) {
// If it is still active, we need to call epollInReady as otherwise we may miss to // If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underyling file descriptor. // read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709 // See https://github.com/netty/netty/issues/3709
epollInReady(); epollInReady();
} else { } else {
@ -778,6 +775,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
} }
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET));
}
@Override @Override
void epollInReady() { void epollInReady() {
final ChannelConfig config = config(); final ChannelConfig config = config();
@ -791,84 +793,69 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
if (allocHandle == null) { allocHandle.reset(config);
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
int totalReadAmount = 0;
do { do {
SpliceInTask spliceTask = spliceQueue.peek(); try {
if (spliceTask != null) { SpliceInTask spliceTask = spliceQueue.peek();
if (spliceTask.spliceIn(allocHandle)) { if (spliceTask != null) {
// We need to check if it is still active as if not we removed all SpliceTasks in if (spliceTask.spliceIn(allocHandle)) {
// doClose(...) // We need to check if it is still active as if not we removed all SpliceTasks in
if (isActive()) { // doClose(...)
spliceQueue.remove(); if (isActive()) {
spliceQueue.remove();
}
continue;
} else {
break;
} }
continue; }
} else {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break; break;
} }
readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} catch (Throwable t) {
if (edgeTriggered) { // We must keep reading if ET is enabled
if (byteBuf != null) {
byteBuf.release();
byteBuf = null;
}
pipeline.fireExceptionCaught(t);
} else {
// byteBuf is release in outer exception handling if necessary.
throw t;
}
} }
} while (allocHandle.continueReading());
// we use a direct buffer here as the native implementations only be able allocHandle.readComplete();
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
allocHandle.record(totalReadAmount);
// Avoid overflow.
totalReadAmount = localReadAmount;
} else {
totalReadAmount += localReadAmount;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) { if (close) {
closeOnRead(pipeline); closeOnRead(pipeline);
close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close); boolean closed = handleReadException(pipeline, byteBuf, t, close);
if (!closed) { if (!closed) {
// trigger a read again as there may be something left to read and because of epoll ET we // trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket // will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() { eventLoop().execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
epollInReady(); epollInReady();
@ -919,8 +906,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
length -= localSplicedIn; length -= localSplicedIn;
} }
// record the number of bytes we spliced before
handle.record(splicedIn);
return splicedIn; return splicedIn;
} }
} }

View File

@ -65,6 +65,7 @@ public class EpollChannelConfig extends DefaultChannelConfig {
} }
@Override @Override
@Deprecated
public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AddressedEnvelope; import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -473,7 +474,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private final List<Object> readBuf = new ArrayList<Object>(); private final List<Object> readBuf = new ArrayList<Object>();
@Override @Override
@ -511,11 +511,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
} }
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
}
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET); boolean edgeTriggered = isFlagSet(Native.EPOLLET);
if (!readPending && !edgeTriggered && !config.isAutoRead()) { if (!readPending && !edgeTriggered && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime // ChannelConfig.setAutoRead(false) was called in the meantime
@ -523,65 +528,64 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return; return;
} }
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null; Throwable exception = null;
try { try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
do { do {
ByteBuf data = null; ByteBuf data = null;
try { try {
data = allocHandle.allocate(config.getAllocator()); data = allocHandle.allocate(allocator);
int writerIndex = data.writerIndex(); allocHandle.attemptedBytesRead(data.writableBytes());
DatagramSocketAddress remoteAddress; final DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) { if (data.hasMemoryAddress()) {
// has a memory address so use optimized call // has a memory address so use optimized call
remoteAddress = Native.recvFromAddress( remoteAddress = Native.recvFromAddress(
fd().intValue(), data.memoryAddress(), writerIndex, data.capacity()); fd().intValue(), data.memoryAddress(), data.writerIndex(), data.capacity());
} else { } else {
ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes()); ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
remoteAddress = Native.recvFrom( remoteAddress = Native.recvFrom(
fd().intValue(), nioData, nioData.position(), nioData.limit()); fd().intValue(), nioData, nioData.position(), nioData.limit());
} }
if (remoteAddress == null) { if (remoteAddress == null) {
data.release();
data = null;
break; break;
} }
int readBytes = remoteAddress.receivedAmount; allocHandle.incMessagesRead(1);
data.writerIndex(data.writerIndex() + readBytes); allocHandle.lastBytesRead(remoteAddress.receivedAmount);
allocHandle.record(readBytes); data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false; readPending = false;
readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null; data = null;
} catch (Throwable t) { } catch (Throwable t) {
// We do not break from the loop here and remember the last exception,
// because we need to consume everything from the socket used with epoll ET.
exception = t;
} finally {
if (data != null) { if (data != null) {
data.release(); data.release();
data = null;
} }
if (!edgeTriggered && !config.isAutoRead()) { if (edgeTriggered) {
// This is not using EPOLLET so we can stop reading // We do not break from the loop here and remember the last exception,
// ASAP as we will get notified again later with // because we need to consume everything from the socket used with epoll ET.
// pending data pipeline.fireExceptionCaught(t);
} else {
exception = t;
break; break;
} }
} }
} while (++ messages < maxMessagesPerRead); } while (allocHandle.continueReading());
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i)); pipeline.fireChannelRead(readBuf.get(i));
} }
readBuf.clear(); readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) { if (exception != null) {

View File

@ -178,6 +178,7 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
} }
@Override @Override
@Deprecated
public EpollDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel; import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.OneTimeTask;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -140,12 +141,10 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
} }
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
try { try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
do { do {
int socketFd = Native.recvFd(fd().intValue()); int socketFd = Native.recvFd(fd().intValue());
if (socketFd == 0) { if (socketFd == 0) {
@ -155,32 +154,30 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
close(voidPromise()); close(voidPromise());
return; return;
} }
readPending = false;
readPending = false;
allocHandle.incMessagesRead(1);
try { try {
pipeline.fireChannelRead(new FileDescriptor(socketFd)); pipeline.fireChannelRead(new FileDescriptor(socketFd));
} catch (Throwable t) { } catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket // If ET is enabled we need to consume everything from the socket
pipeline.fireChannelReadComplete(); if (edgeTriggered) {
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally { } else {
if (!edgeTriggered && !config.isAutoRead()) { throw t;
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
} }
} while (++ messages < maxMessagesPerRead); } while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} catch (Throwable t) { } catch (Throwable t) {
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
// trigger a read again as there may be something left to read and because of epoll ET we // trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket // will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() { eventLoop().execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
epollInReady(); epollInReady();

View File

@ -58,7 +58,9 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
return true; return true;
} }
@Override @Override
@Deprecated
public EpollDomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollDomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2015 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
@ -13,9 +13,19 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel.epoll;
/** import io.netty.channel.RecvByteBufAllocator;
* A DNS client that queries a server and checks if query information and
* responses are valid to ensure codec is correct. abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
*/ private final boolean isEdgeTriggered;
package io.netty.handler.codec.dns;
public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
super(handle);
this.isEdgeTriggered = isEdgeTriggered;
}
public final boolean isEdgeTriggered() {
return isEdgeTriggered;
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.RecvByteBufAllocator;
/**
* Respects termination conditions for EPOLL message (aka packet) based protocols.
*/
final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHandle {
public EpollRecvByteAllocatorMessageHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
super(handle, isEdgeTriggered);
}
@Override
public boolean continueReading() {
/**
* If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For
* packet oriented descriptors must read until we get a EAGAIN
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
*/
return isEdgeTriggered() ? true : super.continueReading();
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.RecvByteBufAllocator;
/**
* EPOLL must read until no more data is available while in edge triggered mode. This class will always continue reading
* unless the last read did not fill up the available buffer space.
*/
final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle {
public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
super(handle, isEdgeTriggered);
}
@Override
public boolean continueReading() {
/**
* if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise.
* For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce
* a full buffer (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
*/
return isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading();
}
}

View File

@ -110,6 +110,7 @@ public class EpollServerChannelConfig extends EpollChannelConfig {
} }
@Override @Override
@Deprecated
public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -97,6 +97,7 @@ public final class EpollServerSocketChannelConfig extends EpollServerChannelConf
} }
@Override @Override
@Deprecated
public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -315,6 +315,7 @@ public final class EpollSocketChannelConfig extends EpollChannelConfig implement
} }
@Override @Override
@Deprecated
public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public EpollSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -23,6 +23,7 @@ import java.net.SocketAddress;
* <a href="http://en.wikipedia.org/wiki/Unix_domain_socket">Unix Domain Socket</a>. * <a href="http://en.wikipedia.org/wiki/Unix_domain_socket">Unix Domain Socket</a>.
*/ */
public final class DomainSocketAddress extends SocketAddress { public final class DomainSocketAddress extends SocketAddress {
private static final long serialVersionUID = -6934618000832236893L;
private final String socketPath; private final String socketPath;
public DomainSocketAddress(String socketPath) { public DomainSocketAddress(String socketPath) {

View File

@ -26,6 +26,7 @@ import io.netty.channel.RecvByteBufAllocator;
public interface DomainSocketChannelConfig extends ChannelConfig { public interface DomainSocketChannelConfig extends ChannelConfig {
@Override @Override
@Deprecated
DomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); DomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -23,7 +23,14 @@ import io.netty.channel.RecvByteBufAllocator;
import java.util.Map; import java.util.Map;
import static io.netty.channel.rxtx.RxtxChannelOption.*; import static io.netty.channel.rxtx.RxtxChannelOption.BAUD_RATE;
import static io.netty.channel.rxtx.RxtxChannelOption.DATA_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.DTR;
import static io.netty.channel.rxtx.RxtxChannelOption.PARITY_BIT;
import static io.netty.channel.rxtx.RxtxChannelOption.READ_TIMEOUT;
import static io.netty.channel.rxtx.RxtxChannelOption.RTS;
import static io.netty.channel.rxtx.RxtxChannelOption.STOP_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.WAIT_TIME;
/** /**
* Default configuration class for RXTX device connections. * Default configuration class for RXTX device connections.
@ -205,6 +212,7 @@ final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements Rxt
} }
@Override @Override
@Deprecated
public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -274,6 +274,7 @@ public interface RxtxChannelConfig extends ChannelConfig {
RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.channel.sctp; package io.netty.channel.sctp;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpStandardSocketOptions; import com.sun.nio.sctp.SctpStandardSocketOptions;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
@ -29,8 +28,10 @@ import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.sctp.SctpChannelOption.*; import static io.netty.channel.ChannelOption.SO_SNDBUF;
import static io.netty.channel.sctp.SctpChannelOption.SCTP_INIT_MAXSTREAMS;
import static io.netty.channel.sctp.SctpChannelOption.SCTP_NODELAY;
/** /**
* The default {@link SctpChannelConfig} implementation for SCTP. * The default {@link SctpChannelConfig} implementation for SCTP.
@ -180,6 +181,7 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
} }
@Override @Override
@Deprecated
public SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -157,6 +157,7 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
} }
@Override @Override
@Deprecated
public SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -102,6 +102,7 @@ public interface SctpChannelConfig extends ChannelConfig {
SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -20,8 +20,6 @@ import io.netty.channel.ChannelOption;
import java.net.SocketAddress; import java.net.SocketAddress;
import static io.netty.channel.ChannelOption.*;
/** /**
* Option for configuring the SCTP transport * Option for configuring the SCTP transport
*/ */

View File

@ -94,6 +94,7 @@ public interface SctpServerChannelConfig extends ChannelConfig {
SctpServerChannelConfig setInitMaxStreams(InitMaxStreams initMaxStreams); SctpServerChannelConfig setInitMaxStreams(InitMaxStreams initMaxStreams);
@Override @Override
@Deprecated
SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -274,15 +274,16 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
if (messageInfo == null) { if (messageInfo == null) {
return 0; return 0;
} }
buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.position() - pos)));
allocHandle.lastBytesRead(data.position() - pos);
buf.add(new SctpMessage(messageInfo,
buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
free = false; free = false;
return 1; return 1;
} catch (Throwable cause) { } catch (Throwable cause) {
PlatformDependent.throwException(cause); PlatformDependent.throwException(cause);
return -1; return -1;
} finally { } finally {
int bytesRead = buffer.readableBytes();
allocHandle.record(bytesRead);
if (free) { if (free) {
buffer.release(); buffer.release();
} }

View File

@ -46,7 +46,7 @@ import java.util.Set;
*/ */
public class NioSctpServerChannel extends AbstractNioMessageChannel public class NioSctpServerChannel extends AbstractNioMessageChannel
implements io.netty.channel.sctp.SctpServerChannel { implements io.netty.channel.sctp.SctpServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static SctpServerChannel newSocket() { private static SctpServerChannel newSocket() {
try { try {

View File

@ -19,6 +19,7 @@ import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
@ -184,8 +185,8 @@ public class OioSctpChannel extends AbstractOioMessageChannel
Set<SelectionKey> reableKeys = readSelector.selectedKeys(); Set<SelectionKey> reableKeys = readSelector.selectedKeys();
try { try {
for (SelectionKey ignored : reableKeys) { for (@SuppressWarnings("unused") SelectionKey ignored : reableKeys) {
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf buffer = allocHandle.allocate(config().getAllocator()); ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true; boolean free = true;
@ -197,14 +198,14 @@ public class OioSctpChannel extends AbstractOioMessageChannel
} }
data.flip(); data.flip();
msgs.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining()))); allocHandle.lastBytesRead(data.remaining());
msgs.add(new SctpMessage(messageInfo,
buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
free = false; free = false;
readMessages ++; ++readMessages;
} catch (Throwable cause) { } catch (Throwable cause) {
PlatformDependent.throwException(cause); PlatformDependent.throwException(cause);
} finally { } finally {
int bytesRead = buffer.readableBytes();
allocHandle.record(bytesRead);
if (free) { if (free) {
buffer.release(); buffer.release();
} }

View File

@ -53,7 +53,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSctpServerChannel.class); InternalLoggerFactory.getInstance(OioSctpServerChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static SctpServerChannel newServerSocket() { private static SctpServerChannel newServerSocket() {
try { try {

View File

@ -27,8 +27,14 @@ import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.SO_LINGER;
import static io.netty.channel.udt.UdtChannelOption.*; import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
import static io.netty.channel.ChannelOption.SO_SNDBUF;
import static io.netty.channel.udt.UdtChannelOption.PROTOCOL_RECEIVE_BUFFER_SIZE;
import static io.netty.channel.udt.UdtChannelOption.PROTOCOL_SEND_BUFFER_SIZE;
import static io.netty.channel.udt.UdtChannelOption.SYSTEM_RECEIVE_BUFFER_SIZE;
import static io.netty.channel.udt.UdtChannelOption.SYSTEM_SEND_BUFFER_SIZE;
/** /**
* The default {@link UdtChannelConfig} implementation. * The default {@link UdtChannelConfig} implementation.
@ -241,6 +247,7 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
} }
@Override @Override
@Deprecated
public UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -24,7 +24,7 @@ import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.SO_BACKLOG;
/** /**
* The default {@link UdtServerChannelConfig} implementation. * The default {@link UdtServerChannelConfig} implementation.
@ -143,6 +143,7 @@ public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig
} }
@Override @Override
@Deprecated
public UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -116,6 +116,7 @@ public interface UdtChannelConfig extends ChannelConfig {
UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -18,8 +18,6 @@ package io.netty.channel.udt;
import com.barchart.udt.OptionUDT; import com.barchart.udt.OptionUDT;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import static io.netty.channel.ChannelOption.*;
/** /**
* Options for the UDT transport * Options for the UDT transport
*/ */

View File

@ -50,6 +50,7 @@ public interface UdtServerChannelConfig extends UdtChannelConfig {
UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -43,7 +43,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im
protected static final InternalLogger logger = protected static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioUdtAcceptorChannel.class); InternalLoggerFactory.getInstance(NioUdtAcceptorChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private final UdtServerChannelConfig config; private final UdtServerChannelConfig config;

View File

@ -17,11 +17,13 @@ package io.netty.channel.udt.nio;
import com.barchart.udt.TypeUDT; import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.SocketChannelUDT; import com.barchart.udt.nio.SocketChannelUDT;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.udt.DefaultUdtChannelConfig; import io.netty.channel.udt.DefaultUdtChannelConfig;
import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.UdtChannel;
@ -42,7 +44,7 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioUdtByteConnectorChannel.class); InternalLoggerFactory.getInstance(NioUdtByteConnectorChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private final UdtChannelConfig config; private final UdtChannelConfig config;
@ -136,7 +138,9 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement
@Override @Override
protected int doReadBytes(final ByteBuf byteBuf) throws Exception { protected int doReadBytes(final ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
} }
@Override @Override

View File

@ -29,8 +29,7 @@ import java.net.SocketAddress;
* </ul> * </ul>
*/ */
public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel { public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/** /**
* Creates a new instance. * Creates a new instance.

View File

@ -15,9 +15,6 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -31,7 +28,7 @@ import java.util.List;
* amount of the allocated buffer two times consecutively. Otherwise, it keeps * amount of the allocated buffer two times consecutively. Otherwise, it keeps
* returning the same prediction. * returning the same prediction.
*/ */
public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_INITIAL = 1024;
@ -84,14 +81,14 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
} }
} }
private static final class HandleImpl implements Handle { private final class HandleImpl extends MaxMessageHandle {
private final int minIndex; private final int minIndex;
private final int maxIndex; private final int maxIndex;
private int index; private int index;
private int nextReceiveBufferSize; private int nextReceiveBufferSize;
private boolean decreaseNow; private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) { public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex; this.minIndex = minIndex;
this.maxIndex = maxIndex; this.maxIndex = maxIndex;
@ -99,18 +96,12 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
nextReceiveBufferSize = SIZE_TABLE[index]; nextReceiveBufferSize = SIZE_TABLE[index];
} }
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(nextReceiveBufferSize);
}
@Override @Override
public int guess() { public int guess() {
return nextReceiveBufferSize; return nextReceiveBufferSize;
} }
@Override private void record(int actualReadBytes) {
public void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) { if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex); index = Math.max(index - INDEX_DECREMENT, minIndex);
@ -125,6 +116,11 @@ public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
decreaseNow = false; decreaseNow = false;
} }
} }
@Override
public void readComplete() {
record(totalBytesRead());
}
} }
private final int minIndex; private final int minIndex;

View File

@ -119,16 +119,22 @@ public interface ChannelConfig {
ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
/** /**
* @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
* <p>
* Returns the maximum number of messages to read per read loop. * Returns the maximum number of messages to read per read loop.
* a {@link ChannelHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. * a {@link ChannelHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
*/ */
@Deprecated
int getMaxMessagesPerRead(); int getMaxMessagesPerRead();
/** /**
* @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
* <p>
* Sets the maximum number of messages to read per read loop. * Sets the maximum number of messages to read per read loop.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
*/ */
@Deprecated
ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
/** /**
@ -165,14 +171,12 @@ public interface ChannelConfig {
ChannelConfig setAllocator(ByteBufAllocator allocator); ChannelConfig setAllocator(ByteBufAllocator allocator);
/** /**
* Returns {@link RecvByteBufAllocator} which is used for the channel * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
* to allocate receive buffers.
*/ */
RecvByteBufAllocator getRecvByteBufAllocator(); <T extends RecvByteBufAllocator> T getRecvByteBufAllocator();
/** /**
* Set the {@link ByteBufAllocator} which is used for the channel * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
* to allocate receive buffers.
*/ */
ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator); ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);

View File

@ -20,7 +20,6 @@ import io.netty.util.internal.InternalThreadLocalMap;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.WeakHashMap;
/** /**
* Skelton implementation of a {@link ChannelHandler}. * Skelton implementation of a {@link ChannelHandler}.

View File

@ -23,6 +23,7 @@ import java.net.SocketAddress;
public final class ChannelMetadata { public final class ChannelMetadata {
private final boolean hasDisconnect; private final boolean hasDisconnect;
private final int minMaxMessagesPerRead;
/** /**
* Create a new instance * Create a new instance
@ -32,7 +33,24 @@ public final class ChannelMetadata {
* again, such as UDP/IP. * again, such as UDP/IP.
*/ */
public ChannelMetadata(boolean hasDisconnect) { public ChannelMetadata(boolean hasDisconnect) {
this(hasDisconnect, 1);
}
/**
* Create a new instance
*
* @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation
* that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
* again, such as UDP/IP.
* @param minMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum
* value enforced for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
*/
public ChannelMetadata(boolean hasDisconnect, int minMaxMessagesPerRead) {
if (minMaxMessagesPerRead <= 0) {
throw new IllegalArgumentException("minMaxMessagesPerRead: " + minMaxMessagesPerRead + " (expected > 0)");
}
this.hasDisconnect = hasDisconnect; this.hasDisconnect = hasDisconnect;
this.minMaxMessagesPerRead = minMaxMessagesPerRead;
} }
/** /**
@ -43,4 +61,12 @@ public final class ChannelMetadata {
public boolean hasDisconnect() { public boolean hasDisconnect() {
return hasDisconnect; return hasDisconnect;
} }
/**
* If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum value enforced for
* {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}.
*/
public int minMaxMessagesPerRead() {
return minMaxMessagesPerRead;
}
} }

View File

@ -76,6 +76,9 @@ public final class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR"); public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS"); public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
/**
* @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
*/
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ"); public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT"); public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK"); public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");

View File

@ -16,16 +16,24 @@
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.AUTO_CLOSE;
import static io.netty.channel.ChannelOption.AUTO_READ;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
/** /**
* The default {@link SocketChannelConfig} implementation. * The default {@link SocketChannelConfig} implementation.
@ -55,7 +63,6 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR; private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int maxMessagesPerRead;
private volatile int writeSpinCount = 16; private volatile int writeSpinCount = 16;
private volatile int autoRead = 1; private volatile int autoRead = 1;
private volatile int writeBufferHighWaterMark = 64 * 1024; private volatile int writeBufferHighWaterMark = 64 * 1024;
@ -66,16 +73,6 @@ public class DefaultChannelConfig implements ChannelConfig {
throw new NullPointerException("channel"); throw new NullPointerException("channel");
} }
this.channel = channel; this.channel = channel;
if (channel instanceof ServerChannel || channel instanceof AbstractNioByteChannel) {
// Server channels: Accept as many incoming connections as possible.
// NIO byte channels: Implemented to reduce unnecessary system calls even if it's > 1.
// See https://github.com/netty/netty/issues/2079
// TODO: Add some property to ChannelMetadata so we can remove the ugly instanceof
maxMessagesPerRead = 16;
} else {
maxMessagesPerRead = 1;
}
} }
@Override @Override
@ -203,18 +200,41 @@ public class DefaultChannelConfig implements ChannelConfig {
return this; return this;
} }
/**
* {@inheritDoc}
* <p>
* @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
* {@link MaxMessagesRecvByteBufAllocator}.
*/
@Override @Override
@Deprecated
public int getMaxMessagesPerRead() { public int getMaxMessagesPerRead() {
return maxMessagesPerRead; try {
MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
return allocator.maxMessagesPerRead();
} catch (ClassCastException e) {
throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
"MaxMessagesRecvByteBufAllocator", e);
}
} }
/**
* {@inheritDoc}
* <p>
* @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
* {@link MaxMessagesRecvByteBufAllocator}.
*/
@Override @Override
@Deprecated
public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
if (maxMessagesPerRead <= 0) { try {
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)"); MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
allocator.maxMessagesPerRead(maxMessagesPerRead);
return this;
} catch (ClassCastException e) {
throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
"MaxMessagesRecvByteBufAllocator", e);
} }
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
} }
@Override @Override
@ -246,16 +266,44 @@ public class DefaultChannelConfig implements ChannelConfig {
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public RecvByteBufAllocator getRecvByteBufAllocator() { public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
return rcvBufAllocator; return (T) rcvBufAllocator;
} }
/**
* {@inheritDoc}
* <p>
* This method enforces the {@link ChannelMetadata#minMaxMessagesPerRead()}. If these predetermined limits
* are not appropriate for your use case consider extending the channel and overriding {@link Channel#metadata()},
* or use {@link #setRecvByteBufAllocator(RecvByteBufAllocator, ChannelMetadata)}.
*/
@Override @Override
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
return setRecvByteBufAllocator(allocator, channel.metadata());
}
/**
* Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
* @param allocator the allocator to set.
* @param metadata Used to determine the {@link ChannelMetadata#minMaxMessagesPerRead()} if {@code allocator}
* is of type {@link MaxMessagesRecvByteBufAllocator}.
* @return this
*/
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
if (allocator == null) { if (allocator == null) {
throw new NullPointerException("allocator"); throw new NullPointerException("allocator");
} }
if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
if (metadata == null) {
throw new NullPointerException("metadata");
}
MaxMessagesRecvByteBufAllocator maxMsgAllocator = (MaxMessagesRecvByteBufAllocator) allocator;
if (maxMsgAllocator.maxMessagesPerRead() < metadata.minMaxMessagesPerRead()) {
maxMsgAllocator.maxMessagesPerRead(metadata.minMaxMessagesPerRead());
}
}
rcvBufAllocator = allocator; rcvBufAllocator = allocator;
return this; return this;
} }

View File

@ -0,0 +1,190 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.AbstractMap;
import java.util.Map.Entry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
* The {@link RecvByteBufAllocator} that yields a buffer size prediction based upon decrementing the value from
* the max bytes per read.
*/
public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufAllocator {
private volatile int maxBytesPerRead;
private volatile int maxBytesPerIndividualRead;
private final class HandleImpl implements Handle {
private int individualReadMax;
private int bytesToRead;
private int lastBytesRead;
private int attemptBytesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public int guess() {
return Math.min(individualReadMax, bytesToRead);
}
@Override
public void reset(ChannelConfig config) {
bytesToRead = maxBytesPerRead();
individualReadMax = maxBytesPerIndividualRead();
}
@Override
public void incMessagesRead(int amt) {
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
// Ignore if bytes is negative, the interface contract states it will be detected externally after call.
// The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
bytesToRead -= bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public boolean continueReading() {
// Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided.
return bytesToRead > 0 && attemptBytesRead == lastBytesRead;
}
@Override
public void readComplete() {
}
@Override
public void attemptedBytesRead(int bytes) {
attemptBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptBytesRead;
}
}
public DefaultMaxBytesRecvByteBufAllocator() {
this(64 * 1024, 64 * 1024);
}
public DefaultMaxBytesRecvByteBufAllocator(int maxBytesPerRead, int maxBytesPerIndividualRead) {
checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead);
this.maxBytesPerRead = maxBytesPerRead;
this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
}
@Override
public Handle newHandle() {
return new HandleImpl();
}
@Override
public int maxBytesPerRead() {
return maxBytesPerRead;
}
@Override
public DefaultMaxBytesRecvByteBufAllocator maxBytesPerRead(int maxBytesPerRead) {
if (maxBytesPerRead <= 0) {
throw new IllegalArgumentException("maxBytesPerRead: " + maxBytesPerRead + " (expected: > 0)");
}
// There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
// Write operations must be synchronized, but independent read operations can just be volatile.
synchronized (this) {
final int maxBytesPerIndividualRead = maxBytesPerIndividualRead();
if (maxBytesPerRead < maxBytesPerIndividualRead) {
throw new IllegalArgumentException(
"maxBytesPerRead cannot be less than " +
"maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead);
}
this.maxBytesPerRead = maxBytesPerRead;
}
return this;
}
@Override
public int maxBytesPerIndividualRead() {
return maxBytesPerIndividualRead;
}
@Override
public DefaultMaxBytesRecvByteBufAllocator maxBytesPerIndividualRead(int maxBytesPerIndividualRead) {
if (maxBytesPerIndividualRead <= 0) {
throw new IllegalArgumentException(
"maxBytesPerIndividualRead: " + maxBytesPerIndividualRead + " (expected: > 0)");
}
// There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
// Write operations must be synchronized, but independent read operations can just be volatile.
synchronized (this) {
final int maxBytesPerRead = maxBytesPerRead();
if (maxBytesPerIndividualRead > maxBytesPerRead) {
throw new IllegalArgumentException(
"maxBytesPerIndividualRead cannot be greater than " +
"maxBytesPerRead (" + maxBytesPerRead + "): " + maxBytesPerIndividualRead);
}
this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
}
return this;
}
@Override
public synchronized Entry<Integer, Integer> maxBytesPerReadPair() {
return new AbstractMap.SimpleEntry<Integer, Integer>(maxBytesPerRead, maxBytesPerIndividualRead);
}
private void checkMaxBytesPerReadPair(int maxBytesPerRead, int maxBytesPerIndividualRead) {
if (maxBytesPerRead <= 0) {
throw new IllegalArgumentException("maxBytesPerRead: " + maxBytesPerRead + " (expected: > 0)");
}
if (maxBytesPerIndividualRead <= 0) {
throw new IllegalArgumentException(
"maxBytesPerIndividualRead: " + maxBytesPerIndividualRead + " (expected: > 0)");
}
if (maxBytesPerRead < maxBytesPerIndividualRead) {
throw new IllegalArgumentException(
"maxBytesPerRead cannot be less than " +
"maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead);
}
}
@Override
public DefaultMaxBytesRecvByteBufAllocator maxBytesPerReadPair(int maxBytesPerRead,
int maxBytesPerIndividualRead) {
checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead);
// There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
// Write operations must be synchronized, but independent read operations can just be volatile.
synchronized (this) {
this.maxBytesPerRead = maxBytesPerRead;
this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
}
return this;
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
* Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
* and also prevents overflow.
*/
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
private volatile int maxMessagesPerRead;
public DefaultMaxMessagesRecvByteBufAllocator() {
this(1);
}
public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
maxMessagesPerRead(maxMessagesPerRead);
}
@Override
public int maxMessagesPerRead() {
return maxMessagesPerRead;
}
@Override
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
if (maxMessagesPerRead <= 0) {
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
}
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
}
/**
* Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
*/
public abstract class MaxMessageHandle implements Handle {
private ChannelConfig config;
private int maxMessagePerRead;
private int totalMessages;
private int totalBytesRead;
private int attemptedBytesRead;
private int lastBytesRead;
/**
* Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
*/
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
@Override
public final void lastBytesRead(int bytes) {
lastBytesRead = bytes;
// Ignore if bytes is negative, the interface contract states it will be detected externally after call.
// The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
totalBytesRead += bytes;
if (totalBytesRead < 0) {
totalBytesRead = Integer.MAX_VALUE;
}
}
@Override
public final int lastBytesRead() {
return lastBytesRead;
}
@Override
public boolean continueReading() {
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
totalMessages < maxMessagePerRead &&
totalBytesRead < Integer.MAX_VALUE;
}
@Override
public void readComplete() {
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
protected final int totalBytesRead() {
return totalBytesRead;
}
}
}

View File

@ -15,36 +15,23 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/** /**
* The {@link RecvByteBufAllocator} that always yields the same buffer * The {@link RecvByteBufAllocator} that always yields the same buffer
* size prediction. This predictor ignores the feed back from the I/O thread. * size prediction. This predictor ignores the feed back from the I/O thread.
*/ */
public class FixedRecvByteBufAllocator implements RecvByteBufAllocator { public class FixedRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
private static final class HandleImpl implements Handle {
private final class HandleImpl extends MaxMessageHandle {
private final int bufferSize; private final int bufferSize;
HandleImpl(int bufferSize) { public HandleImpl(int bufferSize) {
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(bufferSize);
}
@Override @Override
public int guess() { public int guess() {
return bufferSize; return bufferSize;
} }
@Override
public void record(int actualReadBytes) { }
} }
private final Handle handle; private final Handle handle;

View File

@ -0,0 +1,65 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.Map.Entry;
/**
* {@link RecvByteBufAllocator} that limits a read operation based upon a maximum value per individual read
* and a maximum amount when a read operation is attempted by the event loop.
*/
public interface MaxBytesRecvByteBufAllocator extends RecvByteBufAllocator {
/**
* Returns the maximum number of bytes to read per read loop.
* a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
*/
int maxBytesPerRead();
/**
* Sets the maximum number of bytes to read per read loop.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
*/
MaxBytesRecvByteBufAllocator maxBytesPerRead(int maxBytesPerRead);
/**
* Returns the maximum number of bytes to read per individual read operation.
* a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
*/
int maxBytesPerIndividualRead();
/**
* Sets the maximum number of bytes to read per individual read operation.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
*/
MaxBytesRecvByteBufAllocator maxBytesPerIndividualRead(int maxBytesPerIndividualRead);
/**
* Atomic way to get the maximum number of bytes to read for a read loop and per individual read operation.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
* @return The Key is from {@link #maxBytesPerRead()}. The Value is from {@link #maxBytesPerIndividualRead()}
*/
Entry<Integer, Integer> maxBytesPerReadPair();
/**
* Sets the maximum number of bytes to read for a read loop and per individual read operation.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure bytes.
* @param maxBytesPerRead {@see #setMaxBytesPerRead(int)}
* @param maxBytesPerIndividualRead {@see #setMaxBytesPerIndividualRead(int)}
*/
MaxBytesRecvByteBufAllocator maxBytesPerReadPair(int maxBytesPerRead, int maxBytesPerIndividualRead);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* {@link RecvByteBufAllocator} that limits the number of read operations that will be attempted when a read operation
* is attempted by the event loop.
*/
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
/**
* Returns the maximum number of messages to read per read loop.
* a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
*/
int maxMessagesPerRead();
/**
* Sets the maximum number of messages to read per read loop.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
*/
MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** /**
* Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
@ -44,11 +45,122 @@ public interface RecvByteBufAllocator {
int guess(); int guess();
/** /**
* Records the the actual number of read bytes in the previous read operation so that the allocator allocates * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
* the buffer with potentially more correct capacity. * read loop.
* * <p>
* @param actualReadBytes the actual number of read bytes in the previous read operation * This may be used by {@link #continueReading()} to determine if the read operation should complete.
* </p>
* This is only ever a hint and may be ignored by the implementation.
* @param config The channel configuration which may impact this object's behavior.
*/ */
void record(int actualReadBytes); void reset(ChannelConfig config);
/**
* Increment the number of messages that have been read for the current read loop.
* @param numMessages The amount to increment by.
*/
void incMessagesRead(int numMessages);
/**
* Set the bytes that have been read for the last read operation.
* This may be used to increment the number of bytes that have been read.
* @param bytes The number of bytes from the previous read operation. This may be negative if an read error
* occurs. If a negative value is seen it is expected to be return on the next call to
* {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
* to this class and is not required to be enforced in {@link #continueReading()}.
*/
void lastBytesRead(int bytes);
/**
* Get the amount of bytes for the previous read operation.
* @return The amount of bytes for the previous read operation.
*/
int lastBytesRead();
/**
* Set how many bytes the read operation will (or did) attempt to read.
* @param bytes How many bytes the read operation will (or did) attempt to read.
*/
void attemptedBytesRead(int bytes);
/**
* Get how many bytes the read operation will (or did) attempt to read.
* @return How many bytes the read operation will (or did) attempt to read.
*/
int attemptedBytesRead();
/**
* Determine if the current read loop should should continue.
* @param totalMessages The total number of messages read so far by this read loop.
* @param lastReadBytes The number of bytes read from the previous read operation.
* @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
*/
boolean continueReading();
/**
* The read has completed.
*/
void readComplete();
}
/**
* A {@link Handle} which delegates all call to some other {@link Handle}.
*/
class DelegatingHandle implements Handle {
private final Handle delegate;
public DelegatingHandle(Handle delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return delegate.allocate(alloc);
}
@Override
public int guess() {
return delegate.guess();
}
@Override
public void reset(ChannelConfig config) {
delegate.reset(config);
}
@Override
public void incMessagesRead(int numMessages) {
delegate.incMessagesRead(numMessages);
}
@Override
public void lastBytesRead(int bytes) {
delegate.lastBytesRead(bytes);
}
@Override
public int lastBytesRead() {
return delegate.lastBytesRead();
}
@Override
public boolean continueReading() {
return delegate.continueReading();
}
@Override
public int attemptedBytesRead() {
return delegate.attemptedBytesRead();
}
@Override
public void attemptedBytesRead(int bytes) {
delegate.attemptedBytesRead(bytes);
}
@Override
public void readComplete() {
delegate.readComplete();
}
} }
} }

View File

@ -282,6 +282,8 @@ public class LocalChannel extends AbstractChannel {
throw new NotYetConnectedException(); throw new NotYetConnectedException();
case CLOSED: case CLOSED:
throw new ClosedChannelException(); throw new ClosedChannelException();
case CONNECTED:
break;
} }
final LocalChannel peer = this.peer; final LocalChannel peer = this.peer;

View File

@ -72,8 +72,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} }
private void handleReadException(ChannelPipeline pipeline, private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
ByteBuf byteBuf, Throwable cause, boolean close) { RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {
setReadPending(false); setReadPending(false);
@ -82,6 +82,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
byteBuf.release(); byteBuf.release();
} }
} }
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
@ -100,62 +101,39 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config);
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try { try {
int totalReadAmount = 0; boolean needReadPendingReset = true;
boolean readPendingReset = false;
do { do {
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes(); allocHandle.lastBytesRead(doReadBytes(byteBuf));
int localReadAmount = doReadBytes(byteBuf); if (allocHandle.lastBytesRead() <= 0) {
if (localReadAmount <= 0) { // nothing was read. release the buffer.
// not was read release the buffer
byteBuf.release(); byteBuf.release();
byteBuf = null; byteBuf = null;
close = localReadAmount < 0;
break; break;
} }
if (!readPendingReset) {
readPendingReset = true; allocHandle.incMessagesRead(1);
if (needReadPendingReset) {
needReadPendingReset = false;
setReadPending(false); setReadPending(false);
} }
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
} while (allocHandle.continueReading());
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { allocHandle.readComplete();
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
// stop reading
if (!config.isAutoRead()) {
break;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) { if (allocHandle.lastBytesRead() < 0) {
closeOnRead(pipeline); closeOnRead(pipeline);
close = false;
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close); handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.
// This could be for two reasons: // This could be for two reasons:

View File

@ -19,6 +19,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import java.io.IOException; import java.io.IOException;
@ -59,13 +60,16 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
return; return;
} }
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false; boolean closed = false;
Throwable exception = null; Throwable exception = null;
try { try {
try { try {
for (;;) { boolean needReadPendingReset = true;
do {
int localRead = doReadMessages(readBuf); int localRead = doReadMessages(readBuf);
if (localRead == 0) { if (localRead == 0) {
break; break;
@ -75,25 +79,22 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
break; break;
} }
// stop reading and remove op allocHandle.incMessagesRead(localRead);
if (!config.isAutoRead()) { if (needReadPendingReset) {
break; needReadPendingReset = false;
setReadPending(false);
} }
} while (allocHandle.continueReading());
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
} }
setReadPending(false);
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i)); pipeline.fireChannelRead(readBuf.get(i));
} }
readBuf.clear(); readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) { if (exception != null) {
@ -107,6 +108,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
if (closed) { if (closed) {
setInputShutdown();
if (isOpen()) { if (isOpen()) {
close(voidPromise()); close(voidPromise());
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel.oio; package io.netty.channel.oio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
@ -73,47 +74,83 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
return false; return false;
} }
void setInputShutdown() {
inputShutdown = true;
}
private void closeOnRead(ChannelPipeline pipeline) {
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(unsafe().voidPromise());
}
}
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
setReadPending(false);
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
@Override @Override
protected void doRead() { protected void doRead() {
if (checkInputShutdown()) { final ChannelConfig config = config();
if (isInputShutdown() || !config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
return; return;
} }
final ChannelConfig config = config(); // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run.
setReadPending(false);
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); ByteBuf byteBuf = null;
ByteBuf byteBuf = allocHandle.allocate(alloc());
boolean closed = false;
boolean read = false; boolean read = false;
Throwable exception = null;
int localReadAmount = 0;
try { try {
int totalReadAmount = 0; byteBuf = allocHandle.allocate(allocator);
do {
for (;;) { allocHandle.lastBytesRead(doReadBytes(byteBuf));
localReadAmount = doReadBytes(byteBuf); if (allocHandle.lastBytesRead() <= 0) {
if (localReadAmount > 0) { if (!read) { // nothing was read. release the buffer.
read = true; byteBuf.release();
} else if (localReadAmount < 0) { byteBuf = null;
closed = true; }
break;
} }
read = true;
final int available = available(); final int available = available();
if (available <= 0) { if (available <= 0) {
break; break;
} }
// Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
if (!byteBuf.isWritable()) { if (!byteBuf.isWritable()) {
final int capacity = byteBuf.capacity(); final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity(); final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) { if (capacity == maxCapacity) {
if (read) { allocHandle.incMessagesRead(1);
read = false; read = false;
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = alloc().buffer(); byteBuf = allocHandle.allocate(allocator);
}
} else { } else {
final int writerIndex = byteBuf.writerIndex(); final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) { if (writerIndex + available > maxCapacity) {
@ -123,55 +160,23 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} }
} }
} }
} while (allocHandle.continueReading());
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (!config.isAutoRead()) {
// stop reading until next Channel.read() call
// See https://github.com/netty/netty/issues/1363
break;
}
}
allocHandle.record(totalReadAmount);
} catch (Throwable t) {
exception = t;
} finally {
if (read) { if (read) {
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
} else { byteBuf = null;
// nothing read into the buffer so release it
byteBuf.release();
} }
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
closed = true;
pipeline().fireExceptionCaught(exception);
} else {
pipeline.fireExceptionCaught(exception);
unsafe().close(voidPromise());
}
}
if (closed) { if (allocHandle.lastBytesRead() < 0) {
inputShutdown = true; closeOnRead(pipeline);
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(unsafe().voidPromise());
}
}
} }
if (localReadAmount == 0 && isActive()) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
} finally {
if (allocHandle.lastBytesRead() == 0 && isActive()) {
// If the read amount was 0 and the channel is still active we need to trigger a new read() // If the read amount was 0 and the channel is still active we need to trigger a new read()
// as otherwise we will never try to read again and the user will never know. // as otherwise we will never try to read again and the user will never know.
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are

View File

@ -35,12 +35,6 @@ public abstract class AbstractOioChannel extends AbstractChannel {
private final Runnable readTask = new Runnable() { private final Runnable readTask = new Runnable() {
@Override @Override
public void run() { public void run() {
if (!isReadPending() && !config().isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime so just return
return;
}
setReadPending(false);
doRead(); doRead();
} }
}; };

View File

@ -18,6 +18,7 @@ package io.netty.channel.oio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -37,17 +38,23 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
@Override @Override
protected void doRead() { protected void doRead() {
final ChannelConfig config = config(); final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); if (!config.isAutoRead() && !isReadPending()) {
boolean closed = false; // ChannelConfig.setAutoRead(false) was called in the meantime
final int maxMessagesPerRead = config.getMaxMessagesPerRead(); return;
}
// OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run.
setReadPending(false);
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null; Throwable exception = null;
int localRead = 0;
int totalRead = 0;
try { try {
for (;;) { do {
// Perform a read. // Perform a read.
localRead = doReadMessages(readBuf); int localRead = doReadMessages(readBuf);
if (localRead == 0) { if (localRead == 0) {
break; break;
} }
@ -56,24 +63,18 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
break; break;
} }
// Notify with the received messages and clear the buffer. allocHandle.incMessagesRead(localRead);
int size = readBuf.size(); } while (allocHandle.continueReading());
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// Do not read beyond maxMessagesPerRead.
// Do not continue reading if autoRead has been turned off.
totalRead += localRead;
if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
break;
}
}
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
} }
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) { if (exception != null) {
@ -81,14 +82,14 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
closed = true; closed = true;
} }
pipeline().fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
} }
if (closed) { if (closed) {
if (isOpen()) { if (isOpen()) {
unsafe().close(unsafe().voidPromise()); unsafe().close(unsafe().voidPromise());
} }
} else if (localRead == 0 && isActive()) { } else if (allocHandle.lastBytesRead() == 0 && isActive()) {
// If the read amount was 0 and the channel is still active we need to trigger a new read() // If the read amount was 0 and the channel is still active we need to trigger a new read()
// as otherwise we will never try to read again and the user will never know. // as otherwise we will never try to read again and the user will never know.
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are

View File

@ -18,6 +18,7 @@ package io.netty.channel.oio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -103,8 +104,9 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
@Override @Override
protected int doReadBytes(ByteBuf buf) throws Exception { protected int doReadBytes(ByteBuf buf) throws Exception {
int length = Math.max(1, Math.min(available(), buf.maxWritableBytes())); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
return buf.writeBytes(is, length); allocHandle.attemptedBytesRead(Math.max(1, Math.min(available(), buf.maxWritableBytes())));
return buf.writeBytes(is, allocHandle.attemptedBytesRead());
} }
@Override @Override

View File

@ -20,7 +20,6 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
/** /**
* Allows to acquire and release {@link Channel} and so act as a pool of these. * Allows to acquire and release {@link Channel} and so act as a pool of these.

View File

@ -157,6 +157,7 @@ public interface DatagramChannelConfig extends ChannelConfig {
DatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface); DatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface);
@Override @Override
@Deprecated
DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -373,6 +373,7 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
} }
@Override @Override
@Deprecated
public DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -27,7 +27,9 @@ import java.net.ServerSocket;
import java.net.SocketException; import java.net.SocketException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
/** /**
* The default {@link ServerSocketChannelConfig} implementation. * The default {@link ServerSocketChannelConfig} implementation.
@ -152,6 +154,7 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
} }
@Override @Override
@Deprecated
public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -286,6 +286,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
} }
@Override @Override
@Deprecated
public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -88,6 +88,7 @@ public interface ServerSocketChannelConfig extends ChannelConfig {
ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -161,6 +161,7 @@ public interface SocketChannelConfig extends ChannelConfig {
SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -232,6 +232,7 @@ public final class NioDatagramChannel
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf data = allocHandle.allocate(config.getAllocator()); ByteBuf data = allocHandle.allocate(config.getAllocator());
allocHandle.attemptedBytesRead(data.writableBytes());
boolean free = true; boolean free = true;
try { try {
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
@ -241,11 +242,9 @@ public final class NioDatagramChannel
return 0; return 0;
} }
int readBytes = nioData.position() - pos; allocHandle.lastBytesRead(nioData.position() - pos);
data.writerIndex(data.writerIndex() + readBytes); buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
allocHandle.record(readBytes); localAddress(), remoteAddress));
buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
free = false; free = false;
return 1; return 1;
} catch (Throwable cause) { } catch (Throwable cause) {

View File

@ -41,7 +41,7 @@ import java.util.List;
public class NioServerSocketChannel extends AbstractNioMessageChannel public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel { implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

View File

@ -24,6 +24,7 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
@ -46,7 +47,7 @@ import java.util.concurrent.Executor;
*/ */
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static SocketChannel newSocket(SelectorProvider provider) { private static SocketChannel newSocket(SelectorProvider provider) {
@ -238,7 +239,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception { protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
} }
@Override @Override

View File

@ -121,6 +121,7 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan
} }
@Override @Override
@Deprecated
public OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -149,6 +149,7 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im
} }
@Override @Override
@Deprecated
public OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { public OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead); super.setMaxMessagesPerRead(maxMessagesPerRead);
return this; return this;

View File

@ -200,7 +200,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
@Override @Override
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess()); ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
boolean free = true; boolean free = true;
@ -210,9 +210,8 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress(); InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
int readBytes = tmpPacket.getLength(); allocHandle.lastBytesRead(tmpPacket.getLength());
allocHandle.record(readBytes); buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
buf.add(new DatagramPacket(data.writerIndex(readBytes), localAddress(), remoteAddr));
free = false; free = false;
return 1; return 1;
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {

View File

@ -44,7 +44,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketChannel.class); InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static ServerSocket newServerSocket() { private static ServerSocket newServerSocket() {
try { try {

View File

@ -67,6 +67,7 @@ public interface OioServerSocketChannelConfig extends ServerSocketChannelConfig
OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override

View File

@ -82,6 +82,7 @@ public interface OioSocketChannelConfig extends SocketChannelConfig {
OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override @Override
@Deprecated
OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override @Override