From f4d3f81d6ccd0f6ec3081462fca5f9ab5702fad6 Mon Sep 17 00:00:00 2001 From: Frank Barber Date: Tue, 11 Nov 2014 20:14:42 -0800 Subject: [PATCH] Prevent channel re-registration from firing channelActive Motivation: AbstractUnsafe considers two possibilities during channel registration. First, the channel may be an outgoing connection, in which case it will be registered before becoming active. Second, the channel may be an incoming connection in, which case the channel will already be active when it is registered. To handle the second case, AbstractUnsafe checks if the channel is active after registration and calls ChannelPipeline.fireChannelActive() if so. However, if an active channel is deregistered and then re-registered this logic causes a second fireChannelActive() to be invoked. This is unexpected; it is reasonable for handlers to assume that this method will only be invoked once per channel. Modifications: This change introduces a flag into AbstractUnsafe to recognize if this is the first or a subsequent registration. ChannelPipeline.fireChannelActive() is only possible for the first registration. Result: ChannelPipeline.fireChannelActive() is only called once. --- .../io/netty/channel/AbstractChannel.java | 8 +- .../io/netty/channel/AbstractChannelTest.java | 168 ++++++++++++++++++ 2 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 transport/src/test/java/io/netty/channel/AbstractChannelTest.java diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 12536b1650..334253e434 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -402,6 +402,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); private RecvByteBufAllocator.Handle recvHandle; private boolean inFlush0; + /** true if the channel has never been registered, false otherwise */ + private boolean neverRegistered = true; @Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { @@ -476,11 +478,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } + boolean firstRegistration = neverRegistered; doRegister(); + neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); - if (isActive()) { + // Only fire a channelActive if the channel has never been registered. This prevents firing + // multiple channel actives if the channel is deregistered and re-registered. + if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java new file mode 100644 index 0000000000..70fa0ac397 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -0,0 +1,168 @@ +/* + * Copyright 2014 The Netty Project + + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import java.net.SocketAddress; + +import org.easymock.Capture; +import org.junit.Test; + +public class AbstractChannelTest { + + @Test + public void ensureInitialRegistrationFiresActive() throws Throwable { + EventLoop eventLoop = createNiceMock(EventLoop.class); + // This allows us to have a single-threaded test + expect(eventLoop.inEventLoop()).andReturn(true).anyTimes(); + + TestChannel channel = new TestChannel(); + ChannelInboundHandler handler = createMock(ChannelInboundHandler.class); + handler.handlerAdded(anyObject(ChannelHandlerContext.class)); expectLastCall(); + Capture throwable = catchHandlerExceptions(handler); + handler.channelRegistered(anyObject(ChannelHandlerContext.class)); + expectLastCall().once(); + handler.channelActive(anyObject(ChannelHandlerContext.class)); + expectLastCall().once(); + replay(handler, eventLoop); + channel.pipeline().addLast(handler); + + registerChannel(eventLoop, channel); + + checkForHandlerException(throwable); + verify(handler); + } + + @Test + public void ensureSubsequentRegistrationDoesNotFireActive() throws Throwable { + EventLoop eventLoop = createNiceMock(EventLoop.class); + // This allows us to have a single-threaded test + expect(eventLoop.inEventLoop()).andReturn(true).anyTimes(); + + TestChannel channel = new TestChannel(); + ChannelInboundHandler handler = createMock(ChannelInboundHandler.class); + handler.handlerAdded(anyObject(ChannelHandlerContext.class)); expectLastCall(); + Capture throwable = catchHandlerExceptions(handler); + handler.channelRegistered(anyObject(ChannelHandlerContext.class)); + expectLastCall().times(2); // Should register twice + handler.channelActive(anyObject(ChannelHandlerContext.class)); + expectLastCall().once(); // Should only fire active once + replay(handler, eventLoop); + channel.pipeline().addLast(handler); + + registerChannel(eventLoop, channel); + channel.unsafe().deregister(new DefaultChannelPromise(channel)); + registerChannel(eventLoop, channel); + + checkForHandlerException(throwable); + verify(handler); + } + + private void registerChannel(EventLoop eventLoop, Channel channel) throws Exception { + DefaultChannelPromise future = new DefaultChannelPromise(channel); + channel.unsafe().register(eventLoop, future); + future.sync(); // Cause any exceptions to be thrown + } + + private Capture catchHandlerExceptions(ChannelInboundHandler handler) throws Exception { + Capture throwable = new Capture(); + handler.exceptionCaught(anyObject(ChannelHandlerContext.class), capture(throwable)); + expectLastCall().anyTimes(); + return throwable; + } + + private void checkForHandlerException(Capture throwable) throws Throwable { + if (throwable.hasCaptured()) { + throw throwable.getValue(); + } + } + + private static class TestChannel extends AbstractChannel { + + private class TestUnsafe extends AbstractUnsafe { + + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { } + } + + public TestChannel() { + super(null); + } + + @Override + public ChannelConfig config() { + return new DefaultChannelConfig(this); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public ChannelMetadata metadata() { + return null; + } + + @Override + protected AbstractUnsafe newUnsafe() { + return new TestUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return true; + } + + @Override + protected SocketAddress localAddress0() { + return null; + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { } + + @Override + protected void doDisconnect() throws Exception { } + + @Override + protected void doClose() throws Exception { } + + @Override + protected void doBeginRead() throws Exception { } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { } + } +}