diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java deleted file mode 100644 index 1ede019395..0000000000 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.sctp; - -import com.sun.nio.sctp.SctpStandardSocketOption; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.DefaultServerChannelConfig; -import org.jboss.netty.channel.socket.ServerSocketChannelConfig; -import org.jboss.netty.util.internal.ConversionUtil; - -import java.io.IOException; - -/** - * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. - * - * @author The Netty Project - * @author Trustin Lee - - * - * @version $Rev$, $Date$ - */ -public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig - implements SctpServerChannelConfig { - - private final com.sun.nio.sctp.SctpServerChannel serverChannel; - private volatile int backlog; - - /** - * Creates a new instance. - */ - public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) { - if (serverChannel == null) { - throw new NullPointerException("serverChannel"); - } - this.serverChannel = serverChannel; - } - - @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; - } - - if (key.equals("receiveBufferSize")) { - setReceiveBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("reuseAddress")) { - setReuseAddress(ConversionUtil.toBoolean(value)); - } else if (key.equals("backlog")) { - setBacklog(ConversionUtil.toInt(value)); - } else { - return false; - } - return true; - } - - @Override - public boolean isReuseAddress() { - return false; - } - - @Override - public void setReuseAddress(boolean reuseAddress) { - throw new UnsupportedOperationException("Not supported"); - } - - @Override - public int getReceiveBufferSize() { - try { - return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setReceiveBufferSize(int receiveBufferSize) { - try { - serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { - throw new UnsupportedOperationException("Not supported"); - } - - @Override - public int getBacklog() { - return backlog; - } - - @Override - public void setBacklog(int backlog) { - if (backlog < 0) { - throw new IllegalArgumentException("backlog: " + backlog); - } - this.backlog = backlog; - } -} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpSocketChannelConfig.java similarity index 81% rename from src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpSocketChannelConfig.java index 16656591fc..47cb66322a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpSocketChannelConfig.java @@ -17,7 +17,9 @@ package org.jboss.netty.channel.socket.sctp; import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.buffer.ChannelBufferFactory; +import org.jboss.netty.buffer.HeapChannelBufferFactory; import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.internal.ConversionUtil; @@ -25,46 +27,46 @@ import org.jboss.netty.util.internal.ConversionUtil; import java.util.Map; /** - * The default {@link SctpChannelConfig} implementation. + * The default {@link NioSocketChannelConfig} implementation for SCTP. * * @author The Netty Project * @author Trustin Lee * @author Jestan Nirojan - * * @version $Rev$, $Date$ - * */ -class DefaultSctpChannelConfig implements SctpChannelConfig { +class DefaultSctpSocketChannelConfig implements NioSocketChannelConfig { + private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance(); + private volatile int connectTimeoutMillis = 10000; // 10 seconds private static final InternalLogger logger = - InternalLoggerFactory.getInstance(DefaultSctpChannelConfig.class); + InternalLoggerFactory.getInstance(DefaultSctpSocketChannelConfig.class); private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY = - new AdaptiveReceiveBufferSizePredictorFactory(); + new AdaptiveReceiveBufferSizePredictorFactory(); private volatile int writeBufferHighWaterMark = 64 * 1024; - private volatile int writeBufferLowWaterMark = 32 * 1024; + private volatile int writeBufferLowWaterMark = 32 * 1024; private volatile ReceiveBufferSizePredictor predictor; private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY; private volatile int writeSpinCount = 16; private SctpChannel socket; - private int payloadProtocolId = 0; - DefaultSctpChannelConfig(SctpChannel socket) { + DefaultSctpSocketChannelConfig(SctpChannel socket) { this.socket = socket; } @Override public void setOptions(Map options) { - setOptions(options); + //TODO: implement this as in DefaultSocketChannelConfig + //socket.setOption(options); if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) { // Recover the integrity of the configuration with a sensible value. setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1); // Notify the user about misconfiguration. logger.warn( "writeBufferLowWaterMark cannot be greater than " + - "writeBufferHighWaterMark; setting to the half of the " + - "writeBufferHighWaterMark."); + "writeBufferHighWaterMark; setting to the half of the " + + "writeBufferHighWaterMark."); } } @@ -89,11 +91,15 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { @Override public ChannelBufferFactory getBufferFactory() { - return null; + return bufferFactory; } @Override public void setBufferFactory(ChannelBufferFactory bufferFactory) { + if (bufferFactory == null) { + throw new NullPointerException("bufferFactory"); + } + this.bufferFactory = bufferFactory; } @Override @@ -103,15 +109,20 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { @Override public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { + //unused } @Override public int getConnectTimeoutMillis() { - return 0; + return connectTimeoutMillis; } @Override public void setConnectTimeoutMillis(int connectTimeoutMillis) { + if (connectTimeoutMillis < 0) { + throw new IllegalArgumentException("connectTimeoutMillis: " + connectTimeoutMillis); + } + this.connectTimeoutMillis = connectTimeoutMillis; } @Override @@ -124,8 +135,8 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) { throw new IllegalArgumentException( "writeBufferHighWaterMark cannot be less than " + - "writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " + - writeBufferHighWaterMark); + "writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " + + writeBufferHighWaterMark); } setWriteBufferHighWaterMark0(writeBufferHighWaterMark); } @@ -148,8 +159,8 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) { throw new IllegalArgumentException( "writeBufferLowWaterMark cannot be greater than " + - "writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " + - writeBufferLowWaterMark); + "writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " + + writeBufferLowWaterMark); } setWriteBufferLowWaterMark0(writeBufferLowWaterMark); } @@ -185,7 +196,7 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { } catch (Exception e) { throw new ChannelException( "Failed to create a new " + - ReceiveBufferSizePredictor.class.getSimpleName() + '.', + ReceiveBufferSizePredictor.class.getSimpleName() + '.', e); } } @@ -214,10 +225,6 @@ class DefaultSctpChannelConfig implements SctpChannelConfig { this.predictorFactory = predictorFactory; } - @Override - public int getPayloadProtocol() { - return payloadProtocolId; - } @Override public boolean isTcpNoDelay() { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java index 6d34a0ced8..c65078b327 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java @@ -15,9 +15,11 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.Association; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannelConfig; +import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -26,16 +28,21 @@ import java.util.Set; /** * @author The Netty Project * @author Jestan Nirojan - * * @version $Rev$, $Date$ - * */ -//TODO: support set of loacal, remote addresses. -public interface SctpChannel extends Channel{ +public interface SctpChannel extends SocketChannel { @Override InetSocketAddress getLocalAddress(); + + Set getAllLocalAddresses(); + @Override - SctpChannelConfig getConfig(); + NioSocketChannelConfig getConfig(); + @Override InetSocketAddress getRemoteAddress(); + + Set getRemoteAddresses(); + + Association association(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java deleted file mode 100644 index 4ff21da255..0000000000 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelConfig.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.sctp; - -import org.jboss.netty.channel.ReceiveBufferSizePredictor; -import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory; -import org.jboss.netty.channel.socket.SocketChannelConfig; - -/** - * A {@link org.jboss.netty.channel.socket.SocketChannelConfig} for a NIO TCP/IP {@link org.jboss.netty.channel.socket.SocketChannel}. - * - *

Available options

- * - * In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig} and - * {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link SctpChannelConfig} allows the - * following options in the option map: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
NameAssociated setter method
{@code "writeBufferHighWaterMark"}{@link #setWriteBufferHighWaterMark(int)}
{@code "writeBufferLowWaterMark"}{@link #setWriteBufferLowWaterMark(int)}
{@code "writeSpinCount"}{@link #setWriteSpinCount(int)}
{@code "receiveBufferSizePredictor"}{@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}
{@code "receiveBufferSizePredictorFactory"}{@link #setReceiveBufferSizePredictorFactory(org.jboss.netty.channel.ReceiveBufferSizePredictorFactory)}
- * - * @author The Netty Project - * @author Trustin Lee - * @author Jestan Nirojan - * - * @version $Rev$, $Date$ - */ -public interface SctpChannelConfig extends SocketChannelConfig { - - /** - * Returns the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()} - * will start to return {@code false}. - */ - int getWriteBufferHighWaterMark(); - - /** - * Sets the high water mark of the write buffer. If the number of bytes - * queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()} - * will start to return {@code false}. - */ - void setWriteBufferHighWaterMark(int writeBufferHighWaterMark); - - /** - * Returns the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return - * {@code true} again. - */ - int getWriteBufferLowWaterMark(); - - /** - * Sets the low water mark of the write buffer. Once the number of bytes - * queued in the write buffer exceeded the - * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then - * dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return - * {@code true} again. - */ - void setWriteBufferLowWaterMark(int writeBufferLowWaterMark); - - /** - * Returns the maximum loop count for a write operation until - * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - */ - int getWriteSpinCount(); - - /** - * Sets the maximum loop count for a write operation until - * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - * - * @throws IllegalArgumentException - * if the specified value is {@code 0} or less than {@code 0} - */ - void setWriteSpinCount(int writeSpinCount); - - /** - * Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the - * number of readable bytes in the socket receive buffer. The default - * predictor is {@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536). - */ - ReceiveBufferSizePredictor getReceiveBufferSizePredictor(); - - /** - * Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the - * number of readable bytes in the socket receive buffer. The default - * predictor is {@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536). - */ - void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor); - - /** - * Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictorFactory} which creates a new - * {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and - * no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set - * for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)} - * will be called with the new predictor. The default factory is - * {@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536). - */ - ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory(); - - /** - * Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which creates a new - * {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and - * no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set - * for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)} - * will be called with the new predictor. The default factory is - * {@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536). - */ - void setReceiveBufferSizePredictorFactory( - ReceiveBufferSizePredictorFactory predictorFactory); - - /** - * Get the sctp payload protocol id - * A value indicating the type of payload protocol data being transmitted. This value is passed as opaque data by SCTP. - * @return payload protocol id - */ - int getPayloadProtocol(); -} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java index ae8a97ab0c..576feb0653 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java @@ -15,16 +15,18 @@ */ package org.jboss.netty.channel.socket.sctp; +import com.sun.nio.sctp.Association; import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer; import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadLocalBoolean; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Queue; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -47,9 +49,9 @@ class SctpChannelImpl extends AbstractChannel private static final int ST_CLOSED = -1; volatile int state = ST_OPEN; - final SctpChannel sctpChannel; + final SctpChannel socket; final SctpWorker worker; - private final SctpChannelConfig config; + private final NioSocketChannelConfig config; private volatile InetSocketAddress localAddress; private volatile InetSocketAddress remoteAddress; @@ -71,12 +73,12 @@ class SctpChannelImpl extends AbstractChannel public SctpChannelImpl( Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, - SctpChannel sctpChannel, SctpWorker worker) { + SctpChannel socket, SctpWorker worker) { super(parent, factory, pipeline, sink); - this.sctpChannel = sctpChannel; + this.socket = socket; this.worker = worker; - config = new DefaultSctpChannelConfig(sctpChannel); + config = new DefaultSctpSocketChannelConfig(socket); getCloseFuture().addListener(new ChannelFutureListener() { @Override @@ -87,7 +89,7 @@ class SctpChannelImpl extends AbstractChannel } @Override - public SctpChannelConfig getConfig() { + public NioSocketChannelConfig getConfig() { return config; } @@ -96,33 +98,70 @@ class SctpChannelImpl extends AbstractChannel InetSocketAddress localAddress = this.localAddress; if (localAddress == null) { try { - //TODO: fix this - this.localAddress = localAddress = - (InetSocketAddress) sctpChannel.getAllLocalAddresses().iterator().next(); + final Iterator iterator = socket.getAllLocalAddresses().iterator(); + if (iterator.hasNext()) { + this.localAddress = localAddress = (InetSocketAddress) iterator.next(); + } } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. return null; } } return localAddress; } + @Override + public Set getAllLocalAddresses() { + try { + final Set allLocalAddresses = socket.getAllLocalAddresses(); + final Set addresses = new HashSet(allLocalAddresses.size()); + for(SocketAddress socketAddress: allLocalAddresses) { + addresses.add((InetSocketAddress) socketAddress); + } + return addresses; + } catch (Throwable t) { + return Collections.emptySet(); + } + } + @Override public InetSocketAddress getRemoteAddress() { InetSocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { - //TODO: fix this - this.remoteAddress = remoteAddress = - (InetSocketAddress) sctpChannel.getRemoteAddresses().iterator().next(); + final Iterator iterator = socket.getRemoteAddresses().iterator(); + if (iterator.hasNext()) { + this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next(); + } } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. return null; } } return remoteAddress; } + @Override + public Set getRemoteAddresses() { + try { + final Set allLocalAddresses = socket.getRemoteAddresses(); + final Set addresses = new HashSet(allLocalAddresses.size()); + for(SocketAddress socketAddress: allLocalAddresses) { + addresses.add((InetSocketAddress) socketAddress); + } + return addresses; + } catch (Throwable t) { + return Collections.emptySet(); + } + } + + @Override + public Association association() { + try { + return socket.association(); + } catch (Throwable e) { + return null; + } + } + @Override public boolean isOpen() { return state >= ST_OPEN; diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java deleted file mode 100644 index ca66de644c..0000000000 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientChannelFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.sctp; - -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.socket.SocketChannel; - -/** - * A {@link org.jboss.netty.channel.ChannelFactory} which creates a client-side {@link org.jboss.netty.channel.socket.SocketChannel}. - * - * @author The Netty Project - * @author Trustin Lee - * - * @version $Rev$, $Date$ - * - * @apiviz.has org.jboss.netty.channel.socket.SocketChannel oneway - - creates - */ -public interface SctpClientChannelFactory extends ChannelFactory { - @Override - SctpChannel newChannel(ChannelPipeline pipeline); -} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java index 18afaa0b51..41cea39372 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java @@ -113,7 +113,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { SctpClientChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.sctpChannel.bind(localAddress); + channel.socket.bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -128,7 +128,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { final SctpClientChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.sctpChannel.connect(remoteAddress)) { + if (channel.socket.connect(remoteAddress)) { channel.worker.register(channel, cf); } else { channel.getCloseFuture().addListener(new ChannelFutureListener() { @@ -371,7 +371,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { private void connect(SelectionKey k) { SctpClientChannel ch = (SctpClientChannel) k.attachment(); try { - if (ch.sctpChannel.finishConnect()) { + if (ch.socket.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } @@ -401,7 +401,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { @Override public void run() { try { - channel.sctpChannel.register( + channel.socket.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientSocketChannelFactory.java similarity index 87% rename from src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientSocketChannelFactory.java index 4fa11c8f3b..85913f961d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpClientChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientSocketChannelFactory.java @@ -16,6 +16,7 @@ package org.jboss.netty.channel.socket.sctp; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.internal.ExecutorUtil; import java.util.concurrent.Executor; @@ -28,26 +29,26 @@ import java.util.concurrent.Executor; * *

How threads work

*

- * There are two types of threads in a {@link DefaultSctpClientChannelFactory}; + * There are two types of threads in a {@link SctpClientSocketChannelFactory}; * one is boss thread and the other is worker thread. * *

Boss thread

*

- * One {@link DefaultSctpClientChannelFactory} has one boss thread. It makes + * One {@link SctpClientSocketChannelFactory} has one boss thread. It makes * a connection attempt on request. Once a connection attempt succeeds, * the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker - * threads that the {@link DefaultSctpClientChannelFactory} manages. + * threads that the {@link SctpClientSocketChannelFactory} manages. * *

Worker threads

*

- * One {@link DefaultSctpClientChannelFactory} can have one or more worker + * One {@link SctpClientSocketChannelFactory} can have one or more worker * threads. A worker thread performs non-blocking read and write for one or * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * *

Life cycle of threads and graceful shutdown

*

* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified - * when a {@link DefaultSctpClientChannelFactory} was created. A boss thread is + * when a {@link SctpClientSocketChannelFactory} was created. A boss thread is * acquired from the {@code bossExecutor}, and worker threads are acquired from * the {@code workerExecutor}. Therefore, you should make sure the specified * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. @@ -77,7 +78,7 @@ import java.util.concurrent.Executor; * * @apiviz.landmark */ -public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory { +public class SctpClientSocketChannelFactory implements ClientSocketChannelFactory { private final Executor bossExecutor; private final Executor workerExecutor; @@ -85,7 +86,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory /** * Creates a new instance. Calling this constructor is same with calling - * {@link #DefaultSctpClientChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * + * {@link #SctpClientSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * @@ -94,7 +95,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public DefaultSctpClientChannelFactory( + public SctpClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor) { this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } @@ -109,7 +110,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory * @param workerCount * the maximum number of I/O worker threads */ - public DefaultSctpClientChannelFactory( + public SctpClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { if (bossExecutor == null) { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java new file mode 100644 index 0000000000..3727af8708 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java @@ -0,0 +1,53 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +/** + * @author The Netty Project + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + */ +public final class SctpMessage { + private final int streamNo; + private final int payloadProtocolId; + private final ChannelBuffer data; + + public SctpMessage(int streamNo, int payloadProtocolId, ChannelBuffer data) { + this.streamNo = streamNo; + this.payloadProtocolId = payloadProtocolId; + this.data = data; + } + + public int streamNumber() { + return streamNo; + } + + public int payloadProtocolId() { + return payloadProtocolId; + } + + public ChannelBuffer data() { + if (data.readable()) { + return data.slice(); + } else { + return ChannelBuffers.EMPTY_BUFFER; + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java index 16520856c4..4320a9ef0b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java @@ -18,7 +18,6 @@ package org.jboss.netty.channel.socket.sctp; import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.SctpChannel; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.FileRegion; import java.io.IOException; import java.lang.ref.SoftReference; @@ -28,7 +27,6 @@ import java.nio.ByteBuffer; * @author The Netty Project * @author Trustin Lee * @author Jestan Nirojan - * * @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $ */ final class SctpSendBufferPool { @@ -47,27 +45,29 @@ final class SctpSendBufferPool { } final SendBuffer acquire(Object message) { - if (message instanceof ChannelBuffer) { - return acquire((ChannelBuffer) message); - } else if (message instanceof FileRegion) { - return acquire((FileRegion) message); + if (message instanceof SctpMessage) { + return acquire((SctpMessage) message); + } else { + throw new IllegalArgumentException( + "unsupported message type: " + message.getClass()); } - - throw new IllegalArgumentException( - "unsupported message type: " + message.getClass()); } - private final SendBuffer acquire(ChannelBuffer src) { + private final SendBuffer acquire(SctpMessage message) { + final ChannelBuffer src = message.data(); + final int streamNo = message.streamNumber(); + final int protocolId = message.payloadProtocolId(); + final int size = src.readableBytes(); if (size == 0) { return EMPTY_BUFFER; } if (src.isDirect()) { - return new UnpooledSendBuffer(src.toByteBuffer()); + return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); } if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) { - return new UnpooledSendBuffer(src.toByteBuffer()); + return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer()); } Preallocation current = this.current; @@ -81,7 +81,7 @@ final class SctpSendBufferPool { buffer.position(align(nextPos)); slice.limit(nextPos); current.refCnt++; - dst = new PooledSendBuffer(current, slice); + dst = new PooledSendBuffer(streamNo, protocolId, current, slice); } else if (size > remaining) { this.current = current = getPreallocation(); buffer = current.buffer; @@ -89,11 +89,11 @@ final class SctpSendBufferPool { buffer.position(align(size)); slice.limit(size); current.refCnt++; - dst = new PooledSendBuffer(current, slice); + dst = new PooledSendBuffer(streamNo, protocolId, current, slice); } else { // size == remaining current.refCnt++; this.current = getPreallocation0(); - dst = new PooledSendBuffer(current, current.buffer); + dst = new PooledSendBuffer(streamNo, protocolId, current, current.buffer); } ByteBuffer dstbuf = dst.buffer; @@ -166,7 +166,7 @@ final class SctpSendBufferPool { long totalBytes(); - long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException; + long transferTo(SctpChannel ch) throws IOException; void release(); } @@ -175,8 +175,12 @@ final class SctpSendBufferPool { final ByteBuffer buffer; final int initialPos; + final int streamNo; + final int protocolId; - UnpooledSendBuffer(ByteBuffer buffer) { + UnpooledSendBuffer(int streamNo, int protocolId, ByteBuffer buffer) { + this.streamNo = streamNo; + this.protocolId = protocolId; this.buffer = buffer; initialPos = buffer.position(); } @@ -197,7 +201,7 @@ final class SctpSendBufferPool { } @Override - public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { + public long transferTo(SctpChannel ch) throws IOException { final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); messageInfo.payloadProtocolID(protocolId); messageInfo.streamNumber(streamNo); @@ -216,8 +220,12 @@ final class SctpSendBufferPool { private final Preallocation parent; final ByteBuffer buffer; final int initialPos; + final int streamNo; + final int protocolId; - PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { + PooledSendBuffer(int streamNo, int protocolId, Preallocation parent, ByteBuffer buffer) { + this.streamNo = streamNo; + this.protocolId = protocolId; this.parent = parent; this.buffer = buffer; initialPos = buffer.position(); @@ -239,7 +247,7 @@ final class SctpSendBufferPool { } @Override - public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { + public long transferTo(SctpChannel ch) throws IOException { final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo); messageInfo.payloadProtocolID(protocolId); messageInfo.streamNumber(streamNo); @@ -281,7 +289,7 @@ final class SctpSendBufferPool { } @Override - public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException { + public long transferTo(SctpChannel ch) throws IOException { return 0; } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java index 886f27d6d9..ca24a0357a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java @@ -15,7 +15,7 @@ */ package org.jboss.netty.channel.socket.sctp; -import org.jboss.netty.channel.ServerChannel; +import org.jboss.netty.channel.socket.ServerSocketChannel; import org.jboss.netty.channel.socket.ServerSocketChannelConfig; import java.net.InetSocketAddress; @@ -29,9 +29,9 @@ import java.net.InetSocketAddress; * @version $Rev$, $Date$ * */ -public interface SctpServerChannel extends ServerChannel { +public interface SctpServerChannel extends ServerSocketChannel { @Override - SctpServerChannelConfig getConfig(); + ServerSocketChannelConfig getConfig(); @Override InetSocketAddress getLocalAddress(); @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java index 1865b2dcdd..6463d6a2a8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelConfig.java @@ -15,71 +15,100 @@ */ package org.jboss.netty.channel.socket.sctp; -import org.jboss.netty.channel.ChannelConfig; +import com.sun.nio.sctp.SctpStandardSocketOption; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.DefaultServerChannelConfig; +import org.jboss.netty.channel.socket.ServerSocketChannelConfig; +import org.jboss.netty.util.internal.ConversionUtil; + +import java.io.IOException; /** - * A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.ServerSocketChannel}. - * - *

Available options

- * - * In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig}, - * {@link org.jboss.netty.channel.socket.sctp.SctpServerChannelConfig} allows the following options in the - * option map: - * - * - * - * - * - * - * - * - * - * - * - *
NameAssociated setter method
{@code "backlog"}{@link #setBacklog(int)}
{@code "reuseAddress"}{@link #setReuseAddress(boolean)}
{@code "receiveBufferSize"}{@link #setReceiveBufferSize(int)}
+ * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. * * @author The Netty Project * @author Trustin Lee + * * @version $Rev$, $Date$ */ -public interface SctpServerChannelConfig extends ChannelConfig { +public class SctpServerChannelConfig extends DefaultServerChannelConfig + implements ServerSocketChannelConfig { + + private final com.sun.nio.sctp.SctpServerChannel serverChannel; + private volatile int backlog; /** - * Gets the backlog value to specify when the channel binds to a local - * address. + * Creates a new instance. */ - int getBacklog(); + public SctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) { + if (serverChannel == null) { + throw new NullPointerException("serverChannel"); + } + this.serverChannel = serverChannel; + } - /** - * Sets the backlog value to specify when the channel binds to a local - * address. - */ - void setBacklog(int backlog); + @Override + public boolean setOption(String key, Object value) { + if (super.setOption(key, value)) { + return true; + } - /** - * Gets the {@code SO_REUSEADDR} option. - */ - boolean isReuseAddress(); + if (key.equals("receiveBufferSize")) { + setReceiveBufferSize(ConversionUtil.toInt(value)); + } else if (key.equals("reuseAddress")) { + setReuseAddress(ConversionUtil.toBoolean(value)); + } else if (key.equals("backlog")) { + setBacklog(ConversionUtil.toInt(value)); + } else { + return false; + } + return true; + } - /** - * Sets the {@code SO_REUSEADDR} option. - */ - void setReuseAddress(boolean reuseAddress); + @Override + public boolean isReuseAddress() { + return false; + } - /** - * Gets the {@code SO_RCVBUF} option. - */ - int getReceiveBufferSize(); + @Override + public void setReuseAddress(boolean reuseAddress) { + throw new UnsupportedOperationException("Not supported"); + } - /** - * Sets the {@code SO_RCVBUF} option. - */ - void setReceiveBufferSize(int receiveBufferSize); + @Override + public int getReceiveBufferSize() { + try { + return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF); + } catch (IOException e) { + throw new ChannelException(e); + } + } - /** - * Sets the performance preferences as specified in - * {@link java.net.ServerSocket#setPerformancePreferences(int, int, int)}. - */ - void setPerformancePreferences(int connectionTime, int latency, int bandwidth); + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + try { + serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public void setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java deleted file mode 100644 index 00880f1a73..0000000000 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 - * (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.sctp; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ServerChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannel; - -/** - * A {@link org.jboss.netty.channel.ChannelFactory} which creates a {@link org.jboss.netty.channel.socket.ServerSocketChannel}. - * - * @author The Netty Project - * @author Trustin Lee - * - * @version $Rev$, $Date$ - * - */ -public interface SctpServerChannelFactory extends ServerChannelFactory { - @Override - SctpServerChannel newChannel(ChannelPipeline pipeline); -} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java index 45162cac46..0aa9bb0484 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java @@ -46,7 +46,7 @@ class SctpServerChannelImpl extends AbstractServerChannel final com.sun.nio.sctp.SctpServerChannel socket; final Lock shutdownLock = new ReentrantLock(); volatile Selector selector; - private final SctpServerChannelConfig config; + private final ServerSocketChannelConfig config; private volatile boolean bound; @@ -77,13 +77,13 @@ class SctpServerChannelImpl extends AbstractServerChannel throw new ChannelException("Failed to enter non-blocking mode.", e); } - config = new DefaultSctpServerChannelConfig(socket); + config = new SctpServerChannelConfig(socket); fireChannelOpen(this); } @Override - public SctpServerChannelConfig getConfig() { + public ServerSocketChannelConfig getConfig() { return config; } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java index 2fa014f4a8..8c31fe45de 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerPipelineSink.java @@ -141,7 +141,7 @@ class SctpServerPipelineSink extends AbstractChannelSink { fireChannelBound(channel, channel.getLocalAddress()); Executor bossExecutor = - ((DefaultSctpServerChannelFactory) channel.getFactory()).bossExecutor; + ((SctpServerSocketChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start(bossExecutor, new Boss(channel)); bossStarted = true; } catch (Throwable t) { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerSocketChannelFactory.java similarity index 89% rename from src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java rename to src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerSocketChannelFactory.java index 661cfcfdc7..ff7ef955cb 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerSocketChannelFactory.java @@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.sctp; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.ServerChannelFactory; import org.jboss.netty.util.internal.ExecutorUtil; import java.util.concurrent.Executor; @@ -29,7 +30,7 @@ import java.util.concurrent.Executor; * *

How threads work

*

- * There are two types of threads in a {@link DefaultSctpServerChannelFactory}; + * There are two types of threads in a {@link SctpServerSocketChannelFactory}; * one is boss thread and the other is worker thread. * *

Boss threads

@@ -39,18 +40,18 @@ import java.util.concurrent.Executor; * have two boss threads. A boss thread accepts incoming connections until * the port is unbound. Once a connection is accepted successfully, the boss * thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker - * threads that the {@link DefaultSctpServerChannelFactory} manages. + * threads that the {@link SctpServerSocketChannelFactory} manages. * *

Worker threads

*

- * One {@link DefaultSctpServerChannelFactory} can have one or more worker + * One {@link SctpServerSocketChannelFactory} can have one or more worker * threads. A worker thread performs non-blocking read and write for one or * more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode. * *

Life cycle of threads and graceful shutdown

*

* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified - * when a {@link DefaultSctpServerChannelFactory} was created. Boss threads are + * when a {@link SctpServerSocketChannelFactory} was created. Boss threads are * acquired from the {@code bossExecutor}, and worker threads are acquired from * the {@code workerExecutor}. Therefore, you should make sure the specified * {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads. @@ -81,7 +82,7 @@ import java.util.concurrent.Executor; * * @apiviz.landmark */ -public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory { +public class SctpServerSocketChannelFactory implements ServerChannelFactory { final Executor bossExecutor; private final Executor workerExecutor; @@ -89,7 +90,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory /** * Creates a new instance. Calling this constructor is same with calling - * {@link #DefaultSctpServerChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * + * {@link #SctpServerSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 * * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * @@ -98,7 +99,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory * @param workerExecutor * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads */ - public DefaultSctpServerChannelFactory( + public SctpServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor) { this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } @@ -113,7 +114,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory * @param workerCount * the maximum number of I/O worker threads */ - public DefaultSctpServerChannelFactory( + public SctpServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { if (bossExecutor == null) { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java index 5d680ae2e9..056a738b0a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java @@ -70,7 +70,6 @@ class SctpWorker implements Runnable { private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool(); - private int payloadProtocolId = 0;// un-known sctp payload protocol id private NotificationHandler notificationHandler; @@ -82,7 +81,6 @@ class SctpWorker implements Runnable { boolean server = !(channel instanceof SctpClientChannel); Runnable registerTask = new RegisterTask(channel, future, server); - payloadProtocolId = channel.getConfig().getPayloadProtocol(); notificationHandler = new NotificationHandler(channel, this); Selector selector; @@ -300,10 +298,11 @@ class SctpWorker implements Runnable { boolean messageReceived = false; boolean failure = true; + MessageInfo messageInfo = null; ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { - MessageInfo messageInfo = channel.sctpChannel.receive(bb, this, notificationHandler); + messageInfo = channel.socket.receive(bb, this, notificationHandler); if (messageInfo != null) { messageReceived = true; if (messageInfo.isComplete()) { @@ -338,12 +337,12 @@ class SctpWorker implements Runnable { predictor.previousReceiveBufferSize(receivedBytes); // Fire the event. - fireMessageReceived(channel, buffer); + fireMessageReceived(channel, new SctpMessage(messageInfo.streamNumber(), messageInfo.payloadProtocolID(), buffer)); } else { recvBufferPool.release(bb); } - if (channel.sctpChannel.isBlocking() && !messageReceived || failure) { + if (channel.socket.isBlocking() && !messageReceived || failure) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; @@ -435,7 +434,7 @@ class SctpWorker implements Runnable { long writtenBytes = 0; final SctpSendBufferPool sendBufferPool = this.sendBufferPool; - final com.sun.nio.sctp.SctpChannel ch = channel.sctpChannel; + final com.sun.nio.sctp.SctpChannel ch = channel.socket; final Queue writeBuffer = channel.writeBuffer; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { @@ -459,7 +458,7 @@ class SctpWorker implements Runnable { try { long localWrittenBytes = 0; for (int i = writeSpinCount; i > 0; i--) { - localWrittenBytes = buf.transferTo(ch, payloadProtocolId, 0); + localWrittenBytes = buf.transferTo(ch); if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; @@ -522,7 +521,7 @@ class SctpWorker implements Runnable { private void setOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.sctpChannel.keyFor(selector); + SelectionKey key = channel.socket.keyFor(selector); if (key == null) { return; } @@ -545,7 +544,7 @@ class SctpWorker implements Runnable { private void clearOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.sctpChannel.keyFor(selector); + SelectionKey key = channel.socket.keyFor(selector); if (key == null) { return; } @@ -570,7 +569,7 @@ class SctpWorker implements Runnable { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { - channel.sctpChannel.close(); + channel.socket.close(); cancelledKeys++; if (channel.setClosed()) { @@ -654,7 +653,7 @@ class SctpWorker implements Runnable { // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { Selector selector = this.selector; - SelectionKey key = channel.sctpChannel.keyFor(selector); + SelectionKey key = channel.socket.keyFor(selector); if (key == null || selector == null) { // Not registered to the worker yet. @@ -749,11 +748,11 @@ class SctpWorker implements Runnable { try { if (server) { - channel.sctpChannel.configureBlocking(false); + channel.socket.configureBlocking(false); } synchronized (channel.interestOpsLock) { - channel.sctpChannel.register( + channel.socket.register( selector, channel.getRawInterestOps(), channel); } if (future != null) { diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpClient.java b/src/main/java/org/jboss/netty/example/sctp/SctpClient.java new file mode 100644 index 0000000000..ac15ab7508 --- /dev/null +++ b/src/main/java/org/jboss/netty/example/sctp/SctpClient.java @@ -0,0 +1,69 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.example.sctp; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +/** + * Sends one message when a connection is open and echoes back any received + * data to the server. Simply put, the echo client initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + * + * @author The Netty Project + * @author Trustin Lee + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + * + */ +public class SctpClient { + + public static void main(String[] args) throws Exception { + + // Configure the client. + ClientBootstrap bootstrap = new ClientBootstrap( + new SctpClientSocketChannelFactory( + Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + + // Set up the pipeline factory. + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline( + new SctpClientHandler()); + } + }); + + // Start the connection attempt. + ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 2955), new InetSocketAddress("localhost", 2956)); + + // Wait until the connection is closed or the connection attempt fails. + future.getChannel().getCloseFuture().awaitUninterruptibly(); + + // Shut down thread pools to exit. + bootstrap.releaseExternalResources(); + } +} diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java new file mode 100644 index 0000000000..9151274e8d --- /dev/null +++ b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java @@ -0,0 +1,75 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.example.sctp; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.sctp.SctpMessage; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Handler implementation for the echo client. It initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + * + * @author The Netty Project + * @author Trustin Lee + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + */ +public class SctpClientHandler extends SimpleChannelUpstreamHandler { + + private static final Logger logger = Logger.getLogger( + SctpClientHandler.class.getName()); + + /** + * Creates a client-side handler. + */ + public SctpClientHandler() { + + } + + + @Override + public void channelConnected( + ChannelHandlerContext ctx, ChannelStateEvent e) { + + e.getChannel().write(new SctpMessage(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes()))); + } + + @Override + public void messageReceived( + ChannelHandlerContext ctx, MessageEvent e) { + // Send back the received message to the remote peer. + e.getChannel().write(e.getMessage()); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, ExceptionEvent e) { + // Close the connection when an exception is raised. + logger.log( + Level.WARNING, + "Unexpected exception from downstream.", + e.getCause()); + e.getChannel().close(); + } +} diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpServer.java b/src/main/java/org/jboss/netty/example/sctp/SctpServer.java new file mode 100644 index 0000000000..aef6f80d5e --- /dev/null +++ b/src/main/java/org/jboss/netty/example/sctp/SctpServer.java @@ -0,0 +1,57 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.example.sctp; + +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +/** + * Echoes back any received data from a client. + * + * @author The Netty Project + * @author Trustin Lee + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + * + */ +public class SctpServer { + + public static void main(String[] args) throws Exception { + // Configure the server. + ServerBootstrap bootstrap = new ServerBootstrap( + new SctpServerSocketChannelFactory( + Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + + // Set up the pipeline factory. + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(new SctpServerHandler()); + } + }); + + // Bind and start to accept incoming connections. + bootstrap.bind(new InetSocketAddress("localhost", 2955)); + } +} diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java b/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java new file mode 100644 index 0000000000..4f6e868969 --- /dev/null +++ b/src/main/java/org/jboss/netty/example/sctp/SctpServerHandler.java @@ -0,0 +1,59 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.example.sctp; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Handler implementation for the echo server. + * + * @author The Netty Project + * @author Trustin Lee + * @author Jestan Nirojan + * + * @version $Rev$, $Date$ + */ +public class SctpServerHandler extends SimpleChannelUpstreamHandler { + + private static final Logger logger = Logger.getLogger( + SctpServerHandler.class.getName()); + + @Override + public void messageReceived( + ChannelHandlerContext ctx, MessageEvent e) { + // Send back the received message to the remote peer. + e.getChannel().write(e.getMessage()); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, ExceptionEvent e) { + // Close the connection when an exception is raised. + logger.log( + Level.WARNING, + "Unexpected exception from downstream.", + e.getCause()); + e.getChannel().close(); + } +}