CompositeBuffer#split() should correctly set offsets (#11671)

__Motivation__

While computing offsets from within `CompositeBuffer#split()`, we do not consider a case when the constituents `buffer` array is empty but existing read/write offsets are non-zero. This is possible when the buffer is full.

__Modification__

Correctly set offsets even for the aforementioned case.

__Result__

Read/write offsets are correctly set while splitting composite buffer.
This commit is contained in:
Nitesh Kant 2021-09-10 12:33:16 -07:00 committed by GitHub
parent efd576e43e
commit 43f3956030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 17 deletions

View File

@ -276,9 +276,9 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
} }
private void computeBufferOffsets() { private void computeBufferOffsets() {
if (bufs.length > 0) {
int woff = 0; int woff = 0;
int roff = 0; int roff = 0;
if (bufs.length > 0) {
boolean woffMidpoint = false; boolean woffMidpoint = false;
for (Buffer buf : bufs) { for (Buffer buf : bufs) {
if (buf.writableBytes() == 0) { if (buf.writableBytes() == 0) {
@ -307,10 +307,10 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
} }
assert roff <= woff: assert roff <= woff:
"The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.'; "The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.';
// Commit computed offsets. }
// Commit computed offsets, if any
this.woff = woff; this.woff = woff;
this.roff = roff; this.roff = roff;
}
offsets = new int[bufs.length]; offsets = new int[bufs.length];
long cap = 0; long cap = 0;
@ -706,6 +706,11 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
} }
} }
@Override
public CompositeBuffer split() {
return split(writerOffset());
}
@Override @Override
public CompositeBuffer split(int splitOffset) { public CompositeBuffer split(int splitOffset) {
checkSplit(splitOffset); checkSplit(splitOffset);
@ -1164,9 +1169,7 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
} }
boolean readOnly = this.readOnly; boolean readOnly = this.readOnly;
makeInaccessible(); makeInaccessible();
return new Owned<CompositeBuffer>() { return drop -> {
@Override
public CompositeBuffer transferOwnership(Drop<CompositeBuffer> drop) {
Buffer[] received = new Buffer[sends.length]; Buffer[] received = new Buffer[sends.length];
for (int i = 0; i < sends.length; i++) { for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive(); received[i] = sends[i].receive();
@ -1175,11 +1178,10 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
composite.readOnly = readOnly; composite.readOnly = readOnly;
drop.attach(composite); drop.attach(composite);
return composite; return composite;
}
}; };
} }
void makeInaccessible() { private void makeInaccessible() {
capacity = 0; capacity = 0;
roff = 0; roff = 0;
woff = 0; woff = 0;

View File

@ -0,0 +1,64 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.buffer.api.tests;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.concurrent.ThreadLocalRandom;
public class BufferSplitTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
void splitPostFull(Fixture fixture) {
splitPostFullOrRead(fixture, false);
}
@ParameterizedTest
@MethodSource("allocators")
void splitPostFullAndRead(Fixture fixture) {
splitPostFullOrRead(fixture, true);
}
private static void splitPostFullOrRead(Fixture fixture, boolean read) {
try (BufferAllocator allocator = fixture.createAllocator()) {
final int capacity = 3;
try (Buffer buf = allocator.allocate(capacity)) {
byte[] data = new byte[capacity];
ThreadLocalRandom.current().nextBytes(data);
buf.writeBytes(data);
assertEquals(buf.capacity(), buf.writerOffset());
if (read) {
for (int i = 0; i < capacity; i++) {
buf.readByte();
}
}
assertEquals(read ? buf.capacity() : 0, buf.readerOffset());
try (Buffer split = buf.split()) {
assertEquals(capacity, split.capacity());
assertEquals(split.capacity(), split.writerOffset());
assertEquals(read ? split.capacity() : 0, split.readerOffset());
assertEquals(0, buf.capacity());
assertEquals(0, buf.writerOffset());
assertEquals(0, buf.readerOffset());
}
}
}
}
}