Fix #10261 stomp can be chunked, so implement StompWebSocketFrameEncoder (#10274)

Motivation:

Current implementation `StompSubframeEncoder` can encode `StompFrame` into several separate chunks or encode separately `StompHeadersSubframe` and `StompContentSubframe`. But some client libraries (e.g. stomp.js) do not support aggregation.

Modification:

Add StompWebSocketFrameEncoder for integration between origin stomp suframe encoder and `ContinuationWebSocketFrame` to support  chunks on transport level.

Result:

Fixes #10261
This commit is contained in:
Andrey Mizurov 2020-06-04 20:14:13 +03:00 committed by GitHub
parent 9a558f1be9
commit 714dd00aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 6 deletions

View File

@ -0,0 +1,68 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.stomp.websocket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.stomp.LastStompContentSubframe;
import io.netty.handler.codec.stomp.StompFrame;
import io.netty.handler.codec.stomp.StompHeadersSubframe;
import io.netty.handler.codec.stomp.StompSubframe;
import io.netty.handler.codec.stomp.StompSubframeEncoder;
import java.util.List;
public class StompWebSocketFrameEncoder extends StompSubframeEncoder {
@Override
public void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
super.encode(ctx, msg, out);
if (out.isEmpty()) {
return;
}
final WebSocketFrame webSocketFrame;
if (msg instanceof StompFrame) {
if (out.size() == 1) {
webSocketFrame = new TextWebSocketFrame(getFirst(out));
} else {
CompositeByteBuf content = ctx.alloc().compositeBuffer(out.size());
for (Object byteBuf : out) {
content.addComponent(true, (ByteBuf) byteBuf);
}
webSocketFrame = new TextWebSocketFrame(content);
}
} else if (msg instanceof StompHeadersSubframe) {
webSocketFrame = new TextWebSocketFrame(false, 0, getFirst(out));
} else if (msg instanceof LastStompContentSubframe) {
webSocketFrame = new ContinuationWebSocketFrame(true, 0, getFirst(out));
} else {
webSocketFrame = new ContinuationWebSocketFrame(false, 0, getFirst(out));
}
out.clear();
out.add(webSocketFrame);
}
private static ByteBuf getFirst(List<Object> container) {
return (ByteBuf) container.get(0);
}
}

View File

@ -15,24 +15,25 @@
*/
package io.netty.example.stomp.websocket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
import io.netty.handler.codec.stomp.StompSubframe;
import io.netty.handler.codec.stomp.StompSubframeAggregator;
import io.netty.handler.codec.stomp.StompSubframeDecoder;
import io.netty.handler.codec.stomp.StompSubframeEncoder;
import java.util.List;
@Sharable
public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocketFrame, ByteBuf> {
public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocketFrame, StompSubframe> {
private final StompChatHandler stompChatHandler = new StompChatHandler();
private final StompWebSocketFrameEncoder stompWebSocketFrameEncoder = new StompWebSocketFrameEncoder();
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@ -40,8 +41,8 @@ public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocket
StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol());
ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion);
ctx.pipeline()
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new StompSubframeDecoder())
.addLast(new StompSubframeEncoder())
.addLast(new StompSubframeAggregator(65536))
.addLast(stompChatHandler)
.remove(StompWebSocketClientPageHandler.INSTANCE);
@ -51,8 +52,8 @@ public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocket
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf stompFrame, List<Object> out) {
out.add(new TextWebSocketFrame(stompFrame.retain()));
protected void encode(ChannelHandlerContext ctx, StompSubframe stompFrame, List<Object> out) throws Exception {
stompWebSocketFrameEncoder.encode(ctx, stompFrame, out);
}
@Override