diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java deleted file mode 100644 index 1d688f2996..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; - -/** - * A skeletal {@link ChannelPoolHandler} implementation. - */ -public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler { - - /** - * NOOP implementation, sub-classes may override this. - * - * {@inheritDoc} - */ - @Override - public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception { - // NOOP - } - - /** - * NOOP implementation, sub-classes may override this. - * - * {@inheritDoc} - */ - @Override - public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception { - // NOOP - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java deleted file mode 100644 index 4b7213e901..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ReadOnlyIterator; - -import java.io.Closeable; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; - -import static io.netty.util.internal.ObjectUtil.checkNotNull; - -/** - * A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool} - * the {@link Object#hashCode()} and {@link Object#equals(Object)} is used. - */ -public abstract class AbstractChannelPoolMap - implements ChannelPoolMap, Iterable>, Closeable { - private final ConcurrentMap map = PlatformDependent.newConcurrentHashMap(); - - @Override - public final P get(K key) { - P pool = map.get(checkNotNull(key, "key")); - if (pool == null) { - pool = newPool(key); - P old = map.putIfAbsent(key, pool); - if (old != null) { - // We need to destroy the newly created pool as we not use it. - pool.close(); - pool = old; - } - } - return pool; - } - /** - * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed, - * {@code false} otherwise. - * - * Please note that {@code null} keys are not allowed. - */ - public final boolean remove(K key) { - P pool = map.remove(checkNotNull(key, "key")); - if (pool != null) { - pool.close(); - return true; - } - return false; - } - - @Override - public final Iterator> iterator() { - return new ReadOnlyIterator>(map.entrySet().iterator()); - } - - /** - * Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}. - */ - public final int size() { - return map.size(); - } - - /** - * Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}. - */ - public final boolean isEmpty() { - return map.isEmpty(); - } - - @Override - public final boolean contains(K key) { - return map.containsKey(checkNotNull(key, "key")); - } - - /** - * Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}. - */ - protected abstract P newPool(K key); - - @Override - public final void close() { - for (K key: map.keySet()) { - remove(key); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java b/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java deleted file mode 100644 index c8775d48b6..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelHealthChecker.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; - -/** - * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or - * {@link ChannelPool#acquire(Promise)}. - */ -public interface ChannelHealthChecker { - - /** - * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}. - */ - ChannelHealthChecker ACTIVE = new ChannelHealthChecker() { - @Override - public Future isHealthy(Channel channel) { - EventLoop loop = channel.eventLoop(); - return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE); - } - }; - - /** - * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once - * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - Future isHealthy(Channel channel); -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java b/transport/src/main/java/io/netty/channel/pool/ChannelPool.java deleted file mode 100644 index 6e163d9f33..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPool.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; - -import java.io.Closeable; - -/** - * Allows to acquire and release {@link Channel} and so act as a pool of these. - */ -public interface ChannelPool extends Closeable { - - /** - * Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once - * the acquire is successful and failed otherwise. - * - * Its important that an acquired is always released to the pool again, even if the {@link Channel} - * is explicitly closed.. - */ - Future acquire(); - - /** - * Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once - * the acquire is successful and failed otherwise. - * - * Its important that an acquired is always released to the pool again, even if the {@link Channel} - * is explicitly closed.. - */ - Future acquire(Promise promise); - - /** - * Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once - * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. - */ - Future release(Channel channel); - - /** - * Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once - * the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed. - */ - Future release(Channel channel, Promise promise); - - @Override - void close(); -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java deleted file mode 100644 index 0bd078fa67..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPoolHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Promise; - -/** - * Handler which is called for various actions done by the {@link ChannelPool}. - */ -public interface ChannelPoolHandler { - /** - * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or - * {@link ChannelPool#release(Channel, Promise)}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelReleased(Channel ch) throws Exception; - - /** - * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or - * {@link ChannelPool#acquire(Promise)}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelAcquired(Channel ch) throws Exception; - - /** - * Called once a new {@link Channel} is created in the {@link ChannelPool}. - * - * This method will be called by the {@link EventLoop} of the {@link Channel}. - */ - void channelCreated(Channel ch) throws Exception; -} diff --git a/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java deleted file mode 100644 index bdddd117c3..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/ChannelPoolMap.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -/** - * Allows to map {@link ChannelPool} implementations to a specific key. - * - * @param the type of the key - * @param

the type of the {@link ChannelPool} - */ -public interface ChannelPoolMap { - /** - * Return the {@link ChannelPool} for the {@code code}. This will never return {@code null}, - * but create a new {@link ChannelPool} if non exists for they requested {@code key}. - * - * Please note that {@code null} keys are not allowed. - */ - P get(K key); - - /** - * Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}. - * - * Please note that {@code null} keys are not allowed. - */ - boolean contains(K key); -} diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java deleted file mode 100644 index 5ca376f88d..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.ThrowableUtil; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum - * number of concurrent connections. - */ -public class FixedChannelPool extends SimpleChannelPool { - private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("Too many outstanding acquire operations"), - FixedChannelPool.class, "acquire0(...)"); - private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace( - new TimeoutException("Acquire operation took longer then configured maximum time"), - FixedChannelPool.class, "(...)"); - static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - FixedChannelPool.class, "release(...)"); - static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - FixedChannelPool.class, "acquire0(...)"); - public enum AcquireTimeoutAction { - /** - * Create a new connection when the timeout is detected. - */ - NEW, - - /** - * Fail the {@link Future} of the acquire call with a {@link TimeoutException}. - */ - FAIL - } - - private final EventExecutor executor; - private final long acquireTimeoutNanos; - private final Runnable timeoutTask; - - // There is no need to worry about synchronization as everything that modified the queue or counts is done - // by the above EventExecutor. - private final Queue pendingAcquireQueue = new ArrayDeque(); - private final int maxConnections; - private final int maxPendingAcquires; - private final AtomicInteger acquiredChannelCount = new AtomicInteger(); - private int pendingAcquireCount; - private boolean closed; - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to acquire - * a {@link Channel} will be delayed until a connection is returned to the pool again. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections) { - this(bootstrap, handler, maxConnections, Integer.MAX_VALUE); - } - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, - releaseHealthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. - */ - public FixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, - boolean releaseHealthCheck, boolean lastRecentUsed) { - super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed); - if (maxConnections < 1) { - throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); - } - if (maxPendingAcquires < 1) { - throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)"); - } - if (action == null && acquireTimeoutMillis == -1) { - timeoutTask = null; - acquireTimeoutNanos = -1; - } else if (action == null && acquireTimeoutMillis != -1) { - throw new NullPointerException("action"); - } else if (action != null && acquireTimeoutMillis < 0) { - throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)"); - } else { - acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); - switch (action) { - case FAIL: - timeoutTask = new TimeoutTask() { - @Override - public void onTimeout(AcquireTask task) { - // Fail the promise as we timed out. - task.promise.setFailure(TIMEOUT_EXCEPTION); - } - }; - break; - case NEW: - timeoutTask = new TimeoutTask() { - @Override - public void onTimeout(AcquireTask task) { - // Increment the acquire count and delegate to super to actually acquire a Channel which will - // create a new connection. - task.acquired(); - - FixedChannelPool.super.acquire(task.promise); - } - }; - break; - default: - throw new Error(); - } - } - executor = bootstrap.config().group().next(); - this.maxConnections = maxConnections; - this.maxPendingAcquires = maxPendingAcquires; - } - - /** Returns the number of acquired channels that this pool thinks it has. */ - public int acquiredChannelCount() { - return acquiredChannelCount.get(); - } - - @Override - public Future acquire(final Promise promise) { - try { - if (executor.inEventLoop()) { - acquire0(promise); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - acquire0(promise); - } - }); - } - } catch (Throwable cause) { - promise.setFailure(cause); - } - return promise; - } - - private void acquire0(final Promise promise) { - assert executor.inEventLoop(); - - if (closed) { - promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - if (acquiredChannelCount.get() < maxConnections) { - assert acquiredChannelCount.get() >= 0; - - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop - Promise p = executor.newPromise(); - AcquireListener l = new AcquireListener(promise); - l.acquired(); - p.addListener(l); - super.acquire(p); - } else { - if (pendingAcquireCount >= maxPendingAcquires) { - promise.setFailure(FULL_EXCEPTION); - } else { - AcquireTask task = new AcquireTask(promise); - if (pendingAcquireQueue.offer(task)) { - ++pendingAcquireCount; - - if (timeoutTask != null) { - task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); - } - } else { - promise.setFailure(FULL_EXCEPTION); - } - } - - assert pendingAcquireCount > 0; - } - } - - @Override - public Future release(final Channel channel, final Promise promise) { - ObjectUtil.checkNotNull(promise, "promise"); - final Promise p = executor.newPromise(); - super.release(channel, p.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - // Since the pool is closed, we have no choice but to close the channel - channel.close(); - promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - decrementAndRunTaskQueue(); - promise.setSuccess(null); - } else { - Throwable cause = future.cause(); - // Check if the exception was not because of we passed the Channel to the wrong pool. - if (!(cause instanceof IllegalArgumentException)) { - decrementAndRunTaskQueue(); - } - promise.setFailure(future.cause()); - } - } - })); - return promise; - } - - private void decrementAndRunTaskQueue() { - // We should never have a negative value. - int currentCount = acquiredChannelCount.decrementAndGet(); - assert currentCount >= 0; - - // Run the pending acquire tasks before notify the original promise so if the user would - // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >= - // maxPendingAcquires we may be able to run some pending tasks first and so allow to add - // more. - runTaskQueue(); - } - - private void runTaskQueue() { - while (acquiredChannelCount.get() < maxConnections) { - AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - - // Cancel the timeout if one was scheduled - ScheduledFuture timeoutFuture = task.timeoutFuture; - if (timeoutFuture != null) { - timeoutFuture.cancel(false); - } - - --pendingAcquireCount; - task.acquired(); - - super.acquire(task.promise); - } - - // We should never have a negative value. - assert pendingAcquireCount >= 0; - assert acquiredChannelCount.get() >= 0; - } - - // AcquireTask extends AcquireListener to reduce object creations and so GC pressure - private final class AcquireTask extends AcquireListener { - final Promise promise; - final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos; - ScheduledFuture timeoutFuture; - - public AcquireTask(Promise promise) { - super(promise); - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop. - this.promise = executor.newPromise().addListener(this); - } - } - - private abstract class TimeoutTask implements Runnable { - @Override - public final void run() { - assert executor.inEventLoop(); - long nanoTime = System.nanoTime(); - for (;;) { - AcquireTask task = pendingAcquireQueue.peek(); - // Compare nanoTime as descripted in the javadocs of System.nanoTime() - // - // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime() - // See https://github.com/netty/netty/issues/3705 - if (task == null || nanoTime - task.expireNanoTime < 0) { - break; - } - pendingAcquireQueue.remove(); - - --pendingAcquireCount; - onTimeout(task); - } - } - - public abstract void onTimeout(AcquireTask task); - } - - private class AcquireListener implements FutureListener { - private final Promise originalPromise; - protected boolean acquired; - - AcquireListener(Promise originalPromise) { - this.originalPromise = originalPromise; - } - - @Override - public void operationComplete(Future future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - if (future.isSuccess()) { - // Since the pool is closed, we have no choice but to close the channel - future.getNow().close(); - } - originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - originalPromise.setSuccess(future.getNow()); - } else { - if (acquired) { - decrementAndRunTaskQueue(); - } else { - runTaskQueue(); - } - - originalPromise.setFailure(future.cause()); - } - } - - public void acquired() { - if (acquired) { - return; - } - acquiredChannelCount.incrementAndGet(); - acquired = true; - } - } - - @Override - public void close() { - if (executor.inEventLoop()) { - close0(); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - close0(); - } - }).awaitUninterruptibly(); - } - } - - private void close0() { - if (!closed) { - closed = true; - for (;;) { - AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - ScheduledFuture f = task.timeoutFuture; - if (f != null) { - f.cancel(false); - } - task.promise.setFailure(new ClosedChannelException()); - } - acquiredChannelCount.set(0); - pendingAcquireCount = 0; - - // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need - // to ensure we will not block in a EventExecutor. - GlobalEventExecutor.INSTANCE.execute(new Runnable() { - @Override - public void run() { - FixedChannelPool.super.close(); - } - }); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java deleted file mode 100644 index 6fcfd4443f..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoop; -import io.netty.util.AttributeKey; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThrowableUtil; - -import java.util.Deque; - -import static io.netty.util.internal.ObjectUtil.*; - -/** - * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire - * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. - * - * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. - * - */ -public class SimpleChannelPool implements ChannelPool { - private static final AttributeKey POOL_KEY = AttributeKey.newInstance("channelPool"); - private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)"); - - private final Deque deque = PlatformDependent.newConcurrentDeque(); - private final ChannelPoolHandler handler; - private final ChannelHealthChecker healthCheck; - private final Bootstrap bootstrap; - private final boolean releaseHealthCheck; - private final boolean lastRecentUsed; - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) { - this(bootstrap, handler, ChannelHealthChecker.ACTIVE); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) { - this(bootstrap, handler, healthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; - * otherwise, channel health is only checked at acquisition time - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, - boolean releaseHealthCheck) { - this(bootstrap, handler, healthCheck, releaseHealthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param releaseHealthCheck will check channel health before offering back if this parameter set to {@code true}; - * otherwise, channel health is only checked at acquisition time - * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. - */ - public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, - boolean releaseHealthCheck, boolean lastRecentUsed) { - this.handler = checkNotNull(handler, "handler"); - this.healthCheck = checkNotNull(healthCheck, "healthCheck"); - this.releaseHealthCheck = releaseHealthCheck; - // Clone the original Bootstrap as we want to set our own handler - this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); - this.bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - assert ch.eventLoop().inEventLoop(); - handler.channelCreated(ch); - } - }); - this.lastRecentUsed = lastRecentUsed; - } - - /** - * Returns the {@link Bootstrap} this pool will use to open new connections. - * - * @return the {@link Bootstrap} this pool will use to open new connections - */ - protected Bootstrap bootstrap() { - return bootstrap; - } - - /** - * Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions. - * - * @return the {@link ChannelPoolHandler} that will be notified for the different pool actions - */ - protected ChannelPoolHandler handler() { - return handler; - } - - /** - * Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy. - * - * @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy - */ - protected ChannelHealthChecker healthChecker() { - return healthCheck; - } - - /** - * Indicates whether this pool will check the health of channels before offering them back into the pool. - * - * @return {@code true} if this pool will check the health of channels before offering them back into the pool, or - * {@code false} if channel health is only checked at acquisition time - */ - protected boolean releaseHealthCheck() { - return releaseHealthCheck; - } - - @Override - public final Future acquire() { - return acquire(bootstrap.config().group().next().newPromise()); - } - - @Override - public Future acquire(final Promise promise) { - checkNotNull(promise, "promise"); - return acquireHealthyFromPoolOrNew(promise); - } - - /** - * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. - * @param promise the promise to provide acquire result. - * @return future for acquiring a channel. - */ - private Future acquireHealthyFromPoolOrNew(final Promise promise) { - try { - final Channel ch = pollChannel(); - if (ch == null) { - // No Channel left in the pool bootstrap a new Channel - Bootstrap bs = bootstrap.clone(); - bs.attr(POOL_KEY, this); - ChannelFuture f = connectChannel(bs); - if (f.isDone()) { - notifyConnect(f, promise); - } else { - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - notifyConnect(future, promise); - } - }); - } - return promise; - } - EventLoop loop = ch.eventLoop(); - if (loop.inEventLoop()) { - doHealthCheck(ch, promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - doHealthCheck(ch, promise); - } - }); - } - } catch (Throwable cause) { - promise.tryFailure(cause); - } - return promise; - } - - private void notifyConnect(ChannelFuture future, Promise promise) { - if (future.isSuccess()) { - Channel channel = future.channel(); - if (!promise.trySuccess(channel)) { - // Promise was completed in the meantime (like cancelled), just release the channel again - release(channel); - } - } else { - promise.tryFailure(future.cause()); - } - } - - private void doHealthCheck(final Channel ch, final Promise promise) { - assert ch.eventLoop().inEventLoop(); - - Future f = healthCheck.isHealthy(ch); - if (f.isDone()) { - notifyHealthCheck(f, ch, promise); - } else { - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - notifyHealthCheck(future, ch, promise); - } - }); - } - } - - private void notifyHealthCheck(Future future, Channel ch, Promise promise) { - assert ch.eventLoop().inEventLoop(); - - if (future.isSuccess()) { - if (future.getNow()) { - try { - ch.attr(POOL_KEY).set(this); - handler.channelAcquired(ch); - promise.setSuccess(ch); - } catch (Throwable cause) { - closeAndFail(ch, cause, promise); - } - } else { - closeChannel(ch); - acquireHealthyFromPoolOrNew(promise); - } - } else { - closeChannel(ch); - acquireHealthyFromPoolOrNew(promise); - } - } - - /** - * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may - * override this. - *

- * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify. - */ - protected ChannelFuture connectChannel(Bootstrap bs) { - return bs.connect(); - } - - @Override - public final Future release(Channel channel) { - return release(channel, channel.eventLoop().newPromise()); - } - - @Override - public Future release(final Channel channel, final Promise promise) { - checkNotNull(channel, "channel"); - checkNotNull(promise, "promise"); - try { - EventLoop loop = channel.eventLoop(); - if (loop.inEventLoop()) { - doReleaseChannel(channel, promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - doReleaseChannel(channel, promise); - } - }); - } - } catch (Throwable cause) { - closeAndFail(channel, cause, promise); - } - return promise; - } - - private void doReleaseChannel(Channel channel, Promise promise) { - assert channel.eventLoop().inEventLoop(); - // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. - if (channel.attr(POOL_KEY).getAndSet(null) != this) { - closeAndFail(channel, - // Better include a stacktrace here as this is an user error. - new IllegalArgumentException( - "Channel " + channel + " was not acquired from this ChannelPool"), - promise); - } else { - try { - if (releaseHealthCheck) { - doHealthCheckOnRelease(channel, promise); - } else { - releaseAndOffer(channel, promise); - } - } catch (Throwable cause) { - closeAndFail(channel, cause, promise); - } - } - } - - private void doHealthCheckOnRelease(final Channel channel, final Promise promise) throws Exception { - final Future f = healthCheck.isHealthy(channel); - if (f.isDone()) { - releaseAndOfferIfHealthy(channel, promise, f); - } else { - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - releaseAndOfferIfHealthy(channel, promise, f); - } - }); - } - } - - /** - * Adds the channel back to the pool only if the channel is healthy. - * @param channel the channel to put back to the pool - * @param promise offer operation promise. - * @param future the future that contains information fif channel is healthy or not. - * @throws Exception in case when failed to notify handler about release operation. - */ - private void releaseAndOfferIfHealthy(Channel channel, Promise promise, Future future) - throws Exception { - if (future.getNow()) { //channel turns out to be healthy, offering and releasing it. - releaseAndOffer(channel, promise); - } else { //channel not healthy, just releasing it. - handler.channelReleased(channel); - promise.setSuccess(null); - } - } - - private void releaseAndOffer(Channel channel, Promise promise) throws Exception { - if (offerChannel(channel)) { - handler.channelReleased(channel); - promise.setSuccess(null); - } else { - closeAndFail(channel, FULL_EXCEPTION, promise); - } - } - - private static void closeChannel(Channel channel) { - channel.attr(POOL_KEY).getAndSet(null); - channel.close(); - } - - private static void closeAndFail(Channel channel, Throwable cause, Promise promise) { - closeChannel(channel); - promise.tryFailure(cause); - } - - /** - * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no - * {@link Channel} is ready to be reused. - * - * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that - * implementations of these methods needs to be thread-safe! - */ - protected Channel pollChannel() { - return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); - } - - /** - * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} - * could be added, {@code false} otherwise. - * - * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that - * implementations of these methods needs to be thread-safe! - */ - protected boolean offerChannel(Channel channel) { - return deque.offer(channel); - } - - @Override - public void close() { - for (;;) { - Channel channel = pollChannel(); - if (channel == null) { - break; - } - // Just ignore any errors that are reported back from close(). - channel.close().awaitUninterruptibly(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/pool/package-info.java b/transport/src/main/java/io/netty/channel/pool/package-info.java deleted file mode 100644 index 74293d28bb..0000000000 --- a/transport/src/main/java/io/netty/channel/pool/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * Implementations and API for {@link io.netty.channel.Channel} pools. - */ -package io.netty.channel.pool; diff --git a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java deleted file mode 100644 index 8d03cd7b6c..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import org.junit.Test; - -import java.net.ConnectException; - -import static org.junit.Assert.*; - -public class AbstractChannelPoolMapTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - @Test(expected = ConnectException.class) - public void testMap() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - final Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - AbstractChannelPoolMap poolMap = - new AbstractChannelPoolMap() { - @Override - protected SimpleChannelPool newPool(EventLoop key) { - return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler()); - } - }; - - EventLoop loop = group.next(); - - assertFalse(poolMap.iterator().hasNext()); - assertEquals(0, poolMap.size()); - - SimpleChannelPool pool = poolMap.get(loop); - assertEquals(1, poolMap.size()); - assertTrue(poolMap.iterator().hasNext()); - - assertSame(pool, poolMap.get(loop)); - assertTrue(poolMap.remove(loop)); - assertFalse(poolMap.remove(loop)); - - assertFalse(poolMap.iterator().hasNext()); - assertEquals(0, poolMap.size()); - - pool.acquire().syncUninterruptibly(); - } - - private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { - @Override - public void channelCreated(Channel ch) throws Exception { - // NOOP - } - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java b/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java deleted file mode 100644 index dc6fce504a..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/CountingChannelPoolHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.channel.Channel; - -import java.util.concurrent.atomic.AtomicInteger; - -final class CountingChannelPoolHandler implements ChannelPoolHandler { - private final AtomicInteger channelCount = new AtomicInteger(0); - private final AtomicInteger acquiredCount = new AtomicInteger(0); - private final AtomicInteger releasedCount = new AtomicInteger(0); - - @Override - public void channelCreated(Channel ch) { - channelCount.incrementAndGet(); - } - - @Override - public void channelReleased(Channel ch) { - releasedCount.incrementAndGet(); - } - - @Override - public void channelAcquired(Channel ch) { - acquiredCount.incrementAndGet(); - } - - public int channelCount() { - return channelCount.get(); - } - - public int acquiredCount() { - return acquiredCount.get(); - } - - public int releasedCount() { - return releasedCount.get(); - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java deleted file mode 100644 index bbf1debeac..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction; -import io.netty.util.concurrent.Future; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; - -public class FixedChannelPoolTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - private static EventLoopGroup group; - - @BeforeClass - public static void createEventLoop() { - group = new LocalEventLoopGroup(); - } - - @AfterClass - public static void destroyEventLoop() { - if (group != null) { - group.shutdownGracefully(); - } - } - - @Test - public void testAcquire() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - assertFalse(future.isDone()); - - pool.release(channel).syncUninterruptibly(); - assertTrue(future.await(1, TimeUnit.SECONDS)); - - Channel channel2 = future.getNow(); - assertSame(channel, channel2); - assertEquals(1, handler.channelCount()); - - assertEquals(1, handler.acquiredCount()); - assertEquals(1, handler.releasedCount()); - - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - @Test(expected = TimeoutException.class) - public void testAcquireTimeout() throws Exception { - testAcquireTimeout(500); - } - - @Test(expected = TimeoutException.class) - public void testAcquireWithZeroTimeout() throws Exception { - testAcquireTimeout(0); - } - - private static void testAcquireTimeout(long timeoutMillis) throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, - AcquireTimeoutAction.FAIL, timeoutMillis, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - try { - future.syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test - public void testAcquireNewConnection() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, - AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - assertNotSame(channel, channel2); - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - /** - * Tests that the acquiredChannelCount is not added up several times for the same channel acquire request. - * @throws Exception - */ - @Test - public void testAcquireNewConnectionWhen() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - channel1.close().syncUninterruptibly(); - pool.release(channel1); - - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - - assertNotSame(channel1, channel2); - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - } - - @Test(expected = IllegalStateException.class) - public void testAcquireBoundQueue() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - Future future = pool.acquire(); - assertFalse(future.isDone()); - - try { - pool.acquire().syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test(expected = IllegalArgumentException.class) - public void testReleaseDifferentPool() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new TestChannelPoolHandler(); - ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1); - ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1); - - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - - try { - pool2.release(channel).syncUninterruptibly(); - } finally { - sc.close().syncUninterruptibly(); - channel.close().syncUninterruptibly(); - } - } - - @Test - public void testReleaseAfterClosePool() throws Exception { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group).channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - - FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2); - final Future acquire = pool.acquire(); - final Channel channel = acquire.get(); - pool.close(); - group.submit(new Runnable() { - @Override - public void run() { - // NOOP - } - }).syncUninterruptibly(); - try { - pool.release(channel).syncUninterruptibly(); - fail(); - } catch (IllegalStateException e) { - assertSame(FixedChannelPool.POOL_CLOSED_ON_RELEASE_EXCEPTION, e); - } - // Since the pool is closed, the Channel should have been closed as well. - channel.closeFuture().syncUninterruptibly(); - assertFalse("Unexpected open channel", channel.isOpen()); - sc.close().syncUninterruptibly(); - } - - @Test - public void testReleaseClosed() { - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group).channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - - FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2); - Channel channel = pool.acquire().syncUninterruptibly().getNow(); - channel.close().syncUninterruptibly(); - pool.release(channel).syncUninterruptibly(); - - sc.close().syncUninterruptibly(); - } - - private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { - @Override - public void channelCreated(Channel ch) throws Exception { - // NOOP - } - } -} diff --git a/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java deleted file mode 100644 index a91790c38b..0000000000 --- a/transport/src/test/java/io/netty/channel/pool/SimpleChannelPoolTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.pool; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; -import io.netty.channel.local.LocalServerChannel; -import io.netty.util.concurrent.Future; -import org.hamcrest.CoreMatchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; - -import static org.junit.Assert.*; - -public class SimpleChannelPoolTest { - private static final String LOCAL_ADDR_ID = "test.id"; - - @Test - public void testAcquire() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).sync().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new SimpleChannelPool(cb, handler); - - Channel channel = pool.acquire().sync().getNow(); - - pool.release(channel).syncUninterruptibly(); - - Channel channel2 = pool.acquire().sync().getNow(); - assertSame(channel, channel2); - assertEquals(1, handler.channelCount()); - pool.release(channel2).syncUninterruptibly(); - - // Should fail on multiple release calls. - try { - pool.release(channel2).syncUninterruptibly(); - fail(); - } catch (IllegalArgumentException e) { - // expected - assertFalse(channel.isActive()); - } - - assertEquals(1, handler.acquiredCount()); - assertEquals(2, handler.releasedCount()); - - sc.close().sync(); - group.shutdownGracefully(); - } - - @Test - public void testBoundedChannelPoolSegment() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).sync().channel(); - CountingChannelPoolHandler handler = new CountingChannelPoolHandler(); - - ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) { - private final Queue queue = new LinkedBlockingQueue(1); - - @Override - protected Channel pollChannel() { - return queue.poll(); - } - - @Override - protected boolean offerChannel(Channel ch) { - return queue.offer(ch); - } - }; - - Channel channel = pool.acquire().sync().getNow(); - Channel channel2 = pool.acquire().sync().getNow(); - - pool.release(channel).syncUninterruptibly().getNow(); - try { - pool.release(channel2).syncUninterruptibly(); - fail(); - } catch (IllegalStateException e) { - // expected - } - channel2.close().sync(); - - assertEquals(2, handler.channelCount()); - assertEquals(0, handler.acquiredCount()); - assertEquals(1, handler.releasedCount()); - sc.close().sync(); - channel.close().sync(); - channel2.close().sync(); - group.shutdownGracefully(); - } - - /** - * Tests that if channel was unhealthy it is not offered back to the pool. - * - * @throws Exception - */ - @Test - public void testUnhealthyChannelIsNotOffered() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new CountingChannelPoolHandler(); - ChannelPool pool = new SimpleChannelPool(cb, handler); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - pool.release(channel1).syncUninterruptibly(); - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - //first check that when returned healthy then it actually offered back to the pool. - assertSame(channel1, channel2); - - channel1.close().syncUninterruptibly(); - - pool.release(channel1).syncUninterruptibly(); - Channel channel3 = pool.acquire().syncUninterruptibly().getNow(); - //channel1 was not healthy anymore so it should not get acquired anymore. - assertNotSame(channel1, channel3); - sc.close().syncUninterruptibly(); - channel3.close().syncUninterruptibly(); - group.shutdownGracefully(); - } - - /** - * Tests that if channel was unhealthy it is was offered back to the pool because - * it was requested not to validate channel health on release. - * - * @throws Exception - */ - @Test - public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception { - EventLoopGroup group = new LocalEventLoopGroup(); - LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); - Bootstrap cb = new Bootstrap(); - cb.remoteAddress(addr); - cb.group(group) - .channel(LocalChannel.class); - - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); - } - }); - - // Start server - Channel sc = sb.bind(addr).syncUninterruptibly().channel(); - ChannelPoolHandler handler = new CountingChannelPoolHandler(); - ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, false); - Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); - channel1.close().syncUninterruptibly(); - Future releaseFuture = - pool.release(channel1, channel1.eventLoop().newPromise()).syncUninterruptibly(); - assertThat(releaseFuture.isSuccess(), CoreMatchers.is(true)); - - Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); - //verifying that in fact the channel2 is different that means is not pulled from the pool - assertNotSame(channel1, channel2); - sc.close().syncUninterruptibly(); - channel2.close().syncUninterruptibly(); - group.shutdownGracefully(); - } - - @Test - public void testBootstrap() { - final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), new CountingChannelPoolHandler()); - - try { - // Checking for the actual bootstrap object doesn't make sense here, since the pool uses a copy with a - // modified channel handler. - assertNotNull(pool.bootstrap()); - } finally { - pool.close(); - } - } - - @Test - public void testHandler() { - final ChannelPoolHandler handler = new CountingChannelPoolHandler(); - final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), handler); - - try { - assertSame(handler, pool.handler()); - } finally { - pool.close(); - } - } - - @Test - public void testHealthChecker() { - final ChannelHealthChecker healthChecker = ChannelHealthChecker.ACTIVE; - final SimpleChannelPool pool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - healthChecker); - - try { - assertSame(healthChecker, pool.healthChecker()); - } finally { - pool.close(); - } - } - - @Test - public void testReleaseHealthCheck() { - final SimpleChannelPool healthCheckOnReleasePool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - ChannelHealthChecker.ACTIVE, - true); - - try { - assertTrue(healthCheckOnReleasePool.releaseHealthCheck()); - } finally { - healthCheckOnReleasePool.close(); - } - - final SimpleChannelPool noHealthCheckOnReleasePool = new SimpleChannelPool( - new Bootstrap(), - new CountingChannelPoolHandler(), - ChannelHealthChecker.ACTIVE, - false); - - try { - assertFalse(noHealthCheckOnReleasePool.releaseHealthCheck()); - } finally { - noHealthCheckOnReleasePool.close(); - } - } -}