From 2877eef5d579fdcc26736817394cb255026f683f Mon Sep 17 00:00:00 2001 From: Andrey Mizurov Date: Mon, 7 Dec 2020 11:00:52 +0300 Subject: [PATCH] Provide ability to extend StompSubframeEncoder and improve full stomp frame encoding (allocate one buffer for full frame considering the size of the headers) (#10778) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: At the moment `StompSubframeEncoder` encode a frame only to `ByteBuf` it is not convenient if further we need to convert it to another type of message, e.g. `WebSocketFrame`. Also, if we send a full frame, it splits into two headers and a content what makes it difficult to convert it in the next handler. Modification: Introduce additional converter methods e.g. (`Object protected convertFullFrame(StompFrame original, ByteBuf encoded`)...) for extending encoder functionality and allocate only one `ByteBuf` for full stomp frame. Change headers size calculation, previously used only 256 bytes that reallocate a new buffer each time when headers size more than this threshold. Add `StompEncoderBenchmark`. Result: Improved `StompSubframeEncoder` fro extensions. Previous version benchmark ``` Benchmark (contentLength) (headersType) (pooledAllocator) Mode Cnt Score Error Units StompEncoderBenchmark.writeStompFrame 0 ONE true thrpt 10 4432132.884 ± 178923.436 ops/s StompEncoderBenchmark.writeStompFrame 0 ONE false thrpt 10 1281122.756 ± 52484.174 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE true thrpt 10 2980897.937 ± 130253.049 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE false thrpt 10 1116883.574 ± 35471.482 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN true thrpt 10 1988012.159 ± 74352.450 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN false thrpt 10 881772.343 ± 94633.870 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN true thrpt 10 1048125.919 ± 151053.902 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN false thrpt 10 429900.066 ± 47956.661 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY true thrpt 10 660584.122 ± 104973.439 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY false thrpt 10 278255.488 ± 20143.708 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE true thrpt 10 4251498.549 ± 625050.979 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE false thrpt 10 1214006.861 ± 60421.601 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE true thrpt 10 3117736.486 ± 173613.974 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE false thrpt 10 1046605.891 ± 94428.064 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN true thrpt 10 2006986.881 ± 108456.748 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN false thrpt 10 877983.112 ± 82919.387 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN true thrpt 10 1132844.437 ± 84578.571 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN false thrpt 10 429334.649 ± 35403.161 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY true thrpt 10 657093.390 ± 48092.947 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY false thrpt 10 252140.876 ± 37337.255 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE true thrpt 10 4720507.067 ± 100993.908 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE false thrpt 10 1266182.925 ± 85888.413 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE true thrpt 10 2898746.621 ± 452579.753 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE false thrpt 10 1019555.288 ± 65640.507 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN true thrpt 10 2259187.459 ± 20025.989 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN false thrpt 10 896405.412 ± 53750.148 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN true thrpt 10 1110670.772 ± 107650.327 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN false thrpt 10 445187.398 ± 28845.959 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY true thrpt 10 611506.846 ± 25304.240 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY false thrpt 10 247687.007 ± 43471.578 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE true thrpt 10 4140949.576 ± 270274.087 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE false thrpt 10 1154515.598 ± 134413.876 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE true thrpt 10 3349996.875 ± 162309.889 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE false thrpt 10 1141040.562 ± 5895.693 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN true thrpt 10 2184632.248 ± 8957.833 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN false thrpt 10 959545.704 ± 5835.161 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN true thrpt 10 1081113.327 ± 3957.527 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN false thrpt 10 467524.660 ± 1383.236 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY true thrpt 10 568411.797 ± 108712.493 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY false thrpt 10 260764.231 ± 43149.129 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE true thrpt 10 4369787.147 ± 619367.939 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE false thrpt 10 1246782.845 ± 47468.764 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE true thrpt 10 3333328.810 ± 253061.481 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE false thrpt 10 1108278.988 ± 81905.149 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN true thrpt 10 2062961.266 ± 247096.284 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN false thrpt 10 925199.985 ± 36734.594 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN true thrpt 10 1223240.034 ± 58833.801 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN false thrpt 10 460864.117 ± 2361.459 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY true thrpt 10 655864.762 ± 35237.335 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY false thrpt 10 286388.865 ± 1002.460 ops/s ``` A new version benchmark ``` Benchmark (contentLength) (headersType) (pooledAllocator) Mode Cnt Score Error Units StompEncoderBenchmark.writeStompFrame 0 ONE true thrpt 10 4366110.018 ± 420377.867 ops/s StompEncoderBenchmark.writeStompFrame 0 ONE false thrpt 10 1289437.153 ± 215271.656 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE true thrpt 10 2818791.355 ± 218894.471 ops/s StompEncoderBenchmark.writeStompFrame 0 THREE false thrpt 10 1040151.615 ± 75352.695 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN true thrpt 10 1842144.001 ± 94668.864 ops/s StompEncoderBenchmark.writeStompFrame 0 SEVEN false thrpt 10 916742.825 ± 65467.820 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN true thrpt 10 1310454.012 ± 100747.490 ops/s StompEncoderBenchmark.writeStompFrame 0 ELEVEN false thrpt 10 679934.001 ± 82168.249 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY true thrpt 10 746867.549 ± 68373.269 ops/s StompEncoderBenchmark.writeStompFrame 0 TWENTY false thrpt 10 483316.314 ± 50978.009 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE true thrpt 10 4791698.722 ± 263890.510 ops/s StompEncoderBenchmark.writeStompFrame 10 ONE false thrpt 10 1289877.116 ± 128677.185 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE true thrpt 10 2984662.187 ± 395567.524 ops/s StompEncoderBenchmark.writeStompFrame 10 THREE false thrpt 10 1079028.782 ± 43548.555 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN true thrpt 10 1806763.709 ± 59162.209 ops/s StompEncoderBenchmark.writeStompFrame 10 SEVEN false thrpt 10 935274.980 ± 22064.148 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN true thrpt 10 1284172.151 ± 119068.047 ops/s StompEncoderBenchmark.writeStompFrame 10 ELEVEN false thrpt 10 687174.498 ± 30270.916 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY true thrpt 10 803843.483 ± 29106.133 ops/s StompEncoderBenchmark.writeStompFrame 10 TWENTY false thrpt 10 502134.552 ± 23653.215 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE true thrpt 10 4337438.694 ± 378524.452 ops/s StompEncoderBenchmark.writeStompFrame 100 ONE false thrpt 10 1289174.213 ± 50640.853 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE true thrpt 10 3232767.156 ± 311934.194 ops/s StompEncoderBenchmark.writeStompFrame 100 THREE false thrpt 10 1115247.028 ± 15683.477 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN true thrpt 10 2213147.232 ± 86326.187 ops/s StompEncoderBenchmark.writeStompFrame 100 SEVEN false thrpt 10 901120.188 ± 71344.491 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN true thrpt 10 1238317.714 ± 68148.477 ops/s StompEncoderBenchmark.writeStompFrame 100 ELEVEN false thrpt 10 671336.339 ± 72735.337 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY true thrpt 10 754565.791 ± 28574.382 ops/s StompEncoderBenchmark.writeStompFrame 100 TWENTY false thrpt 10 498939.383 ± 38146.118 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE true thrpt 10 3722594.471 ± 515861.000 ops/s StompEncoderBenchmark.writeStompFrame 1000 ONE false thrpt 10 1265629.633 ± 84113.347 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE true thrpt 10 2829696.349 ± 172520.267 ops/s StompEncoderBenchmark.writeStompFrame 1000 THREE false thrpt 10 1111454.609 ± 26275.913 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN true thrpt 10 1901506.449 ± 37701.353 ops/s StompEncoderBenchmark.writeStompFrame 1000 SEVEN false thrpt 10 912528.888 ± 46221.215 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN true thrpt 10 1299674.123 ± 21889.002 ops/s StompEncoderBenchmark.writeStompFrame 1000 ELEVEN false thrpt 10 724527.644 ± 2757.370 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY true thrpt 10 811389.799 ± 2606.626 ops/s StompEncoderBenchmark.writeStompFrame 1000 TWENTY false thrpt 10 504955.449 ± 6737.804 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE true thrpt 10 3837912.649 ± 380742.919 ops/s StompEncoderBenchmark.writeStompFrame 10000 ONE false thrpt 10 1375544.306 ± 3157.068 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE true thrpt 10 3224743.448 ± 297369.719 ops/s StompEncoderBenchmark.writeStompFrame 10000 THREE false thrpt 10 1125772.007 ± 4051.498 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN true thrpt 10 2127352.136 ± 106787.777 ops/s StompEncoderBenchmark.writeStompFrame 10000 SEVEN false thrpt 10 934848.418 ± 4564.147 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN true thrpt 10 1379672.772 ± 8778.640 ops/s StompEncoderBenchmark.writeStompFrame 10000 ELEVEN false thrpt 10 723169.459 ± 2317.767 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY true thrpt 10 802275.113 ± 4155.137 ops/s StompEncoderBenchmark.writeStompFrame 10000 TWENTY false thrpt 10 517604.265 ± 3398.384 ops/s ``` For headers over 256 bytes we get a speedup. --- .../codec/stomp/StompSubframeEncoder.java | 116 +++++++++++++----- .../codec/stomp/StompSubframeEncoderTest.java | 4 +- .../websocket/StompWebSocketFrameEncoder.java | 62 +++++----- .../StompWebSocketProtocolCodec.java | 3 +- microbench/pom.xml | 5 + .../stomp/ExampleStompHeadersSubframe.java | 108 ++++++++++++++++ .../stomp/StompEncoderBenchmark.java | 110 +++++++++++++++++ .../netty/microbench/stomp/package-info.java | 19 +++ 8 files changed, 363 insertions(+), 64 deletions(-) create mode 100644 microbench/src/main/java/io/netty/microbench/stomp/ExampleStompHeadersSubframe.java create mode 100644 microbench/src/main/java/io/netty/microbench/stomp/StompEncoderBenchmark.java create mode 100644 microbench/src/main/java/io/netty/microbench/stomp/package-info.java diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java index 38e69f89df..8bd20e2646 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java @@ -19,11 +19,12 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; -import io.netty.util.CharsetUtil; import java.util.List; import java.util.Map.Entry; +import static io.netty.handler.codec.stomp.StompConstants.*; + /** * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}. */ @@ -32,50 +33,103 @@ public class StompSubframeEncoder extends MessageToMessageEncoder @Override protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { if (msg instanceof StompFrame) { - StompFrame frame = (StompFrame) msg; - ByteBuf frameBuf = encodeFrame(frame, ctx); - if (frame.content().isReadable()) { - out.add(frameBuf); - ByteBuf contentBuf = encodeContent(frame, ctx); - out.add(contentBuf); - } else { - frameBuf.writeByte(StompConstants.NUL); - out.add(frameBuf); - } + StompFrame stompFrame = (StompFrame) msg; + ByteBuf buf = encodeFullFrame(stompFrame, ctx); + + out.add(convertFullFrame(stompFrame, buf)); } else if (msg instanceof StompHeadersSubframe) { - StompHeadersSubframe frame = (StompHeadersSubframe) msg; - ByteBuf buf = encodeFrame(frame, ctx); - out.add(buf); + StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg; + ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe)); + encodeHeaders(stompHeadersSubframe, buf); + + out.add(convertHeadersSubFrame(stompHeadersSubframe, buf)); } else if (msg instanceof StompContentSubframe) { StompContentSubframe stompContentSubframe = (StompContentSubframe) msg; ByteBuf buf = encodeContent(stompContentSubframe, ctx); - out.add(buf); + + out.add(convertContentSubFrame(stompContentSubframe, buf)); } } + /** + * An extension method to convert a STOMP encoded buffer to a different message type + * based on an original {@link StompFrame} full frame. + * + *

By default an encoded buffer is returned as is. + */ + protected Object convertFullFrame(StompFrame original, ByteBuf encoded) { + return encoded; + } + + /** + * An extension method to convert a STOMP encoded buffer to a different message type + * based on an original {@link StompHeadersSubframe} headers sub frame. + * + *

By default an encoded buffer is returned as is. + */ + protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) { + return encoded; + } + + /** + * An extension method to convert a STOMP encoded buffer to a different message type + * based on an original {@link StompHeadersSubframe} content sub frame. + * + *

By default an encoded buffer is returned as is. + */ + protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) { + return encoded; + } + + /** + * Returns a heuristic size for headers (32 bytes per header line) + (2 bytes for colon and eol) + (additional + * command buffer). + */ + protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) { + int estimatedSize = headersSubframe.headers().size() * 34 + 48; + if (estimatedSize < 128) { + return 128; + } else if (estimatedSize < 256) { + return 256; + } + + return estimatedSize; + } + + private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) { + int contentReadableBytes = frame.content().readableBytes(); + ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes); + encodeHeaders(frame, buf); + + if (contentReadableBytes > 0) { + buf.writeBytes(frame.content()); + } + + return buf.writeByte(NUL); + } + + private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) { + ByteBufUtil.writeUtf8(buf, frame.command().toString()); + buf.writeByte(StompConstants.LF); + + for (Entry entry : frame.headers()) { + ByteBufUtil.writeUtf8(buf, entry.getKey()); + buf.writeByte(StompConstants.COLON); + ByteBufUtil.writeUtf8(buf, entry.getValue()); + buf.writeByte(StompConstants.LF); + } + + buf.writeByte(StompConstants.LF); + } + private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) { if (content instanceof LastStompContentSubframe) { ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1); buf.writeBytes(content.content()); buf.writeByte(StompConstants.NUL); return buf; - } else { - return content.content().retain(); } - } - private static ByteBuf encodeFrame(StompHeadersSubframe frame, ChannelHandlerContext ctx) { - ByteBuf buf = ctx.alloc().buffer(); - - buf.writeCharSequence(frame.command().toString(), CharsetUtil.UTF_8); - buf.writeByte(StompConstants.LF); - for (Entry entry : frame.headers()) { - ByteBufUtil.writeUtf8(buf, entry.getKey()); - buf.writeByte(StompConstants.COLON); - ByteBufUtil.writeUtf8(buf, entry.getValue()); - buf.writeByte(StompConstants.LF); - } - buf.writeByte(StompConstants.LF); - return buf; + return content.content().retain(); } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java index 738a360d77..3191a9c734 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java @@ -76,9 +76,7 @@ public class StompSubframeEncoderTest { channel.writeOutbound(frame); - ByteBuf headers = channel.readOutbound(); - ByteBuf content = channel.readOutbound(); - ByteBuf fullFrame = Unpooled.wrappedBuffer(headers, content); + ByteBuf fullFrame = channel.readOutbound(); assertEquals(SEND_FRAME_UTF8, fullFrame.toString(CharsetUtil.UTF_8)); assertTrue(fullFrame.release()); } diff --git a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java index 7ec156a631..141b57a487 100644 --- a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java +++ b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java @@ -16,13 +16,15 @@ package io.netty.example.stomp.websocket; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.stomp.LastStompContentSubframe; +import io.netty.handler.codec.stomp.StompContentSubframe; import io.netty.handler.codec.stomp.StompFrame; +import io.netty.handler.codec.stomp.StompHeaders; import io.netty.handler.codec.stomp.StompHeadersSubframe; import io.netty.handler.codec.stomp.StompSubframe; import io.netty.handler.codec.stomp.StompSubframeEncoder; @@ -34,35 +36,37 @@ public class StompWebSocketFrameEncoder extends StompSubframeEncoder { @Override public void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { super.encode(ctx, msg, out); - - if (out.isEmpty()) { - return; - } - - final WebSocketFrame webSocketFrame; - if (msg instanceof StompFrame) { - if (out.size() == 1) { - webSocketFrame = new TextWebSocketFrame(getFirst(out)); - } else { - CompositeByteBuf content = ctx.alloc().compositeBuffer(out.size()); - for (Object byteBuf : out) { - content.addComponent(true, (ByteBuf) byteBuf); - } - webSocketFrame = new TextWebSocketFrame(content); - } - } else if (msg instanceof StompHeadersSubframe) { - webSocketFrame = new TextWebSocketFrame(false, 0, getFirst(out)); - } else if (msg instanceof LastStompContentSubframe) { - webSocketFrame = new ContinuationWebSocketFrame(true, 0, getFirst(out)); - } else { - webSocketFrame = new ContinuationWebSocketFrame(false, 0, getFirst(out)); - } - - out.clear(); - out.add(webSocketFrame); } - private static ByteBuf getFirst(List container) { - return (ByteBuf) container.get(0); + @Override + protected WebSocketFrame convertFullFrame(StompFrame original, ByteBuf encoded) { + if (isTextFrame(original)) { + return new TextWebSocketFrame(encoded); + } + + return new BinaryWebSocketFrame(encoded); + } + + @Override + protected WebSocketFrame convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) { + if (isTextFrame(original)) { + return new TextWebSocketFrame(false, 0, encoded); + } + + return new BinaryWebSocketFrame(false, 0, encoded); + } + + @Override + protected WebSocketFrame convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) { + if (original instanceof LastStompContentSubframe) { + return new ContinuationWebSocketFrame(true, 0, encoded); + } + + return new ContinuationWebSocketFrame(false, 0, encoded); + } + + private static boolean isTextFrame(StompHeadersSubframe headersSubframe) { + String contentType = headersSubframe.headers().getAsString(StompHeaders.CONTENT_TYPE); + return contentType != null && (contentType.startsWith("text") || contentType.startsWith("application/json")); } } diff --git a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java index c0e2341ad5..dad60c0b01 100644 --- a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java +++ b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java @@ -18,6 +18,7 @@ package io.netty.example.stomp.websocket; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; @@ -58,7 +59,7 @@ public class StompWebSocketProtocolCodec extends MessageToMessageCodecnetty-codec-mqtt ${project.version} + + io.netty + netty-codec-stomp + ${project.version} + ${project.groupId} netty-transport-native-epoll diff --git a/microbench/src/main/java/io/netty/microbench/stomp/ExampleStompHeadersSubframe.java b/microbench/src/main/java/io/netty/microbench/stomp/ExampleStompHeadersSubframe.java new file mode 100644 index 0000000000..116c26eef8 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/stomp/ExampleStompHeadersSubframe.java @@ -0,0 +1,108 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.stomp; + +import io.netty.handler.codec.stomp.DefaultStompHeadersSubframe; +import io.netty.handler.codec.stomp.StompCommand; +import io.netty.handler.codec.stomp.StompHeaders; +import io.netty.handler.codec.stomp.StompHeadersSubframe; + +import java.util.EnumMap; +import java.util.Map; +import java.util.UUID; + +public final class ExampleStompHeadersSubframe { + + public enum HeadersType { + ONE, + THREE, + SEVEN, + // Next encoded headers size will be more than 256 bytes + ELEVEN, + TWENTY + } + + public static final Map EXAMPLES = + new EnumMap( + HeadersType.class); + + static { + StompHeadersSubframe headersSubframe = new DefaultStompHeadersSubframe(StompCommand.RECEIPT); + headersSubframe.headers() + .set(StompHeaders.RECEIPT_ID, UUID.randomUUID().toString()); + EXAMPLES.put(HeadersType.ONE, headersSubframe); + + headersSubframe = new DefaultStompHeadersSubframe(StompCommand.ERROR); + headersSubframe.headers() + .set(StompHeaders.RECEIPT_ID, UUID.randomUUID().toString()) + .set(StompHeaders.CONTENT_TYPE, "text/plain") + .set(StompHeaders.MESSAGE, "malformed frame received"); + EXAMPLES.put(HeadersType.THREE, headersSubframe); + + headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE); + headersSubframe.headers() + .set(StompHeaders.SUBSCRIPTION, "7") + .set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString()) + .set(StompHeaders.DESTINATION, "/queue/chat") + .set(StompHeaders.CONTENT_TYPE, "application/octet-stream") + .set(StompHeaders.ACK, UUID.randomUUID().toString()) + .setLong("timestamp", System.currentTimeMillis()) + .set("Message-Type: 007"); + EXAMPLES.put(HeadersType.SEVEN, headersSubframe); + + headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE); + headersSubframe.headers() + .set(StompHeaders.SUBSCRIPTION, "11") + .set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString()) + .set(StompHeaders.DESTINATION, "/queue/chat") + .set(StompHeaders.CONTENT_TYPE, "application/octet-stream") + .set(StompHeaders.ACK, UUID.randomUUID().toString()) + .setLong("timestamp", System.currentTimeMillis()) + .set("Message-Type: 0011") + .set("Strict-Transport-Security", "max-age=31536000; includeSubdomains; preload") + .set("Server", "GitHub.com") + .set("Expires", "Sat, 01 Jan 2000 00:00:00 GMT") + .set("Content-Language", "en"); + EXAMPLES.put(HeadersType.ELEVEN, headersSubframe); + + headersSubframe = new DefaultStompHeadersSubframe(StompCommand.MESSAGE); + headersSubframe.headers() + .set(StompHeaders.SUBSCRIPTION, "20") + .set(StompHeaders.MESSAGE_ID, UUID.randomUUID().toString()) + .set(StompHeaders.DESTINATION, "/queue/chat") + .set(StompHeaders.CONTENT_TYPE, "application/octet-stream") + .set(StompHeaders.ACK, UUID.randomUUID().toString()) + .setLong("timestamp", System.currentTimeMillis()) + .set("Message-Type: 0020") + .set("date", "Wed, 22 Apr 2015 00:40:28 GMT") + .set("expires", "Tue, 31 Mar 1981 05:00:00 GMT") + .set("last-modified", "Wed, 22 Apr 2015 00:40:28 GMT") + .set("ms", "ms") + .set("pragma", "no-cache") + .set("server", "tsa_b") + .set("set-cookie", "noneofyourbusiness") + .set("strict-transport-security", "max-age=631138519") + .set("version", "STOMP_v1.2") + .set("x-connection-hash", "e176fe40accc1e2c613a34bc1941aa98") + .set("x-content-type-options", "nosniff") + .set("x-frame-options", "SAMEORIGIN") + .set("x-transaction", "a54142ede693444d9"); + EXAMPLES.put(HeadersType.TWENTY, headersSubframe); + } + + private ExampleStompHeadersSubframe() { + } +} diff --git a/microbench/src/main/java/io/netty/microbench/stomp/StompEncoderBenchmark.java b/microbench/src/main/java/io/netty/microbench/stomp/StompEncoderBenchmark.java new file mode 100644 index 0000000000..338fa19216 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/stomp/StompEncoderBenchmark.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.stomp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.stomp.DefaultStompFrame; +import io.netty.handler.codec.stomp.StompFrame; +import io.netty.handler.codec.stomp.StompHeadersSubframe; +import io.netty.handler.codec.stomp.StompSubframeEncoder; +import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.profile.GCProfiler; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; + +import java.util.concurrent.ThreadLocalRandom; + +@State(Scope.Benchmark) +@Fork(value = 2) +@Threads(1) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +public class StompEncoderBenchmark extends AbstractMicrobenchmark { + + private StompSubframeEncoder stompEncoder; + private ByteBuf content; + private StompFrame stompFrame; + private ChannelHandlerContext context; + + @Param({ "true", "false" }) + public boolean pooledAllocator; + + @Param({ "true", "false" }) + public boolean voidPromise; + + @Param + public ExampleStompHeadersSubframe.HeadersType headersType; + + @Param({ "0", "100", "1000" }) + public int contentLength; + + @Setup(Level.Trial) + public void setup() { + byte[] bytes = new byte[contentLength]; + ThreadLocalRandom.current().nextBytes(bytes); + content = Unpooled.wrappedBuffer(bytes); + ByteBuf testContent = Unpooled.unreleasableBuffer(content.asReadOnly()); + + StompHeadersSubframe headersSubframe = ExampleStompHeadersSubframe.EXAMPLES.get(headersType); + stompFrame = new DefaultStompFrame(headersSubframe.command(), testContent); + stompFrame.headers().setAll(headersSubframe.headers()); + + stompEncoder = new StompSubframeEncoder(); + context = new EmbeddedChannelWriteReleaseHandlerContext( + pooledAllocator? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT, stompEncoder) { + @Override + protected void handleException(Throwable t) { + handleUnexpectedException(t); + } + }; + } + + @TearDown(Level.Trial) + public void teardown() { + content.release(); + content = null; + } + + @Benchmark + public void writeStompFrame() throws Exception { + stompEncoder.write(context, stompFrame.retain(), newPromise()); + } + + private ChannelPromise newPromise() { + return voidPromise? context.voidPromise() : context.newPromise(); + } + + @Override + protected ChainedOptionsBuilder newOptionsBuilder() throws Exception { + return super.newOptionsBuilder().addProfiler(GCProfiler.class); + } +} diff --git a/microbench/src/main/java/io/netty/microbench/stomp/package-info.java b/microbench/src/main/java/io/netty/microbench/stomp/package-info.java new file mode 100644 index 0000000000..ace726202a --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/stomp/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Benchmarks for {@link io.netty.handler.codec.stomp.StompSubframeEncoder}. + */ +package io.netty.microbench.stomp;