Second round of new channel handler API design

- Rename ChannelReader to ChannelInboundHandler
- Rename ChannelWriter to ChannelOutboundHandler
- Introduce ChannelBufferHolder instead of adding the common super type
  of message buffers and byte buffers
  - This is more type-safe and natural.
- Remove the notification methods for buffer closure (might add back
  later when revisiting half-closed connection support)
This commit is contained in:
Trustin Lee 2012-04-29 17:53:50 +09:00
parent c7bd0b41e6
commit cdd1ba93f0
16 changed files with 332 additions and 228 deletions

View File

@ -0,0 +1,50 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.util.Queue;
public final class ChannelBufferHolder<E> {
private final Queue<E> msgBuf;
private final ChannelBuffer byteBuf;
ChannelBufferHolder(Queue<E> msgBuf) {
if (msgBuf == null) {
throw new NullPointerException("msgBuf");
}
this.msgBuf = msgBuf;
byteBuf = null;
}
ChannelBufferHolder(ChannelBuffer byteBuf) {
if (byteBuf == null) {
throw new NullPointerException("byteBuf");
}
msgBuf = null;
this.byteBuf = byteBuf;
}
public boolean hasMessageBuffer() {
return msgBuf != null;
}
public boolean hasByteBuffer() {
return byteBuf != null;
}
public Queue<E> messageBuffer() {
if (!hasMessageBuffer()) {
throw new IllegalStateException("does not have a message buffer");
}
return msgBuf;
}
public ChannelBuffer byteBuffer() {
if (!hasByteBuffer()) {
throw new IllegalStateException("does not have a byte buffer");
}
return byteBuf;
}
}

View File

@ -0,0 +1,20 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.util.Queue;
public final class ChannelBufferHolders {
public static <E> ChannelBufferHolder<E> messageBuffer(Queue<E> buffer) {
return new ChannelBufferHolder<E>(buffer);
}
public static ChannelBufferHolder<Byte> byteBuffer(ChannelBuffer buffer) {
return new ChannelBufferHolder<Byte>(buffer);
}
private ChannelBufferHolders() {
// Utility class
}
}

View File

@ -1,11 +1,12 @@
package io.netty.channel;
import io.netty.util.internal.QueueFactory;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelHandlerAdapter<I, O> implements ChannelReader<I>, ChannelWriter<O> {
public class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
@ -27,89 +28,105 @@ public class ChannelHandlerAdapter<I, O> implements ChannelReader<I>, ChannelWri
}
@Override
public void channelRegistered(ChannelReaderContext<I> ctx) throws Exception {
public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelRegistered();
}
@Override
public void channelUnregistered(ChannelReaderContext<I> ctx) throws Exception {
public void channelUnregistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelUnregistered();
}
@Override
public void channelActive(ChannelReaderContext<I> ctx) throws Exception {
public void channelActive(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelActive();
}
@Override
public void channelInactive(ChannelReaderContext<I> ctx) throws Exception {
public void channelInactive(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelInactive();
}
@Override
public void exceptionCaught(ChannelReaderContext<I> ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelInboundHandlerContext<I> ctx, Throwable cause) throws Exception {
ctx.next().exceptionCaught(cause);
}
@Override
public void userEventTriggered(ChannelReaderContext<I> ctx, Object evt) throws Exception {
public void userEventTriggered(ChannelInboundHandlerContext<I> ctx, Object evt) throws Exception {
ctx.next().userEventTriggered(evt);
}
@Override
@SuppressWarnings("unchecked")
public Queue<I> newReceiveBuffer(ChannelReaderContext<I> ctx) throws Exception {
return (Queue<I>) QueueFactory.createQueue(Object.class);
public ChannelBufferHolder<I> newInboundBuffer(ChannelInboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<I>());
}
@Override
public void receiveBufferUpdated(ChannelReaderContext<I> ctx) throws Exception {
ctx.in().transferTo(ctx.next().in());
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
if (ctx.in().hasMessageBuffer()) {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> nextIn = ctx.next().in().messageBuffer();
for (;;) {
I msg = in.poll();
if (msg == null) {
break;
}
nextIn.add(msg);
}
} else {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer nextIn = ctx.next().in().byteBuffer();
nextIn.writeBytes(in);
}
}
@Override
public void receiveBufferClosed(ChannelReaderContext<I> ctx) throws Exception {
ctx.next().in().close();
}
@Override
public void bind(ChannelWriterContext<O> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
public void bind(ChannelOutboundHandlerContext<O> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().bind(localAddress, future);
}
@Override
public void connect(ChannelWriterContext<O> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception {
public void connect(ChannelOutboundHandlerContext<O> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelWriterContext<O> ctx, ChannelFuture future) throws Exception {
public void disconnect(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().disconnect(future);
}
@Override
public void close(ChannelWriterContext<O> ctx, ChannelFuture future) throws Exception {
public void close(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().close(future);
}
@Override
public void deregister(ChannelWriterContext<O> ctx, ChannelFuture future) throws Exception {
public void deregister(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().deregister(future);
}
@Override
@SuppressWarnings("unchecked")
public Queue<O> newSendBuffer(ChannelWriterContext<O> ctx) throws Exception {
return (Queue<O>) QueueFactory.createQueue(Object.class);
public ChannelBufferHolder<O> newOutboundBuffer(ChannelOutboundHandlerContext<O> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<O>());
}
@Override
public void sendBufferUpdated(ChannelWriterContext<O> ctx) throws Exception {
ctx.out().transferTo(ctx.next().out());
}
@Override
public void sendBufferClosed(ChannelWriterContext<O> ctx) throws Exception {
ctx.next().out().close();
public void outboundBufferUpdated(ChannelOutboundHandlerContext<O> ctx) throws Exception {
if (ctx.out().hasMessageBuffer()) {
Queue<O> out = ctx.out().messageBuffer();
Queue<Object> nextOut = ctx.next().out().messageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
break;
}
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer nextOut = ctx.next().out().byteBuffer();
nextOut.writeBytes(out);
}
}
}

View File

@ -19,7 +19,6 @@ package io.netty.channel;
import io.netty.util.AttributeMap;
import java.net.SocketAddress;
import java.util.Queue;
/**
* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline}
@ -127,7 +126,7 @@ public interface ChannelHandlerContext extends AttributeMap {
String name();
Channel channel();
ChannelReader handler();
ChannelHandler handler();
NextHandler next();
// XXX: What happens if inbound queue is bounded (limited capacity) and it's full?
@ -142,15 +141,17 @@ public interface ChannelHandlerContext extends AttributeMap {
void channelInactive();
void exceptionCaught(Throwable cause);
void userEventTriggered(Object event);
Queue<?> in();
ChannelBufferHolder<Object> in();
// For writers
void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
void disconnect(ChannelFuture future);
void closeInbound(ChannelFuture future);
void closeOutbound(ChannelFuture future);
void close(ChannelFuture future);
void deregister(ChannelFuture future);
Queue<?> out();
ChannelBufferHolder<Object> out();
}
}

View File

@ -0,0 +1,17 @@
package io.netty.channel;
public interface ChannelInboundHandler<T> extends ChannelHandler {
void channelRegistered(ChannelInboundHandlerContext<T> ctx) throws Exception;
void channelUnregistered(ChannelInboundHandlerContext<T> ctx) throws Exception;
void channelActive(ChannelInboundHandlerContext<T> ctx) throws Exception;
void channelInactive(ChannelInboundHandlerContext<T> ctx) throws Exception;
void exceptionCaught(ChannelInboundHandlerContext<T> ctx, Throwable cause) throws Exception;
void userEventTriggered(ChannelInboundHandlerContext<T> ctx, Object evt) throws Exception;
ChannelBufferHolder<T> newInboundBuffer(ChannelInboundHandlerContext<T> ctx) throws Exception;
void inboundBufferUpdated(ChannelInboundHandlerContext<T> ctx) throws Exception;
}

View File

@ -0,0 +1,83 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelRegistered();
}
@Override
public void channelUnregistered(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelUnregistered();
}
@Override
public void channelActive(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelActive();
}
@Override
public void channelInactive(ChannelInboundHandlerContext<I> ctx) throws Exception {
ctx.next().channelInactive();
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<I> ctx, Throwable cause) throws Exception {
ctx.next().exceptionCaught(cause);
}
@Override
public void userEventTriggered(ChannelInboundHandlerContext<I> ctx, Object evt) throws Exception {
ctx.next().userEventTriggered(evt);
}
@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelInboundHandlerContext<I> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<I>());
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx) throws Exception {
if (ctx.in().hasMessageBuffer()) {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> nextIn = ctx.next().in().messageBuffer();
for (;;) {
I msg = in.poll();
if (msg == null) {
break;
}
nextIn.add(msg);
}
} else {
ChannelBuffer in = ctx.in().byteBuffer();
ChannelBuffer nextIn = ctx.next().in().byteBuffer();
nextIn.writeBytes(in);
}
}
}

View File

@ -0,0 +1,7 @@
package io.netty.channel;
public interface ChannelInboundHandlerContext<I> extends ChannelHandlerContext {
@Override
ChannelInboundHandler<I> handler();
ChannelBufferHolder<I> in();
}

View File

@ -0,0 +1,14 @@
package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelOutboundHandler<T> extends ChannelHandler {
void bind(ChannelOutboundHandlerContext<T> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception;
void connect(ChannelOutboundHandlerContext<T> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception;
void disconnect(ChannelOutboundHandlerContext<T> ctx, ChannelFuture future) throws Exception;
void close(ChannelOutboundHandlerContext<T> ctx, ChannelFuture future) throws Exception;
void deregister(ChannelOutboundHandlerContext<T> ctx, ChannelFuture future) throws Exception;
ChannelBufferHolder<T> newOutboundBuffer(ChannelOutboundHandlerContext<T> ctx) throws Exception;
void outboundBufferUpdated(ChannelOutboundHandlerContext<T> ctx) throws Exception;
}

View File

@ -0,0 +1,78 @@
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<O> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void bind(ChannelOutboundHandlerContext<O> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().bind(localAddress, future);
}
@Override
public void connect(ChannelOutboundHandlerContext<O> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().disconnect(future);
}
@Override
public void close(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().close(future);
}
@Override
public void deregister(ChannelOutboundHandlerContext<O> ctx, ChannelFuture future) throws Exception {
ctx.next().deregister(future);
}
@Override
public ChannelBufferHolder<O> newOutboundBuffer(ChannelOutboundHandlerContext<O> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(new ArrayDeque<O>());
}
@Override
public void outboundBufferUpdated(ChannelOutboundHandlerContext<O> ctx) throws Exception {
if (ctx.out().hasMessageBuffer()) {
Queue<O> out = ctx.out().messageBuffer();
Queue<Object> nextOut = ctx.next().out().messageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
break;
}
nextOut.add(msg);
}
} else {
ChannelBuffer out = ctx.out().byteBuffer();
ChannelBuffer nextOut = ctx.next().out().byteBuffer();
nextOut.writeBytes(out);
}
}
}

View File

@ -0,0 +1,8 @@
package io.netty.channel;
public interface ChannelOutboundHandlerContext<O> extends ChannelHandlerContext {
@Override
ChannelOutboundHandler<O> handler();
ChannelBufferHolder<O> out();
}

View File

@ -1,19 +0,0 @@
package io.netty.channel;
import java.util.Queue;
public interface ChannelReader<T> extends ChannelHandler {
void channelRegistered(ChannelReaderContext<T> ctx) throws Exception;
void channelUnregistered(ChannelReaderContext<T> ctx) throws Exception;
void channelActive(ChannelReaderContext<T> ctx) throws Exception;
void channelInactive(ChannelReaderContext<T> ctx) throws Exception;
void exceptionCaught(ChannelReaderContext<T> ctx, Throwable cause) throws Exception;
void userEventTriggered(ChannelReaderContext<T> ctx, Object evt) throws Exception;
Queue<T> newReceiveBuffer(ChannelReaderContext<T> ctx) throws Exception;
void receiveBufferUpdated(ChannelReaderContext<T> ctx) throws Exception;
void receiveBufferClosed(ChannelReaderContext<T> ctx) throws Exception;
}

View File

@ -1,73 +0,0 @@
package io.netty.channel;
import io.netty.util.internal.QueueFactory;
import java.util.Queue;
public class ChannelReaderAdapter<T> implements ChannelReader<T> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void channelRegistered(ChannelReaderContext<T> ctx) throws Exception {
ctx.next().channelRegistered();
}
@Override
public void channelUnregistered(ChannelReaderContext<T> ctx) throws Exception {
ctx.next().channelUnregistered();
}
@Override
public void channelActive(ChannelReaderContext<T> ctx) throws Exception {
ctx.next().channelActive();
}
@Override
public void channelInactive(ChannelReaderContext<T> ctx) throws Exception {
ctx.next().channelInactive();
}
@Override
public void exceptionCaught(ChannelReaderContext<T> ctx, Throwable cause) throws Exception {
ctx.next().exceptionCaught(cause);
}
@Override
public void userEventTriggered(ChannelReaderContext<T> ctx, Object evt) throws Exception {
ctx.next().userEventTriggered(evt);
}
@Override
@SuppressWarnings("unchecked")
public Queue<T> newReceiveBuffer(ChannelReaderContext<T> ctx) throws Exception {
return (Queue<T>) QueueFactory.createQueue(Object.class);
}
@Override
public void receiveBufferUpdated(ChannelReaderContext<T> ctx) throws Exception {
ctx.in().transferTo(ctx.next().in());
}
@Override
public void receiveBufferClosed(ChannelReaderContext<T> ctx) throws Exception {
ctx.next().in().close();
}
}

View File

@ -1,7 +0,0 @@
package io.netty.channel;
import java.util.Queue;
public interface ChannelReaderContext<T> extends ChannelHandlerContext {
Queue<T> in();
}

View File

@ -1,16 +0,0 @@
package io.netty.channel;
import java.net.SocketAddress;
import java.util.Queue;
public interface ChannelWriter<T> extends ChannelHandler {
void bind(ChannelWriterContext<T> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception;
void connect(ChannelWriterContext<T> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception;
void disconnect(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception;
void close(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception;
void deregister(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception;
Queue<T> newSendBuffer(ChannelWriterContext<T> ctx) throws Exception;
void sendBufferUpdated(ChannelWriterContext<T> ctx) throws Exception;
void sendBufferClosed(ChannelWriterContext<T> ctx) throws Exception;
}

View File

@ -1,69 +0,0 @@
package io.netty.channel;
import io.netty.util.internal.QueueFactory;
import java.net.SocketAddress;
import java.util.Queue;
public class ChannelWriterAdapter<T> implements ChannelWriter<T> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void bind(ChannelWriterContext<T> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().bind(localAddress, future);
}
@Override
public void connect(ChannelWriterContext<T> ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.next().connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception {
ctx.next().disconnect(future);
}
@Override
public void close(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception {
ctx.next().close(future);
}
@Override
public void deregister(ChannelWriterContext<T> ctx, ChannelFuture future) throws Exception {
ctx.next().deregister(future);
}
@Override
@SuppressWarnings("unchecked")
public Queue<T> newSendBuffer(ChannelWriterContext<T> ctx) throws Exception {
return (Queue<T>) QueueFactory.createQueue(Object.class);
}
@Override
public void sendBufferUpdated(ChannelWriterContext<T> ctx) throws Exception {
ctx.out().transferTo(ctx.next().out());
}
@Override
public void sendBufferClosed(ChannelWriterContext<T> ctx) throws Exception {
ctx.next().out().close();
}
}

View File

@ -1,7 +0,0 @@
package io.netty.channel;
import java.util.Queue;
public interface ChannelWriterContext<T> extends ChannelHandlerContext {
Queue<T> out();
}