Allow to obtain RecvByteBufAllocator.Handle to allow more flexible implementations

Motivation:

At the moment it's only possible for a user to set the RecvByteBufAllocator for a Channel but not access the Handle once it is assigned. This makes it hard to write more flexible implementations.

Modifications:

Add a new method to the Channel.Unsafe to allow access the the used Handle for the Channel. The RecvByteBufAllocator.Handle is created lazily.

Result:

It's possible to write more flexible implementatons that allow to adjust stuff on the fly for a Handle that is used by a Channel
This commit is contained in:
Norman Maurer 2014-08-06 14:38:23 +02:00
parent 7764b0e3de
commit c5b6808645
10 changed files with 24 additions and 42 deletions

View File

@ -386,7 +386,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override @Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
@ -419,10 +418,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
void epollInReady() { void epollInReady() {
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();

View File

@ -693,10 +693,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final ChannelConfig config = config(); final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;

View File

@ -68,8 +68,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
private final NotificationHandler<?> notificationHandler; private final NotificationHandler<?> notificationHandler;
private RecvByteBufAllocator.Handle allocHandle;
private static SctpChannel newSctpChannel() { private static SctpChannel newSctpChannel() {
try { try {
return SctpChannel.open(); return SctpChannel.open();
@ -265,10 +263,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
SctpChannel ch = javaChannel(); SctpChannel ch = javaChannel();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
}
ByteBuf buffer = allocHandle.allocate(config().getAllocator()); ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true; boolean free = true;
try { try {

View File

@ -76,8 +76,6 @@ public class OioSctpChannel extends AbstractOioMessageChannel
private final NotificationHandler<?> notificationHandler; private final NotificationHandler<?> notificationHandler;
private RecvByteBufAllocator.Handle allocHandle;
private static SctpChannel openChannel() { private static SctpChannel openChannel() {
try { try {
return SctpChannel.open(); return SctpChannel.open();
@ -187,10 +185,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
Set<SelectionKey> reableKeys = readSelector.selectedKeys(); Set<SelectionKey> reableKeys = readSelector.selectedKeys();
try { try {
for (SelectionKey ignored : reableKeys) { for (SelectionKey ignored : reableKeys) {
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
}
ByteBuf buffer = allocHandle.allocate(config().getAllocator()); ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true; boolean free = true;

View File

@ -419,8 +419,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract class AbstractUnsafe implements Unsafe { protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;
private boolean inFlush0; private boolean inFlush0;
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
@Override @Override
public final ChannelHandlerInvoker invoker() { public final ChannelHandlerInvoker invoker() {
// return the unwrapped invoker. // return the unwrapped invoker.

View File

@ -464,6 +464,12 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
*/ */
interface Unsafe { interface Unsafe {
/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/** /**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user. * Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/ */

View File

@ -58,7 +58,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
private final class NioByteUnsafe extends AbstractNioUnsafe { private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
private void closeOnRead(ChannelPipeline pipeline) { private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey(); SelectionKey key = selectionKey();
@ -102,10 +101,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
int messages = 0; int messages = 0;

View File

@ -39,7 +39,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')'; StringUtil.simpleClassName(FileRegion.class) + ')';
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown; private volatile boolean inputShutdown;
/** /**
@ -82,10 +81,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
final ChannelConfig config = config(); final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = allocHandle.allocate(alloc()); ByteBuf byteBuf = allocHandle.allocate(alloc());

View File

@ -72,7 +72,6 @@ public final class NioDatagramChannel
private final DatagramChannelConfig config; private final DatagramChannelConfig config;
private Map<InetAddress, List<MembershipKey>> memberships; private Map<InetAddress, List<MembershipKey>> memberships;
private RecvByteBufAllocator.Handle allocHandle;
private static DatagramChannel newSocket(SelectorProvider provider) { private static DatagramChannel newSocket(SelectorProvider provider) {
try { try {
@ -230,10 +229,8 @@ public final class NioDatagramChannel
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannel ch = javaChannel(); DatagramChannel ch = javaChannel();
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf data = allocHandle.allocate(config.getAllocator()); ByteBuf data = allocHandle.allocate(config.getAllocator());
boolean free = true; boolean free = true;
try { try {

View File

@ -71,8 +71,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private final DatagramChannelConfig config; private final DatagramChannelConfig config;
private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0); private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
private RecvByteBufAllocator.Handle allocHandle;
private static MulticastSocket newSocket() { private static MulticastSocket newSocket() {
try { try {
return new MulticastSocket(null); return new MulticastSocket(null);
@ -202,10 +200,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
@Override @Override
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess()); ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
boolean free = true; boolean free = true;