DefaultPromise LateListener Logic Issues
Motivation: The LateListener logic is prone to infinite loops and relies on being processed in the EventExecutor's thread for synchronization, but this EventExecutor may not be constant. An infinite loop can occur if the EventExecutor's execute method does not introduce a context switch in LateListener.run. The EventExecutor can be changed by classes which inherit from DefaultPromise. For example the DefaultChannelPromise will return w/e EventLoop the channel is registered to, but this EventLoop can change (re-registration). Modifications: - Remove the LateListener concept and instead use a single Object to maintain the listeners while still preserving notification order - Make the result member variable an atomic variable so it can be outside the synchronized(this) blocks - Cleanup/simplify existing state management code Result: Fixes https://github.com/netty/netty/issues/5185
This commit is contained in:
parent
ffd2450e11
commit
f2ed3e6ce8
File diff suppressed because it is too large
Load Diff
@ -84,7 +84,7 @@ public class DefaultPromiseTest {
|
|||||||
p[i].addListener(new FutureListener<Void>() {
|
p[i].addListener(new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
DefaultPromise.notifyListener(ImmediateEventExecutor.INSTANCE, future, new FutureListener<Void>() {
|
future.addListener(new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
if (finalI + 1 < p.length) {
|
if (finalI + 1 < p.length) {
|
||||||
@ -192,8 +192,8 @@ public class DefaultPromiseTest {
|
|||||||
* <ol>
|
* <ol>
|
||||||
* <li>A write is done</li>
|
* <li>A write is done</li>
|
||||||
* <li>The write operation completes, and the promise state is changed to done</li>
|
* <li>The write operation completes, and the promise state is changed to done</li>
|
||||||
* <li>A listener is added to the return from the write. The {@link FutureListener#operationComplete()} updates
|
* <li>A listener is added to the return from the write. The {@link FutureListener#operationComplete(Future)}
|
||||||
* state which must be invoked before the response to the previous write is read.</li>
|
* updates state which must be invoked before the response to the previous write is read.</li>
|
||||||
* <li>The write operation</li>
|
* <li>The write operation</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*/
|
*/
|
||||||
|
@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
|
|||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerAdapter;
|
import io.netty.channel.ChannelHandlerAdapter;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
@ -39,10 +40,27 @@ import java.util.concurrent.CountDownLatch;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class EmbeddedChannelTest {
|
public class EmbeddedChannelTest {
|
||||||
|
|
||||||
|
@Test(timeout = 2000)
|
||||||
|
public void promiseDoesNotInfiniteLoop() throws InterruptedException {
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel();
|
||||||
|
channel.closeFuture().addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
future.channel().close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructWithChannelInitializer() {
|
public void testConstructWithChannelInitializer() {
|
||||||
final Integer first = 1;
|
final Integer first = 1;
|
||||||
|
Loading…
Reference in New Issue
Block a user