added initial version of sctp transport

This commit is contained in:
Jestan Nirojan 2011-04-23 18:38:32 +05:30
parent 92ccd2fbfa
commit dad06d539c
23 changed files with 884 additions and 319 deletions

View File

@ -0,0 +1,65 @@
package org.jboss.netty.channel;
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import com.sun.nio.sctp.Notification;
/**
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class UpstreamChannelNotificationEvent implements ChannelEvent {
private Channel channel;
private Notification notification;
private Object value;
public UpstreamChannelNotificationEvent(Channel channel, Notification notification, Object value) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (notification == null) {
throw new NullPointerException("notification");
}
if (value == null) {
throw new NullPointerException("value");
}
this.channel = channel;
this.notification = notification;
this.value = value;
}
@Override
public Channel getChannel() {
return channel;
}
@Override
public ChannelFuture getFuture() {
return Channels.succeededFuture(channel);
}
public Notification getNotification() {
return notification;
}
public Object getValue() {
return value;
}
}

View File

@ -15,32 +15,29 @@
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.*;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.ConversionUtil;
import java.net.Socket;
import java.util.Map;
/**
* The default {@link NioSocketChannelConfig} implementation.
* The default {@link SctpChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
implements NioSocketChannelConfig {
class DefaultSctpChannelConfig implements SctpChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class);
InternalLoggerFactory.getInstance(DefaultSctpChannelConfig.class);
private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY =
new AdaptiveReceiveBufferSizePredictorFactory();
@ -50,14 +47,16 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
private volatile ReceiveBufferSizePredictor predictor;
private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY;
private volatile int writeSpinCount = 16;
private SctpChannel socket;
private int payloadProtocolId = 0;
DefaultNioSocketChannelConfig(Socket socket) {
super(socket);
DefaultSctpChannelConfig(SctpChannel socket) {
this.socket = socket;
}
@Override
public void setOptions(Map<String, Object> options) {
super.setOptions(options);
setOptions(options);
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
@ -71,10 +70,6 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("writeBufferHighWaterMark")) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
@ -86,11 +81,39 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
} else if (key.equals("receiveBufferSizePredictor")) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
//TODO: set sctp channel options
return false;
}
return true;
}
@Override
public ChannelBufferFactory getBufferFactory() {
return null;
}
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
}
@Override
public ChannelPipelineFactory getPipelineFactory() {
return null;
}
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
}
@Override
public int getConnectTimeoutMillis() {
return 0;
}
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
}
@Override
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
@ -190,4 +213,76 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
}
this.predictorFactory = predictorFactory;
}
@Override
public int getPayloadProtocol() {
return payloadProtocolId;
}
@Override
public boolean isTcpNoDelay() {
return false;
}
@Override
public void setTcpNoDelay(boolean tcpNoDelay) {
}
@Override
public int getSoLinger() {
return 0;
}
@Override
public void setSoLinger(int soLinger) {
}
@Override
public int getSendBufferSize() {
return 0;
}
@Override
public void setSendBufferSize(int sendBufferSize) {
}
@Override
public int getReceiveBufferSize() {
return 0;
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
}
@Override
public boolean isKeepAlive() {
return false;
}
@Override
public void setKeepAlive(boolean keepAlive) {
}
@Override
public int getTrafficClass() {
return 0;
}
@Override
public void setTrafficClass(int trafficClass) {
}
@Override
public boolean isReuseAddress() {
return false;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
}
}

View File

@ -16,8 +16,6 @@
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
@ -30,26 +28,26 @@ import java.util.concurrent.Executor;
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory};
* There are two types of threads in a {@link DefaultSctpClientChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss thread</h4>
* <p>
* One {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} has one boss thread. It makes
* One {@link DefaultSctpClientChannelFactory} has one boss thread. It makes
* a connection attempt on request. Once a connection attempt succeeds,
* the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} manages.
* threads that the {@link DefaultSctpClientChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} can have one or more worker
* One {@link DefaultSctpClientChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} was created. A boss thread is
* when a {@link DefaultSctpClientChannelFactory} was created. A boss thread is
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -73,20 +71,21 @@ import java.util.concurrent.Executor;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
* @apiviz.landmark
*/
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory {
private final Executor bossExecutor;
private final Executor workerExecutor;
private final NioClientSocketPipelineSink sink;
private final SctpClientPipelineSink sink;
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* {@link #DefaultSctpClientChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
@ -95,7 +94,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/
public NioClientSocketChannelFactory(
public DefaultSctpClientChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
@ -110,7 +109,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioClientSocketChannelFactory(
public DefaultSctpClientChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
if (bossExecutor == null) {
@ -127,12 +126,12 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor;
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
sink = new SctpClientPipelineSink(bossExecutor, workerExecutor, workerCount);
}
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
public SctpChannel newChannel(ChannelPipeline pipeline) {
return new SctpClientChannel(this, pipeline, sink, sink.nextWorker());
}
@Override

View File

@ -0,0 +1,114 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException;
/**
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig
implements SctpServerChannelConfig {
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
private volatile int backlog;
/**
* Creates a new instance.
*/
public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
@Override
public boolean isReuseAddress() {
return false;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getReceiveBufferSize() {
try {
return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -17,8 +17,6 @@ package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
@ -31,7 +29,7 @@ import java.util.concurrent.Executor;
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory};
* There are two types of threads in a {@link DefaultSctpServerChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss threads</h4>
@ -41,18 +39,18 @@ import java.util.concurrent.Executor;
* have two boss threads. A boss thread accepts incoming connections until
* the port is unbound. Once a connection is accepted successfully, the boss
* thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} manages.
* threads that the {@link DefaultSctpServerChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} can have one or more worker
* One {@link DefaultSctpServerChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} was created. Boss threads are
* when a {@link DefaultSctpServerChannelFactory} was created. Boss threads are
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -77,12 +75,13 @@ import java.util.concurrent.Executor;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
* @apiviz.landmark
*/
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory {
final Executor bossExecutor;
private final Executor workerExecutor;
@ -90,7 +89,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioServerSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* {@link #DefaultSctpServerChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
@ -99,7 +98,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/
public NioServerSocketChannelFactory(
public DefaultSctpServerChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
@ -114,7 +113,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioServerSocketChannelFactory(
public DefaultSctpServerChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
if (bossExecutor == null) {
@ -130,12 +129,12 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
}
this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor;
sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
sink = new SctpServerPipelineSink(workerExecutor, workerCount);
}
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink);
public SctpServerChannel newChannel(ChannelPipeline pipeline) {
return new SctpServerChannelImpl(this, pipeline, sink);
}
@Override

View File

@ -0,0 +1,78 @@
package org.jboss.netty.channel.socket.sctp;
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import com.sun.nio.sctp.*;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.UpstreamChannelNotificationEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class NotificationHandler extends AbstractNotificationHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NotificationHandler.class);
private final SctpChannelImpl sctpChannel;
private final SctpWorker sctpWorker;
public NotificationHandler(SctpChannelImpl sctpChannel, SctpWorker sctpWorker) {
this.sctpChannel = sctpChannel;
this.sctpWorker = sctpWorker;
}
@Override
public HandlerResult handleNotification(AssociationChangeNotification notification, Object o) {
fireNotificationReceived(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(Notification notification, Object o) {
fireNotificationReceived(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(PeerAddressChangeNotification notification, Object o) {
fireNotificationReceived(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(SendFailedNotification notification, Object o) {
fireNotificationReceived(notification, o);
return HandlerResult.CONTINUE;
}
@Override
public HandlerResult handleNotification(ShutdownNotification notification, Object o) {
sctpWorker.close(sctpChannel, Channels.succeededFuture(sctpChannel));
return HandlerResult.RETURN;
}
private void fireNotificationReceived(Notification notification, Object o) {
sctpChannel.getPipeline().sendUpstream(new UpstreamChannelNotificationEvent(sctpChannel, notification, o));
}
}

View File

@ -15,31 +15,31 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import java.nio.channels.SocketChannel;
import static org.jboss.netty.channel.Channels.*;
/**
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
final class NioAcceptedSocketChannel extends org.jboss.netty.channel.socket.sctp.NioSocketChannel {
final class SctpAcceptedChannel extends SctpChannelImpl {
final Thread bossThread;
NioAcceptedSocketChannel(
SctpAcceptedChannel(
ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker, Thread bossThread) {
SctpChannel socket, SctpWorker worker, Thread bossThread) {
super(parent, factory, pipeline, sink, socket, worker);

View File

@ -0,0 +1,41 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
//TODO: support set of loacal, remote addresses.
public interface SctpChannel extends Channel{
@Override
InetSocketAddress getLocalAddress();
@Override
SctpChannelConfig getConfig();
@Override
InetSocketAddress getRemoteAddress();
}

View File

@ -25,7 +25,7 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig} and
* {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link org.jboss.netty.channel.socket.sctp.NioSocketChannelConfig} allows the
* {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link SctpChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
@ -46,10 +46,11 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public interface NioSocketChannelConfig extends SocketChannelConfig {
public interface SctpChannelConfig extends SocketChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
@ -138,4 +139,11 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
*/
void setReceiveBufferSizePredictorFactory(
ReceiveBufferSizePredictorFactory predictorFactory);
/**
* Get the sctp payload protocol id
* A value indicating the type of payload protocol data being transmitted. This value is passed as opaque data by SCTP.
* @return payload protocol id
*/
int getPayloadProtocol();
}

View File

@ -15,15 +15,15 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -33,12 +33,13 @@ import static org.jboss.netty.channel.Channels.fireChannelInterestChanged;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
class SctpChannelImpl extends AbstractChannel
implements org.jboss.netty.channel.socket.sctp.SctpChannel {
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
@ -46,9 +47,9 @@ class NioSocketChannel extends AbstractChannel
private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN;
final SocketChannel socket;
final NioWorker worker;
private final NioSocketChannelConfig config;
final SctpChannel sctpChannel;
final SctpWorker worker;
private final SctpChannelConfig config;
private volatile InetSocketAddress localAddress;
private volatile InetSocketAddress remoteAddress;
@ -67,18 +68,16 @@ class NioSocketChannel extends AbstractChannel
MessageEvent currentWriteEvent;
SendBuffer currentWriteBuffer;
public NioSocketChannel(
public SctpChannelImpl(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
SctpChannel sctpChannel, SctpWorker worker) {
super(parent, factory, pipeline, sink);
this.socket = socket;
this.sctpChannel = sctpChannel;
this.worker = worker;
config = new DefaultNioSocketChannelConfig(socket.socket());
config = new DefaultSctpChannelConfig(sctpChannel);
// TODO Move the state variable to AbstractChannel so that we don't need
// to add many listeners.
getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -88,7 +87,7 @@ class NioSocketChannel extends AbstractChannel
}
@Override
public NioSocketChannelConfig getConfig() {
public SctpChannelConfig getConfig() {
return config;
}
@ -97,8 +96,9 @@ class NioSocketChannel extends AbstractChannel
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
//TODO: fix this
this.localAddress = localAddress =
(InetSocketAddress) socket.socket().getLocalSocketAddress();
(InetSocketAddress) sctpChannel.getAllLocalAddresses().iterator().next();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
@ -112,8 +112,9 @@ class NioSocketChannel extends AbstractChannel
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
//TODO: fix this
this.remoteAddress = remoteAddress =
(InetSocketAddress) socket.socket().getRemoteSocketAddress();
(InetSocketAddress) sctpChannel.getRemoteAddresses().iterator().next();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
@ -225,7 +226,7 @@ class NioSocketChannel extends AbstractChannel
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this);
fireChannelInterestChanged(SctpChannelImpl.this);
notifying.set(Boolean.FALSE);
}
}
@ -246,7 +247,7 @@ class NioSocketChannel extends AbstractChannel
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this);
fireChannelInterestChanged(SctpChannelImpl.this);
notifying.set(Boolean.FALSE);
}
}
@ -273,7 +274,7 @@ class NioSocketChannel extends AbstractChannel
@Override
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this);
worker.writeFromTaskLoop(SctpChannelImpl.this);
}
}
}

View File

@ -15,12 +15,12 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.channel.*;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
@ -28,33 +28,34 @@ import static org.jboss.netty.channel.Channels.fireChannelOpen;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
final class NioClientSocketChannel extends NioSocketChannel {
final class SctpClientChannel extends SctpChannelImpl {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketChannel.class);
InternalLoggerFactory.getInstance(SctpClientChannel.class);
private static SocketChannel newSocket() {
SocketChannel socket;
private static SctpChannel newSocket() {
SctpChannel underlayingChannel;
try {
socket = SocketChannel.open();
underlayingChannel = SctpChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
boolean success = false;
try {
socket.configureBlocking(false);
underlayingChannel.configureBlocking(false);
success = true;
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
} finally {
if (!success) {
try {
socket.close();
underlayingChannel.close();
} catch (IOException e) {
logger.warn(
"Failed to close a partially initialized socket.",
@ -63,7 +64,7 @@ final class NioClientSocketChannel extends NioSocketChannel {
}
}
return socket;
return underlayingChannel;
}
volatile ChannelFuture connectFuture;
@ -72,9 +73,9 @@ final class NioClientSocketChannel extends NioSocketChannel {
// Does not need to be volatile as it's accessed by only one thread.
long connectDeadlineNanos;
NioClientSocketChannel(
SctpClientChannel(
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
ChannelSink sink, SctpWorker worker) {
super(null, factory, pipeline, sink, newSocket(), worker);
fireChannelOpen(this);

View File

@ -0,0 +1,35 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a client-side {@link org.jboss.netty.channel.socket.SocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
* @apiviz.has org.jboss.netty.channel.socket.SocketChannel oneway - - creates
*/
public interface SctpClientChannelFactory extends ChannelFactory {
@Override
SctpChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -41,26 +41,27 @@ import static org.jboss.netty.channel.Channels.*;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioClientSocketPipelineSink extends AbstractChannelSink {
class SctpClientPipelineSink extends AbstractChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
InternalLoggerFactory.getInstance(SctpClientPipelineSink.class);
final Executor bossExecutor;
private final Boss boss = new Boss();
private final NioWorker[] workers;
private final SctpWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
NioClientSocketPipelineSink(
SctpClientPipelineSink(
Executor bossExecutor, Executor workerExecutor, int workerCount) {
this.bossExecutor = bossExecutor;
workers = new NioWorker[workerCount];
workers = new SctpWorker[workerCount];
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(workerExecutor);
workers[i] = new SctpWorker(workerExecutor);
}
}
@ -69,8 +70,8 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioClientSocketChannel channel =
(NioClientSocketChannel) event.getChannel();
SctpClientChannel channel =
(SctpClientChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
@ -101,7 +102,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
SctpChannelImpl channel = (SctpChannelImpl) event.getChannel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.worker.writeFromUserCode(channel);
@ -109,10 +110,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
private void bind(
NioClientSocketChannel channel, ChannelFuture future,
SctpClientChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.socket.socket().bind(localAddress);
channel.sctpChannel.bind(localAddress);
channel.boundManually = true;
channel.setBound();
future.setSuccess();
@ -124,10 +125,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
private void connect(
final NioClientSocketChannel channel, final ChannelFuture cf,
final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
if (channel.socket.connect(remoteAddress)) {
if (channel.sctpChannel.connect(remoteAddress)) {
channel.worker.register(channel, cf);
} else {
channel.getCloseFuture().addListener(new ChannelFutureListener() {
@ -151,7 +152,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
}
NioWorker nextWorker() {
SctpWorker nextWorker() {
return workers[Math.abs(
workerIndex.getAndIncrement() % workers.length)];
}
@ -168,7 +169,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
super();
}
void register(NioClientSocketChannel channel) {
void register(SctpClientChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
@ -352,7 +353,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
continue;
}
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
SctpClientChannel ch = (SctpClientChannel) k.attachment();
if (ch.connectDeadlineNanos > 0 &&
currentTimeNanos >= ch.connectDeadlineNanos) {
@ -368,9 +369,9 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
SctpClientChannel ch = (SctpClientChannel) k.attachment();
try {
if (ch.socket.finishConnect()) {
if (ch.sctpChannel.finishConnect()) {
k.cancel();
ch.worker.register(ch, ch.connectFuture);
}
@ -383,16 +384,16 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}
private void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
SctpClientChannel ch = (SctpClientChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
}
private static final class RegisterTask implements Runnable {
private final Boss boss;
private final NioClientSocketChannel channel;
private final SctpClientChannel channel;
RegisterTask(Boss boss, NioClientSocketChannel channel) {
RegisterTask(Boss boss, SctpClientChannel channel) {
this.boss = boss;
this.channel = channel;
}
@ -400,7 +401,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
@Override
public void run() {
try {
channel.socket.register(
channel.sctpChannel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));

View File

@ -39,13 +39,14 @@ import java.util.regex.Pattern;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioProviderMetadata {
class SctpProviderMetadata {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioProviderMetadata.class);
InternalLoggerFactory.getInstance(SctpProviderMetadata.class);
private static final String CONSTRAINT_LEVEL_PROPERTY =
"org.jboss.netty.channel.socket.sctp.constraintLevel";
@ -429,7 +430,7 @@ class NioProviderMetadata {
new ConstraintLevelAutodetector().autodetect());
}
private NioProviderMetadata() {
private SctpProviderMetadata() {
// Unused
}
}

View File

@ -21,16 +21,18 @@ import java.nio.ByteBuffer;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
final class SocketReceiveBufferPool {
final class SctpReceiveBufferPool {
private static final int POOL_SIZE = 8;
@SuppressWarnings("unchecked")
private final SoftReference<ByteBuffer>[] pool = new SoftReference[POOL_SIZE];
SocketReceiveBufferPool() {
SctpReceiveBufferPool() {
super();
}

View File

@ -15,22 +15,23 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.FileRegion;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.WritableByteChannel;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $
*/
final class SocketSendBufferPool {
final class SctpSendBufferPool {
private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
@ -41,7 +42,7 @@ final class SocketSendBufferPool {
PreallocationRef poolHead = null;
Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
SocketSendBufferPool() {
SctpSendBufferPool() {
super();
}
@ -56,13 +57,6 @@ final class SocketSendBufferPool {
"unsupported message type: " + message.getClass());
}
private final SendBuffer acquire(FileRegion src) {
if (src.getCount() == 0) {
return EMPTY_BUFFER;
}
return new FileSendBuffer(src);
}
private final SendBuffer acquire(ChannelBuffer src) {
final int size = src.readableBytes();
if (size == 0) {
@ -86,7 +80,7 @@ final class SocketSendBufferPool {
ByteBuffer slice = buffer.duplicate();
buffer.position(align(nextPos));
slice.limit(nextPos);
current.refCnt ++;
current.refCnt++;
dst = new PooledSendBuffer(current, slice);
} else if (size > remaining) {
this.current = current = getPreallocation();
@ -94,10 +88,10 @@ final class SocketSendBufferPool {
ByteBuffer slice = buffer.duplicate();
buffer.position(align(size));
slice.limit(size);
current.refCnt ++;
current.refCnt++;
dst = new PooledSendBuffer(current, slice);
} else { // size == remaining
current.refCnt ++;
current.refCnt++;
this.current = getPreallocation0();
dst = new PooledSendBuffer(current, current.buffer);
}
@ -142,7 +136,7 @@ final class SocketSendBufferPool {
int q = pos >>> ALIGN_SHIFT;
int r = pos & ALIGN_MASK;
if (r != 0) {
q ++;
q++;
}
return q << ALIGN_SHIFT;
}
@ -167,11 +161,12 @@ final class SocketSendBufferPool {
interface SendBuffer {
boolean finished();
long writtenBytes();
long totalBytes();
long transferTo(WritableByteChannel ch) throws IOException;
long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException;
void release();
}
@ -202,13 +197,12 @@ final class SocketSendBufferPool {
}
@Override
public final long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
@Override
public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo);
ch.send(buffer, messageInfo);
return writtenBytes();
}
@Override
@ -245,19 +239,18 @@ final class SocketSendBufferPool {
}
@Override
public long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
@Override
public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo);
ch.send(buffer, messageInfo);
return writtenBytes();
}
@Override
public void release() {
final Preallocation parent = this.parent;
if (-- parent.refCnt == 0) {
if (--parent.refCnt == 0) {
parent.buffer.clear();
if (parent != current) {
poolHead = new PreallocationRef(parent, poolHead);
@ -266,50 +259,6 @@ final class SocketSendBufferPool {
}
}
final class FileSendBuffer implements SendBuffer {
private final FileRegion file;
private long writtenBytes;
FileSendBuffer(FileRegion file) {
this.file = file;
}
@Override
public boolean finished() {
return writtenBytes >= file.getCount();
}
@Override
public long writtenBytes() {
return writtenBytes;
}
@Override
public long totalBytes() {
return file.getCount();
}
@Override
public long transferTo(WritableByteChannel ch) throws IOException {
long localWrittenBytes = file.transferTo(ch, writtenBytes);
writtenBytes += localWrittenBytes;
return localWrittenBytes;
}
@Override
public long transferTo(DatagramChannel ch, SocketAddress raddr)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void release() {
// Unpooled.
}
}
static final class EmptySendBuffer implements SendBuffer {
EmptySendBuffer() {
@ -332,12 +281,7 @@ final class SocketSendBufferPool {
}
@Override
public final long transferTo(WritableByteChannel ch) throws IOException {
return 0;
}
@Override
public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
return 0;
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
/**
* A TCP/IP {@link org.jboss.netty.channel.ServerChannel} which accepts incoming TCP/IP connections.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
*/
public interface SctpServerChannel extends ServerChannel {
@Override
SctpServerChannelConfig getConfig();
@Override
InetSocketAddress getLocalAddress();
@Override
InetSocketAddress getRemoteAddress();
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelConfig;
/**
* A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.ServerSocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig},
* {@link org.jboss.netty.channel.socket.sctp.SctpServerChannelConfig} allows the following options in the
* option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
* </tr><tr>
* <td>{@code "reuseAddress"}</td><td>{@link #setReuseAddress(boolean)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSize"}</td><td>{@link #setReceiveBufferSize(int)}</td>
* </tr>
* </table>
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public interface SctpServerChannelConfig extends ChannelConfig {
/**
* Gets the backlog value to specify when the channel binds to a local
* address.
*/
int getBacklog();
/**
* Sets the backlog value to specify when the channel binds to a local
* address.
*/
void setBacklog(int backlog);
/**
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option.
*/
boolean isReuseAddress();
/**
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option.
*/
void setReuseAddress(boolean reuseAddress);
/**
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option.
*/
int getReceiveBufferSize();
/**
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option.
*/
void setReceiveBufferSize(int receiveBufferSize);
/**
* Sets the performance preferences as specified in
* {@link java.net.ServerSocket#setPerformancePreferences(int, int, int)}.
*/
void setPerformancePreferences(int connectionTime, int latency, int bandwidth);
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.socket.ServerSocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a {@link org.jboss.netty.channel.socket.ServerSocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
*/
public interface SctpServerChannelFactory extends ServerChannelFactory {
@Override
SctpServerChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -16,7 +16,6 @@
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.DefaultServerSocketChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
@ -24,7 +23,6 @@ import org.jboss.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -34,22 +32,25 @@ import static org.jboss.netty.channel.Channels.fireChannelOpen;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioServerSocketChannel extends AbstractServerChannel
implements org.jboss.netty.channel.socket.ServerSocketChannel {
class SctpServerChannelImpl extends AbstractServerChannel
implements SctpServerChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
InternalLoggerFactory.getInstance(SctpServerChannelImpl.class);
final ServerSocketChannel socket;
final com.sun.nio.sctp.SctpServerChannel socket;
final Lock shutdownLock = new ReentrantLock();
volatile Selector selector;
private final ServerSocketChannelConfig config;
private final SctpServerChannelConfig config;
NioServerSocketChannel(
private volatile boolean bound;
SctpServerChannelImpl(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
@ -57,7 +58,7 @@ class NioServerSocketChannel extends AbstractServerChannel
super(factory, pipeline, sink);
try {
socket = ServerSocketChannel.open();
socket = com.sun.nio.sctp.SctpServerChannel.open();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
@ -76,19 +77,23 @@ class NioServerSocketChannel extends AbstractServerChannel
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultServerSocketChannelConfig(socket.socket());
config = new DefaultSctpServerChannelConfig(socket);
fireChannelOpen(this);
}
@Override
public ServerSocketChannelConfig getConfig() {
public SctpServerChannelConfig getConfig() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.socket().getLocalSocketAddress();
try {
return (InetSocketAddress) socket.getAllLocalAddresses().iterator().next();
} catch (IOException e) {
return null;
}
}
@Override
@ -98,7 +103,11 @@ class NioServerSocketChannel extends AbstractServerChannel
@Override
public boolean isBound() {
return isOpen() && socket.socket().isBound();
return isOpen() && bound;
}
public void setBound() {
bound = true;
}
@Override

View File

@ -15,6 +15,7 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.logging.InternalLogger;
@ -34,22 +35,23 @@ import static org.jboss.netty.channel.Channels.*;
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioServerSocketPipelineSink extends AbstractChannelSink {
class SctpServerPipelineSink extends AbstractChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
InternalLoggerFactory.getInstance(SctpServerPipelineSink.class);
private final NioWorker[] workers;
private final SctpWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
workers = new NioWorker[workerCount];
SctpServerPipelineSink(Executor workerExecutor, int workerCount) {
workers = new SctpWorker[workerCount];
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(workerExecutor);
workers[i] = new SctpWorker(workerExecutor);
}
}
@ -57,9 +59,9 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
if (channel instanceof SctpServerChannelImpl) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
} else if (channel instanceof SctpChannelImpl) {
handleAcceptedSocket(e);
}
}
@ -70,8 +72,8 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
SctpServerChannelImpl channel =
(SctpServerChannelImpl) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
@ -95,7 +97,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
private void handleAcceptedSocket(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
SctpChannelImpl channel = (SctpChannelImpl) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
@ -113,12 +115,12 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
channel.worker.setInterestOps(channel, future, (Integer) value);
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
SctpChannelImpl channel = (SctpChannelImpl) event.getChannel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.worker.writeFromUserCode(channel);
@ -126,20 +128,20 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SctpServerChannelImpl channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
channel.socket.bind(localAddress, channel.getConfig().getBacklog());
bound = true;
channel.setBound();
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
((DefaultSctpServerChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor, new Boss(channel));
bossStarted = true;
} catch (Throwable t) {
@ -152,7 +154,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
}
private void close(NioServerSocketChannel channel, ChannelFuture future) {
private void close(SctpServerChannelImpl channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
if (channel.socket.isOpen()) {
@ -186,16 +188,16 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
}
NioWorker nextWorker() {
SctpWorker nextWorker() {
return workers[Math.abs(
workerIndex.getAndIncrement() % workers.length)];
}
private final class Boss implements Runnable {
private final Selector selector;
private final NioServerSocketChannel channel;
private final SctpServerChannelImpl channel;
Boss(NioServerSocketChannel channel) throws IOException {
Boss(SctpServerChannelImpl channel) throws IOException {
this.channel = channel;
selector = Selector.open();
@ -225,7 +227,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
SctpChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
@ -255,14 +257,14 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
}
}
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
private void registerAcceptedChannel(SctpChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
SctpWorker worker = nextWorker();
worker.register(new SctpAcceptedChannel(
channel.getFactory(), pipeline, channel,
org.jboss.netty.channel.socket.sctp.NioServerSocketPipelineSink.this, acceptedSocket,
SctpServerPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(

View File

@ -15,11 +15,12 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.DeadLockProofWorker;
@ -41,19 +42,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.jboss.netty.channel.Channels.*;
/**
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class NioWorker implements Runnable {
class SctpWorker implements Runnable {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioWorker.class);
InternalLoggerFactory.getInstance(SctpWorker.class);
private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
private static final int CONSTRAINT_LEVEL = SctpProviderMetadata.CONSTRAINT_LEVEL;
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
@ -68,17 +68,22 @@ class NioWorker implements Runnable {
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool();
private int payloadProtocolId = 0;// un-known sctp payload protocol id
NioWorker(Executor executor) {
private NotificationHandler notificationHandler;
SctpWorker(Executor executor) {
this.executor = executor;
}
void register(NioSocketChannel channel, ChannelFuture future) {
void register(SctpChannelImpl channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
boolean server = !(channel instanceof SctpClientChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
payloadProtocolId = channel.getConfig().getPayloadProtocol();
notificationHandler = new NotificationHandler(channel, this);
Selector selector;
synchronized (startStopLock) {
@ -131,13 +136,13 @@ class NioWorker implements Runnable {
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
for (; ;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
@ -188,7 +193,7 @@ class NioWorker implements Runnable {
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
@ -229,7 +234,7 @@ class NioWorker implements Runnable {
}
private void processRegisterTaskQueue() throws IOException {
for (;;) {
for (; ;) {
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
@ -241,7 +246,7 @@ class NioWorker implements Runnable {
}
private void processWriteTaskQueue() throws IOException {
for (;;) {
for (; ;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
@ -287,45 +292,50 @@ class NioWorker implements Runnable {
}
private boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final SctpChannelImpl channel = (SctpChannelImpl) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
int ret = 0;
int readBytes = 0;
boolean messageReceived = false;
boolean failure = true;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
MessageInfo messageInfo = channel.sctpChannel.receive(bb, this, notificationHandler);
if (messageInfo != null) {
messageReceived = true;
if (messageInfo.isComplete()) {
failure = false;
} else {
logger.error("Received incomplete sctp packet, can not continue! Expected SCTP_EXPLICIT_COMPLETE message");
failure = true;
}
} else {
messageReceived = false;
failure = false;
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
if (messageReceived) {
bb.flip();
final ChannelBufferFactory bufferFactory =
channel.getConfig().getBufferFactory();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
channel.getConfig().getBufferFactory();
final int receivedBytes = bb.remaining();
final ChannelBuffer buffer = bufferFactory.getBuffer(receivedBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
buffer.writerIndex(receivedBytes);
recvBufferPool.release(bb);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
predictor.previousReceiveBufferSize(receivedBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
@ -333,7 +343,7 @@ class NioWorker implements Runnable {
recvBufferPool.release(bb);
}
if (ret < 0 || failure) {
if (channel.sctpChannel.isBlocking() && !messageReceived || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
@ -343,11 +353,11 @@ class NioWorker implements Runnable {
}
private void close(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
SctpChannelImpl ch = (SctpChannelImpl) k.attachment();
close(ch, succeededFuture(ch));
}
void writeFromUserCode(final NioSocketChannel channel) {
void writeFromUserCode(final SctpChannelImpl channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
@ -370,19 +380,19 @@ class NioWorker implements Runnable {
write0(channel);
}
void writeFromTaskLoop(final NioSocketChannel ch) {
void writeFromTaskLoop(final SctpChannelImpl ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
SctpChannelImpl ch = (SctpChannelImpl) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
private boolean scheduleWriteIfNecessary(final SctpChannelImpl channel) {
final Thread currentThread = Thread.currentThread();
final Thread workerThread = thread;
if (currentThread != workerThread) {
@ -391,8 +401,8 @@ class NioWorker implements Runnable {
assert offered;
}
if (!(channel instanceof NioAcceptedSocketChannel) ||
((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
if (!(channel instanceof SctpAcceptedChannel) ||
((SctpAcceptedChannel) channel).bossThread != currentThread) {
final Selector workerSelector = selector;
if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) {
@ -417,20 +427,20 @@ class NioWorker implements Runnable {
return false;
}
private void write0(NioSocketChannel channel) {
private void write0(SctpChannelImpl channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final SocketChannel ch = channel.socket;
final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
final com.sun.nio.sctp.SctpChannel ch = channel.sctpChannel;
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
for (; ;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
@ -448,8 +458,8 @@ class NioWorker implements Runnable {
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
for (int i = writeSpinCount; i > 0; i--) {
localWrittenBytes = buf.transferTo(ch, payloadProtocolId, 0);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
@ -510,9 +520,9 @@ class NioWorker implements Runnable {
}
}
private void setOpWrite(NioSocketChannel channel) {
private void setOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector;
SelectionKey key = channel.socket.keyFor(selector);
SelectionKey key = channel.sctpChannel.keyFor(selector);
if (key == null) {
return;
}
@ -533,9 +543,9 @@ class NioWorker implements Runnable {
}
}
private void clearOpWrite(NioSocketChannel channel) {
private void clearOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector;
SelectionKey key = channel.socket.keyFor(selector);
SelectionKey key = channel.sctpChannel.keyFor(selector);
if (key == null) {
return;
}
@ -556,12 +566,12 @@ class NioWorker implements Runnable {
}
}
void close(NioSocketChannel channel, ChannelFuture future) {
void close(SctpChannelImpl channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.socket.close();
cancelledKeys ++;
channel.sctpChannel.close();
cancelledKeys++;
if (channel.setClosed()) {
future.setSuccess();
@ -583,7 +593,7 @@ class NioWorker implements Runnable {
}
}
private void cleanUpWriteBuffer(NioSocketChannel channel) {
private void cleanUpWriteBuffer(SctpChannelImpl channel) {
Exception cause = null;
boolean fireExceptionCaught = false;
@ -620,7 +630,7 @@ class NioWorker implements Runnable {
}
}
for (;;) {
for (; ;) {
evt = writeBuffer.poll();
if (evt == null) {
break;
@ -637,14 +647,14 @@ class NioWorker implements Runnable {
}
void setInterestOps(
NioSocketChannel channel, ChannelFuture future, int interestOps) {
SctpChannelImpl channel, ChannelFuture future, int interestOps) {
boolean changed = false;
try {
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
Selector selector = this.selector;
SelectionKey key = channel.socket.keyFor(selector);
SelectionKey key = channel.sctpChannel.keyFor(selector);
if (key == null || selector == null) {
// Not registered to the worker yet.
@ -658,38 +668,38 @@ class NioWorker implements Runnable {
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
switch (CONSTRAINT_LEVEL) {
case 0:
if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
break;
case 1:
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == thread) {
case 0:
if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
} else {
selectorGuard.readLock().lock();
try {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
break;
case 1:
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == thread) {
key.interestOps(interestOps);
changed = true;
} finally {
selectorGuard.readLock().unlock();
} else {
selectorGuard.readLock().lock();
try {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
key.interestOps(interestOps);
changed = true;
} finally {
selectorGuard.readLock().unlock();
}
}
}
}
break;
default:
throw new Error();
break;
default:
throw new Error();
}
if (changed) {
@ -713,12 +723,12 @@ class NioWorker implements Runnable {
}
private final class RegisterTask implements Runnable {
private final NioSocketChannel channel;
private final SctpChannelImpl channel;
private final ChannelFuture future;
private final boolean server;
RegisterTask(
NioSocketChannel channel, ChannelFuture future, boolean server) {
SctpChannelImpl channel, ChannelFuture future, boolean server) {
this.channel = channel;
this.future = future;
@ -739,11 +749,11 @@ class NioWorker implements Runnable {
try {
if (server) {
channel.socket.configureBlocking(false);
channel.sctpChannel.configureBlocking(false);
}
synchronized (channel.interestOpsLock) {
channel.socket.register(
channel.sctpChannel.register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {
@ -762,7 +772,7 @@ class NioWorker implements Runnable {
}
if (!server) {
if (!((NioClientSocketChannel) channel).boundManually) {
if (!((SctpClientChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);

View File

@ -25,6 +25,8 @@ import java.nio.channels.Selector;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
final class SelectorUtil {