From 714dd00aabb414e6cfa931888328ad41d8f323e9 Mon Sep 17 00:00:00 2001 From: Andrey Mizurov Date: Thu, 4 Jun 2020 20:14:13 +0300 Subject: [PATCH] Fix #10261 stomp can be chunked, so implement StompWebSocketFrameEncoder (#10274) Motivation: Current implementation `StompSubframeEncoder` can encode `StompFrame` into several separate chunks or encode separately `StompHeadersSubframe` and `StompContentSubframe`. But some client libraries (e.g. stomp.js) do not support aggregation. Modification: Add StompWebSocketFrameEncoder for integration between origin stomp suframe encoder and `ContinuationWebSocketFrame` to support chunks on transport level. Result: Fixes #10261 --- .../websocket/StompWebSocketFrameEncoder.java | 68 +++++++++++++++++++ .../StompWebSocketProtocolCodec.java | 13 ++-- 2 files changed, 75 insertions(+), 6 deletions(-) create mode 100644 example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java diff --git a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java new file mode 100644 index 0000000000..68b92e6992 --- /dev/null +++ b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketFrameEncoder.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.stomp.websocket; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.stomp.LastStompContentSubframe; +import io.netty.handler.codec.stomp.StompFrame; +import io.netty.handler.codec.stomp.StompHeadersSubframe; +import io.netty.handler.codec.stomp.StompSubframe; +import io.netty.handler.codec.stomp.StompSubframeEncoder; + +import java.util.List; + +public class StompWebSocketFrameEncoder extends StompSubframeEncoder { + + @Override + public void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { + super.encode(ctx, msg, out); + + if (out.isEmpty()) { + return; + } + + final WebSocketFrame webSocketFrame; + if (msg instanceof StompFrame) { + if (out.size() == 1) { + webSocketFrame = new TextWebSocketFrame(getFirst(out)); + } else { + CompositeByteBuf content = ctx.alloc().compositeBuffer(out.size()); + for (Object byteBuf : out) { + content.addComponent(true, (ByteBuf) byteBuf); + } + webSocketFrame = new TextWebSocketFrame(content); + } + } else if (msg instanceof StompHeadersSubframe) { + webSocketFrame = new TextWebSocketFrame(false, 0, getFirst(out)); + } else if (msg instanceof LastStompContentSubframe) { + webSocketFrame = new ContinuationWebSocketFrame(true, 0, getFirst(out)); + } else { + webSocketFrame = new ContinuationWebSocketFrame(false, 0, getFirst(out)); + } + + out.clear(); + out.add(webSocketFrame); + } + + private static ByteBuf getFirst(List container) { + return (ByteBuf) container.get(0); + } +} diff --git a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java index 9787078774..8d8ed4b7f9 100644 --- a/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java +++ b/example/src/main/java/io/netty/example/stomp/websocket/StompWebSocketProtocolCodec.java @@ -15,24 +15,25 @@ */ package io.netty.example.stomp.websocket; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete; +import io.netty.handler.codec.stomp.StompSubframe; import io.netty.handler.codec.stomp.StompSubframeAggregator; import io.netty.handler.codec.stomp.StompSubframeDecoder; -import io.netty.handler.codec.stomp.StompSubframeEncoder; import java.util.List; @Sharable -public class StompWebSocketProtocolCodec extends MessageToMessageCodec { +public class StompWebSocketProtocolCodec extends MessageToMessageCodec { private final StompChatHandler stompChatHandler = new StompChatHandler(); + private final StompWebSocketFrameEncoder stompWebSocketFrameEncoder = new StompWebSocketFrameEncoder(); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -40,8 +41,8 @@ public class StompWebSocketProtocolCodec extends MessageToMessageCodec out) { - out.add(new TextWebSocketFrame(stompFrame.retain())); + protected void encode(ChannelHandlerContext ctx, StompSubframe stompFrame, List out) throws Exception { + stompWebSocketFrameEncoder.encode(ctx, stompFrame, out); } @Override