From b98516536e36540bac0e56c07bd169bcbbb39160 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 29 Mar 2012 17:07:19 +0200 Subject: [PATCH] Introduce the JdkChannel interface and implementation. This will allow us to also share all our nio code in the SCTP implementation. --- .../socket/nio/AbstractJdkChannel.java | 73 ++++++++++++++++ .../socket/nio/AbstractNioChannel.java | 22 +++-- .../socket/nio/AbstractNioChannelSink.java | 8 +- .../channel/socket/nio/AbstractNioWorker.java | 68 +++++++++------ .../netty/channel/socket/nio/JdkChannel.java | 53 ++++++++++++ .../nio/NioClientSocketPipelineSink.java | 4 +- .../socket/nio/NioDatagramChannel.java | 28 +++--- .../socket/nio/NioDatagramJdkChannel.java | 84 ++++++++++++++++++ .../socket/nio/NioDatagramPipelineSink.java | 6 +- .../channel/socket/nio/NioDatagramWorker.java | 29 +------ .../channel/socket/nio/NioSocketChannel.java | 17 +--- .../socket/nio/NioSocketJdkChannel.java | 86 +++++++++++++++++++ .../netty/channel/socket/nio/NioWorker.java | 42 +-------- 13 files changed, 378 insertions(+), 142 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramJdkChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java new file mode 100644 index 0000000000..8629711e23 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractJdkChannel.java @@ -0,0 +1,73 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AbstractSelectableChannel; + +public abstract class AbstractJdkChannel implements JdkChannel { + + final AbstractSelectableChannel channel; + + AbstractJdkChannel(AbstractSelectableChannel channel) { + this.channel = channel; + } + + protected AbstractSelectableChannel getChannel() { + return channel; + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public SelectionKey keyFor(Selector selector) { + return channel.keyFor(selector); + } + + @Override + public SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException { + return channel.register(selector, interestedOps, attachment); + } + + @Override + public boolean isRegistered() { + return channel.isRegistered(); + } + + + @Override + public void configureBlocking(boolean block) throws IOException { + channel.configureBlocking(block); + } + + + @Override + public boolean finishConnect() throws IOException { + return true; + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 5306aad079..03ff40a7c6 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -30,8 +30,6 @@ import io.netty.util.internal.ThreadLocalBoolean; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.channels.SelectableChannel; -import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.Iterator; import java.util.Queue; @@ -40,7 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { +abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { /** * The {@link AbstractNioWorker}. @@ -99,9 +97,9 @@ abstract class AbstractNioChannel { private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 89a679f085..a04d6edbd2 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -28,8 +28,8 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { @Override public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { Channel ch = pipeline.getChannel(); - if (ch instanceof AbstractNioChannel) { - AbstractNioChannel channel = (AbstractNioChannel) ch; + if (ch instanceof AbstractNioChannel) { + AbstractNioChannel channel = (AbstractNioChannel) ch; ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); channel.getWorker().executeInIoThread(wrapper); return wrapper; @@ -43,8 +43,8 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { Channel channel = event.getChannel(); boolean fireLater = false; - if (channel instanceof AbstractNioChannel) { - fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread(); + if (channel instanceof AbstractNioChannel) { + fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread(); } return fireLater; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 051a4b185e..d0d6ccf5b7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -157,7 +157,7 @@ abstract class AbstractNioWorker implements Worker { public void run() { try { try { - clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); + clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); } catch (ClosedChannelException e) { clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); } @@ -171,13 +171,13 @@ abstract class AbstractNioWorker implements Worker { } } }); - } else if (channel instanceof AbstractNioChannel) { + } else if (channel instanceof AbstractNioChannel) { registerTaskQueue.add(new Runnable() { @Override public void run() { try { - registerTask((AbstractNioChannel) channel, future); + registerTask((AbstractNioChannel) channel, future); } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); @@ -536,8 +536,9 @@ abstract class AbstractNioWorker implements Worker { private void connect(SelectionKey k) { final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { - if (ch.channel.finishConnect()) { - registerTask(ch, ch.connectFuture); + // TODO: Remove cast + if (ch.getJdkChannel().finishConnect()) { + registerTask(ch, ch.connectFuture); } } catch (Throwable t) { ch.connectFuture.setFailure(t); @@ -560,8 +561,8 @@ abstract class AbstractNioWorker implements Worker { private void close(SelectionKey k) { Object attachment = k.attachment(); - if (attachment instanceof AbstractNioChannel) { - AbstractNioChannel ch = (AbstractNioChannel) attachment; + if (attachment instanceof AbstractNioChannel) { + AbstractNioChannel ch = (AbstractNioChannel) attachment; close(ch, succeededFuture(ch)); } else if (attachment instanceof NioServerSocketChannel) { NioServerSocketChannel ch = (NioServerSocketChannel) attachment; @@ -571,7 +572,7 @@ abstract class AbstractNioWorker implements Worker { } } - void writeFromUserCode(final AbstractNioChannel channel) { + void writeFromUserCode(final AbstractNioChannel channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); @@ -594,22 +595,40 @@ abstract class AbstractNioWorker implements Worker { write0(channel); } - void writeFromTaskLoop(AbstractNioChannel ch) { + void writeFromTaskLoop(AbstractNioChannel ch) { if (!ch.writeSuspended) { write0(ch); } } void writeFromSelectorLoop(final SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); ch.writeSuspended = false; write0(ch); } - protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel channel); - + + protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { + if (!isIoThread()) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + boolean offered = writeTaskQueue.offer(channel.writeTask); + assert offered; + } - private void write0(AbstractNioChannel channel) { + final Selector workerSelector = selector; + if (workerSelector != null) { + if (wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } + } + + return true; + } + + return false; + } + + protected void write0(AbstractNioChannel channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; @@ -618,7 +637,8 @@ abstract class AbstractNioWorker implements Worker { long writtenBytes = 0; final SocketSendBufferPool sendBufferPool = this.sendBufferPool; - final WritableByteChannel ch = channel.channel; + + final WritableByteChannel ch = channel.getJdkChannel(); final Queue writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { @@ -726,9 +746,9 @@ abstract class AbstractNioWorker implements Worker { return Thread.currentThread() == thread; } - private void setOpWrite(AbstractNioChannel channel) { + private void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); + SelectionKey key = channel.getJdkChannel().keyFor(selector); if (key == null) { return; } @@ -749,9 +769,9 @@ abstract class AbstractNioWorker implements Worker { } } - private void clearOpWrite(AbstractNioChannel channel) { + private void clearOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); + SelectionKey key = channel.getJdkChannel().keyFor(selector); if (key == null) { return; } @@ -822,13 +842,13 @@ abstract class AbstractNioWorker implements Worker { } } - void close(AbstractNioChannel channel, ChannelFuture future) { + void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); boolean iothread = isIoThread(); try { - channel.channel.close(); + channel.getJdkChannel().close(); cancelledKeys ++; if (channel.setClosed()) { @@ -868,7 +888,7 @@ abstract class AbstractNioWorker implements Worker { } } - private void cleanUpWriteBuffer(AbstractNioChannel channel) { + private void cleanUpWriteBuffer(AbstractNioChannel channel) { Exception cause = null; boolean fireExceptionCaught = false; @@ -925,7 +945,7 @@ abstract class AbstractNioWorker implements Worker { } } - void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { + void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; boolean iothread = isIoThread(); try { @@ -933,7 +953,7 @@ abstract class AbstractNioWorker implements Worker { // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { Selector selector = this.selector; - SelectionKey key = channel.channel.keyFor(selector); + SelectionKey key = channel.getJdkChannel().keyFor(selector); // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; @@ -1036,6 +1056,6 @@ abstract class AbstractNioWorker implements Worker { */ protected abstract boolean read(SelectionKey k); - protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); + protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java new file mode 100644 index 0000000000..3d3bb41f2b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/JdkChannel.java @@ -0,0 +1,53 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.Channel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.WritableByteChannel; + +public interface JdkChannel extends Channel, WritableByteChannel { + + SelectionKey keyFor(Selector selector); + + SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException; + + boolean isRegistered(); + + SocketAddress getRemoteSocketAddress(); + + SocketAddress getLocalSocketAddress(); + + boolean isConnected(); + + boolean isSocketBound(); + + boolean finishConnect() throws IOException; + + void disconnectSocket() throws IOException; + + void closeSocket() throws IOException; + + void bind(SocketAddress local) throws IOException; + + void connect(SocketAddress remote) throws IOException; + + void configureBlocking(boolean block) throws IOException; +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 2bbf690f3a..495dd371fb 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -84,7 +84,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { NioClientSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { try { - channel.channel.socket().bind(localAddress); + channel.getJdkChannel().bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); @@ -99,7 +99,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { final NioClientSocketChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - channel.channel.connect(remoteAddress); + channel.getJdkChannel().connect(remoteAddress); channel.getCloseFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 6ef742ec73..f916f7f088 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -33,8 +33,7 @@ import java.nio.channels.DatagramChannel; /** * Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}. */ -public final class NioDatagramChannel extends AbstractNioChannel - implements io.netty.channel.socket.DatagramChannel { +public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel { /** * The {@link DatagramChannelConfig}. @@ -53,8 +52,8 @@ public final class NioDatagramChannel extends AbstractNioChannel channel) { - if (!isIoThread()) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - // "add" the channels writeTask to the writeTaskQueue. - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; - } - - final Selector selector = this.selector; - if (selector != null) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - return true; - } - - return false; - } - void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean iothread = isIoThread(); try { - channel.getDatagramChannel().disconnect(); + channel.getJdkChannel().disconnectSocket(); future.setSuccess(); if (connected) { if (iothread) { @@ -154,7 +131,7 @@ public class NioDatagramWorker extends AbstractNioWorker { @Override - protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { + protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { final SocketAddress localAddress = channel.getLocalAddress(); if (localAddress == null) { if (future != null) { @@ -166,7 +143,7 @@ public class NioDatagramWorker extends AbstractNioWorker { try { synchronized (channel.interestOpsLock) { - ((NioDatagramChannel) channel).getDatagramChannel().register( + channel.getJdkChannel().register( selector, channel.getRawInterestOps(), channel); } if (future != null) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 1cf44f987c..47c8d4115d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -20,11 +20,9 @@ import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; -import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; -public abstract class NioSocketChannel extends AbstractNioChannel - implements io.netty.channel.socket.SocketChannel { +public abstract class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { private static final int ST_OPEN = 0; private static final int ST_BOUND = 1; @@ -38,7 +36,7 @@ public abstract class NioSocketChannel extends AbstractNioChannel Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, SocketChannel socket, NioWorker worker) { - super(parent, factory, pipeline, sink, worker, socket); + super(parent, factory, pipeline, sink, worker, new NioSocketJdkChannel(socket)); config = new DefaultNioSocketChannelConfig(socket.socket()); } @@ -84,15 +82,4 @@ public abstract class NioSocketChannel extends AbstractNioChannel state = ST_CLOSED; return super.setClosed(); } - - - @Override - InetSocketAddress getLocalSocketAddress() throws Exception { - return (InetSocketAddress) channel.socket().getLocalSocketAddress(); - } - - @Override - InetSocketAddress getRemoteSocketAddress() throws Exception { - return (InetSocketAddress) channel.socket().getRemoteSocketAddress(); - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java new file mode 100644 index 0000000000..bca65294b4 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketJdkChannel.java @@ -0,0 +1,86 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +class NioSocketJdkChannel extends AbstractJdkChannel { + + + public NioSocketJdkChannel(SocketChannel channel) { + super(channel); + } + + @Override + protected SocketChannel getChannel() { + return (SocketChannel) super.getChannel(); + } + + @Override + public InetSocketAddress getRemoteSocketAddress() { + return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress(); + } + + @Override + public InetSocketAddress getLocalSocketAddress() { + return (InetSocketAddress) getChannel().socket().getLocalSocketAddress(); + } + + @Override + public boolean isSocketBound() { + return getChannel().socket().isBound(); + } + + @Override + public void bind(SocketAddress local) throws IOException { + getChannel().socket().bind(local); + } + + @Override + public void connect(SocketAddress remote) throws IOException { + getChannel().connect(remote); + } + + @Override + public boolean isConnected() { + return getChannel().isConnected(); + } + + @Override + public void disconnectSocket() throws IOException { + getChannel().socket().close(); + } + + @Override + public void closeSocket() throws IOException { + getChannel().socket().close(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return getChannel().write(src); + } + + @Override + public boolean finishConnect() throws IOException { + return getChannel().finishConnect(); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index eab0b7c96d..5524be990d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -31,7 +31,6 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; @@ -107,40 +106,7 @@ public class NioWorker extends AbstractNioWorker { @Override - protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { - if (!isIoThread()) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - boolean offered = writeTaskQueue.offer(channel.writeTask); - assert offered; - } - - if (!(channel instanceof NioAcceptedSocketChannel)) { - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - } else { - // A write request can be made from an acceptor thread (boss) - // when a user attempted to write something in: - // - // * channelOpen() - // * channelBound() - // * channelConnected(). - // - // In this case, there's no need to wake up the selector because - // the channel is not even registered yet at this moment. - } - - return true; - } - - return false; - } - - @Override - protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { + protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { boolean server = !(channel instanceof NioClientSocketChannel); SocketAddress localAddress = channel.getLocalAddress(); SocketAddress remoteAddress = channel.getRemoteAddress(); @@ -155,13 +121,13 @@ public class NioWorker extends AbstractNioWorker { try { if (server) { - channel.channel.configureBlocking(false); + channel.getJdkChannel().configureBlocking(false); } - boolean registered = channel.channel.isRegistered(); + boolean registered = channel.getJdkChannel().isRegistered(); if (!registered) { synchronized (channel.interestOpsLock) { - channel.channel.register( + channel.getJdkChannel().register( selector, channel.getRawInterestOps(), channel); }