[#861] Let SctpMessageCompletionHandler fire inboundMessageBufferUpdated() only if needed
This commit is contained in:
parent
88838413c7
commit
64351ad7d6
@ -19,14 +19,36 @@ package io.netty.handler.codec.sctp;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandler;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.socket.SctpMessage;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* {@link ChannelInboundMessageHandlerAdapter} which will take care of handle fragemented {@link SctpMessage}s, so
|
||||
* only <strong>complete</strong> {@link SctpMessage}s will be forwarded to the next
|
||||
* {@link ChannelInboundMessageHandler}.
|
||||
*/
|
||||
public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAdapter<SctpMessage> {
|
||||
private final Map<Integer, ByteBuf> fragments = new HashMap<Integer, ByteBuf>();
|
||||
private boolean assembled;
|
||||
|
||||
@Override
|
||||
protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||
assembled = false;
|
||||
return super.beginMessageReceived(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||
if (assembled) {
|
||||
assembled = false;
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
super.endMessageReceived(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
|
||||
@ -46,7 +68,7 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
|
||||
|
||||
if (isComplete && !frag.readable()) {
|
||||
//data chunk is not fragmented
|
||||
fireAssembledMessage(ctx, msg);
|
||||
handleAssembledMessage(ctx, msg);
|
||||
} else if (!isComplete && frag.readable()) {
|
||||
//more message to complete
|
||||
fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
|
||||
@ -57,15 +79,15 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
|
||||
protocolIdentifier,
|
||||
streamIdentifier,
|
||||
Unpooled.wrappedBuffer(frag, byteBuf));
|
||||
fireAssembledMessage(ctx, assembledMsg);
|
||||
handleAssembledMessage(ctx, assembledMsg);
|
||||
} else {
|
||||
//first incomplete message
|
||||
fragments.put(streamIdentifier, byteBuf);
|
||||
}
|
||||
}
|
||||
|
||||
protected void fireAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) {
|
||||
private void handleAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) {
|
||||
ctx.nextInboundMessageBuffer().add(assembledMsg);
|
||||
ctx.fireInboundBufferUpdated();
|
||||
assembled = true;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user