Introduce the JdkChannel interface and implementation. This will allow

us to also share all our nio code in the SCTP implementation.
This commit is contained in:
Norman Maurer 2012-03-29 17:07:19 +02:00
parent 73f3a45e97
commit b98516536e
13 changed files with 378 additions and 142 deletions

View File

@ -0,0 +1,73 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
public abstract class AbstractJdkChannel implements JdkChannel {
final AbstractSelectableChannel channel;
AbstractJdkChannel(AbstractSelectableChannel channel) {
this.channel = channel;
}
protected AbstractSelectableChannel getChannel() {
return channel;
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() throws IOException {
channel.close();
}
@Override
public SelectionKey keyFor(Selector selector) {
return channel.keyFor(selector);
}
@Override
public SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException {
return channel.register(selector, interestedOps, attachment);
}
@Override
public boolean isRegistered() {
return channel.isRegistered();
}
@Override
public void configureBlocking(boolean block) throws IOException {
channel.configureBlocking(block);
}
@Override
public boolean finishConnect() throws IOException {
return true;
}
}

View File

@ -30,8 +30,6 @@ import io.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
@ -40,7 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel implements NioChannel {
abstract class AbstractNioChannel extends AbstractChannel implements NioChannel {
/**
* The {@link AbstractNioWorker}.
@ -99,9 +97,9 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
final C channel;
private final JdkChannel channel;
protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
protected AbstractNioChannel(Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, JdkChannel ch) {
super(id, parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
@ -109,12 +107,16 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
protected AbstractNioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, JdkChannel ch) {
super(parent, factory, pipeline, sink);
this.worker = worker;
this.channel = ch;
}
protected JdkChannel getJdkChannel() {
return channel;
}
/**
* Return the {@link AbstractNioWorker} that handle the IO of the {@link AbstractNioChannel}
*
@ -131,7 +133,7 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
if (localAddress == null) {
try {
this.localAddress = localAddress =
(InetSocketAddress) getLocalSocketAddress();
(InetSocketAddress) channel.getLocalSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
@ -146,7 +148,7 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress =
(InetSocketAddress) getRemoteSocketAddress();
(InetSocketAddress) channel.getRemoteSocketAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
@ -211,10 +213,6 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
return super.setClosed();
}
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
private final class WriteRequestQueue implements BlockingQueue<MessageEvent> {
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();

View File

@ -28,8 +28,8 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
if (ch instanceof AbstractNioChannel) {
AbstractNioChannel channel = (AbstractNioChannel) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.getWorker().executeInIoThread(wrapper);
return wrapper;
@ -43,8 +43,8 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractNioChannel<?>) {
fireLater = !((AbstractNioChannel<?>) channel).getWorker().isIoThread();
if (channel instanceof AbstractNioChannel) {
fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread();
}
return fireLater;
}

View File

@ -157,7 +157,7 @@ abstract class AbstractNioWorker implements Worker {
public void run() {
try {
try {
clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel);
clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel);
} catch (ClosedChannelException e) {
clientChannel.getWorker().close(clientChannel, succeededFuture(channel));
}
@ -171,13 +171,13 @@ abstract class AbstractNioWorker implements Worker {
}
}
});
} else if (channel instanceof AbstractNioChannel<?>) {
} else if (channel instanceof AbstractNioChannel) {
registerTaskQueue.add(new Runnable() {
@Override
public void run() {
try {
registerTask((AbstractNioChannel<?>) channel, future);
registerTask((AbstractNioChannel) channel, future);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
@ -536,8 +536,9 @@ abstract class AbstractNioWorker implements Worker {
private void connect(SelectionKey k) {
final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.channel.finishConnect()) {
registerTask(ch, ch.connectFuture);
// TODO: Remove cast
if (ch.getJdkChannel().finishConnect()) {
registerTask(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
@ -560,8 +561,8 @@ abstract class AbstractNioWorker implements Worker {
private void close(SelectionKey k) {
Object attachment = k.attachment();
if (attachment instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) attachment;
if (attachment instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) attachment;
close(ch, succeededFuture(ch));
} else if (attachment instanceof NioServerSocketChannel) {
NioServerSocketChannel ch = (NioServerSocketChannel) attachment;
@ -571,7 +572,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
void writeFromUserCode(final AbstractNioChannel<?> channel) {
void writeFromUserCode(final AbstractNioChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
@ -594,22 +595,40 @@ abstract class AbstractNioWorker implements Worker {
write0(channel);
}
void writeFromTaskLoop(AbstractNioChannel<?> ch) {
void writeFromTaskLoop(AbstractNioChannel ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel);
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) {
if (!isIoThread()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
}
private void write0(AbstractNioChannel<?> channel) {
final Selector workerSelector = selector;
if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
return true;
}
return false;
}
protected void write0(AbstractNioChannel channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
@ -618,7 +637,8 @@ abstract class AbstractNioWorker implements Worker {
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final WritableByteChannel ch = channel.getJdkChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
@ -726,9 +746,9 @@ abstract class AbstractNioWorker implements Worker {
return Thread.currentThread() == thread;
}
private void setOpWrite(AbstractNioChannel<?> channel) {
private void setOpWrite(AbstractNioChannel channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
SelectionKey key = channel.getJdkChannel().keyFor(selector);
if (key == null) {
return;
}
@ -749,9 +769,9 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void clearOpWrite(AbstractNioChannel<?> channel) {
private void clearOpWrite(AbstractNioChannel channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
SelectionKey key = channel.getJdkChannel().keyFor(selector);
if (key == null) {
return;
}
@ -822,13 +842,13 @@ abstract class AbstractNioWorker implements Worker {
}
}
void close(AbstractNioChannel<?> channel, ChannelFuture future) {
void close(AbstractNioChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean iothread = isIoThread();
try {
channel.channel.close();
channel.getJdkChannel().close();
cancelledKeys ++;
if (channel.setClosed()) {
@ -868,7 +888,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
private void cleanUpWriteBuffer(AbstractNioChannel channel) {
Exception cause = null;
boolean fireExceptionCaught = false;
@ -925,7 +945,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) {
boolean changed = false;
boolean iothread = isIoThread();
try {
@ -933,7 +953,7 @@ abstract class AbstractNioWorker implements Worker {
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
SelectionKey key = channel.getJdkChannel().keyFor(selector);
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
@ -1036,6 +1056,6 @@ abstract class AbstractNioWorker implements Worker {
*/
protected abstract boolean read(SelectionKey k);
protected abstract void registerTask(AbstractNioChannel<?> channel, ChannelFuture future);
protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future);
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
public interface JdkChannel extends Channel, WritableByteChannel {
SelectionKey keyFor(Selector selector);
SelectionKey register(Selector selector, int interestedOps, Object attachment) throws ClosedChannelException;
boolean isRegistered();
SocketAddress getRemoteSocketAddress();
SocketAddress getLocalSocketAddress();
boolean isConnected();
boolean isSocketBound();
boolean finishConnect() throws IOException;
void disconnectSocket() throws IOException;
void closeSocket() throws IOException;
void bind(SocketAddress local) throws IOException;
void connect(SocketAddress remote) throws IOException;
void configureBlocking(boolean block) throws IOException;
}

View File

@ -84,7 +84,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
NioClientSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.channel.socket().bind(localAddress);
channel.getJdkChannel().bind(localAddress);
channel.boundManually = true;
channel.setBound();
future.setSuccess();
@ -99,7 +99,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
channel.channel.connect(remoteAddress);
channel.getJdkChannel().connect(remoteAddress);
channel.getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)

View File

@ -33,8 +33,7 @@ import java.nio.channels.DatagramChannel;
/**
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
*/
public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
implements io.netty.channel.socket.DatagramChannel {
public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel {
/**
* The {@link DatagramChannelConfig}.
@ -53,8 +52,8 @@ public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel
private NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink,
final NioDatagramWorker worker) {
super(null, factory, pipeline, sink, worker, openNonBlockingChannel());
config = new DefaultNioDatagramChannelConfig(channel.socket());
super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel()));
config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel().socket());
}
private static DatagramChannel openNonBlockingChannel() {
@ -68,6 +67,11 @@ public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel
}
@Override
protected NioDatagramJdkChannel getJdkChannel() {
return (NioDatagramJdkChannel) super.getJdkChannel();
}
@Override
public NioDatagramWorker getWorker() {
return (NioDatagramWorker) super.getWorker();
@ -75,12 +79,12 @@ public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel
@Override
public boolean isBound() {
return isOpen() && channel.socket().isBound();
return isOpen() && getJdkChannel().isSocketBound();
}
@Override
public boolean isConnected() {
return channel.isConnected();
return getJdkChannel().isConnected();
}
@Override
@ -93,10 +97,6 @@ public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel
return config;
}
DatagramChannel getDatagramChannel() {
return channel;
}
@Override
public void joinGroup(InetAddress multicastAddress) {
throw new UnsupportedOperationException();
@ -119,15 +119,7 @@ public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel
throw new UnsupportedOperationException();
}
@Override
InetSocketAddress getLocalSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getLocalSocketAddress();
}
@Override
InetSocketAddress getRemoteSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {

View File

@ -0,0 +1,84 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
class NioDatagramJdkChannel extends AbstractJdkChannel {
NioDatagramJdkChannel(DatagramChannel channel) {
super(channel);
}
@Override
protected DatagramChannel getChannel() {
return (DatagramChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress();
}
@Override
public InetSocketAddress getLocalSocketAddress() {
return (InetSocketAddress) getChannel().socket().getLocalSocketAddress();
}
@Override
public boolean isSocketBound() {
return getChannel().socket().isBound();
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().socket().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public boolean isConnected() {
return getChannel().isConnected();
}
@Override
public void disconnectSocket() throws IOException {
getChannel().disconnect();
}
@Override
public void closeSocket() throws IOException {
getChannel().socket().close();
}
@Override
public int write(ByteBuffer src) throws IOException {
return getChannel().write(src);
}
}

View File

@ -84,7 +84,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
private void close(NioDatagramChannel channel, ChannelFuture future) {
try {
channel.getDatagramChannel().socket().close();
channel.getJdkChannel().closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (channel.isBound()) {
@ -110,7 +110,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
boolean started = false;
try {
// First bind the DatagramSocket the specified port.
channel.getDatagramChannel().socket().bind(address);
channel.getJdkChannel().bind(address);
bound = true;
future.setSuccess();
@ -143,7 +143,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
channel.remoteAddress = null;
try {
channel.getDatagramChannel().connect(remoteAddress);
channel.getJdkChannel().connect(remoteAddress);
connected = true;
// Fire events.

View File

@ -105,35 +105,12 @@ public class NioDatagramWorker extends AbstractNioWorker {
return true;
}
@Override
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
if (!isIoThread()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue.
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
}
final Selector selector = this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
return true;
}
return false;
}
void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean iothread = isIoThread();
try {
channel.getDatagramChannel().disconnect();
channel.getJdkChannel().disconnectSocket();
future.setSuccess();
if (connected) {
if (iothread) {
@ -154,7 +131,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
@Override
protected void registerTask(AbstractNioChannel<?> channel, ChannelFuture future) {
protected void registerTask(AbstractNioChannel channel, ChannelFuture future) {
final SocketAddress localAddress = channel.getLocalAddress();
if (localAddress == null) {
if (future != null) {
@ -166,7 +143,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
try {
synchronized (channel.interestOpsLock) {
((NioDatagramChannel) channel).getDatagramChannel().register(
channel.getJdkChannel().register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {

View File

@ -20,11 +20,9 @@ import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public abstract class NioSocketChannel extends AbstractNioChannel<SocketChannel>
implements io.netty.channel.socket.SocketChannel {
public abstract class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
@ -38,7 +36,7 @@ public abstract class NioSocketChannel extends AbstractNioChannel<SocketChannel>
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink, worker, socket);
super(parent, factory, pipeline, sink, worker, new NioSocketJdkChannel(socket));
config = new DefaultNioSocketChannelConfig(socket.socket());
}
@ -84,15 +82,4 @@ public abstract class NioSocketChannel extends AbstractNioChannel<SocketChannel>
state = ST_CLOSED;
return super.setClosed();
}
@Override
InetSocketAddress getLocalSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getLocalSocketAddress();
}
@Override
InetSocketAddress getRemoteSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class NioSocketJdkChannel extends AbstractJdkChannel {
public NioSocketJdkChannel(SocketChannel channel) {
super(channel);
}
@Override
protected SocketChannel getChannel() {
return (SocketChannel) super.getChannel();
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
return (InetSocketAddress) getChannel().socket().getRemoteSocketAddress();
}
@Override
public InetSocketAddress getLocalSocketAddress() {
return (InetSocketAddress) getChannel().socket().getLocalSocketAddress();
}
@Override
public boolean isSocketBound() {
return getChannel().socket().isBound();
}
@Override
public void bind(SocketAddress local) throws IOException {
getChannel().socket().bind(local);
}
@Override
public void connect(SocketAddress remote) throws IOException {
getChannel().connect(remote);
}
@Override
public boolean isConnected() {
return getChannel().isConnected();
}
@Override
public void disconnectSocket() throws IOException {
getChannel().socket().close();
}
@Override
public void closeSocket() throws IOException {
getChannel().socket().close();
}
@Override
public int write(ByteBuffer src) throws IOException {
return getChannel().write(src);
}
@Override
public boolean finishConnect() throws IOException {
return getChannel().finishConnect();
}
}

View File

@ -31,7 +31,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
@ -107,40 +106,7 @@ public class NioWorker extends AbstractNioWorker {
@Override
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
if (!isIoThread()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
}
if (!(channel instanceof NioAcceptedSocketChannel)) {
final Selector workerSelector = selector;
if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
} else {
// A write request can be made from an acceptor thread (boss)
// when a user attempted to write something in:
//
// * channelOpen()
// * channelBound()
// * channelConnected().
//
// In this case, there's no need to wake up the selector because
// the channel is not even registered yet at this moment.
}
return true;
}
return false;
}
@Override
protected void registerTask(AbstractNioChannel<?> channel, ChannelFuture future) {
protected void registerTask(AbstractNioChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress();
@ -155,13 +121,13 @@ public class NioWorker extends AbstractNioWorker {
try {
if (server) {
channel.channel.configureBlocking(false);
channel.getJdkChannel().configureBlocking(false);
}
boolean registered = channel.channel.isRegistered();
boolean registered = channel.getJdkChannel().isRegistered();
if (!registered) {
synchronized (channel.interestOpsLock) {
channel.channel.register(
channel.getJdkChannel().register(
selector, channel.getRawInterestOps(), channel);
}