Rename method to better reflect its usage and update some javadocs. See

#187 and #140
This commit is contained in:
Norman Maurer 2012-02-25 14:19:11 +01:00
parent c2bc463d61
commit 301a17c029
15 changed files with 35 additions and 19 deletions

View File

@ -226,7 +226,7 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
}
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
handleEvent(e);
}
}

View File

@ -23,7 +23,7 @@ import io.netty.channel.ChannelPipeline;
public abstract class AbstractHttpChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}

View File

@ -334,7 +334,7 @@ public class RxtxChannelSink extends AbstractChannelSink {
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
}

View File

@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline;
public abstract class AbstractScptChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof SctpChannelImpl) {
SctpChannelImpl channel = (SctpChannelImpl) ch;
// check if the current thread is a worker thread, and only fire the event later if thats not the case
if (channel.worker.thread != Thread.currentThread()) {
channel.worker.fireEventLater(new Runnable() {
channel.worker.executeInIoThread(new Runnable() {
@Override
public void run() {

View File

@ -245,7 +245,8 @@ class SctpWorker implements Worker {
}
}
public void fireEventLater(Runnable eventRunnable) {
@Override
public void executeInIoThread(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
// wake up the selector to speed things

View File

@ -38,5 +38,8 @@ public interface ChannelSink {
*/
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception;
/**
* Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly
*/
void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception;
}

View File

@ -586,7 +586,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void sendUpstreamLater(ChannelEvent e) {
try {
getSink().fireEventLater(this, e);
getSink().fireUpstreamEventLater(this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
@ -843,7 +843,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Not attached yet; discarding: " + e);
}

View File

@ -182,7 +182,7 @@ public class IoStreamChannelSink extends AbstractChannelSink {
* This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
}

View File

@ -89,7 +89,7 @@ final class LocalClientChannelSink extends AbstractChannelSink {
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}

View File

@ -46,7 +46,7 @@ final class LocalServerChannelSink extends AbstractChannelSink {
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}

View File

@ -16,7 +16,16 @@
package io.netty.channel.socket;
/**
* A {@link Worker} is responsible to dispatch IO operations
*
*/
public interface Worker extends Runnable{
void fireEventLater(Runnable eventRunnable);
/**
* Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work.
*
* @param task the {@link Runnable} to execute
*/
void executeInIoThread(Runnable task);
}

View File

@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline;
public abstract class AbstractNioChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
// check if the current thread is a worker thread if so we can send the event now
if (channel.worker.thread != Thread.currentThread()) {
channel.worker.fireEventLater(new Runnable() {
channel.worker.executeInIoThread(new Runnable() {
@Override
public void run() {

View File

@ -271,7 +271,8 @@ abstract class AbstractNioWorker implements Worker {
}
}
public void fireEventLater(Runnable eventRunnable) {
@Override
public void executeInIoThread(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
// wake up the selector to speed things

View File

@ -25,13 +25,13 @@ import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null && channel.workerThread != Thread.currentThread()) {
channel.worker.fireEventLater(new Runnable() {
channel.worker.executeInIoThread(new Runnable() {
@Override
public void run() {

View File

@ -91,8 +91,10 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
@Override
public void fireEventLater(Runnable eventRunnable) {
public void executeInIoThread(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
private void processEventQueue() throws IOException {