Initial working version of the echo server example

- Optimized AbstractChannelBuffer.discardReadBytes()
- Split ChannelHandlerInvoker into ChannelInboundInvoker and
  ChannelOutboundInvoker
  - Channel implements ChannelOutboundInvoker
  - ChannelOutboundInvoker.nextOut() is now out()
  - ChannelOutboundHandlerContext.out() is now prevOut()
  - Added the outbound operations without future
    parameter to ChannelOutboundInvoker for user convenience
- All async operations which requires a ChannelFuture as a parameter
  now returns ChannelFuture for user convenience
- Added ChannelFutureFactory.newVoidFuture() to allow a user specify
  a dummy future that is of no use
  - I'm unsure if it is actually a good idea to introduce it. It might
    go away later.
- Made the contract of AbstractChannel.doXXX() much simpler and moved
  all common code up to AbstractChannel.DefaultUnsafe
- Added Channel.isOpen()
- Fixed a bug where MultithreadEventLoop always shut down its child
  event loops on construction
- Maybe more changes I don't remember :-)
This commit is contained in:
Trustin Lee 2012-05-09 22:09:06 +09:00
parent 607d784e5e
commit 129a2af86a
24 changed files with 863 additions and 449 deletions

View File

@ -76,7 +76,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer {
@Override
public boolean readable() {
return readableBytes() > 0;
return writerIndex > readerIndex;
}
@Override
@ -119,6 +119,12 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer {
if (readerIndex == 0) {
return;
}
if (readerIndex == writerIndex) {
clear();
return;
}
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
markedReaderIndex = Math.max(markedReaderIndex - readerIndex, 0);

View File

@ -15,14 +15,22 @@
*/
package io.netty.example.echo;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* Echoes back any received data from a client.
@ -35,21 +43,52 @@ public class EchoServer {
this.port = port;
}
public void run() {
public void run() throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class);
ServerSocketChannel ssc = new NioServerSocketChannel();
ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter<SocketChannel>() {
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
@Override
public ChannelBufferHolder<SocketChannel> newInboundBuffer(
ChannelInboundHandlerContext<SocketChannel> ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<SocketChannel>());
}
@Override
public void inboundBufferUpdated(
ChannelInboundHandlerContext<SocketChannel> ctx)
throws Exception {
Queue<SocketChannel> in = ctx.in().messageBuffer();
for (;;) {
SocketChannel s = in.poll();
if (s == null) {
break;
}
s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) {
return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer out = ctx.out().byteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
ctx.flush();
}
});
loop.register(s);
}
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
loop.register(ssc).awaitUninterruptibly().rethrowIfFailed();
ssc.bind(new InetSocketAddress(port), ssc.newFuture());
}
public static void main(String[] args) throws Exception {

View File

@ -20,8 +20,8 @@ import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.ConcurrentHashMap;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -35,9 +35,19 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
/**
* Generates a negative unique integer ID. This method generates only
* negative integers to avoid conflicts with user-specified IDs where only
* non-negative integers are allowed.
*/
private static Integer allocateId(Channel channel) {
Integer id = Integer.valueOf(System.identityHashCode(channel));
int idVal = -Math.abs(System.identityHashCode(channel));
if (idVal >= 0) {
idVal = -1;
}
Integer id;
for (;;) {
id = Integer.valueOf(idVal);
// Loop until a unique ID is acquired.
// It should be found in one loop practically.
if (allChannels.putIfAbsent(id, channel) == null) {
@ -45,22 +55,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return id;
} else {
// Taken by other channel at almost the same moment.
id = Integer.valueOf(id.intValue() + 1);
idVal --;
if (idVal >= 0) {
idVal = -1;
}
}
}
}
private final Integer id;
private final Channel parent;
private final Integer id;
private final Unsafe unsafe;
private final ChannelPipeline pipeline = new DefaultChannelPipeline(this);
private final List<ChannelFutureListener> closureListeners = new ArrayList<ChannelFutureListener>(4);
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
private volatile EventLoop eventLoop;
private volatile boolean registered;
private volatile boolean notifiedClosureListeners;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelFuture connectFuture;
private long writtenAmount;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -69,12 +89,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
/**
* Creates a new instance.
*
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
protected AbstractChannel(Channel parent, Integer id) {
if (id == null) {
id = allocateId(this);
} else {
if (id.intValue() < 0) {
throw new IllegalArgumentException("id: " + id + " (expected: >= 0)");
}
if (allChannels.putIfAbsent(id, this) != null) {
throw new IllegalArgumentException("duplicate ID: " + id);
}
}
this.parent = parent;
this.id = id;
unsafe = new DefaultUnsafe();
closureListeners.add(new ChannelFutureListener() {
@ -85,19 +120,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
});
}
/**
* (Internal use only) Creates a new temporary instance with the specified
* ID.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Integer id, Channel parent) {
this.id = id;
this.parent = parent;
unsafe = new DefaultUnsafe();
}
@Override
public final Integer id() {
return id;
@ -115,61 +137,123 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public EventLoop eventLoop() {
if (eventLoop == null) {
throw new IllegalStateException("channel not registered to an event loop");
}
return eventLoop;
}
@Override
public boolean isOpen() {
return unsafe().ch().isOpen();
}
@Override
public boolean isRegistered() {
return registered;
}
@Override
public void bind(SocketAddress localAddress, ChannelFuture future) {
pipeline().bind(localAddress, future);
public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = newFuture();
pipeline().bind(localAddress, f);
return f;
}
@Override
public void connect(SocketAddress remoteAddress, ChannelFuture future) {
pipeline().connect(remoteAddress, future);
public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = newFuture();
pipeline().connect(remoteAddress, f);
return f;
}
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
pipeline().connect(remoteAddress, localAddress, future);
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = newFuture();
pipeline().connect(remoteAddress, localAddress, f);
return f;
}
@Override
public void disconnect(ChannelFuture future) {
pipeline().disconnect(future);
public ChannelFuture disconnect() {
ChannelFuture f = newFuture();
pipeline().disconnect(f);
return f;
}
@Override
public void close(ChannelFuture future) {
pipeline().close(future);
public ChannelFuture close() {
ChannelFuture f = newFuture();
pipeline().close(f);
return f;
}
@Override
public void deregister(ChannelFuture future) {
pipeline().deregister(future);
public ChannelFuture deregister() {
ChannelFuture f = newFuture();
pipeline().deregister(f);
return f;
}
@Override
public ChannelFuture flush() {
ChannelFuture f = newFuture();
pipeline().flush(f);
return f;
}
@Override
public ChannelFuture write(Object message) {
ChannelFuture f = newFuture();
pipeline().write(message, f);
return f;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return pipeline().bind(localAddress, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
return pipeline().connect(remoteAddress, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return pipeline().connect(remoteAddress, localAddress, future);
}
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return pipeline().disconnect(future);
}
@Override
public ChannelFuture close(ChannelFuture future) {
return pipeline().close(future);
}
@Override
public ChannelFuture deregister(ChannelFuture future) {
return pipeline().deregister(future);
}
@Override
public ChannelBufferHolder<Object> out() {
return pipeline().nextOut();
return pipeline().out();
}
@Override
public void flush(ChannelFuture future) {
pipeline().flush(future);
public ChannelFuture flush(ChannelFuture future) {
return pipeline().flush(future);
}
@Override
public void write(Object message, ChannelFuture future) {
pipeline.write(message, future);
public ChannelFuture write(Object message, ChannelFuture future) {
return pipeline.write(message, future);
}
@Override
public ChannelFuture newFuture() {
return new DefaultChannelFuture(this, false);
@ -185,6 +269,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return new FailedChannelFuture(this, cause);
}
@Override
public ChannelFuture newVoidFuture() {
return voidFuture;
}
@Override
public void addClosureListener(final ChannelFutureListener listener) {
@ -370,7 +458,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public SocketAddress remoteAddress() {
// TODO Auto-generated method stub
return remoteAddress0();
}
@ -385,22 +472,53 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
AbstractChannel.this.eventLoop = eventLoop;
assert eventLoop().inEventLoop();
doRegister(future);
assert future.isDone();
if (registered = future.isSuccess()) {
if (!ensureOpen(future)) {
return;
}
try {
doRegister();
registered = true;
future.setSuccess();
pipeline().fireChannelRegistered();
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
try {
doClose();
} catch (Throwable t2) {
logger.warn("Failed to close a channel", t2);
}
future.setFailure(t);
pipeline().fireExceptionCaught(t);
}
}
@Override
public void bind(final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
doBind(localAddress, future);
if (!ensureOpen(future)) {
return;
}
try {
boolean wasActive = isActive();
doBind(localAddress);
future.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doBind(localAddress, future);
bind(localAddress, future);
}
});
}
@ -408,20 +526,35 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) {
// XXX: What if a user makes a connection attempt twice?
if (eventLoop().inEventLoop()) {
doConnect(remoteAddress, localAddress, future);
if (!future.isDone()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
future.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} else {
connectFuture = future;
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doConnect(remoteAddress, localAddress, future);
if (!future.isDone()) {
connectFuture = future;
}
connect(remoteAddress, localAddress, future);
}
});
}
@ -431,18 +564,37 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
public void finishConnect() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
doFinishConnect(connectFuture);
try {
doFinishConnect();
connectFuture.setSuccess();
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectFuture = null;
}
}
@Override
public void disconnect(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
doDisconnect(future);
try {
boolean wasActive = isActive();
doDisconnect();
future.setSuccess();
if (wasActive && !isActive()) {
pipeline().fireChannelInactive();
}
} catch (Throwable t) {
future.setFailure(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doDisconnect(future);
disconnect(future);
}
});
}
@ -451,14 +603,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public void close(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
doClose(future);
if (isOpen()) {
boolean wasActive = isActive();
try {
doClose();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
if (wasActive && !isActive()) {
pipeline().fireChannelInactive();
}
notifyClosureListeners();
} else {
// Closed already.
future.setSuccess();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doClose(future);
notifyClosureListeners();
close(future);
}
});
}
@ -468,47 +633,99 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
public void deregister(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
try {
doDeregister(future);
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
future.setSuccess();
registered = false;
pipeline().fireChannelUnregistered();
eventLoop = null;
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
deregister(future);
}
});
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
// FIXME: Wrap with a loop
long readAmount = 0;
try {
boolean closeIfClosed = false;
for (;;) {
int localReadAmount = doRead();
if (localReadAmount > 0) {
readAmount += localReadAmount;
continue;
}
if (localReadAmount == 0) {
break;
}
if (localReadAmount < 0) {
closeIfClosed = true;
break;
}
}
if (readAmount > 0) {
pipeline.fireInboundBufferUpdated();
}
if (closeIfClosed) {
closeIfClosed();
}
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
}
@Override
public void flush(final ChannelFuture future) {
// FIXME: Notify future properly using writtenAmount.
if (eventLoop().inEventLoop()) {
try {
writtenAmount += doFlush();
} catch (Exception e) {
future.setFailure(e);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
doDeregister(future);
} finally {
registered = false;
pipeline().fireChannelUnregistered();
eventLoop = null;
writtenAmount += doFlush();
} catch (Exception e) {
future.setFailure(e);
}
}
});
}
}
@Override
public int read() throws IOException {
assert eventLoop().inEventLoop();
return doRead();
private boolean ensureOpen(ChannelFuture future) {
if (isOpen()) {
return true;
}
@Override
public int flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
return doFlush(future);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
doFlush(future);
Exception e = new ClosedChannelException();
future.setFailure(e);
pipeline().fireExceptionCaught(e);
return false;
}
});
return -1; // Unknown
private void closeIfClosed() {
if (isOpen()) {
return;
}
close(newFuture());
}
}
@ -518,14 +735,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract SocketAddress localAddress0();
protected abstract SocketAddress remoteAddress0();
protected abstract void doRegister(ChannelFuture future);
protected abstract void doBind(SocketAddress localAddress, ChannelFuture future);
protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
protected abstract void doFinishConnect(ChannelFuture future);
protected abstract void doDisconnect(ChannelFuture future);
protected abstract void doClose(ChannelFuture future);
protected abstract void doDeregister(ChannelFuture future);
protected abstract void doRegister() throws Exception;
protected abstract void doBind(SocketAddress localAddress) throws Exception;
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected abstract void doFinishConnect() throws Exception;
protected abstract void doDisconnect() throws Exception;
protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception;
protected abstract int doRead();
protected abstract int doFlush(ChannelFuture future);
protected abstract int doRead() throws Exception;
protected abstract int doFlush() throws Exception;
}

View File

@ -37,8 +37,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
/**
* Creates a new instance.
*/
protected AbstractServerChannel() {
super(null);
protected AbstractServerChannel(Integer id) {
super(null, id);
}
@Override
@ -62,24 +62,23 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect(ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect(ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
return 0;
protected int doFlush() throws Exception {
throw new UnsupportedOperationException();
}
private static class NoopQueue extends AbstractQueue<Object> {

View File

@ -21,7 +21,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
import io.netty.util.AttributeMap;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
@ -106,7 +105,7 @@ import java.nio.channels.SelectionKey;
*
* @apiviz.exclude ^io\.netty\.channel\.([a-z]+\.)+[^\.]+Channel$
*/
public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<Channel> {
public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFutureFactory, Comparable<Channel> {
/**
* Returns the unique integer ID of this channel.
@ -134,6 +133,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
*/
ChannelPipeline pipeline();
boolean isOpen();
boolean isRegistered();
boolean isActive();
@ -164,17 +164,6 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
*/
SocketAddress remoteAddress();
void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
void disconnect(ChannelFuture future);
void close(ChannelFuture future);
void deregister(ChannelFuture future);
ChannelBufferHolder<Object> out();
void flush(ChannelFuture future);
void write(Object message, ChannelFuture future);
// FIXME: Introduce more flexible channel state notification mechanism
// - notify me when channel becomes (un)registered, (in)active
void addClosureListener(ChannelFutureListener listener);
@ -197,7 +186,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable<
void close(ChannelFuture future);
void deregister(ChannelFuture future);
int read() throws IOException;
int flush(ChannelFuture future);
void read();
void flush(ChannelFuture future);
}
}

View File

@ -4,4 +4,5 @@ public interface ChannelFutureFactory {
ChannelFuture newFuture();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelFuture newVoidFuture();
}

View File

@ -114,9 +114,9 @@ public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, Ch
@Override
public void flush(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
if (ctx.out().hasMessageBuffer()) {
Queue<O> out = ctx.out().messageBuffer();
Queue<Object> nextOut = ctx.nextOut().messageBuffer();
if (ctx.prevOut().hasMessageBuffer()) {
Queue<O> out = ctx.prevOut().messageBuffer();
Queue<Object> nextOut = ctx.out().messageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
@ -125,8 +125,8 @@ public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, Ch
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer nextOut = ctx.nextOut().byteBuffer();
ChannelBuffer out = ctx.prevOut().byteBuffer();
ChannelBuffer nextOut = ctx.out().byteBuffer();
nextOut.writeBytes(out);
}
ctx.flush(future);

View File

@ -122,7 +122,9 @@ import java.nio.channels.Channels;
* pipeline, and how to handle the event in your application.
* @apiviz.owns io.netty.channel.ChannelHandler
*/
public interface ChannelHandlerContext extends AttributeMap, ChannelHandlerInvoker, ChannelFutureFactory {
public interface ChannelHandlerContext
extends AttributeMap, ChannelFutureFactory,
ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel();
ChannelPipeline pipeline();

View File

@ -1,26 +0,0 @@
package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelHandlerInvoker {
ChannelBufferHolder<Object> nextIn();
ChannelBufferHolder<Object> nextOut();
void fireChannelRegistered();
void fireChannelUnregistered();
void fireChannelActive();
void fireChannelInactive();
void fireExceptionCaught(Throwable cause);
void fireUserEventTriggered(Object event);
void fireInboundBufferUpdated();
void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
void disconnect(ChannelFuture future);
void close(ChannelFuture future);
void deregister(ChannelFuture future);
void flush(ChannelFuture future);
void write(Object message, ChannelFuture future);
}

View File

@ -0,0 +1,14 @@
package io.netty.channel;
public interface ChannelInboundInvoker {
ChannelBufferHolder<Object> nextIn();
void fireChannelRegistered();
void fireChannelUnregistered();
void fireChannelActive();
void fireChannelInactive();
void fireExceptionCaught(Throwable cause);
void fireUserEventTriggered(Object event);
void fireInboundBufferUpdated();
}

View File

@ -59,9 +59,9 @@ public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<
@Override
public void flush(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
if (ctx.out().hasMessageBuffer()) {
Queue<O> out = ctx.out().messageBuffer();
Queue<Object> nextOut = ctx.nextOut().messageBuffer();
if (ctx.prevOut().hasMessageBuffer()) {
Queue<O> out = ctx.prevOut().messageBuffer();
Queue<Object> nextOut = ctx.out().messageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
@ -70,8 +70,8 @@ public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer nextOut = ctx.nextOut().byteBuffer();
ChannelBuffer out = ctx.prevOut().byteBuffer();
ChannelBuffer nextOut = ctx.out().byteBuffer();
nextOut.writeBytes(out);
}
ctx.flush(future);

View File

@ -2,5 +2,5 @@ package io.netty.channel;
public interface ChannelOutboundHandlerContext<O> extends ChannelHandlerContext {
ChannelBufferHolder<O> out();
ChannelBufferHolder<O> prevOut();
}

View File

@ -0,0 +1,25 @@
package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelOutboundInvoker {
ChannelBufferHolder<Object> out();
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture flush();
ChannelFuture write(Object message);
ChannelFuture bind(SocketAddress localAddress, ChannelFuture future);
ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
ChannelFuture disconnect(ChannelFuture future);
ChannelFuture close(ChannelFuture future);
ChannelFuture deregister(ChannelFuture future);
ChannelFuture flush(ChannelFuture future);
ChannelFuture write(Object message, ChannelFuture future);
}

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ChannelBuffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -202,7 +203,7 @@ import java.util.NoSuchElementException;
* @apiviz.owns io.netty.channel.ChannelHandler
* @apiviz.uses io.netty.channel.ChannelSink - - sends events downstream
*/
public interface ChannelPipeline extends ChannelHandlerInvoker {
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.

View File

@ -521,7 +521,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelBufferHolder<Object> nextOut() {
public ChannelBufferHolder<Object> out() {
return channel().unsafe().out();
}
@ -657,12 +657,68 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void bind(SocketAddress localAddress, ChannelFuture future) {
bind(firstOutboundContext(), localAddress, future);
public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = channel().newFuture();
bind(localAddress, f);
return f;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = channel().newFuture();
connect(remoteAddress, f);
return f;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = channel().newFuture();
connect(remoteAddress, localAddress, f);
return f;
}
@Override
public ChannelFuture disconnect() {
ChannelFuture f = channel().newFuture();
disconnect(f);
return f;
}
@Override
public ChannelFuture close() {
ChannelFuture f = channel().newFuture();
close(f);
return f;
}
@Override
public ChannelFuture deregister() {
ChannelFuture f = channel().newFuture();
deregister(f);
return f;
}
@Override
public ChannelFuture flush() {
ChannelFuture f = channel().newFuture();
flush(f);
return f;
}
@Override
public ChannelFuture write(Object message) {
ChannelFuture f = channel().newFuture();
write(message, f);
return f;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return bind(firstOutboundContext(), localAddress, future);
}
@SuppressWarnings("unchecked")
private void bind(DefaultChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) {
private ChannelFuture bind(DefaultChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
@ -675,20 +731,21 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} else {
channel().unsafe().bind(localAddress, future);
}
return future;
}
@Override
public void connect(SocketAddress remoteAddress, ChannelFuture future) {
connect(remoteAddress, null, future);
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
return connect(remoteAddress, null, future);
}
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
connect(firstOutboundContext(), remoteAddress, localAddress, future);
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return connect(firstOutboundContext(), remoteAddress, localAddress, future);
}
@SuppressWarnings("unchecked")
private void connect(DefaultChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
private ChannelFuture connect(DefaultChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
@ -702,15 +759,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} else {
channel().unsafe().connect(remoteAddress, localAddress, future);
}
return future;
}
@Override
public void disconnect(ChannelFuture future) {
disconnect(firstOutboundContext(), future);
public ChannelFuture disconnect(ChannelFuture future) {
return disconnect(firstOutboundContext(), future);
}
@SuppressWarnings("unchecked")
private void disconnect(DefaultChannelHandlerContext ctx, ChannelFuture future) {
private ChannelFuture disconnect(DefaultChannelHandlerContext ctx, ChannelFuture future) {
if (ctx != null) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).disconnect(ctx, future);
@ -720,15 +779,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} else {
channel().unsafe().disconnect(future);
}
return future;
}
@Override
public void close(ChannelFuture future) {
close(firstOutboundContext(), future);
public ChannelFuture close(ChannelFuture future) {
return close(firstOutboundContext(), future);
}
@SuppressWarnings("unchecked")
private void close(DefaultChannelHandlerContext ctx, ChannelFuture future) {
private ChannelFuture close(DefaultChannelHandlerContext ctx, ChannelFuture future) {
if (ctx != null) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).close(ctx, future);
@ -738,15 +799,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} else {
channel().unsafe().close(future);
}
return future;
}
@Override
public void deregister(final ChannelFuture future) {
deregister(firstOutboundContext(), future);
public ChannelFuture deregister(final ChannelFuture future) {
return deregister(firstOutboundContext(), future);
}
@SuppressWarnings("unchecked")
private void deregister(DefaultChannelHandlerContext ctx, ChannelFuture future) {
private ChannelFuture deregister(DefaultChannelHandlerContext ctx, ChannelFuture future) {
if (ctx != null) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).deregister(ctx, future);
@ -756,46 +819,39 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} else {
channel().unsafe().deregister(future);
}
return future;
}
@Override
public void flush(ChannelFuture future) {
DefaultChannelHandlerContext ctx = firstOutboundContext();
if (ctx != null) {
flush(ctx, future);
} else {
channel().unsafe().flush(future);
}
public ChannelFuture flush(ChannelFuture future) {
return flush(firstOutboundContext(), future);
}
@SuppressWarnings("unchecked")
private void flush(DefaultChannelHandlerContext ctx, ChannelFuture future) {
private ChannelFuture flush(DefaultChannelHandlerContext ctx, ChannelFuture future) {
if (ctx != null) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).flush(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
channel().unsafe().flush(future);
}
return future;
}
@Override
public void write(Object message, ChannelFuture future) {
public ChannelFuture write(Object message, ChannelFuture future) {
if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
nextOut().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else {
nextOut().messageBuffer().add(message);
out().messageBuffer().add(message);
}
flush(future);
}
private static void write(DefaultChannelHandlerContext ctx, Object message, ChannelFuture future) {
if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
ctx.nextOut().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else {
ctx.nextOut().messageBuffer().add(message);
}
ctx.flush(future);
return flush(future);
}
private DefaultChannelHandlerContext firstInboundContext() {
@ -922,7 +978,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final boolean canHandleInbound;
private final boolean canHandleOutbound;
private final ChannelBufferHolder<Object> in;
private final ChannelBufferHolder<Object> out;
private final ChannelBufferHolder<Object> prevOut;
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
@ -961,7 +1017,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
if (canHandleOutbound) {
try {
out = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
prevOut = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
} finally {
@ -970,7 +1026,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
} else {
out = null;
prevOut = null;
}
}
@ -1010,8 +1066,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelBufferHolder<Object> out() {
return out;
public ChannelBufferHolder<Object> prevOut() {
return prevOut;
}
@Override
@ -1025,10 +1081,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelBufferHolder<Object> nextOut() {
public ChannelBufferHolder<Object> out() {
DefaultChannelHandlerContext next = nextOutboundContext(prev);
if (next != null) {
return next.out();
return next.prevOut();
} else {
return channel().unsafe().out();
}
@ -1091,43 +1147,105 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void bind(SocketAddress localAddress, ChannelFuture future) {
DefaultChannelPipeline.this.bind(nextOutboundContext(prev), localAddress, future);
public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = newFuture();
bind(localAddress, f);
return f;
}
@Override
public void connect(SocketAddress remoteAddress, ChannelFuture future) {
connect(remoteAddress, null, future);
public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = newFuture();
connect(remoteAddress, f);
return f;
}
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
DefaultChannelPipeline.this.connect(nextOutboundContext(prev), remoteAddress, localAddress, future);
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = newFuture();
connect(remoteAddress, localAddress, f);
return f;
}
@Override
public void disconnect(ChannelFuture future) {
DefaultChannelPipeline.this.disconnect(nextOutboundContext(prev), future);
public ChannelFuture disconnect() {
ChannelFuture f = newFuture();
disconnect(f);
return f;
}
@Override
public void close(ChannelFuture future) {
DefaultChannelPipeline.this.close(nextOutboundContext(prev), future);
public ChannelFuture close() {
ChannelFuture f = newFuture();
close(f);
return f;
}
@Override
public void deregister(ChannelFuture future) {
DefaultChannelPipeline.this.deregister(nextOutboundContext(prev), future);
public ChannelFuture deregister() {
ChannelFuture f = newFuture();
deregister(f);
return f;
}
@Override
public void flush(ChannelFuture future) {
DefaultChannelPipeline.this.flush(nextOutboundContext(prev), future);
public ChannelFuture flush() {
ChannelFuture f = newFuture();
flush(f);
return f;
}
@Override
public void write(Object message, ChannelFuture future) {
DefaultChannelPipeline.write(nextOutboundContext(prev), message, future);
public ChannelFuture write(Object message) {
ChannelFuture f = newFuture();
write(message, f);
return f;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return DefaultChannelPipeline.this.bind(nextOutboundContext(prev), localAddress, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
return connect(remoteAddress, null, future);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return DefaultChannelPipeline.this.connect(nextOutboundContext(prev), remoteAddress, localAddress, future);
}
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return DefaultChannelPipeline.this.disconnect(nextOutboundContext(prev), future);
}
@Override
public ChannelFuture close(ChannelFuture future) {
return DefaultChannelPipeline.this.close(nextOutboundContext(prev), future);
}
@Override
public ChannelFuture deregister(ChannelFuture future) {
return DefaultChannelPipeline.this.deregister(nextOutboundContext(prev), future);
}
@Override
public ChannelFuture flush(ChannelFuture future) {
return DefaultChannelPipeline.this.flush(nextOutboundContext(prev), future);
}
@Override
public ChannelFuture write(Object message, ChannelFuture future) {
if (message instanceof ChannelBuffer) {
ChannelBuffer m = (ChannelBuffer) message;
out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes());
} else {
out().messageBuffer().add(message);
}
return flush(future);
}
@Override
@ -1144,5 +1262,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public ChannelFuture newFailedFuture(Throwable cause) {
return channel().newFailedFuture(cause);
}
@Override
public ChannelFuture newVoidFuture() {
return channel().newVoidFuture();
}
}
}

View File

@ -4,6 +4,6 @@ import java.util.concurrent.ExecutorService;
public interface EventLoop extends ExecutorService {
ChannelFuture register(Channel channel);
EventLoop register(Channel channel, ChannelFuture future);
ChannelFuture register(Channel channel, ChannelFuture future);
boolean inEventLoop();
}

View File

@ -17,6 +17,9 @@ public class MultithreadEventLoop implements EventLoop {
private final EventLoop[] children;
private final AtomicInteger childIndex = new AtomicInteger();
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType) {
this(loopType, Runtime.getRuntime().availableProcessors() * 2);
}
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType, int nThreads) {
this(loopType, nThreads, Executors.defaultThreadFactory());
@ -35,17 +38,21 @@ public class MultithreadEventLoop implements EventLoop {
children = new EventLoop[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = loopType.getConstructor(ThreadFactory.class).newInstance(threadFactory);
success = true;
} catch (Exception e) {
throw new EventLoopException("failed to create a child event loop: " + loopType.getName(), e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdown();
}
}
}
}
}
@Override
public void shutdown() {
@ -152,7 +159,7 @@ public class MultithreadEventLoop implements EventLoop {
}
@Override
public EventLoop register(Channel channel, ChannelFuture future) {
public ChannelFuture register(Channel channel, ChannelFuture future) {
return nextEventLoop().register(channel, future);
}

View File

@ -51,13 +51,13 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
@Override
public ChannelFuture register(Channel channel) {
ChannelFuture future = new DefaultChannelFuture(channel, false);
ChannelFuture future = channel.newFuture();
register(channel, future);
return future;
}
@Override
public EventLoop register(final Channel channel, final ChannelFuture future) {
public ChannelFuture register(final Channel channel, final ChannelFuture future) {
if (inEventLoop()) {
channel.unsafe().register(this, future);
} else {
@ -68,7 +68,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
}
});
}
return this;
return future;
}
protected void interruptThread() {

View File

@ -0,0 +1,129 @@
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
public class VoidChannelFuture implements ChannelFuture {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(VoidChannelFuture.class);
private final Channel channel;
/**
* Creates a new instance.
*
* @param channel the {@link Channel} associated with this future
*/
public VoidChannelFuture(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
@Override
public void addListener(final ChannelFutureListener listener) {
fail();
}
@Override
public void removeListener(ChannelFutureListener listener) {
// NOOP
}
@Override
public ChannelFuture await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
fail();
return false;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
fail();
return false;
}
@Override
public ChannelFuture awaitUninterruptibly() {
fail();
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
fail();
return false;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
fail();
return false;
}
@Override
public Channel channel() {
return channel;
}
@Override
public boolean isDone() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public Throwable cause() {
return null;
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
fail();
return this;
}
@Override
public boolean setProgress(long amount, long current, long total) {
return false;
}
@Override
public boolean setFailure(Throwable cause) {
return false;
}
@Override
public boolean setSuccess() {
return false;
}
@Override
public boolean cancel() {
return false;
}
private static void fail() {
throw new IllegalStateException("void future");
}
}

View File

@ -45,6 +45,8 @@ public interface DatagramChannel extends Channel {
*/
void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future);
void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future);
/**
* Leaves a multicast group.
*/

View File

@ -18,7 +18,6 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
@ -33,13 +32,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private volatile SelectionKey selectionKey;
protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) {
super(id, parent);
this.ch = ch;
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch) {
super(parent);
protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch) {
super(parent, id);
this.ch = ch;
}
@ -87,16 +81,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
public abstract NioChannelConfig config();
@Override
protected void doRegister(ChannelFuture future) {
protected void doRegister() throws Exception {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
try {
selectionKey = javaChannel().register(loop.selector, javaChannel().validOps() & ~SelectionKey.OP_WRITE, this);
} catch (Exception e) {
throw new ChannelException("failed to register a channel", e);
}
}
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger;
@ -29,7 +28,7 @@ import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
final class NioServerSocketChannel extends AbstractServerChannel
public class NioServerSocketChannel extends AbstractServerChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final InternalLogger logger =
@ -41,6 +40,8 @@ final class NioServerSocketChannel extends AbstractServerChannel
private volatile SelectionKey selectionKey;
public NioServerSocketChannel() {
super(null);
try {
socket = ServerSocketChannel.open();
} catch (IOException e) {
@ -109,73 +110,37 @@ final class NioServerSocketChannel extends AbstractServerChannel
}
@Override
protected void doRegister(ChannelFuture future) {
protected void doRegister() throws Exception {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
try {
selectionKey = javaChannel().register(loop.selector, javaChannel().validOps(), this);
} catch (Exception e) {
throw new ChannelException("failed to register a channel", e);
}
}
@Override
protected void doBind(SocketAddress localAddress, ChannelFuture future) {
try {
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
future.setSuccess();
pipeline().fireChannelActive();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected void doClose(ChannelFuture future) {
try {
protected void doClose() throws Exception {
javaChannel().close();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
future.setSuccess();
pipeline().fireChannelInactive();
if (isRegistered()) {
deregister(null);
}
}
@Override
protected void doDeregister(ChannelFuture future) {
try {
protected void doDeregister() throws Exception {
selectionKey.cancel();
future.setSuccess();
pipeline().fireChannelUnregistered();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected int doRead() {
int acceptedConns = 0;
for (;;) {
try {
protected int doRead() throws Exception {
java.nio.channels.SocketChannel ch = javaChannel().accept();
if (ch == null) {
break;
return 0;
}
pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, ch));
} catch (ChannelException e) {
pipeline().fireExceptionCaught(e);
} catch (Exception e) {
pipeline().fireExceptionCaught(new ChannelException("failed to accept a connection", e));
}
}
return acceptedConns;
pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, null, ch));
return 1;
}
}

View File

@ -21,14 +21,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@ -40,43 +38,35 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
private final ChannelBufferHolder<?> out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
private static SocketChannel newSocket() {
SocketChannel socket;
try {
socket = SocketChannel.open();
return SocketChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
boolean success = false;
public NioSocketChannel() {
this(null, null, newSocket());
}
public NioSocketChannel(Channel parent, Integer id, SocketChannel socket) {
super(parent, id, socket);
try {
socket.configureBlocking(false);
success = true;
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.",
e);
"Failed to close a partially initialized socket.", e2);
}
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
return socket;
}
public NioSocketChannel(Channel parent) {
this(parent, newSocket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new DefaultNioSocketChannelConfig(socket.socket());
}
@ -113,120 +103,58 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
}
@Override
protected void doBind(SocketAddress localAddress, ChannelFuture future) {
try {
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
future.setSuccess();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) {
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
try {
javaChannel().socket().bind(localAddress);
} catch (Exception e) {
future.setFailure(e);
}
}
boolean success = false;
try {
if (javaChannel().connect(remoteAddress)) {
future.setSuccess();
pipeline().fireChannelActive();
boolean connected = javaChannel().connect(remoteAddress);
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
} catch (Exception e) {
future.setFailure(e);
close(null);
}
}
@Override
protected void doFinishConnect(ChannelFuture future) {
try {
if (javaChannel().finishConnect()) {
future.setSuccess();
pipeline().fireChannelActive();
}
} catch (Exception e) {
future.setFailure(e);
close(null);
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
@Override
protected void doDisconnect(ChannelFuture future) {
doClose(future);
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose(ChannelFuture future) {
try {
protected void doClose() throws Exception {
javaChannel().close();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
future.setSuccess();
pipeline().fireChannelInactive();
if (isRegistered()) {
deregister(null);
}
}
@Override
protected void doDeregister(ChannelFuture future) {
try {
protected void doDeregister() throws Exception {
selectionKey().cancel();
future.setSuccess();
pipeline().fireChannelUnregistered();
} catch (Exception e) {
future.setFailure(e);
}
}
@Override
protected int doRead() {
final SocketChannel ch = javaChannel();
int ret = 0;
int readBytes = 0;
boolean failure = true;
protected int doRead() throws Exception {
ChannelBuffer buf = pipeline().nextIn().byteBuffer();
try {
while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) {
readBytes += ret;
if (!buf.writable()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
}
if (readBytes > 0) {
pipeline().fireInboundBufferUpdated();
}
if (ret < 0 || failure) {
selectionKey().cancel(); // Some JDK implementations run into an infinite loop without this.
close(null);
return -1;
}
return readBytes;
return buf.writeBytes(javaChannel(), buf.writableBytes());
}
@Override
protected int doFlush(ChannelFuture future) {
protected int doFlush() throws Exception {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
@ -236,26 +164,22 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
final ChannelBuffer buf = unsafe().out().byteBuffer();
int bytesLeft = buf.readableBytes();
if (bytesLeft == 0) {
future.setSuccess();
return 0;
}
int readerIndex = buf.readerIndex();
int localWrittenBytes = 0;
int writtenBytes = 0;
try {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.getBytes(readerIndex, ch, bytesLeft);
localWrittenBytes = buf.readBytes(ch, bytesLeft);
if (localWrittenBytes > 0) {
bytesLeft -= localWrittenBytes;
if (bytesLeft <= 0) {
removeOpWrite = true;
future.setSuccess();
break;
}
readerIndex += localWrittenBytes;
writtenBytes += localWrittenBytes;
} else {
addOpWrite = true;
@ -265,11 +189,10 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
open = false;
close(null);
selectionKey().cancel();
ch.close();
}
}

View File

@ -33,7 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
abstract class SelectorEventLoop extends SingleThreadEventLoop {
public class SelectorEventLoop extends SingleThreadEventLoop {
/**
* Internal Netty logger.
*/
@ -57,19 +57,19 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
protected SelectorEventLoop() {
public SelectorEventLoop() {
this(Executors.defaultThreadFactory());
}
protected SelectorEventLoop(ThreadFactory threadFactory) {
public SelectorEventLoop(ThreadFactory threadFactory) {
this(threadFactory, SelectorProvider.provider());
}
protected SelectorEventLoop(SelectorProvider selectorProvider) {
public SelectorEventLoop(SelectorProvider selectorProvider) {
this(Executors.defaultThreadFactory(), selectorProvider);
}
protected SelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) {
public SelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(threadFactory);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
@ -190,7 +190,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (ch.unsafe().read() < 0) {
ch.unsafe().read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
continue;
}
@ -265,4 +266,11 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
}
return false;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}