Make it possible to schedule upstream events to get fired later in the

io-thread. This is the first part of #140 and #187
This commit is contained in:
Norman Maurer 2012-02-24 20:26:50 +01:00
parent 2304913341
commit 5fdd2dea12
29 changed files with 353 additions and 32 deletions

View File

@ -224,6 +224,11 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
throw new CodecEmbedderException(actualCause); throw new CodecEmbedderException(actualCause);
} }
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
handleEvent(e);
}
} }
private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline { private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {

View File

@ -0,0 +1,30 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline;
public abstract class AbstractHttpChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
}

View File

@ -18,7 +18,6 @@ package io.netty.channel.socket.http;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -33,7 +32,7 @@ import io.netty.channel.MessageEvent;
* from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the * from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the
* client end of the tunnel. * client end of the tunnel.
*/ */
class HttpTunnelAcceptedChannelSink extends AbstractChannelSink { class HttpTunnelAcceptedChannelSink extends AbstractHttpChannelSink {
final SaturationManager saturationManager; final SaturationManager saturationManager;

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.http;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelStateEvent;
@ -27,7 +26,7 @@ import io.netty.channel.MessageEvent;
* Sink of a client channel, deals with sunk events and then makes appropriate calls * Sink of a client channel, deals with sunk events and then makes appropriate calls
* on the channel itself to push data. * on the channel itself to push data.
*/ */
class HttpTunnelClientChannelSink extends AbstractChannelSink { class HttpTunnelClientChannelSink extends AbstractHttpChannelSink {
@Override @Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.http;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -27,7 +26,7 @@ import io.netty.channel.socket.ServerSocketChannel;
/** /**
*/ */
class HttpTunnelServerChannelSink extends AbstractChannelSink { class HttpTunnelServerChannelSink extends AbstractHttpChannelSink {
private ChannelFutureListener closeHook; private ChannelFutureListener closeHook;

View File

@ -19,14 +19,13 @@ package io.netty.channel.socket.http;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
/** /**
* A fake channel sink for use in testing * A fake channel sink for use in testing
*/ */
public class FakeChannelSink extends AbstractChannelSink { public class FakeChannelSink extends AbstractHttpChannelSink {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>(); public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();

View File

@ -329,4 +329,12 @@ public class RxtxChannelSink extends AbstractChannelSink {
} }
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
} }

View File

@ -0,0 +1,43 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline;
public abstract class AbstractScptChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof SctpChannelImpl) {
SctpChannelImpl channel = (SctpChannelImpl) ch;
channel.worker.fireEventLater(new Runnable() {
@Override
public void run() {
pipeline.sendUpstream(e);
}
});
} else {
throw new UnsupportedOperationException();
}
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory;
/** /**
*/ */
class SctpClientPipelineSink extends AbstractChannelSink { class SctpClientPipelineSink extends AbstractScptChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); InternalLoggerFactory.getInstance(SctpClientPipelineSink.class);

View File

@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -45,7 +44,7 @@ import io.netty.util.internal.DeadLockProofWorker;
/** /**
*/ */
class SctpServerPipelineSink extends AbstractChannelSink { class SctpServerPipelineSink extends AbstractScptChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); InternalLoggerFactory.getInstance(SctpServerPipelineSink.class);

View File

@ -45,6 +45,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
import io.netty.channel.socket.Worker;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
@ -53,7 +54,7 @@ import io.netty.util.internal.QueueFactory;
/** /**
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
class SctpWorker implements Runnable { class SctpWorker implements Worker {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpWorker.class); InternalLoggerFactory.getInstance(SctpWorker.class);
@ -71,6 +72,8 @@ class SctpWorker implements Runnable {
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
@ -188,6 +191,7 @@ class SctpWorker implements Runnable {
cancelledKeys = 0; cancelledKeys = 0;
processRegisterTaskQueue(); processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue(); processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
@ -240,7 +244,14 @@ class SctpWorker implements Runnable {
} }
} }
} }
public void fireEventLater(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
// wake up the selector to speed things
selector.wakeup();
}
private void processRegisterTaskQueue() throws IOException { private void processRegisterTaskQueue() throws IOException {
for (; ;) { for (; ;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = registerTaskQueue.poll();
@ -264,7 +275,19 @@ class SctpWorker implements Runnable {
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException { private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next(); SelectionKey k = i.next();

View File

@ -442,6 +442,16 @@ public interface ChannelPipeline {
*/ */
void sendUpstream(ChannelEvent e); void sendUpstream(ChannelEvent e);
/**
* Sends the specified {@link ChannelEvent} to the first
* {@link ChannelUpstreamHandler} in this pipeline when the next IO-Worker operation is performed.
*
* @throws NullPointerException
* if the specified event is {@code null}
*/
void sendUpstreamLater(ChannelEvent e);
/** /**
* Sends the specified {@link ChannelEvent} to the last * Sends the specified {@link ChannelEvent} to the last
* {@link ChannelDownstreamHandler} in this pipeline. * {@link ChannelDownstreamHandler} in this pipeline.

View File

@ -37,4 +37,6 @@ public interface ChannelSink {
* one of its {@link ChannelHandler}s process a {@link ChannelEvent}. * one of its {@link ChannelHandler}s process a {@link ChannelEvent}.
*/ */
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception;
} }

View File

@ -583,6 +583,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
@Override
public void sendUpstreamLater(ChannelEvent e) {
try {
getSink().fireEventLater(this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
@Override @Override
public void sendDownstream(ChannelEvent e) { public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
@ -832,5 +841,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelEvent e, ChannelPipelineException cause) throws Exception { ChannelEvent e, ChannelPipelineException cause) throws Exception {
throw cause; throw cause;
} }
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Not attached yet; discarding: " + e);
}
}
} }
} }

View File

@ -177,4 +177,12 @@ public class IoStreamChannelSink extends AbstractChannelSink {
} }
} }
} }
/**
* This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
} }

View File

@ -85,6 +85,14 @@ final class LocalClientChannelSink extends AbstractChannelSink {
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
try { try {
if (!LocalChannelRegistry.register(localAddress, channel)) { if (!LocalChannelRegistry.register(localAddress, channel)) {

View File

@ -42,6 +42,14 @@ final class LocalServerChannelSink extends AbstractChannelSink {
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
private void handleServerChannel(ChannelEvent e) { private void handleServerChannel(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) { if (!(e instanceof ChannelStateEvent)) {
return; return;

View File

@ -0,0 +1,22 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
public interface Worker extends Runnable{
void fireEventLater(Runnable eventRunnable);
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline;
public abstract class AbstractNioChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
channel.worker.fireEventLater(new Runnable() {
@Override
public void run() {
pipeline.sendUpstream(e);
}
});
} else {
throw new UnsupportedOperationException();
}
}
}

View File

@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.socket.Worker;
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -44,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class AbstractNioWorker implements Runnable { abstract class AbstractNioWorker implements Worker {
/** /**
* Internal Netty logger. * Internal Netty logger.
*/ */
@ -106,6 +107,9 @@ abstract class AbstractNioWorker implements Runnable {
*/ */
protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
@ -216,6 +220,7 @@ abstract class AbstractNioWorker implements Runnable {
cancelledKeys = 0; cancelledKeys = 0;
processRegisterTaskQueue(); processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue(); processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
@ -266,7 +271,13 @@ abstract class AbstractNioWorker implements Runnable {
} }
} }
public void fireEventLater(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
// wake up the selector to speed things
selector.wakeup();
}
private void processRegisterTaskQueue() throws IOException { private void processRegisterTaskQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = registerTaskQueue.poll();
@ -291,6 +302,18 @@ abstract class AbstractNioWorker implements Runnable {
} }
} }
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException { private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next(); SelectionKey k = i.next();

View File

@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -45,7 +44,7 @@ import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
class NioClientSocketPipelineSink extends AbstractChannelSink { class NioClientSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);

View File

@ -22,7 +22,6 @@ import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -35,7 +34,7 @@ import io.netty.channel.MessageEvent;
* Receives downstream events from a {@link ChannelPipeline}. It contains * Receives downstream events from a {@link ChannelPipeline}. It contains
* an array of I/O workers. * an array of I/O workers.
*/ */
class NioDatagramPipelineSink extends AbstractChannelSink { class NioDatagramPipelineSink extends AbstractNioChannelSink {
private final NioDatagramWorker[] workers; private final NioDatagramWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger();

View File

@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -41,7 +40,7 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
class NioServerSocketPipelineSink extends AbstractChannelSink { class NioServerSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);

View File

@ -25,11 +25,14 @@ import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink; import io.netty.channel.ChannelSink;
import io.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel { abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress; volatile InetSocketAddress remoteAddress;
volatile Thread workerThread; volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object(); final Object interestOpsLock = new Object();
AbstractOioChannel( AbstractOioChannel(

View File

@ -0,0 +1,52 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink{
@Override
public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
Channel ch = e.getChannel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
channel.worker.fireEventLater(new Runnable() {
@Override
public void run() {
pipeline.sendUpstream(e);
}
});
} else {
// no worker thread yet so just fire the event now
pipeline.sendUpstream(e);
}
} else {
throw new UnsupportedOperationException();
}
}
}

View File

@ -24,20 +24,26 @@ import static io.netty.channel.Channels.succeededFuture;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels; import io.netty.channel.Channels;
import io.netty.channel.socket.Worker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Queue;
/** /**
* Abstract base class for Oio-Worker implementations * Abstract base class for Oio-Worker implementations
* *
* @param <C> {@link AbstractOioChannel} * @param <C> {@link AbstractOioChannel}
*/ */
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Runnable { abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker{
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel; protected final C channel;
public AbstractOioWorker(C channel) { public AbstractOioWorker(C channel) {
this.channel = channel; this.channel = channel;
channel.worker = this;
} }
@Override @Override
@ -60,9 +66,13 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Runnab
} }
try { try {
if (!process()) { boolean cont = process();
break;
} processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) { } catch (Throwable t) {
if (!channel.isSocketClosed()) { if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
@ -79,6 +89,24 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Runnab
close(channel, succeededFuture(channel)); close(channel, succeededFuture(channel));
} }
@Override
public void fireEventLater(Runnable eventRunnable) {
assert eventQueue.offer(eventRunnable);
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/** /**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors. * was processed without errors.

View File

@ -21,7 +21,6 @@ import java.io.PushbackInputStream;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -31,7 +30,7 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
class OioClientSocketPipelineSink extends AbstractChannelSink { class OioClientSocketPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor; private final Executor workerExecutor;

View File

@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -30,7 +29,7 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
class OioDatagramPipelineSink extends AbstractChannelSink { class OioDatagramPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor; private final Executor workerExecutor;

View File

@ -24,7 +24,6 @@ import java.net.SocketTimeoutException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -36,7 +35,7 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
class OioServerSocketPipelineSink extends AbstractChannelSink { class OioServerSocketPipelineSink extends AbstractOioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);