This commit is contained in:
parent
3b729848dc
commit
0bd73b8d80
@ -212,6 +212,46 @@ public interface ChannelHandlerContext
|
||||
*/
|
||||
<T> MessageBuf<T> outboundMessageBuffer();
|
||||
|
||||
/**
|
||||
* Replaces the inbound byte buffer with the given buffer. This returns the
|
||||
* old buffer, so any readable bytes can be handled appropriately by the caller.
|
||||
*
|
||||
* @param newInboundByteBuf the new inbound byte buffer
|
||||
* @return the old buffer.
|
||||
* @throws NullPointerException if the argument is {@code null}.
|
||||
*/
|
||||
ByteBuf replaceInboundByteBuffer(ByteBuf newInboundByteBuf);
|
||||
|
||||
/**
|
||||
* Replaces the inbound message buffer with the given buffer. This returns the
|
||||
* old buffer, so any pending messages can be handled appropriately by the caller.
|
||||
*
|
||||
* @param newInboundMsgBuf the new inbound message buffer
|
||||
* @return the old buffer.
|
||||
* @throws NullPointerException if the argument is {@code null}.
|
||||
*/
|
||||
<T> MessageBuf<T> replaceInboundMessageBuffer(MessageBuf<T> newInboundMsgBuf);
|
||||
|
||||
/**
|
||||
* Replaces the outbound byte buffer with the given buffer. This returns the
|
||||
* old buffer, so any readable bytes can be handled appropriately by the caller.
|
||||
*
|
||||
* @param newOutboundByteBuf the new inbound byte buffer
|
||||
* @return the old buffer.
|
||||
* @throws NullPointerException if the argument is {@code null}.
|
||||
*/
|
||||
ByteBuf replaceOutboundByteBuffer(ByteBuf newOutboundByteBuf);
|
||||
|
||||
/**
|
||||
* Replaces the outbound message buffer with the given buffer. This returns the
|
||||
* old buffer, so any pending messages can be handled appropriately by the caller.
|
||||
*
|
||||
* @param newOutboundMsgBuf the new inbound message buffer
|
||||
* @return the old buffer.
|
||||
* @throws NullPointerException if the argument is {@code null}.
|
||||
*/
|
||||
<T> MessageBuf<T> replaceOutboundMessageBuffer(MessageBuf<T> newOutboundMsgBuf);
|
||||
|
||||
/**
|
||||
* Return {@code true} if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling
|
||||
* inbound data.
|
||||
|
@ -27,7 +27,10 @@ import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -48,10 +51,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
final int directions;
|
||||
private final ChannelHandler handler;
|
||||
|
||||
final MessageBuf<Object> inMsgBuf;
|
||||
final ByteBuf inByteBuf;
|
||||
final MessageBuf<Object> outMsgBuf;
|
||||
final ByteBuf outByteBuf;
|
||||
MessageBuf<Object> inMsgBuf;
|
||||
ByteBuf inByteBuf;
|
||||
MessageBuf<Object> outMsgBuf;
|
||||
ByteBuf outByteBuf;
|
||||
|
||||
// When the two handlers run in a different thread and they are next to each other,
|
||||
// each other's buffers can be accessed at the same time resulting in a race condition.
|
||||
@ -447,6 +450,222 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return (MessageBuf<T>) outMsgBuf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
|
||||
* current thread will be interrupted and this will return {@code null}. It is expected that the task
|
||||
* performs any appropriate locking.
|
||||
* <p>
|
||||
* If the {@link Callable#call()} call throws a {@link Throwable}, but it is not an instance of
|
||||
* {@link Error}, {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an
|
||||
* {@link AssertionError} and that is thrown instead.</p>
|
||||
*
|
||||
* @param c execute this callable and return its value
|
||||
* @param <T> the return value type
|
||||
* @return the task's return value, or {@code null} if the task was interrupted.
|
||||
* @see Callable#call()
|
||||
* @see Future#get()
|
||||
* @throws Error if the task threw this.
|
||||
* @throws RuntimeException if the task threw this.
|
||||
* @throws Exception if the task threw this.
|
||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
||||
* {@link Throwable}.
|
||||
*/
|
||||
<T> T executeOnEventLoop(Callable<T> c) throws Exception {
|
||||
return getFromFuture(executor().submit(c));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
|
||||
* current thread will be interrupted. It is expected that the task performs any appropriate locking.
|
||||
* <p>
|
||||
* If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of
|
||||
* {@link Error} or {@link RuntimeException}, then it is wrapped inside a
|
||||
* {@link ChannelPipelineException} and that is thrown instead.</p>
|
||||
*
|
||||
* @param r execute this runnable
|
||||
* @see Runnable#run()
|
||||
* @see Future#get()
|
||||
* @throws Error if the task threw this.
|
||||
* @throws RuntimeException if the task threw this.
|
||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
||||
* {@link Throwable}.
|
||||
*/
|
||||
void executeOnEventLoop(Runnable r) {
|
||||
waitForFuture(executor().submit(r));
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a future to finish and gets the result. If the task is interrupted, then the current thread
|
||||
* will be interrupted and this will return {@code null}. It is expected that the task performs any
|
||||
* appropriate locking.
|
||||
* <p>
|
||||
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error},
|
||||
* {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an {@link AssertionError}
|
||||
* and that is thrown instead.</p>
|
||||
*
|
||||
* @param future wait for this future
|
||||
* @param <T> the return value type
|
||||
* @return the task's return value, or {@code null} if the task was interrupted.
|
||||
* @see Future#get()
|
||||
* @throws Error if the task threw this.
|
||||
* @throws RuntimeException if the task threw this.
|
||||
* @throws Exception if the task threw this.
|
||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
||||
* {@link Throwable}.
|
||||
*/
|
||||
<T> T getFromFuture(Future<T> future) throws Exception {
|
||||
try {
|
||||
return future.get();
|
||||
} catch (ExecutionException ex) {
|
||||
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
||||
|
||||
Throwable t = ex.getCause();
|
||||
if (t instanceof Error) { throw (Error) t; }
|
||||
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
|
||||
if (t instanceof Exception) { throw (Exception) t; }
|
||||
throw new ChannelPipelineException(t);
|
||||
} catch (InterruptedException ex) {
|
||||
// Interrupt the calling thread (note that this method is not called from the event loop)
|
||||
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
||||
* It is expected that the task performs any appropriate locking.
|
||||
* <p>
|
||||
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
|
||||
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
|
||||
* thrown instead.</p>
|
||||
*
|
||||
* @param future wait for this future
|
||||
* @see Future#get()
|
||||
* @throws Error if the task threw this.
|
||||
* @throws RuntimeException if the task threw this.
|
||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
||||
* {@link Throwable}.
|
||||
*/
|
||||
void waitForFuture(Future future) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (ExecutionException ex) {
|
||||
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
||||
|
||||
Throwable t = ex.getCause();
|
||||
if (t instanceof Error) { throw (Error) t; }
|
||||
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
|
||||
throw new ChannelPipelineException(t);
|
||||
} catch (InterruptedException ex) {
|
||||
// Interrupt the calling thread (note that this method is not called from the event loop)
|
||||
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf replaceInboundByteBuffer(final ByteBuf newInboundByteBuf) {
|
||||
if (newInboundByteBuf == null) {
|
||||
throw new NullPointerException("newInboundByteBuf");
|
||||
}
|
||||
|
||||
if (!executor().inEventLoop()) {
|
||||
try {
|
||||
return executeOnEventLoop(new Callable<ByteBuf>() {
|
||||
@Override
|
||||
public ByteBuf call() {
|
||||
return replaceInboundByteBuffer(newInboundByteBuf);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
// Ignore because call() does not throw an Exception
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuf currentInboundByteBuf = inboundByteBuffer();
|
||||
|
||||
this.inByteBuf = newInboundByteBuf;
|
||||
return currentInboundByteBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> MessageBuf<T> replaceInboundMessageBuffer(final MessageBuf<T> newInboundMsgBuf) {
|
||||
if (newInboundMsgBuf == null) {
|
||||
throw new NullPointerException("newInboundMsgBuf");
|
||||
}
|
||||
|
||||
if (!executor().inEventLoop()) {
|
||||
try {
|
||||
return executeOnEventLoop(new Callable<MessageBuf<T>>() {
|
||||
@Override
|
||||
public MessageBuf<T> call() {
|
||||
return replaceInboundMessageBuffer(newInboundMsgBuf);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
// Ignore because call() does not throw an Exception
|
||||
}
|
||||
}
|
||||
|
||||
MessageBuf<T> currentInboundMsgBuf = inboundMessageBuffer();
|
||||
|
||||
this.inMsgBuf = (MessageBuf<Object>) newInboundMsgBuf;
|
||||
return currentInboundMsgBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf replaceOutboundByteBuffer(final ByteBuf newOutboundByteBuf) {
|
||||
if (newOutboundByteBuf == null) {
|
||||
throw new NullPointerException("newOutboundByteBuf");
|
||||
}
|
||||
|
||||
if (!executor().inEventLoop()) {
|
||||
try {
|
||||
return executeOnEventLoop(new Callable<ByteBuf>() {
|
||||
@Override
|
||||
public ByteBuf call() {
|
||||
return replaceOutboundByteBuffer(newOutboundByteBuf);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
// Ignore because call() does not throw an Exception
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuf currentOutboundByteBuf = outboundByteBuffer();
|
||||
|
||||
this.outByteBuf = newOutboundByteBuf;
|
||||
return currentOutboundByteBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> MessageBuf<T> replaceOutboundMessageBuffer(final MessageBuf<T> newOutboundMsgBuf) {
|
||||
if (newOutboundMsgBuf == null) {
|
||||
throw new NullPointerException("newOutboundMsgBuf");
|
||||
}
|
||||
|
||||
if (!executor().inEventLoop()) {
|
||||
try {
|
||||
return executeOnEventLoop(new Callable<MessageBuf<T>>() {
|
||||
@Override
|
||||
public MessageBuf<T> call() {
|
||||
return replaceOutboundMessageBuffer(newOutboundMsgBuf);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
// Ignore because call() does not throw an Exception
|
||||
}
|
||||
}
|
||||
|
||||
MessageBuf<T> currentOutboundMsgBuf = outboundMessageBuffer();
|
||||
|
||||
this.outMsgBuf = (MessageBuf<Object>) newOutboundMsgBuf;
|
||||
return currentOutboundMsgBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNextInboundByteBuffer() {
|
||||
DefaultChannelHandlerContext ctx = next;
|
||||
|
@ -82,44 +82,35 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, final ChannelHandler handler) {
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final DefaultChannelHandlerContext nextCtx;
|
||||
final DefaultChannelHandlerContext newCtx;
|
||||
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
final DefaultChannelHandlerContext nextCtx = head.next;
|
||||
final DefaultChannelHandlerContext newCtx =
|
||||
new DefaultChannelHandlerContext(this, group, head, nextCtx, name, handler);
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
nextCtx = head.next;
|
||||
newCtx = new DefaultChannelHandlerContext(this, group, head, nextCtx, name, handler);
|
||||
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addFirst0(name, nextCtx, newCtx);
|
||||
return this;
|
||||
}
|
||||
future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addFirst0(name, nextCtx, newCtx);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
newCtx.executeOnEventLoop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
checkDuplicateName(name);
|
||||
addFirst0(name, nextCtx, newCtx);
|
||||
}
|
||||
});
|
||||
}
|
||||
// Call Future.get() outside of the synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
return this;
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addFirst0(
|
||||
@ -141,46 +132,36 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, final String name, final ChannelHandler handler) {
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final DefaultChannelHandlerContext oldTail;
|
||||
final DefaultChannelHandlerContext newTail;
|
||||
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
|
||||
final DefaultChannelHandlerContext oldTail = tail;
|
||||
final DefaultChannelHandlerContext newTail =
|
||||
new DefaultChannelHandlerContext(this, group, oldTail, null, name, handler);
|
||||
oldTail = tail;
|
||||
newTail = new DefaultChannelHandlerContext(this, group, oldTail, null, name, handler);
|
||||
|
||||
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
|
||||
addLast0(name, oldTail, newTail);
|
||||
return this;
|
||||
} else {
|
||||
future = newTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
checkDuplicateName(name);
|
||||
addLast0(name, oldTail, newTail);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
|
||||
addLast0(name, oldTail, newTail);
|
||||
return this;
|
||||
}
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
return this;
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
newTail.executeOnEventLoop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
checkDuplicateName(name);
|
||||
addLast0(name, oldTail, newTail);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addLast0(
|
||||
@ -201,46 +182,35 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addBefore(
|
||||
EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) {
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||
final DefaultChannelHandlerContext ctx;
|
||||
final DefaultChannelHandlerContext newCtx;
|
||||
|
||||
synchronized (this) {
|
||||
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
final DefaultChannelHandlerContext newCtx =
|
||||
new DefaultChannelHandlerContext(this, group, ctx.prev, ctx, name, handler);
|
||||
synchronized (this) {
|
||||
ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
newCtx = new DefaultChannelHandlerContext(this, group, ctx.prev, ctx, name, handler);
|
||||
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addBefore0(name, ctx, newCtx);
|
||||
return this;
|
||||
} else {
|
||||
future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
checkDuplicateName(name);
|
||||
addBefore0(name, ctx, newCtx);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addBefore0(name, ctx, newCtx);
|
||||
return this;
|
||||
}
|
||||
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
return this;
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
newCtx.executeOnEventLoop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
checkDuplicateName(name);
|
||||
addBefore0(name, ctx, newCtx);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
||||
@ -260,49 +230,38 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addAfter(
|
||||
EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) {
|
||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||
final DefaultChannelHandlerContext ctx;
|
||||
final DefaultChannelHandlerContext newCtx;
|
||||
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
|
||||
synchronized (this) {
|
||||
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
if (ctx == tail) {
|
||||
return addLast(name, handler);
|
||||
}
|
||||
checkDuplicateName(name);
|
||||
final DefaultChannelHandlerContext newCtx =
|
||||
new DefaultChannelHandlerContext(this, group, ctx, ctx.next, name, handler);
|
||||
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addAfter0(name, ctx, newCtx);
|
||||
return this;
|
||||
} else {
|
||||
future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
checkDuplicateName(name);
|
||||
addAfter0(name, ctx, newCtx);
|
||||
}
|
||||
});
|
||||
}
|
||||
synchronized (this) {
|
||||
ctx = getContextOrDie(baseName);
|
||||
if (ctx == tail) {
|
||||
return addLast(name, handler);
|
||||
}
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
return this;
|
||||
checkDuplicateName(name);
|
||||
newCtx = new DefaultChannelHandlerContext(this, group, ctx, ctx.next, name, handler);
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addAfter0(name, ctx, newCtx);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
newCtx.executeOnEventLoop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
checkDuplicateName(name);
|
||||
addAfter0(name, ctx, newCtx);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
|
||||
@ -394,65 +353,59 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
|
||||
try {
|
||||
DefaultChannelHandlerContext context;
|
||||
Future<Throwable> future;
|
||||
synchronized (this) {
|
||||
if (head == tail) {
|
||||
return null;
|
||||
} else if (ctx == head) {
|
||||
throw new Error(); // Should never happen.
|
||||
} else if (ctx == tail) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
DefaultChannelHandlerContext context;
|
||||
Future future;
|
||||
|
||||
final DefaultChannelHandlerContext oldTail = tail;
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
return oldTail;
|
||||
} else {
|
||||
future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
synchronized (this) {
|
||||
if (head == tail) {
|
||||
return null;
|
||||
} else if (ctx == head) {
|
||||
throw new Error(); // Should never happen.
|
||||
} else if (ctx == tail) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
final DefaultChannelHandlerContext oldTail = tail;
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
return oldTail;
|
||||
} else {
|
||||
future = oldTail.executor().submit(new Runnable() {
|
||||
@Override
|
||||
void doCall() {
|
||||
removeLast0(oldTail);
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
removeLast0(oldTail);
|
||||
}
|
||||
}
|
||||
});
|
||||
context = oldTail;
|
||||
}
|
||||
context = oldTail;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
|
||||
remove0(ctx);
|
||||
return ctx;
|
||||
} else {
|
||||
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
|
||||
remove0(ctx);
|
||||
return ctx;
|
||||
} else {
|
||||
future = ctx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
future = ctx.executor().submit(new Runnable() {
|
||||
@Override
|
||||
void doCall() {
|
||||
remove0(ctx);
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
remove0(ctx);
|
||||
}
|
||||
}
|
||||
});
|
||||
context = ctx;
|
||||
}
|
||||
context = ctx;
|
||||
}
|
||||
}
|
||||
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
|
||||
return context;
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
context.waitForFuture(future);
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
private void remove0(DefaultChannelHandlerContext ctx) {
|
||||
@ -480,43 +433,32 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelHandler removeLast() {
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
final DefaultChannelHandlerContext oldTail;
|
||||
synchronized (this) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
oldTail = tail;
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
return oldTail.handler();
|
||||
} else {
|
||||
future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
removeLast0(oldTail);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
final DefaultChannelHandlerContext oldTail;
|
||||
|
||||
return oldTail.handler();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
synchronized (this) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
oldTail = tail;
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
return oldTail.handler();
|
||||
}
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
oldTail.executeOnEventLoop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
removeLast0(oldTail);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return oldTail.handler();
|
||||
}
|
||||
|
||||
private void removeLast0(DefaultChannelHandlerContext oldTail) {
|
||||
@ -550,78 +492,76 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
private ChannelHandler replace(
|
||||
final DefaultChannelHandlerContext ctx, final String newName, final ChannelHandler newHandler) {
|
||||
try {
|
||||
Future<Throwable> future;
|
||||
synchronized (this) {
|
||||
if (ctx == head) {
|
||||
throw new IllegalArgumentException();
|
||||
} else if (ctx == tail) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
final DefaultChannelHandlerContext oldTail = tail;
|
||||
final DefaultChannelHandlerContext newTail =
|
||||
new DefaultChannelHandlerContext(this, null, oldTail, null, newName, newHandler);
|
||||
final DefaultChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
|
||||
Future future;
|
||||
DefaultChannelHandlerContext context;
|
||||
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
checkDuplicateName(newName);
|
||||
addLast0(newName, tail, newTail);
|
||||
return ctx.handler();
|
||||
synchronized (this) {
|
||||
if (ctx == head) {
|
||||
throw new IllegalArgumentException();
|
||||
} else if (ctx == tail) {
|
||||
if (head == tail) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
final DefaultChannelHandlerContext oldTail = tail;
|
||||
final DefaultChannelHandlerContext newTail =
|
||||
new DefaultChannelHandlerContext(this, null, oldTail, null, newName, newHandler);
|
||||
|
||||
} else {
|
||||
future = oldTail.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
@Override
|
||||
void doCall() {
|
||||
removeLast0(oldTail);
|
||||
checkDuplicateName(newName);
|
||||
addLast0(newName, tail, newTail);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
checkDuplicateName(newName);
|
||||
addLast0(newName, tail, newTail);
|
||||
return ctx.handler();
|
||||
|
||||
} else {
|
||||
boolean sameName = ctx.name().equals(newName);
|
||||
if (!sameName) {
|
||||
checkDuplicateName(newName);
|
||||
}
|
||||
|
||||
DefaultChannelHandlerContext prev = ctx.prev;
|
||||
DefaultChannelHandlerContext next = ctx.next;
|
||||
|
||||
final DefaultChannelHandlerContext newCtx =
|
||||
new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler);
|
||||
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
replace0(ctx, newName, newCtx);
|
||||
return ctx.handler();
|
||||
} else {
|
||||
future = newCtx.executor().submit(new DefaultChannelPipelineModificationTask(this) {
|
||||
future = oldTail.executor().submit(new Runnable() {
|
||||
@Override
|
||||
void doCall() {
|
||||
replace0(ctx, newName, newCtx);
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
removeLast0(oldTail);
|
||||
checkDuplicateName(newName);
|
||||
addLast0(newName, tail, newTail);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
context = oldTail;
|
||||
}
|
||||
|
||||
} else {
|
||||
boolean sameName = ctx.name().equals(newName);
|
||||
if (!sameName) {
|
||||
checkDuplicateName(newName);
|
||||
}
|
||||
|
||||
DefaultChannelHandlerContext prev = ctx.prev;
|
||||
DefaultChannelHandlerContext next = ctx.next;
|
||||
|
||||
final DefaultChannelHandlerContext newCtx =
|
||||
new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler);
|
||||
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
replace0(ctx, newName, newCtx);
|
||||
return ctx.handler();
|
||||
} else {
|
||||
future = newCtx.executor().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
replace0(ctx, newName, newCtx);
|
||||
}
|
||||
}
|
||||
});
|
||||
context = newCtx;
|
||||
}
|
||||
}
|
||||
// Call Future.get() outside of synchronized block to prevent dead-lock
|
||||
Throwable result = future.get();
|
||||
if (result != null) {
|
||||
// re-throw exception that was caught
|
||||
throw result;
|
||||
}
|
||||
|
||||
return ctx.handler();
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Error e) {
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelPipelineException(t);
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
context.waitForFuture(future);
|
||||
|
||||
return ctx.handler();
|
||||
}
|
||||
|
||||
private void replace0(DefaultChannelHandlerContext ctx, String newName, DefaultChannelHandlerContext newCtx) {
|
||||
|
@ -1,57 +0,0 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* Custom {@link Callable} implementation which will catch all {@link Throwable} which happens
|
||||
* during execution of {@link DefaultChannelPipelineModificationTask#doCall()} and return them in the
|
||||
* {@link Future}. This allows to re-throw them later.
|
||||
*
|
||||
* It also handles the right synchronization of the {@link DefaultChannelPipelineModificationTask#doCall()}
|
||||
* method.
|
||||
*
|
||||
* It was originally an inner class of {@link DefaultChannelPipeline}, but moved to a top level
|
||||
* type to work around a compiler bug.
|
||||
*/
|
||||
abstract class DefaultChannelPipelineModificationTask implements Callable<Throwable> {
|
||||
|
||||
private final ChannelPipeline lock;
|
||||
|
||||
DefaultChannelPipelineModificationTask(ChannelPipeline lock) {
|
||||
this.lock = lock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable call() {
|
||||
try {
|
||||
synchronized (lock) {
|
||||
doCall();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
return t;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the modification
|
||||
*/
|
||||
abstract void doCall();
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user