[#3218] Add ChannelPool / ChannelPoolMap abstraction and implementations
Motivation: Many projects need some kind a Channel/Connection pool implementation. While the protocols are different many things can be shared, so we should provide a generic API and implementation. Modifications: Add ChannelPool / ChannelPoolMap API and implementations. Result: Reusable / Generic pool implementation that users can use.
This commit is contained in:
parent
44d64ce039
commit
b50f2d5294
@ -29,13 +29,16 @@ import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
@ -430,6 +433,17 @@ public final class PlatformDependent {
|
||||
return PlatformDependent0.getSystemClassLoader();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new concurrent {@link Deque}.
|
||||
*/
|
||||
public static <C> Deque<C> newConcurrentDeque() {
|
||||
if (javaVersion() < 7) {
|
||||
return new LinkedBlockingDeque<C>();
|
||||
} else {
|
||||
return new ConcurrentLinkedDeque<C>();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isAndroid0() {
|
||||
boolean android;
|
||||
try {
|
||||
|
2
pom.xml
2
pom.xml
@ -927,6 +927,8 @@
|
||||
<!-- SSLSession implementation -->
|
||||
<ignore>javax.net.ssl.SSLEngine</ignore>
|
||||
<ignore>javax.net.ssl.X509ExtendedTrustManager</ignore>
|
||||
|
||||
<ignore>java.util.concurrent.ConcurrentLinkedDeque</ignore>
|
||||
</ignores>
|
||||
</configuration>
|
||||
<executions>
|
||||
|
@ -46,7 +46,7 @@ import java.util.Map;
|
||||
*/
|
||||
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
|
||||
|
||||
private volatile EventLoopGroup group;
|
||||
volatile EventLoopGroup group;
|
||||
private volatile ChannelFactory<? extends C> channelFactory;
|
||||
private volatile SocketAddress localAddress;
|
||||
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
|
||||
|
@ -21,6 +21,7 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
@ -216,6 +217,17 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
|
||||
return new Bootstrap(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a deep clone of this bootstrap which has the identical configuration except that it uses
|
||||
* the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar
|
||||
* settings.
|
||||
*/
|
||||
public Bootstrap clone(EventLoopGroup group) {
|
||||
Bootstrap bs = new Bootstrap(this);
|
||||
bs.group = group;
|
||||
return bs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (remoteAddress == null) {
|
||||
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
|
||||
/**
|
||||
* A skeletal {@link ChannelPoolHandler} implementation.
|
||||
*/
|
||||
public abstract class AbstractChannelPoolHandler implements ChannelPoolHandler {
|
||||
|
||||
/**
|
||||
* NOOP implementation, sub-classes may override this.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void channelAcquired(@SuppressWarnings("unused") Channel ch) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* NOOP implementation, sub-classes may override this.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void channelReleased(@SuppressWarnings("unused") Channel ch) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.ReadOnlyIterator;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
/**
|
||||
* A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool}
|
||||
* the {@link Object#hashCode()} and {@link Object#equals(Object)} is used.
|
||||
*/
|
||||
public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
|
||||
implements ChannelPoolMap<K, P>, Iterable<Entry<K, P>>, Closeable {
|
||||
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();
|
||||
|
||||
@Override
|
||||
public final P get(K key) {
|
||||
P pool = map.get(checkNotNull(key, "key"));
|
||||
if (pool == null) {
|
||||
pool = newPool(key);
|
||||
P old = map.putIfAbsent(key, pool);
|
||||
if (old != null) {
|
||||
// We need to destroy the newly created pool as we not use it.
|
||||
pool.close();
|
||||
pool = old;
|
||||
}
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
/**
|
||||
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
|
||||
* {@code false} otherwise.
|
||||
*
|
||||
* Please note that {@code null} keys are not allowed.
|
||||
*/
|
||||
public final boolean remove(K key) {
|
||||
P pool = map.remove(checkNotNull(key, "key"));
|
||||
if (pool != null) {
|
||||
pool.close();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Iterator<Entry<K, P>> iterator() {
|
||||
return new ReadOnlyIterator<Entry<K, P>>(map.entrySet().iterator());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}.
|
||||
*/
|
||||
public final int size() {
|
||||
return map.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}.
|
||||
*/
|
||||
public final boolean isEmpty() {
|
||||
return map.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean contains(K key) {
|
||||
return map.containsKey(checkNotNull(key, "key"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}.
|
||||
*/
|
||||
protected abstract P newPool(K key);
|
||||
|
||||
@Override
|
||||
public final void close() {
|
||||
for (K key: map.keySet()) {
|
||||
remove(key);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
|
||||
* Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
|
||||
* {@link ChannelPool#acquire(Promise)}.
|
||||
*/
|
||||
public interface ChannelHealthChecker {
|
||||
|
||||
/**
|
||||
* {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
|
||||
*/
|
||||
ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
|
||||
@Override
|
||||
public Future<Boolean> isHealthy(Channel channel) {
|
||||
EventLoop loop = channel.eventLoop();
|
||||
return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once
|
||||
* the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise.
|
||||
*
|
||||
* This method will be called by the {@link EventLoop} of the {@link Channel}.
|
||||
*/
|
||||
Future<Boolean> isHealthy(Channel channel);
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Allows to acquire and release {@link Channel} and so act as a pool of these.
|
||||
*/
|
||||
public interface ChannelPool extends Closeable {
|
||||
|
||||
/**
|
||||
* Acquire a {@link Channel} from this {@link ChannelPool}. The returned {@link Future} is notified once
|
||||
* the acquire is successful and failed otherwise.
|
||||
*/
|
||||
Future<Channel> acquire();
|
||||
|
||||
/**
|
||||
* Acquire a {@link Channel} from this {@link ChannelPool}. The given {@link Promise} is notified once
|
||||
* the acquire is successful and failed otherwise.
|
||||
*/
|
||||
Future<Channel> acquire(Promise<Channel> promise);
|
||||
|
||||
/**
|
||||
* Release a {@link Channel} back to this {@link ChannelPool}. The returned {@link Future} is notified once
|
||||
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
|
||||
*/
|
||||
Future<Void> release(Channel channel);
|
||||
|
||||
/**
|
||||
* Release a {@link Channel} back to this {@link ChannelPool}. The given {@link Promise} is notified once
|
||||
* the release is successful and failed otherwise. When failed the {@link Channel} will automatically closed.
|
||||
*/
|
||||
Future<Void> release(Channel channel, Promise<Void> promise);
|
||||
|
||||
@Override
|
||||
void close();
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
|
||||
* Handler which is called for various actions done by the {@link ChannelPool}.
|
||||
*/
|
||||
public interface ChannelPoolHandler {
|
||||
/**
|
||||
* Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
|
||||
* {@link ChannelPool#release(Channel, Promise)}.
|
||||
*
|
||||
* This method will be called by the {@link EventLoop} of the {@link Channel}.
|
||||
*/
|
||||
void channelReleased(Channel ch) throws Exception;
|
||||
|
||||
/**
|
||||
* Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
|
||||
* {@link ChannelPool#acquire(Promise)}.
|
||||
*
|
||||
* This method will be called by the {@link EventLoop} of the {@link Channel}.
|
||||
*/
|
||||
void channelAcquired(Channel ch) throws Exception;
|
||||
|
||||
/**
|
||||
* Called once a new {@link Channel} is created in the {@link ChannelPool}.
|
||||
*
|
||||
* This method will be called by the {@link EventLoop} of the {@link Channel}.
|
||||
*/
|
||||
void channelCreated(Channel ch) throws Exception;
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
/**
|
||||
* Allows to map {@link ChannelPool} implementations to a specific key.
|
||||
*
|
||||
* @param <K> the type of the key
|
||||
* @param <P> the type of the {@link ChannelPool}
|
||||
*/
|
||||
public interface ChannelPoolMap<K, P extends ChannelPool> {
|
||||
/**
|
||||
* Return the {@link ChannelPool} for the {@code code}. This will never return {@code null},
|
||||
* but create a new {@link ChannelPool} if non exists for they requested {@code key}.
|
||||
*
|
||||
* Please note that {@code null} keys are not allowed.
|
||||
*/
|
||||
P get(K key);
|
||||
|
||||
/**
|
||||
* Returns {@code true} if a {@link ChannelPool} exists for the given {@code key}.
|
||||
*
|
||||
* Please note that {@code null} keys are not allowed.
|
||||
*/
|
||||
boolean contains(K key);
|
||||
}
|
@ -0,0 +1,371 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.EmptyArrays;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
|
||||
* number of concurrent connections.
|
||||
*/
|
||||
public final class FixedChannelPool extends SimpleChannelPool {
|
||||
private static final IllegalStateException FULL_EXCEPTION =
|
||||
new IllegalStateException("Too many outstanding acquire operations");
|
||||
private static final TimeoutException TIMEOUT_EXCEPTION =
|
||||
new TimeoutException("Acquire operation took longer then configured maximum time");
|
||||
|
||||
static {
|
||||
FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||
TIMEOUT_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||
}
|
||||
|
||||
public enum AcquireTimeoutAction {
|
||||
/**
|
||||
* Create a new connection when the timeout is detected.
|
||||
*/
|
||||
NEW,
|
||||
|
||||
/**
|
||||
* Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
|
||||
*/
|
||||
FAIL
|
||||
}
|
||||
|
||||
private final EventExecutor executor;
|
||||
private final long acquireTimeoutNanos;
|
||||
private final Runnable timeoutTask;
|
||||
|
||||
// There is no need to worry about synchronization as everything that modified the queue or counts is done
|
||||
// by the above EventExecutor.
|
||||
private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
|
||||
private final int maxConnections;
|
||||
private final int maxPendingAcquires;
|
||||
private int acquiredChannelCount;
|
||||
private int pendingAcquireCount;
|
||||
|
||||
/**
|
||||
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
|
||||
*
|
||||
* @param bootstrap the {@link Bootstrap} that is used for connections
|
||||
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
|
||||
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to acquire
|
||||
* a {@link Channel} will be delayed until a connection is returned to the pool again.
|
||||
*/
|
||||
public FixedChannelPool(Bootstrap bootstrap,
|
||||
ChannelPoolHandler handler, int maxConnections) {
|
||||
this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
|
||||
*
|
||||
* @param bootstrap the {@link Bootstrap} that is used for connections
|
||||
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
|
||||
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to
|
||||
* acquire a {@link Channel} will be delayed until a connection is returned to the
|
||||
* pool again.
|
||||
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
|
||||
* be failed.
|
||||
*/
|
||||
public FixedChannelPool(Bootstrap bootstrap,
|
||||
ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
|
||||
super(bootstrap, handler);
|
||||
executor = bootstrap.group().next();
|
||||
timeoutTask = null;
|
||||
acquireTimeoutNanos = -1;
|
||||
this.maxConnections = maxConnections;
|
||||
this.maxPendingAcquires = maxPendingAcquires;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param bootstrap the {@link Bootstrap} that is used for connections
|
||||
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
|
||||
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
|
||||
* still healty when obtain from the {@link ChannelPool}
|
||||
* @param action the {@link AcquireTimeoutAction} to use or {@code null} if non should be used.
|
||||
* In this case {@param acquireTimeoutMillis} must be {@code -1}.
|
||||
* @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or
|
||||
* the {@link AcquireTimeoutAction} takes place.
|
||||
* @param maxConnections the numnber of maximal active connections, once this is reached new tries to
|
||||
* acquire a {@link Channel} will be delayed until a connection is returned to the
|
||||
* pool again.
|
||||
* @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will
|
||||
* be failed.
|
||||
*/
|
||||
public FixedChannelPool(Bootstrap bootstrap,
|
||||
ChannelPoolHandler handler,
|
||||
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
|
||||
final long acquireTimeoutMillis,
|
||||
int maxConnections, int maxPendingAcquires) {
|
||||
super(bootstrap, handler, healthCheck);
|
||||
if (maxConnections < 1) {
|
||||
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
|
||||
}
|
||||
if (maxPendingAcquires < 1) {
|
||||
throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
|
||||
}
|
||||
if (action == null && acquireTimeoutMillis == -1) {
|
||||
timeoutTask = null;
|
||||
acquireTimeoutNanos = -1;
|
||||
} else if (action == null && acquireTimeoutMillis != -1) {
|
||||
throw new NullPointerException("action");
|
||||
} else if (action != null && acquireTimeoutMillis < 0) {
|
||||
throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 1)");
|
||||
} else {
|
||||
acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
|
||||
switch (action) {
|
||||
case FAIL:
|
||||
timeoutTask = new TimeoutTask() {
|
||||
@Override
|
||||
public void onTimeout(AcquireTask task) {
|
||||
// Fail the promise as we timed out.
|
||||
task.promise.setFailure(TIMEOUT_EXCEPTION);
|
||||
}
|
||||
};
|
||||
break;
|
||||
case NEW:
|
||||
timeoutTask = new TimeoutTask() {
|
||||
@Override
|
||||
public void onTimeout(AcquireTask task) {
|
||||
// Increment the acquire count and delegate to super to actually acquire a Channel which will
|
||||
// create a new connetion.
|
||||
++acquiredChannelCount;
|
||||
|
||||
FixedChannelPool.super.acquire(task.promise);
|
||||
}
|
||||
};
|
||||
break;
|
||||
default:
|
||||
throw new Error();
|
||||
}
|
||||
}
|
||||
executor = bootstrap.group().next();
|
||||
this.maxConnections = maxConnections;
|
||||
this.maxPendingAcquires = maxPendingAcquires;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Channel> acquire(final Promise<Channel> promise) {
|
||||
try {
|
||||
if (executor.inEventLoop()) {
|
||||
acquire0(promise);
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
acquire0(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void acquire0(final Promise<Channel> promise) {
|
||||
assert executor.inEventLoop();
|
||||
|
||||
if (acquiredChannelCount < maxConnections) {
|
||||
++acquiredChannelCount;
|
||||
|
||||
assert acquiredChannelCount > 0;
|
||||
|
||||
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
||||
// EventLoop
|
||||
Promise<Channel> p = executor.newPromise();
|
||||
p.addListener(new AcquireListener(promise));
|
||||
super.acquire(p);
|
||||
} else {
|
||||
if (pendingAcquireCount >= maxPendingAcquires) {
|
||||
promise.setFailure(FULL_EXCEPTION);
|
||||
} else {
|
||||
AcquireTask task = new AcquireTask(promise);
|
||||
if (pendingAcquireQueue.offer(task)) {
|
||||
++pendingAcquireCount;
|
||||
|
||||
if (timeoutTask != null) {
|
||||
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
} else {
|
||||
promise.setFailure(FULL_EXCEPTION);
|
||||
}
|
||||
}
|
||||
|
||||
assert pendingAcquireCount > 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
|
||||
final Promise<Void> p = executor.newPromise();
|
||||
super.release(channel, p.addListener(new FutureListener<Void>() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<Void> future) throws Exception {
|
||||
assert executor.inEventLoop();
|
||||
|
||||
if (future.isSuccess()) {
|
||||
decrementAndRunTaskQueue();
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
Throwable cause = future.cause();
|
||||
// Check if the exception was not because of we passed the Channel to the wrong pool.
|
||||
if (!(cause instanceof IllegalArgumentException)) {
|
||||
decrementAndRunTaskQueue();
|
||||
}
|
||||
promise.setFailure(future.cause());
|
||||
}
|
||||
}
|
||||
}));
|
||||
return p;
|
||||
}
|
||||
|
||||
private void decrementAndRunTaskQueue() {
|
||||
--acquiredChannelCount;
|
||||
|
||||
// We should never have a negative value.
|
||||
assert acquiredChannelCount >= 0;
|
||||
|
||||
// Run the pending acquire tasks before notify the original promise so if the user would
|
||||
// try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
|
||||
// maxPendingAcquires we may be able to run some pending tasks first and so allow to add
|
||||
// more.
|
||||
runTaskQueue();
|
||||
}
|
||||
|
||||
private void runTaskQueue() {
|
||||
while (acquiredChannelCount <= maxConnections) {
|
||||
AcquireTask task = pendingAcquireQueue.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Cancel the timeout if one was scheduled
|
||||
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
|
||||
if (timeoutFuture != null) {
|
||||
timeoutFuture.cancel(false);
|
||||
}
|
||||
|
||||
--pendingAcquireCount;
|
||||
++acquiredChannelCount;
|
||||
|
||||
super.acquire(task.promise);
|
||||
}
|
||||
|
||||
// We should never have a negative value.
|
||||
assert pendingAcquireCount >= 0;
|
||||
assert acquiredChannelCount >= 0;
|
||||
}
|
||||
|
||||
// AcquireTask extends AcquireListener to reduce object creations and so GC pressure
|
||||
private final class AcquireTask extends AcquireListener {
|
||||
final Promise<Channel> promise;
|
||||
final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
|
||||
ScheduledFuture<?> timeoutFuture;
|
||||
|
||||
public AcquireTask(Promise<Channel> promise) {
|
||||
super(promise);
|
||||
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
||||
// EventLoop.
|
||||
this.promise = executor.<Channel>newPromise().addListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class TimeoutTask implements Runnable {
|
||||
@Override
|
||||
public final void run() {
|
||||
assert executor.inEventLoop();
|
||||
long nanoTime = System.nanoTime();
|
||||
for (;;) {
|
||||
AcquireTask task = pendingAcquireQueue.peek();
|
||||
// Compare nanoTime as descripted in the javadocs of System.nanoTime()
|
||||
//
|
||||
// See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
|
||||
// See https://github.com/netty/netty/issues/3705
|
||||
if (task == null || nanoTime - task.expireNanoTime < 0) {
|
||||
break;
|
||||
}
|
||||
pendingAcquireQueue.remove();
|
||||
|
||||
--pendingAcquireCount;
|
||||
onTimeout(task);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void onTimeout(AcquireTask task);
|
||||
}
|
||||
|
||||
private class AcquireListener implements FutureListener<Channel> {
|
||||
private final Promise<Channel> originalPromise;
|
||||
|
||||
AcquireListener(Promise<Channel> originalPromise) {
|
||||
this.originalPromise = originalPromise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<Channel> future) throws Exception {
|
||||
assert executor.inEventLoop();
|
||||
|
||||
if (future.isSuccess()) {
|
||||
originalPromise.setSuccess(future.getNow());
|
||||
} else {
|
||||
// Something went wrong try to run pending acquire tasks.
|
||||
decrementAndRunTaskQueue();
|
||||
originalPromise.setFailure(future.cause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (;;) {
|
||||
AcquireTask task = pendingAcquireQueue.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
ScheduledFuture<?> f = task.timeoutFuture;
|
||||
if (f != null) {
|
||||
f.cancel(false);
|
||||
}
|
||||
task.promise.setFailure(new ClosedChannelException());
|
||||
}
|
||||
acquiredChannelCount = 0;
|
||||
pendingAcquireCount = 0;
|
||||
FixedChannelPool.super.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,278 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.EmptyArrays;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.util.Deque;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
/**
|
||||
* Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
|
||||
* a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
|
||||
*
|
||||
* This implementation uses FIFO order for {@link Channel}s in the {@link ChannelPool}.
|
||||
*
|
||||
*/
|
||||
public class SimpleChannelPool implements ChannelPool {
|
||||
private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
|
||||
private static final IllegalStateException FULL_EXCEPTION = new IllegalStateException("ChannelPool full");
|
||||
static {
|
||||
FULL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||
}
|
||||
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
|
||||
private final ChannelPoolHandler handler;
|
||||
private final ChannelHealthChecker healthCheck;
|
||||
private final Bootstrap bootstrap;
|
||||
|
||||
/**
|
||||
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
|
||||
*
|
||||
* @param bootstrap the {@link Bootstrap} that is used for connections
|
||||
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
|
||||
*/
|
||||
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
|
||||
this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param bootstrap the {@link Bootstrap} that is used for connections
|
||||
* @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions
|
||||
* @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
|
||||
* still healty when obtain from the {@link ChannelPool}
|
||||
*/
|
||||
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
|
||||
this.handler = checkNotNull(handler, "handler");
|
||||
this.healthCheck = checkNotNull(healthCheck, "healthCheck");
|
||||
// Clone the original Bootstrap as we want to set our own handler
|
||||
this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
|
||||
this.bootstrap.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
assert ch.eventLoop().inEventLoop();
|
||||
handler.channelCreated(ch);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Future<Channel> acquire() {
|
||||
return acquire(bootstrap.group().next().<Channel>newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Channel> acquire(final Promise<Channel> promise) {
|
||||
checkNotNull(promise, "promise");
|
||||
try {
|
||||
final Channel ch = pollChannel();
|
||||
if (ch == null) {
|
||||
// No Channel left in the pool bootstrap a new Channel
|
||||
Bootstrap bs = bootstrap.clone();
|
||||
bs.attr(POOL_KEY, this);
|
||||
ChannelFuture f = connectChannel(bs);
|
||||
if (f.isDone()) {
|
||||
notifyConnect(f, promise);
|
||||
} else {
|
||||
f.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
notifyConnect(future, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
EventLoop loop = ch.eventLoop();
|
||||
if (loop.inEventLoop()) {
|
||||
doHealthCheck(ch, promise);
|
||||
} else {
|
||||
loop.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
doHealthCheck(ch, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
private static void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
|
||||
if (future.isSuccess()) {
|
||||
promise.setSuccess(future.channel());
|
||||
} else {
|
||||
promise.setFailure(future.cause());
|
||||
}
|
||||
}
|
||||
|
||||
private void doHealthCheck(final Channel ch, final Promise<Channel> promise) {
|
||||
assert ch.eventLoop().inEventLoop();
|
||||
|
||||
Future<Boolean> f = healthCheck.isHealthy(ch);
|
||||
if (f.isDone()) {
|
||||
notifyHealthCheck(f, ch, promise);
|
||||
} else {
|
||||
f.addListener(new FutureListener<Boolean>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Boolean> future) throws Exception {
|
||||
notifyHealthCheck(future, ch, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyHealthCheck(Future<Boolean> future, Channel ch, Promise<Channel> promise) {
|
||||
assert ch.eventLoop().inEventLoop();
|
||||
|
||||
if (future.isSuccess()) {
|
||||
if (future.getNow() == Boolean.TRUE) {
|
||||
try {
|
||||
ch.attr(POOL_KEY).set(this);
|
||||
handler.channelAcquired(ch);
|
||||
promise.setSuccess(ch);
|
||||
} catch (Throwable cause) {
|
||||
closeAndFail(ch, cause, promise);
|
||||
}
|
||||
} else {
|
||||
closeChannel(ch);
|
||||
acquire(promise);
|
||||
}
|
||||
} else {
|
||||
closeChannel(ch);
|
||||
acquire(promise);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()},
|
||||
* sub-classes may override this.
|
||||
*
|
||||
* The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
|
||||
*/
|
||||
protected ChannelFuture connectChannel(Bootstrap bs) {
|
||||
return bs.connect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Future<Void> release(Channel channel) {
|
||||
return release(channel, channel.eventLoop().<Void>newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
|
||||
checkNotNull(channel, "channel");
|
||||
checkNotNull(promise, "promise");
|
||||
try {
|
||||
EventLoop loop = channel.eventLoop();
|
||||
if (loop.inEventLoop()) {
|
||||
doReleaseChannel(channel, promise);
|
||||
} else {
|
||||
loop.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
doReleaseChannel(channel, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
closeAndFail(channel, cause, promise);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void doReleaseChannel(Channel channel, Promise<Void> promise) {
|
||||
assert channel.eventLoop().inEventLoop();
|
||||
// Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
|
||||
if (channel.attr(POOL_KEY).getAndSet(null) != this) {
|
||||
closeAndFail(channel,
|
||||
// Better include a stracktrace here as this is an user error.
|
||||
new IllegalArgumentException(
|
||||
"Channel " + channel + " was not acquired from this ChannelPool"),
|
||||
promise);
|
||||
} else {
|
||||
try {
|
||||
if (offerChannel(channel)) {
|
||||
handler.channelReleased(channel);
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
closeAndFail(channel, FULL_EXCEPTION, promise);
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
closeAndFail(channel, cause, promise);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void closeChannel(Channel channel) {
|
||||
channel.attr(POOL_KEY).getAndSet(null);
|
||||
channel.close();
|
||||
}
|
||||
|
||||
private static void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
|
||||
closeChannel(channel);
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
|
||||
* {@link Channel} is ready to be reused.
|
||||
*
|
||||
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
|
||||
* implementations of these methods needs to be thread-safe!
|
||||
*/
|
||||
protected Channel pollChannel() {
|
||||
return deque.pollLast();
|
||||
}
|
||||
|
||||
/**
|
||||
* Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
|
||||
* could be added, {@code false} otherwise.
|
||||
*
|
||||
* Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
|
||||
* implementations of these methods needs to be thread-safe!
|
||||
*/
|
||||
protected boolean offerChannel(Channel channel) {
|
||||
return deque.offer(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (;;) {
|
||||
Channel channel = pollChannel();
|
||||
if (channel == null) {
|
||||
break;
|
||||
}
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Implementations and API for {@link io.netty.channel.Channel} pools.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalEventLoopGroup;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class AbstractChannelPoolMapTest {
|
||||
private static final String LOCAL_ADDR_ID = "test.id";
|
||||
|
||||
@Test(expected = ChannelException.class)
|
||||
public void testMap() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
final Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
AbstractChannelPoolMap<EventLoop, SimpleChannelPool> poolMap =
|
||||
new AbstractChannelPoolMap<EventLoop, SimpleChannelPool>() {
|
||||
@Override
|
||||
protected SimpleChannelPool newPool(EventLoop key) {
|
||||
return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler());
|
||||
}
|
||||
};
|
||||
|
||||
EventLoop loop = group.next();
|
||||
|
||||
assertFalse(poolMap.iterator().hasNext());
|
||||
assertEquals(0, poolMap.size());
|
||||
|
||||
SimpleChannelPool pool = poolMap.get(loop);
|
||||
assertEquals(1, poolMap.size());
|
||||
assertTrue(poolMap.iterator().hasNext());
|
||||
|
||||
assertSame(pool, poolMap.get(loop));
|
||||
assertTrue(poolMap.remove(loop));
|
||||
assertFalse(poolMap.remove(loop));
|
||||
|
||||
assertFalse(poolMap.iterator().hasNext());
|
||||
assertEquals(0, poolMap.size());
|
||||
|
||||
pool.acquire().syncUninterruptibly();
|
||||
}
|
||||
|
||||
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
|
||||
@Override
|
||||
public void channelCreated(Channel ch) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
final class CountingChannelPoolHandler implements ChannelPoolHandler {
|
||||
private final AtomicInteger channelCount = new AtomicInteger(0);
|
||||
private final AtomicInteger acquiredCount = new AtomicInteger(0);
|
||||
private final AtomicInteger releasedCount = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void channelCreated(Channel ch) {
|
||||
channelCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReleased(Channel ch) {
|
||||
releasedCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelAcquired(Channel ch) {
|
||||
acquiredCount.incrementAndGet();
|
||||
}
|
||||
|
||||
public int channelCount() {
|
||||
return channelCount.get();
|
||||
}
|
||||
|
||||
public int acquiredCount() {
|
||||
return acquiredCount.get();
|
||||
}
|
||||
|
||||
public int releasedCount() {
|
||||
return releasedCount.get();
|
||||
}
|
||||
}
|
@ -0,0 +1,233 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalEventLoopGroup;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class FixedChannelPoolTest {
|
||||
private static final String LOCAL_ADDR_ID = "test.id";
|
||||
|
||||
@Test
|
||||
public void testAcquire() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
|
||||
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
|
||||
|
||||
ChannelPool pool = new FixedChannelPool(cb, handler, 1, Integer.MAX_VALUE);
|
||||
|
||||
Channel channel = pool.acquire().syncUninterruptibly().getNow();
|
||||
Future<Channel> future = pool.acquire();
|
||||
assertFalse(future.isDone());
|
||||
|
||||
pool.release(channel).syncUninterruptibly();
|
||||
assertTrue(future.await(1, TimeUnit.SECONDS));
|
||||
|
||||
Channel channel2 = future.getNow();
|
||||
assertSame(channel, channel2);
|
||||
assertEquals(1, handler.channelCount());
|
||||
|
||||
assertEquals(1, handler.acquiredCount());
|
||||
assertEquals(1, handler.releasedCount());
|
||||
|
||||
sc.close().syncUninterruptibly();
|
||||
channel2.close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
|
||||
@Test(expected = TimeoutException.class)
|
||||
public void testAcquireTimeout() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
|
||||
ChannelPoolHandler handler = new TestChannelPoolHandler();
|
||||
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
|
||||
AcquireTimeoutAction.FAIL, 500, 1, Integer.MAX_VALUE);
|
||||
|
||||
Channel channel = pool.acquire().syncUninterruptibly().getNow();
|
||||
Future<Channel> future = pool.acquire();
|
||||
try {
|
||||
future.syncUninterruptibly();
|
||||
} finally {
|
||||
sc.close().syncUninterruptibly();
|
||||
channel.close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcquireNewConnection() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
|
||||
ChannelPoolHandler handler = new TestChannelPoolHandler();
|
||||
ChannelPool pool = new FixedChannelPool(cb, handler, ChannelHealthChecker.ACTIVE,
|
||||
AcquireTimeoutAction.NEW, 500, 1, Integer.MAX_VALUE);
|
||||
|
||||
Channel channel = pool.acquire().syncUninterruptibly().getNow();
|
||||
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
|
||||
assertNotSame(channel, channel2);
|
||||
sc.close().syncUninterruptibly();
|
||||
channel.close().syncUninterruptibly();
|
||||
channel2.close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testAcquireBoundQueue() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
|
||||
ChannelPoolHandler handler = new TestChannelPoolHandler();
|
||||
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
|
||||
|
||||
Channel channel = pool.acquire().syncUninterruptibly().getNow();
|
||||
Future<Channel> future = pool.acquire();
|
||||
assertFalse(future.isDone());
|
||||
|
||||
try {
|
||||
pool.acquire().syncUninterruptibly();
|
||||
} finally {
|
||||
sc.close().syncUninterruptibly();
|
||||
channel.close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testReleaseDifferentPool() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
|
||||
ChannelPoolHandler handler = new TestChannelPoolHandler();
|
||||
ChannelPool pool = new FixedChannelPool(cb, handler, 1, 1);
|
||||
ChannelPool pool2 = new FixedChannelPool(cb, handler, 1, 1);
|
||||
|
||||
Channel channel = pool.acquire().syncUninterruptibly().getNow();
|
||||
|
||||
try {
|
||||
pool2.release(channel).syncUninterruptibly();
|
||||
} finally {
|
||||
sc.close().syncUninterruptibly();
|
||||
channel.close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
|
||||
@Override
|
||||
public void channelCreated(Channel ch) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,145 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalEventLoopGroup;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class SimpleChannelPoolTest {
|
||||
private static final String LOCAL_ADDR_ID = "test.id";
|
||||
|
||||
@Test
|
||||
public void testAcquire() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).sync().channel();
|
||||
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
|
||||
|
||||
ChannelPool pool = new SimpleChannelPool(cb, handler);
|
||||
|
||||
Channel channel = pool.acquire().sync().getNow();
|
||||
|
||||
pool.release(channel).syncUninterruptibly();
|
||||
|
||||
Channel channel2 = pool.acquire().sync().getNow();
|
||||
assertSame(channel, channel2);
|
||||
assertEquals(1, handler.channelCount());
|
||||
pool.release(channel2).syncUninterruptibly();
|
||||
|
||||
// Should fail on multiple release calls.
|
||||
try {
|
||||
pool.release(channel2).syncUninterruptibly();
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected
|
||||
assertFalse(channel.isActive());
|
||||
}
|
||||
|
||||
assertEquals(1, handler.acquiredCount());
|
||||
assertEquals(2, handler.releasedCount());
|
||||
|
||||
sc.close().sync();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBoundedChannelPoolSegment() throws Exception {
|
||||
EventLoopGroup group = new LocalEventLoopGroup();
|
||||
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
|
||||
Bootstrap cb = new Bootstrap();
|
||||
cb.remoteAddress(addr);
|
||||
cb.group(group)
|
||||
.channel(LocalChannel.class);
|
||||
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
|
||||
}
|
||||
});
|
||||
|
||||
// Start server
|
||||
Channel sc = sb.bind(addr).sync().channel();
|
||||
CountingChannelPoolHandler handler = new CountingChannelPoolHandler();
|
||||
|
||||
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE) {
|
||||
private final Queue<Channel> queue = new LinkedBlockingQueue<Channel>(1);
|
||||
|
||||
@Override
|
||||
protected Channel pollChannel() {
|
||||
return queue.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean offerChannel(Channel ch) {
|
||||
return queue.offer(ch);
|
||||
}
|
||||
};
|
||||
|
||||
Channel channel = pool.acquire().sync().getNow();
|
||||
Channel channel2 = pool.acquire().sync().getNow();
|
||||
|
||||
pool.release(channel).syncUninterruptibly().getNow();
|
||||
try {
|
||||
pool.release(channel2).syncUninterruptibly();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
// expected
|
||||
}
|
||||
channel2.close().sync();
|
||||
|
||||
assertEquals(2, handler.channelCount());
|
||||
assertEquals(0, handler.acquiredCount());
|
||||
assertEquals(1, handler.releasedCount());
|
||||
sc.close().sync();
|
||||
channel.close().sync();
|
||||
channel2.close().sync();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user