Introduce AddressedEnvelope message type for generic representation of an addressed message
- Fixes #1282 (not perfectly, but to the extent it's possible with the current API) - Add AddressedEnvelope and DefaultAddressedEnvelope - Make DatagramPacket extend DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> - Rename ByteBufHolder.data() to content() so that a message can implement both AddressedEnvelope and ByteBufHolder (DatagramPacket does) without introducing two getter methods for the content - Datagram channel implementations now understand ByteBuf and ByteBufHolder as a message with unspecified remote address.
This commit is contained in:
parent
bc96c4b7b3
commit
1e0c83db23
@ -22,9 +22,8 @@ public interface ByteBufHolder extends ReferenceCounted {
|
||||
|
||||
/**
|
||||
* Return the data which is held by this {@link ByteBufHolder}.
|
||||
*
|
||||
*/
|
||||
ByteBuf data();
|
||||
ByteBuf content();
|
||||
|
||||
/**
|
||||
* Create a deep copy of this {@link ByteBufHolder}.
|
||||
|
@ -31,7 +31,7 @@ public class DefaultByteBufHolder implements ByteBufHolder {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
if (data.refCnt() <= 0) {
|
||||
throw new IllegalBufferAccessException();
|
||||
}
|
||||
@ -72,6 +72,6 @@ public class DefaultByteBufHolder implements ByteBufHolder {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + '(' + data().toString() + ')';
|
||||
return getClass().getSimpleName() + '(' + content().toString() + ')';
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
return content;
|
||||
}
|
||||
|
||||
@ -95,7 +95,7 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
|
||||
@Override
|
||||
public FullHttpRequest copy() {
|
||||
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
|
||||
getProtocolVersion(), getMethod(), getUri(), data().copy());
|
||||
getProtocolVersion(), getMethod(), getUri(), content().copy());
|
||||
copy.headers().set(headers());
|
||||
copy.trailingHeaders().set(trailingHeaders());
|
||||
return copy;
|
||||
|
@ -45,7 +45,7 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
return content;
|
||||
}
|
||||
|
||||
@ -90,7 +90,7 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
|
||||
|
||||
@Override
|
||||
public FullHttpResponse copy() {
|
||||
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(), data().copy());
|
||||
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(), content().copy());
|
||||
copy.headers().set(headers());
|
||||
copy.trailingHeaders().set(trailingHeaders());
|
||||
return copy;
|
||||
|
@ -35,7 +35,7 @@ public class DefaultHttpContent extends DefaultHttpObject implements HttpContent
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
return content;
|
||||
}
|
||||
|
||||
@ -73,6 +73,6 @@ public class DefaultHttpContent extends DefaultHttpObject implements HttpContent
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "(data: " + data() + ", getDecoderResult: " + getDecoderResult() + ')';
|
||||
return getClass().getSimpleName() + "(data: " + content() + ", getDecoderResult: " + getDecoderResult() + ')';
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ public class DefaultLastHttpContent extends DefaultHttpContent implements LastHt
|
||||
|
||||
@Override
|
||||
public LastHttpContent copy() {
|
||||
DefaultLastHttpContent copy = new DefaultLastHttpContent(data().copy());
|
||||
DefaultLastHttpContent copy = new DefaultLastHttpContent(content().copy());
|
||||
copy.trailingHeaders().set(trailingHeaders());
|
||||
return copy;
|
||||
}
|
||||
|
@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.embedded.EmbeddedByteChannel;
|
||||
import io.netty.handler.codec.MessageToMessageDecoder;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Decodes the content of the received {@link HttpRequest} and {@link HttpContent}.
|
||||
* The original content is replaced with the new content decoded by the
|
||||
@ -106,18 +108,17 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
|
||||
} else {
|
||||
headers.set(HttpHeaders.Names.CONTENT_ENCODING, targetContentEncoding);
|
||||
}
|
||||
|
||||
Object[] decoded = decodeContent(message, c);
|
||||
|
||||
// Replace the content.
|
||||
if (headers.contains(HttpHeaders.Names.CONTENT_LENGTH)) {
|
||||
headers.set(
|
||||
HttpHeaders.Names.CONTENT_LENGTH,
|
||||
Integer.toString(((ByteBufHolder) decoded[1]).data().readableBytes()));
|
||||
Integer.toString(((ByteBufHolder) decoded[1]).content().readableBytes()));
|
||||
}
|
||||
|
||||
for (Object obj: decoded) {
|
||||
out.add(obj);
|
||||
}
|
||||
Collections.addAll(out, decoded);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -130,11 +131,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
|
||||
}
|
||||
|
||||
if (decoder != null) {
|
||||
Object[] decoded = decodeContent(null, c);
|
||||
|
||||
for (Object obj: decoded) {
|
||||
out.add(obj);
|
||||
}
|
||||
Collections.addAll(out, decodeContent(null, c));
|
||||
} else {
|
||||
if (c instanceof LastHttpContent) {
|
||||
decodeStarted = false;
|
||||
@ -146,7 +143,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
|
||||
|
||||
private Object[] decodeContent(HttpMessage header, HttpContent c) {
|
||||
ByteBuf newContent = Unpooled.buffer();
|
||||
ByteBuf content = c.data();
|
||||
ByteBuf content = c.content();
|
||||
decode(content, newContent);
|
||||
|
||||
if (c instanceof LastHttpContent) {
|
||||
|
@ -110,7 +110,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
|
||||
|
||||
if (isFull) {
|
||||
// Pass through the full response with empty content and continue waiting for the the next resp.
|
||||
if (!((ByteBufHolder) res).data().isReadable()) {
|
||||
if (!((ByteBufHolder) res).content().isReadable()) {
|
||||
// Set the content length to 0.
|
||||
res.headers().remove(Names.TRANSFER_ENCODING);
|
||||
res.headers().set(Names.CONTENT_LENGTH, "0");
|
||||
@ -127,7 +127,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
|
||||
if (isFull) {
|
||||
// As an unchunked response
|
||||
res.headers().remove(Names.TRANSFER_ENCODING);
|
||||
res.headers().set(Names.CONTENT_LENGTH, ((ByteBufHolder) res).data().readableBytes());
|
||||
res.headers().set(Names.CONTENT_LENGTH, ((ByteBufHolder) res).content().readableBytes());
|
||||
out.add(BufUtil.retain(res));
|
||||
} else {
|
||||
// As a chunked response
|
||||
@ -201,7 +201,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
|
||||
|
||||
private HttpContent[] encodeContent(HttpContent c) {
|
||||
ByteBuf newContent = Unpooled.buffer();
|
||||
ByteBuf content = c.data();
|
||||
ByteBuf content = c.content();
|
||||
|
||||
encode(content, newContent);
|
||||
|
||||
|
@ -809,7 +809,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
|
||||
|
||||
/**
|
||||
* Returns the length of the content. Please note that this value is
|
||||
* not retrieved from {@link HttpContent#data()} but from the
|
||||
* not retrieved from {@link HttpContent#content()} but from the
|
||||
* {@code "Content-Length"} header, and thus they are independent from each
|
||||
* other.
|
||||
*
|
||||
@ -838,7 +838,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
|
||||
|
||||
/**
|
||||
* Returns the length of the content. Please note that this value is
|
||||
* not retrieved from {@link HttpContent#data()} but from the
|
||||
* not retrieved from {@link HttpContent#content()} but from the
|
||||
* {@code "Content-Length"} header, and thus they are independent from each
|
||||
* other.
|
||||
*
|
||||
|
@ -152,9 +152,9 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
|
||||
|
||||
// Merge the received chunk into the content of the current message.
|
||||
HttpContent chunk = (HttpContent) msg;
|
||||
CompositeByteBuf content = (CompositeByteBuf) currentMessage.data();
|
||||
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
|
||||
|
||||
if (content.readableBytes() > maxContentLength - chunk.data().readableBytes()) {
|
||||
if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
|
||||
// TODO: Respond with 413 Request Entity Too Large
|
||||
// and discard the traffic or close the connection.
|
||||
// No need to notify the upstream handlers - just log.
|
||||
@ -165,10 +165,10 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
|
||||
}
|
||||
|
||||
// Append the content of the chunk
|
||||
if (chunk.data().isReadable()) {
|
||||
if (chunk.content().isReadable()) {
|
||||
chunk.retain();
|
||||
content.addComponent(chunk.data());
|
||||
content.writerIndex(content.writerIndex() + chunk.data().readableBytes());
|
||||
content.addComponent(chunk.content());
|
||||
content.writerIndex(content.writerIndex() + chunk.content().readableBytes());
|
||||
}
|
||||
|
||||
final boolean last;
|
||||
|
@ -72,7 +72,7 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
|
||||
}
|
||||
|
||||
HttpContent chunk = (HttpContent) msg;
|
||||
ByteBuf content = chunk.data();
|
||||
ByteBuf content = chunk.content();
|
||||
int contentLength = content.readableBytes();
|
||||
|
||||
if (state == ST_CONTENT_NON_CHUNK) {
|
||||
|
@ -30,7 +30,7 @@ public interface LastHttpContent extends HttpContent {
|
||||
LastHttpContent EMPTY_LAST_CONTENT = new LastHttpContent() {
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
return Unpooled.EMPTY_BUFFER;
|
||||
}
|
||||
|
||||
|
@ -86,7 +86,7 @@ public abstract class AbstractHttpData extends AbstractReferenceCounted implemen
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
public ByteBuf content() {
|
||||
try {
|
||||
return getByteBuf();
|
||||
} catch (IOException e) {
|
||||
|
@ -144,7 +144,7 @@ public class DiskAttribute extends AbstractDiskHttpData implements Attribute {
|
||||
public Attribute copy() {
|
||||
DiskAttribute attr = new DiskAttribute(getName());
|
||||
attr.setCharset(getCharset());
|
||||
ByteBuf content = data();
|
||||
ByteBuf content = content();
|
||||
if (content != null) {
|
||||
try {
|
||||
attr.setContent(content.copy());
|
||||
|
@ -167,7 +167,7 @@ public class DiskFileUpload extends AbstractDiskHttpData implements FileUpload {
|
||||
public FileUpload copy() {
|
||||
DiskFileUpload upload = new DiskFileUpload(getName(),
|
||||
getFilename(), getContentType(), getContentTransferEncoding(), getCharset(), size);
|
||||
ByteBuf buf = data();
|
||||
ByteBuf buf = content();
|
||||
if (buf != null) {
|
||||
try {
|
||||
upload.setContent(buf.copy());
|
||||
|
@ -348,7 +348,7 @@ public class HttpPostRequestDecoder {
|
||||
* errors
|
||||
*/
|
||||
public void offer(HttpContent content) throws ErrorDataDecoderException {
|
||||
ByteBuf chunked = content.data();
|
||||
ByteBuf chunked = content.content();
|
||||
if (undecodedChunk == null) {
|
||||
undecodedChunk = chunked;
|
||||
} else {
|
||||
|
@ -699,11 +699,11 @@ public class HttpPostRequestEncoder implements ChunkedMessageInput<HttpContent>
|
||||
}
|
||||
}
|
||||
HttpHeaders.setTransferEncodingChunked(request);
|
||||
request.data().clear();
|
||||
request.content().clear();
|
||||
} else {
|
||||
// get the only one body and set it to the request
|
||||
HttpContent chunk = nextChunk();
|
||||
request.data().clear().writeBytes(chunk.data());
|
||||
request.content().clear().writeBytes(chunk.content());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ public class MemoryAttribute extends AbstractMemoryHttpData implements Attribute
|
||||
public Attribute copy() {
|
||||
MemoryAttribute attr = new MemoryAttribute(getName());
|
||||
attr.setCharset(getCharset());
|
||||
ByteBuf content = data();
|
||||
ByteBuf content = content();
|
||||
if (content != null) {
|
||||
try {
|
||||
attr.setContent(content.copy());
|
||||
|
@ -133,7 +133,7 @@ public class MemoryFileUpload extends AbstractMemoryHttpData implements FileUplo
|
||||
public FileUpload copy() {
|
||||
MemoryFileUpload upload = new MemoryFileUpload(getName(), getFilename(), getContentType(),
|
||||
getContentTransferEncoding(), getCharset(), size);
|
||||
ByteBuf buf = data();
|
||||
ByteBuf buf = content();
|
||||
if (buf != null) {
|
||||
try {
|
||||
upload.setContent(buf.copy());
|
||||
|
@ -205,8 +205,8 @@ public class MixedAttribute implements Attribute {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
return attribute.data();
|
||||
public ByteBuf content() {
|
||||
return attribute.content();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -231,8 +231,8 @@ public class MixedFileUpload implements FileUpload {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf data() {
|
||||
return fileUpload.data();
|
||||
public ByteBuf content() {
|
||||
return fileUpload.content();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -56,7 +56,7 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
|
||||
|
||||
@Override
|
||||
public BinaryWebSocketFrame copy() {
|
||||
return new BinaryWebSocketFrame(isFinalFragment(), rsv(), data().copy());
|
||||
return new BinaryWebSocketFrame(isFinalFragment(), rsv(), content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,7 +109,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
|
||||
* a getStatus code is set, -1 is returned.
|
||||
*/
|
||||
public int statusCode() {
|
||||
ByteBuf binaryData = data();
|
||||
ByteBuf binaryData = content();
|
||||
if (binaryData == null || binaryData.capacity() == 0) {
|
||||
return -1;
|
||||
}
|
||||
@ -126,7 +126,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
|
||||
* text is not supplied, an empty string is returned.
|
||||
*/
|
||||
public String reasonText() {
|
||||
ByteBuf binaryData = data();
|
||||
ByteBuf binaryData = content();
|
||||
if (binaryData == null || binaryData.capacity() <= 2) {
|
||||
return "";
|
||||
}
|
||||
@ -140,7 +140,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
|
||||
|
||||
@Override
|
||||
public CloseWebSocketFrame copy() {
|
||||
return new CloseWebSocketFrame(isFinalFragment(), rsv(), data().copy());
|
||||
return new CloseWebSocketFrame(isFinalFragment(), rsv(), content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,7 +95,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
|
||||
* Returns the text data in this frame
|
||||
*/
|
||||
public String text() {
|
||||
return data().toString(CharsetUtil.UTF_8);
|
||||
return content().toString(CharsetUtil.UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -121,7 +121,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
|
||||
|
||||
@Override
|
||||
public ContinuationWebSocketFrame copy() {
|
||||
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), data().copy(), aggregatedText());
|
||||
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content().copy(), aggregatedText());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -56,7 +56,7 @@ public class PingWebSocketFrame extends WebSocketFrame {
|
||||
|
||||
@Override
|
||||
public PingWebSocketFrame copy() {
|
||||
return new PingWebSocketFrame(isFinalFragment(), rsv(), data().copy());
|
||||
return new PingWebSocketFrame(isFinalFragment(), rsv(), content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -56,7 +56,7 @@ public class PongWebSocketFrame extends WebSocketFrame {
|
||||
|
||||
@Override
|
||||
public PongWebSocketFrame copy() {
|
||||
return new PongWebSocketFrame(isFinalFragment(), rsv(), data().copy());
|
||||
return new PongWebSocketFrame(isFinalFragment(), rsv(), content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,12 +91,12 @@ public class TextWebSocketFrame extends WebSocketFrame {
|
||||
* Returns the text data in this frame
|
||||
*/
|
||||
public String text() {
|
||||
return data().toString(CharsetUtil.UTF_8);
|
||||
return content().toString(CharsetUtil.UTF_8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TextWebSocketFrame copy() {
|
||||
return new TextWebSocketFrame(isFinalFragment(), rsv(), data().copy());
|
||||
return new TextWebSocketFrame(isFinalFragment(), rsv(), content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -33,7 +33,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame
|
||||
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
|
||||
if (msg instanceof TextWebSocketFrame) {
|
||||
// Text frame
|
||||
ByteBuf data = msg.data();
|
||||
ByteBuf data = msg.content();
|
||||
out.writeByte((byte) 0x00);
|
||||
out.writeBytes(data, data.readerIndex(), data.readableBytes());
|
||||
out.writeByte((byte) 0xFF);
|
||||
@ -43,7 +43,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame
|
||||
out.writeByte((byte) 0x00);
|
||||
} else {
|
||||
// Binary frame
|
||||
ByteBuf data = msg.data();
|
||||
ByteBuf data = msg.content();
|
||||
int dataLen = data.readableBytes();
|
||||
out.ensureWritable(dataLen + 5);
|
||||
|
||||
|
@ -98,7 +98,7 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
|
||||
|
||||
byte[] mask;
|
||||
|
||||
ByteBuf data = msg.data();
|
||||
ByteBuf data = msg.content();
|
||||
if (data == null) {
|
||||
data = Unpooled.EMPTY_BUFFER;
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
|
||||
// Set Content-Length to workaround some known defect.
|
||||
// See also: http://www.ietf.org/mail-archive/web/hybi/current/msg02149.html
|
||||
headers.set(Names.CONTENT_LENGTH, key3.length);
|
||||
request.data().writeBytes(key3);
|
||||
request.content().writeBytes(key3);
|
||||
return request;
|
||||
}
|
||||
|
||||
@ -211,7 +211,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
|
||||
+ connection);
|
||||
}
|
||||
|
||||
ByteBuf challenge = response.data();
|
||||
ByteBuf challenge = response.content();
|
||||
if (!challenge.equals(expectedChallengeResponseBytes)) {
|
||||
throw new WebSocketHandshakeException("Invalid challenge");
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public abstract class WebSocketFrame extends DefaultByteBufHolder {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "(data: " + data().toString() + ')';
|
||||
return getClass().getSimpleName() + "(data: " + content().toString() + ')';
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,8 +52,8 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
|
||||
out.add(msg.retain());
|
||||
return;
|
||||
}
|
||||
ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.data().retain());
|
||||
buf.writerIndex(buf.writerIndex() + msg.data().readableBytes());
|
||||
ByteBuf buf = ctx.alloc().compositeBuffer().addComponent(msg.content().retain());
|
||||
buf.writerIndex(buf.writerIndex() + msg.content().readableBytes());
|
||||
|
||||
if (msg instanceof TextWebSocketFrame) {
|
||||
currentFrame = new TextWebSocketFrame(true, msg.rsv(), buf);
|
||||
@ -66,19 +66,19 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
|
||||
return;
|
||||
}
|
||||
if (msg instanceof ContinuationWebSocketFrame) {
|
||||
CompositeByteBuf content = (CompositeByteBuf) currentFrame.data();
|
||||
if (content.readableBytes() > maxFrameSize - msg.data().readableBytes()) {
|
||||
CompositeByteBuf content = (CompositeByteBuf) currentFrame.content();
|
||||
if (content.readableBytes() > maxFrameSize - msg.content().readableBytes()) {
|
||||
throw new TooLongFrameException(
|
||||
"WebSocketFrame length exceeded " + content +
|
||||
" bytes.");
|
||||
}
|
||||
content.addComponent(msg.data().retain());
|
||||
content.writerIndex(content.writerIndex() + msg.data().readableBytes());
|
||||
content.addComponent(msg.content().retain());
|
||||
content.writerIndex(content.writerIndex() + msg.content().readableBytes());
|
||||
|
||||
if (msg.isFinalFragment()) {
|
||||
WebSocketFrame frame = this.currentFrame;
|
||||
WebSocketFrame currentFrame = this.currentFrame;
|
||||
this.currentFrame = null;
|
||||
out.add(frame);
|
||||
out.add(currentFrame);
|
||||
return;
|
||||
} else {
|
||||
return;
|
||||
|
@ -24,8 +24,8 @@ abstract class WebSocketProtocolHandler extends ChannelInboundMessageHandlerAda
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
|
||||
if (frame instanceof PingWebSocketFrame) {
|
||||
frame.data().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.data()));
|
||||
frame.content().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.content()));
|
||||
return;
|
||||
}
|
||||
if (frame instanceof PongWebSocketFrame) {
|
||||
|
@ -152,12 +152,12 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
|
||||
BEGINNING_SPACE.matcher(key1).replaceAll("").length());
|
||||
int b = (int) (Long.parseLong(BEGINNING_DIGIT.matcher(key2).replaceAll("")) /
|
||||
BEGINNING_SPACE.matcher(key2).replaceAll("").length());
|
||||
long c = req.data().readLong();
|
||||
long c = req.content().readLong();
|
||||
ByteBuf input = Unpooled.buffer(16);
|
||||
input.writeInt(a);
|
||||
input.writeInt(b);
|
||||
input.writeLong(c);
|
||||
res.data().writeBytes(WebSocketUtil.md5(input.array()));
|
||||
res.content().writeBytes(WebSocketUtil.md5(input.array()));
|
||||
} else {
|
||||
// Old Hixie 75 handshake getMethod with no challenge:
|
||||
res.headers().add(WEBSOCKET_ORIGIN, req.headers().get(ORIGIN));
|
||||
|
@ -84,7 +84,7 @@ public class DefaultSpdyDataFrame extends DefaultByteBufHolder implements SpdyDa
|
||||
|
||||
@Override
|
||||
public DefaultSpdyDataFrame copy() {
|
||||
DefaultSpdyDataFrame frame = new DefaultSpdyDataFrame(getStreamId(), data().copy());
|
||||
DefaultSpdyDataFrame frame = new DefaultSpdyDataFrame(getStreamId(), content().copy());
|
||||
frame.setLast(isLast());
|
||||
return frame;
|
||||
}
|
||||
@ -116,7 +116,7 @@ public class DefaultSpdyDataFrame extends DefaultByteBufHolder implements SpdyDa
|
||||
if (refCnt() == 0) {
|
||||
buf.append("(freed)");
|
||||
} else {
|
||||
buf.append(data().readableBytes());
|
||||
buf.append(content().readableBytes());
|
||||
}
|
||||
return buf.toString();
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ public interface SpdyDataFrame extends ByteBufHolder, SpdyStreamFrame, SpdyDataO
|
||||
* The data payload cannot exceed 16777215 bytes.
|
||||
*/
|
||||
@Override
|
||||
ByteBuf data();
|
||||
ByteBuf content();
|
||||
|
||||
@Override
|
||||
SpdyDataFrame copy();
|
||||
|
@ -80,7 +80,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<SpdyDataOrControlFram
|
||||
if (msg instanceof SpdyDataFrame) {
|
||||
|
||||
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
|
||||
ByteBuf data = spdyDataFrame.data();
|
||||
ByteBuf data = spdyDataFrame.content();
|
||||
byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0;
|
||||
out.ensureWritable(SPDY_HEADER_SIZE + data.readableBytes());
|
||||
out.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF);
|
||||
|
@ -201,14 +201,14 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyDataOrControlFr
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuf content = fullHttpMessage.data();
|
||||
if (content.readableBytes() > maxContentLength - spdyDataFrame.data().readableBytes()) {
|
||||
ByteBuf content = fullHttpMessage.content();
|
||||
if (content.readableBytes() > maxContentLength - spdyDataFrame.content().readableBytes()) {
|
||||
messageMap.remove(streamID);
|
||||
throw new TooLongFrameException(
|
||||
"HTTP content length exceeded " + maxContentLength + " bytes.");
|
||||
}
|
||||
|
||||
ByteBuf spdyDataFrameData = spdyDataFrame.data();
|
||||
ByteBuf spdyDataFrameData = spdyDataFrame.content();
|
||||
int spdyDataFrameDataLen = spdyDataFrameData.readableBytes();
|
||||
content.writeBytes(spdyDataFrameData, spdyDataFrameData.readerIndex(), spdyDataFrameDataLen);
|
||||
|
||||
|
@ -168,8 +168,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
|
||||
|
||||
HttpContent chunk = (HttpContent) msg;
|
||||
|
||||
chunk.data().retain();
|
||||
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId, chunk.data());
|
||||
chunk.content().retain();
|
||||
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId, chunk.content());
|
||||
spdyDataFrame.setLast(chunk instanceof LastHttpContent);
|
||||
if (chunk instanceof LastHttpContent) {
|
||||
LastHttpContent trailer = (LastHttpContent) chunk;
|
||||
|
@ -179,7 +179,7 @@ public class SpdySessionHandler
|
||||
|
||||
if (flowControl) {
|
||||
// Update receive window size
|
||||
int deltaWindowSize = -1 * spdyDataFrame.data().readableBytes();
|
||||
int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
|
||||
int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
|
||||
|
||||
// Window size can become negative if we sent a SETTINGS frame that reduces the
|
||||
@ -195,9 +195,9 @@ public class SpdySessionHandler
|
||||
// Window size became negative due to sender writing frame before receiving SETTINGS
|
||||
// Send data frames upstream in initialReceiveWindowSize chunks
|
||||
if (newWindowSize < 0) {
|
||||
while (spdyDataFrame.data().readableBytes() > initialReceiveWindowSize) {
|
||||
while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
|
||||
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
|
||||
spdyDataFrame.data().readSlice(initialReceiveWindowSize).retain());
|
||||
spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain());
|
||||
ctx.nextOutboundMessageBuffer().add(partialDataFrame);
|
||||
ctx.flush();
|
||||
}
|
||||
@ -492,7 +492,7 @@ public class SpdySessionHandler
|
||||
|
||||
if (flowControl) {
|
||||
synchronized (flowControlLock) {
|
||||
int dataLength = spdyDataFrame.data().readableBytes();
|
||||
int dataLength = spdyDataFrame.content().readableBytes();
|
||||
int sendWindowSize = spdySession.getSendWindowSize(streamID);
|
||||
|
||||
if (sendWindowSize >= dataLength) {
|
||||
@ -521,7 +521,7 @@ public class SpdySessionHandler
|
||||
|
||||
// Create a partial data frame whose length is the current window size
|
||||
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
|
||||
spdyDataFrame.data().readSlice(sendWindowSize).retain());
|
||||
spdyDataFrame.content().readSlice(sendWindowSize).retain());
|
||||
|
||||
// Enqueue the remaining data (will be the first frame queued)
|
||||
spdySession.putPendingWrite(streamID, spdyDataFrame);
|
||||
@ -805,7 +805,7 @@ public class SpdySessionHandler
|
||||
break;
|
||||
}
|
||||
|
||||
int dataFrameSize = spdyDataFrame.data().readableBytes();
|
||||
int dataFrameSize = spdyDataFrame.content().readableBytes();
|
||||
|
||||
if (newWindowSize >= dataFrameSize) {
|
||||
// Window size is large enough to send entire data frame
|
||||
@ -840,7 +840,7 @@ public class SpdySessionHandler
|
||||
|
||||
// Create a partial data frame whose length is the current window size
|
||||
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
|
||||
spdyDataFrame.data().readSlice(newWindowSize).retain());
|
||||
spdyDataFrame.content().readSlice(newWindowSize).retain());
|
||||
|
||||
// The transfer window size is pre-decremented when sending a data frame downstream.
|
||||
// Close the stream on write failures that leaves the transfer window in a corrupt state.
|
||||
|
@ -81,7 +81,7 @@ public class HttpContentCompressorTest {
|
||||
HttpContent chunk;
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||
assertThat(chunk.data().isReadable(), is(true));
|
||||
assertThat(chunk.content().isReadable(), is(true));
|
||||
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
}
|
||||
|
@ -59,11 +59,11 @@ public class HttpContentEncoderTest {
|
||||
|
||||
HttpContent chunk;
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("3"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("2"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("1"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
||||
|
||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
@ -86,11 +86,11 @@ public class HttpContentEncoderTest {
|
||||
|
||||
HttpContent chunk;
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("3"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("2"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
||||
chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().toString(CharsetUtil.US_ASCII), is("1"));
|
||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
||||
|
||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
@ -109,8 +109,8 @@ public class HttpContentEncoderTest {
|
||||
assertEncodedResponse(ch);
|
||||
|
||||
LastHttpContent c = (LastHttpContent) ch.readOutbound();
|
||||
assertThat(c.data().readableBytes(), is(2));
|
||||
assertThat(c.data().toString(CharsetUtil.US_ASCII), is("42"));
|
||||
assertThat(c.content().readableBytes(), is(2));
|
||||
assertThat(c.content().toString(CharsetUtil.US_ASCII), is("42"));
|
||||
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
}
|
||||
@ -129,7 +129,7 @@ public class HttpContentEncoderTest {
|
||||
|
||||
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
HttpContent chunk = (HttpContent) ch.readOutbound();
|
||||
assertThat(chunk.data().isReadable(), is(false));
|
||||
assertThat(chunk.content().isReadable(), is(false));
|
||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
}
|
||||
@ -156,8 +156,8 @@ public class HttpContentEncoderTest {
|
||||
assertThat(res.headers().get(Names.CONTENT_LENGTH), is("0"));
|
||||
// Content encoding shouldn't be modified.
|
||||
assertThat(res.headers().get(Names.CONTENT_ENCODING), is(nullValue()));
|
||||
assertThat(res.data().readableBytes(), is(0));
|
||||
assertThat(res.data().toString(CharsetUtil.US_ASCII), is(""));
|
||||
assertThat(res.content().readableBytes(), is(0));
|
||||
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
|
||||
|
||||
assertThat(ch.readOutbound(), is(nullValue()));
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ public class HttpObjectAggregatorTest {
|
||||
DefaultFullHttpRequest aggratedMessage = (DefaultFullHttpRequest) embedder.readInbound();
|
||||
assertNotNull(aggratedMessage);
|
||||
|
||||
assertEquals(chunk1.data().readableBytes() + chunk2.data().readableBytes(),
|
||||
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
|
||||
HttpHeaders.getContentLength(aggratedMessage));
|
||||
assertEquals(aggratedMessage.headers().get("X-Test"), Boolean.TRUE.toString());
|
||||
checkContentBuffer(aggratedMessage);
|
||||
@ -60,7 +60,7 @@ public class HttpObjectAggregatorTest {
|
||||
}
|
||||
|
||||
private static void checkContentBuffer(FullHttpRequest aggregatedMessage) {
|
||||
CompositeByteBuf buffer = (CompositeByteBuf) aggregatedMessage.data();
|
||||
CompositeByteBuf buffer = (CompositeByteBuf) aggregatedMessage.content();
|
||||
assertEquals(2, buffer.numComponents());
|
||||
List<ByteBuf> buffers = buffer.decompose(0, buffer.capacity());
|
||||
assertEquals(2, buffers.size());
|
||||
@ -93,7 +93,7 @@ public class HttpObjectAggregatorTest {
|
||||
DefaultFullHttpRequest aggratedMessage = (DefaultFullHttpRequest) embedder.readInbound();
|
||||
assertNotNull(aggratedMessage);
|
||||
|
||||
assertEquals(chunk1.data().readableBytes() + chunk2.data().readableBytes(),
|
||||
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
|
||||
HttpHeaders.getContentLength(aggratedMessage));
|
||||
assertEquals(aggratedMessage.headers().get("X-Test"), Boolean.TRUE.toString());
|
||||
assertEquals(aggratedMessage.headers().get("X-Trailer"), Boolean.TRUE.toString());
|
||||
@ -158,7 +158,7 @@ public class HttpObjectAggregatorTest {
|
||||
FullHttpRequest aggratedMessage = (FullHttpRequest) embedder.readInbound();
|
||||
assertNotNull(aggratedMessage);
|
||||
|
||||
assertEquals(chunk1.data().readableBytes() + chunk2.data().readableBytes(),
|
||||
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
|
||||
HttpHeaders.getContentLength(aggratedMessage));
|
||||
assertEquals(aggratedMessage.headers().get("X-Test"), Boolean.TRUE.toString());
|
||||
checkContentBuffer(aggratedMessage);
|
||||
|
@ -54,7 +54,7 @@ public class HttpServerCodecTest {
|
||||
break;
|
||||
}
|
||||
empty = false;
|
||||
totalBytesPolled += httpChunk.data().readableBytes();
|
||||
totalBytesPolled += httpChunk.content().readableBytes();
|
||||
Assert.assertFalse(httpChunk instanceof LastHttpContent);
|
||||
}
|
||||
Assert.assertFalse(empty);
|
||||
|
@ -97,7 +97,7 @@ public class HttpPostRequestDecoderTest {
|
||||
data + "\r\n" +
|
||||
"--" + boundary + "--\r\n";
|
||||
|
||||
req.data().writeBytes(body.getBytes(CharsetUtil.UTF_8));
|
||||
req.content().writeBytes(body.getBytes(CharsetUtil.UTF_8));
|
||||
}
|
||||
// Create decoder instance to test.
|
||||
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(inMemoryFactory, req);
|
||||
|
@ -46,22 +46,22 @@ public class WebSocketFrameAggregatorTest {
|
||||
BinaryWebSocketFrame frame = (BinaryWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame.isFinalFragment());
|
||||
Assert.assertEquals(1, frame.rsv());
|
||||
Assert.assertEquals(content1, frame.data());
|
||||
Assert.assertEquals(content1, frame.content());
|
||||
|
||||
PingWebSocketFrame frame2 = (PingWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame2.isFinalFragment());
|
||||
Assert.assertEquals(0, frame2.rsv());
|
||||
Assert.assertEquals(content1, frame2.data());
|
||||
Assert.assertEquals(content1, frame2.content());
|
||||
|
||||
PongWebSocketFrame frame3 = (PongWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame3.isFinalFragment());
|
||||
Assert.assertEquals(0, frame3.rsv());
|
||||
Assert.assertEquals(content1, frame3.data());
|
||||
Assert.assertEquals(content1, frame3.content());
|
||||
|
||||
BinaryWebSocketFrame frame4 = (BinaryWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame4.isFinalFragment());
|
||||
Assert.assertEquals(0, frame4.rsv());
|
||||
Assert.assertEquals(aggregatedContent, frame4.data());
|
||||
Assert.assertEquals(aggregatedContent, frame4.content());
|
||||
|
||||
Assert.assertNull(channel.readInbound());
|
||||
}
|
||||
@ -81,22 +81,22 @@ public class WebSocketFrameAggregatorTest {
|
||||
TextWebSocketFrame frame = (TextWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame.isFinalFragment());
|
||||
Assert.assertEquals(1, frame.rsv());
|
||||
Assert.assertEquals(content1.duplicate(), frame.data());
|
||||
Assert.assertEquals(content1.duplicate(), frame.content());
|
||||
|
||||
PingWebSocketFrame frame2 = (PingWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame2.isFinalFragment());
|
||||
Assert.assertEquals(0, frame2.rsv());
|
||||
Assert.assertEquals(content1, frame2.data());
|
||||
Assert.assertEquals(content1, frame2.content());
|
||||
|
||||
PongWebSocketFrame frame3 = (PongWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame3.isFinalFragment());
|
||||
Assert.assertEquals(0, frame3.rsv());
|
||||
Assert.assertEquals(content1, frame3.data());
|
||||
Assert.assertEquals(content1, frame3.content());
|
||||
|
||||
TextWebSocketFrame frame4 = (TextWebSocketFrame) channel.readInbound();
|
||||
Assert.assertTrue(frame4.isFinalFragment());
|
||||
Assert.assertEquals(0, frame4.rsv());
|
||||
Assert.assertEquals(aggregatedContent, frame4.data());
|
||||
Assert.assertEquals(aggregatedContent, frame4.content());
|
||||
|
||||
Assert.assertNull(channel.readInbound());
|
||||
}
|
||||
|
@ -66,6 +66,6 @@ public class WebSocketServerHandshaker00Test {
|
||||
Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
|
||||
LastHttpContent content = (LastHttpContent) ch2.readInbound();
|
||||
|
||||
Assert.assertEquals("8jKS'y:G*Co,Wxa-", content.data().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertEquals("8jKS'y:G*Co,Wxa-", content.content().toString(CharsetUtil.US_ASCII));
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ public class WebSocketServerProtocolHandlerTest {
|
||||
}
|
||||
|
||||
private static String getResponseMessage(FullHttpResponse response) {
|
||||
return new String(response.data().array());
|
||||
return new String(response.content().array());
|
||||
}
|
||||
|
||||
private static FullHttpResponse getHttpResponse(EmbeddedMessageChannel ch) {
|
||||
|
@ -84,4 +84,24 @@ public final class StringUtil {
|
||||
|
||||
return res.toArray(new String[res.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* The shortcut to {@link #simpleClassName(Class) simpleClassName(o.getClass())}.
|
||||
*/
|
||||
public static String simpleClassName(Object o) {
|
||||
return simpleClassName(o.getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a simplified name from a {@link Class}. Similar to {@link Class#getSimpleName()}, but it works fine
|
||||
* with anonymous classes.
|
||||
*/
|
||||
public static String simpleClassName(Class<?> clazz) {
|
||||
Package pkg = clazz.getPackage();
|
||||
if (pkg != null) {
|
||||
return clazz.getName().substring(pkg.getName().length() + 1);
|
||||
} else {
|
||||
return clazz.getName();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -272,7 +272,7 @@ public class HttpStaticFileServerHandler extends ChannelInboundMessageHandlerAda
|
||||
|
||||
buf.append("</ul></body></html>\r\n");
|
||||
|
||||
response.data().writeBytes(Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8));
|
||||
response.content().writeBytes(Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8));
|
||||
|
||||
// Close the connection as soon as the error message is sent.
|
||||
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
|
||||
|
@ -52,7 +52,7 @@ public class HttpSnoopClientHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
if (msg instanceof HttpContent) {
|
||||
HttpContent content = (HttpContent) msg;
|
||||
|
||||
System.out.print(content.data().toString(CharsetUtil.UTF_8));
|
||||
System.out.print(content.content().toString(CharsetUtil.UTF_8));
|
||||
System.out.flush();
|
||||
|
||||
if (content instanceof LastHttpContent) {
|
||||
|
@ -98,7 +98,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
if (msg instanceof HttpContent) {
|
||||
HttpContent httpContent = (HttpContent) msg;
|
||||
|
||||
ByteBuf content = httpContent.data();
|
||||
ByteBuf content = httpContent.content();
|
||||
if (content.isReadable()) {
|
||||
buf.append("CONTENT: ");
|
||||
buf.append(content.toString(CharsetUtil.UTF_8));
|
||||
@ -149,7 +149,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
|
||||
if (keepAlive) {
|
||||
// Add 'Content-Length' header only for a keep-alive connection.
|
||||
response.headers().set(CONTENT_LENGTH, response.data().readableBytes());
|
||||
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
|
||||
// Add keep alive header as per:
|
||||
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
|
||||
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
|
||||
|
@ -59,7 +59,7 @@ public class HttpUploadClientHandler extends ChannelInboundMessageHandlerAdapter
|
||||
}
|
||||
if (msg instanceof HttpContent) {
|
||||
HttpContent chunk = (HttpContent) msg;
|
||||
logger.info(chunk.data().toString(CharsetUtil.UTF_8));
|
||||
logger.info(chunk.content().toString(CharsetUtil.UTF_8));
|
||||
|
||||
if (chunk instanceof LastHttpContent) {
|
||||
if (readingChunks) {
|
||||
@ -69,7 +69,7 @@ public class HttpUploadClientHandler extends ChannelInboundMessageHandlerAdapter
|
||||
}
|
||||
readingChunks = false;
|
||||
} else {
|
||||
logger.info(chunk.data().toString(CharsetUtil.UTF_8));
|
||||
logger.info(chunk.content().toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
|
||||
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
|
||||
} else if (frame instanceof PingWebSocketFrame) {
|
||||
ctx.nextOutboundMessageBuffer().add(
|
||||
new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data().retain()));
|
||||
new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content().retain()));
|
||||
} else if (frame instanceof TextWebSocketFrame) {
|
||||
ctx.nextOutboundMessageBuffer().add(frame.retain());
|
||||
} else if (frame instanceof BinaryWebSocketFrame) {
|
||||
@ -120,8 +120,8 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
|
||||
ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
|
||||
// Generate an error page if response getStatus code is not OK (200).
|
||||
if (res.getStatus().code() != 200) {
|
||||
res.data().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.data().readableBytes());
|
||||
res.content().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.content().readableBytes());
|
||||
}
|
||||
|
||||
// Send the response and close the connection if necessary.
|
||||
|
@ -91,7 +91,7 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
if (msg instanceof FullHttpResponse) {
|
||||
FullHttpResponse response = (FullHttpResponse) msg;
|
||||
throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content="
|
||||
+ response.data().toString(CharsetUtil.UTF_8) + ')');
|
||||
+ response.content().toString(CharsetUtil.UTF_8) + ')');
|
||||
}
|
||||
|
||||
WebSocketFrame frame = (WebSocketFrame) msg;
|
||||
|
@ -111,8 +111,8 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
return;
|
||||
}
|
||||
if (frame instanceof PingWebSocketFrame) {
|
||||
frame.data().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.data()));
|
||||
frame.content().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.content()));
|
||||
return;
|
||||
}
|
||||
if (!(frame instanceof TextWebSocketFrame)) {
|
||||
@ -132,8 +132,8 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
|
||||
ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
|
||||
// Generate an error page if response getStatus code is not OK (200).
|
||||
if (res.getStatus().code() != 200) {
|
||||
res.data().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.data().readableBytes());
|
||||
res.content().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.content().readableBytes());
|
||||
}
|
||||
|
||||
// Send the response and close the connection if necessary.
|
||||
|
@ -113,8 +113,8 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
|
||||
return;
|
||||
}
|
||||
if (frame instanceof PingWebSocketFrame) {
|
||||
frame.data().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.data()));
|
||||
frame.content().retain();
|
||||
ctx.channel().write(new PongWebSocketFrame(frame.content()));
|
||||
return;
|
||||
}
|
||||
if (!(frame instanceof TextWebSocketFrame)) {
|
||||
@ -134,8 +134,8 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
|
||||
ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
|
||||
// Generate an error page if response getStatus code is not OK (200).
|
||||
if (res.getStatus().code() != 200) {
|
||||
res.data().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.data().readableBytes());
|
||||
res.content().writeBytes(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
|
||||
setContentLength(res, res.content().readableBytes());
|
||||
}
|
||||
|
||||
// Send the response and close the connection if necessary.
|
||||
|
@ -55,7 +55,7 @@ public class QuoteOfTheMomentClient {
|
||||
// Broadcast the QOTM request to port 8080.
|
||||
ch.write(new DatagramPacket(
|
||||
Unpooled.copiedBuffer("QOTM?", CharsetUtil.UTF_8),
|
||||
new InetSocketAddress("255.255.255.255", port)));
|
||||
new InetSocketAddress("255.255.255.255", port))).sync();
|
||||
|
||||
// QuoteOfTheMomentClientHandler will close the DatagramChannel when a
|
||||
// response is received. If the channel is not closed within 5 seconds,
|
||||
|
@ -26,7 +26,7 @@ public class QuoteOfTheMomentClientHandler extends ChannelInboundMessageHandlerA
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx, DatagramPacket msg)
|
||||
throws Exception {
|
||||
String response = msg.data().toString(CharsetUtil.UTF_8);
|
||||
String response = msg.content().toString(CharsetUtil.UTF_8);
|
||||
if (response.startsWith("QOTM: ")) {
|
||||
System.out.println("Quote of the Moment: " + response.substring(6));
|
||||
ctx.close();
|
||||
|
@ -44,13 +44,11 @@ public class QuoteOfTheMomentServerHandler extends ChannelInboundMessageHandlerA
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx, DatagramPacket msg)
|
||||
throws Exception {
|
||||
if ("QOTM?".equals(msg.data().toString(CharsetUtil.UTF_8))) {
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
System.err.println(msg);
|
||||
if ("QOTM?".equals(msg.content().toString(CharsetUtil.UTF_8))) {
|
||||
ctx.write(new DatagramPacket(
|
||||
Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8),
|
||||
msg.remoteAddress()));
|
||||
Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), msg.sender()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ public class MsgEchoClientHandler extends
|
||||
@Override
|
||||
public void messageReceived(final ChannelHandlerContext ctx,
|
||||
final UdtMessage message) throws Exception {
|
||||
final ByteBuf byteBuf = message.data();
|
||||
final ByteBuf byteBuf = message.content();
|
||||
meter.mark(byteBuf.readableBytes());
|
||||
final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
|
||||
out.add(message.retain());
|
||||
|
@ -70,7 +70,7 @@ public class MsgEchoPeerHandler extends
|
||||
@Override
|
||||
public void messageReceived(final ChannelHandlerContext ctx,
|
||||
final UdtMessage message) throws Exception {
|
||||
final ByteBuf byteBuf = message.data();
|
||||
final ByteBuf byteBuf = message.content();
|
||||
meter.mark(byteBuf.readableBytes());
|
||||
final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
|
||||
out.add(message.retain());
|
||||
|
@ -99,14 +99,12 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
|
||||
private volatile boolean fail;
|
||||
|
||||
@Override
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx,
|
||||
DatagramPacket msg) throws Exception {
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
if (done) {
|
||||
fail = true;
|
||||
}
|
||||
|
||||
assertEquals(1, msg.data().readInt());
|
||||
assertEquals(1, msg.content().readInt());
|
||||
latch.countDown();
|
||||
|
||||
// mark the handler as done as we only are supposed to receive one message
|
||||
|
@ -43,7 +43,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
public void messageReceived(
|
||||
ChannelHandlerContext ctx,
|
||||
DatagramPacket msg) throws Exception {
|
||||
assertEquals(1, msg.data().readInt());
|
||||
assertEquals(1, msg.content().readInt());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -111,7 +111,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!data().equals(sctpFrame.data())) {
|
||||
if (!content().equals(sctpFrame.content())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -122,16 +122,16 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
||||
public int hashCode() {
|
||||
int result = streamIdentifier;
|
||||
result = 31 * result + protocolIdentifier;
|
||||
result = 31 * result + data().hashCode();
|
||||
result = 31 * result + content().hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SctpMessage copy() {
|
||||
if (msgInfo == null) {
|
||||
return new SctpMessage(protocolIdentifier, streamIdentifier, data().copy());
|
||||
return new SctpMessage(protocolIdentifier, streamIdentifier, content().copy());
|
||||
} else {
|
||||
return new SctpMessage(msgInfo, data().copy());
|
||||
return new SctpMessage(msgInfo, content().copy());
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,6 +156,6 @@ public final class SctpMessage extends DefaultByteBufHolder {
|
||||
}
|
||||
return "SctpFrame{" +
|
||||
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
|
||||
", data=" + BufUtil.hexDump(data()) + '}';
|
||||
", data=" + BufUtil.hexDump(content()) + '}';
|
||||
}
|
||||
}
|
||||
|
@ -289,7 +289,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
|
||||
@Override
|
||||
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
|
||||
SctpMessage packet = (SctpMessage) buf.peek();
|
||||
ByteBuf data = packet.data();
|
||||
ByteBuf data = packet.content();
|
||||
int dataLen = data.readableBytes();
|
||||
ByteBuffer nioData;
|
||||
if (data.nioBufferCount() == 1) {
|
||||
|
@ -226,7 +226,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ByteBuf data = packet.data();
|
||||
ByteBuf data = packet.content();
|
||||
int dataLen = data.readableBytes();
|
||||
ByteBuffer nioData;
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAd
|
||||
"pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName()));
|
||||
}
|
||||
|
||||
ctx.nextInboundByteBuffer().writeBytes(msg.data());
|
||||
ctx.nextInboundByteBuffer().writeBytes(msg.content());
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
|
||||
|
||||
final ByteBuf byteBuf = msg.data();
|
||||
final ByteBuf byteBuf = msg.content();
|
||||
final int protocolIdentifier = msg.protocolIdentifier();
|
||||
final int streamIdentifier = msg.streamIdentifier();
|
||||
final boolean isComplete = msg.isComplete();
|
||||
|
@ -15,12 +15,11 @@
|
||||
*/
|
||||
package io.netty.channel.udt;
|
||||
|
||||
import com.barchart.udt.TypeUDT;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.DefaultByteBufHolder;
|
||||
import io.netty.channel.udt.nio.NioUdtProvider;
|
||||
|
||||
import com.barchart.udt.TypeUDT;
|
||||
|
||||
/**
|
||||
* The message container that is used for {@link TypeUDT#DATAGRAM} messages.
|
||||
* @see {@link NioUdtProvider#MESSAGE_CONNECTOR}
|
||||
@ -34,7 +33,7 @@ public final class UdtMessage extends DefaultByteBufHolder {
|
||||
|
||||
@Override
|
||||
public UdtMessage copy() {
|
||||
return new UdtMessage(data().copy());
|
||||
return new UdtMessage(content().copy());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,7 +177,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel
|
||||
// expects a message
|
||||
final UdtMessage message = (UdtMessage) messageQueue.peek();
|
||||
|
||||
final ByteBuf byteBuf = message.data();
|
||||
final ByteBuf byteBuf = message.content();
|
||||
|
||||
final int messageSize = byteBuf.readableBytes();
|
||||
|
||||
|
@ -77,7 +77,7 @@ public class EchoMessageHandler extends
|
||||
@Override
|
||||
public void messageReceived(final ChannelHandlerContext ctx, final UdtMessage message) throws Exception {
|
||||
|
||||
final ByteBuf byteBuf = message.data();
|
||||
final ByteBuf byteBuf = message.content();
|
||||
|
||||
if (meter != null) {
|
||||
meter.mark(byteBuf.readableBytes());
|
||||
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ReferenceCounted;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
* A message that wraps another message with a sender address and a recipient address.
|
||||
*
|
||||
* @param <M> the type of the wrapped message
|
||||
* @param <A> the type of the address
|
||||
*/
|
||||
public interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted {
|
||||
/**
|
||||
* Returns the message wrapped by this envelope message.
|
||||
*/
|
||||
M content();
|
||||
|
||||
/**
|
||||
* Returns the address of the sender of this message.
|
||||
*/
|
||||
A sender();
|
||||
|
||||
/**
|
||||
* Returns the address of the recipient of this message.
|
||||
*/
|
||||
A recipient();
|
||||
}
|
@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.BufUtil;
|
||||
import io.netty.buffer.ReferenceCounted;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
* The default {@link AddressedEnvelope} implementation.
|
||||
*
|
||||
* @param <M> the type of the wrapped message
|
||||
* @param <A> the type of the recipient address
|
||||
*/
|
||||
public class DefaultAddressedEnvelope<M, A extends SocketAddress> implements AddressedEnvelope<M, A> {
|
||||
|
||||
private final M message;
|
||||
private final A sender;
|
||||
private final A recipient;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@code message}, {@code recipient} address, and
|
||||
* {@code sender} address.
|
||||
*/
|
||||
public DefaultAddressedEnvelope(M message, A recipient, A sender) {
|
||||
if (message == null) {
|
||||
throw new NullPointerException("message");
|
||||
}
|
||||
|
||||
this.message = message;
|
||||
this.sender = sender;
|
||||
this.recipient = recipient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@code message} and {@code recipient} address.
|
||||
* The sender address becomes {@code null}.
|
||||
*/
|
||||
public DefaultAddressedEnvelope(M message, A recipient) {
|
||||
this(message, recipient, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public M content() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public A sender() {
|
||||
return sender;
|
||||
}
|
||||
|
||||
@Override
|
||||
public A recipient() {
|
||||
return recipient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int refCnt() {
|
||||
if (message instanceof ReferenceCounted) {
|
||||
return ((ReferenceCounted) message).refCnt();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressedEnvelope<M, A> retain() {
|
||||
BufUtil.retain(message);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressedEnvelope<M, A> retain(int increment) {
|
||||
BufUtil.retain(message, increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
return BufUtil.release(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(int decrement) {
|
||||
return BufUtil.release(message, decrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (sender != null) {
|
||||
return StringUtil.simpleClassName(this) +
|
||||
'(' + sender + " => " + recipient + ", " + message + ')';
|
||||
} else {
|
||||
return StringUtil.simpleClassName(this) +
|
||||
"(=> " + recipient + ", " + message + ')';
|
||||
}
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ import io.netty.channel.Channel.Unsafe;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -282,13 +283,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
synchronized (cache) {
|
||||
name = cache.get(handlerType);
|
||||
if (name == null) {
|
||||
Package pkg = handlerType.getPackage();
|
||||
if (pkg != null) {
|
||||
name = handlerType.getName().substring(pkg.getName().length() + 1);
|
||||
} else {
|
||||
name = handlerType.getName();
|
||||
}
|
||||
name += "#0";
|
||||
name = StringUtil.simpleClassName(handlerType) + "#0";
|
||||
cache.put(handlerType, name);
|
||||
}
|
||||
}
|
||||
|
@ -15,44 +15,36 @@
|
||||
*/
|
||||
package io.netty.channel.socket;
|
||||
|
||||
import io.netty.buffer.BufUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.DefaultByteBufHolder;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.channel.DefaultAddressedEnvelope;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* The message container that is used for {@link DatagramChannel} to communicate with the remote peer.
|
||||
*/
|
||||
public final class DatagramPacket extends DefaultByteBufHolder {
|
||||
|
||||
private final InetSocketAddress remoteAddress;
|
||||
public final class DatagramPacket
|
||||
extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {
|
||||
|
||||
/**
|
||||
* Create a new instance
|
||||
*
|
||||
* @param data the {@link ByteBuf} which holds the data of the packet
|
||||
* @param remoteAddress the (@link InetSocketAddress} from which the packet was received or to which the
|
||||
* packet will be send
|
||||
* Create a new instance with the specified packet {@code data} and {@code recipient} address.
|
||||
*/
|
||||
public DatagramPacket(ByteBuf data, InetSocketAddress remoteAddress) {
|
||||
super(data);
|
||||
if (remoteAddress == null) {
|
||||
throw new NullPointerException("remoteAddress");
|
||||
}
|
||||
|
||||
this.remoteAddress = remoteAddress;
|
||||
public DatagramPacket(ByteBuf data, InetSocketAddress recipient) {
|
||||
super(data, recipient);
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link InetSocketAddress} which this {@link DatagramPacket} will send to or was received from.
|
||||
* Create a new instance with the specified packet {@code data}, {@code recipient} address, and {@code sender}
|
||||
* address.
|
||||
*/
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return remoteAddress;
|
||||
public DatagramPacket(ByteBuf data, InetSocketAddress recipient, InetSocketAddress sender) {
|
||||
super(data, recipient, sender);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatagramPacket copy() {
|
||||
return new DatagramPacket(data().copy(), remoteAddress());
|
||||
return new DatagramPacket(content().copy(), recipient(), sender());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -66,14 +58,4 @@ public final class DatagramPacket extends DefaultByteBufHolder {
|
||||
super.retain(increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (refCnt() == 0) {
|
||||
return "DatagramPacket{remoteAddress=" + remoteAddress().toString() +
|
||||
", data=(FREED)}";
|
||||
}
|
||||
return "DatagramPacket{remoteAddress=" + remoteAddress().toString() +
|
||||
", data=" + BufUtil.hexDump(data()) + '}';
|
||||
}
|
||||
}
|
||||
|
@ -16,8 +16,12 @@
|
||||
package io.netty.channel.socket.nio;
|
||||
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.buffer.BufUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.buffer.MessageBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
@ -27,6 +31,7 @@ import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
@ -45,8 +50,11 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel} which can be used
|
||||
* to send and receive {@link DatagramPacket}'s.
|
||||
* An NIO datagram {@link Channel} that sends and receives an
|
||||
* {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
|
||||
*
|
||||
* @see AddressedEnvelope
|
||||
* @see DatagramPacket
|
||||
*/
|
||||
public final class NioDatagramChannel
|
||||
extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
|
||||
@ -192,16 +200,18 @@ public final class NioDatagramChannel
|
||||
@Override
|
||||
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
|
||||
DatagramChannel ch = javaChannel();
|
||||
ByteBuf buffer = alloc().directBuffer(config().getReceivePacketSize());
|
||||
ByteBuf data = alloc().directBuffer(config().getReceivePacketSize());
|
||||
boolean free = true;
|
||||
try {
|
||||
ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
|
||||
ByteBuffer nioData = data.nioBuffer(data.writerIndex(), data.writableBytes());
|
||||
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data);
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
|
||||
if (remoteAddress == null) {
|
||||
return 0;
|
||||
}
|
||||
buf.add(new DatagramPacket(buffer.writerIndex(buffer.writerIndex() + data.position()), remoteAddress));
|
||||
|
||||
data.writerIndex(data.writerIndex() + nioData.position());
|
||||
buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
|
||||
free = false;
|
||||
return 1;
|
||||
} catch (Throwable cause) {
|
||||
@ -209,15 +219,36 @@ public final class NioDatagramChannel
|
||||
return -1;
|
||||
} finally {
|
||||
if (free) {
|
||||
buffer.release();
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
|
||||
DatagramPacket packet = (DatagramPacket) buf.peek();
|
||||
ByteBuf data = packet.data();
|
||||
final Object o = buf.peek();
|
||||
final Object m;
|
||||
final ByteBuf data;
|
||||
final SocketAddress remoteAddress;
|
||||
if (o instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
|
||||
remoteAddress = envelope.recipient();
|
||||
m = envelope.content();
|
||||
} else {
|
||||
m = o;
|
||||
remoteAddress = null;
|
||||
}
|
||||
|
||||
if (m instanceof ByteBufHolder) {
|
||||
data = ((ByteBufHolder) m).content();
|
||||
} else if (m instanceof ByteBuf) {
|
||||
data = (ByteBuf) m;
|
||||
} else {
|
||||
BufUtil.release(buf.remove());
|
||||
throw new ChannelException("unsupported message type: " + StringUtil.simpleClassName(o));
|
||||
}
|
||||
|
||||
int dataLen = data.readableBytes();
|
||||
ByteBuffer nioData;
|
||||
if (data.nioBufferCount() == 1) {
|
||||
@ -228,7 +259,12 @@ public final class NioDatagramChannel
|
||||
nioData.flip();
|
||||
}
|
||||
|
||||
final int writtenBytes = javaChannel().send(nioData, packet.remoteAddress());
|
||||
final int writtenBytes;
|
||||
if (remoteAddress != null) {
|
||||
writtenBytes = javaChannel().send(nioData, remoteAddress);
|
||||
} else {
|
||||
writtenBytes = javaChannel().write(nioData);
|
||||
}
|
||||
|
||||
final SelectionKey key = selectionKey();
|
||||
final int interestOps = key.interestOps();
|
||||
@ -246,11 +282,8 @@ public final class NioDatagramChannel
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Wrote a packet.
|
||||
buf.remove();
|
||||
|
||||
// packet was written free up buffer
|
||||
packet.release();
|
||||
// Wrote a packet - free the message.
|
||||
BufUtil.release(buf.remove());
|
||||
|
||||
if (buf.isEmpty()) {
|
||||
// Wrote the outbound buffer completely - clear OP_WRITE.
|
||||
|
@ -16,8 +16,12 @@
|
||||
package io.netty.channel.socket.oio;
|
||||
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.buffer.BufUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.buffer.MessageBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
@ -29,6 +33,7 @@ import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.DefaultDatagramChannelConfig;
|
||||
import io.netty.util.internal.EmptyArrays;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -43,8 +48,11 @@ import java.net.SocketTimeoutException;
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
* {@link DatagramChannel} implementation which use Old-Blocking-IO. It can be used to read and write
|
||||
* {@link DatagramPacket}s via UDP.
|
||||
* An OIO datagram {@link Channel} that sends and receives an
|
||||
* {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
|
||||
*
|
||||
* @see AddressedEnvelope
|
||||
* @see DatagramPacket
|
||||
*/
|
||||
public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
implements DatagramChannel {
|
||||
@ -193,11 +201,11 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
@Override
|
||||
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
|
||||
int packetSize = config().getReceivePacketSize();
|
||||
ByteBuf buffer = alloc().heapBuffer(packetSize);
|
||||
ByteBuf data = alloc().heapBuffer(packetSize);
|
||||
boolean free = true;
|
||||
|
||||
try {
|
||||
tmpPacket.setData(buffer.array(), buffer.arrayOffset(), packetSize);
|
||||
tmpPacket.setData(data.array(), data.arrayOffset(), packetSize);
|
||||
socket.receive(tmpPacket);
|
||||
|
||||
InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
|
||||
@ -205,8 +213,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
remoteAddr = remoteAddress();
|
||||
}
|
||||
|
||||
DatagramPacket packet = new DatagramPacket(buffer.writerIndex(tmpPacket.getLength()), remoteAddr);
|
||||
buf.add(packet);
|
||||
buf.add(new DatagramPacket(data.writerIndex(tmpPacket.getLength()), localAddress(), remoteAddr));
|
||||
free = false;
|
||||
return 1;
|
||||
} catch (SocketTimeoutException e) {
|
||||
@ -222,21 +229,40 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
return -1;
|
||||
} finally {
|
||||
if (free) {
|
||||
buffer.release();
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
|
||||
DatagramPacket p = (DatagramPacket) buf.poll();
|
||||
final Object o = buf.poll();
|
||||
final Object m;
|
||||
final ByteBuf data;
|
||||
final SocketAddress remoteAddress;
|
||||
if (o instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) o;
|
||||
remoteAddress = envelope.recipient();
|
||||
m = envelope.content();
|
||||
} else {
|
||||
m = o;
|
||||
remoteAddress = null;
|
||||
}
|
||||
|
||||
if (m instanceof ByteBufHolder) {
|
||||
data = ((ByteBufHolder) m).content();
|
||||
} else if (m instanceof ByteBuf) {
|
||||
data = (ByteBuf) m;
|
||||
} else {
|
||||
BufUtil.release(buf.remove());
|
||||
throw new ChannelException("unsupported message type: " + StringUtil.simpleClassName(o));
|
||||
}
|
||||
|
||||
try {
|
||||
ByteBuf data = p.data();
|
||||
int length = data.readableBytes();
|
||||
InetSocketAddress remote = p.remoteAddress();
|
||||
if (remote != null) {
|
||||
tmpPacket.setSocketAddress(remote);
|
||||
if (remoteAddress != null) {
|
||||
tmpPacket.setSocketAddress(remoteAddress);
|
||||
}
|
||||
if (data.hasArray()) {
|
||||
tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
|
||||
@ -247,7 +273,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
socket.send(tmpPacket);
|
||||
} finally {
|
||||
p.release();
|
||||
BufUtil.release(o);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user