diff --git a/buffer/src/main/java/io/netty/buffer/Buf.java b/buffer/src/main/java/io/netty/buffer/Buf.java index 93d770a78b..7e806d07d5 100644 --- a/buffer/src/main/java/io/netty/buffer/Buf.java +++ b/buffer/src/main/java/io/netty/buffer/Buf.java @@ -18,22 +18,9 @@ package io.netty.buffer; /** * A buffer to operate on */ -public interface Buf { +public interface Buf extends Freeable { /** * The BufType which will be handled by the Buf implementation */ BufType type(); - - /** - * Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}. - */ - boolean isFreed(); - - /** - * Deallocates the internal memory block of this buffer or returns it to the allocator or pool it came from. - * The result of accessing a released buffer is unspecified and can even cause JVM crash. - * - * @throws UnsupportedOperationException if this buffer is derived - */ - void free(); } diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index d32650b7d3..e56a880045 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1893,4 +1893,19 @@ public interface ByteBuf extends Buf, Comparable { */ @Override String toString(); + + /** + * Deallocates the internal memory block of this buffer or returns it to the allocator or pool it came from. + * The result of accessing a released buffer is unspecified and can even cause JVM crash. + * + * @throws UnsupportedOperationException if this buffer is derived + */ + @Override + void free(); + + /** + * Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}. + */ + @Override + boolean isFreed(); } diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufHolder.java b/buffer/src/main/java/io/netty/buffer/ByteBufHolder.java new file mode 100644 index 0000000000..e21bea7e5b --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/ByteBufHolder.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * A packet which is send or receive. The contract for a {@link ByteBufHolder} is the + * following: + * + * When send a {@link ByteBufHolder} the {@link ByteBufHolder} will be freed by calling {@link #free()} + * in the actual transport implementation. When receive a {@link ByteBufHolder} the {@link #free()} + * must be called once is is processed. + * + */ +public interface ByteBufHolder extends Freeable { + + /** + * Return the data which is held by this {@link ByteBufHolder}. + * + */ + ByteBuf data(); + + /** + * Create a copy of this {@link ByteBufHolder} which can be used even after {@link #free()} + * is called. + */ + ByteBufHolder copy(); + + /** + * Free of the resources that are hold by this instance. This includes the {@link ByteBuf}. + */ + @Override + void free(); + + /** + * Returns {@code true} if and only if this instances was freed. + */ + @Override + boolean isFreed(); +} diff --git a/buffer/src/main/java/io/netty/buffer/DefaultByteBufHolder.java b/buffer/src/main/java/io/netty/buffer/DefaultByteBufHolder.java new file mode 100644 index 0000000000..6410209c5d --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/DefaultByteBufHolder.java @@ -0,0 +1,65 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * Default implementation of a {@link ByteBufHolder} that holds it's data in a {@link ByteBuf}. + * + */ +public class DefaultByteBufHolder implements ByteBufHolder { + private final ByteBuf data; + public DefaultByteBufHolder(ByteBuf data) { + if (data == null) { + throw new NullPointerException("data"); + } + if (data.unwrap() != null && !(data instanceof SwappedByteBuf)) { + throw new IllegalArgumentException("Only not-derived ByteBuf instance are supported, you used: " + + data.getClass().getSimpleName()); + } + this.data = data; + } + + @Override + public ByteBuf data() { + if (data.isFreed()) { + throw new IllegalBufferAccessException(); + } + return data; + } + + @Override + public void free() { + data.free(); + } + + @Override + public boolean isFreed() { + return data.isFreed(); + } + + @Override + public ByteBufHolder copy() { + return new DefaultByteBufHolder(data().copy()); + } + + @Override + public String toString() { + if (isFreed()) { + return "Message{data=(FREED)}"; + } + return "Message{data=" + ByteBufUtil.hexDump(data()) + '}'; + } +} diff --git a/buffer/src/main/java/io/netty/buffer/Freeable.java b/buffer/src/main/java/io/netty/buffer/Freeable.java new file mode 100644 index 0000000000..844aa91f7f --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/Freeable.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +public interface Freeable { + + /** + * Returns {@code true} if and only if this resource has been deallocated by {@link #free()}. + */ + boolean isFreed(); + + /** + * Deallocates the resources. + * + * The result of accessing a freed resource is unspecified and can even cause JVM crash. + * + */ + void free(); +} diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 94b3823e05..c8b5e4fa6c 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -43,6 +43,7 @@ import io.netty.channel.ChannelPipeline; * } * } * + * */ public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler { @@ -81,15 +82,24 @@ public abstract class MessageToMessageDecoder @SuppressWarnings("unchecked") I imsg = (I) msg; - O omsg = decode(ctx, imsg); - if (omsg == null) { - // Decoder consumed a message but returned null. - // Probably it needs more messages because it's an aggregator. - continue; - } - - if (ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, true)) { - notify = true; + boolean free = true; + try { + O omsg = decode(ctx, imsg); + if (omsg == null) { + // Decoder consumed a message but returned null. + // Probably it needs more messages because it's an aggregator. + continue; + } + if (omsg == imsg) { + free = false; + } + if (ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, true)) { + notify = true; + } + } finally { + if (free) { + freeInboundMessage(imsg); + } } } catch (Throwable t) { if (t instanceof CodecException) { @@ -122,4 +132,13 @@ public abstract class MessageToMessageDecoder * @throws Exception is thrown if an error accour */ protected abstract O decode(ChannelHandlerContext ctx, I msg) throws Exception; + + /** + * Is called after a message was processed via {@link #decode(ChannelHandlerContext, Object)} to free + * up any resources that is held by the inbound message. You may want to override this if your implementation + * just pass-through the input message or need it for later usage. + */ + protected void freeInboundMessage(I msg) throws Exception { + ChannelHandlerUtil.freeMessage(msg); + } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index f3762461a1..21a2974f45 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -42,6 +42,7 @@ import io.netty.channel.ChannelPromise; * } * } * + * */ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { @@ -72,14 +73,23 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessa @SuppressWarnings("unchecked") I imsg = (I) msg; - O omsg = encode(ctx, imsg); - if (omsg == null) { - // encode() might be waiting for more inbound messages to generate - // an aggregated message - keep polling. - continue; + boolean free = true; + try { + O omsg = encode(ctx, imsg); + if (omsg == null) { + // encode() might be waiting for more inbound messages to generate + // an aggregated message - keep polling. + continue; + } + if (omsg == imsg) { + free = false; + } + ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, false); + } finally { + if (free) { + freeInboundMessage(imsg); + } } - - ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, false); } catch (Throwable t) { if (t instanceof CodecException) { ctx.fireExceptionCaught(t); @@ -112,4 +122,13 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessa * @throws Exception is thrown if an error accour */ protected abstract O encode(ChannelHandlerContext ctx, I msg) throws Exception; + + /** + * Is called after a message was processed via {@link #encode(ChannelHandlerContext, Object)} to free + * up any resources that is held by the inbound message. You may want to override this if your implementation + * just pass-through the input message or need it for later usage. + */ + protected void freeInboundMessage(I msg) throws Exception { + ChannelHandlerUtil.freeMessage(msg); + } } diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java index 921fd6c929..cf4c0af4d8 100644 --- a/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java @@ -35,28 +35,31 @@ public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAd * @param protocolIdentifier supported application protocol. */ public SctpInboundByteStreamHandler(int protocolIdentifier, int streamIdentifier) { + super(SctpMessage.class); this.protocolIdentifier = protocolIdentifier; this.streamIdentifier = streamIdentifier; } + @Override + public boolean isSupported(Object msg) throws Exception { + if (super.isSupported(msg)) { + return isDecodable((SctpMessage) msg); + } + return false; + } + protected boolean isDecodable(SctpMessage msg) { return msg.protocolIdentifier() == protocolIdentifier && msg.streamIdentifier() == streamIdentifier; } @Override protected void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { - if (!isDecodable(msg)) { - ctx.nextInboundMessageBuffer().add(msg); - ctx.fireInboundBufferUpdated(); - return; - } - if (!msg.isComplete()) { throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " + "pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName())); } - ctx.nextInboundByteBuffer().writeBytes(msg.payloadBuffer()); + ctx.nextInboundByteBuffer().writeBytes(msg.data()); ctx.fireInboundBufferUpdated(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java index 5b35ed505a..8f0b18085a 100644 --- a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java @@ -53,7 +53,7 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd @Override protected void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { - final ByteBuf byteBuf = msg.payloadBuffer(); + final ByteBuf byteBuf = msg.data(); final int protocolIdentifier = msg.protocolIdentifier(); final int streamIdentifier = msg.streamIdentifier(); final boolean isComplete = msg.isComplete(); @@ -90,4 +90,9 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd ctx.nextInboundMessageBuffer().add(assembledMsg); assembled = true; } + + @Override + protected void freeInboundMessage(SctpMessage msg) throws Exception { + // It is an aggregator so not free it yet + } } diff --git a/pom.xml b/pom.xml index 68f3a6b793..03c451d295 100644 --- a/pom.xml +++ b/pom.xml @@ -358,6 +358,8 @@ -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+HeapDumpOnOutOfMemoryError + -Xmx2048m + -Xms1024m diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index 4a0f195276..d4b5582503 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -16,6 +16,8 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.Freeable; import io.netty.buffer.MessageBuf; /** @@ -166,6 +168,15 @@ public final class ChannelHandlerUtil { } } + /** + * Try to free up resources that are held by the message. + */ + public static void freeMessage(Object msg) throws Exception { + if (msg instanceof Freeable) { + ((Freeable) msg).free(); + } + } + private ChannelHandlerUtil() { // Unused } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index d935580e1b..b9b9a2b4a8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -58,7 +58,6 @@ public abstract class ChannelInboundMessageHandlerAdapter return Unpooled.messageBuffer(); } - @SuppressWarnings("unchecked") @Override public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { if (!beginMessageReceived(ctx)) { @@ -86,7 +85,14 @@ public abstract class ChannelInboundMessageHandlerAdapter unsupportedFound = false; ctx.fireInboundBufferUpdated(); } - messageReceived(ctx, (I) msg); + + @SuppressWarnings("unchecked") + I imsg = (I) msg; + try { + messageReceived(ctx, imsg); + } finally { + freeInboundMessage(imsg); + } } catch (Throwable t) { exceptionCaught(ctx, t); } @@ -144,4 +150,13 @@ public abstract class ChannelInboundMessageHandlerAdapter protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { // NOOP } + + /** + * Is called after a message was processed via {@link #messageReceived(ChannelHandlerContext, Object)} to free + * up any resources that is held by the inbound message. You may want to override this if your implementation + * just pass-through the input message or need it for later usage. + */ + protected void freeInboundMessage(I msg) throws Exception { + ChannelHandlerUtil.freeMessage(msg); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java index 0a1c9a2ba3..de19df0fff 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java @@ -16,15 +16,16 @@ package io.netty.channel.socket; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.DefaultByteBufHolder; import java.net.InetSocketAddress; /** * The message container that is used for {@link DatagramChannel} to communicate with the remote peer. */ -public final class DatagramPacket { +public final class DatagramPacket extends DefaultByteBufHolder { - private final ByteBuf data; private final InetSocketAddress remoteAddress; /** @@ -35,24 +36,13 @@ public final class DatagramPacket { * packet will be send */ public DatagramPacket(ByteBuf data, InetSocketAddress remoteAddress) { - if (data == null) { - throw new NullPointerException("data"); - } + super(data); if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } - this.data = data; this.remoteAddress = remoteAddress; } - - /** - * Return the data which is container. May return an empty {@link ByteBuf} - */ - public ByteBuf data() { - return data; - } - /** * The {@link InetSocketAddress} which this {@link DatagramPacket} will send to or was received from. */ @@ -60,8 +50,18 @@ public final class DatagramPacket { return remoteAddress; } + @Override + public DatagramPacket copy() { + return new DatagramPacket(data().copy(), remoteAddress()); + } + @Override public String toString() { - return "datagram(" + data.readableBytes() + "B, " + remoteAddress + ')'; + if (isFreed()) { + return "DatagramPacket{remoteAddress=" + remoteAddress().toString() + + ", data=(FREED)}"; + } + return "DatagramPacket{remoteAddress=" + remoteAddress().toString() + + ", data=" + ByteBufUtil.hexDump(data()) + '}'; } } diff --git a/transport/src/main/java/io/netty/channel/socket/SctpMessage.java b/transport/src/main/java/io/netty/channel/socket/SctpMessage.java index d38879b235..1fcbc02899 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpMessage.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpMessage.java @@ -18,16 +18,15 @@ package io.netty.channel.socket; import com.sun.nio.sctp.MessageInfo; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; +import io.netty.buffer.DefaultByteBufHolder; /** * Representation of SCTP Data Chunk */ -public final class SctpMessage { +public final class SctpMessage extends DefaultByteBufHolder { private final int streamIdentifier; private final int protocolIdentifier; - private final ByteBuf payloadBuffer; private final MessageInfo msgInfo; /** @@ -37,9 +36,9 @@ public final class SctpMessage { * @param payloadBuffer channel buffer */ public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) { + super(payloadBuffer); this.protocolIdentifier = protocolIdentifier; this.streamIdentifier = streamIdentifier; - this.payloadBuffer = payloadBuffer; msgInfo = null; } @@ -49,16 +48,13 @@ public final class SctpMessage { * @param payloadBuffer channel buffer */ public SctpMessage(MessageInfo msgInfo, ByteBuf payloadBuffer) { + super(payloadBuffer); if (msgInfo == null) { throw new NullPointerException("msgInfo"); } - if (payloadBuffer == null) { - throw new NullPointerException("payloadBuffer"); - } this.msgInfo = msgInfo; streamIdentifier = msgInfo.streamNumber(); protocolIdentifier = msgInfo.payloadProtocolID(); - this.payloadBuffer = payloadBuffer; } /** @@ -75,17 +71,6 @@ public final class SctpMessage { return protocolIdentifier; } - /** - * Return a view of the readable bytes of the payload. - */ - public ByteBuf payloadBuffer() { - if (payloadBuffer.readable()) { - return payloadBuffer.slice(); - } else { - return Unpooled.EMPTY_BUFFER; - } - } - /** * Return the {@link MessageInfo} for inbound messages or {@code null} for * outbound messages. @@ -126,7 +111,7 @@ public final class SctpMessage { return false; } - if (!payloadBuffer.equals(sctpFrame.payloadBuffer)) { + if (!data().equals(sctpFrame.data())) { return false; } @@ -137,14 +122,28 @@ public final class SctpMessage { public int hashCode() { int result = streamIdentifier; result = 31 * result + protocolIdentifier; - result = 31 * result + payloadBuffer.hashCode(); + result = 31 * result + data().hashCode(); return result; } + @Override + public SctpMessage copy() { + if (msgInfo == null) { + return new SctpMessage(protocolIdentifier, streamIdentifier, data().copy()); + } else { + return new SctpMessage(msgInfo, data().copy()); + } + } + @Override public String toString() { + if (isFreed()) { + return "SctpFrame{" + + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + + ", data=(FREED)}"; + } return "SctpFrame{" + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + - ", payloadBuffer=" + ByteBufUtil.hexDump(payloadBuffer()) + '}'; + ", data=" + ByteBufUtil.hexDump(data()) + '}'; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 4ecd6f713a..5ac79d6121 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -18,7 +18,6 @@ package io.netty.channel.socket.nio; import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; @@ -192,15 +191,34 @@ public final class NioDatagramChannel @Override protected int doReadMessages(MessageBuf buf) throws Exception { DatagramChannel ch = javaChannel(); - ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize()); - InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); - if (remoteAddress == null) { - return 0; - } + ByteBuf buffer = alloc().directBuffer(config().getReceivePacketSize()); + boolean free = true; + try { + ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes()); - data.flip(); - buf.add(new DatagramPacket(Unpooled.wrappedBuffer(data), remoteAddress)); - return 1; + InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); + if (remoteAddress == null) { + return 0; + } + buf.add(new DatagramPacket(buffer.writerIndex(buffer.writerIndex() + data.remaining()), remoteAddress)); + free = false; + return 1; + } catch (Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new ChannelException(cause); + } finally { + if (free) { + buffer.free(); + } + } } @Override @@ -237,6 +255,10 @@ public final class NioDatagramChannel // Wrote a packet. buf.remove(); + + // packet was written free up buffer + packet.free(); + if (buf.isEmpty()) { // Wrote the outbound buffer completely - clear OP_WRITE. if ((interestOps & SelectionKey.OP_WRITE) != 0) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java index cf409fe092..3b175afa7e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java @@ -22,7 +22,6 @@ import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -244,21 +243,41 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett @Override protected int doReadMessages(MessageBuf buf) throws Exception { SctpChannel ch = javaChannel(); - ByteBuffer data = ByteBuffer.allocate(config().getReceiveBufferSize()); - MessageInfo messageInfo = ch.receive(data, null, notificationHandler); - if (messageInfo == null) { - return 0; - } + ByteBuf buffer = alloc().directBuffer(config().getReceiveBufferSize()); + boolean free = true; + try { + ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes()); + MessageInfo messageInfo = ch.receive(data, null, notificationHandler); + if (messageInfo == null) { + return 0; + } - data.flip(); - buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data))); - return 1; + data.flip(); + buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining()))); + free = false; + return 1; + } catch (Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new ChannelException(cause); + } finally { + if (free) { + buffer.free(); + } + } } @Override protected int doWriteMessages(MessageBuf buf, boolean lastSpin) throws Exception { SctpMessage packet = (SctpMessage) buf.peek(); - ByteBuf data = packet.payloadBuffer(); + ByteBuf data = packet.data(); int dataLen = data.readableBytes(); ByteBuffer nioData; if (data.nioBufferCount() == 1) { @@ -293,6 +312,10 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett // Wrote a packet. buf.remove(); + + // packet was written free up buffer + packet.free(); + if (buf.isEmpty()) { // Wrote the outbound buffer completely - clear OP_WRITE. if ((interestOps & SelectionKey.OP_WRITE) != 0) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 03a13e62ea..3abe91bfcb 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -183,17 +183,23 @@ public class OioDatagramChannel extends AbstractOioMessageChannel @Override protected int doReadMessages(MessageBuf buf) throws Exception { int packetSize = config().getReceivePacketSize(); - byte[] data = new byte[packetSize]; - tmpPacket.setData(data); + // TODO: Use alloc().heapBuffer(..) but there seems to be a memory-leak, need to investigate + ByteBuf buffer = Unpooled.buffer(packetSize); + boolean free = true; + try { + int writerIndex = buffer.writerIndex(); + tmpPacket.setData(buffer.array(), writerIndex + buffer.arrayOffset(), packetSize); + socket.receive(tmpPacket); InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress(); if (remoteAddr == null) { remoteAddr = remoteAddress(); } - buf.add(new DatagramPacket(Unpooled.wrappedBuffer( - data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr)); - + DatagramPacket packet = new DatagramPacket(buffer.writerIndex(writerIndex + tmpPacket.getLength()) + .readerIndex(writerIndex), remoteAddr); + buf.add(packet); + free = false; return 1; } catch (SocketTimeoutException e) { // Expected @@ -203,27 +209,46 @@ public class OioDatagramChannel extends AbstractOioMessageChannel throw e; } return -1; + } catch (Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new ChannelException(cause); + } finally { + if (free) { + buffer.free(); + } } } @Override protected void doWriteMessages(MessageBuf buf) throws Exception { DatagramPacket p = (DatagramPacket) buf.poll(); - ByteBuf data = p.data(); - int length = data.readableBytes(); - InetSocketAddress remote = p.remoteAddress(); - if (remote != null) { - tmpPacket.setSocketAddress(remote); - } - if (data.hasArray()) { - tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); - } else { - byte[] tmp = new byte[length]; - data.getBytes(data.readerIndex(), tmp); - tmpPacket.setData(tmp); - } - socket.send(tmpPacket); + try { + ByteBuf data = p.data(); + int length = data.readableBytes(); + InetSocketAddress remote = p.remoteAddress(); + if (remote != null) { + tmpPacket.setSocketAddress(remote); + } + if (data.hasArray()) { + tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); + } else { + byte[] tmp = new byte[length]; + data.getBytes(data.readerIndex(), tmp); + tmpPacket.setData(tmp); + } + socket.send(tmpPacket); + } finally { + p.free(); + } } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java index 144dfa55fb..18cfae56be 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java @@ -22,7 +22,6 @@ import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -165,16 +164,36 @@ public class OioSctpChannel extends AbstractOioMessageChannel Set reableKeys = readSelector.selectedKeys(); try { for (SelectionKey ignored : reableKeys) { - ByteBuffer data = ByteBuffer.allocate(config().getReceiveBufferSize()); - MessageInfo messageInfo = ch.receive(data, null, notificationHandler); - if (messageInfo == null) { - return readMessages; + ByteBuf buffer = alloc().directBuffer(config().getReceiveBufferSize()); + boolean free = true; + + try { + ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes()); + MessageInfo messageInfo = ch.receive(data, null, notificationHandler); + if (messageInfo == null) { + return readMessages; + } + + data.flip(); + buf.add(new SctpMessage(messageInfo, buffer.writerIndex(buffer.writerIndex() + data.remaining()))); + free = false; + readMessages ++; + } catch (Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new ChannelException(cause); + } finally { + if (free) { + buffer.free(); + } } - - data.flip(); - buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data))); - - readMessages ++; } } finally { reableKeys.clear(); @@ -196,23 +215,27 @@ public class OioSctpChannel extends AbstractOioMessageChannel if (packet == null) { return; } - ByteBuf data = packet.payloadBuffer(); - int dataLen = data.readableBytes(); - ByteBuffer nioData; + try { + ByteBuf data = packet.data(); + int dataLen = data.readableBytes(); + ByteBuffer nioData; - if (data.nioBufferCount() != -1) { - nioData = data.nioBuffer(); - } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); + if (data.nioBufferCount() != -1) { + nioData = data.nioBuffer(); + } else { + nioData = ByteBuffer.allocate(dataLen); + data.getBytes(data.readerIndex(), nioData); + nioData.flip(); + } + + final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); + mi.payloadProtocolID(packet.protocolIdentifier()); + mi.streamNumber(packet.streamIdentifier()); + + ch.send(nioData, mi); + } finally { + packet.free(); } - - final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); - mi.payloadProtocolID(packet.protocolIdentifier()); - mi.streamNumber(packet.streamIdentifier()); - - ch.send(nioData, mi); } writableKeys.clear(); }