Add support for SCTP 'unordered' flag.
Motivation: Some SCTP applications require the SCTP unordered flag. This flag was not exposed by Netty so applications were unable to use it. Modifications: - Add unordered flag to SctpMessage. - {Nio,Oio}SctpChannel pass unordered flag to MessageInfo on write. - SctpOutboundByteStreamHandler may optionally request unordered delivery for all outbound messages. - Added test case to SctpEchoTest using unordered flag. Result: Fixes #3698. New constructors and methods added to SctpMessage and SctpOutboundByteStreamHandler, but changes are backward compatible.
This commit is contained in:
parent
2c2b89d30f
commit
44d64ce039
@ -53,10 +53,20 @@ public class SctpEchoTest extends AbstractSctpTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testSimpleEcho0(sb, cb);
|
testSimpleEcho0(sb, cb, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
@Test
|
||||||
|
public void testSimpleEchoUnordered() throws Throwable {
|
||||||
|
Assume.assumeTrue(TestUtils.isSctpSupported());
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
testSimpleEcho0(sb, cb, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable {
|
||||||
final EchoHandler sh = new EchoHandler();
|
final EchoHandler sh = new EchoHandler();
|
||||||
final EchoHandler ch = new EchoHandler();
|
final EchoHandler ch = new EchoHandler();
|
||||||
|
|
||||||
@ -66,7 +76,7 @@ public class SctpEchoTest extends AbstractSctpTest {
|
|||||||
c.pipeline().addLast(
|
c.pipeline().addLast(
|
||||||
new SctpMessageCompletionHandler(),
|
new SctpMessageCompletionHandler(),
|
||||||
new SctpInboundByteStreamHandler(0, 0),
|
new SctpInboundByteStreamHandler(0, 0),
|
||||||
new SctpOutboundByteStreamHandler(0, 0),
|
new SctpOutboundByteStreamHandler(0, 0, unordered),
|
||||||
sh);
|
sh);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -76,7 +86,7 @@ public class SctpEchoTest extends AbstractSctpTest {
|
|||||||
c.pipeline().addLast(
|
c.pipeline().addLast(
|
||||||
new SctpMessageCompletionHandler(),
|
new SctpMessageCompletionHandler(),
|
||||||
new SctpInboundByteStreamHandler(0, 0),
|
new SctpInboundByteStreamHandler(0, 0),
|
||||||
new SctpOutboundByteStreamHandler(0, 0),
|
new SctpOutboundByteStreamHandler(0, 0, unordered),
|
||||||
ch);
|
ch);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -26,6 +26,7 @@ import io.netty.buffer.DefaultByteBufHolder;
|
|||||||
public final class SctpMessage extends DefaultByteBufHolder {
|
public final class SctpMessage extends DefaultByteBufHolder {
|
||||||
private final int streamIdentifier;
|
private final int streamIdentifier;
|
||||||
private final int protocolIdentifier;
|
private final int protocolIdentifier;
|
||||||
|
private final boolean unordered;
|
||||||
|
|
||||||
private final MessageInfo msgInfo;
|
private final MessageInfo msgInfo;
|
||||||
|
|
||||||
@ -36,9 +37,21 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
* @param payloadBuffer channel buffer
|
* @param payloadBuffer channel buffer
|
||||||
*/
|
*/
|
||||||
public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) {
|
public SctpMessage(int protocolIdentifier, int streamIdentifier, ByteBuf payloadBuffer) {
|
||||||
|
this(protocolIdentifier, streamIdentifier, false, payloadBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Essential data that is being carried within SCTP Data Chunk
|
||||||
|
* @param protocolIdentifier of payload
|
||||||
|
* @param streamIdentifier that you want to send the payload
|
||||||
|
* @param unordered if {@literal true}, the SCTP Data Chunk will be sent with the U (unordered) flag set.
|
||||||
|
* @param payloadBuffer channel buffer
|
||||||
|
*/
|
||||||
|
public SctpMessage(int protocolIdentifier, int streamIdentifier, boolean unordered, ByteBuf payloadBuffer) {
|
||||||
super(payloadBuffer);
|
super(payloadBuffer);
|
||||||
this.protocolIdentifier = protocolIdentifier;
|
this.protocolIdentifier = protocolIdentifier;
|
||||||
this.streamIdentifier = streamIdentifier;
|
this.streamIdentifier = streamIdentifier;
|
||||||
|
this.unordered = unordered;
|
||||||
msgInfo = null;
|
msgInfo = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,6 +68,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
this.msgInfo = msgInfo;
|
this.msgInfo = msgInfo;
|
||||||
streamIdentifier = msgInfo.streamNumber();
|
streamIdentifier = msgInfo.streamNumber();
|
||||||
protocolIdentifier = msgInfo.payloadProtocolID();
|
protocolIdentifier = msgInfo.payloadProtocolID();
|
||||||
|
unordered = msgInfo.isUnordered();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -71,6 +85,13 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
return protocolIdentifier;
|
return protocolIdentifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return the unordered flag
|
||||||
|
*/
|
||||||
|
public boolean isUnordered() {
|
||||||
|
return unordered;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the {@link MessageInfo} for inbound messages or {@code null} for
|
* Return the {@link MessageInfo} for inbound messages or {@code null} for
|
||||||
* outbound messages.
|
* outbound messages.
|
||||||
@ -111,6 +132,10 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (unordered != sctpFrame.unordered) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (!content().equals(sctpFrame.content())) {
|
if (!content().equals(sctpFrame.content())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -129,7 +154,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
@Override
|
@Override
|
||||||
public SctpMessage copy() {
|
public SctpMessage copy() {
|
||||||
if (msgInfo == null) {
|
if (msgInfo == null) {
|
||||||
return new SctpMessage(protocolIdentifier, streamIdentifier, content().copy());
|
return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().copy());
|
||||||
} else {
|
} else {
|
||||||
return new SctpMessage(msgInfo, content().copy());
|
return new SctpMessage(msgInfo, content().copy());
|
||||||
}
|
}
|
||||||
@ -138,7 +163,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
@Override
|
@Override
|
||||||
public SctpMessage duplicate() {
|
public SctpMessage duplicate() {
|
||||||
if (msgInfo == null) {
|
if (msgInfo == null) {
|
||||||
return new SctpMessage(protocolIdentifier, streamIdentifier, content().duplicate());
|
return new SctpMessage(protocolIdentifier, streamIdentifier, unordered, content().duplicate());
|
||||||
} else {
|
} else {
|
||||||
return new SctpMessage(msgInfo, content().copy());
|
return new SctpMessage(msgInfo, content().copy());
|
||||||
}
|
}
|
||||||
@ -161,10 +186,12 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
|||||||
if (refCnt() == 0) {
|
if (refCnt() == 0) {
|
||||||
return "SctpFrame{" +
|
return "SctpFrame{" +
|
||||||
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
|
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
|
||||||
|
", unordered=" + unordered +
|
||||||
", data=(FREED)}";
|
", data=(FREED)}";
|
||||||
}
|
}
|
||||||
return "SctpFrame{" +
|
return "SctpFrame{" +
|
||||||
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
|
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
|
||||||
|
", unordered=" + unordered +
|
||||||
", data=" + ByteBufUtil.hexDump(content()) + '}';
|
", data=" + ByteBufUtil.hexDump(content()) + '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -320,6 +320,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
|||||||
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
||||||
mi.payloadProtocolID(packet.protocolIdentifier());
|
mi.payloadProtocolID(packet.protocolIdentifier());
|
||||||
mi.streamNumber(packet.streamIdentifier());
|
mi.streamNumber(packet.streamIdentifier());
|
||||||
|
mi.unordered(packet.isUnordered());
|
||||||
|
|
||||||
final int writtenBytes = javaChannel().send(nioData, mi);
|
final int writtenBytes = javaChannel().send(nioData, mi);
|
||||||
return writtenBytes > 0;
|
return writtenBytes > 0;
|
||||||
@ -334,7 +335,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
|||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), newDirectBuffer(m, buf));
|
return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
|
||||||
|
newDirectBuffer(m, buf));
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
|
@ -263,6 +263,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
|||||||
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
|
||||||
mi.payloadProtocolID(packet.protocolIdentifier());
|
mi.payloadProtocolID(packet.protocolIdentifier());
|
||||||
mi.streamNumber(packet.streamIdentifier());
|
mi.streamNumber(packet.streamIdentifier());
|
||||||
|
mi.unordered(packet.isUnordered());
|
||||||
|
|
||||||
ch.send(nioData, mi);
|
ch.send(nioData, mi);
|
||||||
written ++;
|
written ++;
|
||||||
|
@ -41,6 +41,7 @@ public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMe
|
|||||||
final int protocolIdentifier = msg.protocolIdentifier();
|
final int protocolIdentifier = msg.protocolIdentifier();
|
||||||
final int streamIdentifier = msg.streamIdentifier();
|
final int streamIdentifier = msg.streamIdentifier();
|
||||||
final boolean isComplete = msg.isComplete();
|
final boolean isComplete = msg.isComplete();
|
||||||
|
final boolean isUnordered = msg.isUnordered();
|
||||||
|
|
||||||
ByteBuf frag;
|
ByteBuf frag;
|
||||||
if (fragments.containsKey(streamIdentifier)) {
|
if (fragments.containsKey(streamIdentifier)) {
|
||||||
@ -61,6 +62,7 @@ public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMe
|
|||||||
SctpMessage assembledMsg = new SctpMessage(
|
SctpMessage assembledMsg = new SctpMessage(
|
||||||
protocolIdentifier,
|
protocolIdentifier,
|
||||||
streamIdentifier,
|
streamIdentifier,
|
||||||
|
isUnordered,
|
||||||
Unpooled.wrappedBuffer(frag, byteBuf));
|
Unpooled.wrappedBuffer(frag, byteBuf));
|
||||||
out.add(assembledMsg);
|
out.add(assembledMsg);
|
||||||
} else {
|
} else {
|
||||||
|
@ -25,23 +25,34 @@ import java.util.List;
|
|||||||
/**
|
/**
|
||||||
* A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream
|
* A ChannelHandler which transform {@link ByteBuf} to {@link SctpMessage} and send it through a specific stream
|
||||||
* with given protocol identifier.
|
* with given protocol identifier.
|
||||||
*
|
* Unordered delivery of all messages may be requested by passing unordered = true to the constructor.
|
||||||
*/
|
*/
|
||||||
public class SctpOutboundByteStreamHandler extends MessageToMessageEncoder<ByteBuf> {
|
public class SctpOutboundByteStreamHandler extends MessageToMessageEncoder<ByteBuf> {
|
||||||
private final int streamIdentifier;
|
private final int streamIdentifier;
|
||||||
private final int protocolIdentifier;
|
private final int protocolIdentifier;
|
||||||
|
private final boolean unordered;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
|
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
|
||||||
* @param protocolIdentifier supported application protocol id.
|
* @param protocolIdentifier supported application protocol id.
|
||||||
*/
|
*/
|
||||||
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) {
|
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier) {
|
||||||
|
this(streamIdentifier, protocolIdentifier, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param streamIdentifier stream number, this should be >=0 or <= max stream number of the association.
|
||||||
|
* @param protocolIdentifier supported application protocol id.
|
||||||
|
* @param unordered if {@literal true}, SCTP Data Chunks will be sent with the U (unordered) flag set.
|
||||||
|
*/
|
||||||
|
public SctpOutboundByteStreamHandler(int streamIdentifier, int protocolIdentifier, boolean unordered) {
|
||||||
this.streamIdentifier = streamIdentifier;
|
this.streamIdentifier = streamIdentifier;
|
||||||
this.protocolIdentifier = protocolIdentifier;
|
this.protocolIdentifier = protocolIdentifier;
|
||||||
|
this.unordered = unordered;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
|
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
|
||||||
out.add(new SctpMessage(streamIdentifier, protocolIdentifier, msg.retain()));
|
out.add(new SctpMessage(streamIdentifier, protocolIdentifier, unordered, msg.retain()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user