Http2ConnectionHandler and Http2FrameListener cyclic dependency

Motivation:
It is often the case that implementations of Http2FrameListener will want to send responses when data is read. The Http2FrameListener needs access to the Http2ConnectionHandler (or the encoder contained within) to be able to send responses. However the Http2ConnectionHandler requires a Http2FrameListener instance to be passed in during construction time. This creates a cyclic dependency which can make it difficult to cleanly accomplish this relationship.

Modifications:
- Add Http2ConnectionDecoder.frameListener(..) method to set the frame listener. This will allow the listener to be set after construction.

Result:
Classes which inherit from Http2ConnectionHandler can more cleanly set the Http2FrameListener.
This commit is contained in:
Scott Mitchell 2015-09-30 15:22:57 -07:00
parent 2ffe7bd72e
commit 1485a87e25
14 changed files with 157 additions and 117 deletions

View File

@ -47,8 +47,8 @@ public class DecoratingHttp2ConnectionDecoder implements Http2ConnectionDecoder
}
@Override
public Http2FrameListener listener() {
return delegate.listener();
public void frameListener(Http2FrameListener listener) {
delegate.frameListener(listener);
}
@Override

View File

@ -44,25 +44,22 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private Http2LifecycleManager lifecycleManager;
private final Http2ConnectionEncoder encoder;
private final Http2FrameReader frameReader;
private final Http2FrameListener listener;
private Http2FrameListener listener;
private final Http2PromisedRequestVerifier requestVerifier;
public DefaultHttp2ConnectionDecoder(Http2Connection connection,
Http2ConnectionEncoder encoder,
Http2FrameReader frameReader,
Http2FrameListener listener) {
this(connection, encoder, frameReader, listener, ALWAYS_VERIFY);
Http2FrameReader frameReader) {
this(connection, encoder, frameReader, ALWAYS_VERIFY);
}
public DefaultHttp2ConnectionDecoder(Http2Connection connection,
Http2ConnectionEncoder encoder,
Http2FrameReader frameReader,
Http2FrameListener listener,
Http2PromisedRequestVerifier requestVerifier) {
this.connection = checkNotNull(connection, "connection");
this.frameReader = checkNotNull(frameReader, "frameReader");
this.encoder = checkNotNull(encoder, "encoder");
this.listener = checkNotNull(listener, "listener");
this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier");
if (connection.local().flowController() == null) {
connection.local().flowController(
@ -86,8 +83,8 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
}
@Override
public Http2FrameListener listener() {
return listener;
public void frameListener(Http2FrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}
@Override

View File

@ -44,9 +44,11 @@ public interface Http2ConnectionDecoder extends Closeable {
Http2LocalFlowController flowController();
/**
* Provides direct access to the underlying frame listener.
* Set the {@link Http2FrameListener} which will be notified when frames are decoded.
* <p>
* This <strong>must</strong> be set before frames are decoded.
*/
Http2FrameListener listener();
void frameListener(Http2FrameListener listener);
/**
* Called by the {@link Http2ConnectionHandler} to decode the next frame from the input buffer.

View File

@ -71,27 +71,27 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
private BaseDecoder byteDecoder;
private long gracefulShutdownTimeoutMillis = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(server, listener, true);
public Http2ConnectionHandler(boolean server) {
this(server, true);
}
public Http2ConnectionHandler(boolean server, Http2FrameListener listener, boolean validateHeaders) {
this(new DefaultHttp2Connection(server), listener, validateHeaders);
public Http2ConnectionHandler(boolean server, boolean validateHeaders) {
this(new DefaultHttp2Connection(server), validateHeaders);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, true);
public Http2ConnectionHandler(Http2Connection connection) {
this(connection, true);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener, boolean validateHeaders) {
this(connection, new DefaultHttp2FrameReader(validateHeaders), new DefaultHttp2FrameWriter(), listener);
public Http2ConnectionHandler(Http2Connection connection, boolean validateHeaders) {
this(connection, new DefaultHttp2FrameReader(validateHeaders), new DefaultHttp2FrameWriter());
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener) {
Http2FrameWriter frameWriter) {
initialSettings = null;
encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, listener);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
}
/**
@ -108,18 +108,16 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2Settings initialSettings) {
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(), listener,
public Http2ConnectionHandler(Http2Connection connection, Http2Settings initialSettings) {
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(),
initialSettings);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener,
Http2Settings initialSettings) {
Http2FrameWriter frameWriter, Http2Settings initialSettings) {
this.initialSettings = initialSettings;
encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, listener);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
}
public Http2ConnectionHandler(Http2ConnectionDecoder decoder,

View File

@ -36,33 +36,32 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
private final boolean validateHeaders;
private int currentStreamId;
public HttpToHttp2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(server, listener, true);
public HttpToHttp2ConnectionHandler(boolean server) {
this(server, true);
}
public HttpToHttp2ConnectionHandler(boolean server, Http2FrameListener listener, boolean validateHeaders) {
super(server, listener);
public HttpToHttp2ConnectionHandler(boolean server, boolean validateHeaders) {
super(server);
this.validateHeaders = validateHeaders;
}
public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, true);
public HttpToHttp2ConnectionHandler(Http2Connection connection) {
this(connection, true);
}
public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
boolean validateHeaders) {
super(connection, listener);
public HttpToHttp2ConnectionHandler(Http2Connection connection, boolean validateHeaders) {
super(connection);
this.validateHeaders = validateHeaders;
}
public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener) {
this(connection, frameReader, frameWriter, listener, true);
Http2FrameWriter frameWriter) {
this(connection, frameReader, frameWriter, true);
}
public HttpToHttp2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener, boolean validateHeaders) {
super(connection, frameReader, frameWriter, listener);
Http2FrameWriter frameWriter, boolean validateHeaders) {
super(connection, frameReader, frameWriter);
this.validateHeaders = validateHeaders;
}

View File

@ -299,9 +299,8 @@ public class DataCompressionHttp2Test {
Http2ConnectionEncoder encoder = new CompressorHttp2ConnectionEncoder(
new DefaultHttp2ConnectionEncoder(serverConnection, new DefaultHttp2FrameWriter()));
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(serverConnection, encoder, new DefaultHttp2FrameReader(),
new DelegatingDecompressorFrameListener(serverConnection,
serverListener));
new DefaultHttp2ConnectionDecoder(serverConnection, encoder, new DefaultHttp2FrameReader());
decoder.frameListener(new DelegatingDecompressorFrameListener(serverConnection, serverListener));
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(decoder, encoder);
p.addLast(connectionHandler);
serverChannelLatch.countDown();
@ -318,9 +317,8 @@ public class DataCompressionHttp2Test {
new DefaultHttp2ConnectionEncoder(clientConnection, new DefaultHttp2FrameWriter()));
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(clientConnection, clientEncoder,
new DefaultHttp2FrameReader(),
new DelegatingDecompressorFrameListener(clientConnection,
clientListener));
new DefaultHttp2FrameReader());
decoder.frameListener(new DelegatingDecompressorFrameListener(clientConnection, clientListener));
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
// By default tests don't wait for server to gracefully shutdown streams

View File

@ -150,8 +150,9 @@ public class DefaultHttp2ConnectionDecoderTest {
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader, listener);
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader);
decoder.lifecycleManager(lifecycleManager);
decoder.frameListener(listener);
// Simulate receiving the initial settings from the remote endpoint.
decode().onSettingsRead(ctx, new Http2Settings());

View File

@ -482,7 +482,9 @@ public class Http2ConnectionRoundtripTest {
serverFrameCountDown =
new FrameCountDown(serverListener, serverSettingsAckLatch,
requestLatch, dataLatch, trailersLatch, goAwayLatch);
p.addLast(new Http2ConnectionHandler(true, serverFrameCountDown, false));
Http2ConnectionHandler handler = new Http2ConnectionHandler(true, false);
handler.decoder().frameListener(serverFrameCountDown);
p.addLast(handler);
}
});
@ -492,7 +494,9 @@ public class Http2ConnectionRoundtripTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Http2ConnectionHandler(false, clientListener, false));
Http2ConnectionHandler handler = new Http2ConnectionHandler(false, false);
handler.decoder().frameListener(clientListener);
p.addLast(handler);
}
});

View File

@ -487,7 +487,9 @@ public class HttpToHttp2ConnectionHandlerTest {
ChannelPipeline p = ch.pipeline();
serverFrameCountDown =
new FrameCountDown(serverListener, serverSettingsAckLatch, requestLatch, null, trailersLatch);
p.addLast(new HttpToHttp2ConnectionHandler(true, serverFrameCountDown));
HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(true);
handler.decoder().frameListener(serverFrameCountDown);
p.addLast(handler);
}
});
@ -497,7 +499,8 @@ public class HttpToHttp2ConnectionHandlerTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(false, clientListener);
HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(false);
handler.decoder().frameListener(clientListener);
handler.gracefulShutdownTimeoutMillis(0);
p.addLast(handler);
}

View File

@ -110,8 +110,8 @@ public class StreamBufferingEncoderTest {
new DefaultHttp2ConnectionEncoder(connection, writer);
encoder = new StreamBufferingEncoder(defaultEncoder);
DefaultHttp2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(connection, encoder,
mock(Http2FrameReader.class), mock(Http2FrameListener.class));
new DefaultHttp2ConnectionDecoder(connection, encoder, mock(Http2FrameReader.class));
decoder.frameListener(mock(Http2FrameListener.class));
Http2ConnectionHandler handler = new Http2ConnectionHandler(decoder, encoder);
// Set LifeCycleManager on encoder and decoder

View File

@ -62,10 +62,8 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
public void initChannel(SocketChannel ch) throws Exception {
final Http2Connection connection = new DefaultHttp2Connection(false);
final Http2FrameWriter frameWriter = frameWriter();
connectionHandler = new HttpToHttp2ConnectionHandler(connection,
frameReader(),
frameWriter,
new DelegatingDecompressorFrameListener(connection,
connectionHandler = new HttpToHttp2ConnectionHandler(connection, frameReader(), frameWriter);
connectionHandler.decoder().frameListener(new DelegatingDecompressorFrameListener(connection,
new InboundHttp2ToHttpAdapter.Builder(connection)
.maxContentLength(maxContentLength)
.propagateSettings(true)

View File

@ -15,11 +15,6 @@
package io.netty.example.http2.helloworld.server;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.logging.LogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
@ -29,23 +24,30 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.logging.LogLevel.INFO;
/**
* A simple handler that responds with the message "Hello World!".
*/
public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
public class HelloWorldHttp2Handler extends Http2ConnectionHandler implements Http2FrameListener {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO, HelloWorldHttp2Handler.class);
static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8));
@ -53,13 +55,13 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
public HelloWorldHttp2Handler() {
this(new DefaultHttp2Connection(true), new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger(
new DefaultHttp2FrameWriter(), logger), new SimpleHttp2FrameListener());
new DefaultHttp2FrameWriter(), logger));
}
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, SimpleHttp2FrameListener listener) {
super(connection, frameReader, frameWriter, listener);
listener.encoder(encoder());
Http2FrameWriter frameWriter) {
super(connection, frameReader, frameWriter);
decoder().frameListener(this);
}
/**
@ -85,50 +87,86 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
ctx.close();
}
private static class SimpleHttp2FrameListener extends Http2FrameAdapter {
private Http2ConnectionEncoder encoder;
/**
* Sends a "Hello World" DATA frame to the client.
*/
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
encoder().writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
ctx.flush();
}
public void encoder(Http2ConnectionEncoder encoder) {
this.encoder = encoder;
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
int processed = data.readableBytes() + padding;
if (endOfStream) {
sendResponse(ctx, streamId, data.retain());
}
return processed;
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
int processed = data.readableBytes() + padding;
if (endOfStream) {
sendResponse(ctx, streamId, data.retain());
}
return processed;
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) throws Http2Exception {
if (endStream) {
ByteBuf content = ctx.alloc().buffer();
content.writeBytes(HelloWorldHttp2Handler.RESPONSE_BYTES.duplicate());
ByteBufUtil.writeAscii(content, " - via HTTP/2");
sendResponse(ctx, streamId, content);
}
}
/**
* Sends a "Hello World" DATA frame to the client.
*/
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
encoder.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
encoder.writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
ctx.flush();
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
if (endOfStream) {
ByteBuf content = ctx.alloc().buffer();
content.writeBytes(HelloWorldHttp2Handler.RESPONSE_BYTES.duplicate());
ByteBufUtil.writeAscii(content, " - via HTTP/2");
sendResponse(ctx, streamId, content);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers,
int padding) throws Http2Exception {
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) throws Http2Exception {
}
}

View File

@ -61,12 +61,14 @@ public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {
InboundHttp2ToHttpAdapter listener = new InboundHttp2ToHttpAdapter.Builder(connection)
.propagateSettings(true).validateHttpHeaders(false).maxContentLength(MAX_CONTENT_LENGTH).build();
ctx.pipeline().addLast(new HttpToHttp2ConnectionHandler(
HttpToHttp2ConnectionHandler handler = new HttpToHttp2ConnectionHandler(
connection,
// Loggers can be activated for debugging purposes
// new Http2InboundFrameLogger(reader, TilesHttp2ToHttpHandler.logger),
// new Http2OutboundFrameLogger(writer, TilesHttp2ToHttpHandler.logger)
reader, writer, listener));
reader, writer);
handler.decoder().frameListener(listener);
ctx.pipeline().addLast(handler);
ctx.pipeline().addLast(new Http2RequestHandler());
}

View File

@ -262,8 +262,8 @@ public class Http2FrameWriterBenchmark extends AbstractSharedExecutorMicrobenchm
environment.writer(new DefaultHttp2FrameWriter());
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, environment.writer());
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader(),
new Http2FrameAdapter());
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader());
decoder.frameListener(new Http2FrameAdapter());
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(decoder, encoder);
p.addLast(connectionHandler);
environment.context(p.lastContext());
@ -291,8 +291,8 @@ public class Http2FrameWriterBenchmark extends AbstractSharedExecutorMicrobenchm
final Http2Connection connection = new DefaultHttp2Connection(false);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, env.writer());
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader(),
new Http2FrameAdapter());
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader());
decoder.frameListener(new Http2FrameAdapter());
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandler(decoder, encoder);
env.context(new EmbeddedChannelWriteReleaseHandlerContext(alloc, connectionHandler) {
@Override