Remove support from deregister a Channel from a EventLoop manually

This commit is contained in:
bgallagher 2013-08-23 14:52:52 -04:00 committed by Norman Maurer
parent 2ffdd92b56
commit c149f4bcc0
26 changed files with 188 additions and 363 deletions

View File

@ -88,7 +88,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (decoder != null) {
decoder.cleanFiles();
}

View File

@ -68,13 +68,9 @@ public class UptimeClientHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
println("Disconnected from: " + ctx.channel().remoteAddress());
}
@Override
public void channelUnregistered(final ChannelHandlerContext ctx)
throws Exception {
println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + 's');
final EventLoop loop = ctx.channel().eventLoop();

View File

@ -200,15 +200,6 @@ public class LoggingHandler extends ChannelDuplexHandler {
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "UNREGISTERED"));
}
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
@ -282,15 +273,6 @@ public class LoggingHandler extends ChannelDuplexHandler {
super.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx,
ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DEREGISTER()"));
}
super.deregister(ctx, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logMessage(ctx, "RECEIVED", msg);

View File

@ -44,6 +44,7 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@ -58,6 +59,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
* &middot; TLS</a> and StartTLS support to a {@link Channel}. Please refer
@ -370,11 +372,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Override
public void disconnect(final ChannelHandlerContext ctx,
final ChannelPromise promise) throws Exception {

View File

@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* A skeletal {@link Channel} implementation.
@ -66,6 +67,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private boolean strValActive;
private String strVal;
private static final AtomicReferenceFieldUpdater<AbstractChannel, EventLoop> EVENT_LOOP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractChannel.class, EventLoop.class, "eventLoop");
/**
* Creates a new instance.
*
@ -177,11 +181,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
@ -213,11 +212,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public Channel read() {
pipeline.read();
@ -395,17 +389,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
promise.setFailure(new IllegalStateException("incompatible event loop type: " +
eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (!AbstractChannel.EVENT_LOOP_UPDATER.compareAndSet(AbstractChannel.this, null, eventLoop)) {
return;
}
if (eventLoop.inEventLoop()) {
register0(promise);
@ -560,7 +553,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
});
}
deregister(voidPromise());
deregister();
}
}
@ -573,10 +566,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public final void deregister(final ChannelPromise promise) {
private void deregister() {
if (!registered) {
promise.setSuccess();
return;
}
@ -587,18 +578,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} finally {
if (registered) {
registered = false;
promise.setSuccess();
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
}
});
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
promise.setSuccess();
}
}
}

View File

@ -222,13 +222,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
@Deprecated
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.

View File

@ -73,17 +73,6 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
ctx.close(future);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
ctx.deregister(future);
}
/**
* Calls {@link ChannelHandlerContext#read()} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.

View File

@ -160,10 +160,6 @@ public interface ChannelHandlerContext
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
@Deprecated
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();

View File

@ -16,8 +16,8 @@
package io.netty.channel;
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user to hook in to state changes
* easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
@ -26,14 +26,6 @@ public interface ChannelInboundHandler extends ChannelHandler {
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*
* @deprecated use {@link #channelInactive(ChannelHandlerContext)}
*/
@Deprecated
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/

View File

@ -16,7 +16,7 @@
package io.netty.channel;
/**
* Abstract base class for {@link ChannelInboundHandler} implementations which provide
* Abstract base class for {@link ChannelInboundHandler} implementations that provides
* implementations of all of their methods.
*
* <p>
@ -42,17 +42,6 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
ctx.fireChannelRegistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.

View File

@ -30,16 +30,6 @@ interface ChannelInboundInvoker {
*/
ChannelInboundInvoker fireChannelRegistered();
/**
* A {@link Channel} was unregistered from its {@link EventLoop}.
*
* This will result in having the {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method
* called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
@Deprecated
ChannelInboundInvoker fireChannelUnregistered();
/**
* A {@link Channel} is active now, which means it is connected.
*

View File

@ -62,15 +62,6 @@ public interface ChannelOutboundHandler extends ChannelHandler {
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/

View File

@ -71,17 +71,6 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
ctx.close(promise);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
/**
* Calls {@link ChannelHandlerContext#read()} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.ConnectException;
import java.net.SocketAddress;
@ -88,20 +86,6 @@ interface ChannelOutboundInvoker {
*/
ChannelFuture close();
/**
* Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*
*/
@Deprecated
ChannelFuture deregister();
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
@ -175,21 +159,6 @@ interface ChannelOutboundInvoker {
*/
ChannelFuture close(ChannelPromise promise);
/**
* Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
@Deprecated
ChannelFuture deregister(ChannelPromise promise);
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was

View File

@ -138,7 +138,6 @@ import java.util.NoSuchElementException;
* <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
* <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
* </ul>
* </li>
* <li>Outbound event propagation methods:
@ -150,7 +149,6 @@ import java.util.NoSuchElementException;
* <li>{@link ChannelHandlerContext#read()}</li>
* <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
* </ul>
* </li>
* </ul>
@ -597,10 +595,6 @@ public interface ChannelPipeline
@Override
ChannelPipeline fireChannelRegistered();
@Override
@Deprecated
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();

View File

@ -118,11 +118,6 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
inboundHandler.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelActive(ctx);
@ -178,11 +173,6 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
outboundHandler.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
outboundHandler.read(ctx);

View File

@ -160,31 +160,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelUnregistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelUnregistered();
}
});
}
return this;
}
private void invokeChannelUnregistered() {
try {
((ChannelInboundHandler) handler).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound();
@ -418,11 +393,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return close(newPromise());
}
@Override
public ChannelFuture deregister() {
return deregister(newPromise());
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
@ -558,34 +528,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeDeregister(promise);
}
});
}
return promise;
}
private void invokeDeregister(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelHandlerContext read() {
final DefaultChannelHandlerContext next = findContextOutbound();

View File

@ -731,17 +731,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
@Override
public ChannelPipeline fireChannelUnregistered() {
head.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
teardownAll();
}
return this;
}
/**
* Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger
* handlerRemoved(). Note that the tail handler is excluded because it's neither an outbound handler nor it
@ -765,6 +754,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline fireChannelInactive() {
head.fireChannelInactive();
teardownAll();
return this;
}
@ -826,11 +816,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.close();
}
@Override
public ChannelFuture deregister() {
return tail.deregister();
}
@Override
public ChannelPipeline flush() {
tail.flush();
@ -862,11 +847,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.close(promise);
}
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise);
}
@Override
public ChannelPipeline read() {
tail.read();
@ -932,9 +912,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@ -1018,11 +995,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();

View File

@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -198,24 +197,4 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
* the operation is done for all channels
*/
ChannelGroupFuture close(ChannelMatcher matcher);
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop}.
* Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
@Deprecated
ChannelGroupFuture deregister();
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given
* {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
@Deprecated
ChannelGroupFuture deregister(ChannelMatcher matcher);
}

View File

@ -170,10 +170,6 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
public ChannelGroupFuture disconnect() {
return disconnect(ChannelMatchers.all());
}
@Override
public ChannelGroupFuture deregister() {
return deregister(ChannelMatchers.all());
}
@Override
public ChannelGroupFuture write(Object message) {
@ -268,29 +264,6 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroupFuture deregister(ChannelMatcher matcher) {
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroup flush(ChannelMatcher matcher) {
for (Channel c: nonServerChannels) {

View File

@ -154,7 +154,7 @@ public class LocalChannel extends AbstractChannel {
if (peer != null) {
state = 2;
peer.remoteAddress = parent().localAddress();
peer.remoteAddress = parent() == null ? null : parent().localAddress();
peer.state = 2;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
@ -219,9 +219,6 @@ public class LocalChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
if (isOpen()) {
unsafe().close(unsafe().voidPromise());
}
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
}

View File

@ -15,21 +15,14 @@
*/
package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.Test;
import static org.junit.Assert.*;
public abstract class AbstractEventLoopTest {
/**
* Test for https://github.com/netty/netty/issues/803
*/
/*
@Test
public void testReregister() {
EventLoopGroup group = newEventLoopGroup();
@ -53,7 +46,6 @@ public abstract class AbstractEventLoopTest {
EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor();
EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor();
future.channel().deregister().awaitUninterruptibly();
Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel();
EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor();
assertNotSame(executor1, executorNew);
@ -66,7 +58,7 @@ public abstract class AbstractEventLoopTest {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
}
*/
protected abstract EventLoopGroup newEventLoopGroup();
protected abstract Class<? extends ServerSocketChannel> newChannel();
}

View File

@ -25,6 +25,8 @@ import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import java.io.UnsupportedEncodingException;
class BaseChannelTest {
private final LoggingHandler loggingHandler;
@ -64,6 +66,18 @@ class BaseChannelTest {
return buf;
}
static Object createTestBuf(String string) throws UnsupportedEncodingException {
byte[] buf = string.getBytes("US-ASCII");
return createTestBuf(buf);
}
static Object createTestBuf(byte[] buf) {
ByteBuf ret = createTestBuf(buf.length);
ret.clear();
ret.writeBytes(buf);
return ret;
}
void assertLog(String expected) {
String actual = this.loggingHandler.getLog();
assertEquals(expected, actual);

View File

@ -21,9 +21,8 @@ import java.util.EnumSet;
final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHandler {
static enum Event { WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, DEREGISTER, READ, WRITABILITY,
HANDLER_ADDED, HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, UNREGISTERED, ACTIVE, INACTIVE,
USER };
static enum Event { WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, READ, WRITABILITY, HANDLER_ADDED,
HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, ACTIVE, INACTIVE, USER };
private StringBuilder log = new StringBuilder();
@ -67,12 +66,6 @@ final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHand
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log(Event.DEREGISTER);
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
log(Event.READ);
@ -106,12 +99,6 @@ final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHand
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log(Event.UNREGISTERED);
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log(Event.ACTIVE);

View File

@ -15,11 +15,17 @@
*/
package io.netty.channel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.LoggingHandler.Event;
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.channels.ClosedChannelException;
import org.junit.Test;
@ -91,4 +97,141 @@ public class ReentrantChannelTest extends BaseChannelTest {
"WRITABILITY: writable=true\n");
}
@Test
public void testWriteFlushPingPong() throws Exception {
LocalAddress addr = new LocalAddress("testWriteFlushPingPong");
ServerBootstrap sb = getLocalServerBootstrap();
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
int writeCount;
int flushCount;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (writeCount < 5) {
writeCount++;
ctx.channel().flush();
}
super.write(ctx, msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (flushCount < 5) {
flushCount++;
ctx.channel().write(createTestBuf(2000));
}
super.flush(ctx);
}
});
clientChannel.writeAndFlush(createTestBuf(2000));
clientChannel.close().sync();
assertLog(
"WRITE\n" +
"FLUSH\n" +
"WRITE\n" +
"FLUSH\n" +
"WRITE\n" +
"FLUSH\n" +
"WRITE\n" +
"FLUSH\n" +
"WRITE\n" +
"FLUSH\n" +
"WRITE\n" +
"FLUSH\n" +
"CLOSE\n");
}
@Test
public void testCloseInFlush() throws Exception {
LocalAddress addr = new LocalAddress("testCloseInFlush");
ServerBootstrap sb = getLocalServerBootstrap();
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
ctx.channel().close();
}
});
super.write(ctx, msg, promise);
ctx.channel().flush();
}
});
clientChannel.write(createTestBuf(2000)).sync();
clientChannel.closeFuture().sync();
assertLog(
"WRITE\n" +
"FLUSH\n" +
"CLOSE\n");
}
@Test
public void testFlushFailure() throws Exception {
LocalAddress addr = new LocalAddress("testFlushFailure");
ServerBootstrap sb = getLocalServerBootstrap();
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
throw new Exception("intentional failure");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
});
try {
clientChannel.writeAndFlush(createTestBuf(2000)).sync();
fail();
} catch (Throwable cce) {
// FIXME: shouldn't this contain the "intentional failure" exception?
assertEquals(ClosedChannelException.class, cce.getClass());
}
clientChannel.closeFuture().sync();
assertLog(
"WRITE\n" +
"CLOSE\n");
}
}

View File

@ -28,11 +28,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Deque;
import java.util.LinkedList;
@ -41,6 +36,12 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
public class LocalTransportThreadModelTest3 {
enum EventType {
@ -49,7 +50,6 @@ public class LocalTransportThreadModelTest3 {
MESSAGE_RECEIVED_LAST,
INACTIVE,
ACTIVE,
UNREGISTERED,
REGISTERED,
MESSAGE_RECEIVED,
WRITE,
@ -198,14 +198,9 @@ public class LocalTransportThreadModelTest3 {
ch.close().sync();
while (events.peekLast() != EventType.UNREGISTERED) {
Thread.sleep(10);
}
expectedEvents.addFirst(EventType.ACTIVE);
expectedEvents.addFirst(EventType.REGISTERED);
expectedEvents.addLast(EventType.INACTIVE);
expectedEvents.addLast(EventType.UNREGISTERED);
for (;;) {
EventType event = events.poll();
@ -292,11 +287,6 @@ public class LocalTransportThreadModelTest3 {
events.add(EventType.ACTIVE);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.UNREGISTERED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.REGISTERED);