Port ChannelOutboundBuffer and related changes from 4.0

Motivation:

We did various changes related to the ChannelOutboundBuffer in 4.0 branch. This commit port all of them over and so make sure our branches are synced in terms of these changes.

Related to [#2734], [#2709], [#2729], [#2710] and [#2693] .

Modification:
Port all changes that was done on the ChannelOutboundBuffer.

This includes the port of the following commits:
 - 73dfd7c01b
 - 997d8c32d2
 - e282e504f1
 - 5e5d1a58fd
 - 8ee3575e72
 - d6f0d12a86
 - 16e50765d1
 - 3f3e66c31a

Result:
 - Less memory usage by ChannelOutboundBuffer
 - Same code as in 4.0 branch
 - Make it possible to use ChannelOutboundBuffer with Channel implementation that not extends AbstractChannel
This commit is contained in:
Norman Maurer 2014-08-05 14:24:49 +02:00
parent e33f12f5b8
commit 869687bd71
29 changed files with 1196 additions and 1165 deletions

View File

@ -16,6 +16,8 @@
package io.netty.buffer;
import io.netty.util.CharsetUtil;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -42,6 +44,8 @@ public final class ByteBufUtil {
static final ByteBufAllocator DEFAULT_ALLOCATOR;
private static final int THREAD_LOCAL_BUFFER_SIZE;
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i ++) {
@ -49,9 +53,7 @@ public final class ByteBufUtil {
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
@ -66,6 +68,9 @@ public final class ByteBufUtil {
}
DEFAULT_ALLOCATOR = alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
}
/**
@ -414,5 +419,89 @@ public final class ByteBufUtil {
return dst.flip().toString();
}
/**
* Returns a cached thread-local direct buffer, if available.
*
* @return a cached thread-local direct buffer, if available. {@code null} otherwise.
*/
public static ByteBuf threadLocalDirectBuffer() {
if (THREAD_LOCAL_BUFFER_SIZE <= 0) {
return null;
}
if (PlatformDependent.hasUnsafe()) {
return ThreadLocalUnsafeDirectByteBuf.newInstance();
} else {
return ThreadLocalDirectByteBuf.newInstance();
}
}
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};
static ThreadLocalUnsafeDirectByteBuf newInstance() {
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalUnsafeDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
@Override
protected ThreadLocalDirectByteBuf newObject(Handle handle) {
return new ThreadLocalDirectByteBuf(handle);
}
};
static ThreadLocalDirectByteBuf newInstance() {
ThreadLocalDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
private final Handle handle;
private ThreadLocalDirectByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
private ByteBufUtil() { }
}

View File

@ -39,7 +39,7 @@ public final class ReferenceCountUtil {
}
/**
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}.
* Try to call {@link ReferenceCounted#retain(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
@SuppressWarnings("unchecked")
@ -87,7 +87,7 @@ public final class ReferenceCountUtil {
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg, int decrement) {
@ -97,6 +97,38 @@ public final class ReferenceCountUtil {
return false;
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release()}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use {@link #release(Object)}
* instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg) {
try {
release(msg);
} catch (Throwable t) {
logger.warn("Failed to release a message: {}", msg, t);
}
}
/**
* Try to call {@link ReferenceCounted#release(int)} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
* Unlike {@link #release(Object)} this method catches an exception raised by {@link ReferenceCounted#release(int)}
* and logs it, rather than rethrowing it to the caller. It is usually recommended to use
* {@link #release(Object, int)} instead, unless you absolutely need to swallow an exception.
*/
public static void safeRelease(Object msg, int decrement) {
try {
release(msg, decrement);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to release a message: {} (decrement: {})", msg, decrement, t);
}
}
}
/**
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the

View File

@ -15,10 +15,15 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress;
@ -98,6 +103,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true;
if ((flags & readFlag) == 0) {
flags |= readFlag;
modifyEvents();
@ -159,6 +167,47 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
protected abstract AbstractEpollUnsafe newUnsafe();
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf);
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method.
*/
protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf == null) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
final ByteBuf directBuf = alloc.directBuffer(capacity);
directBuf.writeBytes(buf, buf.readerIndex(), capacity);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
protected static void checkResolvable(InetSocketAddress addr) {
if (addr.isUnresolved()) {
throw new UnresolvedAddressException();
@ -180,13 +229,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// NOOP
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.

View File

@ -1,102 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain a {@link IovArray}
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
* GC pressure a lot.
*/
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
@Override
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
return new EpollChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel}
*/
static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) {
EpollChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
super(handle);
}
/**
* Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this
* {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress()) {
return copyToDirectByteBuf(buf);
}
}
return msg;
}
/**
* Returns a {@link IovArray} if the currently pending messages.
* <p>
* Note that the returned {@link IovArray} is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
*/
IovArray iovArray() {
IovArray array = IovArray.get();
final Entry[] buffer = entries();
final int mask = entryMask();
int unflushed = unflushed();
int flushed = flushed();
Object m;
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
// found by now.
break;
}
Entry entry = buffer[flushed];
// Check if the entry was cancelled. if so we just skip it.
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
if (!array.add(buf)) {
// Can not hold more data so break here.
// We will handle this on the next write loop.
break;
}
}
flushed = flushed + 1 & mask;
}
return array;
}
}

View File

@ -16,13 +16,14 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -44,6 +45,12 @@ import java.nio.channels.NotYetConnectedException;
*/
public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
@ -282,27 +289,20 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
private boolean doWriteMessage(Object msg) throws IOException {
final Object m;
final ByteBuf data;
InetSocketAddress remoteAddress;
ByteBuf data;
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
remoteAddress = packet.recipient();
m = packet.content();
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
(AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
data = envelope.content();
remoteAddress = envelope.recipient();
} else {
m = msg;
data = (ByteBuf) msg;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
final int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
@ -324,19 +324,62 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
return writtenBytes > 0;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.hasMemoryAddress()) {
return msg;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
ByteBuf content = (ByteBuf) e.content();
if (content.hasMemoryAddress()) {
return e;
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public EpollDatagramChannelConfig config() {
return config;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return EpollDatagramChannelOutboundBuffer.newInstance(this);
}
@Override
protected void doDisconnect() throws Exception {
connected = false;

View File

@ -1,63 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
final class EpollDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<EpollDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<EpollDatagramChannelOutboundBuffer>() {
@Override
protected EpollDatagramChannelOutboundBuffer newObject(Handle<EpollDatagramChannelOutboundBuffer> handle) {
return new EpollDatagramChannelOutboundBuffer(handle);
}
};
static EpollDatagramChannelOutboundBuffer newInstance(EpollDatagramChannel channel) {
EpollDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollDatagramChannelOutboundBuffer(Recycler.Handle<EpollDatagramChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (isCopyNeeded(content)) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
} else if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isCopyNeeded(buf)) {
msg = copyToDirectByteBuf((ByteBuf) msg);
}
}
return msg;
}
private static boolean isCopyNeeded(ByteBuf content) {
return !content.hasMemoryAddress() || content.nioBufferCount() != 1;
}
}

View File

@ -74,7 +74,12 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
}
@Override
protected void doWrite(ChannelOutboundBuffer in) {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}

View File

@ -32,7 +32,6 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelOutboundBuffer;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
@ -50,6 +49,10 @@ import java.util.concurrent.TimeUnit;
*/
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private final EpollSocketChannelConfig config;
/**
@ -111,6 +114,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove();
return true;
}
boolean done = false;
long writtenBytes = 0;
if (buf.hasMemoryAddress()) {
@ -132,7 +136,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
updateOutboundBuffer(in, writtenBytes, 1, done);
in.removeBytes(writtenBytes);
return done;
} else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex();
@ -154,23 +159,27 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
updateOutboundBuffer(in, writtenBytes, 1, done);
in.removeBytes(writtenBytes);
return done;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, 1, nioBuffers, nioBuffers.length, readableBytes);
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
}
}
private boolean writeBytesMultiple(
EpollChannelOutboundBuffer in, IovArray array) throws IOException {
boolean done = false;
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
long expectedWrittenBytes = array.size();
int cnt = array.count();
assert expectedWrittenBytes != 0;
assert cnt != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + cnt;
int messages = cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
@ -200,33 +209,34 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} while (offset < end && localWrittenBytes > 0);
}
updateOutboundBuffer(in, writtenBytes, messages, done);
in.removeBytes(writtenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
assert expectedWrittenBytes != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + nioBufferCnt;
loop: while (nioBufferCnt > 0) {
for (;;) {
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt;
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break loop;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
@ -243,49 +253,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
} while (offset < end && localWrittenBytes > 0);
}
}
updateOutboundBuffer(in, writtenBytes, msgCount, done);
in.removeBytes(writtenBytes);
return done;
}
private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
boolean done) {
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
in.progress(buf.readableBytes());
in.remove();
}
in.progress(writtenBytes);
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readable == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
}
}
}
/**
* Write a {@link DefaultFileRegion}
*
@ -293,6 +265,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
if (region.transfered() >= region.count()) {
in.remove();
return true;
}
boolean done = false;
long flushedAmount = 0;
@ -331,47 +308,24 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
// Do gathering write if:
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount >= 1) {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to EpollChannelOutboundBuffer and write the IovArray directly.
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
IovArray array = epollIn.iovArray();
int cnt = array.count();
if (cnt > 1) {
if (!writeBytesMultiple(epollIn, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) {
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
} else {
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only.
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
int nioBufferCnt = nioIn.nioBufferCount();
if (nioBufferCnt > 1) {
if (!writeBytesMultiple(nioIn, msgCount, nioBuffers, nioBufferCnt, nioIn.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
} else { // msgCount == 1
if (!doWriteSingle(in)) {
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}
}
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
@ -379,19 +333,75 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
return false;
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
// Should never reach here.
throw new Error();
}
return true;
}
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArray.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
}
return true;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
return buf;
}
if (msg instanceof DefaultFileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
@ -755,16 +765,4 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
}
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
if (PlatformDependent.hasUnsafe()) {
// This means we will be able to access the memory addresses directly and so be able to do
// gathering writes with the AddressEntry.
return EpollChannelOutboundBuffer.newInstance(this);
} else {
// No access to the memoryAddres, so fallback to use ByteBuffer[] for gathering writes.
return NioSocketChannelOutboundBuffer.newInstance(this);
}
}
}

View File

@ -16,6 +16,8 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
@ -23,7 +25,7 @@ import io.netty.util.internal.PlatformDependent;
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
* array copies.
*
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>.
* The buffers are written out directly into direct memory to match the struct iov. See also {@code man writev}.
*
* <pre>
* struct iovec {
@ -33,19 +35,24 @@ import io.netty.util.internal.PlatformDependent;
* </pre>
*
* See also
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/">
* Efficient JNI programming IV: Wrapping native data objects</a>.
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/"
* >Efficient JNI programming IV: Wrapping native data objects</a>.
*/
final class IovArray {
// Maximal number of struct iov entries that can be passed to writev(...)
private static final int IOV_MAX = Native.IOV_MAX;
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
final class IovArray implements MessageProcessor {
/** The size of an address which should be 8 for 64 bits and 4 for 32 bits. */
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
// address.
/**
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
* address.
*/
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
// The needed memory to hold up to IOV_MAX iov entries.
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
/** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
*/
private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE;
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
@ -72,17 +79,26 @@ final class IovArray {
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
* {@code false} otherwise.
*/
boolean add(ByteBuf buf) {
if (count == IOV_MAX) {
private boolean add(ByteBuf buf) {
if (count == Native.IOV_MAX) {
// No more room!
return false;
}
int len = buf.readableBytes();
long addr = buf.memoryAddress();
int offset = buf.readerIndex();
long baseOffset = memoryAddress(count++);
long lengthOffset = baseOffset + ADDRESS_SIZE;
final int len = buf.readableBytes();
if (len == 0) {
// No need to add an empty buffer.
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
// fetching the next buffers.
return true;
}
final long addr = buf.memoryAddress();
final int offset = buf.readerIndex();
final long baseOffset = memoryAddress(count++);
final long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
PlatformDependent.putLong(baseOffset, addr + offset);
@ -92,6 +108,7 @@ final class IovArray {
PlatformDependent.putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len);
}
size += len;
return true;
}
@ -147,13 +164,19 @@ final class IovArray {
return memoryAddress + IOV_SIZE * offset;
}
@Override
public boolean processMessage(Object msg) throws Exception {
return msg instanceof ByteBuf && add((ByteBuf) msg);
}
/**
* Returns a {@link IovArray} which can be filled.
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/
static IovArray get() {
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
buffer.forEachFlushedMessage(array);
return array;
}
}

View File

@ -20,7 +20,7 @@ import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -34,8 +34,8 @@ import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -303,17 +303,45 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return true;
}
ByteBuffer nioData = data.nioBuffer();
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0;
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
SctpMessage m = (SctpMessage) msg;
ByteBuf buf = m.content();
if (buf.isDirect() && buf.nioBufferCount() == 1) {
return m;
}
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf));
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) +
" (expected: " + StringUtil.simpleClassName(SctpMessage.class));
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
return bindAddress(localAddress, newPromise());
@ -364,44 +392,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return promise;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSctpChannelOutboundBuffer.newInstance(this);
}
static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioSctpChannelOutboundBuffer> RECYCLER =
new Recycler<NioSctpChannelOutboundBuffer>() {
@Override
protected NioSctpChannelOutboundBuffer newObject(Handle<NioSctpChannelOutboundBuffer> handle) {
return new NioSctpChannelOutboundBuffer(handle);
}
};
static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSctpChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSctpChannelOutboundBuffer(Recycler.Handle<NioSctpChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof SctpMessage) {
SctpMessage message = (SctpMessage) msg;
ByteBuf content = message.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new SctpMessage(message.protocolIdentifier(), message.streamIdentifier(), direct);
}
}
return msg;
}
}
private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
super(channel, javaChannel);

View File

@ -221,6 +221,11 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel);

View File

@ -34,6 +34,7 @@ import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -64,6 +65,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
InternalLoggerFactory.getInstance(OioSctpChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
private final SctpChannel ch;
private final SctpChannelConfig config;
@ -273,6 +275,16 @@ public class OioSctpChannel extends AbstractOioMessageChannel
}
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
}
@Override
public Association association() {
try {

View File

@ -290,6 +290,11 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel);

View File

@ -98,6 +98,11 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public boolean isActive() {
return javaChannel().socket().isBound();

View File

@ -24,7 +24,6 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -101,7 +100,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.getWritable();
return buf != null && buf.isWritable();
}
@Override
@ -397,8 +396,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = newOutboundBuffer();
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private boolean inFlush0;
@Override
@ -647,7 +645,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void beginRead() {
public final void beginRead() {
if (!isActive()) {
return;
}
@ -666,7 +664,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void write(Object msg, ChannelPromise promise) {
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
@ -678,11 +676,25 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, promise);
int size;
try {
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
@Override
public void flush() {
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
@ -733,7 +745,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelPromise voidPromise() {
public final ChannelPromise voidPromise() {
return unsafeVoidPromise;
}
@ -791,13 +803,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
/**
* Create a new {@link ChannelOutboundBuffer} which holds the pending messages for this {@link AbstractChannel}.
*/
protected ChannelOutboundBuffer newOutboundBuffer() {
return ChannelOutboundBuffer.newInstance(this);
}
/**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/
@ -856,12 +861,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
protected static void checkEOF(FileRegion region) throws IOException {
if (region.transfered() < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ region.transfered());
}
/**
* Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
* the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
*/
protected Object filterOutboundMessage(Object msg) throws Exception {
return msg;
}
static final class CloseFuture extends DefaultChannelPromise {

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel;
import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress;
/**
@ -71,24 +69,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) {
throw new UnsupportedOperationException();
}
private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void write(Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
reject(promise);
}
@Override
public void flush() {
// ignore
}
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
reject(promise);
}
private void reject(ChannelPromise promise) {
safeSetFailure(promise, new UnsupportedOperationException());
}
}

View File

@ -13,24 +13,22 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -38,56 +36,48 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
* outbound write requests.
*
* All the methods should only be called by the {@link EventLoop} of the {@link Channel}.
*/
public class ChannelOutboundBuffer {
public final class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
protected static final int INITIAL_CAPACITY =
SystemPropertyUtil.getInt("io.netty.outboundBufferInitialCapacity", 4);
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.outboundBufferInitialCapacity: {}", INITIAL_CAPACITY);
}
}
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ChannelOutboundBuffer newObject(Handle<ChannelOutboundBuffer> handle) {
return new ChannelOutboundBuffer(handle);
protected ByteBuffer[] initialValue() throws Exception {
return new ByteBuffer[1024];
}
};
/**
* Get a new instance of this {@link ChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private final Channel channel;
private final Handle<? extends ChannelOutboundBuffer> handle;
protected AbstractChannel channel;
// A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The
// flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range
// [unflushed, tail).
private Entry[] buffer;
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;
private int unflushed;
private int tail;
private int nioBufferCount;
private long nioBufferSize;
private boolean inFail;
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;
@SuppressWarnings("unused")
private volatile long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER;
@SuppressWarnings("FieldMayBeFinal")
private volatile int writable = 1;
static {
AtomicIntegerFieldUpdater<ChannelOutboundBuffer> writableUpdater =
PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable");
@ -104,46 +94,26 @@ public class ChannelOutboundBuffer {
TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater;
}
private volatile int writable = 1;
protected ChannelOutboundBuffer(Handle<? extends ChannelOutboundBuffer> handle) {
this.handle = handle;
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = newEntry();
}
ChannelOutboundBuffer(AbstractChannel channel) {
this.channel = channel;
}
/**
* Return the array of {@link Entry}'s which hold the pending write requests in an circular array.
* Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
*/
protected final Entry[] entries() {
return buffer;
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
/**
* Add the given message to this {@link ChannelOutboundBuffer} so it will be marked as flushed once
* {@link #addFlush()} was called. The {@link ChannelPromise} will be notified once the write operations
* completes.
*/
public final void addMessage(Object msg, ChannelPromise promise) {
msg = beforeAdd(msg);
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
Entry e = buffer[tail++];
e.msg = msg;
e.pendingSize = size;
e.promise = promise;
e.total = total(msg);
tail &= buffer.length - 1;
if (tail == flushed) {
addCapacity();
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
@ -152,62 +122,32 @@ public class ChannelOutboundBuffer {
}
/**
* Is called before the message is actually added to the {@link ChannelOutboundBuffer} and so allow to
* convert it to a different format. Sub-classes may override this.
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
protected Object beforeAdd(Object msg) {
return msg;
}
/**
* Expand internal array which holds the {@link Entry}'s.
*/
private void addCapacity() {
int p = flushed;
int n = buffer.length;
int r = n - p; // number of elements to the right of p
int s = size();
int newCapacity = n << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
Entry[] e = new Entry[newCapacity];
System.arraycopy(buffer, p, e, 0, r);
System.arraycopy(buffer, 0, e, r, p);
for (int i = n; i < e.length; i++) {
e[i] = newEntry();
}
buffer = e;
flushed = 0;
unflushed = s;
tail = n;
}
/**
* Mark all messages in this {@link ChannelOutboundBuffer} as flushed.
*/
public final void addFlush() {
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
if (unflushed != tail) {
unflushed = tail;
final int mask = buffer.length - 1;
int i = flushed;
while (i != unflushed && buffer[i].msg != null) {
Entry entry = buffer[i];
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending);
}
i = i + 1 & mask;
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
@ -215,11 +155,8 @@ public class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
final void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
void incrementPendingOutboundBytes(int size) {
if (size == 0) {
return;
}
@ -235,11 +172,8 @@ public class ChannelOutboundBuffer {
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
final void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
void decrementPendingOutboundBytes(int size) {
if (size == 0) {
return;
}
@ -265,20 +199,23 @@ public class ChannelOutboundBuffer {
}
/**
* Return current message or {@code null} if no flushed message is left to process.
* Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
*/
public final Object current() {
if (isEmpty()) {
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
} else {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = buffer[flushed];
return entry.msg;
}
}
public final void progress(long amount) {
Entry e = buffer[flushed];
return entry.msg;
}
/**
* Notify the {@link ChannelPromise} of the current message about writing progress.
*/
public void progress(long amount) {
Entry e = flushedEntry;
assert e != null;
ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) {
long progress = e.progress + amount;
@ -288,93 +225,237 @@ public class ChannelOutboundBuffer {
}
/**
* Mark the current message as successful written and remove it from this {@link ChannelOutboundBuffer}.
* This method will return {@code true} if there are more messages left to process, {@code false} otherwise.
* Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
* flushed message exists at the time this method is called it will return {@code false} to signal that no more
* messages are ready to be handled.
*/
public final boolean remove() {
if (isEmpty()) {
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
return false;
}
Entry e = buffer[flushed];
Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise;
int size = e.pendingSize;
e.clear();
flushed = flushed + 1 & buffer.length - 1;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size);
}
// recycle the entry
e.recycle();
return true;
}
/**
* Mark the current message as failure with the given {@link java.lang.Throwable} and remove it from this
* {@link ChannelOutboundBuffer}. This method will return {@code true} if there are more messages left to process,
* {@code false} otherwise.
* Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
* and return {@code true}. If no flushed message exists at the time this method is called it will return
* {@code false} to signal that no more messages are ready to be handled.
*/
public final boolean remove(Throwable cause) {
if (isEmpty()) {
public boolean remove(Throwable cause) {
Entry e = flushedEntry;
if (e == null) {
return false;
}
Entry e = buffer[flushed];
Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise;
int size = e.pendingSize;
e.clear();
flushed = flushed + 1 & buffer.length - 1;
removeEntry(e);
if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before.
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size);
}
// recycle the entry
e.recycle();
return true;
}
final boolean getWritable() {
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
/**
* Removes the fully written entries and update the reader index of the partially written entry.
* This operation assumes all messages in this buffer is {@link ByteBuf}.
*/
public void removeBytes(long writtenBytes) {
for (;;) {
final ByteBuf buf = (ByteBuf) current();
if (buf == null) {
break;
}
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
* array and the total number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.bufs = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public long nioBufferSize() {
return nioBufferSize;
}
boolean isWritable() {
return writable != 0;
}
/**
* Return the number of messages that are ready to be written (flushed before).
* Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
*/
public final int size() {
return unflushed - flushed & buffer.length - 1;
public int size() {
return flushed;
}
/**
* Return {@code true} if this {@link ChannelOutboundBuffer} contains no flushed messages
* Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
* otherwise.
*/
public final boolean isEmpty() {
return unflushed == flushed;
public boolean isEmpty() {
return flushed == 0;
}
/**
* Fail all previous flushed messages with the given {@link Throwable}.
*/
final void failFlushed(Throwable cause) {
void failFlushed(Throwable cause) {
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
// indirectly (usually by closing the channel.)
@ -396,10 +477,7 @@ public class ChannelOutboundBuffer {
}
}
/**
* Fail all pending messages with the given {@link ClosedChannelException}.
*/
final void close(final ClosedChannelException cause) {
void close(final ClosedChannelException cause) {
if (inFail) {
channel.eventLoop().execute(new Runnable() {
@Override
@ -421,140 +499,95 @@ public class ChannelOutboundBuffer {
}
// Release all unflushed messages.
final int unflushedCount = tail - unflushed & buffer.length - 1;
try {
for (int i = 0; i < unflushedCount; i++) {
Entry e = buffer[unflushed + i & buffer.length - 1];
Entry e = unflushedEntry;
while (e != null) {
// Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = e.pendingSize;
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
e.pendingSize = 0;
if (!e.cancelled) {
safeRelease(e.msg);
ReferenceCountUtil.safeRelease(e.msg);
safeFail(e.promise, cause);
}
e.msg = null;
e.promise = null;
e = e.recycleAndGetNext();
}
} finally {
tail = unflushed;
inFail = false;
}
recycle();
}
/**
* Release the message and log if any error happens during release.
*/
protected static void safeRelease(Object message) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable t) {
logger.warn("Failed to release a message.", t);
}
}
/**
* Try to mark the given {@link ChannelPromise} as success and log if this failed.
*/
private static void safeSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
/**
* Try to mark the given {@link ChannelPromise} as failued with the given {@link Throwable} and log if this failed.
*/
private static void safeFail(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}
}
/**
* Recycle this {@link ChannelOutboundBuffer}. After this was called it is disallowed to use it with the previous
* assigned {@link AbstractChannel}.
*/
@SuppressWarnings("unchecked")
@Deprecated
public void recycle() {
if (buffer.length > INITIAL_CAPACITY) {
Entry[] e = new Entry[INITIAL_CAPACITY];
System.arraycopy(buffer, 0, e, 0, INITIAL_CAPACITY);
buffer = e;
// NOOP
}
// reset flushed, unflushed and tail
// See https://github.com/netty/netty/issues/1772
flushed = 0;
unflushed = 0;
tail = 0;
// Set the channel to null so it can be GC'ed ASAP
channel = null;
totalPendingSize = 0;
writable = 1;
RECYCLER.recycle(this, (Handle<ChannelOutboundBuffer>) handle);
}
/**
* Return the total number of pending bytes.
*/
public final long totalPendingWriteBytes() {
public long totalPendingWriteBytes() {
return totalPendingSize;
}
/**
* Create a new {@link Entry} to use for the internal datastructure. Sub-classes may override this use a special
* sub-class.
* Call {@link MessageProcessor#processMessage(Object)} for each flushed message
* in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
* returns {@code false} or there are no more flushed messages to process.
*/
protected Entry newEntry() {
return new Entry();
public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
if (processor == null) {
throw new NullPointerException("processor");
}
Entry entry = flushedEntry;
if (entry == null) {
return;
}
do {
if (!entry.cancelled) {
if (!processor.processMessage(entry.msg)) {
return;
}
}
entry = entry.next;
} while (isFlushedEntry(entry));
}
private boolean isFlushedEntry(Entry e) {
return e != null && e != unflushedEntry;
}
public interface MessageProcessor {
/**
* Return the index of the first flushed message.
* Will be called for each flushed message until it either there are no more flushed messages or this
* method returns {@code false}.
*/
protected final int flushed() {
return flushed;
boolean processMessage(Object msg) throws Exception;
}
/**
* Return the index of the first unflushed messages.
*/
protected final int unflushed() {
return unflushed;
static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle handle) {
return new Entry(handle);
}
};
protected final int entryMask() {
return buffer.length - 1;
}
protected ByteBuf copyToDirectByteBuf(ByteBuf buf) {
int readableBytes = buf.readableBytes();
ByteBufAllocator alloc = channel.alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
safeRelease(buf);
return directBuf;
}
if (ThreadLocalPooledDirectByteBuf.threadLocalDirectBufferSize > 0) {
ByteBuf directBuf = ThreadLocalPooledDirectByteBuf.newInstance();
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
safeRelease(buf);
return directBuf;
}
return buf;
}
protected static class Entry {
private final Handle handle;
Entry next;
Object msg;
ByteBuffer[] bufs;
ByteBuffer buf;
ChannelPromise promise;
long progress;
long total;
@ -562,43 +595,42 @@ public class ChannelOutboundBuffer {
int count = -1;
boolean cancelled;
public Object msg() {
return msg;
private Entry(Handle handle) {
this.handle = handle;
}
/**
* Return {@code true} if the {@link Entry} was cancelled via {@link #cancel()} before,
* {@code false} otherwise.
*/
public boolean isCancelled() {
return cancelled;
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size;
entry.total = total;
entry.promise = promise;
return entry;
}
/**
* Cancel this {@link Entry} and the message that was hold by this {@link Entry}. This method returns the
* number of pending bytes for the cancelled message.
*/
public int cancel() {
int cancel() {
if (!cancelled) {
cancelled = true;
int pSize = pendingSize;
// release message and replace with an empty buffer
safeRelease(msg);
ReferenceCountUtil.safeRelease(msg);
msg = Unpooled.EMPTY_BUFFER;
pendingSize = 0;
total = 0;
progress = 0;
bufs = null;
buf = null;
return pSize;
}
return 0;
}
/**
* Clear this {@link Entry} and so release all resources.
*/
public void clear() {
void recycle() {
next = null;
bufs = null;
buf = null;
msg = null;
promise = null;
progress = 0;
@ -606,6 +638,13 @@ public class ChannelOutboundBuffer {
pendingSize = 0;
count = -1;
cancelled = false;
RECYCLER.recycle(this, handle);
}
Entry recycleAndGetNext() {
Entry next = this.next;
recycle();
return next;
}
}
}

View File

@ -35,6 +35,11 @@ import java.nio.channels.SelectionKey;
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
*/
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private Runnable flushTask;
/**
@ -247,11 +252,31 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
break;
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
// Should not reach here.
throw new Error();
}
}
}
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {

View File

@ -15,6 +15,10 @@
*/
package io.netty.channel.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -23,6 +27,8 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -174,19 +180,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void beginRead() {
// Channel.read() or ChannelHandlerContext.read() was called
readPending = true;
super.beginRead();
}
@Override
public SelectableChannel ch() {
public final SelectableChannel ch() {
return javaChannel();
}
@Override
public void connect(
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
@ -277,7 +276,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void finishConnect() {
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
@ -306,7 +305,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
protected void flush0() {
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
@ -317,7 +316,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public void forceFlush() {
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
@ -362,6 +361,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
@ -371,6 +371,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
@ -386,4 +388,73 @@ public abstract class AbstractNioChannel extends AbstractChannel {
* Finish the connect
*/
protected abstract void doFinishConnect() throws Exception;
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
* Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
* but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is
* too high, but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
if (holder != buf) {
// Ensure to call holder.release() to give the holder a chance to release other resources than its content.
buf.retain();
ReferenceCountUtil.safeRelease(holder);
}
return buf;
}
}

View File

@ -33,10 +33,14 @@ import java.io.IOException;
* Abstract base class for OIO which reads and writes bytes from/to a Socket
*/
public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
@ -214,6 +218,16 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf || msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Return the number of bytes ready to read from the underlying Socket.
*/

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -140,6 +141,13 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
}
}
private static void checkEOF(FileRegion region) throws IOException {
if (region.transfered() < region.count()) {
throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
"but only wrote " + region.transfered());
}
}
@Override
protected void doClose() throws Exception {
InputStream is = this.is;

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -25,6 +24,7 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -62,6 +62,12 @@ public final class NioDatagramChannel
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final DatagramChannelConfig config;
@ -257,34 +263,24 @@ public final class NioDatagramChannel
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
final Object m;
final SocketAddress remoteAddress;
ByteBuf data;
final ByteBuf data;
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg;
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
remoteAddress = envelope.recipient();
m = envelope.content();
data = envelope.content();
} else {
m = msg;
data = (ByteBuf) msg;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
final int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
ByteBuffer nioData = data.nioBuffer();
final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
final int writtenBytes;
if (remoteAddress != null) {
writtenBytes = javaChannel().send(nioData, remoteAddress);
@ -294,6 +290,49 @@ public final class NioDatagramChannel
return writtenBytes > 0;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf content = p.content();
if (isSingleDirectBuffer(content)) {
return p;
}
return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isSingleDirectBuffer(buf)) {
return buf;
}
return newDirectBuffer(buf);
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
ByteBuf content = (ByteBuf) e.content();
if (isSingleDirectBuffer(content)) {
return e;
}
return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
* (We check this because otherwise we need to make it a non-composite buffer.)
*/
private static boolean isSingleDirectBuffer(ByteBuf buf) {
return buf.isDirect() && buf.nioBufferCount() == 1;
}
@Override
protected boolean continueOnWriteError() {
// Continue on write error as a DatagramChannel can write to multiple remote peers
@ -543,11 +582,6 @@ public final class NioDatagramChannel
return promise;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioDatagramChannelOutboundBuffer.newInstance(this);
}
@Override
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);

View File

@ -1,65 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
/**
* Special {@link ChannelOutboundBuffer} for {@link NioDatagramChannel} implementations.
*/
final class NioDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<NioDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<NioDatagramChannelOutboundBuffer>() {
@Override
protected NioDatagramChannelOutboundBuffer newObject(Handle<NioDatagramChannelOutboundBuffer> handle) {
return new NioDatagramChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given
* {@link .NioDatagramChannel}.
*/
static NioDatagramChannelOutboundBuffer newInstance(NioDatagramChannel channel) {
NioDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioDatagramChannelOutboundBuffer(Recycler.Handle<NioDatagramChannelOutboundBuffer> handle) {
super(handle);
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (!content.isDirect() || content.nioBufferCount() != 1) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
}
return msg;
}
}

View File

@ -179,6 +179,11 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);

View File

@ -246,17 +246,18 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
super.doWrite(in);
return;
}
// Ensure the pending writes are made of ByteBufs only.
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
ByteBuffer[] nioBuffers = nioIn.nioBuffers();
int nioBufferCnt = nioIn.nioBufferCount();
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
if (nioBufferCnt <= 1) {
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
break;
}
long expectedWrittenBytes = nioIn.nioBufferSize();
long expectedWrittenBytes = in.nioBufferSize();
final SocketChannel ch = javaChannel();
long writtenBytes = 0;
@ -292,38 +293,13 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
nioIn.progress(readableBytes);
nioIn.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
nioIn.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
nioIn.progress(readableBytes);
nioIn.remove();
break;
}
}
in.removeBytes(writtenBytes);
incompleteWrite(setOpWrite);
break;
}
}
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return NioSocketChannelOutboundBuffer.newInstance(this);
}
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);

View File

@ -1,233 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Special {@link ChannelOutboundBuffer} implementation which allows to also access flushed {@link ByteBuffer} to
* allow efficent gathering writes.
*/
public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer {
private ByteBuffer[] nioBuffers;
private int nioBufferCount;
private long nioBufferSize;
private static final Recycler<NioSocketChannelOutboundBuffer> RECYCLER =
new Recycler<NioSocketChannelOutboundBuffer>() {
@Override
protected NioSocketChannelOutboundBuffer newObject(Handle<NioSocketChannelOutboundBuffer> handle) {
return new NioSocketChannelOutboundBuffer(handle);
}
};
/**
* Get a new instance of this {@link NioSocketChannelOutboundBuffer} and attach it the given {@link AbstractChannel}
*/
public static NioSocketChannelOutboundBuffer newInstance(AbstractChannel channel) {
NioSocketChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private NioSocketChannelOutboundBuffer(Recycler.Handle<NioSocketChannelOutboundBuffer> handle) {
super(handle);
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
}
/**
* Convert all non direct {@link ByteBuf} to direct {@link ByteBuf}'s. This is done as the JDK implementation
* will do the conversation itself and we can do a better job here.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isDirect()) {
return copyToDirectByteBuf(buf);
}
}
return msg;
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
* {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number
* of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link io.netty.channel.socket.nio.NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0;
int nioBufferCount = 0;
final Entry[] buffer = entries();
final int mask = entryMask();
ByteBuffer[] nioBuffers = this.nioBuffers;
Object m;
int unflushed = unflushed();
int i = flushed();
while (i != unflushed && (m = buffer[i].msg()) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
// found by now.
break;
}
NioEntry entry = (NioEntry) buffer[i];
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
}
}
i = i + 1 & mask;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {
for (ByteBuffer nioBuf: nioBufs) {
if (nioBuf == null) {
break;
}
nioBuffers[nioBufferCount ++] = nioBuf;
}
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
/**
* Return the number of {@link java.nio.ByteBuffer} which can be written.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Return the number of bytes that can be written via gathering writes.
*/
public long nioBufferSize() {
return nioBufferSize;
}
@Override
public void recycle() {
// take care of recycle the ByteBuffer[] structure.
if (nioBuffers.length > INITIAL_CAPACITY) {
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(nioBuffers, null);
}
super.recycle();
}
@Override
protected NioEntry newEntry() {
return new NioEntry();
}
protected static final class NioEntry extends Entry {
ByteBuffer[] buffers;
ByteBuffer buf;
int count = -1;
@Override
public void clear() {
buffers = null;
buf = null;
count = -1;
super.clear();
}
@Override
public int cancel() {
buffers = null;
buf = null;
count = -1;
return super.cancel();
}
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -61,6 +60,12 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final MulticastSocket socket;
private final DatagramChannelConfig config;
@ -241,28 +246,19 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
break;
}
final Object m;
final ByteBuf data;
final SocketAddress remoteAddress;
if (o instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
remoteAddress = envelope.recipient();
m = envelope.content();
data = envelope.content();
} else {
m = o;
data = (ByteBuf) o;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(o));
}
int length = data.readableBytes();
final int length = data.readableBytes();
if (remoteAddress != null) {
tmpPacket.setSocketAddress(remoteAddress);
}
@ -285,6 +281,24 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
return msg;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
return msg;
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newPromise());

View File

@ -174,6 +174,11 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {

View File

@ -13,27 +13,25 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import static io.netty.buffer.Unpooled.*;
import static org.junit.Assert.*;
public class NioSocketChannelOutboundBufferTest {
public class ChannelOutboundBufferTest {
@Test
public void testEmptyNioBuffers() {
AbstractChannel channel = new EmbeddedChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers);
@ -46,31 +44,22 @@ public class NioSocketChannelOutboundBufferTest {
@Test
public void testNioBuffersSingleBacked() {
AbstractChannel channel = new EmbeddedChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers);
for (ByteBuffer b: buffers) {
assertNull(b);
}
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes());
buffer.addMessage(buf, channel.voidPromise());
buffers = buffer.nioBuffers();
buffer.addMessage(buf, buf.readableBytes(), channel.voidPromise());
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush();
buffers = buffer.nioBuffers();
ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers);
assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) {
for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i == 0) {
assertEquals(buffers[0], nioBuf);
assertEquals(buffers[i], nioBuf);
} else {
assertNull(buffers[i]);
}
@ -80,24 +69,20 @@ public class NioSocketChannelOutboundBufferTest {
@Test
public void testNioBuffersExpand() {
AbstractChannel channel = new EmbeddedChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 64; i++) {
buffer.addMessage(buf.copy(), channel.voidPromise());
buffer.addMessage(buf.copy(), buf.readableBytes(), channel.voidPromise());
}
ByteBuffer[] nioBuffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: nioBuffers) {
assertNull(b);
}
buffer.addFlush();
nioBuffers = buffer.nioBuffers();
assertEquals(64, nioBuffers.length);
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(64, buffer.nioBufferCount());
for (ByteBuffer nioBuf: nioBuffers) {
assertEquals(nioBuf, buf.internalNioBuffer(0, buf.readableBytes()));
for (int i = 0; i < buffer.nioBufferCount(); i++) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
}
release(buffer);
buf.release();
@ -105,26 +90,22 @@ public class NioSocketChannelOutboundBufferTest {
@Test
public void testNioBuffersExpand2() {
AbstractChannel channel = new EmbeddedChannel();
NioSocketChannelOutboundBuffer buffer = NioSocketChannelOutboundBuffer.newInstance(channel);
TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 65; i++) {
comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes());
}
buffer.addMessage(comp, channel.voidPromise());
buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush();
buffers = buffer.nioBuffers();
assertEquals(128, buffers.length);
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(65, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) {
for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i < 65) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} else {
@ -142,4 +123,84 @@ public class NioSocketChannelOutboundBufferTest {
}
}
}
private static final class TestChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
TestChannel() {
super(null);
}
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return false;
}
@Override
protected SocketAddress localAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doClose() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doBeginRead() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isActive() {
return true;
}
@Override
public ChannelMetadata metadata() {
throw new UnsupportedOperationException();
}
final class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
}
}
}