From 6c025b2044728d930f15ea5f822da9cffd27645b Mon Sep 17 00:00:00 2001 From: Ben Evans Date: Wed, 29 Apr 2015 14:31:29 +0100 Subject: [PATCH] Add support for SCTP 'unordered' flag. Motivation: Some SCTP applications require the SCTP unordered flag. This flag was not exposed by Netty so applications were unable to use it. Modifications: - Add unordered flag to SctpMessage. - {Nio,Oio}SctpChannel pass unordered flag to MessageInfo on write. - SctpOutboundByteStreamHandler may optionally request unordered delivery for all outbound messages. - Added test case to SctpEchoTest using unordered flag. Result: Fixes #3698. New constructors and methods added to SctpMessage and SctpOutboundByteStreamHandler, but changes are backward compatible. --- .../transport/sctp/SctpEchoTest.java | 18 ++++++++--- .../io/netty/channel/sctp/SctpMessage.java | 31 +++++++++++++++++-- .../channel/sctp/nio/NioSctpChannel.java | 4 ++- .../channel/sctp/oio/OioSctpChannel.java | 1 + .../sctp/SctpMessageCompletionHandler.java | 2 ++ .../sctp/SctpOutboundByteStreamHandler.java | 15 +++++++-- 6 files changed, 62 insertions(+), 9 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java index 0b126b8a13..067d8e6a67 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java @@ -53,10 +53,20 @@ public class SctpEchoTest extends AbstractSctpTest { } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb); + testSimpleEcho0(sb, cb, false); } - private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb) throws Throwable { + @Test + public void testSimpleEchoUnordered() throws Throwable { + Assume.assumeTrue(TestUtils.isSctpSupported()); + run(); + } + + public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, true); + } + + private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable { final EchoHandler sh = new EchoHandler(); final EchoHandler ch = new EchoHandler(); @@ -66,7 +76,7 @@ public class SctpEchoTest extends AbstractSctpTest { c.pipeline().addLast( new SctpMessageCompletionHandler(), new SctpInboundByteStreamHandler(0, 0), - new SctpOutboundByteStreamHandler(0, 0), + new SctpOutboundByteStreamHandler(0, 0, unordered), sh); } }); @@ -76,7 +86,7 @@ public class SctpEchoTest extends AbstractSctpTest { c.pipeline().addLast( new SctpMessageCompletionHandler(), new SctpInboundByteStreamHandler(0, 0), - new SctpOutboundByteStreamHandler(0, 0), + new SctpOutboundByteStreamHandler(0, 0, unordered), ch); } }); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpMessage.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpMessage.java index 221a159e38..d602ee50e1 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpMessage.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpMessage.java @@ -26,6 +26,7 @@ import io.netty.buffer.DefaultByteBufHolder; public final class SctpMessage extends DefaultByteBufHolder { private final int streamIdentifier; private final int protocolIdentifier; + private final boolean unordered; private final MessageInfo msgInfo; @@ -36,9 +37,21 @@ public final class SctpMessage extends DefaultByteBufHolder { * @param payloadBuffer channel buffer */ public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) { + this(protocolIdentifier, streamIdentifier, false, payloadBuffer); + } + + /** + * Essential data that is being carried within SCTP Data Chunk + * @param protocolIdentifier of payload + * @param streamIdentifier that you want to send the payload + * @param unordered if {@literal true}, the SCTP Data Chunk will be sent with the U (unordered) flag set. + * @param payloadBuffer channel buffer + */ + public SctpMessage(int protocolIdentifier, int streamIdentifier, boolean unordered, ByteBuf payloadBuffer) { super(payloadBuffer); this.protocolIdentifier = protocolIdentifier; this.streamIdentifier = streamIdentifier; + this.unordered = unordered; msgInfo = null; } @@ -55,6 +68,7 @@ public final class SctpMessage extends DefaultByteBufHolder { this.msgInfo = msgInfo; streamIdentifier = msgInfo.streamNumber(); protocolIdentifier = msgInfo.payloadProtocolID(); + unordered = msgInfo.isUnordered(); } /** @@ -71,6 +85,13 @@ public final class SctpMessage extends DefaultByteBufHolder { return protocolIdentifier; } + /** + * return the unordered flag + */ + public boolean isUnordered() { + return unordered; + } + /** * Return the {@link MessageInfo} for inbound messages or {@code null} for * outbound messages. @@ -111,6 +132,10 @@ public final class SctpMessage extends DefaultByteBufHolder { return false; } + if (unordered != sctpFrame.unordered) { + return false; + } + return content().equals(sctpFrame.content()); } @@ -125,7 +150,7 @@ public final class SctpMessage extends DefaultByteBufHolder { @Override public SctpMessage copy() { if (msgInfo == null) { - return new SctpMessage(protocolIdentifier, streamIdentifier, content().copy()); + return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().copy()); } else { return new SctpMessage(msgInfo, content().copy()); } @@ -134,7 +159,7 @@ public final class SctpMessage extends DefaultByteBufHolder { @Override public SctpMessage duplicate() { if (msgInfo == null) { - return new SctpMessage(protocolIdentifier, streamIdentifier, content().duplicate()); + return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().duplicate()); } else { return new SctpMessage(msgInfo, content().copy()); } @@ -169,10 +194,12 @@ public final class SctpMessage extends DefaultByteBufHolder { if (refCnt() == 0) { return "SctpFrame{" + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + + ", unordered=" + unordered + ", data=(FREED)}"; } return "SctpFrame{" + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + + ", unordered=" + unordered + ", data=" + ByteBufUtil.hexDump(content()) + '}'; } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index e339e781b9..7dfdc7a044 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -315,6 +315,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); + mi.unordered(packet.isUnordered()); final int writtenBytes = javaChannel().send(nioData, mi); return writtenBytes > 0; @@ -329,7 +330,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return m; } - return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf)); + return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(), + newDirectBuffer(m, buf)); } throw new UnsupportedOperationException( diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index c10a9bfb0d..8abedfa308 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -258,6 +258,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); + mi.unordered(packet.isUnordered()); ch.send(nioData, mi); written ++; diff --git a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java index 30559e501f..01f5ae3c4f 100644 --- a/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java +++ b/transport-sctp/src/main/java/io/netty/handler/codec/sctp/SctpMessageCompletionHandler.java @@ -41,6 +41,7 @@ public class SctpMessageCompletionHandler extends MessageToMessageDecoder { private final int streamIdentifier; private final int protocolIdentifier; + private final boolean unordered; /** * @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association. * @param protocolIdentifier supported application protocol id. */ public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) { + this(streamIdentifier, protocolIdentifier, false); + } + + /** + * @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association. + * @param protocolIdentifier supported application protocol id. + * @param unordered if {@literal true}, SCTP Data Chunks will be sent with the U (unordered) flag set. + */ + public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier, boolean unordered) { this.streamIdentifier = streamIdentifier; this.protocolIdentifier = protocolIdentifier; + this.unordered = unordered; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { - out.add(new SctpMessage(streamIdentifier, protocolIdentifier, msg.retain())); + out.add(new SctpMessage(streamIdentifier, protocolIdentifier, unordered, msg.retain())); } }