DefaultFileRegion.transferTo with invalid count may cause busy-spin (#8885)
Motivation: `DefaultFileRegion.transferTo` will return 0 all the time when we request more data then the actual file size. This may result in a busy spin while processing the fileregion during writes. Modifications: - If we wrote 0 bytes check if the underlying file size is smaller then the requested count and if so throw an IOException - Add DefaultFileRegionTest - Add a test to the testsuite Result: Fixes https://github.com/netty/netty/issues/8868.
This commit is contained in:
parent
da45e07e80
commit
106bd0c091
@ -22,10 +22,12 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
@ -73,6 +75,11 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
||||
run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileRegionCountLargerThenFile() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testFileRegion0(sb, cb, false, true, true);
|
||||
}
|
||||
@ -93,6 +100,34 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
||||
testFileRegion0(sb, cb, true, false, true);
|
||||
}
|
||||
|
||||
public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
File file = File.createTempFile("netty-", ".tmp");
|
||||
file.deleteOnExit();
|
||||
|
||||
final FileOutputStream out = new FileOutputStream(file);
|
||||
out.write(data);
|
||||
out.close();
|
||||
|
||||
sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
|
||||
// Just drop the message.
|
||||
}
|
||||
});
|
||||
cb.handler(new ChannelInboundHandlerAdapter());
|
||||
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.connect(sc.localAddress()).sync().channel();
|
||||
|
||||
// Request file region which is bigger then the underlying file.
|
||||
FileRegion region = new DefaultFileRegion(
|
||||
new FileInputStream(file).getChannel(), 0, data.length + 1024);
|
||||
|
||||
assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
|
||||
cc.close().sync();
|
||||
sc.close().sync();
|
||||
}
|
||||
|
||||
private static void testFileRegion0(
|
||||
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
|
||||
throws Throwable {
|
||||
|
@ -359,13 +359,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
||||
* </ul>
|
||||
*/
|
||||
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||
final long offset = region.transferred();
|
||||
final long regionCount = region.count();
|
||||
if (region.transferred() >= regionCount) {
|
||||
if (offset >= regionCount) {
|
||||
in.remove();
|
||||
return 0;
|
||||
}
|
||||
|
||||
final long offset = region.transferred();
|
||||
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
||||
if (flushedAmount > 0) {
|
||||
in.progress(flushedAmount);
|
||||
@ -373,6 +373,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
||||
in.remove();
|
||||
}
|
||||
return 1;
|
||||
} else if (flushedAmount == 0) {
|
||||
validateFileRegion(region, offset);
|
||||
}
|
||||
return WRITE_STATUS_SNDBUF_FULL;
|
||||
}
|
||||
|
@ -207,12 +207,13 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
||||
*/
|
||||
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||
final long regionCount = region.count();
|
||||
if (region.transferred() >= regionCount) {
|
||||
final long offset = region.transferred();
|
||||
|
||||
if (offset >= regionCount) {
|
||||
in.remove();
|
||||
return 0;
|
||||
}
|
||||
|
||||
final long offset = region.transferred();
|
||||
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
||||
if (flushedAmount > 0) {
|
||||
in.progress(flushedAmount);
|
||||
@ -220,6 +221,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
||||
in.remove();
|
||||
}
|
||||
return 1;
|
||||
} else if (flushedAmount == 0) {
|
||||
validateFileRegion(region, offset);
|
||||
}
|
||||
return WRITE_STATUS_SNDBUF_FULL;
|
||||
}
|
||||
|
@ -1094,6 +1094,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return msg;
|
||||
}
|
||||
|
||||
protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
|
||||
DefaultFileRegion.validate(region, position);
|
||||
}
|
||||
|
||||
static final class CloseFuture extends DefaultChannelPromise {
|
||||
|
||||
CloseFuture(AbstractChannel ch) {
|
||||
|
@ -136,6 +136,12 @@ public class DefaultFileRegion extends AbstractReferenceCounted implements FileR
|
||||
long written = file.transferTo(this.position + position, count, target);
|
||||
if (written > 0) {
|
||||
transferred += written;
|
||||
} else if (written == 0) {
|
||||
// If the amount of written data is 0 we need to check if the requested count is bigger then the
|
||||
// actual file itself as it may have been truncated on disk.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/8868
|
||||
validate(this, position);
|
||||
}
|
||||
return written;
|
||||
}
|
||||
@ -179,4 +185,16 @@ public class DefaultFileRegion extends AbstractReferenceCounted implements FileR
|
||||
public FileRegion touch(Object hint) {
|
||||
return this;
|
||||
}
|
||||
|
||||
static void validate(DefaultFileRegion region, long position) throws IOException {
|
||||
// If the amount of written data is 0 we need to check if the requested count is bigger then the
|
||||
// actual file itself as it may have been truncated on disk.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/8868
|
||||
long size = region.file.size();
|
||||
long count = region.count - position;
|
||||
if (region.position + count + position > size) {
|
||||
throw new IOException("Underlying file size " + size + " smaller then requested count " + region.count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class DefaultFileRegionTest {
|
||||
|
||||
private static final byte[] data = new byte[1048576 * 10];
|
||||
|
||||
static {
|
||||
ThreadLocalRandom.current().nextBytes(data);
|
||||
}
|
||||
|
||||
private static File newFile() throws IOException {
|
||||
File file = File.createTempFile("netty-", ".tmp");
|
||||
file.deleteOnExit();
|
||||
|
||||
final FileOutputStream out = new FileOutputStream(file);
|
||||
out.write(data);
|
||||
out.close();
|
||||
return file;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateFromFile() throws IOException {
|
||||
File file = newFile();
|
||||
try {
|
||||
testFileRegion(new DefaultFileRegion(file, 0, data.length));
|
||||
} finally {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateFromFileChannel() throws IOException {
|
||||
File file = newFile();
|
||||
|
||||
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
|
||||
testFileRegion(new DefaultFileRegion(randomAccessFile.getChannel(), 0, data.length));
|
||||
} finally {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
private static void testFileRegion(FileRegion region) throws IOException {
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
|
||||
try (WritableByteChannel channel = Channels.newChannel(outputStream)) {
|
||||
assertEquals(data.length, region.count());
|
||||
assertEquals(0, region.transferred());
|
||||
assertEquals(data.length, region.transferTo(channel, 0));
|
||||
assertEquals(data.length, region.count());
|
||||
assertEquals(data.length, region.transferred());
|
||||
assertArrayEquals(data, outputStream.toByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncated() throws IOException {
|
||||
File file = newFile();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
|
||||
try (WritableByteChannel channel = Channels.newChannel(outputStream);
|
||||
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
|
||||
FileRegion region = new DefaultFileRegion(randomAccessFile.getChannel(), 0, data.length);
|
||||
|
||||
randomAccessFile.getChannel().truncate(data.length - 1024);
|
||||
|
||||
assertEquals(data.length, region.count());
|
||||
assertEquals(0, region.transferred());
|
||||
|
||||
assertEquals(data.length - 1024, region.transferTo(channel, 0));
|
||||
assertEquals(data.length, region.count());
|
||||
assertEquals(data.length - 1024, region.transferred());
|
||||
try {
|
||||
region.transferTo(channel, data.length - 1024);
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
// expected
|
||||
}
|
||||
} finally {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user