Add working HttpSnoop example

This commit is contained in:
Chris Vest 2021-03-06 11:18:14 +01:00
parent bf80061335
commit 602389712c
10 changed files with 944 additions and 72 deletions

View File

@ -24,7 +24,7 @@ ENV PATH=/home/build/apache-maven-3.6.3/bin:$PATH
# Prepare a snapshot of Netty 5
RUN git clone -b master https://github.com/netty/netty.git netty
WORKDIR /home/build/netty
RUN mvn install -DskipTests -T1C -B -am -pl buffer,handler
RUN mvn install -DskipTests -T1C -B -am
WORKDIR /home/build
# Prepare our own build

View File

@ -400,6 +400,12 @@
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>

View File

@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.util.ByteProcessor;
import io.netty.util.IllegalReferenceCountException;
import java.io.IOException;
import java.io.InputStream;
@ -32,6 +33,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;
public final class ByteBufAdaptor extends ByteBuf {
private final ByteBufAllocatorAdaptor alloc;
@ -138,7 +140,8 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
return readerIndex(readerIndex).writerIndex(writerIndex);
buffer.reset().writerOffset(writerIndex).readerOffset(readerIndex);
return this;
}
@Override
@ -194,13 +197,31 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
buffer.ensureWritable(minWritableBytes);
try {
if (writableBytes() < minWritableBytes) {
int borrows = buffer.countBorrows();
if (borrows == 0) {
// Good place.
buffer.ensureWritable(minWritableBytes);
} else {
// Highly questionable place, but ByteBuf technically allows this, so we have to emulate.
release(borrows);
try {
buffer.ensureWritable(minWritableBytes);
} finally {
retain(borrows);
}
}
}
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
return this;
}
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
buffer.ensureWritable(minWritableBytes);
ensureWritable(minWritableBytes);
return minWritableBytes;
}
@ -211,17 +232,29 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public byte getByte(int index) {
return buffer.getByte(index);
try {
return buffer.getByte(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public short getUnsignedByte(int index) {
return (short) buffer.getUnsignedByte(index);
try {
return (short) buffer.getUnsignedByte(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public short getShort(int index) {
return buffer.getShort(index);
try {
return buffer.getShort(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -229,6 +262,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getShort(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -236,7 +271,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int getUnsignedShort(int index) {
return buffer.getUnsignedShort(index);
try {
return buffer.getUnsignedShort(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -244,6 +283,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedShort(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -251,7 +292,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int getMedium(int index) {
return buffer.getMedium(index);
try {
return buffer.getMedium(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -259,6 +304,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getMedium(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -266,7 +313,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int getUnsignedMedium(int index) {
return buffer.getUnsignedMedium(index);
try {
return buffer.getUnsignedMedium(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -274,6 +325,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedMedium(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -281,7 +334,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int getInt(int index) {
return buffer.getInt(index);
try {
return buffer.getInt(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -289,6 +346,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getInt(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -296,7 +355,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public long getUnsignedInt(int index) {
return buffer.getUnsignedInt(index);
try {
return buffer.getUnsignedInt(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -304,6 +367,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getUnsignedInt(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -311,7 +376,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public long getLong(int index) {
return buffer.getLong(index);
try {
return buffer.getLong(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -319,6 +388,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).getLong(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -326,17 +397,29 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public char getChar(int index) {
return buffer.getChar(index);
try {
return buffer.getChar(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public float getFloat(int index) {
return buffer.getFloat(index);
try {
return buffer.getFloat(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public double getDouble(int index) {
return buffer.getDouble(index);
try {
return buffer.getDouble(index);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -421,14 +504,22 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setByte(int index, int value) {
buffer.setByte(index, (byte) value);
try {
buffer.setByte(index, (byte) value);
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
return this;
}
@Override
public ByteBuf setShort(int index, int value) {
buffer.setShort(index, (short) value);
return this;
try {
buffer.setShort(index, (short) value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -437,6 +528,8 @@ public final class ByteBufAdaptor extends ByteBuf {
try {
buffer.order(ByteOrder.LITTLE_ENDIAN).setShort(index, (short) value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -444,8 +537,12 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setMedium(int index, int value) {
buffer.setMedium(index, value);
return this;
try {
buffer.setMedium(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -454,6 +551,8 @@ public final class ByteBufAdaptor extends ByteBuf {
try {
buffer.order(ByteOrder.LITTLE_ENDIAN).setMedium(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -461,8 +560,12 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setInt(int index, int value) {
buffer.setInt(index, value);
return this;
try {
buffer.setInt(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -471,6 +574,8 @@ public final class ByteBufAdaptor extends ByteBuf {
try {
buffer.order(ByteOrder.LITTLE_ENDIAN).setInt(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -478,8 +583,12 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setLong(int index, long value) {
buffer.setLong(index, value);
return this;
try {
buffer.setLong(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -488,6 +597,8 @@ public final class ByteBufAdaptor extends ByteBuf {
try {
buffer.order(ByteOrder.LITTLE_ENDIAN).setLong(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -495,24 +606,39 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setChar(int index, int value) {
buffer.setChar(index, (char) value);
return this;
try {
buffer.setChar(index, (char) value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public ByteBuf setFloat(int index, float value) {
buffer.setFloat(index, value);
return this;
try {
buffer.setFloat(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public ByteBuf setDouble(int index, double value) {
buffer.setDouble(index, value);
return this;
try {
buffer.setDouble(index, value);
return this;
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public ByteBuf setBytes(int index, ByteBuf src) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
while (src.isReadable() && index < capacity()) {
setByte(index++, src.readByte());
}
@ -606,17 +732,29 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public byte readByte() {
return buffer.readByte();
try {
return buffer.readByte();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public short readUnsignedByte() {
return (short) buffer.readUnsignedByte();
try {
return (short) buffer.readUnsignedByte();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public short readShort() {
return buffer.readShort();
try {
return buffer.readShort();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -624,6 +762,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readShort();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -631,7 +771,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int readUnsignedShort() {
return buffer.readUnsignedShort();
try {
return buffer.readUnsignedShort();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -639,6 +783,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedShort();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -646,7 +792,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int readMedium() {
return buffer.readMedium();
try {
return buffer.readMedium();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -654,6 +804,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readMedium();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -661,7 +813,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int readUnsignedMedium() {
return buffer.readUnsignedMedium();
try {
return buffer.readUnsignedMedium();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -669,6 +825,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedMedium();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -676,7 +834,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int readInt() {
return buffer.readInt();
try {
return buffer.readInt();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -684,6 +846,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readInt();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -691,7 +855,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public long readUnsignedInt() {
return buffer.readUnsignedInt();
try {
return buffer.readUnsignedInt();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -699,6 +867,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readUnsignedInt();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -706,7 +876,11 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public long readLong() {
return buffer.readLong();
try {
return buffer.readLong();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -714,6 +888,8 @@ public final class ByteBufAdaptor extends ByteBuf {
ByteOrder originalOrder = buffer.order();
try {
return buffer.order(ByteOrder.LITTLE_ENDIAN).readLong();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
} finally {
buffer.order(originalOrder);
}
@ -721,17 +897,29 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public char readChar() {
return buffer.readChar();
try {
return buffer.readChar();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public float readFloat() {
return buffer.readFloat();
try {
return buffer.readFloat();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public double readDouble() {
return buffer.readDouble();
try {
return buffer.readDouble();
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
@ -743,12 +931,16 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf readSlice(int length) {
return readRetainedSlice(length);
ByteBuf slice = readRetainedSlice(length);
release();
return slice;
}
@Override
public ByteBuf readRetainedSlice(int length) {
return slice(readerIndex(), length);
ByteBuf slice = retainedSlice(readerIndex(), length);
buffer.readerOffset(buffer.readerOffset() + length);
return slice;
}
@Override
@ -1021,7 +1213,25 @@ public final class ByteBufAdaptor extends ByteBuf {
components[i] = component.writableBuffer();
return true;
});
int read = (int) in.read(components);
int read;
if (isDirect()) {
// TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native
// memory segments to be used in IO operations.
ByteBuffer[] copies = new ByteBuffer[components.length];
for (int i = 0; i < copies.length; i++) {
copies[i] = ByteBuffer.allocateDirect(components[i].remaining());
}
read = (int) in.read(copies);
for (int i = 0; i < copies.length; i++) {
ByteBuffer copy = copies[i];
ByteBuffer component = components[i];
component.put(copy.flip()).flip();
}
} else {
read = (int) in.read(components);
}
if (read > 0) {
writerIndex(read + writerIndex());
}
@ -1078,76 +1288,119 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int bytesBefore(byte value) {
return indexOf(readerIndex(), writerIndex(), value);
return bytesBefore(readerIndex(), writerIndex(), value);
}
@Override
public int bytesBefore(int length, byte value) {
return indexOf(readerIndex(), readerIndex() + length, value);
return bytesBefore(readerIndex(), readerIndex() + length, value);
}
@Override
public int bytesBefore(int index, int length, byte value) {
return indexOf(index, index + length, value);
int i = indexOf(index, index + length, value);
if (i != -1) {
i -= index;
}
return i;
}
@Override
public int forEachByte(ByteProcessor processor) {
return buffer.openCursor().process(processor);
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
int index = readerIndex();
int bytes = buffer.openCursor().process(processor);
return bytes == -1 ? -1 : index + bytes;
}
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
return buffer.openCursor(index, length).process(processor);
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
int bytes = buffer.openCursor(index, length).process(processor);
return bytes == -1 ? -1 : index + bytes;
}
@Override
public int forEachByteDesc(ByteProcessor processor) {
return buffer.openReverseCursor().process(processor);
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
int index = readerIndex();
int bytes = buffer.openReverseCursor().process(processor);
return bytes == -1 ? -1 : index - bytes;
}
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
return buffer.openReverseCursor(index, length).process(processor);
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
int bytes = buffer.openReverseCursor(index, length).process(processor);
return bytes == -1 ? -1 : index - bytes;
}
@Override
public ByteBuf copy() {
return copy(0, capacity());
return copy(readerIndex(), readableBytes());
}
@Override
public ByteBuf copy(int index, int length) {
BufferAllocator allocator = preferredBufferAllocator();
Buffer copy = allocator.allocate(length);
buffer.copyInto(index, copy, 0, length);
copy.order(buffer.order());
return wrap(copy).setIndex(readerIndex(), writerIndex());
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
try {
BufferAllocator allocator = preferredBufferAllocator();
Buffer copy = allocator.allocate(length);
buffer.copyInto(index, copy, 0, length);
copy.order(buffer.order());
copy.writerOffset(length);
return wrap(copy);
} catch (IllegalArgumentException e) {
throw new IndexOutOfBoundsException(e.getMessage());
}
}
@Override
public ByteBuf slice() {
return retainedSlice();
ByteBuf slice = retainedSlice();
release();
return slice;
}
@Override
public ByteBuf retainedSlice() {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
return wrap(buffer.slice());
}
@Override
public ByteBuf slice(int index, int length) {
return retainedSlice(index, length);
ByteBuf slice = retainedSlice(index, length);
release();
return slice;
}
@Override
public ByteBuf retainedSlice(int index, int length) {
return wrap(buffer.slice(index, length));
try {
return wrap(buffer.slice(index, length));
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
}
@Override
public ByteBuf duplicate() {
return retainedDuplicate();
ByteBuf duplicate = retainedDuplicate();
release();
return duplicate;
}
@Override
@ -1157,32 +1410,81 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int nioBufferCount() {
return -1;
return 1;
}
@Override
public ByteBuffer nioBuffer() {
throw new UnsupportedOperationException("Cannot create shared NIO buffer.");
return nioBuffer(readerIndex(), readableBytes());
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return nioBuffer();
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
ByteBuffer copy = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
while (index < length) {
copy.put(getByte(index++));
}
return copy.flip();
}
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
return nioBuffer();
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
if (readerIndex() <= index && index < writerIndex() && length <= readableBytes()) {
// We wish to read from the internal buffer.
if (buffer.countReadableComponents() != 1) {
throw new UnsupportedOperationException(
"Unsupported number of readable components: " + buffer.countReadableComponents() + '.');
}
AtomicReference<ByteBuffer> bufRef = new AtomicReference<>();
buffer.forEachReadable(0, (i, component) -> {
bufRef.set(component.readableBuffer());
return false;
});
ByteBuffer buffer = bufRef.get();
if (index != readerIndex() || length != readableBytes()) {
buffer = buffer.slice(index - readerIndex(), length);
}
return buffer;
} else if (writerIndex() <= index && length <= writableBytes()) {
// We wish to write to the internal buffer.
if (buffer.countWritableComponents() != 1) {
throw new UnsupportedOperationException(
"Unsupported number of writable components: " + buffer.countWritableComponents() + '.');
}
AtomicReference<ByteBuffer> bufRef = new AtomicReference<>();
buffer.forEachWritable(0, (i, component) -> {
bufRef.set(component.writableBuffer());
return false;
});
ByteBuffer buffer = bufRef.get();
if (index != writerIndex() || length != writableBytes()) {
buffer = buffer.slice(index - writerIndex(), length);
}
return buffer;
} else {
String message = "Cannot provide internal NIO buffer for range from " + index + " for length " + length +
", when writerIndex() is " + writerIndex() + " and writable bytes are " + writableBytes() +
", and readerIndex() is " + readerIndex() + " and readable bytes are " + readableBytes() +
". The requested range must fall within EITHER the readable area OR the writable area. " +
"Straddling the two areas, or reaching outside of their bounds, is not allowed.";
throw new UnsupportedOperationException(message);
}
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException("Cannot create shared NIO buffers.");
return new ByteBuffer[] { nioBuffer() };
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return nioBuffers();
return new ByteBuffer[] { internalNioBuffer(index, length) };
}
@Override

View File

@ -98,9 +98,7 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable
@Override
public ByteBuf directBuffer(int initialCapacity) {
// TODO we cannot use off-heap buffers here, until the JDK allows direct byte buffers based on native
// memory segments to be used in IO operations.
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
return new ByteBufAdaptor(this, offheap.allocate(initialCapacity));
}
@Override

View File

@ -0,0 +1,114 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.net.URI;
/**
* A simple HTTP client that prints out the content of the HTTP response to
* {@link System#out} to test {@link HttpSnoopServer}.
*/
public final class HttpSnoopClient {
static final String URL = System.getProperty("url", "http://127.0.0.1:8080/");
public static void main(String[] args) throws Exception {
URI uri = new URI(URL);
String scheme = uri.getScheme() == null? "http" : uri.getScheme();
String host = uri.getHost() == null? "127.0.0.1" : uri.getHost();
int port = uri.getPort();
if (port == -1) {
if ("http".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("https".equalsIgnoreCase(scheme)) {
port = 443;
}
}
if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
System.err.println("Only HTTP(S) is supported.");
return;
}
// Configure SSL context if necessary.
final boolean ssl = "https".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.ALLOCATOR, allocator)
.channel(NioSocketChannel.class)
.handler(new HttpSnoopClientInitializer(sslCtx, allocator));
// Make the connection attempt.
Channel ch = b.connect(host, port).sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath(), Unpooled.EMPTY_BUFFER);
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
// Set some example cookies.
request.headers().set(
HttpHeaderNames.COOKIE,
ClientCookieEncoder.STRICT.encode(
new DefaultCookie("my-cookie", "foo"),
new DefaultCookie("another-cookie", "bar")));
// Send the HTTP request.
ch.writeAndFlush(request);
// Wait for the server to close the connection.
ch.closeFuture().sync();
} finally {
// Shut down executor threads to exit.
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil;
public class HttpSnoopClientHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
System.err.println("STATUS: " + response.status());
System.err.println("VERSION: " + response.protocolVersion());
System.err.println();
if (!response.headers().isEmpty()) {
for (CharSequence name: response.headers().names()) {
for (CharSequence value: response.headers().getAll(name)) {
System.err.println("HEADER: " + name + " = " + value);
}
}
System.err.println();
}
if (HttpUtil.isTransferEncodingChunked(response)) {
System.err.println("CHUNKED CONTENT {");
} else {
System.err.println("CONTENT {");
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
System.err.print(content.content().toString(CharsetUtil.UTF_8));
System.err.flush();
if (content instanceof LastHttpContent) {
System.err.println("} END OF CONTENT");
ctx.close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.ssl.SslContext;
public class HttpSnoopClientInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final ByteBufAllocator allocator;
public HttpSnoopClientInitializer(SslContext sslCtx, ByteBufAllocator allocator) {
this.sslCtx = sslCtx;
this.allocator = allocator;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// Enable HTTPS if necessary.
if (sslCtx != null) {
// `initChannel` runs before the channel options have been applied, so we need to configure
// the handler with an allocator that we get passed via the constructor.
p.addLast(sslCtx.newHandler(allocator));
}
p.addLast(new HttpClientCodec());
// Remove the following line if you don't want automatic content decompression.
p.addLast(new HttpContentDecompressor());
// Uncomment the following line if you don't want to handle HttpContents.
//p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpSnoopClientHandler());
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* An HTTP server that sends back the content of the received HTTP request
* in a pretty plaintext form.
*/
public final class HttpSnoopServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
ByteBufAllocatorAdaptor allocator = new ByteBufAllocatorAdaptor();
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.ALLOCATOR, allocator)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpSnoopServerInitializer(sslCtx, allocator));
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,199 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class HttpSnoopServerHandler extends SimpleChannelInboundHandler<Object> {
private HttpRequest request;
/** Buffer that stores the response content */
private final StringBuilder buf = new StringBuilder();
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest request = this.request = (HttpRequest) msg;
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
buf.setLength(0);
buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n");
buf.append("===================================\r\n");
buf.append("VERSION: ").append(request.protocolVersion()).append("\r\n");
buf.append("HOSTNAME: ").append(request.headers().get(HttpHeaderNames.HOST, "unknown")).append("\r\n");
buf.append("REQUEST_URI: ").append(request.uri()).append("\r\n\r\n");
HttpHeaders headers = request.headers();
if (!headers.isEmpty()) {
for (Entry<String, String> h: headers) {
CharSequence key = h.getKey();
CharSequence value = h.getValue();
buf.append("HEADER: ").append(key).append(" = ").append(value).append("\r\n");
}
buf.append("\r\n");
}
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
Map<String, List<String>> params = queryStringDecoder.parameters();
if (!params.isEmpty()) {
for (Entry<String, List<String>> p: params.entrySet()) {
String key = p.getKey();
List<String> vals = p.getValue();
for (String val : vals) {
buf.append("PARAM: ").append(key).append(" = ").append(val).append("\r\n");
}
}
buf.append("\r\n");
}
appendDecoderResult(buf, request);
}
if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
if (content.isReadable()) {
buf.append("CONTENT: ");
buf.append(content.toString(CharsetUtil.UTF_8));
buf.append("\r\n");
appendDecoderResult(buf, request);
}
if (msg instanceof LastHttpContent) {
buf.append("END OF CONTENT\r\n");
LastHttpContent trailer = (LastHttpContent) msg;
if (!trailer.trailingHeaders().isEmpty()) {
buf.append("\r\n");
for (CharSequence name: trailer.trailingHeaders().names()) {
for (CharSequence value: trailer.trailingHeaders().getAll(name)) {
buf.append("TRAILING HEADER: ");
buf.append(name).append(" = ").append(value).append("\r\n");
}
}
buf.append("\r\n");
}
if (!writeResponse(trailer, ctx)) {
// If keep-alive is off, close the connection once the content is fully written.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}
}
private static void appendDecoderResult(StringBuilder buf, HttpObject o) {
DecoderResult result = o.decoderResult();
if (result.isSuccess()) {
return;
}
buf.append(".. WITH DECODER FAILURE: ");
buf.append(result.cause());
buf.append("\r\n");
}
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpUtil.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, currentObj.decoderResult().isSuccess()? OK : BAD_REQUEST,
Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
if (keepAlive) {
// Add 'Content-Length' header only for a keep-alive connection.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// - https://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Encode the cookie.
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
for (Cookie cookie: cookies) {
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookie));
}
}
} else {
// Browser sent no cookie. Add some.
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key1", "value1"));
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
}
// Write the response.
ctx.write(response);
return keepAlive;
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE, Unpooled.EMPTY_BUFFER);
ctx.write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples.http.snoop;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;
public class HttpSnoopServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final ByteBufAllocator allocator;
public HttpSnoopServerInitializer(SslContext sslCtx, ByteBufAllocator allocator) {
this.sslCtx = sslCtx;
this.allocator = allocator;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
ch.alloc();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(allocator));
}
p.addLast(new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
//p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
//p.addLast(new HttpContentCompressor());
p.addLast(new HttpSnoopServerHandler());
}
}