Fix buffer leaks in HTTP2 codec
Motivation: Our ci showed some buffer leaks in the http2 codec. This commit fixes all of these. Modifications: * Correctly release buffers in DefaultHttp2FrameWriter * Fix buffer leaks in tests Result: Tests pass without leaks
This commit is contained in:
parent
ca7c53d45f
commit
3080d14f25
@ -98,6 +98,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
|||||||
return ctx.writeAndFlush(out, promise);
|
return ctx.writeAndFlush(out, promise);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
throw failAndThrow(promise, e);
|
throw failAndThrow(promise, e);
|
||||||
|
} finally {
|
||||||
|
data.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -220,6 +222,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
|||||||
return ctx.writeAndFlush(frame, promise);
|
return ctx.writeAndFlush(frame, promise);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
throw failAndThrow(promise, e);
|
throw failAndThrow(promise, e);
|
||||||
|
} finally {
|
||||||
|
data.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -297,6 +301,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
|||||||
return ctx.writeAndFlush(frame, promise);
|
return ctx.writeAndFlush(frame, promise);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
throw failAndThrow(promise, e);
|
throw failAndThrow(promise, e);
|
||||||
|
} finally {
|
||||||
|
debugData.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,6 +357,8 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
|||||||
return ctx.writeAndFlush(frame, promise);
|
return ctx.writeAndFlush(frame, promise);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
throw failAndThrow(promise, e);
|
throw failAndThrow(promise, e);
|
||||||
|
} finally {
|
||||||
|
protocolId.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
|
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
@ -77,7 +78,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
@Test
|
@Test
|
||||||
public void dataShouldRoundtrip() throws Exception {
|
public void dataShouldRoundtrip() throws Exception {
|
||||||
ByteBuf data = dummyData();
|
ByteBuf data = dummyData();
|
||||||
writer.writeData(ctx, promise, 1000, data, 0, false, false, false);
|
writer.writeData(ctx, promise, 1000, data.retain().duplicate(), 0, false, false, false);
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
verify(observer).onDataRead(eq(1000), eq(data), eq(0), eq(false), eq(false), eq(false));
|
verify(observer).onDataRead(eq(1000), eq(data), eq(0), eq(false), eq(false), eq(false));
|
||||||
@ -86,7 +87,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
@Test
|
@Test
|
||||||
public void dataWithPaddingShouldRoundtrip() throws Exception {
|
public void dataWithPaddingShouldRoundtrip() throws Exception {
|
||||||
ByteBuf data = dummyData();
|
ByteBuf data = dummyData();
|
||||||
writer.writeData(ctx, promise, 1, data, 256, true, true, true);
|
writer.writeData(ctx, promise, 1, data.retain().duplicate(), 256, true, true, true);
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
verify(observer).onDataRead(eq(1), eq(data), eq(256), eq(true), eq(true), eq(true));
|
verify(observer).onDataRead(eq(1), eq(data), eq(256), eq(true), eq(true), eq(true));
|
||||||
@ -142,7 +143,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
@Test
|
@Test
|
||||||
public void pingShouldRoundtrip() throws Exception {
|
public void pingShouldRoundtrip() throws Exception {
|
||||||
ByteBuf data = dummyData();
|
ByteBuf data = dummyData();
|
||||||
writer.writePing(ctx, promise, false, data);
|
writer.writePing(ctx, promise, false, data.retain().duplicate());
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
verify(observer).onPingRead(eq(data));
|
verify(observer).onPingRead(eq(data));
|
||||||
@ -151,7 +152,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
@Test
|
@Test
|
||||||
public void pingAckShouldRoundtrip() throws Exception {
|
public void pingAckShouldRoundtrip() throws Exception {
|
||||||
ByteBuf data = dummyData();
|
ByteBuf data = dummyData();
|
||||||
writer.writePing(ctx, promise, true, data);
|
writer.writePing(ctx, promise, true, data.retain().duplicate());
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
verify(observer).onPingAckRead(eq(data));
|
verify(observer).onPingAckRead(eq(data));
|
||||||
@ -160,7 +161,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
@Test
|
@Test
|
||||||
public void goAwayShouldRoundtrip() throws Exception {
|
public void goAwayShouldRoundtrip() throws Exception {
|
||||||
ByteBuf data = dummyData();
|
ByteBuf data = dummyData();
|
||||||
writer.writeGoAway(ctx, promise, 1, MAX_UNSIGNED_INT, data);
|
writer.writeGoAway(ctx, promise, 1, MAX_UNSIGNED_INT, data.retain().duplicate());
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
verify(observer).onGoAwayRead(eq(1), eq(MAX_UNSIGNED_INT), eq(data));
|
verify(observer).onGoAwayRead(eq(1), eq(MAX_UNSIGNED_INT), eq(data));
|
||||||
@ -203,7 +204,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void emptyHeadersShouldRoundtrip() throws Exception {
|
public void emptyHeadersShouldRoundtrip() throws Exception {
|
||||||
Http2Headers headers = DefaultHttp2Headers.EMPTY_HEADERS;
|
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
|
||||||
writer.writeHeaders(ctx, promise, 1, headers, 0, true, true);
|
writer.writeHeaders(ctx, promise, 1, headers, 0, true, true);
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
@ -212,7 +213,7 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception {
|
public void emptyHeadersWithPaddingShouldRoundtrip() throws Exception {
|
||||||
Http2Headers headers = DefaultHttp2Headers.EMPTY_HEADERS;
|
Http2Headers headers = Http2Headers.EMPTY_HEADERS;
|
||||||
writer.writeHeaders(ctx, promise, 1, headers, 256, true, true);
|
writer.writeHeaders(ctx, promise, 1, headers, 256, true, true);
|
||||||
ByteBuf frame = captureWrite();
|
ByteBuf frame = captureWrite();
|
||||||
reader.readFrame(alloc, frame, observer);
|
reader.readFrame(alloc, frame, observer);
|
||||||
@ -329,15 +330,15 @@ public class DefaultHttp2FrameIOTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuf dummyData() {
|
private ByteBuf dummyData() {
|
||||||
return alloc.buffer().writeBytes("abcdefgh".getBytes(CharsetUtil.UTF_8));
|
return ReferenceCountUtil.releaseLater(alloc.buffer().writeBytes("abcdefgh".getBytes(CharsetUtil.UTF_8)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Http2Headers dummyHeaders() {
|
private static Http2Headers dummyHeaders() {
|
||||||
return DefaultHttp2Headers.newBuilder().method("GET").scheme("https")
|
return DefaultHttp2Headers.newBuilder().method("GET").scheme("https")
|
||||||
.authority("example.org").path("/some/path").add("accept", "*/*").build();
|
.authority("example.org").path("/some/path").add("accept", "*/*").build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Http2Headers largeHeaders() {
|
private static Http2Headers largeHeaders() {
|
||||||
DefaultHttp2Headers.Builder builder = DefaultHttp2Headers.newBuilder();
|
DefaultHttp2Headers.Builder builder = DefaultHttp2Headers.newBuilder();
|
||||||
for (int i = 0; i < 100; ++i) {
|
for (int i = 0; i < 100; ++i) {
|
||||||
String key = "this-is-a-test-header-key-" + i;
|
String key = "this-is-a-test-header-key-" + i;
|
||||||
|
@ -113,11 +113,13 @@ public class DefaultHttp2InboundFlowControllerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void applyFlowControl(int dataSize, boolean endOfStream) throws Http2Exception {
|
private void applyFlowControl(int dataSize, boolean endOfStream) throws Http2Exception {
|
||||||
controller.applyInboundFlowControl(STREAM_ID, dummyData(dataSize), 0, endOfStream, false,
|
ByteBuf buf = dummyData(dataSize);
|
||||||
|
controller.applyInboundFlowControl(STREAM_ID, buf, 0, endOfStream, false,
|
||||||
false, frameWriter);
|
false, frameWriter);
|
||||||
|
buf.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuf dummyData(int size) {
|
private static ByteBuf dummyData(int size) {
|
||||||
ByteBuf buffer = Unpooled.buffer(size);
|
ByteBuf buffer = Unpooled.buffer(size);
|
||||||
buffer.writerIndex(size);
|
buffer.writerIndex(size);
|
||||||
return buffer;
|
return buffer;
|
||||||
|
@ -577,9 +577,9 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
|||||||
verify(reader).maxHeaderTableSize(2000);
|
verify(reader).maxHeaderTableSize(2000);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuf dummyData() {
|
private static ByteBuf dummyData() {
|
||||||
// The buffer is purposely 8 bytes so it will even work for a ping frame.
|
// The buffer is purposely 8 bytes so it will even work for a ping frame.
|
||||||
return Unpooled.copiedBuffer("abcdefgh", UTF_8);
|
return Unpooled.wrappedBuffer("abcdefgh".getBytes(UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockContext() {
|
private void mockContext() {
|
||||||
|
@ -115,7 +115,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
|
|
||||||
frameWriter.writeData(ctx(), newPromise(), 0x7FFFFFFF,
|
frameWriter.writeData(ctx(), newPromise(), 0x7FFFFFFF,
|
||||||
Unpooled.copiedBuffer(text.getBytes()), 100, true, false, false);
|
Unpooled.copiedBuffer(text.getBytes()), 100, true, false, false);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onDataRead(eq(0x7FFFFFFF), dataCaptor.capture(), eq(100), eq(true),
|
verify(serverObserver).onDataRead(eq(0x7FFFFFFF), dataCaptor.capture(), eq(100), eq(true),
|
||||||
@ -128,7 +127,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
|
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
|
||||||
.authority("example.org").path("/some/path/resource2").build();
|
.authority("example.org").path("/some/path/resource2").build();
|
||||||
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 0, true, false);
|
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 0, true, false);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onHeadersRead(eq(0x7FFFFFFF), eq(headers), eq(0), eq(true),
|
verify(serverObserver).onHeadersRead(eq(0x7FFFFFFF), eq(headers), eq(0), eq(true),
|
||||||
@ -142,7 +140,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
.authority("example.org").path("/some/path/resource2").build();
|
.authority("example.org").path("/some/path/resource2").build();
|
||||||
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255, true, 0,
|
frameWriter.writeHeaders(ctx(), newPromise(), 0x7FFFFFFF, headers, 4, (short) 255, true, 0,
|
||||||
true, false);
|
true, false);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onHeadersRead(eq(0x7FFFFFFF), eq(headers), eq(4), eq((short) 255),
|
verify(serverObserver).onHeadersRead(eq(0x7FFFFFFF), eq(headers), eq(4), eq((short) 255),
|
||||||
@ -154,7 +151,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
String text = "test";
|
String text = "test";
|
||||||
frameWriter.writeGoAway(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL,
|
frameWriter.writeGoAway(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL,
|
||||||
Unpooled.copiedBuffer(text.getBytes()));
|
Unpooled.copiedBuffer(text.getBytes()));
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onGoAwayRead(eq(0x7FFFFFFF), eq(0xFFFFFFFFL), dataCaptor.capture());
|
verify(serverObserver).onGoAwayRead(eq(0x7FFFFFFF), eq(0xFFFFFFFFL), dataCaptor.capture());
|
||||||
@ -164,7 +160,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
public void pingFrameShouldMatch() throws Exception {
|
public void pingFrameShouldMatch() throws Exception {
|
||||||
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
|
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
|
||||||
frameWriter.writePing(ctx(), ctx().newPromise(), true, buf);
|
frameWriter.writePing(ctx(), ctx().newPromise(), true, buf);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onPingAckRead(dataCaptor.capture());
|
verify(serverObserver).onPingAckRead(dataCaptor.capture());
|
||||||
@ -173,7 +168,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
@Test
|
@Test
|
||||||
public void priorityFrameShouldMatch() throws Exception {
|
public void priorityFrameShouldMatch() throws Exception {
|
||||||
frameWriter.writePriority(ctx(), newPromise(), 0x7FFFFFFF, 1, (short) 1, true);
|
frameWriter.writePriority(ctx(), newPromise(), 0x7FFFFFFF, 1, (short) 1, true);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onPriorityRead(eq(0x7FFFFFFF), eq(1), eq((short) 1), eq(true));
|
verify(serverObserver).onPriorityRead(eq(0x7FFFFFFF), eq(1), eq((short) 1), eq(true));
|
||||||
@ -185,7 +179,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
|
new DefaultHttp2Headers.Builder().method("GET").scheme("https")
|
||||||
.authority("example.org").path("/some/path/resource2").build();
|
.authority("example.org").path("/some/path/resource2").build();
|
||||||
frameWriter.writePushPromise(ctx(), newPromise(), 0x7FFFFFFF, 1, headers, 5);
|
frameWriter.writePushPromise(ctx(), newPromise(), 0x7FFFFFFF, 1, headers, 5);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onPushPromiseRead(eq(0x7FFFFFFF), eq(1), eq(headers), eq(5));
|
verify(serverObserver).onPushPromiseRead(eq(0x7FFFFFFF), eq(1), eq(headers), eq(5));
|
||||||
@ -194,7 +187,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
@Test
|
@Test
|
||||||
public void rstStreamFrameShouldMatch() throws Exception {
|
public void rstStreamFrameShouldMatch() throws Exception {
|
||||||
frameWriter.writeRstStream(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL);
|
frameWriter.writeRstStream(ctx(), newPromise(), 0x7FFFFFFF, 0xFFFFFFFFL);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onRstStreamRead(eq(0x7FFFFFFF), eq(0xFFFFFFFFL));
|
verify(serverObserver).onRstStreamRead(eq(0x7FFFFFFF), eq(0xFFFFFFFFL));
|
||||||
@ -208,7 +200,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
settings.maxConcurrentStreams(1000);
|
settings.maxConcurrentStreams(1000);
|
||||||
settings.maxHeaderTableSize(4096);
|
settings.maxHeaderTableSize(4096);
|
||||||
frameWriter.writeSettings(ctx(), newPromise(), settings);
|
frameWriter.writeSettings(ctx(), newPromise(), settings);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onSettingsRead(eq(settings));
|
verify(serverObserver).onSettingsRead(eq(settings));
|
||||||
@ -217,7 +208,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
@Test
|
@Test
|
||||||
public void windowUpdateFrameShouldMatch() throws Exception {
|
public void windowUpdateFrameShouldMatch() throws Exception {
|
||||||
frameWriter.writeWindowUpdate(ctx(), newPromise(), 0x7FFFFFFF, 0x7FFFFFFF);
|
frameWriter.writeWindowUpdate(ctx(), newPromise(), 0x7FFFFFFF, 0x7FFFFFFF);
|
||||||
flush();
|
|
||||||
|
|
||||||
awaitRequests();
|
awaitRequests();
|
||||||
verify(serverObserver).onWindowUpdateRead(eq(0x7FFFFFFF), eq(0x7FFFFFFF));
|
verify(serverObserver).onWindowUpdateRead(eq(0x7FFFFFFF), eq(0x7FFFFFFF));
|
||||||
@ -238,7 +228,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
false, false);
|
false, false);
|
||||||
frameWriter.writeData(ctx(), newPromise(), i, Unpooled.copiedBuffer(text.getBytes()),
|
frameWriter.writeData(ctx(), newPromise(), i, Unpooled.copiedBuffer(text.getBytes()),
|
||||||
0, true, true, false);
|
0, true, true, false);
|
||||||
flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all frames to be received.
|
// Wait for all frames to be received.
|
||||||
@ -257,10 +246,6 @@ public class Http2FrameRoundtripTest {
|
|||||||
return ctx().newPromise();
|
return ctx().newPromise();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flush() {
|
|
||||||
ctx().flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class FrameAdapter extends ByteToMessageDecoder {
|
private final class FrameAdapter extends ByteToMessageDecoder {
|
||||||
|
|
||||||
private final Http2FrameObserver observer;
|
private final Http2FrameObserver observer;
|
||||||
|
Loading…
Reference in New Issue
Block a user