Synchronized between 4.1 and master again (part 2)

Motivation:
4 and 5 were diverged long time ago and we recently reverted some of the
early commits in master.  We must make sure 4.1 and master are not very
different now.

Modification:
Remove ChannelHandlerInvoker.writeAndFlush(...) and the related
implementations.

Result:
4.1 and master got closer.
This commit is contained in:
Trustin Lee 2014-04-25 13:44:04 +09:00
parent 8a7d8a8ab7
commit 872d4c5bc1
17 changed files with 102 additions and 88 deletions

View File

@ -90,10 +90,10 @@ public class DefaultHttpHeaders extends HttpHeaders {
CharSequence strVal; CharSequence strVal;
if (validate) { if (validate) {
validateHeaderName0(name); validateHeaderName0(name);
strVal = toCharsequence(value); strVal = toCharSequence(value);
validateHeaderValue(strVal); validateHeaderValue(strVal);
} else { } else {
strVal = toCharsequence(value); strVal = toCharSequence(value);
} }
int h = hash(name); int h = hash(name);
int i = index(h); int i = index(h);
@ -109,7 +109,7 @@ public class DefaultHttpHeaders extends HttpHeaders {
int h = hash(name); int h = hash(name);
int i = index(h); int i = index(h);
for (Object v: values) { for (Object v: values) {
CharSequence vstr = toCharsequence(v); CharSequence vstr = toCharSequence(v);
if (validate) { if (validate) {
validateHeaderValue(vstr); validateHeaderValue(vstr);
} }
@ -181,10 +181,10 @@ public class DefaultHttpHeaders extends HttpHeaders {
CharSequence strVal; CharSequence strVal;
if (validate) { if (validate) {
validateHeaderName0(name); validateHeaderName0(name);
strVal = toCharsequence(value); strVal = toCharSequence(value);
validateHeaderValue(strVal); validateHeaderValue(strVal);
} else { } else {
strVal = toCharsequence(value); strVal = toCharSequence(value);
} }
int h = hash(name); int h = hash(name);
int i = index(h); int i = index(h);
@ -210,7 +210,7 @@ public class DefaultHttpHeaders extends HttpHeaders {
if (v == null) { if (v == null) {
break; break;
} }
CharSequence strVal = toCharsequence(v); CharSequence strVal = toCharSequence(v);
if (validate) { if (validate) {
validateHeaderValue(strVal); validateHeaderValue(strVal);
} }
@ -344,7 +344,7 @@ public class DefaultHttpHeaders extends HttpHeaders {
} }
} }
private static CharSequence toCharsequence(Object value) { private static CharSequence toCharSequence(Object value) {
if (value == null) { if (value == null) {
return null; return null;
} }

View File

@ -114,9 +114,9 @@ public class DefaultLastHttpContent extends DefaultHttpContent implements LastHt
@Override @Override
void validateHeaderName0(CharSequence name) { void validateHeaderName0(CharSequence name) {
super.validateHeaderName0(name); super.validateHeaderName0(name);
if (equalsIgnoreCase(name, HttpHeaders.Names.CONTENT_LENGTH) || if (equalsIgnoreCase(HttpHeaders.Names.CONTENT_LENGTH, name) ||
equalsIgnoreCase(name, HttpHeaders.Names.TRANSFER_ENCODING) || equalsIgnoreCase(HttpHeaders.Names.TRANSFER_ENCODING, name) ||
equalsIgnoreCase(name, HttpHeaders.Names.TRAILER)) { equalsIgnoreCase(HttpHeaders.Names.TRAILER, name)) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"prohibited trailing header: " + name); "prohibited trailing header: " + name);
} }

View File

@ -933,7 +933,7 @@ public abstract class HttpHeaders implements Iterable<Map.Entry<String, String>>
/** /**
* Sets the {@code "Host"} header. * Sets the {@code "Host"} header.
*/ */
public static void setHost(HttpMessage message, String value) { public static void setHost(HttpMessage message, CharSequence value) {
message.headers().set(Names.HOST, value); message.headers().set(Names.HOST, value);
} }

View File

@ -38,7 +38,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
* <p> * <p>
* Once started, you can test the server with your * Once started, you can test the server with your
* <a href="http://en.wikipedia.org/wiki/SPDY#Browser_support_and_usage">SPDY enabled web browser</a> by navigating * <a href="http://en.wikipedia.org/wiki/SPDY#Browser_support_and_usage">SPDY enabled web browser</a> by navigating
* to to <a href="https://localhost:8443/">https://localhost:8443/</a> * to <a href="https://localhost:8443/">https://localhost:8443/</a>
*/ */
public class SpdyServer { public class SpdyServer {

View File

@ -64,7 +64,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
*/ */
public NioSctpServerChannel() { public NioSctpServerChannel() {
super(null, newSocket(), SelectionKey.OP_ACCEPT); super(null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultSctpServerChannelConfig(this, javaChannel()); config = new NioSctpServerChannelConfig(this, javaChannel());
} }
@Override @Override

View File

@ -23,8 +23,7 @@ import io.netty.channel.udt.UdtMessage;
* <p> * <p>
* Note: send/receive must use {@link UdtMessage} in the pipeline * Note: send/receive must use {@link UdtMessage} in the pipeline
*/ */
public class NioUdtMessageRendezvousChannel extends public class NioUdtMessageRendezvousChannel extends NioUdtMessageConnectorChannel {
NioUdtMessageConnectorChannel {
public NioUdtMessageRendezvousChannel() { public NioUdtMessageRendezvousChannel() {
super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM)); super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.DATAGRAM));

View File

@ -24,9 +24,9 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -217,7 +217,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
public abstract B clone(); public abstract B clone();
/** /**
* Create a new {@link Channel} and register it with an {@link EventExecutorGroup}. * Create a new {@link Channel} and register it with an {@link EventLoop}.
*/ */
public ChannelFuture register() { public ChannelFuture register() {
validate(); validate();
@ -448,7 +448,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
@Override @Override
public String toString() { public String toString() {
return clazz.getSimpleName() + ".class"; return StringUtil.simpleClassName(clazz) + ".class";
} }
} }
} }

View File

@ -416,8 +416,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (!isCompatible(eventLoop)) { if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + promise.setFailure(
eventLoop.getClass().getName())); new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return; return;
} }
@ -493,6 +493,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closeIfClosed(); closeIfClosed();
return; return;
} }
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() { invokeLater(new OneTimeTask() {
@Override @Override
@ -501,6 +502,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
}); });
} }
safeSetSuccess(promise); safeSetSuccess(promise);
} }
@ -518,6 +520,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closeIfClosed(); closeIfClosed();
return; return;
} }
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() { invokeLater(new OneTimeTask() {
@Override @Override
@ -526,6 +529,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
}); });
} }
safeSetSuccess(promise); safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel closeIfClosed(); // doDisconnect() might have closed the channel
} }
@ -749,6 +753,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
private void invokeLater(Runnable task) { private void invokeLater(Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later. // This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been // They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack // triggered by another inbound event handler method. If fired immediately, the call stack
@ -761,6 +766,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// //
// which means the execution of two inbound handler methods of the same handler overlap undesirably. // which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task); eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
} }
} }
@ -787,8 +795,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract SocketAddress remoteAddress0(); protected abstract SocketAddress remoteAddress0();
/** /**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
* process.
* *
* Sub-classes may override this method * Sub-classes may override this method
*/ */

View File

@ -432,7 +432,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* <li>{@link #localAddress()}</li> * <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li> * <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li> * <li>{@link #closeForcibly()}</li>
* <li>{@link #register(ChannelPromise)}</li> * <li>{@link #register(EventLoop, ChannelPromise)}</li>
* <li>{@link #deregister(ChannelPromise)}</li> * <li>{@link #deregister(ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li> * <li>{@link #voidPromise()}</li>
* </ul> * </ul>

View File

@ -181,9 +181,9 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
added = true; added = true;
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) ctx.pipeline(); DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline();
String name = ctx.name(); String name = dctx.name();
try { try {
for (Entry e: handlers) { for (Entry e: handlers) {
String oldName = name; String oldName = name;
@ -192,10 +192,10 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter {
} else { } else {
name = e.name; name = e.name;
} }
// Pass in direct the invoker to eliminate the possibility of an IllegalStateException
// Note that we do not use dctx.invoker() because it raises an IllegalStateExxception
// if the Channel is not registered yet. // if the Channel is not registered yet.
DefaultChannelHandlerContext context = (DefaultChannelHandlerContext) ctx; pipeline.addAfter(dctx.invoker, oldName, name, e.handler);
pipeline.addAfter(context.invoker, oldName, name, e.handler);
} }
} finally { } finally {
if (selfRemoval) { if (selfRemoval) {

View File

@ -35,8 +35,8 @@ import java.nio.channels.Channels;
* *
* <h3>Notify</h3> * <h3>Notify</h3>
* *
* You can notify the closest handler in the * You can notify the closest handler in the same {@link ChannelPipeline} by calling one of the various methods
* same {@link ChannelPipeline} by calling one of the various methods provided here. * provided here.
* *
* Please refer to {@link ChannelPipeline} to understand how an event flows. * Please refer to {@link ChannelPipeline} to understand how an event flows.
* *

View File

@ -67,6 +67,9 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
return (EventLoop) super.next(); return (EventLoop) super.next();
} }
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
@Override @Override
public ChannelFuture register(Channel channel) { public ChannelFuture register(Channel channel) {
return next().register(channel); return next().register(channel);

View File

@ -92,7 +92,7 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
@Override @Override
public ChannelFuture register(Channel channel) { public ChannelFuture register(Channel channel) {
return register(channel, channel.newPromise()); return register(channel, new DefaultChannelPromise(channel, this));
} }
@Override @Override

View File

@ -18,6 +18,7 @@ package io.netty.channel.oio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -137,7 +137,7 @@ public class SingleThreadEventLoopTest {
testScheduleTask(loopB); testScheduleTask(loopB);
} }
private static void testScheduleTask(EventExecutor loopA) throws InterruptedException, ExecutionException { private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
final AtomicLong endTime = new AtomicLong(); final AtomicLong endTime = new AtomicLong();
loopA.schedule(new Runnable() { loopA.schedule(new Runnable() {
@ -159,7 +159,7 @@ public class SingleThreadEventLoopTest {
testScheduleTaskAtFixedRate(loopB); testScheduleTaskAtFixedRate(loopB);
} }
private static void testScheduleTaskAtFixedRate(EventExecutor loopA) throws InterruptedException { private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>(); final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() { ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override @Override
@ -199,7 +199,7 @@ public class SingleThreadEventLoopTest {
testScheduleLaggyTaskAtFixedRate(loopB); testScheduleLaggyTaskAtFixedRate(loopB);
} }
private static void testScheduleLaggyTaskAtFixedRate(EventExecutor loopA) throws InterruptedException { private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>(); final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() { ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override @Override

View File

@ -26,7 +26,9 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -83,8 +85,8 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 5000) @Test(timeout = 5000)
public void testStagedExecution() throws Throwable { public void testStagedExecution() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor(); ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor(true); ThreadNameAuditor h3 = new ThreadNameAuditor(true);
@ -226,11 +228,11 @@ public class LocalTransportThreadModelTest {
@Ignore @Ignore
public void testConcurrentMessageBufferAccess() throws Throwable { public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3")); EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4")); EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5")); EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
try { try {
final MessageForwarder1 h1 = new MessageForwarder1(); final MessageForwarder1 h1 = new MessageForwarder1();

View File

@ -25,7 +25,9 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -114,13 +116,13 @@ public class LocalTransportThreadModelTest3 {
private static void testConcurrentAddRemove(boolean inbound) throws Exception { private static void testConcurrentAddRemove(boolean inbound) throws Exception {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup e1 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup e3 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e3")); EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventLoopGroup e4 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e4")); EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventLoopGroup e5 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e5")); EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
final EventLoopGroup[] groups = { e1, e2, e3, e4, e5 }; final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 };
try { try {
Deque<EventType> events = new ConcurrentLinkedDeque<EventType>(); Deque<EventType> events = new ConcurrentLinkedDeque<EventType>();
final EventForwarder h1 = new EventForwarder(); final EventForwarder h1 = new EventForwarder();
@ -202,7 +204,7 @@ public class LocalTransportThreadModelTest3 {
for (;;) { for (;;) {
EventType event = events.poll(); EventType event = events.poll();
if (event == null) { if (event == null) {
Assert.assertTrue("Missing events:" + expectedEvents.toString(), expectedEvents.isEmpty()); Assert.assertTrue("Missing events:" + expectedEvents, expectedEvents.isEmpty());
break; break;
} }
Assert.assertEquals(event, expectedEvents.poll()); Assert.assertEquals(event, expectedEvents.poll());
@ -250,7 +252,7 @@ public class LocalTransportThreadModelTest3 {
private final Queue<EventType> events; private final Queue<EventType> events;
private final boolean inbound; private final boolean inbound;
public EventRecorder(Queue<EventType> events, boolean inbound) { EventRecorder(Queue<EventType> events, boolean inbound) {
this.events = events; this.events = events;
this.inbound = inbound; this.inbound = inbound;
} }