Port ChannelOutboundBuffer and related changes from 4.0

Motivation:

We did various changes related to the ChannelOutboundBuffer in 4.0 branch. This commit port all of them over and so make sure our branches are synced in terms of these changes.

Related to [#2734], [#2709], [#2729], [#2710] and [#2693] .

Modification:
Port all changes that was done on the ChannelOutboundBuffer.

This includes the port of the following commits:
 - 73dfd7c01b
 - 997d8c32d2
 - e282e504f1
 - 5e5d1a58fd
 - 8ee3575e72
 - d6f0d12a86
 - 16e50765d1
 - 3f3e66c31a

Result:
 - Less memory usage by ChannelOutboundBuffer
 - Same code as in 4.0 branch
 - Make it possible to use ChannelOutboundBuffer with Channel implementation that not extends AbstractChannel
This commit is contained in:
Norman Maurer 2014-08-05 14:24:49 +02:00
parent 2258b32549
commit 4a3ef90381
29 changed files with 1196 additions and 1165 deletions

View File

@ -16,6 +16,8 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -42,6 +44,8 @@ public final class ByteBufUtil {
static final ByteBufAllocator DEFAULT_ALLOCATOR; static final ByteBufAllocator DEFAULT_ALLOCATOR;
private static final int THREAD_LOCAL_BUFFER_SIZE;
static { static {
final char[] DIGITS = "0123456789abcdef".toCharArray(); final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i ++) { for (int i = 0; i < 256; i ++) {
@ -49,9 +53,7 @@ public final class ByteBufUtil {
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F]; HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
} }
String allocType = SystemPropertyUtil.get( String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc; ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) { if ("unpooled".equals(allocType)) {
@ -66,6 +68,9 @@ public final class ByteBufUtil {
} }
DEFAULT_ALLOCATOR = alloc; DEFAULT_ALLOCATOR = alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
} }
/** /**
@ -414,5 +419,89 @@ public final class ByteBufUtil {
return dst.flip().toString(); return dst.flip().toString();
} }
/**
* Returns a cached thread-local direct buffer, if available.
*
* @return a cached thread-local direct buffer, if available. {@code null} otherwise.
*/
public static ByteBuf threadLocalDirectBuffer() {
if (THREAD_LOCAL_BUFFER_SIZE <= 0) {
return null;
}
if (PlatformDependent.hasUnsafe()) {
return ThreadLocalUnsafeDirectByteBuf.newInstance();
} else {
return ThreadLocalDirectByteBuf.newInstance();
}
}
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};
static ThreadLocalUnsafeDirectByteBuf newInstance() {
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalUnsafeDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
@Override
protected ThreadLocalDirectByteBuf newObject(Handle handle) {
return new ThreadLocalDirectByteBuf(handle);
}
};
static ThreadLocalDirectByteBuf newInstance() {
ThreadLocalDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
private ByteBufUtil() { } private ByteBufUtil() { }
} }

View File

@ -39,7 +39,7 @@ public final class ReferenceCountUtil {
} }
/** /**
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}. * Try to call {@link ReferenceCounted#retain(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -87,7 +87,7 @@ public final class ReferenceCountUtil {
} }
/** /**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}. * Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing. * If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/ */
public static boolean release(Object msg, int decrement) { public static boolean release(Object msg, int decrement) {
@ -97,6 +97,38 @@ public final class ReferenceCountUtil {
return false; return false;
} }
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release()}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use {@link #release(Object)}
* instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg) {
try {
release(msg);
} catch (Throwable t) {
logger.warn("Failed to release a message: {}", msg, t);
}
}
/**
* Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release(int)}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use
* {@link #release(Object, int)} instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg, int decrement) {
try {
release(msg, decrement);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to release a message: {} (decrement: {})", msg, decrement, t);
}
}
}
/** /**
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is * Schedules the specified object to be released when the caller thread terminates. Note that this operation is
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the * intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the

View File

@ -15,10 +15,15 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -98,6 +103,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override @Override
protected void doBeginRead() throws Exception { protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true;
if ((flags & readFlag) == 0) { if ((flags & readFlag) == 0) {
flags |= readFlag; flags |= readFlag;
modifyEvents(); modifyEvents();
@ -159,6 +167,47 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override @Override
protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract AbstractEpollUnsafe newUnsafe();
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf);
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method.
*/
protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf == null) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
final ByteBuf directBuf = alloc.directBuffer(capacity);
directBuf.writeBytes(buf, buf.readerIndex(), capacity);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
protected static void checkResolvable(InetSocketAddress addr) { protected static void checkResolvable(InetSocketAddress addr) {
if (addr.isUnresolved()) { if (addr.isUnresolved()) {
throw new UnresolvedAddressException(); throw new UnresolvedAddressException();
@ -180,13 +229,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// NOOP // NOOP
} }
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override @Override
protected void flush0() { protected void flush0() {
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.

View File

@ -1,102 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain a {@link IovArray}
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
* GC pressure a lot.
*/
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
@Override
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
return new EpollChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel}
*/
static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) {
EpollChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
super(handle);
}
/**
* Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this
* {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress()) {
return copyToDirectByteBuf(buf);
}
}
return msg;
}
/**
* Returns a {@link IovArray} if the currently pending messages.
* <p>
* Note that the returned {@link IovArray} is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
*/
IovArray iovArray() {
IovArray array = IovArray.get();
final Entry[] buffer = entries();
final int mask = entryMask();
int unflushed = unflushed();
int flushed = flushed();
Object m;
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
// found by now.
break;
}
Entry entry = buffer[flushed];
// Check if the entry was cancelled. if so we just skip it.
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
if (!array.add(buf)) {
// Can not hold more data so break here.
// We will handle this on the next write loop.
break;
}
}
flushed = flushed + 1 & mask;
}
return array;
}
}

View File

@ -16,13 +16,14 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
@ -44,6 +45,12 @@ import java.nio.channels.NotYetConnectedException;
*/ */
public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel { public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(true); private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private volatile InetSocketAddress local; private volatile InetSocketAddress local;
private volatile InetSocketAddress remote; private volatile InetSocketAddress remote;
@ -282,27 +289,20 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
} }
private boolean doWriteMessage(Object msg) throws IOException { private boolean doWriteMessage(Object msg) throws IOException {
final Object m; final ByteBuf data;
InetSocketAddress remoteAddress; InetSocketAddress remoteAddress;
ByteBuf data; if (msg instanceof AddressedEnvelope) {
if (msg instanceof DatagramPacket) { @SuppressWarnings("unchecked")
DatagramPacket packet = (DatagramPacket) msg; AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
remoteAddress = packet.recipient(); (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
m = packet.content(); data = envelope.content();
remoteAddress = envelope.recipient();
} else { } else {
m = msg; data = (ByteBuf) msg;
remoteAddress = null; remoteAddress = null;
} }
if (m instanceof ByteBufHolder) { final int dataLen = data.readableBytes();
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
if (dataLen == 0) { if (dataLen == 0) {
return true; return true;
} }
@ -324,19 +324,62 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(), writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort()); remoteAddress.getAddress(), remoteAddress.getPort());
} }
return writtenBytes > 0; return writtenBytes > 0;
} }
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
ByteBuf content = (ByteBuf) e.content();
if (content.hasMemoryAddress()) {
return e;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override @Override
public EpollDatagramChannelConfig config() { public EpollDatagramChannelConfig config() {
return config; return config;
} }
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return EpollDatagramChannelOutboundBuffer.newInstance(this);
}
@Override @Override
protected void doDisconnect() throws Exception { protected void doDisconnect() throws Exception {
connected = false; connected = false;

View File

@ -1,63 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
final class EpollDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<EpollDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<EpollDatagramChannelOutboundBuffer>() {
@Override
protected EpollDatagramChannelOutboundBuffer newObject(Handle<EpollDatagramChannelOutboundBuffer> handle) {
return new EpollDatagramChannelOutboundBuffer(handle);
}
};
static EpollDatagramChannelOutboundBuffer newInstance(EpollDatagramChannel channel) {
EpollDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollDatagramChannelOutboundBuffer(Recycler.Handle<EpollDatagramChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (isCopyNeeded(content)) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
} else if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isCopyNeeded(buf)) {
msg = copyToDirectByteBuf((ByteBuf) msg);
}
}
return msg;
}
private static boolean isCopyNeeded(ByteBuf content) {
return !content.hasMemoryAddress() || content.nioBufferCount() != 1;
}
}

View File

@ -74,7 +74,12 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -32,7 +32,6 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelOutboundBuffer;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -50,6 +49,10 @@ import java.util.concurrent.TimeUnit;
*/ */
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private final EpollSocketChannelConfig config; private final EpollSocketChannelConfig config;
/** /**
@ -111,6 +114,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove(); in.remove();
return true; return true;
} }
boolean done = false; boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
@ -132,7 +136,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
updateOutboundBuffer(in, writtenBytes, 1, done);
in.removeBytes(writtenBytes);
return done; return done;
} else if (buf.nioBufferCount() == 1) { } else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
@ -154,23 +159,27 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
updateOutboundBuffer(in, writtenBytes, 1, done);
in.removeBytes(writtenBytes);
return done; return done;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
} }
} }
private boolean writeBytesMultiple( private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
EpollChannelOutboundBuffer in, IovArray array) throws IOException {
boolean done = false;
long expectedWrittenBytes = array.size(); long expectedWrittenBytes = array.size();
int cnt = array.count(); int cnt = array.count();
assert expectedWrittenBytes != 0;
assert cnt != 0;
boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + cnt; int end = offset + cnt;
int messages = cnt;
for (;;) { for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
@ -200,92 +209,55 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} while (offset < end && localWrittenBytes > 0); } while (offset < end && localWrittenBytes > 0);
} }
updateOutboundBuffer(in, writtenBytes, messages, done); in.removeBytes(writtenBytes);
return done; return done;
} }
private boolean writeBytesMultiple( private boolean writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException { int nioBufferCnt, long expectedWrittenBytes) throws IOException {
assert expectedWrittenBytes != 0;
boolean done = false; boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + nioBufferCnt; int end = offset + nioBufferCnt;
loop: while (nioBufferCnt > 0) { for (;;) {
for (;;) { long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); setEpollOut();
if (localWrittenBytes == 0) { break;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break loop;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
} }
updateOutboundBuffer(in, writtenBytes, msgCount, done);
in.removeBytes(writtenBytes);
return done; return done;
} }
private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
boolean done) {
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
in.progress(buf.readableBytes());
in.remove();
}
in.progress(writtenBytes);
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readable == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
}
}
}
/** /**
* Write a {@link DefaultFileRegion} * Write a {@link DefaultFileRegion}
* *
@ -293,6 +265,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @return amount the amount of written bytes * @return amount the amount of written bytes
*/ */
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
if (region.transfered() >= region.count()) {
in.remove();
return true;
}
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
@ -331,69 +308,102 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
// Do gathering write if: // Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
// * the outbound buffer contains more than one messages and if (msgCount > 1 && in.current() instanceof ByteBuf) {
// * they are all buffers rather than a file region. if (!doWriteMultiple(in)) {
if (msgCount >= 1) {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to EpollChannelOutboundBuffer and write the IovArray directly.
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
IovArray array = epollIn.iovArray();
int cnt = array.count();
if (cnt > 1) {
if (!writeBytesMultiple(epollIn, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
} else {
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only.
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
int nioBufferCnt = nioIn.nioBufferCount();
if (nioBufferCnt > 1) {
if (!writeBytesMultiple(nioIn, msgCount, nioBuffers, nioBufferCnt, nioIn.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}
}
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break; break;
} }
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg; // We do not break the loop here even if the outbound buffer was flushed completely,
if (!writeFileRegion(in, region)) { // because a user might have triggered another write and flush when we notify his or her
// was not able to write everything so break here we will get notified later again once // listeners.
// the network stack can handle more writes. } else { // msgCount == 1
if (!doWriteSingle(in)) {
break; break;
} }
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
} }
} }
} }
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else {
// Should never reach here.
throw new Error();
}
return true;
}
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArray.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
}
return true;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
return buf;
}
if (msg instanceof DefaultFileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override @Override
public EpollSocketChannelConfig config() { public EpollSocketChannelConfig config() {
return config; return config;
@ -755,16 +765,4 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
} }
} }
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
if (PlatformDependent.hasUnsafe()) {
// This means we will be able to access the memory addresses directly and so be able to do
// gathering writes with the AddressEntry.
return EpollChannelOutboundBuffer.newInstance(this);
} else {
// No access to the memoryAddres, so fallback to use ByteBuffer[] for gathering writes.
return NioSocketChannelOutboundBuffer.newInstance(this);
}
}
} }

View File

@ -16,6 +16,8 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -23,7 +25,7 @@ import io.netty.util.internal.PlatformDependent;
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more * Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
* array copies. * array copies.
* *
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>. * The buffers are written out directly into direct memory to match the struct iov. See also {@code man writev}.
* *
* <pre> * <pre>
* struct iovec { * struct iovec {
@ -33,19 +35,24 @@ import io.netty.util.internal.PlatformDependent;
* </pre> * </pre>
* *
* See also * See also
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/"> * <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/"
* Efficient JNI programming IV: Wrapping native data objects</a>. * >Efficient JNI programming IV: Wrapping native data objects</a>.
*/ */
final class IovArray { final class IovArray implements MessageProcessor {
// Maximal number of struct iov entries that can be passed to writev(...)
private static final int IOV_MAX = Native.IOV_MAX; /** The size of an address which should be 8 for 64 bits and 4 for 32 bits. */
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
private static final int ADDRESS_SIZE = PlatformDependent.addressSize(); private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
// address. /**
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
* address.
*/
private static final int IOV_SIZE = 2 * ADDRESS_SIZE; private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
// The needed memory to hold up to IOV_MAX iov entries.
private static final int CAPACITY = IOV_MAX * IOV_SIZE; /** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
*/
private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE;
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() { private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override @Override
@ -72,17 +79,26 @@ final class IovArray {
* Try to add the given {@link ByteBuf}. Returns {@code true} on success, * Try to add the given {@link ByteBuf}. Returns {@code true} on success,
* {@code false} otherwise. * {@code false} otherwise.
*/ */
boolean add(ByteBuf buf) { private boolean add(ByteBuf buf) {
if (count == IOV_MAX) { if (count == Native.IOV_MAX) {
// No more room! // No more room!
return false; return false;
} }
int len = buf.readableBytes();
long addr = buf.memoryAddress();
int offset = buf.readerIndex();
long baseOffset = memoryAddress(count++); final int len = buf.readableBytes();
long lengthOffset = baseOffset + ADDRESS_SIZE; if (len == 0) {
// No need to add an empty buffer.
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
// fetching the next buffers.
return true;
}
final long addr = buf.memoryAddress();
final int offset = buf.readerIndex();
final long baseOffset = memoryAddress(count++);
final long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) { if (ADDRESS_SIZE == 8) {
// 64bit // 64bit
PlatformDependent.putLong(baseOffset, addr + offset); PlatformDependent.putLong(baseOffset, addr + offset);
@ -92,6 +108,7 @@ final class IovArray {
PlatformDependent.putInt(baseOffset, (int) addr + offset); PlatformDependent.putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len); PlatformDependent.putInt(lengthOffset, len);
} }
size += len; size += len;
return true; return true;
} }
@ -147,13 +164,19 @@ final class IovArray {
return memoryAddress + IOV_SIZE * offset; return memoryAddress + IOV_SIZE * offset;
} }
@Override
public boolean processMessage(Object msg) throws Exception {
return msg instanceof ByteBuf && add((ByteBuf) msg);
}
/** /**
* Returns a {@link IovArray} which can be filled. * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/ */
static IovArray get() { static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get(); IovArray array = ARRAY.get();
array.size = 0; array.size = 0;
array.count = 0; array.count = 0;
buffer.forEachFlushedMessage(array);
return array; return array;
} }
} }

View File

@ -20,7 +20,7 @@ import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -34,8 +34,8 @@ import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel; import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -303,17 +303,45 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return true; return true;
} }
ByteBuffer nioData = data.nioBuffer(); ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier()); mi.streamNumber(packet.streamIdentifier());
final int writtenBytes = javaChannel().send(nioData, mi); final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0; return writtenBytes > 0;
} }
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
SctpMessage m = (SctpMessage) msg;
ByteBuf buf = m.content();
if (buf.isDirect() && buf.nioBufferCount() == 1) {
return m;
}
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf));
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) +
" (expected: " + StringUtil.simpleClassName(SctpMessage.class));
}
@Override @Override
public ChannelFuture bindAddress(InetAddress localAddress) { public ChannelFuture bindAddress(InetAddress localAddress) {
return bindAddress(localAddress, newPromise()); return bindAddress(localAddress, newPromise());
@ -364,44 +392,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return promise; return promise;
} }
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSctpChannelOutboundBuffer.newInstance(this);
}
static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioSctpChannelOutboundBuffer> RECYCLER =
new Recycler<NioSctpChannelOutboundBuffer>() {
@Override
protected NioSctpChannelOutboundBuffer newObject(Handle<NioSctpChannelOutboundBuffer> handle) {
return new NioSctpChannelOutboundBuffer(handle);
}
};
static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSctpChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSctpChannelOutboundBuffer(Recycler.Handle<NioSctpChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof SctpMessage) {
SctpMessage message = (SctpMessage) msg;
ByteBuf content = message.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new SctpMessage(message.protocolIdentifier(), message.streamIdentifier(), direct);
}
}
return msg;
}
}
private final class NioSctpChannelConfig extends DefaultSctpChannelConfig { private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) { private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
super(channel, javaChannel); super(channel, javaChannel);

View File

@ -221,6 +221,11 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) { private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel); super(channel, javaChannel);

View File

@ -34,6 +34,7 @@ import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel; import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -64,6 +65,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
InternalLoggerFactory.getInstance(OioSctpChannel.class); InternalLoggerFactory.getInstance(OioSctpChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
private final SctpChannel ch; private final SctpChannel ch;
private final SctpChannelConfig config; private final SctpChannelConfig config;
@ -273,6 +275,16 @@ public class OioSctpChannel extends AbstractOioMessageChannel
} }
} }
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
}
@Override @Override
public Association association() { public Association association() {
try { try {

View File

@ -290,6 +290,11 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) { private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel); super(channel, javaChannel);

View File

@ -98,6 +98,11 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override @Override
public boolean isActive() { public boolean isActive() {
return javaChannel().socket().isBound(); return javaChannel().socket().isBound();

View File

@ -24,7 +24,6 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -101,7 +100,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public boolean isWritable() { public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer(); ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.getWritable(); return buf != null && buf.isWritable();
} }
@Override @Override
@ -397,8 +396,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract class AbstractUnsafe implements Unsafe { protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = newOutboundBuffer(); private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private boolean inFlush0; private boolean inFlush0;
@Override @Override
@ -647,7 +645,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
@Override @Override
public void beginRead() { public final void beginRead() {
if (!isActive()) { if (!isActive()) {
return; return;
} }
@ -666,7 +664,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
@Override @Override
public void write(Object msg, ChannelPromise promise) { public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so // If the outboundBuffer is null we know the channel was closed and so
@ -678,11 +676,25 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
return; return;
} }
outboundBuffer.addMessage(msg, promise);
int size;
try {
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
} }
@Override @Override
public void flush() { public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { if (outboundBuffer == null) {
return; return;
@ -729,7 +741,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
@Override @Override
public ChannelPromise voidPromise() { public final ChannelPromise voidPromise() {
return unsafeVoidPromise; return unsafeVoidPromise;
} }
@ -787,13 +799,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
} }
/**
* Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}.
*/
protected ChannelOutboundBuffer newOutboundBuffer() {
return ChannelOutboundBuffer.newInstance(this);
}
/** /**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance. * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/ */
@ -852,12 +857,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception; protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
protected static void checkEOF(FileRegion region) throws IOException { /**
if (region.transfered() < region.count()) { * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
throw new EOFException("Expected to be able to write " * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
+ region.count() + " bytes, but only wrote " */
+ region.transfered()); protected Object filterOutboundMessage(Object msg) throws Exception {
} return msg;
} }
static final class CloseFuture extends DefaultChannelPromise { static final class CloseFuture extends DefaultChannelPromise {

View File

@ -15,8 +15,6 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
@ -71,24 +69,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected final Object filterOutboundMessage(Object msg) {
throw new UnsupportedOperationException();
}
private final class DefaultServerUnsafe extends AbstractUnsafe { private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void write(Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
reject(promise);
}
@Override
public void flush() {
// ignore
}
@Override @Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
reject(promise);
}
private void reject(ChannelPromise promise) {
safeSetFailure(promise, new UnsupportedOperationException()); safeSetFailure(promise, new UnsupportedOperationException());
} }
} }

View File

@ -13,24 +13,22 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder; import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -38,56 +36,48 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/** /**
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
* outbound write requests. * outbound write requests.
*
* All the methods should only be called by the {@link EventLoop} of the {@link Channel}.
*/ */
public class ChannelOutboundBuffer { public final class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
protected static final int INITIAL_CAPACITY = private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
SystemPropertyUtil.getInt("io.netty.outboundBufferInitialCapacity", 4);
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.outboundBufferInitialCapacity: {}", INITIAL_CAPACITY);
}
}
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override @Override
protected ChannelOutboundBuffer newObject(Handle<ChannelOutboundBuffer> handle) { protected ByteBuffer[] initialValue() throws Exception {
return new ChannelOutboundBuffer(handle); return new ByteBuffer[1024];
} }
}; };
/** private final Channel channel;
* Get a new instance of this {@link ChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private final Handle<? extends ChannelOutboundBuffer> handle; // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
protected AbstractChannel channel; // The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The // The Entry which is the first unflushed in the linked-list structure
// flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range private Entry unflushedEntry;
// [unflushed, tail). // The Entry which represents the tail of the buffer
private Entry[] buffer; private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed; private int flushed;
private int unflushed;
private int tail; private int nioBufferCount;
private long nioBufferSize;
private boolean inFail; private boolean inFail;
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER; private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;
@SuppressWarnings("unused")
private volatile long totalPendingSize; private volatile long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER;
@SuppressWarnings("FieldMayBeFinal")
private volatile int writable = 1;
static { static {
AtomicIntegerFieldUpdater<ChannelOutboundBuffer> writableUpdater = AtomicIntegerFieldUpdater<ChannelOutboundBuffer> writableUpdater =
PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable"); PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable");
@ -104,46 +94,26 @@ public class ChannelOutboundBuffer {
TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater; TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater;
} }
private volatile int writable = 1; ChannelOutboundBuffer(AbstractChannel channel) {
this.channel = channel;
protected ChannelOutboundBuffer(Handle<? extends ChannelOutboundBuffer> handle) {
this.handle = handle;
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = newEntry();
}
} }
/** /**
* Return the array of {@link Entry}'s which hold the pending write requests in an circular array. * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
*/ */
protected final Entry[] entries() { public void addMessage(Object msg, int size, ChannelPromise promise) {
return buffer; Entry entry = Entry.newInstance(msg, size, total(msg), promise);
} if (tailEntry == null) {
flushedEntry = null;
/** tailEntry = entry;
* Add the given message to this {@link ChannelOutboundBuffer} so it will be marked as flushed once } else {
* {@link #addFlush()} was called. The {@link ChannelPromise} will be notified once the write operations Entry tail = tailEntry;
* completes. tail.next = entry;
*/ tailEntry = entry;
public final void addMessage(Object msg, ChannelPromise promise) {
msg = beforeAdd(msg);
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
} }
if (unflushedEntry == null) {
Entry e = buffer[tail++]; unflushedEntry = entry;
e.msg = msg;
e.pendingSize = size;
e.promise = promise;
e.total = total(msg);
tail &= buffer.length - 1;
if (tail == flushed) {
addCapacity();
} }
// increment pending bytes after adding message to the unflushed arrays. // increment pending bytes after adding message to the unflushed arrays.
@ -152,62 +122,32 @@ public class ChannelOutboundBuffer {
} }
/** /**
* Is called before the message is actually added to the {@link ChannelOutboundBuffer} and so allow to * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* convert it to a different format. Sub-classes may override this. * and so you will be able to handle them.
*/ */
protected Object beforeAdd(Object msg) { public void addFlush() {
return msg;
}
/**
* Expand internal array which holds the {@link Entry}'s.
*/
private void addCapacity() {
int p = flushed;
int n = buffer.length;
int r = n - p; // number of elements to the right of p
int s = size();
int newCapacity = n << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
Entry[] e = new Entry[newCapacity];
System.arraycopy(buffer, p, e, 0, r);
System.arraycopy(buffer, 0, e, r, p);
for (int i = n; i < e.length; i++) {
e[i] = newEntry();
}
buffer = e;
flushed = 0;
unflushed = s;
tail = n;
}
/**
* Mark all messages in this {@link ChannelOutboundBuffer} as flushed.
*/
public final void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages // There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime. // where added in the meantime.
// //
// See https://github.com/netty/netty/issues/2577 // See https://github.com/netty/netty/issues/2577
if (unflushed != tail) { Entry entry = unflushedEntry;
unflushed = tail; if (entry != null) {
if (flushedEntry == null) {
final int mask = buffer.length - 1; // there is no flushedEntry yet, so start with the entry
int i = flushed; flushedEntry = entry;
while (i != unflushed && buffer[i].msg != null) { }
Entry entry = buffer[i]; do {
flushed ++;
if (!entry.promise.setUncancellable()) { if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes // Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel(); int pending = entry.cancel();
decrementPendingOutboundBytes(pending); decrementPendingOutboundBytes(pending);
} }
i = i + 1 & mask; entry = entry.next;
} } while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
} }
} }
@ -215,11 +155,8 @@ public class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point. * Increment the pending bytes which will be written at some point.
* This method is thread-safe! * This method is thread-safe!
*/ */
final void incrementPendingOutboundBytes(int size) { void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets if (size == 0) {
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
return; return;
} }
@ -235,11 +172,8 @@ public class ChannelOutboundBuffer {
* Decrement the pending bytes which will be written at some point. * Decrement the pending bytes which will be written at some point.
* This method is thread-safe! * This method is thread-safe!
*/ */
final void decrementPendingOutboundBytes(int size) { void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets if (size == 0) {
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
return; return;
} }
@ -265,20 +199,23 @@ public class ChannelOutboundBuffer {
} }
/** /**
* Return current message or {@code null} if no flushed message is left to process. * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
*/ */
public final Object current() { public Object current() {
if (isEmpty()) { Entry entry = flushedEntry;
if (entry == null) {
return null; return null;
} else {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = buffer[flushed];
return entry.msg;
} }
return entry.msg;
} }
public final void progress(long amount) { /**
Entry e = buffer[flushed]; * Notify the {@link ChannelPromise} of the current message about writing progress.
*/
public void progress(long amount) {
Entry e = flushedEntry;
assert e != null;
ChannelPromise p = e.promise; ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) { if (p instanceof ChannelProgressivePromise) {
long progress = e.progress + amount; long progress = e.progress + amount;
@ -288,93 +225,237 @@ public class ChannelOutboundBuffer {
} }
/** /**
* Mark the current message as successful written and remove it from this {@link ChannelOutboundBuffer}. * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
* This method will return {@code true} if there are more messages left to process, {@code false} otherwise. * flushed message exists at the time this method is called it will return {@code false} to signal that no more
* messages are ready to be handled.
*/ */
public final boolean remove() { public boolean remove() {
if (isEmpty()) { Entry e = flushedEntry;
if (e == null) {
return false; return false;
} }
Entry e = buffer[flushed];
Object msg = e.msg; Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise; ChannelPromise promise = e.promise;
int size = e.pendingSize; int size = e.pendingSize;
e.clear(); removeEntry(e);
flushed = flushed + 1 & buffer.length - 1;
if (!e.cancelled) { if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before. // only release message, notify and decrement if it was not canceled before.
safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise); safeSuccess(promise);
decrementPendingOutboundBytes(size); decrementPendingOutboundBytes(size);
} }
// recycle the entry
e.recycle();
return true; return true;
} }
/** /**
* Mark the current message as failure with the given {@link java.lang.Throwable} and remove it from this * Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
* {@link ChannelOutboundBuffer}. This method will return {@code true} if there are more messages left to process, * and return {@code true}. If no flushed message exists at the time this method is called it will return
* {@code false} otherwise. * {@code false} to signal that no more messages are ready to be handled.
*/ */
public final boolean remove(Throwable cause) { public boolean remove(Throwable cause) {
if (isEmpty()) { Entry e = flushedEntry;
if (e == null) {
return false; return false;
} }
Entry e = buffer[flushed];
Object msg = e.msg; Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise; ChannelPromise promise = e.promise;
int size = e.pendingSize; int size = e.pendingSize;
e.clear(); removeEntry(e);
flushed = flushed + 1 & buffer.length - 1;
if (!e.cancelled) { if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before. // only release message, fail and decrement if it was not canceled before.
safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
safeFail(promise, cause); safeFail(promise, cause);
decrementPendingOutboundBytes(size); decrementPendingOutboundBytes(size);
} }
// recycle the entry
e.recycle();
return true; return true;
} }
final boolean getWritable() { private void removeEntry(Entry e) {
if (-- flushed == 0) {
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
/**
* Removes the fully written entries and update the reader index of the partially written entry.
* This operation assumes all messages in this buffer is {@link ByteBuf}.
*/
public void removeBytes(long writtenBytes) {
for (;;) {
final ByteBuf buf = (ByteBuf) current();
if (buf == null) {
break;
}
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
* array and the total number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.bufs = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public long nioBufferSize() {
return nioBufferSize;
}
boolean isWritable() {
return writable != 0; return writable != 0;
} }
/** /**
* Return the number of messages that are ready to be written (flushed before). * Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
*/ */
public final int size() { public int size() {
return unflushed - flushed & buffer.length - 1; return flushed;
} }
/** /**
* Return {@code true} if this {@link ChannelOutboundBuffer} contains no flushed messages * Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
* otherwise.
*/ */
public final boolean isEmpty() { public boolean isEmpty() {
return unflushed == flushed; return flushed == 0;
} }
/** void failFlushed(Throwable cause) {
* Fail all previous flushed messages with the given {@link Throwable}.
*/
final void failFlushed(Throwable cause) {
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the // Make sure that this method does not reenter. A listener added to the current promise can be notified by the
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
// indirectly (usually by closing the channel.) // indirectly (usually by closing the channel.)
@ -396,10 +477,7 @@ public class ChannelOutboundBuffer {
} }
} }
/** void close(final ClosedChannelException cause) {
* Fail all pending messages with the given {@link ClosedChannelException}.
*/
final void close(final ClosedChannelException cause) {
if (inFail) { if (inFail) {
channel.eventLoop().execute(new Runnable() { channel.eventLoop().execute(new Runnable() {
@Override @Override
@ -421,140 +499,95 @@ public class ChannelOutboundBuffer {
} }
// Release all unflushed messages. // Release all unflushed messages.
final int unflushedCount = tail - unflushed & buffer.length - 1;
try { try {
for (int i = 0; i < unflushedCount; i++) { Entry e = unflushedEntry;
Entry e = buffer[unflushed + i & buffer.length - 1]; while (e != null) {
// Just decrease; do not trigger any events via decrementPendingOutboundBytes() // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = e.pendingSize; int size = e.pendingSize;
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
e.pendingSize = 0;
if (!e.cancelled) { if (!e.cancelled) {
safeRelease(e.msg); ReferenceCountUtil.safeRelease(e.msg);
safeFail(e.promise, cause); safeFail(e.promise, cause);
} }
e.msg = null; e = e.recycleAndGetNext();
e.promise = null;
} }
} finally { } finally {
tail = unflushed;
inFail = false; inFail = false;
} }
recycle();
} }
/**
* Release the message and log if any error happens during release.
*/
protected static void safeRelease(Object message) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable t) {
logger.warn("Failed to release a message.", t);
}
}
/**
* Try to mark the given {@link ChannelPromise} as success and log if this failed.
*/
private static void safeSuccess(ChannelPromise promise) { private static void safeSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise); logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
} }
} }
/**
* Try to mark the given {@link ChannelPromise} as failued with the given {@link Throwable} and log if this failed.
*/
private static void safeFail(ChannelPromise promise, Throwable cause) { private static void safeFail(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
} }
} }
/** @Deprecated
* Recycle this {@link ChannelOutboundBuffer}. After this was called it is disallowed to use it with the previous
* assigned {@link AbstractChannel}.
*/
@SuppressWarnings("unchecked")
public void recycle() { public void recycle() {
if (buffer.length > INITIAL_CAPACITY) { // NOOP
Entry[] e = new Entry[INITIAL_CAPACITY];
System.arraycopy(buffer, 0, e, 0, INITIAL_CAPACITY);
buffer = e;
}
// reset flushed, unflushed and tail
// See https://github.com/netty/netty/issues/1772
flushed = 0;
unflushed = 0;
tail = 0;
// Set the channel to null so it can be GC'ed ASAP
channel = null;
totalPendingSize = 0;
writable = 1;
RECYCLER.recycle(this, (Handle<ChannelOutboundBuffer>) handle);
} }
/** public long totalPendingWriteBytes() {
* Return the total number of pending bytes.
*/
public final long totalPendingWriteBytes() {
return totalPendingSize; return totalPendingSize;
} }
/** /**
* Create a new {@link Entry} to use for the internal datastructure. Sub-classes may override this use a special * Call {@link MessageProcessor#processMessage(Object)} for each flushed message
* sub-class. * in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
* returns {@code false} or there are no more flushed messages to process.
*/ */
protected Entry newEntry() { public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
return new Entry(); if (processor == null) {
} throw new NullPointerException("processor");
/**
* Return the index of the first flushed message.
*/
protected final int flushed() {
return flushed;
}
/**
* Return the index of the first unflushed messages.
*/
protected final int unflushed() {
return unflushed;
}
protected final int entryMask() {
return buffer.length - 1;
}
protected ByteBuf copyToDirectByteBuf(ByteBuf buf) {
int readableBytes = buf.readableBytes();
ByteBufAllocator alloc = channel.alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
safeRelease(buf);
return directBuf;
} }
if (ThreadLocalPooledDirectByteBuf.threadLocalDirectBufferSize > 0) {
ByteBuf directBuf = ThreadLocalPooledDirectByteBuf.newInstance(); Entry entry = flushedEntry;
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); if (entry == null) {
safeRelease(buf); return;
return directBuf;
} }
return buf;
do {
if (!entry.cancelled) {
if (!processor.processMessage(entry.msg)) {
return;
}
}
entry = entry.next;
} while (isFlushedEntry(entry));
} }
protected static class Entry { private boolean isFlushedEntry(Entry e) {
return e != null && e != unflushedEntry;
}
public interface MessageProcessor {
/**
* Will be called for each flushed message until it either there are no more flushed messages or this
* method returns {@code false}.
*/
boolean processMessage(Object msg) throws Exception;
}
static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle handle) {
return new Entry(handle);
}
};
private final Handle handle;
Entry next;
Object msg; Object msg;
ByteBuffer[] bufs;
ByteBuffer buf;
ChannelPromise promise; ChannelPromise promise;
long progress; long progress;
long total; long total;
@ -562,43 +595,42 @@ public class ChannelOutboundBuffer {
int count = -1; int count = -1;
boolean cancelled; boolean cancelled;
public Object msg() { private Entry(Handle handle) {
return msg; this.handle = handle;
} }
/** static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
* Return {@code true} if the {@link Entry} was cancelled via {@link #cancel()} before, Entry entry = RECYCLER.get();
* {@code false} otherwise. entry.msg = msg;
*/ entry.pendingSize = size;
public boolean isCancelled() { entry.total = total;
return cancelled; entry.promise = promise;
return entry;
} }
/** int cancel() {
* Cancel this {@link Entry} and the message that was hold by this {@link Entry}. This method returns the
* number of pending bytes for the cancelled message.
*/
public int cancel() {
if (!cancelled) { if (!cancelled) {
cancelled = true; cancelled = true;
int pSize = pendingSize; int pSize = pendingSize;
// release message and replace with an empty buffer // release message and replace with an empty buffer
safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
msg = Unpooled.EMPTY_BUFFER; msg = Unpooled.EMPTY_BUFFER;
pendingSize = 0; pendingSize = 0;
total = 0; total = 0;
progress = 0; progress = 0;
bufs = null;
buf = null;
return pSize; return pSize;
} }
return 0; return 0;
} }
/** void recycle() {
* Clear this {@link Entry} and so release all resources. next = null;
*/ bufs = null;
public void clear() { buf = null;
msg = null; msg = null;
promise = null; promise = null;
progress = 0; progress = 0;
@ -606,6 +638,13 @@ public class ChannelOutboundBuffer {
pendingSize = 0; pendingSize = 0;
count = -1; count = -1;
cancelled = false; cancelled = false;
RECYCLER.recycle(this, handle);
}
Entry recycleAndGetNext() {
Entry next = this.next;
recycle();
return next;
} }
} }
} }

View File

@ -35,6 +35,11 @@ import java.nio.channels.SelectionKey;
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
*/ */
public abstract class AbstractNioByteChannel extends AbstractNioChannel { public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private Runnable flushTask; private Runnable flushTask;
/** /**
@ -247,11 +252,31 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break; break;
} }
} else { } else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); // Should not reach here.
throw new Error();
} }
} }
} }
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
protected final void incompleteWrite(boolean setOpWrite) { protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely. // Did not write completely.
if (setOpWrite) { if (setOpWrite) {

View File

@ -15,6 +15,10 @@
*/ */
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
@ -23,6 +27,8 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException; import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -174,19 +180,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
} }
@Override @Override
public void beginRead() { public final SelectableChannel ch() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
public SelectableChannel ch() {
return javaChannel(); return javaChannel();
} }
@Override @Override
public void connect( public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) { if (!promise.setUncancellable() || !ensureOpen(promise)) {
return; return;
@ -277,7 +276,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
} }
@Override @Override
public void finishConnect() { public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was // Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out. // neither cancelled nor timed out.
@ -306,7 +305,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
} }
@Override @Override
protected void flush0() { protected final void flush0() {
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later, // If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now. // and thus there's no need to call it now.
@ -317,7 +316,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
} }
@Override @Override
public void forceFlush() { public final void forceFlush() {
// directly call super.flush0() to force a flush now // directly call super.flush0() to force a flush now
super.flush0(); super.flush0();
} }
@ -362,6 +361,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override @Override
protected void doBeginRead() throws Exception { protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) { if (inputShutdown) {
return; return;
} }
@ -371,6 +371,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return; return;
} }
readPending = true;
final int interestOps = selectionKey.interestOps(); final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) { if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp); selectionKey.interestOps(interestOps | readInterestOp);
@ -386,4 +388,73 @@ public abstract class AbstractNioChannel extends AbstractChannel {
* Finish the connect * Finish the connect
*/ */
protected abstract void doFinishConnect() throws Exception; protected abstract void doFinishConnect() throws Exception;
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
* Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
* but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is
* too high, but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
if (holder != buf) {
// Ensure to call holder.release() to give the holder a chance to release other resources than its content.
buf.retain();
ReferenceCountUtil.safeRelease(holder);
}
return buf;
}
} }

View File

@ -33,10 +33,14 @@ import java.io.IOException;
* Abstract base class for OIO which reads and writes bytes from/to a Socket * Abstract base class for OIO which reads and writes bytes from/to a Socket
*/ */
public abstract class AbstractOioByteChannel extends AbstractOioChannel { public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
/** /**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
@ -214,6 +218,16 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} }
} }
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf || msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/** /**
* Return the number of bytes ready to read from the underlying Socket. * Return the number of bytes ready to read from the underlying Socket.
*/ */

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -140,6 +141,13 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
} }
} }
private static void checkEOF(FileRegion region) throws IOException {
if (region.transfered() < region.count()) {
throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
"but only wrote " + region.transfered());
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
InputStream is = this.is; InputStream is = this.is;

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope; import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
@ -25,6 +24,7 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
@ -62,6 +62,12 @@ public final class NioDatagramChannel
private static final ChannelMetadata METADATA = new ChannelMetadata(true); private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final DatagramChannelConfig config; private final DatagramChannelConfig config;
@ -257,34 +263,24 @@ public final class NioDatagramChannel
@Override @Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
final Object m;
final SocketAddress remoteAddress; final SocketAddress remoteAddress;
ByteBuf data; final ByteBuf data;
if (msg instanceof AddressedEnvelope) { if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg; AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
remoteAddress = envelope.recipient(); remoteAddress = envelope.recipient();
m = envelope.content(); data = envelope.content();
} else { } else {
m = msg; data = (ByteBuf) msg;
remoteAddress = null; remoteAddress = null;
} }
if (m instanceof ByteBufHolder) { final int dataLen = data.readableBytes();
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
if (dataLen == 0) { if (dataLen == 0) {
return true; return true;
} }
ByteBuffer nioData = data.nioBuffer(); final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
final int writtenBytes; final int writtenBytes;
if (remoteAddress != null) { if (remoteAddress != null) {
writtenBytes = javaChannel().send(nioData, remoteAddress); writtenBytes = javaChannel().send(nioData, remoteAddress);
@ -294,6 +290,49 @@ public final class NioDatagramChannel
return writtenBytes > 0; return writtenBytes > 0;
} }
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf content = p.content();
if (isSingleDirectBuffer(content)) {
return p;
}
return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isSingleDirectBuffer(buf)) {
return buf;
}
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
ByteBuf content = (ByteBuf) e.content();
if (isSingleDirectBuffer(content)) {
return e;
}
return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
* (We check this because otherwise we need to make it a non-composite buffer.)
*/
private static boolean isSingleDirectBuffer(ByteBuf buf) {
return buf.isDirect() && buf.nioBufferCount() == 1;
}
@Override @Override
protected boolean continueOnWriteError() { protected boolean continueOnWriteError() {
// Continue on write error as a DatagramChannel can write to multiple remote peers // Continue on write error as a DatagramChannel can write to multiple remote peers
@ -543,11 +582,6 @@ public final class NioDatagramChannel
return promise; return promise;
} }
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioDatagramChannelOutboundBuffer.newInstance(this);
}
@Override @Override
protected void setReadPending(boolean readPending) { protected void setReadPending(boolean readPending) {
super.setReadPending(readPending); super.setReadPending(readPending);

View File

@ -1,65 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
/**
* Special {@link ChannelOutboundBuffer} for {@link NioDatagramChannel} implementations.
*/
final class NioDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<NioDatagramChannelOutboundBuffer>() {
@Override
protected NioDatagramChannelOutboundBuffer newObject(Handle<NioDatagramChannelOutboundBuffer> handle) {
return new NioDatagramChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given
* {@link .NioDatagramChannel}.
*/
static NioDatagramChannelOutboundBuffer newInstance(NioDatagramChannel channel) {
NioDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioDatagramChannelOutboundBuffer(Recycler.Handle<NioDatagramChannelOutboundBuffer> handle) {
super(handle);
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
}
return msg;
}
}

View File

@ -179,6 +179,11 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig { private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket); super(channel, javaSocket);

View File

@ -245,17 +245,18 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
super.doWrite(in); super.doWrite(in);
return; return;
} }
// Ensure the pending writes are made of ByteBufs only. // Ensure the pending writes are made of ByteBufs only.
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; ByteBuffer[] nioBuffers = in.nioBuffers();
ByteBuffer[] nioBuffers = nioIn.nioBuffers(); int nioBufferCnt = in.nioBufferCount();
int nioBufferCnt = nioIn.nioBufferCount();
if (nioBufferCnt <= 1) { if (nioBufferCnt <= 1) {
// We have something else beside ByteBuffers to write so fallback to normal writes. // We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in); super.doWrite(in);
return; break;
} }
long expectedWrittenBytes = nioIn.nioBufferSize(); long expectedWrittenBytes = in.nioBufferSize();
final SocketChannel ch = javaChannel(); final SocketChannel ch = javaChannel();
long writtenBytes = 0; long writtenBytes = 0;
@ -291,38 +292,13 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} else { } else {
// Did not write all buffers completely. // Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer. // Release the fully written buffers and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
nioIn.progress(readableBytes);
nioIn.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
nioIn.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
nioIn.progress(readableBytes);
nioIn.remove();
break;
}
}
incompleteWrite(setOpWrite); incompleteWrite(setOpWrite);
break; break;
} }
} }
} }
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSocketChannelOutboundBuffer.newInstance(this);
}
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket); super(channel, javaSocket);

View File

@ -1,233 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to also access flushed {@link ByteBuffer} to
* allow efficent gathering writes.
*/
public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer {
private ByteBuffer[] nioBuffers;
private int nioBufferCount;
private long nioBufferSize;
private static final Recycler<NioSocketChannelOutboundBuffer> RECYCLER =
new Recycler<NioSocketChannelOutboundBuffer>() {
@Override
protected NioSocketChannelOutboundBuffer newObject(Handle<NioSocketChannelOutboundBuffer> handle) {
return new NioSocketChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
public static NioSocketChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSocketChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSocketChannelOutboundBuffer(Recycler.Handle<NioSocketChannelOutboundBuffer> handle) {
super(handle);
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) {
return copyToDirectByteBuf(buf);
}
}
return msg;
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
* {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number
* of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link io.netty.channel.socket.nio.NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final Entry[] buffer = entries();
final int mask = entryMask();
ByteBuffer[] nioBuffers = this.nioBuffers;
Object m;
int unflushed = unflushed();
int i = flushed();
while (i != unflushed && (m = buffer[i].msg()) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
// found by now.
break;
}
NioEntry entry = (NioEntry) buffer[i];
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
i = i + 1 & mask;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Return the number of {@link java.nio.ByteBuffer} which can be written.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Return the number of bytes that can be written via gathering writes.
*/
public long nioBufferSize() {
return nioBufferSize;
}
@Override
public void recycle() {
// take care of recycle the ByteBuffer[] structure.
if (nioBuffers.length > INITIAL_CAPACITY) {
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(nioBuffers, null);
}
super.recycle();
}
@Override
protected NioEntry newEntry() {
return new NioEntry();
}
protected static final class NioEntry extends Entry {
ByteBuffer[] buffers;
ByteBuffer buf;
int count = -1;
@Override
public void clear() {
buffers = null;
buf = null;
count = -1;
super.clear();
}
@Override
public int cancel() {
buffers = null;
buf = null;
count = -1;
return super.cancel();
}
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.oio; package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope; import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
@ -61,6 +60,12 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(true); private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final MulticastSocket socket; private final MulticastSocket socket;
private final DatagramChannelConfig config; private final DatagramChannelConfig config;
@ -241,28 +246,19 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
break; break;
} }
final Object m;
final ByteBuf data; final ByteBuf data;
final SocketAddress remoteAddress; final SocketAddress remoteAddress;
if (o instanceof AddressedEnvelope) { if (o instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o; AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
remoteAddress = envelope.recipient(); remoteAddress = envelope.recipient();
m = envelope.content(); data = envelope.content();
} else { } else {
m = o; data = (ByteBuf) o;
remoteAddress = null; remoteAddress = null;
} }
if (m instanceof ByteBufHolder) { final int length = data.readableBytes();
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o));
}
int length = data.readableBytes();
if (remoteAddress != null) { if (remoteAddress != null) {
tmpPacket.setSocketAddress(remoteAddress); tmpPacket.setSocketAddress(remoteAddress);
} }
@ -285,6 +281,24 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
} }
} }
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
return msg;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
return msg;
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override @Override
public ChannelFuture joinGroup(InetAddress multicastAddress) { public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newPromise()); return joinGroup(multicastAddress, newPromise());

View File

@ -174,6 +174,11 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override @Override
protected void doConnect( protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {

View File

@ -13,27 +13,25 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel.socket.nio; package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.junit.Test; import org.junit.Test;
import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import static io.netty.buffer.Unpooled.*; import static io.netty.buffer.Unpooled.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class NioSocketChannelOutboundBufferTest { public class ChannelOutboundBufferTest {
@Test @Test
public void testEmptyNioBuffers() { public void testEmptyNioBuffers() {
AbstractChannel channel = new EmbeddedChannel(); TestChannel channel = new TestChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
assertEquals(0, buffer.nioBufferCount()); assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers); assertNotNull(buffers);
@ -46,31 +44,22 @@ public class NioSocketChannelOutboundBufferTest {
@Test @Test
public void testNioBuffersSingleBacked() { public void testNioBuffersSingleBacked() {
AbstractChannel channel = new EmbeddedChannel(); TestChannel channel = new TestChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount()); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers);
for (ByteBuffer b: buffers) {
assertNull(b);
}
assertEquals(0, buffer.nioBufferCount()); assertEquals(0, buffer.nioBufferCount());
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes()); ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes());
buffer.addMessage(buf, channel.voidPromise()); buffer.addMessage(buf, buf.readableBytes(), channel.voidPromise());
buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers); assertNotNull(buffers);
assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i == 0) { if (i == 0) {
assertEquals(buffers[0], nioBuf); assertEquals(buffers[i], nioBuf);
} else { } else {
assertNull(buffers[i]); assertNull(buffers[i]);
} }
@ -80,24 +69,20 @@ public class NioSocketChannelOutboundBufferTest {
@Test @Test
public void testNioBuffersExpand() { public void testNioBuffersExpand() {
AbstractChannel channel = new EmbeddedChannel(); TestChannel channel = new TestChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 64; i++) { for (int i = 0; i < 64; i++) {
buffer.addMessage(buf.copy(), channel.voidPromise()); buffer.addMessage(buf.copy(), buf.readableBytes(), channel.voidPromise());
} }
ByteBuffer[] nioBuffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: nioBuffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
nioBuffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(64, nioBuffers.length);
assertEquals(64, buffer.nioBufferCount()); assertEquals(64, buffer.nioBufferCount());
for (ByteBuffer nioBuf: nioBuffers) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
assertEquals(nioBuf, buf.internalNioBuffer(0, buf.readableBytes())); assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} }
release(buffer); release(buffer);
buf.release(); buf.release();
@ -105,26 +90,22 @@ public class NioSocketChannelOutboundBufferTest {
@Test @Test
public void testNioBuffersExpand2() { public void testNioBuffersExpand2() {
AbstractChannel channel = new EmbeddedChannel(); TestChannel channel = new TestChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
CompositeByteBuf comp = compositeBuffer(256); CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 65; i++) { for (int i = 0; i < 65; i++) {
comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes()); comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes());
} }
buffer.addMessage(comp, channel.voidPromise()); buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(128, buffers.length);
assertEquals(65, buffer.nioBufferCount()); assertEquals(65, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i < 65) { if (i < 65) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} else { } else {
@ -142,4 +123,84 @@ public class NioSocketChannelOutboundBufferTest {
} }
} }
} }
private static final class TestChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
TestChannel() {
super(null);
}
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return false;
}
@Override
protected SocketAddress localAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doClose() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doBeginRead() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isActive() {
return true;
}
@Override
public ChannelMetadata metadata() {
throw new UnsupportedOperationException();
}
final class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
}
}
} }