Added SCTP Codec Handlers + minor refactoring

This commit is contained in:
Jestan Nirojan 2012-09-23 14:12:32 +08:00
parent bf22173ed1
commit b268f0b333
12 changed files with 243 additions and 149 deletions

View File

@ -0,0 +1,39 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.sctp;
import com.sun.nio.sctp.MessageInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.MessageToMessageDecoder;
public class SctpMessageDecoder extends MessageToMessageDecoder<SctpMessage, ByteBuf> {
private ByteBuf cumulation = Unpooled.EMPTY_BUFFER;
@Override
public ByteBuf decode(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
ByteBuf byteBuf = cumulation = Unpooled.wrappedBuffer(cumulation, msg.getPayloadBuffer());
if (msg.isComplete()) {
cumulation = Unpooled.EMPTY_BUFFER;
return byteBuf;
} else {
return null;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.sctp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.socket.SctpMessage;
import io.netty.handler.codec.EncoderException;
public class SctpMessageEncoder extends ChannelOutboundMessageHandlerAdapter<ByteBuf> {
private final int streamIdentifier;
private final int protocolIdentifier;
public SctpMessageEncoder(int streamIdentifier, int protocolIdentifier) {
this.streamIdentifier = streamIdentifier;
this.protocolIdentifier = protocolIdentifier;
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ByteBuf in = ctx.outboundByteBuffer();
try {
MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
ByteBuf payload = Unpooled.buffer(in.readableBytes());
payload.writeBytes(in);
out.add(new SctpMessage(streamIdentifier, protocolIdentifier, payload));
in.discardReadBytes();
} catch (Throwable t) {
ctx.fireExceptionCaught(new EncoderException(t));
}
ctx.flush(future);
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Encoder and decoder which transform a {@link io.netty.channel.socket.SctpMessage} into a
* {@link io.netty.buffer.ByteBuf} and vice versa.
*
* @apiviz.exclude \.oneone\.
*/
package io.netty.handler.codec.sctp;

View File

@ -77,16 +77,16 @@ public class OioSctpEchoClient {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Print usage if no argument is specified. // Print usage if no argument is specified.
if (args.length < 2 || args.length > 3) { /* if (args.length < 2 || args.length > 3) {
System.err.println( System.err.println(
"Usage: " + OioSctpEchoClient.class.getSimpleName() + "Usage: " + OioSctpEchoClient.class.getSimpleName() +
" <host> <port> [<first message size>]"); " <host> <port> [<first message size>]");
return; return;
} }*/
// Parse options. // Parse options.
final String host = "localhost"; final String host = "localhost";
final int port = Integer.parseInt(args[1]); final int port = 2556;
final int firstMessageSize; final int firstMessageSize;
if (args.length == 3) { if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]); firstMessageSize = Integer.parseInt(args[2]);

View File

@ -20,7 +20,6 @@ import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.socket.SctpData;
import io.netty.channel.socket.SctpMessage; import io.netty.channel.socket.SctpMessage;
import java.util.logging.Level; import java.util.logging.Level;
@ -53,16 +52,14 @@ public class SctpEchoClientHandler extends ChannelInboundMessageHandlerAdapter<S
@Override @Override
public void channelActive(ChannelHandlerContext ctx) { public void channelActive(ChannelHandlerContext ctx) {
ctx.write(new SctpData(0, 0, firstMessage)); ctx.write(new SctpMessage(0, 0, firstMessage));
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
if (msg instanceof SctpData) { MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
MessageBuf<Object> out = ctx.nextOutboundMessageBuffer(); out.add(msg);
out.add(msg); ctx.flush();
ctx.flush();
}
} }
@Override @Override

View File

@ -19,7 +19,6 @@ import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.socket.SctpData;
import io.netty.channel.socket.SctpMessage; import io.netty.channel.socket.SctpMessage;
import java.util.logging.Level; import java.util.logging.Level;
@ -43,10 +42,8 @@ public class SctpEchoServerHandler extends ChannelInboundMessageHandlerAdapter<S
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
if (msg instanceof SctpData) { MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
MessageBuf<Object> out = ctx.nextOutboundMessageBuffer(); out.add(msg);
out.add(msg); ctx.flush();
ctx.flush();
}
} }
} }

View File

@ -1,120 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import com.sun.nio.sctp.MessageInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
* Representation of SCTP Data Chunk
*/
public final class SctpData implements SctpMessage {
private final int streamIdentifier;
private final int protocolIdentifier;
private final ByteBuf payloadBuffer;
private MessageInfo msgInfo;
/**
* Essential data that is being carried within SCTP Data Chunk
* @param protocolIdentifier of payload
* @param streamIdentifier that you want to send the payload
* @param payloadBuffer channel buffer
*/
public SctpData(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) {
this.protocolIdentifier = protocolIdentifier;
this.streamIdentifier = streamIdentifier;
this.payloadBuffer = payloadBuffer;
}
public SctpData(MessageInfo msgInfo, ByteBuf payloadBuffer) {
this.msgInfo = msgInfo;
this.streamIdentifier = msgInfo.streamNumber();
this.protocolIdentifier = msgInfo.payloadProtocolID();
this.payloadBuffer = payloadBuffer;
}
public int getStreamIdentifier() {
return streamIdentifier;
}
public int getProtocolIdentifier() {
return protocolIdentifier;
}
public ByteBuf getPayloadBuffer() {
if (payloadBuffer.readable()) {
return payloadBuffer.slice();
} else {
return Unpooled.EMPTY_BUFFER;
}
}
public MessageInfo getMessageInfo() {
return msgInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SctpData sctpFrame = (SctpData) o;
if (protocolIdentifier != sctpFrame.protocolIdentifier) {
return false;
}
if (streamIdentifier != sctpFrame.streamIdentifier) {
return false;
}
if (!payloadBuffer.equals(sctpFrame.payloadBuffer)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = streamIdentifier;
result = 31 * result + protocolIdentifier;
result = 31 * result + payloadBuffer.hashCode();
return result;
}
@Override
public String toString() {
return new StringBuilder().
append("SctpFrame{").
append("streamIdentifier=").
append(streamIdentifier).
append(", protocolIdentifier=").
append(protocolIdentifier).
append(", payloadBuffer=").
append(ByteBufUtil.hexDump(getPayloadBuffer())).
append('}').toString();
}
}

View File

@ -15,8 +15,115 @@
*/ */
package io.netty.channel.socket; package io.netty.channel.socket;
import com.sun.nio.sctp.MessageInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/** /**
* A marker interface for a SCTP/IP message * Representation of SCTP Data Chunk
*/ */
public interface SctpMessage { public final class SctpMessage {
private final int streamIdentifier;
private final int protocolIdentifier;
private final ByteBuf payloadBuffer;
private MessageInfo msgInfo;
/**
* Essential data that is being carried within SCTP Data Chunk
* @param protocolIdentifier of payload
* @param streamIdentifier that you want to send the payload
* @param payloadBuffer channel buffer
*/
public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) {
this.protocolIdentifier = protocolIdentifier;
this.streamIdentifier = streamIdentifier;
this.payloadBuffer = payloadBuffer;
}
public SctpMessage(MessageInfo msgInfo, ByteBuf payloadBuffer) {
this.msgInfo = msgInfo;
this.streamIdentifier = msgInfo.streamNumber();
this.protocolIdentifier = msgInfo.payloadProtocolID();
this.payloadBuffer = payloadBuffer;
}
public int getStreamIdentifier() {
return streamIdentifier;
}
public int getProtocolIdentifier() {
return protocolIdentifier;
}
public ByteBuf getPayloadBuffer() {
if (payloadBuffer.readable()) {
return payloadBuffer.slice();
} else {
return Unpooled.EMPTY_BUFFER;
}
}
public MessageInfo getMessageInfo() {
return msgInfo;
}
public boolean isComplete() {
if (msgInfo != null) {
return msgInfo.isComplete();
} else {
//all outbound sctp messages are complete
return true;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SctpMessage sctpFrame = (SctpMessage) o;
if (protocolIdentifier != sctpFrame.protocolIdentifier) {
return false;
}
if (streamIdentifier != sctpFrame.streamIdentifier) {
return false;
}
if (!payloadBuffer.equals(sctpFrame.payloadBuffer)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = streamIdentifier;
result = 31 * result + protocolIdentifier;
result = 31 * result + payloadBuffer.hashCode();
return result;
}
@Override
public String toString() {
return new StringBuilder().
append("SctpFrame{").
append("streamIdentifier=").
append(streamIdentifier).
append(", protocolIdentifier=").
append(protocolIdentifier).
append(", payloadBuffer=").
append(ByteBufUtil.hexDump(getPayloadBuffer())).
append('}').toString();
}
} }

View File

@ -17,12 +17,12 @@ package io.netty.channel.socket;
import com.sun.nio.sctp.Notification; import com.sun.nio.sctp.Notification;
public final class SctpNotification implements SctpMessage { public final class SctpNotificationEvent {
private Notification notification; private Notification notification;
private Object attachment; private Object attachment;
public SctpNotification(Notification notification, Object attachment) { public SctpNotificationEvent(Notification notification, Object attachment) {
this.notification = notification; this.notification = notification;
this.attachment = attachment; this.attachment = attachment;
} }
@ -44,7 +44,7 @@ public final class SctpNotification implements SctpMessage {
return false; return false;
} }
SctpNotification that = (SctpNotification) o; SctpNotificationEvent that = (SctpNotificationEvent) o;
if (!attachment.equals(that.attachment)) { if (!attachment.equals(that.attachment)) {
return false; return false;

View File

@ -22,7 +22,6 @@ import com.sun.nio.sctp.Notification;
import com.sun.nio.sctp.PeerAddressChangeNotification; import com.sun.nio.sctp.PeerAddressChangeNotification;
import com.sun.nio.sctp.SendFailedNotification; import com.sun.nio.sctp.SendFailedNotification;
import com.sun.nio.sctp.ShutdownNotification; import com.sun.nio.sctp.ShutdownNotification;
import io.netty.channel.ChannelPipeline;
public class SctpNotificationHandler extends AbstractNotificationHandler<Object> { public class SctpNotificationHandler extends AbstractNotificationHandler<Object> {
@ -58,7 +57,7 @@ public class SctpNotificationHandler extends AbstractNotificationHandler<Object>
} }
private void updateInboundBuffer(Notification notification, Object o) { private void updateInboundBuffer(Notification notification, Object o) {
sctpChannel.pipeline().inboundMessageBuffer().add(new SctpNotification(notification, o)); sctpChannel.pipeline().fireUserEventTriggered(new SctpNotificationEvent(notification, o));
} }
} }

View File

@ -29,7 +29,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSctpChannelConfig; import io.netty.channel.socket.DefaultSctpChannelConfig;
import io.netty.channel.socket.SctpChannelConfig; import io.netty.channel.socket.SctpChannelConfig;
import io.netty.channel.socket.SctpData; import io.netty.channel.socket.SctpMessage;
import io.netty.channel.socket.SctpNotificationHandler; import io.netty.channel.socket.SctpNotificationHandler;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -229,13 +229,13 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
} }
data.flip(); data.flip();
buf.add(new SctpData(messageInfo, Unpooled.wrappedBuffer(data))); buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data)));
return 1; return 1;
} }
@Override @Override
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception { protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
SctpData packet = (SctpData) buf.peek(); SctpMessage packet = (SctpMessage) buf.peek();
ByteBuf data = packet.getPayloadBuffer(); ByteBuf data = packet.getPayloadBuffer();
int dataLen = data.readableBytes(); int dataLen = data.readableBytes();
ByteBuffer nioData; ByteBuffer nioData;

View File

@ -29,7 +29,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSctpChannelConfig; import io.netty.channel.socket.DefaultSctpChannelConfig;
import io.netty.channel.socket.SctpChannelConfig; import io.netty.channel.socket.SctpChannelConfig;
import io.netty.channel.socket.SctpData; import io.netty.channel.socket.SctpMessage;
import io.netty.channel.socket.SctpNotificationHandler; import io.netty.channel.socket.SctpNotificationHandler;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -120,7 +120,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
} }
data.flip(); data.flip();
buf.add(new SctpData(messageInfo, Unpooled.wrappedBuffer(data))); buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data)));
if (readSuspended) { if (readSuspended) {
return 0; return 0;
@ -132,7 +132,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override @Override
protected void doWriteMessages(MessageBuf<Object> buf) throws Exception { protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
SctpData packet = (SctpData) buf.poll(); SctpMessage packet = (SctpMessage) buf.poll();
ByteBuf data = packet.getPayloadBuffer(); ByteBuf data = packet.getPayloadBuffer();
int dataLen = data.readableBytes(); int dataLen = data.readableBytes();
ByteBuffer nioData; ByteBuffer nioData;