+ * Note: this does not flush the {@link ChannelHandlerContext}. + *
+ */ + int write(FlowControlled frame, int allowedBytes) { + int before = frame.size(); + int writtenBytes = 0; + try { + assert !writing; + + // Write the portion of the frame. + writing = true; + needFlush |= frame.write(Math.max(0, allowedBytes)); + if (!cancelled && frame.size() == 0) { + // This frame has been fully written, remove this frame + // and notify it. Since we remove this frame + // first, we're guaranteed that its error method will not + // be called when we call cancel. + pendingWriteQueue.remove(); + frame.writeComplete(); + } + } catch (Throwable e) { + // Mark the state as cancelled, we'll clear the pending queue + // via cancel() below. + cancelled = true; + } finally { + writing = false; + // Make sure we always decrement the flow control windows + // by the bytes written. + writtenBytes = before - frame.size(); + decrementFlowControlWindow(writtenBytes); + decrementPendingBytes(writtenBytes); + // If a cancellation occurred while writing, call cancel again to + // clear and error all of the pending writes. + if (cancelled) { + cancel(); + } + } + return writtenBytes; + } + /** * Recursively increments the streamable bytes for this branch in the priority tree starting at the current * node. @@ -496,103 +538,48 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll } /** - * A wrapper class around the content of a data frame. + * Increments the number of pending bytes for this node. If there was any change to the number of bytes that + * fit into the stream window, then {@link #incrementStreamableBytesForTree} is called to recursively update + * this branch of the priority tree. */ - private final class Frame { - final FlowControlled payload; + void incrementPendingBytes(int numBytes) { + int previouslyStreamable = streamableBytes(); + pendingBytes += numBytes; - Frame(FlowControlled payload) { - this.payload = payload; - // Increment the number of pending bytes for this stream. - incrementPendingBytes(payload.size()); + int delta = streamableBytes() - previouslyStreamable; + if (delta != 0) { + incrementStreamableBytesForTree(delta); } + } - /** - * Increments the number of pending bytes for this node. If there was any change to the number of bytes that - * fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this - * branch of the priority tree. - */ - private void incrementPendingBytes(int numBytes) { - int previouslyStreamable = streamableBytes(); - pendingBytes += numBytes; + /** + * If this frame is in the pending queue, decrements the number of pending bytes for the stream. + */ + void decrementPendingBytes(int bytes) { + incrementPendingBytes(-bytes); + } - int delta = streamableBytes() - previouslyStreamable; - if (delta != 0) { - incrementStreamableBytesForTree(delta); - } + /** + * Decrement the per stream and connection flow control window by {@code bytes}. + */ + void decrementFlowControlWindow(int bytes) { + try { + int negativeBytes = -bytes; + connectionState().incrementStreamWindow(negativeBytes); + incrementStreamWindow(negativeBytes); + } catch (Http2Exception e) { + // Should never get here since we're decrementing. + throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e); } + } - /** - * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending - * queue, the written bytes are removed from this branch of the priority tree. - *- * Note: this does not flush the {@link ChannelHandlerContext}. - */ - int write(int allowedBytes) { - int before = payload.size(); - int writtenBytes = 0; - try { - if (writing) { - throw new IllegalStateException("write is not re-entrant"); - } - // Write the portion of the frame. - writing = true; - needFlush |= payload.write(Math.max(0, allowedBytes)); - if (!cancelled && payload.size() == 0) { - // This frame has been fully written, remove this frame - // and notify the payload. Since we remove this frame - // first, we're guaranteed that its error method will not - // be called when we call cancel. - pendingWriteQueue.remove(); - payload.writeComplete(); - } - } catch (Throwable e) { - // Mark the state as cancelled, we'll clear the pending queue - // via cancel() below. - cancelled = true; - } finally { - writing = false; - // Make sure we always decrement the flow control windows - // by the bytes written. - writtenBytes = before - payload.size(); - decrementFlowControlWindow(writtenBytes); - decrementPendingBytes(writtenBytes); - // If a cancellation occurred while writing, call cancel again to - // clear and error all of the pending writes. - if (cancelled) { - cancel(); - } - } - return writtenBytes; - } - - /** - * Decrement the per stream and connection flow control window by {@code bytes}. - */ - void decrementFlowControlWindow(int bytes) { - try { - connectionState().incrementStreamWindow(-bytes); - incrementStreamWindow(-bytes); - } catch (Http2Exception e) { // Should never get here since we're decrementing. - throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e); - } - } - - /** - * Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are - * removed from this branch of the priority tree. - */ - void writeError(Http2Exception cause) { - decrementPendingBytes(payload.size()); - payload.error(cause); - } - - /** - * If this frame is in the pending queue, decrements the number of pending bytes for the stream. - */ - void decrementPendingBytes(int bytes) { - incrementPendingBytes(-bytes); - } + /** + * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue, + * the unwritten bytes are removed from this branch of the priority tree. + */ + void writeError(FlowControlled frame, Http2Exception cause) { + decrementPendingBytes(frame.size()); + frame.error(cause); } } }