1)added some documentaion comments 2)minor refactoring

This commit is contained in:
Jestan Nirojan 2011-12-08 00:07:55 +05:30
parent 84f6f7d617
commit 2f5f149b52
13 changed files with 154 additions and 101 deletions

View File

@ -24,7 +24,7 @@ import org.jboss.netty.util.internal.ConversionUtil;
import java.io.IOException; import java.io.IOException;
/** /**
* The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation for SCTP.
* *
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>

View File

@ -30,18 +30,38 @@ import java.util.Set;
* @author <a href="http://github.com/jestan">Jestan Nirojan</a> * @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*/ */
public interface SctpChannel extends Channel { public interface SctpChannel extends Channel {
/**
* Return the primary local address of the SCTP channel.
*/
@Override @Override
InetSocketAddress getLocalAddress(); InetSocketAddress getLocalAddress();
/**
* Return all local addresses of the SCTP channel.
*/
Set<InetSocketAddress> getAllLocalAddresses(); Set<InetSocketAddress> getAllLocalAddresses();
/**
* Returns the configuration of this channel.
*/
@Override @Override
NioSctpChannelConfig getConfig(); NioSctpChannelConfig getConfig();
/**
* Return the primary remote address of the SCTP channel.
*/
@Override @Override
InetSocketAddress getRemoteAddress(); InetSocketAddress getRemoteAddress();
Set<InetSocketAddress> getRemoteAddresses();
/**
* Return all remote addresses of the SCTP channel.
*/
Set<InetSocketAddress> getAllRemoteAddresses();
/**
* Get the underlying SCTP association
*/
Association association(); Association association();
} }

View File

@ -44,7 +44,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
private static final int ST_CLOSED = -1; private static final int ST_CLOSED = -1;
volatile int state = ST_OPEN; volatile int state = ST_OPEN;
final com.sun.nio.sctp.SctpChannel underlayingChannel; final com.sun.nio.sctp.SctpChannel channel;
final SctpWorker worker; final SctpWorker worker;
private final NioSctpChannelConfig config; private final NioSctpChannelConfig config;
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
@ -66,12 +66,12 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
SendBuffer currentWriteBuffer; SendBuffer currentWriteBuffer;
public SctpChannelImpl(Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, public SctpChannelImpl(Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
com.sun.nio.sctp.SctpChannel underlayingChannel, SctpWorker worker) { com.sun.nio.sctp.SctpChannel channel, SctpWorker worker) {
super(parent, factory, pipeline, sink); super(parent, factory, pipeline, sink);
this.underlayingChannel = underlayingChannel; this.channel = channel;
this.worker = worker; this.worker = worker;
config = new DefaultNioSctpChannelConfig(underlayingChannel); config = new DefaultNioSctpChannelConfig(channel);
getCloseFuture().addListener(new ChannelFutureListener() { getCloseFuture().addListener(new ChannelFutureListener() {
@Override @Override
@ -91,7 +91,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
InetSocketAddress localAddress = this.localAddress; InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) { if (localAddress == null) {
try { try {
final Iterator<SocketAddress> iterator = underlayingChannel.getAllLocalAddresses().iterator(); final Iterator<SocketAddress> iterator = channel.getAllLocalAddresses().iterator();
if (iterator.hasNext()) { if (iterator.hasNext()) {
this.localAddress = localAddress = (InetSocketAddress) iterator.next(); this.localAddress = localAddress = (InetSocketAddress) iterator.next();
} }
@ -105,7 +105,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
@Override @Override
public Set<InetSocketAddress> getAllLocalAddresses() { public Set<InetSocketAddress> getAllLocalAddresses() {
try { try {
final Set<SocketAddress> allLocalAddresses = underlayingChannel.getAllLocalAddresses(); final Set<SocketAddress> allLocalAddresses = channel.getAllLocalAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size()); final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) { for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress); addresses.add((InetSocketAddress) socketAddress);
@ -121,7 +121,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
InetSocketAddress remoteAddress = this.remoteAddress; InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) { if (remoteAddress == null) {
try { try {
final Iterator<SocketAddress> iterator = underlayingChannel.getRemoteAddresses().iterator(); final Iterator<SocketAddress> iterator = channel.getRemoteAddresses().iterator();
if (iterator.hasNext()) { if (iterator.hasNext()) {
this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next(); this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next();
} }
@ -133,9 +133,9 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
} }
@Override @Override
public Set<InetSocketAddress> getRemoteAddresses() { public Set<InetSocketAddress> getAllRemoteAddresses() {
try { try {
final Set<SocketAddress> allLocalAddresses = underlayingChannel.getRemoteAddresses(); final Set<SocketAddress> allLocalAddresses = channel.getRemoteAddresses();
final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size()); final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
for(SocketAddress socketAddress: allLocalAddresses) { for(SocketAddress socketAddress: allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress); addresses.add((InetSocketAddress) socketAddress);
@ -149,7 +149,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
@Override @Override
public Association association() { public Association association() {
try { try {
return underlayingChannel.association(); return channel.association();
} catch (Throwable e) { } catch (Throwable e) {
return null; return null;
} }

View File

@ -110,7 +110,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
SctpClientChannel channel, ChannelFuture future, SctpClientChannel channel, ChannelFuture future,
SocketAddress localAddress) { SocketAddress localAddress) {
try { try {
channel.underlayingChannel.bind(localAddress); channel.channel.bind(localAddress);
channel.boundManually = true; channel.boundManually = true;
channel.setBound(); channel.setBound();
future.setSuccess(); future.setSuccess();
@ -125,7 +125,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
final SctpClientChannel channel, final ChannelFuture cf, final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) { SocketAddress remoteAddress) {
try { try {
if (channel.underlayingChannel.connect(remoteAddress)) { if (channel.channel.connect(remoteAddress)) {
channel.worker.register(channel, cf); channel.worker.register(channel, cf);
} else { } else {
channel.getCloseFuture().addListener(new ChannelFutureListener() { channel.getCloseFuture().addListener(new ChannelFutureListener() {
@ -368,7 +368,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
private void connect(SelectionKey k) { private void connect(SelectionKey k) {
SctpClientChannel ch = (SctpClientChannel) k.attachment(); SctpClientChannel ch = (SctpClientChannel) k.attachment();
try { try {
if (ch.underlayingChannel.finishConnect()) { if (ch.channel.finishConnect()) {
k.cancel(); k.cancel();
ch.worker.register(ch, ch.connectFuture); ch.worker.register(ch, ch.connectFuture);
} }
@ -398,7 +398,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
@Override @Override
public void run() { public void run() {
try { try {
channel.underlayingChannel.register( channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel); boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel)); channel.worker.close(channel, succeededFuture(channel));

View File

@ -1,64 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*/
public final class SctpMessage {
private final int streamNo;
private final int payloadProtocolId;
private final ChannelBuffer data;
public SctpMessage(int streamNo, int payloadProtocolId, ChannelBuffer data) {
this.streamNo = streamNo;
this.payloadProtocolId = payloadProtocolId;
this.data = data;
}
public int streamNumber() {
return streamNo;
}
public int payloadProtocolId() {
return payloadProtocolId;
}
public ChannelBuffer data() {
if (data.readable()) {
return data.slice();
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
}
@Override
public String toString() {
return new StringBuilder().
append("SctpMessage{").
append("streamNo=").
append(streamNo).
append(", payloadProtocolId=").
append(payloadProtocolId).
append(", data=").
append(ChannelBuffers.hexDump(data())).
append('}').toString();
}
}

View File

@ -57,6 +57,10 @@ public class SctpNotificationEvent implements ChannelEvent {
return notification; return notification;
} }
/**
* Return the attachment comes with SCTP notification
* Please note that, it may be null
*/
public Object getValue() { public Object getValue() {
return value; return value;
} }

View File

@ -0,0 +1,70 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.sctp;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*/
public final class SctpPayload {
private final int streamIdentifier;
private final int protocolIdentifier;
private final ChannelBuffer payloadBuffer;
/**
* Essential data that is being carried within SCTP Data Chunk
* @param streamIdentifier that you want to send the payload
* @param protocolIdentifier of payload
* @param payloadBuffer channel buffer
*/
public SctpPayload(int streamIdentifier, int protocolIdentifier, ChannelBuffer payloadBuffer) {
this.streamIdentifier = streamIdentifier;
this.protocolIdentifier = protocolIdentifier;
this.payloadBuffer = payloadBuffer;
}
public int getstreamIdentifier() {
return streamIdentifier;
}
public int getProtocolIdentifier() {
return protocolIdentifier;
}
public ChannelBuffer getPayloadBuffer() {
if (payloadBuffer.readable()) {
return payloadBuffer.slice();
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
}
@Override
public String toString() {
return new StringBuilder().
append("SctpPayload{").
append("streamIdentifier=").
append(streamIdentifier).
append(", protocolIdentifier=").
append(protocolIdentifier).
append(", payloadBuffer=").
append(ChannelBuffers.hexDump(getPayloadBuffer())).
append('}').toString();
}
}

View File

@ -44,18 +44,18 @@ final class SctpSendBufferPool {
} }
final SendBuffer acquire(Object message) { final SendBuffer acquire(Object message) {
if (message instanceof SctpMessage) { if (message instanceof SctpPayload) {
return acquire((SctpMessage) message); return acquire((SctpPayload) message);
} else { } else {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"unsupported message type: " + message.getClass()); "unsupported message type: " + message.getClass());
} }
} }
private final SendBuffer acquire(SctpMessage message) { private final SendBuffer acquire(SctpPayload message) {
final ChannelBuffer src = message.data(); final ChannelBuffer src = message.getPayloadBuffer();
final int streamNo = message.streamNumber(); final int streamNo = message.getstreamIdentifier();
final int protocolId = message.payloadProtocolId(); final int protocolId = message.getProtocolIdentifier();
final int size = src.readableBytes(); final int size = src.readableBytes();
if (size == 0) { if (size == 0) {

View File

@ -26,17 +26,35 @@ import java.util.Set;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author <a href="http://github.com/jestan">Jestan Nirojan</a> * @author <a href="http://github.com/jestan">Jestan Nirojan</a>
*
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*
*/ */
public interface SctpServerChannel extends ServerChannel { public interface SctpServerChannel extends ServerChannel {
/**
* Returns the configuration of this channel.
*/
@Override @Override
SctpServerChannelConfig getConfig(); SctpServerChannelConfig getConfig();
/**
* Return the primary local address of the SCTP server channel.
*/
@Override @Override
InetSocketAddress getLocalAddress(); InetSocketAddress getLocalAddress();
/**
* Return all local addresses of the SCTP server channel.
*/
Set<InetSocketAddress> getAllLocalAddresses(); Set<InetSocketAddress> getAllLocalAddresses();
/**
* Return the primary remote address of the server SCTP channel.
*/
@Override @Override
InetSocketAddress getRemoteAddress(); InetSocketAddress getRemoteAddress();
/**
* Return all remote addresses of the SCTP server channel.
*/
Set<InetSocketAddress> getAllRemoteAddresses();
} }

View File

@ -117,6 +117,11 @@ class SctpServerChannelImpl extends AbstractServerChannel
return null;// not available for server channel return null;// not available for server channel
} }
@Override
public Set<InetSocketAddress> getAllRemoteAddresses() {
return null;// not available for server channel
}
@Override @Override
public boolean isBound() { public boolean isBound() {
return isOpen() && bound; return isOpen() && bound;

View File

@ -301,7 +301,7 @@ class SctpWorker implements Runnable {
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try { try {
messageInfo = channel.underlayingChannel.receive(bb, null, notificationHandler); messageInfo = channel.channel.receive(bb, null, notificationHandler);
if (messageInfo != null) { if (messageInfo != null) {
messageReceived = true; messageReceived = true;
if (messageInfo.isComplete()) { if (messageInfo.isComplete()) {
@ -337,7 +337,7 @@ class SctpWorker implements Runnable {
// Fire the event. // Fire the event.
fireMessageReceived(channel, fireMessageReceived(channel,
new SctpMessage(messageInfo.streamNumber(), new SctpPayload(messageInfo.streamNumber(),
messageInfo.payloadProtocolID(), messageInfo.payloadProtocolID(),
buffer), buffer),
messageInfo.address()); messageInfo.address());
@ -345,7 +345,7 @@ class SctpWorker implements Runnable {
recvBufferPool.release(bb); recvBufferPool.release(bb);
} }
if (channel.underlayingChannel.isBlocking() && !messageReceived || failure) { if (channel.channel.isBlocking() && !messageReceived || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this. k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel)); close(channel, succeededFuture(channel));
return false; return false;
@ -437,7 +437,7 @@ class SctpWorker implements Runnable {
long writtenBytes = 0; long writtenBytes = 0;
final SctpSendBufferPool sendBufferPool = this.sendBufferPool; final SctpSendBufferPool sendBufferPool = this.sendBufferPool;
final com.sun.nio.sctp.SctpChannel ch = channel.underlayingChannel; final com.sun.nio.sctp.SctpChannel ch = channel.channel;
final Queue<MessageEvent> writeBuffer = channel.writeBuffer; final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final int writeSpinCount = channel.getConfig().getWriteSpinCount(); final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
@ -524,7 +524,7 @@ class SctpWorker implements Runnable {
private void setOpWrite(SctpChannelImpl channel) { private void setOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.underlayingChannel.keyFor(selector); SelectionKey key = channel.channel.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
} }
@ -547,7 +547,7 @@ class SctpWorker implements Runnable {
private void clearOpWrite(SctpChannelImpl channel) { private void clearOpWrite(SctpChannelImpl channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.underlayingChannel.keyFor(selector); SelectionKey key = channel.channel.keyFor(selector);
if (key == null) { if (key == null) {
return; return;
} }
@ -572,7 +572,7 @@ class SctpWorker implements Runnable {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.underlayingChannel.close(); channel.channel.close();
cancelledKeys++; cancelledKeys++;
if (channel.setClosed()) { if (channel.setClosed()) {
@ -656,7 +656,7 @@ class SctpWorker implements Runnable {
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.underlayingChannel.keyFor(selector); SelectionKey key = channel.channel.keyFor(selector);
if (key == null || selector == null) { if (key == null || selector == null) {
// Not registered to the worker yet. // Not registered to the worker yet.
@ -751,11 +751,11 @@ class SctpWorker implements Runnable {
try { try {
if (server) { if (server) {
channel.underlayingChannel.configureBlocking(false); channel.channel.configureBlocking(false);
} }
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
channel.underlayingChannel.register( channel.channel.register(
selector, channel.getRawInterestOps(), channel); selector, channel.getRawInterestOps(), channel);
} }
if (future != null) { if (future != null) {

View File

@ -35,7 +35,7 @@ final class SelectorUtil {
static void select(Selector selector) throws IOException { static void select(Selector selector) throws IOException {
try { try {
selector.select(10); selector.select(10);// does small timeout give more throughput + less CPU usage?
} catch (CancelledKeyException e) { } catch (CancelledKeyException e) {
// Harmless exception - log anyway // Harmless exception - log anyway
logger.debug( logger.debug(

View File

@ -17,7 +17,7 @@ package org.jboss.netty.example.sctp;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.sctp.SctpMessage; import org.jboss.netty.channel.socket.sctp.SctpPayload;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
@ -47,7 +47,7 @@ public class SctpClientHandler extends SimpleChannelUpstreamHandler {
*/ */
@Override @Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent stateEvent) { public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent stateEvent) {
stateEvent.getChannel().write(new SctpMessage(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes()))); stateEvent.getChannel().write(new SctpPayload(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes())));
} }
@Override @Override