Unhealthy channel is not offered back to the pool.

Motivation:
When releasing unhealthy channel back to a pool we don't have to offer it since on acquire it will be discarded anyways.
 Also checking healthiness at release is a good idea so we don't end up having tons of unhealthy channels in the pool(unless they became unhealthy after being offered)

Modifications:
private SimpleChannelPool.offerIfHealthy() method added that is called from SimpleChannelPool.doReleaseChannel(). SimpleChannelPool.offerIfHealthy() offers channel back to pool only if channel is healthy.
Otherwise it throws setFailure exception to the promise.

 Result:
The pool is now much cleaner and not spammed with unhealthy channels.

Added ability to choose if channel health has to be validated on release by passing boolean flag.

Motivation:
Depending on performance preferences and individual use cases sometimes we would like to be able force health check of a channel at release time and do not offer it back to the pool. Other times we would want to just release channel and offer it back to the pool and check health only when we try to acquire that channel from the pool. See more details here: https://github.com/netty/netty/issues/4077#issuecomment-130461684

Modifications:
Future<Void> release(Channel channel, Promise<Void> promise, boolean offerHealthyOnly);
The offerHealthyOnly boolean flag allows developers to choose whether to do channel validation before offering it back to pool or not.
Appropriate modifications made to hierarchy of implementations of ChannelPool. offerHealthyOnly=true will force channel health to be checked before offering back to pool. offerHealthyOnly=false  will ignore channel health check and will just try just offer it back to the pool
 offerHealthyOnly=true by default.

Result:
Channel health check before offer back to pool is controlled by a flag now.

Code changed to satisfy checkstyle requirements.

Motivation:
Code needs to satisfy checkstyle requirements.

Modifications:
 SimpleChannelPool.java:279 line split to be less then 120 characters.
 SimpleChannelPool.java:280:31 space added after '{'
 SimpleChannelPool.java:282:17 space added after '{'
 SimpleChannelPoolTest.java:198 - extra white space line removed.

Result:
Code satisfies checkstyle requirements.

 offerHealthyOnly is passed as a constructor parameter now.

Motivation:
Instead of passing offerHealthyOnly as a method parameter it is better to pass it in as SimpleChannelPool or FixedChannelPool constructor.

Modifications:
 Redundant release method that takes offerHealthyOnly removed from ChannelPool.
 offerHealthyOnly parameter added to constructor for FixedChannelPool and SimpleChannelPool.

Result:
SimpleChannelPool and FixedChannelPool are now take offerHealthyOnly as a constructor parameter. Default behavior is: offerHealthyOnly=true.

Code changed to satisfy checkstyle requirements.

Motivation:
Code needs to satisfy checkstyle requirements.

Modifications:
 SimpleChannelPool.java:84: line made to be no longer then 120 characters.
 SimpleChannelPool.java:237: extra white space line removed.

Result:
Code satisfies checkstyle requirements.

Tests do not need to be too  copled to the code. Exception message should not be validated

Motivation:
We don't need our tests to be too coupled to the code. Exception type validation in tests is just good enough.

Modifications:
Exception validation message removed from SimpleChannelPoolTest.testUnhealthyChannelIsNotOffered() test.

Result:
The SimpleChannelPoolTest test is less coupled to the code now.

Stack trace set to empty for UNHEALTHY_NON_OFFERED_TO_POOL.

Motivation:
We don't need stack trace for UNHEALTHY_NON_OFFERED_TO_POOL.

Modifications:
Added  UNHEALTHY_NON_OFFERED_TO_POOL.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE) to static init block.

Result:
UNHEALTHY_NON_OFFERED_TO_POOL's stack trace set to empty.

Minor code re-factorings.

Motivation:
For better code readability we need to apply several minor code re-factorings.

Modifications:
javadocs true -> {@code true}
offerHealthyOnly variable name changed to releaseHeathCheck
<p/> -> <p> in javadocs
offerHealthyOnly removed from doReleaseChannel as it not needed there.

Result:
Code quality is improved.

Code changed to satisfy checkstyle requirements.

Motivation:
Code needs to satisfy checkstyle requirements.

Modifications:
SimpleChannelPool.java:87: line made to be no longer then 120 characters.

Result:
Code satisfies checkstyle requirements.

Pull request needs to contain only necessary changes

Motivation:
The pull request should not contain unnecessary changes that are not needed as part of required functionality of pull request.

Modifications:
private void doReleaseChannel(final Channel channel, final Promise<Void> promise) - >  private void doReleaseChannel(Channel channel, Promise<Void> promise)

Result:
Pull request contains less unnecessary modifications.
This commit is contained in:
Ivan Bahdanau 2015-08-12 10:22:03 -07:00 committed by Scott Mitchell
parent 2a7318ae03
commit 144f3ec8e9
3 changed files with 192 additions and 9 deletions

View File

@ -121,7 +121,34 @@ public final class FixedChannelPool extends SimpleChannelPool {
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires) {
super(bootstrap, handler, healthCheck);
this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healty when obtain from the {@link ChannelPool}
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
* In this case {@param acquireTimeoutMillis} must be {@code -1}.
* @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
* the {@link AcquireTimeoutAction} takes place.
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
* be failed.
* @param releaseHealthCheck will check channel health before offering back if this parameter set to
* {@code true}.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
super(bootstrap, handler, healthCheck, releaseHealthCheck);
if (maxConnections < 1) {
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
}

View File

@ -43,13 +43,17 @@ import static io.netty.util.internal.ObjectUtil.*;
public class SimpleChannelPool implements ChannelPool {
private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
private static final IllegalStateException FULL_EXCEPTION = new IllegalStateException("ChannelPool full");
private static final IllegalStateException UNHEALTHY_NON_OFFERED_TO_POOL =
new IllegalStateException("Channel is unhealthy not offering it back to pool");
static {
FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
UNHEALTHY_NON_OFFERED_TO_POOL.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
private final ChannelPoolHandler handler;
private final ChannelHealthChecker healthCheck;
private final Bootstrap bootstrap;
private final boolean releaseHealthCheck;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
@ -67,11 +71,27 @@ public class SimpleChannelPool implements ChannelPool {
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healty when obtain from the {@link ChannelPool}
* still healthy when obtain from the {@link ChannelPool}
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
this(bootstrap, handler, healthCheck, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtain from the {@link ChannelPool}
* @param releaseHealthCheck will offercheck channel health before offering back if this parameter set to
* {@code true}.
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
boolean releaseHealthCheck) {
this.handler = checkNotNull(handler, "handler");
this.healthCheck = checkNotNull(healthCheck, "healthCheck");
this.releaseHealthCheck = releaseHealthCheck;
// Clone the original Bootstrap as we want to set our own handler
this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
this.bootstrap.handler(new ChannelInitializer<Channel>() {
@ -183,9 +203,9 @@ public class SimpleChannelPool implements ChannelPool {
}
/**
* Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()},
* sub-classes may override this.
*
* Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
* override this.
* <p>
* The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
*/
protected ChannelFuture connectChannel(Bootstrap bs) {
@ -230,11 +250,10 @@ public class SimpleChannelPool implements ChannelPool {
promise);
} else {
try {
if (offerChannel(channel)) {
handler.channelReleased(channel);
promise.setSuccess(null);
if (releaseHealthCheck) {
doHealthCheckOnRelease(channel, promise);
} else {
closeAndFail(channel, FULL_EXCEPTION, promise);
releaseAndOffer(channel, promise);
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
@ -242,6 +261,46 @@ public class SimpleChannelPool implements ChannelPool {
}
}
private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
final Future<Boolean> f = healthCheck.isHealthy(channel);
if (f.isDone()) {
releaseAndOfferIfHealthy(channel, promise, f);
} else {
f.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
releaseAndOfferIfHealthy(channel, promise, f);
}
});
}
}
/**
* Adds the channel back to the pool only if the channel is healty.
* @param channel the channel to put back to the pool
* @param promise offer operation promise.
* @param future the future that contains information fif channel is healthy or not.
* @throws Exception in case when failed to notify handler about release operation.
*/
private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
throws Exception {
if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
releaseAndOffer(channel, promise);
} else { //channel ont healthy, just releasing it.
handler.channelReleased(channel);
closeAndFail(channel, UNHEALTHY_NON_OFFERED_TO_POOL, promise);
}
}
private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
if (offerChannel(channel)) {
handler.channelReleased(channel);
promise.setSuccess(null);
} else {
closeAndFail(channel, FULL_EXCEPTION, promise);
}
}
private static void closeChannel(Channel channel) {
channel.attr(POOL_KEY).getAndSet(null);
channel.close();

View File

@ -25,7 +25,11 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.Future;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
@ -35,6 +39,9 @@ import static org.junit.Assert.*;
public class SimpleChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testAcquire() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
@ -142,4 +149,94 @@ public class SimpleChannelPoolTest {
channel2.close().sync();
group.shutdownGracefully();
}
/**
* Tests that if channel was unhealthy it is not offered back to the pool.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsNotOffered() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
pool.release(channel1).syncUninterruptibly();
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
//first check that when returned healthy then it actually offered back to the pool.
assertSame(channel1, channel2);
expectedException.expect(IllegalStateException.class);
channel1.close().syncUninterruptibly();
try {
pool.release(channel1).syncUninterruptibly();
} catch (Exception e) {
throw e;
} finally {
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
}
/**
* Tests that if channel was unhealthy it is was offered back to the pool because
* it was requested not to validate channel health on release.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, false);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
Future<Void> releaseFuture =
pool.release(channel1, channel1.eventLoop().<Void>newPromise()).syncUninterruptibly();
assertThat(releaseFuture.isSuccess(), CoreMatchers.is(true));
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
//verifying that in fact the channel2 is different that means is not pulled from the pool
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
}