Added SCTP Codec

This commit is contained in:
Jestan Nirojan 2012-09-29 01:42:21 +08:00
parent 942f05d336
commit 64ebece730
6 changed files with 201 additions and 19 deletions

View File

@ -0,0 +1,63 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.sctp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.CodecException;
/**
* A ChannelHandler which receives {@link SctpMessage} belongs to a application protocol form a specific SCTP Stream
* and decode it as {@link ByteBuf}.
*/
public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAdapter<SctpMessage> {
private final int protocolIdentifier;
private final int streamIdentifier;
/**
* @param streamIdentifier accepted stream number, this should be >=0 or <= max stream number of the association.
* @param protocolIdentifier supported application protocol.
*/
public SctpInboundByteStreamHandler(int protocolIdentifier, int streamIdentifier) {
this.protocolIdentifier = protocolIdentifier;
this.streamIdentifier = streamIdentifier;
}
protected boolean isDecodable(SctpMessage msg) {
return msg.getProtocolIdentifier() == protocolIdentifier && msg.getStreamIdentifier() == streamIdentifier;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
if (!isDecodable(msg)) {
ctx.nextInboundMessageBuffer().add(msg);
ctx.fireInboundBufferUpdated();
return;
}
if (!msg.isComplete()) {
throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " +
"pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName()));
}
ctx.nextInboundByteBuffer().writeBytes(msg.getPayloadBuffer());
ctx.fireInboundBufferUpdated();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.sctp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.socket.SctpMessage;
import java.util.HashMap;
import java.util.Map;
public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAdapter<SctpMessage> {
private Map<Integer, ByteBuf> fragments = new HashMap<Integer, ByteBuf>();
/**
*/
public SctpMessageCompletionHandler() {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
final ByteBuf byteBuf = msg.getPayloadBuffer();
final int protocolIdentifier = msg.getProtocolIdentifier();
final int streamIdentifier = msg.getStreamIdentifier();
final boolean isComplete = msg.isComplete();
ByteBuf frag;
if (fragments.containsKey(streamIdentifier)) {
frag = fragments.remove(streamIdentifier);
} else {
frag = Unpooled.EMPTY_BUFFER;
}
if (isComplete && !frag.readable()) {
//data chunk is not fragmented
fireAssembledMessage(ctx, msg);
} else if (!isComplete && frag.readable()) {
//more message to complete
fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
} else if (isComplete && frag.readable()) {
//last message to complete
fragments.remove(streamIdentifier);
SctpMessage assembledMsg = new SctpMessage(
protocolIdentifier,
streamIdentifier,
Unpooled.wrappedBuffer(frag, byteBuf));
fireAssembledMessage(ctx, assembledMsg);
} else {
//first incomplete message
fragments.put(streamIdentifier, byteBuf);
}
}
protected void fireAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) {
ctx.nextInboundMessageBuffer().add(assembledMsg);
ctx.fireInboundBufferUpdated();
}
}

View File

@ -16,24 +16,24 @@
package io.netty.handler.codec.sctp;
import com.sun.nio.sctp.MessageInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.MessageToMessageDecoder;
public class SctpMessageDecoder extends MessageToMessageDecoder<SctpMessage, ByteBuf> {
private ByteBuf cumulation = Unpooled.EMPTY_BUFFER;
public abstract class SctpMessageToMessageDecoder<O> extends MessageToMessageDecoder<SctpMessage, O> {
@Override
public ByteBuf decode(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
ByteBuf byteBuf = cumulation = Unpooled.wrappedBuffer(cumulation, msg.getPayloadBuffer());
if (msg.isComplete()) {
cumulation = Unpooled.EMPTY_BUFFER;
return byteBuf;
public boolean isDecodable(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
SctpMessage sctpMsg = (SctpMessage) msg;
if (sctpMsg.isComplete()) {
return true;
}
throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in " +
"the pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName()));
} else {
return null;
return false;
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.sctp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.MessageToMessageEncoder;
public abstract class SctpMessageToMessageEncoder<I> extends MessageToMessageEncoder<I, SctpMessage> {
/**
* Returns {@code true} if and only if the specified message can be encoded by this encoder.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return true;
}
public abstract SctpMessage encode(ChannelHandlerContext ctx, I msg) throws Exception;
}

View File

@ -20,15 +20,25 @@ import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelOutboundByteHandlerAdapter;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.EncoderException;
public class SctpMessageEncoder extends ChannelOutboundMessageHandlerAdapter<ByteBuf> {
/**
* A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream
* with given protocol identifier.
*
*/
public class SctpOutboundByteStreamHandler extends ChannelOutboundByteHandlerAdapter {
private final int streamIdentifier;
private final int protocolIdentifier;
public SctpMessageEncoder(int streamIdentifier, int protocolIdentifier) {
/**
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
* @param protocolIdentifier supported application protocol id.
*/
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) {
this.streamIdentifier = streamIdentifier;
this.protocolIdentifier = protocolIdentifier;
}

View File

@ -15,9 +15,6 @@
*/
/**
* Encoder and decoder which transform a {@link io.netty.channel.socket.SctpMessage} into a
* {@link io.netty.buffer.ByteBuf} and vice versa.
*
* @apiviz.exclude \.oneone\.
* Decoder and encoders to manage message completion and multi-streaming codec in SCTP/IP.
*/
package io.netty.handler.codec.sctp;