From 4efad295df2d0fea7a4731ac3301e36a35d5957d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 14 Jan 2019 08:28:40 +0100 Subject: [PATCH] Remove io.netty.channel.pool.* (#8681) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: We should remove the ChannelPool and related implementations. It is often the case that having protocol knowledge can result in more effective pooling and ChannelPool currently doesn’t have this knowledge. This responsibility is assumed to be implemented at layers higher in the stack than Netty. Modifications: Remove io.netty.channel.pool.* Result: Less code to maintain, fixes https://github.com/netty/netty/issues/8549. --- .../pool/AbstractChannelPoolHandler.java | 44 -- .../channel/pool/AbstractChannelPoolMap.java | 100 ---- .../channel/pool/ChannelHealthChecker.java | 47 -- .../io/netty/channel/pool/ChannelPool.java | 61 --- .../channel/pool/ChannelPoolHandler.java | 48 -- .../io/netty/channel/pool/ChannelPoolMap.java | 39 -- .../netty/channel/pool/FixedChannelPool.java | 485 ------------------ .../netty/channel/pool/SimpleChannelPool.java | 401 --------------- .../io/netty/channel/pool/package-info.java | 20 - .../pool/AbstractChannelPoolMapTest.java | 76 --- .../pool/CountingChannelPoolHandler.java | 53 -- .../channel/pool/FixedChannelPoolTest.java | 355 ------------- .../channel/pool/SimpleChannelPoolTest.java | 304 ----------- 13 files changed, 2033 deletions(-) delete mode 100644 transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPool.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java delete mode 100644 transport/src/main/java/io/netty/channel/pool/package-info.java delete mode 100644 transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java delete mode 100644 transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java delete mode 100644 transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java delete mode 100644 transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java deleted file mode 100644 index 1d688f2996..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; - -/** - * A skeletal {@link ChannelPoolHandler} implementation. - */ -public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler { - - /** - * NOOP implementation, sub-classes may override this. - * - * {@inheritDoc} - */ - @Override - public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception { - // NOOP - } - - /** - * NOOP implementation, sub-classes may override this. - * - * {@inheritDoc} - */ - @Override - public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception { - // NOOP - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java deleted file mode 100644 index 4b7213e901..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ReadOnlyIterator; - -import java.io.Closeable; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; - -import static io.netty.util.internal.ObjectUtil.checkNotNull; - -/** - * A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool} - * the {@link Object#hashCode()} and {@link Object#equals(Object)} is used. - */ -public abstract class AbstractChannelPoolMap - implements ChannelPoolMap, Iterable>, Closeable { - private final ConcurrentMap map = PlatformDependent.newConcurrentHashMap(); - - @Override - public final P get(K key) { - P pool = map.get(checkNotNull(key, "key")); - if (pool == null) { - pool = newPool(key); - P old = map.putIfAbsent(key, pool); - if (old != null) { - // We need to destroy the newly created pool as we not use it. - pool.close(); - pool = old; - } - } - return pool; - } - /** - * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed, - * {@code false} otherwise. - * - * Please note that {@code null} keys are not allowed. - */ - public final boolean remove(K key) { - P pool = map.remove(checkNotNull(key, "key")); - if (pool != null) { - pool.close(); - return true; - } - return false; - } - - @Override - public final Iterator> iterator() { - return new ReadOnlyIterator>(map.entrySet().iterator()); - } - - /** - * Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}. - */ - public final int size() { - return map.size(); - } - - /** - * Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}. - */ - public final boolean isEmpty() { - return map.isEmpty(); - } - - @Override - public final boolean contains(K key) { - return map.containsKey(checkNotNull(key, "key")); - } - - /** - * Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}. - */ - protected abstract P newPool(K key); - - @Override - public final void close() { - for (K key: map.keySet()) { - remove(key); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java b/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java deleted file mode 100644 index c8775d48b6..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; - -/** - * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or - * {@link ChannelPool#acquire(Promise)}. - */ -public interface ChannelHealthChecker { - - /** - * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}. - */ - ChannelHealthChecker ACTIVE = new ChannelHealthChecker() { - @Override - public Future isHealthy(Channel channel) { - EventLoop loop = channel.eventLoop(); - return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE); - } - }; - - /** - * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once - * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - Future isHealthy(Channel channel); -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java deleted file mode 100644 index 6e163d9f33..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; - -import java.io.Closeable; - -/** - * Allows to acquire and release {@link Channel} and so act as a pool of these. - */ -public interface ChannelPool extends Closeable { - - /** - * Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once - * the acquire is successful and failed otherwise. - * - * Its important that an acquired is always released to the pool again, even if the {@link Channel} - * is explicitly closed.. - */ - Future acquire(); - - /** - * Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once - * the acquire is successful and failed otherwise. - * - * Its important that an acquired is always released to the pool again, even if the {@link Channel} - * is explicitly closed.. - */ - Future acquire(Promise promise); - - /** - * Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once - * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. - */ - Future release(Channel channel); - - /** - * Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once - * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. - */ - Future release(Channel channel, Promise promise); - - @Override - void close(); -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java deleted file mode 100644 index 0bd078fa67..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Promise; - -/** - * Handler which is called for various actions done by the {@link ChannelPool}. - */ -public interface ChannelPoolHandler { - /** - * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or - * {@link ChannelPool#release(Channel, Promise)}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelReleased(Channel ch) throws Exception; - - /** - * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or - * {@link ChannelPool#acquire(Promise)}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelAcquired(Channel ch) throws Exception; - - /** - * Called once a new {@link Channel} is created in the {@link ChannelPool}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelCreated(Channel ch) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java deleted file mode 100644 index bdddd117c3..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -/** - * Allows to map {@link ChannelPool} implementations to a specific key. - * - * @param the type of the key - * @param

the type of the {@link ChannelPool} - */ -public interface ChannelPoolMap { - /** - * Return the {@link ChannelPool} for the {@code code}. This will never return {@code null}, - * but create a new {@link ChannelPool} if non exists for they requested {@code key}. - * - * Please note that {@code null} keys are not allowed. - */ - P get(K key); - - /** - * Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}. - * - * Please note that {@code null} keys are not allowed. - */ - boolean contains(K key); -} diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java deleted file mode 100644 index 5ca376f88d..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.ThrowableUtil; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum - * number of concurrent connections. - */ -public class FixedChannelPool extends SimpleChannelPool { - private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("Too many outstanding acquire operations"), - FixedChannelPool.class, "acquire0(...)"); - private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace( - new TimeoutException("Acquire operation took longer then configured maximum time"), - FixedChannelPool.class, "(...)"); - static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - FixedChannelPool.class, "release(...)"); - static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - FixedChannelPool.class, "acquire0(...)"); - public enum AcquireTimeoutAction { - /** - * Create a new connection when the timeout is detected. - */ - NEW, - - /** - * Fail the {@link Future} of the acquire call with a {@link TimeoutException}. - */ - FAIL - } - - private final EventExecutor executor; - private final long acquireTimeoutNanos; - private final Runnable timeoutTask; - - // There is no need to worry about synchronization as everything that modified the queue or counts is done - // by the above EventExecutor. - private final Queue pendingAcquireQueue = new ArrayDeque(); - private final int maxConnections; - private final int maxPendingAcquires; - private final AtomicInteger acquiredChannelCount = new AtomicInteger(); - private int pendingAcquireCount; - private boolean closed; - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to acquire - * a {@link Channel} will be delayed until a connection is returned to the pool again. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections) { - this(bootstrap, handler, maxConnections, Integer.MAX_VALUE); - } - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, - releaseHealthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, - boolean releaseHealthCheck, boolean lastRecentUsed) { - super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed); - if (maxConnections < 1) { - throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); - } - if (maxPendingAcquires < 1) { - throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)"); - } - if (action == null && acquireTimeoutMillis == -1) { - timeoutTask = null; - acquireTimeoutNanos = -1; - } else if (action == null && acquireTimeoutMillis != -1) { - throw new NullPointerException("action"); - } else if (action != null && acquireTimeoutMillis < 0) { - throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)"); - } else { - acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); - switch (action) { - case FAIL: - timeoutTask = new TimeoutTask() { - @Override - public void onTimeout(AcquireTask task) { - // Fail the promise as we timed out. - task.promise.setFailure(TIMEOUT_EXCEPTION); - } - }; - break; - case NEW: - timeoutTask = new TimeoutTask() { - @Override - public void onTimeout(AcquireTask task) { - // Increment the acquire count and delegate to super to actually acquire a Channel which will - // create a new connection. - task.acquired(); - - FixedChannelPool.super.acquire(task.promise); - } - }; - break; - default: - throw new Error(); - } - } - executor = bootstrap.config().group().next(); - this.maxConnections = maxConnections; - this.maxPendingAcquires = maxPendingAcquires; - } - - /** Returns the number of acquired channels that this pool thinks it has. */ - public int acquiredChannelCount() { - return acquiredChannelCount.get(); - } - - @Override - public Future acquire(final Promise promise) { - try { - if (executor.inEventLoop()) { - acquire0(promise); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - acquire0(promise); - } - }); - } - } catch (Throwable cause) { - promise.setFailure(cause); - } - return promise; - } - - private void acquire0(final Promise promise) { - assert executor.inEventLoop(); - - if (closed) { - promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - if (acquiredChannelCount.get() < maxConnections) { - assert acquiredChannelCount.get() >= 0; - - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop - Promise p = executor.newPromise(); - AcquireListener l = new AcquireListener(promise); - l.acquired(); - p.addListener(l); - super.acquire(p); - } else { - if (pendingAcquireCount >= maxPendingAcquires) { - promise.setFailure(FULL_EXCEPTION); - } else { - AcquireTask task = new AcquireTask(promise); - if (pendingAcquireQueue.offer(task)) { - ++pendingAcquireCount; - - if (timeoutTask != null) { - task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); - } - } else { - promise.setFailure(FULL_EXCEPTION); - } - } - - assert pendingAcquireCount > 0; - } - } - - @Override - public Future release(final Channel channel, final Promise promise) { - ObjectUtil.checkNotNull(promise, "promise"); - final Promise p = executor.newPromise(); - super.release(channel, p.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - // Since the pool is closed, we have no choice but to close the channel - channel.close(); - promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - decrementAndRunTaskQueue(); - promise.setSuccess(null); - } else { - Throwable cause = future.cause(); - // Check if the exception was not because of we passed the Channel to the wrong pool. - if (!(cause instanceof IllegalArgumentException)) { - decrementAndRunTaskQueue(); - } - promise.setFailure(future.cause()); - } - } - })); - return promise; - } - - private void decrementAndRunTaskQueue() { - // We should never have a negative value. - int currentCount = acquiredChannelCount.decrementAndGet(); - assert currentCount >= 0; - - // Run the pending acquire tasks before notify the original promise so if the user would - // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >= - // maxPendingAcquires we may be able to run some pending tasks first and so allow to add - // more. - runTaskQueue(); - } - - private void runTaskQueue() { - while (acquiredChannelCount.get() < maxConnections) { - AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - - // Cancel the timeout if one was scheduled - ScheduledFuture timeoutFuture = task.timeoutFuture; - if (timeoutFuture != null) { - timeoutFuture.cancel(false); - } - - --pendingAcquireCount; - task.acquired(); - - super.acquire(task.promise); - } - - // We should never have a negative value. - assert pendingAcquireCount >= 0; - assert acquiredChannelCount.get() >= 0; - } - - // AcquireTask extends AcquireListener to reduce object creations and so GC pressure - private final class AcquireTask extends AcquireListener { - final Promise promise; - final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos; - ScheduledFuture timeoutFuture; - - public AcquireTask(Promise promise) { - super(promise); - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop. - this.promise = executor.newPromise().addListener(this); - } - } - - private abstract class TimeoutTask implements Runnable { - @Override - public final void run() { - assert executor.inEventLoop(); - long nanoTime = System.nanoTime(); - for (;;) { - AcquireTask task = pendingAcquireQueue.peek(); - // Compare nanoTime as descripted in the javadocs of System.nanoTime() - // - // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime() - // See https://github.com/netty/netty/issues/3705 - if (task == null || nanoTime - task.expireNanoTime < 0) { - break; - } - pendingAcquireQueue.remove(); - - --pendingAcquireCount; - onTimeout(task); - } - } - - public abstract void onTimeout(AcquireTask task); - } - - private class AcquireListener implements FutureListener { - private final Promise originalPromise; - protected boolean acquired; - - AcquireListener(Promise originalPromise) { - this.originalPromise = originalPromise; - } - - @Override - public void operationComplete(Future future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - if (future.isSuccess()) { - // Since the pool is closed, we have no choice but to close the channel - future.getNow().close(); - } - originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - originalPromise.setSuccess(future.getNow()); - } else { - if (acquired) { - decrementAndRunTaskQueue(); - } else { - runTaskQueue(); - } - - originalPromise.setFailure(future.cause()); - } - } - - public void acquired() { - if (acquired) { - return; - } - acquiredChannelCount.incrementAndGet(); - acquired = true; - } - } - - @Override - public void close() { - if (executor.inEventLoop()) { - close0(); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - close0(); - } - }).awaitUninterruptibly(); - } - } - - private void close0() { - if (!closed) { - closed = true; - for (;;) { - AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - ScheduledFuture f = task.timeoutFuture; - if (f != null) { - f.cancel(false); - } - task.promise.setFailure(new ClosedChannelException()); - } - acquiredChannelCount.set(0); - pendingAcquireCount = 0; - - // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need - // to ensure we will not block in a EventExecutor. - GlobalEventExecutor.INSTANCE.execute(new Runnable() { - @Override - public void run() { - FixedChannelPool.super.close(); - } - }); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java deleted file mode 100644 index 6fcfd4443f..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoop; -import io.netty.util.AttributeKey; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThrowableUtil; - -import java.util.Deque; - -import static io.netty.util.internal.ObjectUtil.*; - -/** - * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire - * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. - * - * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. - * - */ -public class SimpleChannelPool implements ChannelPool { - private static final AttributeKey POOL_KEY = AttributeKey.newInstance("channelPool"); - private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)"); - - private final Deque deque = PlatformDependent.newConcurrentDeque(); - private final ChannelPoolHandler handler; - private final ChannelHealthChecker healthCheck; - private final Bootstrap bootstrap; - private final boolean releaseHealthCheck; - private final boolean lastRecentUsed; - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) { - this(bootstrap, handler, ChannelHealthChecker.ACTIVE); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) { - this(bootstrap, handler, healthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; - * otherwise, channel health is only checked at acquisition time - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, - boolean releaseHealthCheck) { - this(bootstrap, handler, healthCheck, releaseHealthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; - * otherwise, channel health is only checked at acquisition time - * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, - boolean releaseHealthCheck, boolean lastRecentUsed) { - this.handler = checkNotNull(handler, "handler"); - this.healthCheck = checkNotNull(healthCheck, "healthCheck"); - this.releaseHealthCheck = releaseHealthCheck; - // Clone the original Bootstrap as we want to set our own handler - this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); - this.bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - assert ch.eventLoop().inEventLoop(); - handler.channelCreated(ch); - } - }); - this.lastRecentUsed = lastRecentUsed; - } - - /** - * Returns the {@link Bootstrap} this pool will use to open new connections. - * - * @return the {@link Bootstrap} this pool will use to open new connections - */ - protected Bootstrap bootstrap() { - return bootstrap; - } - - /** - * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions. - * - * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions - */ - protected ChannelPoolHandler handler() { - return handler; - } - - /** - * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy. - * - * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy - */ - protected ChannelHealthChecker healthChecker() { - return healthCheck; - } - - /** - * Indicates whether this pool will check the health of channels before offering them back into the pool. - * - * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or - * {@code false} if channel health is only checked at acquisition time - */ - protected boolean releaseHealthCheck() { - return releaseHealthCheck; - } - - @Override - public final Future acquire() { - return acquire(bootstrap.config().group().next().newPromise()); - } - - @Override - public Future acquire(final Promise promise) { - checkNotNull(promise, "promise"); - return acquireHealthyFromPoolOrNew(promise); - } - - /** - * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. - * @param promise the promise to provide acquire result. - * @return future for acquiring a channel. - */ - private Future acquireHealthyFromPoolOrNew(final Promise promise) { - try { - final Channel ch = pollChannel(); - if (ch == null) { - // No Channel left in the pool bootstrap a new Channel - Bootstrap bs = bootstrap.clone(); - bs.attr(POOL_KEY, this); - ChannelFuture f = connectChannel(bs); - if (f.isDone()) { - notifyConnect(f, promise); - } else { - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - notifyConnect(future, promise); - } - }); - } - return promise; - } - EventLoop loop = ch.eventLoop(); - if (loop.inEventLoop()) { - doHealthCheck(ch, promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - doHealthCheck(ch, promise); - } - }); - } - } catch (Throwable cause) { - promise.tryFailure(cause); - } - return promise; - } - - private void notifyConnect(ChannelFuture future, Promise promise) { - if (future.isSuccess()) { - Channel channel = future.channel(); - if (!promise.trySuccess(channel)) { - // Promise was completed in the meantime (like cancelled), just release the channel again - release(channel); - } - } else { - promise.tryFailure(future.cause()); - } - } - - private void doHealthCheck(final Channel ch, final Promise promise) { - assert ch.eventLoop().inEventLoop(); - - Future f = healthCheck.isHealthy(ch); - if (f.isDone()) { - notifyHealthCheck(f, ch, promise); - } else { - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - notifyHealthCheck(future, ch, promise); - } - }); - } - } - - private void notifyHealthCheck(Future future, Channel ch, Promise promise) { - assert ch.eventLoop().inEventLoop(); - - if (future.isSuccess()) { - if (future.getNow()) { - try { - ch.attr(POOL_KEY).set(this); - handler.channelAcquired(ch); - promise.setSuccess(ch); - } catch (Throwable cause) { - closeAndFail(ch, cause, promise); - } - } else { - closeChannel(ch); - acquireHealthyFromPoolOrNew(promise); - } - } else { - closeChannel(ch); - acquireHealthyFromPoolOrNew(promise); - } - } - - /** - * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may - * override this. - *

- * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify. - */ - protected ChannelFuture connectChannel(Bootstrap bs) { - return bs.connect(); - } - - @Override - public final Future release(Channel channel) { - return release(channel, channel.eventLoop().newPromise()); - } - - @Override - public Future release(final Channel channel, final Promise promise) { - checkNotNull(channel, "channel"); - checkNotNull(promise, "promise"); - try { - EventLoop loop = channel.eventLoop(); - if (loop.inEventLoop()) { - doReleaseChannel(channel, promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - doReleaseChannel(channel, promise); - } - }); - } - } catch (Throwable cause) { - closeAndFail(channel, cause, promise); - } - return promise; - } - - private void doReleaseChannel(Channel channel, Promise promise) { - assert channel.eventLoop().inEventLoop(); - // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. - if (channel.attr(POOL_KEY).getAndSet(null) != this) { - closeAndFail(channel, - // Better include a stacktrace here as this is an user error. - new IllegalArgumentException( - "Channel " + channel + " was not acquired from this ChannelPool"), - promise); - } else { - try { - if (releaseHealthCheck) { - doHealthCheckOnRelease(channel, promise); - } else { - releaseAndOffer(channel, promise); - } - } catch (Throwable cause) { - closeAndFail(channel, cause, promise); - } - } - } - - private void doHealthCheckOnRelease(final Channel channel, final Promise promise) throws Exception { - final Future f = healthCheck.isHealthy(channel); - if (f.isDone()) { - releaseAndOfferIfHealthy(channel, promise, f); - } else { - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - releaseAndOfferIfHealthy(channel, promise, f); - } - }); - } - } - - /** - * Adds the channel back to the pool only if the channel is healthy. - * @param channel the channel to put back to the pool - * @param promise offer operation promise. - * @param future the future that contains information fif channel is healthy or not. - * @throws Exception in case when failed to notify handler about release operation. - */ - private void releaseAndOfferIfHealthy(Channel channel, Promise promise, Future future) - throws Exception { - if (future.getNow()) { //channel turns out to be healthy, offering and releasing it. - releaseAndOffer(channel, promise); - } else { //channel not healthy, just releasing it. - handler.channelReleased(channel); - promise.setSuccess(null); - } - } - - private void releaseAndOffer(Channel channel, Promise promise) throws Exception { - if (offerChannel(channel)) { - handler.channelReleased(channel); - promise.setSuccess(null); - } else { - closeAndFail(channel, FULL_EXCEPTION, promise); - } - } - - private static void closeChannel(Channel channel) { - channel.attr(POOL_KEY).getAndSet(null); - channel.close(); - } - - private static void closeAndFail(Channel channel, Throwable cause, Promise promise) { - closeChannel(channel); - promise.tryFailure(cause); - } - - /** - * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no - * {@link Channel} is ready to be reused. - * - * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that - * implementations of these methods needs to be thread-safe! - */ - protected Channel pollChannel() { - return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); - } - - /** - * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} - * could be added, {@code false} otherwise. - * - * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that - * implementations of these methods needs to be thread-safe! - */ - protected boolean offerChannel(Channel channel) { - return deque.offer(channel); - } - - @Override - public void close() { - for (;;) { - Channel channel = pollChannel(); - if (channel == null) { - break; - } - // Just ignore any errors that are reported back from close(). - channel.close().awaitUninterruptibly(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/package-info.java b/transport/src/main/java/io/netty/channel/pool/package-info.java deleted file mode 100644 index 74293d28bb..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * Implementations and API for {@link io.netty.channel.Channel} pools. - */ -package io.netty.channel.pool; diff --git a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java deleted file mode 100644 index 8d03cd7b6c..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import org.junit.Test; - -import java.net.ConnectException; - -import static org.junit.Assert.*; - -public class AbstractChannelPoolMapTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - @Test(expected = ConnectException.class) - public void testMap() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - final Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - AbstractChannelPoolMap poolMap = - new AbstractChannelPoolMap() { - @Override - protected SimpleChannelPool newPool(EventLoop key) { - return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler()); - } - }; - - EventLoop loop = group.next(); - - assertFalse(poolMap.iterator().hasNext()); - assertEquals(0, poolMap.size()); - - SimpleChannelPool pool = poolMap.get(loop); - assertEquals(1, poolMap.size()); - assertTrue(poolMap.iterator().hasNext()); - - assertSame(pool, poolMap.get(loop)); - assertTrue(poolMap.remove(loop)); - assertFalse(poolMap.remove(loop)); - - assertFalse(poolMap.iterator().hasNext()); - assertEquals(0, poolMap.size()); - - pool.acquire().syncUninterruptibly(); - } - - private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { - @Override - public void channelCreated(Channel ch) throws Exception { - // NOOP - } - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java b/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java deleted file mode 100644 index dc6fce504a..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; - -import java.util.concurrent.atomic.AtomicInteger; - -final class CountingChannelPoolHandler implements ChannelPoolHandler { - private final AtomicInteger channelCount = new AtomicInteger(0); - private final AtomicInteger acquiredCount = new AtomicInteger(0); - private final AtomicInteger releasedCount = new AtomicInteger(0); - - @Override - public void channelCreated(Channel ch) { - channelCount.incrementAndGet(); - } - - @Override - public void channelReleased(Channel ch) { - releasedCount.incrementAndGet(); - } - - @Override - public void channelAcquired(Channel ch) { - acquiredCount.incrementAndGet(); - } - - public int channelCount() { - return channelCount.get(); - } - - public int acquiredCount() { - return acquiredCount.get(); - } - - public int releasedCount() { - return releasedCount.get(); - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java deleted file mode 100644 index bbf1debeac..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction; -import io.netty.util.concurrent.Future; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; - -public class FixedChannelPoolTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - private static EventLoopGroup group; - - @BeforeClass - public static void createEventLoop() { - group = new LocalEventLoopGroup(); - } - - @AfterClass - public static void destroyEventLoop() { - if (group != null) { - group.shutdownGracefully(); - } - } - - @Test - public void testAcquire() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - assertFalse(future.isDone()); - - pool.release(channel).syncUninterruptibly(); - assertTrue(future.await(1, TimeUnit.SECONDS)); - - Channel channel2 = future.getNow(); - assertSame(channel, channel2); - assertEquals(1, handler.channelCount()); - - assertEquals(1, handler.acquiredCount()); - assertEquals(1, handler.releasedCount()); - - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - @Test(expected = TimeoutException.class) - public void testAcquireTimeout() throws Exception { - testAcquireTimeout(500); - } - - @Test(expected = TimeoutException.class) - public void testAcquireWithZeroTimeout() throws Exception { - testAcquireTimeout(0); - } - - private static void testAcquireTimeout(long timeoutMillis) throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, - AcquireTimeoutAction.FAIL, timeoutMillis, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - try { - future.syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test - public void testAcquireNewConnection() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, - AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - assertNotSame(channel, channel2); - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - /** - * Tests that the acquiredChannelCount is not added up several times for the same channel acquire request. - * @throws Exception - */ - @Test - public void testAcquireNewConnectionWhen() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - channel1.close().syncUninterruptibly(); - pool.release(channel1); - - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - - assertNotSame(channel1, channel2); - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - @Test(expected = IllegalStateException.class) - public void testAcquireBoundQueue() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - assertFalse(future.isDone()); - - try { - pool.acquire().syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test(expected = IllegalArgumentException.class) - public void testReleaseDifferentPool() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); - ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - - try { - pool2.release(channel).syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test - public void testReleaseAfterClosePool() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group).channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - - FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2); - final Future acquire = pool.acquire(); - final Channel channel = acquire.get(); - pool.close(); - group.submit(new Runnable() { - @Override - public void run() { - // NOOP - } - }).syncUninterruptibly(); - try { - pool.release(channel).syncUninterruptibly(); - fail(); - } catch (IllegalStateException e) { - assertSame(FixedChannelPool.POOL_CLOSED_ON_RELEASE_EXCEPTION, e); - } - // Since the pool is closed, the Channel should have been closed as well. - channel.closeFuture().syncUninterruptibly(); - assertFalse("Unexpected open channel", channel.isOpen()); - sc.close().syncUninterruptibly(); - } - - @Test - public void testReleaseClosed() { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group).channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - - FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2); - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - channel.close().syncUninterruptibly(); - pool.release(channel).syncUninterruptibly(); - - sc.close().syncUninterruptibly(); - } - - private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { - @Override - public void channelCreated(Channel ch) throws Exception { - // NOOP - } - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java deleted file mode 100644 index a91790c38b..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import io.netty.channel.local.LocalServerChannel; -import io.netty.util.concurrent.Future; -import org.hamcrest.CoreMatchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; - -import static org.junit.Assert.*; - -public class SimpleChannelPoolTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - @Test - public void testAcquire() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).sync().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new SimpleChannelPool(cb, handler); - - Channel channel = pool.acquire().sync().getNow(); - - pool.release(channel).syncUninterruptibly(); - - Channel channel2 = pool.acquire().sync().getNow(); - assertSame(channel, channel2); - assertEquals(1, handler.channelCount()); - pool.release(channel2).syncUninterruptibly(); - - // Should fail on multiple release calls. - try { - pool.release(channel2).syncUninterruptibly(); - fail(); - } catch (IllegalArgumentException e) { - // expected - assertFalse(channel.isActive()); - } - - assertEquals(1, handler.acquiredCount()); - assertEquals(2, handler.releasedCount()); - - sc.close().sync(); - group.shutdownGracefully(); - } - - @Test - public void testBoundedChannelPoolSegment() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).sync().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) { - private final Queue queue = new LinkedBlockingQueue(1); - - @Override - protected Channel pollChannel() { - return queue.poll(); - } - - @Override - protected boolean offerChannel(Channel ch) { - return queue.offer(ch); - } - }; - - Channel channel = pool.acquire().sync().getNow(); - Channel channel2 = pool.acquire().sync().getNow(); - - pool.release(channel).syncUninterruptibly().getNow(); - try { - pool.release(channel2).syncUninterruptibly(); - fail(); - } catch (IllegalStateException e) { - // expected - } - channel2.close().sync(); - - assertEquals(2, handler.channelCount()); - assertEquals(0, handler.acquiredCount()); - assertEquals(1, handler.releasedCount()); - sc.close().sync(); - channel.close().sync(); - channel2.close().sync(); - group.shutdownGracefully(); - } - - /** - * Tests that if channel was unhealthy it is not offered back to the pool. - * - * @throws Exception - */ - @Test - public void testUnhealthyChannelIsNotOffered() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new CountingChannelPoolHandler(); - ChannelPool pool = new SimpleChannelPool(cb, handler); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - pool.release(channel1).syncUninterruptibly(); - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - //first check that when returned healthy then it actually offered back to the pool. - assertSame(channel1, channel2); - - channel1.close().syncUninterruptibly(); - - pool.release(channel1).syncUninterruptibly(); - Channel channel3 = pool.acquire().syncUninterruptibly().getNow(); - //channel1 was not healthy anymore so it should not get acquired anymore. - assertNotSame(channel1, channel3); - sc.close().syncUninterruptibly(); - channel3.close().syncUninterruptibly(); - group.shutdownGracefully(); - } - - /** - * Tests that if channel was unhealthy it is was offered back to the pool because - * it was requested not to validate channel health on release. - * - * @throws Exception - */ - @Test - public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new CountingChannelPoolHandler(); - ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, false); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - channel1.close().syncUninterruptibly(); - Future releaseFuture = - pool.release(channel1, channel1.eventLoop().newPromise()).syncUninterruptibly(); - assertThat(releaseFuture.isSuccess(), CoreMatchers.is(true)); - - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - //verifying that in fact the channel2 is different that means is not pulled from the pool - assertNotSame(channel1, channel2); - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - group.shutdownGracefully(); - } - - @Test - public void testBootstrap() { - final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), new CountingChannelPoolHandler()); - - try { - // Checking for the actual bootstrap object doesn't make sense here, since the pool uses a copy with a - // modified channel handler. - assertNotNull(pool.bootstrap()); - } finally { - pool.close(); - } - } - - @Test - public void testHandler() { - final ChannelPoolHandler handler = new CountingChannelPoolHandler(); - final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), handler); - - try { - assertSame(handler, pool.handler()); - } finally { - pool.close(); - } - } - - @Test - public void testHealthChecker() { - final ChannelHealthChecker healthChecker = ChannelHealthChecker.ACTIVE; - final SimpleChannelPool pool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - healthChecker); - - try { - assertSame(healthChecker, pool.healthChecker()); - } finally { - pool.close(); - } - } - - @Test - public void testReleaseHealthCheck() { - final SimpleChannelPool healthCheckOnReleasePool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - ChannelHealthChecker.ACTIVE, - true); - - try { - assertTrue(healthCheckOnReleasePool.releaseHealthCheck()); - } finally { - healthCheckOnReleasePool.close(); - } - - final SimpleChannelPool noHealthCheckOnReleasePool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - ChannelHealthChecker.ACTIVE, - false); - - try { - assertFalse(noHealthCheckOnReleasePool.releaseHealthCheck()); - } finally { - noHealthCheckOnReleasePool.close(); - } - } -}