From 4a3ef90381f343a457243d6e2eabac01e551315b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 5 Aug 2014 14:24:49 +0200 Subject: [PATCH] Port ChannelOutboundBuffer and related changes from 4.0 Motivation: We did various changes related to the ChannelOutboundBuffer in 4.0 branch. This commit port all of them over and so make sure our branches are synced in terms of these changes. Related to [#2734], [#2709], [#2729], [#2710] and [#2693] . Modification: Port all changes that was done on the ChannelOutboundBuffer. This includes the port of the following commits: - 73dfd7c01b49aca006a34cc48197dee3fc360af1 - 997d8c32d23f2d88903b7b607360907b99101002 - e282e504f17b0874719ff606c728494e3509b1a0 - 5e5d1a58fd3159c04ac7d10edfb8ed7a83d3935e - 8ee3575e72d6ee000a99c717d96f36695a8667a0 - d6f0d12a8692c095df43b2a4462cbc97cf5c5a2d - 16e50765d1fb99005ad761409c28dcedf477531b - 3f3e66c31ae3da70c36cc125ca9bcac8215390e4 Result: - Less memory usage by ChannelOutboundBuffer - Same code as in 4.0 branch - Make it possible to use ChannelOutboundBuffer with Channel implementation that not extends AbstractChannel --- .../java/io/netty/buffer/ByteBufUtil.java | 95 ++- .../io/netty/util/ReferenceCountUtil.java | 36 +- .../channel/epoll/AbstractEpollChannel.java | 56 +- .../epoll/EpollChannelOutboundBuffer.java | 102 --- .../channel/epoll/EpollDatagramChannel.java | 87 ++- .../EpollDatagramChannelOutboundBuffer.java | 63 -- .../epoll/EpollServerSocketChannel.java | 7 +- .../channel/epoll/EpollSocketChannel.java | 292 ++++---- .../java/io/netty/channel/epoll/IovArray.java | 63 +- .../channel/sctp/nio/NioSctpChannel.java | 76 +-- .../sctp/nio/NioSctpServerChannel.java | 5 + .../channel/sctp/oio/OioSctpChannel.java | 12 + .../sctp/oio/OioSctpServerChannel.java | 5 + .../udt/nio/NioUdtAcceptorChannel.java | 5 + .../io/netty/channel/AbstractChannel.java | 49 +- .../netty/channel/AbstractServerChannel.java | 22 +- .../netty/channel/ChannelOutboundBuffer.java | 623 ++++++++++-------- .../channel/nio/AbstractNioByteChannel.java | 27 +- .../netty/channel/nio/AbstractNioChannel.java | 95 ++- .../channel/oio/AbstractOioByteChannel.java | 18 +- .../channel/oio/OioByteStreamChannel.java | 8 + .../socket/nio/NioDatagramChannel.java | 78 ++- .../nio/NioDatagramChannelOutboundBuffer.java | 65 -- .../socket/nio/NioServerSocketChannel.java | 5 + .../channel/socket/nio/NioSocketChannel.java | 38 +- .../nio/NioSocketChannelOutboundBuffer.java | 233 ------- .../socket/oio/OioDatagramChannel.java | 42 +- .../socket/oio/OioServerSocketChannel.java | 5 + ...st.java => ChannelOutboundBufferTest.java} | 149 +++-- 29 files changed, 1196 insertions(+), 1165 deletions(-) delete mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java delete mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java rename transport/src/test/java/io/netty/channel/{socket/nio/NioSocketChannelOutboundBufferTest.java => ChannelOutboundBufferTest.java} (50%) diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java index 0aa9807f1b..e0448ea459 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java @@ -16,6 +16,8 @@ package io.netty.buffer; import io.netty.util.CharsetUtil; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -42,6 +44,8 @@ public final class ByteBufUtil { static final ByteBufAllocator DEFAULT_ALLOCATOR; + private static final int THREAD_LOCAL_BUFFER_SIZE; + static { final char[] DIGITS = "0123456789abcdef".toCharArray(); for (int i = 0; i < 256; i ++) { @@ -49,9 +53,7 @@ public final class ByteBufUtil { HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F]; } - String allocType = SystemPropertyUtil.get( - "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); - allocType = allocType.toLowerCase(Locale.US).trim(); + String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { @@ -66,6 +68,9 @@ public final class ByteBufUtil { } DEFAULT_ALLOCATOR = alloc; + + THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); + logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE); } /** @@ -414,5 +419,89 @@ public final class ByteBufUtil { return dst.flip().toString(); } + /** + * Returns a cached thread-local direct buffer, if available. + * + * @return a cached thread-local direct buffer, if available. {@code null} otherwise. + */ + public static ByteBuf threadLocalDirectBuffer() { + if (THREAD_LOCAL_BUFFER_SIZE <= 0) { + return null; + } + + if (PlatformDependent.hasUnsafe()) { + return ThreadLocalUnsafeDirectByteBuf.newInstance(); + } else { + return ThreadLocalDirectByteBuf.newInstance(); + } + } + + static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { + + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) { + return new ThreadLocalUnsafeDirectByteBuf(handle); + } + }; + + static ThreadLocalUnsafeDirectByteBuf newInstance() { + ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + private final Handle handle; + + private ThreadLocalUnsafeDirectByteBuf(Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + @Override + protected void deallocate() { + if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } + + static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf { + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ThreadLocalDirectByteBuf newObject(Handle handle) { + return new ThreadLocalDirectByteBuf(handle); + } + }; + + static ThreadLocalDirectByteBuf newInstance() { + ThreadLocalDirectByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + private final Handle handle; + + private ThreadLocalDirectByteBuf(Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + @Override + protected void deallocate() { + if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } + private ByteBufUtil() { } } diff --git a/common/src/main/java/io/netty/util/ReferenceCountUtil.java b/common/src/main/java/io/netty/util/ReferenceCountUtil.java index c22a056717..529d16af99 100644 --- a/common/src/main/java/io/netty/util/ReferenceCountUtil.java +++ b/common/src/main/java/io/netty/util/ReferenceCountUtil.java @@ -39,7 +39,7 @@ public final class ReferenceCountUtil { } /** - * Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}. + * Try to call {@link ReferenceCounted#retain(int)} if the specified message implements {@link ReferenceCounted}. * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. */ @SuppressWarnings("unchecked") @@ -87,7 +87,7 @@ public final class ReferenceCountUtil { } /** - * Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}. + * Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}. * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. */ public static boolean release(Object msg, int decrement) { @@ -97,6 +97,38 @@ public final class ReferenceCountUtil { return false; } + /** + * Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}. + * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. + * Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release()} + * and logs it, rather than rethrowing it to the caller. It is usually recommended to use {@link #release(Object)} + * instead, unless you absolutely need to swallow an exception. + */ + public static void safeRelease(Object msg) { + try { + release(msg); + } catch (Throwable t) { + logger.warn("Failed to release a message: {}", msg, t); + } + } + + /** + * Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}. + * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. + * Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release(int)} + * and logs it, rather than rethrowing it to the caller. It is usually recommended to use + * {@link #release(Object, int)} instead, unless you absolutely need to swallow an exception. + */ + public static void safeRelease(Object msg, int decrement) { + try { + release(msg, decrement); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to release a message: {} (decrement: {})", msg, decrement, t); + } + } + } + /** * Schedules the specified object to be released when the caller thread terminates. Note that this operation is * intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 35c8aecf00..79fef4403c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -15,10 +15,15 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; import io.netty.channel.EventLoop; +import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; @@ -98,6 +103,9 @@ abstract class AbstractEpollChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { + // Channel.read() or ChannelHandlerContext.read() was called + ((AbstractEpollUnsafe) unsafe()).readPending = true; + if ((flags & readFlag) == 0) { flags |= readFlag; modifyEvents(); @@ -159,6 +167,47 @@ abstract class AbstractEpollChannel extends AbstractChannel { @Override protected abstract AbstractEpollUnsafe newUnsafe(); + /** + * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one. + */ + protected final ByteBuf newDirectBuffer(ByteBuf buf) { + return newDirectBuffer(buf, buf); + } + + /** + * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder. + * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by + * this method. + */ + protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) { + final int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + ReferenceCountUtil.safeRelease(holder); + return Unpooled.EMPTY_BUFFER; + } + + final ByteBufAllocator alloc = alloc(); + if (alloc.isDirectBufferPooled()) { + return newDirectBuffer0(holder, buf, alloc, readableBytes); + } + + final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); + if (directBuf == null) { + return newDirectBuffer0(holder, buf, alloc, readableBytes); + } + + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + ReferenceCountUtil.safeRelease(holder); + return directBuf; + } + + private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) { + final ByteBuf directBuf = alloc.directBuffer(capacity); + directBuf.writeBytes(buf, buf.readerIndex(), capacity); + ReferenceCountUtil.safeRelease(holder); + return directBuf; + } + protected static void checkResolvable(InetSocketAddress addr) { if (addr.isUnresolved()) { throw new UnresolvedAddressException(); @@ -180,13 +229,6 @@ abstract class AbstractEpollChannel extends AbstractChannel { // NOOP } - @Override - public void beginRead() { - // Channel.read() or ChannelHandlerContext.read() was called - readPending = true; - super.beginRead(); - } - @Override protected void flush0() { // Flush immediately only when there's no pending flush. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java deleted file mode 100644 index 49ea3187ca..0000000000 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOutboundBuffer.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.util.Recycler; - -import java.nio.ByteBuffer; - -/** - * Special {@link ChannelOutboundBuffer} implementation which allows to obtain a {@link IovArray} - * and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce - * GC pressure a lot. - */ -final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer { - private static final Recycler RECYCLER = new Recycler() { - @Override - protected EpollChannelOutboundBuffer newObject(Handle handle) { - return new EpollChannelOutboundBuffer(handle); - } - }; - - /** - * Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel} - */ - static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) { - EpollChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } - - private EpollChannelOutboundBuffer(Recycler.Handle handle) { - super(handle); - } - - /** - * Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this - * {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance. - */ - @Override - protected Object beforeAdd(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!buf.hasMemoryAddress()) { - return copyToDirectByteBuf(buf); - } - } - return msg; - } - - /** - * Returns a {@link IovArray} if the currently pending messages. - *

- * Note that the returned {@link IovArray} is reused and thus should not escape - * {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}. - */ - IovArray iovArray() { - IovArray array = IovArray.get(); - final Entry[] buffer = entries(); - final int mask = entryMask(); - int unflushed = unflushed(); - int flushed = flushed(); - Object m; - - while (flushed != unflushed && (m = buffer[flushed].msg()) != null) { - if (!(m instanceof ByteBuf)) { - // Just break out of the loop as we can still use gathering writes for the buffers that we - // found by now. - break; - } - - Entry entry = buffer[flushed]; - - // Check if the entry was cancelled. if so we just skip it. - if (!entry.isCancelled()) { - ByteBuf buf = (ByteBuf) m; - if (!array.add(buf)) { - // Can not hold more data so break here. - // We will handle this on the next write loop. - break; - } - } - - flushed = flushed + 1 & mask; - } - return array; - } -} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 25ef612fc9..26cc442a73 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -16,13 +16,14 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; +import io.netty.channel.AddressedEnvelope; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -44,6 +45,12 @@ import java.nio.channels.NotYetConnectedException; */ public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(true); + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " + + StringUtil.simpleClassName(AddressedEnvelope.class) + '<' + + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(InetSocketAddress.class) + ">, " + + StringUtil.simpleClassName(ByteBuf.class) + ')'; private volatile InetSocketAddress local; private volatile InetSocketAddress remote; @@ -282,27 +289,20 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } private boolean doWriteMessage(Object msg) throws IOException { - final Object m; + final ByteBuf data; InetSocketAddress remoteAddress; - ByteBuf data; - if (msg instanceof DatagramPacket) { - DatagramPacket packet = (DatagramPacket) msg; - remoteAddress = packet.recipient(); - m = packet.content(); + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope envelope = + (AddressedEnvelope) msg; + data = envelope.content(); + remoteAddress = envelope.recipient(); } else { - m = msg; + data = (ByteBuf) msg; remoteAddress = null; } - if (m instanceof ByteBufHolder) { - data = ((ByteBufHolder) m).content(); - } else if (m instanceof ByteBuf) { - data = (ByteBuf) m; - } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); - } - - int dataLen = data.readableBytes(); + final int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } @@ -324,19 +324,62 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(), remoteAddress.getAddress(), remoteAddress.getPort()); } + return writtenBytes > 0; } + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket) msg; + ByteBuf content = packet.content(); + if (content.hasMemoryAddress()) { + return msg; + } + + // We can only handle direct buffers so we need to copy if a non direct is + // passed to write. + return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()); + } + + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (buf.hasMemoryAddress()) { + return msg; + } + + // We can only handle direct buffers so we need to copy if a non direct is + // passed to write. + return newDirectBuffer(buf); + } + + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope e = (AddressedEnvelope) msg; + if (e.content() instanceof ByteBuf && + (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) { + + ByteBuf content = (ByteBuf) e.content(); + if (content.hasMemoryAddress()) { + return e; + } + + // We can only handle direct buffers so we need to copy if a non direct is + // passed to write. + return new DefaultAddressedEnvelope( + newDirectBuffer(e, content), (InetSocketAddress) e.recipient()); + } + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + @Override public EpollDatagramChannelConfig config() { return config; } - @Override - protected ChannelOutboundBuffer newOutboundBuffer() { - return EpollDatagramChannelOutboundBuffer.newInstance(this); - } - @Override protected void doDisconnect() throws Exception { connected = false; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java deleted file mode 100644 index 494ef41274..0000000000 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.socket.DatagramPacket; -import io.netty.util.Recycler; - -final class EpollDatagramChannelOutboundBuffer extends ChannelOutboundBuffer { - private static final Recycler RECYCLER = - new Recycler() { - @Override - protected EpollDatagramChannelOutboundBuffer newObject(Handle handle) { - return new EpollDatagramChannelOutboundBuffer(handle); - } - }; - - static EpollDatagramChannelOutboundBuffer newInstance(EpollDatagramChannel channel) { - EpollDatagramChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } - - private EpollDatagramChannelOutboundBuffer(Recycler.Handle handle) { - super(handle); - } - - @Override - protected Object beforeAdd(Object msg) { - if (msg instanceof DatagramPacket) { - DatagramPacket packet = (DatagramPacket) msg; - ByteBuf content = packet.content(); - if (isCopyNeeded(content)) { - ByteBuf direct = copyToDirectByteBuf(content); - return new DatagramPacket(direct, packet.recipient(), packet.sender()); - } - } else if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (isCopyNeeded(buf)) { - msg = copyToDirectByteBuf((ByteBuf) msg); - } - } - return msg; - } - - private static boolean isCopyNeeded(ByteBuf content) { - return !content.hasMemoryAddress() || content.nioBufferCount() != 1; - } -} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index 3ed89eca28..1722ac29a2 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -74,7 +74,12 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } @Override - protected void doWrite(ChannelOutboundBuffer in) { + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 49c5179ba4..008f0e94d7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -32,7 +32,6 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannelOutboundBuffer; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; @@ -50,6 +49,10 @@ import java.util.concurrent.TimeUnit; */ public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; + private final EpollSocketChannelConfig config; /** @@ -111,6 +114,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.remove(); return true; } + boolean done = false; long writtenBytes = 0; if (buf.hasMemoryAddress()) { @@ -132,7 +136,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - updateOutboundBuffer(in, writtenBytes, 1, done); + + in.removeBytes(writtenBytes); return done; } else if (buf.nioBufferCount() == 1) { int readerIndex = buf.readerIndex(); @@ -154,23 +159,27 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - updateOutboundBuffer(in, writtenBytes, 1, done); + + in.removeBytes(writtenBytes); return done; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes); + return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); } } - private boolean writeBytesMultiple( - EpollChannelOutboundBuffer in, IovArray array) throws IOException { - boolean done = false; + private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { + long expectedWrittenBytes = array.size(); int cnt = array.count(); + + assert expectedWrittenBytes != 0; + assert cnt != 0; + + boolean done = false; long writtenBytes = 0; int offset = 0; int end = offset + cnt; - int messages = cnt; for (;;) { long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); if (localWrittenBytes == 0) { @@ -200,92 +209,55 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } while (offset < end && localWrittenBytes > 0); } - updateOutboundBuffer(in, writtenBytes, messages, done); + in.removeBytes(writtenBytes); return done; } private boolean writeBytesMultiple( - ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, + ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes) throws IOException { + + assert expectedWrittenBytes != 0; + boolean done = false; long writtenBytes = 0; int offset = 0; int end = offset + nioBufferCnt; - loop: while (nioBufferCnt > 0) { - for (;;) { - int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; - - long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break loop; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break loop; - } - do { - ByteBuffer buffer = nioBuffers[offset]; - int pos = buffer.position(); - int bytes = buffer.limit() - pos; - if (bytes > localWrittenBytes) { - buffer.position(pos + (int) localWrittenBytes); - // incomplete write - break; - } else { - offset++; - nioBufferCnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); + for (;;) { + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + do { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); } - updateOutboundBuffer(in, writtenBytes, msgCount, done); + + in.removeBytes(writtenBytes); return done; } - private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, - boolean done) { - if (done) { - // Release all buffers - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - in.progress(buf.readableBytes()); - in.remove(); - } - in.progress(writtenBytes); - } else { - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes < writtenBytes) { - in.progress(readableBytes); - in.remove(); - writtenBytes -= readableBytes; - } else if (readableBytes > writtenBytes) { - buf.readerIndex(readerIndex + (int) writtenBytes); - in.progress(writtenBytes); - break; - } else { // readable == writtenBytes - in.progress(readableBytes); - in.remove(); - break; - } - } - } - } - /** * Write a {@link DefaultFileRegion} * @@ -293,6 +265,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So * @return amount the amount of written bytes */ private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { + if (region.transfered() >= region.count()) { + in.remove(); + return true; + } + boolean done = false; long flushedAmount = 0; @@ -331,69 +308,102 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } - // Do gathering write if: - // * the outbound buffer contains more than one messages and - // * they are all buffers rather than a file region. - if (msgCount >= 1) { - if (PlatformDependent.hasUnsafe()) { - // this means we can cast to EpollChannelOutboundBuffer and write the IovArray directly. - EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in; - IovArray array = epollIn.iovArray(); - int cnt = array.count(); - if (cnt > 1) { - if (!writeBytesMultiple(epollIn, array)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - break; - } - - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - continue; - } - } else { - NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; - // Ensure the pending writes are made of memoryaddresses only. - ByteBuffer[] nioBuffers = nioIn.nioBuffers(); - int nioBufferCnt = nioIn.nioBufferCount(); - if (nioBufferCnt > 1) { - if (!writeBytesMultiple(nioIn, msgCount, nioBuffers, nioBufferCnt, nioIn.nioBufferSize())) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - break; - } - - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - continue; - } - } - } - - // The outbound buffer contains only one message or it contains a file region. - Object msg = in.current(); - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!writeBytes(in, buf)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. + // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. + if (msgCount > 1 && in.current() instanceof ByteBuf) { + if (!doWriteMultiple(in)) { break; } - } else if (msg instanceof DefaultFileRegion) { - DefaultFileRegion region = (DefaultFileRegion) msg; - if (!writeFileRegion(in, region)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. + + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + } else { // msgCount == 1 + if (!doWriteSingle(in)) { break; } - } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } } } + private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { + // The outbound buffer contains only one message or it contains a file region. + Object msg = in.current(); + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!writeBytes(in, buf)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else if (msg instanceof DefaultFileRegion) { + DefaultFileRegion region = (DefaultFileRegion) msg; + if (!writeFileRegion(in, region)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { + // Should never reach here. + throw new Error(); + } + + return true; + } + + private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + if (PlatformDependent.hasUnsafe()) { + // this means we can cast to IovArray and write the IovArray directly. + IovArray array = IovArray.get(in); + int cnt = array.count(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, array)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } else { + ByteBuffer[] buffers = in.nioBuffers(); + int cnt = in.nioBufferCount(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } + + return true; + } + + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { + // We can only handle buffers with memory address so we need to copy if a non direct is + // passed to write. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + return buf; + } + + if (msg instanceof DefaultFileRegion) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + @Override public EpollSocketChannelConfig config() { return config; @@ -755,16 +765,4 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } } } - - @Override - protected ChannelOutboundBuffer newOutboundBuffer() { - if (PlatformDependent.hasUnsafe()) { - // This means we will be able to access the memory addresses directly and so be able to do - // gathering writes with the AddressEntry. - return EpollChannelOutboundBuffer.newInstance(this); - } else { - // No access to the memoryAddres, so fallback to use ByteBuffer[] for gathering writes. - return NioSocketChannelOutboundBuffer.newInstance(this); - } - } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java index 76c1000d31..bb4431870e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java @@ -16,6 +16,8 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; @@ -23,7 +25,7 @@ import io.netty.util.internal.PlatformDependent; * Represent an array of struct array and so can be passed directly over via JNI without the need to do any more * array copies. * - * The buffers are written out directly into direct memory to match the struct iov. See also man writev. + * The buffers are written out directly into direct memory to match the struct iov. See also {@code man writev}. * *

  * struct iovec {
@@ -33,19 +35,24 @@ import io.netty.util.internal.PlatformDependent;
  * 
* * See also - * - * Efficient JNI programming IV: Wrapping native data objects. + * Efficient JNI programming IV: Wrapping native data objects. */ -final class IovArray { - // Maximal number of struct iov entries that can be passed to writev(...) - private static final int IOV_MAX = Native.IOV_MAX; - // The size of an address which should be 8 for 64 bits and 4 for 32 bits. +final class IovArray implements MessageProcessor { + + /** The size of an address which should be 8 for 64 bits and 4 for 32 bits. */ private static final int ADDRESS_SIZE = PlatformDependent.addressSize(); - // The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the - // address. + + /** + * The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the + * address. + */ private static final int IOV_SIZE = 2 * ADDRESS_SIZE; - // The needed memory to hold up to IOV_MAX iov entries. - private static final int CAPACITY = IOV_MAX * IOV_SIZE; + + /** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified + * the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}. + */ + private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE; private static final FastThreadLocal ARRAY = new FastThreadLocal() { @Override @@ -72,17 +79,26 @@ final class IovArray { * Try to add the given {@link ByteBuf}. Returns {@code true} on success, * {@code false} otherwise. */ - boolean add(ByteBuf buf) { - if (count == IOV_MAX) { + private boolean add(ByteBuf buf) { + if (count == Native.IOV_MAX) { // No more room! return false; } - int len = buf.readableBytes(); - long addr = buf.memoryAddress(); - int offset = buf.readerIndex(); - long baseOffset = memoryAddress(count++); - long lengthOffset = baseOffset + ADDRESS_SIZE; + final int len = buf.readableBytes(); + if (len == 0) { + // No need to add an empty buffer. + // We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue + // fetching the next buffers. + return true; + } + + final long addr = buf.memoryAddress(); + final int offset = buf.readerIndex(); + + final long baseOffset = memoryAddress(count++); + final long lengthOffset = baseOffset + ADDRESS_SIZE; + if (ADDRESS_SIZE == 8) { // 64bit PlatformDependent.putLong(baseOffset, addr + offset); @@ -92,6 +108,7 @@ final class IovArray { PlatformDependent.putInt(baseOffset, (int) addr + offset); PlatformDependent.putInt(lengthOffset, len); } + size += len; return true; } @@ -147,13 +164,19 @@ final class IovArray { return memoryAddress + IOV_SIZE * offset; } + @Override + public boolean processMessage(Object msg) throws Exception { + return msg instanceof ByteBuf && add((ByteBuf) msg); + } + /** - * Returns a {@link IovArray} which can be filled. + * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}. */ - static IovArray get() { + static IovArray get(ChannelOutboundBuffer buffer) throws Exception { IovArray array = ARRAY.get(); array.size = 0; array.count = 0; + buffer.forEachFlushedMessage(array); return array; } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index cdf93de001..8542ce5fbe 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -20,7 +20,7 @@ import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.ByteBuf; -import io.netty.channel.AbstractChannel; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -34,8 +34,8 @@ import io.netty.channel.sctp.SctpChannelConfig; import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpServerChannel; -import io.netty.util.Recycler; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -303,17 +303,45 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return true; } - ByteBuffer nioData = data.nioBuffer(); - + ByteBufAllocator alloc = alloc(); + boolean needsCopy = data.nioBufferCount() != 1; + if (!needsCopy) { + if (!data.isDirect() && alloc.isDirectBufferPooled()) { + needsCopy = true; + } + } + ByteBuffer nioData; + if (!needsCopy) { + nioData = data.nioBuffer(); + } else { + data = alloc.directBuffer(dataLen).writeBytes(data); + nioData = data.nioBuffer(); + } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); final int writtenBytes = javaChannel().send(nioData, mi); - return writtenBytes > 0; } + @Override + protected final Object filterOutboundMessage(Object msg) throws Exception { + if (msg instanceof SctpMessage) { + SctpMessage m = (SctpMessage) msg; + ByteBuf buf = m.content(); + if (buf.isDirect() && buf.nioBufferCount() == 1) { + return m; + } + + return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf)); + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + + " (expected: " + StringUtil.simpleClassName(SctpMessage.class)); + } + @Override public ChannelFuture bindAddress(InetAddress localAddress) { return bindAddress(localAddress, newPromise()); @@ -364,44 +392,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return promise; } - @Override - protected ChannelOutboundBuffer newOutboundBuffer() { - return NioSctpChannelOutboundBuffer.newInstance(this); - } - - static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer { - private static final Recycler RECYCLER = - new Recycler() { - @Override - protected NioSctpChannelOutboundBuffer newObject(Handle handle) { - return new NioSctpChannelOutboundBuffer(handle); - } - }; - - static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) { - NioSctpChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } - - private NioSctpChannelOutboundBuffer(Recycler.Handle handle) { - super(handle); - } - - @Override - protected Object beforeAdd(Object msg) { - if (msg instanceof SctpMessage) { - SctpMessage message = (SctpMessage) msg; - ByteBuf content = message.content(); - if (!content.isDirect() || content.nioBufferCount() != 1) { - ByteBuf direct = copyToDirectByteBuf(content); - return new SctpMessage(message.protocolIdentifier(), message.streamIdentifier(), direct); - } - } - return msg; - } - } - private final class NioSctpChannelConfig extends DefaultSctpChannelConfig { private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) { super(channel, javaChannel); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 568a06ed72..1a0d50b8de 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -221,6 +221,11 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel throw new UnsupportedOperationException(); } + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) { super(channel, javaChannel); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 227a3a126a..e12981f7a6 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -34,6 +34,7 @@ import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpServerChannel; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -64,6 +65,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel InternalLoggerFactory.getInstance(OioSctpChannel.class); private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')'; private final SctpChannel ch; private final SctpChannelConfig config; @@ -273,6 +275,16 @@ public class OioSctpChannel extends AbstractOioMessageChannel } } + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + if (msg instanceof SctpMessage) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE); + } + @Override public Association association() { try { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index 47af27adc4..fc592b5c9f 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -290,6 +290,11 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel throw new UnsupportedOperationException(); } + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) { super(channel, javaChannel); diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index b913a8a257..4380977236 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -98,6 +98,11 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im throw new UnsupportedOperationException(); } + @Override + protected final Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + @Override public boolean isActive() { return javaChannel().socket().isBound(); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index ee5571988e..8574626c92 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -24,7 +24,6 @@ import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -101,7 +100,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public boolean isWritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); - return buf != null && buf.getWritable(); + return buf != null && buf.isWritable(); } @Override @@ -397,8 +396,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract class AbstractUnsafe implements Unsafe { - private ChannelOutboundBuffer outboundBuffer = newOutboundBuffer(); - + private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private boolean inFlush0; @Override @@ -647,7 +645,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void beginRead() { + public final void beginRead() { if (!isActive()) { return; } @@ -666,7 +664,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void write(Object msg, ChannelPromise promise) { + public final void write(Object msg, ChannelPromise promise) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so @@ -678,11 +676,25 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha ReferenceCountUtil.release(msg); return; } - outboundBuffer.addMessage(msg, promise); + + int size; + try { + msg = filterOutboundMessage(msg); + size = estimatorHandle().size(msg); + if (size < 0) { + size = 0; + } + } catch (Throwable t) { + safeSetFailure(promise, t); + ReferenceCountUtil.release(msg); + return; + } + + outboundBuffer.addMessage(msg, size, promise); } @Override - public void flush() { + public final void flush() { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; @@ -729,7 +741,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelPromise voidPromise() { + public final ChannelPromise voidPromise() { return unsafeVoidPromise; } @@ -787,13 +799,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - /** - * Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}. - */ - protected ChannelOutboundBuffer newOutboundBuffer() { - return ChannelOutboundBuffer.newInstance(this); - } - /** * Return {@code true} if the given {@link EventLoop} is compatible with this instance. */ @@ -852,12 +857,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception; - protected static void checkEOF(FileRegion region) throws IOException { - if (region.transfered() < region.count()) { - throw new EOFException("Expected to be able to write " - + region.count() + " bytes, but only wrote " - + region.transfered()); - } + /** + * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that + * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer) + */ + protected Object filterOutboundMessage(Object msg) throws Exception { + return msg; } static final class CloseFuture extends DefaultChannelPromise { diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 7a2794a587..c6fc05e56e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -15,8 +15,6 @@ */ package io.netty.channel; -import io.netty.util.ReferenceCountUtil; - import java.net.SocketAddress; /** @@ -71,24 +69,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new UnsupportedOperationException(); } + @Override + protected final Object filterOutboundMessage(Object msg) { + throw new UnsupportedOperationException(); + } + private final class DefaultServerUnsafe extends AbstractUnsafe { - @Override - public void write(Object msg, ChannelPromise promise) { - ReferenceCountUtil.release(msg); - reject(promise); - } - - @Override - public void flush() { - // ignore - } - @Override public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - reject(promise); - } - - private void reject(ChannelPromise promise) { safeSetFailure(promise, new UnsupportedOperationException()); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 41be4521ad..f289a86243 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -13,24 +13,22 @@ * License for the specific language governing permissions and limitations * under the License. */ -/* - * Written by Josh Bloch of Google Inc. and released to the public domain, - * as explained at http://creativecommons.org/publicdomain/zero/1.0/. - */ package io.netty.channel; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -38,56 +36,48 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. + * + * All the methods should only be called by the {@link EventLoop} of the {@link Channel}. */ -public class ChannelOutboundBuffer { +public final class ChannelOutboundBuffer { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); - protected static final int INITIAL_CAPACITY = - SystemPropertyUtil.getInt("io.netty.outboundBufferInitialCapacity", 4); - - static { - if (logger.isDebugEnabled()) { - logger.debug("-Dio.netty.outboundBufferInitialCapacity: {}", INITIAL_CAPACITY); - } - } - - private static final Recycler RECYCLER = new Recycler() { + private static final FastThreadLocal NIO_BUFFERS = new FastThreadLocal() { @Override - protected ChannelOutboundBuffer newObject(Handle handle) { - return new ChannelOutboundBuffer(handle); + protected ByteBuffer[] initialValue() throws Exception { + return new ByteBuffer[1024]; } }; - /** - * Get a new instance of this {@link ChannelOutboundBuffer} and attach it the given {@link AbstractChannel} - */ - static ChannelOutboundBuffer newInstance(AbstractChannel channel) { - ChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } + private final Channel channel; - private final Handle handle; - - protected AbstractChannel channel; - - // A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The - // flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range - // [unflushed, tail). - private Entry[] buffer; + // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) + // + // The Entry that is the first in the linked-list structure that was flushed + private Entry flushedEntry; + // The Entry which is the first unflushed in the linked-list structure + private Entry unflushedEntry; + // The Entry which represents the tail of the buffer + private Entry tailEntry; + // The number of flushed entries that are not written yet private int flushed; - private int unflushed; - private int tail; + + private int nioBufferCount; + private long nioBufferSize; private boolean inFail; private static final AtomicLongFieldUpdater TOTAL_PENDING_SIZE_UPDATER; + @SuppressWarnings("unused") private volatile long totalPendingSize; private static final AtomicIntegerFieldUpdater WRITABLE_UPDATER; + @SuppressWarnings("FieldMayBeFinal") + private volatile int writable = 1; + static { AtomicIntegerFieldUpdater writableUpdater = PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable"); @@ -104,46 +94,26 @@ public class ChannelOutboundBuffer { TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater; } - private volatile int writable = 1; - - protected ChannelOutboundBuffer(Handle handle) { - this.handle = handle; - - buffer = new Entry[INITIAL_CAPACITY]; - for (int i = 0; i < buffer.length; i++) { - buffer[i] = newEntry(); - } + ChannelOutboundBuffer(AbstractChannel channel) { + this.channel = channel; } /** - * Return the array of {@link Entry}'s which hold the pending write requests in an circular array. + * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once + * the message was written. */ - protected final Entry[] entries() { - return buffer; - } - - /** - * Add the given message to this {@link ChannelOutboundBuffer} so it will be marked as flushed once - * {@link #addFlush()} was called. The {@link ChannelPromise} will be notified once the write operations - * completes. - */ - public final void addMessage(Object msg, ChannelPromise promise) { - msg = beforeAdd(msg); - int size = channel.estimatorHandle().size(msg); - if (size < 0) { - size = 0; + public void addMessage(Object msg, int size, ChannelPromise promise) { + Entry entry = Entry.newInstance(msg, size, total(msg), promise); + if (tailEntry == null) { + flushedEntry = null; + tailEntry = entry; + } else { + Entry tail = tailEntry; + tail.next = entry; + tailEntry = entry; } - - Entry e = buffer[tail++]; - e.msg = msg; - e.pendingSize = size; - e.promise = promise; - e.total = total(msg); - - tail &= buffer.length - 1; - - if (tail == flushed) { - addCapacity(); + if (unflushedEntry == null) { + unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. @@ -152,62 +122,32 @@ public class ChannelOutboundBuffer { } /** - * Is called before the message is actually added to the {@link ChannelOutboundBuffer} and so allow to - * convert it to a different format. Sub-classes may override this. + * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed + * and so you will be able to handle them. */ - protected Object beforeAdd(Object msg) { - return msg; - } - - /** - * Expand internal array which holds the {@link Entry}'s. - */ - private void addCapacity() { - int p = flushed; - int n = buffer.length; - int r = n - p; // number of elements to the right of p - int s = size(); - - int newCapacity = n << 1; - if (newCapacity < 0) { - throw new IllegalStateException(); - } - - Entry[] e = new Entry[newCapacity]; - System.arraycopy(buffer, p, e, 0, r); - System.arraycopy(buffer, 0, e, r, p); - for (int i = n; i < e.length; i++) { - e[i] = newEntry(); - } - - buffer = e; - flushed = 0; - unflushed = s; - tail = n; - } - - /** - * Mark all messages in this {@link ChannelOutboundBuffer} as flushed. - */ - public final void addFlush() { + public void addFlush() { // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 - if (unflushed != tail) { - unflushed = tail; - - final int mask = buffer.length - 1; - int i = flushed; - while (i != unflushed && buffer[i].msg != null) { - Entry entry = buffer[i]; + Entry entry = unflushedEntry; + if (entry != null) { + if (flushedEntry == null) { + // there is no flushedEntry yet, so start with the entry + flushedEntry = entry; + } + do { + flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending); } - i = i + 1 & mask; - } + entry = entry.next; + } while (entry != null); + + // All flushed so reset unflushedEntry + unflushedEntry = null; } } @@ -215,11 +155,8 @@ public class ChannelOutboundBuffer { * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ - final void incrementPendingOutboundBytes(int size) { - // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets - // recycled while process this method. - Channel channel = this.channel; - if (size == 0 || channel == null) { + void incrementPendingOutboundBytes(int size) { + if (size == 0) { return; } @@ -235,11 +172,8 @@ public class ChannelOutboundBuffer { * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ - final void decrementPendingOutboundBytes(int size) { - // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets - // recycled while process this method. - Channel channel = this.channel; - if (size == 0 || channel == null) { + void decrementPendingOutboundBytes(int size) { + if (size == 0) { return; } @@ -265,20 +199,23 @@ public class ChannelOutboundBuffer { } /** - * Return current message or {@code null} if no flushed message is left to process. + * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written. */ - public final Object current() { - if (isEmpty()) { + public Object current() { + Entry entry = flushedEntry; + if (entry == null) { return null; - } else { - // TODO: Think of a smart way to handle ByteBufHolder messages - Entry entry = buffer[flushed]; - return entry.msg; } + + return entry.msg; } - public final void progress(long amount) { - Entry e = buffer[flushed]; + /** + * Notify the {@link ChannelPromise} of the current message about writing progress. + */ + public void progress(long amount) { + Entry e = flushedEntry; + assert e != null; ChannelPromise p = e.promise; if (p instanceof ChannelProgressivePromise) { long progress = e.progress + amount; @@ -288,93 +225,237 @@ public class ChannelOutboundBuffer { } /** - * Mark the current message as successful written and remove it from this {@link ChannelOutboundBuffer}. - * This method will return {@code true} if there are more messages left to process, {@code false} otherwise. + * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no + * flushed message exists at the time this method is called it will return {@code false} to signal that no more + * messages are ready to be handled. */ - public final boolean remove() { - if (isEmpty()) { + public boolean remove() { + Entry e = flushedEntry; + if (e == null) { return false; } - - Entry e = buffer[flushed]; Object msg = e.msg; - if (msg == null) { - return false; - } ChannelPromise promise = e.promise; int size = e.pendingSize; - e.clear(); - - flushed = flushed + 1 & buffer.length - 1; + removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. - safeRelease(msg); + ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size); } + // recycle the entry + e.recycle(); + return true; } /** - * Mark the current message as failure with the given {@link java.lang.Throwable} and remove it from this - * {@link ChannelOutboundBuffer}. This method will return {@code true} if there are more messages left to process, - * {@code false} otherwise. + * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable} + * and return {@code true}. If no flushed message exists at the time this method is called it will return + * {@code false} to signal that no more messages are ready to be handled. */ - public final boolean remove(Throwable cause) { - if (isEmpty()) { + public boolean remove(Throwable cause) { + Entry e = flushedEntry; + if (e == null) { return false; } - - Entry e = buffer[flushed]; Object msg = e.msg; - if (msg == null) { - return false; - } ChannelPromise promise = e.promise; int size = e.pendingSize; - e.clear(); - - flushed = flushed + 1 & buffer.length - 1; + removeEntry(e); if (!e.cancelled) { // only release message, fail and decrement if it was not canceled before. - safeRelease(msg); + ReferenceCountUtil.safeRelease(msg); safeFail(promise, cause); decrementPendingOutboundBytes(size); } + // recycle the entry + e.recycle(); + return true; } - final boolean getWritable() { + private void removeEntry(Entry e) { + if (-- flushed == 0) { + // processed everything + flushedEntry = null; + if (e == tailEntry) { + tailEntry = null; + unflushedEntry = null; + } + } else { + flushedEntry = e.next; + } + } + + /** + * Removes the fully written entries and update the reader index of the partially written entry. + * This operation assumes all messages in this buffer is {@link ByteBuf}. + */ + public void removeBytes(long writtenBytes) { + for (;;) { + final ByteBuf buf = (ByteBuf) current(); + if (buf == null) { + break; + } + + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes <= writtenBytes) { + if (writtenBytes != 0) { + progress(readableBytes); + writtenBytes -= readableBytes; + } + remove(); + } else { // readableBytes > writtenBytes + if (writtenBytes != 0) { + buf.readerIndex(readerIndex + (int) writtenBytes); + progress(writtenBytes); + } + break; + } + } + } + + /** + * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. + * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned + * array and the total number of readable bytes of the NIO buffers respectively. + *

+ * Note that the returned array is reused and thus should not escape + * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. + * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. + *

+ */ + public ByteBuffer[] nioBuffers() { + long nioBufferSize = 0; + int nioBufferCount = 0; + final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); + Entry entry = flushedEntry; + while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { + if (!entry.cancelled) { + ByteBuf buf = (ByteBuf) entry.msg; + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes > 0) { + nioBufferSize += readableBytes; + int count = entry.count; + if (count == -1) { + //noinspection ConstantValueVariableUse + entry.count = count = buf.nioBufferCount(); + } + int neededSpace = nioBufferCount + count; + if (neededSpace > nioBuffers.length) { + nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); + NIO_BUFFERS.set(threadLocalMap, nioBuffers); + } + if (count == 1) { + ByteBuffer nioBuf = entry.buf; + if (nioBuf == null) { + // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a + // derived buffer + entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); + } + nioBuffers[nioBufferCount ++] = nioBuf; + } else { + ByteBuffer[] nioBufs = entry.bufs; + if (nioBufs == null) { + // cached ByteBuffers as they may be expensive to create in terms + // of Object allocation + entry.bufs = nioBufs = buf.nioBuffers(); + } + nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); + } + } + } + entry = entry.next; + } + this.nioBufferCount = nioBufferCount; + this.nioBufferSize = nioBufferSize; + + return nioBuffers; + } + + private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { + for (ByteBuffer nioBuf: nioBufs) { + if (nioBuf == null) { + break; + } + nioBuffers[nioBufferCount ++] = nioBuf; + } + return nioBufferCount; + } + + private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { + int newCapacity = array.length; + do { + // double capacity until it is big enough + // See https://github.com/netty/netty/issues/1890 + newCapacity <<= 1; + + if (newCapacity < 0) { + throw new IllegalStateException(); + } + + } while (neededSpace > newCapacity); + + ByteBuffer[] newArray = new ByteBuffer[newCapacity]; + System.arraycopy(array, 0, newArray, 0, size); + + return newArray; + } + + /** + * Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was + * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} + * was called. + */ + public int nioBufferCount() { + return nioBufferCount; + } + + /** + * Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was + * obtained via {@link #nioBuffers()}. This method MUST be called after {@link #nioBuffers()} + * was called. + */ + public long nioBufferSize() { + return nioBufferSize; + } + + boolean isWritable() { return writable != 0; } /** - * Return the number of messages that are ready to be written (flushed before). + * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}. */ - public final int size() { - return unflushed - flushed & buffer.length - 1; + public int size() { + return flushed; } /** - * Return {@code true} if this {@link ChannelOutboundBuffer} contains no flushed messages + * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false} + * otherwise. */ - public final boolean isEmpty() { - return unflushed == flushed; + public boolean isEmpty() { + return flushed == 0; } - /** - * Fail all previous flushed messages with the given {@link Throwable}. - */ - final void failFlushed(Throwable cause) { + void failFlushed(Throwable cause) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) @@ -396,10 +477,7 @@ public class ChannelOutboundBuffer { } } - /** - * Fail all pending messages with the given {@link ClosedChannelException}. - */ - final void close(final ClosedChannelException cause) { + void close(final ClosedChannelException cause) { if (inFail) { channel.eventLoop().execute(new Runnable() { @Override @@ -421,140 +499,95 @@ public class ChannelOutboundBuffer { } // Release all unflushed messages. - final int unflushedCount = tail - unflushed & buffer.length - 1; try { - for (int i = 0; i < unflushedCount; i++) { - Entry e = buffer[unflushed + i & buffer.length - 1]; - + Entry e = unflushedEntry; + while (e != null) { // Just decrease; do not trigger any events via decrementPendingOutboundBytes() int size = e.pendingSize; TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); - e.pendingSize = 0; if (!e.cancelled) { - safeRelease(e.msg); + ReferenceCountUtil.safeRelease(e.msg); safeFail(e.promise, cause); } - e.msg = null; - e.promise = null; + e = e.recycleAndGetNext(); } } finally { - tail = unflushed; inFail = false; } - - recycle(); } - /** - * Release the message and log if any error happens during release. - */ - protected static void safeRelease(Object message) { - try { - ReferenceCountUtil.release(message); - } catch (Throwable t) { - logger.warn("Failed to release a message.", t); - } - } - - /** - * Try to mark the given {@link ChannelPromise} as success and log if this failed. - */ private static void safeSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } } - /** - * Try to mark the given {@link ChannelPromise} as failued with the given {@link Throwable} and log if this failed. - */ private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } } - /** - * Recycle this {@link ChannelOutboundBuffer}. After this was called it is disallowed to use it with the previous - * assigned {@link AbstractChannel}. - */ - @SuppressWarnings("unchecked") + @Deprecated public void recycle() { - if (buffer.length > INITIAL_CAPACITY) { - Entry[] e = new Entry[INITIAL_CAPACITY]; - System.arraycopy(buffer, 0, e, 0, INITIAL_CAPACITY); - buffer = e; - } - - // reset flushed, unflushed and tail - // See https://github.com/netty/netty/issues/1772 - flushed = 0; - unflushed = 0; - tail = 0; - - // Set the channel to null so it can be GC'ed ASAP - channel = null; - - totalPendingSize = 0; - writable = 1; - - RECYCLER.recycle(this, (Handle) handle); + // NOOP } - /** - * Return the total number of pending bytes. - */ - public final long totalPendingWriteBytes() { + public long totalPendingWriteBytes() { return totalPendingSize; } /** - * Create a new {@link Entry} to use for the internal datastructure. Sub-classes may override this use a special - * sub-class. + * Call {@link MessageProcessor#processMessage(Object)} for each flushed message + * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)} + * returns {@code false} or there are no more flushed messages to process. */ - protected Entry newEntry() { - return new Entry(); - } - - /** - * Return the index of the first flushed message. - */ - protected final int flushed() { - return flushed; - } - - /** - * Return the index of the first unflushed messages. - */ - protected final int unflushed() { - return unflushed; - } - - protected final int entryMask() { - return buffer.length - 1; - } - - protected ByteBuf copyToDirectByteBuf(ByteBuf buf) { - int readableBytes = buf.readableBytes(); - ByteBufAllocator alloc = channel.alloc(); - if (alloc.isDirectBufferPooled()) { - ByteBuf directBuf = alloc.directBuffer(readableBytes); - directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); - safeRelease(buf); - return directBuf; + public void forEachFlushedMessage(MessageProcessor processor) throws Exception { + if (processor == null) { + throw new NullPointerException("processor"); } - if (ThreadLocalPooledDirectByteBuf.threadLocalDirectBufferSize > 0) { - ByteBuf directBuf = ThreadLocalPooledDirectByteBuf.newInstance(); - directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); - safeRelease(buf); - return directBuf; + + Entry entry = flushedEntry; + if (entry == null) { + return; } - return buf; + + do { + if (!entry.cancelled) { + if (!processor.processMessage(entry.msg)) { + return; + } + } + entry = entry.next; + } while (isFlushedEntry(entry)); } - protected static class Entry { + private boolean isFlushedEntry(Entry e) { + return e != null && e != unflushedEntry; + } + + public interface MessageProcessor { + /** + * Will be called for each flushed message until it either there are no more flushed messages or this + * method returns {@code false}. + */ + boolean processMessage(Object msg) throws Exception; + } + + static final class Entry { + private static final Recycler RECYCLER = new Recycler() { + @Override + protected Entry newObject(Handle handle) { + return new Entry(handle); + } + }; + + private final Handle handle; + Entry next; Object msg; + ByteBuffer[] bufs; + ByteBuffer buf; ChannelPromise promise; long progress; long total; @@ -562,43 +595,42 @@ public class ChannelOutboundBuffer { int count = -1; boolean cancelled; - public Object msg() { - return msg; + private Entry(Handle handle) { + this.handle = handle; } - /** - * Return {@code true} if the {@link Entry} was cancelled via {@link #cancel()} before, - * {@code false} otherwise. - */ - public boolean isCancelled() { - return cancelled; + static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { + Entry entry = RECYCLER.get(); + entry.msg = msg; + entry.pendingSize = size; + entry.total = total; + entry.promise = promise; + return entry; } - /** - * Cancel this {@link Entry} and the message that was hold by this {@link Entry}. This method returns the - * number of pending bytes for the cancelled message. - */ - public int cancel() { + int cancel() { if (!cancelled) { cancelled = true; int pSize = pendingSize; // release message and replace with an empty buffer - safeRelease(msg); + ReferenceCountUtil.safeRelease(msg); msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; + bufs = null; + buf = null; return pSize; } return 0; } - /** - * Clear this {@link Entry} and so release all resources. - */ - public void clear() { + void recycle() { + next = null; + bufs = null; + buf = null; msg = null; promise = null; progress = 0; @@ -606,6 +638,13 @@ public class ChannelOutboundBuffer { pendingSize = 0; count = -1; cancelled = false; + RECYCLER.recycle(this, handle); + } + + Entry recycleAndGetNext() { + Entry next = this.next; + recycle(); + return next; } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index d18cff47f4..20e1727104 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -35,6 +35,11 @@ import java.nio.channels.SelectionKey; * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. */ public abstract class AbstractNioByteChannel extends AbstractNioChannel { + + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(FileRegion.class) + ')'; + private Runnable flushTask; /** @@ -247,11 +252,31 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { break; } } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); + // Should not reach here. + throw new Error(); } } } + @Override + protected final Object filterOutboundMessage(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (buf.isDirect()) { + return msg; + } + + return newDirectBuffer(buf); + } + + if (msg instanceof FileRegion) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index bf144dee78..e61fa94f57 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -15,6 +15,10 @@ */ package io.netty.channel.nio; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -23,6 +27,8 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -174,19 +180,12 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - public void beginRead() { - // Channel.read() or ChannelHandlerContext.read() was called - readPending = true; - super.beginRead(); - } - - @Override - public SelectableChannel ch() { + public final SelectableChannel ch() { return javaChannel(); } @Override - public void connect( + public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; @@ -277,7 +276,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - public void finishConnect() { + public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. @@ -306,7 +305,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - protected void flush0() { + protected final void flush0() { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. @@ -317,7 +316,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - public void forceFlush() { + public final void forceFlush() { // directly call super.flush0() to force a flush now super.flush0(); } @@ -362,6 +361,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { + // Channel.read() or ChannelHandlerContext.read() was called if (inputShutdown) { return; } @@ -371,6 +371,8 @@ public abstract class AbstractNioChannel extends AbstractChannel { return; } + readPending = true; + final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); @@ -386,4 +388,73 @@ public abstract class AbstractNioChannel extends AbstractChannel { * Finish the connect */ protected abstract void doFinishConnect() throws Exception; + + /** + * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one. + * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high, + * but just returns the original {@link ByteBuf}.. + */ + protected final ByteBuf newDirectBuffer(ByteBuf buf) { + final int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + ReferenceCountUtil.safeRelease(buf); + return Unpooled.EMPTY_BUFFER; + } + + final ByteBufAllocator alloc = alloc(); + if (alloc.isDirectBufferPooled()) { + ByteBuf directBuf = alloc.directBuffer(readableBytes); + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + ReferenceCountUtil.safeRelease(buf); + return directBuf; + } + + final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); + if (directBuf != null) { + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + ReferenceCountUtil.safeRelease(buf); + return directBuf; + } + + // Allocating and deallocating an unpooled direct buffer is very expensive; give up. + return buf; + } + + /** + * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder. + * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by + * this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is + * too high, but just returns the original {@link ByteBuf}.. + */ + protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) { + final int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + ReferenceCountUtil.safeRelease(holder); + return Unpooled.EMPTY_BUFFER; + } + + final ByteBufAllocator alloc = alloc(); + if (alloc.isDirectBufferPooled()) { + ByteBuf directBuf = alloc.directBuffer(readableBytes); + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + ReferenceCountUtil.safeRelease(holder); + return directBuf; + } + + final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); + if (directBuf != null) { + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + ReferenceCountUtil.safeRelease(holder); + return directBuf; + } + + // Allocating and deallocating an unpooled direct buffer is very expensive; give up. + if (holder != buf) { + // Ensure to call holder.release() to give the holder a chance to release other resources than its content. + buf.retain(); + ReferenceCountUtil.safeRelease(holder); + } + + return buf; + } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index c161505ba3..285d853853 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -33,10 +33,14 @@ import java.io.IOException; * Abstract base class for OIO which reads and writes bytes from/to a Socket */ public abstract class AbstractOioByteChannel extends AbstractOioChannel { - private RecvByteBufAllocator.Handle allocHandle; - private volatile boolean inputShutdown; private static final ChannelMetadata METADATA = new ChannelMetadata(false); + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(FileRegion.class) + ')'; + + private RecvByteBufAllocator.Handle allocHandle; + private volatile boolean inputShutdown; /** * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) @@ -214,6 +218,16 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } + @Override + protected final Object filterOutboundMessage(Object msg) throws Exception { + if (msg instanceof ByteBuf || msg instanceof FileRegion) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + /** * Return the number of bytes ready to read from the underlying Socket. */ diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index 4ddc164671..ff83ae588a 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.FileRegion; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -140,6 +141,13 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { } } + private static void checkEOF(FileRegion region) throws IOException { + if (region.transfered() < region.count()) { + throw new EOFException("Expected to be able to write " + region.count() + " bytes, " + + "but only wrote " + region.transfered()); + } + } + @Override protected void doClose() throws Exception { InputStream is = this.is; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 9ef4717209..099b9f2b70 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -25,6 +24,7 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -62,6 +62,12 @@ public final class NioDatagramChannel private static final ChannelMetadata METADATA = new ChannelMetadata(true); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " + + StringUtil.simpleClassName(AddressedEnvelope.class) + '<' + + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(SocketAddress.class) + ">, " + + StringUtil.simpleClassName(ByteBuf.class) + ')'; private final DatagramChannelConfig config; @@ -257,34 +263,24 @@ public final class NioDatagramChannel @Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { - final Object m; final SocketAddress remoteAddress; - ByteBuf data; + final ByteBuf data; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") - AddressedEnvelope envelope = (AddressedEnvelope) msg; + AddressedEnvelope envelope = (AddressedEnvelope) msg; remoteAddress = envelope.recipient(); - m = envelope.content(); + data = envelope.content(); } else { - m = msg; + data = (ByteBuf) msg; remoteAddress = null; } - if (m instanceof ByteBufHolder) { - data = ((ByteBufHolder) m).content(); - } else if (m instanceof ByteBuf) { - data = (ByteBuf) m; - } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); - } - - int dataLen = data.readableBytes(); + final int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } - ByteBuffer nioData = data.nioBuffer(); - + final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen); final int writtenBytes; if (remoteAddress != null) { writtenBytes = javaChannel().send(nioData, remoteAddress); @@ -294,6 +290,49 @@ public final class NioDatagramChannel return writtenBytes > 0; } + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket p = (DatagramPacket) msg; + ByteBuf content = p.content(); + if (isSingleDirectBuffer(content)) { + return p; + } + return new DatagramPacket(newDirectBuffer(p, content), p.recipient()); + } + + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (isSingleDirectBuffer(buf)) { + return buf; + } + return newDirectBuffer(buf); + } + + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope e = (AddressedEnvelope) msg; + if (e.content() instanceof ByteBuf) { + ByteBuf content = (ByteBuf) e.content(); + if (isSingleDirectBuffer(content)) { + return e; + } + return new DefaultAddressedEnvelope(newDirectBuffer(e, content), e.recipient()); + } + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + + /** + * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer. + * (We check this because otherwise we need to make it a non-composite buffer.) + */ + private static boolean isSingleDirectBuffer(ByteBuf buf) { + return buf.isDirect() && buf.nioBufferCount() == 1; + } + @Override protected boolean continueOnWriteError() { // Continue on write error as a DatagramChannel can write to multiple remote peers @@ -543,11 +582,6 @@ public final class NioDatagramChannel return promise; } - @Override - protected ChannelOutboundBuffer newOutboundBuffer() { - return NioDatagramChannelOutboundBuffer.newInstance(this); - } - @Override protected void setReadPending(boolean readPending) { super.setReadPending(readPending); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java deleted file mode 100644 index b3bd3b676d..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelOutboundBuffer.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.socket.DatagramPacket; -import io.netty.util.Recycler; - -/** - * Special {@link ChannelOutboundBuffer} for {@link NioDatagramChannel} implementations. - */ -final class NioDatagramChannelOutboundBuffer extends ChannelOutboundBuffer { - private static final Recycler RECYCLER = - new Recycler() { - @Override - protected NioDatagramChannelOutboundBuffer newObject(Handle handle) { - return new NioDatagramChannelOutboundBuffer(handle); - } - }; - - /** - * Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given - * {@link .NioDatagramChannel}. - */ - static NioDatagramChannelOutboundBuffer newInstance(NioDatagramChannel channel) { - NioDatagramChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } - - private NioDatagramChannelOutboundBuffer(Recycler.Handle handle) { - super(handle); - } - - /** - * Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation - * will do the conversation itself and we can do a better job here. - */ - @Override - protected Object beforeAdd(Object msg) { - if (msg instanceof DatagramPacket) { - DatagramPacket packet = (DatagramPacket) msg; - ByteBuf content = packet.content(); - if (!content.isDirect() || content.nioBufferCount() != 1) { - ByteBuf direct = copyToDirectByteBuf(content); - return new DatagramPacket(direct, packet.recipient(), packet.sender()); - } - } - return msg; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index c0dc36bc76..3281d193bb 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -179,6 +179,11 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel throw new UnsupportedOperationException(); } + @Override + protected final Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig { private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { super(channel, javaSocket); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 29297d2fbd..f73092cd3b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -245,17 +245,18 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty super.doWrite(in); return; } + // Ensure the pending writes are made of ByteBufs only. - NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; - ByteBuffer[] nioBuffers = nioIn.nioBuffers(); - int nioBufferCnt = nioIn.nioBufferCount(); + ByteBuffer[] nioBuffers = in.nioBuffers(); + int nioBufferCnt = in.nioBufferCount(); + if (nioBufferCnt <= 1) { // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); - return; + break; } - long expectedWrittenBytes = nioIn.nioBufferSize(); + long expectedWrittenBytes = in.nioBufferSize(); final SocketChannel ch = javaChannel(); long writtenBytes = 0; @@ -291,38 +292,13 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } else { // Did not write all buffers completely. // Release the fully written buffers and update the indexes of the partially written buffer. - - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes < writtenBytes) { - nioIn.progress(readableBytes); - nioIn.remove(); - writtenBytes -= readableBytes; - } else if (readableBytes > writtenBytes) { - buf.readerIndex(readerIndex + (int) writtenBytes); - nioIn.progress(writtenBytes); - break; - } else { // readableBytes == writtenBytes - nioIn.progress(readableBytes); - nioIn.remove(); - break; - } - } - + in.removeBytes(writtenBytes); incompleteWrite(setOpWrite); break; } } } - @Override - protected ChannelOutboundBuffer newOutboundBuffer() { - return NioSocketChannelOutboundBuffer.newInstance(this); - } - private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java deleted file mode 100644 index ca699186cf..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBuffer.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Written by Josh Bloch of Google Inc. and released to the public domain, - * as explained at http://creativecommons.org/publicdomain/zero/1.0/. - */ -package io.netty.channel.socket.nio; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.AbstractChannel; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.util.Recycler; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -/** - * Special {@link ChannelOutboundBuffer} implementation which allows to also access flushed {@link ByteBuffer} to - * allow efficent gathering writes. - */ -public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer { - - private ByteBuffer[] nioBuffers; - private int nioBufferCount; - private long nioBufferSize; - - private static final Recycler RECYCLER = - new Recycler() { - @Override - protected NioSocketChannelOutboundBuffer newObject(Handle handle) { - return new NioSocketChannelOutboundBuffer(handle); - } - }; - - /** - * Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given {@link AbstractChannel} - */ - public static NioSocketChannelOutboundBuffer newInstance(AbstractChannel channel) { - NioSocketChannelOutboundBuffer buffer = RECYCLER.get(); - buffer.channel = channel; - return buffer; - } - - private NioSocketChannelOutboundBuffer(Recycler.Handle handle) { - super(handle); - nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; - } - - /** - * Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation - * will do the conversation itself and we can do a better job here. - */ - @Override - protected Object beforeAdd(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!buf.isDirect()) { - return copyToDirectByteBuf(buf); - } - } - return msg; - } - - /** - * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. - * {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and - * {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number - * of readable bytes of the NIO buffers respectively. - *

- * Note that the returned array is reused and thus should not escape - * {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}. - * Refer to {@link io.netty.channel.socket.nio.NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. - *

- */ - public ByteBuffer[] nioBuffers() { - long nioBufferSize = 0; - int nioBufferCount = 0; - final Entry[] buffer = entries(); - final int mask = entryMask(); - ByteBuffer[] nioBuffers = this.nioBuffers; - Object m; - int unflushed = unflushed(); - int i = flushed(); - while (i != unflushed && (m = buffer[i].msg()) != null) { - if (!(m instanceof ByteBuf)) { - // Just break out of the loop as we can still use gathering writes for the buffers that we - // found by now. - break; - } - - NioEntry entry = (NioEntry) buffer[i]; - - if (!entry.isCancelled()) { - ByteBuf buf = (ByteBuf) m; - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes > 0) { - nioBufferSize += readableBytes; - int count = entry.count; - if (count == -1) { - //noinspection ConstantValueVariableUse - entry.count = count = buf.nioBufferCount(); - } - int neededSpace = nioBufferCount + count; - if (neededSpace > nioBuffers.length) { - this.nioBuffers = nioBuffers = - expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); - } - if (count == 1) { - ByteBuffer nioBuf = entry.buf; - if (nioBuf == null) { - // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a - // derived buffer - entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); - } - nioBuffers[nioBufferCount ++] = nioBuf; - } else { - ByteBuffer[] nioBufs = entry.buffers; - if (nioBufs == null) { - // cached ByteBuffers as they may be expensive to create in terms - // of Object allocation - entry.buffers = nioBufs = buf.nioBuffers(); - } - nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); - } - } - } - - i = i + 1 & mask; - } - this.nioBufferCount = nioBufferCount; - this.nioBufferSize = nioBufferSize; - - return nioBuffers; - } - - private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) { - for (ByteBuffer nioBuf: nioBufs) { - if (nioBuf == null) { - break; - } - nioBuffers[nioBufferCount ++] = nioBuf; - } - return nioBufferCount; - } - - private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { - int newCapacity = array.length; - do { - // double capacity until it is big enough - // See https://github.com/netty/netty/issues/1890 - newCapacity <<= 1; - - if (newCapacity < 0) { - throw new IllegalStateException(); - } - - } while (neededSpace > newCapacity); - - ByteBuffer[] newArray = new ByteBuffer[newCapacity]; - System.arraycopy(array, 0, newArray, 0, size); - - return newArray; - } - - /** - * Return the number of {@link java.nio.ByteBuffer} which can be written. - */ - public int nioBufferCount() { - return nioBufferCount; - } - - /** - * Return the number of bytes that can be written via gathering writes. - */ - public long nioBufferSize() { - return nioBufferSize; - } - - @Override - public void recycle() { - // take care of recycle the ByteBuffer[] structure. - if (nioBuffers.length > INITIAL_CAPACITY) { - nioBuffers = new ByteBuffer[INITIAL_CAPACITY]; - } else { - // null out the nio buffers array so the can be GC'ed - // https://github.com/netty/netty/issues/1763 - Arrays.fill(nioBuffers, null); - } - super.recycle(); - } - - @Override - protected NioEntry newEntry() { - return new NioEntry(); - } - - protected static final class NioEntry extends Entry { - ByteBuffer[] buffers; - ByteBuffer buf; - int count = -1; - - @Override - public void clear() { - buffers = null; - buf = null; - count = -1; - super.clear(); - } - - @Override - public int cancel() { - buffers = null; - buf = null; - count = -1; - return super.cancel(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index f8e96e6e84..07189de5a8 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -61,6 +60,12 @@ public class OioDatagramChannel extends AbstractOioMessageChannel private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); private static final ChannelMetadata METADATA = new ChannelMetadata(true); + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " + + StringUtil.simpleClassName(AddressedEnvelope.class) + '<' + + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(SocketAddress.class) + ">, " + + StringUtil.simpleClassName(ByteBuf.class) + ')'; private final MulticastSocket socket; private final DatagramChannelConfig config; @@ -241,28 +246,19 @@ public class OioDatagramChannel extends AbstractOioMessageChannel break; } - final Object m; final ByteBuf data; final SocketAddress remoteAddress; if (o instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") - AddressedEnvelope envelope = (AddressedEnvelope) o; + AddressedEnvelope envelope = (AddressedEnvelope) o; remoteAddress = envelope.recipient(); - m = envelope.content(); + data = envelope.content(); } else { - m = o; + data = (ByteBuf) o; remoteAddress = null; } - if (m instanceof ByteBufHolder) { - data = ((ByteBufHolder) m).content(); - } else if (m instanceof ByteBuf) { - data = (ByteBuf) m; - } else { - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o)); - } - - int length = data.readableBytes(); + final int length = data.readableBytes(); if (remoteAddress != null) { tmpPacket.setSocketAddress(remoteAddress); } @@ -285,6 +281,24 @@ public class OioDatagramChannel extends AbstractOioMessageChannel } } + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof DatagramPacket || msg instanceof ByteBuf) { + return msg; + } + + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope e = (AddressedEnvelope) msg; + if (e.content() instanceof ByteBuf) { + return msg; + } + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + @Override public ChannelFuture joinGroup(InetAddress multicastAddress) { return joinGroup(multicastAddress, newPromise()); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index ed2e019f0e..9574459d33 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -174,6 +174,11 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel throw new UnsupportedOperationException(); } + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + @Override protected void doConnect( SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { diff --git a/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java similarity index 50% rename from transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java rename to transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index d42100ceb6..296a0e7d13 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -13,27 +13,25 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.channel.socket.nio; +package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import io.netty.channel.AbstractChannel; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import org.junit.Test; +import java.net.SocketAddress; import java.nio.ByteBuffer; import static io.netty.buffer.Unpooled.*; import static org.junit.Assert.*; -public class NioSocketChannelOutboundBufferTest { +public class ChannelOutboundBufferTest { @Test public void testEmptyNioBuffers() { - AbstractChannel channel = new EmbeddedChannel(); - NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); + TestChannel channel = new TestChannel(); + ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel); assertEquals(0, buffer.nioBufferCount()); ByteBuffer[] buffers = buffer.nioBuffers(); assertNotNull(buffers); @@ -46,31 +44,22 @@ public class NioSocketChannelOutboundBufferTest { @Test public void testNioBuffersSingleBacked() { - AbstractChannel channel = new EmbeddedChannel(); - NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); - assertEquals(0, buffer.nioBufferCount()); - ByteBuffer[] buffers = buffer.nioBuffers(); - assertNotNull(buffers); - for (ByteBuffer b: buffers) { - assertNull(b); - } + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel); assertEquals(0, buffer.nioBufferCount()); ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes()); - buffer.addMessage(buf, channel.voidPromise()); - buffers = buffer.nioBuffers(); + buffer.addMessage(buf, buf.readableBytes(), channel.voidPromise()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); - for (ByteBuffer b: buffers) { - assertNull(b); - } buffer.addFlush(); - buffers = buffer.nioBuffers(); + ByteBuffer[] buffers = buffer.nioBuffers(); assertNotNull(buffers); assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount()); - for (int i = 0; i < buffers.length; i++) { + for (int i = 0; i < buffer.nioBufferCount(); i++) { if (i == 0) { - assertEquals(buffers[0], nioBuf); + assertEquals(buffers[i], nioBuf); } else { assertNull(buffers[i]); } @@ -80,24 +69,20 @@ public class NioSocketChannelOutboundBufferTest { @Test public void testNioBuffersExpand() { - AbstractChannel channel = new EmbeddedChannel(); - NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); for (int i = 0; i < 64; i++) { - buffer.addMessage(buf.copy(), channel.voidPromise()); + buffer.addMessage(buf.copy(), buf.readableBytes(), channel.voidPromise()); } - ByteBuffer[] nioBuffers = buffer.nioBuffers(); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); - for (ByteBuffer b: nioBuffers) { - assertNull(b); - } buffer.addFlush(); - nioBuffers = buffer.nioBuffers(); - assertEquals(64, nioBuffers.length); + ByteBuffer[] buffers = buffer.nioBuffers(); assertEquals(64, buffer.nioBufferCount()); - for (ByteBuffer nioBuf: nioBuffers) { - assertEquals(nioBuf, buf.internalNioBuffer(0, buf.readableBytes())); + for (int i = 0; i < buffer.nioBufferCount(); i++) { + assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); } release(buffer); buf.release(); @@ -105,26 +90,22 @@ public class NioSocketChannelOutboundBufferTest { @Test public void testNioBuffersExpand2() { - AbstractChannel channel = new EmbeddedChannel(); - NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); + TestChannel channel = new TestChannel(); + + ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel); CompositeByteBuf comp = compositeBuffer(256); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); for (int i = 0; i < 65; i++) { comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes()); } - buffer.addMessage(comp, channel.voidPromise()); + buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise()); - ByteBuffer[] buffers = buffer.nioBuffers(); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); - for (ByteBuffer b: buffers) { - assertNull(b); - } buffer.addFlush(); - buffers = buffer.nioBuffers(); - assertEquals(128, buffers.length); + ByteBuffer[] buffers = buffer.nioBuffers(); assertEquals(65, buffer.nioBufferCount()); - for (int i = 0; i < buffers.length; i++) { + for (int i = 0; i < buffer.nioBufferCount(); i++) { if (i < 65) { assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); } else { @@ -142,4 +123,84 @@ public class NioSocketChannelOutboundBufferTest { } } } + + private static final class TestChannel extends AbstractChannel { + private final ChannelConfig config = new DefaultChannelConfig(this); + + TestChannel() { + super(null); + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new TestUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return false; + } + + @Override + protected SocketAddress localAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected SocketAddress remoteAddress0() { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doClose() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doBeginRead() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public ChannelMetadata metadata() { + throw new UnsupportedOperationException(); + } + + final class TestUnsafe extends AbstractUnsafe { + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + throw new UnsupportedOperationException(); + } + } + } }