From 2f5f149b5295df9a0c77d5be5c238b947e9820bd Mon Sep 17 00:00:00 2001 From: Jestan Nirojan Date: Thu, 8 Dec 2011 00:07:55 +0530 Subject: [PATCH] 1)added some documentaion comments 2)minor refactoring --- .../sctp/DefaultSctpServerChannelConfig.java | 2 +- .../channel/socket/sctp/SctpChannel.java | 22 +++++- .../channel/socket/sctp/SctpChannelImpl.java | 20 +++--- .../socket/sctp/SctpClientPipelineSink.java | 8 +-- .../channel/socket/sctp/SctpMessage.java | 64 ----------------- .../socket/sctp/SctpNotificationEvent.java | 4 ++ .../channel/socket/sctp/SctpPayload.java | 70 +++++++++++++++++++ .../socket/sctp/SctpSendBufferPool.java | 12 ++-- .../socket/sctp/SctpServerChannel.java | 22 +++++- .../socket/sctp/SctpServerChannelImpl.java | 5 ++ .../netty/channel/socket/sctp/SctpWorker.java | 20 +++--- .../channel/socket/sctp/SelectorUtil.java | 2 +- .../netty/example/sctp/SctpClientHandler.java | 4 +- 13 files changed, 154 insertions(+), 101 deletions(-) delete mode 100644 src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java create mode 100644 src/main/java/org/jboss/netty/channel/socket/sctp/SctpPayload.java diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java index c659de0529..527b0f046c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/DefaultSctpServerChannelConfig.java @@ -24,7 +24,7 @@ import org.jboss.netty.util.internal.ConversionUtil; import java.io.IOException; /** - * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation. + * The default {@link org.jboss.netty.channel.socket.ServerSocketChannelConfig} implementation for SCTP. * * @author The Netty Project * @author Trustin Lee diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java index 060f180986..378037872c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannel.java @@ -30,18 +30,38 @@ import java.util.Set; * @author Jestan Nirojan */ public interface SctpChannel extends Channel { + + /** + * Return the primary local address of the SCTP channel. + */ @Override InetSocketAddress getLocalAddress(); + /** + * Return all local addresses of the SCTP channel. + */ Set getAllLocalAddresses(); + /** + * Returns the configuration of this channel. + */ @Override NioSctpChannelConfig getConfig(); + /** + * Return the primary remote address of the SCTP channel. + */ @Override InetSocketAddress getRemoteAddress(); - Set getRemoteAddresses(); + /** + * Return all remote addresses of the SCTP channel. + */ + Set getAllRemoteAddresses(); + + /** + * Get the underlying SCTP association + */ Association association(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java index 5b57b1df0b..d3f42fb9ef 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpChannelImpl.java @@ -44,7 +44,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { private static final int ST_CLOSED = -1; volatile int state = ST_OPEN; - final com.sun.nio.sctp.SctpChannel underlayingChannel; + final com.sun.nio.sctp.SctpChannel channel; final SctpWorker worker; private final NioSctpChannelConfig config; private volatile InetSocketAddress localAddress; @@ -66,12 +66,12 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { SendBuffer currentWriteBuffer; public SctpChannelImpl(Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, - com.sun.nio.sctp.SctpChannel underlayingChannel, SctpWorker worker) { + com.sun.nio.sctp.SctpChannel channel, SctpWorker worker) { super(parent, factory, pipeline, sink); - this.underlayingChannel = underlayingChannel; + this.channel = channel; this.worker = worker; - config = new DefaultNioSctpChannelConfig(underlayingChannel); + config = new DefaultNioSctpChannelConfig(channel); getCloseFuture().addListener(new ChannelFutureListener() { @Override @@ -91,7 +91,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { InetSocketAddress localAddress = this.localAddress; if (localAddress == null) { try { - final Iterator iterator = underlayingChannel.getAllLocalAddresses().iterator(); + final Iterator iterator = channel.getAllLocalAddresses().iterator(); if (iterator.hasNext()) { this.localAddress = localAddress = (InetSocketAddress) iterator.next(); } @@ -105,7 +105,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { @Override public Set getAllLocalAddresses() { try { - final Set allLocalAddresses = underlayingChannel.getAllLocalAddresses(); + final Set allLocalAddresses = channel.getAllLocalAddresses(); final Set addresses = new HashSet(allLocalAddresses.size()); for(SocketAddress socketAddress: allLocalAddresses) { addresses.add((InetSocketAddress) socketAddress); @@ -121,7 +121,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { InetSocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { - final Iterator iterator = underlayingChannel.getRemoteAddresses().iterator(); + final Iterator iterator = channel.getRemoteAddresses().iterator(); if (iterator.hasNext()) { this.remoteAddress = remoteAddress = (InetSocketAddress) iterator.next(); } @@ -133,9 +133,9 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { } @Override - public Set getRemoteAddresses() { + public Set getAllRemoteAddresses() { try { - final Set allLocalAddresses = underlayingChannel.getRemoteAddresses(); + final Set allLocalAddresses = channel.getRemoteAddresses(); final Set addresses = new HashSet(allLocalAddresses.size()); for(SocketAddress socketAddress: allLocalAddresses) { addresses.add((InetSocketAddress) socketAddress); @@ -149,7 +149,7 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { @Override public Association association() { try { - return underlayingChannel.association(); + return channel.association(); } catch (Throwable e) { return null; } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java index ea41c47022..d77d0a9824 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpClientPipelineSink.java @@ -110,7 +110,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { SctpClientChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.underlayingChannel.bind(localAddress); + channel.channel.bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -125,7 +125,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { final SctpClientChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.underlayingChannel.connect(remoteAddress)) { + if (channel.channel.connect(remoteAddress)) { channel.worker.register(channel, cf); } else { channel.getCloseFuture().addListener(new ChannelFutureListener() { @@ -368,7 +368,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { private void connect(SelectionKey k) { SctpClientChannel ch = (SctpClientChannel) k.attachment(); try { - if (ch.underlayingChannel.finishConnect()) { + if (ch.channel.finishConnect()) { k.cancel(); ch.worker.register(ch, ch.connectFuture); } @@ -398,7 +398,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { @Override public void run() { try { - channel.underlayingChannel.register( + channel.channel.register( boss.selector, SelectionKey.OP_CONNECT, channel); } catch (ClosedChannelException e) { channel.worker.close(channel, succeededFuture(channel)); diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java deleted file mode 100644 index 60695f3ad9..0000000000 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.jboss.netty.channel.socket.sctp; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -/** - * @author The Netty Project - * @author Jestan Nirojan - */ -public final class SctpMessage { - private final int streamNo; - private final int payloadProtocolId; - private final ChannelBuffer data; - - public SctpMessage(int streamNo, int payloadProtocolId, ChannelBuffer data) { - this.streamNo = streamNo; - this.payloadProtocolId = payloadProtocolId; - this.data = data; - } - - public int streamNumber() { - return streamNo; - } - - public int payloadProtocolId() { - return payloadProtocolId; - } - - public ChannelBuffer data() { - if (data.readable()) { - return data.slice(); - } else { - return ChannelBuffers.EMPTY_BUFFER; - } - } - - @Override - public String toString() { - return new StringBuilder(). - append("SctpMessage{"). - append("streamNo="). - append(streamNo). - append(", payloadProtocolId="). - append(payloadProtocolId). - append(", data="). - append(ChannelBuffers.hexDump(data())). - append('}').toString(); - } -} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpNotificationEvent.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpNotificationEvent.java index dc78f33440..ed7de49e1b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpNotificationEvent.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpNotificationEvent.java @@ -57,6 +57,10 @@ public class SctpNotificationEvent implements ChannelEvent { return notification; } + /** + * Return the attachment comes with SCTP notification + * Please note that, it may be null + */ public Object getValue() { return value; } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpPayload.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpPayload.java new file mode 100644 index 0000000000..7f51120326 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpPayload.java @@ -0,0 +1,70 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.sctp; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +/** + * @author The Netty Project + * @author Jestan Nirojan + */ +public final class SctpPayload { + private final int streamIdentifier; + private final int protocolIdentifier; + private final ChannelBuffer payloadBuffer; + + /** + * Essential data that is being carried within SCTP Data Chunk + * @param streamIdentifier that you want to send the payload + * @param protocolIdentifier of payload + * @param payloadBuffer channel buffer + */ + public SctpPayload(int streamIdentifier, int protocolIdentifier, ChannelBuffer payloadBuffer) { + this.streamIdentifier = streamIdentifier; + this.protocolIdentifier = protocolIdentifier; + this.payloadBuffer = payloadBuffer; + } + + public int getstreamIdentifier() { + return streamIdentifier; + } + + public int getProtocolIdentifier() { + return protocolIdentifier; + } + + public ChannelBuffer getPayloadBuffer() { + if (payloadBuffer.readable()) { + return payloadBuffer.slice(); + } else { + return ChannelBuffers.EMPTY_BUFFER; + } + } + + @Override + public String toString() { + return new StringBuilder(). + append("SctpPayload{"). + append("streamIdentifier="). + append(streamIdentifier). + append(", protocolIdentifier="). + append(protocolIdentifier). + append(", payloadBuffer="). + append(ChannelBuffers.hexDump(getPayloadBuffer())). + append('}').toString(); + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java index a0777d4ba1..3e77a4f73d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpSendBufferPool.java @@ -44,18 +44,18 @@ final class SctpSendBufferPool { } final SendBuffer acquire(Object message) { - if (message instanceof SctpMessage) { - return acquire((SctpMessage) message); + if (message instanceof SctpPayload) { + return acquire((SctpPayload) message); } else { throw new IllegalArgumentException( "unsupported message type: " + message.getClass()); } } - private final SendBuffer acquire(SctpMessage message) { - final ChannelBuffer src = message.data(); - final int streamNo = message.streamNumber(); - final int protocolId = message.payloadProtocolId(); + private final SendBuffer acquire(SctpPayload message) { + final ChannelBuffer src = message.getPayloadBuffer(); + final int streamNo = message.getstreamIdentifier(); + final int protocolId = message.getProtocolIdentifier(); final int size = src.readableBytes(); if (size == 0) { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java index e480a31332..8cc560e293 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannel.java @@ -26,17 +26,35 @@ import java.util.Set; * @author The Netty Project * @author Trustin Lee * @author Jestan Nirojan - * * @version $Rev$, $Date$ - * */ public interface SctpServerChannel extends ServerChannel { + /** + * Returns the configuration of this channel. + */ @Override SctpServerChannelConfig getConfig(); + + /** + * Return the primary local address of the SCTP server channel. + */ @Override InetSocketAddress getLocalAddress(); + /** + * Return all local addresses of the SCTP server channel. + */ Set getAllLocalAddresses(); + + /** + * Return the primary remote address of the server SCTP channel. + */ @Override InetSocketAddress getRemoteAddress(); + + + /** + * Return all remote addresses of the SCTP server channel. + */ + Set getAllRemoteAddresses(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java index cdda6cfd66..453abc99ce 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpServerChannelImpl.java @@ -117,6 +117,11 @@ class SctpServerChannelImpl extends AbstractServerChannel return null;// not available for server channel } + @Override + public Set getAllRemoteAddresses() { + return null;// not available for server channel + } + @Override public boolean isBound() { return isOpen() && bound; diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java index fe9799186f..0c8941136b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SctpWorker.java @@ -301,7 +301,7 @@ class SctpWorker implements Runnable { ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { - messageInfo = channel.underlayingChannel.receive(bb, null, notificationHandler); + messageInfo = channel.channel.receive(bb, null, notificationHandler); if (messageInfo != null) { messageReceived = true; if (messageInfo.isComplete()) { @@ -337,7 +337,7 @@ class SctpWorker implements Runnable { // Fire the event. fireMessageReceived(channel, - new SctpMessage(messageInfo.streamNumber(), + new SctpPayload(messageInfo.streamNumber(), messageInfo.payloadProtocolID(), buffer), messageInfo.address()); @@ -345,7 +345,7 @@ class SctpWorker implements Runnable { recvBufferPool.release(bb); } - if (channel.underlayingChannel.isBlocking() && !messageReceived || failure) { + if (channel.channel.isBlocking() && !messageReceived || failure) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; @@ -437,7 +437,7 @@ class SctpWorker implements Runnable { long writtenBytes = 0; final SctpSendBufferPool sendBufferPool = this.sendBufferPool; - final com.sun.nio.sctp.SctpChannel ch = channel.underlayingChannel; + final com.sun.nio.sctp.SctpChannel ch = channel.channel; final Queue writeBuffer = channel.writeBuffer; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { @@ -524,7 +524,7 @@ class SctpWorker implements Runnable { private void setOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.underlayingChannel.keyFor(selector); + SelectionKey key = channel.channel.keyFor(selector); if (key == null) { return; } @@ -547,7 +547,7 @@ class SctpWorker implements Runnable { private void clearOpWrite(SctpChannelImpl channel) { Selector selector = this.selector; - SelectionKey key = channel.underlayingChannel.keyFor(selector); + SelectionKey key = channel.channel.keyFor(selector); if (key == null) { return; } @@ -572,7 +572,7 @@ class SctpWorker implements Runnable { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); try { - channel.underlayingChannel.close(); + channel.channel.close(); cancelledKeys++; if (channel.setClosed()) { @@ -656,7 +656,7 @@ class SctpWorker implements Runnable { // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { Selector selector = this.selector; - SelectionKey key = channel.underlayingChannel.keyFor(selector); + SelectionKey key = channel.channel.keyFor(selector); if (key == null || selector == null) { // Not registered to the worker yet. @@ -751,11 +751,11 @@ class SctpWorker implements Runnable { try { if (server) { - channel.underlayingChannel.configureBlocking(false); + channel.channel.configureBlocking(false); } synchronized (channel.interestOpsLock) { - channel.underlayingChannel.register( + channel.channel.register( selector, channel.getRawInterestOps(), channel); } if (future != null) { diff --git a/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java b/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java index af3faad7e2..51020e6ad8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java +++ b/src/main/java/org/jboss/netty/channel/socket/sctp/SelectorUtil.java @@ -35,7 +35,7 @@ final class SelectorUtil { static void select(Selector selector) throws IOException { try { - selector.select(10); + selector.select(10);// does small timeout give more throughput + less CPU usage? } catch (CancelledKeyException e) { // Harmless exception - log anyway logger.debug( diff --git a/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java index 7326b48517..c261ad05c4 100644 --- a/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java +++ b/src/main/java/org/jboss/netty/example/sctp/SctpClientHandler.java @@ -17,7 +17,7 @@ package org.jboss.netty.example.sctp; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.sctp.SctpMessage; +import org.jboss.netty.channel.socket.sctp.SctpPayload; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; @@ -47,7 +47,7 @@ public class SctpClientHandler extends SimpleChannelUpstreamHandler { */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent stateEvent) { - stateEvent.getChannel().write(new SctpMessage(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes()))); + stateEvent.getChannel().write(new SctpPayload(0, 0, ChannelBuffers.wrappedBuffer("SCTP ECHO".getBytes()))); } @Override