[#3218] Add ChannelPool / ChannelPoolMap abstraction and implementations

Motivation:

Many projects need some kind a Channel/Connection pool implementation. While the protocols are different many things can be shared, so we should provide a generic API and implementation.

Modifications:

Add ChannelPool / ChannelPoolMap API and implementations.

Result:

Reusable / Generic pool implementation that users can use.
This commit is contained in:
Norman Maurer 2015-04-30 12:26:46 +02:00
parent 2df9cf65af
commit d74f96432f
17 changed files with 1538 additions and 1 deletions

View File

@ -29,13 +29,16 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -447,6 +450,17 @@ public final class PlatformDependent {
return PlatformDependent0.getSystemClassLoader(); return PlatformDependent0.getSystemClassLoader();
} }
/**
* Returns a new concurrent {@link Deque}.
*/
public static <C> Deque<C> newConcurrentDeque() {
if (javaVersion() < 7) {
return new LinkedBlockingDeque<C>();
} else {
return new ConcurrentLinkedDeque<C>();
}
}
private static boolean isAndroid0() { private static boolean isAndroid0() {
boolean android; boolean android;
try { try {

View File

@ -966,6 +966,8 @@
<!-- SSLSession implementation --> <!-- SSLSession implementation -->
<ignore>javax.net.ssl.SSLEngine</ignore> <ignore>javax.net.ssl.SSLEngine</ignore>
<ignore>javax.net.ssl.X509ExtendedTrustManager</ignore> <ignore>javax.net.ssl.X509ExtendedTrustManager</ignore>
<ignore>java.util.concurrent.ConcurrentLinkedDeque</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -46,7 +46,7 @@ import java.util.Map;
*/ */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private volatile EventLoopGroup group; volatile EventLoopGroup group;
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory; private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress; private volatile SocketAddress localAddress;

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.DefaultNameResolverGroup; import io.netty.resolver.DefaultNameResolverGroup;
import io.netty.resolver.NameResolver; import io.netty.resolver.NameResolver;
import io.netty.resolver.NameResolverGroup; import io.netty.resolver.NameResolverGroup;
@ -281,6 +282,17 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
return new Bootstrap(this); return new Bootstrap(this);
} }
/**
* Returns a deep clone of this bootstrap which has the identical configuration except that it uses
* the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar
* settings.
*/
public Bootstrap clone(EventLoopGroup group) {
Bootstrap bs = new Bootstrap(this);
bs.group = group;
return bs;
}
@Override @Override
public String toString() { public String toString() {
if (remoteAddress == null) { if (remoteAddress == null) {

View File

@ -0,0 +1,44 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
/**
* A skeletal {@link ChannelPoolHandler} implementation.
*/
public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler {
/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}
/**
* NOOP implementation, sub-classes may override this.
*
* {@inheritDoc}
*/
@Override
public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception {
// NOOP
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool}
* the {@link Object#hashCode()} and {@link Object#equals(Object)} is used.
*/
public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
implements ChannelPoolMap<K, P>, Iterable<Entry<K, P>>, Closeable {
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();
@Override
public final P get(K key) {
P pool = map.get(checkNotNull(key, "key"));
if (pool == null) {
pool = newPool(key);
P old = map.putIfAbsent(key, pool);
if (old != null) {
// We need to destroy the newly created pool as we not use it.
pool.close();
pool = old;
}
}
return pool;
}
/**
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
* {@code false} otherwise.
*
* Please note that {@code null} keys are not allowed.
*/
public final boolean remove(K key) {
P pool = map.remove(checkNotNull(key, "key"));
if (pool != null) {
pool.close();
return true;
}
return false;
}
@Override
public final Iterator<Entry<K, P>> iterator() {
return new ReadOnlyIterator<Entry<K, P>>(map.entrySet().iterator());
}
/**
* Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}.
*/
public final int size() {
return map.size();
}
/**
* Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}.
*/
public final boolean isEmpty() {
return map.isEmpty();
}
@Override
public final boolean contains(K key) {
return map.containsKey(checkNotNull(key, "key"));
}
/**
* Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}.
*/
protected abstract P newPool(K key);
@Override
public final void close() {
for (K key: map.keySet()) {
remove(key);
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
/**
* Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*/
public interface ChannelHealthChecker {
/**
* {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
*/
ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
}
};
/**
* Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once
* the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
Future<Boolean> isHealthy(Channel channel);
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.io.IOException;
/**
* Allows to acquire and release {@link Channel} and so act as a pool of these.
*/
public interface ChannelPool extends Closeable {
/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once
* the acquire is successful and failed otherwise.
*/
Future<Channel> acquire();
/**
* Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once
* the acquire is successful and failed otherwise.
*/
Future<Channel> acquire(Promise<Channel> promise);
/**
* Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
*/
Future<Void> release(Channel channel);
/**
* Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
*/
Future<Void> release(Channel channel, Promise<Void> promise);
@Override
void close();
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Promise;
/**
* Handler which is called for various actions done by the {@link ChannelPool}.
*/
public interface ChannelPoolHandler {
/**
* Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
* {@link ChannelPool#release(Channel, Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelReleased(Channel ch) throws Exception;
/**
* Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
* {@link ChannelPool#acquire(Promise)}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelAcquired(Channel ch) throws Exception;
/**
* Called once a new {@link Channel} is created in the {@link ChannelPool}.
*
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
void channelCreated(Channel ch) throws Exception;
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
/**
* Allows to map {@link ChannelPool} implementations to a specific key.
*
* @param <K> the type of the key
* @param <P> the type of the {@link ChannelPool}
*/
public interface ChannelPoolMap<K, P extends ChannelPool> {
/**
* Return the {@link ChannelPool} for the {@code code}. This will never return {@code null},
* but create a new {@link ChannelPool} if non exists for they requested {@code key}.
*
* Please note that {@code null} keys are not allowed.
*/
P get(K key);
/**
* Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}.
*
* Please note that {@code null} keys are not allowed.
*/
boolean contains(K key);
}

View File

@ -0,0 +1,371 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.OneTimeTask;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
* number of concurrent connections.
*/
public final class FixedChannelPool extends SimpleChannelPool {
private static final IllegalStateException FULL_EXCEPTION =
new IllegalStateException("Too many outstanding acquire operations");
private static final TimeoutException TIMEOUT_EXCEPTION =
new TimeoutException("Acquire operation took longer then configured maximum time");
static {
FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
TIMEOUT_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
public enum AcquireTimeoutAction {
/**
* Create a new connection when the timeout is detected.
*/
NEW,
/**
* Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
*/
FAIL
}
private final EventExecutor executor;
private final long acquireTimeoutNanos;
private final Runnable timeoutTask;
// There is no need to worry about synchronization as everything that modified the queue or counts is done
// by the above EventExecutor.
private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
private final int maxConnections;
private final int maxPendingAcquires;
private int acquiredChannelCount;
private int pendingAcquireCount;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to acquire
* a {@link Channel} will be delayed until a connection is returned to the pool again.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler, int maxConnections) {
this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
}
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
* be failed.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
super(bootstrap, handler);
executor = bootstrap.group().next();
timeoutTask = null;
acquireTimeoutNanos = -1;
this.maxConnections = maxConnections;
this.maxPendingAcquires = maxPendingAcquires;
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healty when obtain from the {@link ChannelPool}
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
* In this case {@param acquireTimeoutMillis} must be {@code -1}.
* @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
* the {@link AcquireTimeoutAction} takes place.
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to
* acquire a {@link Channel} will be delayed until a connection is returned to the
* pool again.
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
* be failed.
*/
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires) {
super(bootstrap, handler, healthCheck);
if (maxConnections < 1) {
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
}
if (maxPendingAcquires < 1) {
throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
}
if (action == null && acquireTimeoutMillis == -1) {
timeoutTask = null;
acquireTimeoutNanos = -1;
} else if (action == null && acquireTimeoutMillis != -1) {
throw new NullPointerException("action");
} else if (action != null && acquireTimeoutMillis < 0) {
throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 1)");
} else {
acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
switch (action) {
case FAIL:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Fail the promise as we timed out.
task.promise.setFailure(TIMEOUT_EXCEPTION);
}
};
break;
case NEW:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Increment the acquire count and delegate to super to actually acquire a Channel which will
// create a new connetion.
++acquiredChannelCount;
FixedChannelPool.super.acquire(task.promise);
}
};
break;
default:
throw new Error();
}
}
executor = bootstrap.group().next();
this.maxConnections = maxConnections;
this.maxPendingAcquires = maxPendingAcquires;
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
try {
if (executor.inEventLoop()) {
acquire0(promise);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
acquire0(promise);
}
});
}
} catch (Throwable cause) {
promise.setFailure(cause);
}
return promise;
}
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
if (acquiredChannelCount < maxConnections) {
++acquiredChannelCount;
assert acquiredChannelCount > 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop
Promise<Channel> p = executor.newPromise();
p.addListener(new AcquireListener(promise));
super.acquire(p);
} else {
if (pendingAcquireCount >= maxPendingAcquires) {
promise.setFailure(FULL_EXCEPTION);
} else {
AcquireTask task = new AcquireTask(promise);
if (pendingAcquireQueue.offer(task)) {
++pendingAcquireCount;
if (timeoutTask != null) {
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
}
} else {
promise.setFailure(FULL_EXCEPTION);
}
}
assert pendingAcquireCount > 0;
}
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
final Promise<Void> p = executor.newPromise();
super.release(channel, p.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop();
if (future.isSuccess()) {
decrementAndRunTaskQueue();
promise.setSuccess(null);
} else {
Throwable cause = future.cause();
// Check if the exception was not because of we passed the Channel to the wrong pool.
if (!(cause instanceof IllegalArgumentException)) {
decrementAndRunTaskQueue();
}
promise.setFailure(future.cause());
}
}
}));
return p;
}
private void decrementAndRunTaskQueue() {
--acquiredChannelCount;
// We should never have a negative value.
assert acquiredChannelCount >= 0;
// Run the pending acquire tasks before notify the original promise so if the user would
// try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
// maxPendingAcquires we may be able to run some pending tasks first and so allow to add
// more.
runTaskQueue();
}
private void runTaskQueue() {
while (acquiredChannelCount <= maxConnections) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
// Cancel the timeout if one was scheduled
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
--pendingAcquireCount;
++acquiredChannelCount;
super.acquire(task.promise);
}
// We should never have a negative value.
assert pendingAcquireCount >= 0;
assert acquiredChannelCount >= 0;
}
// AcquireTask extends AcquireListener to reduce object creations and so GC pressure
private final class AcquireTask extends AcquireListener {
final Promise<Channel> promise;
final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
ScheduledFuture<?> timeoutFuture;
public AcquireTask(Promise<Channel> promise) {
super(promise);
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop.
this.promise = executor.<Channel>newPromise().addListener(this);
}
}
private abstract class TimeoutTask implements Runnable {
@Override
public final void run() {
assert executor.inEventLoop();
long nanoTime = System.nanoTime();
for (;;) {
AcquireTask task = pendingAcquireQueue.peek();
// Compare nanoTime as descripted in the javadocs of System.nanoTime()
//
// See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
// See https://github.com/netty/netty/issues/3705
if (task == null || nanoTime - task.expireNanoTime < 0) {
break;
}
pendingAcquireQueue.remove();
--pendingAcquireCount;
onTimeout(task);
}
}
public abstract void onTimeout(AcquireTask task);
}
private class AcquireListener implements FutureListener<Channel> {
private final Promise<Channel> originalPromise;
AcquireListener(Promise<Channel> originalPromise) {
this.originalPromise = originalPromise;
}
@Override
public void operationComplete(Future<Channel> future) throws Exception {
assert executor.inEventLoop();
if (future.isSuccess()) {
originalPromise.setSuccess(future.getNow());
} else {
// Something went wrong try to run pending acquire tasks.
decrementAndRunTaskQueue();
originalPromise.setFailure(future.cause());
}
}
}
@Override
public void close() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
for (;;) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
}
acquiredChannelCount = 0;
pendingAcquireCount = 0;
FixedChannelPool.super.close();
}
});
}
}

View File

@ -0,0 +1,278 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import java.util.Deque;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
* a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
*
* This implementation uses FIFO order for {@link Channel}s in the {@link ChannelPool}.
*
*/
public class SimpleChannelPool implements ChannelPool {
private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
private static final IllegalStateException FULL_EXCEPTION = new IllegalStateException("ChannelPool full");
static {
FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
private final ChannelPoolHandler handler;
private final ChannelHealthChecker healthCheck;
private final Bootstrap bootstrap;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
}
/**
* Creates a new instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
* still healty when obtain from the {@link ChannelPool}
*/
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
this.handler = checkNotNull(handler, "handler");
this.healthCheck = checkNotNull(healthCheck, "healthCheck");
// Clone the original Bootstrap as we want to set our own handler
this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
this.bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
assert ch.eventLoop().inEventLoop();
handler.channelCreated(ch);
}
});
}
@Override
public final Future<Channel> acquire() {
return acquire(bootstrap.group().next().<Channel>newPromise());
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
checkNotNull(promise, "promise");
try {
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.setFailure(cause);
}
return promise;
}
private static void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
if (future.isSuccess()) {
promise.setSuccess(future.channel());
} else {
promise.setFailure(future.cause());
}
}
private void doHealthCheck(final Channel ch, final Promise<Channel> promise) {
assert ch.eventLoop().inEventLoop();
Future<Boolean> f = healthCheck.isHealthy(ch);
if (f.isDone()) {
notifyHealthCheck(f, ch, promise);
} else {
f.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
notifyHealthCheck(future, ch, promise);
}
});
}
}
private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) {
assert ch.eventLoop().inEventLoop();
if (future.isSuccess()) {
if (future.getNow() == Boolean.TRUE) {
try {
ch.attr(POOL_KEY).set(this);
handler.channelAcquired(ch);
promise.setSuccess(ch);
} catch (Throwable cause) {
closeAndFail(ch, cause, promise);
}
} else {
closeChannel(ch);
acquire(promise);
}
} else {
closeChannel(ch);
acquire(promise);
}
}
/**
* Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()},
* sub-classes may override this.
*
* The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
*/
protected ChannelFuture connectChannel(Bootstrap bs) {
return bs.connect();
}
@Override
public final Future<Void> release(Channel channel) {
return release(channel, channel.eventLoop().<Void>newPromise());
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
return promise;
}
private void doReleaseChannel(Channel channel, Promise<Void> promise) {
assert channel.eventLoop().inEventLoop();
// Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
if (channel.attr(POOL_KEY).getAndSet(null) != this) {
closeAndFail(channel,
// Better include a stracktrace here as this is an user error.
new IllegalArgumentException(
"Channel " + channel + " was not acquired from this ChannelPool"),
promise);
} else {
try {
if (offerChannel(channel)) {
handler.channelReleased(channel);
promise.setSuccess(null);
} else {
closeAndFail(channel, FULL_EXCEPTION, promise);
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
}
}
private static void closeChannel(Channel channel) {
channel.attr(POOL_KEY).getAndSet(null);
channel.close();
}
private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
closeChannel(channel);
promise.setFailure(cause);
}
/**
* Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
* {@link Channel} is ready to be reused.
*
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
* implementations of these methods needs to be thread-safe!
*/
protected Channel pollChannel() {
return deque.pollLast();
}
/**
* Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
* could be added, {@code false} otherwise.
*
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
* implementations of these methods needs to be thread-safe!
*/
protected boolean offerChannel(Channel channel) {
return deque.offer(channel);
}
@Override
public void close() {
for (;;) {
Channel channel = pollChannel();
if (channel == null) {
break;
}
channel.close();
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Implementations and API for {@link io.netty.channel.Channel} pools.
*/
package io.netty.channel.pool;

View File

@ -0,0 +1,75 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import org.junit.Test;
import static org.junit.Assert.*;
public class AbstractChannelPoolMapTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Test(expected = ChannelException.class)
public void testMap() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
AbstractChannelPoolMap<EventLoop, SimpleChannelPool> poolMap =
new AbstractChannelPoolMap<EventLoop, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(EventLoop key) {
return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler());
}
};
EventLoop loop = group.next();
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
SimpleChannelPool pool = poolMap.get(loop);
assertEquals(1, poolMap.size());
assertTrue(poolMap.iterator().hasNext());
assertSame(pool, poolMap.get(loop));
assertTrue(poolMap.remove(loop));
assertFalse(poolMap.remove(loop));
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
pool.acquire().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
// NOOP
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.channel.Channel;
import java.util.concurrent.atomic.AtomicInteger;
final class CountingChannelPoolHandler implements ChannelPoolHandler {
private final AtomicInteger channelCount = new AtomicInteger(0);
private final AtomicInteger acquiredCount = new AtomicInteger(0);
private final AtomicInteger releasedCount = new AtomicInteger(0);
@Override
public void channelCreated(Channel ch) {
channelCount.incrementAndGet();
}
@Override
public void channelReleased(Channel ch) {
releasedCount.incrementAndGet();
}
@Override
public void channelAcquired(Channel ch) {
acquiredCount.incrementAndGet();
}
public int channelCount() {
return channelCount.get();
}
public int acquiredCount() {
return acquiredCount.get();
}
public int releasedCount() {
return releasedCount.get();
}
}

View File

@ -0,0 +1,233 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
import io.netty.util.concurrent.Future;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
public class FixedChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Test
public void testAcquire() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
assertFalse(future.isDone());
pool.release(channel).syncUninterruptibly();
assertTrue(future.await(1, TimeUnit.SECONDS));
Channel channel2 = future.getNow();
assertSame(channel, channel2);
assertEquals(1, handler.channelCount());
assertEquals(1, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
@Test(expected = TimeoutException.class)
public void testAcquireTimeout() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
AcquireTimeoutAction.FAIL, 500, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
try {
future.syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
group.shutdownGracefully();
}
}
@Test
public void testAcquireNewConnection() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
assertNotSame(channel, channel2);
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
@Test(expected = IllegalStateException.class)
public void testAcquireBoundQueue() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
Future<Channel> future = pool.acquire();
assertFalse(future.isDone());
try {
pool.acquire().syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
group.shutdownGracefully();
}
}
@Test(expected = IllegalArgumentException.class)
public void testReleaseDifferentPool() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1);
Channel channel = pool.acquire().syncUninterruptibly().getNow();
try {
pool2.release(channel).syncUninterruptibly();
} finally {
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
group.shutdownGracefully();
}
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
// NOOP
}
}
}

View File

@ -0,0 +1,145 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import org.junit.Test;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.Assert.*;
public class SimpleChannelPoolTest {
private static final String LOCAL_ADDR_ID = "test.id";
@Test
public void testAcquire() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).sync().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler);
Channel channel = pool.acquire().sync().getNow();
pool.release(channel).syncUninterruptibly();
Channel channel2 = pool.acquire().sync().getNow();
assertSame(channel, channel2);
assertEquals(1, handler.channelCount());
pool.release(channel2).syncUninterruptibly();
// Should fail on multiple release calls.
try {
pool.release(channel2).syncUninterruptibly();
fail();
} catch (IllegalArgumentException e) {
// expected
assertFalse(channel.isActive());
}
assertEquals(1, handler.acquiredCount());
assertEquals(2, handler.releasedCount());
sc.close().sync();
group.shutdownGracefully();
}
@Test
public void testBoundedChannelPoolSegment() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).sync().channel();
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) {
private final Queue<Channel> queue = new LinkedBlockingQueue<Channel>(1);
@Override
protected Channel pollChannel() {
return queue.poll();
}
@Override
protected boolean offerChannel(Channel ch) {
return queue.offer(ch);
}
};
Channel channel = pool.acquire().sync().getNow();
Channel channel2 = pool.acquire().sync().getNow();
pool.release(channel).syncUninterruptibly().getNow();
try {
pool.release(channel2).syncUninterruptibly();
fail();
} catch (IllegalStateException e) {
// expected
}
channel2.close().sync();
assertEquals(2, handler.channelCount());
assertEquals(0, handler.acquiredCount());
assertEquals(1, handler.releasedCount());
sc.close().sync();
channel.close().sync();
channel2.close().sync();
group.shutdownGracefully();
}
}