From d5963e069dacd757697349ab8c3b5e0d808b54e1 Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Wed, 11 Mar 2015 15:42:34 -0700 Subject: [PATCH] Remove Frame class from DefaultHttp2RemoteFlowController. Fixes #3488 Motivation: For every write of a flow controlled frame (HEADERS, DATA) we are allocating a Frame object that is not necessary anymore as it does not maintain any state, besides the payload. Modifications: Remove the Frame class and directly add the payload to the pending write queue. Result: One few object allocation per write of a flow controlled frame. --- .../DefaultHttp2RemoteFlowController.java | 205 ++++++++---------- 1 file changed, 96 insertions(+), 109 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 3cd72143f4..62420691f5 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -166,10 +166,9 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } @Override - public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, - FlowControlled payload) { + public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, FlowControlled frame) { checkNotNull(ctx, "ctx"); - checkNotNull(payload, "payload"); + checkNotNull(frame, "frame"); if (this.ctx != null && this.ctx != ctx) { throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported"); } @@ -178,16 +177,16 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll FlowState state; try { state = state(stream); - state.newFrame(payload); + state.enqueueFrame(frame); } catch (Throwable t) { - payload.error(t); + frame.error(t); return; } state.writeBytes(state.writableWindow()); try { flush(); } catch (Throwable t) { - payload.error(t); + frame.error(t); } } @@ -335,8 +334,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * The outbound flow control state for a single stream. */ - final class FlowState { - private final Deque pendingWriteQueue; + private final class FlowState { + private final Deque pendingWriteQueue; private final Http2Stream stream; private int window; private int pendingBytes; @@ -350,7 +349,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll FlowState(Http2Stream stream, int initialWindowSize) { this.stream = stream; window(initialWindowSize); - pendingWriteQueue = new ArrayDeque(2); + pendingWriteQueue = new ArrayDeque(2); } int window() { @@ -424,13 +423,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * Creates a new payload with the given values and immediately enqueues it. + * Adds the {@code frame} to the pending queue and increments the pending + * byte count. */ - Frame newFrame(FlowControlled payload) { - // Store this as the future for the most recent write attempt. - Frame frame = new Frame(payload); + void enqueueFrame(FlowControlled frame) { + incrementPendingBytes(frame.size()); pendingWriteQueue.offer(frame); - return frame; } /** @@ -443,7 +441,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll /** * Returns the the head of the pending queue, or {@code null} if empty. */ - Frame peek() { + FlowControlled peek() { return pendingWriteQueue.peek(); } @@ -458,12 +456,12 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return; } for (;;) { - Frame frame = pendingWriteQueue.poll(); + FlowControlled frame = pendingWriteQueue.poll(); if (frame == null) { break; } - frame.writeError(streamError(stream.id(), INTERNAL_ERROR, - "Stream closed before write could take place")); + writeError(frame, streamError(stream.id(), INTERNAL_ERROR, + "Stream closed before write could take place")); } } @@ -476,7 +474,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll int bytesAttempted = 0; while (hasFrame()) { int maxBytes = min(bytes - bytesAttempted, writableWindow()); - bytesAttempted += peek().write(maxBytes); + bytesAttempted += write(peek(), maxBytes); if (bytes - bytesAttempted <= 0) { break; } @@ -484,6 +482,50 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll return bytesAttempted; } + /** + * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending + * queue, the written bytes are removed from this branch of the priority tree. + *

+ * Note: this does not flush the {@link ChannelHandlerContext}. + *

+ */ + int write(FlowControlled frame, int allowedBytes) { + int before = frame.size(); + int writtenBytes = 0; + try { + assert !writing; + + // Write the portion of the frame. + writing = true; + needFlush |= frame.write(Math.max(0, allowedBytes)); + if (!cancelled && frame.size() == 0) { + // This frame has been fully written, remove this frame + // and notify it. Since we remove this frame + // first, we're guaranteed that its error method will not + // be called when we call cancel. + pendingWriteQueue.remove(); + frame.writeComplete(); + } + } catch (Throwable e) { + // Mark the state as cancelled, we'll clear the pending queue + // via cancel() below. + cancelled = true; + } finally { + writing = false; + // Make sure we always decrement the flow control windows + // by the bytes written. + writtenBytes = before - frame.size(); + decrementFlowControlWindow(writtenBytes); + decrementPendingBytes(writtenBytes); + // If a cancellation occurred while writing, call cancel again to + // clear and error all of the pending writes. + if (cancelled) { + cancel(); + } + } + return writtenBytes; + } + /** * Recursively increments the streamable bytes for this branch in the priority tree starting at the current * node. @@ -496,103 +538,48 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * A wrapper class around the content of a data frame. + * Increments the number of pending bytes for this node. If there was any change to the number of bytes that + * fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update + * this branch of the priority tree. */ - private final class Frame { - final FlowControlled payload; + void incrementPendingBytes(int numBytes) { + int previouslyStreamable = streamableBytes(); + pendingBytes += numBytes; - Frame(FlowControlled payload) { - this.payload = payload; - // Increment the number of pending bytes for this stream. - incrementPendingBytes(payload.size()); + int delta = streamableBytes() - previouslyStreamable; + if (delta != 0) { + incrementStreamableBytesForTree(delta); } + } - /** - * Increments the number of pending bytes for this node. If there was any change to the number of bytes that - * fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this - * branch of the priority tree. - */ - private void incrementPendingBytes(int numBytes) { - int previouslyStreamable = streamableBytes(); - pendingBytes += numBytes; + /** + * If this frame is in the pending queue, decrements the number of pending bytes for the stream. + */ + void decrementPendingBytes(int bytes) { + incrementPendingBytes(-bytes); + } - int delta = streamableBytes() - previouslyStreamable; - if (delta != 0) { - incrementStreamableBytesForTree(delta); - } + /** + * Decrement the per stream and connection flow control window by {@code bytes}. + */ + void decrementFlowControlWindow(int bytes) { + try { + int negativeBytes = -bytes; + connectionState().incrementStreamWindow(negativeBytes); + incrementStreamWindow(negativeBytes); + } catch (Http2Exception e) { + // Should never get here since we're decrementing. + throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e); } + } - /** - * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending - * queue, the written bytes are removed from this branch of the priority tree. - *

- * Note: this does not flush the {@link ChannelHandlerContext}. - */ - int write(int allowedBytes) { - int before = payload.size(); - int writtenBytes = 0; - try { - if (writing) { - throw new IllegalStateException("write is not re-entrant"); - } - // Write the portion of the frame. - writing = true; - needFlush |= payload.write(Math.max(0, allowedBytes)); - if (!cancelled && payload.size() == 0) { - // This frame has been fully written, remove this frame - // and notify the payload. Since we remove this frame - // first, we're guaranteed that its error method will not - // be called when we call cancel. - pendingWriteQueue.remove(); - payload.writeComplete(); - } - } catch (Throwable e) { - // Mark the state as cancelled, we'll clear the pending queue - // via cancel() below. - cancelled = true; - } finally { - writing = false; - // Make sure we always decrement the flow control windows - // by the bytes written. - writtenBytes = before - payload.size(); - decrementFlowControlWindow(writtenBytes); - decrementPendingBytes(writtenBytes); - // If a cancellation occurred while writing, call cancel again to - // clear and error all of the pending writes. - if (cancelled) { - cancel(); - } - } - return writtenBytes; - } - - /** - * Decrement the per stream and connection flow control window by {@code bytes}. - */ - void decrementFlowControlWindow(int bytes) { - try { - connectionState().incrementStreamWindow(-bytes); - incrementStreamWindow(-bytes); - } catch (Http2Exception e) { // Should never get here since we're decrementing. - throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e); - } - } - - /** - * Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are - * removed from this branch of the priority tree. - */ - void writeError(Http2Exception cause) { - decrementPendingBytes(payload.size()); - payload.error(cause); - } - - /** - * If this frame is in the pending queue, decrements the number of pending bytes for the stream. - */ - void decrementPendingBytes(int bytes) { - incrementPendingBytes(-bytes); - } + /** + * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue, + * the unwritten bytes are removed from this branch of the priority tree. + */ + void writeError(FlowControlled frame, Http2Exception cause) { + decrementPendingBytes(frame.size()); + frame.error(cause); } } }