1)refactored sctp socket option implementation

2)tested sample sctp program under 20,000 msg per sec
This commit is contained in:
Jestan Nirojan 2011-10-30 19:44:11 +05:30
parent 6766513157
commit 08b509b209
8 changed files with 41 additions and 40 deletions

View File

@ -16,7 +16,7 @@
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpStandardSocketOption; import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultChannelConfig; import org.jboss.netty.channel.DefaultChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
@ -59,7 +59,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
} else if (key.equals("soLinger")) { } else if (key.equals("soLinger")) {
setSoLinger(ConversionUtil.toInt(value)); setSoLinger(ConversionUtil.toInt(value));
} else if (key.equals("sctpInitMaxStreams")) { } else if (key.equals("sctpInitMaxStreams")) {
setInitMaxStreams((SctpStandardSocketOption.InitMaxStreams) value); setInitMaxStreams((InitMaxStreams) value);
} else { } else {
return false; return false;
} }
@ -69,7 +69,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public boolean isSctpNoDelay() { public boolean isSctpNoDelay() {
try { try {
return channel.getOption(SctpStandardSocketOption.SCTP_NODELAY); return channel.getOption(SCTP_NODELAY);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -78,7 +78,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public void setSctpNoDelay(boolean tcpNoDelay) { public void setSctpNoDelay(boolean tcpNoDelay) {
try { try {
channel.setOption(SctpStandardSocketOption.SCTP_NODELAY, tcpNoDelay); channel.setOption(SCTP_NODELAY, tcpNoDelay);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -87,7 +87,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public int getSoLinger() { public int getSoLinger() {
try { try {
return channel.getOption(SctpStandardSocketOption.SO_LINGER); return channel.getOption(SO_LINGER);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -96,7 +96,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public void setSoLinger(int soLinger) { public void setSoLinger(int soLinger) {
try { try {
channel.setOption(SctpStandardSocketOption.SO_LINGER, soLinger); channel.setOption(SO_LINGER, soLinger);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -105,7 +105,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public int getSendBufferSize() { public int getSendBufferSize() {
try { try {
return channel.getOption(SctpStandardSocketOption.SO_SNDBUF); return channel.getOption(SO_SNDBUF);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -114,7 +114,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public void setSendBufferSize(int sendBufferSize) { public void setSendBufferSize(int sendBufferSize) {
try { try {
channel.setOption(SctpStandardSocketOption.SO_SNDBUF, sendBufferSize); channel.setOption(SO_SNDBUF, sendBufferSize);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -123,7 +123,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
try { try {
return channel.getOption(SctpStandardSocketOption.SO_RCVBUF); return channel.getOption(SO_RCVBUF);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -132,25 +132,25 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override @Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
try { try {
channel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize); channel.setOption(SO_RCVBUF, receiveBufferSize);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
@Override @Override
public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() { public InitMaxStreams getInitMaxStreams() {
try { try {
return channel.getOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS); return channel.getOption(SCTP_INIT_MAXSTREAMS);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
@Override @Override
public void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams) { public void setInitMaxStreams(InitMaxStreams initMaxStreams) {
try { try {
channel.setOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS, initMaxStreams); channel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }

View File

@ -15,11 +15,9 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption; import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import org.jboss.netty.channel.ChannelConfig; import org.jboss.netty.channel.ChannelConfig;
import java.net.SocketAddress;
/** /**
* A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.sctp.SctpChannel}. * A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.sctp.SctpChannel}.
* <p/> * <p/>
@ -40,7 +38,7 @@ import java.net.SocketAddress;
* </tr><tr> * </tr><tr>
* <td>{@code "sendBufferSize"}</td><td>{@link #setSendBufferSize(int)}</td> * <td>{@code "sendBufferSize"}</td><td>{@link #setSendBufferSize(int)}</td>
* </tr><tr> * </tr><tr>
* <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}}</td> * <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(InitMaxStreams)}</td>
* </tr> * </tr>
* </table> * </table>
* *
@ -94,10 +92,10 @@ public interface SctpChannelConfig extends ChannelConfig {
/** /**
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option. * Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
*/ */
SctpStandardSocketOption.InitMaxStreams getInitMaxStreams(); InitMaxStreams getInitMaxStreams();
/** /**
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option. * Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
*/ */
void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams); void setInitMaxStreams(InitMaxStreams initMaxStreams);
} }

View File

@ -15,13 +15,12 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption; import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig; import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil; import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress;
/** /**
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
@ -55,7 +54,7 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig
} }
if (key.equals("sctpInitMaxStreams")) { if (key.equals("sctpInitMaxStreams")) {
setInitMaxStreams((SctpStandardSocketOption.InitMaxStreams) value); setInitMaxStreams((InitMaxStreams) value);
} else if (key.equals("backlog")) { } else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value)); setBacklog(ConversionUtil.toInt(value));
} else { } else {
@ -65,18 +64,18 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig
} }
@Override @Override
public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() { public InitMaxStreams getInitMaxStreams() {
try { try {
return serverChannel.getOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS); return serverChannel.getOption(SCTP_INIT_MAXSTREAMS);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
@Override @Override
public void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams) { public void setInitMaxStreams(InitMaxStreams initMaxStreams) {
try { try {
serverChannel.setOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS, initMaxStreams); serverChannel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }

View File

@ -15,7 +15,7 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption; import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import org.jboss.netty.channel.ChannelConfig; import org.jboss.netty.channel.ChannelConfig;
/** /**
@ -33,7 +33,7 @@ import org.jboss.netty.channel.ChannelConfig;
* </tr><tr> * </tr><tr>
* <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td> * <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
* </tr><tr> * </tr><tr>
* <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}}</td> * <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(InitMaxStreams)} (int)}}</td>
* </tr> * </tr>
* </table> * </table>
* *
@ -61,10 +61,10 @@ public interface ServerSctpChannelConfig extends ChannelConfig {
/** /**
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option. * Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
*/ */
SctpStandardSocketOption.InitMaxStreams getInitMaxStreams(); InitMaxStreams getInitMaxStreams();
/** /**
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option. * Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
*/ */
void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams); void setInitMaxStreams(InitMaxStreams initMaxStreams);
} }

View File

@ -21,6 +21,8 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory; import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -31,9 +33,7 @@ import java.util.concurrent.Executors;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author <a href="http://github.com/jestan">Jestan Nirojan</a> * @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*
*/ */
public class SctpClient { public class SctpClient {
@ -45,12 +45,14 @@ public class SctpClient {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory. // Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline( return Channels.pipeline(executionHandler, new SctpClientHandler());
new SctpClientHandler());
} }
}); });

View File

@ -15,7 +15,6 @@
*/ */
package org.jboss.netty.example.sctp; package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SctpMessage; import org.jboss.netty.channel.socket.sctp.SctpMessage;

View File

@ -20,6 +20,8 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory; import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -30,9 +32,7 @@ import java.util.concurrent.Executors;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author <a href="http://github.com/jestan">Jestan Nirojan</a> * @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*
*/ */
public class SctpServer { public class SctpServer {
@ -43,11 +43,14 @@ public class SctpServer {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory. // Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new SctpServerHandler()); return Channels.pipeline(executionHandler, new SctpServerHandler());
} }
}); });

View File

@ -43,7 +43,7 @@ public class SctpServerHandler extends SimpleChannelUpstreamHandler {
public void messageReceived( public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) { ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer. // Send back the received message to the remote peer.
logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from client."); // logger.log(Level.INFO, "Received " + counter.incrementAndGet() + "th message from client.");
e.getChannel().write(e.getMessage()); e.getChannel().write(e.getMessage());
} }