diff --git a/src/main/java/org/jboss/netty/channel/UpstreamChannelNotificationEvent.java b/src/main/java/org/jboss/netty/channel/UpstreamChannelNotificationEvent.java new file mode 100644 index 0000000000..323fe019b9 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/UpstreamChannelNotificationEvent.java @@ -0,0 +1,65 @@ +package org.jboss.netty.channel; +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +import com.sun.nio.sctp.Notification; + +/** + * + * @author The Netty Project + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + * + */ +public class UpstreamChannelNotificationEvent implements ChannelEvent { + private Channel channel; + private Notification notification; + private Object value; + + public UpstreamChannelNotificationEvent(Channel channel, Notification notification, Object value) { + if (channel == null) { + throw new NullPointerException("channel"); + } + if (notification == null) { + throw new NullPointerException("notification"); + } + if (value == null) { + throw new NullPointerException("value"); + } + + this.channel = channel; + this.notification = notification; + this.value = value; + } + + @Override + public Channel getChannel() { + return channel; + } + + @Override + public ChannelFuture getFuture() { + return Channels.succeededFuture(channel); + } + + public Notification getNotification() { + return notification; + } + + public Object getValue() { + return value; + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultNioSocketChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java similarity index 73% rename from src/main/java/org/jboss/netty/channel/socket/sctp/DefaultNioSocketChannelConfig.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java index 89cd0d5129..16656591fc 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultNioSocketChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java @@ -15,32 +15,29 @@ */ package org.jboss.netty.channel.socket.sctp; -import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.ReceiveBufferSizePredictor; -import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory; -import org.jboss.netty.channel.socket.DefaultSocketChannelConfig; +import com.sun.nio.sctp.SctpChannel; +import org.jboss.netty.buffer.ChannelBufferFactory; +import org.jboss.netty.channel.*; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.internal.ConversionUtil; -import java.net.Socket; import java.util.Map; /** - * The default {@link NioSocketChannelConfig} implementation. + * The default {@link SctpChannelConfig} implementation. * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig - implements NioSocketChannelConfig { +class DefaultSctpChannelConfig implements SctpChannelConfig { private static final InternalLogger logger = - InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class); + InternalLoggerFactory.getInstance(DefaultSctpChannelConfig.class); private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY = new AdaptiveReceiveBufferSizePredictorFactory(); @@ -50,14 +47,16 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig private volatile ReceiveBufferSizePredictor predictor; private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY; private volatile int writeSpinCount = 16; + private SctpChannel socket; + private int payloadProtocolId = 0; - DefaultNioSocketChannelConfig(Socket socket) { - super(socket); + DefaultSctpChannelConfig(SctpChannel socket) { + this.socket = socket; } @Override public void setOptions(Map options) { - super.setOptions(options); + setOptions(options); if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { // Recover the integrity of the configuration with a sensible value. setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); @@ -71,10 +70,6 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig @Override public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; - } - if (key.equals("writeBufferHighWaterMark")) { setWriteBufferHighWaterMark0(ConversionUtil.toInt(value)); } else if (key.equals("writeBufferLowWaterMark")) { @@ -86,11 +81,39 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig } else if (key.equals("receiveBufferSizePredictor")) { setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value); } else { + //TODO: set sctp channel options return false; } return true; } + @Override + public ChannelBufferFactory getBufferFactory() { + return null; + } + + @Override + public void setBufferFactory(ChannelBufferFactory bufferFactory) { + } + + @Override + public ChannelPipelineFactory getPipelineFactory() { + return null; + } + + @Override + public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { + } + + @Override + public int getConnectTimeoutMillis() { + return 0; + } + + @Override + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + } + @Override public int getWriteBufferHighWaterMark() { return writeBufferHighWaterMark; @@ -190,4 +213,76 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig } this.predictorFactory = predictorFactory; } + + @Override + public int getPayloadProtocol() { + return payloadProtocolId; + } + + @Override + public boolean isTcpNoDelay() { + return false; + } + + @Override + public void setTcpNoDelay(boolean tcpNoDelay) { + } + + @Override + public int getSoLinger() { + return 0; + } + + @Override + public void setSoLinger(int soLinger) { + } + + @Override + public int getSendBufferSize() { + return 0; + } + + @Override + public void setSendBufferSize(int sendBufferSize) { + } + + @Override + public int getReceiveBufferSize() { + return 0; + } + + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + } + + @Override + public boolean isKeepAlive() { + return false; + } + + @Override + public void setKeepAlive(boolean keepAlive) { + } + + @Override + public int getTrafficClass() { + return 0; + } + + @Override + public void setTrafficClass(int trafficClass) { + } + + @Override + public boolean isReuseAddress() { + return false; + } + + @Override + public void setReuseAddress(boolean reuseAddress) { + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java similarity index 79% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannelFactory.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java index 0df74577f6..4fa11c8f3b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java @@ -16,8 +16,6 @@ package org.jboss.netty.channel.socket.sctp; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.util.internal.ExecutorUtil; import java.util.concurrent.Executor; @@ -30,26 +28,26 @@ import java.util.concurrent.Executor; * *

How threads work

*

- * There are two types of threads in a {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory}; + * There are two types of threads in a {@link DefaultSctpClientChannelFactory}; * one is boss thread and the other is worker thread. * *

Boss thread

*

- * One {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} has one boss thread. It makes + * One {@link DefaultSctpClientChannelFactory} has one boss thread. It makes * a connection attempt on request. Once a connection attempt succeeds, * the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker - * threads that the {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} manages. + * threads that the {@link DefaultSctpClientChannelFactory} manages. * *

Worker threads

*

- * One {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} can have one or more worker + * One {@link DefaultSctpClientChannelFactory} can have one or more worker * threads. A worker thread performs non-blocking read and write for one or * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * *

Life cycle of threads and graceful shutdown

*

* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified - * when a {@link org.jboss.netty.channel.socket.sctp.NioClientSocketChannelFactory} was created. A boss thread is + * when a {@link DefaultSctpClientChannelFactory} was created. A boss thread is * acquired from the {@code bossExecutor}, and worker threads are acquired from * the {@code workerExecutor}. Therefore, you should make sure the specified * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. @@ -73,20 +71,21 @@ import java.util.concurrent.Executor; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * * @apiviz.landmark */ -public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { +public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory { private final Executor bossExecutor; private final Executor workerExecutor; - private final NioClientSocketPipelineSink sink; + private final SctpClientPipelineSink sink; /** * Creates a new instance. Calling this constructor is same with calling - * {@link #NioClientSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * + * {@link #DefaultSctpClientChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * @@ -95,7 +94,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public NioClientSocketChannelFactory( + public DefaultSctpClientChannelFactory( Executor bossExecutor, Executor workerExecutor) { this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } @@ -110,7 +109,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory * @param workerCount * the maximum number of I/O worker threads */ - public NioClientSocketChannelFactory( + public DefaultSctpClientChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { if (bossExecutor == null) { @@ -127,12 +126,12 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory this.bossExecutor = bossExecutor; this.workerExecutor = workerExecutor; - sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount); + sink = new SctpClientPipelineSink(bossExecutor, workerExecutor, workerCount); } @Override - public SocketChannel newChannel(ChannelPipeline pipeline) { - return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker()); + public SctpChannel newChannel(ChannelPipeline pipeline) { + return new SctpClientChannel(this, pipeline, sink, sink.nextWorker()); } @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java new file mode 100644 index 0000000000..1ede019395 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java @@ -0,0 +1,114 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import com.sun.nio.sctp.SctpStandardSocketOption; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.DefaultServerChannelConfig; +import org.jboss.netty.channel.socket.ServerSocketChannelConfig; +import org.jboss.netty.util.internal.ConversionUtil; + +import java.io.IOException; + +/** + * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. + * + * @author The Netty Project + * @author Trustin Lee + + * + * @version $Rev$, $Date$ + */ +public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig + implements SctpServerChannelConfig { + + private final com.sun.nio.sctp.SctpServerChannel serverChannel; + private volatile int backlog; + + /** + * Creates a new instance. + */ + public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) { + if (serverChannel == null) { + throw new NullPointerException("serverChannel"); + } + this.serverChannel = serverChannel; + } + + @Override + public boolean setOption(String key, Object value) { + if (super.setOption(key, value)) { + return true; + } + + if (key.equals("receiveBufferSize")) { + setReceiveBufferSize(ConversionUtil.toInt(value)); + } else if (key.equals("reuseAddress")) { + setReuseAddress(ConversionUtil.toBoolean(value)); + } else if (key.equals("backlog")) { + setBacklog(ConversionUtil.toInt(value)); + } else { + return false; + } + return true; + } + + @Override + public boolean isReuseAddress() { + return false; + } + + @Override + public void setReuseAddress(boolean reuseAddress) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public int getReceiveBufferSize() { + try { + return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + try { + serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public void setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java similarity index 83% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannelFactory.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java index c361ef0420..661cfcfdc7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java @@ -17,8 +17,6 @@ package org.jboss.netty.channel.socket.sctp; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; -import org.jboss.netty.channel.socket.ServerSocketChannel; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; import org.jboss.netty.util.internal.ExecutorUtil; import java.util.concurrent.Executor; @@ -31,7 +29,7 @@ import java.util.concurrent.Executor; * *

How threads work

*

- * There are two types of threads in a {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory}; + * There are two types of threads in a {@link DefaultSctpServerChannelFactory}; * one is boss thread and the other is worker thread. * *

Boss threads

@@ -41,18 +39,18 @@ import java.util.concurrent.Executor; * have two boss threads. A boss thread accepts incoming connections until * the port is unbound. Once a connection is accepted successfully, the boss * thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker - * threads that the {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} manages. + * threads that the {@link DefaultSctpServerChannelFactory} manages. * *

Worker threads

*

- * One {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} can have one or more worker + * One {@link DefaultSctpServerChannelFactory} can have one or more worker * threads. A worker thread performs non-blocking read and write for one or * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * *

Life cycle of threads and graceful shutdown

*

* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified - * when a {@link org.jboss.netty.channel.socket.sctp.NioServerSocketChannelFactory} was created. Boss threads are + * when a {@link DefaultSctpServerChannelFactory} was created. Boss threads are * acquired from the {@code bossExecutor}, and worker threads are acquired from * the {@code workerExecutor}. Therefore, you should make sure the specified * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. @@ -77,12 +75,13 @@ import java.util.concurrent.Executor; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * * @apiviz.landmark */ -public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { +public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory { final Executor bossExecutor; private final Executor workerExecutor; @@ -90,7 +89,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory /** * Creates a new instance. Calling this constructor is same with calling - * {@link #NioServerSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * + * {@link #DefaultSctpServerChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * @@ -99,7 +98,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public NioServerSocketChannelFactory( + public DefaultSctpServerChannelFactory( Executor bossExecutor, Executor workerExecutor) { this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } @@ -114,7 +113,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory * @param workerCount * the maximum number of I/O worker threads */ - public NioServerSocketChannelFactory( + public DefaultSctpServerChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { if (bossExecutor == null) { @@ -130,12 +129,12 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory } this.bossExecutor = bossExecutor; this.workerExecutor = workerExecutor; - sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); + sink = new SctpServerPipelineSink(workerExecutor, workerCount); } @Override - public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return new NioServerSocketChannel(this, pipeline, sink); + public SctpServerChannel newChannel(ChannelPipeline pipeline) { + return new SctpServerChannelImpl(this, pipeline, sink); } @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NotificationHandler.java b/src/main/java/org/jboss/netty/channel/socket/sctp/NotificationHandler.java new file mode 100644 index 0000000000..6a81fa32fd --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/NotificationHandler.java @@ -0,0 +1,78 @@ +package org.jboss.netty.channel.socket.sctp; +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +import com.sun.nio.sctp.*; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.UpstreamChannelNotificationEvent; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; + +/** + * + * @author The Netty Project + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + * + */ + +public class NotificationHandler extends AbstractNotificationHandler { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(NotificationHandler.class); + + private final SctpChannelImpl sctpChannel; + private final SctpWorker sctpWorker; + + public NotificationHandler(SctpChannelImpl sctpChannel, SctpWorker sctpWorker) { + this.sctpChannel = sctpChannel; + this.sctpWorker = sctpWorker; + } + + @Override + public HandlerResult handleNotification(AssociationChangeNotification notification, Object o) { + fireNotificationReceived(notification, o); + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(Notification notification, Object o) { + fireNotificationReceived(notification, o); + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(PeerAddressChangeNotification notification, Object o) { + fireNotificationReceived(notification, o); + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(SendFailedNotification notification, Object o) { + fireNotificationReceived(notification, o); + return HandlerResult.CONTINUE; + } + + @Override + public HandlerResult handleNotification(ShutdownNotification notification, Object o) { + sctpWorker.close(sctpChannel, Channels.succeededFuture(sctpChannel)); + return HandlerResult.RETURN; + } + + private void fireNotificationReceived(Notification notification, Object o) { + sctpChannel.getPipeline().sendUpstream(new UpstreamChannelNotificationEvent(sctpChannel, notification, o)); + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioAcceptedSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpAcceptedChannel.java similarity index 85% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioAcceptedSocketChannel.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpAcceptedChannel.java index 07ca483666..4f3a6fffed 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioAcceptedSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpAcceptedChannel.java @@ -15,31 +15,31 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; -import java.nio.channels.SocketChannel; - import static org.jboss.netty.channel.Channels.*; /** * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -final class NioAcceptedSocketChannel extends org.jboss.netty.channel.socket.sctp.NioSocketChannel { +final class SctpAcceptedChannel extends SctpChannelImpl { final Thread bossThread; - NioAcceptedSocketChannel( + SctpAcceptedChannel( ChannelFactory factory, ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker, Thread bossThread) { + SctpChannel socket, SctpWorker worker, Thread bossThread) { super(parent, factory, pipeline, sink, socket, worker); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java new file mode 100644 index 0000000000..6d34a0ced8 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java @@ -0,0 +1,41 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.channel.socket.SocketChannelConfig; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Set; + +/** + * @author The Netty Project + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + * + */ +//TODO: support set of loacal, remote addresses. +public interface SctpChannel extends Channel{ + @Override + InetSocketAddress getLocalAddress(); + @Override + SctpChannelConfig getConfig(); + @Override + InetSocketAddress getRemoteAddress(); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java similarity index 94% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannelConfig.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java index 2921bd5d24..4ff21da255 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java @@ -25,7 +25,7 @@ import org.jboss.netty.channel.socket.SocketChannelConfig; *

Available options

* * In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig} and - * {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link org.jboss.netty.channel.socket.sctp.NioSocketChannelConfig} allows the + * {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link SctpChannelConfig} allows the * following options in the option map: * * @@ -46,10 +46,11 @@ import org.jboss.netty.channel.socket.SocketChannelConfig; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ */ -public interface NioSocketChannelConfig extends SocketChannelConfig { +public interface SctpChannelConfig extends SocketChannelConfig { /** * Returns the high water mark of the write buffer. If the number of bytes @@ -138,4 +139,11 @@ public interface NioSocketChannelConfig extends SocketChannelConfig { */ void setReceiveBufferSizePredictorFactory( ReceiveBufferSizePredictorFactory predictorFactory); + + /** + * Get the sctp payload protocol id + * A value indicating the type of payload protocol data being transmitted. This value is passed as opaque data by SCTP. + * @return payload protocol id + */ + int getPayloadProtocol(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java similarity index 87% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannel.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java index 4c5d03e80d..ae8a97ab0c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java @@ -15,15 +15,15 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.sctp.SocketSendBufferPool.SendBuffer; +import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer; import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadLocalBoolean; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.channels.SocketChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -33,12 +33,13 @@ import static org.jboss.netty.channel.Channels.fireChannelInterestChanged; /** * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class NioSocketChannel extends AbstractChannel - implements org.jboss.netty.channel.socket.SocketChannel { +class SctpChannelImpl extends AbstractChannel + implements org.jboss.netty.channel.socket.sctp.SctpChannel { private static final int ST_OPEN = 0; private static final int ST_BOUND = 1; @@ -46,9 +47,9 @@ class NioSocketChannel extends AbstractChannel private static final int ST_CLOSED = -1; volatile int state = ST_OPEN; - final SocketChannel socket; - final NioWorker worker; - private final NioSocketChannelConfig config; + final SctpChannel sctpChannel; + final SctpWorker worker; + private final SctpChannelConfig config; private volatile InetSocketAddress localAddress; private volatile InetSocketAddress remoteAddress; @@ -67,18 +68,16 @@ class NioSocketChannel extends AbstractChannel MessageEvent currentWriteEvent; SendBuffer currentWriteBuffer; - public NioSocketChannel( + public SctpChannelImpl( Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, - SocketChannel socket, NioWorker worker) { + SctpChannel sctpChannel, SctpWorker worker) { super(parent, factory, pipeline, sink); - this.socket = socket; + this.sctpChannel = sctpChannel; this.worker = worker; - config = new DefaultNioSocketChannelConfig(socket.socket()); + config = new DefaultSctpChannelConfig(sctpChannel); - // TODO Move the state variable to AbstractChannel so that we don't need - // to add many listeners. getCloseFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -88,7 +87,7 @@ class NioSocketChannel extends AbstractChannel } @Override - public NioSocketChannelConfig getConfig() { + public SctpChannelConfig getConfig() { return config; } @@ -97,8 +96,9 @@ class NioSocketChannel extends AbstractChannel InetSocketAddress localAddress = this.localAddress; if (localAddress == null) { try { + //TODO: fix this this.localAddress = localAddress = - (InetSocketAddress) socket.socket().getLocalSocketAddress(); + (InetSocketAddress) sctpChannel.getAllLocalAddresses().iterator().next(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; @@ -112,8 +112,9 @@ class NioSocketChannel extends AbstractChannel InetSocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { + //TODO: fix this this.remoteAddress = remoteAddress = - (InetSocketAddress) socket.socket().getRemoteSocketAddress(); + (InetSocketAddress) sctpChannel.getRemoteAddresses().iterator().next(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; @@ -225,7 +226,7 @@ class NioSocketChannel extends AbstractChannel highWaterMarkCounter.incrementAndGet(); if (!notifying.get()) { notifying.set(Boolean.TRUE); - fireChannelInterestChanged(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this); + fireChannelInterestChanged(SctpChannelImpl.this); notifying.set(Boolean.FALSE); } } @@ -246,7 +247,7 @@ class NioSocketChannel extends AbstractChannel highWaterMarkCounter.decrementAndGet(); if (isConnected() && !notifying.get()) { notifying.set(Boolean.TRUE); - fireChannelInterestChanged(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this); + fireChannelInterestChanged(SctpChannelImpl.this); notifying.set(Boolean.FALSE); } } @@ -273,7 +274,7 @@ class NioSocketChannel extends AbstractChannel @Override public void run() { writeTaskInTaskQueue.set(false); - worker.writeFromTaskLoop(org.jboss.netty.channel.socket.sctp.NioSocketChannel.this); + worker.writeFromTaskLoop(SctpChannelImpl.this); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannel.java similarity index 79% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannel.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannel.java index 40d1b636b3..4f3a0cd6f3 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannel.java @@ -15,12 +15,12 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.channel.*; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import java.io.IOException; -import java.nio.channels.SocketChannel; import static org.jboss.netty.channel.Channels.fireChannelOpen; @@ -28,33 +28,34 @@ import static org.jboss.netty.channel.Channels.fireChannelOpen; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -final class NioClientSocketChannel extends NioSocketChannel { +final class SctpClientChannel extends SctpChannelImpl { private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioClientSocketChannel.class); + InternalLoggerFactory.getInstance(SctpClientChannel.class); - private static SocketChannel newSocket() { - SocketChannel socket; + private static SctpChannel newSocket() { + SctpChannel underlayingChannel; try { - socket = SocketChannel.open(); + underlayingChannel = SctpChannel.open(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } boolean success = false; try { - socket.configureBlocking(false); + underlayingChannel.configureBlocking(false); success = true; } catch (IOException e) { throw new ChannelException("Failed to enter non-blocking mode.", e); } finally { if (!success) { try { - socket.close(); + underlayingChannel.close(); } catch (IOException e) { logger.warn( "Failed to close a partially initialized socket.", @@ -63,7 +64,7 @@ final class NioClientSocketChannel extends NioSocketChannel { } } - return socket; + return underlayingChannel; } volatile ChannelFuture connectFuture; @@ -72,9 +73,9 @@ final class NioClientSocketChannel extends NioSocketChannel { // Does not need to be volatile as it's accessed by only one thread. long connectDeadlineNanos; - NioClientSocketChannel( + SctpClientChannel( ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink, NioWorker worker) { + ChannelSink sink, SctpWorker worker) { super(null, factory, pipeline, sink, newSocket(), worker); fireChannelOpen(this); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java new file mode 100644 index 0000000000..ca66de644c --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.SocketChannel; + +/** + * A {@link org.jboss.netty.channel.ChannelFactory} which creates a client-side {@link org.jboss.netty.channel.socket.SocketChannel}. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev$, $Date$ + * + * @apiviz.has org.jboss.netty.channel.socket.SocketChannel oneway - - creates + */ +public interface SctpClientChannelFactory extends ChannelFactory { + @Override + SctpChannel newChannel(ChannelPipeline pipeline); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java similarity index 92% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketPipelineSink.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java index 2808f80571..18afaa0b51 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java @@ -41,26 +41,27 @@ import static org.jboss.netty.channel.Channels.*; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class NioClientSocketPipelineSink extends AbstractChannelSink { +class SctpClientPipelineSink extends AbstractChannelSink { static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); + InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); final Executor bossExecutor; private final Boss boss = new Boss(); - private final NioWorker[] workers; + private final SctpWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); - NioClientSocketPipelineSink( + SctpClientPipelineSink( Executor bossExecutor, Executor workerExecutor, int workerCount) { this.bossExecutor = bossExecutor; - workers = new NioWorker[workerCount]; + workers = new SctpWorker[workerCount]; for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); + workers[i] = new SctpWorker(workerExecutor); } } @@ -69,8 +70,8 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { ChannelStateEvent event = (ChannelStateEvent) e; - NioClientSocketChannel channel = - (NioClientSocketChannel) event.getChannel(); + SctpClientChannel channel = + (SctpClientChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); @@ -101,7 +102,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.getChannel(); + SctpChannelImpl channel = (SctpChannelImpl) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; channel.worker.writeFromUserCode(channel); @@ -109,10 +110,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } private void bind( - NioClientSocketChannel channel, ChannelFuture future, + SctpClientChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.socket.socket().bind(localAddress); + channel.sctpChannel.bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -124,10 +125,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } private void connect( - final NioClientSocketChannel channel, final ChannelFuture cf, + final SctpClientChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.socket.connect(remoteAddress)) { + if (channel.sctpChannel.connect(remoteAddress)) { channel.worker.register(channel, cf); } else { channel.getCloseFuture().addListener(new ChannelFutureListener() { @@ -151,7 +152,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } } - NioWorker nextWorker() { + SctpWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; } @@ -168,7 +169,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { super(); } - void register(NioClientSocketChannel channel) { + void register(SctpClientChannel channel) { Runnable registerTask = new RegisterTask(this, channel); Selector selector; @@ -352,7 +353,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { continue; } - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + SctpClientChannel ch = (SctpClientChannel) k.attachment(); if (ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { @@ -368,9 +369,9 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } private void connect(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + SctpClientChannel ch = (SctpClientChannel) k.attachment(); try { - if (ch.socket.finishConnect()) { + if (ch.sctpChannel.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } @@ -383,16 +384,16 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } private void close(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + SctpClientChannel ch = (SctpClientChannel) k.attachment(); ch.worker.close(ch, succeededFuture(ch)); } } private static final class RegisterTask implements Runnable { private final Boss boss; - private final NioClientSocketChannel channel; + private final SctpClientChannel channel; - RegisterTask(Boss boss, NioClientSocketChannel channel) { + RegisterTask(Boss boss, SctpClientChannel channel) { this.boss = boss; this.channel = channel; } @@ -400,7 +401,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { @Override public void run() { try { - channel.socket.register( + channel.sctpChannel.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioProviderMetadata.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpProviderMetadata.java similarity index 98% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioProviderMetadata.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpProviderMetadata.java index 4738477946..3bf63deea6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioProviderMetadata.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpProviderMetadata.java @@ -39,13 +39,14 @@ import java.util.regex.Pattern; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class NioProviderMetadata { +class SctpProviderMetadata { static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioProviderMetadata.class); + InternalLoggerFactory.getInstance(SctpProviderMetadata.class); private static final String CONSTRAINT_LEVEL_PROPERTY = "org.jboss.netty.channel.socket.sctp.constraintLevel"; @@ -429,7 +430,7 @@ class NioProviderMetadata { new ConstraintLevelAutodetector().autodetect()); } - private NioProviderMetadata() { + private SctpProviderMetadata() { // Unused } } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SocketReceiveBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpReceiveBufferPool.java similarity index 96% rename from src/main/java/org/jboss/netty/channel/socket/sctp/SocketReceiveBufferPool.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpReceiveBufferPool.java index b35255382c..c5bcb208a6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SocketReceiveBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpReceiveBufferPool.java @@ -21,16 +21,18 @@ import java.nio.ByteBuffer; /** * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan + * * @version $Rev$, $Date$ */ -final class SocketReceiveBufferPool { +final class SctpReceiveBufferPool { private static final int POOL_SIZE = 8; @SuppressWarnings("unchecked") private final SoftReference[] pool = new SoftReference[POOL_SIZE]; - SocketReceiveBufferPool() { + SctpReceiveBufferPool() { super(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SocketSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java similarity index 73% rename from src/main/java/org/jboss/netty/channel/socket/sctp/SocketSendBufferPool.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java index 0269190da4..16520856c4 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SocketSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java @@ -15,22 +15,23 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.MessageInfo; +import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.FileRegion; import java.io.IOException; import java.lang.ref.SoftReference; -import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.WritableByteChannel; /** * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan + * * @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $ */ -final class SocketSendBufferPool { +final class SctpSendBufferPool { private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer(); @@ -41,7 +42,7 @@ final class SocketSendBufferPool { PreallocationRef poolHead = null; Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE); - SocketSendBufferPool() { + SctpSendBufferPool() { super(); } @@ -56,13 +57,6 @@ final class SocketSendBufferPool { "unsupported message type: " + message.getClass()); } - private final SendBuffer acquire(FileRegion src) { - if (src.getCount() == 0) { - return EMPTY_BUFFER; - } - return new FileSendBuffer(src); - } - private final SendBuffer acquire(ChannelBuffer src) { final int size = src.readableBytes(); if (size == 0) { @@ -86,7 +80,7 @@ final class SocketSendBufferPool { ByteBuffer slice = buffer.duplicate(); buffer.position(align(nextPos)); slice.limit(nextPos); - current.refCnt ++; + current.refCnt++; dst = new PooledSendBuffer(current, slice); } else if (size > remaining) { this.current = current = getPreallocation(); @@ -94,10 +88,10 @@ final class SocketSendBufferPool { ByteBuffer slice = buffer.duplicate(); buffer.position(align(size)); slice.limit(size); - current.refCnt ++; + current.refCnt++; dst = new PooledSendBuffer(current, slice); } else { // size == remaining - current.refCnt ++; + current.refCnt++; this.current = getPreallocation0(); dst = new PooledSendBuffer(current, current.buffer); } @@ -142,7 +136,7 @@ final class SocketSendBufferPool { int q = pos >>> ALIGN_SHIFT; int r = pos & ALIGN_MASK; if (r != 0) { - q ++; + q++; } return q << ALIGN_SHIFT; } @@ -167,11 +161,12 @@ final class SocketSendBufferPool { interface SendBuffer { boolean finished(); + long writtenBytes(); + long totalBytes(); - long transferTo(WritableByteChannel ch) throws IOException; - long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException; + long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException; void release(); } @@ -202,13 +197,12 @@ final class SocketSendBufferPool { } @Override - public final long transferTo(WritableByteChannel ch) throws IOException { - return ch.write(buffer); - } - - @Override - public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { - return ch.send(buffer, raddr); + public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { + final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); + messageInfo.payloadProtocolID(protocolId); + messageInfo.streamNumber(streamNo); + ch.send(buffer, messageInfo); + return writtenBytes(); } @Override @@ -245,19 +239,18 @@ final class SocketSendBufferPool { } @Override - public long transferTo(WritableByteChannel ch) throws IOException { - return ch.write(buffer); - } - - @Override - public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { - return ch.send(buffer, raddr); + public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { + final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); + messageInfo.payloadProtocolID(protocolId); + messageInfo.streamNumber(streamNo); + ch.send(buffer, messageInfo); + return writtenBytes(); } @Override public void release() { final Preallocation parent = this.parent; - if (-- parent.refCnt == 0) { + if (--parent.refCnt == 0) { parent.buffer.clear(); if (parent != current) { poolHead = new PreallocationRef(parent, poolHead); @@ -266,50 +259,6 @@ final class SocketSendBufferPool { } } - final class FileSendBuffer implements SendBuffer { - - private final FileRegion file; - private long writtenBytes; - - - FileSendBuffer(FileRegion file) { - this.file = file; - } - - @Override - public boolean finished() { - return writtenBytes >= file.getCount(); - } - - @Override - public long writtenBytes() { - return writtenBytes; - } - - @Override - public long totalBytes() { - return file.getCount(); - } - - @Override - public long transferTo(WritableByteChannel ch) throws IOException { - long localWrittenBytes = file.transferTo(ch, writtenBytes); - writtenBytes += localWrittenBytes; - return localWrittenBytes; - } - - @Override - public long transferTo(DatagramChannel ch, SocketAddress raddr) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void release() { - // Unpooled. - } - } - static final class EmptySendBuffer implements SendBuffer { EmptySendBuffer() { @@ -332,12 +281,7 @@ final class SocketSendBufferPool { } @Override - public final long transferTo(WritableByteChannel ch) throws IOException { - return 0; - } - - @Override - public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException { + public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { return 0; } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java new file mode 100644 index 0000000000..886f27d6d9 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java @@ -0,0 +1,39 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.channel.ServerChannel; +import org.jboss.netty.channel.socket.ServerSocketChannelConfig; + +import java.net.InetSocketAddress; + +/** + * A TCP/IP {@link org.jboss.netty.channel.ServerChannel} which accepts incoming TCP/IP connections. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev$, $Date$ + * + */ +public interface SctpServerChannel extends ServerChannel { + @Override + SctpServerChannelConfig getConfig(); + @Override + InetSocketAddress getLocalAddress(); + @Override + InetSocketAddress getRemoteAddress(); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java new file mode 100644 index 0000000000..1865b2dcdd --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java @@ -0,0 +1,85 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.channel.ChannelConfig; + +/** + * A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.ServerSocketChannel}. + * + *

Available options

+ * + * In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig}, + * {@link org.jboss.netty.channel.socket.sctp.SctpServerChannelConfig} allows the following options in the + * option map: + * + *
+ * + * + * + * + * + * + * + * + * + *
NameAssociated setter method
{@code "backlog"}{@link #setBacklog(int)}
{@code "reuseAddress"}{@link #setReuseAddress(boolean)}
{@code "receiveBufferSize"}{@link #setReceiveBufferSize(int)}
+ * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev$, $Date$ + */ +public interface SctpServerChannelConfig extends ChannelConfig { + + /** + * Gets the backlog value to specify when the channel binds to a local + * address. + */ + int getBacklog(); + + /** + * Sets the backlog value to specify when the channel binds to a local + * address. + */ + void setBacklog(int backlog); + + /** + * Gets the {@code SO_REUSEADDR} option. + */ + boolean isReuseAddress(); + + /** + * Sets the {@code SO_REUSEADDR} option. + */ + void setReuseAddress(boolean reuseAddress); + + /** + * Gets the {@code SO_RCVBUF} option. + */ + int getReceiveBufferSize(); + + /** + * Sets the {@code SO_RCVBUF} option. + */ + void setReceiveBufferSize(int receiveBufferSize); + + /** + * Sets the performance preferences as specified in + * {@link java.net.ServerSocket#setPerformancePreferences(int, int, int)}. + */ + void setPerformancePreferences(int connectionTime, int latency, int bandwidth); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java new file mode 100644 index 0000000000..00880f1a73 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ServerChannelFactory; +import org.jboss.netty.channel.socket.ServerSocketChannel; + +/** + * A {@link org.jboss.netty.channel.ChannelFactory} which creates a {@link org.jboss.netty.channel.socket.ServerSocketChannel}. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev$, $Date$ + * + */ +public interface SctpServerChannelFactory extends ServerChannelFactory { + @Override + SctpServerChannel newChannel(ChannelPipeline pipeline); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java similarity index 75% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannel.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java index a7a4dcbcb2..45162cac46 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java @@ -16,7 +16,6 @@ package org.jboss.netty.channel.socket.sctp; import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.DefaultServerSocketChannelConfig; import org.jboss.netty.channel.socket.ServerSocketChannelConfig; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; @@ -24,7 +23,6 @@ import org.jboss.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -34,22 +32,25 @@ import static org.jboss.netty.channel.Channels.fireChannelOpen; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class NioServerSocketChannel extends AbstractServerChannel - implements org.jboss.netty.channel.socket.ServerSocketChannel { +class SctpServerChannelImpl extends AbstractServerChannel + implements SctpServerChannel { private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerSocketChannel.class); + InternalLoggerFactory.getInstance(SctpServerChannelImpl.class); - final ServerSocketChannel socket; + final com.sun.nio.sctp.SctpServerChannel socket; final Lock shutdownLock = new ReentrantLock(); volatile Selector selector; - private final ServerSocketChannelConfig config; + private final SctpServerChannelConfig config; - NioServerSocketChannel( + private volatile boolean bound; + + SctpServerChannelImpl( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) { @@ -57,7 +58,7 @@ class NioServerSocketChannel extends AbstractServerChannel super(factory, pipeline, sink); try { - socket = ServerSocketChannel.open(); + socket = com.sun.nio.sctp.SctpServerChannel.open(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); @@ -76,19 +77,23 @@ class NioServerSocketChannel extends AbstractServerChannel throw new ChannelException("Failed to enter non-blocking mode.", e); } - config = new DefaultServerSocketChannelConfig(socket.socket()); + config = new DefaultSctpServerChannelConfig(socket); fireChannelOpen(this); } @Override - public ServerSocketChannelConfig getConfig() { + public SctpServerChannelConfig getConfig() { return config; } @Override public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) socket.socket().getLocalSocketAddress(); + try { + return (InetSocketAddress) socket.getAllLocalAddresses().iterator().next(); + } catch (IOException e) { + return null; + } } @Override @@ -98,7 +103,11 @@ class NioServerSocketChannel extends AbstractServerChannel @Override public boolean isBound() { - return isOpen() && socket.socket().isBound(); + return isOpen() && bound; + } + + public void setBound() { + bound = true; } @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java similarity index 84% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketPipelineSink.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java index 2beb910e7b..2fa014f4a8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java @@ -15,6 +15,7 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.channel.*; import org.jboss.netty.channel.Channel; import org.jboss.netty.logging.InternalLogger; @@ -34,22 +35,23 @@ import static org.jboss.netty.channel.Channels.*; * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ * */ -class NioServerSocketPipelineSink extends AbstractChannelSink { +class SctpServerPipelineSink extends AbstractChannelSink { static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); + InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); - private final NioWorker[] workers; + private final SctpWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); - NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) { - workers = new NioWorker[workerCount]; + SctpServerPipelineSink(Executor workerExecutor, int workerCount) { + workers = new SctpWorker[workerCount]; for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); + workers[i] = new SctpWorker(workerExecutor); } } @@ -57,9 +59,9 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { Channel channel = e.getChannel(); - if (channel instanceof NioServerSocketChannel) { + if (channel instanceof SctpServerChannelImpl) { handleServerSocket(e); - } else if (channel instanceof NioSocketChannel) { + } else if (channel instanceof SctpChannelImpl) { handleAcceptedSocket(e); } } @@ -70,8 +72,8 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } ChannelStateEvent event = (ChannelStateEvent) e; - NioServerSocketChannel channel = - (NioServerSocketChannel) event.getChannel(); + SctpServerChannelImpl channel = + (SctpServerChannelImpl) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); @@ -95,7 +97,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { private void handleAcceptedSocket(ChannelEvent e) { if (e instanceof ChannelStateEvent) { ChannelStateEvent event = (ChannelStateEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.getChannel(); + SctpChannelImpl channel = (SctpChannelImpl) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); @@ -113,12 +115,12 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.worker.setInterestOps(channel, future, (Integer) value); break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; - NioSocketChannel channel = (NioSocketChannel) event.getChannel(); + SctpChannelImpl channel = (SctpChannelImpl) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; channel.worker.writeFromUserCode(channel); @@ -126,20 +128,20 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } private void bind( - NioServerSocketChannel channel, ChannelFuture future, + SctpServerChannelImpl channel, ChannelFuture future, SocketAddress localAddress) { boolean bound = false; boolean bossStarted = false; try { - channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); + channel.socket.bind(localAddress, channel.getConfig().getBacklog()); bound = true; - + channel.setBound(); future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); Executor bossExecutor = - ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; + ((DefaultSctpServerChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start(bossExecutor, new Boss(channel)); bossStarted = true; } catch (Throwable t) { @@ -152,7 +154,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } } - private void close(NioServerSocketChannel channel, ChannelFuture future) { + private void close(SctpServerChannelImpl channel, ChannelFuture future) { boolean bound = channel.isBound(); try { if (channel.socket.isOpen()) { @@ -186,16 +188,16 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } } - NioWorker nextWorker() { + SctpWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; } private final class Boss implements Runnable { private final Selector selector; - private final NioServerSocketChannel channel; + private final SctpServerChannelImpl channel; - Boss(NioServerSocketChannel channel) throws IOException { + Boss(SctpServerChannelImpl channel) throws IOException { this.channel = channel; selector = Selector.open(); @@ -225,7 +227,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { selector.selectedKeys().clear(); } - SocketChannel acceptedSocket = channel.socket.accept(); + SctpChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket != null) { registerAcceptedChannel(acceptedSocket, currentThread); } @@ -255,14 +257,14 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } } - private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { + private void registerAcceptedChannel(SctpChannel acceptedSocket, Thread currentThread) { try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); - NioWorker worker = nextWorker(); - worker.register(new NioAcceptedSocketChannel( + SctpWorker worker = nextWorker(); + worker.register(new SctpAcceptedChannel( channel.getFactory(), pipeline, channel, - org.jboss.netty.channel.socket.sctp.NioServerSocketPipelineSink.this, acceptedSocket, + SctpServerPipelineSink.this, acceptedSocket, worker, currentThread), null); } catch (Exception e) { logger.warn( diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java similarity index 82% rename from src/main/java/org/jboss/netty/channel/socket/sctp/NioWorker.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java index a857574b93..5d680ae2e9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java @@ -15,11 +15,12 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.*; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.sctp.SocketSendBufferPool.SendBuffer; +import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.internal.DeadLockProofWorker; @@ -41,19 +42,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.jboss.netty.channel.Channels.*; /** - * * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan * * @version $Rev$, $Date$ - * */ -class NioWorker implements Runnable { +class SctpWorker implements Runnable { private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioWorker.class); + InternalLoggerFactory.getInstance(SctpWorker.class); - private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; + private static final int CONSTRAINT_LEVEL = SctpProviderMetadata.CONSTRAINT_LEVEL; static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. @@ -68,17 +68,22 @@ class NioWorker implements Runnable { private final Queue writeTaskQueue = new LinkedTransferQueue(); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); - private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); + private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool(); + private int payloadProtocolId = 0;// un-known sctp payload protocol id - NioWorker(Executor executor) { + private NotificationHandler notificationHandler; + + SctpWorker(Executor executor) { this.executor = executor; } - void register(NioSocketChannel channel, ChannelFuture future) { + void register(SctpChannelImpl channel, ChannelFuture future) { - boolean server = !(channel instanceof NioClientSocketChannel); + boolean server = !(channel instanceof SctpClientChannel); Runnable registerTask = new RegisterTask(channel, future, server); + payloadProtocolId = channel.getConfig().getPayloadProtocol(); + notificationHandler = new NotificationHandler(channel, this); Selector selector; synchronized (startStopLock) { @@ -131,13 +136,13 @@ class NioWorker implements Runnable { boolean shutdown = false; Selector selector = this.selector; - for (;;) { + for (; ;) { wakenUp.set(false); if (CONSTRAINT_LEVEL != 0) { selectorGuard.writeLock().lock(); - // This empty synchronization block prevents the selector - // from acquiring its lock. + // This empty synchronization block prevents the selector + // from acquiring its lock. selectorGuard.writeLock().unlock(); } @@ -188,7 +193,7 @@ class NioWorker implements Runnable { // concurrent manner. if (selector.keys().isEmpty()) { if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { + executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { synchronized (startStopLock) { if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { @@ -229,7 +234,7 @@ class NioWorker implements Runnable { } private void processRegisterTaskQueue() throws IOException { - for (;;) { + for (; ;) { final Runnable task = registerTaskQueue.poll(); if (task == null) { break; @@ -241,7 +246,7 @@ class NioWorker implements Runnable { } private void processWriteTaskQueue() throws IOException { - for (;;) { + for (; ;) { final Runnable task = writeTaskQueue.poll(); if (task == null) { break; @@ -287,45 +292,50 @@ class NioWorker implements Runnable { } private boolean read(SelectionKey k) { - final SocketChannel ch = (SocketChannel) k.channel(); - final NioSocketChannel channel = (NioSocketChannel) k.attachment(); + final SctpChannelImpl channel = (SctpChannelImpl) k.attachment(); final ReceiveBufferSizePredictor predictor = - channel.getConfig().getReceiveBufferSizePredictor(); + channel.getConfig().getReceiveBufferSizePredictor(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); - int ret = 0; - int readBytes = 0; + boolean messageReceived = false; boolean failure = true; ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { - while ((ret = ch.read(bb)) > 0) { - readBytes += ret; - if (!bb.hasRemaining()) { - break; + MessageInfo messageInfo = channel.sctpChannel.receive(bb, this, notificationHandler); + if (messageInfo != null) { + messageReceived = true; + if (messageInfo.isComplete()) { + failure = false; + } else { + logger.error("Received incomplete sctp packet, can not continue! Expected SCTP_EXPLICIT_COMPLETE message"); + failure = true; } + } else { + messageReceived = false; + failure = false; } - failure = false; } catch (ClosedChannelException e) { // Can happen, and does not need a user attention. } catch (Throwable t) { fireExceptionCaught(channel, t); } - if (readBytes > 0) { + if (messageReceived) { bb.flip(); final ChannelBufferFactory bufferFactory = - channel.getConfig().getBufferFactory(); - final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); + channel.getConfig().getBufferFactory(); + final int receivedBytes = bb.remaining(); + final ChannelBuffer buffer = bufferFactory.getBuffer(receivedBytes); buffer.setBytes(0, bb); - buffer.writerIndex(readBytes); + buffer.writerIndex(receivedBytes); recvBufferPool.release(bb); // Update the predictor. - predictor.previousReceiveBufferSize(readBytes); + predictor.previousReceiveBufferSize(receivedBytes); // Fire the event. fireMessageReceived(channel, buffer); @@ -333,7 +343,7 @@ class NioWorker implements Runnable { recvBufferPool.release(bb); } - if (ret < 0 || failure) { + if (channel.sctpChannel.isBlocking() && !messageReceived || failure) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; @@ -343,11 +353,11 @@ class NioWorker implements Runnable { } private void close(SelectionKey k) { - NioSocketChannel ch = (NioSocketChannel) k.attachment(); + SctpChannelImpl ch = (SctpChannelImpl) k.attachment(); close(ch, succeededFuture(ch)); } - void writeFromUserCode(final NioSocketChannel channel) { + void writeFromUserCode(final SctpChannelImpl channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; @@ -370,19 +380,19 @@ class NioWorker implements Runnable { write0(channel); } - void writeFromTaskLoop(final NioSocketChannel ch) { + void writeFromTaskLoop(final SctpChannelImpl ch) { if (!ch.writeSuspended) { write0(ch); } } void writeFromSelectorLoop(final SelectionKey k) { - NioSocketChannel ch = (NioSocketChannel) k.attachment(); + SctpChannelImpl ch = (SctpChannelImpl) k.attachment(); ch.writeSuspended = false; write0(ch); } - private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { + private boolean scheduleWriteIfNecessary(final SctpChannelImpl channel) { final Thread currentThread = Thread.currentThread(); final Thread workerThread = thread; if (currentThread != workerThread) { @@ -391,8 +401,8 @@ class NioWorker implements Runnable { assert offered; } - if (!(channel instanceof NioAcceptedSocketChannel) || - ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { + if (!(channel instanceof SctpAcceptedChannel) || + ((SctpAcceptedChannel) channel).bossThread != currentThread) { final Selector workerSelector = selector; if (workerSelector != null) { if (wakenUp.compareAndSet(false, true)) { @@ -417,20 +427,20 @@ class NioWorker implements Runnable { return false; } - private void write0(NioSocketChannel channel) { + private void write0(SctpChannelImpl channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; long writtenBytes = 0; - final SocketSendBufferPool sendBufferPool = this.sendBufferPool; - final SocketChannel ch = channel.socket; + final SctpSendBufferPool sendBufferPool = this.sendBufferPool; + final com.sun.nio.sctp.SctpChannel ch = channel.sctpChannel; final Queue writeBuffer = channel.writeBuffer; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { channel.inWriteNowLoop = true; - for (;;) { + for (; ;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf; if (evt == null) { @@ -448,8 +458,8 @@ class NioWorker implements Runnable { ChannelFuture future = evt.getFuture(); try { long localWrittenBytes = 0; - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); + for (int i = writeSpinCount; i > 0; i--) { + localWrittenBytes = buf.transferTo(ch, payloadProtocolId, 0); if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; @@ -510,9 +520,9 @@ class NioWorker implements Runnable { } } - private void setOpWrite(NioSocketChannel channel) { + private void setOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); + SelectionKey key = channel.sctpChannel.keyFor(selector); if (key == null) { return; } @@ -533,9 +543,9 @@ class NioWorker implements Runnable { } } - private void clearOpWrite(NioSocketChannel channel) { + private void clearOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); + SelectionKey key = channel.sctpChannel.keyFor(selector); if (key == null) { return; } @@ -556,12 +566,12 @@ class NioWorker implements Runnable { } } - void close(NioSocketChannel channel, ChannelFuture future) { + void close(SctpChannelImpl channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { - channel.socket.close(); - cancelledKeys ++; + channel.sctpChannel.close(); + cancelledKeys++; if (channel.setClosed()) { future.setSuccess(); @@ -583,7 +593,7 @@ class NioWorker implements Runnable { } } - private void cleanUpWriteBuffer(NioSocketChannel channel) { + private void cleanUpWriteBuffer(SctpChannelImpl channel) { Exception cause = null; boolean fireExceptionCaught = false; @@ -620,7 +630,7 @@ class NioWorker implements Runnable { } } - for (;;) { + for (; ;) { evt = writeBuffer.poll(); if (evt == null) { break; @@ -637,14 +647,14 @@ class NioWorker implements Runnable { } void setInterestOps( - NioSocketChannel channel, ChannelFuture future, int interestOps) { + SctpChannelImpl channel, ChannelFuture future, int interestOps) { boolean changed = false; try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { Selector selector = this.selector; - SelectionKey key = channel.socket.keyFor(selector); + SelectionKey key = channel.sctpChannel.keyFor(selector); if (key == null || selector == null) { // Not registered to the worker yet. @@ -658,38 +668,38 @@ class NioWorker implements Runnable { interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; switch (CONSTRAINT_LEVEL) { - case 0: - if (channel.getRawInterestOps() != interestOps) { - key.interestOps(interestOps); - if (Thread.currentThread() != thread && - wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { + case 0: + if (channel.getRawInterestOps() != interestOps) { key.interestOps(interestOps); + if (Thread.currentThread() != thread && + wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } changed = true; - } else { - selectorGuard.readLock().lock(); - try { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } + } + break; + case 1: + case 2: + if (channel.getRawInterestOps() != interestOps) { + if (Thread.currentThread() == thread) { key.interestOps(interestOps); changed = true; - } finally { - selectorGuard.readLock().unlock(); + } else { + selectorGuard.readLock().lock(); + try { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + key.interestOps(interestOps); + changed = true; + } finally { + selectorGuard.readLock().unlock(); + } } } - } - break; - default: - throw new Error(); + break; + default: + throw new Error(); } if (changed) { @@ -713,12 +723,12 @@ class NioWorker implements Runnable { } private final class RegisterTask implements Runnable { - private final NioSocketChannel channel; + private final SctpChannelImpl channel; private final ChannelFuture future; private final boolean server; RegisterTask( - NioSocketChannel channel, ChannelFuture future, boolean server) { + SctpChannelImpl channel, ChannelFuture future, boolean server) { this.channel = channel; this.future = future; @@ -739,11 +749,11 @@ class NioWorker implements Runnable { try { if (server) { - channel.socket.configureBlocking(false); + channel.sctpChannel.configureBlocking(false); } synchronized (channel.interestOpsLock) { - channel.socket.register( + channel.sctpChannel.register( selector, channel.getRawInterestOps(), channel); } if (future != null) { @@ -762,7 +772,7 @@ class NioWorker implements Runnable { } if (!server) { - if (!((NioClientSocketChannel) channel).boundManually) { + if (!((SctpClientChannel) channel).boundManually) { fireChannelBound(channel, localAddress); } fireChannelConnected(channel, remoteAddress); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java index 1de384a447..e40c36fb42 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java @@ -25,6 +25,8 @@ import java.nio.channels.Selector; /** * @author The Netty Project * @author Trustin Lee + * @author Jestan Nirojan + * * @version $Rev$, $Date$ */ final class SelectorUtil {