diff --git a/buffer/src/main/java/io/netty/buffer/stream/AbstractStream.java b/buffer/src/main/java/io/netty/buffer/stream/AbstractStream.java deleted file mode 100644 index d9bc8b604b..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/AbstractStream.java +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.util.Random; - -public abstract class AbstractStream implements Stream { - - private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractStream.class); - - private static final Random random = new Random(); - - private static final ThreadLocal IN_CONSUMER = new ThreadLocal() { - @Override - protected Boolean initialValue() { - return Boolean.FALSE; - } - }; - - /** - * Generate a random string that can be used for correlating a group of log messages. - */ - private static String nextLogKey() { - return Long.toHexString(random.nextInt() & 0xFFFFFFFFL); - } - - private final StreamProducerContextImpl producerCtx; - private StreamConsumerContextImpl consumerCtx; - private Runnable invokeStreamConsumedTask; - - /** 0 - init, 1 - accepted, 2 - discarded, 3 - rejected, 4 - closed */ - int state; - T buffer; - - @SuppressWarnings("unchecked") - protected AbstractStream(EventExecutor executor, StreamProducer producer) { - producerCtx = new StreamProducerContextImpl(executor, producer); - } - - T buffer() { - T buffer = this.buffer; - if (buffer == null) { - fail(); - } - return buffer; - } - - @Override - public void accept(EventExecutor executor, StreamConsumer consumer) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (consumer == null) { - throw new NullPointerException("handler"); - } - - if (state != 0) { - fail(); - } - - StreamConsumerContextImpl consumerCtx = new StreamConsumerContextImpl(executor, consumer); - - @SuppressWarnings("unchecked") - StreamConsumer h = (StreamConsumer) consumer; - try { - buffer = h.newStreamBuffer(consumerCtx); - } catch (Throwable t) { - PlatformDependent.throwException(t); - } - - this.consumerCtx = consumerCtx; - state = 1; - - fireStreamAccepted(); - } - - private void fireStreamAccepted() { - EventExecutor e = producerCtx.executor; - if (e.inEventLoop()) { - invokeStreamAccepted(); - } else { - e.execute(new Runnable() { - @Override - public void run() { - invokeStreamAccepted(); - } - }); - } - } - - private void invokeStreamAccepted() { - StreamProducerContextImpl producerCtx = this.producerCtx; - try { - producerCtx.producer.streamAccepted(producerCtx); - } catch (Throwable t) { - safeAbort(producerCtx, t); - return; - } - - if (consumerCtx.nextCalled) { - consumerCtx.invokeStreamConsumed(); - } - } - - void safeAbort(StreamProducerContextImpl producerCtx, Throwable cause) { - try { - producerCtx.abort(cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - String key = nextLogKey(); - logger.warn("[{}] Failed to auto-abort a stream.", key, t); - logger.warn("[{}] .. when invoked with the following cause:", key, cause); - } - } - } - - @Override - public void discard() { - StreamConsumerContextImpl consumerCtx = this.consumerCtx; - if (consumerCtx == null) { - fail(); - } - consumerCtx.discard(); - } - - @Override - public void reject(Throwable cause) { - StreamConsumerContextImpl consumerCtx = this.consumerCtx; - if (consumerCtx == null) { - fail(); - } - consumerCtx.reject(cause); - } - - private void fail() { - switch (state) { - case 0: - throw new IllegalStateException("stream not accepted yet"); - case 1: - throw new IllegalStateException("stream accepted already"); - case 2: - throw new IllegalStateException("stream discarded already"); - case 3: - throw new IllegalStateException("stream rejected already"); - case 4: - throw new IllegalStateException("stream closed already"); - default: - throw new Error(); - } - } - - private final class StreamProducerContextImpl implements StreamProducerContext { - - final EventExecutor executor; - final StreamProducer producer; - boolean invokedStreamOpen; - Runnable invokeStreamUpdatedTask; - - @SuppressWarnings("unchecked") - StreamProducerContextImpl(EventExecutor executor, StreamProducer producer) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (producer == null) { - throw new NullPointerException("producer"); - } - - this.executor = executor; - this.producer = (StreamProducer) producer; - } - - @Override - public EventExecutor executor() { - return executor; - } - - @Override - public T buffer() { - return AbstractStream.this.buffer(); - } - - @Override - public StreamProducerContext update() { - if (state != 1) { - fail(); - } - - fireStreamUpdated(); - return this; - } - - private void fireStreamUpdated() { - EventExecutor e = consumerCtx.executor; - if (e.inEventLoop()) { - invokeStreamUpdated(); - } else { - Runnable task = invokeStreamUpdatedTask; - if (task == null) { - invokeStreamUpdatedTask = task = new Runnable() { - @Override - public void run() { - invokeStreamUpdated(); - } - }; - } - e.execute(task); - } - } - - private void invokeStreamUpdated() { - StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx; - StreamConsumer consumer = consumerCtx.consumer; - if (consumerCtx.singleThreaded) { - IN_CONSUMER.set(Boolean.TRUE); - } - try { - if (!invokedStreamOpen) { - invokedStreamOpen = true; - try { - consumer.streamOpen(consumerCtx); - } catch (Throwable t) { - safeAbort(producerCtx, new StreamConsumerException(t)); - return; - } - } - - try { - consumer.streamUpdated(consumerCtx); - } catch (Throwable t) { - safeAbort(producerCtx, new StreamConsumerException(t)); - } - } finally { - if (consumerCtx.singleThreaded) { - IN_CONSUMER.set(Boolean.FALSE); - } - } - } - - @Override - public void close() { - if (state == 1) { - state = 4; - fireStreamClosed(); - } - } - - private void fireStreamClosed() { - EventExecutor e = consumerCtx.executor; - if (e.inEventLoop()) { - invokeStreamClosed(); - } else { - e.execute(new Runnable() { - @Override - public void run() { - invokeStreamClosed(); - } - }); - } - } - - private void invokeStreamClosed() { - StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx; - try { - consumerCtx.consumer.streamClosed(consumerCtx); - } catch (Throwable t) { - logger.warn("StreamConsumer.streamClosed() raised an exception.", t); - } - } - - @Override - public void abort(Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - if (state != 1) { - fail(); - } - - fireStreamAborted(cause); - } - - private void fireStreamAborted(final Throwable cause) { - StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx; - EventExecutor e = consumerCtx.executor; - if (e.inEventLoop()) { - invokeStreamAborted(cause); - } else { - e.execute(new Runnable() { - @Override - public void run() { - invokeStreamAborted(cause); - } - }); - } - } - - private void invokeStreamAborted(Throwable cause) { - StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx; - try { - consumerCtx.consumer.streamAborted(consumerCtx, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - String key = nextLogKey(); - logger.warn("[{}] StreamConsumer.streamAborted() raised an exception.", key, t); - logger.warn("[{}] .. when invoked with the following cause:", key, cause); - } - } finally { - invokeStreamClosed(); - } - } - } - - private final class StreamConsumerContextImpl implements StreamConsumerContext { - - final EventExecutor executor; - final StreamConsumer consumer; - final boolean singleThreaded; - boolean nextCalled; - - @SuppressWarnings("unchecked") - StreamConsumerContextImpl(EventExecutor executor, StreamConsumer consumer) { - if (executor == null) { - throw new NullPointerException("executor"); - } - if (consumer == null) { - throw new NullPointerException("consumer"); - } - this.executor = executor; - this.consumer = (StreamConsumer) consumer; - singleThreaded = executor == producerCtx.executor; - } - - @Override - public EventExecutor executor() { - return executor; - } - - @Override - public T buffer() { - return AbstractStream.this.buffer(); - } - - @Override - public void discard() { - switch (state) { - case 2: - return; - case 3: - case 4: - fail(); - } - - T buffer = AbstractStream.this.buffer; - if (buffer != null) { - buffer.release(); - } - - state = 2; - - fireStreamDiscarded(); - } - - private void fireStreamDiscarded() { - EventExecutor e = producerCtx.executor; - if (e.inEventLoop()) { - invokeStreamDiscarded(); - } else { - e.execute(new Runnable() { - @Override - public void run() { - invokeStreamDiscarded(); - } - }); - } - } - - private void invokeStreamDiscarded() { - StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx; - try { - producerCtx.producer.streamDiscarded(producerCtx); - } catch (Throwable t) { - logger.warn("StreamProducer.streamDiscarded() raised an exception.", t); - } - } - - @Override - public void reject(Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - if (state != 1) { - fail(); - } - - T buffer = AbstractStream.this.buffer; - if (buffer != null) { - buffer.release(); - } - - state = 3; - - fireStreamRejected(cause); - } - - private void fireStreamRejected(final Throwable cause) { - EventExecutor e = producerCtx.executor; - if (e.inEventLoop()) { - invokeStreamRejected(cause); - } else { - e.execute(new Runnable() { - @Override - public void run() { - invokeStreamRejected(cause); - } - }); - } - } - - private void invokeStreamRejected(Throwable cause) { - StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx; - try { - producerCtx.producer.streamRejected(producerCtx, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - String key = nextLogKey(); - logger.warn("[{}] StreamProducer.streamRejected() raised an exception.", key, t); - logger.warn("[{}] .. when invoked with the following cause:", key, cause); - } - } - } - - @Override - public void next() { - if (state != 1) { - fail(); - } - - if (singleThreaded && IN_CONSUMER.get()) { - nextCalled = true; - } else { - fireStreamConsumed(); - } - } - - private void fireStreamConsumed() { - EventExecutor e = producerCtx.executor; - if (e.inEventLoop()) { - invokeStreamConsumed(); - } else { - Runnable task = invokeStreamConsumedTask; - if (task == null) { - invokeStreamConsumedTask = task = new Runnable() { - @Override - public void run() { - invokeStreamConsumed(); - } - }; - } - e.execute(task); - } - } - - private void invokeStreamConsumed() { - StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx; - try { - do { - consumerCtx.nextCalled = false; - producerCtx.producer.streamConsumed(producerCtx); - } while (consumerCtx.nextCalled); - } catch (Throwable t) { - reject(new StreamProducerException(t)); - } - } - } -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/Stream.java b/buffer/src/main/java/io/netty/buffer/stream/Stream.java deleted file mode 100644 index 58dab1e1e0..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/Stream.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; -import io.netty.util.concurrent.EventExecutor; - -public interface Stream { - void accept(EventExecutor executor, StreamConsumer consumer); - void discard(); - void reject(Throwable cause); -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumer.java b/buffer/src/main/java/io/netty/buffer/stream/StreamConsumer.java deleted file mode 100644 index e70757d114..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumer.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; - -public interface StreamConsumer { - T newStreamBuffer(StreamConsumerContext ctx) throws Exception; - - void streamOpen(StreamConsumerContext ctx) throws Exception; - void streamUpdated(StreamConsumerContext ctx) throws Exception; - void streamAborted(StreamConsumerContext ctx, Throwable cause) throws Exception; - void streamClosed(StreamConsumerContext ctx) throws Exception; -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerContext.java b/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerContext.java deleted file mode 100644 index 6901db0c07..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerContext.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; -import io.netty.util.concurrent.EventExecutor; - -public interface StreamConsumerContext { - EventExecutor executor(); - T buffer(); - void next(); - void discard(); - void reject(Throwable cause); -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerException.java b/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerException.java deleted file mode 100644 index db51dea27c..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamConsumerException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -public class StreamConsumerException extends StreamException { - - private static final long serialVersionUID = -7688455132621735741L; - - public StreamConsumerException() { } - - public StreamConsumerException(String message) { - super(message); - } - - public StreamConsumerException(String message, Throwable cause) { - super(message, cause); - } - - public StreamConsumerException(Throwable cause) { - super(cause); - } -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamException.java b/buffer/src/main/java/io/netty/buffer/stream/StreamException.java deleted file mode 100644 index 78f51e59b2..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -public class StreamException extends RuntimeException { - - private static final long serialVersionUID = -8529582560484984135L; - - public StreamException() { } - - public StreamException(String message) { - super(message); - } - - public StreamException(String message, Throwable cause) { - super(message, cause); - } - - public StreamException(Throwable cause) { - super(cause); - } -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamProducer.java b/buffer/src/main/java/io/netty/buffer/stream/StreamProducer.java deleted file mode 100644 index 31acd6142f..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamProducer.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; - -public interface StreamProducer { - void streamAccepted(StreamProducerContext ctx) throws Exception; - void streamConsumed(StreamProducerContext ctx) throws Exception; - void streamDiscarded(StreamProducerContext ctx) throws Exception; - void streamRejected(StreamProducerContext ctx, Throwable cause) throws Exception; -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamProducerContext.java b/buffer/src/main/java/io/netty/buffer/stream/StreamProducerContext.java deleted file mode 100644 index 443ab0e2b5..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamProducerContext.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.Buf; -import io.netty.util.concurrent.EventExecutor; - -public interface StreamProducerContext { - EventExecutor executor(); - T buffer(); - StreamProducerContext update(); - void close(); - void abort(Throwable cause); -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/StreamProducerException.java b/buffer/src/main/java/io/netty/buffer/stream/StreamProducerException.java deleted file mode 100644 index 6bfd605cce..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/StreamProducerException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -public class StreamProducerException extends StreamException { - - private static final long serialVersionUID = 8830744325775707415L; - - public StreamProducerException() { } - - public StreamProducerException(String message) { - super(message); - } - - public StreamProducerException(String message, Throwable cause) { - super(message, cause); - } - - public StreamProducerException(Throwable cause) { - super(cause); - } -} diff --git a/buffer/src/main/java/io/netty/buffer/stream/package-info.java b/buffer/src/main/java/io/netty/buffer/stream/package-info.java deleted file mode 100644 index 7828ac770f..0000000000 --- a/buffer/src/main/java/io/netty/buffer/stream/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * Enables exchanging a very large stream between a producer and a consumer. - */ -package io.netty.buffer.stream; diff --git a/buffer/src/test/java/io/netty/buffer/stream/StreamTest.java b/buffer/src/test/java/io/netty/buffer/stream/StreamTest.java deleted file mode 100644 index abbcc5353b..0000000000 --- a/buffer/src/test/java/io/netty/buffer/stream/StreamTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.buffer.stream; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Promise; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -public class StreamTest { - - private static EventExecutor executorA; - private static EventExecutor executorB; - - @BeforeClass - public static void setUp() { - executorA = new DefaultEventExecutorGroup(1).next(); - executorB = new DefaultEventExecutorGroup(1).next(); - } - - @AfterClass - public static void tearDown() { - executorB.shutdown(); - executorA.shutdown(); - } - - @Test(timeout = 5000) - public void testSimpleWithSameExecutor() throws Exception { - final long size = 1048576L * 1024L * 1024L; // Transfer whooping 1 TiB of garbage - testSimple0(executorA, executorA, size); - } - - @Test(timeout = 10000) - public void testSimpleWithDifferentExecutors() throws Exception { - final long size = 1048576L * 1024L * 16L; // Transfer 16 GiB of garbage - testSimple0(executorA, executorB, size); - } - - private static void testSimple0(EventExecutor executorA, EventExecutor executorB, long size) throws Exception { - LargeByteStreamConsumer consumer = new LargeByteStreamConsumer(size); - Stream stream = new LargeByteStream(executorA, size); - stream.accept(executorB, consumer); - - for (;;) { - if (consumer.future.await(1000)) { - break; - } - - System.err.println(consumer.counter + " / " + size); - } - - consumer.future.sync(); - } - - private static class LargeByteStream extends AbstractStream { - LargeByteStream(EventExecutor executor, long size) { - super(executor, new LargeByteStreamProducer(size)); - } - } - - private static class LargeByteStreamProducer implements StreamProducer { - - private final long size; - private long counter; - - LargeByteStreamProducer(long size) { - this.size = size; - } - - @Override - public void streamAccepted(StreamProducerContext ctx) throws Exception { - generate(ctx); - } - - @Override - public void streamConsumed(StreamProducerContext ctx) throws Exception { - generate(ctx); - } - - private void generate(StreamProducerContext ctx) { - ByteBuf buf = ctx.buffer(); - int chunkSize = (int) Math.min(size - counter, buf.maxWritableBytes()); - buf.ensureWritable(chunkSize); - buf.writerIndex(buf.writerIndex() + chunkSize); - ctx.update(); - counter += chunkSize; - if (counter == size) { - ctx.close(); - return; - } - - if (counter > size) { - throw new AssertionError("counter > size"); - } - } - - @Override - public void streamDiscarded(StreamProducerContext ctx) throws Exception { - throw new AssertionError("stream discarded"); - } - - @Override - public void streamRejected(StreamProducerContext ctx, Throwable cause) throws Exception { - throw new AssertionError("stream rejected", cause); - } - } - - private static class LargeByteStreamConsumer implements StreamConsumer { - - private final long size; - volatile Promise future; - volatile long counter; - - LargeByteStreamConsumer(long size) { - this.size = size; - } - - @Override - public ByteBuf newStreamBuffer(StreamConsumerContext ctx) throws Exception { - future = new DefaultPromise(ctx.executor()); - return Unpooled.buffer(0, 65536); // Only use 64 KiB at max. - } - - @Override - public void streamOpen(StreamConsumerContext ctx) throws Exception { } - - @Override - public void streamUpdated(StreamConsumerContext ctx) throws Exception { - ByteBuf buf = ctx.buffer(); - int chunkSize = buf.readableBytes(); - buf.skipBytes(chunkSize); - buf.discardReadBytes(); - counter += chunkSize; - if (counter == size) { - future.setSuccess(); - } else if (counter > size) { - AssertionError error = new AssertionError("counter > size"); - ctx.reject(error); - future.setFailure(error); - } else { - ctx.next(); - } - } - - @Override - public void streamAborted(StreamConsumerContext ctx, Throwable cause) throws Exception { - future.setFailure(cause); - } - - @Override - public void streamClosed(StreamConsumerContext ctx) throws Exception { - future.tryFailure(new AssertionError("tryFailure() must return false.")); - } - } -}