Backport #111 - Improve org.jboss.netty.execution for more flexible thread model

This commit is contained in:
Trustin Lee 2012-01-13 20:36:45 +09:00
parent 009300fad3
commit 9c4cd3702f
11 changed files with 340 additions and 86 deletions

View File

@ -0,0 +1,83 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
private final Executor cur;
private final Executor next;
private final ChannelEventRunnableFilter filter;
/**
* Create a new {@link ChainedExecutor} which will used the given {@link ChannelEventRunnableFilter} to see if the {@link #cur} {@link Executor} should get used.
* Otherwise it will pass the work to the {@link #next} {@link Executor}
*
* @param filter the {@link ChannelEventRunnableFilter} which will be used to check if the {@link ChannelEventRunnable} should be passed to the cur or next {@link Executor}
* @param cur the {@link Executor} to use if the {@link ChannelEventRunnableFilter} match
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
this.filter = filter;
this.cur = cur;
this.next = next;
}
/**
* Execute the passed {@link ChannelEventRunnable} with the current {@link Executor} if the {@link ChannelEventRunnableFilter} match.
* Otherwise pass it to the next {@link Executor} in the chain.
*/
public void execute(Runnable command) {
assert command instanceof ChannelEventRunnable;
if (filter.filter((ChannelEventRunnable) command)) {
cur.execute(command);
} else {
next.execute(command);
}
}
public void releaseExternalResources() {
ExecutorUtil.terminate(cur, next);
releaseExternal(cur);
releaseExternal(next);
}
private static void releaseExternal(Executor executor) {
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
/**
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} downstream.
*/
public class ChannelDownstreamEventRunnable extends ChannelEventRunnable {
public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);
}
/**
* Send the {@link ChannelEvent} downstream
*/
public void run() {
ctx.sendDownstream(e);
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} implementation which matches {@link ChannelDownstreamEventRunnable}
*
*/
public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -1,69 +1,55 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.EstimatableObjectWrapper;
/**
* a {@link Runnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it is used by
* {@link Executor} implementors only
*/
public class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
private final ChannelHandlerContext ctx;
private final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
/**
* Sends the event upstream.
*/
public void run() {
ctx.sendUpstream(e);
}
public Object unwrap() {
return e;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.EstimatableObjectWrapper;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
public Object unwrap() {
return e;
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
public interface ChannelEventRunnableFilter {
/**
* Return <code>true</code> if the {@link ChannelEventRunnable} should get handled by the {@link Executor}
*
*/
boolean filter(ChannelEventRunnable event);
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
/**
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it is used by
* {@link Executor} implementers only
*/
public class ChannelUpstreamEventRunnable extends ChannelEventRunnable {
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);
}
/**
* Sends the event upstream.
*/
public void run() {
ctx.sendUpstream(e);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -22,6 +22,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -29,7 +30,6 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
@ -109,16 +109,26 @@ import org.jboss.netty.util.internal.ExecutorUtil;
public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
private final Executor executor;
private final boolean handleDownstream;
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor) {
this(executor, false);
}
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor, boolean handleDownstream) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
this.handleDownstream = handleDownstream;
}
/**
@ -133,16 +143,34 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
* and wait for its termination.
*/
public void releaseExternalResources() {
ExecutorUtil.terminate(getExecutor());
Executor executor = getExecutor();
ExecutorUtil.terminate(executor);
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
executor.execute(new ChannelEventRunnable(context, e));
executor.execute(new ChannelUpstreamEventRunnable(context, e));
}
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
if (handleDownstream) {
executor.execute(new ChannelDownstreamEventRunnable(ctx, e));
} else {
ctx.sendDownstream(e);
}
}
}
/**
* Handle suspended reads
*/
protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
@ -154,11 +182,11 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return;
return true;
}
}
}
ctx.sendDownstream(e);
return false;
}
}

View File

@ -332,6 +332,9 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(Runnable command) {
if (command instanceof ChannelDownstreamEventRunnable) {
throw new RejectedExecutionException("command must be enclosed with an upstream event.");
}
if (!(command instanceof ChannelEventRunnable)) {
command = new MemoryAwareRunnable(command);
}
@ -468,8 +471,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* make sure important tasks are not counted.
*/
protected boolean shouldCount(Runnable task) {
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable r = (ChannelEventRunnable) task;
if (task instanceof ChannelUpstreamEventRunnable) {
ChannelUpstreamEventRunnable r = (ChannelUpstreamEventRunnable) task;
ChannelEvent e = r.getEvent();
if (e instanceof WriteCompletionEvent) {
return false;
@ -494,10 +497,6 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
@ -530,7 +529,6 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
private int waiters;
Limiter(long limit) {
super();
this.limit = limit;
}
@ -540,7 +538,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
try {
wait();
} catch (InterruptedException e) {
// Ignore
Thread.currentThread().interrupt();
} finally {
waiters --;
}

View File

@ -283,9 +283,6 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean();
ChildExecutor() {
}
public void execute(Runnable command) {
// TODO: What todo if the add return false ?
tasks.add(command);

View File

@ -21,6 +21,6 @@
* @apiviz.exclude ^java\.lang\.
* @apiviz.exclude \.netty\.channel\.
* @apiviz.exclude \.ExternalResourceReleasable$
* @apiviz.exclude \.ChannelEventRunnable$
* @apiviz.exclude \.Channel[A-Za-z]*EventRunnable[A-Za-z]*$
*/
package org.jboss.netty.handler.execution;