Retrofit the codec framework with the new API (in progress)

- Replaced FrameDecoder and OneToOne(Encoder|Decoder) with:
  - (Stream|Message)To(String|Message)(Encoder|Decoder)
- Moved the classes in 'codec.frame' up to 'codec'
- Fixed some bugs found while running unit tests
This commit is contained in:
Trustin Lee 2012-05-16 23:02:06 +09:00
parent 894ececbb7
commit 92a688e5b2
121 changed files with 1260 additions and 1423 deletions

View File

@ -29,7 +29,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
/**

View File

@ -22,7 +22,7 @@ import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder;
/**

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
/**

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
/**

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder;
import io.netty.handler.codec.replay.VoidEnum;

View File

@ -58,8 +58,8 @@ import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.CorruptedFrameException;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;

View File

@ -59,7 +59,7 @@ import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;

View File

@ -155,7 +155,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
ChannelFuture future = channel.write(request);
channel.getPipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket00FrameEncoder());
channel.pipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket00FrameEncoder());
return future;
}
@ -210,7 +210,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
String protocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(protocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder", new WebSocket00FrameDecoder());
channel.pipeline().replace(HttpResponseDecoder.class, "ws-decoder", new WebSocket00FrameDecoder());
setHandshakeComplete();
}

View File

@ -136,7 +136,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
ChannelFuture future = channel.write(request);
channel.getPipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket08FrameEncoder(true));
channel.pipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket08FrameEncoder(true));
return future;
}
@ -186,7 +186,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
expectedChallengeResponseString));
}
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
channel.pipeline().replace(HttpResponseDecoder.class, "ws-decoder",
new WebSocket08FrameDecoder(false, allowExtensions));
setHandshakeComplete();

View File

@ -136,7 +136,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
ChannelFuture future = channel.write(request);
channel.getPipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket13FrameEncoder(true));
channel.pipeline().replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket13FrameEncoder(true));
return future;
}
@ -186,7 +186,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
expectedChallengeResponseString));
}
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
channel.pipeline().replace(HttpResponseDecoder.class, "ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions));
setHandshakeComplete();

View File

@ -163,7 +163,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
}
// Upgrade the connection and send the handshake response.
ChannelPipeline p = channel.getPipeline();
ChannelPipeline p = channel.pipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}

View File

@ -137,7 +137,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
ChannelPipeline p = channel.getPipeline();
ChannelPipeline p = channel.pipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}

View File

@ -138,7 +138,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
ChannelPipeline p = channel.getPipeline();
ChannelPipeline p = channel.pipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}

View File

@ -18,8 +18,8 @@ package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMessageDecoder;

View File

@ -16,7 +16,7 @@
package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpRequest;

View File

@ -16,7 +16,7 @@
package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse;

View File

@ -19,7 +19,7 @@ import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.FrameDecoder;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;

View File

@ -24,7 +24,7 @@ import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;

View File

@ -55,7 +55,7 @@ public class WebSocketServerHandshaker00Test {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
EasyMock.expect(channelMock.getPipeline()).andReturn(pipeline);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers
Capture<HttpResponse> res = new Capture<HttpResponse>();

View File

@ -51,7 +51,7 @@ public class WebSocketServerHandshaker08Test {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
EasyMock.expect(channelMock.getPipeline()).andReturn(pipeline);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers
Capture<HttpResponse> res = new Capture<HttpResponse>();

View File

@ -51,7 +51,7 @@ public class WebSocketServerHandshaker13Test {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
EasyMock.expect(channelMock.getPipeline()).andReturn(pipeline);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers
Capture<HttpResponse> res = new Capture<HttpResponse>();

View File

@ -157,11 +157,11 @@ public abstract class AbstractSocketSpdyEchoTest {
EchoHandler sh = new EchoHandler(true);
EchoHandler ch = new EchoHandler(false);
sb.getPipeline().addLast("decoder", new SpdyFrameDecoder());
sb.getPipeline().addLast("encoder", new SpdyFrameEncoder());
sb.getPipeline().addLast("handler", sh);
sb.pipeline().addLast("decoder", new SpdyFrameDecoder());
sb.pipeline().addLast("encoder", new SpdyFrameEncoder());
sb.pipeline().addLast("handler", sh);
cb.getPipeline().addLast("handler", ch);
cb.pipeline().addLast("handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -13,11 +13,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
/**
* An {@link Exception} which is thrown when the received frame data can not
* be decoded by a {@link FrameDecoder} implementation.
* An {@link Exception} which is thrown when the received frame data could not be decoded by
* an inbound handler.
*
* @apiviz.hidden
*/
public class CorruptedFrameException extends Exception {

View File

@ -13,12 +13,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.ChannelInboundHandlerContext;
/**
* A decoder that splits the received {@link ChannelBuffer}s by one or more
@ -56,7 +55,7 @@ import io.netty.channel.Channels;
* </pre>
* @apiviz.uses io.netty.handler.codec.frame.Delimiters - - useful
*/
public class DelimiterBasedFrameDecoder extends FrameDecoder {
public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
private final ChannelBuffer[] delimiters;
private final int maxFrameLength;
@ -188,8 +187,7 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
}
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ChannelBuffer minDelim = null;
@ -256,14 +254,12 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
private void fail(ChannelHandlerContext ctx, long frameLength) {
if (frameLength > 0) {
Channels.fireExceptionCaught(
ctx.channel(),
ctx.fireExceptionCaught(
new TooLongFrameException(
"frame length exceeds " + maxFrameLength +
": " + frameLength + " - discarded"));
} else {
Channels.fireExceptionCaught(
ctx.channel(),
ctx.fireExceptionCaught(
new TooLongFrameException(
"frame length exceeds " + maxFrameLength +
" - discarding"));

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;

View File

@ -13,13 +13,13 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerContext;
/**
* A decoder that splits the received {@link ChannelBuffer}s by the fixed number
@ -37,7 +37,7 @@ import io.netty.channel.ChannelHandlerContext;
* +-----+-----+-----+
* </pre>
*/
public class FixedLengthFrameDecoder extends FrameDecoder {
public class FixedLengthFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
private final int frameLength;
private final boolean allocateFullBuffer;
@ -63,25 +63,23 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
this.frameLength = frameLength;
this.allocateFullBuffer = allocateFullBuffer;
}
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < frameLength) {
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
if (allocateFullBuffer) {
return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer(frameLength));
} else {
return super.newInboundBuffer(ctx);
}
}
@Override
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return buffer.readBytes(frameLength);
return in.readBytes(frameLength);
}
}
@Override
protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory();
if (allocateFullBuffer) {
return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), frameLength, ctx.channel().getConfig().getBufferFactory());
}
return super.newCumulationBuffer(ctx, minimumCapacity);
}
}

View File

@ -13,13 +13,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
@ -181,7 +180,7 @@ import io.netty.handler.codec.serialization.ObjectDecoder;
* </pre>
* @see LengthFieldPrepender
*/
public class LengthFieldBasedFrameDecoder extends FrameDecoder {
public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
private final int maxFrameLength;
private final int lengthFieldOffset;
@ -309,55 +308,53 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
}
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, buffer.readableBytes());
buffer.skipBytes(localBytesToDiscard);
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(ctx, false);
return null;
}
if (buffer.readableBytes() < lengthFieldEndOffset) {
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = buffer.readerIndex() + lengthFieldOffset;
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength;
switch (lengthFieldLength) {
case 1:
frameLength = buffer.getUnsignedByte(actualLengthFieldOffset);
frameLength = in.getUnsignedByte(actualLengthFieldOffset);
break;
case 2:
frameLength = buffer.getUnsignedShort(actualLengthFieldOffset);
frameLength = in.getUnsignedShort(actualLengthFieldOffset);
break;
case 3:
frameLength = buffer.getUnsignedMedium(actualLengthFieldOffset);
frameLength = in.getUnsignedMedium(actualLengthFieldOffset);
break;
case 4:
frameLength = buffer.getUnsignedInt(actualLengthFieldOffset);
frameLength = in.getUnsignedInt(actualLengthFieldOffset);
break;
case 8:
frameLength = buffer.getLong(actualLengthFieldOffset);
frameLength = in.getLong(actualLengthFieldOffset);
break;
default:
throw new Error("should not reach here");
}
if (frameLength < 0) {
buffer.skipBytes(lengthFieldEndOffset);
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
buffer.skipBytes(lengthFieldEndOffset);
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
@ -367,31 +364,31 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
tooLongFrameLength = frameLength;
bytesToDiscard = frameLength - buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
bytesToDiscard = frameLength - in.readableBytes();
in.skipBytes(in.readableBytes());
failIfNecessary(ctx, true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (buffer.readableBytes() < frameLengthInt) {
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
buffer.skipBytes(frameLengthInt);
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
buffer.skipBytes(initialBytesToStrip);
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = buffer.readerIndex();
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ChannelBuffer frame = extractFrame(buffer, readerIndex, actualFrameLength);
buffer.readerIndex(readerIndex + actualFrameLength);
ChannelBuffer frame = extractFrame(in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
@ -402,14 +399,14 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
if ((!failFast) ||
(failFast && firstDetectionOfTooLongFrame)) {
if (!failFast ||
failFast && firstDetectionOfTooLongFrame) {
fail(ctx, tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) {
fail(ctx, this.tooLongFrameLength);
fail(ctx, tooLongFrameLength);
}
}
@ -417,13 +414,13 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
/**
* Extract the sub-region of the specified buffer. This method is called by
* {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer)} for each
* {@link #decode(ChannelInboundHandlerContext, ChannelBuffer)} for each
* frame. The default implementation returns a copy of the sub-region.
* For example, you could override this method to use an alternative
* {@link ChannelBufferFactory}.
* <p>
* If you are sure that the frame and its content are not accessed after
* the current {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer)}
* the current {@link #decode(ChannelInboundHandlerContext, ChannelBuffer)}
* call returns, you can even avoid memory copy by returning the sliced
* sub-region (i.e. <tt>return buffer.slice(index, length)</tt>).
* It's often useful when you convert the extracted frame into an object.
@ -438,14 +435,12 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
private void fail(ChannelHandlerContext ctx, long frameLength) {
if (frameLength > 0) {
Channels.fireExceptionCaught(
ctx.channel(),
ctx.fireExceptionCaught(
new TooLongFrameException(
"Adjusted frame length exceeds " + maxFrameLength +
": " + frameLength + " - discarded"));
} else {
Channels.fireExceptionCaught(
ctx.channel(),
ctx.fireExceptionCaught(
new TooLongFrameException(
"Adjusted frame length exceeds " + maxFrameLength +
" - discarding"));

View File

@ -13,18 +13,14 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
import static io.netty.buffer.ChannelBuffers.*;
import java.nio.ByteOrder;
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelOutboundHandlerContext;
import java.nio.ByteOrder;
/**
* An encoder that prepends the length of the message. The length value is
@ -55,7 +51,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
* </pre>
*/
@Sharable
public class LengthFieldPrepender extends OneToOneEncoder {
public class LengthFieldPrepender extends MessageToStreamEncoder<ChannelBuffer> {
private final int lengthFieldLength;
private final boolean lengthIncludesLengthFieldLength;
@ -101,48 +97,44 @@ public class LengthFieldPrepender extends OneToOneEncoder {
}
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer body = (ChannelBuffer) msg;
ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(body.order(), lengthFieldLength);
public void encode(
ChannelOutboundHandlerContext<ChannelBuffer> ctx,
ChannelBuffer msg, ChannelBuffer out) throws Exception {
int length = lengthIncludesLengthFieldLength?
body.readableBytes() + lengthFieldLength : body.readableBytes();
msg.readableBytes() + lengthFieldLength : msg.readableBytes();
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
header.writeByte((byte) length);
out.writeByte((byte) length);
break;
case 2:
if (length >= 65536) {
throw new IllegalArgumentException(
"length does not fit into a short integer: " + length);
}
header.writeShort((short) length);
out.writeShort((short) length);
break;
case 3:
if (length >= 16777216) {
throw new IllegalArgumentException(
"length does not fit into a medium integer: " + length);
}
header.writeMedium(length);
out.writeMedium(length);
break;
case 4:
header.writeInt(length);
out.writeInt(length);
break;
case 8:
header.writeLong(length);
out.writeLong(length);
break;
default:
throw new Error("should not reach here");
}
return wrappedBuffer(header, body);
out.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
}
}

View File

@ -0,0 +1,51 @@
package io.netty.handler.codec;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import java.util.Queue;
public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandlerAdapter<I> {
@Override
public ChannelBufferHolder<I> newInboundBuffer(
ChannelInboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx)
throws Exception {
Queue<I> in = ctx.in().messageBuffer();
boolean decoded = false;
for (;;) {
try {
I msg = in.poll();
if (msg == null) {
break;
}
O emsg = decode(ctx, msg);
if (emsg == null) {
throw new IllegalArgumentException(
"decode() returned null. unsupported message type? " +
msg.getClass().getName());
}
if (unfoldAndAdd(ctx, ctx.nextIn(), emsg)) {
decoded = true;
}
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
if (decoded) {
ctx.fireInboundBufferUpdated();
}
}
public abstract O decode(ChannelInboundHandlerContext<I> ctx, I msg) throws Exception;
}

View File

@ -0,0 +1,109 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerContext;
import java.util.Queue;
public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundHandlerAdapter<I> {
@Override
public ChannelBufferHolder<I> newOutboundBuffer(
ChannelOutboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void flush(ChannelOutboundHandlerContext<I> ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.prevOut().messageBuffer();
boolean encoded = false;
for (;;) {
try {
I msg = in.poll();
if (msg == null) {
break;
}
O emsg = encode(ctx, msg);
if (emsg == null) {
throw new IllegalArgumentException(
"encode() returned null. unsupported message type? " +
msg.getClass().getName());
}
if (unfoldAndAdd(ctx, ctx.out(), emsg)) {
encoded = true;
}
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
if (encoded) {
ctx.flush(future);
}
}
public abstract O encode(ChannelOutboundHandlerContext<I> ctx, I msg) throws Exception;
static <T> boolean unfoldAndAdd(
ChannelHandlerContext ctx, ChannelBufferHolder<Object> dst, Object msg) throws Exception {
if (msg == null) {
return false;
}
if (msg instanceof Object[]) {
Object[] array = (Object[]) msg;
if (array.length == 0) {
return false;
}
boolean added = false;
for (Object m: array) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, dst, m)) {
added = true;
}
}
return added;
}
if (msg instanceof Iterable) {
boolean added = false;
@SuppressWarnings("unchecked")
Iterable<Object> i = (Iterable<Object>) msg;
for (Object m: i) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, dst, m)) {
added = true;
}
}
return added;
}
if (dst.hasMessageBuffer()) {
dst.messageBuffer().add(msg);
} else if (msg instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) msg;
if (!buf.readable()) {
return false;
}
dst.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
} else {
throw new IllegalArgumentException(
"message cannot be written to byte buffer if it is not " +
ChannelBuffer.class.getSimpleName() + '.');
}
return true;
}
}

View File

@ -0,0 +1,45 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerContext;
import java.util.Queue;
public abstract class MessageToStreamEncoder<I> extends ChannelOutboundHandlerAdapter<I> {
@Override
public ChannelBufferHolder<I> newOutboundBuffer(
ChannelOutboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void flush(ChannelOutboundHandlerContext<I> ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.prevOut().messageBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
int oldOutSize = out.readableBytes();
for (;;) {
I msg = in.poll();
if (msg == null) {
break;
}
try {
encode(ctx, msg, out);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
if (out.readableBytes() > oldOutSize) {
ctx.flush(future);
}
}
public abstract void encode(ChannelOutboundHandlerContext<I> ctx, I msg, ChannelBuffer out) throws Exception;
}

View File

@ -0,0 +1,69 @@
package io.netty.handler.codec;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
callDecode(ctx);
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer();
if (!in.readable()) {
callDecode(ctx);
}
try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, in))) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
ctx.fireChannelInactive();
}
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
boolean decoded = false;
for (;;) {
try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decode(ctx, in))) {
decoded = true;
} else {
break;
}
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
if (decoded) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
}
public abstract O decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception;
public O decodeLast(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
return decode(ctx, in);
}
}

View File

@ -0,0 +1,73 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
callDecode(ctx);
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer();
if (!in.readable()) {
callDecode(ctx);
}
ChannelBuffer out = ctx.nextIn().byteBuffer();
int oldOutSize = out.readableBytes();
try {
decodeLast(ctx, in, out);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
ctx.fireChannelInactive();
}
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer out = ctx.nextIn().byteBuffer();
int oldOutSize = out.readableBytes();
while (in.readable()) {
int oldInSize = in.readableBytes();
try {
decode(ctx, in, out);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
if (oldInSize == in.readableBytes()) {
break;
}
}
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
}
public abstract void decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in, ChannelBuffer out) throws Exception;
public void decodeLast(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in, ChannelBuffer out) throws Exception {
decode(ctx, in, out);
}
}

View File

@ -0,0 +1,43 @@
package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerContext;
public abstract class StreamToStreamEncoder extends ChannelOutboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelOutboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public void flush(ChannelOutboundHandlerContext<Byte> ctx, ChannelFuture future) throws Exception {
ChannelBuffer in = ctx.prevOut().byteBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
int oldOutSize = out.readableBytes();
while (in.readable()) {
int oldInSize = in.readableBytes();
try {
encode(ctx, in, out);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
if (oldInSize == in.readableBytes()) {
break;
}
}
if (out.readableBytes() > oldOutSize) {
in.discardReadBytes();
ctx.flush(future);
}
}
public abstract void encode(ChannelOutboundHandlerContext<Byte> ctx, ChannelBuffer in, ChannelBuffer out) throws Exception;
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;
/**
* An {@link Exception} which is thrown when the length of the frame

View File

@ -16,21 +16,18 @@
package io.netty.handler.codec.base64;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.oneone.OneToOneDecoder;
import io.netty.util.CharsetUtil;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.StreamToMessageDecoder;
/**
* Decodes a Base64-encoded {@link ChannelBuffer} or US-ASCII {@link String}
* into a {@link ChannelBuffer}. Please note that this decoder must be used
* with a proper {@link FrameDecoder} such as {@link DelimiterBasedFrameDecoder}
* with a proper {@link StreamToMessageDecoder} such as {@link DelimiterBasedFrameDecoder}
* if you are using a stream-based transport such as TCP/IP. A typical decoder
* setup for TCP/IP would be:
* <pre>
@ -47,7 +44,7 @@ import io.netty.util.CharsetUtil;
* @apiviz.uses io.netty.handler.codec.base64.Base64
*/
@Sharable
public class Base64Decoder extends OneToOneDecoder {
public class Base64Decoder extends MessageToMessageDecoder<ChannelBuffer, ChannelBuffer> {
private final Base64Dialect dialect;
@ -63,17 +60,7 @@ public class Base64Decoder extends OneToOneDecoder {
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
if (msg instanceof String) {
msg = ChannelBuffers.copiedBuffer((String) msg, CharsetUtil.US_ASCII);
} else if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer src = (ChannelBuffer) msg;
return Base64.decode(
src, src.readerIndex(), src.readableBytes(), dialect);
public ChannelBuffer decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
return Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect);
}
}

View File

@ -16,13 +16,12 @@
package io.netty.handler.codec.base64;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.MessageToMessageEncoder;
/**
* Encodes a {@link ChannelBuffer} into a Base64-encoded {@link ChannelBuffer}.
@ -41,7 +40,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
* @apiviz.uses io.netty.handler.codec.base64.Base64
*/
@Sharable
public class Base64Encoder extends OneToOneEncoder {
public class Base64Encoder extends MessageToMessageEncoder<ChannelBuffer, ChannelBuffer> {
private final boolean breakLines;
private final Base64Dialect dialect;
@ -64,15 +63,8 @@ public class Base64Encoder extends OneToOneEncoder {
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer src = (ChannelBuffer) msg;
return Base64.encode(
src, src.readerIndex(), src.readableBytes(),
breakLines, dialect);
public ChannelBuffer encode(ChannelOutboundHandlerContext<ChannelBuffer> ctx,
ChannelBuffer msg) throws Exception {
return Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect);
}
}

View File

@ -16,13 +16,12 @@
package io.netty.handler.codec.bytes;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.frame.LengthFieldPrepender;
import io.netty.handler.codec.oneone.OneToOneDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* Decodes a received {@link ChannelBuffer} into an array of bytes.
@ -49,31 +48,26 @@ import io.netty.handler.codec.oneone.OneToOneDecoder;
* }
* </pre>
*/
public class ByteArrayDecoder extends OneToOneDecoder {
public class ByteArrayDecoder extends MessageToMessageDecoder<ChannelBuffer, byte[]> {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer buf = (ChannelBuffer) msg;
public byte[] decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
byte[] array;
if (buf.hasArray()) {
if (buf.arrayOffset() == 0 && buf.readableBytes() == buf.capacity()) {
if (msg.hasArray()) {
if (msg.arrayOffset() == 0 && msg.readableBytes() == msg.capacity()) {
// we have no offset and the length is the same as the capacity. Its safe to reuse the array without copy it first
array = buf.array();
array = msg.array();
} else {
// copy the ChannelBuffer to a byte array
array = new byte[buf.readableBytes()];
buf.getBytes(0, array);
array = new byte[msg.readableBytes()];
msg.getBytes(0, array);
}
} else {
// copy the ChannelBuffer to a byte array
array = new byte[buf.readableBytes()];
buf.getBytes(0, array);
array = new byte[msg.readableBytes()];
msg.getBytes(0, array);
}
return array;
}
}

View File

@ -15,16 +15,16 @@
*/
package io.netty.handler.codec.bytes;
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.frame.LengthFieldPrepender;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder;
/**
* Encodes the requested array of bytes into a {@link ChannelBuffer}.
@ -51,14 +51,15 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
* }
* </pre>
*/
public class ByteArrayEncoder extends OneToOneEncoder {
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[], ChannelBuffer> {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof byte[])) {
return msg;
}
return wrappedBuffer((byte[]) msg);
public ChannelBufferHolder<byte[]> newOutboundBuffer(ChannelOutboundHandlerContext<byte[]> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public ChannelBuffer encode(ChannelOutboundHandlerContext<byte[]> ctx, byte[] msg) throws Exception {
return ChannelBuffers.wrappedBuffer(msg);
}
}

View File

@ -16,10 +16,8 @@
package io.netty.handler.codec.compression;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.oneone.OneToOneDecoder;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.StreamToStreamDecoder;
import io.netty.util.internal.jzlib.JZlib;
import io.netty.util.internal.jzlib.ZStream;
@ -29,7 +27,7 @@ import io.netty.util.internal.jzlib.ZStream;
* @apiviz.landmark
* @apiviz.has io.netty.handler.codec.compression.ZlibWrapper
*/
public class ZlibDecoder extends OneToOneDecoder {
public class ZlibDecoder extends StreamToStreamDecoder {
private final ZStream z = new ZStream();
private byte[] dictionary;
@ -93,39 +91,62 @@ public class ZlibDecoder extends OneToOneDecoder {
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer) || finished) {
return msg;
}
public void decode(
ChannelInboundHandlerContext<Byte> ctx,
ChannelBuffer in, ChannelBuffer out) throws Exception {
synchronized (z) {
try {
// Configure input.
ChannelBuffer compressed = (ChannelBuffer) msg;
byte[] in = new byte[compressed.readableBytes()];
compressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
int inputLength = in.readableBytes();
boolean inHasArray = in.hasArray();
z.avail_in = inputLength;
if (inHasArray) {
z.next_in = in.array();
z.next_in_index = in.arrayOffset() + in.readerIndex();
} else {
byte[] array = new byte[inputLength];
in.readBytes(array);
z.next_in = array;
z.next_in_index = 0;
}
int oldNextInIndex = z.next_in_index;
// Configure output.
byte[] out = new byte[in.length << 1];
ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(
compressed.order(), out.length,
ctx.channel().getConfig().getBufferFactory());
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
int maxOutputLength = inputLength << 1;
boolean outHasArray = out.hasArray();
if (!outHasArray) {
z.next_out = new byte[maxOutputLength];
}
loop: for (;;) {
// Decompress 'in' into 'out'
int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
if (z.next_out_index > 0) {
decompressed.writeBytes(out, 0, z.next_out_index);
z.avail_out = out.length;
z.avail_out = maxOutputLength;
if (outHasArray) {
out.ensureWritableBytes(maxOutputLength);
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
} else {
z.next_out_index = 0;
}
int oldNextOutIndex = z.next_out_index;
// Decompress 'in' into 'out'
int resultCode;
try {
resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
} finally {
if (inHasArray) {
in.skipBytes(z.next_in_index - oldNextInIndex);
}
}
int outputLength = z.next_out_index - oldNextOutIndex;
if (outputLength > 0) {
if (outHasArray) {
out.writerIndex(out.writerIndex() + outputLength);
} else {
out.writeBytes(z.next_out, 0, outputLength);
}
}
z.next_out_index = 0;
switch (resultCode) {
case JZlib.Z_NEED_DICT:
@ -153,12 +174,6 @@ public class ZlibDecoder extends OneToOneDecoder {
ZlibUtil.fail(z, "decompression failure", resultCode);
}
}
if (decompressed.writerIndex() != 0) { // readerIndex is always 0
return decompressed;
} else {
return null;
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack

View File

@ -15,29 +15,25 @@
*/
package io.netty.handler.codec.compression;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.StreamToStreamEncoder;
import io.netty.util.internal.jzlib.JZlib;
import io.netty.util.internal.jzlib.ZStream;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Compresses a {@link ChannelBuffer} using the deflate algorithm.
* @apiviz.landmark
* @apiviz.has io.netty.handler.codec.compression.ZlibWrapper
*/
public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler {
public class ZlibEncoder extends StreamToStreamEncoder {
private static final byte[] EMPTY_ARRAY = new byte[0];
@ -247,11 +243,19 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
}
public ChannelFuture close() {
return close(ctx().channel().newFuture());
}
public ChannelFuture close(ChannelFuture future) {
return finishEncode(ctx(), future);
}
private ChannelHandlerContext ctx() {
ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
throw new IllegalStateException("not added to a pipeline");
}
return finishEncode(ctx, null);
return ctx;
}
public boolean isClosed() {
@ -259,39 +263,65 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer) || finished.get()) {
return msg;
public void encode(ChannelOutboundHandlerContext<Byte> ctx,
ChannelBuffer in, ChannelBuffer out) throws Exception {
if (finished.get()) {
return;
}
ChannelBuffer result;
synchronized (z) {
try {
// Configure input.
ChannelBuffer uncompressed = (ChannelBuffer) msg;
byte[] in = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
int inputLength = in.readableBytes();
boolean inHasArray = in.hasArray();
z.avail_in = inputLength;
if (inHasArray) {
z.next_in = in.array();
z.next_in_index = in.arrayOffset() + in.readerIndex();
} else {
byte[] array = new byte[inputLength];
in.readBytes(array);
z.next_in = array;
z.next_in_index = 0;
}
int oldNextInIndex = z.next_in_index;
// Configure output.
byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12;
boolean outHasArray = out.hasArray();
z.avail_out = maxOutputLength;
if (outHasArray) {
out.ensureWritableBytes(maxOutputLength);
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
} else {
byte[] array = new byte[maxOutputLength];
z.next_out = array;
z.next_out_index = 0;
}
int oldNextOutIndex = z.next_out_index;
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
int resultCode;
try {
resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
} finally {
if (inHasArray) {
in.skipBytes(z.next_in_index - oldNextInIndex);
}
}
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "compression failure", resultCode);
}
if (z.next_out_index != 0) {
result = ctx.channel().getConfig().getBufferFactory().getBuffer(
uncompressed.order(), out, 0, z.next_out_index);
} else {
result = ChannelBuffers.EMPTY_BUFFER;
int outputLength = z.next_out_index - oldNextOutIndex;
if (outputLength > 0) {
if (outHasArray) {
out.writerIndex(out.writerIndex() + outputLength);
} else {
out.writeBytes(z.next_out, 0, outputLength);
}
}
} finally {
// Deference the external references explicitly to tell the VM that
@ -302,39 +332,39 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
z.next_out = null;
}
}
return result;
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
finishEncode(ctx, evt);
return;
}
public void disconnect(
final ChannelOutboundHandlerContext<Byte> ctx,
final ChannelFuture future) throws Exception {
finishEncode(ctx, ctx.newFuture()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.disconnect(future);
}
}
super.handleDownstream(ctx, evt);
});
}
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
if (!finished.compareAndSet(false, true)) {
if (evt != null) {
ctx.sendDownstream(evt);
@Override
public void close(
final ChannelOutboundHandlerContext<Byte> ctx,
final ChannelFuture future) throws Exception {
finishEncode(ctx, ctx.newFuture()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(future);
}
return Channels.succeededFuture(ctx.channel());
});
}
private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelFuture future) {
if (!finished.compareAndSet(false, true)) {
future.setSuccess();
return future;
}
ChannelBuffer footer;
ChannelFuture future;
synchronized (z) {
try {
// Configure input.
@ -351,20 +381,11 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
// Write the ADLER32 checksum (stream footer).
int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
future = Channels.failedFuture(
ctx.channel(),
ZlibUtil.exception(z, "compression failure", resultCode));
footer = null;
future.setFailure(ZlibUtil.exception(z, "compression failure", resultCode));
return future;
} else if (z.next_out_index != 0) {
future = Channels.future(ctx.channel());
footer =
ctx.channel().getConfig().getBufferFactory().getBuffer(
out, 0, z.next_out_index);
footer = ChannelBuffers.wrappedBuffer(out, 0, z.next_out_index);
} else {
// Note that we should never use a SucceededChannelFuture
// here just in case any downstream handler or a sink wants
// to notify a write error.
future = Channels.future(ctx.channel());
footer = ChannelBuffers.EMPTY_BUFFER;
}
} finally {
@ -379,19 +400,7 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
}
}
if (footer != null) {
Channels.write(ctx, future, footer);
}
if (evt != null) {
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.sendDownstream(evt);
}
});
}
ctx.write(footer, future);
return future;
}
@ -399,19 +408,4 @@ public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChanne
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Unused
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
}

View File

@ -15,96 +15,44 @@
*/
package io.netty.handler.codec.embedder;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import java.lang.reflect.Array;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.Queue;
import io.netty.channel.Channels;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.ChannelSink;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.DefaultChannelPipeline;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
/**
* A skeletal {@link CodecEmbedder} implementation.
*/
abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
private final Channel channel;
private final ChannelPipeline pipeline;
private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
private static final EventLoop loop = new EmbeddedEventLoop();
final Queue<Object> productQueue = new LinkedList<Object>();
private final Queue<Object> productQueue = new LinkedList<Object>();
private final Channel channel = new EmbeddedChannel(productQueue);
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
protected AbstractCodecEmbedder(ChannelHandler... handlers) {
pipeline = new EmbeddedChannelPipeline();
configurePipeline(handlers);
channel = new EmbeddedChannel(pipeline, sink);
fireInitialEvents();
}
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*
* @param bufferFactory the {@link ChannelBufferFactory} to be used when
* creating a new buffer.
*/
protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
this(handlers);
getChannel().getConfig().setBufferFactory(bufferFactory);
}
private void fireInitialEvents() {
// Fire the typical initial events.
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress());
}
private void configurePipeline(ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
if (handlers.length == 0) {
throw new IllegalArgumentException(
"handlers should contain at least one " +
ChannelHandler.class.getSimpleName() + '.');
}
for (int i = 0; i < handlers.length; i ++) {
ChannelHandler h = handlers[i];
if (h == null) {
throw new NullPointerException("handlers[" + i + "]");
}
pipeline.addLast(String.valueOf(i), handlers[i]);
}
pipeline.addLast("SINK", sink);
channel.pipeline().addLast(handlers);
channel.pipeline().addLast(new LastHandler());
loop.register(channel);
}
@Override
public boolean finish() {
close(channel);
fireChannelDisconnected(channel);
fireChannelUnbound(channel);
fireChannelClosed(channel);
channel.pipeline().close().syncUninterruptibly();
return !productQueue.isEmpty();
}
@ -112,7 +60,7 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
* Returns the virtual {@link Channel} which will be used as a mock
* during encoding and decoding.
*/
protected final Channel getChannel() {
protected final Channel channel() {
return channel;
}
@ -125,15 +73,27 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
}
@Override
@SuppressWarnings("unchecked")
public final E poll() {
return (E) productQueue.poll();
return product(productQueue.poll());
}
@Override
@SuppressWarnings("unchecked")
public final E peek() {
return (E) productQueue.peek();
return product(productQueue.peek());
}
@SuppressWarnings("unchecked")
private E product(Object p) {
if (p instanceof Throwable) {
if (p instanceof RuntimeException) {
throw (RuntimeException) p;
}
if (p instanceof Error) {
throw (Error) p;
}
throw new ChannelException((Throwable) p);
}
return (E) p;
}
@Override
@ -186,73 +146,20 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
}
@Override
public ChannelPipeline getPipeline() {
return pipeline;
public ChannelPipeline pipeline() {
return channel.pipeline();
}
private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
EmbeddedChannelSink() {
private final class LastHandler extends ChannelInboundHandlerAdapter<Object> {
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(productQueue);
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
handleEvent(e);
}
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
handleEvent(e);
}
private void handleEvent(ChannelEvent e) {
if (e instanceof MessageEvent) {
boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
assert offered;
} else if (e instanceof ExceptionEvent) {
throw new CodecEmbedderException(((ExceptionEvent) e).cause());
}
// Swallow otherwise.
}
@Override
public void exceptionCaught(
ChannelPipeline pipeline, ChannelEvent e,
ChannelPipelineException cause) throws Exception {
Throwable actualCause = cause.getCause();
if (actualCause == null) {
actualCause = cause;
}
throw new CodecEmbedderException(actualCause);
}
@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
try {
task.run();
return Channels.succeededFuture(pipeline.channel());
} catch (Throwable t) {
return Channels.failedFuture(pipeline.channel(), t);
}
}
}
private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
EmbeddedChannelPipeline() {
}
@Override
protected void notifyHandlerException(ChannelEvent e, Throwable t) {
while (t instanceof ChannelPipelineException && t.getCause() != null) {
t = t.getCause();
}
if (t instanceof CodecEmbedderException) {
throw (CodecEmbedderException) t;
} else {
throw new CodecEmbedderException(t);
}
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
productQueue.add(cause);
}
}
}

View File

@ -95,5 +95,5 @@ public interface CodecEmbedder<E> {
/**
* Returns the {@link ChannelPipeline} that handles the input.
*/
ChannelPipeline getPipeline();
ChannelPipeline pipeline();
}

View File

@ -15,13 +15,11 @@
*/
package io.netty.handler.codec.embedder;
import static io.netty.channel.Channels.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.handler.codec.base64.Base64Decoder;
import io.netty.handler.codec.string.StringDecoder;
@ -51,24 +49,20 @@ public class DecoderEmbedder<E> extends AbstractCodecEmbedder<E> {
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
public DecoderEmbedder(ChannelUpstreamHandler... handlers) {
public DecoderEmbedder(ChannelHandler... handlers) {
super(handlers);
}
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*
* @param bufferFactory the {@link ChannelBufferFactory} to be used when
* creating a new buffer.
*/
public DecoderEmbedder(ChannelBufferFactory bufferFactory, ChannelUpstreamHandler... handlers) {
super(bufferFactory, handlers);
}
@Override
public boolean offer(Object input) {
fireMessageReceived(getChannel(), input);
return !super.isEmpty();
ChannelBufferHolder<Object> in = pipeline().nextIn();
if (in.hasByteBuffer()) {
in.byteBuffer().writeBytes((ChannelBuffer) input);
} else {
in.messageBuffer().add(input);
}
pipeline().fireInboundBufferUpdated();
return !isEmpty();
}
}

View File

@ -15,53 +15,131 @@
*/
package io.netty.handler.codec.embedder;
import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Queue;
/**
* TODO Make EmbeddedChannel implement ChannelConfig and ChannelSink to reduce overhead.
* TODO Do not extend AbstractChannel to reduce overhead and remove the internal-use-only constructor in AbstractChannel.
*/
class EmbeddedChannel extends AbstractChannel {
private static final Integer DUMMY_ID = Integer.valueOf(0);
private final ChannelConfig config;
private final ChannelConfig config = new DefaultChannelConfig();
private final ChannelBufferHolder<?> firstOut = ChannelBufferHolders.byteBuffer();
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> productQueue;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
private final java.nio.channels.Channel javaChannel = new java.nio.channels.Channel() {
@Override
public boolean isOpen() {
return state < 2;
}
EmbeddedChannel(ChannelPipeline pipeline, ChannelSink sink) {
super(DUMMY_ID, null, EmbeddedChannelFactory.INSTANCE, pipeline, sink);
config = new DefaultChannelConfig();
@Override
public void close() throws IOException {
// NOOP
}
};
EmbeddedChannel(Queue<Object> productQueue) {
super(null, null);
this.productQueue = productQueue;
}
@Override
public ChannelConfig getConfig() {
public ChannelConfig config() {
return config;
}
@Override
public SocketAddress getLocalAddress() {
return localAddress;
public boolean isActive() {
return state == 1;
}
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EmbeddedEventLoop;
}
@Override
public boolean isBound() {
protected java.nio.channels.Channel javaChannel() {
return javaChannel;
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) firstOut;
}
@Override
protected SocketAddress localAddress0() {
return isActive()? localAddress : null;
}
@Override
protected SocketAddress remoteAddress0() {
return isActive()? remoteAddress : null;
}
@Override
protected void doRegister() throws Exception {
state = 1;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// NOOP
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
return true;
}
@Override
public boolean isConnected() {
return true;
protected void doFinishConnect() throws Exception {
// NOOP
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
state = 2;
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doRead() throws Exception {
return 0;
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
ChannelBuffer buf = firstOut().byteBuffer();
int length = buf.readableBytes();
if (length > 0) {
productQueue.add(buf.readBytes(length));
}
return length;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
/**
*/
final class EmbeddedChannelFactory implements ChannelFactory {
static final ChannelFactory INSTANCE = new EmbeddedChannelFactory();
private EmbeddedChannelFactory() {
}
@Override
public Channel newChannel(ChannelPipeline pipeline) {
throw new UnsupportedOperationException();
}
@Override
public void releaseExternalResources() {
// No external resources
}
}

View File

@ -0,0 +1,88 @@
package io.netty.handler.codec.embedder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class EmbeddedEventLoop extends AbstractExecutorService implements
EventLoop {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public void shutdown() {
// NOOP
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
return false;
}
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public ChannelFuture register(Channel channel) {
return register(channel, channel.newFuture());
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
channel.unsafe().register(this, future);
return future;
}
@Override
public boolean inEventLoop() {
return true;
}
}

View File

@ -15,11 +15,8 @@
*/
package io.netty.handler.codec.embedder;
import static io.netty.channel.Channels.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.base64.Base64Encoder;
import io.netty.handler.codec.string.StringEncoder;
@ -51,24 +48,13 @@ public class EncoderEmbedder<E> extends AbstractCodecEmbedder<E> {
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
public EncoderEmbedder(ChannelDownstreamHandler... handlers) {
public EncoderEmbedder(ChannelHandler... handlers) {
super(handlers);
}
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*
* @param bufferFactory the {@link ChannelBufferFactory} to be used when
* creating a new buffer.
*/
public EncoderEmbedder(ChannelBufferFactory bufferFactory, ChannelDownstreamHandler... handlers) {
super(bufferFactory, handlers);
}
@Override
public boolean offer(Object input) {
write(getChannel(), input).setSuccess();
channel().write(input);
return !isEmpty();
}
}

View File

@ -1,366 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.frame;
import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.replay.ReplayingDecoder;
/**
* Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
* <p>
* In a stream-based transport such as TCP/IP, packets can be fragmented and
* reassembled during transmission even in a LAN environment. For example,
* let us assume you have received three packets:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
* because of the packet fragmentation, a server can receive them like the
* following:
* <pre>
* +----+-------+---+---+
* | AB | CDEFG | H | I |
* +----+-------+---+---+
* </pre>
* <p>
* {@link FrameDecoder} helps you defrag the received packets into one or more
* meaningful <strong>frames</strong> that could be easily understood by the
* application logic. In case of the example above, your {@link FrameDecoder}
* implementation could defrag the received packets like the following:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
* <p>
* The following code shows an example handler which decodes a frame whose
* first 4 bytes header represents the length of the frame, excluding the
* header.
* <pre>
* MESSAGE FORMAT
* ==============
*
* Offset: 0 4 (Length + 4)
* +--------+------------------------+
* Fields: | Length | Actual message content |
* +--------+------------------------+
*
* DECODER IMPLEMENTATION
* ======================
*
* public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
*
* {@code @Override}
* protected Object decode({@link ChannelHandlerContext} ctx,
* {@link Channel channel},
* {@link ChannelBuffer} buf) throws Exception {
*
* // Make sure if the length field was received.
* if (buf.readableBytes() &lt; 4) {
* // The length field was not received yet - return null.
* // This method will be invoked again when more packets are
* // received and appended to the buffer.
* return <strong>null</strong>;
* }
*
* // The length field is in the buffer.
*
* // Mark the current buffer position before reading the length field
* // because the whole frame might not be in the buffer yet.
* // We will reset the buffer position to the marked position if
* // there's not enough bytes in the buffer.
* buf.markReaderIndex();
*
* // Read the length field.
* int length = buf.readInt();
*
* // Make sure if there's enough bytes in the buffer.
* if (buf.readableBytes() &lt; length) {
* // The whole bytes were not received yet - return null.
* // This method will be invoked again when more packets are
* // received and appended to the buffer.
*
* // Reset to the marked position to read the length field again
* // next time.
* buf.resetReaderIndex();
*
* return <strong>null</strong>;
* }
*
* // There's enough bytes in the buffer. Read it.
* {@link ChannelBuffer} frame = buf.readBytes(length);
*
* // Successfully decoded a frame. Return the decoded frame.
* return <strong>frame</strong>;
* }
* }
* </pre>
*
* <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
* <p>
* Please note that you can return an object of a different type than
* {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
* implementation. For example, you could return a
* <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
* {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
* contains a POJO rather than a {@link ChannelBuffer}.
*
* <h3>Replacing a decoder with another decoder in a pipeline</h3>
* <p>
* If you are going to write a protocol multiplexer, you will probably want to
* replace a {@link FrameDecoder} (protocol detector) with another
* {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
* It is not possible to achieve this simply by calling
* {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
* some additional steps are required:
* <pre>
* public class FirstDecoder extends {@link FrameDecoder} {
*
* public FirstDecoder() {
* super(true); // Enable unfold
* }
*
* {@code @Override}
* protected Object decode({@link ChannelHandlerContext} ctx,
* {@link Channel} channel,
* {@link ChannelBuffer} buf) {
* ...
* // Decode the first message
* Object firstMessage = ...;
*
* // Add the second decoder
* ctx.getPipeline().addLast("second", new SecondDecoder());
*
* // Remove the first decoder (me)
* ctx.getPipeline().remove(this);
*
* if (buf.readable()) {
* // Hand off the remaining data to the second decoder
* return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
* } else {
* // Nothing to hand off
* return firstMessage;
* }
* }
* }
* </pre>
* @apiviz.landmark
*/
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
private final boolean unfold;
private ChannelBuffer cumulation;
protected FrameDecoder() {
this(false);
}
protected FrameDecoder(boolean unfold) {
this.unfold = unfold;
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
ChannelBuffer input = (ChannelBuffer) m;
if (!input.readable()) {
return;
}
if (cumulation == null) {
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.channel(), input, e.getRemoteAddress());
if (input.readable()) {
// seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it
(this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
}
} else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes();
}
cumulation.writeBytes(input);
callDecode(ctx, e.channel(), cumulation, e.getRemoteAddress());
if (!cumulation.readable()) {
this.cumulation = null;
}
}
}
@Override
public void channelDisconnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void channelClosed(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
ctx.sendUpstream(e);
}
/**
* Decodes the received packets so far into a frame.
*
* @param ctx the context of this handler
* @param channel the current channel
* @param buffer the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains
* at least one byte in your decoder implementation.
*
* @return the decoded frame if a full frame was received and decoded.
* {@code null} if there's not enough data in the buffer to decode a frame.
*/
protected abstract Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
/**
* Decodes the received data so far into a frame when the channel is
* disconnected.
*
* @param ctx the context of this handler
* @param channel the current channel
* @param buffer the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains
* at least one byte in your decoder implementation.
*
* @return the decoded frame if a full frame was received and decoded.
* {@code null} if there's not enough data in the buffer to decode a frame.
*/
protected Object decodeLast(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return decode(ctx, channel, buffer);
}
private void callDecode(
ChannelHandlerContext context, Channel channel,
ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) {
int oldReaderIndex = cumulation.readerIndex();
Object frame = decode(context, channel, cumulation);
if (frame == null) {
if (oldReaderIndex == cumulation.readerIndex()) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
} else {
// Previous data has been discarded.
// Probably it is reading on.
continue;
}
} else if (oldReaderIndex == cumulation.readerIndex()) {
throw new IllegalStateException(
"decode() method must read at least one byte " +
"if it returned a frame (caused by: " + getClass() + ")");
}
unfoldAndFireMessageReceived(context, remoteAddress, frame);
}
}
private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result instanceof Iterable<?>) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) {
return;
}
this.cumulation = null;
if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel.
callDecode(ctx, ctx.channel(), cumulation, null);
}
// Call decodeLast() finally. Please note that decodeLast() is
// called even if there's nothing more to read from the buffer to
// notify a user that the connection was closed explicitly.
Object partialFrame = decodeLast(ctx, ctx.channel(), cumulation);
if (partialFrame != null) {
unfoldAndFireMessageReceived(ctx, null, partialFrame);
}
} finally {
ctx.sendUpstream(e);
}
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation.
* Be aware that this MUST be a dynamic buffer. Sub-classes may override
* this to provide a dynamic {@link ChannelBuffer} which has some
* pre-allocated size that better fit their need.
*
* @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation
*/
protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
}
}

View File

@ -1,81 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.oneone;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.frame.FrameDecoder;
/**
* Transforms a received message into another message. Please note that this
* decoder must be used with a proper {@link FrameDecoder} such as
* {@link DelimiterBasedFrameDecoder} or you must implement proper framing
* mechanism by yourself if you are using a stream-based transport such as
* TCP/IP. A typical setup for TCP/IP would be:
* <pre>
* {@link ChannelPipeline} pipeline = ...;
*
* // Decoders
* pipeline.addLast("frameDecoder", new {@link DelimiterBasedFrameDecoder}(80, {@link Delimiters#nulDelimiter()}));
* pipeline.addLast("customDecoder", new {@link OneToOneDecoder}() { ... });
*
* // Encoder
* pipeline.addLast("customEncoder", new {@link OneToOneEncoder}() { ... });
* </pre>
* @apiviz.landmark
*/
public abstract class OneToOneDecoder implements ChannelUpstreamHandler {
/**
* Creates a new instance with the current system character set.
*/
protected OneToOneDecoder() {
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendUpstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
Object decodedMessage = decode(ctx, e.channel(), originalMessage);
if (originalMessage == decodedMessage) {
ctx.sendUpstream(evt);
} else if (decodedMessage != null) {
fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress());
}
}
/**
* Transforms the specified received message into another message and return
* the transformed message. Return {@code null} if the received message
* is supposed to be discarded.
*/
protected abstract Object decode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception;
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.oneone;
import static io.netty.channel.Channels.*;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
/**
* Transforms a write request into another write request. A typical setup for
* TCP/IP would be:
* <pre>
* {@link ChannelPipeline} pipeline = ...;
*
* // Decoders
* pipeline.addLast("frameDecoder", new {@link DelimiterBasedFrameDecoder}(80, {@link Delimiters#nulDelimiter()}));
* pipeline.addLast("customDecoder", new {@link OneToOneDecoder}() { ... });
*
* // Encoder
* pipeline.addLast("customEncoder", new {@link OneToOneEncoder}() { ... });
* </pre>
* @apiviz.landmark
*/
public abstract class OneToOneEncoder implements ChannelDownstreamHandler {
protected OneToOneEncoder() {
}
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
Object encodedMessage = encode(ctx, e.channel(), originalMessage);
if (originalMessage == encodedMessage) {
ctx.sendDownstream(evt);
} else if (encodedMessage != null) {
write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
}
}
/**
* Transforms the specified message into another message and return the
* transformed message. Note that you can not return {@code null}, unlike
* you can in {@link OneToOneDecoder#decode(ChannelHandlerContext, Channel, Object)};
* you must return something, at least {@link ChannelBuffers#EMPTY_BUFFER}.
*/
protected abstract Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception;
}

View File

@ -1,23 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Simplistic abstract classes which help implement encoder and decoder that
* transform an object into another object and vice versa.
*
* @apiviz.exclude \.codec\.(?!oneone)[a-z0-9]+\.
*/
package io.netty.handler.codec.oneone;

View File

@ -24,4 +24,4 @@
* @apiviz.exclude \.codec\.[a-eg-z][a-z0-9]*\.
* @apiviz.exclude \.ssl\.
*/
package io.netty.handler.codec.frame;
package io.netty.handler.codec;

View File

@ -17,15 +17,13 @@ package io.netty.handler.codec.protobuf;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferInputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.frame.LengthFieldPrepender;
import io.netty.handler.codec.oneone.OneToOneDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
@ -64,7 +62,7 @@ import com.google.protobuf.MessageLite;
* @apiviz.landmark
*/
@Sharable
public class ProtobufDecoder extends OneToOneDecoder {
public class ProtobufDecoder extends MessageToMessageDecoder<ChannelBuffer, MessageLite> {
private final MessageLite prototype;
private final ExtensionRegistry extensionRegistry;
@ -85,29 +83,23 @@ public class ProtobufDecoder extends OneToOneDecoder {
}
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer buf = (ChannelBuffer) msg;
if (buf.hasArray()) {
final int offset = buf.readerIndex();
public MessageLite decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
if (msg.hasArray()) {
final int offset = msg.readerIndex();
if (extensionRegistry == null) {
return prototype.newBuilderForType().mergeFrom(
buf.array(), buf.arrayOffset() + offset, buf.readableBytes()).build();
msg.array(), msg.arrayOffset() + offset, msg.readableBytes()).build();
} else {
return prototype.newBuilderForType().mergeFrom(
buf.array(), buf.arrayOffset() + offset, buf.readableBytes(), extensionRegistry).build();
msg.array(), msg.arrayOffset() + offset, msg.readableBytes(), extensionRegistry).build();
}
} else {
if (extensionRegistry == null) {
return prototype.newBuilderForType().mergeFrom(
new ChannelBufferInputStream((ChannelBuffer) msg)).build();
new ChannelBufferInputStream(msg)).build();
} else {
return prototype.newBuilderForType().mergeFrom(
new ChannelBufferInputStream((ChannelBuffer) msg), extensionRegistry).build();
new ChannelBufferInputStream(msg), extensionRegistry).build();
}
}
}

View File

@ -16,16 +16,14 @@
package io.netty.handler.codec.protobuf;
import static io.netty.buffer.ChannelBuffers.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.frame.LengthFieldPrepender;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
@ -60,17 +58,16 @@ import com.google.protobuf.MessageLite;
* @apiviz.landmark
*/
@Sharable
public class ProtobufEncoder extends OneToOneEncoder {
public class ProtobufEncoder extends MessageToMessageEncoder<Object, ChannelBuffer> {
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
public ChannelBuffer encode(ChannelOutboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof MessageLite) {
return wrappedBuffer(((MessageLite) msg).toByteArray());
}
if (msg instanceof MessageLite.Builder) {
return wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}
return msg;
return null;
}
}

View File

@ -16,10 +16,9 @@
package io.netty.handler.codec.protobuf;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.CorruptedFrameException;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.StreamToMessageDecoder;
import com.google.protobuf.CodedInputStream;
@ -38,7 +37,7 @@ import com.google.protobuf.CodedInputStream;
*
* @see com.google.protobuf.CodedInputStream
*/
public class ProtobufVarint32FrameDecoder extends FrameDecoder {
public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
// TODO maxFrameLength + safe skip + fail-fast option
// (just like LengthFieldBasedFrameDecoder)
@ -50,27 +49,27 @@ public class ProtobufVarint32FrameDecoder extends FrameDecoder {
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
in.markReaderIndex();
final byte[] buf = new byte[5];
for (int i = 0; i < buf.length; i ++) {
if (!buffer.readable()) {
buffer.resetReaderIndex();
if (!in.readable()) {
in.resetReaderIndex();
return null;
}
buf[i] = buffer.readByte();
buf[i] = in.readByte();
if (buf[i] >= 0) {
int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
if (in.readableBytes() < length) {
in.resetReaderIndex();
return null;
} else {
return buffer.readBytes(length);
return in.readBytes(length);
}
}
}

View File

@ -15,14 +15,11 @@
*/
package io.netty.handler.codec.protobuf;
import static io.netty.buffer.ChannelBuffers.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
import com.google.protobuf.CodedOutputStream;
@ -41,7 +38,7 @@ import com.google.protobuf.CodedOutputStream;
* @see com.google.protobuf.CodedOutputStream
*/
@Sharable
public class ProtobufVarint32LengthFieldPrepender extends OneToOneEncoder {
public class ProtobufVarint32LengthFieldPrepender extends MessageToStreamEncoder<ChannelBuffer> {
/**
* Creates a new instance.
@ -50,23 +47,18 @@ public class ProtobufVarint32LengthFieldPrepender extends OneToOneEncoder {
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
public void encode(
ChannelOutboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg, ChannelBuffer out) throws Exception {
ChannelBuffer body = msg;
int bodyLen = body.readableBytes();
int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
out.ensureWritableBytes(headerLen + bodyLen);
ChannelBuffer body = (ChannelBuffer) msg;
int length = body.readableBytes();
ChannelBuffer header =
channel.getConfig().getBufferFactory().getBuffer(
body.order(),
CodedOutputStream.computeRawVarint32Size(length));
CodedOutputStream codedOutputStream = CodedOutputStream
.newInstance(new ChannelBufferOutputStream(header));
codedOutputStream.writeRawVarint32(length);
codedOutputStream.flush();
return wrappedBuffer(header, body);
CodedOutputStream headerOut =
CodedOutputStream.newInstance(new ChannelBufferOutputStream(out));
headerOut.writeRawVarint32(bodyLen);
headerOut.flush();
out.writeBytes(body);
}
}

View File

@ -29,7 +29,7 @@ import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.FrameDecoder;
/**
* A specialized variation of {@link FrameDecoder} which enables implementation

View File

@ -15,7 +15,7 @@
*/
/**
* Specialized variation of {@link io.netty.handler.codec.frame.FrameDecoder}
* Specialized variation of {@link io.netty.handler.codec.FrameDecoder}
* which enables implementation of a non-blocking decoder in the blocking I/O
* paradigm.
*

View File

@ -22,7 +22,7 @@ import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferInputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* A decoder which deserializes the received {@link ChannelBuffer}s into Java

View File

@ -23,9 +23,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.FrameDecoder;
import io.netty.handler.codec.oneone.OneToOneDecoder;
/**

View File

@ -25,8 +25,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.oneone.OneToOneEncoder;
/**

View File

@ -15,13 +15,13 @@
*/
package io.netty.handler.codec.bytes;
import static org.hamcrest.core.Is.is;
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertThat;
import static io.netty.buffer.ChannelBuffers.*;
import static org.hamcrest.core.Is.*;
import static org.junit.Assert.*;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import java.util.Random;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import org.junit.Before;
import org.junit.Test;
@ -35,27 +35,31 @@ public class ByteArrayDecoderTest {
public void setUp() {
embedder = new DecoderEmbedder<byte[]>(new ByteArrayDecoder());
}
@Test
public void testDecode() {
@Test
public void testDecode() {
byte[] b = new byte[2048];
new Random().nextBytes(b);
embedder.offer(wrappedBuffer(b));
assertThat(embedder.poll(), is(b));
}
@Test
public void testDecodeEmpty() {
}
@Test
public void testDecodeEmpty() {
byte[] b = new byte[0];
embedder.offer(wrappedBuffer(b));
assertThat(embedder.poll(), is(b));
}
@Test
public void testDecodeOtherType() {
}
@Test
public void testDecodeOtherType() {
String str = "Meep!";
embedder.offer(str);
assertThat(embedder.poll(), is((Object) str));
}
try {
embedder.poll();
fail();
} catch (ClassCastException e) {
// Expected
}
}
}

View File

@ -15,14 +15,15 @@
*/
package io.netty.handler.codec.bytes;
import static org.hamcrest.core.Is.is;
import static io.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertThat;
import static io.netty.buffer.ChannelBuffers.*;
import static org.hamcrest.core.Is.*;
import static org.hamcrest.core.IsNull.*;
import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import java.util.Random;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import org.junit.Before;
import org.junit.Test;
@ -38,25 +39,30 @@ public class ByteArrayEncoderTest {
}
@Test
public void testDecode() {
public void testEncode() {
byte[] b = new byte[2048];
new Random().nextBytes(b);
embedder.offer(b);
assertThat(embedder.poll(), is(wrappedBuffer(b)));
}
@Test
public void testDecodeEmpty() {
public void testEncodeEmpty() {
byte[] b = new byte[0];
embedder.offer(b);
assertThat(embedder.poll(), is(wrappedBuffer(b)));
assertThat(embedder.poll(), nullValue());
}
@Test
public void testDecodeOtherType() {
public void testEncodeOtherType() {
String str = "Meep!";
embedder.offer(str);
assertThat(embedder.poll(), is((Object) str));
try {
embedder.poll();
fail();
} catch (ClassCastException e) {
// Expected
}
}
}

View File

@ -17,6 +17,9 @@ package io.netty.handler.codec.frame;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.CodecEmbedderException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil;

View File

@ -17,6 +17,8 @@ package io.netty.handler.codec.frame;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.CodecEmbedderException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil;

View File

@ -52,7 +52,7 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) {
return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
return ChannelBufferHolders.byteBuffer();
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.example.echo;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -35,7 +34,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) {
return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
return ChannelBufferHolders.byteBuffer();
}
@Override

View File

@ -20,8 +20,8 @@ import java.math.BigInteger;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.CorruptedFrameException;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.FrameDecoder;
/**
* Decodes the binary representation of a {@link BigInteger} prepended

View File

@ -57,7 +57,7 @@ public class FactorialClient {
// Get the handler instance to retrieve the answer.
FactorialClientHandler handler =
(FactorialClientHandler) channel.getPipeline().last();
(FactorialClientHandler) channel.pipeline().last();
// Print out the answer.
System.err.format(

View File

@ -46,7 +46,7 @@ import io.netty.channel.ExceptionEvent;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.TooLongFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
@ -171,7 +171,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
// Write the content.
ChannelFuture writeFuture;
if (ch.getPipeline().get(SslHandler.class) != null) {
if (ch.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
} else {

View File

@ -52,7 +52,7 @@ public class LocalExample {
// Set up the default server-side event pipeline.
EchoServerHandler handler = new EchoServerHandler();
sb.getPipeline().addLast("handler", handler);
sb.pipeline().addLast("handler", handler);
// Start up the server.
sb.bind(socketAddress);

View File

@ -62,7 +62,7 @@ public class LocalTimeClient {
// Get the handler instance to initiate the request.
LocalTimeClientHandler handler =
channel.getPipeline().get(LocalTimeClientHandler.class);
channel.pipeline().get(LocalTimeClientHandler.class);
// Request and get the response.
List<String> response = handler.getLocalTimes(cities);

View File

@ -26,10 +26,10 @@ import io.netty.example.factorial.FactorialServerHandler;
import io.netty.example.factorial.NumberEncoder;
import io.netty.example.http.snoop.HttpSnoopServerHandler;
import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.FrameDecoder;
import io.netty.handler.codec.compression.ZlibDecoder;
import io.netty.handler.codec.compression.ZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
@ -120,7 +120,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
}
private void enableSsl(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.getPipeline();
ChannelPipeline p = ctx.pipeline();
SSLEngine engine =
SecureChatSslContextFactory.getServerContext().createSSLEngine();
@ -132,7 +132,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
}
private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.getPipeline();
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", new ZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", new ZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(detectSsl, false));
@ -140,7 +140,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
}
private void switchToHttp(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.getPipeline();
ChannelPipeline p = ctx.pipeline();
p.addLast("decoder", new HttpRequestDecoder());
p.addLast("encoder", new HttpResponseEncoder());
p.addLast("deflater", new HttpContentCompressor());
@ -149,7 +149,7 @@ public class PortUnificationServerHandler extends FrameDecoder {
}
private void switchToFactorial(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.getPipeline();
ChannelPipeline p = ctx.pipeline();
p.addLast("decoder", new BigIntegerDecoder());
p.addLast("encoder", new NumberEncoder());
p.addLast("handler", new FactorialServerHandler());

View File

@ -59,7 +59,7 @@ public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
// Start the connection attempt.
ClientBootstrap cb = new ClientBootstrap(cf);
cb.getPipeline().addLast("handler", new OutboundHandler(e.channel()));
cb.pipeline().addLast("handler", new OutboundHandler(e.channel()));
ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
outboundChannel = f.channel();

View File

@ -48,7 +48,7 @@ public class SecureChatClientHandler extends SimpleChannelUpstreamHandler {
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Get the SslHandler from the pipeline
// which were added in SecureChatPipelineFactory.
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
// Begin handshake.
sslHandler.handshake();

View File

@ -21,8 +21,8 @@ import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ssl.SslHandler;

View File

@ -57,7 +57,7 @@ public class SecureChatServerHandler extends SimpleChannelUpstreamHandler {
// Get the SslHandler in the current pipeline.
// We added it in SecureChatPipelineFactory.
final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
// Get notified when SSL handshake is done.
ChannelFuture handshakeFuture = sslHandler.handshake();

View File

@ -21,8 +21,8 @@ import javax.net.ssl.SSLEngine;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ssl.SslHandler;

View File

@ -29,8 +29,8 @@ import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler;
import io.netty.channel.iostream.IoStreamAddress;
import io.netty.channel.iostream.IoStreamChannelFactory;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

View File

@ -19,8 +19,8 @@ import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

View File

@ -19,8 +19,8 @@ import static io.netty.channel.Channels.*;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

View File

@ -47,7 +47,7 @@ import io.netty.channel.DownstreamMessageEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.frame.FrameDecoder;
import io.netty.handler.codec.FrameDecoder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.NonReentrantLock;

View File

@ -224,7 +224,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.getPipeline().isAttached()) {
if (ctx.pipeline().isAttached()) {
// channelOpen event has been fired already, which means
// this.channelOpen() will not be invoked.
// We have to initialize here instead.

View File

@ -138,7 +138,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.getPipeline().isAttached()) {
if (ctx.pipeline().isAttached()) {
// channelOpen event has been fired already, which means
// this.channelOpen() will not be invoked.
// We have to initialize here instead.

View File

@ -68,8 +68,8 @@ public abstract class AbstractDatagramMulticastTest {
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
MulticastTestHandler mhandler = new MulticastTestHandler();
cb.getPipeline().addFirst("handler", mhandler);
sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
cb.pipeline().addFirst("handler", mhandler);
sb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
int port = TestUtils.getFreePort();

View File

@ -63,7 +63,7 @@ public abstract class AbstractDatagramTest {
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
final CountDownLatch latch = new CountDownLatch(1);
sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() {
sb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler() {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -74,7 +74,7 @@ public abstract class AbstractDatagramTest {
}
});
cb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
cb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
Channel sc = sb.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

View File

@ -62,7 +62,7 @@ public abstract class AbstractSocketClientBootstrapTest {
public void testFailedConnectionAttempt() throws Exception {
ClientBootstrap bootstrap = new ClientBootstrap();
bootstrap.setFactory(newClientSocketChannelFactory(executor));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption("remoteAddress", new InetSocketAddress("255.255.255.255", 1));
ChannelFuture future = bootstrap.connect();
future.awaitUninterruptibly();
@ -81,7 +81,7 @@ public abstract class AbstractSocketClientBootstrapTest {
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
"remoteAddress",
new InetSocketAddress(
@ -118,7 +118,7 @@ public abstract class AbstractSocketClientBootstrapTest {
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
"remoteAddress",
new InetSocketAddress(
@ -151,7 +151,7 @@ public abstract class AbstractSocketClientBootstrapTest {
ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class);
bootstrap.setPipelineFactory(pipelineFactory);
EasyMock.expect(pipelineFactory.getPipeline()).andThrow(new ChannelPipelineException());
EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException());
EasyMock.replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1));

View File

@ -84,13 +84,13 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("decoder", new CompatibleObjectDecoder());
sb.getPipeline().addLast("encoder", new CompatibleObjectEncoder());
sb.getPipeline().addLast("handler", sh);
sb.pipeline().addLast("decoder", new CompatibleObjectDecoder());
sb.pipeline().addLast("encoder", new CompatibleObjectEncoder());
sb.pipeline().addLast("handler", sh);
cb.getPipeline().addLast("decoder", new CompatibleObjectDecoder());
cb.getPipeline().addLast("encoder", new CompatibleObjectEncoder());
cb.getPipeline().addLast("handler", ch);
cb.pipeline().addLast("decoder", new CompatibleObjectDecoder());
cb.pipeline().addLast("encoder", new CompatibleObjectEncoder());
cb.pipeline().addLast("handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -75,8 +75,8 @@ public abstract class AbstractSocketEchoTest {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addFirst("handler", sh);
cb.getPipeline().addFirst("handler", ch);
sb.pipeline().addFirst("handler", sh);
cb.pipeline().addFirst("handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -37,7 +37,7 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.FixedLengthFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.util.SocketAddresses;
import io.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
@ -76,10 +76,10 @@ public abstract class AbstractSocketFixedLengthEchoTest {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
sb.getPipeline().addAfter("decoder", "handler", sh);
cb.getPipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
cb.getPipeline().addAfter("decoder", "handler", ch);
sb.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
sb.pipeline().addAfter("decoder", "handler", sh);
cb.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
cb.pipeline().addAfter("decoder", "handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -84,15 +84,15 @@ public abstract class AbstractSocketObjectStreamEchoTest {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("decoder", new ObjectDecoder(
sb.pipeline().addLast("decoder", new ObjectDecoder(
ClassResolvers.cacheDisabled(getClass().getClassLoader())));
sb.getPipeline().addLast("encoder", new ObjectEncoder());
sb.getPipeline().addLast("handler", sh);
sb.pipeline().addLast("encoder", new ObjectEncoder());
sb.pipeline().addLast("handler", sh);
cb.getPipeline().addLast("decoder", new ObjectDecoder(
cb.pipeline().addLast("decoder", new ObjectDecoder(
ClassResolvers.cacheDisabled(getClass().getClassLoader())));
cb.getPipeline().addLast("encoder", new ObjectEncoder());
cb.getPipeline().addLast("handler", ch);
cb.pipeline().addLast("encoder", new ObjectEncoder());
cb.pipeline().addLast("handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -121,11 +121,11 @@ public abstract class AbstractSocketServerBootstrapTest {
bootstrap.setOption("child.receiveBufferSize", 9753);
bootstrap.setOption("child.sendBufferSize", 8642);
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.pipeline().addLast("dummy", new DummyHandler());
Channel channel = bootstrap.bind();
ParentChannelHandler pch =
channel.getPipeline().get(ParentChannelHandler.class);
channel.pipeline().get(ParentChannelHandler.class);
Socket socket = null;
try {
@ -183,7 +183,7 @@ public abstract class AbstractSocketServerBootstrapTest {
ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class);
bootstrap.setPipelineFactory(pipelineFactory);
EasyMock.expect(pipelineFactory.getPipeline()).andThrow(new ChannelPipelineException());
EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException());
EasyMock.replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1));

View File

@ -113,14 +113,14 @@ public abstract class AbstractSocketSslEchoTest {
sb.setOption("receiveBufferSize", 1048576);
sb.setOption("receiveBufferSize", 1048576);
sb.getPipeline().addFirst("ssl", new SslHandler(sse));
sb.getPipeline().addLast("handler", sh);
cb.getPipeline().addFirst("ssl", new SslHandler(cse));
cb.getPipeline().addLast("handler", ch);
sb.pipeline().addFirst("ssl", new SslHandler(sse));
sb.pipeline().addLast("handler", sh);
cb.pipeline().addFirst("ssl", new SslHandler(cse));
cb.pipeline().addLast("handler", ch);
if (isExecutorRequired()) {
sb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
cb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
sb.pipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
cb.pipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
}
Channel sc = sb.bind(new InetSocketAddress(0));
@ -137,7 +137,7 @@ public abstract class AbstractSocketSslEchoTest {
assertTrue(ccf.isSuccess());
Channel cc = ccf.channel();
ChannelFuture hf = cc.getPipeline().get(SslHandler.class).handshake();
ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake();
hf.awaitUninterruptibly();
if (!hf.isSuccess()) {
logger.error("Handshake failed", hf.cause());

View File

@ -35,8 +35,8 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.frame.Delimiters;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
@ -86,15 +86,15 @@ public abstract class AbstractSocketStringEchoTest {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
sb.getPipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sb.getPipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sb.getPipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
sb.getPipeline().addAfter("decoder", "handler", sh);
sb.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sb.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sb.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
sb.pipeline().addAfter("decoder", "handler", sh);
cb.getPipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
cb.getPipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
cb.getPipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
cb.getPipeline().addAfter("decoder", "handler", ch);
cb.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
cb.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
cb.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
cb.pipeline().addAfter("decoder", "handler", ch);
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();

View File

@ -39,7 +39,7 @@ public class NioClientSocketShutdownTimeTest {
ClientBootstrap b = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool()));
b.getPipeline().addLast("handler", new DummyHandler());
b.pipeline().addLast("handler", new DummyHandler());
long startTime;
long stopTime;

View File

@ -43,7 +43,7 @@ public class NioServerSocketShutdownTimeTest {
bootstrap.setOption("child.sendBufferSize", 8642);
DummyHandler handler = new DummyHandler();
bootstrap.getPipeline().addLast("dummy", handler);
bootstrap.pipeline().addLast("dummy", handler);
Channel channel = bootstrap.bind();

View File

@ -232,7 +232,7 @@ final class HttpTunnelClientChannel extends AbstractChannel implements
void setTunnelIdForPollChannel() {
HttpTunnelClientPollHandler pollHandler =
pollChannel.getPipeline()
pollChannel.pipeline()
.get(HttpTunnelClientPollHandler.class);
pollHandler.setTunnelId(tunnelId);
}

View File

@ -61,7 +61,7 @@ final class HttpTunnelServerChannel extends AbstractServerChannel implements
realChannel = factory.createRealChannel(this, messageSwitch);
// TODO fix calling of overrideable getPipeline() from constructor
HttpTunnelServerChannelSink sink =
(HttpTunnelServerChannelSink) getPipeline().getSink();
(HttpTunnelServerChannelSink) pipeline().getSink();
sink.setRealChannel(realChannel);
sink.setCloseListener(CLOSE_FUTURE_PROXY);
config = new HttpTunnelServerChannelConfig(realChannel);
@ -107,7 +107,7 @@ final class HttpTunnelServerChannel extends AbstractServerChannel implements
String newTunnelId, InetSocketAddress remoteAddress) {
ChannelPipeline childPipeline = null;
try {
childPipeline = getConfig().getPipelineFactory().getPipeline();
childPipeline = getConfig().getPipelineFactory().pipeline();
} catch (Exception e) {
throw new ChannelPipelineException(
"Failed to initialize a pipeline.", e);

View File

@ -79,7 +79,7 @@ public class FakeServerSocketChannel extends AbstractChannel implements
public FakeSocketChannel acceptNewConnection(
InetSocketAddress remoteAddress, ChannelSink sink) throws Exception {
ChannelPipeline newPipeline =
getConfig().getPipelineFactory().getPipeline();
getConfig().getPipelineFactory().pipeline();
FakeSocketChannel newChannel =
new FakeSocketChannel(this, getFactory(), newPipeline, sink);
newChannel.localAddress = localAddress;

Some files were not shown because too many files have changed in this diff Show More