Remove Frame class from DefaultHttp2RemoteFlowController. Fixes #3488

Motivation:

For every write of a flow controlled frame (HEADERS, DATA) we are allocating
a Frame object that is not necessary anymore as it does not maintain any
state, besides the payload.

Modifications:

Remove the Frame class and directly add the payload to the pending write queue.

Result:

One few object allocation per write of a flow controlled frame.
This commit is contained in:
Jakob Buchgraber 2015-03-11 15:42:34 -07:00 committed by nmittler
parent 18443efeab
commit d5963e069d

View File

@ -166,10 +166,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
@Override
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream,
FlowControlled payload) {
public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) {
checkNotNull(ctx, "ctx");
checkNotNull(payload, "payload");
checkNotNull(frame, "frame");
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}
@ -178,16 +177,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
FlowState state;
try {
state = state(stream);
state.newFrame(payload);
state.enqueueFrame(frame);
} catch (Throwable t) {
payload.error(t);
frame.error(t);
return;
}
state.writeBytes(state.writableWindow());
try {
flush();
} catch (Throwable t) {
payload.error(t);
frame.error(t);
}
}
@ -335,8 +334,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* The outbound flow control state for a single stream.
*/
final class FlowState {
private final Deque<Frame> pendingWriteQueue;
private final class FlowState {
private final Deque<FlowControlled> pendingWriteQueue;
private final Http2Stream stream;
private int window;
private int pendingBytes;
@ -350,7 +349,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
FlowState(Http2Stream stream, int initialWindowSize) {
this.stream = stream;
window(initialWindowSize);
pendingWriteQueue = new ArrayDeque<Frame>(2);
pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
}
int window() {
@ -424,13 +423,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
* Creates a new payload with the given values and immediately enqueues it.
* Adds the {@code frame} to the pending queue and increments the pending
* byte count.
*/
Frame newFrame(FlowControlled payload) {
// Store this as the future for the most recent write attempt.
Frame frame = new Frame(payload);
void enqueueFrame(FlowControlled frame) {
incrementPendingBytes(frame.size());
pendingWriteQueue.offer(frame);
return frame;
}
/**
@ -443,7 +441,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* Returns the the head of the pending queue, or {@code null} if empty.
*/
Frame peek() {
FlowControlled peek() {
return pendingWriteQueue.peek();
}
@ -458,12 +456,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return;
}
for (;;) {
Frame frame = pendingWriteQueue.poll();
FlowControlled frame = pendingWriteQueue.poll();
if (frame == null) {
break;
}
frame.writeError(streamError(stream.id(), INTERNAL_ERROR,
"Stream closed before write could take place"));
writeError(frame, streamError(stream.id(), INTERNAL_ERROR,
"Stream closed before write could take place"));
}
}
@ -476,7 +474,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
int bytesAttempted = 0;
while (hasFrame()) {
int maxBytes = min(bytes - bytesAttempted, writableWindow());
bytesAttempted += peek().write(maxBytes);
bytesAttempted += write(peek(), maxBytes);
if (bytes - bytesAttempted <= 0) {
break;
}
@ -484,6 +482,50 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return bytesAttempted;
}
/**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* queue, the written bytes are removed from this branch of the priority tree.
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
* </p>
*/
int write(FlowControlled frame, int allowedBytes) {
int before = frame.size();
int writtenBytes = 0;
try {
assert !writing;
// Write the portion of the frame.
writing = true;
needFlush |= frame.write(Math.max(0, allowedBytes));
if (!cancelled && frame.size() == 0) {
// This frame has been fully written, remove this frame
// and notify it. Since we remove this frame
// first, we're guaranteed that its error method will not
// be called when we call cancel.
pendingWriteQueue.remove();
frame.writeComplete();
}
} catch (Throwable e) {
// Mark the state as cancelled, we'll clear the pending queue
// via cancel() below.
cancelled = true;
} finally {
writing = false;
// Make sure we always decrement the flow control windows
// by the bytes written.
writtenBytes = before - frame.size();
decrementFlowControlWindow(writtenBytes);
decrementPendingBytes(writtenBytes);
// If a cancellation occurred while writing, call cancel again to
// clear and error all of the pending writes.
if (cancelled) {
cancel();
}
}
return writtenBytes;
}
/**
* Recursively increments the streamable bytes for this branch in the priority tree starting at the current
* node.
@ -496,103 +538,48 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
}
/**
* A wrapper class around the content of a data frame.
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
* fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update
* this branch of the priority tree.
*/
private final class Frame {
final FlowControlled payload;
void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
pendingBytes += numBytes;
Frame(FlowControlled payload) {
this.payload = payload;
// Increment the number of pending bytes for this stream.
incrementPendingBytes(payload.size());
int delta = streamableBytes() - previouslyStreamable;
if (delta != 0) {
incrementStreamableBytesForTree(delta);
}
}
/**
* Increments the number of pending bytes for this node. If there was any change to the number of bytes that
* fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this
* branch of the priority tree.
*/
private void incrementPendingBytes(int numBytes) {
int previouslyStreamable = streamableBytes();
pendingBytes += numBytes;
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
*/
void decrementPendingBytes(int bytes) {
incrementPendingBytes(-bytes);
}
int delta = streamableBytes() - previouslyStreamable;
if (delta != 0) {
incrementStreamableBytesForTree(delta);
}
/**
* Decrement the per stream and connection flow control window by {@code bytes}.
*/
void decrementFlowControlWindow(int bytes) {
try {
int negativeBytes = -bytes;
connectionState().incrementStreamWindow(negativeBytes);
incrementStreamWindow(negativeBytes);
} catch (Http2Exception e) {
// Should never get here since we're decrementing.
throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
}
}
/**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* queue, the written bytes are removed from this branch of the priority tree.
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
*/
int write(int allowedBytes) {
int before = payload.size();
int writtenBytes = 0;
try {
if (writing) {
throw new IllegalStateException("write is not re-entrant");
}
// Write the portion of the frame.
writing = true;
needFlush |= payload.write(Math.max(0, allowedBytes));
if (!cancelled && payload.size() == 0) {
// This frame has been fully written, remove this frame
// and notify the payload. Since we remove this frame
// first, we're guaranteed that its error method will not
// be called when we call cancel.
pendingWriteQueue.remove();
payload.writeComplete();
}
} catch (Throwable e) {
// Mark the state as cancelled, we'll clear the pending queue
// via cancel() below.
cancelled = true;
} finally {
writing = false;
// Make sure we always decrement the flow control windows
// by the bytes written.
writtenBytes = before - payload.size();
decrementFlowControlWindow(writtenBytes);
decrementPendingBytes(writtenBytes);
// If a cancellation occurred while writing, call cancel again to
// clear and error all of the pending writes.
if (cancelled) {
cancel();
}
}
return writtenBytes;
}
/**
* Decrement the per stream and connection flow control window by {@code bytes}.
*/
void decrementFlowControlWindow(int bytes) {
try {
connectionState().incrementStreamWindow(-bytes);
incrementStreamWindow(-bytes);
} catch (Http2Exception e) { // Should never get here since we're decrementing.
throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
}
}
/**
* Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are
* removed from this branch of the priority tree.
*/
void writeError(Http2Exception cause) {
decrementPendingBytes(payload.size());
payload.error(cause);
}
/**
* If this frame is in the pending queue, decrements the number of pending bytes for the stream.
*/
void decrementPendingBytes(int bytes) {
incrementPendingBytes(-bytes);
}
/**
* Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
* the unwritten bytes are removed from this branch of the priority tree.
*/
void writeError(FlowControlled frame, Http2Exception cause) {
decrementPendingBytes(frame.size());
frame.error(cause);
}
}
}