diff --git a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpChannel.java b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpChannel.java index 287c40041b..01ad77c0e3 100644 --- a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpChannel.java +++ b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpChannel.java @@ -16,6 +16,7 @@ package com.sun.nio.sctp; import java.io.IOException; +import java.net.InetAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.spi.AbstractSelectableChannel; @@ -45,6 +46,10 @@ public abstract class SctpChannel extends AbstractSelectableChannel { public abstract SctpChannel bind(SocketAddress local) throws IOException; public abstract boolean connect(SocketAddress remote) throws IOException; public abstract boolean finishConnect() throws IOException; + + public abstract SctpChannel bindAddress(InetAddress inetAddress) throws IOException; + public abstract SctpChannel unbindAddress(InetAddress inetAddress) throws IOException; + public abstract MessageInfo receive(ByteBuffer dst, T attachment, NotificationHandler handler) throws IOException; public abstract int send(ByteBuffer src, MessageInfo messageInfo) throws IOException; } diff --git a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpServerChannel.java b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpServerChannel.java index 772a768825..eaf617e5f7 100644 --- a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpServerChannel.java +++ b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpServerChannel.java @@ -16,6 +16,7 @@ package com.sun.nio.sctp; import java.io.IOException; +import java.net.InetAddress; import java.net.SocketAddress; import java.nio.channels.spi.AbstractSelectableChannel; import java.nio.channels.spi.SelectorProvider; @@ -41,5 +42,9 @@ public abstract class SctpServerChannel extends AbstractSelectableChannel { public abstract SctpServerChannel bind(SocketAddress local) throws IOException; public abstract SctpServerChannel bind(SocketAddress local, int backlog) throws IOException; + + public abstract SctpServerChannel bindAddress(InetAddress inetAddress) throws IOException; + public abstract SctpServerChannel unbindAddress(InetAddress inetAddress) throws IOException; + public abstract SctpChannel accept() throws IOException; } diff --git a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpStandardSocketOptions.java b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpStandardSocketOptions.java index f5418a9b31..0cbca3061f 100644 --- a/transport-sctp/src/main/java/com/sun/nio/sctp/SctpStandardSocketOptions.java +++ b/transport-sctp/src/main/java/com/sun/nio/sctp/SctpStandardSocketOptions.java @@ -34,5 +34,18 @@ public class SctpStandardSocketOptions { public static final SctpSocketOption SO_SNDBUF = null; public static class InitMaxStreams { + + public static InitMaxStreams create(int i, int i1) { + return null; + } + + public int maxInStreams() { + return 0; + } + + public int maxOutStreams() { + return 0; + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java index d2fafb6ad4..475ad752ae 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java @@ -51,7 +51,8 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann } else if (key.equals("sctpNoDelay")) { setSctpNoDelay(ConversionUtil.toBoolean(value)); } else if (key.equals("sctpInitMaxStreams")) { - setInitMaxStreams((InitMaxStreams) value); + final Integer maxInOutStreams = ConversionUtil.toInt(value); + setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams)); } else { return false; } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java index b311b31c0b..56cccf2645 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpServerChannelConfig.java @@ -49,7 +49,8 @@ public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig } if (key.equals("sctpInitMaxStreams")) { - setInitMaxStreams((InitMaxStreams) value); + final Integer maxInOutStreams = ConversionUtil.toInt(value); + setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams)); } else if (key.equals("backlog")) { setBacklog(ConversionUtil.toInt(value)); } else { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpBindAddressEvent.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpBindAddressEvent.java new file mode 100644 index 0000000000..40e71b2a4d --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpBindAddressEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.sctp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelState; +import io.netty.channel.DownstreamChannelStateEvent; + +import java.net.InetAddress; + +public class SctpBindAddressEvent extends DownstreamChannelStateEvent { + + /** + * Creates a new instance. + */ + public SctpBindAddressEvent(Channel channel, ChannelFuture future, InetAddress localAddress) { + super(channel, future, ChannelState.INTEREST_OPS, localAddress); + } + + @Override + public InetAddress getValue() { + return (InetAddress) super.getValue(); + } +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannel.java index df0b9392ac..0c9231588a 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.sctp; import com.sun.nio.sctp.Association; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.nio.NioSocketChannelConfig; @@ -58,6 +59,17 @@ public interface SctpChannel extends Channel { */ Set getAllRemoteAddresses(); + /** + * Bind a multi-homing address to the already bound channel + */ + ChannelFuture bindAddress(InetAddress localAddress); + + + /** + * Unbind a multi-homing address from a already established channel + */ + ChannelFuture unbindAddress(InetAddress localAddress); + /** * Get the underlying SCTP association */ diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java index 187197f790..e688963c6c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpChannelImpl.java @@ -17,6 +17,7 @@ package io.netty.channel.sctp; import static io.netty.channel.Channels.*; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; @@ -152,6 +153,20 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel { } } + @Override + public ChannelFuture bindAddress(InetAddress localAddress) { + ChannelFuture future = future(this); + getPipeline().sendDownstream(new SctpBindAddressEvent(this, future, localAddress)); + return future; + } + + @Override + public ChannelFuture unbindAddress(InetAddress localAddress) { + ChannelFuture future = future(this); + getPipeline().sendDownstream(new SctpUnbindAddressEvent(this, future, localAddress)); + return future; + } + @Override public Association association() { try { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java index 20f5961160..952123e038 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java @@ -19,6 +19,7 @@ import static io.netty.channel.Channels.*; import java.io.IOException; import java.net.ConnectException; +import java.net.InetAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; @@ -98,7 +99,15 @@ class SctpClientPipelineSink extends AbstractChannelSink { } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + if (event instanceof SctpBindAddressEvent) { + SctpBindAddressEvent bindAddressEvent = (SctpBindAddressEvent) event; + bindAddress(channel, bindAddressEvent.getFuture(), bindAddressEvent.getValue()); + } else if (event instanceof SctpUnbindAddressEvent) { + SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event; + unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue()); + } else { + channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + } break; } } else if (e instanceof MessageEvent) { @@ -125,6 +134,32 @@ class SctpClientPipelineSink extends AbstractChannelSink { } } + private void bindAddress( + SctpClientChannel channel, ChannelFuture future, + InetAddress localAddress) { + try { + channel.channel.bindAddress(localAddress); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + private void unbindAddress( + SctpClientChannel channel, ChannelFuture future, + InetAddress localAddress) { + try { + channel.channel.unbindAddress(localAddress); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + + private void connect( final SctpClientChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { @@ -227,7 +262,7 @@ class SctpClientPipelineSink extends AbstractChannelSink { wakenUp.set(false); try { - int selectedKeyCount = selector.select(100); + int selectedKeyCount = selector.select(500); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpFrame.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpFrame.java index 30a2919ef9..3a12600f4c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpFrame.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpFrame.java @@ -68,6 +68,41 @@ public final class SctpFrame { return msgInfo; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SctpFrame sctpFrame = (SctpFrame) o; + + if (protocolIdentifier != sctpFrame.protocolIdentifier) { + return false; + } + + if (streamIdentifier != sctpFrame.streamIdentifier) { + return false; + } + + if (!payloadBuffer.equals(sctpFrame.payloadBuffer)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = streamIdentifier; + result = 31 * result + protocolIdentifier; + result = 31 * result + payloadBuffer.hashCode(); + return result; + } + @Override public String toString() { return new StringBuilder(). diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationEvent.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationEvent.java index 9835dd24c7..0e209f0430 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationEvent.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpNotificationEvent.java @@ -62,4 +62,13 @@ public class SctpNotificationEvent implements ChannelEvent { public Object getValue() { return value; } + + @Override + public String toString() { + return "SctpNotificationEvent{" + + "channel=" + channel + + ", notification=" + notification + + ", value=" + value + + '}'; + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannel.java index 2fb186cb98..3257f080d6 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannel.java @@ -15,8 +15,10 @@ */ package io.netty.channel.sctp; +import io.netty.channel.ChannelFuture; import io.netty.channel.ServerChannel; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Set; @@ -24,6 +26,17 @@ import java.util.Set; * A SCTP {@link io.netty.channel.ServerChannel} which accepts incoming SCTP connections. */ public interface SctpServerChannel extends ServerChannel { + /** + * Bind a multi-homing address to the already bound channel + */ + ChannelFuture bindAddress(InetAddress localAddress); + + + /** + * Unbind a multi-homing address from a already established channel + */ + ChannelFuture unbindAddress(InetAddress localAddress); + /** * Returns the configuration of this channel. */ diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java index ca248906f0..64e50241f8 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerChannelImpl.java @@ -18,6 +18,7 @@ package io.netty.channel.sctp; import static io.netty.channel.Channels.*; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.Selector; @@ -31,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock; import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.logging.InternalLogger; @@ -83,6 +85,20 @@ class SctpServerChannelImpl extends AbstractServerChannel fireChannelOpen(this); } + @Override + public ChannelFuture bindAddress(InetAddress localAddress) { + ChannelFuture future = future(this); + getPipeline().sendDownstream(new SctpBindAddressEvent(this, future, localAddress)); + return future; + } + + @Override + public ChannelFuture unbindAddress(InetAddress localAddress) { + ChannelFuture future = future(this); + getPipeline().sendDownstream(new SctpUnbindAddressEvent(this, future, localAddress)); + return future; + } + @Override public SctpServerChannelConfig getConfig() { return config; diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java index 31719676fb..2dd034a9af 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java @@ -18,6 +18,7 @@ package io.netty.channel.sctp; import static io.netty.channel.Channels.*; import java.io.IOException; +import java.net.InetAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.CancelledKeyException; @@ -94,6 +95,16 @@ class SctpServerPipelineSink extends AbstractChannelSink { } else { close(channel, future); } + case INTEREST_OPS: + if (event instanceof SctpBindAddressEvent) { + SctpBindAddressEvent bindAddressEvent = (SctpBindAddressEvent) event; + bindAddress(channel, bindAddressEvent.getFuture(), bindAddressEvent.getValue()); + } + + if (event instanceof SctpUnbindAddressEvent) { + SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event; + unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue()); + } break; } } @@ -158,6 +169,30 @@ class SctpServerPipelineSink extends AbstractChannelSink { } } + private void bindAddress( + SctpServerChannelImpl channel, ChannelFuture future, + InetAddress localAddress) { + try { + channel.serverChannel.bindAddress(localAddress); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + private void unbindAddress( + SctpServerChannelImpl channel, ChannelFuture future, + InetAddress localAddress) { + try { + channel.serverChannel.unbindAddress(localAddress); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + private void close(SctpServerChannelImpl channel, ChannelFuture future) { boolean bound = channel.isBound(); try { @@ -227,7 +262,7 @@ class SctpServerPipelineSink extends AbstractChannelSink { try { for (;;) { try { - if (selector.select(100) > 0) { + if (selector.select(500) > 0) { selector.selectedKeys().clear(); } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpUnbindAddressEvent.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpUnbindAddressEvent.java new file mode 100644 index 0000000000..f6d7e828bc --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpUnbindAddressEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.sctp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelState; +import io.netty.channel.DownstreamChannelStateEvent; + +import java.net.InetAddress; + +public class SctpUnbindAddressEvent extends DownstreamChannelStateEvent { + + /** + * Creates a new instance. + */ + public SctpUnbindAddressEvent(Channel channel, ChannelFuture future, InetAddress value) { + super(channel, future, ChannelState.INTEREST_OPS, value); + } + + @Override + public InetAddress getValue() { + return (InetAddress) super.getValue(); + } +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 20b876fa57..8659e3f32b 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -762,8 +762,8 @@ class SctpWorker implements Runnable { channel.channel.register( selector, channel.getRawInterestOps(), channel); } + channel.setConnected(); if (future != null) { - channel.setConnected(); future.setSuccess(); } } catch (IOException e) { diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java new file mode 100644 index 0000000000..ace6899480 --- /dev/null +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiHomingEchoTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.sctp; + +import io.netty.bootstrap.ClientBootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.*; +import io.netty.channel.sctp.*; +import io.netty.channel.sctp.codec.SctpFrameDecoder; +import io.netty.channel.sctp.codec.SctpFrameEncoder; +import io.netty.channel.sctp.handler.SimpleSctpChannelHandler; +import io.netty.testsuite.util.SctpSocketAddresses; +import io.netty.util.internal.ExecutorUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Random; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SctpMultiHomingEchoTest { + private static final Random random = new Random(); + static final byte[] data = new byte[4096];//could not test ultra jumbo frames + + private static ExecutorService executor; + + static { + random.nextBytes(data); + } + + @BeforeClass + public static void init() { + executor = Executors.newCachedThreadPool(); + } + + @AfterClass + public static void destroy() { + ExecutorUtil.terminate(executor); + } + + protected ChannelFactory newServerSocketChannelFactory(Executor executor) { + return new SctpServerSocketChannelFactory(executor, executor); + } + + protected ChannelFactory newClientSocketChannelFactory(Executor executor) { + return new SctpClientSocketChannelFactory(executor, executor); + } + + @Test(timeout = 15000) + public void testSimpleEcho() throws Throwable { + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); + + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + + + EchoHandler sh = new EchoHandler(); + EchoHandler ch = new EchoHandler(); + + sb.getPipeline().addLast("sctp-decoder", new SctpFrameDecoder()); + sb.getPipeline().addLast("sctp-encoder", new SctpFrameEncoder()); + sb.getPipeline().addLast("handler", sh); + + cb.getPipeline().addLast("sctp-decoder", new SctpFrameDecoder()); + cb.getPipeline().addLast("sctp-encoder", new SctpFrameEncoder()); + cb.getPipeline().addLast("handler", ch); + + SctpServerChannel serverChannel = (SctpServerChannel) sb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0)); + int port = serverChannel.getLocalAddress().getPort(); + + ChannelFuture multiHomingServerBindFuture = serverChannel.bindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2)); + assertTrue(multiHomingServerBindFuture.awaitUninterruptibly().isSuccess()); + + ChannelFuture bindFuture = cb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0)); + assertTrue(bindFuture.awaitUninterruptibly().isSuccess()); + + SctpChannel clientChannel = (SctpChannel) bindFuture.getChannel(); + + //adding a muti-homing address to client channel + ChannelFuture multiHomingBindFuture = clientChannel.bindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2)); + assertTrue(multiHomingBindFuture.awaitUninterruptibly().isSuccess()); + + ChannelFuture connectFuture = clientChannel.connect(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, port)); + assertTrue(connectFuture.awaitUninterruptibly().isSuccess()); + + assertEquals("Client local addresses count should be 2", 2, clientChannel.getAllLocalAddresses().size()); + assertEquals("Client remote addresses count should be 2", 2, clientChannel.getAllRemoteAddresses().size()); + + assertEquals("Server local addresses count should be 2", 2, serverChannel.getAllLocalAddresses().size()); + + for (int i = 0; i < data.length;) { + int length = Math.min(random.nextInt(1024 * 64), data.length - i); + clientChannel.write(ChannelBuffers.wrappedBuffer(data, i, length)); + i += length; + } + + while (ch.counter < data.length) { + if (sh.exception.get() != null) { + break; + } + if (ch.exception.get() != null) { + break; + } + + try { + Thread.sleep(5); + } catch (InterruptedException e) { + // Ignore. + } + } + + while (sh.counter < data.length) { + if (sh.exception.get() != null) { + break; + } + if (ch.exception.get() != null) { + break; + } + + try { + Thread.sleep(5); + } catch (InterruptedException e) { + // Ignore. + } + } + + //removing already added muti-homing address from client channel + ChannelFuture multiHomingUnbindFuture = clientChannel.unbindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2)); + assertTrue(multiHomingUnbindFuture.awaitUninterruptibly().isSuccess()); + + ChannelFuture multiHomingServerUnbindFuture = serverChannel.unbindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2)); + assertTrue(multiHomingUnbindFuture.awaitUninterruptibly().isSuccess()); + + + sh.channel.close().awaitUninterruptibly(); + ch.channel.close().awaitUninterruptibly(); + serverChannel.close().awaitUninterruptibly(); + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + private static class EchoHandler extends SimpleSctpChannelHandler { + volatile Channel channel; + final AtomicReference exception = new AtomicReference(); + volatile int counter; + + EchoHandler() { + } + + @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + channel = e.getChannel(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + ChannelBuffer m = (ChannelBuffer) e.getMessage(); + byte[] actual = new byte[m.readableBytes()]; + m.getBytes(0, actual); + + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); + } + + if (channel.getParent() != null) { + channel.write(m); + } + + counter += actual.length; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + if (exception.compareAndSet(null, e.getCause())) { + e.getChannel().close(); + } + } + + @Override + public void sctpNotificationReceived(ChannelHandlerContext ctx, SctpNotificationEvent event) { + System.out.println("SCTP notification event received :" + event); + } + } +} diff --git a/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java new file mode 100644 index 0000000000..70db9c11db --- /dev/null +++ b/transport-sctp/src/test/java/io/netty/testsuite/transport/sctp/SctpMultiStreamingEchoTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.sctp; + +import io.netty.bootstrap.ClientBootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.*; +import io.netty.channel.sctp.SctpClientSocketChannelFactory; +import io.netty.channel.sctp.SctpFrame; +import io.netty.channel.sctp.SctpServerSocketChannelFactory; +import io.netty.channel.sctp.codec.SctpFrameDecoder; +import io.netty.channel.sctp.codec.SctpFrameEncoder; +import io.netty.testsuite.util.SctpSocketAddresses; +import io.netty.util.internal.ExecutorUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Random; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SctpMultiStreamingEchoTest { + private static final Random random = new Random(); + + static final SctpFrame [] sctpFrames = new SctpFrame [4]; + + + private static ExecutorService executor; + + static ChannelBuffer makeRandomFrame() { + byte [] data = new byte[512]; + random.nextBytes(data); + return ChannelBuffers.wrappedBuffer(data); + } + + static { + int protocolId = 3; + for(int streamNumber = 0; streamNumber <= 3; streamNumber ++) { + sctpFrames [streamNumber] = new SctpFrame(protocolId, streamNumber, makeRandomFrame()); + } + } + + @BeforeClass + public static void init() { + executor = Executors.newCachedThreadPool(); + } + + @AfterClass + public static void destroy() { + ExecutorUtil.terminate(executor); + } + + protected ChannelFactory newServerSocketChannelFactory(Executor executor) { + return new SctpServerSocketChannelFactory(executor, executor); + } + + protected ChannelFactory newClientSocketChannelFactory(Executor executor) { + return new SctpClientSocketChannelFactory(executor, executor); + } + + @Test(timeout = 10000) + public void testMultiStreamingEcho() throws Throwable { + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); + + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + cb.setOption("sctpInitMaxStreams", 4); + + EchoHandler sh = new EchoHandler(); + EchoHandler ch = new EchoHandler(); + + sb.getPipeline().addLast("handler", sh); + + cb.getPipeline().addLast("handler", ch); + + Channel sc = sb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0)); + int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); + + ChannelFuture ccf = cb.connect(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, port)); + assertTrue(ccf.awaitUninterruptibly().isSuccess()); + + Channel cc = ccf.getChannel(); + + for(SctpFrame sctpFrame: sctpFrames) { + cc.write(sctpFrame); + } + + + while (sh.counter < sctpFrames.length) { + Thread.sleep(5); + } + while (ch.counter < sctpFrames.length) { + Thread.sleep(5); + } + + assertEquals(sctpFrames.length, sh.counter); + assertEquals(sctpFrames.length, ch.counter); + + sh.channel.close().awaitUninterruptibly(); + ch.channel.close().awaitUninterruptibly(); + + + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + private static class EchoHandler extends SimpleChannelUpstreamHandler { + volatile Channel channel; + final AtomicReference exception = new AtomicReference(); + volatile int counter; + + EchoHandler() { + } + + @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + channel = e.getChannel(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + SctpFrame sctpFrame = (SctpFrame) e.getMessage(); + + assertEquals(sctpFrames[counter], sctpFrame); + + if (channel.getParent() != null) { + channel.write(sctpFrame); + } + + counter ++ ; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + if (exception.compareAndSet(null, e.getCause())) { + e.getChannel().close(); + } + } + } +} diff --git a/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java b/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java index 34467995a5..1435c414a2 100644 --- a/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java +++ b/transport-sctp/src/test/java/io/netty/testsuite/util/SctpSocketAddresses.java @@ -19,4 +19,5 @@ package io.netty.testsuite.util; public class SctpSocketAddresses { //io.netty.util.SocketAddresses.LOCALHOST interface has MTU SIZE issues with SCTP, we have to use local loop back interface for testing public final static String LOOP_BACK = "127.0.0.1"; + public final static String LOOP_BACK2 = "127.0.0.2"; }