diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java new file mode 100644 index 0000000000..4873bccea8 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpInboundByteStreamHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.sctp; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SctpMessage; +import io.netty.handler.codec.CodecException; + +/** + * A ChannelHandler which receives {@link SctpMessage} belongs to a application protocol form a specific SCTP Stream + * and decode it as {@link ByteBuf}. + */ +public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAdapter { + private final int protocolIdentifier; + private final int streamIdentifier; + + + /** + * @param streamIdentifier accepted stream number, this should be >=0 or <= max stream number of the association. + * @param protocolIdentifier supported application protocol. + */ + public SctpInboundByteStreamHandler(int protocolIdentifier, int streamIdentifier) { + this.protocolIdentifier = protocolIdentifier; + this.streamIdentifier = streamIdentifier; + } + + protected boolean isDecodable(SctpMessage msg) { + return msg.getProtocolIdentifier() == protocolIdentifier && msg.getStreamIdentifier() == streamIdentifier; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { + if (!isDecodable(msg)) { + ctx.nextInboundMessageBuffer().add(msg); + ctx.fireInboundBufferUpdated(); + return; + } + + if (!msg.isComplete()) { + throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " + + "pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName())); + } + + ctx.nextInboundByteBuffer().writeBytes(msg.getPayloadBuffer()); + ctx.fireInboundBufferUpdated(); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java new file mode 100644 index 0000000000..afdc890567 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java @@ -0,0 +1,76 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.sctp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SctpMessage; + +import java.util.HashMap; +import java.util.Map; + +public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAdapter { + private Map fragments = new HashMap(); + + /** + */ + public SctpMessageCompletionHandler() { + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { + + final ByteBuf byteBuf = msg.getPayloadBuffer(); + final int protocolIdentifier = msg.getProtocolIdentifier(); + final int streamIdentifier = msg.getStreamIdentifier(); + final boolean isComplete = msg.isComplete(); + + ByteBuf frag; + + if (fragments.containsKey(streamIdentifier)) { + frag = fragments.remove(streamIdentifier); + } else { + frag = Unpooled.EMPTY_BUFFER; + } + + if (isComplete && !frag.readable()) { + //data chunk is not fragmented + fireAssembledMessage(ctx, msg); + } else if (!isComplete && frag.readable()) { + //more message to complete + fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf)); + } else if (isComplete && frag.readable()) { + //last message to complete + fragments.remove(streamIdentifier); + SctpMessage assembledMsg = new SctpMessage( + protocolIdentifier, + streamIdentifier, + Unpooled.wrappedBuffer(frag, byteBuf)); + fireAssembledMessage(ctx, assembledMsg); + } else { + //first incomplete message + fragments.put(streamIdentifier, byteBuf); + } + } + + protected void fireAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) { + ctx.nextInboundMessageBuffer().add(assembledMsg); + ctx.fireInboundBufferUpdated(); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java similarity index 56% rename from codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageDecoder.java rename to codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java index 08076adb3a..cfeb2c570a 100644 --- a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageDecoder.java @@ -16,24 +16,24 @@ package io.netty.handler.codec.sctp; -import com.sun.nio.sctp.MessageInfo; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.SctpMessage; +import io.netty.handler.codec.CodecException; import io.netty.handler.codec.MessageToMessageDecoder; -public class SctpMessageDecoder extends MessageToMessageDecoder { - private ByteBuf cumulation = Unpooled.EMPTY_BUFFER; +public abstract class SctpMessageToMessageDecoder extends MessageToMessageDecoder { @Override - public ByteBuf decode(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { - ByteBuf byteBuf = cumulation = Unpooled.wrappedBuffer(cumulation, msg.getPayloadBuffer()); - if (msg.isComplete()) { - cumulation = Unpooled.EMPTY_BUFFER; - return byteBuf; + public boolean isDecodable(Object msg) throws Exception { + if (msg instanceof SctpMessage) { + SctpMessage sctpMsg = (SctpMessage) msg; + if (sctpMsg.isComplete()) { + return true; + } + + throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in " + + "the pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName())); } else { - return null; + return false; } } } diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageEncoder.java new file mode 100644 index 0000000000..fe94baa6fc --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageToMessageEncoder.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.sctp; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.SctpMessage; +import io.netty.handler.codec.MessageToMessageEncoder; + +public abstract class SctpMessageToMessageEncoder extends MessageToMessageEncoder { + + /** + * Returns {@code true} if and only if the specified message can be encoded by this encoder. + * + * @param msg the message + */ + public boolean isEncodable(Object msg) throws Exception { + return true; + } + + public abstract SctpMessage encode(ChannelHandlerContext ctx, I msg) throws Exception; +} diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java similarity index 74% rename from codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageEncoder.java rename to codec/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java index 103b7b8b5a..127793a08f 100644 --- a/codec/src/main/java/io/netty/handler/codec/sctp/SctpMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/sctp/SctpOutboundByteStreamHandler.java @@ -20,15 +20,25 @@ import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundMessageHandlerAdapter; +import io.netty.channel.ChannelOutboundByteHandlerAdapter; import io.netty.channel.socket.SctpMessage; import io.netty.handler.codec.EncoderException; -public class SctpMessageEncoder extends ChannelOutboundMessageHandlerAdapter { +/** + * A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream + * with given protocol identifier. + * + */ +public class SctpOutboundByteStreamHandler extends ChannelOutboundByteHandlerAdapter { private final int streamIdentifier; private final int protocolIdentifier; - public SctpMessageEncoder(int streamIdentifier, int protocolIdentifier) { + + /** + * @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association. + * @param protocolIdentifier supported application protocol id. + */ + public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) { this.streamIdentifier = streamIdentifier; this.protocolIdentifier = protocolIdentifier; } diff --git a/codec/src/main/java/io/netty/handler/codec/sctp/package-info.java b/codec/src/main/java/io/netty/handler/codec/sctp/package-info.java index c277b3b769..e4fac28733 100644 --- a/codec/src/main/java/io/netty/handler/codec/sctp/package-info.java +++ b/codec/src/main/java/io/netty/handler/codec/sctp/package-info.java @@ -15,9 +15,6 @@ */ /** - * Encoder and decoder which transform a {@link io.netty.channel.socket.SctpMessage} into a - * {@link io.netty.buffer.ByteBuf} and vice versa. - * - * @apiviz.exclude \.oneone\. + * Decoder and encoders to manage message completion and multi-streaming codec in SCTP/IP. */ package io.netty.handler.codec.sctp;