Add ChannelMetadata and remove unnecessary disconnect() impls

- Add Channel.metadata() and remove Channel.bufferType()
- DefaultPipeline automatically redirects disconnect() request to
  close() if the channel has no disconnect operation
- Remove unnecessary disconnect() implementations
This commit is contained in:
Trustin Lee 2012-06-19 10:39:30 +09:00
parent 32188f83ac
commit a5bb2c7f77
20 changed files with 134 additions and 88 deletions

View File

@ -434,12 +434,6 @@ public class SpdySessionHandler
super.close(ctx, future);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
sendGoAwayFrame(ctx);
super.close(ctx, future);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer();

View File

@ -335,29 +335,6 @@ public class JZlibEncoder extends ZlibEncoder {
}
}
@Override
public void disconnect(
final ChannelHandlerContext ctx,
final ChannelFuture future) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newFuture());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.disconnect(future);
}
});
if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.disconnect(future);
}
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
}
}
@Override
public void close(
final ChannelHandlerContext ctx,

View File

@ -208,27 +208,6 @@ public class JdkZlibEncoder extends ZlibEncoder {
}
}
@Override
public void disconnect(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newFuture());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.disconnect(future);
}
});
if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.disconnect(future);
}
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
}
}
@Override
public void close(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newFuture());

View File

@ -20,7 +20,6 @@ import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import java.net.SocketAddress;
import java.util.Queue;
/**
* A skeletal server-side {@link Channel} implementation. A server-side
@ -34,6 +33,8 @@ import java.util.Queue;
*/
public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
/**
* Creates a new instance.
*/
@ -52,8 +53,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
public ChannelMetadata metadata() {
return METADATA;
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
@ -139,7 +138,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
boolean isRegistered();
boolean isActive();
ChannelBufType bufferType();
ChannelMetadata metadata();
ByteBuf outboundByteBuffer();
<T> MessageBuf<T> outboundMessageBuffer();

View File

@ -0,0 +1,51 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ChannelBufType;
import java.net.SocketAddress;
/**
* Represents the properties of a {@link Channel} implementation.
*/
public final class ChannelMetadata {
private final ChannelBufType bufferType;
private final boolean hasDisconnect;
public ChannelMetadata(ChannelBufType bufferType, boolean hasDisconnect) {
if (bufferType == null) {
throw new NullPointerException("bufferType");
}
this.bufferType = bufferType;
this.hasDisconnect = hasDisconnect;
}
public ChannelBufType bufferType() {
return bufferType;
}
/**
* Returns {@code true} if and only if the channel has the {@code disconnect()} operation
* that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} again,
* such as UDP/IP.
*/
public boolean hasDisconnect() {
return hasDisconnect;
}
}

View File

@ -887,7 +887,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public MessageBuf<Object> inboundMessageBuffer() {
if (channel.bufferType() != ChannelBufType.MESSAGE) {
if (channel.metadata().bufferType() != ChannelBufType.MESSAGE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
@ -896,7 +896,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ByteBuf inboundByteBuffer() {
if (channel.bufferType() != ChannelBufType.BYTE) {
if (channel.metadata().bufferType() != ChannelBufType.BYTE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
@ -1150,6 +1150,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!ctx.channel().metadata().hasDisconnect()) {
return close(ctx, future);
}
validateFuture(future);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
@ -1435,7 +1441,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public ChannelBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
switch (channel.bufferType()) {
switch (channel.metadata().bufferType()) {
case BYTE:
return Unpooled.dynamicBuffer();
case MESSAGE:

View File

@ -19,16 +19,19 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelMetadata;
public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
public EmbeddedByteChannel(ChannelHandler... handlers) {
super(Unpooled.dynamicBuffer(), handlers);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.BYTE;
public ChannelMetadata metadata() {
return METADATA;
}
public ByteBuf inboundBuffer() {

View File

@ -19,16 +19,19 @@ import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelMetadata;
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
public EmbeddedMessageChannel(ChannelHandler... handlers) {
super(Unpooled.messageBuffer(), handlers);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
public ChannelMetadata metadata() {
return METADATA;
}
public MessageBuf<Object> inboundBuffer() {

View File

@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventExecutor;
@ -38,6 +39,8 @@ import java.nio.channels.NotYetConnectedException;
*/
public class LocalChannel extends AbstractChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
private final ChannelConfig config = new DefaultChannelConfig();
private final Runnable shutdownHook = new Runnable() {
@Override
@ -68,8 +71,8 @@ public class LocalChannel extends AbstractChannel {
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
public ChannelMetadata metadata() {
return METADATA;
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
@ -31,11 +30,6 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
super(parent, id, ch, SelectionKey.OP_READ);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.BYTE;
}
@Override
protected Unsafe newUnsafe() {
return new NioByteUnsafe();

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
@ -30,11 +29,6 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
super(parent, id, ch, defaultInterestOps);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
}
@Override
protected Unsafe newUnsafe() {
return new NioMessageUnsafe();

View File

@ -16,10 +16,12 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
@ -47,6 +49,8 @@ import java.util.Map;
public final class NioDatagramChannel
extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, true);
private final DatagramChannelConfig config;
private final Map<InetAddress, List<MembershipKey>> memberships =
new HashMap<InetAddress, List<MembershipKey>>();
@ -92,6 +96,11 @@ public final class NioDatagramChannel
config = new NioDatagramChannelConfig(socket);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public DatagramChannelConfig config() {
return config;

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
@ -30,6 +32,8 @@ import java.nio.channels.SocketChannel;
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
private static ServerSocketChannel newSocket() {
try {
return ServerSocketChannel.open();
@ -46,6 +50,11 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public ServerSocketChannelConfig config() {
return config;

View File

@ -16,8 +16,10 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.logging.InternalLogger;
@ -30,6 +32,8 @@ import java.nio.channels.SocketChannel;
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private final SocketChannelConfig config;
@ -71,6 +75,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
config = new DefaultSocketChannelConfig(socket.socket());
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public SocketChannelConfig config() {
return config;

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
@ -28,11 +27,6 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
super(parent, id);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.BYTE;
}
@Override
protected Unsafe newUnsafe() {
return new OioByteUnsafe();

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
@ -28,11 +27,6 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
super(parent, id);
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
}
@Override
protected Unsafe newUnsafe() {
return new OioMessageUnsafe();

View File

@ -16,10 +16,12 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
@ -42,6 +44,8 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, true);
private static final byte[] EMPTY_DATA = new byte[0];
private final MulticastSocket socket;
@ -85,6 +89,11 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
config = new DefaultDatagramChannelConfig(socket);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public DatagramChannelConfig config() {
return config;

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
@ -38,6 +40,8 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.MESSAGE, false);
private static ServerSocket newServerSocket() {
try {
return new ServerSocket();
@ -88,6 +92,11 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
config = new DefaultServerSocketChannelConfig(socket);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public ServerSocketChannelConfig config() {
return config;

View File

@ -16,8 +16,10 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
@ -38,6 +40,8 @@ public class OioSocketChannel extends AbstractOioByteChannel
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSocketChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(ChannelBufType.BYTE, false);
private final Socket socket;
private final SocketChannelConfig config;
private InputStream is;
@ -77,6 +81,11 @@ public class OioSocketChannel extends AbstractOioByteChannel
}
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public SocketChannelConfig config() {
return config;