Not fail the promise when a closed Channel is offered back to the ChannelPool

Motivation:

We should not fail the promise when a closed Channel is offereed back to the ChannelPool as we explicit mention that the Channel must always be returned.

Modifications:

- Not fail the promise
- Add test-case

Result:

Fixes [#6831]
This commit is contained in:
Norman Maurer 2017-06-12 11:10:10 +02:00
parent 66c83f7b74
commit 94c0ef3c96
3 changed files with 54 additions and 29 deletions

View File

@ -43,9 +43,6 @@ public class SimpleChannelPool implements ChannelPool {
private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool"); private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)"); new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");
private static final IllegalStateException UNHEALTHY_NON_OFFERED_TO_POOL = ThrowableUtil.unknownStackTrace(
new IllegalStateException("Channel is unhealthy not offering it back to pool"),
SimpleChannelPool.class, "releaseAndOffer(...)");
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque(); private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
private final ChannelPoolHandler handler; private final ChannelPoolHandler handler;
@ -325,9 +322,9 @@ public class SimpleChannelPool implements ChannelPool {
throws Exception { throws Exception {
if (future.getNow()) { //channel turns out to be healthy, offering and releasing it. if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
releaseAndOffer(channel, promise); releaseAndOffer(channel, promise);
} else { //channel ont healthy, just releasing it. } else { //channel not healthy, just releasing it.
handler.channelReleased(channel); handler.channelReleased(channel);
closeAndFail(channel, UNHEALTHY_NON_OFFERED_TO_POOL, promise); promise.setSuccess(null);
} }
} }

View File

@ -28,6 +28,8 @@ import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel; import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction; import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -38,9 +40,22 @@ import static org.junit.Assert.*;
public class FixedChannelPoolTest { public class FixedChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id"; private static final String LOCAL_ADDR_ID = "test.id";
private static EventLoopGroup group;
@BeforeClass
public static void createEventLoop() {
group = new LocalEventLoopGroup();
}
@AfterClass
public static void destroyEventLoop() {
if (group != null) {
group.shutdownGracefully();
}
}
@Test @Test
public void testAcquire() throws Exception { public void testAcquire() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -79,12 +94,10 @@ public class FixedChannelPoolTest {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly(); channel2.close().syncUninterruptibly();
group.shutdownGracefully();
} }
@Test(expected = TimeoutException.class) @Test(expected = TimeoutException.class)
public void testAcquireTimeout() throws Exception { public void testAcquireTimeout() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -114,13 +127,11 @@ public class FixedChannelPoolTest {
} finally { } finally {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();
group.shutdownGracefully();
} }
} }
@Test @Test
public void testAcquireNewConnection() throws Exception { public void testAcquireNewConnection() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -149,7 +160,6 @@ public class FixedChannelPoolTest {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();
channel2.close().syncUninterruptibly(); channel2.close().syncUninterruptibly();
group.shutdownGracefully();
} }
/** /**
@ -158,7 +168,6 @@ public class FixedChannelPoolTest {
*/ */
@Test @Test
public void testAcquireNewConnectionWhen() throws Exception { public void testAcquireNewConnectionWhen() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -188,12 +197,10 @@ public class FixedChannelPoolTest {
assertNotSame(channel1, channel2); assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly(); channel2.close().syncUninterruptibly();
group.shutdownGracefully();
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testAcquireBoundQueue() throws Exception { public void testAcquireBoundQueue() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -224,13 +231,11 @@ public class FixedChannelPoolTest {
} finally { } finally {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();
group.shutdownGracefully();
} }
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testReleaseDifferentPool() throws Exception { public void testReleaseDifferentPool() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -260,13 +265,11 @@ public class FixedChannelPoolTest {
} finally { } finally {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();
group.shutdownGracefully();
} }
} }
@Test @Test
public void testReleaseAfterClosePool() throws Exception { public void testReleaseAfterClosePool() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup(1);
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap(); Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr); cb.remoteAddress(addr);
@ -300,6 +303,34 @@ public class FixedChannelPoolTest {
channel.close().syncUninterruptibly(); channel.close().syncUninterruptibly();
} }
@Test
public void testReleaseClosed() {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
channel.close().syncUninterruptibly();
pool.release(channel).syncUninterruptibly();
sc.close().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override @Override
public void channelCreated(Channel ch) throws Exception { public void channelCreated(Channel ch) throws Exception {

View File

@ -39,9 +39,6 @@ import static org.junit.Assert.*;
public class SimpleChannelPoolTest { public class SimpleChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id"; private static final String LOCAL_ADDR_ID = "test.id";
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test @Test
public void testAcquire() throws Exception { public void testAcquire() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup(); EventLoopGroup group = new LocalEventLoopGroup();
@ -184,16 +181,16 @@ public class SimpleChannelPoolTest {
//first check that when returned healthy then it actually offered back to the pool. //first check that when returned healthy then it actually offered back to the pool.
assertSame(channel1, channel2); assertSame(channel1, channel2);
expectedException.expect(IllegalStateException.class);
channel1.close().syncUninterruptibly(); channel1.close().syncUninterruptibly();
try {
pool.release(channel1).syncUninterruptibly(); pool.release(channel1).syncUninterruptibly();
} finally { Channel channel3 = pool.acquire().syncUninterruptibly().getNow();
//channel1 was not healthy anymore so it should not get acquired anymore.
assertNotSame(channel1, channel3);
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly(); channel3.close().syncUninterruptibly();
group.shutdownGracefully(); group.shutdownGracefully();
} }
}
/** /**
* Tests that if channel was unhealthy it is was offered back to the pool because * Tests that if channel was unhealthy it is was offered back to the pool because