From a8143eda274ac1f44f7da770bd5ec13527ceb8e2 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 4 Jun 2014 16:39:50 +0900 Subject: [PATCH] Overall refactoring of the STOMP codec - StompObject -> StompSubframe - StompFrame -> StompHeadersSubframe - StompContent -> StompContntSubframe - FullStompFrame -> StompFrame - StompEncoder/Decoder -> StompSubframeEncoder/Decoder - StompAggregator -> StompSubframeAggregator - Simplify the example - Update Javadoc - Miscellaneous cleanup --- codec-stomp/pom.xml | 2 +- .../codec/stomp/DefaultFullStompFrame.java | 105 ------------ ...a => DefaultLastStompContentSubframe.java} | 26 +-- ....java => DefaultStompContentSubframe.java} | 30 ++-- .../codec/stomp/DefaultStompFrame.java | 74 +++++++-- .../stomp/DefaultStompHeadersSubframe.java | 63 ++++++++ .../handler/codec/stomp/FullStompFrame.java | 41 ----- ...ent.java => LastStompContentSubframe.java} | 35 ++-- .../handler/codec/stomp/StompCommand.java | 15 +- .../handler/codec/stomp/StompConstants.java | 11 +- ...Content.java => StompContentSubframe.java} | 27 ++-- .../netty/handler/codec/stomp/StompFrame.java | 34 ++-- .../handler/codec/stomp/StompHeaders.java | 24 +-- .../codec/stomp/StompHeadersSubframe.java | 21 ++- .../{StompObject.java => StompSubframe.java} | 11 +- ...ator.java => StompSubframeAggregator.java} | 46 +++--- ...Decoder.java => StompSubframeDecoder.java} | 120 ++++++++------ ...Encoder.java => StompSubframeEncoder.java} | 50 +++--- .../handler/codec/stomp/package-info.java | 4 +- ....java => StompSubframeAggregatorTest.java} | 29 ++-- ...est.java => StompSubframeDecoderTest.java} | 96 +++++------ ...est.java => StompSubframeEncoderTest.java} | 25 ++- .../codec/stomp/StompTestConstants.java | 5 +- .../io/netty/example/stomp/StompClient.java | 151 +++--------------- .../example/stomp/StompClientHandler.java | 81 ++++++++-- run-example.sh | 1 + 26 files changed, 533 insertions(+), 594 deletions(-) delete mode 100644 codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultFullStompFrame.java rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{DefaultLastStompContent.java => DefaultLastStompContentSubframe.java} (60%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{DefaultStompContent.java => DefaultStompContentSubframe.java} (70%) create mode 100644 codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java delete mode 100644 codec-stomp/src/main/java/io/netty/handler/codec/stomp/FullStompFrame.java rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{LastStompContent.java => LastStompContentSubframe.java} (66%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{StompContent.java => StompContentSubframe.java} (54%) rename example/src/main/java/io/netty/example/stomp/StompFrameListener.java => codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeadersSubframe.java (62%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{StompObject.java => StompSubframe.java} (77%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{StompAggregator.java => StompSubframeAggregator.java} (77%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{StompDecoder.java => StompSubframeDecoder.java} (74%) rename codec-stomp/src/main/java/io/netty/handler/codec/stomp/{StompEncoder.java => StompSubframeEncoder.java} (56%) rename codec-stomp/src/test/java/io/netty/handler/codec/stomp/{StompAggregatorTest.java => StompSubframeAggregatorTest.java} (84%) rename codec-stomp/src/test/java/io/netty/handler/codec/stomp/{StompDecoderTest.java => StompSubframeDecoderTest.java} (54%) rename codec-stomp/src/test/java/io/netty/handler/codec/stomp/{StompEncoderTest.java => StompSubframeEncoderTest.java} (78%) diff --git a/codec-stomp/pom.xml b/codec-stomp/pom.xml index 693cbbeb35..d00fab5132 100644 --- a/codec-stomp/pom.xml +++ b/codec-stomp/pom.xml @@ -20,7 +20,7 @@ io.netty netty-parent - 4.1.0.Alpha1-SNAPSHOT + 5.0.0.Alpha2-SNAPSHOT netty-codec-stomp diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultFullStompFrame.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultFullStompFrame.java deleted file mode 100644 index d13bd031fb..0000000000 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultFullStompFrame.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.stomp; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.CharsetUtil; - -/** - * Default implementation of {@link FullStompFrame}. - */ -public class DefaultFullStompFrame extends DefaultStompFrame implements FullStompFrame { - private final ByteBuf content; - - public DefaultFullStompFrame(StompCommand command) { - this(command, Unpooled.buffer(0)); - if (command == null) { - throw new NullPointerException("command"); - } - } - - public DefaultFullStompFrame(StompCommand command, ByteBuf content) { - super(command); - if (content == null) { - throw new NullPointerException("content"); - } - this.content = content; - } - - @Override - public ByteBuf content() { - return content; - } - - @Override - public FullStompFrame copy() { - return new DefaultFullStompFrame(command, content.copy()); - } - - @Override - public FullStompFrame duplicate() { - return new DefaultFullStompFrame(command, content.duplicate()); - } - - @Override - public int refCnt() { - return content.refCnt(); - } - - @Override - public FullStompFrame retain() { - content.retain(); - return this; - } - - @Override - public FullStompFrame retain(int increment) { - content.retain(); - return this; - } - - @Override - public FullStompFrame touch() { - content.touch(); - return this; - } - - @Override - public FullStompFrame touch(Object hint) { - content.touch(hint); - return this; - } - - @Override - public boolean release() { - return content.release(); - } - - @Override - public boolean release(int decrement) { - return content.release(decrement); - } - - @Override - public String toString() { - return "DefaultFullStompFrame{" + - "command=" + command + - ", headers=" + headers + - ", content=" + content.toString(CharsetUtil.UTF_8) + - '}'; - } -} diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContent.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContentSubframe.java similarity index 60% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContent.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContentSubframe.java index 96680cd030..eaca9f9900 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContent.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultLastStompContentSubframe.java @@ -18,52 +18,52 @@ package io.netty.handler.codec.stomp; import io.netty.buffer.ByteBuf; /** - * The default implementation for the {@link LastStompContent}. + * The default implementation for the {@link LastStompContentSubframe}. */ -public class DefaultLastStompContent extends DefaultStompContent implements LastStompContent { - public DefaultLastStompContent(ByteBuf content) { +public class DefaultLastStompContentSubframe extends DefaultStompContentSubframe implements LastStompContentSubframe { + + public DefaultLastStompContentSubframe(ByteBuf content) { super(content); } @Override - public DefaultLastStompContent retain() { + public DefaultLastStompContentSubframe retain() { super.retain(); return this; } @Override - public LastStompContent retain(int increment) { + public LastStompContentSubframe retain(int increment) { super.retain(increment); return this; } @Override - public LastStompContent touch() { + public LastStompContentSubframe touch() { super.touch(); return this; } @Override - public LastStompContent touch(Object hint) { + public LastStompContentSubframe touch(Object hint) { super.touch(hint); return this; } @Override - public LastStompContent copy() { - return new DefaultLastStompContent(content().copy()); + public LastStompContentSubframe copy() { + return new DefaultLastStompContentSubframe(content().copy()); } @Override - public LastStompContent duplicate() { - return new DefaultLastStompContent(content().duplicate()); + public LastStompContentSubframe duplicate() { + return new DefaultLastStompContentSubframe(content().duplicate()); } @Override public String toString() { return "DefaultLastStompContent{" + - "decoderResult=" + getDecoderResult() + + "decoderResult=" + decoderResult() + '}'; } - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContent.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java similarity index 70% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContent.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java index 6450a931c5..2c212634ce 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContent.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompContentSubframe.java @@ -20,13 +20,13 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.DecoderResult; /** - * The default {@link StompContent} implementation. + * The default {@link StompContentSubframe} implementation. */ -public class DefaultStompContent implements StompContent { +public class DefaultStompContentSubframe implements StompContentSubframe { private DecoderResult decoderResult; private final ByteBuf content; - public DefaultStompContent(ByteBuf content) { + public DefaultStompContentSubframe(ByteBuf content) { if (content == null) { throw new NullPointerException("content"); } @@ -39,13 +39,13 @@ public class DefaultStompContent implements StompContent { } @Override - public StompContent copy() { - return new DefaultStompContent(content().copy()); + public StompContentSubframe copy() { + return new DefaultStompContentSubframe(content().copy()); } @Override - public StompContent duplicate() { - return new DefaultStompContent(content().duplicate()); + public StompContentSubframe duplicate() { + return new DefaultStompContentSubframe(content().duplicate()); } @Override @@ -54,25 +54,25 @@ public class DefaultStompContent implements StompContent { } @Override - public StompContent retain() { + public StompContentSubframe retain() { content().retain(); return this; } @Override - public StompContent retain(int increment) { + public StompContentSubframe retain(int increment) { content().retain(increment); return this; } @Override - public StompContent touch() { - content.toString(); + public StompContentSubframe touch() { + content.touch(); return this; } @Override - public StompContent touch(Object hint) { + public StompContentSubframe touch(Object hint) { content.touch(hint); return this; } @@ -88,13 +88,13 @@ public class DefaultStompContent implements StompContent { } @Override - public DecoderResult getDecoderResult() { + public DecoderResult decoderResult() { return decoderResult; } @Override - public void setDecoderResult(DecoderResult result) { - this.decoderResult = result; + public void setDecoderResult(DecoderResult decoderResult) { + this.decoderResult = decoderResult; } @Override diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompFrame.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompFrame.java index 40404fddb6..90ed4a7950 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompFrame.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompFrame.java @@ -15,48 +15,92 @@ */ package io.netty.handler.codec.stomp; -import io.netty.handler.codec.DecoderResult; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; /** * Default implementation of {@link StompFrame}. */ -public class DefaultStompFrame implements StompFrame { - protected final StompCommand command; - protected DecoderResult decoderResult; - protected final StompHeaders headers = new StompHeaders(); +public class DefaultStompFrame extends DefaultStompHeadersSubframe implements StompFrame { + + private final ByteBuf content; public DefaultStompFrame(StompCommand command) { + this(command, Unpooled.buffer(0)); if (command == null) { throw new NullPointerException("command"); } - this.command = command; + } + + public DefaultStompFrame(StompCommand command, ByteBuf content) { + super(command); + if (content == null) { + throw new NullPointerException("content"); + } + this.content = content; } @Override - public StompCommand command() { - return command; + public ByteBuf content() { + return content; } @Override - public StompHeaders headers() { - return headers; + public StompFrame copy() { + return new DefaultStompFrame(command, content.copy()); } @Override - public DecoderResult getDecoderResult() { - return decoderResult; + public StompFrame duplicate() { + return new DefaultStompFrame(command, content.duplicate()); } @Override - public void setDecoderResult(DecoderResult decoderResult) { - this.decoderResult = decoderResult; + public int refCnt() { + return content.refCnt(); + } + + @Override + public StompFrame retain() { + content.retain(); + return this; + } + + @Override + public StompFrame retain(int increment) { + content.retain(); + return this; + } + + @Override + public StompFrame touch() { + content.touch(); + return this; + } + + @Override + public StompFrame touch(Object hint) { + content.touch(hint); + return this; + } + + @Override + public boolean release() { + return content.release(); + } + + @Override + public boolean release(int decrement) { + return content.release(decrement); } @Override public String toString() { - return "StompFrame{" + + return "DefaultFullStompFrame{" + "command=" + command + ", headers=" + headers + + ", content=" + content.toString(CharsetUtil.UTF_8) + '}'; } } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java new file mode 100644 index 0000000000..236fcddd56 --- /dev/null +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/DefaultStompHeadersSubframe.java @@ -0,0 +1,63 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.stomp; + +import io.netty.handler.codec.DecoderResult; + +/** + * Default implementation of {@link StompHeadersSubframe}. + */ +public class DefaultStompHeadersSubframe implements StompHeadersSubframe { + + protected final StompCommand command; + protected DecoderResult decoderResult; + protected final StompHeaders headers = new StompHeaders(); + + public DefaultStompHeadersSubframe(StompCommand command) { + if (command == null) { + throw new NullPointerException("command"); + } + this.command = command; + } + + @Override + public StompCommand command() { + return command; + } + + @Override + public StompHeaders headers() { + return headers; + } + + @Override + public DecoderResult decoderResult() { + return decoderResult; + } + + @Override + public void setDecoderResult(DecoderResult decoderResult) { + this.decoderResult = decoderResult; + } + + @Override + public String toString() { + return "StompFrame{" + + "command=" + command + + ", headers=" + headers + + '}'; + } +} diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/FullStompFrame.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/FullStompFrame.java deleted file mode 100644 index 11cbe54ca5..0000000000 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/FullStompFrame.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.stomp; - -/** - * Combines {@link StompFrame} and {@link LastStompContent} into one - * frame. So it represent a complete STOMP frame. - */ -public interface FullStompFrame extends StompFrame, LastStompContent { - @Override - FullStompFrame copy(); - - @Override - FullStompFrame duplicate(); - - @Override - FullStompFrame retain(); - - @Override - FullStompFrame retain(int increment); - - @Override - FullStompFrame touch(); - - @Override - FullStompFrame touch(Object hint); - -} diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContent.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContentSubframe.java similarity index 66% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContent.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContentSubframe.java index cf94a783ce..138d24edbc 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContent.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/LastStompContentSubframe.java @@ -20,47 +20,47 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.DecoderResult; /** - * The last {@link StompContent} which signals the end of the content batch + * The last {@link StompContentSubframe} which signals the end of the content batch *

* Note, even when no content is emitted by the protocol, an - * empty {@link LastStompContent} is issued to make the upstream parsing + * empty {@link LastStompContentSubframe} is issued to make the upstream parsing * easier. */ -public interface LastStompContent extends StompContent { +public interface LastStompContentSubframe extends StompContentSubframe { - LastStompContent EMPTY_LAST_CONTENT = new LastStompContent() { + LastStompContentSubframe EMPTY_LAST_CONTENT = new LastStompContentSubframe() { @Override public ByteBuf content() { return Unpooled.EMPTY_BUFFER; } @Override - public LastStompContent copy() { + public LastStompContentSubframe copy() { return EMPTY_LAST_CONTENT; } @Override - public LastStompContent duplicate() { + public LastStompContentSubframe duplicate() { return this; } @Override - public LastStompContent retain() { + public LastStompContentSubframe retain() { return this; } @Override - public LastStompContent retain(int increment) { + public LastStompContentSubframe retain(int increment) { return this; } @Override - public LastStompContent touch() { + public LastStompContentSubframe touch() { return this; } @Override - public LastStompContent touch(Object hint) { + public LastStompContentSubframe touch(Object hint) { return this; } @@ -80,7 +80,7 @@ public interface LastStompContent extends StompContent { } @Override - public DecoderResult getDecoderResult() { + public DecoderResult decoderResult() { return DecoderResult.SUCCESS; } @@ -91,21 +91,20 @@ public interface LastStompContent extends StompContent { }; @Override - LastStompContent copy(); + LastStompContentSubframe copy(); @Override - LastStompContent duplicate(); + LastStompContentSubframe duplicate(); @Override - LastStompContent retain(); + LastStompContentSubframe retain(); @Override - LastStompContent retain(int increment); + LastStompContentSubframe retain(int increment); @Override - LastStompContent touch(); + LastStompContentSubframe touch(); @Override - LastStompContent touch(Object hint); - + LastStompContentSubframe touch(Object hint); } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java index c5f0aa0244..789be1ef0f 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java @@ -19,7 +19,18 @@ package io.netty.handler.codec.stomp; * STOMP command */ public enum StompCommand { - STOMP, CONNECT, CONNECTED, SEND, SUBSCRIBE, UNSUBSCRIBE, ACK, NACK, BEGIN, DISCONNECT, MESSAGE, RECEIPT, ERROR, + STOMP, + CONNECT, + CONNECTED, + SEND, + SUBSCRIBE, + UNSUBSCRIBE, + ACK, + NACK, + BEGIN, + DISCONNECT, + MESSAGE, + RECEIPT, + ERROR, UNKNOWN - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompConstants.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompConstants.java index 67cbb6608a..3c9401ab2e 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompConstants.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompConstants.java @@ -16,12 +16,11 @@ package io.netty.handler.codec.stomp; final class StompConstants { - public static final byte CR = 13; - public static final byte LF = 10; - public static final byte NULL = 0; - public static final byte COLON = 58; - private StompConstants() { - } + static final byte CR = 13; + static final byte LF = 10; + static final byte NUL = 0; + static final byte COLON = 58; + private StompConstants() { } } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContent.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContentSubframe.java similarity index 54% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContent.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContentSubframe.java index eeaafee8ab..7b55edb893 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContent.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompContentSubframe.java @@ -16,32 +16,31 @@ package io.netty.handler.codec.stomp; import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelPipeline; /** - * An STOMP chunk which is used for STOMP chunked transfer-encoding. - * {@link StompDecoder} generates {@link StompContent} after - * {@link StompFrame} when the content is large or the encoding of the content - * is 'chunked. If you prefer not to receive {@link StompContent} in your handler, - * place {@link StompAggregator} after {@link StompDecoder} in the - * {@link io.netty.channel.ChannelPipeline}. + * An STOMP chunk which is used for STOMP chunked transfer-encoding. {@link StompSubframeDecoder} generates + * {@link StompContentSubframe} after {@link StompHeadersSubframe} when the content is large or the encoding of + * the content is 'chunked. If you prefer not to receive multiple {@link StompSubframe}s for a single + * {@link StompFrame}, place {@link StompSubframeAggregator} after {@link StompSubframeDecoder} in the + * {@link ChannelPipeline}. */ -public interface StompContent extends ByteBufHolder, StompObject { +public interface StompContentSubframe extends ByteBufHolder, StompSubframe { @Override - StompContent copy(); + StompContentSubframe copy(); @Override - StompContent duplicate(); + StompContentSubframe duplicate(); @Override - StompContent retain(); + StompContentSubframe retain(); @Override - StompContent retain(int increment); + StompContentSubframe retain(int increment); @Override - StompContent touch(); + StompContentSubframe touch(); @Override - StompContent touch(Object hint); - + StompContentSubframe touch(Object hint); } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompFrame.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompFrame.java index 99c589b31e..254ac6073c 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompFrame.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompFrame.java @@ -16,23 +16,25 @@ package io.netty.handler.codec.stomp; /** - * An interface that defines a Stomp frame - * - * @see StompFrame - * @see FullStompFrame - * @see StompHeaders + * Combines {@link StompHeadersSubframe} and {@link LastStompContentSubframe} into one + * frame. So it represent a complete STOMP frame. */ -public interface StompFrame extends StompObject { - /** - * returns command of this frame - * @return the command - */ - StompCommand command(); +public interface StompFrame extends StompHeadersSubframe, LastStompContentSubframe { + @Override + StompFrame copy(); - /** - * returns headers of this frame - * @return the headers object - */ - StompHeaders headers(); + @Override + StompFrame duplicate(); + @Override + StompFrame retain(); + + @Override + StompFrame retain(int increment); + + @Override + StompFrame touch(); + + @Override + StompFrame touch(Object hint); } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeaders.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeaders.java index b04d8d9e55..9f54dada32 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeaders.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeaders.java @@ -18,16 +18,16 @@ package io.netty.handler.codec.stomp; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; /** * Provides the constants for the standard STOMP header names and values and - * commonly used utility methods that accesses an {@link StompFrame}. + * commonly used utility methods that accesses an {@link StompHeadersSubframe}. */ public class StompHeaders { + public static final String ACCEPT_VERSION = "accept-version"; public static final String HOST = "host"; public static final String LOGIN = "login"; @@ -48,28 +48,16 @@ public class StompHeaders { public static final String CONTENT_LENGTH = "content-length"; public static final String CONTENT_TYPE = "content-type"; - public static long getContentLength(StompHeaders headers, long defaultValue) { - String contentLength = headers.get(CONTENT_LENGTH); - if (contentLength != null) { - try { - return Long.parseLong(contentLength); - } catch (NumberFormatException e) { - return defaultValue; - } - } - return defaultValue; - } - private final Map> headers = new HashMap>(); public boolean has(String key) { List values = headers.get(key); - return values != null && values.size() > 0; + return values != null && !values.isEmpty(); } public String get(String key) { List values = headers.get(key); - if (values != null && values.size() > 0) { + if (values != null && !values.isEmpty()) { return values.get(0); } else { return null; @@ -85,6 +73,7 @@ public class StompHeaders { values.add(value); } + @SuppressWarnings("ArraysAsListWithZeroOrOneArgument") public void set(String key, String value) { headers.put(key, Arrays.asList(value)); } @@ -110,8 +99,7 @@ public class StompHeaders { } public void set(StompHeaders headers) { - for (Iterator iterator = headers.keySet().iterator(); iterator.hasNext();) { - String key = iterator.next(); + for (String key: headers.keySet()) { List values = headers.getAll(key); this.headers.put(key, values); } diff --git a/example/src/main/java/io/netty/example/stomp/StompFrameListener.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeadersSubframe.java similarity index 62% rename from example/src/main/java/io/netty/example/stomp/StompFrameListener.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeadersSubframe.java index 03d7f4e4f5..064be80d62 100644 --- a/example/src/main/java/io/netty/example/stomp/StompFrameListener.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompHeadersSubframe.java @@ -13,13 +13,22 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.example.stomp; - -import io.netty.handler.codec.stomp.FullStompFrame; +package io.netty.handler.codec.stomp; /** - * STOMP frame listener which used as a callback in {@link StompClientHandler} + * An interface that defines a {@link StompFrame}'s command and headers. + * + * @see StompCommand + * @see StompHeaders */ -public interface StompFrameListener { - void onFrame(FullStompFrame frame); +public interface StompHeadersSubframe extends StompSubframe { + /** + * Returns command of this frame. + */ + StompCommand command(); + + /** + * Returns headers of this frame. + */ + StompHeaders headers(); } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompObject.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java similarity index 77% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompObject.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java index 15be1686fc..22f676a335 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompObject.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframe.java @@ -18,18 +18,17 @@ package io.netty.handler.codec.stomp; import io.netty.handler.codec.DecoderResult; /** - * Defines a common interface for all {@link StompObject} implementations. + * Defines a common interface for all {@link StompSubframe} implementations. */ -public interface StompObject { +public interface StompSubframe { /** * Returns the result of decoding this object. */ - DecoderResult getDecoderResult(); + DecoderResult decoderResult(); /** - * Updates the result of decoding this object. This method is supposed to be invoked by {@link StompDecoder}. - * Do not call this method unless you know what you are doing. + * Updates the result of decoding this object. This method is supposed to be invoked by + * {@link StompSubframeDecoder}. Do not call this method unless you know what you are doing. */ void setDecoderResult(DecoderResult result); - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompAggregator.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java similarity index 77% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompAggregator.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java index 9976906b51..093e3f84fd 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompAggregator.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeAggregator.java @@ -15,28 +15,30 @@ */ package io.netty.handler.codec.stomp; -import java.util.List; - import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; +import java.util.List; + /** - * A {@link io.netty.channel.ChannelHandler} that aggregates an {@link StompFrame} - * and its following {@link StompContent}s into a single {@link StompFrame} with - * no following {@link StompContent}s. It is useful when you don't want to take - * care of STOMP frames whose content is 'chunked'. Insert this - * handler after {@link StompDecoder} in the {@link io.netty.channel.ChannelPipeline}: + * A {@link ChannelHandler} that aggregates an {@link StompHeadersSubframe} + * and its following {@link StompContentSubframe}s into a single {@link StompFrame}. + * It is useful when you don't want to take care of STOMP frames whose content is 'chunked'. Insert this + * handler after {@link StompSubframeDecoder} in the {@link ChannelPipeline}: */ -public class StompAggregator extends MessageToMessageDecoder { - public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; +public class StompSubframeAggregator extends MessageToMessageDecoder { + + private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; private final int maxContentLength; - private FullStompFrame currentFrame; + private StompFrame currentFrame; private boolean tooLongFrameFound; private volatile boolean handlerAdded; @@ -48,7 +50,7 @@ public class StompAggregator extends MessageToMessageDecoder { * If the length of the aggregated content exceeds this value, * a {@link TooLongFrameException} will be raised. */ - public StompAggregator(int maxContentLength) { + public StompSubframeAggregator(int maxContentLength) { if (maxContentLength <= 0) { throw new IllegalArgumentException( "maxContentLength must be a positive integer: " + @@ -80,23 +82,23 @@ public class StompAggregator extends MessageToMessageDecoder { } @Override - protected void decode(ChannelHandlerContext ctx, StompObject msg, List out) throws Exception { - FullStompFrame currentFrame = this.currentFrame; - if (msg instanceof StompFrame) { + protected void decode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { + StompFrame currentFrame = this.currentFrame; + if (msg instanceof StompHeadersSubframe) { assert currentFrame == null; - StompFrame frame = (StompFrame) msg; - this.currentFrame = currentFrame = new DefaultFullStompFrame(frame.command(), + StompHeadersSubframe frame = (StompHeadersSubframe) msg; + this.currentFrame = currentFrame = new DefaultStompFrame(frame.command(), Unpooled.compositeBuffer(maxCumulationBufferComponents)); currentFrame.headers().set(frame.headers()); - } else if (msg instanceof StompContent) { + } else if (msg instanceof StompContentSubframe) { if (tooLongFrameFound) { - if (msg instanceof LastStompContent) { + if (msg instanceof LastStompContentSubframe) { this.currentFrame = null; } return; } assert currentFrame != null; - StompContent chunk = (StompContent) msg; + StompContentSubframe chunk = (StompContentSubframe) msg; CompositeByteBuf contentBuf = (CompositeByteBuf) currentFrame.content(); if (contentBuf.readableBytes() > maxContentLength - chunk.content().readableBytes()) { tooLongFrameFound = true; @@ -109,7 +111,7 @@ public class StompAggregator extends MessageToMessageDecoder { contentBuf.addComponent(chunk.retain().content()); contentBuf.writerIndex(contentBuf.writerIndex() + chunk.content().readableBytes()); - if (chunk instanceof LastStompContent) { + if (chunk instanceof LastStompContentSubframe) { out.add(currentFrame); this.currentFrame = null; } @@ -121,7 +123,7 @@ public class StompAggregator extends MessageToMessageDecoder { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { super.handlerAdded(ctx); - this.handlerAdded = true; + handlerAdded = true; } @Override @@ -136,7 +138,7 @@ public class StompAggregator extends MessageToMessageDecoder { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); - this.handlerAdded = false; + handlerAdded = false; if (currentFrame != null) { currentFrame.release(); currentFrame = null; diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompDecoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java similarity index 74% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompDecoder.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java index 04cf855871..a8376fd5f9 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompDecoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java @@ -15,23 +15,25 @@ */ package io.netty.handler.codec.stomp; -import java.util.List; - import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.stomp.StompDecoder.State; +import io.netty.handler.codec.stomp.StompSubframeDecoder.State; +import io.netty.util.internal.AppendableCharSequence; +import io.netty.util.internal.StringUtil; -import static io.netty.buffer.ByteBufUtil.readBytes; +import java.util.List; +import java.util.Locale; + +import static io.netty.buffer.ByteBufUtil.*; /** - * Decodes {@link ByteBuf}s into {@link StompFrame}s and - * {@link StompContent}s. + * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and + * {@link StompContentSubframe}s. * *

Parameters to control memory consumption:

* {@code maxLineLength} the maximum length of line - @@ -42,31 +44,42 @@ import static io.netty.buffer.ByteBufUtil.readBytes; * {@code maxChunkSize} * The maximum length of the content or each chunk. If the content length * (or the length of each chunk) exceeds this value, the content or chunk - * ill be split into multiple {@link StompContent}s whose length is + * ill be split into multiple {@link StompContentSubframe}s whose length is * {@code maxChunkSize} at maximum. * *

Chunked Content

* * If the content of a stomp message is greater than {@code maxChunkSize} * the transfer encoding of the HTTP message is 'chunked', this decoder - * generates multiple {@link StompContent} instances to avoid excessive memory + * generates multiple {@link StompContentSubframe} instances to avoid excessive memory * consumption. Note, that every message, even with no content decodes with - * {@link LastStompContent} at the end to simplify upstream message parsing. + * {@link LastStompContentSubframe} at the end to simplify upstream message parsing. */ -public class StompDecoder extends ReplayingDecoder { - public static final int DEFAULT_CHUNK_SIZE = 8132; - public static final int DEFAULT_MAX_LINE_LENGTH = 1024; - private int maxLineLength; - private int maxChunkSize; +public class StompSubframeDecoder extends ReplayingDecoder { + + private static final int DEFAULT_CHUNK_SIZE = 8132; + private static final int DEFAULT_MAX_LINE_LENGTH = 1024; + + enum State { + SKIP_CONTROL_CHARACTERS, + READ_HEADERS, + READ_CONTENT, + FINALIZE_FRAME_READ, + BAD_FRAME, + INVALID_CHUNK + } + + private final int maxLineLength; + private final int maxChunkSize; private int alreadyReadChunkSize; - private LastStompContent lastContent; + private LastStompContentSubframe lastContent; private long contentLength; - public StompDecoder() { + public StompSubframeDecoder() { this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); } - public StompDecoder(int maxLineLength, int maxChunkSize) { + public StompSubframeDecoder(int maxLineLength, int maxChunkSize) { super(State.SKIP_CONTROL_CHARACTERS); if (maxLineLength <= 0) { throw new IllegalArgumentException( @@ -88,17 +101,18 @@ public class StompDecoder extends ReplayingDecoder { case SKIP_CONTROL_CHARACTERS: skipControlCharacters(in); checkpoint(State.READ_HEADERS); + // Fall through. case READ_HEADERS: StompCommand command = StompCommand.UNKNOWN; - StompFrame frame = null; + StompHeadersSubframe frame = null; try { command = readCommand(in); - frame = new DefaultStompFrame(command); + frame = new DefaultStompHeadersSubframe(command); checkpoint(readHeaders(in, frame.headers())); out.add(frame); } catch (Exception e) { if (frame == null) { - frame = new DefaultStompFrame(command); + frame = new DefaultStompHeadersSubframe(command); } frame.setDecoderResult(DecoderResult.failure(e)); out.add(frame); @@ -126,27 +140,27 @@ public class StompDecoder extends ReplayingDecoder { } ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead); if ((alreadyReadChunkSize += toRead) >= contentLength) { - lastContent = new DefaultLastStompContent(chunkBuffer); + lastContent = new DefaultLastStompContentSubframe(chunkBuffer); checkpoint(State.FINALIZE_FRAME_READ); } else { - DefaultStompContent chunk; - chunk = new DefaultStompContent(chunkBuffer); + DefaultStompContentSubframe chunk; + chunk = new DefaultStompContentSubframe(chunkBuffer); out.add(chunk); } if (alreadyReadChunkSize < contentLength) { return; } - //fall through + // Fall through. case FINALIZE_FRAME_READ: skipNullCharacter(in); if (lastContent == null) { - lastContent = LastStompContent.EMPTY_LAST_CONTENT; + lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT; } out.add(lastContent); resetDecoder(); } } catch (Exception e) { - StompContent errorContent = new DefaultLastStompContent(Unpooled.EMPTY_BUFFER); + StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER); errorContent.setDecoderResult(DecoderResult.failure(e)); out.add(errorContent); checkpoint(State.BAD_FRAME); @@ -162,7 +176,7 @@ public class StompDecoder extends ReplayingDecoder { //do nothing } if (command == null) { - commandStr = commandStr.toUpperCase(); + commandStr = commandStr.toUpperCase(Locale.US); try { command = StompCommand.valueOf(commandStr); } catch (IllegalArgumentException iae) { @@ -176,20 +190,20 @@ public class StompDecoder extends ReplayingDecoder { } private State readHeaders(ByteBuf buffer, StompHeaders headers) { - while (true) { + for (;;) { String line = readLine(buffer, maxLineLength); - if (line.length() > 0) { - String[] split = line.split(":"); + if (!line.isEmpty()) { + String[] split = StringUtil.split(line, ':'); if (split.length == 2) { headers.add(split[0], split[1]); } } else { long contentLength = -1; if (headers.has(StompHeaders.CONTENT_LENGTH)) { - contentLength = StompHeaders.getContentLength(headers, 0); + contentLength = getContentLength(headers, 0); } else { - int globalIndex = ByteBufUtil.indexOf(buffer, buffer.readerIndex(), - buffer.writerIndex(), StompConstants.NULL); + int globalIndex = indexOf(buffer, buffer.readerIndex(), + buffer.writerIndex(), StompConstants.NUL); if (globalIndex != -1) { contentLength = globalIndex - buffer.readerIndex(); } @@ -204,16 +218,28 @@ public class StompDecoder extends ReplayingDecoder { } } + private static long getContentLength(StompHeaders headers, long defaultValue) { + String contentLength = headers.get(StompHeaders.CONTENT_LENGTH); + if (contentLength != null) { + try { + return Long.parseLong(contentLength); + } catch (NumberFormatException ignored) { + return defaultValue; + } + } + return defaultValue; + } + private static void skipNullCharacter(ByteBuf buffer) { byte b = buffer.readByte(); - if (b != StompConstants.NULL) { + if (b != StompConstants.NUL) { throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte"); } } private static void skipControlCharacters(ByteBuf buffer) { byte b; - while (true) { + for (;;) { b = buffer.readByte(); if (b != StompConstants.CR && b != StompConstants.LF) { buffer.readerIndex(buffer.readerIndex() - 1); @@ -223,23 +249,23 @@ public class StompDecoder extends ReplayingDecoder { } private static String readLine(ByteBuf buffer, int maxLineLength) { - StringBuilder sb = new StringBuilder(); + AppendableCharSequence buf = new AppendableCharSequence(128); int lineLength = 0; - while (true) { + for (;;) { byte nextByte = buffer.readByte(); if (nextByte == StompConstants.CR) { nextByte = buffer.readByte(); if (nextByte == StompConstants.LF) { - return sb.toString(); + return buf.toString(); } } else if (nextByte == StompConstants.LF) { - return sb.toString(); + return buf.toString(); } else { if (lineLength >= maxLineLength) { throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes."); } - lineLength++; - sb.append((char) nextByte); + lineLength ++; + buf.append((char) nextByte); } } } @@ -250,14 +276,4 @@ public class StompDecoder extends ReplayingDecoder { alreadyReadChunkSize = 0; lastContent = null; } - - enum State { - SKIP_CONTROL_CHARACTERS, - READ_HEADERS, - READ_CONTENT, - FINALIZE_FRAME_READ, - BAD_FRAME, - INVALID_CHUNK - } - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompEncoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java similarity index 56% rename from codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompEncoder.java rename to codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java index 8aa05efbad..ac8f286c91 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompEncoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java @@ -15,70 +15,64 @@ */ package io.netty.handler.codec.stomp; -import java.util.Iterator; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; +import java.util.List; + /** - * Encodes a {@link StompFrame} or a {@link FullStompFrame} or a {@link StompContent} into a {@link ByteBuf}. + * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}. */ -public class StompEncoder extends MessageToMessageEncoder { +public class StompSubframeEncoder extends MessageToMessageEncoder { @Override - protected void encode(ChannelHandlerContext ctx, StompObject msg, List out) throws Exception { - if (msg instanceof FullStompFrame) { - FullStompFrame frame = (FullStompFrame) msg; + protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { + if (msg instanceof StompFrame) { + StompFrame frame = (StompFrame) msg; ByteBuf frameBuf = encodeFrame(frame, ctx); out.add(frameBuf); ByteBuf contentBuf = encodeContent(frame, ctx); out.add(contentBuf); - } else if (msg instanceof StompFrame) { - StompFrame frame = (StompFrame) msg; + } else if (msg instanceof StompHeadersSubframe) { + StompHeadersSubframe frame = (StompHeadersSubframe) msg; ByteBuf buf = encodeFrame(frame, ctx); out.add(buf); - } else if (msg instanceof StompContent) { - StompContent stompContent = (StompContent) msg; - ByteBuf buf = encodeContent(stompContent, ctx); + } else if (msg instanceof StompContentSubframe) { + StompContentSubframe stompContentSubframe = (StompContentSubframe) msg; + ByteBuf buf = encodeContent(stompContentSubframe, ctx); out.add(buf); } } - private ByteBuf encodeContent(StompContent content, ChannelHandlerContext ctx) { - if (content instanceof LastStompContent) { + private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) { + if (content instanceof LastStompContentSubframe) { ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1); buf.writeBytes(content.content()); - buf.writeByte(StompConstants.NULL); + buf.writeByte(StompConstants.NUL); return buf; } else { - ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes()); - buf.writeBytes(content.content()); - return buf; + return content.content().retain(); } } - private ByteBuf encodeFrame(StompFrame frame, ChannelHandlerContext ctx) { + private static ByteBuf encodeFrame(StompHeadersSubframe frame, ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes(frame.command().toString().getBytes(CharsetUtil.US_ASCII)); buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF); StompHeaders headers = frame.headers(); - for (Iterator iterator = headers.keySet().iterator(); iterator.hasNext();) { - String key = iterator.next(); - List values = headers.getAll(key); - for (Iterator stringIterator = values.iterator(); stringIterator.hasNext();) { - String value = stringIterator.next(); - buf.writeBytes(key.getBytes(CharsetUtil.US_ASCII)). - writeByte(StompConstants.COLON).writeBytes(value.getBytes(CharsetUtil.US_ASCII)); + for (String k: headers.keySet()) { + List values = headers.getAll(k); + for (String v: values) { + buf.writeBytes(k.getBytes(CharsetUtil.US_ASCII)). + writeByte(StompConstants.COLON).writeBytes(v.getBytes(CharsetUtil.US_ASCII)); buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF); } } buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF); return buf; } - } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/package-info.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/package-info.java index b7213a9e79..dba4f936b9 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/package-info.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 The Netty Project + * Copyright 2014 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -15,6 +15,6 @@ */ /** - * Common superset of ascii and binary classes. + * STOMP codec */ package io.netty.handler.codec.stomp; diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompAggregatorTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java similarity index 84% rename from codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompAggregatorTest.java rename to codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java index 8b8260f515..27c8732563 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompAggregatorTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java @@ -15,24 +15,23 @@ */ package io.netty.handler.codec.stomp; -import io.netty.util.CharsetUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.CharsetUtil; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -public class StompAggregatorTest { +public class StompSubframeAggregatorTest { private EmbeddedChannel channel; @Before public void setup() throws Exception { - channel = new EmbeddedChannel(new StompDecoder(), new StompAggregator(100000)); + channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(100000)); } @After @@ -45,8 +44,8 @@ public class StompAggregatorTest { ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes()); channel.writeInbound(incoming); - StompFrame frame = channel.readInbound(); - Assert.assertTrue(frame instanceof FullStompFrame); + StompHeadersSubframe frame = channel.readInbound(); + Assert.assertTrue(frame instanceof StompFrame); Assert.assertNull(channel.readInbound()); } @@ -55,7 +54,7 @@ public class StompAggregatorTest { ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes()); channel.writeInbound(incoming); - FullStompFrame frame = channel.readInbound(); + StompFrame frame = channel.readInbound(); Assert.assertNotNull(frame); Assert.assertEquals(StompCommand.SEND, frame.command()); Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8)); @@ -64,11 +63,12 @@ public class StompAggregatorTest { @Test public void testSingleFrameChunked() { - EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(10000, 5), new StompAggregator(100000)); + EmbeddedChannel channel = new EmbeddedChannel( + new StompSubframeDecoder(10000, 5), new StompSubframeAggregator(100000)); ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes()); channel.writeInbound(incoming); - FullStompFrame frame = channel.readInbound(); + StompFrame frame = channel.readInbound(); Assert.assertNotNull(frame); Assert.assertEquals(StompCommand.SEND, frame.command()); Assert.assertNull(channel.readInbound()); @@ -81,7 +81,7 @@ public class StompAggregatorTest { incoming.writeBytes(StompTestConstants.CONNECTED_FRAME.getBytes()); channel.writeInbound(incoming); channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes())); - FullStompFrame frame = channel.readInbound(); + StompFrame frame = channel.readInbound(); Assert.assertEquals(StompCommand.CONNECT, frame.command()); frame = channel.readInbound(); Assert.assertEquals(StompCommand.CONNECTED, frame.command()); @@ -92,8 +92,7 @@ public class StompAggregatorTest { @Test(expected = TooLongFrameException.class) public void testTooLongFrameException() { - EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(), new StompAggregator(10)); + EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10)); channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes())); } - } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompDecoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java similarity index 54% rename from codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompDecoderTest.java rename to codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java index a5fdec2c0b..eef5b4b7ef 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompDecoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java @@ -15,28 +15,28 @@ */ package io.netty.handler.codec.stomp; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Assert; +import static org.junit.Assert.*; -public class StompDecoderTest { +public class StompSubframeDecoderTest { private EmbeddedChannel channel; @Before public void setup() throws Exception { - channel = new EmbeddedChannel(new StompDecoder()); + channel = new EmbeddedChannel(new StompSubframeDecoder()); } @After public void teardown() throws Exception { - Assert.assertFalse(channel.finish()); + assertFalse(channel.finish()); } @Test @@ -44,13 +44,13 @@ public class StompDecoderTest { ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes()); channel.writeInbound(incoming); - StompFrame frame = channel.readInbound(); - Assert.assertNotNull(frame); - Assert.assertEquals(StompCommand.CONNECT, frame.command()); - StompContent content = channel.readInbound(); - Assert.assertTrue(content == LastStompContent.EMPTY_LAST_CONTENT); + StompHeadersSubframe frame = channel.readInbound(); + assertNotNull(frame); + assertEquals(StompCommand.CONNECT, frame.command()); + StompContentSubframe content = channel.readInbound(); + assertSame(content, LastStompContentSubframe.EMPTY_LAST_CONTENT); Object o = channel.readInbound(); - Assert.assertNull(o); + assertNull(o); } @Test @@ -58,14 +58,14 @@ public class StompDecoderTest { ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes()); channel.writeInbound(incoming); - StompFrame frame = channel.readInbound(); - Assert.assertNotNull(frame); - Assert.assertEquals(StompCommand.SEND, frame.command()); - StompContent content = channel.readInbound(); - Assert.assertTrue(content instanceof LastStompContent); + StompHeadersSubframe frame = channel.readInbound(); + assertNotNull(frame); + assertEquals(StompCommand.SEND, frame.command()); + StompContentSubframe content = channel.readInbound(); + assertTrue(content instanceof LastStompContentSubframe); String s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals("hello, queue a!!!", s); - Assert.assertNull(channel.readInbound()); + assertEquals("hello, queue a!!!", s); + assertNull(channel.readInbound()); } @Test @@ -73,43 +73,43 @@ public class StompDecoderTest { ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.SEND_FRAME_1.getBytes()); channel.writeInbound(incoming); - StompFrame frame = (StompFrame) channel.readInbound(); - Assert.assertNotNull(frame); - Assert.assertEquals(StompCommand.SEND, frame.command()); - StompContent content = (StompContent) channel.readInbound(); - Assert.assertTrue(content instanceof LastStompContent); + StompHeadersSubframe frame = channel.readInbound(); + assertNotNull(frame); + assertEquals(StompCommand.SEND, frame.command()); + StompContentSubframe content = channel.readInbound(); + assertTrue(content instanceof LastStompContentSubframe); String s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals("hello, queue a!", s); - Assert.assertNull(channel.readInbound()); + assertEquals("hello, queue a!", s); + assertNull(channel.readInbound()); } @Test public void testSingleFrameChunked() { - EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(10000, 5)); + EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(10000, 5)); ByteBuf incoming = Unpooled.buffer(); incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes()); channel.writeInbound(incoming); - StompFrame frame = channel.readInbound(); - Assert.assertNotNull(frame); - Assert.assertEquals(StompCommand.SEND, frame.command()); - StompContent content = channel.readInbound(); + StompHeadersSubframe frame = channel.readInbound(); + assertNotNull(frame); + assertEquals(StompCommand.SEND, frame.command()); + StompContentSubframe content = channel.readInbound(); String s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals("hello", s); + assertEquals("hello", s); content = channel.readInbound(); s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals(", que", s); + assertEquals(", que", s); content = channel.readInbound(); s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals("ue a!", s); + assertEquals("ue a!", s); content = channel.readInbound(); s = content.content().toString(CharsetUtil.UTF_8); - Assert.assertEquals("!!", s); + assertEquals("!!", s); - Assert.assertNull(channel.readInbound()); + assertNull(channel.readInbound()); } @Test @@ -119,17 +119,17 @@ public class StompDecoderTest { incoming.writeBytes(StompTestConstants.CONNECTED_FRAME.getBytes()); channel.writeInbound(incoming); - StompFrame frame = channel.readInbound(); - Assert.assertNotNull(frame); - Assert.assertEquals(StompCommand.CONNECT, frame.command()); - StompContent content = channel.readInbound(); - Assert.assertTrue(content == LastStompContent.EMPTY_LAST_CONTENT); + StompHeadersSubframe frame = channel.readInbound(); + assertNotNull(frame); + assertEquals(StompCommand.CONNECT, frame.command()); + StompContentSubframe content = channel.readInbound(); + assertSame(content, LastStompContentSubframe.EMPTY_LAST_CONTENT); - StompFrame frame2 = channel.readInbound(); - Assert.assertNotNull(frame2); - Assert.assertEquals(StompCommand.CONNECTED, frame2.command()); - StompContent content2 = channel.readInbound(); - Assert.assertTrue(content2 == LastStompContent.EMPTY_LAST_CONTENT); - Assert.assertNull(channel.readInbound()); + StompHeadersSubframe frame2 = channel.readInbound(); + assertNotNull(frame2); + assertEquals(StompCommand.CONNECTED, frame2.command()); + StompContentSubframe content2 = channel.readInbound(); + assertSame(content2, LastStompContentSubframe.EMPTY_LAST_CONTENT); + assertNull(channel.readInbound()); } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompEncoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java similarity index 78% rename from codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompEncoderTest.java rename to codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java index 40aa6873e9..90402f285d 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompEncoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java @@ -15,49 +15,48 @@ */ package io.netty.handler.codec.stomp; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Assert; +import static org.junit.Assert.*; -public class StompEncoderTest { +public class StompSubframeEncoderTest { private EmbeddedChannel channel; @Before public void setup() throws Exception { - channel = new EmbeddedChannel(new StompEncoder()); + channel = new EmbeddedChannel(new StompSubframeEncoder()); } @After public void teardown() throws Exception { - Assert.assertFalse(channel.finish()); + assertFalse(channel.finish()); } @Test public void testFrameAndContentEncoding() { - StompFrame frame = new DefaultStompFrame(StompCommand.CONNECT); + StompHeadersSubframe frame = new DefaultStompHeadersSubframe(StompCommand.CONNECT); StompHeaders headers = frame.headers(); headers.set(StompHeaders.ACCEPT_VERSION, "1.1,1.2"); headers.set(StompHeaders.HOST, "stomp.github.org"); channel.writeOutbound(frame); - channel.writeOutbound(LastStompContent.EMPTY_LAST_CONTENT); + channel.writeOutbound(LastStompContentSubframe.EMPTY_LAST_CONTENT); ByteBuf aggregatedBuffer = Unpooled.buffer(); ByteBuf byteBuf = channel.readOutbound(); - Assert.assertNotNull(byteBuf); + assertNotNull(byteBuf); aggregatedBuffer.writeBytes(byteBuf); byteBuf = channel.readOutbound(); - Assert.assertNotNull(byteBuf); + assertNotNull(byteBuf); aggregatedBuffer.writeBytes(byteBuf); aggregatedBuffer.resetReaderIndex(); String content = aggregatedBuffer.toString(CharsetUtil.UTF_8); - Assert.assertEquals(StompTestConstants.CONNECT_FRAME, content); + assertEquals(StompTestConstants.CONNECT_FRAME, content); } - } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java index 0abed751cd..b9246c1d0d 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java @@ -21,7 +21,7 @@ public final class StompTestConstants { "host:stomp.github.org\r\n" + "accept-version:1.1,1.2\r\n" + "\r\n" + - "\0"; + '\0'; public static final String CONNECTED_FRAME = "CONNECTED\r\n" + "version:1.2\n" + @@ -43,6 +43,5 @@ public final class StompTestConstants { "hello, queue a!!!" + "\0\n"; - private StompTestConstants() { - } + private StompTestConstants() { } } diff --git a/example/src/main/java/io/netty/example/stomp/StompClient.java b/example/src/main/java/io/netty/example/stomp/StompClient.java index 78e770b43a..5527316f26 100644 --- a/example/src/main/java/io/netty/example/stomp/StompClient.java +++ b/example/src/main/java/io/netty/example/stomp/StompClient.java @@ -16,148 +16,49 @@ package io.netty.example.stomp; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.stomp.FullStompFrame; -import io.netty.handler.codec.stomp.DefaultFullStompFrame; -import io.netty.handler.codec.stomp.StompCommand; -import io.netty.handler.codec.stomp.StompHeaders; -import io.netty.handler.codec.stomp.StompEncoder; -import io.netty.handler.codec.stomp.StompDecoder; -import io.netty.handler.codec.stomp.StompAggregator; +import io.netty.handler.codec.stomp.StompSubframeAggregator; +import io.netty.handler.codec.stomp.StompSubframeDecoder; +import io.netty.handler.codec.stomp.StompSubframeEncoder; /** * very simple stomp client implementation example, requires running stomp server to actually work * uses default username/password and destination values from hornetq message broker */ -public class StompClient implements StompFrameListener { - public static final String DEAFULT_HOST = "localhost"; - public static final int DEFAULT_PORT = 61613; - public static final String DEFAULT_USERNAME = "guest"; - public static final String DEFAULT_PASSWORD = "guest"; - private static final String EXAMPLE_TOPIC = "jms.topic.exampleTopic"; +public final class StompClient { - private final String host; - private final int port; - private final String username; - private final String password; - private ClientState state = ClientState.CONNECTING; - private Channel ch; + static final boolean SSL = System.getProperty("ssl") != null; + static final String HOST = System.getProperty("host", "127.0.0.1"); + static final int PORT = Integer.parseInt(System.getProperty("port", "61613")); + static final String LOGIN = System.getProperty("login", "guest"); + static final String PASSCODE = System.getProperty("passcode", "guest"); + static final String TOPIC = System.getProperty("topic", "jms.topic.exampleTopic"); public static void main(String[] args) throws Exception { - String host; - int port; - String username; - String password; - if (args.length == 0) { - host = DEAFULT_HOST; - port = DEFAULT_PORT; - username = DEFAULT_USERNAME; - password = DEFAULT_PASSWORD; - } else if (args.length == 4) { - host = args[0]; - port = Integer.parseInt(args[1]); - username = args[2]; - password = args[3]; - } else { - System.err.println("Usage: " + StompClient.class.getSimpleName() + " "); - return; - } - StompClient stompClient = new StompClient(host, port, username, password); - stompClient.run(); - } - - public StompClient(String host, int port, String username, String password) { - this.host = host; - this.port = port; - this.username = username; - this.password = password; - } - - public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); - Bootstrap b = new Bootstrap(); - final StompClient that = this; - b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("decoder", new StompDecoder()); - pipeline.addLast("encoder", new StompEncoder()); - pipeline.addLast("aggregator", new StompAggregator(1048576)); - pipeline.addLast("handler", new StompClientHandler(that)); - } - }); - b.remoteAddress(host, port); - - this.ch = b.connect().sync().channel(); - - FullStompFrame connFrame = new DefaultFullStompFrame(StompCommand.CONNECT); - connFrame.headers().set(StompHeaders.ACCEPT_VERSION, "1.2"); - connFrame.headers().set(StompHeaders.HOST, host); - connFrame.headers().set(StompHeaders.LOGIN, username); - connFrame.headers().set(StompHeaders.PASSCODE, password); - ch.writeAndFlush(connFrame).sync(); - } - - @Override - public void onFrame(FullStompFrame frame) { - String subscrReceiptId = "001"; - String disconReceiptId = "002"; try { - switch (frame.command()) { - case CONNECTED: - FullStompFrame subscribeFrame = new DefaultFullStompFrame(StompCommand.SUBSCRIBE); - subscribeFrame.headers().set(StompHeaders.DESTINATION, EXAMPLE_TOPIC); - subscribeFrame.headers().set(StompHeaders.RECEIPT, subscrReceiptId); - subscribeFrame.headers().set(StompHeaders.ID, "1"); - System.out.println("connected, sending subscribe frame: " + subscribeFrame); - state = ClientState.CONNECTED; - ch.writeAndFlush(subscribeFrame); - break; - case RECEIPT: - String receiptHeader = frame.headers().get(StompHeaders.RECEIPT_ID); - if (state == ClientState.CONNECTED && receiptHeader.equals(subscrReceiptId)) { - FullStompFrame msgFrame = new DefaultFullStompFrame(StompCommand.SEND); - msgFrame.headers().set(StompHeaders.DESTINATION, EXAMPLE_TOPIC); - msgFrame.content().writeBytes("some payload".getBytes()); - System.out.println("subscribed, sending message frame: " + msgFrame); - state = ClientState.SUBSCRIBED; - ch.writeAndFlush(msgFrame); - } else if (state == ClientState.DISCONNECTING && receiptHeader.equals(disconReceiptId)) { - System.out.println("disconnected, exiting.."); - System.exit(0); - } else { - throw new IllegalStateException("received: " + frame + ", while internal state is " + state); - } - break; - case MESSAGE: - if (state == ClientState.SUBSCRIBED) { - System.out.println("received frame: " + frame); - FullStompFrame disconnFrame = new DefaultFullStompFrame(StompCommand.DISCONNECT); - disconnFrame.headers().set(StompHeaders.RECEIPT, disconReceiptId); - System.out.println("sending disconnect frame: " + disconnFrame); - state = ClientState.DISCONNECTING; - ch.writeAndFlush(disconnFrame); - } - break; - default: - break; - } - } catch (Exception e) { - e.printStackTrace(); - } - } + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("decoder", new StompSubframeDecoder()); + pipeline.addLast("encoder", new StompSubframeEncoder()); + pipeline.addLast("aggregator", new StompSubframeAggregator(1048576)); + pipeline.addLast("handler", new StompClientHandler()); + } + }); - enum ClientState { - CONNECTING, - CONNECTED, - SUBSCRIBED, - DISCONNECTING + b.connect(HOST, PORT).sync().channel().closeFuture().sync(); + } finally { + group.shutdownGracefully(); + } } } diff --git a/example/src/main/java/io/netty/example/stomp/StompClientHandler.java b/example/src/main/java/io/netty/example/stomp/StompClientHandler.java index 080d689f32..d8e020a2c8 100644 --- a/example/src/main/java/io/netty/example/stomp/StompClientHandler.java +++ b/example/src/main/java/io/netty/example/stomp/StompClientHandler.java @@ -17,23 +17,84 @@ package io.netty.example.stomp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.stomp.FullStompFrame; +import io.netty.handler.codec.stomp.DefaultStompFrame; +import io.netty.handler.codec.stomp.StompCommand; +import io.netty.handler.codec.stomp.StompFrame; +import io.netty.handler.codec.stomp.StompHeaders; /** * STOMP client inbound handler implementation, which just passes received messages to listener */ -public class StompClientHandler extends SimpleChannelInboundHandler { - private final StompFrameListener listener; +public class StompClientHandler extends SimpleChannelInboundHandler { - public StompClientHandler(StompFrameListener listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - this.listener = listener; + private enum ClientState { + AUTHENTICATING, + AUTHENTICATED, + SUBSCRIBED, + DISCONNECTING + } + + private ClientState state; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + state = ClientState.AUTHENTICATING; + StompFrame connFrame = new DefaultStompFrame(StompCommand.CONNECT); + connFrame.headers().set(StompHeaders.ACCEPT_VERSION, "1.2"); + connFrame.headers().set(StompHeaders.HOST, StompClient.HOST); + connFrame.headers().set(StompHeaders.LOGIN, StompClient.LOGIN); + connFrame.headers().set(StompHeaders.PASSCODE, StompClient.PASSCODE); + ctx.writeAndFlush(connFrame); } @Override - protected void messageReceived(ChannelHandlerContext ctx, FullStompFrame msg) throws Exception { - listener.onFrame(msg); + protected void messageReceived(ChannelHandlerContext ctx, StompFrame frame) throws Exception { + String subscrReceiptId = "001"; + String disconReceiptId = "002"; + switch (frame.command()) { + case CONNECTED: + StompFrame subscribeFrame = new DefaultStompFrame(StompCommand.SUBSCRIBE); + subscribeFrame.headers().set(StompHeaders.DESTINATION, StompClient.TOPIC); + subscribeFrame.headers().set(StompHeaders.RECEIPT, subscrReceiptId); + subscribeFrame.headers().set(StompHeaders.ID, "1"); + System.out.println("connected, sending subscribe frame: " + subscribeFrame); + state = ClientState.AUTHENTICATED; + ctx.writeAndFlush(subscribeFrame); + break; + case RECEIPT: + String receiptHeader = frame.headers().get(StompHeaders.RECEIPT_ID); + if (state == ClientState.AUTHENTICATED && receiptHeader.equals(subscrReceiptId)) { + StompFrame msgFrame = new DefaultStompFrame(StompCommand.SEND); + msgFrame.headers().set(StompHeaders.DESTINATION, StompClient.TOPIC); + msgFrame.content().writeBytes("some payload".getBytes()); + System.out.println("subscribed, sending message frame: " + msgFrame); + state = ClientState.SUBSCRIBED; + ctx.writeAndFlush(msgFrame); + } else if (state == ClientState.DISCONNECTING && receiptHeader.equals(disconReceiptId)) { + System.out.println("disconnected"); + ctx.close(); + } else { + throw new IllegalStateException("received: " + frame + ", while internal state is " + state); + } + break; + case MESSAGE: + if (state == ClientState.SUBSCRIBED) { + System.out.println("received frame: " + frame); + StompFrame disconnFrame = new DefaultStompFrame(StompCommand.DISCONNECT); + disconnFrame.headers().set(StompHeaders.RECEIPT, disconReceiptId); + System.out.println("sending disconnect frame: " + disconnFrame); + state = ClientState.DISCONNECTING; + ctx.writeAndFlush(disconnFrame); + } + break; + default: + break; + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); } } diff --git a/run-example.sh b/run-example.sh index 9a188accde..b931bab17b 100755 --- a/run-example.sh +++ b/run-example.sh @@ -31,6 +31,7 @@ EXAMPLE_MAP=( 'proxy-server:io.netty.example.proxy.HexDumpProxy' 'socksproxy-server:io.netty.example.socksproxy.SocksServer' 'memcache-binary-client:io.netty.example.memcache.binary.MemcacheClient' + 'stomp-client:io.netty.example.stomp.StompClient' 'uptime-client:io.netty.example.uptime.UptimeClient' 'sctpecho-client:io.netty.example.sctp.SctpEchoClient' 'sctpecho-server:io.netty.example.sctp.SctpEchoServer'