Update examples

Motivation:
The forEachReadable/Writable permit a cleaner FileCopyExample implementation.

Modification:
Simplify FileCopyExample.
Also add examples of various good and bad ways to transfer buffer ownership between threads.
Update the forEachReadable/Writable APIs to let exceptions pass through.

Result:
Cleaner code and more useful forEachReadable/Writable APIs.
This commit is contained in:
Chris Vest 2021-01-25 12:24:52 +01:00
parent 6e24f5d155
commit 2df000ad9a
6 changed files with 292 additions and 35 deletions

View File

@ -562,7 +562,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* {@link ReadableComponentProcessor#process(int, ReadableComponent)} returned {@code false}. * {@link ReadableComponentProcessor#process(int, ReadableComponent)} returned {@code false}.
* In any case, the number of components processed may be less than {@link #countComponents()}. * In any case, the number of components processed may be less than {@link #countComponents()}.
*/ */
int forEachReadable(int initialIndex, ReadableComponentProcessor processor); <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor) throws E;
/** /**
* Process all writable components of this buffer, and return the number of components processed. * Process all writable components of this buffer, and return the number of components processed.
@ -601,5 +601,5 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* {@link WritableComponentProcessor#process(int, WritableComponent)} returned {@code false}. * {@link WritableComponentProcessor#process(int, WritableComponent)} returned {@code false}.
* In any case, the number of components processed may be less than {@link #countComponents()}. * In any case, the number of components processed may be less than {@link #countComponents()}.
*/ */
int forEachWritable(int initialIndex, WritableComponentProcessor processor); <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor) throws E;
} }

View File

@ -26,7 +26,7 @@ public interface ComponentProcessor {
* A processor of {@linkplain ReadableComponent readable components}. * A processor of {@linkplain ReadableComponent readable components}.
*/ */
@FunctionalInterface @FunctionalInterface
interface ReadableComponentProcessor extends ComponentProcessor { interface ReadableComponentProcessor<E extends Exception> extends ComponentProcessor {
/** /**
* Process the given component at the given index in the * Process the given component at the given index in the
* {@link Buf#forEachReadable(int, ReadableComponentProcessor) iteration}. * {@link Buf#forEachReadable(int, ReadableComponentProcessor) iteration}.
@ -41,14 +41,14 @@ public interface ComponentProcessor {
* @return {@code true} if the iteration should continue and more components should be processed, otherwise * @return {@code true} if the iteration should continue and more components should be processed, otherwise
* {@code false} to stop the iteration early. * {@code false} to stop the iteration early.
*/ */
boolean process(int index, ReadableComponent component); boolean process(int index, ReadableComponent component) throws E;
} }
/** /**
* A processor of {@linkplain WritableComponent writable components}. * A processor of {@linkplain WritableComponent writable components}.
*/ */
@FunctionalInterface @FunctionalInterface
interface WritableComponentProcessor extends ComponentProcessor { interface WritableComponentProcessor<E extends Exception> extends ComponentProcessor {
/** /**
* Process the given component at the given index in the * Process the given component at the given index in the
* {@link Buf#forEachWritable(int, WritableComponentProcessor)} iteration}. * {@link Buf#forEachWritable(int, WritableComponentProcessor)} iteration}.
@ -63,7 +63,7 @@ public interface ComponentProcessor {
* @return {@code true} if the iteration should continue and more components should be processed, otherwise * @return {@code true} if the iteration should continue and more components should be processed, otherwise
* {@code false} to stop the iteration early. * {@code false} to stop the iteration early.
*/ */
boolean process(int index, WritableComponent component); boolean process(int index, WritableComponent component) throws E;
} }
/** /**

View File

@ -801,7 +801,8 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
} }
@Override @Override
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) { public <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor)
throws E {
checkReadBounds(readerOffset(), Math.max(1, readableBytes())); checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
int visited = 0; int visited = 0;
for (Buf buf : bufs) { for (Buf buf : bufs) {
@ -819,7 +820,8 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
} }
@Override @Override
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) { public <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor)
throws E {
checkWriteBounds(writerOffset(), Math.max(1, writableBytes())); checkWriteBounds(writerOffset(), Math.max(1, writableBytes()));
int visited = 0; int visited = 0;
for (Buf buf : bufs) { for (Buf buf : bufs) {

View File

@ -548,13 +548,15 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, ReadableCompon
} }
@Override @Override
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) { public <E extends Exception> int forEachReadable(int initialIndex, ReadableComponentProcessor<E> processor)
throws E {
checkRead(readerOffset(), Math.max(1, readableBytes())); checkRead(readerOffset(), Math.max(1, readableBytes()));
return processor.process(initialIndex, this)? 1 : -1; return processor.process(initialIndex, this)? 1 : -1;
} }
@Override @Override
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) { public <E extends Exception> int forEachWritable(int initialIndex, WritableComponentProcessor<E> processor)
throws E {
checkWrite(writerOffset(), Math.max(1, writableBytes())); checkWrite(writerOffset(), Math.max(1, writableBytes()));
return processor.process(initialIndex, this)? 1 : -1; return processor.process(initialIndex, this)? 1 : -1;
} }

View File

@ -19,7 +19,6 @@ import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.Buf; import io.netty.buffer.api.Buf;
import io.netty.buffer.api.Send; import io.netty.buffer.api.Send;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -34,27 +33,23 @@ import static java.nio.file.StandardOpenOption.WRITE;
public final class FileCopyExample { public final class FileCopyExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2); ExecutorService executor = Executors.newFixedThreadPool(2);
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(8); ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(8);
Object done = new Object();
try (Allocator allocator = Allocator.pooledDirect(); try (Allocator allocator = Allocator.pooledDirect();
var input = FileChannel.open(Path.of("/dev/urandom"), READ); var input = FileChannel.open(Path.of("/dev/urandom"), READ);
var output = FileChannel.open(Path.of("random.bin"), CREATE, TRUNCATE_EXISTING, WRITE)) { var output = FileChannel.open(Path.of("random.bin"), CREATE, TRUNCATE_EXISTING, WRITE)) {
Send<Buf> done = allocator.compose().send();
var reader = executor.submit(() -> { var reader = executor.submit(() -> {
var buf = ByteBuffer.allocateDirect(1024);
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++) {
buf.clear();
while (buf.hasRemaining()) {
int r = input.read(buf);
System.out.println("r = " + r);
System.out.println("buf = " + buf);
}
buf.flip();
try (Buf in = allocator.allocate(1024)) { try (Buf in = allocator.allocate(1024)) {
System.out.println("in = " + in); System.out.println("in = " + in);
while (buf.hasRemaining()) { in.forEachWritable(0, (index, component) -> {
in.writeByte(buf.get()); var bb = component.writableBuffer();
} while (bb.hasRemaining()) {
input.read(bb);
}
return true;
});
System.out.println("Sending " + in.readableBytes() + " bytes."); System.out.println("Sending " + in.readableBytes() + " bytes.");
queue.put(in.send()); queue.put(in.send());
} }
@ -64,19 +59,17 @@ public final class FileCopyExample {
}); });
var writer = executor.submit(() -> { var writer = executor.submit(() -> {
var buf = ByteBuffer.allocateDirect(1024); Send<Buf> send;
Object msg; while ((send = queue.take()) != done) {
while ((msg = queue.take()) != done) {
buf.clear();
@SuppressWarnings("unchecked")
Send<Buf> send = (Send<Buf>) msg;
try (Buf out = send.receive()) { try (Buf out = send.receive()) {
System.out.println("Received " + out.readableBytes() + " bytes."); System.out.println("Received " + out.readableBytes() + " bytes.");
out.copyInto(0, buf, 0, out.readableBytes()); out.forEachReadable(0, (index, component) -> {
buf.position(0).limit(out.readableBytes()); var bb = component.readableBuffer();
} while (bb.hasRemaining()) {
while (buf.hasRemaining()) { output.write(bb);
output.write(buf); }
return true;
});
} }
} }
output.force(true); output.force(true);

View File

@ -0,0 +1,260 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.examples;
import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.Send;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
public class SendExample {
static final class Ex1 {
public static void main(String[] args) throws Exception {
ExecutorService executor =
newSingleThreadExecutor();
Allocator allocator = Allocator.heap();
var future = beginTask(executor, allocator);
future.get();
allocator.close();
executor.shutdown();
}
private static Future<?> beginTask(
ExecutorService executor, Allocator allocator) {
try (Buf buf = allocator.allocate(32)) {
// !!! pit-fall: buffer life-time ends before task completes
return executor.submit(new Task(buf));
}
}
private static class Task implements Runnable {
private final Buf buf;
Task(Buf buf) {
this.buf = buf;
}
@Override
public void run() {
// !!! danger: access out-side owning thread.
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
}
}
}
static final class Ex2 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newSingleThreadExecutor();
Allocator allocator = Allocator.heap();
var future = beginTask(executor, allocator);
future.get();
allocator.close();
executor.shutdown();
}
private static Future<?> beginTask(
ExecutorService executor, Allocator allocator) {
try (Buf buf = allocator.allocate(32)) {
// !!! pit-fall: Rc decrement in other thread.
return executor.submit(new Task(buf.acquire()));
}
}
private static class Task implements Runnable {
private final Buf buf;
Task(Buf buf) {
this.buf = buf;
}
@Override
public void run() {
try (buf) {
// !!! danger: access out-side owning thread.
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
}
}
}
}
static final class Ex3 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newSingleThreadExecutor();
Allocator allocator = Allocator.heap();
var future = beginTask(executor, allocator);
future.get();
allocator.close();
executor.shutdown();
}
private static Future<?> beginTask(
ExecutorService executor, Allocator allocator) {
try (Buf buf = allocator.allocate(32)) {
return executor.submit(new Task(buf.send()));
}
}
private static class Task implements Runnable {
private final Send<Buf> send;
Task(Send<Buf> send) {
this.send = send;
}
@Override
public void run() {
try (Buf buf = send.receive()) {
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
}
}
}
}
static final class Ex4 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
Allocator allocator = Allocator.heap();
try (Buf buf = allocator.allocate(4096)) {
// !!! pit-fall: Rc decrement in other thread.
var futA = executor.submit(new Task(buf.slice(0, 1024)));
var futB = executor.submit(new Task(buf.slice(1024, 1024)));
var futC = executor.submit(new Task(buf.slice(2048, 1024)));
var futD = executor.submit(new Task(buf.slice(3072, 1024)));
futA.get();
futB.get();
futC.get();
futD.get();
}
allocator.close();
executor.shutdown();
}
private static class Task implements Runnable {
private final Buf slice;
Task(Buf slice) {
this.slice = slice;
}
@Override
public void run() {
try (slice) {
while (slice.writableBytes() > 0) {
slice.writeByte((byte) 42);
}
}
}
}
}
static final class Ex5 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
Allocator allocator = Allocator.heap();
try (Buf buf = allocator.allocate(4096);
Buf sliceA = buf.slice(0, 1024);
Buf sliceB = buf.slice(1024, 1024);
Buf sliceC = buf.slice(2048, 1024);
Buf sliceD = buf.slice(3072, 1024)) {
var futA = executor.submit(new Task(sliceA));
var futB = executor.submit(new Task(sliceB));
var futC = executor.submit(new Task(sliceC));
var futD = executor.submit(new Task(sliceD));
futA.get();
futB.get();
futC.get();
futD.get();
}
allocator.close();
executor.shutdown();
}
private static class Task implements Runnable {
private final Buf slice;
Task(Buf slice) {
this.slice = slice;
}
@Override
public void run() {
while (slice.writableBytes() > 0) {
slice.writeByte((byte) 42);
}
}
}
}
static final class Ex6 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
Allocator allocator = Allocator.heap();
try (Buf buf = allocator.allocate(4096)) {
var futA = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
var futB = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
var futC = executor.submit(new Task(buf.writerOffset(1024).bifurcate().send()));
var futD = executor.submit(new Task(buf.send()));
futA.get();
futB.get();
futC.get();
futD.get();
}
allocator.close();
executor.shutdown();
}
private static class Task implements Runnable {
private final Send<Buf> send;
Task(Send<Buf> send) {
this.send = send;
}
@Override
public void run() {
try (Buf buf = send.receive().writerOffset(0)) {
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
}
}
}
}
}