From d74f96432f66fcf39d5bd79038af7b7824a14588 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 30 Apr 2015 12:26:46 +0200 Subject: [PATCH] [#3218] Add ChannelPool / ChannelPoolMap abstraction and implementations Motivation: Many projects need some kind a Channel/Connection pool implementation. While the protocols are different many things can be shared, so we should provide a generic API and implementation. Modifications: Add ChannelPool / ChannelPoolMap API and implementations. Result: Reusable / Generic pool implementation that users can use. --- .../util/internal/PlatformDependent.java | 14 + pom.xml | 2 + .../io/netty/bootstrap/AbstractBootstrap.java | 2 +- .../java/io/netty/bootstrap/Bootstrap.java | 12 + .../pool/AbstractChannelPoolHandler.java | 44 +++ .../channel/pool/AbstractChannelPoolMap.java | 100 +++++ .../channel/pool/ChannelHealthChecker.java | 47 +++ .../io/netty/channel/pool/ChannelPool.java | 56 +++ .../channel/pool/ChannelPoolHandler.java | 48 +++ .../io/netty/channel/pool/ChannelPoolMap.java | 39 ++ .../netty/channel/pool/FixedChannelPool.java | 371 ++++++++++++++++++ .../netty/channel/pool/SimpleChannelPool.java | 278 +++++++++++++ .../io/netty/channel/pool/package-info.java | 20 + .../pool/AbstractChannelPoolMapTest.java | 75 ++++ .../pool/CountingChannelPoolHandler.java | 53 +++ .../channel/pool/FixedChannelPoolTest.java | 233 +++++++++++ .../channel/pool/SimpleChannelPoolTest.java | 145 +++++++ 17 files changed, 1538 insertions(+), 1 deletion(-) create mode 100644 transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java create mode 100644 transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java create mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java create mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPool.java create mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java create mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java create mode 100644 transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java create mode 100644 transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java create mode 100644 transport/src/main/java/io/netty/channel/pool/package-info.java create mode 100644 transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java create mode 100644 transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java create mode 100644 transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java create mode 100644 transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 9d0eec8047..982050d953 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -29,13 +29,16 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.util.Deque; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -447,6 +450,17 @@ public final class PlatformDependent { return PlatformDependent0.getSystemClassLoader(); } + /** + * Returns a new concurrent {@link Deque}. + */ + public static Deque newConcurrentDeque() { + if (javaVersion() < 7) { + return new LinkedBlockingDeque(); + } else { + return new ConcurrentLinkedDeque(); + } + } + private static boolean isAndroid0() { boolean android; try { diff --git a/pom.xml b/pom.xml index 85875c6a4b..d9fe0f6b31 100644 --- a/pom.xml +++ b/pom.xml @@ -966,6 +966,8 @@ javax.net.ssl.SSLEngine javax.net.ssl.X509ExtendedTrustManager + + java.util.concurrent.ConcurrentLinkedDeque diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 6bdb07ce93..de4d789199 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -46,7 +46,7 @@ import java.util.Map; */ public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { - private volatile EventLoopGroup group; + volatile EventLoopGroup group; @SuppressWarnings("deprecation") private volatile ChannelFactory channelFactory; private volatile SocketAddress localAddress; diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 7ffb459a56..83047306d4 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import io.netty.resolver.DefaultNameResolverGroup; import io.netty.resolver.NameResolver; import io.netty.resolver.NameResolverGroup; @@ -281,6 +282,17 @@ public class Bootstrap extends AbstractBootstrap { return new Bootstrap(this); } + /** + * Returns a deep clone of this bootstrap which has the identical configuration except that it uses + * the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar + * settings. + */ + public Bootstrap clone(EventLoopGroup group) { + Bootstrap bs = new Bootstrap(this); + bs.group = group; + return bs; + } + @Override public String toString() { if (remoteAddress == null) { diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java new file mode 100644 index 0000000000..1d688f2996 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.channel.Channel; + +/** + * A skeletal {@link ChannelPoolHandler} implementation. + */ +public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler { + + /** + * NOOP implementation, sub-classes may override this. + * + * {@inheritDoc} + */ + @Override + public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception { + // NOOP + } + + /** + * NOOP implementation, sub-classes may override this. + * + * {@inheritDoc} + */ + @Override + public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception { + // NOOP + } +} diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java new file mode 100644 index 0000000000..4b7213e901 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java @@ -0,0 +1,100 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ReadOnlyIterator; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +/** + * A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool} + * the {@link Object#hashCode()} and {@link Object#equals(Object)} is used. + */ +public abstract class AbstractChannelPoolMap + implements ChannelPoolMap, Iterable>, Closeable { + private final ConcurrentMap map = PlatformDependent.newConcurrentHashMap(); + + @Override + public final P get(K key) { + P pool = map.get(checkNotNull(key, "key")); + if (pool == null) { + pool = newPool(key); + P old = map.putIfAbsent(key, pool); + if (old != null) { + // We need to destroy the newly created pool as we not use it. + pool.close(); + pool = old; + } + } + return pool; + } + /** + * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed, + * {@code false} otherwise. + * + * Please note that {@code null} keys are not allowed. + */ + public final boolean remove(K key) { + P pool = map.remove(checkNotNull(key, "key")); + if (pool != null) { + pool.close(); + return true; + } + return false; + } + + @Override + public final Iterator> iterator() { + return new ReadOnlyIterator>(map.entrySet().iterator()); + } + + /** + * Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}. + */ + public final int size() { + return map.size(); + } + + /** + * Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}. + */ + public final boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public final boolean contains(K key) { + return map.containsKey(checkNotNull(key, "key")); + } + + /** + * Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}. + */ + protected abstract P newPool(K key); + + @Override + public final void close() { + for (K key: map.keySet()) { + remove(key); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java b/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java new file mode 100644 index 0000000000..c8775d48b6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java @@ -0,0 +1,47 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +/** + * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or + * {@link ChannelPool#acquire(Promise)}. + */ +public interface ChannelHealthChecker { + + /** + * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}. + */ + ChannelHealthChecker ACTIVE = new ChannelHealthChecker() { + @Override + public Future isHealthy(Channel channel) { + EventLoop loop = channel.eventLoop(); + return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE); + } + }; + + /** + * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once + * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise. + * + * This method will be called by the {@link EventLoop} of the {@link Channel}. + */ + Future isHealthy(Channel channel); +} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java new file mode 100644 index 0000000000..d9e5d014f2 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.channel.Channel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Allows to acquire and release {@link Channel} and so act as a pool of these. + */ +public interface ChannelPool extends Closeable { + + /** + * Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once + * the acquire is successful and failed otherwise. + */ + Future acquire(); + + /** + * Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once + * the acquire is successful and failed otherwise. + */ + Future acquire(Promise promise); + + /** + * Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once + * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. + */ + Future release(Channel channel); + + /** + * Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once + * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. + */ + Future release(Channel channel, Promise promise); + + @Override + void close(); +} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java new file mode 100644 index 0000000000..0bd078fa67 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Promise; + +/** + * Handler which is called for various actions done by the {@link ChannelPool}. + */ +public interface ChannelPoolHandler { + /** + * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or + * {@link ChannelPool#release(Channel, Promise)}. + * + * This method will be called by the {@link EventLoop} of the {@link Channel}. + */ + void channelReleased(Channel ch) throws Exception; + + /** + * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or + * {@link ChannelPool#acquire(Promise)}. + * + * This method will be called by the {@link EventLoop} of the {@link Channel}. + */ + void channelAcquired(Channel ch) throws Exception; + + /** + * Called once a new {@link Channel} is created in the {@link ChannelPool}. + * + * This method will be called by the {@link EventLoop} of the {@link Channel}. + */ + void channelCreated(Channel ch) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java new file mode 100644 index 0000000000..bdddd117c3 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java @@ -0,0 +1,39 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +/** + * Allows to map {@link ChannelPool} implementations to a specific key. + * + * @param the type of the key + * @param

the type of the {@link ChannelPool} + */ +public interface ChannelPoolMap { + /** + * Return the {@link ChannelPool} for the {@code code}. This will never return {@code null}, + * but create a new {@link ChannelPool} if non exists for they requested {@code key}. + * + * Please note that {@code null} keys are not allowed. + */ + P get(K key); + + /** + * Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}. + * + * Please note that {@code null} keys are not allowed. + */ + boolean contains(K key); +} diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java new file mode 100644 index 0000000000..1066358dd6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java @@ -0,0 +1,371 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.OneTimeTask; + +import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum + * number of concurrent connections. + */ +public final class FixedChannelPool extends SimpleChannelPool { + private static final IllegalStateException FULL_EXCEPTION = + new IllegalStateException("Too many outstanding acquire operations"); + private static final TimeoutException TIMEOUT_EXCEPTION = + new TimeoutException("Acquire operation took longer then configured maximum time"); + + static { + FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); + TIMEOUT_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); + } + + public enum AcquireTimeoutAction { + /** + * Create a new connection when the timeout is detected. + */ + NEW, + + /** + * Fail the {@link Future} of the acquire call with a {@link TimeoutException}. + */ + FAIL + } + + private final EventExecutor executor; + private final long acquireTimeoutNanos; + private final Runnable timeoutTask; + + // There is no need to worry about synchronization as everything that modified the queue or counts is done + // by the above EventExecutor. + private final Queue pendingAcquireQueue = new ArrayDeque(); + private final int maxConnections; + private final int maxPendingAcquires; + private int acquiredChannelCount; + private int pendingAcquireCount; + + /** + * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. + * + * @param bootstrap the {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param maxConnections the numnber of maximal active connections, once this is reached new tries to acquire + * a {@link Channel} will be delayed until a connection is returned to the pool again. + */ + public FixedChannelPool(Bootstrap bootstrap, + ChannelPoolHandler handler, int maxConnections) { + this(bootstrap, handler, maxConnections, Integer.MAX_VALUE); + } + + /** + * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. + * + * @param bootstrap the {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param maxConnections the numnber of maximal active connections, once this is reached new tries to + * acquire a {@link Channel} will be delayed until a connection is returned to the + * pool again. + * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will + * be failed. + */ + public FixedChannelPool(Bootstrap bootstrap, + ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) { + super(bootstrap, handler); + executor = bootstrap.group().next(); + timeoutTask = null; + acquireTimeoutNanos = -1; + this.maxConnections = maxConnections; + this.maxPendingAcquires = maxPendingAcquires; + } + + /** + * Creates a new instance. + * + * @param bootstrap the {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is + * still healty when obtain from the {@link ChannelPool} + * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. + * In this case {@param acquireTimeoutMillis} must be {@code -1}. + * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or + * the {@link AcquireTimeoutAction} takes place. + * @param maxConnections the numnber of maximal active connections, once this is reached new tries to + * acquire a {@link Channel} will be delayed until a connection is returned to the + * pool again. + * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will + * be failed. + */ + public FixedChannelPool(Bootstrap bootstrap, + ChannelPoolHandler handler, + ChannelHealthChecker healthCheck, AcquireTimeoutAction action, + final long acquireTimeoutMillis, + int maxConnections, int maxPendingAcquires) { + super(bootstrap, handler, healthCheck); + if (maxConnections < 1) { + throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); + } + if (maxPendingAcquires < 1) { + throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)"); + } + if (action == null && acquireTimeoutMillis == -1) { + timeoutTask = null; + acquireTimeoutNanos = -1; + } else if (action == null && acquireTimeoutMillis != -1) { + throw new NullPointerException("action"); + } else if (action != null && acquireTimeoutMillis < 0) { + throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 1)"); + } else { + acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); + switch (action) { + case FAIL: + timeoutTask = new TimeoutTask() { + @Override + public void onTimeout(AcquireTask task) { + // Fail the promise as we timed out. + task.promise.setFailure(TIMEOUT_EXCEPTION); + } + }; + break; + case NEW: + timeoutTask = new TimeoutTask() { + @Override + public void onTimeout(AcquireTask task) { + // Increment the acquire count and delegate to super to actually acquire a Channel which will + // create a new connetion. + ++acquiredChannelCount; + + FixedChannelPool.super.acquire(task.promise); + } + }; + break; + default: + throw new Error(); + } + } + executor = bootstrap.group().next(); + this.maxConnections = maxConnections; + this.maxPendingAcquires = maxPendingAcquires; + } + + @Override + public Future acquire(final Promise promise) { + try { + if (executor.inEventLoop()) { + acquire0(promise); + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + acquire0(promise); + } + }); + } + } catch (Throwable cause) { + promise.setFailure(cause); + } + return promise; + } + + private void acquire0(final Promise promise) { + assert executor.inEventLoop(); + + if (acquiredChannelCount < maxConnections) { + ++acquiredChannelCount; + + assert acquiredChannelCount > 0; + + // We need to create a new promise as we need to ensure the AcquireListener runs in the correct + // EventLoop + Promise p = executor.newPromise(); + p.addListener(new AcquireListener(promise)); + super.acquire(p); + } else { + if (pendingAcquireCount >= maxPendingAcquires) { + promise.setFailure(FULL_EXCEPTION); + } else { + AcquireTask task = new AcquireTask(promise); + if (pendingAcquireQueue.offer(task)) { + ++pendingAcquireCount; + + if (timeoutTask != null) { + task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); + } + } else { + promise.setFailure(FULL_EXCEPTION); + } + } + + assert pendingAcquireCount > 0; + } + } + + @Override + public Future release(final Channel channel, final Promise promise) { + final Promise p = executor.newPromise(); + super.release(channel, p.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + assert executor.inEventLoop(); + + if (future.isSuccess()) { + decrementAndRunTaskQueue(); + promise.setSuccess(null); + } else { + Throwable cause = future.cause(); + // Check if the exception was not because of we passed the Channel to the wrong pool. + if (!(cause instanceof IllegalArgumentException)) { + decrementAndRunTaskQueue(); + } + promise.setFailure(future.cause()); + } + } + })); + return p; + } + + private void decrementAndRunTaskQueue() { + --acquiredChannelCount; + + // We should never have a negative value. + assert acquiredChannelCount >= 0; + + // Run the pending acquire tasks before notify the original promise so if the user would + // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >= + // maxPendingAcquires we may be able to run some pending tasks first and so allow to add + // more. + runTaskQueue(); + } + + private void runTaskQueue() { + while (acquiredChannelCount <= maxConnections) { + AcquireTask task = pendingAcquireQueue.poll(); + if (task == null) { + break; + } + + // Cancel the timeout if one was scheduled + ScheduledFuture timeoutFuture = task.timeoutFuture; + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } + + --pendingAcquireCount; + ++acquiredChannelCount; + + super.acquire(task.promise); + } + + // We should never have a negative value. + assert pendingAcquireCount >= 0; + assert acquiredChannelCount >= 0; + } + + // AcquireTask extends AcquireListener to reduce object creations and so GC pressure + private final class AcquireTask extends AcquireListener { + final Promise promise; + final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos; + ScheduledFuture timeoutFuture; + + public AcquireTask(Promise promise) { + super(promise); + // We need to create a new promise as we need to ensure the AcquireListener runs in the correct + // EventLoop. + this.promise = executor.newPromise().addListener(this); + } + } + + private abstract class TimeoutTask implements Runnable { + @Override + public final void run() { + assert executor.inEventLoop(); + long nanoTime = System.nanoTime(); + for (;;) { + AcquireTask task = pendingAcquireQueue.peek(); + // Compare nanoTime as descripted in the javadocs of System.nanoTime() + // + // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime() + // See https://github.com/netty/netty/issues/3705 + if (task == null || nanoTime - task.expireNanoTime < 0) { + break; + } + pendingAcquireQueue.remove(); + + --pendingAcquireCount; + onTimeout(task); + } + } + + public abstract void onTimeout(AcquireTask task); + } + + private class AcquireListener implements FutureListener { + private final Promise originalPromise; + + AcquireListener(Promise originalPromise) { + this.originalPromise = originalPromise; + } + + @Override + public void operationComplete(Future future) throws Exception { + assert executor.inEventLoop(); + + if (future.isSuccess()) { + originalPromise.setSuccess(future.getNow()); + } else { + // Something went wrong try to run pending acquire tasks. + decrementAndRunTaskQueue(); + originalPromise.setFailure(future.cause()); + } + } + } + + @Override + public void close() { + executor.execute(new OneTimeTask() { + @Override + public void run() { + for (;;) { + AcquireTask task = pendingAcquireQueue.poll(); + if (task == null) { + break; + } + ScheduledFuture f = task.timeoutFuture; + if (f != null) { + f.cancel(false); + } + task.promise.setFailure(new ClosedChannelException()); + } + acquiredChannelCount = 0; + pendingAcquireCount = 0; + FixedChannelPool.super.close(); + } + }); + } +} diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java new file mode 100644 index 0000000000..bbef73ea42 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java @@ -0,0 +1,278 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoop; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.PlatformDependent; + +import java.util.Deque; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +/** + * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire + * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. + * + * This implementation uses FIFO order for {@link Channel}s in the {@link ChannelPool}. + * + */ +public class SimpleChannelPool implements ChannelPool { + private static final AttributeKey POOL_KEY = AttributeKey.newInstance("channelPool"); + private static final IllegalStateException FULL_EXCEPTION = new IllegalStateException("ChannelPool full"); + static { + FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); + } + private final Deque deque = PlatformDependent.newConcurrentDeque(); + private final ChannelPoolHandler handler; + private final ChannelHealthChecker healthCheck; + private final Bootstrap bootstrap; + + /** + * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. + * + * @param bootstrap the {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + */ + public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) { + this(bootstrap, handler, ChannelHealthChecker.ACTIVE); + } + + /** + * Creates a new instance. + * + * @param bootstrap the {@link Bootstrap} that is used for connections + * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions + * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is + * still healty when obtain from the {@link ChannelPool} + */ + public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) { + this.handler = checkNotNull(handler, "handler"); + this.healthCheck = checkNotNull(healthCheck, "healthCheck"); + // Clone the original Bootstrap as we want to set our own handler + this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); + this.bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + assert ch.eventLoop().inEventLoop(); + handler.channelCreated(ch); + } + }); + } + + @Override + public final Future acquire() { + return acquire(bootstrap.group().next().newPromise()); + } + + @Override + public Future acquire(final Promise promise) { + checkNotNull(promise, "promise"); + try { + final Channel ch = pollChannel(); + if (ch == null) { + // No Channel left in the pool bootstrap a new Channel + Bootstrap bs = bootstrap.clone(); + bs.attr(POOL_KEY, this); + ChannelFuture f = connectChannel(bs); + if (f.isDone()) { + notifyConnect(f, promise); + } else { + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + notifyConnect(future, promise); + } + }); + } + return promise; + } + EventLoop loop = ch.eventLoop(); + if (loop.inEventLoop()) { + doHealthCheck(ch, promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + doHealthCheck(ch, promise); + } + }); + } + } catch (Throwable cause) { + promise.setFailure(cause); + } + return promise; + } + + private static void notifyConnect(ChannelFuture future, Promise promise) { + if (future.isSuccess()) { + promise.setSuccess(future.channel()); + } else { + promise.setFailure(future.cause()); + } + } + + private void doHealthCheck(final Channel ch, final Promise promise) { + assert ch.eventLoop().inEventLoop(); + + Future f = healthCheck.isHealthy(ch); + if (f.isDone()) { + notifyHealthCheck(f, ch, promise); + } else { + f.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + notifyHealthCheck(future, ch, promise); + } + }); + } + } + + private void notifyHealthCheck(Future future, Channel ch, Promise promise) { + assert ch.eventLoop().inEventLoop(); + + if (future.isSuccess()) { + if (future.getNow() == Boolean.TRUE) { + try { + ch.attr(POOL_KEY).set(this); + handler.channelAcquired(ch); + promise.setSuccess(ch); + } catch (Throwable cause) { + closeAndFail(ch, cause, promise); + } + } else { + closeChannel(ch); + acquire(promise); + } + } else { + closeChannel(ch); + acquire(promise); + } + } + + /** + * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, + * sub-classes may override this. + * + * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify. + */ + protected ChannelFuture connectChannel(Bootstrap bs) { + return bs.connect(); + } + + @Override + public final Future release(Channel channel) { + return release(channel, channel.eventLoop().newPromise()); + } + + @Override + public Future release(final Channel channel, final Promise promise) { + checkNotNull(channel, "channel"); + checkNotNull(promise, "promise"); + try { + EventLoop loop = channel.eventLoop(); + if (loop.inEventLoop()) { + doReleaseChannel(channel, promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + doReleaseChannel(channel, promise); + } + }); + } + } catch (Throwable cause) { + closeAndFail(channel, cause, promise); + } + return promise; + } + + private void doReleaseChannel(Channel channel, Promise promise) { + assert channel.eventLoop().inEventLoop(); + // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. + if (channel.attr(POOL_KEY).getAndSet(null) != this) { + closeAndFail(channel, + // Better include a stracktrace here as this is an user error. + new IllegalArgumentException( + "Channel " + channel + " was not acquired from this ChannelPool"), + promise); + } else { + try { + if (offerChannel(channel)) { + handler.channelReleased(channel); + promise.setSuccess(null); + } else { + closeAndFail(channel, FULL_EXCEPTION, promise); + } + } catch (Throwable cause) { + closeAndFail(channel, cause, promise); + } + } + } + + private static void closeChannel(Channel channel) { + channel.attr(POOL_KEY).getAndSet(null); + channel.close(); + } + + private static void closeAndFail(Channel channel, Throwable cause, Promise promise) { + closeChannel(channel); + promise.setFailure(cause); + } + + /** + * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no + * {@link Channel} is ready to be reused. + * + * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that + * implementations of these methods needs to be thread-safe! + */ + protected Channel pollChannel() { + return deque.pollLast(); + } + + /** + * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} + * could be added, {@code false} otherwise. + * + * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that + * implementations of these methods needs to be thread-safe! + */ + protected boolean offerChannel(Channel channel) { + return deque.offer(channel); + } + + @Override + public void close() { + for (;;) { + Channel channel = pollChannel(); + if (channel == null) { + break; + } + channel.close(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/pool/package-info.java b/transport/src/main/java/io/netty/channel/pool/package-info.java new file mode 100644 index 0000000000..74293d28bb --- /dev/null +++ b/transport/src/main/java/io/netty/channel/pool/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Implementations and API for {@link io.netty.channel.Channel} pools. + */ +package io.netty.channel.pool; diff --git a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java new file mode 100644 index 0000000000..2f826ec0aa --- /dev/null +++ b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class AbstractChannelPoolMapTest { + private static final String LOCAL_ADDR_ID = "test.id"; + + @Test(expected = ChannelException.class) + public void testMap() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + final Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + AbstractChannelPoolMap poolMap = + new AbstractChannelPoolMap() { + @Override + protected SimpleChannelPool newPool(EventLoop key) { + return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler()); + } + }; + + EventLoop loop = group.next(); + + assertFalse(poolMap.iterator().hasNext()); + assertEquals(0, poolMap.size()); + + SimpleChannelPool pool = poolMap.get(loop); + assertEquals(1, poolMap.size()); + assertTrue(poolMap.iterator().hasNext()); + + assertSame(pool, poolMap.get(loop)); + assertTrue(poolMap.remove(loop)); + assertFalse(poolMap.remove(loop)); + + assertFalse(poolMap.iterator().hasNext()); + assertEquals(0, poolMap.size()); + + pool.acquire().syncUninterruptibly(); + } + + private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { + @Override + public void channelCreated(Channel ch) throws Exception { + // NOOP + } + } +} diff --git a/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java b/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java new file mode 100644 index 0000000000..dc6fce504a --- /dev/null +++ b/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.channel.Channel; + +import java.util.concurrent.atomic.AtomicInteger; + +final class CountingChannelPoolHandler implements ChannelPoolHandler { + private final AtomicInteger channelCount = new AtomicInteger(0); + private final AtomicInteger acquiredCount = new AtomicInteger(0); + private final AtomicInteger releasedCount = new AtomicInteger(0); + + @Override + public void channelCreated(Channel ch) { + channelCount.incrementAndGet(); + } + + @Override + public void channelReleased(Channel ch) { + releasedCount.incrementAndGet(); + } + + @Override + public void channelAcquired(Channel ch) { + acquiredCount.incrementAndGet(); + } + + public int channelCount() { + return channelCount.get(); + } + + public int acquiredCount() { + return acquiredCount.get(); + } + + public int releasedCount() { + return releasedCount.get(); + } +} diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java new file mode 100644 index 0000000000..8a3da2d055 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java @@ -0,0 +1,233 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction; +import io.netty.util.concurrent.Future; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; + +public class FixedChannelPoolTest { + private static final String LOCAL_ADDR_ID = "test.id"; + + @Test + public void testAcquire() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); + + ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE); + + Channel channel = pool.acquire().syncUninterruptibly().getNow(); + Future future = pool.acquire(); + assertFalse(future.isDone()); + + pool.release(channel).syncUninterruptibly(); + assertTrue(future.await(1, TimeUnit.SECONDS)); + + Channel channel2 = future.getNow(); + assertSame(channel, channel2); + assertEquals(1, handler.channelCount()); + + assertEquals(1, handler.acquiredCount()); + assertEquals(1, handler.releasedCount()); + + sc.close().syncUninterruptibly(); + channel2.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + + @Test(expected = TimeoutException.class) + public void testAcquireTimeout() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, + AcquireTimeoutAction.FAIL, 500, 1, Integer.MAX_VALUE); + + Channel channel = pool.acquire().syncUninterruptibly().getNow(); + Future future = pool.acquire(); + try { + future.syncUninterruptibly(); + } finally { + sc.close().syncUninterruptibly(); + channel.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + } + + @Test + public void testAcquireNewConnection() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, + AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE); + + Channel channel = pool.acquire().syncUninterruptibly().getNow(); + Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); + assertNotSame(channel, channel2); + sc.close().syncUninterruptibly(); + channel.close().syncUninterruptibly(); + channel2.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + + @Test(expected = IllegalStateException.class) + public void testAcquireBoundQueue() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); + + Channel channel = pool.acquire().syncUninterruptibly().getNow(); + Future future = pool.acquire(); + assertFalse(future.isDone()); + + try { + pool.acquire().syncUninterruptibly(); + } finally { + sc.close().syncUninterruptibly(); + channel.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testReleaseDifferentPool() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); + ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1); + + Channel channel = pool.acquire().syncUninterruptibly().getNow(); + + try { + pool2.release(channel).syncUninterruptibly(); + } finally { + sc.close().syncUninterruptibly(); + channel.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + } + + private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { + @Override + public void channelCreated(Channel ch) throws Exception { + // NOOP + } + } +} diff --git a/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java new file mode 100644 index 0000000000..20d024ddd8 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import org.junit.Test; + +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.*; + +public class SimpleChannelPoolTest { + private static final String LOCAL_ADDR_ID = "test.id"; + + @Test + public void testAcquire() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).sync().channel(); + CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); + + ChannelPool pool = new SimpleChannelPool(cb, handler); + + Channel channel = pool.acquire().sync().getNow(); + + pool.release(channel).syncUninterruptibly(); + + Channel channel2 = pool.acquire().sync().getNow(); + assertSame(channel, channel2); + assertEquals(1, handler.channelCount()); + pool.release(channel2).syncUninterruptibly(); + + // Should fail on multiple release calls. + try { + pool.release(channel2).syncUninterruptibly(); + fail(); + } catch (IllegalArgumentException e) { + // expected + assertFalse(channel.isActive()); + } + + assertEquals(1, handler.acquiredCount()); + assertEquals(2, handler.releasedCount()); + + sc.close().sync(); + group.shutdownGracefully(); + } + + @Test + public void testBoundedChannelPoolSegment() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).sync().channel(); + CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); + + ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) { + private final Queue queue = new LinkedBlockingQueue(1); + + @Override + protected Channel pollChannel() { + return queue.poll(); + } + + @Override + protected boolean offerChannel(Channel ch) { + return queue.offer(ch); + } + }; + + Channel channel = pool.acquire().sync().getNow(); + Channel channel2 = pool.acquire().sync().getNow(); + + pool.release(channel).syncUninterruptibly().getNow(); + try { + pool.release(channel2).syncUninterruptibly(); + fail(); + } catch (IllegalStateException e) { + // expected + } + channel2.close().sync(); + + assertEquals(2, handler.channelCount()); + assertEquals(0, handler.acquiredCount()); + assertEquals(1, handler.releasedCount()); + sc.close().sync(); + channel.close().sync(); + channel2.close().sync(); + group.shutdownGracefully(); + } +}