Synchronized between 4.1 and master again (part 2)

Motivation:
4 and 5 were diverged long time ago and we recently reverted some of the
early commits in master.  We must make sure 4.1 and master are not very
different now.

Modification:
Remove ChannelHandlerInvoker.writeAndFlush(...) and the related
implementations.

Result:
4.1 and master got closer.
This commit is contained in:
Trustin Lee 2014-04-25 14:00:04 +09:00
parent 8c3eaf3b56
commit b9039eaa82
17 changed files with 90 additions and 78 deletions

View File

@ -65,14 +65,14 @@ public interface EventExecutor extends EventExecutorGroup {
<V> ProgressivePromise<V> newProgressivePromise(); <V> ProgressivePromise<V> newProgressivePromise();
/** /**
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()} * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking. * every call of blocking methods will just return without blocking.
*/ */
<V> Future<V> newSucceededFuture(V result); <V> Future<V> newSucceededFuture(V result);
/** /**
* Create a new {@link Future} which is marked as fakued already. So {@link Future#isSuccess()} * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking. * every call of blocking methods will just return without blocking.
*/ */

View File

@ -18,8 +18,8 @@ package io.netty.example.uptime;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;

View File

@ -30,7 +30,7 @@
* <li> <tt>{@link io.netty.handler.traffic.AbstractTrafficShapingHandler}</tt>: this abstract class implements * <li> <tt>{@link io.netty.handler.traffic.AbstractTrafficShapingHandler}</tt>: this abstract class implements
* the kernel of traffic shaping. It could be extended to fit your needs. Two classes are proposed as default * the kernel of traffic shaping. It could be extended to fit your needs. Two classes are proposed as default
* implementations: see {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} and * implementations: see {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} and
* see {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} respectively for Channel traffic shaping and * {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} respectively for Channel traffic shaping and
* global traffic shaping.</li> * global traffic shaping.</li>
* </ul></p> * </ul></p>
* *

View File

@ -66,7 +66,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
private volatile boolean inputShutdown; private volatile boolean inputShutdown;
private volatile boolean outputShutdown; private volatile boolean outputShutdown;
EpollSocketChannel(Channel parent, int fd) throws IOException { EpollSocketChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN, true); super(parent, fd, Native.EPOLLIN, true);
config = new EpollSocketChannelConfig(this); config = new EpollSocketChannelConfig(this);
// Directly cache the remote and local addresses // Directly cache the remote and local addresses

View File

@ -25,7 +25,6 @@ import static io.netty.channel.ChannelOption.*;
/** /**
* Option for configuring a serial port connection * Option for configuring a serial port connection
*/ */
public final class RxtxChannelOption { public final class RxtxChannelOption {
private static final Class<RxtxChannelOption> T = RxtxChannelOption.class; private static final Class<RxtxChannelOption> T = RxtxChannelOption.class;

View File

@ -18,6 +18,7 @@ package io.netty.channel.sctp;
import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
@ -33,9 +34,9 @@ import io.netty.channel.RecvByteBufAllocator;
* <tr> * <tr>
* <th>Name</th><th>Associated setter method</th> * <th>Name</th><th>Associated setter method</th>
* </tr><tr> * </tr><tr>
* <td>{@link io.netty.channel.ChannelOption#SO_RCVBUF}</td><td>{@link #setReceiveBufferSize(int)}</td> * <td>{@link ChannelOption#SO_RCVBUF}</td><td>{@link #setReceiveBufferSize(int)}</td>
* </tr><tr> * </tr><tr>
* <td>{@link io.netty.channel.ChannelOption#SO_SNDBUF}</td><td>{@link #setSendBufferSize(int)}</td> * <td>{@link ChannelOption#SO_SNDBUF}</td><td>{@link #setSendBufferSize(int)}</td>
* </tr><tr> * </tr><tr>
* <td>{@link SctpChannelOption#SCTP_NODELAY}</td><td>{@link #setSctpNoDelay(boolean)}}</td> * <td>{@link SctpChannelOption#SCTP_NODELAY}</td><td>{@link #setSctpNoDelay(boolean)}}</td>
* </tr><tr> * </tr><tr>

View File

@ -23,11 +23,9 @@ import io.netty.channel.udt.UdtMessage;
* <p> * <p>
* Note: send/receive must use {@link UdtMessage} in the pipeline * Note: send/receive must use {@link UdtMessage} in the pipeline
*/ */
public class NioUdtMessageRendezvousChannel extends public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChannel {
NioUdtMessageConnectorChannel {
public NioUdtMessageRendezvousChannel() { public NioUdtMessageRendezvousChannel() {
super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM)); super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM));
} }
} }

View File

@ -239,5 +239,4 @@ public final class NioUdtProvider<T extends UdtChannel> implements ChannelFactor
public TypeUDT type() { public TypeUDT type() {
return type; return type;
} }
} }

View File

@ -436,6 +436,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* <li>{@link #remoteAddress()}</li> * <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li> * <li>{@link #closeForcibly()}</li>
* <li>{@link #register(EventLoop, ChannelPromise)}</li> * <li>{@link #register(EventLoop, ChannelPromise)}</li>
* <li>{@link #deregister(ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li> * <li>{@link #voidPromise()}</li>
* </ul> * </ul>
*/ */

View File

@ -192,6 +192,9 @@ public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter {
} else { } else {
name = e.name; name = e.name;
} }
// Note that we do not use dctx.invoker() because it raises an IllegalStateExxception
// if the Channel is not registered yet.
pipeline.addAfter(dctx.invoker, oldName, name, e.handler); pipeline.addAfter(dctx.invoker, oldName, name, e.handler);
} }
} finally { } finally {

View File

@ -101,7 +101,7 @@ import java.nio.channels.Channels;
* a = 1; * a = 1;
* } * }
* *
* attr.set(a * (Integer) msg)); * attr.set(a * (Integer) msg);
* } * }
* } * }
* *

View File

@ -23,7 +23,7 @@ import java.net.SocketAddress;
/** /**
* Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}. * Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
* A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default * A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default
* implementation. * implementation. Note that the methods in this interface are not intended to be called by a user.
*/ */
public interface ChannelHandlerInvoker { public interface ChannelHandlerInvoker {

View File

@ -126,6 +126,11 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
invokeChannelRegisteredNow(ctx); invokeChannelRegisteredNow(ctx);
} }
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
invokeChannelUnregisteredNow(ctx);
}
@Override @Override
public void invokeChannelActive(ChannelHandlerContext ctx) { public void invokeChannelActive(ChannelHandlerContext ctx) {
invokeChannelActiveNow(ctx); invokeChannelActiveNow(ctx);
@ -136,11 +141,6 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
invokeChannelInactiveNow(ctx); invokeChannelInactiveNow(ctx);
} }
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
invokeChannelUnregisteredNow(ctx);
}
@Override @Override
public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
invokeExceptionCaughtNow(ctx, cause); invokeExceptionCaughtNow(ctx, cause);

View File

@ -143,6 +143,7 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
} }
this.activeOnOpen = activeOnOpen; this.activeOnOpen = activeOnOpen;
} }
@Override @Override
public boolean isBroadcast() { public boolean isBroadcast() {
try { try {

View File

@ -67,7 +67,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
* Create a new instance * Create a new instance
*/ */
public NioSocketChannel() { public NioSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER)); this(DEFAULT_SELECTOR_PROVIDER);
} }
/** /**

View File

@ -39,8 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SingleThreadEventLoopTest { public class SingleThreadEventLoopTest {

View File

@ -90,7 +90,7 @@ public class LocalTransportThreadModelTest {
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor(); ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor(); ThreadNameAuditor h3 = new ThreadNameAuditor(true);
Channel ch = new LocalChannel(); Channel ch = new LocalChannel();
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'. // With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
@ -361,6 +361,15 @@ public class LocalTransportThreadModelTest {
private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<String>(); private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<String>();
private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<String>(); private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<String>();
private final Queue<String> removalThreadNames = new ConcurrentLinkedQueue<String>(); private final Queue<String> removalThreadNames = new ConcurrentLinkedQueue<String>();
private final boolean discard;
ThreadNameAuditor() {
this(false);
}
ThreadNameAuditor(boolean discard) {
this.discard = discard;
}
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
@ -370,8 +379,10 @@ public class LocalTransportThreadModelTest {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundThreadNames.add(Thread.currentThread().getName()); inboundThreadNames.add(Thread.currentThread().getName());
if (!discard) {
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
}
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {