Add support for SCTP 'unordered' flag.

Motivation:

Some SCTP applications require the SCTP unordered flag.
This flag was not exposed by Netty so applications were unable
to use it.

Modifications:

- Add unordered flag to SctpMessage.
- {Nio,Oio}SctpChannel pass unordered flag to MessageInfo on write.
- SctpOutboundByteStreamHandler may optionally request unordered
  delivery for all outbound messages.
- Added test case to SctpEchoTest using unordered flag.

Result:

Fixes #3698. New constructors and methods added to SctpMessage and
SctpOutboundByteStreamHandler, but changes are backward compatible.
This commit is contained in:
Ben Evans 2015-04-29 14:31:29 +01:00 committed by Norman Maurer
parent 05f9593352
commit 6c025b2044
6 changed files with 62 additions and 9 deletions

View File

@ -53,10 +53,20 @@ public class SctpEchoTest extends AbstractSctpTest {
} }
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb); testSimpleEcho0(sb, cb, false);
} }
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb) throws Throwable { @Test
public void testSimpleEchoUnordered() throws Throwable {
Assume.assumeTrue(TestUtils.isSctpSupported());
run();
}
public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true);
}
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable {
final EchoHandler sh = new EchoHandler(); final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler(); final EchoHandler ch = new EchoHandler();
@ -66,7 +76,7 @@ public class SctpEchoTest extends AbstractSctpTest {
c.pipeline().addLast( c.pipeline().addLast(
new SctpMessageCompletionHandler(), new SctpMessageCompletionHandler(),
new SctpInboundByteStreamHandler(0, 0), new SctpInboundByteStreamHandler(0, 0),
new SctpOutboundByteStreamHandler(0, 0), new SctpOutboundByteStreamHandler(0, 0, unordered),
sh); sh);
} }
}); });
@ -76,7 +86,7 @@ public class SctpEchoTest extends AbstractSctpTest {
c.pipeline().addLast( c.pipeline().addLast(
new SctpMessageCompletionHandler(), new SctpMessageCompletionHandler(),
new SctpInboundByteStreamHandler(0, 0), new SctpInboundByteStreamHandler(0, 0),
new SctpOutboundByteStreamHandler(0, 0), new SctpOutboundByteStreamHandler(0, 0, unordered),
ch); ch);
} }
}); });

View File

@ -26,6 +26,7 @@ import io.netty.buffer.DefaultByteBufHolder;
public final class SctpMessage extends DefaultByteBufHolder { public final class SctpMessage extends DefaultByteBufHolder {
private final int streamIdentifier; private final int streamIdentifier;
private final int protocolIdentifier; private final int protocolIdentifier;
private final boolean unordered;
private final MessageInfo msgInfo; private final MessageInfo msgInfo;
@ -36,9 +37,21 @@ public final class SctpMessage extends DefaultByteBufHolder {
* @param payloadBuffer channel buffer * @param payloadBuffer channel buffer
*/ */
public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) { public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) {
this(protocolIdentifier, streamIdentifier, false, payloadBuffer);
}
/**
* Essential data that is being carried within SCTP Data Chunk
* @param protocolIdentifier of payload
* @param streamIdentifier that you want to send the payload
* @param unordered if {@literal true}, the SCTP Data Chunk will be sent with the U (unordered) flag set.
* @param payloadBuffer channel buffer
*/
public SctpMessage(int protocolIdentifier, int streamIdentifier, boolean unordered, ByteBuf payloadBuffer) {
super(payloadBuffer); super(payloadBuffer);
this.protocolIdentifier = protocolIdentifier; this.protocolIdentifier = protocolIdentifier;
this.streamIdentifier = streamIdentifier; this.streamIdentifier = streamIdentifier;
this.unordered = unordered;
msgInfo = null; msgInfo = null;
} }
@ -55,6 +68,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
this.msgInfo = msgInfo; this.msgInfo = msgInfo;
streamIdentifier = msgInfo.streamNumber(); streamIdentifier = msgInfo.streamNumber();
protocolIdentifier = msgInfo.payloadProtocolID(); protocolIdentifier = msgInfo.payloadProtocolID();
unordered = msgInfo.isUnordered();
} }
/** /**
@ -71,6 +85,13 @@ public final class SctpMessage extends DefaultByteBufHolder {
return protocolIdentifier; return protocolIdentifier;
} }
/**
* return the unordered flag
*/
public boolean isUnordered() {
return unordered;
}
/** /**
* Return the {@link MessageInfo} for inbound messages or {@code null} for * Return the {@link MessageInfo} for inbound messages or {@code null} for
* outbound messages. * outbound messages.
@ -111,6 +132,10 @@ public final class SctpMessage extends DefaultByteBufHolder {
return false; return false;
} }
if (unordered != sctpFrame.unordered) {
return false;
}
return content().equals(sctpFrame.content()); return content().equals(sctpFrame.content());
} }
@ -125,7 +150,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
@Override @Override
public SctpMessage copy() { public SctpMessage copy() {
if (msgInfo == null) { if (msgInfo == null) {
return new SctpMessage(protocolIdentifier, streamIdentifier, content().copy()); return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().copy());
} else { } else {
return new SctpMessage(msgInfo, content().copy()); return new SctpMessage(msgInfo, content().copy());
} }
@ -134,7 +159,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
@Override @Override
public SctpMessage duplicate() { public SctpMessage duplicate() {
if (msgInfo == null) { if (msgInfo == null) {
return new SctpMessage(protocolIdentifier, streamIdentifier, content().duplicate()); return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().duplicate());
} else { } else {
return new SctpMessage(msgInfo, content().copy()); return new SctpMessage(msgInfo, content().copy());
} }
@ -169,10 +194,12 @@ public final class SctpMessage extends DefaultByteBufHolder {
if (refCnt() == 0) { if (refCnt() == 0) {
return "SctpFrame{" + return "SctpFrame{" +
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
", unordered=" + unordered +
", data=(FREED)}"; ", data=(FREED)}";
} }
return "SctpFrame{" + return "SctpFrame{" +
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier + "streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
", unordered=" + unordered +
", data=" + ByteBufUtil.hexDump(content()) + '}'; ", data=" + ByteBufUtil.hexDump(content()) + '}';
} }
} }

View File

@ -315,6 +315,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier()); mi.streamNumber(packet.streamIdentifier());
mi.unordered(packet.isUnordered());
final int writtenBytes = javaChannel().send(nioData, mi); final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0; return writtenBytes > 0;
@ -329,7 +330,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
return m; return m;
} }
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf)); return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
newDirectBuffer(m, buf));
} }
throw new UnsupportedOperationException( throw new UnsupportedOperationException(

View File

@ -258,6 +258,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier()); mi.streamNumber(packet.streamIdentifier());
mi.unordered(packet.isUnordered());
ch.send(nioData, mi); ch.send(nioData, mi);
written ++; written ++;

View File

@ -41,6 +41,7 @@ public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMe
final int protocolIdentifier = msg.protocolIdentifier(); final int protocolIdentifier = msg.protocolIdentifier();
final int streamIdentifier = msg.streamIdentifier(); final int streamIdentifier = msg.streamIdentifier();
final boolean isComplete = msg.isComplete(); final boolean isComplete = msg.isComplete();
final boolean isUnordered = msg.isUnordered();
ByteBuf frag; ByteBuf frag;
if (fragments.containsKey(streamIdentifier)) { if (fragments.containsKey(streamIdentifier)) {
@ -61,6 +62,7 @@ public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMe
SctpMessage assembledMsg = new SctpMessage( SctpMessage assembledMsg = new SctpMessage(
protocolIdentifier, protocolIdentifier,
streamIdentifier, streamIdentifier,
isUnordered,
Unpooled.wrappedBuffer(frag, byteBuf)); Unpooled.wrappedBuffer(frag, byteBuf));
out.add(assembledMsg); out.add(assembledMsg);
} else { } else {

View File

@ -25,23 +25,34 @@ import java.util.List;
/** /**
* A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream * A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream
* with given protocol identifier. * with given protocol identifier.
* * Unordered delivery of all messages may be requested by passing unordered = true to the constructor.
*/ */
public class SctpOutboundByteStreamHandler extends MessageToMessageEncoder<ByteBuf> { public class SctpOutboundByteStreamHandler extends MessageToMessageEncoder<ByteBuf> {
private final int streamIdentifier; private final int streamIdentifier;
private final int protocolIdentifier; private final int protocolIdentifier;
private final boolean unordered;
/** /**
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association. * @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
* @param protocolIdentifier supported application protocol id. * @param protocolIdentifier supported application protocol id.
*/ */
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) { public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) {
this(streamIdentifier, protocolIdentifier, false);
}
/**
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
* @param protocolIdentifier supported application protocol id.
* @param unordered if {@literal true}, SCTP Data Chunks will be sent with the U (unordered) flag set.
*/
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier, boolean unordered) {
this.streamIdentifier = streamIdentifier; this.streamIdentifier = streamIdentifier;
this.protocolIdentifier = protocolIdentifier; this.protocolIdentifier = protocolIdentifier;
this.unordered = unordered;
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(new SctpMessage(streamIdentifier, protocolIdentifier, msg.retain())); out.add(new SctpMessage(streamIdentifier, protocolIdentifier, unordered, msg.retain()));
} }
} }