Ported OIO socket/datagram transport to the new API

This commit is contained in:
Trustin Lee 2012-05-25 13:58:56 -07:00
parent b06a4bea6b
commit e2d69120bb
21 changed files with 906 additions and 1883 deletions

View File

@ -247,43 +247,15 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
public int setBytes(int index, InputStream in, int length)
throws IOException {
int readBytes = 0;
if (buffer.hasArray()) {
index += buffer.arrayOffset();
do {
int localReadBytes = in.read(buffer.array(), index, length);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
readBytes += localReadBytes;
index += localReadBytes;
length -= localReadBytes;
} while (length > 0);
return in.read(buffer.array(), buffer.arrayOffset() + index, length);
} else {
byte[] tmp = new byte[length];
int i = 0;
do {
int localReadBytes = in.read(tmp, i, tmp.length - i);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
readBytes += localReadBytes;
i += readBytes;
} while (i < tmp.length);
int readBytes = in.read(tmp);
tmpBuf.clear().position(index);
tmpBuf.put(tmp);
return readBytes;
}
return readBytes;
}
@Override

View File

@ -156,22 +156,7 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
int readBytes = 0;
do {
int localReadBytes = in.read(array, index, length);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
readBytes += localReadBytes;
index += localReadBytes;
length -= localReadBytes;
} while (length > 0);
return readBytes;
return in.read(array, index, length);
}
@Override

View File

@ -29,6 +29,10 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
static final ThreadLocal<SingleThreadEventLoop> CURRENT_EVENT_LOOP = new ThreadLocal<SingleThreadEventLoop>();
public static SingleThreadEventLoop currentEventLoop() {
return CURRENT_EVENT_LOOP.get();
}
private static long nanoTime() {
return System.nanoTime() - START_TIME;
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.socket.Worker;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
volatile Thread workerThread;
volatile Worker worker;
final Object interestOpsLock = new Object();
AbstractOioChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
}
@Override
public boolean isBound() {
return isOpen() && isSocketBound();
}
@Override
public boolean isConnected() {
return isOpen() && isSocketConnected();
}
@Override
public InetSocketAddress getLocalAddress() {
InetSocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress =
(InetSocketAddress) getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
(InetSocketAddress) getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
abstract boolean isSocketBound();
abstract boolean isSocketConnected();
abstract boolean isSocketClosed();
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
abstract void closeSocket() throws IOException;
}

View File

@ -1,57 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.channel();
if (ch instanceof AbstractOioChannel) {
AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker;
if (worker != null) {
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.channel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
}
}
return super.execute(pipeline, task);
}
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
}
return fireLater;
}
}

View File

@ -1,226 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels;
import io.netty.channel.socket.Worker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException;
import java.util.Queue;
/**
* Abstract base class for Oio-Worker implementations
*
* @param <C> {@link AbstractOioChannel}
*/
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
public AbstractOioWorker(C channel) {
this.channel = channel;
channel.worker = this;
}
@Override
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
channel.interestOpsLock.wait();
} catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
try {
boolean cont = process();
processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, succeededFuture(channel), true);
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}
@Override
public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) {
task.run();
} else {
boolean added = eventQueue.offer(task);
if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
}
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
* was processed without errors.
*
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
* @throws IOException
*/
abstract boolean process() throws IOException;
static void setInterestOps(
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
boolean iothread = isIoThread(channel);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
}
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void close(AbstractOioChannel channel, ChannelFuture future) {
close(channel, future, isIoThread(channel));
}
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (iothread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}

View File

@ -0,0 +1,256 @@
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.internal.QueueFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class BlockingChannelEventLoop implements EventLoop {
private final int maxChannels;
final ThreadFactory threadFactory;
final Set<SingleBlockingChannelEventLoop> activeChildren = Collections.newSetFromMap(
new ConcurrentHashMap<SingleBlockingChannelEventLoop, Boolean>());
final Queue<SingleBlockingChannelEventLoop> idleChildren =
QueueFactory.createQueue(SingleBlockingChannelEventLoop.class);
private final ChannelException tooManyChannels;
public BlockingChannelEventLoop() {
this(0);
}
public BlockingChannelEventLoop(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
public BlockingChannelEventLoop(int maxChannels, ThreadFactory threadFactory) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(new StackTraceElement[0]);
}
@Override
public void shutdown() {
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
for (EventLoop l: activeChildren) {
l.shutdownNow();
}
for (EventLoop l: idleChildren) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
return currentEventLoop().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return currentEventLoop().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextEventLoop().register(channel);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextEventLoop().register(channel, future);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
@Override
public boolean inEventLoop() {
return SingleThreadEventLoop.currentEventLoop() != null;
}
private EventLoop nextEventLoop() {
SingleBlockingChannelEventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = new SingleBlockingChannelEventLoop(this);
}
activeChildren.add(loop);
return loop;
}
private static SingleBlockingChannelEventLoop currentEventLoop() {
SingleBlockingChannelEventLoop loop =
(SingleBlockingChannelEventLoop) SingleThreadEventLoop.currentEventLoop();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}
return loop;
}
}

View File

@ -1,76 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.Socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
final class OioAcceptedSocketChannel extends OioSocketChannel {
private final PushbackInputStream in;
private final OutputStream out;
static OioAcceptedSocketChannel create(Channel parent,
ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
Socket socket) {
OioAcceptedSocketChannel instance = new OioAcceptedSocketChannel(
parent, factory, pipeline, sink, socket);
fireChannelOpen(instance);
fireChannelBound(instance, instance.getLocalAddress());
return instance;
}
private OioAcceptedSocketChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink,
Socket socket) {
super(parent, factory, pipeline, sink, socket);
try {
in = new PushbackInputStream(socket.getInputStream(), 1);
} catch (IOException e) {
throw new ChannelException("Failed to obtain an InputStream.", e);
}
try {
out = socket.getOutputStream();
} catch (IOException e) {
throw new ChannelException("Failed to obtain an OutputStream.", e);
}
}
@Override
PushbackInputStream getInputStream() {
return in;
}
@Override
OutputStream getOutputStream() {
return out;
}
}

View File

@ -1,58 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.Socket;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
final class OioClientSocketChannel extends OioSocketChannel {
volatile PushbackInputStream in;
volatile OutputStream out;
static OioClientSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioClientSocketChannel instance =
new OioClientSocketChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private OioClientSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(null, factory, pipeline, sink, new Socket());
}
@Override
PushbackInputStream getInputStream() {
return in;
}
@Override
OutputStream getOutputStream() {
return out;
}
}

View File

@ -1,111 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ClientSocketChannelFactory;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.ExecutorUtil;
/**
* A {@link ClientSocketChannelFactory} which creates a client-side blocking
* I/O based {@link SocketChannel}. It utilizes the good old blocking I/O API
* which is known to yield better throughput and latency when there are
* relatively small number of connections to serve.
*
* <h3>How threads work</h3>
* <p>
* There is only one type of threads in {@link OioClientSocketChannelFactory};
* worker threads.
*
* <h4>Worker threads</h4>
* <p>
* Each connected {@link Channel} has a dedicated worker thread, just like a
* traditional blocking I/O thread model.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* Worker threads are acquired from the {@link Executor} which was specified
* when a {@link OioClientSocketChannelFactory} was created (i.e. {@code workerExecutor}.)
* Therefore, you should make sure the specified {@link Executor} is able to
* lend the sufficient number of threads.
* <p>
* Worker threads are acquired lazily, and then released when there's nothing
* left to process. All the related resources are also released when the
* worker threads are released. Therefore, to shut down a service gracefully,
* you should do the following:
*
* <ol>
* <li>close all channels created by the factory usually using
* {@link ChannelGroup#close()}, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
*
* <h3>Limitation</h3>
* <p>
* A {@link SocketChannel} created by this factory does not support asynchronous
* operations. Any I/O requests such as {@code "connect"} and {@code "write"}
* will be performed in a blocking manner.
* @apiviz.landmark
*/
public class OioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor workerExecutor;
final OioClientSocketPipelineSink sink;
/**
* Creates a new instance with a {@link Executors#newCachedThreadPool()} as worker executor.
*
* See {@link #OioClientSocketChannelFactory(Executor)}
*/
public OioClientSocketChannelFactory() {
this(Executors.newCachedThreadPool());
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public OioClientSocketChannelFactory(Executor workerExecutor) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
this.workerExecutor = workerExecutor;
sink = new OioClientSocketPipelineSink(workerExecutor);
}
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
return OioClientSocketChannel.create(this, pipeline, sink);
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -1,132 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.PushbackInputStream;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.util.internal.DeadLockProofWorker;
class OioClientSocketPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor;
OioClientSocketPipelineSink(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
ChannelState state = stateEvent.getState();
Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
OioWorker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
OioWorker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
OioWorker.close(channel, future);
}
break;
case INTEREST_OPS:
OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
OioWorker.write(
channel, future,
((MessageEvent) e).getMessage());
}
}
private void bind(
OioClientSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.socket.bind(localAddress);
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(
OioClientSocketChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
try {
channel.socket.connect(
remoteAddress, channel.getConfig().getConnectTimeoutMillis());
connected = true;
// Obtain I/O stream.
channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
channel.out = channel.socket.getOutputStream();
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
// Start the business.
DeadLockProofWorker.start(workerExecutor, new OioWorker(channel));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
OioWorker.close(channel, future);
}
}
}
}

View File

@ -15,94 +15,287 @@
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.Channel;
import java.util.Queue;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.Channels;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
final class OioDatagramChannel extends AbstractOioChannel
public class OioDatagramChannel extends AbstractChannel
implements DatagramChannel {
final MulticastSocket socket;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private final MulticastSocket socket;
private final DatagramChannelConfig config;
private final ChannelBufferHolder<Object> out = ChannelBufferHolders.messageBuffer();
static OioDatagramChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioDatagramChannel instance =
new OioDatagramChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
private static MulticastSocket newSocket() {
try {
return new MulticastSocket(null);
} catch (Exception e) {
throw new ChannelException("failed to create a new socket", e);
}
}
private OioDatagramChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
public OioDatagramChannel() {
this(newSocket());
}
super(null, factory, pipeline, sink);
public OioDatagramChannel(MulticastSocket socket) {
this(null, socket);
}
public OioDatagramChannel(Integer id, MulticastSocket socket) {
super(null, id);
boolean success = false;
try {
socket = new MulticastSocket(null);
} catch (IOException e) {
throw new ChannelException("Failed to open a datagram socket.", e);
}
try {
socket.setSoTimeout(10);
socket.setSoTimeout(1000);
socket.setBroadcast(false);
success = true;
} catch (SocketException e) {
throw new ChannelException(
"Failed to configure the datagram socket timeout.", e);
} finally {
if (!success) {
socket.close();
}
}
this.socket = socket;
config = new DefaultDatagramChannelConfig(socket);
}
@Override
public DatagramChannelConfig getConfig() {
public DatagramChannelConfig config() {
return config;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
ensureBound();
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
@Override
public boolean isActive() {
return isOpen() && socket.isBound();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleBlockingChannelEventLoop;
}
@Override
protected Channel javaChannel() {
return null;
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return out;
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected SocketAddress remoteAddress0() {
return socket.getRemoteSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
}
boolean success = false;
try {
socket.joinGroup(multicastAddress);
return Channels.succeededFuture(this);
} catch (IOException e) {
return Channels.failedFuture(this, e);
socket.connect(remoteAddress);
success = true;
return true;
} finally {
if (!success) {
try {
socket.close();
} catch (Throwable t) {
logger.warn("Failed to close a socket.", t);
}
}
}
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
ensureBound();
try {
socket.joinGroup(multicastAddress, networkInterface);
return Channels.succeededFuture(this);
protected void doFinishConnect() throws Exception {
throw new Error();
}
} catch (IOException e) {
return Channels.failedFuture(this, e);
@Override
protected void doDisconnect() throws Exception {
socket.disconnect();
}
@Override
protected void doClose() throws Exception {
socket.close();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doRead(Queue<Object> buf) throws Exception {
int packetSize = config().getReceivePacketSize();
byte[] data = new byte[packetSize];
java.net.DatagramPacket p = new java.net.DatagramPacket(data, packetSize);
try {
socket.receive(p);
InetSocketAddress remoteAddr = (InetSocketAddress) p.getSocketAddress();
if (remoteAddr == null) {
remoteAddr = remoteAddress();
}
buf.add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data, p.getOffset(), p.getLength()), remoteAddr));
return 1;
} catch (SocketTimeoutException e) {
// Expected
return 0;
}
}
@Override
protected int doRead(ChannelBuffer buf) throws Exception {
throw new Error();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final Queue<Object> buf = unsafe().out().messageBuffer();
if (buf.isEmpty()) {
return 0;
}
DatagramPacket p = (DatagramPacket) buf.poll();
ChannelBuffer data = p.data();
int length = data.readableBytes();
SocketAddress remoteAddr = p.remoteAddress();
java.net.DatagramPacket q;
if (data.hasArray()) {
q = new java.net.DatagramPacket(
data.array(), data.arrayOffset() + data.readerIndex(), length, remoteAddr);
} else {
byte[] tmp = new byte[length];
data.getBytes(data.readerIndex(), tmp);
q = new java.net.DatagramPacket(tmp, length, remoteAddr);
}
socket.send(q);
return 1;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newFuture());
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelFuture future) {
ensureBound();
try {
socket.joinGroup(multicastAddress);
future.setSuccess();
} catch (IOException e) {
future.setFailure(e);
}
return future;
}
@Override
public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return joinGroup(multicastAddress, networkInterface, newFuture());
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelFuture future) {
ensureBound();
try {
socket.joinGroup(multicastAddress, networkInterface);
future.setSuccess();
} catch (IOException e) {
future.setFailure(e);
}
return future;
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
return future;
}
private void ensureBound() {
if (!isBound()) {
if (!isActive()) {
throw new IllegalStateException(
DatagramChannel.class.getName() +
" must be bound to join a group.");
@ -111,56 +304,77 @@ final class OioDatagramChannel extends AbstractOioChannel
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress) {
return leaveGroup(multicastAddress, newFuture());
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelFuture future) {
try {
socket.leaveGroup(multicastAddress);
return Channels.succeededFuture(this);
future.setSuccess();
} catch (IOException e) {
return Channels.failedFuture(this, e);
future.setFailure(e);
}
return future;
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return leaveGroup(multicastAddress, networkInterface, newFuture());
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelFuture future) {
try {
socket.leaveGroup(multicastAddress, networkInterface);
return Channels.succeededFuture(this);
future.setSuccess();
} catch (IOException e) {
return Channels.failedFuture(this, e);
future.setFailure(e);
}
return future;
}
@Override
boolean isSocketBound() {
return socket.isBound();
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
boolean isSocketConnected() {
return socket.isConnected();
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
return future;
}
@Override
InetSocketAddress getLocalSocketAddress() throws Exception {
return (InetSocketAddress) socket.getLocalSocketAddress();
public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
InetSocketAddress getRemoteSocketAddress() throws Exception {
return (InetSocketAddress) socket.getRemoteSocketAddress();
public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock,
ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
return future;
}
@Override
void closeSocket() throws IOException {
socket.close();
public ChannelFuture block(InetAddress multicastAddress,
InetAddress sourceToBlock) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
boolean isSocketClosed() {
return socket.isClosed();
public ChannelFuture block(InetAddress multicastAddress,
InetAddress sourceToBlock, ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
return future;
}
}

View File

@ -1,111 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelFactory;
import io.netty.util.internal.ExecutorUtil;
/**
* A {@link DatagramChannelFactory} which creates a blocking I/O based
* {@link DatagramChannel}. It utilizes the good old blocking I/O API which
* has support for multicast.
*
* <h3>How threads work</h3>
* <p>
* There is only one type of threads in {@link OioDatagramChannelFactory};
* worker threads.
*
* <h4>Worker threads</h4>
* <p>
* Each {@link Channel} has a dedicated worker thread, just like a
* traditional blocking I/O thread model.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* Worker threads are acquired from the {@link Executor} which was specified
* when a {@link OioDatagramChannelFactory} was created (i.e. {@code workerExecutor}.)
* Therefore, you should make sure the specified {@link Executor} is able to
* lend the sufficient number of threads.
* <p>
* Worker threads are acquired lazily, and then released when there's nothing
* left to process. All the related resources are also released when the
* worker threads are released. Therefore, to shut down a service gracefully,
* you should do the following:
*
* <ol>
* <li>close all channels created by the factory usually using
* {@link ChannelGroup#close()}, and</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
*
* <h3>Limitation</h3>
* <p>
* A {@link DatagramChannel} created by this factory does not support asynchronous
* operations. Any I/O requests such as {@code "write"} will be performed in a
* blocking manner.
* @apiviz.landmark
*/
public class OioDatagramChannelFactory implements DatagramChannelFactory {
private final Executor workerExecutor;
final OioDatagramPipelineSink sink;
/**
* Creates a new instance with a {@link Executors#newCachedThreadPool()} as worker executor.
*
* See {@link #OioDatagramChannelFactory(Executor)}
*/
public OioDatagramChannelFactory() {
this(Executors.newCachedThreadPool());
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public OioDatagramChannelFactory(Executor workerExecutor) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
this.workerExecutor = workerExecutor;
sink = new OioDatagramPipelineSink(workerExecutor);
}
@Override
public DatagramChannel newChannel(ChannelPipeline pipeline) {
return OioDatagramChannel.create(this, pipeline, sink);
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -1,152 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.util.internal.DeadLockProofWorker;
class OioDatagramPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor;
OioDatagramPipelineSink(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
ChannelState state = stateEvent.getState();
Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
OioDatagramWorker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
OioDatagramWorker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
OioDatagramWorker.disconnect(channel, future);
}
break;
case INTEREST_OPS:
OioDatagramWorker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent evt = (MessageEvent) e;
OioDatagramWorker.write(
channel, future, evt.getMessage(), evt.getRemoteAddress());
}
}
private void bind(
OioDatagramChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean workerStarted = false;
try {
channel.socket.bind(localAddress);
bound = true;
// Fire events
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
// Start the business.
DeadLockProofWorker.start(
workerExecutor,
new OioDatagramWorker(channel));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (bound && !workerStarted) {
OioDatagramWorker.close(channel, future);
}
}
}
private void connect(
OioDatagramChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// Clear the cached address so that the next getRemoteAddress() call
// updates the cache.
channel.remoteAddress = null;
try {
channel.socket.connect(remoteAddress);
connected = true;
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
if (!bound) {
// Start the business.
DeadLockProofWorker.start(
workerExecutor,
new OioDatagramWorker(channel));
} else {
// Worker started by bind() - nothing to do.
}
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
OioDatagramWorker.close(channel, future);
}
}
}
}

View File

@ -1,131 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.DatagramPacket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ReceiveBufferSizePredictor;
class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
OioDatagramWorker(OioDatagramChannel channel) {
super(channel);
}
@Override
boolean process() throws IOException {
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
byte[] buf = new byte[predictor.nextReceiveBufferSize()];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
channel.socket.receive(packet);
} catch (InterruptedIOException e) {
// Can happen on interruption.
// Keep receiving unless the channel is closed.
return true;
}
fireMessageReceived(
channel,
channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
packet.getSocketAddress());
return true;
}
static void write(
OioDatagramChannel channel, ChannelFuture future,
Object message, SocketAddress remoteAddress) {
boolean iothread = isIoThread(channel);
try {
ChannelBuffer buf = (ChannelBuffer) message;
int offset = buf.readerIndex();
int length = buf.readableBytes();
ByteBuffer nioBuf = buf.toByteBuffer();
DatagramPacket packet;
if (nioBuf.hasArray()) {
// Avoid copy if the buffer is backed by an array.
packet = new DatagramPacket(
nioBuf.array(), nioBuf.arrayOffset() + offset, length);
} else {
// Otherwise it will be expensive.
byte[] arrayBuf = new byte[length];
buf.getBytes(0, arrayBuf);
packet = new DatagramPacket(arrayBuf, length);
}
if (remoteAddress != null) {
packet.setSocketAddress(remoteAddress);
}
channel.socket.send(packet);
if (iothread) {
fireWriteComplete(channel, length);
} else {
fireWriteCompleteLater(channel, length);
}
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean iothread = isIoThread(channel);
try {
channel.socket.disconnect();
future.setSuccess();
if (connected) {
// Notify.
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}

View File

@ -15,98 +15,164 @@
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
final class OioServerSocketChannel extends AbstractServerChannel
implements ServerSocketChannel {
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.Channel;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class OioServerSocketChannel extends AbstractServerChannel
implements ServerSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
private static ServerSocket newServerSocket() {
try {
return new ServerSocket();
} catch (IOException e) {
throw new ChannelException("failed to create a server socket", e);
}
}
final ServerSocket socket;
final Lock shutdownLock = new ReentrantLock();
private final ServerSocketChannelConfig config;
static OioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioServerSocketChannel instance =
new OioServerSocketChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
public OioServerSocketChannel() {
this(newServerSocket());
}
private OioServerSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
public OioServerSocketChannel(ServerSocket socket) {
this(null, socket);
}
super(factory, pipeline, sink);
try {
socket = new ServerSocket();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
public OioServerSocketChannel(Integer id, ServerSocket socket) {
super(id);
if (socket == null) {
throw new NullPointerException("socket");
}
boolean success = false;
try {
socket.setSoTimeout(1000);
success = true;
} catch (IOException e) {
try {
socket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException(
"Failed to set the server socket timeout.", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e);
}
}
}
}
this.socket = socket;
config = new DefaultServerSocketChannelConfig(socket);
}
@Override
public ServerSocketChannelConfig getConfig() {
public ServerSocketChannelConfig config() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.getLocalSocketAddress();
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public boolean isBound() {
public boolean isOpen() {
return !socket.isClosed();
}
@Override
public boolean isActive() {
return isOpen() && socket.isBound();
}
@Override
protected boolean setClosed() {
return super.setClosed();
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleBlockingChannelEventLoop;
}
@Override
protected Channel javaChannel() {
return null;
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
protected void doClose() throws Exception {
socket.close();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doRead(Queue<Object> buf) throws Exception {
Socket s = null;
try {
s = socket.accept();
if (s != null) {
buf.add(new OioSocketChannel(this, null, s));
return 1;
}
} catch (SocketTimeoutException e) {
// Expected
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
if (s != null) {
try {
s.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
}
return 0;
}
}

View File

@ -1,131 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelFactory;
import io.netty.util.internal.ExecutorUtil;
/**
* A {@link ServerSocketChannelFactory} which creates a server-side blocking
* I/O based {@link ServerSocketChannel}. It utilizes the good old blocking
* I/O API which is known to yield better throughput and latency when there
* are relatively small number of connections to serve.
*
* <h3>How threads work</h3>
* <p>
* There are two types of threads in a {@link OioServerSocketChannelFactory};
* one is boss thread and the other is worker thread.
*
* <h4>Boss threads</h4>
* <p>
* Each bound {@link ServerSocketChannel} has its own boss thread.
* For example, if you opened two server ports such as 80 and 443, you will
* have two boss threads. A boss thread accepts incoming connections until
* the port is unbound. Once a connection is accepted successfully, the boss
* thread passes the accepted {@link Channel} to one of the worker
* threads that the {@link OioServerSocketChannelFactory} manages.
*
* <h4>Worker threads</h4>
* <p>
* Each connected {@link Channel} has a dedicated worker thread, just like a
* traditional blocking I/O thread model.
*
* <h3>Life cycle of threads and graceful shutdown</h3>
* <p>
* All threads are acquired from the {@link Executor}s which were specified
* when a {@link OioServerSocketChannelFactory} was created. Boss threads are
* acquired from the {@code bossExecutor}, and worker threads are acquired from
* the {@code workerExecutor}. Therefore, you should make sure the specified
* {@link Executor}s are able to lend the sufficient number of threads.
* <p>
* Both boss and worker threads are acquired lazily, and then released when
* there's nothing left to process. All the related resources are also
* released when the boss and worker threads are released. Therefore, to shut
* down a service gracefully, you should do the following:
*
* <ol>
* <li>unbind all channels created by the factory,
* <li>close all child channels accepted by the unbound channels,
* (these two steps so far is usually done using {@link ChannelGroup#close()})</li>
* <li>call {@link #releaseExternalResources()}.</li>
* </ol>
*
* Please make sure not to shut down the executor until all channels are
* closed. Otherwise, you will end up with a {@link RejectedExecutionException}
* and the related resources might not be released properly.
*
* <h3>Limitation</h3>
* <p>
* A {@link ServerSocketChannel} created by this factory and its child channels
* do not support asynchronous operations. Any I/O requests such as
* {@code "write"} will be performed in a blocking manner.
* @apiviz.landmark
*/
public class OioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final Executor workerExecutor;
private final ChannelSink sink;
/**
* Create a new {@link OioServerSocketChannelFactory} with a {@link Executors#newCachedThreadPool()} for the boss and worker executor.
*
* See {@link #OioServerSocketChannelFactory(Executor, Executor)}
*/
public OioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public OioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor;
sink = new OioServerSocketPipelineSink(workerExecutor);
}
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return OioServerSocketChannel.create(this, pipeline, sink);
}
@Override
public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor, workerExecutor);
}
}

View File

@ -1,245 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
class OioServerSocketPipelineSink extends AbstractOioChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
private static final AtomicInteger nextId = new AtomicInteger();
final int id = nextId.incrementAndGet();
final Executor workerExecutor;
OioServerSocketPipelineSink(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof OioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof OioAcceptedSocketChannel) {
handleAcceptedSocket(e);
}
}
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
OioServerSocketChannel channel =
(OioServerSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
}
}
private void handleAcceptedSocket(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
OioAcceptedSocketChannel channel =
(OioAcceptedSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
OioWorker.close(channel, future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
OioWorker.close(channel, future);
}
break;
case INTEREST_OPS:
OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
OioSocketChannel channel = (OioSocketChannel) event.channel();
ChannelFuture future = event.getFuture();
Object message = event.getMessage();
OioWorker.write(channel, future, message);
}
}
private void bind(
OioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
localAddress = channel.getLocalAddress();
fireChannelBound(channel, localAddress);
Executor bossExecutor =
((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor, new Boss(channel));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
private void close(OioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
channel.socket.close();
// Make sure the boss thread is not running so that that the future
// is notified after a new connection cannot be accepted anymore.
// See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} finally {
channel.shutdownLock.unlock();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private final class Boss implements Runnable {
private final OioServerSocketChannel channel;
Boss(OioServerSocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
channel.shutdownLock.lock();
try {
while (channel.isBound()) {
try {
Socket acceptedSocket = channel.socket.accept();
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
final OioAcceptedSocketChannel acceptedChannel =
OioAcceptedSocketChannel.create(channel, channel.getFactory(),
pipeline, OioServerSocketPipelineSink.this, acceptedSocket);
DeadLockProofWorker.start(
workerExecutor,
new OioWorker(acceptedChannel));
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
} catch (SocketTimeoutException e) {
// Thrown every second to stop when requested.
} catch (Throwable e) {
// Do not log the exception if the server socket was closed
// by a user.
if (!channel.socket.isBound() || channel.socket.isClosed()) {
break;
}
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
} finally {
channel.shutdownLock.unlock();
}
}
}
}

View File

@ -15,74 +15,210 @@
*/
package io.netty.channel.socket.oio;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
abstract class OioSocketChannel extends AbstractOioChannel
implements SocketChannel {
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
final Socket socket;
public class OioSocketChannel extends AbstractChannel
implements SocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSocketChannel.class);
private final Socket socket;
private final SocketChannelConfig config;
private final ChannelBufferHolder<?> out = ChannelBufferHolders.byteBuffer();
private InputStream is;
private OutputStream os;
OioSocketChannel(
Channel parent,
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink,
Socket socket) {
public OioSocketChannel() {
this(new Socket());
}
super(parent, factory, pipeline, sink);
public OioSocketChannel(Socket socket) {
this(null, null, socket);
}
public OioSocketChannel(Channel parent, Integer id, Socket socket) {
super(parent, id);
this.socket = socket;
config = new DefaultSocketChannelConfig(socket);
boolean success = false;
try {
if (socket.isConnected()) {
is = socket.getInputStream();
os = socket.getOutputStream();
}
socket.setSoTimeout(1000);
success = true;
} catch (Exception e) {
throw new ChannelException("failed to initialize a socket", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
logger.warn("Failed to close a socket.", e);
}
}
}
}
@Override
public SocketChannelConfig getConfig() {
public SocketChannelConfig config() {
return config;
}
abstract PushbackInputStream getInputStream();
abstract OutputStream getOutputStream();
@Override
boolean isSocketBound() {
return socket.isBound();
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
boolean isSocketConnected() {
return socket.isConnected();
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
InetSocketAddress getLocalSocketAddress() throws Exception {
return (InetSocketAddress) socket.getLocalSocketAddress();
public boolean isOpen() {
return !socket.isClosed();
}
@Override
InetSocketAddress getRemoteSocketAddress() throws Exception {
return (InetSocketAddress) socket.getRemoteSocketAddress();
public boolean isActive() {
return !socket.isClosed() && socket.isConnected();
}
@Override
void closeSocket() throws IOException {
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleBlockingChannelEventLoop;
}
@Override
protected java.nio.channels.Channel javaChannel() {
return null;
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) out;
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected SocketAddress remoteAddress0() {
return socket.getRemoteSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
}
boolean success = false;
try {
socket.connect(remoteAddress, config().getConnectTimeoutMillis());
is = socket.getInputStream();
os = socket.getOutputStream();
success = true;
return true;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doFinishConnect() throws Exception {
throw new Error();
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
socket.close();
}
@Override
boolean isSocketClosed() {
return socket.isClosed();
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doRead(Queue<Object> buf) throws Exception {
throw new Error();
}
@Override
protected int doRead(ChannelBuffer buf) throws Exception {
try {
int readBytes = buf.writeBytes(is, buf.writableBytes());
return readBytes;
} catch (SocketTimeoutException e) {
// Expected
return 0;
}
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
final ChannelBuffer buf = unsafe().out().byteBuffer();
final int length = buf.readableBytes();
if (length == 0) {
return 0;
}
buf.readBytes(os, length);
return length;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
}

View File

@ -1,139 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.SocketException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.util.regex.Pattern;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.FileRegion;
class OioWorker extends AbstractOioWorker<OioSocketChannel> {
private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
"^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
OioWorker(OioSocketChannel channel) {
super(channel);
}
@Override
boolean process() throws IOException {
byte[] buf;
int readBytes;
PushbackInputStream in = channel.getInputStream();
int bytesToRead = in.available();
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else {
int b = in.read();
if (b < 0) {
return false;
}
in.unread(b);
return true;
}
fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
return true;
}
static void write(
OioSocketChannel channel, ChannelFuture future,
Object message) {
boolean iothread = isIoThread(channel);
OutputStream out = channel.getOutputStream();
if (out == null) {
Exception e = new ClosedChannelException();
future.setFailure(e);
if (iothread) {
fireExceptionCaught(channel, e);
} else {
fireExceptionCaughtLater(channel, e);
}
return;
}
try {
int length = 0;
// Add support to write a FileRegion. This in fact will not give any performance gain but at least it not fail and
// we did the best to emulate it
if (message instanceof FileRegion) {
FileRegion fr = (FileRegion) message;
try {
synchronized (out) {
WritableByteChannel bchannel = Channels.newChannel(out);
long i = 0;
while ((i = fr.transferTo(bchannel, length)) > 0) {
length += i;
if (length >= fr.getCount()) {
break;
}
}
}
} finally {
if (fr.releaseAfterTransfer()) {
fr.releaseExternalResources();
}
}
} else {
ChannelBuffer a = (ChannelBuffer) message;
length = a.readableBytes();
synchronized (out) {
a.getBytes(a.readerIndex(), out, length);
}
}
future.setSuccess();
if (iothread) {
fireWriteComplete(channel, length);
} else {
fireWriteCompleteLater(channel, length);
}
} catch (Throwable t) {
// Convert 'SocketException: Socket closed' to
// ClosedChannelException.
if (t instanceof SocketException &&
SOCKET_CLOSED_MESSAGE.matcher(
String.valueOf(t.getMessage())).matches()) {
t = new ClosedChannelException();
}
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
}

View File

@ -0,0 +1,77 @@
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.SingleThreadEventLoop;
public class SingleBlockingChannelEventLoop extends SingleThreadEventLoop {
private final BlockingChannelEventLoop parent;
private Channel ch;
SingleBlockingChannelEventLoop(BlockingChannelEventLoop parent) {
super(parent.threadFactory);
this.parent = parent;
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
return super.register(channel, future).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Override
protected void run() {
for (;;) {
Channel ch = SingleBlockingChannelEventLoop.this.ch;
if (ch == null || !ch.isActive()) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
} else {
ch.unsafe().read();
for (;;) {
Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
// Handle deregistration
if (!ch.isRegistered()) {
deregister();
}
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
interruptThread();
}
private void deregister() {
ch = null;
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
}