1)added sctp echo example 2)refactored sctp channel impl classes

This commit is contained in:
Jestan Nirojan 2011-10-10 02:58:20 +05:30
parent dad06d539c
commit 094834d9bf
21 changed files with 559 additions and 487 deletions

View File

@ -1,114 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException;
/**
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig
implements SctpServerChannelConfig {
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
private volatile int backlog;
/**
* Creates a new instance.
*/
public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
@Override
public boolean isReuseAddress() {
return false;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getReceiveBufferSize() {
try {
return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -17,7 +17,9 @@ package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.ConversionUtil; import org.jboss.netty.util.internal.ConversionUtil;
@ -25,19 +27,19 @@ import org.jboss.netty.util.internal.ConversionUtil;
import java.util.Map; import java.util.Map;
/** /**
* The default {@link SctpChannelConfig} implementation. * The default {@link NioSocketChannelConfig} implementation for SCTP.
* *
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan * @author Jestan Nirojan
*
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*
*/ */
class DefaultSctpChannelConfig implements SctpChannelConfig { class DefaultSctpSocketChannelConfig implements NioSocketChannelConfig {
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile int connectTimeoutMillis = 10000; // 10 seconds
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultSctpChannelConfig.class); InternalLoggerFactory.getInstance(DefaultSctpSocketChannelConfig.class);
private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY = private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY =
new AdaptiveReceiveBufferSizePredictorFactory(); new AdaptiveReceiveBufferSizePredictorFactory();
@ -48,15 +50,15 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY; private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY;
private volatile int writeSpinCount = 16; private volatile int writeSpinCount = 16;
private SctpChannel socket; private SctpChannel socket;
private int payloadProtocolId = 0;
DefaultSctpChannelConfig(SctpChannel socket) { DefaultSctpSocketChannelConfig(SctpChannel socket) {
this.socket = socket; this.socket = socket;
} }
@Override @Override
public void setOptions(Map<String, Object> options) { public void setOptions(Map<String, Object> options) {
setOptions(options); //TODO: implement this as in DefaultSocketChannelConfig
//socket.setOption(options);
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value. // Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
@ -89,11 +91,15 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
@Override @Override
public ChannelBufferFactory getBufferFactory() { public ChannelBufferFactory getBufferFactory() {
return null; return bufferFactory;
} }
@Override @Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) { public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
} }
@Override @Override
@ -103,15 +109,20 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
@Override @Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
//unused
} }
@Override @Override
public int getConnectTimeoutMillis() { public int getConnectTimeoutMillis() {
return 0; return connectTimeoutMillis;
} }
@Override @Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) { public void setConnectTimeoutMillis(int connectTimeoutMillis) {
if (connectTimeoutMillis < 0) {
throw new IllegalArgumentException("connectTimeoutMillis: " + connectTimeoutMillis);
}
this.connectTimeoutMillis = connectTimeoutMillis;
} }
@Override @Override
@ -214,10 +225,6 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
this.predictorFactory = predictorFactory; this.predictorFactory = predictorFactory;
} }
@Override
public int getPayloadProtocol() {
return payloadProtocolId;
}
@Override @Override
public boolean isTcpNoDelay() { public boolean isTcpNoDelay() {

View File

@ -15,9 +15,11 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.Association;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig; import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -26,16 +28,21 @@ import java.util.Set;
/** /**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan * @author Jestan Nirojan
*
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*
*/ */
//TODO: support set of loacal, remote addresses. public interface SctpChannel extends SocketChannel {
public interface SctpChannel extends Channel{
@Override @Override
InetSocketAddress getLocalAddress(); InetSocketAddress getLocalAddress();
Set<InetSocketAddress> getAllLocalAddresses();
@Override @Override
SctpChannelConfig getConfig(); NioSocketChannelConfig getConfig();
@Override @Override
InetSocketAddress getRemoteAddress(); InetSocketAddress getRemoteAddress();
Set<InetSocketAddress> getRemoteAddresses();
Association association();
} }

View File

@ -1,149 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
/**
* A {@link org.jboss.netty.channel.socket.SocketChannelConfig} for a NIO TCP/IP {@link org.jboss.netty.channel.socket.SocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig} and
* {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link SctpChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictor"}</td><td>{@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictorFactory"}</td><td>{@link #setReceiveBufferSizePredictorFactory(org.jboss.netty.channel.ReceiveBufferSizePredictorFactory)}</td>
* </tr>
* </table>
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public interface SctpChannelConfig extends SocketChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()}
* will start to return {@code false}.
*/
int getWriteBufferHighWaterMark();
/**
* Sets the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()}
* will start to return {@code false}.
*/
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
/**
* Returns the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return
* {@code true} again.
*/
int getWriteBufferLowWaterMark();
/**
* Sets the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return
* {@code true} again.
*/
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
/**
* Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
/**
* Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
/**
* Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictorFactory} which creates a new
* {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and
* no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory();
/**
* Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which creates a new
* {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and
* no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictorFactory(
ReceiveBufferSizePredictorFactory predictorFactory);
/**
* Get the sctp payload protocol id
* A value indicating the type of payload protocol data being transmitted. This value is passed as opaque data by SCTP.
* @return payload protocol id
*/
int getPayloadProtocol();
}

View File

@ -15,16 +15,18 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer; import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean; import org.jboss.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Queue; import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -47,9 +49,9 @@ class SctpChannelImpl extends AbstractChannel
private static final int ST_CLOSED = -1; private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN; volatile int state = ST_OPEN;
final SctpChannel sctpChannel; final SctpChannel socket;
final SctpWorker worker; final SctpWorker worker;
private final SctpChannelConfig config; private final NioSocketChannelConfig config;
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
private volatile InetSocketAddress remoteAddress; private volatile InetSocketAddress remoteAddress;
@ -71,12 +73,12 @@ class SctpChannelImpl extends AbstractChannel
public SctpChannelImpl( public SctpChannelImpl(
Channel parent, ChannelFactory factory, Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, ChannelPipeline pipeline, ChannelSink sink,
SctpChannel sctpChannel, SctpWorker worker) { SctpChannel socket, SctpWorker worker) {
super(parent, factory, pipeline, sink); super(parent, factory, pipeline, sink);
this.sctpChannel = sctpChannel; this.socket = socket;
this.worker = worker; this.worker = worker;
config = new DefaultSctpChannelConfig(sctpChannel); config = new DefaultSctpSocketChannelConfig(socket);
getCloseFuture().addListener(new ChannelFutureListener() { getCloseFuture().addListener(new ChannelFutureListener() {
@Override @Override
@ -87,7 +89,7 @@ class SctpChannelImpl extends AbstractChannel
} }
@Override @Override
public SctpChannelConfig getConfig() { public NioSocketChannelConfig getConfig() {
return config; return config;
} }
@ -96,33 +98,70 @@ class SctpChannelImpl extends AbstractChannel
InetSocketAddress localAddress = this.localAddress; InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) { if (localAddress == null) {
try { try {
//TODO: fix this final Iterator<SocketAddress> iterator = socket.getAllLocalAddresses().iterator();
this.localAddress = localAddress = if (iterator.hasNext()) {
(InetSocketAddress) sctpChannel.getAllLocalAddresses().iterator().next(); this.localAddress = localAddress = (InetSocketAddress) iterator.next();
}
} catch (Throwable t) { } catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null; return null;
} }
} }
return localAddress; return localAddress;
} }
@Override
public Set<InetSocketAddress> getAllLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = socket.getAllLocalAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override @Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress; InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) { if (remoteAddress == null) {
try { try {
//TODO: fix this final Iterator<SocketAddress> iterator = socket.getRemoteAddresses().iterator();
this.remoteAddress = remoteAddress = if (iterator.hasNext()) {
(InetSocketAddress) sctpChannel.getRemoteAddresses().iterator().next(); this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
}
} catch (Throwable t) { } catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null; return null;
} }
} }
return remoteAddress; return remoteAddress;
} }
@Override
public Set<InetSocketAddress> getRemoteAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = socket.getRemoteAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
public Association association() {
try {
return socket.association();
} catch (Throwable e) {
return null;
}
}
@Override @Override
public boolean isOpen() { public boolean isOpen() {
return state >= ST_OPEN; return state >= ST_OPEN;

View File

@ -1,35 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a client-side {@link org.jboss.netty.channel.socket.SocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
* @apiviz.has org.jboss.netty.channel.socket.SocketChannel oneway - - creates
*/
public interface SctpClientChannelFactory extends ChannelFactory {
@Override
SctpChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -113,7 +113,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
SctpClientChannel channel, ChannelFuture future, SctpClientChannel channel, ChannelFuture future,
SocketAddress localAddress) { SocketAddress localAddress) {
try { try {
channel.sctpChannel.bind(localAddress); channel.socket.bind(localAddress);
channel.boundManually = true; channel.boundManually = true;
channel.setBound(); channel.setBound();
future.setSuccess(); future.setSuccess();
@ -128,7 +128,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
final SctpClientChannel channel, final ChannelFuture cf, final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) { SocketAddress remoteAddress) {
try { try {
if (channel.sctpChannel.connect(remoteAddress)) { if (channel.socket.connect(remoteAddress)) {
channel.worker.register(channel, cf); channel.worker.register(channel, cf);
} else { } else {
channel.getCloseFuture().addListener(new ChannelFutureListener() { channel.getCloseFuture().addListener(new ChannelFutureListener() {
@ -371,7 +371,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
private void connect(SelectionKey k) { private void connect(SelectionKey k) {
SctpClientChannel ch = (SctpClientChannel) k.attachment(); SctpClientChannel ch = (SctpClientChannel) k.attachment();
try { try {
if (ch.sctpChannel.finishConnect()) { if (ch.socket.finishConnect()) {
k.cancel(); k.cancel();
ch.worker.register(ch, ch.connectFuture); ch.worker.register(ch, ch.connectFuture);
} }
@ -401,7 +401,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
@Override @Override
public void run() { public void run() {
try { try {
channel.sctpChannel.register( channel.socket.register(
boss.selector, SelectionKey.OP_CONNECT, channel); boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel)); channel.worker.close(channel, succeededFuture(channel));

View File

@ -16,6 +16,7 @@
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -28,26 +29,26 @@ import java.util.concurrent.Executor;
* *
* <h3>How threads work</h3> * <h3>How threads work</h3>
* <p> * <p>
* There are two types of threads in a {@link DefaultSctpClientChannelFactory}; * There are two types of threads in a {@link SctpClientSocketChannelFactory};
* one is boss thread and the other is worker thread. * one is boss thread and the other is worker thread.
* *
* <h4>Boss thread</h4> * <h4>Boss thread</h4>
* <p> * <p>
* One {@link DefaultSctpClientChannelFactory} has one boss thread. It makes * One {@link SctpClientSocketChannelFactory} has one boss thread. It makes
* a connection attempt on request. Once a connection attempt succeeds, * a connection attempt on request. Once a connection attempt succeeds,
* the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker * the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link DefaultSctpClientChannelFactory} manages. * threads that the {@link SctpClientSocketChannelFactory} manages.
* *
* <h4>Worker threads</h4> * <h4>Worker threads</h4>
* <p> * <p>
* One {@link DefaultSctpClientChannelFactory} can have one or more worker * One {@link SctpClientSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or * threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
* *
* <h3>Life cycle of threads and graceful shutdown</h3> * <h3>Life cycle of threads and graceful shutdown</h3>
* <p> * <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified * All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link DefaultSctpClientChannelFactory} was created. A boss thread is * when a {@link SctpClientSocketChannelFactory} was created. A boss thread is
* acquired from the {@code bossExecutor}, and worker threads are acquired from * acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified * the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -77,7 +78,7 @@ import java.util.concurrent.Executor;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory { public class SctpClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor bossExecutor; private final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;
@ -85,7 +86,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
* {@link #DefaultSctpClientChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * {@link #SctpClientSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}. * available processors is obtained by {@link Runtime#availableProcessors()}.
* *
@ -94,7 +95,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
* @param workerExecutor * @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/ */
public DefaultSctpClientChannelFactory( public SctpClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) { Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
} }
@ -109,7 +110,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
* @param workerCount * @param workerCount
* the maximum number of I/O worker threads * the maximum number of I/O worker threads
*/ */
public DefaultSctpClientChannelFactory( public SctpClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
if (bossExecutor == null) { if (bossExecutor == null) {

View File

@ -0,0 +1,53 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public final class SctpMessage {
private final int streamNo;
private final int payloadProtocolId;
private final ChannelBuffer data;
public SctpMessage(int streamNo, int payloadProtocolId, ChannelBuffer data) {
this.streamNo = streamNo;
this.payloadProtocolId = payloadProtocolId;
this.data = data;
}
public int streamNumber() {
return streamNo;
}
public int payloadProtocolId() {
return payloadProtocolId;
}
public ChannelBuffer data() {
if (data.readable()) {
return data.slice();
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
}
}

View File

@ -18,7 +18,6 @@ package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.FileRegion;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
@ -28,7 +27,6 @@ import java.nio.ByteBuffer;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan * @author Jestan Nirojan
*
* @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $ * @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $
*/ */
final class SctpSendBufferPool { final class SctpSendBufferPool {
@ -47,27 +45,29 @@ final class SctpSendBufferPool {
} }
final SendBuffer acquire(Object message) { final SendBuffer acquire(Object message) {
if (message instanceof ChannelBuffer) { if (message instanceof SctpMessage) {
return acquire((ChannelBuffer) message); return acquire((SctpMessage) message);
} else if (message instanceof FileRegion) { } else {
return acquire((FileRegion) message);
}
throw new IllegalArgumentException( throw new IllegalArgumentException(
"unsupported message type: " + message.getClass()); "unsupported message type: " + message.getClass());
} }
}
private final SendBuffer acquire(SctpMessage message) {
final ChannelBuffer src = message.data();
final int streamNo = message.streamNumber();
final int protocolId = message.payloadProtocolId();
private final SendBuffer acquire(ChannelBuffer src) {
final int size = src.readableBytes(); final int size = src.readableBytes();
if (size == 0) { if (size == 0) {
return EMPTY_BUFFER; return EMPTY_BUFFER;
} }
if (src.isDirect()) { if (src.isDirect()) {
return new UnpooledSendBuffer(src.toByteBuffer()); return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer());
} }
if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) { if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
return new UnpooledSendBuffer(src.toByteBuffer()); return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer());
} }
Preallocation current = this.current; Preallocation current = this.current;
@ -81,7 +81,7 @@ final class SctpSendBufferPool {
buffer.position(align(nextPos)); buffer.position(align(nextPos));
slice.limit(nextPos); slice.limit(nextPos);
current.refCnt++; current.refCnt++;
dst = new PooledSendBuffer(current, slice); dst = new PooledSendBuffer(streamNo, protocolId, current, slice);
} else if (size > remaining) { } else if (size > remaining) {
this.current = current = getPreallocation(); this.current = current = getPreallocation();
buffer = current.buffer; buffer = current.buffer;
@ -89,11 +89,11 @@ final class SctpSendBufferPool {
buffer.position(align(size)); buffer.position(align(size));
slice.limit(size); slice.limit(size);
current.refCnt++; current.refCnt++;
dst = new PooledSendBuffer(current, slice); dst = new PooledSendBuffer(streamNo, protocolId, current, slice);
} else { // size == remaining } else { // size == remaining
current.refCnt++; current.refCnt++;
this.current = getPreallocation0(); this.current = getPreallocation0();
dst = new PooledSendBuffer(current, current.buffer); dst = new PooledSendBuffer(streamNo, protocolId, current, current.buffer);
} }
ByteBuffer dstbuf = dst.buffer; ByteBuffer dstbuf = dst.buffer;
@ -166,7 +166,7 @@ final class SctpSendBufferPool {
long totalBytes(); long totalBytes();
long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException; long transferTo(SctpChannel ch) throws IOException;
void release(); void release();
} }
@ -175,8 +175,12 @@ final class SctpSendBufferPool {
final ByteBuffer buffer; final ByteBuffer buffer;
final int initialPos; final int initialPos;
final int streamNo;
final int protocolId;
UnpooledSendBuffer(ByteBuffer buffer) { UnpooledSendBuffer(int streamNo, int protocolId, ByteBuffer buffer) {
this.streamNo = streamNo;
this.protocolId = protocolId;
this.buffer = buffer; this.buffer = buffer;
initialPos = buffer.position(); initialPos = buffer.position();
} }
@ -197,7 +201,7 @@ final class SctpSendBufferPool {
} }
@Override @Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { public long transferTo(SctpChannel ch) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId); messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo); messageInfo.streamNumber(streamNo);
@ -216,8 +220,12 @@ final class SctpSendBufferPool {
private final Preallocation parent; private final Preallocation parent;
final ByteBuffer buffer; final ByteBuffer buffer;
final int initialPos; final int initialPos;
final int streamNo;
final int protocolId;
PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { PooledSendBuffer(int streamNo, int protocolId, Preallocation parent, ByteBuffer buffer) {
this.streamNo = streamNo;
this.protocolId = protocolId;
this.parent = parent; this.parent = parent;
this.buffer = buffer; this.buffer = buffer;
initialPos = buffer.position(); initialPos = buffer.position();
@ -239,7 +247,7 @@ final class SctpSendBufferPool {
} }
@Override @Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { public long transferTo(SctpChannel ch) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId); messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo); messageInfo.streamNumber(streamNo);
@ -281,7 +289,7 @@ final class SctpSendBufferPool {
} }
@Override @Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { public long transferTo(SctpChannel ch) throws IOException {
return 0; return 0;
} }

View File

@ -15,7 +15,7 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ServerChannel; import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig; import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -29,9 +29,9 @@ import java.net.InetSocketAddress;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
* *
*/ */
public interface SctpServerChannel extends ServerChannel { public interface SctpServerChannel extends ServerSocketChannel {
@Override @Override
SctpServerChannelConfig getConfig(); ServerSocketChannelConfig getConfig();
@Override @Override
InetSocketAddress getLocalAddress(); InetSocketAddress getLocalAddress();
@Override @Override

View File

@ -15,71 +15,100 @@
*/ */
package org.jboss.netty.channel.socket.sctp; package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelConfig; import com.sun.nio.sctp.SctpStandardSocketOption;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException;
/** /**
* A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.ServerSocketChannel}. * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig},
* {@link org.jboss.netty.channel.socket.sctp.SctpServerChannelConfig} allows the following options in the
* option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
* </tr><tr>
* <td>{@code "reuseAddress"}</td><td>{@link #setReuseAddress(boolean)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSize"}</td><td>{@link #setReceiveBufferSize(int)}</td>
* </tr>
* </table>
* *
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* *
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
public interface SctpServerChannelConfig extends ChannelConfig { public class SctpServerChannelConfig extends DefaultServerChannelConfig
implements ServerSocketChannelConfig {
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
private volatile int backlog;
/** /**
* Gets the backlog value to specify when the channel binds to a local * Creates a new instance.
* address.
*/ */
int getBacklog(); public SctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
/** @Override
* Sets the backlog value to specify when the channel binds to a local public boolean setOption(String key, Object value) {
* address. if (super.setOption(key, value)) {
*/ return true;
void setBacklog(int backlog); }
/** if (key.equals("receiveBufferSize")) {
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option. setReceiveBufferSize(ConversionUtil.toInt(value));
*/ } else if (key.equals("reuseAddress")) {
boolean isReuseAddress(); setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
/** @Override
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option. public boolean isReuseAddress() {
*/ return false;
void setReuseAddress(boolean reuseAddress); }
/** @Override
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option. public void setReuseAddress(boolean reuseAddress) {
*/ throw new UnsupportedOperationException("Not supported");
int getReceiveBufferSize(); }
/** @Override
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option. public int getReceiveBufferSize() {
*/ try {
void setReceiveBufferSize(int receiveBufferSize); return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
/** @Override
* Sets the performance preferences as specified in public void setReceiveBufferSize(int receiveBufferSize) {
* {@link java.net.ServerSocket#setPerformancePreferences(int, int, int)}. try {
*/ serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
void setPerformancePreferences(int connectionTime, int latency, int bandwidth); } catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
} }

View File

@ -1,34 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.socket.ServerSocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a {@link org.jboss.netty.channel.socket.ServerSocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
*/
public interface SctpServerChannelFactory extends ServerChannelFactory {
@Override
SctpServerChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -46,7 +46,7 @@ class SctpServerChannelImpl extends AbstractServerChannel
final com.sun.nio.sctp.SctpServerChannel socket; final com.sun.nio.sctp.SctpServerChannel socket;
final Lock shutdownLock = new ReentrantLock(); final Lock shutdownLock = new ReentrantLock();
volatile Selector selector; volatile Selector selector;
private final SctpServerChannelConfig config; private final ServerSocketChannelConfig config;
private volatile boolean bound; private volatile boolean bound;
@ -77,13 +77,13 @@ class SctpServerChannelImpl extends AbstractServerChannel
throw new ChannelException("Failed to enter non-blocking mode.", e); throw new ChannelException("Failed to enter non-blocking mode.", e);
} }
config = new DefaultSctpServerChannelConfig(socket); config = new SctpServerChannelConfig(socket);
fireChannelOpen(this); fireChannelOpen(this);
} }
@Override @Override
public SctpServerChannelConfig getConfig() { public ServerSocketChannelConfig getConfig() {
return config; return config;
} }

View File

@ -141,7 +141,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor = Executor bossExecutor =
((DefaultSctpServerChannelFactory) channel.getFactory()).bossExecutor; ((SctpServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor, new Boss(channel)); DeadLockProofWorker.start(bossExecutor, new Boss(channel));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -29,7 +30,7 @@ import java.util.concurrent.Executor;
* *
* <h3>How threads work</h3> * <h3>How threads work</h3>
* <p> * <p>
* There are two types of threads in a {@link DefaultSctpServerChannelFactory}; * There are two types of threads in a {@link SctpServerSocketChannelFactory};
* one is boss thread and the other is worker thread. * one is boss thread and the other is worker thread.
* *
* <h4>Boss threads</h4> * <h4>Boss threads</h4>
@ -39,18 +40,18 @@ import java.util.concurrent.Executor;
* have two boss threads. A boss thread accepts incoming connections until * have two boss threads. A boss thread accepts incoming connections until
* the port is unbound. Once a connection is accepted successfully, the boss * the port is unbound. Once a connection is accepted successfully, the boss
* thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker * thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link DefaultSctpServerChannelFactory} manages. * threads that the {@link SctpServerSocketChannelFactory} manages.
* *
* <h4>Worker threads</h4> * <h4>Worker threads</h4>
* <p> * <p>
* One {@link DefaultSctpServerChannelFactory} can have one or more worker * One {@link SctpServerSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or * threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
* *
* <h3>Life cycle of threads and graceful shutdown</h3> * <h3>Life cycle of threads and graceful shutdown</h3>
* <p> * <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified * All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link DefaultSctpServerChannelFactory} was created. Boss threads are * when a {@link SctpServerSocketChannelFactory} was created. Boss threads are
* acquired from the {@code bossExecutor}, and worker threads are acquired from * acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified * the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -81,7 +82,7 @@ import java.util.concurrent.Executor;
* *
* @apiviz.landmark * @apiviz.landmark
*/ */
public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory { public class SctpServerSocketChannelFactory implements ServerChannelFactory {
final Executor bossExecutor; final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;
@ -89,7 +90,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
* {@link #DefaultSctpServerChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * {@link #SctpServerSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}. * available processors is obtained by {@link Runtime#availableProcessors()}.
* *
@ -98,7 +99,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
* @param workerExecutor * @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/ */
public DefaultSctpServerChannelFactory( public SctpServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) { Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
} }
@ -113,7 +114,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
* @param workerCount * @param workerCount
* the maximum number of I/O worker threads * the maximum number of I/O worker threads
*/ */
public DefaultSctpServerChannelFactory( public SctpServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
if (bossExecutor == null) { if (bossExecutor == null) {

View File

@ -70,7 +70,6 @@ class SctpWorker implements Runnable {
private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool(); private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool();
private int payloadProtocolId = 0;// un-known sctp payload protocol id
private NotificationHandler notificationHandler; private NotificationHandler notificationHandler;
@ -82,7 +81,6 @@ class SctpWorker implements Runnable {
boolean server = !(channel instanceof SctpClientChannel); boolean server = !(channel instanceof SctpClientChannel);
Runnable registerTask = new RegisterTask(channel, future, server); Runnable registerTask = new RegisterTask(channel, future, server);
payloadProtocolId = channel.getConfig().getPayloadProtocol();
notificationHandler = new NotificationHandler(channel, this); notificationHandler = new NotificationHandler(channel, this);
Selector selector; Selector selector;
@ -300,10 +298,11 @@ class SctpWorker implements Runnable {
boolean messageReceived = false; boolean messageReceived = false;
boolean failure = true; boolean failure = true;
MessageInfo messageInfo = null;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try { try {
MessageInfo messageInfo = channel.sctpChannel.receive(bb, this, notificationHandler); messageInfo = channel.socket.receive(bb, this, notificationHandler);
if (messageInfo != null) { if (messageInfo != null) {
messageReceived = true; messageReceived = true;
if (messageInfo.isComplete()) { if (messageInfo.isComplete()) {
@ -338,12 +337,12 @@ class SctpWorker implements Runnable {
predictor.previousReceiveBufferSize(receivedBytes); predictor.previousReceiveBufferSize(receivedBytes);
// Fire the event. // Fire the event.
fireMessageReceived(channel, buffer); fireMessageReceived(channel, new SctpMessage(messageInfo.streamNumber(), messageInfo.payloadProtocolID(), buffer));
} else { } else {
recvBufferPool.release(bb); recvBufferPool.release(bb);
} }
if (channel.sctpChannel.isBlocking() && !messageReceived || failure) { if (channel.socket.isBlocking() && !messageReceived || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this. k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel)); close(channel, succeededFuture(channel));
return false; return false;
@ -435,7 +434,7 @@ class SctpWorker implements Runnable {
long writtenBytes = 0; long writtenBytes = 0;
final SctpSendBufferPool sendBufferPool = this.sendBufferPool; final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
final com.sun.nio.sctp.SctpChannel ch = channel.sctpChannel; final com.sun.nio.sctp.SctpChannel ch = channel.socket;
final Queue<MessageEvent> writeBuffer = channel.writeBuffer; final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final int writeSpinCount = channel.getConfig().getWriteSpinCount(); final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
@ -459,7 +458,7 @@ class SctpWorker implements Runnable {
try { try {
long localWrittenBytes = 0; long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i--) { for (int i = writeSpinCount; i > 0; i--) {
localWrittenBytes = buf.transferTo(ch, payloadProtocolId, 0); localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
@ -522,7 +521,7 @@ class SctpWorker implements Runnable {
private void setOpWrite(SctpChannelImpl channel) { private void setOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
} }
@ -545,7 +544,7 @@ class SctpWorker implements Runnable {
private void clearOpWrite(SctpChannelImpl channel) { private void clearOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
} }
@ -570,7 +569,7 @@ class SctpWorker implements Runnable {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.sctpChannel.close(); channel.socket.close();
cancelledKeys++; cancelledKeys++;
if (channel.setClosed()) { if (channel.setClosed()) {
@ -654,7 +653,7 @@ class SctpWorker implements Runnable {
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) { if (key == null || selector == null) {
// Not registered to the worker yet. // Not registered to the worker yet.
@ -749,11 +748,11 @@ class SctpWorker implements Runnable {
try { try {
if (server) { if (server) {
channel.sctpChannel.configureBlocking(false); channel.socket.configureBlocking(false);
} }
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
channel.sctpChannel.register( channel.socket.register(
selector, channel.getRawInterestOps(), channel); selector, channel.getRawInterestOps(), channel);
} }
if (future != null) { if (future != null) {

View File

@ -0,0 +1,69 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class SctpClient {
public static void main(String[] args) throws Exception {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new SctpClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new SctpClientHandler());
}
});
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 2955), new InetSocketAddress("localhost", 2956));
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SctpMessage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public class SctpClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
SctpClientHandler.class.getName());
/**
* Creates a client-side handler.
*/
public SctpClientHandler() {
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
e.getChannel().write(new SctpMessage(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes())));
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Echoes back any received data from a client.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class SctpServer {
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new SctpServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new SctpServerHandler());
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress("localhost", 2955));
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handler implementation for the echo server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public class SctpServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
SctpServerHandler.class.getName());
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}