Allow a user use any type as a ReplayingDecoder state / AIO cleanup
- Removed VoidEnum because a user can now specify Void instead - AIO: Prefer discardReadBytes to clear - AIO: Fixed a potential bug where notifyFlushFutures() is not called if flush() was requested with no outbound data
This commit is contained in:
parent
0289dadca4
commit
e157ea1a66
@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.ReplayingDecoder;
|
import io.netty.handler.codec.ReplayingDecoder;
|
||||||
import io.netty.handler.codec.TooLongFrameException;
|
import io.netty.handler.codec.TooLongFrameException;
|
||||||
import io.netty.util.VoidEnum;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decodes {@link ByteBuf}s into {@link WebSocketFrame}s.
|
* Decodes {@link ByteBuf}s into {@link WebSocketFrame}s.
|
||||||
@ -30,7 +29,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* @apiviz.landmark
|
* @apiviz.landmark
|
||||||
* @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame
|
* @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame
|
||||||
*/
|
*/
|
||||||
public class WebSocket00FrameDecoder extends ReplayingDecoder<WebSocketFrame, VoidEnum> {
|
public class WebSocket00FrameDecoder extends ReplayingDecoder<WebSocketFrame, Void> {
|
||||||
|
|
||||||
static final int DEFAULT_MAX_FRAME_SIZE = 16384;
|
static final int DEFAULT_MAX_FRAME_SIZE = 16384;
|
||||||
|
|
||||||
|
@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandler;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.util.Signal;
|
import io.netty.util.Signal;
|
||||||
import io.netty.util.VoidEnum;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A specialized variation of {@link ByteToMessageDecoder} which enables implementation
|
* A specialized variation of {@link ByteToMessageDecoder} which enables implementation
|
||||||
@ -105,7 +104,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* <li>You must keep in mind that {@code decode(..)} method can be called many
|
* <li>You must keep in mind that {@code decode(..)} method can be called many
|
||||||
* times to decode a single message. For example, the following code will
|
* times to decode a single message. For example, the following code will
|
||||||
* not work:
|
* not work:
|
||||||
* <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
|
* <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link Void}> {
|
||||||
*
|
*
|
||||||
* private final Queue<Integer> values = new LinkedList<Integer>();
|
* private final Queue<Integer> values = new LinkedList<Integer>();
|
||||||
*
|
*
|
||||||
@ -125,7 +124,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* The correct implementation looks like the following, and you can also
|
* The correct implementation looks like the following, and you can also
|
||||||
* utilize the 'checkpoint' feature which is explained in detail in the
|
* utilize the 'checkpoint' feature which is explained in detail in the
|
||||||
* next section.
|
* next section.
|
||||||
* <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
|
* <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link Void}> {
|
||||||
*
|
*
|
||||||
* private final Queue<Integer> values = new LinkedList<Integer>();
|
* private final Queue<Integer> values = new LinkedList<Integer>();
|
||||||
*
|
*
|
||||||
@ -206,7 +205,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* An alternative way to manage the decoder state is to manage it by yourself.
|
* An alternative way to manage the decoder state is to manage it by yourself.
|
||||||
* <pre>
|
* <pre>
|
||||||
* public class IntegerHeaderFrameDecoder
|
* public class IntegerHeaderFrameDecoder
|
||||||
* extends {@link ReplayingDecoder}<<strong>{@link VoidEnum}</strong>> {
|
* extends {@link ReplayingDecoder}<<strong>{@link Void}</strong>> {
|
||||||
*
|
*
|
||||||
* <strong>private boolean readLength;</strong>
|
* <strong>private boolean readLength;</strong>
|
||||||
* private int length;
|
* private int length;
|
||||||
@ -215,7 +214,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* protected Object decode({@link ChannelHandlerContext} ctx,
|
* protected Object decode({@link ChannelHandlerContext} ctx,
|
||||||
* {@link Channel} channel,
|
* {@link Channel} channel,
|
||||||
* {@link ByteBuf} buf,
|
* {@link ByteBuf} buf,
|
||||||
* {@link VoidEnum} state) throws Exception {
|
* {@link Void} state) throws Exception {
|
||||||
* if (!readLength) {
|
* if (!readLength) {
|
||||||
* length = buf.readInt();
|
* length = buf.readInt();
|
||||||
* <strong>readLength = true;</strong>
|
* <strong>readLength = true;</strong>
|
||||||
@ -241,7 +240,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
|
* {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
|
||||||
* some additional steps are required:
|
* some additional steps are required:
|
||||||
* <pre>
|
* <pre>
|
||||||
* public class FirstDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> {
|
* public class FirstDecoder extends {@link ReplayingDecoder}<{@link Void}> {
|
||||||
*
|
*
|
||||||
* public FirstDecoder() {
|
* public FirstDecoder() {
|
||||||
* super(true); // Enable unfold
|
* super(true); // Enable unfold
|
||||||
@ -251,7 +250,7 @@ import io.netty.util.VoidEnum;
|
|||||||
* protected Object decode({@link ChannelHandlerContext} ctx,
|
* protected Object decode({@link ChannelHandlerContext} ctx,
|
||||||
* {@link Channel} ch,
|
* {@link Channel} ch,
|
||||||
* {@link ByteBuf} buf,
|
* {@link ByteBuf} buf,
|
||||||
* {@link VoidEnum} state) {
|
* {@link Void} state) {
|
||||||
* ...
|
* ...
|
||||||
* // Decode the first message
|
* // Decode the first message
|
||||||
* Object firstMessage = ...;
|
* Object firstMessage = ...;
|
||||||
@ -272,12 +271,13 @@ import io.netty.util.VoidEnum;
|
|||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
* @param <S>
|
* @param <S>
|
||||||
* the state type; use {@link VoidEnum} if state management is unused
|
* the state type which is usually an {@link Enum}; use {@link Void} if state management is
|
||||||
|
* unused
|
||||||
*
|
*
|
||||||
* @apiviz.landmark
|
* @apiviz.landmark
|
||||||
* @apiviz.has io.netty.handler.codec.UnreplayableOperationException oneway - - throws
|
* @apiviz.has io.netty.handler.codec.UnreplayableOperationException oneway - - throws
|
||||||
*/
|
*/
|
||||||
public abstract class ReplayingDecoder<O, S extends Enum<S>> extends ByteToMessageDecoder<O> {
|
public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
|
||||||
|
|
||||||
static final Signal REPLAY = new Signal(ReplayingDecoder.class.getName() + ".REPLAY");
|
static final Signal REPLAY = new Signal(ReplayingDecoder.class.getName() + ".REPLAY");
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import io.netty.channel.Channel;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.ReplayingDecoder;
|
import io.netty.handler.codec.ReplayingDecoder;
|
||||||
import io.netty.handler.codec.TooLongFrameException;
|
import io.netty.handler.codec.TooLongFrameException;
|
||||||
import io.netty.util.VoidEnum;
|
|
||||||
|
|
||||||
import java.io.ObjectStreamConstants;
|
import java.io.ObjectStreamConstants;
|
||||||
|
|
||||||
@ -32,7 +31,7 @@ import org.jboss.marshalling.Unmarshaller;
|
|||||||
*
|
*
|
||||||
* If you can you should use {@link MarshallingDecoder}.
|
* If you can you should use {@link MarshallingDecoder}.
|
||||||
*/
|
*/
|
||||||
public class CompatibleMarshallingDecoder extends ReplayingDecoder<Object, VoidEnum> {
|
public class CompatibleMarshallingDecoder extends ReplayingDecoder<Object, Void> {
|
||||||
protected final UnmarshallerProvider provider;
|
protected final UnmarshallerProvider provider;
|
||||||
protected final int maxObjectSize;
|
protected final int maxObjectSize;
|
||||||
|
|
||||||
|
@ -21,7 +21,6 @@ import io.netty.buffer.ByteBufIndexFinder;
|
|||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.embedded.EmbeddedByteChannel;
|
import io.netty.channel.embedded.EmbeddedByteChannel;
|
||||||
import io.netty.util.VoidEnum;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -48,7 +47,7 @@ public class ReplayingDecoderTest {
|
|||||||
assertNull(ch.readInbound());
|
assertNull(ch.readInbound());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class LineDecoder extends ReplayingDecoder<ByteBuf, VoidEnum> {
|
private static final class LineDecoder extends ReplayingDecoder<ByteBuf, Void> {
|
||||||
|
|
||||||
LineDecoder() {
|
LineDecoder() {
|
||||||
}
|
}
|
||||||
|
@ -1,24 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2012 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.util;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A place holder {@link Enum} which has no constant.
|
|
||||||
*/
|
|
||||||
public enum VoidEnum {
|
|
||||||
// No state is defined.
|
|
||||||
}
|
|
@ -153,22 +153,24 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
|
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
|
||||||
if (!buf.readable()) {
|
if (flushing) {
|
||||||
notifyFlushFutures();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!flushing) {
|
|
||||||
flushing = true;
|
flushing = true;
|
||||||
|
if (buf.readable()) {
|
||||||
buf.discardReadBytes();
|
buf.discardReadBytes();
|
||||||
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
|
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
|
||||||
|
} else {
|
||||||
|
notifyFlushFutures();
|
||||||
|
flushing = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void beginRead() {
|
private void beginRead() {
|
||||||
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
||||||
if (!byteBuf.readable()) {
|
if (!byteBuf.readable()) {
|
||||||
byteBuf.clear();
|
byteBuf.discardReadBytes();
|
||||||
} else {
|
} else {
|
||||||
expandReadBuffer(byteBuf);
|
expandReadBuffer(byteBuf);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user