Refactoring in preparation to unify I/O logic for all branches

Motivation:

While trying to merge our ChannelOutboundBuffer changes we've made last
week, I realized that we have quite a bit of conflicting changes at 4.1
and master. It was primarily because we added
ChannelOutboundBuffer.beforeAdd() and moved some logic there, such as
direct buffer conversion.

However, this is not possible with the changes we've made for 4.0. We
made ChannelOutboundBuffer final for example.

Maintaining multiple branch is already getting painful and having
different core will make it even worse, so I think we should keep the
differences between 4.0 and other branches minimal.

Modifications:

- Move ChannelOutboundBuffer.safeRelease() to ReferenceCountUtil
- Add ByteBufUtil.threadLocalBuffer()
  - Backported from ThreadLocalPooledDirectByteBuf
- Make most methods in AbstractUnsafe final
  - Add AbstractChannel.filterOutboundMessage() so that a transport can
    convert a message to another (e.g. heap -> off-heap), and also
    reject unsupported messages
  - Move all direct buffer conversions to filterOutboundMessage()
  - Move all type checks to filterOutboundMessage()
- Move AbstractChannel.checkEOF() to OioByteStreamChannel, because it's
  the only place it is used at all
- Remove ChannelOutboundBuffer.current(Object), because it's not used
  anymore
- Add protected direct buffer conversion methods to AbstractNioChannel
  and AbstractEpollChannel so that they can be used by their subtypes
- Update all transport implementations according to the changes above

Result:

- The missing extension point in 4.0 has been added.
  - AbstractChannel.filterOutboundMessage()
  - Thanks to the new extension point, we moved all transport-specific
    logic from ChannelOutboundBuffer to each transport implementation
- We can copy most of the transport implementations in 4.0 to 4.1 and
  master now, so that we have much less merge conflict when we modify
  the core.
This commit is contained in:
Trustin Lee 2014-08-04 15:41:59 -07:00 committed by Norman Maurer
parent b175b3d8be
commit fb5583d788
23 changed files with 604 additions and 366 deletions

View File

@ -16,6 +16,9 @@
package io.netty.buffer;
import io.netty.util.CharsetUtil;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -41,6 +44,8 @@ public final class ByteBufUtil {
static final ByteBufAllocator DEFAULT_ALLOCATOR;
private static final int THREAD_LOCAL_BUFFER_SIZE;
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i ++) {
@ -62,6 +67,9 @@ public final class ByteBufUtil {
}
DEFAULT_ALLOCATOR = alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
}
/**
@ -378,5 +386,89 @@ public final class ByteBufUtil {
return dst.flip().toString();
}
/**
* Returns a cached thread-local direct buffer, if available.
*
* @return a cached thread-local direct buffer, if available. {@code null} otherwise.
*/
public static ByteBuf threadLocalDirectBuffer() {
if (THREAD_LOCAL_BUFFER_SIZE <= 0) {
return null;
}
if (PlatformDependent.hasUnsafe()) {
return ThreadLocalUnsafeDirectByteBuf.newInstance();
} else {
return ThreadLocalDirectByteBuf.newInstance();
}
}
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};
static ThreadLocalUnsafeDirectByteBuf newInstance() {
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalUnsafeDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
@Override
protected ThreadLocalDirectByteBuf newObject(Handle handle) {
return new ThreadLocalDirectByteBuf(handle);
}
};
static ThreadLocalDirectByteBuf newInstance() {
ThreadLocalDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
private ByteBufUtil() { }
}

View File

@ -62,7 +62,7 @@ public final class ReferenceCountUtil {
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg, int decrement) {
@ -72,6 +72,38 @@ public final class ReferenceCountUtil {
return false;
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release()}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use {@link #release(Object)}
* instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg) {
try {
release(msg);
} catch (Throwable t) {
logger.warn("Failed to release a message: {}", msg, t);
}
}
/**
* Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release(int)}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use
* {@link #release(Object, int)} instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg, int decrement) {
try {
release(msg, decrement);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to release a message: {} (decrement: {})", msg, decrement, t);
}
}
}
/**
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the

View File

@ -15,10 +15,15 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress;
@ -98,6 +103,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true;
if ((flags & readFlag) == 0) {
flags |= readFlag;
modifyEvents();
@ -159,6 +167,47 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
protected abstract AbstractEpollUnsafe newUnsafe();
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf);
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method.
*/
protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf == null) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
final ByteBuf directBuf = alloc.directBuffer(capacity);
directBuf.writeBytes(buf, buf.readerIndex(), capacity);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
protected static void checkResolvable(InetSocketAddress addr) {
if (addr.isUnresolved()) {
throw new UnresolvedAddressException();
@ -180,13 +229,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// NOOP
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.

View File

@ -16,13 +16,14 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -44,6 +45,12 @@ import java.nio.channels.NotYetConnectedException;
*/
public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
@ -282,27 +289,20 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
private boolean doWriteMessage(Object msg) throws IOException {
final Object m;
final ByteBuf data;
InetSocketAddress remoteAddress;
ByteBuf data;
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
remoteAddress = packet.recipient();
m = packet.content();
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
(AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
data = envelope.content();
remoteAddress = envelope.recipient();
} else {
m = msg;
data = (ByteBuf) msg;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
final int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
@ -324,9 +324,57 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
return writtenBytes > 0;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
ByteBuf content = (ByteBuf) e.content();
if (content.hasMemoryAddress()) {
return e;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public EpollDatagramChannelConfig config() {
return config;
@ -428,42 +476,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
}
}
@Override
public void write(Object msg, ChannelPromise promise) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (isCopyNeeded(content)) {
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
int readable = content.readableBytes();
ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(content, content.readerIndex(), readable);
content.release();
msg = new DatagramPacket(dst, packet.recipient(), packet.sender());
}
} else if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isCopyNeeded(buf)) {
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
int readable = buf.readableBytes();
ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(buf, buf.readerIndex(), readable);
buf.release();
msg = dst;
}
}
super.write(msg, promise);
}
private boolean isCopyNeeded(ByteBuf content) {
return !content.hasMemoryAddress() || content.nioBufferCount() != 1;
}
}
/**

View File

@ -74,7 +74,12 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
}
@Override
protected void doWrite(ChannelOutboundBuffer in) {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}

View File

@ -49,6 +49,10 @@ import java.util.concurrent.TimeUnit;
*/
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private final EpollSocketChannelConfig config;
/**
@ -339,8 +343,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
return false;
}
} else {
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg));
// Should never reach here.
throw new Error();
}
return true;
@ -379,6 +383,27 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
return true;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
return buf;
}
if (msg instanceof DefaultFileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public EpollSocketChannelConfig config() {
return config;
@ -429,24 +454,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final class EpollSocketUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override
public void write(Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (PlatformDependent.hasUnsafe() && !buf.hasMemoryAddress()) {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
int readable = buf.readableBytes();
ByteBuf dst = alloc().directBuffer(readable);
dst.writeBytes(buf, buf.readerIndex(), readable);
buf.release();
msg = dst;
assert dst.hasMemoryAddress();
}
}
super.write(msg, promise);
}
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {

View File

@ -35,6 +35,7 @@ import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -320,22 +321,25 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
boolean done = false;
try {
final int writtenBytes = javaChannel().send(nioData, mi);
done = writtenBytes > 0;
return done;
} finally {
// Handle this in the finally block to make sure we release the old buffer in all cases
// See https://github.com/netty/netty/issues/2644
if (needsCopy) {
if (!done) {
in.current(new SctpMessage(mi, data));
} else {
in.current(data);
}
final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0;
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
SctpMessage m = (SctpMessage) msg;
ByteBuf buf = m.content();
if (buf.isDirect() && buf.nioBufferCount() == 1) {
return m;
}
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf));
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) +
" (expected: " + StringUtil.simpleClassName(SctpMessage.class));
}
@Override

View File

@ -221,6 +221,11 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel);

View File

@ -34,6 +34,7 @@ import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -64,6 +65,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
InternalLoggerFactory.getInstance(OioSctpChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
private final SctpChannel ch;
private final SctpChannelConfig config;
@ -273,6 +275,16 @@ public class OioSctpChannel extends AbstractOioMessageChannel
}
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
}
@Override
public Association association() {
try {

View File

@ -290,6 +290,11 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel);

View File

@ -98,6 +98,11 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public boolean isActive() {
return javaChannel().socket().isBound();

View File

@ -27,6 +27,7 @@ import io.netty.channel.udt.DefaultUdtChannelConfig;
import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.UdtChannelConfig;
import io.netty.channel.udt.UdtMessage;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -47,6 +48,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
InternalLoggerFactory.getInstance(NioUdtMessageConnectorChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(UdtMessage.class) + ')';
private final UdtChannelConfig config;
@ -196,6 +198,16 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp
return true;
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof UdtMessage) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
}
@Override
public boolean isActive() {
final SocketChannelUDT channelUDT = javaChannel();

View File

@ -25,7 +25,6 @@ import io.netty.util.internal.ThreadLocalRandom;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -374,7 +373,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private boolean inFlush0;
@Override
@ -618,7 +617,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void beginRead() {
public final void beginRead() {
if (!isActive()) {
return;
}
@ -637,7 +636,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void write(Object msg, ChannelPromise promise) {
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
@ -649,15 +648,25 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
ReferenceCountUtil.release(msg);
return;
}
int size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
int size;
try {
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
@Override
public void flush() {
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
@ -707,7 +716,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelPromise voidPromise() {
public final ChannelPromise voidPromise() {
return unsafeVoidPromise;
}
@ -823,12 +832,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
protected static void checkEOF(FileRegion region) throws IOException {
if (region.transfered() < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ region.transfered());
}
/**
* Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
* the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
*/
protected Object filterOutboundMessage(Object msg) throws Exception {
return msg;
}
static final class CloseFuture extends DefaultChannelPromise {

View File

@ -15,7 +15,7 @@
*/
package io.netty.channel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import java.net.SocketAddress;
@ -71,24 +71,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) {
throw new UnsupportedOperationException();
}
private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void write(Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
reject(promise);
}
@Override
public void flush() {
// ignore
}
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
reject(promise);
}
private void reject(ChannelPromise promise) {
safeSetFailure(promise, new UnsupportedOperationException());
}
}

View File

@ -16,11 +16,8 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@ -28,7 +25,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -47,15 +43,6 @@ public final class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final int threadLocalDirectBufferSize =
SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}
}
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() throws Exception {
@ -215,61 +202,12 @@ public final class ChannelOutboundBuffer {
* Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
*/
public Object current() {
return current(true);
}
/**
* Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
* If {@code true} is specified a direct {@link ByteBuf} or {@link ByteBufHolder} is prefered and
* so the current message may be copied into a direct buffer.
*/
public Object current(boolean preferDirect) {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
Object msg = entry.msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg;
}
/**
* Replace the current msg with the given one.
* {@link ReferenceCountUtil#release(Object)} will automatically be called on the replaced message.
*/
public void current(Object msg) {
Entry entry = flushedEntry;
assert entry != null;
safeRelease(entry.msg);
entry.msg = msg;
return entry.msg;
}
/**
@ -305,7 +243,7 @@ public final class ChannelOutboundBuffer {
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size);
}
@ -335,7 +273,7 @@ public final class ChannelOutboundBuffer {
if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before.
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size);
@ -403,7 +341,6 @@ public final class ChannelOutboundBuffer {
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final ByteBufAllocator alloc = channel.alloc();
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
@ -425,27 +362,22 @@ public final class ChannelOutboundBuffer {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.bufs = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
readableBytes, alloc, nioBuffers, nioBufferCount);
ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.bufs = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
@ -467,24 +399,6 @@ public final class ChannelOutboundBuffer {
return nioBufferCount;
}
private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release();
entry.msg = directBuf;
// cache ByteBuffer
ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes);
entry.count = 1;
nioBuffers[nioBufferCount ++] = nioBuf;
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
@ -593,7 +507,7 @@ public final class ChannelOutboundBuffer {
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (!e.cancelled) {
safeRelease(e.msg);
ReferenceCountUtil.safeRelease(e.msg);
safeFail(e.promise, cause);
}
e = e.recycleAndGetNext();
@ -603,14 +517,6 @@ public final class ChannelOutboundBuffer {
}
}
private static void safeRelease(Object message) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable t) {
logger.warn("Failed to release a message.", t);
}
}
private static void safeSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
@ -708,7 +614,7 @@ public final class ChannelOutboundBuffer {
int pSize = pendingSize;
// release message and replace with an empty buffer
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
msg = Unpooled.EMPTY_BUFFER;
pendingSize = 0;
@ -741,36 +647,4 @@ public final class ChannelOutboundBuffer {
return next;
}
}
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle handle;
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
@Override
protected ThreadLocalPooledByteBuf newObject(Handle handle) {
return new ThreadLocalPooledByteBuf(handle);
}
};
private ThreadLocalPooledByteBuf(Recycler.Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
static ThreadLocalPooledByteBuf newInstance() {
ThreadLocalPooledByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
}

View File

@ -35,6 +35,11 @@ import java.nio.channels.SelectionKey;
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
*/
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private Runnable flushTask;
/**
@ -188,17 +193,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
continue;
}
if (!buf.isDirect()) {
ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we rely on JDK's direct buffer pool.
buf = alloc.directBuffer(readableBytes).writeBytes(buf);
in.current(buf);
}
}
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
@ -258,11 +252,31 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break;
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
// Should not reach here.
throw new Error();
}
}
}
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {

View File

@ -15,6 +15,10 @@
*/
package io.netty.channel.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -23,6 +27,8 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -174,19 +180,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
public SelectableChannel ch() {
public final SelectableChannel ch() {
return javaChannel();
}
@Override
public void connect(
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
@ -277,7 +276,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void finishConnect() {
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
@ -306,7 +305,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
protected void flush0() {
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
@ -317,7 +316,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void forceFlush() {
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
@ -362,6 +361,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
@ -371,6 +371,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
@ -386,4 +388,73 @@ public abstract class AbstractNioChannel extends AbstractChannel {
* Finish the connect
*/
protected abstract void doFinishConnect() throws Exception;
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
* Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
* but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is
* too high, but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
if (holder != buf) {
// Ensure to call holder.release() to give the holder a chance to release other resources than its content.
buf.retain();
ReferenceCountUtil.safeRelease(holder);
}
return buf;
}
}

View File

@ -33,10 +33,14 @@ import java.io.IOException;
* Abstract base class for OIO which reads and writes bytes from/to a Socket
*/
public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
@ -214,6 +218,16 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf || msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Return the number of bytes ready to read from the underlying Socket.
*/

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -140,6 +141,13 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
}
}
private static void checkEOF(FileRegion region) throws IOException {
if (region.transfered() < region.count()) {
throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
"but only wrote " + region.transfered());
}
}
@Override
protected void doClose() throws Exception {
InputStream is = this.is;

View File

@ -16,8 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -64,6 +62,12 @@ public final class NioDatagramChannel
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final DatagramChannelConfig config;
@ -259,77 +263,74 @@ public final class NioDatagramChannel
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
final Object m;
final SocketAddress remoteAddress;
ByteBuf data;
final ByteBuf data;
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg;
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
remoteAddress = envelope.recipient();
m = envelope.content();
data = envelope.content();
} else {
m = msg;
data = (ByteBuf) msg;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
final int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
final int writtenBytes;
if (remoteAddress != null) {
writtenBytes = javaChannel().send(nioData, remoteAddress);
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
writtenBytes = javaChannel().write(nioData);
}
return writtenBytes > 0;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf content = p.content();
if (isSingleDirectBuffer(content)) {
return p;
}
return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
}
boolean done = false;
try {
final int writtenBytes;
if (remoteAddress != null) {
writtenBytes = javaChannel().send(nioData, remoteAddress);
} else {
writtenBytes = javaChannel().write(nioData);
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isSingleDirectBuffer(buf)) {
return buf;
}
done = writtenBytes > 0;
return done;
} finally {
// Handle this in the finally block to make sure we release the old buffer in all cases
// See https://github.com/netty/netty/issues/2644
if (needsCopy) {
// This means we have allocated a new buffer and need to store it back so we not need to allocate it
// later again
if (remoteAddress == null) {
// remoteAddress is null which means we can handle it as ByteBuf directly
in.current(data);
} else {
if (!done) {
// store it back with all the needed informations
in.current(new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(data, remoteAddress));
} else {
// Just store back the new created buffer so it is cleaned up once in.remove() is called.
in.current(data);
}
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
ByteBuf content = (ByteBuf) e.content();
if (isSingleDirectBuffer(content)) {
return e;
}
return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
* (We check this because otherwise we need to make it a non-composite buffer.)
*/
private static boolean isSingleDirectBuffer(ByteBuf buf) {
return buf.isDirect() && buf.nioBufferCount() == 1;
}
@Override

View File

@ -179,6 +179,11 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -61,6 +60,12 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final MulticastSocket socket;
private final DatagramChannelConfig config;
@ -241,28 +246,19 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
break;
}
final Object m;
final ByteBuf data;
final SocketAddress remoteAddress;
if (o instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
remoteAddress = envelope.recipient();
m = envelope.content();
data = envelope.content();
} else {
m = o;
data = (ByteBuf) o;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o));
}
int length = data.readableBytes();
final int length = data.readableBytes();
if (remoteAddress != null) {
tmpPacket.setSocketAddress(remoteAddress);
}
@ -285,6 +281,24 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
return msg;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
return msg;
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newPromise());

View File

@ -174,6 +174,11 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {