1)refactored sctp server channel config 2)removed unsupported sctp socket options
This commit is contained in:
parent
a20a2c1e17
commit
b22c7e4a08
@ -15,18 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.jboss.netty.channel.socket.sctp;
|
package org.jboss.netty.channel.socket.sctp;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import com.sun.nio.sctp.SctpChannel;
|
import com.sun.nio.sctp.SctpChannel;
|
||||||
import com.sun.nio.sctp.SctpStandardSocketOption;
|
import com.sun.nio.sctp.SctpStandardSocketOption;
|
||||||
|
|
||||||
import org.jboss.netty.channel.ChannelException;
|
import org.jboss.netty.channel.ChannelException;
|
||||||
import org.jboss.netty.channel.DefaultChannelConfig;
|
import org.jboss.netty.channel.DefaultChannelConfig;
|
||||||
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
|
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
|
||||||
import org.jboss.netty.util.internal.ConversionUtil;
|
import org.jboss.netty.util.internal.ConversionUtil;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default {@link NioSocketChannelConfig} implementation for SCTP.
|
* The default {@link NioSocketChannelConfig} implementation for SCTP.
|
||||||
@ -34,6 +30,7 @@ import org.jboss.netty.util.internal.ConversionUtil;
|
|||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
|
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
|
||||||
|
*
|
||||||
* @version $Rev$, $Date$
|
* @version $Rev$, $Date$
|
||||||
*/
|
*/
|
||||||
class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig {
|
class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig {
|
||||||
@ -59,10 +56,6 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
|
|||||||
setSendBufferSize(ConversionUtil.toInt(value));
|
setSendBufferSize(ConversionUtil.toInt(value));
|
||||||
} else if (key.equals("sctpNoDelay")) {
|
} else if (key.equals("sctpNoDelay")) {
|
||||||
setSctpNoDelay(ConversionUtil.toBoolean(value));
|
setSctpNoDelay(ConversionUtil.toBoolean(value));
|
||||||
} else if (key.equals("sctpPrimaryAddress")) {
|
|
||||||
setPrimaryAddress(ConversionUtil.toSocketAddress(value));
|
|
||||||
} else if (key.equals("sctpPeerPrimaryAddress")) {
|
|
||||||
setPeerPrimaryAddress(ConversionUtil.toSocketAddress(value));
|
|
||||||
} else if (key.equals("soLinger")) {
|
} else if (key.equals("soLinger")) {
|
||||||
setSoLinger(ConversionUtil.toInt(value));
|
setSoLinger(ConversionUtil.toInt(value));
|
||||||
} else if (key.equals("sctpInitMaxStreams")) {
|
} else if (key.equals("sctpInitMaxStreams")) {
|
||||||
@ -145,42 +138,6 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public SocketAddress getPrimaryAddress() {
|
|
||||||
try {
|
|
||||||
return channel.getOption(SctpStandardSocketOption.SCTP_PRIMARY_ADDR);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPrimaryAddress(SocketAddress primaryAddress) {
|
|
||||||
try {
|
|
||||||
channel.setOption(SctpStandardSocketOption.SCTP_PRIMARY_ADDR, primaryAddress);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SocketAddress getPeerPrimaryAddress() {
|
|
||||||
try {
|
|
||||||
return channel.getOption(SctpStandardSocketOption.SCTP_SET_PEER_PRIMARY_ADDR);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPeerPrimaryAddress(SocketAddress peerPrimaryAddress) {
|
|
||||||
try {
|
|
||||||
channel.setOption(SctpStandardSocketOption.SCTP_SET_PEER_PRIMARY_ADDR, peerPrimaryAddress);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() {
|
public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() {
|
||||||
try {
|
try {
|
||||||
|
@ -40,10 +40,6 @@ import java.net.SocketAddress;
|
|||||||
* </tr><tr>
|
* </tr><tr>
|
||||||
* <td>{@code "sendBufferSize"}</td><td>{@link #setSendBufferSize(int)}</td>
|
* <td>{@code "sendBufferSize"}</td><td>{@link #setSendBufferSize(int)}</td>
|
||||||
* </tr><tr>
|
* </tr><tr>
|
||||||
* <td>{@code "sctpPrimaryAddress"}</td><td>{@link #setPrimaryAddress(SocketAddress)}}</td>
|
|
||||||
* </tr><tr>
|
|
||||||
* <td>{@code "sctpPeerPrimaryAddress"}</td><td>{@link #setPeerPrimaryAddress(SocketAddress)}}</td>
|
|
||||||
* </tr><tr>
|
|
||||||
* <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}}</td>
|
* <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}}</td>
|
||||||
* </tr>
|
* </tr>
|
||||||
* </table>
|
* </table>
|
||||||
@ -95,26 +91,6 @@ public interface SctpChannelConfig extends ChannelConfig {
|
|||||||
*/
|
*/
|
||||||
void setReceiveBufferSize(int receiveBufferSize);
|
void setReceiveBufferSize(int receiveBufferSize);
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_PRIMARY_ADDR}</a> option.
|
|
||||||
*/
|
|
||||||
SocketAddress getPrimaryAddress();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_PRIMARY_ADDR}</a> option.
|
|
||||||
*/
|
|
||||||
void setPrimaryAddress(SocketAddress primaryAddress);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_SET_PEER_PRIMARY_ADDR}</a> option.
|
|
||||||
*/
|
|
||||||
SocketAddress getPeerPrimaryAddress();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_SET_PEER_PRIMARY_ADDR}</a> option.
|
|
||||||
*/
|
|
||||||
void setPeerPrimaryAddress(SocketAddress peerPrimaryAddress);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
|
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
|
||||||
*/
|
*/
|
||||||
|
@ -19,7 +19,6 @@ import com.sun.nio.sctp.Association;
|
|||||||
import com.sun.nio.sctp.SctpChannel;
|
import com.sun.nio.sctp.SctpChannel;
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.channel.*;
|
import org.jboss.netty.channel.*;
|
||||||
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
|
|
||||||
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
|
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
|
||||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||||
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
||||||
@ -49,7 +48,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
private static final int ST_CLOSED = -1;
|
private static final int ST_CLOSED = -1;
|
||||||
volatile int state = ST_OPEN;
|
volatile int state = ST_OPEN;
|
||||||
|
|
||||||
final SctpChannel socket;
|
final SctpChannel channel;
|
||||||
final SctpWorker worker;
|
final SctpWorker worker;
|
||||||
private final NioSctpChannelConfig config;
|
private final NioSctpChannelConfig config;
|
||||||
private volatile InetSocketAddress localAddress;
|
private volatile InetSocketAddress localAddress;
|
||||||
@ -73,12 +72,12 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
public SctpChannelImpl(
|
public SctpChannelImpl(
|
||||||
Channel parent, ChannelFactory factory,
|
Channel parent, ChannelFactory factory,
|
||||||
ChannelPipeline pipeline, ChannelSink sink,
|
ChannelPipeline pipeline, ChannelSink sink,
|
||||||
SctpChannel socket, SctpWorker worker) {
|
SctpChannel channel, SctpWorker worker) {
|
||||||
super(parent, factory, pipeline, sink);
|
super(parent, factory, pipeline, sink);
|
||||||
|
|
||||||
this.socket = socket;
|
this.channel = channel;
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
config = new DefaultNioSctpChannelConfig(socket);
|
config = new DefaultNioSctpChannelConfig(channel);
|
||||||
|
|
||||||
getCloseFuture().addListener(new ChannelFutureListener() {
|
getCloseFuture().addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
@ -98,7 +97,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
InetSocketAddress localAddress = this.localAddress;
|
InetSocketAddress localAddress = this.localAddress;
|
||||||
if (localAddress == null) {
|
if (localAddress == null) {
|
||||||
try {
|
try {
|
||||||
final Iterator<SocketAddress> iterator = socket.getAllLocalAddresses().iterator();
|
final Iterator<SocketAddress> iterator = channel.getAllLocalAddresses().iterator();
|
||||||
if (iterator.hasNext()) {
|
if (iterator.hasNext()) {
|
||||||
this.localAddress = localAddress = (InetSocketAddress) iterator.next();
|
this.localAddress = localAddress = (InetSocketAddress) iterator.next();
|
||||||
}
|
}
|
||||||
@ -112,7 +111,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
@Override
|
@Override
|
||||||
public Set<InetSocketAddress> getAllLocalAddresses() {
|
public Set<InetSocketAddress> getAllLocalAddresses() {
|
||||||
try {
|
try {
|
||||||
final Set<SocketAddress> allLocalAddresses = socket.getAllLocalAddresses();
|
final Set<SocketAddress> allLocalAddresses = channel.getAllLocalAddresses();
|
||||||
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
|
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
|
||||||
for(SocketAddress socketAddress: allLocalAddresses) {
|
for(SocketAddress socketAddress: allLocalAddresses) {
|
||||||
addresses.add((InetSocketAddress) socketAddress);
|
addresses.add((InetSocketAddress) socketAddress);
|
||||||
@ -128,7 +127,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
InetSocketAddress remoteAddress = this.remoteAddress;
|
InetSocketAddress remoteAddress = this.remoteAddress;
|
||||||
if (remoteAddress == null) {
|
if (remoteAddress == null) {
|
||||||
try {
|
try {
|
||||||
final Iterator<SocketAddress> iterator = socket.getRemoteAddresses().iterator();
|
final Iterator<SocketAddress> iterator = channel.getRemoteAddresses().iterator();
|
||||||
if (iterator.hasNext()) {
|
if (iterator.hasNext()) {
|
||||||
this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
|
this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
|
||||||
}
|
}
|
||||||
@ -142,7 +141,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
@Override
|
@Override
|
||||||
public Set<InetSocketAddress> getRemoteAddresses() {
|
public Set<InetSocketAddress> getRemoteAddresses() {
|
||||||
try {
|
try {
|
||||||
final Set<SocketAddress> allLocalAddresses = socket.getRemoteAddresses();
|
final Set<SocketAddress> allLocalAddresses = channel.getRemoteAddresses();
|
||||||
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
|
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
|
||||||
for(SocketAddress socketAddress: allLocalAddresses) {
|
for(SocketAddress socketAddress: allLocalAddresses) {
|
||||||
addresses.add((InetSocketAddress) socketAddress);
|
addresses.add((InetSocketAddress) socketAddress);
|
||||||
@ -156,7 +155,7 @@ class SctpChannelImpl extends AbstractChannel
|
|||||||
@Override
|
@Override
|
||||||
public Association association() {
|
public Association association() {
|
||||||
try {
|
try {
|
||||||
return socket.association();
|
return channel.association();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
|
|||||||
SctpClientChannel channel, ChannelFuture future,
|
SctpClientChannel channel, ChannelFuture future,
|
||||||
SocketAddress localAddress) {
|
SocketAddress localAddress) {
|
||||||
try {
|
try {
|
||||||
channel.socket.bind(localAddress);
|
channel.channel.bind(localAddress);
|
||||||
channel.boundManually = true;
|
channel.boundManually = true;
|
||||||
channel.setBound();
|
channel.setBound();
|
||||||
future.setSuccess();
|
future.setSuccess();
|
||||||
@ -128,7 +128,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
|
|||||||
final SctpClientChannel channel, final ChannelFuture cf,
|
final SctpClientChannel channel, final ChannelFuture cf,
|
||||||
SocketAddress remoteAddress) {
|
SocketAddress remoteAddress) {
|
||||||
try {
|
try {
|
||||||
if (channel.socket.connect(remoteAddress)) {
|
if (channel.channel.connect(remoteAddress)) {
|
||||||
channel.worker.register(channel, cf);
|
channel.worker.register(channel, cf);
|
||||||
} else {
|
} else {
|
||||||
channel.getCloseFuture().addListener(new ChannelFutureListener() {
|
channel.getCloseFuture().addListener(new ChannelFutureListener() {
|
||||||
@ -371,7 +371,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
|
|||||||
private void connect(SelectionKey k) {
|
private void connect(SelectionKey k) {
|
||||||
SctpClientChannel ch = (SctpClientChannel) k.attachment();
|
SctpClientChannel ch = (SctpClientChannel) k.attachment();
|
||||||
try {
|
try {
|
||||||
if (ch.socket.finishConnect()) {
|
if (ch.channel.finishConnect()) {
|
||||||
k.cancel();
|
k.cancel();
|
||||||
ch.worker.register(ch, ch.connectFuture);
|
ch.worker.register(ch, ch.connectFuture);
|
||||||
}
|
}
|
||||||
@ -401,7 +401,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
channel.socket.register(
|
channel.channel.register(
|
||||||
boss.selector, SelectionKey.OP_CONNECT, channel);
|
boss.selector, SelectionKey.OP_CONNECT, channel);
|
||||||
} catch (ClosedChannelException e) {
|
} catch (ClosedChannelException e) {
|
||||||
channel.worker.close(channel, succeededFuture(channel));
|
channel.worker.close(channel, succeededFuture(channel));
|
||||||
|
@ -16,24 +16,27 @@
|
|||||||
package org.jboss.netty.channel.socket.sctp;
|
package org.jboss.netty.channel.socket.sctp;
|
||||||
|
|
||||||
import org.jboss.netty.channel.ServerChannel;
|
import org.jboss.netty.channel.ServerChannel;
|
||||||
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A TCP/IP {@link org.jboss.netty.channel.ServerChannel} which accepts incoming TCP/IP connections.
|
* A TCP/IP {@link org.jboss.netty.channel.ServerChannel} which accepts incoming TCP/IP connections.
|
||||||
*
|
*
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
|
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
|
||||||
*
|
*
|
||||||
* @version $Rev$, $Date$
|
* @version $Rev$, $Date$
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface SctpServerChannel extends ServerChannel {
|
public interface SctpServerChannel extends ServerChannel {
|
||||||
@Override
|
@Override
|
||||||
ServerSocketChannelConfig getConfig();
|
ServerSctpChannelConfig getConfig();
|
||||||
@Override
|
@Override
|
||||||
InetSocketAddress getLocalAddress();
|
InetSocketAddress getLocalAddress();
|
||||||
|
|
||||||
|
Set<InetSocketAddress> getAllLocalAddresses();
|
||||||
@Override
|
@Override
|
||||||
InetSocketAddress getRemoteAddress();
|
InetSocketAddress getRemoteAddress();
|
||||||
}
|
}
|
||||||
|
@ -18,22 +18,22 @@ package org.jboss.netty.channel.socket.sctp;
|
|||||||
import com.sun.nio.sctp.SctpStandardSocketOption;
|
import com.sun.nio.sctp.SctpStandardSocketOption;
|
||||||
import org.jboss.netty.channel.ChannelException;
|
import org.jboss.netty.channel.ChannelException;
|
||||||
import org.jboss.netty.channel.DefaultServerChannelConfig;
|
import org.jboss.netty.channel.DefaultServerChannelConfig;
|
||||||
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
|
|
||||||
import org.jboss.netty.util.internal.ConversionUtil;
|
import org.jboss.netty.util.internal.ConversionUtil;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
|
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
|
||||||
*
|
*
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
|
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
|
||||||
*
|
*
|
||||||
* @version $Rev$, $Date$
|
* @version $Rev$, $Date$
|
||||||
*/
|
*/
|
||||||
public class SctpServerChannelConfig extends DefaultServerChannelConfig
|
public class SctpServerChannelConfig extends DefaultServerChannelConfig
|
||||||
implements ServerSocketChannelConfig {
|
implements ServerSctpChannelConfig {
|
||||||
|
|
||||||
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
|
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
|
||||||
private volatile int backlog;
|
private volatile int backlog;
|
||||||
@ -54,10 +54,8 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key.equals("receiveBufferSize")) {
|
if (key.equals("sctpInitMaxStreams")) {
|
||||||
setReceiveBufferSize(ConversionUtil.toInt(value));
|
setInitMaxStreams((SctpStandardSocketOption.InitMaxStreams) value);
|
||||||
} else if (key.equals("reuseAddress")) {
|
|
||||||
setReuseAddress(ConversionUtil.toBoolean(value));
|
|
||||||
} else if (key.equals("backlog")) {
|
} else if (key.equals("backlog")) {
|
||||||
setBacklog(ConversionUtil.toInt(value));
|
setBacklog(ConversionUtil.toInt(value));
|
||||||
} else {
|
} else {
|
||||||
@ -67,38 +65,23 @@ public class SctpServerChannelConfig extends DefaultServerChannelConfig
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isReuseAddress() {
|
public SctpStandardSocketOption.InitMaxStreams getInitMaxStreams() {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setReuseAddress(boolean reuseAddress) {
|
|
||||||
throw new UnsupportedOperationException("Not supported");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getReceiveBufferSize() {
|
|
||||||
try {
|
try {
|
||||||
return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
|
return serverChannel.getOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ChannelException(e);
|
throw new ChannelException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReceiveBufferSize(int receiveBufferSize) {
|
public void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams) {
|
||||||
try {
|
try {
|
||||||
serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
|
serverChannel.setOption(SctpStandardSocketOption.SCTP_INIT_MAXSTREAMS, initMaxStreams);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ChannelException(e);
|
throw new ChannelException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
|
|
||||||
throw new UnsupportedOperationException("Not supported");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getBacklog() {
|
public int getBacklog() {
|
||||||
return backlog;
|
return backlog;
|
||||||
|
@ -22,7 +22,12 @@ import org.jboss.netty.logging.InternalLoggerFactory;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@ -43,10 +48,10 @@ class SctpServerChannelImpl extends AbstractServerChannel
|
|||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(SctpServerChannelImpl.class);
|
InternalLoggerFactory.getInstance(SctpServerChannelImpl.class);
|
||||||
|
|
||||||
final com.sun.nio.sctp.SctpServerChannel socket;
|
final com.sun.nio.sctp.SctpServerChannel serverChannel;
|
||||||
final Lock shutdownLock = new ReentrantLock();
|
final Lock shutdownLock = new ReentrantLock();
|
||||||
volatile Selector selector;
|
volatile Selector selector;
|
||||||
private final ServerSocketChannelConfig config;
|
private final ServerSctpChannelConfig config;
|
||||||
|
|
||||||
private volatile boolean bound;
|
private volatile boolean bound;
|
||||||
|
|
||||||
@ -58,17 +63,17 @@ class SctpServerChannelImpl extends AbstractServerChannel
|
|||||||
super(factory, pipeline, sink);
|
super(factory, pipeline, sink);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
socket = com.sun.nio.sctp.SctpServerChannel.open();
|
serverChannel = com.sun.nio.sctp.SctpServerChannel.open();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ChannelException(
|
throw new ChannelException(
|
||||||
"Failed to open a server socket.", e);
|
"Failed to open a server socket.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
socket.configureBlocking(false);
|
serverChannel.configureBlocking(false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
try {
|
try {
|
||||||
socket.close();
|
serverChannel.close();
|
||||||
} catch (IOException e2) {
|
} catch (IOException e2) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Failed to close a partially initialized socket.", e2);
|
"Failed to close a partially initialized socket.", e2);
|
||||||
@ -77,28 +82,43 @@ class SctpServerChannelImpl extends AbstractServerChannel
|
|||||||
throw new ChannelException("Failed to enter non-blocking mode.", e);
|
throw new ChannelException("Failed to enter non-blocking mode.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
config = new SctpServerChannelConfig(socket);
|
config = new SctpServerChannelConfig(serverChannel);
|
||||||
|
|
||||||
fireChannelOpen(this);
|
fireChannelOpen(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerSocketChannelConfig getConfig() {
|
public ServerSctpChannelConfig getConfig() {
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetSocketAddress getLocalAddress() {
|
public InetSocketAddress getLocalAddress() {
|
||||||
try {
|
try {
|
||||||
return (InetSocketAddress) socket.getAllLocalAddresses().iterator().next();
|
final Iterator<SocketAddress> iterator = serverChannel.getAllLocalAddresses().iterator();
|
||||||
|
return iterator.hasNext() ? (InetSocketAddress) iterator.next() : null;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<InetSocketAddress> getAllLocalAddresses() {
|
||||||
|
try {
|
||||||
|
final Set<SocketAddress> allLocalAddresses = serverChannel.getAllLocalAddresses();
|
||||||
|
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
|
||||||
|
for (SocketAddress socketAddress : allLocalAddresses) {
|
||||||
|
addresses.add((InetSocketAddress) socketAddress);
|
||||||
|
}
|
||||||
|
return addresses;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetSocketAddress getRemoteAddress() {
|
public InetSocketAddress getRemoteAddress() {
|
||||||
return null;
|
return null;// not available for server channel
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -134,7 +134,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
|
|||||||
boolean bound = false;
|
boolean bound = false;
|
||||||
boolean bossStarted = false;
|
boolean bossStarted = false;
|
||||||
try {
|
try {
|
||||||
channel.socket.bind(localAddress, channel.getConfig().getBacklog());
|
channel.serverChannel.bind(localAddress, channel.getConfig().getBacklog());
|
||||||
bound = true;
|
bound = true;
|
||||||
channel.setBound();
|
channel.setBound();
|
||||||
future.setSuccess();
|
future.setSuccess();
|
||||||
@ -157,8 +157,8 @@ class SctpServerPipelineSink extends AbstractChannelSink {
|
|||||||
private void close(SctpServerChannelImpl channel, ChannelFuture future) {
|
private void close(SctpServerChannelImpl channel, ChannelFuture future) {
|
||||||
boolean bound = channel.isBound();
|
boolean bound = channel.isBound();
|
||||||
try {
|
try {
|
||||||
if (channel.socket.isOpen()) {
|
if (channel.serverChannel.isOpen()) {
|
||||||
channel.socket.close();
|
channel.serverChannel.close();
|
||||||
Selector selector = channel.selector;
|
Selector selector = channel.selector;
|
||||||
if (selector != null) {
|
if (selector != null) {
|
||||||
selector.wakeup();
|
selector.wakeup();
|
||||||
@ -204,7 +204,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
|
|||||||
|
|
||||||
boolean registered = false;
|
boolean registered = false;
|
||||||
try {
|
try {
|
||||||
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
|
channel.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
registered = true;
|
registered = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (!registered) {
|
if (!registered) {
|
||||||
@ -227,7 +227,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
|
|||||||
selector.selectedKeys().clear();
|
selector.selectedKeys().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
SctpChannel acceptedSocket = channel.socket.accept();
|
SctpChannel acceptedSocket = channel.serverChannel.accept();
|
||||||
if (acceptedSocket != null) {
|
if (acceptedSocket != null) {
|
||||||
registerAcceptedChannel(acceptedSocket, currentThread);
|
registerAcceptedChannel(acceptedSocket, currentThread);
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ class SctpWorker implements Runnable {
|
|||||||
|
|
||||||
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
|
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
|
||||||
try {
|
try {
|
||||||
messageInfo = channel.socket.receive(bb, this, notificationHandler);
|
messageInfo = channel.channel.receive(bb, this, notificationHandler);
|
||||||
if (messageInfo != null) {
|
if (messageInfo != null) {
|
||||||
messageReceived = true;
|
messageReceived = true;
|
||||||
if (messageInfo.isComplete()) {
|
if (messageInfo.isComplete()) {
|
||||||
@ -337,12 +337,16 @@ class SctpWorker implements Runnable {
|
|||||||
predictor.previousReceiveBufferSize(receivedBytes);
|
predictor.previousReceiveBufferSize(receivedBytes);
|
||||||
|
|
||||||
// Fire the event.
|
// Fire the event.
|
||||||
fireMessageReceived(channel, new SctpMessage(messageInfo.streamNumber(), messageInfo.payloadProtocolID(), buffer));
|
fireMessageReceived(channel,
|
||||||
|
new SctpMessage(messageInfo.streamNumber(),
|
||||||
|
messageInfo.payloadProtocolID(),
|
||||||
|
buffer),
|
||||||
|
messageInfo.address());
|
||||||
} else {
|
} else {
|
||||||
recvBufferPool.release(bb);
|
recvBufferPool.release(bb);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (channel.socket.isBlocking() && !messageReceived || failure) {
|
if (channel.channel.isBlocking() && !messageReceived || failure) {
|
||||||
k.cancel(); // Some JDK implementations run into an infinite loop without this.
|
k.cancel(); // Some JDK implementations run into an infinite loop without this.
|
||||||
close(channel, succeededFuture(channel));
|
close(channel, succeededFuture(channel));
|
||||||
return false;
|
return false;
|
||||||
@ -434,7 +438,7 @@ class SctpWorker implements Runnable {
|
|||||||
long writtenBytes = 0;
|
long writtenBytes = 0;
|
||||||
|
|
||||||
final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
|
final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
|
||||||
final com.sun.nio.sctp.SctpChannel ch = channel.socket;
|
final com.sun.nio.sctp.SctpChannel ch = channel.channel;
|
||||||
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
|
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
|
||||||
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
|
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
|
||||||
synchronized (channel.writeLock) {
|
synchronized (channel.writeLock) {
|
||||||
@ -521,7 +525,7 @@ class SctpWorker implements Runnable {
|
|||||||
|
|
||||||
private void setOpWrite(SctpChannelImpl channel) {
|
private void setOpWrite(SctpChannelImpl channel) {
|
||||||
Selector selector = this.selector;
|
Selector selector = this.selector;
|
||||||
SelectionKey key = channel.socket.keyFor(selector);
|
SelectionKey key = channel.channel.keyFor(selector);
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -544,7 +548,7 @@ class SctpWorker implements Runnable {
|
|||||||
|
|
||||||
private void clearOpWrite(SctpChannelImpl channel) {
|
private void clearOpWrite(SctpChannelImpl channel) {
|
||||||
Selector selector = this.selector;
|
Selector selector = this.selector;
|
||||||
SelectionKey key = channel.socket.keyFor(selector);
|
SelectionKey key = channel.channel.keyFor(selector);
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -569,7 +573,7 @@ class SctpWorker implements Runnable {
|
|||||||
boolean connected = channel.isConnected();
|
boolean connected = channel.isConnected();
|
||||||
boolean bound = channel.isBound();
|
boolean bound = channel.isBound();
|
||||||
try {
|
try {
|
||||||
channel.socket.close();
|
channel.channel.close();
|
||||||
cancelledKeys++;
|
cancelledKeys++;
|
||||||
|
|
||||||
if (channel.setClosed()) {
|
if (channel.setClosed()) {
|
||||||
@ -653,7 +657,7 @@ class SctpWorker implements Runnable {
|
|||||||
// Acquire a lock to avoid possible race condition.
|
// Acquire a lock to avoid possible race condition.
|
||||||
synchronized (channel.interestOpsLock) {
|
synchronized (channel.interestOpsLock) {
|
||||||
Selector selector = this.selector;
|
Selector selector = this.selector;
|
||||||
SelectionKey key = channel.socket.keyFor(selector);
|
SelectionKey key = channel.channel.keyFor(selector);
|
||||||
|
|
||||||
if (key == null || selector == null) {
|
if (key == null || selector == null) {
|
||||||
// Not registered to the worker yet.
|
// Not registered to the worker yet.
|
||||||
@ -748,11 +752,11 @@ class SctpWorker implements Runnable {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if (server) {
|
if (server) {
|
||||||
channel.socket.configureBlocking(false);
|
channel.channel.configureBlocking(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (channel.interestOpsLock) {
|
synchronized (channel.interestOpsLock) {
|
||||||
channel.socket.register(
|
channel.channel.register(
|
||||||
selector, channel.getRawInterestOps(), channel);
|
selector, channel.getRawInterestOps(), channel);
|
||||||
}
|
}
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
|
@ -0,0 +1,70 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2009 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel.socket.sctp;
|
||||||
|
|
||||||
|
import com.sun.nio.sctp.SctpStandardSocketOption;
|
||||||
|
import org.jboss.netty.channel.ChannelConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.sctp.ServerSctpChannelConfig}.
|
||||||
|
* <p/>
|
||||||
|
* <h3>Available options</h3>
|
||||||
|
* <p/>
|
||||||
|
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig},
|
||||||
|
* {@link org.jboss.netty.channel.socket.sctp.ServerSctpChannelConfig} allows the following options in the
|
||||||
|
* option map:
|
||||||
|
* <p/>
|
||||||
|
* <table border="1" cellspacing="0" cellpadding="6">
|
||||||
|
* <tr>
|
||||||
|
* <th>Name</th><th>Associated setter method</th>
|
||||||
|
* </tr><tr>
|
||||||
|
* <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
|
||||||
|
* </tr><tr>
|
||||||
|
* <td>{@code "sctpInitMaxStreams"}</td><td>{@link #setInitMaxStreams(com.sun.nio.sctp.SctpStandardSocketOption.InitMaxStreams)} (int)}}</td>
|
||||||
|
* </tr>
|
||||||
|
* </table>
|
||||||
|
*
|
||||||
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
|
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
|
||||||
|
*
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
|
public interface ServerSctpChannelConfig extends ChannelConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the backlog value to specify when the channel binds to a local
|
||||||
|
* address.
|
||||||
|
*/
|
||||||
|
int getBacklog();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the backlog value to specify when the channel binds to a local
|
||||||
|
* address.
|
||||||
|
*/
|
||||||
|
void setBacklog(int backlog);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
|
||||||
|
*/
|
||||||
|
SctpStandardSocketOption.InitMaxStreams getInitMaxStreams();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the <a href="http://openjdk.java.net/projects/sctp/javadoc/com/sun/nio/sctp/SctpStandardSocketOption.html">{@code SCTP_INIT_MAXSTREAMS}</a> option.
|
||||||
|
*/
|
||||||
|
void setInitMaxStreams(SctpStandardSocketOption.InitMaxStreams initMaxStreams);
|
||||||
|
}
|
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.jboss.netty.util.internal;
|
package org.jboss.netty.util.internal;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -95,19 +93,6 @@ public class ConversionUtil {
|
|||||||
return String.valueOf(value).split("[, \\t\\n\\r\\f\\e\\a]");
|
return String.valueOf(value).split("[, \\t\\n\\r\\f\\e\\a]");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts the specified object into a socketAddress.
|
|
||||||
*/
|
|
||||||
public static SocketAddress toSocketAddress(Object value) {
|
|
||||||
|
|
||||||
if (value instanceof String) {
|
|
||||||
String [] hostPorts = (((String) value).trim()).split(":");
|
|
||||||
return new InetSocketAddress(hostPorts[0], Integer.parseInt(hostPorts[1]));
|
|
||||||
} else {
|
|
||||||
return (SocketAddress) value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final String[] INTEGERS = {
|
private static final String[] INTEGERS = {
|
||||||
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
|
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
|
||||||
"10","11","12","13","14","15",
|
"10","11","12","13","14","15",
|
||||||
|
@ -19,9 +19,6 @@ import static org.junit.Assert.*;
|
|||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
@ -77,14 +74,4 @@ public class ConversionUtilTest {
|
|||||||
assertFalse(ConversionUtil.toBoolean("FALSE"));
|
assertFalse(ConversionUtil.toBoolean("FALSE"));
|
||||||
assertFalse(ConversionUtil.toBoolean("0"));
|
assertFalse(ConversionUtil.toBoolean("0"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testToSocketAddress() throws Exception {
|
|
||||||
SocketAddress socketAddress = ConversionUtil.toSocketAddress("localhost:9658");
|
|
||||||
assertNotNull(socketAddress);
|
|
||||||
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
|
|
||||||
assertEquals("localhost", inetSocketAddress.getHostName());
|
|
||||||
assertEquals(9658, inetSocketAddress.getPort());
|
|
||||||
assertNotNull(ConversionUtil.toSocketAddress(inetSocketAddress));
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user