Merge seda implementation with "generic" Executor. See #111

This commit is contained in:
norman 2011-12-15 09:34:30 +01:00
parent a6f5985ace
commit af730c11f9
12 changed files with 135 additions and 738 deletions

View File

@ -13,17 +13,16 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.execution.ChannelEventRunnable;
/**
* A {@link Runnable} which sends the specified {@link ChannelEvent} downstream.
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} downstream.
*/
public class ChannelDownstreamEventRunnable extends ChannelEventRunnable{
public final class ChannelDownstreamEventRunnable extends ChannelEventRunnable{
public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);

View File

@ -1,71 +1,56 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.EstimatableObjectWrapper;
/**
* a {@link Runnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it is used by
* {@link Executor} implementers only
*/
public class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
/**
* Sends the event upstream.
*/
@Override
public void run() {
ctx.sendUpstream(e);
}
@Override
public Object unwrap() {
return e;
}
}
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.EstimatableObjectWrapper;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper{
protected final ChannelHandlerContext ctx;
protected final ChannelEvent e;
int estimatedSize;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
@Override
public Object unwrap() {
return e;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
/**
* A {@link ChannelEventRunnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it is used by
* {@link Executor} implementers only
*/
public final class ChannelUpstreamEventRunnable extends ChannelEventRunnable {
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
super(ctx, e);
}
/**
* Sends the event upstream.
*/
@Override
public void run() {
ctx.sendUpstream(e);
}
}

View File

@ -30,7 +30,6 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.execution.seda.SedaExecutor;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
@ -110,16 +109,22 @@ import io.netty.util.internal.ExecutorUtil;
public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
private final Executor executor;
private final boolean handleDownstream;
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor) {
this(executor, false);
}
public ExecutionHandler(Executor executor, boolean handleDownstream) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
this.handleDownstream = handleDownstream;
}
/**
@ -135,13 +140,16 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
*/
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(getExecutor());
ExecutorUtil.terminate(executor);
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
@Override
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
executor.execute(new ChannelEventRunnable(context, e));
executor.execute(new ChannelUpstreamEventRunnable(context, e));
}
@Override
@ -149,7 +157,11 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
ctx.sendDownstream(e);
if (handleDownstream) {
executor.execute(new ChannelDownstreamEventRunnable(ctx, e));
} else {
ctx.sendDownstream(e);
}
}
}

View File

@ -64,8 +64,8 @@ import io.netty.util.internal.SharedResourceMisuseDetector;
* <ul>
* <li>you are using {@link MemoryAwareThreadPoolExecutor} independently from
* {@link ExecutionHandler},</li>
* <li>you are submitting a task whose type is not {@link ChannelEventRunnable}, or</li>
* <li>the message type of the {@link MessageEvent} in the {@link ChannelEventRunnable}
* <li>you are submitting a task whose type is not {@link ChannelUpstreamEventRunnable}, or</li>
* <li>the message type of the {@link MessageEvent} in the {@link ChannelUpstreamEventRunnable}
* is not {@link ChannelBuffer}.</li>
* </ul>
* Here is an example that demonstrates how to implement an {@link ObjectSizeEstimator}
@ -315,7 +315,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(Runnable command) {
if (!(command instanceof ChannelEventRunnable)) {
if (!(command instanceof ChannelUpstreamEventRunnable)) {
command = new MemoryAwareRunnable(command);
}
@ -363,8 +363,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
int increment = settings.objectSizeEstimator.estimateSize(task);
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
if (task instanceof ChannelUpstreamEventRunnable) {
ChannelUpstreamEventRunnable eventTask = (ChannelUpstreamEventRunnable) task;
eventTask.estimatedSize = increment;
Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(increment);
@ -398,8 +398,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment;
if (task instanceof ChannelEventRunnable) {
increment = ((ChannelEventRunnable) task).estimatedSize;
if (task instanceof ChannelUpstreamEventRunnable) {
increment = ((ChannelUpstreamEventRunnable) task).estimatedSize;
} else {
increment = ((MemoryAwareRunnable) task).estimatedSize;
}
@ -408,8 +408,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
totalLimiter.decrease(increment);
}
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
if (task instanceof ChannelUpstreamEventRunnable) {
ChannelUpstreamEventRunnable eventTask = (ChannelUpstreamEventRunnable) task;
Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(-increment);
//System.out.println("DC: " + channelCounter + ", " + increment);
@ -451,8 +451,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* make sure important tasks are not counted.
*/
protected boolean shouldCount(Runnable task) {
if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable r = (ChannelEventRunnable) task;
if (task instanceof ChannelUpstreamEventRunnable) {
ChannelUpstreamEventRunnable r = (ChannelUpstreamEventRunnable) task;
ChannelEvent e = r.getEvent();
if (e instanceof WriteCompletionEvent) {
return false;

View File

@ -234,10 +234,10 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
*/
@Override
protected void doExecute(Runnable task) {
if (!(task instanceof ChannelEventRunnable)) {
if (!(task instanceof ChannelUpstreamEventRunnable)) {
doUnorderedExecute(task);
} else {
ChannelEventRunnable r = (ChannelEventRunnable) task;
ChannelUpstreamEventRunnable r = (ChannelUpstreamEventRunnable) task;
getChildExecutor(r.getEvent()).execute(task);
}
}

View File

@ -1,301 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChildChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.WriteCompletionEvent;
import io.netty.handler.execution.ChannelEventRunnable;
/**
* {@link SimpleSedaExecutor} which offers an easy way to handle {@link ChannelEvent}'s in a more fine grained fashion. Sub-classes of this {@link FineGrainedSedaExecutor} should override needed methods to hand over the event to a specific
* {@link Executor}. By default all events will get passed to the {@link Executor}'s which were given to construct the {@link FineGrainedSedaExecutor}.
*
* This class is marked abstract to make it clear that it should only be used for sub-classing. If you only need to pass upstream/downstream events to a different {@link Executor} use {@link SimpleSedaExecutor}.
*
*
*/
public abstract class FineGrainedSedaExecutor extends SimpleSedaExecutor{
/**
* Create a new {@link FineGrainedSedaExecutor} which use the two given {@link Executor}'s as default. One is used for upstream events and one for downstream events.
*
* @param upstreamExecutor use the given {@link Executor} as default for downstream events
* @param downstreamExecutor use the given {@link Executor} as default for upstream events
*/
public FineGrainedSedaExecutor(Executor upstreamExecutor, Executor downstreamExecutor) {
super(upstreamExecutor, downstreamExecutor);
}
/**
* Create a new {@link FineGrainedSedaExecutor} which used the given {@link Executor} as default for upstream and downstream events
*
* @param executor use the given {@link Executor} as default for upstream and downstream events
*
*/
public FineGrainedSedaExecutor(Executor executor) {
super(executor);
}
@Override
protected final void executeDownstream(ChannelDownstreamEventRunnable runnable) throws Exception {
ChannelEvent e = runnable.getEvent();
if (e instanceof MessageEvent) {
executeWriteRequested(runnable, (MessageEvent) e);
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (!Boolean.TRUE.equals(evt.getValue())) {
executeCloseRequested(runnable, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
executeBindRequested(runnable, evt);
} else {
executeUnbindRequested(runnable, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
executeConnectRequested(runnable, evt);
} else {
executeDisconnectRequested(runnable, evt);
}
break;
case INTEREST_OPS:
executeSetInterestOpsRequested(runnable, evt);
break;
default:
super.executeDownstream(runnable);
break;
}
} else {
super.executeDownstream(runnable);
}
}
/**
* Invoked when {@link Channel#write(Object)} is called.
*/
public void executeWriteRequested(ChannelDownstreamEventRunnable runnable, MessageEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#bind(SocketAddress)} was called.
*/
public void executeBindRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#connect(SocketAddress)} was called.
*/
public void executeConnectRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#setInterestOps(int)} was called.
*/
public void executeSetInterestOpsRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#disconnect()} was called.
*/
public void executeDisconnectRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#unbind()} was called.
*/
public void executeUnbindRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
/**
* Invoked when {@link Channel#close()} was called.
*/
public void executeCloseRequested(ChannelDownstreamEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeDownstream(runnable);
}
@Override
protected final void executeUpstream(ChannelEventRunnable runnable) throws Exception {
ChannelEvent e = runnable.getEvent();
if (e instanceof MessageEvent) {
executeMessageReceived(runnable, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {
WriteCompletionEvent evt = (WriteCompletionEvent) e;
executeWriteComplete(runnable, evt);
} else if (e instanceof ChildChannelStateEvent) {
ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {
executeChildChannelOpen(runnable, evt);
} else {
executeChildChannelClosed(runnable, evt);
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (Boolean.TRUE.equals(evt.getValue())) {
executeChannelOpen(runnable, evt);
} else {
executeChannelClosed(runnable, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
executeChannelBound(runnable, evt);
} else {
executeChannelUnbound(runnable, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
executeChannelConnected(runnable, evt);
} else {
executeChannelDisconnected(runnable, evt);
}
break;
case INTEREST_OPS:
executeChannelInterestChanged(runnable, evt);
break;
default:
super.executeUpstream(runnable);
}
} else if (e instanceof ExceptionEvent) {
executeExceptionCaught(runnable, (ExceptionEvent) e);
} else {
super.executeUpstream(runnable);
}
}
/**
* Invoked when a message object (e.g: {@link ChannelBuffer}) was received
* from a remote peer.
*/
public void executeMessageReceived(ChannelEventRunnable runnable, MessageEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when an exception was raised by an I/O thread or a
* {@link ChannelHandler}.
*/
public void executeExceptionCaught(ChannelEventRunnable runnable, ExceptionEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} is open, but not bound nor connected.
*/
public void executeChannelOpen(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} is open and bound to a local address,
* but not connected.
*/
public void executeChannelBound(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} is open, bound to a local address, and
* connected to a remote address.
*/
public void executeChannelConnected(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel}'s {@link Channel#getInterestOps() interestOps}
* was changed.
*/
public void executeChannelInterestChanged(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} was disconnected from its remote peer.
*/
public void executeChannelDisconnected(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} was unbound from the current local address.
*/
public void executeChannelUnbound(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a {@link Channel} was closed and all its related resources
* were released.
*/
public void executeChannelClosed(ChannelEventRunnable runnable, ChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when something was written into a {@link Channel}.
*/
public void executeWriteComplete(ChannelEventRunnable runnable, WriteCompletionEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a child {@link Channel} was open.
* (e.g. a server channel accepted a connection)
*/
public void executeChildChannelOpen(ChannelEventRunnable runnable, ChildChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
/**
* Invoked when a child {@link Channel} was closed.
* (e.g. the accepted connection was closed)
*/
public void executeChildChannelClosed(ChannelEventRunnable runnable, ChildChannelStateEvent e) throws Exception {
super.executeUpstream(runnable);
}
}

View File

@ -1,65 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.handler.execution.ChannelEventRunnable;
import io.netty.util.ExternalResourceReleasable;
import java.util.concurrent.Executor;
/**
* Abstract base class for SEDA bases {@link Executor} logic.
*
*
*/
public abstract class SedaExecutor implements Executor, ExternalResourceReleasable{
@Override
public void execute(Runnable command) {
ChannelEventRunnable runnable = (ChannelEventRunnable) command;
ChannelHandlerContext ctx = runnable.getContext();
try {
// check if the event was down or upstream
if (runnable instanceof ChannelDownstreamEventRunnable) {
executeDownstream((ChannelDownstreamEventRunnable) runnable);
} else {
executeUpstream(runnable);
}
} catch (Exception e1) {
// handle exceptions
Channels.fireExceptionCaught(ctx, e1);
}
}
/**
* Execute the given {@link ChannelDownstreamEventRunnable} which was triggerd by a downstream event
*
* @param runnable
* @throws Exception
*/
protected abstract void executeDownstream(ChannelDownstreamEventRunnable runnable) throws Exception;
/**
* Execute the given {@link ChannelEventRunnable} which was triggered by an upstream event
*
* @param runnable
* @throws Exception
*/
protected abstract void executeUpstream(ChannelEventRunnable runnable) throws Exception;
}

View File

@ -1,58 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.execution.ExecutionHandler;
/**
* {@link ExecutionHandler} which submit all downstream and upstream events to the given {@link SedaExecutor}. The {@link SedaExecutor} is responsible for hand of the events
* to the different {@link Executor}'s and so build up an <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a> architecture.
*
*
*/
public class SedaHandler extends ExecutionHandler {
/**
* Create a new {@link SedaHandler} which uses the given {@link SedaExecutor}
*
* @param executor the {@link SedaExecutor} to hand off tasks
*/
public SedaHandler(SedaExecutor executor) {
super(executor);
}
/**
* Hand the event to the {@link Executor}
*/
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
getExecutor().execute(new ChannelDownstreamEventRunnable(ctx, e));
}
}
@Override
public void releaseExternalResources() {
((SedaExecutor) getExecutor()).releaseExternalResources();
}
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.util.ObjectSizeEstimator;
/**
* Subclass of {@link MemoryAwareThreadPoolExecutor} which has all the same semantics as {@link MemoryAwareThreadPoolExecutor}. The only difference is that it will not keep track of the memory usage
* of downstream events.
*
*
* For more details see {@link MemoryAwareThreadPoolExecutor}
*
*/
public class SedaMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor{
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory);
}
/**
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory);
}
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
}
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
}
/**
* Don't count if {@link Runnable} is an instance of {@link ChannelDownstreamEventRunnable}
*/
@Override
protected boolean shouldCount(Runnable task) {
if (!(task instanceof ChannelDownstreamEventRunnable)) {
return super.shouldCount(task);
}
return false;
}
}

View File

@ -1,75 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.util.ObjectSizeEstimator;
/**
* Subclass of {@link OrderedMemoryAwareThreadPoolExecutor} which has all the same semantics as {@link OrderedMemoryAwareThreadPoolExecutor}. The only difference is that it will not keep track of the memory usage
* of downstream events .
*
* For more details see {@link OrderedMemoryAwareThreadPoolExecutor}
*
*/
public class SedaOrderedMemoryAwareThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor{
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory);
}
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory);
}
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
}
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
}
/**
* Don't count if {@link Runnable} is an instance of {@link ChannelDownstreamEventRunnable}
*/
@Override
protected boolean shouldCount(Runnable task) {
if (!(task instanceof ChannelDownstreamEventRunnable)) {
return super.shouldCount(task);
}
return false;
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution.seda;
import java.util.concurrent.Executor;
import io.netty.handler.execution.ChannelEventRunnable;
import io.netty.util.internal.ExecutorUtil;
/**
* {@link SedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events.
*
* You should use a {@link SedaOrderedMemoryAwareThreadPoolExecutor} if you care about the order of thread-execution. In most cases this should be the case
*
*
*
*/
public class SimpleSedaExecutor extends SedaExecutor{
private final Executor upstreamExecutor;
private final Executor downstreamExecutor;
/**
* Construct an {@link SimpleSedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events.
*
* @param upstreamExecutor the {@link Executor} which is used for upstream events
* @param downstreamExecutor the {@link Executor} which is used for downstream events
*/
public SimpleSedaExecutor(Executor upstreamExecutor, Executor downstreamExecutor) {
this.upstreamExecutor = upstreamExecutor;
this.downstreamExecutor = downstreamExecutor;
}
/**
* Construct an {@link SimpleSedaExecutor} which uses the same {@link Executor} for downstream and upstream events
*
* @param executor the {@link Executor} for events
*/
public SimpleSedaExecutor(Executor executor) {
this(executor, executor);
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(upstreamExecutor, downstreamExecutor);
}
@Override
protected void executeDownstream(ChannelDownstreamEventRunnable runnable) throws Exception {
downstreamExecutor.execute(runnable);
}
@Override
protected void executeUpstream(ChannelEventRunnable runnable) throws Exception {
upstreamExecutor.execute(runnable);
}
}