Prevent channel re-registration from firing channelActive
Motivation: AbstractUnsafe considers two possibilities during channel registration. First, the channel may be an outgoing connection, in which case it will be registered before becoming active. Second, the channel may be an incoming connection in, which case the channel will already be active when it is registered. To handle the second case, AbstractUnsafe checks if the channel is active after registration and calls ChannelPipeline.fireChannelActive() if so. However, if an active channel is deregistered and then re-registered this logic causes a second fireChannelActive() to be invoked. This is unexpected; it is reasonable for handlers to assume that this method will only be invoked once per channel. Modifications: This change introduces a flag into AbstractUnsafe to recognize if this is the first or a subsequent registration. ChannelPipeline.fireChannelActive() is only possible for the first registration. Result: ChannelPipeline.fireChannelActive() is only called once.
This commit is contained in:
parent
dd3036be02
commit
e9bcc518fc
@ -423,6 +423,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
|
||||
private RecvByteBufAllocator.Handle recvHandle;
|
||||
private boolean inFlush0;
|
||||
/** true if the channel has never been registered, false otherwise */
|
||||
private boolean neverRegistered = true;
|
||||
|
||||
@Override
|
||||
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
|
||||
@ -507,12 +509,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
||||
return;
|
||||
}
|
||||
boolean firstRegistration = neverRegistered;
|
||||
doRegister();
|
||||
neverRegistered = false;
|
||||
registered = true;
|
||||
eventLoop.acceptNewTasks();
|
||||
safeSetSuccess(promise);
|
||||
pipeline.fireChannelRegistered();
|
||||
if (isActive()) {
|
||||
// Only fire a channelActive if the channel has never been registered. This prevents firing
|
||||
// multiple channel actives if the channel is deregistered and re-registered.
|
||||
if (firstRegistration && isActive()) {
|
||||
pipeline.fireChannelActive();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
@ -0,0 +1,168 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
import static org.easymock.EasyMock.capture;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.createNiceMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.expectLastCall;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import org.easymock.Capture;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AbstractChannelTest {
|
||||
|
||||
@Test
|
||||
public void ensureInitialRegistrationFiresActive() throws Throwable {
|
||||
EventLoop eventLoop = createNiceMock(EventLoop.class);
|
||||
// This allows us to have a single-threaded test
|
||||
expect(eventLoop.inEventLoop()).andReturn(true).anyTimes();
|
||||
|
||||
TestChannel channel = new TestChannel();
|
||||
ChannelHandler handler = createMock(ChannelHandler.class);
|
||||
handler.handlerAdded(anyObject(ChannelHandlerContext.class)); expectLastCall();
|
||||
Capture<Throwable> throwable = catchHandlerExceptions(handler);
|
||||
handler.channelRegistered(anyObject(ChannelHandlerContext.class));
|
||||
expectLastCall().once();
|
||||
handler.channelActive(anyObject(ChannelHandlerContext.class));
|
||||
expectLastCall().once();
|
||||
replay(handler, eventLoop);
|
||||
channel.pipeline().addLast(handler);
|
||||
|
||||
registerChannel(eventLoop, channel);
|
||||
|
||||
checkForHandlerException(throwable);
|
||||
verify(handler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ensureSubsequentRegistrationDoesNotFireActive() throws Throwable {
|
||||
EventLoop eventLoop = createNiceMock(EventLoop.class);
|
||||
// This allows us to have a single-threaded test
|
||||
expect(eventLoop.inEventLoop()).andReturn(true).anyTimes();
|
||||
|
||||
TestChannel channel = new TestChannel();
|
||||
ChannelHandler handler = createMock(ChannelHandler.class);
|
||||
handler.handlerAdded(anyObject(ChannelHandlerContext.class)); expectLastCall();
|
||||
Capture<Throwable> throwable = catchHandlerExceptions(handler);
|
||||
handler.channelRegistered(anyObject(ChannelHandlerContext.class));
|
||||
expectLastCall().times(2); // Should register twice
|
||||
handler.channelActive(anyObject(ChannelHandlerContext.class));
|
||||
expectLastCall().once(); // Should only fire active once
|
||||
replay(handler, eventLoop);
|
||||
channel.pipeline().addLast(handler);
|
||||
|
||||
registerChannel(eventLoop, channel);
|
||||
channel.unsafe().deregister(new DefaultChannelPromise(channel));
|
||||
registerChannel(eventLoop, channel);
|
||||
|
||||
checkForHandlerException(throwable);
|
||||
verify(handler);
|
||||
}
|
||||
|
||||
private void registerChannel(EventLoop eventLoop, Channel channel) throws Exception {
|
||||
DefaultChannelPromise future = new DefaultChannelPromise(channel);
|
||||
channel.unsafe().register(eventLoop, future);
|
||||
future.sync(); // Cause any exceptions to be thrown
|
||||
}
|
||||
|
||||
private Capture<Throwable> catchHandlerExceptions(ChannelHandler handler) throws Exception {
|
||||
Capture<Throwable> throwable = new Capture<Throwable>();
|
||||
handler.exceptionCaught(anyObject(ChannelHandlerContext.class), capture(throwable));
|
||||
expectLastCall().anyTimes();
|
||||
return throwable;
|
||||
}
|
||||
|
||||
private void checkForHandlerException(Capture<Throwable> throwable) throws Throwable {
|
||||
if (throwable.hasCaptured()) {
|
||||
throw throwable.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestChannel extends AbstractChannel {
|
||||
|
||||
private class TestUnsafe extends AbstractUnsafe {
|
||||
|
||||
@Override
|
||||
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { }
|
||||
}
|
||||
|
||||
public TestChannel() {
|
||||
super(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfig config() {
|
||||
return new DefaultChannelConfig(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelMetadata metadata() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractUnsafe newUnsafe() {
|
||||
return new TestUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress localAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doBeginRead() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception { }
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user