From 22cbaa489a6ed418679bb2e8616bd1fdf03c5fee Mon Sep 17 00:00:00 2001 From: Jestan Nirojan Date: Tue, 4 Sep 2012 17:47:48 +0530 Subject: [PATCH] Ported SCTP Transport - Ported SCTP Echo Server/Client Example - Added SctpMessage, SctpData and SctpNotification classes --- .../io/netty/example/sctp/SctpEchoClient.java | 101 ++++++++++++++++++ .../example/sctp/SctpEchoClientHandler.java | 76 +++++++++++++ .../io/netty/example/sctp/SctpEchoServer.java | 82 ++++++++++++++ .../example/sctp/SctpEchoServerHandler.java | 54 ++++++++++ .../socket/{SctpFrame.java => SctpData.java} | 8 +- .../io/netty/channel/socket/SctpMessage.java | 22 ++++ .../channel/socket/SctpNotification.java | 43 +++++++- .../channel/socket/nio/NioSctpChannel.java | 6 +- 8 files changed, 382 insertions(+), 10 deletions(-) create mode 100644 example/src/main/java/io/netty/example/sctp/SctpEchoClient.java create mode 100644 example/src/main/java/io/netty/example/sctp/SctpEchoClientHandler.java create mode 100644 example/src/main/java/io/netty/example/sctp/SctpEchoServer.java create mode 100644 example/src/main/java/io/netty/example/sctp/SctpEchoServerHandler.java rename transport/src/main/java/io/netty/channel/socket/{SctpFrame.java => SctpData.java} (92%) create mode 100644 transport/src/main/java/io/netty/channel/socket/SctpMessage.java diff --git a/example/src/main/java/io/netty/example/sctp/SctpEchoClient.java b/example/src/main/java/io/netty/example/sctp/SctpEchoClient.java new file mode 100644 index 0000000000..f2a4321cb3 --- /dev/null +++ b/example/src/main/java/io/netty/example/sctp/SctpEchoClient.java @@ -0,0 +1,101 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.sctp; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SctpChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSctpChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.net.InetSocketAddress; + +/** + * Sends one message when a connection is open and echoes back any received + * data to the server over SCTP connection. + * + * Simply put, the echo client initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public class SctpEchoClient { + + private final String host; + private final int port; + private final int firstMessageSize; + + public SctpEchoClient(String host, int port, int firstMessageSize) { + this.host = host; + this.port = port; + this.firstMessageSize = firstMessageSize; + } + + public void run() throws Exception { + // Configure the client. + Bootstrap b = new Bootstrap(); + try { + b.group(new NioEventLoopGroup()) + .channel(new NioSctpChannel()) + .option(ChannelOption.SCTP_NODELAY, true) + .remoteAddress(new InetSocketAddress(host, port)) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SctpChannel ch) throws Exception { + ch.pipeline().addLast( + new LoggingHandler(LogLevel.INFO), + new SctpEchoClientHandler(firstMessageSize)); + } + }); + + // Start the client. + ChannelFuture f = b.connect().sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + b.shutdown(); + } + } + + public static void main(String[] args) throws Exception { + // Print usage if no argument is specified. + if (args.length < 2 || args.length > 3) { + System.err.println( + "Usage: " + SctpEchoClient.class.getSimpleName() + + " []"); + return; + } + + // Parse options. + final String host = "localhost"; + final int port = Integer.parseInt(args[1]); + final int firstMessageSize; + if (args.length == 3) { + firstMessageSize = Integer.parseInt(args[2]); + } else { + firstMessageSize = 256; + } + + new SctpEchoClient(host, port, firstMessageSize).run(); + } +} diff --git a/example/src/main/java/io/netty/example/sctp/SctpEchoClientHandler.java b/example/src/main/java/io/netty/example/sctp/SctpEchoClientHandler.java new file mode 100644 index 0000000000..9394313a8b --- /dev/null +++ b/example/src/main/java/io/netty/example/sctp/SctpEchoClientHandler.java @@ -0,0 +1,76 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.sctp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SctpData; +import io.netty.channel.socket.SctpMessage; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Handler implementation for the SCTP echo client. It initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public class SctpEchoClientHandler extends ChannelInboundMessageHandlerAdapter { + + private static final Logger logger = Logger.getLogger( + SctpEchoClientHandler.class.getName()); + + private final ByteBuf firstMessage; + + /** + * Creates a client-side handler. + */ + public SctpEchoClientHandler(int firstMessageSize) { + if (firstMessageSize <= 0) { + throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize); + } + firstMessage = Unpooled.buffer(firstMessageSize); + for (int i = 0; i < firstMessage.capacity(); i++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.write(new SctpData(0, 0, firstMessage)); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { + if (msg instanceof SctpData) { + MessageBuf out = ctx.nextOutboundMessageBuffer(); + out.add(msg); + ctx.flush(); + } else { + logger.log(Level.INFO, "Received SCTP Notification", msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); + ctx.close(); + } +} diff --git a/example/src/main/java/io/netty/example/sctp/SctpEchoServer.java b/example/src/main/java/io/netty/example/sctp/SctpEchoServer.java new file mode 100644 index 0000000000..0658ac1010 --- /dev/null +++ b/example/src/main/java/io/netty/example/sctp/SctpEchoServer.java @@ -0,0 +1,82 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.sctp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SctpChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSctpServerChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.net.InetSocketAddress; + +/** + * Echoes back any received data from a SCTP client. + */ +public class SctpEchoServer { + + private final int port; + + public SctpEchoServer(int port) { + this.port = port; + } + + public void run() throws Exception { + // Configure the server. + ServerBootstrap b = new ServerBootstrap(); + try { + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) + .channel(new NioSctpServerChannel()) + .option(ChannelOption.SO_BACKLOG, 100) + .localAddress(new InetSocketAddress(port)) + .childOption(ChannelOption.SCTP_NODELAY, true) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SctpChannel ch) throws Exception { + ch.pipeline().addLast( + new LoggingHandler(LogLevel.INFO), + new SctpEchoServerHandler()); + } + }); + + // Start the server. + ChannelFuture f = b.bind().sync(); + + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down all event loops to terminate all threads. + b.shutdown(); + } + } + + public static void main(String[] args) throws Exception { + int port; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } else { + port = 2556; + } + new SctpEchoServer(port).run(); + } +} diff --git a/example/src/main/java/io/netty/example/sctp/SctpEchoServerHandler.java b/example/src/main/java/io/netty/example/sctp/SctpEchoServerHandler.java new file mode 100644 index 0000000000..77d772ebdb --- /dev/null +++ b/example/src/main/java/io/netty/example/sctp/SctpEchoServerHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.sctp; + +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SctpData; +import io.netty.channel.socket.SctpMessage; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Handler implementation for the SCTP echo server. + */ +@Sharable +public class SctpEchoServerHandler extends ChannelInboundMessageHandlerAdapter { + + private static final Logger logger = Logger.getLogger( + SctpEchoServerHandler.class.getName()); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); + ctx.close(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { + if (msg instanceof SctpData) { + MessageBuf out = ctx.nextOutboundMessageBuffer(); + out.add(msg); + ctx.flush(); + } else { + logger.log(Level.INFO, "Received SCTP Notification", msg); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/SctpFrame.java b/transport/src/main/java/io/netty/channel/socket/SctpData.java similarity index 92% rename from transport/src/main/java/io/netty/channel/socket/SctpFrame.java rename to transport/src/main/java/io/netty/channel/socket/SctpData.java index 469fc8e869..25804872c0 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpFrame.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpData.java @@ -23,7 +23,7 @@ import io.netty.buffer.Unpooled; /** * Representation of SCTP Data Chunk */ -public final class SctpFrame { +public final class SctpData implements SctpMessage { private final int streamIdentifier; private final int protocolIdentifier; @@ -37,13 +37,13 @@ public final class SctpFrame { * @param streamIdentifier that you want to send the payload * @param payloadBuffer channel buffer */ - public SctpFrame(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) { + public SctpData(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) { this.protocolIdentifier = protocolIdentifier; this.streamIdentifier = streamIdentifier; this.payloadBuffer = payloadBuffer; } - public SctpFrame(MessageInfo msgInfo, ByteBuf payloadBuffer) { + public SctpData(MessageInfo msgInfo, ByteBuf payloadBuffer) { this.msgInfo = msgInfo; this.streamIdentifier = msgInfo.streamNumber(); this.protocolIdentifier = msgInfo.payloadProtocolID(); @@ -80,7 +80,7 @@ public final class SctpFrame { return false; } - SctpFrame sctpFrame = (SctpFrame) o; + SctpData sctpFrame = (SctpData) o; if (protocolIdentifier != sctpFrame.protocolIdentifier) { return false; diff --git a/transport/src/main/java/io/netty/channel/socket/SctpMessage.java b/transport/src/main/java/io/netty/channel/socket/SctpMessage.java new file mode 100644 index 0000000000..545dcd5c96 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/SctpMessage.java @@ -0,0 +1,22 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +/** + * A marker interface for a SCTP/IP message + */ +public interface SctpMessage { +} diff --git a/transport/src/main/java/io/netty/channel/socket/SctpNotification.java b/transport/src/main/java/io/netty/channel/socket/SctpNotification.java index 6381737bbf..a02e4bac73 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpNotification.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpNotification.java @@ -17,7 +17,7 @@ package io.netty.channel.socket; import com.sun.nio.sctp.Notification; -public class SctpNotification { +public final class SctpNotification implements SctpMessage { private Notification notification; private Object attachment; @@ -27,11 +27,48 @@ public class SctpNotification { this.attachment = attachment; } - public Notification getNotification() { + public Notification notification() { return notification; } - public Object getAttachment() { + public Object attachment() { return attachment; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SctpNotification that = (SctpNotification) o; + + if (!attachment.equals(that.attachment)) { + return false; + } + + if (!notification.equals(that.notification)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = notification.hashCode(); + result = 31 * result + attachment.hashCode(); + return result; + } + + @Override + public String toString() { + return "SctpNotification{" + + "notification=" + notification + + ", attachment=" + attachment + + '}'; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java index b1e01fbfc8..b4e4e72391 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpChannel.java @@ -28,7 +28,7 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.socket.DefaultSctpChannelConfig; import io.netty.channel.socket.SctpChannelConfig; -import io.netty.channel.socket.SctpFrame; +import io.netty.channel.socket.SctpData; import io.netty.channel.socket.SctpNotificationHandler; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -226,13 +226,13 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett } data.flip(); - buf.add(new SctpFrame(messageInfo, Unpooled.wrappedBuffer(data))); + buf.add(new SctpData(messageInfo, Unpooled.wrappedBuffer(data))); return 1; } @Override protected int doWriteMessages(MessageBuf buf, boolean lastSpin) throws Exception { - SctpFrame packet = (SctpFrame) buf.peek(); + SctpData packet = (SctpData) buf.peek(); ByteBuf data = packet.getPayloadBuffer(); int dataLen = data.readableBytes(); ByteBuffer nioData;