From 65be9ebd44bb60b4921e11493f208407f64982e3 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 17 Feb 2012 20:33:18 +0100 Subject: [PATCH] Start to refactor oio transport to share more code. See #186 --- .../socket/oio/AbstractOioChannel.java | 115 +++++++++++++ .../channel/socket/oio/AbstractOioWorker.java | 158 ++++++++++++++++++ .../socket/oio/OioDatagramChannel.java | 101 ++++------- .../channel/socket/oio/OioDatagramWorker.java | 144 +++------------- .../channel/socket/oio/OioSocketChannel.java | 93 ++++------- .../netty/channel/socket/oio/OioWorker.java | 146 +++------------- 6 files changed, 381 insertions(+), 376 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java new file mode 100644 index 0000000000..1d7106b4c7 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -0,0 +1,115 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.oio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; + +abstract class AbstractOioChannel extends AbstractChannel{ + private volatile InetSocketAddress localAddress; + volatile InetSocketAddress remoteAddress; + volatile Thread workerThread; + final Object interestOpsLock = new Object(); + + AbstractOioChannel( + Channel parent, + ChannelFactory factory, + ChannelPipeline pipeline, + ChannelSink sink) { + super(parent, factory, pipeline, sink); + } + + @Override + protected boolean setClosed() { + return super.setClosed(); + } + + @Override + protected void setInterestOpsNow(int interestOps) { + super.setInterestOpsNow(interestOps); + } + + @Override + public ChannelFuture write(Object message, SocketAddress remoteAddress) { + if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { + return super.write(message, null); + } else { + return super.write(message, remoteAddress); + } + } + + @Override + public boolean isBound() { + return isOpen() && isSocketBound(); + } + + @Override + public boolean isConnected() { + return isOpen() && isSocketConnected(); + } + + + @Override + public InetSocketAddress getLocalAddress() { + InetSocketAddress localAddress = this.localAddress; + if (localAddress == null) { + try { + this.localAddress = localAddress = + (InetSocketAddress) getLocalSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return localAddress; + } + + @Override + public InetSocketAddress getRemoteAddress() { + InetSocketAddress remoteAddress = this.remoteAddress; + if (remoteAddress == null) { + try { + this.remoteAddress = remoteAddress = + (InetSocketAddress) getRemoteSocketAddress(); + } catch (Throwable t) { + // Sometimes fails on a closed socket in Windows. + return null; + } + } + return remoteAddress; + } + + abstract boolean isSocketBound(); + + abstract boolean isSocketConnected(); + + abstract boolean isSocketClosed(); + + abstract InetSocketAddress getLocalSocketAddress() throws Exception; + + abstract InetSocketAddress getRemoteSocketAddress() throws Exception; + + abstract void closeSocket() throws IOException; + +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java new file mode 100644 index 0000000000..898ef994c3 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -0,0 +1,158 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.oio; + +import static io.netty.channel.Channels.fireChannelClosed; +import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelInterestChanged; +import static io.netty.channel.Channels.fireChannelUnbound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.succeededFuture; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.Channels; + +import java.io.IOException; + +/** + * Abstract base class for Oio-Worker implementations + * + * @param {@link AbstractOioChannel} + */ +abstract class AbstractOioWorker implements Runnable { + + protected final C channel; + + public AbstractOioWorker(C channel) { + this.channel = channel; + } + + @Override + public void run() { + channel.workerThread = Thread.currentThread(); + + while (channel.isOpen()) { + synchronized (channel.interestOpsLock) { + while (!channel.isReadable()) { + try { + // notify() is not called at all. + // close() and setInterestOps() calls Thread.interrupt() + channel.interestOpsLock.wait(); + } catch (InterruptedException e) { + if (!channel.isOpen()) { + break; + } + } + } + } + + try { + if (!process()) { + break; + } + } catch (Throwable t) { + if (!channel.isSocketClosed()) { + fireExceptionCaught(channel, t); + } + break; + } + } + + // Setting the workerThread to null will prevent any channel + // operations from interrupting this thread from now on. + channel.workerThread = null; + + // Clean up. + close(channel, succeededFuture(channel)); + } + + /** + * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message + * was processed without errors. + * + * @return continue returns true as long as this worker should continue to try processing incoming messages + * @throws IOException + */ + abstract boolean process() throws IOException; + + static void setInterestOps( + AbstractOioChannel channel, ChannelFuture future, int interestOps) { + + // Override OP_WRITE flag - a user cannot change this flag. + interestOps &= ~Channel.OP_WRITE; + interestOps |= channel.getInterestOps() & Channel.OP_WRITE; + + boolean changed = false; + try { + if (channel.getInterestOps() != interestOps) { + if ((interestOps & Channel.OP_READ) != 0) { + channel.setInterestOpsNow(Channel.OP_READ); + } else { + channel.setInterestOpsNow(Channel.OP_NONE); + } + changed = true; + } + + future.setSuccess(); + if (changed) { + synchronized (channel.interestOpsLock) { + channel.setInterestOpsNow(interestOps); + + // Notify the worker so it stops or continues reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + } + + fireChannelInterestChanged(channel); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + static void close(AbstractOioChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + boolean bound = channel.isBound(); + try { + channel.closeSocket(); + if (channel.setClosed()) { + future.setSuccess(); + if (connected) { + // Notify the worker so it stops reading. + Thread currentThread = Thread.currentThread(); + Thread workerThread = channel.workerThread; + if (workerThread != null && currentThread != workerThread) { + workerThread.interrupt(); + } + fireChannelDisconnected(channel); + } + if (bound) { + fireChannelUnbound(channel); + } + fireChannelClosed(channel); + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 841fd1b316..9f55095fa0 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -22,28 +22,22 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; import java.net.NetworkInterface; -import java.net.SocketAddress; import java.net.SocketException; -import io.netty.channel.AbstractChannel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DefaultDatagramChannelConfig; -final class OioDatagramChannel extends AbstractChannel +final class OioDatagramChannel extends AbstractOioChannel implements DatagramChannel { final MulticastSocket socket; - final Object interestOpsLock = new Object(); private final DatagramChannelConfig config; - volatile Thread workerThread; - private volatile InetSocketAddress localAddress; - volatile InetSocketAddress remoteAddress; + static OioDatagramChannel create(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) { @@ -81,65 +75,6 @@ final class OioDatagramChannel extends AbstractChannel return config; } - @Override - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = - (InetSocketAddress) socket.getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - @Override - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - (InetSocketAddress) socket.getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } - - @Override - public boolean isBound() { - return isOpen() && socket.isBound(); - } - - @Override - public boolean isConnected() { - return isOpen() && socket.isConnected(); - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - @Override - protected void setInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return super.write(message, remoteAddress); - } - } - @Override public void joinGroup(InetAddress multicastAddress) { ensureBound(); @@ -187,4 +122,36 @@ final class OioDatagramChannel extends AbstractChannel throw new ChannelException(e); } } + + @Override + boolean isSocketBound() { + return socket.isBound(); + } + + @Override + boolean isSocketConnected() { + return socket.isConnected(); + } + + @Override + InetSocketAddress getLocalSocketAddress() throws Exception{ + return (InetSocketAddress) socket.getLocalSocketAddress(); + } + + @Override + InetSocketAddress getRemoteSocketAddress() throws Exception{ + return (InetSocketAddress) socket.getRemoteSocketAddress(); + } + + @Override + void closeSocket() throws IOException { + socket.close(); + } + + @Override + boolean isSocketClosed() { + return socket.isClosed(); + } + + } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index ab6ffbd277..f1b42b42f9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -17,77 +17,49 @@ package io.netty.channel.socket.oio; import static io.netty.channel.Channels.*; +import java.io.IOException; import java.io.InterruptedIOException; import java.net.DatagramPacket; -import java.net.MulticastSocket; import java.net.SocketAddress; import java.nio.ByteBuffer; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ReceiveBufferSizePredictor; -class OioDatagramWorker implements Runnable { - - private final OioDatagramChannel channel; +class OioDatagramWorker extends AbstractOioWorker { OioDatagramWorker(OioDatagramChannel channel) { - this.channel = channel; + super(channel); } + + @Override - public void run() { - channel.workerThread = Thread.currentThread(); - final MulticastSocket socket = channel.socket; + boolean process() throws IOException { - while (channel.isOpen()) { - synchronized (channel.interestOpsLock) { - while (!channel.isReadable()) { - try { - // notify() is not called at all. - // close() and setInterestOps() calls Thread.interrupt() - channel.interestOpsLock.wait(); - } catch (InterruptedException e) { - if (!channel.isOpen()) { - break; - } - } - } - } + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); - ReceiveBufferSizePredictor predictor = - channel.getConfig().getReceiveBufferSizePredictor(); + byte[] buf = new byte[predictor.nextReceiveBufferSize()]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + try { + channel.socket.receive(packet); + } catch (InterruptedIOException e) { + // Can happen on interruption. + // Keep receiving unless the channel is closed. + return true; + } - byte[] buf = new byte[predictor.nextReceiveBufferSize()]; - DatagramPacket packet = new DatagramPacket(buf, buf.length); - try { - socket.receive(packet); - } catch (InterruptedIOException e) { - // Can happen on interruption. - // Keep receiving unless the channel is closed. - continue; - } catch (Throwable t) { - if (!channel.socket.isClosed()) { - fireExceptionCaught(channel, t); - } - break; - } - - fireMessageReceived( - channel, - channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()), - packet.getSocketAddress()); - } - - // Setting the workerThread to null will prevent any channel - // operations from interrupting this thread from now on. - channel.workerThread = null; - - // Clean up. - close(channel, succeededFuture(channel)); + fireMessageReceived( + channel, + channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()), + packet.getSocketAddress()); + return true; } + + static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { @@ -120,45 +92,7 @@ class OioDatagramWorker implements Runnable { } } - static void setInterestOps( - OioDatagramChannel channel, ChannelFuture future, int interestOps) { - - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getInterestOps() & Channel.OP_WRITE; - - boolean changed = false; - try { - if (channel.getInterestOps() != interestOps) { - if ((interestOps & Channel.OP_READ) != 0) { - channel.setInterestOpsNow(Channel.OP_READ); - } else { - channel.setInterestOpsNow(Channel.OP_NONE); - } - changed = true; - } - - future.setSuccess(); - if (changed) { - synchronized (channel.interestOpsLock) { - channel.setInterestOpsNow(interestOps); - - // Notify the worker so it stops or continues reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - } - - fireChannelInterestChanged(channel); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - + static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); try { @@ -174,32 +108,4 @@ class OioDatagramWorker implements Runnable { } } - static void close(OioDatagramChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - try { - channel.socket.close(); - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - // Notify the worker so it stops reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - fireChannelDisconnected(channel); - } - if (bound) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index bd1ccdb588..2b3303e519 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -15,31 +15,25 @@ */ package io.netty.channel.socket.oio; +import java.io.IOException; import java.io.OutputStream; import java.io.PushbackInputStream; import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketAddress; -import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannelConfig; -abstract class OioSocketChannel extends AbstractChannel +abstract class OioSocketChannel extends AbstractOioChannel implements SocketChannel { final Socket socket; - final Object interestOpsLock = new Object(); private final SocketChannelConfig config; - volatile Thread workerThread; - private volatile InetSocketAddress localAddress; - private volatile InetSocketAddress remoteAddress; OioSocketChannel( Channel parent, @@ -59,65 +53,36 @@ abstract class OioSocketChannel extends AbstractChannel return config; } - @Override - public InetSocketAddress getLocalAddress() { - InetSocketAddress localAddress = this.localAddress; - if (localAddress == null) { - try { - this.localAddress = localAddress = - (InetSocketAddress) socket.getLocalSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return localAddress; - } - - @Override - public InetSocketAddress getRemoteAddress() { - InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress == null) { - try { - this.remoteAddress = remoteAddress = - (InetSocketAddress) socket.getRemoteSocketAddress(); - } catch (Throwable t) { - // Sometimes fails on a closed socket in Windows. - return null; - } - } - return remoteAddress; - } - - @Override - public boolean isBound() { - return isOpen() && socket.isBound(); - } - - @Override - public boolean isConnected() { - return isOpen() && socket.isConnected(); - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - @Override - protected void setInterestOpsNow(int interestOps) { - super.setInterestOpsNow(interestOps); - } - abstract PushbackInputStream getInputStream(); abstract OutputStream getOutputStream(); @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return getUnsupportedOperationFuture(); - } + boolean isSocketBound() { + return socket.isBound(); + } + + @Override + boolean isSocketConnected() { + return socket.isConnected(); + } + + @Override + InetSocketAddress getLocalSocketAddress() throws Exception { + return (InetSocketAddress) socket.getLocalSocketAddress(); + } + + @Override + InetSocketAddress getRemoteSocketAddress() throws Exception { + return (InetSocketAddress) socket.getRemoteSocketAddress(); + } + + @Override + void closeSocket() throws IOException { + socket.close(); + } + + @Override + boolean isSocketClosed() { + return socket.isClosed(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index 9afe62ba62..bb0f7148d9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.oio; import static io.netty.channel.Channels.*; +import java.io.IOException; import java.io.OutputStream; import java.io.PushbackInputStream; import java.net.SocketException; @@ -26,79 +27,38 @@ import java.nio.channels.WritableByteChannel; import java.util.regex.Pattern; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.FileRegion; -class OioWorker implements Runnable { +class OioWorker extends AbstractOioWorker { private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile( "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE); - private final OioSocketChannel channel; - OioWorker(OioSocketChannel channel) { - this.channel = channel; + super(channel); } @Override - public void run() { - channel.workerThread = Thread.currentThread(); - final PushbackInputStream in = channel.getInputStream(); - boolean fireOpen = channel instanceof OioAcceptedSocketChannel; - - while (channel.isOpen()) { - if (fireOpen) { - fireOpen = false; - fireChannelConnected(channel, channel.getRemoteAddress()); + boolean process() throws IOException { + byte[] buf; + int readBytes; + PushbackInputStream in = channel.getInputStream(); + int bytesToRead = in.available(); + if (bytesToRead > 0) { + buf = new byte[bytesToRead]; + readBytes = in.read(buf); + } else { + int b = in.read(); + if (b < 0) { + return false; } - synchronized (channel.interestOpsLock) { - while (!channel.isReadable()) { - try { - // notify() is not called at all. - // close() and setInterestOps() calls Thread.interrupt() - channel.interestOpsLock.wait(); - } catch (InterruptedException e) { - if (!channel.isOpen()) { - break; - } - } - } - } - - byte[] buf; - int readBytes; - try { - int bytesToRead = in.available(); - if (bytesToRead > 0) { - buf = new byte[bytesToRead]; - readBytes = in.read(buf); - } else { - int b = in.read(); - if (b < 0) { - break; - } - in.unread(b); - continue; - } - } catch (Throwable t) { - if (!channel.socket.isClosed()) { - fireExceptionCaught(channel, t); - } - break; - } - - fireMessageReceived( - channel, - channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes)); + in.unread(b); + return true; } - - // Setting the workerThread to null will prevent any channel - // operations from interrupting this thread from now on. - channel.workerThread = null; - - // Clean up. - close(channel, succeededFuture(channel)); + fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes)); + + return true; } static void write( @@ -162,71 +122,5 @@ class OioWorker implements Runnable { } } - static void setInterestOps( - OioSocketChannel channel, ChannelFuture future, int interestOps) { - // Override OP_WRITE flag - a user cannot change this flag. - interestOps &= ~Channel.OP_WRITE; - interestOps |= channel.getInterestOps() & Channel.OP_WRITE; - - boolean changed = false; - try { - if (channel.getInterestOps() != interestOps) { - if ((interestOps & Channel.OP_READ) != 0) { - channel.setInterestOpsNow(Channel.OP_READ); - } else { - channel.setInterestOpsNow(Channel.OP_NONE); - } - changed = true; - } - - future.setSuccess(); - if (changed) { - synchronized (channel.interestOpsLock) { - channel.setInterestOpsNow(interestOps); - - // Notify the worker so it stops or continues reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - } - - fireChannelInterestChanged(channel); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - static void close(OioSocketChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean bound = channel.isBound(); - try { - channel.socket.close(); - if (channel.setClosed()) { - future.setSuccess(); - if (connected) { - // Notify the worker so it stops reading. - Thread currentThread = Thread.currentThread(); - Thread workerThread = channel.workerThread; - if (workerThread != null && currentThread != workerThread) { - workerThread.interrupt(); - } - fireChannelDisconnected(channel); - } - if (bound) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } }