Initial STOMP protocol work from @sskachkov

This commit is contained in:
Sergey Skachkov 2014-01-30 12:18:30 -08:00 committed by Trustin Lee
parent d1b90774bc
commit b286079205
27 changed files with 1926 additions and 0 deletions

View File

@ -140,6 +140,13 @@
<scope>compile</scope> <scope>compile</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-stomp</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-common</artifactId> <artifactId>netty-common</artifactId>

44
codec-stomp/pom.xml Normal file
View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.0.Alpha1-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-stomp</artifactId>
<packaging>jar</packaging>
<name>Netty/Codec/Stomp</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,105 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
* Default implementation of {@link FullStompFrame}.
*/
public class DefaultFullStompFrame extends DefaultStompFrame implements FullStompFrame {
private final ByteBuf content;
public DefaultFullStompFrame(StompCommand command) {
this(command, Unpooled.buffer(0));
if (command == null) {
throw new NullPointerException("command");
}
}
public DefaultFullStompFrame(StompCommand command, ByteBuf content) {
super(command);
if (content == null) {
throw new NullPointerException("content");
}
this.content = content;
}
@Override
public ByteBuf content() {
return content;
}
@Override
public FullStompFrame copy() {
return new DefaultFullStompFrame(command, content.copy());
}
@Override
public FullStompFrame duplicate() {
return new DefaultFullStompFrame(command, content.duplicate());
}
@Override
public int refCnt() {
return content.refCnt();
}
@Override
public FullStompFrame retain() {
content.retain();
return this;
}
@Override
public FullStompFrame retain(int increment) {
content.retain();
return this;
}
@Override
public FullStompFrame touch() {
content.touch();
return this;
}
@Override
public FullStompFrame touch(Object hint) {
content.touch(hint);
return this;
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override
public String toString() {
return "DefaultFullStompFrame{" +
"command=" + command +
", headers=" + headers +
", content=" + content.toString(CharsetUtil.UTF_8) +
'}';
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
/**
* The default implementation for the {@link LastStompContent}.
*/
public class DefaultLastStompContent extends DefaultStompContent implements LastStompContent {
public DefaultLastStompContent(ByteBuf content) {
super(content);
}
@Override
public DefaultLastStompContent retain() {
super.retain();
return this;
}
@Override
public LastStompContent retain(int increment) {
super.retain(increment);
return this;
}
@Override
public LastStompContent touch() {
super.touch();
return this;
}
@Override
public LastStompContent touch(Object hint) {
super.touch(hint);
return this;
}
@Override
public LastStompContent copy() {
return new DefaultLastStompContent(content().copy());
}
@Override
public LastStompContent duplicate() {
return new DefaultLastStompContent(content().duplicate());
}
@Override
public String toString() {
return "DefaultLastStompContent{" +
"decoderResult=" + getDecoderResult() +
'}';
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderResult;
/**
* The default {@link StompContent} implementation.
*/
public class DefaultStompContent implements StompContent {
private DecoderResult decoderResult;
private final ByteBuf content;
public DefaultStompContent(ByteBuf content) {
if (content == null) {
throw new NullPointerException("content");
}
this.content = content;
}
@Override
public ByteBuf content() {
return content;
}
@Override
public StompContent copy() {
return new DefaultStompContent(content().copy());
}
@Override
public StompContent duplicate() {
return new DefaultStompContent(content().duplicate());
}
@Override
public int refCnt() {
return content().refCnt();
}
@Override
public StompContent retain() {
content().retain();
return this;
}
@Override
public StompContent retain(int increment) {
content().retain(increment);
return this;
}
@Override
public StompContent touch() {
content.toString();
return this;
}
@Override
public StompContent touch(Object hint) {
content.touch(hint);
return this;
}
@Override
public boolean release() {
return content().release();
}
@Override
public boolean release(int decrement) {
return content().release(decrement);
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult;
}
@Override
public void setDecoderResult(DecoderResult result) {
this.decoderResult = result;
}
@Override
public String toString() {
return "DefaultStompContent{" +
"decoderResult=" + decoderResult +
'}';
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.handler.codec.DecoderResult;
/**
* Default implementation of {@link StompFrame}.
*/
public class DefaultStompFrame implements StompFrame {
protected final StompCommand command;
protected DecoderResult decoderResult;
protected final StompHeaders headers = new StompHeaders();
public DefaultStompFrame(StompCommand command) {
if (command == null) {
throw new NullPointerException("command");
}
this.command = command;
}
@Override
public StompCommand command() {
return command;
}
@Override
public StompHeaders headers() {
return headers;
}
@Override
public DecoderResult getDecoderResult() {
return decoderResult;
}
@Override
public void setDecoderResult(DecoderResult decoderResult) {
this.decoderResult = decoderResult;
}
@Override
public String toString() {
return "StompFrame{" +
"command=" + command +
", headers=" + headers +
'}';
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
/**
* Combines {@link StompFrame} and {@link LastStompContent} into one
* frame. So it represent a <i>complete</i> STOMP frame.
*/
public interface FullStompFrame extends StompFrame, LastStompContent {
@Override
FullStompFrame copy();
@Override
FullStompFrame duplicate();
@Override
FullStompFrame retain();
@Override
FullStompFrame retain(int increment);
@Override
FullStompFrame touch();
@Override
FullStompFrame touch(Object hint);
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderResult;
/**
* The last {@link StompContent} which signals the end of the content batch
* <p/>
* Note, even when no content is emitted by the protocol, an
* empty {@link LastStompContent} is issued to make the upstream parsing
* easier.
*/
public interface LastStompContent extends StompContent {
LastStompContent EMPTY_LAST_CONTENT = new LastStompContent() {
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public LastStompContent copy() {
return EMPTY_LAST_CONTENT;
}
@Override
public LastStompContent duplicate() {
return this;
}
@Override
public LastStompContent retain() {
return this;
}
@Override
public LastStompContent retain(int increment) {
return this;
}
@Override
public LastStompContent touch() {
return this;
}
@Override
public LastStompContent touch(Object hint) {
return this;
}
@Override
public int refCnt() {
return 1;
}
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
@Override
public DecoderResult getDecoderResult() {
return DecoderResult.SUCCESS;
}
@Override
public void setDecoderResult(DecoderResult result) {
throw new UnsupportedOperationException("read only");
}
};
@Override
LastStompContent copy();
@Override
LastStompContent duplicate();
@Override
LastStompContent retain();
@Override
LastStompContent retain(int increment);
@Override
LastStompContent touch();
@Override
LastStompContent touch(Object hint);
}

View File

@ -0,0 +1,145 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import java.util.List;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
/**
* A {@link io.netty.channel.ChannelHandler} that aggregates an {@link StompFrame}
* and its following {@link StompContent}s into a single {@link StompFrame} with
* no following {@link StompContent}s. It is useful when you don't want to take
* care of STOMP frames whose content is 'chunked'. Insert this
* handler after {@link StompDecoder} in the {@link io.netty.channel.ChannelPipeline}:
*/
public class StompAggregator extends MessageToMessageDecoder<StompObject> {
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
private final int maxContentLength;
private FullStompFrame currentFrame;
private boolean tooLongFrameFound;
private volatile boolean handlerAdded;
/**
* Creates a new instance.
*
* @param maxContentLength
* the maximum length of the aggregated content.
* If the length of the aggregated content exceeds this value,
* a {@link TooLongFrameException} will be raised.
*/
public StompAggregator(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException(
"maxContentLength must be a positive integer: " +
maxContentLength);
}
this.maxContentLength = maxContentLength;
}
/**
* Sets the maximum number of components in the cumulation buffer. If the number of
* the components in the cumulation buffer exceeds this value, the components of the
* cumulation buffer are consolidated into a single component, involving memory copies.
* The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
* and its minimum allowed value is {@code 2}.
*/
public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
if (maxCumulationBufferComponents < 2) {
throw new IllegalArgumentException(
"maxCumulationBufferComponents: " + maxCumulationBufferComponents +
" (expected: >= 2)");
}
if (!handlerAdded) {
this.maxCumulationBufferComponents = maxCumulationBufferComponents;
} else {
throw new IllegalStateException(
"decoder properties cannot be changed once the decoder is added to a pipeline.");
}
}
@Override
protected void decode(ChannelHandlerContext ctx, StompObject msg, List<Object> out) throws Exception {
FullStompFrame currentFrame = this.currentFrame;
if (msg instanceof StompFrame) {
assert currentFrame == null;
StompFrame frame = (StompFrame) msg;
this.currentFrame = currentFrame = new DefaultFullStompFrame(frame.command(),
Unpooled.compositeBuffer(maxCumulationBufferComponents));
currentFrame.headers().set(frame.headers());
} else if (msg instanceof StompContent) {
if (tooLongFrameFound) {
if (msg instanceof LastStompContent) {
this.currentFrame = null;
}
return;
}
assert currentFrame != null;
StompContent chunk = (StompContent) msg;
CompositeByteBuf contentBuf = (CompositeByteBuf) currentFrame.content();
if (contentBuf.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
tooLongFrameFound = true;
currentFrame.release();
this.currentFrame = null;
throw new TooLongFrameException(
"STOMP content length exceeded " + maxContentLength +
" bytes.");
}
contentBuf.addComponent(chunk.retain().content());
contentBuf.writerIndex(contentBuf.writerIndex() + chunk.content().readableBytes());
if (chunk instanceof LastStompContent) {
out.add(currentFrame);
this.currentFrame = null;
}
} else {
throw new IllegalArgumentException("received unsupported object type " + msg);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.handlerAdded = true;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
this.handlerAdded = false;
if (currentFrame != null) {
currentFrame.release();
currentFrame = null;
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
/**
* STOMP command
*/
public enum StompCommand {
STOMP, CONNECT, CONNECTED, SEND, SUBSCRIBE, UNSUBSCRIBE, ACK, NACK, BEGIN, DISCONNECT, MESSAGE, RECEIPT, ERROR,
UNKNOWN
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
final class StompConstants {
public static final byte CR = 13;
public static final byte LF = 10;
public static final byte NULL = 0;
public static final byte COLON = 58;
private StompConstants() {
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBufHolder;
/**
* An STOMP chunk which is used for STOMP chunked transfer-encoding.
* {@link StompDecoder} generates {@link StompContent} after
* {@link StompFrame} when the content is large or the encoding of the content
* is 'chunked. If you prefer not to receive {@link StompContent} in your handler,
* place {@link StompAggregator} after {@link StompDecoder} in the
* {@link io.netty.channel.ChannelPipeline}.
*/
public interface StompContent extends ByteBufHolder, StompObject {
@Override
StompContent copy();
@Override
StompContent duplicate();
@Override
StompContent retain();
@Override
StompContent retain(int increment);
@Override
StompContent touch();
@Override
StompContent touch(Object hint);
}

View File

@ -0,0 +1,263 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.stomp.StompDecoder.State;
import static io.netty.buffer.ByteBufUtil.readBytes;
/**
* Decodes {@link ByteBuf}s into {@link StompFrame}s and
* {@link StompContent}s.
*
* <h3>Parameters to control memory consumption: </h3>
* {@code maxLineLength} the maximum length of line -
* restricts length of command and header lines
* If the length of the initial line exceeds this value, a
* {@link TooLongFrameException} will be raised.
* <br>
* {@code maxChunkSize}
* The maximum length of the content or each chunk. If the content length
* (or the length of each chunk) exceeds this value, the content or chunk
* ill be split into multiple {@link StompContent}s whose length is
* {@code maxChunkSize} at maximum.
*
* <h3>Chunked Content</h3>
*
* If the content of a stomp message is greater than {@code maxChunkSize}
* the transfer encoding of the HTTP message is 'chunked', this decoder
* generates multiple {@link StompContent} instances to avoid excessive memory
* consumption. Note, that every message, even with no content decodes with
* {@link LastStompContent} at the end to simplify upstream message parsing.
*/
public class StompDecoder extends ReplayingDecoder<State> {
public static final int DEFAULT_CHUNK_SIZE = 8132;
public static final int DEFAULT_MAX_LINE_LENGTH = 1024;
private int maxLineLength;
private int maxChunkSize;
private int alreadyReadChunkSize;
private LastStompContent lastContent;
private long contentLength;
public StompDecoder() {
this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
}
public StompDecoder(int maxLineLength, int maxChunkSize) {
super(State.SKIP_CONTROL_CHARACTERS);
if (maxLineLength <= 0) {
throw new IllegalArgumentException(
"maxLineLength must be a positive integer: " +
maxLineLength);
}
if (maxChunkSize <= 0) {
throw new IllegalArgumentException(
"maxChunkSize must be a positive integer: " +
maxChunkSize);
}
this.maxChunkSize = maxChunkSize;
this.maxLineLength = maxLineLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case SKIP_CONTROL_CHARACTERS:
skipControlCharacters(in);
checkpoint(State.READ_HEADERS);
case READ_HEADERS:
StompCommand command = StompCommand.UNKNOWN;
StompFrame frame = null;
try {
command = readCommand(in);
frame = new DefaultStompFrame(command);
checkpoint(readHeaders(in, frame.headers()));
out.add(frame);
} catch (Exception e) {
if (frame == null) {
frame = new DefaultStompFrame(command);
}
frame.setDecoderResult(DecoderResult.failure(e));
out.add(frame);
checkpoint(State.BAD_FRAME);
return;
}
break;
case BAD_FRAME:
in.skipBytes(actualReadableBytes());
return;
}
try {
switch (state()) {
case READ_CONTENT:
int toRead = in.readableBytes();
if (toRead == 0) {
return;
}
if (toRead > maxChunkSize) {
toRead = maxChunkSize;
}
int remainingLength = (int) (contentLength - alreadyReadChunkSize);
if (toRead > remainingLength) {
toRead = remainingLength;
}
ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
if ((alreadyReadChunkSize += toRead) >= contentLength) {
lastContent = new DefaultLastStompContent(chunkBuffer);
checkpoint(State.FINALIZE_FRAME_READ);
} else {
DefaultStompContent chunk;
chunk = new DefaultStompContent(chunkBuffer);
out.add(chunk);
}
if (alreadyReadChunkSize < contentLength) {
return;
}
//fall through
case FINALIZE_FRAME_READ:
skipNullCharacter(in);
if (lastContent == null) {
lastContent = LastStompContent.EMPTY_LAST_CONTENT;
}
out.add(lastContent);
resetDecoder();
}
} catch (Exception e) {
StompContent errorContent = new DefaultLastStompContent(Unpooled.EMPTY_BUFFER);
errorContent.setDecoderResult(DecoderResult.failure(e));
out.add(errorContent);
checkpoint(State.BAD_FRAME);
}
}
private StompCommand readCommand(ByteBuf in) {
String commandStr = readLine(in, maxLineLength);
StompCommand command = null;
try {
command = StompCommand.valueOf(commandStr);
} catch (IllegalArgumentException iae) {
//do nothing
}
if (command == null) {
commandStr = commandStr.toUpperCase();
try {
command = StompCommand.valueOf(commandStr);
} catch (IllegalArgumentException iae) {
//do nothing
}
}
if (command == null) {
throw new DecoderException("failed to read command from channel");
}
return command;
}
private State readHeaders(ByteBuf buffer, StompHeaders headers) {
while (true) {
String line = readLine(buffer, maxLineLength);
if (line.length() > 0) {
String[] split = line.split(":");
if (split.length == 2) {
headers.add(split[0], split[1]);
}
} else {
long contentLength = -1;
if (headers.has(StompHeaders.CONTENT_LENGTH)) {
contentLength = StompHeaders.getContentLength(headers, 0);
} else {
int globalIndex = ByteBufUtil.indexOf(buffer, buffer.readerIndex(),
buffer.writerIndex(), StompConstants.NULL);
if (globalIndex != -1) {
contentLength = globalIndex - buffer.readerIndex();
}
}
if (contentLength > 0) {
this.contentLength = contentLength;
return State.READ_CONTENT;
} else {
return State.FINALIZE_FRAME_READ;
}
}
}
}
private static void skipNullCharacter(ByteBuf buffer) {
byte b = buffer.readByte();
if (b != StompConstants.NULL) {
throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
}
}
private static void skipControlCharacters(ByteBuf buffer) {
byte b;
while (true) {
b = buffer.readByte();
if (b != StompConstants.CR && b != StompConstants.LF) {
buffer.readerIndex(buffer.readerIndex() - 1);
break;
}
}
}
private static String readLine(ByteBuf buffer, int maxLineLength) {
StringBuilder sb = new StringBuilder();
int lineLength = 0;
while (true) {
byte nextByte = buffer.readByte();
if (nextByte == StompConstants.CR) {
nextByte = buffer.readByte();
if (nextByte == StompConstants.LF) {
return sb.toString();
}
} else if (nextByte == StompConstants.LF) {
return sb.toString();
} else {
if (lineLength >= maxLineLength) {
throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
}
lineLength++;
sb.append((char) nextByte);
}
}
}
private void resetDecoder() {
checkpoint(State.SKIP_CONTROL_CHARACTERS);
contentLength = 0;
alreadyReadChunkSize = 0;
lastContent = null;
}
enum State {
SKIP_CONTROL_CHARACTERS,
READ_HEADERS,
READ_CONTENT,
FINALIZE_FRAME_READ,
BAD_FRAME,
INVALID_CHUNK
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import java.util.Iterator;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
/**
* Encodes a {@link StompFrame} or a {@link FullStompFrame} or a {@link StompContent} into a {@link ByteBuf}.
*/
public class StompEncoder extends MessageToMessageEncoder<StompObject> {
@Override
protected void encode(ChannelHandlerContext ctx, StompObject msg, List<Object> out) throws Exception {
if (msg instanceof FullStompFrame) {
FullStompFrame frame = (FullStompFrame) msg;
ByteBuf frameBuf = encodeFrame(frame, ctx);
out.add(frameBuf);
ByteBuf contentBuf = encodeContent(frame, ctx);
out.add(contentBuf);
} else if (msg instanceof StompFrame) {
StompFrame frame = (StompFrame) msg;
ByteBuf buf = encodeFrame(frame, ctx);
out.add(buf);
} else if (msg instanceof StompContent) {
StompContent stompContent = (StompContent) msg;
ByteBuf buf = encodeContent(stompContent, ctx);
out.add(buf);
}
}
private ByteBuf encodeContent(StompContent content, ChannelHandlerContext ctx) {
if (content instanceof LastStompContent) {
ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
buf.writeBytes(content.content());
buf.writeByte(StompConstants.NULL);
return buf;
} else {
ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes());
buf.writeBytes(content.content());
return buf;
}
}
private ByteBuf encodeFrame(StompFrame frame, ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes(frame.command().toString().getBytes(CharsetUtil.US_ASCII));
buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF);
StompHeaders headers = frame.headers();
for (Iterator<String> iterator = headers.keySet().iterator(); iterator.hasNext();) {
String key = iterator.next();
List<String> values = headers.getAll(key);
for (Iterator<String> stringIterator = values.iterator(); stringIterator.hasNext();) {
String value = stringIterator.next();
buf.writeBytes(key.getBytes(CharsetUtil.US_ASCII)).
writeByte(StompConstants.COLON).writeBytes(value.getBytes(CharsetUtil.US_ASCII));
buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF);
}
}
buf.writeByte(StompConstants.CR).writeByte(StompConstants.LF);
return buf;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
/**
* An interface that defines a Stomp frame
*
* @see StompFrame
* @see FullStompFrame
* @see StompHeaders
*/
public interface StompFrame extends StompObject {
/**
* returns command of this frame
* @return the command
*/
StompCommand command();
/**
* returns headers of this frame
* @return the headers object
*/
StompHeaders headers();
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Provides the constants for the standard STOMP header names and values and
* commonly used utility methods that accesses an {@link StompFrame}.
*/
public class StompHeaders {
public static final String ACCEPT_VERSION = "accept-version";
public static final String HOST = "host";
public static final String LOGIN = "login";
public static final String PASSCODE = "passcode";
public static final String HEART_BEAT = "heart-beat";
public static final String VERSION = "version";
public static final String SESSION = "session";
public static final String SERVER = "server";
public static final String DESTINATION = "destination";
public static final String ID = "id";
public static final String ACK = "ack";
public static final String TRANSACTION = "transaction";
public static final String RECEIPT = "receipt";
public static final String MESSAGE_ID = "message-id";
public static final String SUBSCRIPTION = "subscription";
public static final String RECEIPT_ID = "receipt-id";
public static final String MESSAGE = "message";
public static final String CONTENT_LENGTH = "content-length";
public static final String CONTENT_TYPE = "content-type";
public static long getContentLength(StompHeaders headers, long defaultValue) {
String contentLength = headers.get(CONTENT_LENGTH);
if (contentLength != null) {
try {
return Long.parseLong(contentLength);
} catch (NumberFormatException e) {
return defaultValue;
}
}
return defaultValue;
}
private final Map<String, List<String>> headers = new HashMap<String, List<String>>();
public boolean has(String key) {
List<String> values = headers.get(key);
return values != null && values.size() > 0;
}
public String get(String key) {
List<String> values = headers.get(key);
if (values != null && values.size() > 0) {
return values.get(0);
} else {
return null;
}
}
public void add(String key, String value) {
List<String> values = headers.get(key);
if (values == null) {
values = new ArrayList<String>();
headers.put(key, values);
}
values.add(value);
}
public void set(String key, String value) {
headers.put(key, Arrays.asList(value));
}
public List<String> getAll(String key) {
List<String> values = headers.get(key);
if (values != null) {
return new ArrayList<String>(values);
} else {
return new ArrayList<String>();
}
}
public Set<String> keySet() {
return headers.keySet();
}
@Override
public String toString() {
return "StompHeaders{" +
headers +
'}';
}
public void set(StompHeaders headers) {
for (Iterator<String> iterator = headers.keySet().iterator(); iterator.hasNext();) {
String key = iterator.next();
List<String> values = headers.getAll(key);
this.headers.put(key, values);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.handler.codec.DecoderResult;
/**
* Defines a common interface for all {@link StompObject} implementations.
*/
public interface StompObject {
/**
* Returns the result of decoding this object.
*/
DecoderResult getDecoderResult();
/**
* Updates the result of decoding this object. This method is supposed to be invoked by {@link StompDecoder}.
* Do not call this method unless you know what you are doing.
*/
void setDecoderResult(DecoderResult result);
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Common superset of ascii and binary classes.
*/
package io.netty.handler.codec.stomp;

View File

@ -0,0 +1,99 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.util.CharsetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.TooLongFrameException;
import org.junit.Assert;
public class StompAggregatorTest {
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new StompDecoder(), new StompAggregator(100000));
}
@After
public void teardown() throws Exception {
Assert.assertFalse(channel.finish());
}
@Test
public void testSingleFrameDecoding() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertTrue(frame instanceof FullStompFrame);
Assert.assertNull(channel.readInbound());
}
@Test
public void testSingleFrameWithBodyAndContentLength() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes());
channel.writeInbound(incoming);
FullStompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8));
Assert.assertNull(channel.readInbound());
}
@Test
public void testSingleFrameChunked() {
EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(10000, 5), new StompAggregator(100000));
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes());
channel.writeInbound(incoming);
FullStompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
Assert.assertNull(channel.readInbound());
}
@Test
public void testMultipleFramesDecoding() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes());
incoming.writeBytes(StompTestConstants.CONNECTED_FRAME.getBytes());
channel.writeInbound(incoming);
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes()));
FullStompFrame frame = channel.readInbound();
Assert.assertEquals(StompCommand.CONNECT, frame.command());
frame = channel.readInbound();
Assert.assertEquals(StompCommand.CONNECTED, frame.command());
frame = channel.readInbound();
Assert.assertEquals(StompCommand.SEND, frame.command());
Assert.assertNull(channel.readInbound());
}
@Test(expected = TooLongFrameException.class)
public void testTooLongFrameException() {
EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(), new StompAggregator(10));
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes()));
}
}

View File

@ -0,0 +1,135 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.util.CharsetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
public class StompDecoderTest {
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new StompDecoder());
}
@After
public void teardown() throws Exception {
Assert.assertFalse(channel.finish());
}
@Test
public void testSingleFrameDecoding() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.CONNECT, frame.command());
StompContent content = channel.readInbound();
Assert.assertTrue(content == LastStompContent.EMPTY_LAST_CONTENT);
Object o = channel.readInbound();
Assert.assertNull(o);
}
@Test
public void testSingleFrameWithBodyAndContentLength() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
StompContent content = channel.readInbound();
Assert.assertTrue(content instanceof LastStompContent);
String s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals("hello, queue a!!!", s);
Assert.assertNull(channel.readInbound());
}
@Test
public void testSingleFrameWithBodyWithoutContentLength() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_1.getBytes());
channel.writeInbound(incoming);
StompFrame frame = (StompFrame) channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
StompContent content = (StompContent) channel.readInbound();
Assert.assertTrue(content instanceof LastStompContent);
String s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals("hello, queue a!", s);
Assert.assertNull(channel.readInbound());
}
@Test
public void testSingleFrameChunked() {
EmbeddedChannel channel = new EmbeddedChannel(new StompDecoder(10000, 5));
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_2.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
StompContent content = channel.readInbound();
String s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals("hello", s);
content = channel.readInbound();
s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals(", que", s);
content = channel.readInbound();
s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals("ue a!", s);
content = channel.readInbound();
s = content.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals("!!", s);
Assert.assertNull(channel.readInbound());
}
@Test
public void testMultipleFramesDecoding() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.CONNECT_FRAME.getBytes());
incoming.writeBytes(StompTestConstants.CONNECTED_FRAME.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.CONNECT, frame.command());
StompContent content = channel.readInbound();
Assert.assertTrue(content == LastStompContent.EMPTY_LAST_CONTENT);
StompFrame frame2 = channel.readInbound();
Assert.assertNotNull(frame2);
Assert.assertEquals(StompCommand.CONNECTED, frame2.command());
StompContent content2 = channel.readInbound();
Assert.assertTrue(content2 == LastStompContent.EMPTY_LAST_CONTENT);
Assert.assertNull(channel.readInbound());
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
import io.netty.util.CharsetUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
public class StompEncoderTest {
private EmbeddedChannel channel;
@Before
public void setup() throws Exception {
channel = new EmbeddedChannel(new StompEncoder());
}
@After
public void teardown() throws Exception {
Assert.assertFalse(channel.finish());
}
@Test
public void testFrameAndContentEncoding() {
StompFrame frame = new DefaultStompFrame(StompCommand.CONNECT);
StompHeaders headers = frame.headers();
headers.set(StompHeaders.ACCEPT_VERSION, "1.1,1.2");
headers.set(StompHeaders.HOST, "stomp.github.org");
channel.writeOutbound(frame);
channel.writeOutbound(LastStompContent.EMPTY_LAST_CONTENT);
ByteBuf aggregatedBuffer = Unpooled.buffer();
ByteBuf byteBuf = channel.readOutbound();
Assert.assertNotNull(byteBuf);
aggregatedBuffer.writeBytes(byteBuf);
byteBuf = channel.readOutbound();
Assert.assertNotNull(byteBuf);
aggregatedBuffer.writeBytes(byteBuf);
aggregatedBuffer.resetReaderIndex();
String content = aggregatedBuffer.toString(CharsetUtil.UTF_8);
Assert.assertEquals(StompTestConstants.CONNECT_FRAME, content);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.stomp;
public final class StompTestConstants {
public static final String CONNECT_FRAME =
"CONNECT\r\n" +
"host:stomp.github.org\r\n" +
"accept-version:1.1,1.2\r\n" +
"\r\n" +
"\0";
public static final String CONNECTED_FRAME =
"CONNECTED\r\n" +
"version:1.2\n" +
"\r\n" +
"\0\n";
public static final String SEND_FRAME_1 =
"SEND\r\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
"\r\n" +
"hello, queue a!" +
"\0\n";
public static final String SEND_FRAME_2 =
"SEND\r\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
"content-length:17\n" +
"\r\n" +
"hello, queue a!!!" +
"\0\n";
private StompTestConstants() {
}
}

View File

@ -60,6 +60,11 @@
<artifactId>netty-codec-socks</artifactId> <artifactId>netty-codec-socks</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-stomp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.protobuf</groupId> <groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>

View File

@ -0,0 +1,163 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.stomp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.stomp.FullStompFrame;
import io.netty.handler.codec.stomp.DefaultFullStompFrame;
import io.netty.handler.codec.stomp.StompCommand;
import io.netty.handler.codec.stomp.StompHeaders;
import io.netty.handler.codec.stomp.StompEncoder;
import io.netty.handler.codec.stomp.StompDecoder;
import io.netty.handler.codec.stomp.StompAggregator;
/**
* very simple stomp client implementation example, requires running stomp server to actually work
* uses default username/password and destination values from hornetq message broker
*/
public class StompClient implements StompFrameListener {
public static final String DEAFULT_HOST = "localhost";
public static final int DEFAULT_PORT = 61613;
public static final String DEFAULT_USERNAME = "guest";
public static final String DEFAULT_PASSWORD = "guest";
private static final String EXAMPLE_TOPIC = "jms.topic.exampleTopic";
private final String host;
private final int port;
private final String username;
private final String password;
private ClientState state = ClientState.CONNECTING;
private Channel ch;
public static void main(String[] args) throws Exception {
String host;
int port;
String username;
String password;
if (args.length == 0) {
host = DEAFULT_HOST;
port = DEFAULT_PORT;
username = DEFAULT_USERNAME;
password = DEFAULT_PASSWORD;
} else if (args.length == 4) {
host = args[0];
port = Integer.parseInt(args[1]);
username = args[2];
password = args[3];
} else {
System.err.println("Usage: " + StompClient.class.getSimpleName() + " <host> <port> <username> <password>");
return;
}
StompClient stompClient = new StompClient(host, port, username, password);
stompClient.run();
}
public StompClient(String host, int port, String username, String password) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
final StompClient that = this;
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StompDecoder());
pipeline.addLast("encoder", new StompEncoder());
pipeline.addLast("aggregator", new StompAggregator(1048576));
pipeline.addLast("handler", new StompClientHandler(that));
}
});
b.remoteAddress(host, port);
this.ch = b.connect().sync().channel();
FullStompFrame connFrame = new DefaultFullStompFrame(StompCommand.CONNECT);
connFrame.headers().set(StompHeaders.ACCEPT_VERSION, "1.2");
connFrame.headers().set(StompHeaders.HOST, host);
connFrame.headers().set(StompHeaders.LOGIN, username);
connFrame.headers().set(StompHeaders.PASSCODE, password);
ch.writeAndFlush(connFrame).sync();
}
@Override
public void onFrame(FullStompFrame frame) {
String subscrReceiptId = "001";
String disconReceiptId = "002";
try {
switch (frame.command()) {
case CONNECTED:
FullStompFrame subscribeFrame = new DefaultFullStompFrame(StompCommand.SUBSCRIBE);
subscribeFrame.headers().set(StompHeaders.DESTINATION, EXAMPLE_TOPIC);
subscribeFrame.headers().set(StompHeaders.RECEIPT, subscrReceiptId);
subscribeFrame.headers().set(StompHeaders.ID, "1");
System.out.println("connected, sending subscribe frame: " + subscribeFrame);
state = ClientState.CONNECTED;
ch.writeAndFlush(subscribeFrame);
break;
case RECEIPT:
String receiptHeader = frame.headers().get(StompHeaders.RECEIPT_ID);
if (state == ClientState.CONNECTED && receiptHeader.equals(subscrReceiptId)) {
FullStompFrame msgFrame = new DefaultFullStompFrame(StompCommand.SEND);
msgFrame.headers().set(StompHeaders.DESTINATION, EXAMPLE_TOPIC);
msgFrame.content().writeBytes("some payload".getBytes());
System.out.println("subscribed, sending message frame: " + msgFrame);
state = ClientState.SUBSCRIBED;
ch.writeAndFlush(msgFrame);
} else if (state == ClientState.DISCONNECTING && receiptHeader.equals(disconReceiptId)) {
System.out.println("disconnected, exiting..");
System.exit(0);
} else {
throw new IllegalStateException("received: " + frame + ", while internal state is " + state);
}
break;
case MESSAGE:
if (state == ClientState.SUBSCRIBED) {
System.out.println("received frame: " + frame);
FullStompFrame disconnFrame = new DefaultFullStompFrame(StompCommand.DISCONNECT);
disconnFrame.headers().set(StompHeaders.RECEIPT, disconReceiptId);
System.out.println("sending disconnect frame: " + disconnFrame);
state = ClientState.DISCONNECTING;
ch.writeAndFlush(disconnFrame);
}
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
enum ClientState {
CONNECTING,
CONNECTED,
SUBSCRIBED,
DISCONNECTING
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.stomp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.stomp.FullStompFrame;
/**
* STOMP client inbound handler implementation, which just passes received messages to listener
*/
public class StompClientHandler extends SimpleChannelInboundHandler<FullStompFrame> {
private final StompFrameListener listener;
public StompClientHandler(StompFrameListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
this.listener = listener;
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullStompFrame msg) throws Exception {
listener.onFrame(msg);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.stomp;
import io.netty.handler.codec.stomp.FullStompFrame;
/**
* STOMP frame listener which used as a callback in {@link StompClientHandler}
*/
public interface StompFrameListener {
void onFrame(FullStompFrame frame);
}

View File

@ -337,6 +337,7 @@
<module>codec</module> <module>codec</module>
<module>codec-http</module> <module>codec-http</module>
<module>codec-memcache</module> <module>codec-memcache</module>
<module>codec-stomp</module>
<module>codec-socks</module> <module>codec-socks</module>
<module>transport</module> <module>transport</module>
<module>transport-rxtx</module> <module>transport-rxtx</module>