Remove extra field from ChunkedWriteHandler to make it less error-prone (#9958)
Motivation: At the moment we use an extra field in ChunedWriteHandler to hold the current write. This is not needed and makes sense even more error-prone. We can just peek in the queue. Modifications: Use Queue.peek() to keep track of current write Result: Less error-prone code
This commit is contained in:
parent
34e3ea9ee8
commit
69cd042401
@ -72,7 +72,6 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
|
private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
|
||||||
private volatile ChannelHandlerContext ctx;
|
private volatile ChannelHandlerContext ctx;
|
||||||
private PendingWrite currentWrite;
|
|
||||||
|
|
||||||
public ChunkedWriteHandler() {
|
public ChunkedWriteHandler() {
|
||||||
}
|
}
|
||||||
@ -150,13 +149,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
private void discard(Throwable cause) {
|
private void discard(Throwable cause) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
PendingWrite currentWrite = this.currentWrite;
|
PendingWrite currentWrite = queue.poll();
|
||||||
|
|
||||||
if (this.currentWrite == null) {
|
|
||||||
currentWrite = queue.poll();
|
|
||||||
} else {
|
|
||||||
this.currentWrite = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentWrite == null) {
|
if (currentWrite == null) {
|
||||||
break;
|
break;
|
||||||
@ -206,9 +199,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
boolean requiresFlush = true;
|
boolean requiresFlush = true;
|
||||||
ByteBufAllocator allocator = ctx.alloc();
|
ByteBufAllocator allocator = ctx.alloc();
|
||||||
while (channel.isWritable()) {
|
while (channel.isWritable()) {
|
||||||
if (currentWrite == null) {
|
final PendingWrite currentWrite = queue.peek();
|
||||||
currentWrite = queue.poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentWrite == null) {
|
if (currentWrite == null) {
|
||||||
break;
|
break;
|
||||||
@ -224,11 +215,10 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
// as this had to be done already by someone who resolved the
|
// as this had to be done already by someone who resolved the
|
||||||
// promise (using ChunkedInput.close method).
|
// promise (using ChunkedInput.close method).
|
||||||
// See https://github.com/netty/netty/issues/8700.
|
// See https://github.com/netty/netty/issues/8700.
|
||||||
this.currentWrite = null;
|
queue.remove();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final PendingWrite currentWrite = this.currentWrite;
|
|
||||||
final Object pendingMessage = currentWrite.msg;
|
final Object pendingMessage = currentWrite.msg;
|
||||||
|
|
||||||
if (pendingMessage instanceof ChunkedInput) {
|
if (pendingMessage instanceof ChunkedInput) {
|
||||||
@ -247,7 +237,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
suspend = false;
|
suspend = false;
|
||||||
}
|
}
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
this.currentWrite = null;
|
queue.remove();
|
||||||
|
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
ReferenceCountUtil.release(message);
|
ReferenceCountUtil.release(message);
|
||||||
@ -273,7 +263,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
ChannelFuture f = ctx.write(message);
|
ChannelFuture f = ctx.write(message);
|
||||||
if (endOfInput) {
|
if (endOfInput) {
|
||||||
this.currentWrite = null;
|
queue.remove();
|
||||||
|
|
||||||
// Register a listener which will close the input once the write is complete.
|
// Register a listener which will close the input once the write is complete.
|
||||||
// This is needed because the Chunk may have some resource bound that can not
|
// This is needed because the Chunk may have some resource bound that can not
|
||||||
@ -328,7 +318,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
ctx.flush();
|
ctx.flush();
|
||||||
requiresFlush = false;
|
requiresFlush = false;
|
||||||
} else {
|
} else {
|
||||||
this.currentWrite = null;
|
queue.remove();
|
||||||
ctx.write(pendingMessage, currentWrite.promise);
|
ctx.write(pendingMessage, currentWrite.promise);
|
||||||
requiresFlush = true;
|
requiresFlush = true;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user