Invoke channelAcquired callback on first time channel acquire (#9093)

Motivation:

SimpleChannelPool provides ability to provide custom callbacks/handlers
on major events such as "channel acquired", "channel created" and
"channel released". In the current implementation, when a request to
acquire a channel is made for the first time, the internal channel pool
creates the channel lazily. This triggers the "channel created" callback
but does not invoke the "channel acquired" callback. This is contrary to
caller expectations who assumes that "channel acquired" will be invoked
at the end of every successful acquire call. It also leads to an
inconsistent API experience where the acquired callback is sometimes
invoked and sometimes it isn't depending on wheather the internal
mechanism is creating a new channel or re-using an existing one.

Modifications:

Invoke acquired callback consistenly even when creating a new channel
and modify the tests to support this behaviour

Result:

Consistent experience for the caller of acquire API. Every time they
call the API, the acquired callback will be invoked.
This commit is contained in:
Divij Vaidya 2019-04-29 11:45:49 -07:00 committed by Norman Maurer
parent 1837209a87
commit b9c4e17291
3 changed files with 12 additions and 13 deletions

View File

@ -206,9 +206,10 @@ public class SimpleChannelPool implements ChannelPool {
return promise;
}
private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception {
if (future.isSuccess()) {
Channel channel = future.channel();
handler.channelAcquired(channel);
if (!promise.trySuccess(channel)) {
// Promise was completed in the meantime (like cancelled), just release the channel again
release(channel);

View File

@ -20,10 +20,10 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
import io.netty.util.concurrent.Future;
@ -43,7 +43,7 @@ public class FixedChannelPoolTest {
@BeforeClass
public static void createEventLoop() {
group = new LocalEventLoopGroup();
group = new DefaultEventLoopGroup();
}
@AfterClass
@ -88,7 +88,7 @@ public class FixedChannelPoolTest {
assertSame(channel, channel2);
assertEquals(1, handler.channelCount());
assertEquals(1, handler.acquiredCount());
assertEquals(2, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().syncUninterruptibly();

View File

@ -20,16 +20,14 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.Future;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
@ -41,7 +39,7 @@ public class SimpleChannelPoolTest {
@Test
public void testAcquire() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
@ -82,7 +80,7 @@ public class SimpleChannelPoolTest {
assertFalse(channel.isActive());
}
assertEquals(1, handler.acquiredCount());
assertEquals(2, handler.acquiredCount());
assertEquals(2, handler.releasedCount());
sc.close().sync();
@ -91,7 +89,7 @@ public class SimpleChannelPoolTest {
@Test
public void testBoundedChannelPoolSegment() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
@ -139,7 +137,7 @@ public class SimpleChannelPoolTest {
channel2.close().sync();
assertEquals(2, handler.channelCount());
assertEquals(0, handler.acquiredCount());
assertEquals(2, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().sync();
channel.close().sync();
@ -154,7 +152,7 @@ public class SimpleChannelPoolTest {
*/
@Test
public void testUnhealthyChannelIsNotOffered() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
@ -200,7 +198,7 @@ public class SimpleChannelPoolTest {
*/
@Test
public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);