From 08b509b2094f565ce03c5ef92760ff5646d101a0 Mon Sep 17 00:00:00 2001 From: Jestan Nirojan Date: Sun, 30 Oct 2011 19:44:11 +0530 Subject: [PATCH] 1)refactored sctp socket option implementation 2)tested sample sctp program under 20,000 msg per sec --- .../socket/sctp/DefaultSctpChannelConfig.java | 28 +++++++++---------- .../socket/sctp/SctpChannelConfig.java | 10 +++---- .../socket/sctp/SctpServerChannelConfig.java | 13 ++++----- .../socket/sctp/ServerSctpChannelConfig.java | 8 +++--- .../jboss/netty/example/sctp/SctpClient.java | 10 ++++--- .../netty/example/sctp/SctpClientHandler.java | 1 - .../jboss/netty/example/sctp/SctpServer.java | 9 ++++-- .../netty/example/sctp/SctpServerHandler.java | 2 +- 8 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java index fd7e221998..5dddb59f74 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java @@ -16,7 +16,7 @@ package org.jboss.netty.channel.socket.sctp; import com.sun.nio.sctp.SctpChannel; -import com.sun.nio.sctp.SctpStandardSocketOption; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.DefaultChannelConfig; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; @@ -59,7 +59,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann } else if (key.equals("soLinger")) { setSoLinger(ConversionUtil.toInt(value)); } else if (key.equals("sctpInitMaxStreams")) { - setInitMaxStreams((SctpStandardSocketOption.InitMaxStreams) value); + setInitMaxStreams((InitMaxStreams) value); } else { return false; } @@ -69,7 +69,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public boolean isSctpNoDelay() { try { - return channel.getOption(SctpStandardSocketOption.SCTP_NODELAY); + return channel.getOption(SCTP_NODELAY); } catch (IOException e) { throw new ChannelException(e); } @@ -78,7 +78,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setSctpNoDelay(boolean tcpNoDelay) { try { - channel.setOption(SctpStandardSocketOption.SCTP_NODELAY, tcpNoDelay); + channel.setOption(SCTP_NODELAY, tcpNoDelay); } catch (IOException e) { throw new ChannelException(e); } @@ -87,7 +87,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public int getSoLinger() { try { - return channel.getOption(SctpStandardSocketOption.SO_LINGER); + return channel.getOption(SO_LINGER); } catch (IOException e) { throw new ChannelException(e); } @@ -96,7 +96,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setSoLinger(int soLinger) { try { - channel.setOption(SctpStandardSocketOption.SO_LINGER, soLinger); + channel.setOption(SO_LINGER, soLinger); } catch (IOException e) { throw new ChannelException(e); } @@ -105,7 +105,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public int getSendBufferSize() { try { - return channel.getOption(SctpStandardSocketOption.SO_SNDBUF); + return channel.getOption(SO_SNDBUF); } catch (IOException e) { throw new ChannelException(e); } @@ -114,7 +114,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setSendBufferSize(int sendBufferSize) { try { - channel.setOption(SctpStandardSocketOption.SO_SNDBUF, sendBufferSize); + channel.setOption(SO_SNDBUF, sendBufferSize); } catch (IOException e) { throw new ChannelException(e); } @@ -123,7 +123,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public int getReceiveBufferSize() { try { - return channel.getOption(SctpStandardSocketOption.SO_RCVBUF); + return channel.getOption(SO_RCVBUF); } catch (IOException e) { throw new ChannelException(e); } @@ -132,25 +132,25 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setReceiveBufferSize(int receiveBufferSize) { try { - channel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize); + channel.setOption(SO_RCVBUF, receiveBufferSize); } catch (IOException e) { throw new ChannelException(e); } } @Override - public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() { + public InitMaxStreams getInitMaxStreams() { try { - return channel.getOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS); + return channel.getOption(SCTP_INIT_MAXSTREAMS); } catch (IOException e) { throw new ChannelException(e); } } @Override - public void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams) { + public void setInitMaxStreams(InitMaxStreams initMaxStreams) { try { - channel.setOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS, initMaxStreams); + channel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams); } catch (IOException e) { throw new ChannelException(e); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java index 2772e6ba4e..75367aae7a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java @@ -15,11 +15,9 @@ */ package org.jboss.netty.channel.socket.sctp; -import com.sun.nio.sctp.SctpStandardSocketOption; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; import org.jboss.netty.channel.ChannelConfig; -import java.net.SocketAddress; - /** * A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.sctp.SctpChannel}. *

@@ -40,7 +38,7 @@ import java.net.SocketAddress; * * {@code "sendBufferSize"}{@link #setSendBufferSize(int)} * - * {@code "sctpInitMaxStreams"}{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}} + * {@code "sctpInitMaxStreams"}{@link #setInitMaxStreams(InitMaxStreams)} * * * @@ -94,10 +92,10 @@ public interface SctpChannelConfig extends ChannelConfig { /** * Gets the {@code SCTP_INIT_MAXSTREAMS} option. */ - SctpStandardSocketOption.InitMaxStreams getInitMaxStreams(); + InitMaxStreams getInitMaxStreams(); /** * Gets the {@code SCTP_INIT_MAXSTREAMS} option. */ - void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams); + void setInitMaxStreams(InitMaxStreams initMaxStreams); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java index baf2d8587a..79f8e293c1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java @@ -15,13 +15,12 @@ */ package org.jboss.netty.channel.socket.sctp; -import com.sun.nio.sctp.SctpStandardSocketOption; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.DefaultServerChannelConfig; import org.jboss.netty.util.internal.ConversionUtil; import java.io.IOException; -import java.net.SocketAddress; /** * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. @@ -55,7 +54,7 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig } if (key.equals("sctpInitMaxStreams")) { - setInitMaxStreams((SctpStandardSocketOption.InitMaxStreams) value); + setInitMaxStreams((InitMaxStreams) value); } else if (key.equals("backlog")) { setBacklog(ConversionUtil.toInt(value)); } else { @@ -65,18 +64,18 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig } @Override - public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() { + public InitMaxStreams getInitMaxStreams() { try { - return serverChannel.getOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS); + return serverChannel.getOption(SCTP_INIT_MAXSTREAMS); } catch (IOException e) { throw new ChannelException(e); } } @Override - public void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams) { + public void setInitMaxStreams(InitMaxStreams initMaxStreams) { try { - serverChannel.setOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS, initMaxStreams); + serverChannel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams); } catch (IOException e) { throw new ChannelException(e); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/ServerSctpChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/ServerSctpChannelConfig.java index 50eb0268f2..0cfad45975 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/ServerSctpChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/ServerSctpChannelConfig.java @@ -15,7 +15,7 @@ */ package org.jboss.netty.channel.socket.sctp; -import com.sun.nio.sctp.SctpStandardSocketOption; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; import org.jboss.netty.channel.ChannelConfig; /** @@ -33,7 +33,7 @@ import org.jboss.netty.channel.ChannelConfig; * * {@code "backlog"}{@link #setBacklog(int)} * - * {@code "sctpInitMaxStreams"}{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}} + * {@code "sctpInitMaxStreams"}{@link #setInitMaxStreams(InitMaxStreams)} (int)}} * * * @@ -61,10 +61,10 @@ public interface ServerSctpChannelConfig extends ChannelConfig { /** * Gets the {@code SCTP_INIT_MAXSTREAMS} option. */ - SctpStandardSocketOption.InitMaxStreams getInitMaxStreams(); + InitMaxStreams getInitMaxStreams(); /** * Gets the {@code SCTP_INIT_MAXSTREAMS} option. */ - void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams); + void setInitMaxStreams(InitMaxStreams initMaxStreams); } diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpClient.java b/src/main/java/org/jboss/netty/example/sctp/SctpClient.java index 22118317fc..5a9b988097 100644 --- a/src/main/java/org/jboss/netty/example/sctp/SctpClient.java +++ b/src/main/java/org/jboss/netty/example/sctp/SctpClient.java @@ -21,6 +21,8 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory; +import org.jboss.netty.handler.execution.ExecutionHandler; +import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import java.net.InetSocketAddress; import java.util.concurrent.Executors; @@ -31,9 +33,7 @@ import java.util.concurrent.Executors; * @author The Netty Project * @author Trustin Lee * @author Jestan Nirojan - * * @version $Rev$, $Date$ - * */ public class SctpClient { @@ -45,12 +45,14 @@ public class SctpClient { Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + final ExecutionHandler executionHandler = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); + // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new SctpClientHandler()); + return Channels.pipeline(executionHandler, new SctpClientHandler()); } }); diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java index 161c486d12..f060d67dcb 100644 --- a/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java +++ b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java @@ -15,7 +15,6 @@ */ package org.jboss.netty.example.sctp; -import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.sctp.SctpMessage; diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpServer.java b/src/main/java/org/jboss/netty/example/sctp/SctpServer.java index dd8259b94e..6f8dda1975 100644 --- a/src/main/java/org/jboss/netty/example/sctp/SctpServer.java +++ b/src/main/java/org/jboss/netty/example/sctp/SctpServer.java @@ -20,6 +20,8 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory; +import org.jboss.netty.handler.execution.ExecutionHandler; +import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import java.net.InetSocketAddress; import java.util.concurrent.Executors; @@ -30,9 +32,7 @@ import java.util.concurrent.Executors; * @author The Netty Project * @author Trustin Lee * @author Jestan Nirojan - * * @version $Rev$, $Date$ - * */ public class SctpServer { @@ -43,11 +43,14 @@ public class SctpServer { Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + final ExecutionHandler executionHandler = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); + // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new SctpServerHandler()); + return Channels.pipeline(executionHandler, new SctpServerHandler()); } }); diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java b/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java index d64e1ddfb7..9a3f2475f3 100644 --- a/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java +++ b/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java @@ -43,7 +43,7 @@ public class SctpServerHandler extends SimpleChannelUpstreamHandler { public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. - logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from client."); +// logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from client."); e.getChannel().write(e.getMessage()); }