DefaultFileRegion.transferTo with invalid count may cause busy-spin (#8885)
Motivation: `DefaultFileRegion.transferTo` will return 0 all the time when we request more data then the actual file size. This may result in a busy spin while processing the fileregion during writes. Modifications: - If we wrote 0 bytes check if the underlying file size is smaller then the requested count and if so throw an IOException - Add DefaultFileRegionTest - Add a test to the testsuite Result: Fixes https://github.com/netty/netty/issues/8868.
This commit is contained in:
parent
5d448377e9
commit
81e43d5088
@ -22,11 +22,13 @@ import io.netty.buffer.Unpooled;
|
|||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandler;
|
import io.netty.channel.ChannelInboundHandler;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.DefaultFileRegion;
|
import io.netty.channel.DefaultFileRegion;
|
||||||
import io.netty.channel.FileRegion;
|
import io.netty.channel.FileRegion;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import org.hamcrest.CoreMatchers;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -73,6 +75,11 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
|||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileRegionCountLargerThenFile() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testFileRegion0(sb, cb, false, true, true);
|
testFileRegion0(sb, cb, false, true, true);
|
||||||
}
|
}
|
||||||
@ -93,6 +100,34 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
|||||||
testFileRegion0(sb, cb, true, false, true);
|
testFileRegion0(sb, cb, true, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
File file = File.createTempFile("netty-", ".tmp");
|
||||||
|
file.deleteOnExit();
|
||||||
|
|
||||||
|
final FileOutputStream out = new FileOutputStream(file);
|
||||||
|
out.write(data);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
|
||||||
|
// Just drop the message.
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cb.handler(new ChannelInboundHandlerAdapter());
|
||||||
|
|
||||||
|
Channel sc = sb.bind().sync().channel();
|
||||||
|
Channel cc = cb.connect(sc.localAddress()).sync().channel();
|
||||||
|
|
||||||
|
// Request file region which is bigger then the underlying file.
|
||||||
|
FileRegion region = new DefaultFileRegion(
|
||||||
|
new FileInputStream(file).getChannel(), 0, data.length + 1024);
|
||||||
|
|
||||||
|
assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
|
||||||
|
cc.close().sync();
|
||||||
|
sc.close().sync();
|
||||||
|
}
|
||||||
|
|
||||||
private static void testFileRegion0(
|
private static void testFileRegion0(
|
||||||
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
|
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
|
@ -367,13 +367,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
|||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||||
|
final long offset = region.transferred();
|
||||||
final long regionCount = region.count();
|
final long regionCount = region.count();
|
||||||
if (region.transferred() >= regionCount) {
|
if (offset >= regionCount) {
|
||||||
in.remove();
|
in.remove();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
final long offset = region.transferred();
|
|
||||||
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
||||||
if (flushedAmount > 0) {
|
if (flushedAmount > 0) {
|
||||||
in.progress(flushedAmount);
|
in.progress(flushedAmount);
|
||||||
@ -381,6 +381,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
|||||||
in.remove();
|
in.remove();
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
} else if (flushedAmount == 0) {
|
||||||
|
validateFileRegion(region, offset);
|
||||||
}
|
}
|
||||||
return WRITE_STATUS_SNDBUF_FULL;
|
return WRITE_STATUS_SNDBUF_FULL;
|
||||||
}
|
}
|
||||||
|
@ -210,12 +210,13 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
|||||||
*/
|
*/
|
||||||
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||||
final long regionCount = region.count();
|
final long regionCount = region.count();
|
||||||
if (region.transferred() >= regionCount) {
|
final long offset = region.transferred();
|
||||||
|
|
||||||
|
if (offset >= regionCount) {
|
||||||
in.remove();
|
in.remove();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
final long offset = region.transferred();
|
|
||||||
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
|
||||||
if (flushedAmount > 0) {
|
if (flushedAmount > 0) {
|
||||||
in.progress(flushedAmount);
|
in.progress(flushedAmount);
|
||||||
@ -223,6 +224,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
|||||||
in.remove();
|
in.remove();
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
} else if (flushedAmount == 0) {
|
||||||
|
validateFileRegion(region, offset);
|
||||||
}
|
}
|
||||||
return WRITE_STATUS_SNDBUF_FULL;
|
return WRITE_STATUS_SNDBUF_FULL;
|
||||||
}
|
}
|
||||||
|
@ -1149,6 +1149,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
|
||||||
|
DefaultFileRegion.validate(region, position);
|
||||||
|
}
|
||||||
|
|
||||||
static final class CloseFuture extends DefaultChannelPromise {
|
static final class CloseFuture extends DefaultChannelPromise {
|
||||||
|
|
||||||
CloseFuture(AbstractChannel ch) {
|
CloseFuture(AbstractChannel ch) {
|
||||||
|
@ -139,6 +139,12 @@ public class DefaultFileRegion extends AbstractReferenceCounted implements FileR
|
|||||||
long written = file.transferTo(this.position + position, count, target);
|
long written = file.transferTo(this.position + position, count, target);
|
||||||
if (written > 0) {
|
if (written > 0) {
|
||||||
transferred += written;
|
transferred += written;
|
||||||
|
} else if (written == 0) {
|
||||||
|
// If the amount of written data is 0 we need to check if the requested count is bigger then the
|
||||||
|
// actual file itself as it may have been truncated on disk.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/8868
|
||||||
|
validate(this, position);
|
||||||
}
|
}
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
@ -182,4 +188,16 @@ public class DefaultFileRegion extends AbstractReferenceCounted implements FileR
|
|||||||
public FileRegion touch(Object hint) {
|
public FileRegion touch(Object hint) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void validate(DefaultFileRegion region, long position) throws IOException {
|
||||||
|
// If the amount of written data is 0 we need to check if the requested count is bigger then the
|
||||||
|
// actual file itself as it may have been truncated on disk.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/8868
|
||||||
|
long size = region.file.size();
|
||||||
|
long count = region.count - position;
|
||||||
|
if (region.position + count + position > size) {
|
||||||
|
throw new IOException("Underlying file size " + size + " smaller then requested count " + region.count);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,120 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2019 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
public class DefaultFileRegionTest {
|
||||||
|
|
||||||
|
private static final byte[] data = new byte[1048576 * 10];
|
||||||
|
|
||||||
|
static {
|
||||||
|
PlatformDependent.threadLocalRandom().nextBytes(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static File newFile() throws IOException {
|
||||||
|
File file = File.createTempFile("netty-", ".tmp");
|
||||||
|
file.deleteOnExit();
|
||||||
|
|
||||||
|
final FileOutputStream out = new FileOutputStream(file);
|
||||||
|
out.write(data);
|
||||||
|
out.close();
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateFromFile() throws IOException {
|
||||||
|
File file = newFile();
|
||||||
|
try {
|
||||||
|
testFileRegion(new DefaultFileRegion(file, 0, data.length));
|
||||||
|
} finally {
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateFromFileChannel() throws IOException {
|
||||||
|
File file = newFile();
|
||||||
|
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
|
||||||
|
try {
|
||||||
|
testFileRegion(new DefaultFileRegion(randomAccessFile.getChannel(), 0, data.length));
|
||||||
|
} finally {
|
||||||
|
randomAccessFile.close();
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void testFileRegion(FileRegion region) throws IOException {
|
||||||
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
WritableByteChannel channel = Channels.newChannel(outputStream);
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertEquals(data.length, region.count());
|
||||||
|
assertEquals(0, region.transferred());
|
||||||
|
assertEquals(data.length, region.transferTo(channel, 0));
|
||||||
|
assertEquals(data.length, region.count());
|
||||||
|
assertEquals(data.length, region.transferred());
|
||||||
|
assertArrayEquals(data, outputStream.toByteArray());
|
||||||
|
} finally {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTruncated() throws IOException {
|
||||||
|
File file = newFile();
|
||||||
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
WritableByteChannel channel = Channels.newChannel(outputStream);
|
||||||
|
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileRegion region = new DefaultFileRegion(randomAccessFile.getChannel(), 0, data.length);
|
||||||
|
|
||||||
|
randomAccessFile.getChannel().truncate(data.length - 1024);
|
||||||
|
|
||||||
|
assertEquals(data.length, region.count());
|
||||||
|
assertEquals(0, region.transferred());
|
||||||
|
|
||||||
|
assertEquals(data.length - 1024, region.transferTo(channel, 0));
|
||||||
|
assertEquals(data.length, region.count());
|
||||||
|
assertEquals(data.length - 1024, region.transferred());
|
||||||
|
try {
|
||||||
|
region.transferTo(channel, data.length - 1024);
|
||||||
|
fail();
|
||||||
|
} catch (IOException expected) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
channel.close();
|
||||||
|
|
||||||
|
randomAccessFile.close();
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user