diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index 29d54f40cd..a82fa60d00 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -15,7 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelInterestChanged; +import static org.jboss.netty.channel.Channels.fireChannelOpen; import java.io.IOException; import java.net.InetAddress; @@ -27,7 +28,6 @@ import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.jboss.netty.buffer.ChannelBuffer; @@ -78,7 +78,7 @@ class NioDatagramChannel extends AbstractChannel /** * Monitor object for synchronizing access to the {@link WriteBufferQueue}. */ - final Lock writeLock = new ReentrantLock(); + final ReentrantLock writeLock = new ReentrantLock(); /** * WriteTask that performs write operations. diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 72194c20dd..62d26ae90b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.jboss.netty.buffer.ChannelBuffer; @@ -446,7 +447,8 @@ class NioDatagramWorker implements Runnable { return; } - if (!channel.writeLock.tryLock()) { + final ReentrantLock writeLock = channel.writeLock; + if (writeLock.isHeldByCurrentThread() || !writeLock.tryLock()) { rescheduleWrite(channel); return; } @@ -458,7 +460,7 @@ class NioDatagramWorker implements Runnable { boolean removeOpWrite = false; int writtenBytes = 0; - synchronized (channel.writeLock) { + try { // loop forever... for (;;) { MessageEvent evt = channel.currentWriteEvent; @@ -535,6 +537,8 @@ class NioDatagramWorker implements Runnable { fireExceptionCaught(channel, t); } } + } finally { + writeLock.unlock(); } fireWriteComplete(channel, writtenBytes); @@ -622,7 +626,8 @@ class NioDatagramWorker implements Runnable { boolean fireExceptionCaught = false; // Clean up the stale messages in the write buffer. - synchronized (channel.writeLock) { + channel.writeLock.lock(); + try { MessageEvent evt = channel.currentWriteEvent; ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { @@ -667,6 +672,8 @@ class NioDatagramWorker implements Runnable { fireExceptionCaught = true; } } + } finally { + channel.writeLock.unlock(); } if (fireExceptionCaught) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 034c3163c7..3a8f6c46c1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -24,7 +24,6 @@ import java.nio.channels.SocketChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.jboss.netty.buffer.ChannelBuffer; @@ -61,7 +60,7 @@ class NioSocketChannel extends AbstractChannel private volatile InetSocketAddress remoteAddress; final Object interestOpsLock = new Object(); - final Lock writeLock = new ReentrantLock(); + final ReentrantLock writeLock = new ReentrantLock(); final Runnable writeTask = new WriteTask(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 902486cb38..0dae337fb2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.jboss.netty.buffer.ChannelBuffer; @@ -378,8 +379,9 @@ class NioWorker implements Runnable { cleanUpWriteBuffer(channel); return; } - - if (!channel.writeLock.tryLock()) { + + final ReentrantLock writeLock = channel.writeLock; + if (writeLock.isHeldByCurrentThread() || !writeLock.tryLock()) { rescheduleWrite(channel); return; } @@ -460,7 +462,7 @@ class NioWorker implements Runnable { } } } finally { - channel.writeLock.unlock(); + writeLock.unlock(); } fireWriteComplete(channel, writtenBytes);