Add EventLoop implementation for the local transport

This commit is contained in:
Trustin Lee 2012-05-30 04:33:43 -07:00
parent c17e5b458a
commit 9f9045c3b4
3 changed files with 82 additions and 3 deletions

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoop;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.handler.logging.LogLevel;
@ -45,9 +46,10 @@ public class LocalEcho {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
try {
// Note that we can use any event loop so that you can ensure certain local channels
// are handled by the same event loop thread which drives a certain socket channel.
sb.eventLoop(new NioEventLoop(), new NioEventLoop())
// Note that we can use any event loop to ensure certain local channels
// are handled by the same event loop thread which drives a certain socket channel
// to reduce the communication latency between socket channels and local channels.
sb.eventLoop(new LocalEventLoop(), new LocalEventLoop())
.channel(new LocalServerChannel())
.localAddress(addr)
.initializer(new ChannelInitializer<LocalServerChannel>() {

View File

@ -0,0 +1,51 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
final class LocalChildEventLoop extends SingleThreadEventLoop {
LocalChildEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
interruptThread();
}
}
}

View File

@ -0,0 +1,26 @@
package io.netty.channel.local;
import io.netty.channel.EventLoopFactory;
import io.netty.channel.MultithreadEventLoop;
import java.util.concurrent.ThreadFactory;
public class LocalEventLoop extends MultithreadEventLoop {
public LocalEventLoop() {
this(DEFAULT_POOL_SIZE);
}
public LocalEventLoop(int nThreads) {
this(nThreads, DEFAULT_THREAD_FACTORY);
}
public LocalEventLoop(int nThreads, ThreadFactory threadFactory) {
super(new EventLoopFactory<LocalChildEventLoop>() {
@Override
public LocalChildEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception {
return new LocalChildEventLoop(threadFactory);
}
}, nThreads, threadFactory);
}
}