1)added sctp echo example 2)refactored sctp channel impl classes

This commit is contained in:
Jestan Nirojan 2011-10-10 02:58:20 +05:30
parent dad06d539c
commit a6d16daa08
21 changed files with 559 additions and 487 deletions

View File

@ -1,114 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpStandardSocketOption;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException;
/**
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig
implements SctpServerChannelConfig {
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
private volatile int backlog;
/**
* Creates a new instance.
*/
public DefaultSctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
@Override
public boolean isReuseAddress() {
return false;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getReceiveBufferSize() {
try {
return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -17,7 +17,9 @@ package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.ConversionUtil;
@ -25,46 +27,46 @@ import org.jboss.netty.util.internal.ConversionUtil;
import java.util.Map;
/**
* The default {@link SctpChannelConfig} implementation.
* The default {@link NioSocketChannelConfig} implementation for SCTP.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
class DefaultSctpChannelConfig implements SctpChannelConfig {
class DefaultSctpSocketChannelConfig implements NioSocketChannelConfig {
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile int connectTimeoutMillis = 10000; // 10 seconds
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultSctpChannelConfig.class);
InternalLoggerFactory.getInstance(DefaultSctpSocketChannelConfig.class);
private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY =
new AdaptiveReceiveBufferSizePredictorFactory();
new AdaptiveReceiveBufferSizePredictorFactory();
private volatile int writeBufferHighWaterMark = 64 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile ReceiveBufferSizePredictor predictor;
private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY;
private volatile int writeSpinCount = 16;
private SctpChannel socket;
private int payloadProtocolId = 0;
DefaultSctpChannelConfig(SctpChannel socket) {
DefaultSctpSocketChannelConfig(SctpChannel socket) {
this.socket = socket;
}
@Override
public void setOptions(Map<String, Object> options) {
setOptions(options);
//TODO: implement this as in DefaultSocketChannelConfig
//socket.setOption(options);
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
// Notify the user about misconfiguration.
logger.warn(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark; setting to the half of the " +
"writeBufferHighWaterMark.");
"writeBufferHighWaterMark; setting to the half of the " +
"writeBufferHighWaterMark.");
}
}
@ -89,11 +91,15 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
@Override
public ChannelBufferFactory getBufferFactory() {
return null;
return bufferFactory;
}
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
@Override
@ -103,15 +109,20 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
//unused
}
@Override
public int getConnectTimeoutMillis() {
return 0;
return connectTimeoutMillis;
}
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
if (connectTimeoutMillis < 0) {
throw new IllegalArgumentException("connectTimeoutMillis: " + connectTimeoutMillis);
}
this.connectTimeoutMillis = connectTimeoutMillis;
}
@Override
@ -124,8 +135,8 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark cannot be less than " +
"writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
"writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
}
setWriteBufferHighWaterMark0(writeBufferHighWaterMark);
}
@ -148,8 +159,8 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
"writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
}
setWriteBufferLowWaterMark0(writeBufferLowWaterMark);
}
@ -185,7 +196,7 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
} catch (Exception e) {
throw new ChannelException(
"Failed to create a new " +
ReceiveBufferSizePredictor.class.getSimpleName() + '.',
ReceiveBufferSizePredictor.class.getSimpleName() + '.',
e);
}
}
@ -214,10 +225,6 @@ class DefaultSctpChannelConfig implements SctpChannelConfig {
this.predictorFactory = predictorFactory;
}
@Override
public int getPayloadProtocol() {
return payloadProtocolId;
}
@Override
public boolean isTcpNoDelay() {

View File

@ -15,9 +15,11 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.Association;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -26,16 +28,21 @@ import java.util.Set;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
//TODO: support set of loacal, remote addresses.
public interface SctpChannel extends Channel{
public interface SctpChannel extends SocketChannel {
@Override
InetSocketAddress getLocalAddress();
Set<InetSocketAddress> getAllLocalAddresses();
@Override
SctpChannelConfig getConfig();
NioSocketChannelConfig getConfig();
@Override
InetSocketAddress getRemoteAddress();
Set<InetSocketAddress> getRemoteAddresses();
Association association();
}

View File

@ -1,149 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
/**
* A {@link org.jboss.netty.channel.socket.SocketChannelConfig} for a NIO TCP/IP {@link org.jboss.netty.channel.socket.SocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig} and
* {@link org.jboss.netty.channel.socket.SocketChannelConfig}, {@link SctpChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictor"}</td><td>{@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictorFactory"}</td><td>{@link #setReceiveBufferSizePredictorFactory(org.jboss.netty.channel.ReceiveBufferSizePredictorFactory)}</td>
* </tr>
* </table>
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public interface SctpChannelConfig extends SocketChannelConfig {
/**
* Returns the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()}
* will start to return {@code false}.
*/
int getWriteBufferHighWaterMark();
/**
* Sets the high water mark of the write buffer. If the number of bytes
* queued in the write buffer exceeds this value, {@link org.jboss.netty.channel.Channel#isWritable()}
* will start to return {@code false}.
*/
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
/**
* Returns the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return
* {@code true} again.
*/
int getWriteBufferLowWaterMark();
/**
* Sets the low water mark of the write buffer. Once the number of bytes
* queued in the write buffer exceeded the
* {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
* dropped down below this value, {@link org.jboss.netty.channel.Channel#isWritable()} will return
* {@code true} again.
*/
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
/**
* Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
/**
* Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
/**
* Returns the {@link org.jboss.netty.channel.ReceiveBufferSizePredictorFactory} which creates a new
* {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and
* no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory();
/**
* Sets the {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} which creates a new
* {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} when a new channel is created and
* no {@link org.jboss.netty.channel.ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(org.jboss.netty.channel.ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictorFactory(
ReceiveBufferSizePredictorFactory predictorFactory);
/**
* Get the sctp payload protocol id
* A value indicating the type of payload protocol data being transmitted. This value is passed as opaque data by SCTP.
* @return payload protocol id
*/
int getPayloadProtocol();
}

View File

@ -15,16 +15,18 @@
*/
package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.channel.socket.sctp.SctpSendBufferPool.SendBuffer;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -47,9 +49,9 @@ class SctpChannelImpl extends AbstractChannel
private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN;
final SctpChannel sctpChannel;
final SctpChannel socket;
final SctpWorker worker;
private final SctpChannelConfig config;
private final NioSocketChannelConfig config;
private volatile InetSocketAddress localAddress;
private volatile InetSocketAddress remoteAddress;
@ -71,12 +73,12 @@ class SctpChannelImpl extends AbstractChannel
public SctpChannelImpl(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
SctpChannel sctpChannel, SctpWorker worker) {
SctpChannel socket, SctpWorker worker) {
super(parent, factory, pipeline, sink);
this.sctpChannel = sctpChannel;
this.socket = socket;
this.worker = worker;
config = new DefaultSctpChannelConfig(sctpChannel);
config = new DefaultSctpSocketChannelConfig(socket);
getCloseFuture().addListener(new ChannelFutureListener() {
@Override
@ -87,7 +89,7 @@ class SctpChannelImpl extends AbstractChannel
}
@Override
public SctpChannelConfig getConfig() {
public NioSocketChannelConfig getConfig() {
return config;
}
@ -96,33 +98,70 @@ class SctpChannelImpl extends AbstractChannel
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
//TODO: fix this
this.localAddress = localAddress =
(InetSocketAddress) sctpChannel.getAllLocalAddresses().iterator().next();
final Iterator<SocketAddress> iterator = socket.getAllLocalAddresses().iterator();
if (iterator.hasNext()) {
this.localAddress = localAddress = (InetSocketAddress) iterator.next();
}
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
@Override
public Set<InetSocketAddress> getAllLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = socket.getAllLocalAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
//TODO: fix this
this.remoteAddress = remoteAddress =
(InetSocketAddress) sctpChannel.getRemoteAddresses().iterator().next();
final Iterator<SocketAddress> iterator = socket.getRemoteAddresses().iterator();
if (iterator.hasNext()) {
this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
}
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
@Override
public Set<InetSocketAddress> getRemoteAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = socket.getRemoteAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable t) {
return Collections.emptySet();
}
}
@Override
public Association association() {
try {
return socket.association();
} catch (Throwable e) {
return null;
}
}
@Override
public boolean isOpen() {
return state >= ST_OPEN;

View File

@ -1,35 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a client-side {@link org.jboss.netty.channel.socket.SocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
* @apiviz.has org.jboss.netty.channel.socket.SocketChannel oneway - - creates
*/
public interface SctpClientChannelFactory extends ChannelFactory {
@Override
SctpChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -113,7 +113,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
SctpClientChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.sctpChannel.bind(localAddress);
channel.socket.bind(localAddress);
channel.boundManually = true;
channel.setBound();
future.setSuccess();
@ -128,7 +128,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
if (channel.sctpChannel.connect(remoteAddress)) {
if (channel.socket.connect(remoteAddress)) {
channel.worker.register(channel, cf);
} else {
channel.getCloseFuture().addListener(new ChannelFutureListener() {
@ -371,7 +371,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
private void connect(SelectionKey k) {
SctpClientChannel ch = (SctpClientChannel) k.attachment();
try {
if (ch.sctpChannel.finishConnect()) {
if (ch.socket.finishConnect()) {
k.cancel();
ch.worker.register(ch, ch.connectFuture);
}
@ -401,7 +401,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
@Override
public void run() {
try {
channel.sctpChannel.register(
channel.socket.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));

View File

@ -16,6 +16,7 @@
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
@ -28,26 +29,26 @@ import java.util.concurrent.Executor;
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link DefaultSctpClientChannelFactory};
* There are two types of threads in a {@link SctpClientSocketChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss thread</h4>
* <p>
* One {@link DefaultSctpClientChannelFactory} has one boss thread. It makes
* One {@link SctpClientSocketChannelFactory} has one boss thread. It makes
* a connection attempt on request. Once a connection attempt succeeds,
* the boss thread passes the connected {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link DefaultSctpClientChannelFactory} manages.
* threads that the {@link SctpClientSocketChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link DefaultSctpClientChannelFactory} can have one or more worker
* One {@link SctpClientSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link DefaultSctpClientChannelFactory} was created. A boss thread is
* when a {@link SctpClientSocketChannelFactory} was created. A boss thread is
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -77,7 +78,7 @@ import java.util.concurrent.Executor;
*
* @apiviz.landmark
*/
public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory {
public class SctpClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor bossExecutor;
private final Executor workerExecutor;
@ -85,7 +86,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #DefaultSctpClientChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* {@link #SctpClientSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
@ -94,7 +95,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/
public DefaultSctpClientChannelFactory(
public SctpClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
@ -109,7 +110,7 @@ public class DefaultSctpClientChannelFactory implements SctpClientChannelFactory
* @param workerCount
* the maximum number of I/O worker threads
*/
public DefaultSctpClientChannelFactory(
public SctpClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
if (bossExecutor == null) {

View File

@ -0,0 +1,53 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public final class SctpMessage {
private final int streamNo;
private final int payloadProtocolId;
private final ChannelBuffer data;
public SctpMessage(int streamNo, int payloadProtocolId, ChannelBuffer data) {
this.streamNo = streamNo;
this.payloadProtocolId = payloadProtocolId;
this.data = data;
}
public int streamNumber() {
return streamNo;
}
public int payloadProtocolId() {
return payloadProtocolId;
}
public ChannelBuffer data() {
if (data.readable()) {
return data.slice();
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
}
}

View File

@ -18,7 +18,6 @@ package org.jboss.netty.channel.socket.sctp;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.FileRegion;
import java.io.IOException;
import java.lang.ref.SoftReference;
@ -28,7 +27,6 @@ import java.nio.ByteBuffer;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $
*/
final class SctpSendBufferPool {
@ -47,27 +45,29 @@ final class SctpSendBufferPool {
}
final SendBuffer acquire(Object message) {
if (message instanceof ChannelBuffer) {
return acquire((ChannelBuffer) message);
} else if (message instanceof FileRegion) {
return acquire((FileRegion) message);
if (message instanceof SctpMessage) {
return acquire((SctpMessage) message);
} else {
throw new IllegalArgumentException(
"unsupported message type: " + message.getClass());
}
throw new IllegalArgumentException(
"unsupported message type: " + message.getClass());
}
private final SendBuffer acquire(ChannelBuffer src) {
private final SendBuffer acquire(SctpMessage message) {
final ChannelBuffer src = message.data();
final int streamNo = message.streamNumber();
final int protocolId = message.payloadProtocolId();
final int size = src.readableBytes();
if (size == 0) {
return EMPTY_BUFFER;
}
if (src.isDirect()) {
return new UnpooledSendBuffer(src.toByteBuffer());
return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer());
}
if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
return new UnpooledSendBuffer(src.toByteBuffer());
return new UnpooledSendBuffer(streamNo, protocolId, src.toByteBuffer());
}
Preallocation current = this.current;
@ -81,7 +81,7 @@ final class SctpSendBufferPool {
buffer.position(align(nextPos));
slice.limit(nextPos);
current.refCnt++;
dst = new PooledSendBuffer(current, slice);
dst = new PooledSendBuffer(streamNo, protocolId, current, slice);
} else if (size > remaining) {
this.current = current = getPreallocation();
buffer = current.buffer;
@ -89,11 +89,11 @@ final class SctpSendBufferPool {
buffer.position(align(size));
slice.limit(size);
current.refCnt++;
dst = new PooledSendBuffer(current, slice);
dst = new PooledSendBuffer(streamNo, protocolId, current, slice);
} else { // size == remaining
current.refCnt++;
this.current = getPreallocation0();
dst = new PooledSendBuffer(current, current.buffer);
dst = new PooledSendBuffer(streamNo, protocolId, current, current.buffer);
}
ByteBuffer dstbuf = dst.buffer;
@ -166,7 +166,7 @@ final class SctpSendBufferPool {
long totalBytes();
long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException;
long transferTo(SctpChannel ch) throws IOException;
void release();
}
@ -175,8 +175,12 @@ final class SctpSendBufferPool {
final ByteBuffer buffer;
final int initialPos;
final int streamNo;
final int protocolId;
UnpooledSendBuffer(ByteBuffer buffer) {
UnpooledSendBuffer(int streamNo, int protocolId, ByteBuffer buffer) {
this.streamNo = streamNo;
this.protocolId = protocolId;
this.buffer = buffer;
initialPos = buffer.position();
}
@ -197,7 +201,7 @@ final class SctpSendBufferPool {
}
@Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
public long transferTo(SctpChannel ch) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo);
@ -216,8 +220,12 @@ final class SctpSendBufferPool {
private final Preallocation parent;
final ByteBuffer buffer;
final int initialPos;
final int streamNo;
final int protocolId;
PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
PooledSendBuffer(int streamNo, int protocolId, Preallocation parent, ByteBuffer buffer) {
this.streamNo = streamNo;
this.protocolId = protocolId;
this.parent = parent;
this.buffer = buffer;
initialPos = buffer.position();
@ -239,7 +247,7 @@ final class SctpSendBufferPool {
}
@Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
public long transferTo(SctpChannel ch) throws IOException {
final MessageInfo messageInfo = MessageInfo.createOutgoing(ch.association(), null, streamNo);
messageInfo.payloadProtocolID(protocolId);
messageInfo.streamNumber(streamNo);
@ -281,7 +289,7 @@ final class SctpSendBufferPool {
}
@Override
public long transferTo(SctpChannel ch, int protocolId, int streamNo) throws IOException {
public long transferTo(SctpChannel ch) throws IOException {
return 0;
}

View File

@ -15,7 +15,7 @@
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
@ -29,9 +29,9 @@ import java.net.InetSocketAddress;
* @version $Rev$, $Date$
*
*/
public interface SctpServerChannel extends ServerChannel {
public interface SctpServerChannel extends ServerSocketChannel {
@Override
SctpServerChannelConfig getConfig();
ServerSocketChannelConfig getConfig();
@Override
InetSocketAddress getLocalAddress();
@Override

View File

@ -15,71 +15,100 @@
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelConfig;
import com.sun.nio.sctp.SctpStandardSocketOption;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException;
/**
* A {@link org.jboss.netty.channel.ChannelConfig} for a {@link org.jboss.netty.channel.socket.ServerSocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link org.jboss.netty.channel.ChannelConfig},
* {@link org.jboss.netty.channel.socket.sctp.SctpServerChannelConfig} allows the following options in the
* option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
* </tr><tr>
* <td>{@code "reuseAddress"}</td><td>{@link #setReuseAddress(boolean)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSize"}</td><td>{@link #setReceiveBufferSize(int)}</td>
* </tr>
* </table>
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public interface SctpServerChannelConfig extends ChannelConfig {
public class SctpServerChannelConfig extends DefaultServerChannelConfig
implements ServerSocketChannelConfig {
private final com.sun.nio.sctp.SctpServerChannel serverChannel;
private volatile int backlog;
/**
* Gets the backlog value to specify when the channel binds to a local
* address.
* Creates a new instance.
*/
int getBacklog();
public SctpServerChannelConfig(com.sun.nio.sctp.SctpServerChannel serverChannel) {
if (serverChannel == null) {
throw new NullPointerException("serverChannel");
}
this.serverChannel = serverChannel;
}
/**
* Sets the backlog value to specify when the channel binds to a local
* address.
*/
void setBacklog(int backlog);
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
/**
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option.
*/
boolean isReuseAddress();
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
/**
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_REUSEADDR}</a> option.
*/
void setReuseAddress(boolean reuseAddress);
@Override
public boolean isReuseAddress() {
return false;
}
/**
* Gets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option.
*/
int getReceiveBufferSize();
@Override
public void setReuseAddress(boolean reuseAddress) {
throw new UnsupportedOperationException("Not supported");
}
/**
* Sets the <a href="http://java.sun.com/javase/6/docs/technotes/guides/net/socketOpt.html">{@code SO_RCVBUF}</a> option.
*/
void setReceiveBufferSize(int receiveBufferSize);
@Override
public int getReceiveBufferSize() {
try {
return serverChannel.getOption(SctpStandardSocketOption.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
/**
* Sets the performance preferences as specified in
* {@link java.net.ServerSocket#setPerformancePreferences(int, int, int)}.
*/
void setPerformancePreferences(int connectionTime, int latency, int bandwidth);
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
serverChannel.setOption(SctpStandardSocketOption.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -1,34 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.socket.ServerSocketChannel;
/**
* A {@link org.jboss.netty.channel.ChannelFactory} which creates a {@link org.jboss.netty.channel.socket.ServerSocketChannel}.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
*/
public interface SctpServerChannelFactory extends ServerChannelFactory {
@Override
SctpServerChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -46,7 +46,7 @@ class SctpServerChannelImpl extends AbstractServerChannel
final com.sun.nio.sctp.SctpServerChannel socket;
final Lock shutdownLock = new ReentrantLock();
volatile Selector selector;
private final SctpServerChannelConfig config;
private final ServerSocketChannelConfig config;
private volatile boolean bound;
@ -77,13 +77,13 @@ class SctpServerChannelImpl extends AbstractServerChannel
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultSctpServerChannelConfig(socket);
config = new SctpServerChannelConfig(socket);
fireChannelOpen(this);
}
@Override
public SctpServerChannelConfig getConfig() {
public ServerSocketChannelConfig getConfig() {
return config;
}

View File

@ -141,7 +141,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((DefaultSctpServerChannelFactory) channel.getFactory()).bossExecutor;
((SctpServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor, new Boss(channel));
bossStarted = true;
} catch (Throwable t) {

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
@ -29,7 +30,7 @@ import java.util.concurrent.Executor;
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link DefaultSctpServerChannelFactory};
* There are two types of threads in a {@link SctpServerSocketChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss threads</h4>
@ -39,18 +40,18 @@ import java.util.concurrent.Executor;
* have two boss threads. A boss thread accepts incoming connections until
* the port is unbound. Once a connection is accepted successfully, the boss
* thread passes the accepted {@link org.jboss.netty.channel.Channel} to one of the worker
* threads that the {@link DefaultSctpServerChannelFactory} manages.
* threads that the {@link SctpServerSocketChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* One {@link DefaultSctpServerChannelFactory} can have one or more worker
* One {@link SctpServerSocketChannelFactory} can have one or more worker
* threads. A worker thread performs non-blocking read and write for one or
* more {@link org.jboss.netty.channel.Channel}s in a non-blocking mode.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link java.util.concurrent.Executor}s which were specified
* when a {@link DefaultSctpServerChannelFactory} was created. Boss threads are
* when a {@link SctpServerSocketChannelFactory} was created. Boss threads are
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link java.util.concurrent.Executor}s are able to lend the sufficient number of threads.
@ -81,7 +82,7 @@ import java.util.concurrent.Executor;
*
* @apiviz.landmark
*/
public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory {
public class SctpServerSocketChannelFactory implements ServerChannelFactory {
final Executor bossExecutor;
private final Executor workerExecutor;
@ -89,7 +90,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #DefaultSctpServerChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* {@link #SctpServerSocketChannelFactory(java.util.concurrent.Executor, java.util.concurrent.Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
@ -98,7 +99,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
*/
public DefaultSctpServerChannelFactory(
public SctpServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
@ -113,7 +114,7 @@ public class DefaultSctpServerChannelFactory implements SctpServerChannelFactory
* @param workerCount
* the maximum number of I/O worker threads
*/
public DefaultSctpServerChannelFactory(
public SctpServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
if (bossExecutor == null) {

View File

@ -70,7 +70,6 @@ class SctpWorker implements Runnable {
private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
private final SctpSendBufferPool sendBufferPool = new SctpSendBufferPool();
private int payloadProtocolId = 0;// un-known sctp payload protocol id
private NotificationHandler notificationHandler;
@ -82,7 +81,6 @@ class SctpWorker implements Runnable {
boolean server = !(channel instanceof SctpClientChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
payloadProtocolId = channel.getConfig().getPayloadProtocol();
notificationHandler = new NotificationHandler(channel, this);
Selector selector;
@ -300,10 +298,11 @@ class SctpWorker implements Runnable {
boolean messageReceived = false;
boolean failure = true;
MessageInfo messageInfo = null;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
MessageInfo messageInfo = channel.sctpChannel.receive(bb, this, notificationHandler);
messageInfo = channel.socket.receive(bb, this, notificationHandler);
if (messageInfo != null) {
messageReceived = true;
if (messageInfo.isComplete()) {
@ -338,12 +337,12 @@ class SctpWorker implements Runnable {
predictor.previousReceiveBufferSize(receivedBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
fireMessageReceived(channel, new SctpMessage(messageInfo.streamNumber(), messageInfo.payloadProtocolID(), buffer));
} else {
recvBufferPool.release(bb);
}
if (channel.sctpChannel.isBlocking() && !messageReceived || failure) {
if (channel.socket.isBlocking() && !messageReceived || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
@ -435,7 +434,7 @@ class SctpWorker implements Runnable {
long writtenBytes = 0;
final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
final com.sun.nio.sctp.SctpChannel ch = channel.sctpChannel;
final com.sun.nio.sctp.SctpChannel ch = channel.socket;
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
@ -459,7 +458,7 @@ class SctpWorker implements Runnable {
try {
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i--) {
localWrittenBytes = buf.transferTo(ch, payloadProtocolId, 0);
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
@ -522,7 +521,7 @@ class SctpWorker implements Runnable {
private void setOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector);
SelectionKey key = channel.socket.keyFor(selector);
if (key == null) {
return;
}
@ -545,7 +544,7 @@ class SctpWorker implements Runnable {
private void clearOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector);
SelectionKey key = channel.socket.keyFor(selector);
if (key == null) {
return;
}
@ -570,7 +569,7 @@ class SctpWorker implements Runnable {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.sctpChannel.close();
channel.socket.close();
cancelledKeys++;
if (channel.setClosed()) {
@ -654,7 +653,7 @@ class SctpWorker implements Runnable {
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
Selector selector = this.selector;
SelectionKey key = channel.sctpChannel.keyFor(selector);
SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) {
// Not registered to the worker yet.
@ -749,11 +748,11 @@ class SctpWorker implements Runnable {
try {
if (server) {
channel.sctpChannel.configureBlocking(false);
channel.socket.configureBlocking(false);
}
synchronized (channel.interestOpsLock) {
channel.sctpChannel.register(
channel.socket.register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {

View File

@ -0,0 +1,69 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpClientSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class SctpClient {
public static void main(String[] args) throws Exception {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new SctpClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new SctpClientHandler());
}
});
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 2955), new InetSocketAddress("localhost", 2956));
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SctpMessage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public class SctpClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
SctpClientHandler.class.getName());
/**
* Creates a client-side handler.
*/
public SctpClientHandler() {
}
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
e.getChannel().write(new SctpMessage(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes())));
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.sctp.SctpServerSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* Echoes back any received data from a client.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*
*/
public class SctpServer {
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new SctpServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new SctpServerHandler());
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress("localhost", 2955));
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handler implementation for the echo server.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Jestan Nirojan
*
* @version $Rev$, $Date$
*/
public class SctpServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
SctpServerHandler.class.getName());
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}