From 529025d9d59280ad9e2f173e1bd0562a1e111280 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 24 Jul 2017 19:21:54 +0200 Subject: [PATCH] Allow to use oldest Channel out of the Simple / FixedChannelPool on acquire Motivation: We previously used pollLast() to retrieve a Channel from the queue that backs SimpleChannelPool. This could lead to the problem that some Channels are very unfrequently used and so when these are used the connection was already be closed and so could not be reused. Modifications: Allow to configure if the last recent used Channel should be used or the "oldest". Result: More flexible usage of ChannelPools --- .../netty/channel/pool/FixedChannelPool.java | 32 ++++++++++++++++++- .../netty/channel/pool/SimpleChannelPool.java | 20 +++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java index 623cfc2ef9..f3f697c187 100644 --- a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java @@ -151,7 +151,37 @@ public class FixedChannelPool extends SimpleChannelPool { ChannelHealthChecker healthCheck, AcquireTimeoutAction action, final long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) { - super(bootstrap, handler, healthCheck, releaseHealthCheck); + this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, + releaseHealthCheck, true); + } + + /** + * Creates a new instance. + * + * @param bootstrap theĀ {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is + * still healthy when obtain from the {@link ChannelPool} + * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. + * In this case {@param acquireTimeoutMillis} must be {@code -1}. + * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or + * the {@link AcquireTimeoutAction} takes place. + * @param maxConnections the number of maximal active connections, once this is reached new tries to + * acquire a {@link Channel} will be delayed until a connection is returned to the + * pool again. + * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will + * be failed. + * @param releaseHealthCheck will check channel health before offering back if this parameter set to + * {@code true}. + * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. + */ + public FixedChannelPool(Bootstrap bootstrap, + ChannelPoolHandler handler, + ChannelHealthChecker healthCheck, AcquireTimeoutAction action, + final long acquireTimeoutMillis, + int maxConnections, int maxPendingAcquires, + boolean releaseHealthCheck, boolean lastRecentUsed) { + super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed); if (maxConnections < 1) { throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); } diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java index 49ec322db1..d8dd0eae94 100644 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java @@ -49,6 +49,7 @@ public class SimpleChannelPool implements ChannelPool { private final ChannelHealthChecker healthCheck; private final Bootstrap bootstrap; private final boolean releaseHealthCheck; + private final boolean lastRecentUsed; /** * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. @@ -84,6 +85,22 @@ public class SimpleChannelPool implements ChannelPool { */ public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck) { + this(bootstrap, handler, healthCheck, releaseHealthCheck, true); + } + + /** + * Creates a new instance. + * + * @param bootstrap theĀ {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is + * still healthy when obtain from the {@link ChannelPool} + * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; + * otherwise, channel health is only checked at acquisition time + * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. + */ + public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, + boolean releaseHealthCheck, boolean lastRecentUsed) { this.handler = checkNotNull(handler, "handler"); this.healthCheck = checkNotNull(healthCheck, "healthCheck"); this.releaseHealthCheck = releaseHealthCheck; @@ -96,6 +113,7 @@ public class SimpleChannelPool implements ChannelPool { handler.channelCreated(ch); } }); + this.lastRecentUsed = lastRecentUsed; } /** @@ -355,7 +373,7 @@ public class SimpleChannelPool implements ChannelPool { * implementations of these methods needs to be thread-safe! */ protected Channel pollChannel() { - return deque.pollLast(); + return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); } /**