Provide ability to extend StompSubframeEncoder and improve full stomp frame encoding (allocate one buffer for full frame considering the size of the headers) (#10778)
Motivation: At the moment `StompSubframeEncoder` encode a frame only to `ByteBuf` it is not convenient if further we need to convert it to another type of message, e.g. `WebSocketFrame`. Also, if we send a full frame, it splits into two headers and a content what makes it difficult to convert it in the next handler. Modification: Introduce additional converter methods e.g. (`Object protected convertFullFrame(StompFrame original, ByteBuf encoded`)...) for extending encoder functionality and allocate only one `ByteBuf` for full stomp frame. Change headers size calculation, previously used only 256 bytes that reallocate a new buffer each time when headers size more than this threshold. Add `StompEncoderBenchmark`. Result: Improved `StompSubframeEncoder` fro extensions. Previous version benchmark ``` Benchmark (contentLength) (headersType) (pooledAllocator) Mode Cnt Score Error Units StompEncoderBenchmark.writeStompFrame 0 ONE true thrpt 10 4432132.884 ± 178923.436 ops/s StompEncoderBenchmark.writeStompFrame 0 ONE false thrpt 10 1281122.756 ± 52484.174 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE true thrpt 10 2980897.937 ± 130253.049 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE false thrpt 10 1116883.574 ± 35471.482 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN true thrpt 10 1988012.159 ± 74352.450 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN false thrpt 10 881772.343 ± 94633.870 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN true thrpt 10 1048125.919 ± 151053.902 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN false thrpt 10 429900.066 ± 47956.661 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY true thrpt 10 660584.122 ± 104973.439 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY false thrpt 10 278255.488 ± 20143.708 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE true thrpt 10 4251498.549 ± 625050.979 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE false thrpt 10 1214006.861 ± 60421.601 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE true thrpt 10 3117736.486 ± 173613.974 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE false thrpt 10 1046605.891 ± 94428.064 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN true thrpt 10 2006986.881 ± 108456.748 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN false thrpt 10 877983.112 ± 82919.387 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN true thrpt 10 1132844.437 ± 84578.571 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN false thrpt 10 429334.649 ± 35403.161 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY true thrpt 10 657093.390 ± 48092.947 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY false thrpt 10 252140.876 ± 37337.255 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE true thrpt 10 4720507.067 ± 100993.908 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE false thrpt 10 1266182.925 ± 85888.413 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE true thrpt 10 2898746.621 ± 452579.753 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE false thrpt 10 1019555.288 ± 65640.507 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN true thrpt 10 2259187.459 ± 20025.989 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN false thrpt 10 896405.412 ± 53750.148 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN true thrpt 10 1110670.772 ± 107650.327 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN false thrpt 10 445187.398 ± 28845.959 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY true thrpt 10 611506.846 ± 25304.240 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY false thrpt 10 247687.007 ± 43471.578 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE true thrpt 10 4140949.576 ± 270274.087 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE false thrpt 10 1154515.598 ± 134413.876 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE true thrpt 10 3349996.875 ± 162309.889 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE false thrpt 10 1141040.562 ± 5895.693 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN true thrpt 10 2184632.248 ± 8957.833 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN false thrpt 10 959545.704 ± 5835.161 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN true thrpt 10 1081113.327 ± 3957.527 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN false thrpt 10 467524.660 ± 1383.236 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY true thrpt 10 568411.797 ± 108712.493 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY false thrpt 10 260764.231 ± 43149.129 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE true thrpt 10 4369787.147 ± 619367.939 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE false thrpt 10 1246782.845 ± 47468.764 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE true thrpt 10 3333328.810 ± 253061.481 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE false thrpt 10 1108278.988 ± 81905.149 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN true thrpt 10 2062961.266 ± 247096.284 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN false thrpt 10 925199.985 ± 36734.594 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN true thrpt 10 1223240.034 ± 58833.801 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN false thrpt 10 460864.117 ± 2361.459 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY true thrpt 10 655864.762 ± 35237.335 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY false thrpt 10 286388.865 ± 1002.460 ops/s ``` A new version benchmark ``` Benchmark (contentLength) (headersType) (pooledAllocator) Mode Cnt Score Error Units StompEncoderBenchmark.writeStompFrame 0 ONE true thrpt 10 4366110.018 ± 420377.867 ops/s StompEncoderBenchmark.writeStompFrame 0 ONE false thrpt 10 1289437.153 ± 215271.656 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE true thrpt 10 2818791.355 ± 218894.471 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE false thrpt 10 1040151.615 ± 75352.695 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN true thrpt 10 1842144.001 ± 94668.864 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN false thrpt 10 916742.825 ± 65467.820 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN true thrpt 10 1310454.012 ± 100747.490 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN false thrpt 10 679934.001 ± 82168.249 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY true thrpt 10 746867.549 ± 68373.269 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY false thrpt 10 483316.314 ± 50978.009 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE true thrpt 10 4791698.722 ± 263890.510 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE false thrpt 10 1289877.116 ± 128677.185 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE true thrpt 10 2984662.187 ± 395567.524 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE false thrpt 10 1079028.782 ± 43548.555 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN true thrpt 10 1806763.709 ± 59162.209 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN false thrpt 10 935274.980 ± 22064.148 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN true thrpt 10 1284172.151 ± 119068.047 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN false thrpt 10 687174.498 ± 30270.916 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY true thrpt 10 803843.483 ± 29106.133 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY false thrpt 10 502134.552 ± 23653.215 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE true thrpt 10 4337438.694 ± 378524.452 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE false thrpt 10 1289174.213 ± 50640.853 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE true thrpt 10 3232767.156 ± 311934.194 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE false thrpt 10 1115247.028 ± 15683.477 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN true thrpt 10 2213147.232 ± 86326.187 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN false thrpt 10 901120.188 ± 71344.491 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN true thrpt 10 1238317.714 ± 68148.477 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN false thrpt 10 671336.339 ± 72735.337 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY true thrpt 10 754565.791 ± 28574.382 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY false thrpt 10 498939.383 ± 38146.118 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE true thrpt 10 3722594.471 ± 515861.000 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE false thrpt 10 1265629.633 ± 84113.347 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE true thrpt 10 2829696.349 ± 172520.267 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE false thrpt 10 1111454.609 ± 26275.913 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN true thrpt 10 1901506.449 ± 37701.353 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN false thrpt 10 912528.888 ± 46221.215 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN true thrpt 10 1299674.123 ± 21889.002 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN false thrpt 10 724527.644 ± 2757.370 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY true thrpt 10 811389.799 ± 2606.626 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY false thrpt 10 504955.449 ± 6737.804 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE true thrpt 10 3837912.649 ± 380742.919 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE false thrpt 10 1375544.306 ± 3157.068 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE true thrpt 10 3224743.448 ± 297369.719 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE false thrpt 10 1125772.007 ± 4051.498 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN true thrpt 10 2127352.136 ± 106787.777 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN false thrpt 10 934848.418 ± 4564.147 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN true thrpt 10 1379672.772 ± 8778.640 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN false thrpt 10 723169.459 ± 2317.767 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY true thrpt 10 802275.113 ± 4155.137 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY false thrpt 10 517604.265 ± 3398.384 ops/s ``` For headers over 256 bytes we get a speedup.
This commit is contained in:
parent
4c86fbd967
commit
2877eef5d5
@ -19,11 +19,12 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToMessageEncoder;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import static io.netty.handler.codec.stomp.StompConstants.*;
|
||||
|
||||
/**
|
||||
* Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
|
||||
*/
|
||||
@ -32,50 +33,103 @@ public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe>
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
|
||||
if (msg instanceof StompFrame) {
|
||||
StompFrame frame = (StompFrame) msg;
|
||||
ByteBuf frameBuf = encodeFrame(frame, ctx);
|
||||
if (frame.content().isReadable()) {
|
||||
out.add(frameBuf);
|
||||
ByteBuf contentBuf = encodeContent(frame, ctx);
|
||||
out.add(contentBuf);
|
||||
} else {
|
||||
frameBuf.writeByte(StompConstants.NUL);
|
||||
out.add(frameBuf);
|
||||
}
|
||||
StompFrame stompFrame = (StompFrame) msg;
|
||||
ByteBuf buf = encodeFullFrame(stompFrame, ctx);
|
||||
|
||||
out.add(convertFullFrame(stompFrame, buf));
|
||||
} else if (msg instanceof StompHeadersSubframe) {
|
||||
StompHeadersSubframe frame = (StompHeadersSubframe) msg;
|
||||
ByteBuf buf = encodeFrame(frame, ctx);
|
||||
out.add(buf);
|
||||
StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
|
||||
ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
|
||||
encodeHeaders(stompHeadersSubframe, buf);
|
||||
|
||||
out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
|
||||
} else if (msg instanceof StompContentSubframe) {
|
||||
StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
|
||||
ByteBuf buf = encodeContent(stompContentSubframe, ctx);
|
||||
out.add(buf);
|
||||
|
||||
out.add(convertContentSubFrame(stompContentSubframe, buf));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An extension method to convert a STOMP encoded buffer to a different message type
|
||||
* based on an original {@link StompFrame} full frame.
|
||||
*
|
||||
* <p>By default an encoded buffer is returned as is.
|
||||
*/
|
||||
protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
|
||||
return encoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* An extension method to convert a STOMP encoded buffer to a different message type
|
||||
* based on an original {@link StompHeadersSubframe} headers sub frame.
|
||||
*
|
||||
* <p>By default an encoded buffer is returned as is.
|
||||
*/
|
||||
protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
|
||||
return encoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* An extension method to convert a STOMP encoded buffer to a different message type
|
||||
* based on an original {@link StompHeadersSubframe} content sub frame.
|
||||
*
|
||||
* <p>By default an encoded buffer is returned as is.
|
||||
*/
|
||||
protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
|
||||
return encoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a heuristic size for headers (32 bytes per header line) + (2 bytes for colon and eol) + (additional
|
||||
* command buffer).
|
||||
*/
|
||||
protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
|
||||
int estimatedSize = headersSubframe.headers().size() * 34 + 48;
|
||||
if (estimatedSize < 128) {
|
||||
return 128;
|
||||
} else if (estimatedSize < 256) {
|
||||
return 256;
|
||||
}
|
||||
|
||||
return estimatedSize;
|
||||
}
|
||||
|
||||
private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
|
||||
int contentReadableBytes = frame.content().readableBytes();
|
||||
ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
|
||||
encodeHeaders(frame, buf);
|
||||
|
||||
if (contentReadableBytes > 0) {
|
||||
buf.writeBytes(frame.content());
|
||||
}
|
||||
|
||||
return buf.writeByte(NUL);
|
||||
}
|
||||
|
||||
private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
|
||||
ByteBufUtil.writeUtf8(buf, frame.command().toString());
|
||||
buf.writeByte(StompConstants.LF);
|
||||
|
||||
for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
|
||||
ByteBufUtil.writeUtf8(buf, entry.getKey());
|
||||
buf.writeByte(StompConstants.COLON);
|
||||
ByteBufUtil.writeUtf8(buf, entry.getValue());
|
||||
buf.writeByte(StompConstants.LF);
|
||||
}
|
||||
|
||||
buf.writeByte(StompConstants.LF);
|
||||
}
|
||||
|
||||
private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
|
||||
if (content instanceof LastStompContentSubframe) {
|
||||
ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
|
||||
buf.writeBytes(content.content());
|
||||
buf.writeByte(StompConstants.NUL);
|
||||
return buf;
|
||||
} else {
|
||||
return content.content().retain();
|
||||
}
|
||||
}
|
||||
|
||||
private static ByteBuf encodeFrame(StompHeadersSubframe frame, ChannelHandlerContext ctx) {
|
||||
ByteBuf buf = ctx.alloc().buffer();
|
||||
|
||||
buf.writeCharSequence(frame.command().toString(), CharsetUtil.UTF_8);
|
||||
buf.writeByte(StompConstants.LF);
|
||||
for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
|
||||
ByteBufUtil.writeUtf8(buf, entry.getKey());
|
||||
buf.writeByte(StompConstants.COLON);
|
||||
ByteBufUtil.writeUtf8(buf, entry.getValue());
|
||||
buf.writeByte(StompConstants.LF);
|
||||
}
|
||||
buf.writeByte(StompConstants.LF);
|
||||
return buf;
|
||||
return content.content().retain();
|
||||
}
|
||||
}
|
||||
|
@ -76,9 +76,7 @@ public class StompSubframeEncoderTest {
|
||||
|
||||
channel.writeOutbound(frame);
|
||||
|
||||
ByteBuf headers = channel.readOutbound();
|
||||
ByteBuf content = channel.readOutbound();
|
||||
ByteBuf fullFrame = Unpooled.wrappedBuffer(headers, content);
|
||||
ByteBuf fullFrame = channel.readOutbound();
|
||||
assertEquals(SEND_FRAME_UTF8, fullFrame.toString(CharsetUtil.UTF_8));
|
||||
assertTrue(fullFrame.release());
|
||||
}
|
||||
|
@ -16,13 +16,15 @@
|
||||
package io.netty.example.stomp.websocket;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
|
||||
import io.netty.handler.codec.stomp.LastStompContentSubframe;
|
||||
import io.netty.handler.codec.stomp.StompContentSubframe;
|
||||
import io.netty.handler.codec.stomp.StompFrame;
|
||||
import io.netty.handler.codec.stomp.StompHeaders;
|
||||
import io.netty.handler.codec.stomp.StompHeadersSubframe;
|
||||
import io.netty.handler.codec.stomp.StompSubframe;
|
||||
import io.netty.handler.codec.stomp.StompSubframeEncoder;
|
||||
@ -34,35 +36,37 @@ public class StompWebSocketFrameEncoder extends StompSubframeEncoder {
|
||||
@Override
|
||||
public void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
|
||||
super.encode(ctx, msg, out);
|
||||
|
||||
if (out.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final WebSocketFrame webSocketFrame;
|
||||
if (msg instanceof StompFrame) {
|
||||
if (out.size() == 1) {
|
||||
webSocketFrame = new TextWebSocketFrame(getFirst(out));
|
||||
} else {
|
||||
CompositeByteBuf content = ctx.alloc().compositeBuffer(out.size());
|
||||
for (Object byteBuf : out) {
|
||||
content.addComponent(true, (ByteBuf) byteBuf);
|
||||
}
|
||||
webSocketFrame = new TextWebSocketFrame(content);
|
||||
}
|
||||
} else if (msg instanceof StompHeadersSubframe) {
|
||||
webSocketFrame = new TextWebSocketFrame(false, 0, getFirst(out));
|
||||
} else if (msg instanceof LastStompContentSubframe) {
|
||||
webSocketFrame = new ContinuationWebSocketFrame(true, 0, getFirst(out));
|
||||
} else {
|
||||
webSocketFrame = new ContinuationWebSocketFrame(false, 0, getFirst(out));
|
||||
}
|
||||
|
||||
out.clear();
|
||||
out.add(webSocketFrame);
|
||||
}
|
||||
|
||||
private static ByteBuf getFirst(List<Object> container) {
|
||||
return (ByteBuf) container.get(0);
|
||||
@Override
|
||||
protected WebSocketFrame convertFullFrame(StompFrame original, ByteBuf encoded) {
|
||||
if (isTextFrame(original)) {
|
||||
return new TextWebSocketFrame(encoded);
|
||||
}
|
||||
|
||||
return new BinaryWebSocketFrame(encoded);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WebSocketFrame convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
|
||||
if (isTextFrame(original)) {
|
||||
return new TextWebSocketFrame(false, 0, encoded);
|
||||
}
|
||||
|
||||
return new BinaryWebSocketFrame(false, 0, encoded);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WebSocketFrame convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
|
||||
if (original instanceof LastStompContentSubframe) {
|
||||
return new ContinuationWebSocketFrame(true, 0, encoded);
|
||||
}
|
||||
|
||||
return new ContinuationWebSocketFrame(false, 0, encoded);
|
||||
}
|
||||
|
||||
private static boolean isTextFrame(StompHeadersSubframe headersSubframe) {
|
||||
String contentType = headersSubframe.headers().getAsString(StompHeaders.CONTENT_TYPE);
|
||||
return contentType != null && (contentType.startsWith("text") || contentType.startsWith("application/json"));
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package io.netty.example.stomp.websocket;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToMessageCodec;
|
||||
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
|
||||
@ -58,7 +59,7 @@ public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocket
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
|
||||
if (webSocketFrame instanceof TextWebSocketFrame) {
|
||||
if (webSocketFrame instanceof TextWebSocketFrame || webSocketFrame instanceof BinaryWebSocketFrame) {
|
||||
ctx.fireChannelRead(webSocketFrame.content().retain());
|
||||
} else {
|
||||
ctx.close();
|
||||
|
@ -157,6 +157,11 @@
|
||||
<artifactId>netty-codec-mqtt</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec-stomp</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>netty-transport-native-epoll</artifactId>
|
||||
|
@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.microbench.stomp;
|
||||
|
||||
import io.netty.handler.codec.stomp.DefaultStompHeadersSubframe;
|
||||
import io.netty.handler.codec.stomp.StompCommand;
|
||||
import io.netty.handler.codec.stomp.StompHeaders;
|
||||
import io.netty.handler.codec.stomp.StompHeadersSubframe;
|
||||
|
||||
import java.util.EnumMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public final class ExampleStompHeadersSubframe {
|
||||
|
||||
public enum HeadersType {
|
||||
ONE,
|
||||
THREE,
|
||||
SEVEN,
|
||||
// Next encoded headers size will be more than 256 bytes
|
||||
ELEVEN,
|
||||
TWENTY
|
||||
}
|
||||
|
||||
public static final Map<HeadersType, StompHeadersSubframe> EXAMPLES =
|
||||
new EnumMap<HeadersType, StompHeadersSubframe>(
|
||||
HeadersType.class);
|
||||
|
||||
static {
|
||||
StompHeadersSubframe headersSubframe = new DefaultStompHeadersSubframe(StompCommand.RECEIPT);
|
||||
headersSubframe.headers()
|
||||
.set(StompHeaders.RECEIPT_ID, UUID.randomUUID().toString());
|
||||
EXAMPLES.put(HeadersType.ONE, headersSubframe);
|
||||
|
||||
headersSubframe = new DefaultStompHeadersSubframe(StompCommand.ERROR);
|
||||
headersSubframe.headers()
|
||||
.set(StompHeaders.RECEIPT_ID, UUID.randomUUID().toString())
|
||||
.set(StompHeaders.CONTENT_TYPE, "text/plain")
|
||||
.set(StompHeaders.MESSAGE, "malformed frame received");
|
||||
EXAMPLES.put(HeadersType.THREE, headersSubframe);
|
||||
|
||||
headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE);
|
||||
headersSubframe.headers()
|
||||
.set(StompHeaders.SUBSCRIPTION, "7")
|
||||
.set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString())
|
||||
.set(StompHeaders.DESTINATION, "/queue/chat")
|
||||
.set(StompHeaders.CONTENT_TYPE, "application/octet-stream")
|
||||
.set(StompHeaders.ACK, UUID.randomUUID().toString())
|
||||
.setLong("timestamp", System.currentTimeMillis())
|
||||
.set("Message-Type: 007");
|
||||
EXAMPLES.put(HeadersType.SEVEN, headersSubframe);
|
||||
|
||||
headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE);
|
||||
headersSubframe.headers()
|
||||
.set(StompHeaders.SUBSCRIPTION, "11")
|
||||
.set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString())
|
||||
.set(StompHeaders.DESTINATION, "/queue/chat")
|
||||
.set(StompHeaders.CONTENT_TYPE, "application/octet-stream")
|
||||
.set(StompHeaders.ACK, UUID.randomUUID().toString())
|
||||
.setLong("timestamp", System.currentTimeMillis())
|
||||
.set("Message-Type: 0011")
|
||||
.set("Strict-Transport-Security", "max-age=31536000; includeSubdomains; preload")
|
||||
.set("Server", "GitHub.com")
|
||||
.set("Expires", "Sat, 01 Jan 2000 00:00:00 GMT")
|
||||
.set("Content-Language", "en");
|
||||
EXAMPLES.put(HeadersType.ELEVEN, headersSubframe);
|
||||
|
||||
headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE);
|
||||
headersSubframe.headers()
|
||||
.set(StompHeaders.SUBSCRIPTION, "20")
|
||||
.set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString())
|
||||
.set(StompHeaders.DESTINATION, "/queue/chat")
|
||||
.set(StompHeaders.CONTENT_TYPE, "application/octet-stream")
|
||||
.set(StompHeaders.ACK, UUID.randomUUID().toString())
|
||||
.setLong("timestamp", System.currentTimeMillis())
|
||||
.set("Message-Type: 0020")
|
||||
.set("date", "Wed, 22 Apr 2015 00:40:28 GMT")
|
||||
.set("expires", "Tue, 31 Mar 1981 05:00:00 GMT")
|
||||
.set("last-modified", "Wed, 22 Apr 2015 00:40:28 GMT")
|
||||
.set("ms", "ms")
|
||||
.set("pragma", "no-cache")
|
||||
.set("server", "tsa_b")
|
||||
.set("set-cookie", "noneofyourbusiness")
|
||||
.set("strict-transport-security", "max-age=631138519")
|
||||
.set("version", "STOMP_v1.2")
|
||||
.set("x-connection-hash", "e176fe40accc1e2c613a34bc1941aa98")
|
||||
.set("x-content-type-options", "nosniff")
|
||||
.set("x-frame-options", "SAMEORIGIN")
|
||||
.set("x-transaction", "a54142ede693444d9");
|
||||
EXAMPLES.put(HeadersType.TWENTY, headersSubframe);
|
||||
}
|
||||
|
||||
private ExampleStompHeadersSubframe() {
|
||||
}
|
||||
}
|
@ -0,0 +1,110 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.microbench.stomp;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.stomp.DefaultStompFrame;
|
||||
import io.netty.handler.codec.stomp.StompFrame;
|
||||
import io.netty.handler.codec.stomp.StompHeadersSubframe;
|
||||
import io.netty.handler.codec.stomp.StompSubframeEncoder;
|
||||
import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
|
||||
import io.netty.microbench.util.AbstractMicrobenchmark;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.annotations.Threads;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.profile.GCProfiler;
|
||||
import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@Fork(value = 2)
|
||||
@Threads(1)
|
||||
@Warmup(iterations = 5)
|
||||
@Measurement(iterations = 10)
|
||||
public class StompEncoderBenchmark extends AbstractMicrobenchmark {
|
||||
|
||||
private StompSubframeEncoder stompEncoder;
|
||||
private ByteBuf content;
|
||||
private StompFrame stompFrame;
|
||||
private ChannelHandlerContext context;
|
||||
|
||||
@Param({ "true", "false" })
|
||||
public boolean pooledAllocator;
|
||||
|
||||
@Param({ "true", "false" })
|
||||
public boolean voidPromise;
|
||||
|
||||
@Param
|
||||
public ExampleStompHeadersSubframe.HeadersType headersType;
|
||||
|
||||
@Param({ "0", "100", "1000" })
|
||||
public int contentLength;
|
||||
|
||||
@Setup(Level.Trial)
|
||||
public void setup() {
|
||||
byte[] bytes = new byte[contentLength];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
content = Unpooled.wrappedBuffer(bytes);
|
||||
ByteBuf testContent = Unpooled.unreleasableBuffer(content.asReadOnly());
|
||||
|
||||
StompHeadersSubframe headersSubframe = ExampleStompHeadersSubframe.EXAMPLES.get(headersType);
|
||||
stompFrame = new DefaultStompFrame(headersSubframe.command(), testContent);
|
||||
stompFrame.headers().setAll(headersSubframe.headers());
|
||||
|
||||
stompEncoder = new StompSubframeEncoder();
|
||||
context = new EmbeddedChannelWriteReleaseHandlerContext(
|
||||
pooledAllocator? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT, stompEncoder) {
|
||||
@Override
|
||||
protected void handleException(Throwable t) {
|
||||
handleUnexpectedException(t);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@TearDown(Level.Trial)
|
||||
public void teardown() {
|
||||
content.release();
|
||||
content = null;
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void writeStompFrame() throws Exception {
|
||||
stompEncoder.write(context, stompFrame.retain(), newPromise());
|
||||
}
|
||||
|
||||
private ChannelPromise newPromise() {
|
||||
return voidPromise? context.voidPromise() : context.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChainedOptionsBuilder newOptionsBuilder() throws Exception {
|
||||
return super.newOptionsBuilder().addProfiler(GCProfiler.class);
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
/**
|
||||
* Benchmarks for {@link io.netty.handler.codec.stomp.StompSubframeEncoder}.
|
||||
*/
|
||||
package io.netty.microbench.stomp;
|
Loading…
Reference in New Issue
Block a user