1)set the selector select timout values equals to tcp nio transport

2)removed unsupported sctp socket options from SctpChannelConfig classes
3)added testcases for sctp multi homing and multi streaming
This commit is contained in:
Jestan Nirojan 2012-02-17 01:52:24 +05:30
parent 97a38872f5
commit 4b99c4f4f7
19 changed files with 676 additions and 6 deletions

View File

@ -16,6 +16,7 @@
package com.sun.nio.sctp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.spi.AbstractSelectableChannel;
@ -45,6 +46,10 @@ public abstract class SctpChannel extends AbstractSelectableChannel {
public abstract SctpChannel bind(SocketAddress local) throws IOException;
public abstract boolean connect(SocketAddress remote) throws IOException;
public abstract boolean finishConnect() throws IOException;
public abstract SctpChannel bindAddress(InetAddress inetAddress) throws IOException;
public abstract SctpChannel unbindAddress(InetAddress inetAddress) throws IOException;
public abstract <T> MessageInfo receive(ByteBuffer dst, T attachment, NotificationHandler<T> handler) throws IOException;
public abstract int send(ByteBuffer src, MessageInfo messageInfo) throws IOException;
}

View File

@ -16,6 +16,7 @@
package com.sun.nio.sctp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.SelectorProvider;
@ -41,5 +42,9 @@ public abstract class SctpServerChannel extends AbstractSelectableChannel {
public abstract SctpServerChannel bind(SocketAddress local) throws IOException;
public abstract SctpServerChannel bind(SocketAddress local, int backlog) throws IOException;
public abstract SctpServerChannel bindAddress(InetAddress inetAddress) throws IOException;
public abstract SctpServerChannel unbindAddress(InetAddress inetAddress) throws IOException;
public abstract SctpChannel accept() throws IOException;
}

View File

@ -34,5 +34,18 @@ public class SctpStandardSocketOptions {
public static final SctpSocketOption<Integer> SO_SNDBUF = null;
public static class InitMaxStreams {
public static InitMaxStreams create(int i, int i1) {
return null;
}
public int maxInStreams() {
return 0;
}
public int maxOutStreams() {
return 0;
}
}
}

View File

@ -51,7 +51,8 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
} else if (key.equals("sctpNoDelay")) {
setSctpNoDelay(ConversionUtil.toBoolean(value));
} else if (key.equals("sctpInitMaxStreams")) {
setInitMaxStreams((InitMaxStreams) value);
final Integer maxInOutStreams = ConversionUtil.toInt(value);
setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams));
} else {
return false;
}

View File

@ -49,7 +49,8 @@ public class DefaultSctpServerChannelConfig extends DefaultServerChannelConfig
}
if (key.equals("sctpInitMaxStreams")) {
setInitMaxStreams((InitMaxStreams) value);
final Integer maxInOutStreams = ConversionUtil.toInt(value);
setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else {

View File

@ -0,0 +1,38 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelState;
import io.netty.channel.DownstreamChannelStateEvent;
import java.net.InetAddress;
public class SctpBindAddressEvent extends DownstreamChannelStateEvent {
/**
* Creates a new instance.
*/
public SctpBindAddressEvent(Channel channel, ChannelFuture future, InetAddress localAddress) {
super(channel, future, ChannelState.INTEREST_OPS, localAddress);
}
@Override
public InetAddress getValue() {
return (InetAddress) super.getValue();
}
}

View File

@ -17,6 +17,7 @@ package io.netty.channel.sctp;
import com.sun.nio.sctp.Association;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
@ -58,6 +59,17 @@ public interface SctpChannel extends Channel {
*/
Set<InetSocketAddress> getAllRemoteAddresses();
/**
* Bind a multi-homing address to the already bound channel
*/
ChannelFuture bindAddress(InetAddress localAddress);
/**
* Unbind a multi-homing address from a already established channel
*/
ChannelFuture unbindAddress(InetAddress localAddress);
/**
* Get the underlying SCTP association
*/

View File

@ -17,6 +17,7 @@ package io.netty.channel.sctp;
import static io.netty.channel.Channels.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
@ -152,6 +153,20 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
}
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
ChannelFuture future = future(this);
getPipeline().sendDownstream(new SctpBindAddressEvent(this, future, localAddress));
return future;
}
@Override
public ChannelFuture unbindAddress(InetAddress localAddress) {
ChannelFuture future = future(this);
getPipeline().sendDownstream(new SctpUnbindAddressEvent(this, future, localAddress));
return future;
}
@Override
public Association association() {
try {

View File

@ -19,6 +19,7 @@ import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
@ -98,7 +99,15 @@ class SctpClientPipelineSink extends AbstractChannelSink {
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
if (event instanceof SctpBindAddressEvent) {
SctpBindAddressEvent bindAddressEvent = (SctpBindAddressEvent) event;
bindAddress(channel, bindAddressEvent.getFuture(), bindAddressEvent.getValue());
} else if (event instanceof SctpUnbindAddressEvent) {
SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event;
unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue());
} else {
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
}
break;
}
} else if (e instanceof MessageEvent) {
@ -125,6 +134,32 @@ class SctpClientPipelineSink extends AbstractChannelSink {
}
}
private void bindAddress(
SctpClientChannel channel, ChannelFuture future,
InetAddress localAddress) {
try {
channel.channel.bindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void unbindAddress(
SctpClientChannel channel, ChannelFuture future,
InetAddress localAddress) {
try {
channel.channel.unbindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(
final SctpClientChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
@ -227,7 +262,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
wakenUp.set(false);
try {
int selectedKeyCount = selector.select(100);
int selectedKeyCount = selector.select(500);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up

View File

@ -68,6 +68,41 @@ public final class SctpFrame {
return msgInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SctpFrame sctpFrame = (SctpFrame) o;
if (protocolIdentifier != sctpFrame.protocolIdentifier) {
return false;
}
if (streamIdentifier != sctpFrame.streamIdentifier) {
return false;
}
if (!payloadBuffer.equals(sctpFrame.payloadBuffer)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = streamIdentifier;
result = 31 * result + protocolIdentifier;
result = 31 * result + payloadBuffer.hashCode();
return result;
}
@Override
public String toString() {
return new StringBuilder().

View File

@ -62,4 +62,13 @@ public class SctpNotificationEvent implements ChannelEvent {
public Object getValue() {
return value;
}
@Override
public String toString() {
return "SctpNotificationEvent{" +
"channel=" + channel +
", notification=" + notification +
", value=" + value +
'}';
}
}

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.sctp;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ServerChannel;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
@ -24,6 +26,17 @@ import java.util.Set;
* A SCTP {@link io.netty.channel.ServerChannel} which accepts incoming SCTP connections.
*/
public interface SctpServerChannel extends ServerChannel {
/**
* Bind a multi-homing address to the already bound channel
*/
ChannelFuture bindAddress(InetAddress localAddress);
/**
* Unbind a multi-homing address from a already established channel
*/
ChannelFuture unbindAddress(InetAddress localAddress);
/**
* Returns the configuration of this channel.
*/

View File

@ -18,6 +18,7 @@ package io.netty.channel.sctp;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.Selector;
@ -31,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.logging.InternalLogger;
@ -83,6 +85,20 @@ class SctpServerChannelImpl extends AbstractServerChannel
fireChannelOpen(this);
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
ChannelFuture future = future(this);
getPipeline().sendDownstream(new SctpBindAddressEvent(this, future, localAddress));
return future;
}
@Override
public ChannelFuture unbindAddress(InetAddress localAddress) {
ChannelFuture future = future(this);
getPipeline().sendDownstream(new SctpUnbindAddressEvent(this, future, localAddress));
return future;
}
@Override
public SctpServerChannelConfig getConfig() {
return config;

View File

@ -18,6 +18,7 @@ package io.netty.channel.sctp;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
@ -94,6 +95,16 @@ class SctpServerPipelineSink extends AbstractChannelSink {
} else {
close(channel, future);
}
case INTEREST_OPS:
if (event instanceof SctpBindAddressEvent) {
SctpBindAddressEvent bindAddressEvent = (SctpBindAddressEvent) event;
bindAddress(channel, bindAddressEvent.getFuture(), bindAddressEvent.getValue());
}
if (event instanceof SctpUnbindAddressEvent) {
SctpUnbindAddressEvent unbindAddressEvent = (SctpUnbindAddressEvent) event;
unbindAddress(channel, unbindAddressEvent.getFuture(), unbindAddressEvent.getValue());
}
break;
}
}
@ -158,6 +169,30 @@ class SctpServerPipelineSink extends AbstractChannelSink {
}
}
private void bindAddress(
SctpServerChannelImpl channel, ChannelFuture future,
InetAddress localAddress) {
try {
channel.serverChannel.bindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void unbindAddress(
SctpServerChannelImpl channel, ChannelFuture future,
InetAddress localAddress) {
try {
channel.serverChannel.unbindAddress(localAddress);
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void close(SctpServerChannelImpl channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
@ -227,7 +262,7 @@ class SctpServerPipelineSink extends AbstractChannelSink {
try {
for (;;) {
try {
if (selector.select(100) > 0) {
if (selector.select(500) > 0) {
selector.selectedKeys().clear();
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelState;
import io.netty.channel.DownstreamChannelStateEvent;
import java.net.InetAddress;
public class SctpUnbindAddressEvent extends DownstreamChannelStateEvent {
/**
* Creates a new instance.
*/
public SctpUnbindAddressEvent(Channel channel, ChannelFuture future, InetAddress value) {
super(channel, future, ChannelState.INTEREST_OPS, value);
}
@Override
public InetAddress getValue() {
return (InetAddress) super.getValue();
}
}

View File

@ -762,8 +762,8 @@ class SctpWorker implements Runnable {
channel.channel.register(
selector, channel.getRawInterestOps(), channel);
}
channel.setConnected();
if (future != null) {
channel.setConnected();
future.setSuccess();
}
} catch (IOException e) {

View File

@ -0,0 +1,222 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.sctp;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.*;
import io.netty.channel.sctp.*;
import io.netty.channel.sctp.codec.SctpFrameDecoder;
import io.netty.channel.sctp.codec.SctpFrameEncoder;
import io.netty.channel.sctp.handler.SimpleSctpChannelHandler;
import io.netty.testsuite.util.SctpSocketAddresses;
import io.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SctpMultiHomingEchoTest {
private static final Random random = new Random();
static final byte[] data = new byte[4096];//could not test ultra jumbo frames
private static ExecutorService executor;
static {
random.nextBytes(data);
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new SctpServerSocketChannelFactory(executor, executor);
}
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new SctpClientSocketChannelFactory(executor, executor);
}
@Test(timeout = 15000)
public void testSimpleEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("sctp-decoder", new SctpFrameDecoder());
sb.getPipeline().addLast("sctp-encoder", new SctpFrameEncoder());
sb.getPipeline().addLast("handler", sh);
cb.getPipeline().addLast("sctp-decoder", new SctpFrameDecoder());
cb.getPipeline().addLast("sctp-encoder", new SctpFrameEncoder());
cb.getPipeline().addLast("handler", ch);
SctpServerChannel serverChannel = (SctpServerChannel) sb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0));
int port = serverChannel.getLocalAddress().getPort();
ChannelFuture multiHomingServerBindFuture = serverChannel.bindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2));
assertTrue(multiHomingServerBindFuture.awaitUninterruptibly().isSuccess());
ChannelFuture bindFuture = cb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0));
assertTrue(bindFuture.awaitUninterruptibly().isSuccess());
SctpChannel clientChannel = (SctpChannel) bindFuture.getChannel();
//adding a muti-homing address to client channel
ChannelFuture multiHomingBindFuture = clientChannel.bindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2));
assertTrue(multiHomingBindFuture.awaitUninterruptibly().isSuccess());
ChannelFuture connectFuture = clientChannel.connect(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, port));
assertTrue(connectFuture.awaitUninterruptibly().isSuccess());
assertEquals("Client local addresses count should be 2", 2, clientChannel.getAllLocalAddresses().size());
assertEquals("Client remote addresses count should be 2", 2, clientChannel.getAllRemoteAddresses().size());
assertEquals("Server local addresses count should be 2", 2, serverChannel.getAllLocalAddresses().size());
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
clientChannel.write(ChannelBuffers.wrappedBuffer(data, i, length));
i += length;
}
while (ch.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
// Ignore.
}
}
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
// Ignore.
}
}
//removing already added muti-homing address from client channel
ChannelFuture multiHomingUnbindFuture = clientChannel.unbindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2));
assertTrue(multiHomingUnbindFuture.awaitUninterruptibly().isSuccess());
ChannelFuture multiHomingServerUnbindFuture = serverChannel.unbindAddress(InetAddress.getByName(SctpSocketAddresses.LOOP_BACK2));
assertTrue(multiHomingUnbindFuture.awaitUninterruptibly().isSuccess());
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
serverChannel.close().awaitUninterruptibly();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class EchoHandler extends SimpleSctpChannelHandler {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler() {
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channel = e.getChannel();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
ChannelBuffer m = (ChannelBuffer) e.getMessage();
byte[] actual = new byte[m.readableBytes()];
m.getBytes(0, actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(data[i + lastIdx], actual[i]);
}
if (channel.getParent() != null) {
channel.write(m);
}
counter += actual.length;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
if (exception.compareAndSet(null, e.getCause())) {
e.getChannel().close();
}
}
@Override
public void sctpNotificationReceived(ChannelHandlerContext ctx, SctpNotificationEvent event) {
System.out.println("SCTP notification event received :" + event);
}
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.sctp;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.*;
import io.netty.channel.sctp.SctpClientSocketChannelFactory;
import io.netty.channel.sctp.SctpFrame;
import io.netty.channel.sctp.SctpServerSocketChannelFactory;
import io.netty.channel.sctp.codec.SctpFrameDecoder;
import io.netty.channel.sctp.codec.SctpFrameEncoder;
import io.netty.testsuite.util.SctpSocketAddresses;
import io.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SctpMultiStreamingEchoTest {
private static final Random random = new Random();
static final SctpFrame [] sctpFrames = new SctpFrame [4];
private static ExecutorService executor;
static ChannelBuffer makeRandomFrame() {
byte [] data = new byte[512];
random.nextBytes(data);
return ChannelBuffers.wrappedBuffer(data);
}
static {
int protocolId = 3;
for(int streamNumber = 0; streamNumber <= 3; streamNumber ++) {
sctpFrames [streamNumber] = new SctpFrame(protocolId, streamNumber, makeRandomFrame());
}
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new SctpServerSocketChannelFactory(executor, executor);
}
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new SctpClientSocketChannelFactory(executor, executor);
}
@Test(timeout = 10000)
public void testMultiStreamingEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
cb.setOption("sctpInitMaxStreams", 4);
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("handler", sh);
cb.getPipeline().addLast("handler", ch);
Channel sc = sb.bind(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, 0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(SctpSocketAddresses.LOOP_BACK, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess());
Channel cc = ccf.getChannel();
for(SctpFrame sctpFrame: sctpFrames) {
cc.write(sctpFrame);
}
while (sh.counter < sctpFrames.length) {
Thread.sleep(5);
}
while (ch.counter < sctpFrames.length) {
Thread.sleep(5);
}
assertEquals(sctpFrames.length, sh.counter);
assertEquals(sctpFrames.length, ch.counter);
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class EchoHandler extends SimpleChannelUpstreamHandler {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler() {
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channel = e.getChannel();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
SctpFrame sctpFrame = (SctpFrame) e.getMessage();
assertEquals(sctpFrames[counter], sctpFrame);
if (channel.getParent() != null) {
channel.write(sctpFrame);
}
counter ++ ;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
if (exception.compareAndSet(null, e.getCause())) {
e.getChannel().close();
}
}
}
}

View File

@ -19,4 +19,5 @@ package io.netty.testsuite.util;
public class SctpSocketAddresses {
//io.netty.util.SocketAddresses.LOCALHOST interface has MTU SIZE issues with SCTP, we have to use local loop back interface for testing
public final static String LOOP_BACK = "127.0.0.1";
public final static String LOOP_BACK2 = "127.0.0.2";
}