Commit first round of classes to support nio2/async channel api. Still work in progress.. See #396

This commit is contained in:
Norman Maurer 2012-06-13 22:23:21 +02:00
parent fa63b89fc3
commit 18aaae3c2e
10 changed files with 592 additions and 19 deletions

View File

@ -281,6 +281,11 @@
<ignore>java.nio.channels.MembershipKey</ignore>
<ignore>java.net.StandardSocketOptions</ignore>
<ignore>java.net.StandardProtocolFamily</ignore>
<!-- Used for NIO2 -->
<ignore>java.nio.channels.AsynchronousChannel</ignore>
<ignore>java.nio.channels.AsynchronousSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousChannelGroup</ignore>
</ignores>
</configuration>
<executions>

View File

@ -85,7 +85,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private long writeCounter;
private boolean inFlushNow;
protected boolean inFlushNow;
private boolean flushNowPending;
/** Cache for the string representation of this channel */
@ -623,7 +623,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public final void flushNow() {
public void flushNow() {
if (inFlushNow) {
return;
}
@ -631,12 +631,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
inFlushNow = true;
ChannelHandlerContext ctx = directOutboundContext();
Throwable cause = null;
boolean handleFlush = true;
try {
if (ctx.hasOutboundByteBuffer()) {
ByteBuf out = ctx.outboundByteBuffer();
int oldSize = out.readableBytes();
try {
doFlushByteBuffer(out);
handleFlush = doFlushByteBuffer(out);
} catch (Throwable t) {
cause = t;
} finally {
@ -657,7 +658,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
writeCounter += oldSize - out.size();
}
}
if (handleFlush) {
if (cause == null) {
notifyFlushFutures();
} else {
@ -667,6 +668,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
close(voidFuture());
}
}
}
} finally {
inFlushNow = false;
}
@ -713,7 +715,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception;
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
throw new UnsupportedOperationException();
}
protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
@ -722,7 +724,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract boolean isFlushPending();
private void notifyFlushFutures() {
protected final void notifyFlushFutures() {
if (flushCheckpoints.isEmpty()) {
return;
}
@ -760,7 +762,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private void notifyFlushFutures(Throwable cause) {
protected final void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();

View File

@ -77,7 +77,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
throw new UnsupportedOperationException();
}

View File

@ -71,10 +71,11 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!lastOutboundBuffer().readable()) {
lastOutboundBuffer().discardReadBytes();
}
lastOutboundBuffer().writeBytes(buf);
return true;
}
}

View File

@ -85,11 +85,11 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
return true;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
@ -103,6 +103,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
break;
}
}
return true;
}
protected abstract int doReadBytes(ByteBuf buf) throws Exception;

View File

@ -0,0 +1,176 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractAsyncChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
protected ChannelFuture connectFuture;
protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractAsyncChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
public InetSocketAddress localAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.remoteAddress();
}
protected AsynchronousChannel javaChannel() {
return ch;
}
@Override
public ChannelConfig config() {
// TODO: Fix me
return null;
}
@Override
public boolean isOpen() {
return ch == null || ch.isOpen();
}
@Override
protected boolean isCompatible(EventLoop loop) {
// TODO: Fix me
return true;
}
@Override
protected void doDeregister() throws Exception {
throw new UnsupportedOperationException("Deregistration is not supported by AbstractAsyncChannel");
}
@Override
protected AsyncUnsafe newUnsafe() {
return new AsyncUnsafe();
}
protected class AsyncUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
connectFuture = future;
doConnect(remoteAddress, localAddress, future);
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (connectTimeoutException == null) {
connectTimeoutException = new ConnectException("connection timed out");
}
ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture;
if (connectFuture != null &&
connectFuture.setFailure(connectTimeoutException)) {
pipeline().fireExceptionCaught(connectTimeoutException);
close(voidFuture());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
protected final void connectFailed(Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
protected final void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {
boolean wasActive = isActive();
connectFuture.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectFuture = null;
}
}
}
protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture connectFuture);
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel {
private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler();
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class);
public AsyncServerSocketChannel() {
super(null, null);
}
@Override
protected AsynchronousServerSocketChannel javaChannel() {
return (AsynchronousServerSocketChannel) super.javaChannel();
}
@Override
public boolean isActive() {
return localAddress0() != null;
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
}
@Override
protected SocketAddress localAddress0() {
try {
return javaChannel().getLocalAddress();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected boolean isFlushPending() {
return false;
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Runnable doRegister() throws Exception {
ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
javaChannel().accept(this, ACCEPT_HANDLER);
return null;
}
private static final class AcceptHandler
implements CompletionHandler<AsynchronousSocketChannel, AsyncServerSocketChannel> {
public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) {
// register again this handler to accept new connections
channel.javaChannel().accept(channel, this);
// create the socket add it to the buffer and fire the event
channel.outboundMessageBuffer().add(new AsyncSocketchannel(channel, null));
channel.pipeline().fireInboundBufferUpdated();
}
public void failed(Throwable t, AsyncServerSocketChannel channel) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
}
}
}

View File

@ -0,0 +1,246 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncSocketchannel extends AbstractAsyncChannel {
private static final CompletionHandler<Void, AsyncSocketchannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AsyncSocketchannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AsyncSocketchannel> WRITE_HANDLER = new WriteHandler();
private final AtomicBoolean flushing = new AtomicBoolean(false);
public AsyncSocketchannel() {
this(null, null);
}
public AsyncSocketchannel(AsyncServerSocketChannel parent, Integer id) {
super(parent, id);
}
@Override
public boolean isActive() {
AsynchronousSocketChannel ch = javaChannel();
return ch.isOpen();
}
@Override
protected AsynchronousSocketChannel javaChannel() {
return (AsynchronousSocketChannel) super.javaChannel();
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.BYTE;
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) {
assert ch != null;
if (localAddress != null) {
try {
javaChannel().bind(localAddress);
} catch (IOException e) {
future.setFailure(e);
return;
}
}
javaChannel().connect(remoteAddress, this, CONNECT_HANDLER);
}
@Override
protected InetSocketAddress localAddress0() {
try {
return (InetSocketAddress) javaChannel().getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected InetSocketAddress remoteAddress0() {
try {
return (InetSocketAddress) javaChannel().getRemoteAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected Runnable doRegister() throws Exception {
assert ch == null;
ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
return null;
}
private void read() {
javaChannel().read(pipeline().inboundByteBuffer().nioBuffer(), this, READ_HANDLER);
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected boolean isFlushPending() {
// TODO: Fix me
return true;
}
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
if (flushing.compareAndSet(false, true)) {
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
}
return false;
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
private static final class WriteHandler implements CompletionHandler<Integer, AsyncSocketchannel> {
@Override
public void completed(Integer result, AsyncSocketchannel channel) {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
if (result > 0) {
channel.notifyFlushFutures();
}
channel.flushing.set(false);
}
@Override
public void failed(Throwable cause, AsyncSocketchannel channel) {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
}
channel.flushing.set(false);
}
}
private static final class ReadHandler implements CompletionHandler<Integer, AsyncSocketchannel> {
@Override
public void completed(Integer result, AsyncSocketchannel channel) {
assert channel.eventLoop().inEventLoop();
final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
try {
expandReadBuffer(byteBuf);
for (;;) {
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && channel.isOpen()) {
channel.close(channel.unsafe().voidFuture());
} else {
// start the next read
channel.read();
}
}
}
@Override
public void failed(Throwable t, AsyncSocketchannel channel) {
channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
} else {
// start the next read
channel.read();
}
}
}
private static final class ConnectHandler implements CompletionHandler<Void, AsyncSocketchannel> {
@Override
public void completed(Void result, AsyncSocketchannel channel) {
((AsyncUnsafe) channel.unsafe()).connectSuccess();
}
@Override
public void failed(Throwable exc, AsyncSocketchannel channel) {
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel
* API implementation - recommended for a large number of connections (&gt;= 1000).
*/
package io.netty.channel.socket.nio2;

View File

@ -86,11 +86,12 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
while (buf.readable()) {
doWriteBytes(buf);
}
buf.clear();
return true;
}
protected abstract int available();