Split dynamic pipeline manipulation test into a new class / Replace PrefixThreadFactory with DefaultThreadFactory / Port the latest tests from the branch 'out-of-order' written by @normanmaurer
This commit is contained in:
parent
1cdb9e0b48
commit
475039532c
@ -21,7 +21,6 @@ import io.netty.buffer.MessageBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelHandlerUtil;
|
||||
import io.netty.channel.ChannelInboundByteHandler;
|
||||
@ -31,9 +30,9 @@ import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOutboundByteHandler;
|
||||
import io.netty.channel.ChannelOutboundMessageHandler;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelStateHandlerAdapter;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
@ -41,18 +40,11 @@ import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Deque;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class LocalTransportThreadModelTest {
|
||||
@ -97,9 +89,9 @@ public class LocalTransportThreadModelTest {
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testStagedExecution() throws Throwable {
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
|
||||
ThreadNameAuditor h1 = new ThreadNameAuditor();
|
||||
ThreadNameAuditor h2 = new ThreadNameAuditor();
|
||||
ThreadNameAuditor h3 = new ThreadNameAuditor();
|
||||
@ -215,12 +207,12 @@ public class LocalTransportThreadModelTest {
|
||||
@Test(timeout = 30000)
|
||||
@Ignore("regression test")
|
||||
public void testConcurrentMessageBufferAccess() throws Throwable {
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
|
||||
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3"));
|
||||
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4"));
|
||||
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5"));
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
|
||||
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
|
||||
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
|
||||
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
|
||||
|
||||
try {
|
||||
final MessageForwarder1 h1 = new MessageForwarder1();
|
||||
@ -337,127 +329,6 @@ public class LocalTransportThreadModelTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@Ignore("needs to get fixed")
|
||||
public void testConcurrentAddRemove() throws Throwable {
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
|
||||
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3"));
|
||||
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4"));
|
||||
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5"));
|
||||
|
||||
final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 };
|
||||
try {
|
||||
Deque<EventRecordHandler.Events> events = new ConcurrentLinkedDeque<EventRecordHandler.Events>();
|
||||
final EventForwardHandler h1 = new EventForwardHandler();
|
||||
final EventForwardHandler h2 = new EventForwardHandler();
|
||||
final EventForwardHandler h3 = new EventForwardHandler();
|
||||
final EventForwardHandler h4 = new EventForwardHandler();
|
||||
final EventForwardHandler h5 = new EventForwardHandler();
|
||||
final EventRecordHandler h6 = new EventRecordHandler(events);
|
||||
|
||||
final Channel ch = new LocalChannel();
|
||||
|
||||
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
|
||||
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
|
||||
ch.pipeline().addLast(h1)
|
||||
.addLast(e1, h2)
|
||||
.addLast(e2, h3)
|
||||
.addLast(e3, h4)
|
||||
.addLast(e4, h5)
|
||||
.addLast(e5, "recorder", h6);
|
||||
|
||||
l.register(ch).sync().channel().connect(localAddr).sync();
|
||||
|
||||
final int TOTAL_CNT = 8192;
|
||||
final LinkedList<EventRecordHandler.Events> expectedEvents = events(TOTAL_CNT);
|
||||
|
||||
Throwable cause = new Throwable();
|
||||
|
||||
Thread pipelineModifier = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Random random = new Random();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
if (!ch.isRegistered()) {
|
||||
continue;
|
||||
}
|
||||
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
|
||||
ChannelHandler handler = ch.pipeline().removeFirst();
|
||||
ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder",
|
||||
UUID.randomUUID().toString(), handler);
|
||||
}
|
||||
}
|
||||
});
|
||||
pipelineModifier.setDaemon(true);
|
||||
pipelineModifier.start();
|
||||
for (int i = 0; i < TOTAL_CNT; i++) {
|
||||
EventRecordHandler.Events event = expectedEvents.get(i);
|
||||
switch (event) {
|
||||
case EXCEPTION_CAUGHT:
|
||||
ch.pipeline().fireExceptionCaught(cause);
|
||||
break;
|
||||
case INBOUND_BufFER_UPDATED:
|
||||
ch.pipeline().fireInboundBufferUpdated();
|
||||
break;
|
||||
case READ_SUSPEND:
|
||||
ch.pipeline().fireChannelReadSuspended();
|
||||
break;
|
||||
case USER_EVENT:
|
||||
ch.pipeline().fireUserEventTriggered("");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while (events.size() < TOTAL_CNT + 2) {
|
||||
System.out.println(events.size() + " < " + (TOTAL_CNT + 2));
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
ch.close().sync();
|
||||
|
||||
expectedEvents.addFirst(EventRecordHandler.Events.ACTIVE);
|
||||
expectedEvents.addFirst(EventRecordHandler.Events.REGISTERED);
|
||||
expectedEvents.addLast(EventRecordHandler.Events.INACTIVE);
|
||||
expectedEvents.addLast(EventRecordHandler.Events.UNREGISTERED);
|
||||
|
||||
for (;;) {
|
||||
EventRecordHandler.Events event = events.poll();
|
||||
if (event == null) {
|
||||
Assert.assertTrue(expectedEvents.isEmpty());
|
||||
break;
|
||||
}
|
||||
Assert.assertEquals(expectedEvents.poll(), event);
|
||||
}
|
||||
} finally {
|
||||
l.shutdown();
|
||||
e1.shutdown();
|
||||
e2.shutdown();
|
||||
e3.shutdown();
|
||||
e4.shutdown();
|
||||
e5.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static LinkedList<EventRecordHandler.Events> events(int size) {
|
||||
EventRecordHandler.Events[] events = { EventRecordHandler.Events.EXCEPTION_CAUGHT,
|
||||
EventRecordHandler.Events.USER_EVENT, EventRecordHandler.Events.INBOUND_BufFER_UPDATED,
|
||||
EventRecordHandler.Events.READ_SUSPEND};
|
||||
Random random = new Random();
|
||||
LinkedList<EventRecordHandler.Events> expectedEvents = new LinkedList<EventRecordHandler.Events>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
expectedEvents.add(events[random.nextInt(events.length)]);
|
||||
}
|
||||
return expectedEvents;
|
||||
}
|
||||
|
||||
private static class ThreadNameAuditor
|
||||
extends ChannelDuplexHandler
|
||||
implements ChannelInboundMessageHandler<Object>,
|
||||
@ -904,94 +775,4 @@ public class LocalTransportThreadModelTest {
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
}
|
||||
|
||||
private static class PrefixThreadFactory implements ThreadFactory {
|
||||
|
||||
private final String prefix;
|
||||
private final AtomicInteger id = new AtomicInteger();
|
||||
|
||||
public PrefixThreadFactory(String prefix) {
|
||||
this.prefix = prefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName(prefix + '-' + id.incrementAndGet());
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private static final class EventForwardHandler extends ChannelDuplexHandler {
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
ctx.flush(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class EventRecordHandler extends ChannelStateHandlerAdapter {
|
||||
public enum Events {
|
||||
EXCEPTION_CAUGHT,
|
||||
USER_EVENT,
|
||||
READ_SUSPEND,
|
||||
INACTIVE,
|
||||
ACTIVE,
|
||||
UNREGISTERED,
|
||||
REGISTERED,
|
||||
INBOUND_BufFER_UPDATED
|
||||
}
|
||||
|
||||
private final Queue<Events> events;
|
||||
|
||||
public EventRecordHandler(Queue<Events> events) {
|
||||
this.events = events;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
events.add(Events.EXCEPTION_CAUGHT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
events.add(Events.USER_EVENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.READ_SUSPEND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.INACTIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.ACTIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.UNREGISTERED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.REGISTERED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(Events.INBOUND_BufFER_UPDATED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,326 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.local;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Deque;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
|
||||
public class LocalTransportThreadModelTest3 {
|
||||
|
||||
enum EventType {
|
||||
EXCEPTION_CAUGHT,
|
||||
USER_EVENT,
|
||||
READ_SUSPEND,
|
||||
INACTIVE,
|
||||
ACTIVE,
|
||||
UNREGISTERED,
|
||||
REGISTERED,
|
||||
INBOUND_BuFFER_UPDATED,
|
||||
FLUSH,
|
||||
READ
|
||||
}
|
||||
|
||||
private static EventLoopGroup group;
|
||||
private static LocalAddress localAddr;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
// Configure a test server
|
||||
group = new LocalEventLoopGroup();
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(group)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ChannelInboundMessageHandlerAdapter<Object>() {
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) {
|
||||
// Discard
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() {
|
||||
group.shutdown();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Ignore("regression test")
|
||||
public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable {
|
||||
for (int i = 0; i < 50; i ++) {
|
||||
testConcurrentAddRemoveInboundEvents();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Ignore("regression test")
|
||||
public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable {
|
||||
for (int i = 0; i < 50; i ++) {
|
||||
testConcurrentAddRemoveOutboundEvents();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@Ignore("needs a fix")
|
||||
public void testConcurrentAddRemoveInboundEvents() throws Throwable {
|
||||
testConcurrentAddRemove(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@Ignore("needs a fix")
|
||||
public void testConcurrentAddRemoveOutboundEvents() throws Throwable {
|
||||
testConcurrentAddRemove(false);
|
||||
}
|
||||
|
||||
private static void testConcurrentAddRemove(boolean inbound) throws Exception {
|
||||
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
|
||||
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
|
||||
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
|
||||
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
|
||||
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
|
||||
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
|
||||
|
||||
final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5};
|
||||
try {
|
||||
Deque<EventType> events = new ConcurrentLinkedDeque<EventType>();
|
||||
final EventForwarder h1 = new EventForwarder();
|
||||
final EventForwarder h2 = new EventForwarder();
|
||||
final EventForwarder h3 = new EventForwarder();
|
||||
final EventForwarder h4 = new EventForwarder();
|
||||
final EventForwarder h5 = new EventForwarder();
|
||||
final EventRecorder h6 = new EventRecorder(events, inbound);
|
||||
|
||||
final Channel ch = new LocalChannel();
|
||||
if (!inbound) {
|
||||
ch.config().setAutoRead(false);
|
||||
}
|
||||
ch.pipeline().addLast(e1, h1)
|
||||
.addLast(e1, h2)
|
||||
.addLast(e1, h3)
|
||||
.addLast(e1, h4)
|
||||
.addLast(e1, h5)
|
||||
.addLast(e1, "recorder", h6);
|
||||
|
||||
l.register(ch).sync().channel().connect(localAddr).sync();
|
||||
|
||||
final LinkedList<EventType> expectedEvents = events(inbound, 8192);
|
||||
|
||||
Throwable cause = new Throwable();
|
||||
|
||||
Thread pipelineModifier = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Random random = new Random();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
if (!ch.isRegistered()) {
|
||||
continue;
|
||||
}
|
||||
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
|
||||
ChannelHandler handler = ch.pipeline().removeFirst();
|
||||
ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder",
|
||||
UUID.randomUUID().toString(), handler);
|
||||
}
|
||||
}
|
||||
});
|
||||
pipelineModifier.setDaemon(true);
|
||||
pipelineModifier.start();
|
||||
for (EventType event: expectedEvents) {
|
||||
switch (event) {
|
||||
case EXCEPTION_CAUGHT:
|
||||
ch.pipeline().fireExceptionCaught(cause);
|
||||
break;
|
||||
case INBOUND_BuFFER_UPDATED:
|
||||
ch.pipeline().fireInboundBufferUpdated();
|
||||
break;
|
||||
case READ_SUSPEND:
|
||||
ch.pipeline().fireChannelReadSuspended();
|
||||
break;
|
||||
case USER_EVENT:
|
||||
ch.pipeline().fireUserEventTriggered("");
|
||||
break;
|
||||
case FLUSH:
|
||||
ch.pipeline().flush();
|
||||
break;
|
||||
case READ:
|
||||
ch.pipeline().read();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ch.close().sync();
|
||||
|
||||
while (events.peekLast() != EventType.UNREGISTERED) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
expectedEvents.addFirst(EventType.ACTIVE);
|
||||
expectedEvents.addFirst(EventType.REGISTERED);
|
||||
expectedEvents.addLast(EventType.INACTIVE);
|
||||
expectedEvents.addLast(EventType.UNREGISTERED);
|
||||
|
||||
for (;;) {
|
||||
EventType event = events.poll();
|
||||
if (event == null) {
|
||||
Assert.assertTrue("Missing events:" + expectedEvents.toString(), expectedEvents.isEmpty());
|
||||
break;
|
||||
}
|
||||
Assert.assertEquals(event, expectedEvents.poll());
|
||||
}
|
||||
} finally {
|
||||
l.shutdown();
|
||||
e1.shutdown();
|
||||
e2.shutdown();
|
||||
e3.shutdown();
|
||||
e4.shutdown();
|
||||
e5.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static LinkedList<EventType> events(boolean inbound, int size) {
|
||||
EventType[] events;
|
||||
if (inbound) {
|
||||
events = new EventType[] {
|
||||
EventType.USER_EVENT, EventType.INBOUND_BuFFER_UPDATED, EventType.READ_SUSPEND, EventType.EXCEPTION_CAUGHT};
|
||||
} else {
|
||||
events = new EventType[] {
|
||||
EventType.READ, EventType.FLUSH, EventType.EXCEPTION_CAUGHT };
|
||||
}
|
||||
|
||||
Random random = new Random();
|
||||
LinkedList<EventType> expectedEvents = new LinkedList<EventType>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
expectedEvents.add(events[random.nextInt(events.length)]);
|
||||
}
|
||||
return expectedEvents;
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private static final class EventForwarder extends ChannelDuplexHandler {
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
ctx.flush(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class EventRecorder extends ChannelDuplexHandler {
|
||||
private final Queue<EventType> events;
|
||||
private final boolean inbound;
|
||||
|
||||
public EventRecorder(Queue<EventType> events, boolean inbound) {
|
||||
this.events = events;
|
||||
this.inbound = inbound;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
events.add(EventType.EXCEPTION_CAUGHT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (inbound) {
|
||||
events.add(EventType.USER_EVENT);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
|
||||
if (inbound) {
|
||||
events.add(EventType.READ_SUSPEND);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(EventType.INACTIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(EventType.ACTIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(EventType.UNREGISTERED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
events.add(EventType.REGISTERED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
if (inbound) {
|
||||
events.add(EventType.INBOUND_BuFFER_UPDATED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
if (!inbound) {
|
||||
events.add(EventType.FLUSH);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(ChannelHandlerContext ctx) {
|
||||
if (!inbound) {
|
||||
events.add(EventType.READ);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user