Moved AsyncPipelineModification to a new file

This commit is contained in:
Trustin Lee 2012-06-08 12:42:55 +09:00
parent 493e77a5a7
commit 353357fbd5
2 changed files with 66 additions and 48 deletions

View File

@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
/** /**
@ -93,7 +92,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addFirst0(name, nextCtx, newCtx); addFirst0(name, nextCtx, newCtx);
return this; return this;
} }
future = newCtx.executor().submit(new AsyncPipelineModification(this) { future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
checkDuplicateName(name); checkDuplicateName(name);
@ -151,7 +150,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addLast0(name, oldTail, newTail); addLast0(name, oldTail, newTail);
return this; return this;
} else { } else {
future = newTail.executor().submit(new AsyncPipelineModification(this) { future = newTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
checkDuplicateName(name); checkDuplicateName(name);
@ -207,7 +206,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addBefore0(name, ctx, newCtx); addBefore0(name, ctx, newCtx);
return this; return this;
} else { } else {
future = newCtx.executor().submit(new AsyncPipelineModification(this) { future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
checkDuplicateName(name); checkDuplicateName(name);
@ -268,7 +267,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addAfter0(name, ctx, newCtx); addAfter0(name, ctx, newCtx);
return this; return this;
} else { } else {
future = newCtx.executor().submit(new AsyncPipelineModification(this) { future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
checkDuplicateName(name); checkDuplicateName(name);
@ -401,7 +400,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
removeLast0(oldTail); removeLast0(oldTail);
return oldTail; return oldTail;
} else { } else {
future = oldTail.executor().submit(new AsyncPipelineModification(this) { future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
removeLast0(oldTail); removeLast0(oldTail);
@ -415,7 +414,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
remove0(ctx); remove0(ctx);
return ctx; return ctx;
} else { } else {
future = ctx.executor().submit(new AsyncPipelineModification(this) { future = ctx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
remove0(ctx); remove0(ctx);
@ -478,7 +477,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
removeLast0(oldTail); removeLast0(oldTail);
return oldTail.handler(); return oldTail.handler();
} else { } else {
future = oldTail.executor().submit(new AsyncPipelineModification(this) { future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
removeLast0(oldTail); removeLast0(oldTail);
@ -552,7 +551,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return ctx.handler(); return ctx.handler();
} else { } else {
future = oldTail.executor().submit(new AsyncPipelineModification(this) { future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
removeLast0(oldTail); removeLast0(oldTail);
@ -577,7 +576,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
replace0(ctx, newName, newCtx); replace0(ctx, newName, newCtx);
return ctx.handler(); return ctx.handler();
} else { } else {
future = newCtx.executor().submit(new AsyncPipelineModification(this) { future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
@Override @Override
void doCall() { void doCall() {
replace0(ctx, newName, newCtx); replace0(ctx, newName, newCtx);
@ -1693,41 +1692,3 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
} }
/**
* Custom {@link Callable} implementation which will catch all {@link Throwable} which happens
* during execution of {@link AsyncPipelineModification#doCall()} and return them in the
* {@link Future}. This allows to re-throw them later.
*
* It also handles the right synchronization of the {@link AsyncPipelineModification#doCall()}
* method.
*
* It was originally an inner class of {@link DefaultChannelPipeline}, but moved to a top level
* type to work around a compiler bug.
*/
abstract class AsyncPipelineModification implements Callable<Throwable> {
private final ChannelPipeline lock;
AsyncPipelineModification(ChannelPipeline lock) {
this.lock = lock;
}
@Override
public Throwable call() {
try {
synchronized (lock) {
doCall();
}
} catch (Throwable t) {
return t;
}
return null;
}
/**
* Execute the modification
*/
abstract void doCall();
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* Custom {@link Callable} implementation which will catch all {@link Throwable} which happens
* during execution of {@link DefaultChannelPipelineModificationTask#doCall()} and return them in the
* {@link Future}. This allows to re-throw them later.
*
* It also handles the right synchronization of the {@link DefaultChannelPipelineModificationTask#doCall()}
* method.
*
* It was originally an inner class of {@link DefaultChannelPipeline}, but moved to a top level
* type to work around a compiler bug.
*/
abstract class DefaultChannelPipelineModificationTask implements Callable<Throwable> {
private final ChannelPipeline lock;
DefaultChannelPipelineModificationTask(ChannelPipeline lock) {
this.lock = lock;
}
@Override
public Throwable call() {
try {
synchronized (lock) {
doCall();
}
} catch (Throwable t) {
return t;
}
return null;
}
/**
* Execute the modification
*/
abstract void doCall();
}