Remove Frame class from DefaultHttp2RemoteFlowController. Fixes #3488
Motivation: For every write of a flow controlled frame (HEADERS, DATA) we are allocating a Frame object that is not necessary anymore as it does not maintain any state, besides the payload. Modifications: Remove the Frame class and directly add the payload to the pending write queue. Result: One few object allocation per write of a flow controlled frame.
This commit is contained in:
parent
08b1438e7b
commit
d91cae2fc7
@ -166,10 +166,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream,
|
||||
FlowControlled payload) {
|
||||
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
|
||||
checkNotNull(ctx, "ctx");
|
||||
checkNotNull(payload, "payload");
|
||||
checkNotNull(frame, "frame");
|
||||
if (this.ctx != null && this.ctx != ctx) {
|
||||
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
|
||||
}
|
||||
@ -178,16 +177,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
FlowState state;
|
||||
try {
|
||||
state = state(stream);
|
||||
state.newFrame(payload);
|
||||
state.enqueueFrame(frame);
|
||||
} catch (Throwable t) {
|
||||
payload.error(t);
|
||||
frame.error(t);
|
||||
return;
|
||||
}
|
||||
state.writeBytes(state.writableWindow());
|
||||
try {
|
||||
flush();
|
||||
} catch (Throwable t) {
|
||||
payload.error(t);
|
||||
frame.error(t);
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,8 +334,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* The outbound flow control state for a single stream.
|
||||
*/
|
||||
final class FlowState {
|
||||
private final Deque<Frame> pendingWriteQueue;
|
||||
private final class FlowState {
|
||||
private final Deque<FlowControlled> pendingWriteQueue;
|
||||
private final Http2Stream stream;
|
||||
private int window;
|
||||
private int pendingBytes;
|
||||
@ -350,7 +349,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
FlowState(Http2Stream stream, int initialWindowSize) {
|
||||
this.stream = stream;
|
||||
window(initialWindowSize);
|
||||
pendingWriteQueue = new ArrayDeque<Frame>(2);
|
||||
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
|
||||
}
|
||||
|
||||
int window() {
|
||||
@ -424,13 +423,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new payload with the given values and immediately enqueues it.
|
||||
* Adds the {@code frame} to the pending queue and increments the pending
|
||||
* byte count.
|
||||
*/
|
||||
Frame newFrame(FlowControlled payload) {
|
||||
// Store this as the future for the most recent write attempt.
|
||||
Frame frame = new Frame(payload);
|
||||
void enqueueFrame(FlowControlled frame) {
|
||||
incrementPendingBytes(frame.size());
|
||||
pendingWriteQueue.offer(frame);
|
||||
return frame;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -443,7 +441,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* Returns the the head of the pending queue, or {@code null} if empty.
|
||||
*/
|
||||
Frame peek() {
|
||||
FlowControlled peek() {
|
||||
return pendingWriteQueue.peek();
|
||||
}
|
||||
|
||||
@ -458,12 +456,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return;
|
||||
}
|
||||
for (;;) {
|
||||
Frame frame = pendingWriteQueue.poll();
|
||||
FlowControlled frame = pendingWriteQueue.poll();
|
||||
if (frame == null) {
|
||||
break;
|
||||
}
|
||||
frame.writeError(streamError(stream.id(), INTERNAL_ERROR,
|
||||
"Stream closed before write could take place"));
|
||||
writeError(frame, streamError(stream.id(), INTERNAL_ERROR,
|
||||
"Stream closed before write could take place"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -476,7 +474,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
int bytesAttempted = 0;
|
||||
while (hasFrame()) {
|
||||
int maxBytes = min(bytes - bytesAttempted, writableWindow());
|
||||
bytesAttempted += peek().write(maxBytes);
|
||||
bytesAttempted += write(peek(), maxBytes);
|
||||
if (bytes - bytesAttempted <= 0) {
|
||||
break;
|
||||
}
|
||||
@ -484,6 +482,50 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
return bytesAttempted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
|
||||
* queue, the written bytes are removed from this branch of the priority tree.
|
||||
* <p>
|
||||
* Note: this does not flush the {@link ChannelHandlerContext}.
|
||||
* </p>
|
||||
*/
|
||||
int write(FlowControlled frame, int allowedBytes) {
|
||||
int before = frame.size();
|
||||
int writtenBytes = 0;
|
||||
try {
|
||||
assert !writing;
|
||||
|
||||
// Write the portion of the frame.
|
||||
writing = true;
|
||||
needFlush |= frame.write(Math.max(0, allowedBytes));
|
||||
if (!cancelled && frame.size() == 0) {
|
||||
// This frame has been fully written, remove this frame
|
||||
// and notify it. Since we remove this frame
|
||||
// first, we're guaranteed that its error method will not
|
||||
// be called when we call cancel.
|
||||
pendingWriteQueue.remove();
|
||||
frame.writeComplete();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// Mark the state as cancelled, we'll clear the pending queue
|
||||
// via cancel() below.
|
||||
cancelled = true;
|
||||
} finally {
|
||||
writing = false;
|
||||
// Make sure we always decrement the flow control windows
|
||||
// by the bytes written.
|
||||
writtenBytes = before - frame.size();
|
||||
decrementFlowControlWindow(writtenBytes);
|
||||
decrementPendingBytes(writtenBytes);
|
||||
// If a cancellation occurred while writing, call cancel again to
|
||||
// clear and error all of the pending writes.
|
||||
if (cancelled) {
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively increments the streamable bytes for this branch in the priority tree starting at the current
|
||||
* node.
|
||||
@ -496,103 +538,48 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper class around the content of a data frame.
|
||||
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
|
||||
* fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update
|
||||
* this branch of the priority tree.
|
||||
*/
|
||||
private final class Frame {
|
||||
final FlowControlled payload;
|
||||
void incrementPendingBytes(int numBytes) {
|
||||
int previouslyStreamable = streamableBytes();
|
||||
pendingBytes += numBytes;
|
||||
|
||||
Frame(FlowControlled payload) {
|
||||
this.payload = payload;
|
||||
// Increment the number of pending bytes for this stream.
|
||||
incrementPendingBytes(payload.size());
|
||||
int delta = streamableBytes() - previouslyStreamable;
|
||||
if (delta != 0) {
|
||||
incrementStreamableBytesForTree(delta);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
|
||||
* fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this
|
||||
* branch of the priority tree.
|
||||
*/
|
||||
private void incrementPendingBytes(int numBytes) {
|
||||
int previouslyStreamable = streamableBytes();
|
||||
pendingBytes += numBytes;
|
||||
/**
|
||||
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
|
||||
*/
|
||||
void decrementPendingBytes(int bytes) {
|
||||
incrementPendingBytes(-bytes);
|
||||
}
|
||||
|
||||
int delta = streamableBytes() - previouslyStreamable;
|
||||
if (delta != 0) {
|
||||
incrementStreamableBytesForTree(delta);
|
||||
}
|
||||
/**
|
||||
* Decrement the per stream and connection flow control window by {@code bytes}.
|
||||
*/
|
||||
void decrementFlowControlWindow(int bytes) {
|
||||
try {
|
||||
int negativeBytes = -bytes;
|
||||
connectionState().incrementStreamWindow(negativeBytes);
|
||||
incrementStreamWindow(negativeBytes);
|
||||
} catch (Http2Exception e) {
|
||||
// Should never get here since we're decrementing.
|
||||
throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
|
||||
* queue, the written bytes are removed from this branch of the priority tree.
|
||||
* <p>
|
||||
* Note: this does not flush the {@link ChannelHandlerContext}.
|
||||
*/
|
||||
int write(int allowedBytes) {
|
||||
int before = payload.size();
|
||||
int writtenBytes = 0;
|
||||
try {
|
||||
if (writing) {
|
||||
throw new IllegalStateException("write is not re-entrant");
|
||||
}
|
||||
// Write the portion of the frame.
|
||||
writing = true;
|
||||
needFlush |= payload.write(Math.max(0, allowedBytes));
|
||||
if (!cancelled && payload.size() == 0) {
|
||||
// This frame has been fully written, remove this frame
|
||||
// and notify the payload. Since we remove this frame
|
||||
// first, we're guaranteed that its error method will not
|
||||
// be called when we call cancel.
|
||||
pendingWriteQueue.remove();
|
||||
payload.writeComplete();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// Mark the state as cancelled, we'll clear the pending queue
|
||||
// via cancel() below.
|
||||
cancelled = true;
|
||||
} finally {
|
||||
writing = false;
|
||||
// Make sure we always decrement the flow control windows
|
||||
// by the bytes written.
|
||||
writtenBytes = before - payload.size();
|
||||
decrementFlowControlWindow(writtenBytes);
|
||||
decrementPendingBytes(writtenBytes);
|
||||
// If a cancellation occurred while writing, call cancel again to
|
||||
// clear and error all of the pending writes.
|
||||
if (cancelled) {
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement the per stream and connection flow control window by {@code bytes}.
|
||||
*/
|
||||
void decrementFlowControlWindow(int bytes) {
|
||||
try {
|
||||
connectionState().incrementStreamWindow(-bytes);
|
||||
incrementStreamWindow(-bytes);
|
||||
} catch (Http2Exception e) { // Should never get here since we're decrementing.
|
||||
throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are
|
||||
* removed from this branch of the priority tree.
|
||||
*/
|
||||
void writeError(Http2Exception cause) {
|
||||
decrementPendingBytes(payload.size());
|
||||
payload.error(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
|
||||
*/
|
||||
void decrementPendingBytes(int bytes) {
|
||||
incrementPendingBytes(-bytes);
|
||||
}
|
||||
/**
|
||||
* Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
|
||||
* the unwritten bytes are removed from this branch of the priority tree.
|
||||
*/
|
||||
void writeError(FlowControlled frame, Http2Exception cause) {
|
||||
decrementPendingBytes(frame.size());
|
||||
frame.error(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user