Remove io.netty.channel.pool.* (#8681)

Motivation:

We should remove ChannelPool and its related implementations. Pooling is often more effective when it has protocol knowledge, which ChannelPool currently lacks; that responsibility is expected to be implemented at layers higher in the stack than Netty.

Modifications:

Remove io.netty.channel.pool.*

Result:

Less code to maintain. Fixes https://github.com/netty/netty/issues/8549.
Norman Maurer 2019-01-14 08:28:40 +01:00 committed by GitHub
parent d06babf02a
commit 4efad295df
13 changed files with 0 additions and 2033 deletions

@@ -1,44 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
/**
* A skeletal {@link ChannelPoolHandler} implementation.
*/
public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler {
/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}
/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}
}
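
For context (not part of this diff): a minimal sketch of how the removed AbstractChannelPoolHandler was typically subclassed. Only channelCreated(...) is overridden to set up the pipeline of newly pooled connections; the class name and the decoder used here are illustrative.

import io.netty.channel.Channel;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.handler.codec.LineBasedFrameDecoder;

// Illustrative only: initializes every Channel the pool creates, while the inherited
// NOOP channelAcquired(...) / channelReleased(...) implementations are kept as-is.
public class InitializingPoolHandler extends AbstractChannelPoolHandler {
    @Override
    public void channelCreated(Channel ch) {
        // Runs in the Channel's EventLoop, so it is safe to mutate the pipeline here.
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    }
}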

@@ -1,100 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool},
* {@link Object#hashCode()} and {@link Object#equals(Object)} of the key are used.
*/
public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
implements ChannelPoolMap<K, P>, Iterable<Entry<K, P>>, Closeable {
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();
@Override
public final P get(K key) {
P pool = map.get(checkNotNull(key, "key"));
if (pool == null) {
pool = newPool(key);
P old = map.putIfAbsent(key, pool);
if (old != null) {
// We need to destroy the newly created pool as we will not use it.
pool.close();
pool = old;
}
}
return pool;
}
/**
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
* {@code false} otherwise.
*
* Please note that {@code null} keys are not allowed.
*/
public final boolean remove(K key) {
P pool = map.remove(checkNotNull(key, "key"));
if (pool != null) {
pool.close();
return true;
}
return false;
}
@Override
public final Iterator<Entry<K, P>> iterator() {
return new ReadOnlyIterator<Entry<K, P>>(map.entrySet().iterator());
}
/**
* Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}.
*/
public final int size() {
return map.size();
}
/**
* Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}.
*/
public final boolean isEmpty() {
return map.isEmpty();
}
@Override
public final boolean contains(K key) {
return map.containsKey(checkNotNull(key, "key"));
}
/**
* Called once a new {@link ChannelPool} needs to be created because none exists yet for the {@code key}.
*/
protected abstract P newPool(K key);
@Override
public final void close() {
for (K key: map.keySet()) {
remove(key);
}
}
}
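
For context (not part of this diff): a sketch of how AbstractChannelPoolMap was commonly used to keep one pool per remote address. PerHostPoolMap is a made-up name, and it reuses the illustrative InitializingPoolHandler from the sketch above.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.SimpleChannelPool;
import java.net.InetSocketAddress;

// Illustrative only: the base class takes care of concurrent get(), remove() and close().
public class PerHostPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> {
    private final Bootstrap bootstrap;

    public PerHostPoolMap(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    protected SimpleChannelPool newPool(InetSocketAddress key) {
        // Clone so each pool gets its own remote address without mutating the shared Bootstrap.
        return new SimpleChannelPool(bootstrap.clone().remoteAddress(key), new InitializingPoolHandler());
    }
}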

@@ -1,47 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
/**
* Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*/
public interface ChannelHealthChecker {
/**
* {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
*/
ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
}
};
/**
* Check if the given channel is healthy, which means it can be used. The returned {@link Future} is notified once
* the check is complete. If notified with {@link Boolean#TRUE} it can be used, with {@link Boolean#FALSE} it cannot.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
Future<Boolean> isHealthy(Channel channel);
}
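
For context (not part of this diff): a sketch of a custom ChannelHealthChecker that is stricter than the ACTIVE default, also requiring the Channel to be writable; the class name is illustrative.

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.util.concurrent.Future;

// Illustrative only: isHealthy(...) is called from the Channel's EventLoop,
// so a succeeded future can be created directly.
public class ActiveAndWritableHealthChecker implements ChannelHealthChecker {
    @Override
    public Future<Boolean> isHealthy(Channel channel) {
        EventLoop loop = channel.eventLoop();
        boolean healthy = channel.isActive() && channel.isWritable();
        return loop.newSucceededFuture(healthy);
    }
}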

@@ -1,61 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
/**
* Allows acquiring and releasing a {@link Channel} and so acts as a pool of these.
*/
public interface ChannelPool extends Closeable {
/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once
* the acquire is successful and failed otherwise.
*
* <strong>It is important that an acquired {@link Channel} is always released to the pool again, even if the
* {@link Channel} is explicitly closed.</strong>
*/
Future<Channel> acquire();
/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once
* the acquire is successful and failed otherwise.
*
* <strong>It is important that an acquired {@link Channel} is always released to the pool again, even if the
* {@link Channel} is explicitly closed.</strong>
*/
Future<Channel> acquire(Promise<Channel> promise);
/**
* Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once
* the release is successful and failed otherwise. If it fails, the {@link Channel} will automatically be closed.
*/
Future<Void> release(Channel channel);
/**
* Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once
* the release is successful and failed otherwise. If it fails, the {@link Channel} will automatically be closed.
*/
Future<Void> release(Channel channel, Promise<Void> promise);
@Override
void close();
}
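
For context (not part of this diff): the typical acquire/use/release cycle against the removed ChannelPool interface, releasing the Channel even when the write fails. ChannelPoolUsage and writeAndRelease(...) are made-up names for this sketch.

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

// Illustrative only: acquire a Channel, write one message, and always hand the Channel
// back to the pool, even when the acquire succeeds but the write fails.
public final class ChannelPoolUsage {
    private ChannelPoolUsage() { }

    static void writeAndRelease(final ChannelPool pool, final Object msg) {
        pool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) {
                if (!future.isSuccess()) {
                    return; // acquire failed, nothing to release
                }
                final Channel ch = future.getNow();
                ch.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture writeFuture) {
                        // Release regardless of the write outcome.
                        pool.release(ch);
                    }
                });
            }
        });
    }
}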

@@ -1,48 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;
/**
* Handler which is called for various actions done by the {@link ChannelPool}.
*/
public interface ChannelPoolHandler {
/**
* Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
* {@link ChannelPool#release(Channel, Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelReleased(Channel ch) throws Exception;
/**
* Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelAcquired(Channel ch) throws Exception;
/**
* Called once a new {@link Channel} is created in the {@link ChannelPool}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelCreated(Channel ch) throws Exception;
}

@@ -1,39 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
/**
* Allows to map {@link ChannelPool} implementations to a specific key.
*
* @param <K> the type of the key
* @param <P> the type of the {@link ChannelPool}
*/
public interface ChannelPoolMap<K, P extends ChannelPool> {
/**
* Return the {@link ChannelPool} for the {@code key}. This will never return {@code null},
* but will create a new {@link ChannelPool} if none exists for the requested {@code key}.
*
* Please note that {@code null} keys are not allowed.
*/
P get(K key);
/**
* Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}.
*
* Please note that {@code null} keys are not allowed.
*/
boolean contains(K key);
}

@@ -1,485 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforces a maximum
* number of concurrent connections.
*/
public class FixedChannelPool extends SimpleChannelPool {
private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("Too many outstanding acquire operations"),
FixedChannelPool.class, "acquire0(...)");
private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
new TimeoutException("Acquire operation took longer than the configured maximum time"),
FixedChannelPool.class, "<init>(...)");
static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("FixedChannelPool was closed"),
FixedChannelPool.class, "release(...)");
static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("FixedChannelPool was closed"),
FixedChannelPool.class, "acquire0(...)");
public enum AcquireTimeoutAction {
/**
* Create a new connection when the timeout is detected.
*/
NEW,
/**
* Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
*/
FAIL
}
private final EventExecutor executor;
private final long acquireTimeoutNanos;
private final Runnable timeoutTask;
// There is no need to worry about synchronization as everything that modifies the queue or counts is done
// by the above EventExecutor.
private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
private final int maxConnections;
private final int maxPendingAcquires;
private final AtomicInteger acquiredChannelCount = new AtomicInteger();
private int pendingAcquireCount;
private boolean closed;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param maxConnections the maximum number of active connections; once this is reached, new attempts to acquire
* a {@link Channel} will be delayed until a connection is returned to the pool again.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler, int maxConnections) {
this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
}
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param maxConnections the maximum number of active connections; once this is reached, new attempts to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceeded, further acquire
* attempts will fail.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if none should be used.
* In this case {@code acquireTimeoutMillis} must be {@code -1}.
* @param acquireTimeoutMillis the time (in milliseconds) after which a pending acquire must complete or
* the {@link AcquireTimeoutAction} takes place.
* @param maxConnections the maximum number of active connections; once this is reached, new attempts to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceeded, further acquire
* attempts will fail.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires) {
this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if none should be used.
* In this case {@code acquireTimeoutMillis} must be {@code -1}.
* @param acquireTimeoutMillis the time (in milliseconds) after which a pending acquire must complete or
* the {@link AcquireTimeoutAction} takes place.
* @param maxConnections the maximum number of active connections; once this is reached, new attempts to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceeded, further acquire
* attempts will fail.
* @param releaseHealthCheck will check channel health before offering back if this parameter is set to
* {@code true}.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
releaseHealthCheck, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if none should be used.
* In this case {@code acquireTimeoutMillis} must be {@code -1}.
* @param acquireTimeoutMillis the time (in milliseconds) after which a pending acquire must complete or
* the {@link AcquireTimeoutAction} takes place.
* @param maxConnections the maximum number of active connections; once this is reached, new attempts to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceeded, further acquire
* attempts will fail.
* @param releaseHealthCheck will check channel health before offering back if this parameter is set to
* {@code true}.
* @param lastRecentUsed if {@code true}, {@link Channel} selection will be LIFO; if {@code false}, FIFO.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires,
boolean releaseHealthCheck, boolean lastRecentUsed) {
super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
if (maxConnections < 1) {
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
}
if (maxPendingAcquires < 1) {
throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
}
if (action == null && acquireTimeoutMillis == -1) {
timeoutTask = null;
acquireTimeoutNanos = -1;
} else if (action == null && acquireTimeoutMillis != -1) {
throw new NullPointerException("action");
} else if (action != null && acquireTimeoutMillis < 0) {
throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
} else {
acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
switch (action) {
case FAIL:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Fail the promise as we timed out.
task.promise.setFailure(TIMEOUT_EXCEPTION);
}
};
break;
case NEW:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Increment the acquire count and delegate to super to actually acquire a Channel which will
// create a new connection.
task.acquired();
FixedChannelPool.super.acquire(task.promise);
}
};
break;
default:
throw new Error();
}
}
executor = bootstrap.config().group().next();
this.maxConnections = maxConnections;
this.maxPendingAcquires = maxPendingAcquires;
}
/** Returns the number of acquired channels that this pool thinks it has. */
public int acquiredChannelCount() {
return acquiredChannelCount.get();
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
try {
if (executor.inEventLoop()) {
acquire0(promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
acquire0(promise);
}
});
}
} catch (Throwable cause) {
promise.setFailure(cause);
}
return promise;
}
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return;
}
if (acquiredChannelCount.get() < maxConnections) {
assert acquiredChannelCount.get() >= 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop
Promise<Channel> p = executor.newPromise();
AcquireListener l = new AcquireListener(promise);
l.acquired();
p.addListener(l);
super.acquire(p);
} else {
if (pendingAcquireCount >= maxPendingAcquires) {
promise.setFailure(FULL_EXCEPTION);
} else {
AcquireTask task = new AcquireTask(promise);
if (pendingAcquireQueue.offer(task)) {
++pendingAcquireCount;
if (timeoutTask != null) {
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
}
} else {
promise.setFailure(FULL_EXCEPTION);
}
}
assert pendingAcquireCount > 0;
}
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
ObjectUtil.checkNotNull(promise, "promise");
final Promise<Void> p = executor.newPromise();
super.release(channel, p.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop();
if (closed) {
// Since the pool is closed, we have no choice but to close the channel
channel.close();
promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
return;
}
if (future.isSuccess()) {
decrementAndRunTaskQueue();
promise.setSuccess(null);
} else {
Throwable cause = future.cause();
// Check if the exception was not because we passed the Channel to the wrong pool.
if (!(cause instanceof IllegalArgumentException)) {
decrementAndRunTaskQueue();
}
promise.setFailure(future.cause());
}
}
}));
return promise;
}
private void decrementAndRunTaskQueue() {
// We should never have a negative value.
int currentCount = acquiredChannelCount.decrementAndGet();
assert currentCount >= 0;
// Run the pending acquire tasks before notifying the original promise, so that if the user tries to
// acquire again from the ChannelFutureListener while pendingAcquireCount is >= maxPendingAcquires,
// we may be able to run some pending tasks first and so allow adding more.
runTaskQueue();
}
private void runTaskQueue() {
while (acquiredChannelCount.get() < maxConnections) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
// Cancel the timeout if one was scheduled
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
--pendingAcquireCount;
task.acquired();
super.acquire(task.promise);
}
// We should never have a negative value.
assert pendingAcquireCount >= 0;
assert acquiredChannelCount.get() >= 0;
}
// AcquireTask extends AcquireListener to reduce object creations and so GC pressure
private final class AcquireTask extends AcquireListener {
final Promise<Channel> promise;
final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
ScheduledFuture<?> timeoutFuture;
public AcquireTask(Promise<Channel> promise) {
super(promise);
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop.
this.promise = executor.<Channel>newPromise().addListener(this);
}
}
private abstract class TimeoutTask implements Runnable {
@Override
public final void run() {
assert executor.inEventLoop();
long nanoTime = System.nanoTime();
for (;;) {
AcquireTask task = pendingAcquireQueue.peek();
// Compare nanoTime as described in the javadocs of System.nanoTime()
//
// See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
// See https://github.com/netty/netty/issues/3705
if (task == null || nanoTime - task.expireNanoTime < 0) {
break;
}
pendingAcquireQueue.remove();
--pendingAcquireCount;
onTimeout(task);
}
}
public abstract void onTimeout(AcquireTask task);
}
private class AcquireListener implements FutureListener<Channel> {
private final Promise<Channel> originalPromise;
protected boolean acquired;
AcquireListener(Promise<Channel> originalPromise) {
this.originalPromise = originalPromise;
}
@Override
public void operationComplete(Future<Channel> future) throws Exception {
assert executor.inEventLoop();
if (closed) {
if (future.isSuccess()) {
// Since the pool is closed, we have no choice but to close the channel
future.getNow().close();
}
originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return;
}
if (future.isSuccess()) {
originalPromise.setSuccess(future.getNow());
} else {
if (acquired) {
decrementAndRunTaskQueue();
} else {
runTaskQueue();
}
originalPromise.setFailure(future.cause());
}
}
public void acquired() {
if (acquired) {
return;
}
acquiredChannelCount.incrementAndGet();
acquired = true;
}
}
@Override
public void close() {
if (executor.inEventLoop()) {
close0();
} else {
executor.submit(new Runnable() {
@Override
public void run() {
close0();
}
}).awaitUninterruptibly();
}
}
private void close0() {
if (!closed) {
closed = true;
for (;;) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
}
acquiredChannelCount.set(0);
pendingAcquireCount = 0;
// Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
// to ensure we will not block in a EventExecutor.
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
@Override
public void run() {
FixedChannelPool.super.close();
}
});
}
}
}
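
For context (not part of this diff): a sketch of constructing a bounded FixedChannelPool with a fail-fast acquire timeout. BoundedPoolFactory is a made-up name, the numeric limits are arbitrary, the handler is the illustrative InitializingPoolHandler from further up, and the Bootstrap is assumed to be fully configured (group, channel class, remote address) by the caller.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;

// Illustrative only: a pool of at most 16 connections that fails acquires
// which cannot be served within 5 seconds.
public final class BoundedPoolFactory {
    private BoundedPoolFactory() { }

    static ChannelPool newBoundedPool(Bootstrap bootstrap) {
        return new FixedChannelPool(
                bootstrap,
                new InitializingPoolHandler(),   // from the sketch further up
                ChannelHealthChecker.ACTIVE,
                AcquireTimeoutAction.FAIL,       // fail acquires that wait longer than the timeout
                5000,                            // acquireTimeoutMillis
                16,                              // maxConnections
                64);                             // maxPendingAcquires
    }
}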

@@ -1,401 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil;
import java.util.Deque;
import static io.netty.util.internal.ObjectUtil.*;
/**
* Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
* a {@link Channel} but none is currently in the pool. No limit on the maximum number of concurrent
* {@link Channel}s is enforced.
*
* This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
*
*/
public class SimpleChannelPool implements ChannelPool {
private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
private final ChannelPoolHandler handler;
private final ChannelHealthChecker healthCheck;
private final Bootstrap bootstrap;
private final boolean releaseHealthCheck;
private final boolean lastRecentUsed;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
this(bootstrap, handler, healthCheck, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
* @param releaseHealthCheck will check channel health before offering back if this parameter is set to {@code true};
* otherwise, channel health is only checked at acquisition time
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
boolean releaseHealthCheck) {
this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healthy when obtained from the {@link ChannelPool}
* @param releaseHealthCheck will check channel health before offering back if this parameter is set to {@code true};
* otherwise, channel health is only checked at acquisition time
* @param lastRecentUsed if {@code true}, {@link Channel} selection will be LIFO; if {@code false}, FIFO.
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
boolean releaseHealthCheck, boolean lastRecentUsed) {
this.handler = checkNotNull(handler, "handler");
this.healthCheck = checkNotNull(healthCheck, "healthCheck");
this.releaseHealthCheck = releaseHealthCheck;
// Clone the original Bootstrap as we want to set our own handler
this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
this.bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
assert ch.eventLoop().inEventLoop();
handler.channelCreated(ch);
}
});
this.lastRecentUsed = lastRecentUsed;
}
/**
* Returns the {@link Bootstrap} this pool will use to open new connections.
*
* @return the {@link Bootstrap} this pool will use to open new connections
*/
protected Bootstrap bootstrap() {
return bootstrap;
}
/**
* Returns the {@link ChannelPoolHandler} that will be notified for the different pool actions.
*
* @return the {@link ChannelPoolHandler} that will be notified for the different pool actions
*/
protected ChannelPoolHandler handler() {
return handler;
}
/**
* Returns the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy.
*
* @return the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is healthy
*/
protected ChannelHealthChecker healthChecker() {
return healthCheck;
}
/**
* Indicates whether this pool will check the health of channels before offering them back into the pool.
*
* @return {@code true} if this pool will check the health of channels before offering them back into the pool, or
* {@code false} if channel health is only checked at acquisition time
*/
protected boolean releaseHealthCheck() {
return releaseHealthCheck;
}
@Override
public final Future<Channel> acquire() {
return acquire(bootstrap.config().group().next().<Channel>newPromise());
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
checkNotNull(promise, "promise");
return acquireHealthyFromPoolOrNew(promise);
}
/**
* Tries to retrieve a healthy channel from the pool if any, or creates a new channel otherwise.
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
if (future.isSuccess()) {
Channel channel = future.channel();
if (!promise.trySuccess(channel)) {
// Promise was completed in the meantime (like cancelled), just release the channel again
release(channel);
}
} else {
promise.tryFailure(future.cause());
}
}
private void doHealthCheck(final Channel ch, final Promise<Channel> promise) {
assert ch.eventLoop().inEventLoop();
Future<Boolean> f = healthCheck.isHealthy(ch);
if (f.isDone()) {
notifyHealthCheck(f, ch, promise);
} else {
f.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
notifyHealthCheck(future, ch, promise);
}
});
}
}
private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) {
assert ch.eventLoop().inEventLoop();
if (future.isSuccess()) {
if (future.getNow()) {
try {
ch.attr(POOL_KEY).set(this);
handler.channelAcquired(ch);
promise.setSuccess(ch);
} catch (Throwable cause) {
closeAndFail(ch, cause, promise);
}
} else {
closeChannel(ch);
acquireHealthyFromPoolOrNew(promise);
}
} else {
closeChannel(ch);
acquireHealthyFromPoolOrNew(promise);
}
}
/**
* Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
* override this.
* <p>
* The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
*/
protected ChannelFuture connectChannel(Bootstrap bs) {
return bs.connect();
}
@Override
public final Future<Void> release(Channel channel) {
return release(channel, channel.eventLoop().<Void>newPromise());
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
return promise;
}
private void doReleaseChannel(Channel channel, Promise<Void> promise) {
assert channel.eventLoop().inEventLoop();
// Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
if (channel.attr(POOL_KEY).getAndSet(null) != this) {
closeAndFail(channel,
// Better include a stacktrace here as this is a user error.
new IllegalArgumentException(
"Channel " + channel + " was not acquired from this ChannelPool"),
promise);
} else {
try {
if (releaseHealthCheck) {
doHealthCheckOnRelease(channel, promise);
} else {
releaseAndOffer(channel, promise);
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
}
}
private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
final Future<Boolean> f = healthCheck.isHealthy(channel);
if (f.isDone()) {
releaseAndOfferIfHealthy(channel, promise, f);
} else {
f.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
releaseAndOfferIfHealthy(channel, promise, f);
}
});
}
}
/**
* Adds the channel back to the pool only if the channel is healthy.
* @param channel the channel to put back to the pool
* @param promise offer operation promise.
* @param future the future that contains information whether the channel is healthy or not.
* @throws Exception in case when failed to notify handler about release operation.
*/
private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
throws Exception {
if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
releaseAndOffer(channel, promise);
} else { //channel not healthy, just releasing it.
handler.channelReleased(channel);
promise.setSuccess(null);
}
}
private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
if (offerChannel(channel)) {
handler.channelReleased(channel);
promise.setSuccess(null);
} else {
closeAndFail(channel, FULL_EXCEPTION, promise);
}
}
private static void closeChannel(Channel channel) {
channel.attr(POOL_KEY).getAndSet(null);
channel.close();
}
private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
closeChannel(channel);
promise.tryFailure(cause);
}
/**
* Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
* {@link Channel} is ready to be reused.
*
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
* implementations of these methods need to be thread-safe!
*/
protected Channel pollChannel() {
return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
}
/**
* Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
* could be added, {@code false} otherwise.
*
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
* implementations of these methods need to be thread-safe!
*/
protected boolean offerChannel(Channel channel) {
return deque.offer(channel);
}
@Override
public void close() {
for (;;) {
Channel channel = pollChannel();
if (channel == null) {
break;
}
// Just ignore any errors that are reported back from close().
channel.close().awaitUninterruptibly();
}
}
}
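
For context (not part of this diff): a sketch of a SimpleChannelPool configured for FIFO selection with the release-time health check disabled, exercised in the same blocking style the tests below use. FifoPoolExample is a made-up name and the handler is the illustrative InitializingPoolHandler from further up.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.SimpleChannelPool;

// Illustrative only: an unbounded pool that hands out the least recently used Channel
// (lastRecentUsed = false -> FIFO) and skips the health check on release.
public final class FifoPoolExample {
    private FifoPoolExample() { }

    static void roundTrip(Bootstrap bootstrap) {
        SimpleChannelPool pool = new SimpleChannelPool(
                bootstrap, new InitializingPoolHandler(), ChannelHealthChecker.ACTIVE,
                false /* releaseHealthCheck */, false /* lastRecentUsed */);

        // Blocking style for brevity; production code would add listeners instead.
        Channel ch = pool.acquire().syncUninterruptibly().getNow();
        pool.release(ch).syncUninterruptibly();
        pool.close();
    }
}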

@@ -1,20 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Implementations and API for {@link io.netty.channel.Channel} pools.
*/
package io.netty.channel.pool;

@@ -1,76 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import org.junit.Test;
import java.net.ConnectException;
import static org.junit.Assert.*;
public class AbstractChannelPoolMapTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Test(expected = ConnectException.class)
public void testMap() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
AbstractChannelPoolMap<EventLoop, SimpleChannelPool> poolMap =
new AbstractChannelPoolMap<EventLoop, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(EventLoop key) {
return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler());
}
};
EventLoop loop = group.next();
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
SimpleChannelPool pool = poolMap.get(loop);
assertEquals(1, poolMap.size());
assertTrue(poolMap.iterator().hasNext());
assertSame(pool, poolMap.get(loop));
assertTrue(poolMap.remove(loop));
assertFalse(poolMap.remove(loop));
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
pool.acquire().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
// NOOP
}
}
}

@@ -1,53 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import java.util.concurrent.atomic.AtomicInteger;
final class CountingChannelPoolHandler implements ChannelPoolHandler {
private final AtomicInteger channelCount = new AtomicInteger(0);
private final AtomicInteger acquiredCount = new AtomicInteger(0);
private final AtomicInteger releasedCount = new AtomicInteger(0);
@Override
public void channelCreated(Channel ch) {
channelCount.incrementAndGet();
}
@Override
public void channelReleased(Channel ch) {
releasedCount.incrementAndGet();
}
@Override
public void channelAcquired(Channel ch) {
acquiredCount.incrementAndGet();
}
public int channelCount() {
return channelCount.get();
}
public int acquiredCount() {
return acquiredCount.get();
}
public int releasedCount() {
return releasedCount.get();
}
}

@@ -1,355 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
import io.netty.util.concurrent.Future;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
public class FixedChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id";
private static EventLoopGroup group;
@BeforeClass
public static void createEventLoop() {
group = new LocalEventLoopGroup();
}
@AfterClass
public static void destroyEventLoop() {
if (group != null) {
group.shutdownGracefully();
}
}
@Test
public void testAcquire() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
assertFalse(future.isDone());
pool.release(channel).syncUninterruptibly();
assertTrue(future.await(1, TimeUnit.SECONDS));
Channel channel2 = future.getNow();
assertSame(channel, channel2);
assertEquals(1, handler.channelCount());
assertEquals(1, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
}
@Test(expected = TimeoutException.class)
public void testAcquireTimeout() throws Exception {
testAcquireTimeout(500);
}
@Test(expected = TimeoutException.class)
public void testAcquireWithZeroTimeout() throws Exception {
testAcquireTimeout(0);
}
private static void testAcquireTimeout(long timeoutMillis) throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
AcquireTimeoutAction.FAIL, timeoutMillis, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
try {
future.syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
}
@Test
public void testAcquireNewConnection() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
assertNotSame(channel, channel2);
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
}
/**
* Tests that the acquiredChannelCount is not added up several times for the same channel acquire request.
* @throws Exception
*/
@Test
public void testAcquireNewConnectionWhen() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
pool.release(channel1);
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
}
@Test(expected = IllegalStateException.class)
public void testAcquireBoundQueue() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
assertFalse(future.isDone());
try {
pool.acquire().syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
}
@Test(expected = IllegalArgumentException.class)
public void testReleaseDifferentPool() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
try {
pool2.release(channel).syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
}
@Test
public void testReleaseAfterClosePool() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
final Future<Channel> acquire = pool.acquire();
final Channel channel = acquire.get();
pool.close();
group.submit(new Runnable() {
@Override
public void run() {
// NOOP
}
}).syncUninterruptibly();
try {
pool.release(channel).syncUninterruptibly();
fail();
} catch (IllegalStateException e) {
assertSame(FixedChannelPool.POOL_CLOSED_ON_RELEASE_EXCEPTION, e);
}
// Since the pool is closed, the Channel should have been closed as well.
channel.closeFuture().syncUninterruptibly();
assertFalse("Unexpected open channel", channel.isOpen());
sc.close().syncUninterruptibly();
}
@Test
public void testReleaseClosed() {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
channel.close().syncUninterruptibly();
pool.release(channel).syncUninterruptibly();
sc.close().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
// NOOP
}
}
}

@@ -1,304 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.Future;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.Assert.*;
public class SimpleChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Test
public void testAcquire() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).sync().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler);
Channel channel = pool.acquire().sync().getNow();
pool.release(channel).syncUninterruptibly();
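// A released healthy channel should be handed out again on the next acquire.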
Channel channel2 = pool.acquire().sync().getNow();
assertSame(channel, channel2);
assertEquals(1, handler.channelCount());
pool.release(channel2).syncUninterruptibly();
// Should fail on multiple release calls.
try {
pool.release(channel2).syncUninterruptibly();
fail();
} catch (IllegalArgumentException e) {
// expected
assertFalse(channel.isActive());
}
assertEquals(1, handler.acquiredCount());
assertEquals(2, handler.releasedCount());
sc.close().sync();
group.shutdownGracefully();
}
@Test
public void testBoundedChannelPoolSegment() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).sync().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) {
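// Back the pool with a bounded queue so at most one idle channel can be stored.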
private final Queue<Channel> queue = new LinkedBlockingQueue<Channel>(1);
@Override
protected Channel pollChannel() {
return queue.poll();
}
@Override
protected boolean offerChannel(Channel ch) {
return queue.offer(ch);
}
};
Channel channel = pool.acquire().sync().getNow();
Channel channel2 = pool.acquire().sync().getNow();
pool.release(channel).syncUninterruptibly().getNow();
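// The single slot is already taken by the first release, so offering the second channel back must fail.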
try {
pool.release(channel2).syncUninterruptibly();
fail();
} catch (IllegalStateException e) {
// expected
}
channel2.close().sync();
assertEquals(2, handler.channelCount());
assertEquals(0, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().sync();
channel.close().sync();
channel2.close().sync();
group.shutdownGracefully();
}
/**
* Tests that an unhealthy channel is not offered back to the pool.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsNotOffered() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
pool.release(channel1).syncUninterruptibly();
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
// First check that a healthy channel is actually offered back to the pool when released.
assertSame(channel1, channel2);
channel1.close().syncUninterruptibly();
pool.release(channel1).syncUninterruptibly();
Channel channel3 = pool.acquire().syncUninterruptibly().getNow();
// channel1 is no longer healthy, so it must not be handed out again.
assertNotSame(channel1, channel3);
sc.close().syncUninterruptibly();
channel3.close().syncUninterruptibly();
group.shutdownGracefully();
}
/**
* Tests that an unhealthy channel is still offered back to the pool when the pool
* was configured not to validate channel health on release.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
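// The 'false' argument disables the health check on release, so even an unhealthy (closed) channel is offered back.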
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, false);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
Future<Void> releaseFuture =
pool.release(channel1, channel1.eventLoop().<Void>newPromise()).syncUninterruptibly();
assertThat(releaseFuture.isSuccess(), CoreMatchers.is(true));
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
// Verify that channel2 is a different channel, which means it was not taken from the pool.
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
@Test
public void testBootstrap() {
final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), new CountingChannelPoolHandler());
try {
// Checking for the actual bootstrap object doesn't make sense here, since the pool uses a copy with a
// modified channel handler.
assertNotNull(pool.bootstrap());
} finally {
pool.close();
}
}
@Test
public void testHandler() {
final ChannelPoolHandler handler = new CountingChannelPoolHandler();
final SimpleChannelPool pool = new SimpleChannelPool(new Bootstrap(), handler);
try {
assertSame(handler, pool.handler());
} finally {
pool.close();
}
}
@Test
public void testHealthChecker() {
final ChannelHealthChecker healthChecker = ChannelHealthChecker.ACTIVE;
final SimpleChannelPool pool = new SimpleChannelPool(
new Bootstrap(),
new CountingChannelPoolHandler(),
healthChecker);
try {
assertSame(healthChecker, pool.healthChecker());
} finally {
pool.close();
}
}
@Test
public void testReleaseHealthCheck() {
final SimpleChannelPool healthCheckOnReleasePool = new SimpleChannelPool(
new Bootstrap(),
new CountingChannelPoolHandler(),
ChannelHealthChecker.ACTIVE,
true);
try {
assertTrue(healthCheckOnReleasePool.releaseHealthCheck());
} finally {
healthCheckOnReleasePool.close();
}
final SimpleChannelPool noHealthCheckOnReleasePool = new SimpleChannelPool(
new Bootstrap(),
new CountingChannelPoolHandler(),
ChannelHealthChecker.ACTIVE,
false);
try {
assertFalse(noHealthCheckOnReleasePool.releaseHealthCheck());
} finally {
noHealthCheckOnReleasePool.close();
}
}
}