2009-06-11 02:48:49 +02:00
|
|
|
/*
|
2009-06-11 07:02:39 +02:00
|
|
|
* JBoss, Home of Professional Open Source
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-06-11 07:02:39 +02:00
|
|
|
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
|
|
|
|
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
|
|
|
|
* full listing of individual contributors.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-06-11 07:02:39 +02:00
|
|
|
* This is free software; you can redistribute it and/or modify it
|
|
|
|
* under the terms of the GNU Lesser General Public License as
|
|
|
|
* published by the Free Software Foundation; either version 2.1 of
|
|
|
|
* the License, or (at your option) any later version.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-06-11 07:02:39 +02:00
|
|
|
* This software is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
|
* Lesser General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
|
|
* License along with this software; if not, write to the Free
|
|
|
|
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
|
|
|
|
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
package org.jboss.netty.channel.socket.nio;
|
|
|
|
|
|
|
|
import static org.jboss.netty.channel.Channels.*;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
2009-06-11 08:10:46 +02:00
|
|
|
import java.net.InetAddress;
|
2009-06-11 02:48:49 +02:00
|
|
|
import java.net.InetSocketAddress;
|
2009-06-11 08:10:46 +02:00
|
|
|
import java.net.NetworkInterface;
|
2009-06-11 02:48:49 +02:00
|
|
|
import java.net.SocketAddress;
|
|
|
|
import java.nio.channels.DatagramChannel;
|
|
|
|
import java.util.Queue;
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
|
import org.jboss.netty.channel.AbstractChannel;
|
2009-06-11 08:10:46 +02:00
|
|
|
import org.jboss.netty.channel.Channel;
|
2009-06-11 02:48:49 +02:00
|
|
|
import org.jboss.netty.channel.ChannelException;
|
|
|
|
import org.jboss.netty.channel.ChannelFactory;
|
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
|
import org.jboss.netty.channel.ChannelPipeline;
|
|
|
|
import org.jboss.netty.channel.ChannelSink;
|
|
|
|
import org.jboss.netty.channel.MessageEvent;
|
|
|
|
import org.jboss.netty.channel.socket.DatagramChannelConfig;
|
|
|
|
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
|
|
|
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
|
|
|
|
|
|
|
/**
|
2009-06-18 09:33:37 +02:00
|
|
|
* Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-06-11 06:33:07 +02:00
|
|
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
2009-06-11 07:06:14 +02:00
|
|
|
* @author Trustin Lee (tlee@redhat.com)
|
2009-06-11 06:33:07 +02:00
|
|
|
* @author Daniel Bevenius (dbevenius@jboss.com)
|
2009-06-11 07:06:14 +02:00
|
|
|
*
|
2009-06-11 06:33:07 +02:00
|
|
|
* @version $Rev$, $Date$
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
2009-06-11 08:10:46 +02:00
|
|
|
class NioDatagramChannel extends AbstractChannel
|
|
|
|
implements org.jboss.netty.channel.socket.DatagramChannel {
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
|
|
|
* The {@link DatagramChannelConfig}.
|
|
|
|
*/
|
|
|
|
private final NioDatagramChannelConfig config;
|
|
|
|
|
|
|
|
/**
|
2009-06-12 04:47:57 +02:00
|
|
|
* The {@link NioDatagramWorker} for this NioDatagramChannnel.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
2009-06-12 04:47:57 +02:00
|
|
|
final NioDatagramWorker worker;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
|
|
|
* The {@link DatagramChannel} that this channel uses.
|
|
|
|
*/
|
2009-06-11 08:10:46 +02:00
|
|
|
private final java.nio.channels.DatagramChannel datagramChannel;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Monitor object to synchronize access to InterestedOps.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Object interestOpsLock = new Object();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Monitor object for synchronizing access to the {@link WriteBufferQueue}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Object writeLock = new Object();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* WriteTask that performs write operations.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Runnable writeTask = new WriteTask();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Indicates if there is a {@link WriteTask} in the task queue.
|
|
|
|
*/
|
|
|
|
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Queue of write {@link MessageEvent}s.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Queue<MessageEvent> writeBufferQueue = new WriteBufferQueue();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Keeps track of the number of bytes that the {@link WriteBufferQueue} currently
|
|
|
|
* contains.
|
|
|
|
*/
|
|
|
|
final AtomicInteger writeBufferSize = new AtomicInteger();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Keeps track of the highWaterMark.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* The current write {@link MessageEvent}
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
MessageEvent currentWriteEvent;
|
|
|
|
|
|
|
|
/**
|
2009-06-11 07:02:39 +02:00
|
|
|
* Boolean that indicates that write operation is in progress.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
volatile boolean inWriteNowLoop;
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
NioDatagramChannel(final ChannelFactory factory,
|
2009-06-11 02:48:49 +02:00
|
|
|
final ChannelPipeline pipeline, final ChannelSink sink,
|
2009-06-12 04:47:57 +02:00
|
|
|
final NioDatagramWorker worker) {
|
2009-06-11 02:48:49 +02:00
|
|
|
super(null, factory, pipeline, sink);
|
|
|
|
this.worker = worker;
|
|
|
|
datagramChannel = openNonBlockingChannel();
|
|
|
|
config = new DefaultNioDatagramChannelConfig(datagramChannel.socket());
|
|
|
|
|
|
|
|
fireChannelOpen(this);
|
|
|
|
}
|
|
|
|
|
|
|
|
private DatagramChannel openNonBlockingChannel() {
|
|
|
|
try {
|
|
|
|
final DatagramChannel channel = DatagramChannel.open();
|
|
|
|
channel.configureBlocking(false);
|
|
|
|
return channel;
|
|
|
|
} catch (final IOException e) {
|
|
|
|
throw new ChannelException("Failed to open a DatagramChannel.", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
public InetSocketAddress getLocalAddress() {
|
2009-06-11 02:48:49 +02:00
|
|
|
try {
|
2009-06-11 08:10:46 +02:00
|
|
|
return (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress();
|
|
|
|
} catch (Throwable t) {
|
|
|
|
// Sometimes fails on a closed socket in Windows.
|
|
|
|
return null;
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
public InetSocketAddress getRemoteAddress() {
|
|
|
|
try {
|
|
|
|
return (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress();
|
|
|
|
} catch (Throwable t) {
|
|
|
|
// Sometimes fails on a closed socket in Windows.
|
|
|
|
return null;
|
|
|
|
}
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public boolean isBound() {
|
|
|
|
return isOpen() && datagramChannel.socket().isBound();
|
|
|
|
}
|
|
|
|
|
|
|
|
public boolean isConnected() {
|
2009-06-11 08:10:46 +02:00
|
|
|
return datagramChannel.socket().isConnected();
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected boolean setClosed() {
|
|
|
|
return super.setClosed();
|
|
|
|
}
|
|
|
|
|
|
|
|
public NioDatagramChannelConfig getConfig() {
|
|
|
|
return config;
|
|
|
|
}
|
|
|
|
|
|
|
|
DatagramChannel getDatagramChannel() {
|
|
|
|
return datagramChannel;
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
@Override
|
|
|
|
public int getInterestOps() {
|
|
|
|
if (!isOpen()) {
|
|
|
|
return Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
|
|
|
|
int interestOps = getRawInterestOps();
|
|
|
|
int writeBufferSize = this.writeBufferSize.get();
|
|
|
|
if (writeBufferSize != 0) {
|
|
|
|
if (highWaterMarkCounter.get() > 0) {
|
|
|
|
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
|
|
|
if (writeBufferSize >= lowWaterMark) {
|
|
|
|
interestOps |= Channel.OP_WRITE;
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
|
|
|
if (writeBufferSize >= highWaterMark) {
|
|
|
|
interestOps |= Channel.OP_WRITE;
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
|
|
|
|
return interestOps;
|
|
|
|
}
|
|
|
|
|
2009-06-11 02:48:49 +02:00
|
|
|
int getRawInterestOps() {
|
|
|
|
return super.getInterestOps();
|
|
|
|
}
|
|
|
|
|
|
|
|
void setRawInterestOpsNow(int interestOps) {
|
|
|
|
super.setInterestOpsNow(interestOps);
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
|
|
|
|
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
|
|
|
|
return super.write(message, null);
|
|
|
|
} else {
|
|
|
|
return super.write(message, remoteAddress);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-06-11 02:48:49 +02:00
|
|
|
/**
|
|
|
|
* WriteBuffer is an extension of {@link LinkedTransferQueue} that adds
|
|
|
|
* support for highWaterMark checking of the write buffer size.
|
|
|
|
*/
|
|
|
|
private final class WriteBufferQueue extends
|
|
|
|
LinkedTransferQueue<MessageEvent> {
|
|
|
|
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
|
|
|
|
|
|
|
WriteBufferQueue() {
|
|
|
|
super();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This method first delegates to {@link LinkedTransferQueue#offer(Object)} and
|
|
|
|
* adds support for keeping track of the size of the this write buffer.
|
|
|
|
*/
|
|
|
|
@Override
|
2009-06-11 08:10:46 +02:00
|
|
|
public boolean offer(MessageEvent e) {
|
|
|
|
boolean success = super.offer(e);
|
2009-06-11 02:48:49 +02:00
|
|
|
assert success;
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
|
|
|
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
|
|
|
|
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
if (newWriteBufferSize >= highWaterMark) {
|
|
|
|
if (newWriteBufferSize - messageSize < highWaterMark) {
|
|
|
|
highWaterMarkCounter.incrementAndGet();
|
|
|
|
if (!notifying.get()) {
|
|
|
|
notifying.set(Boolean.TRUE);
|
|
|
|
fireChannelInterestChanged(NioDatagramChannel.this);
|
|
|
|
notifying.set(Boolean.FALSE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This method first delegates to {@link LinkedTransferQueue#poll()} and
|
|
|
|
* adds support for keeping track of the size of the this writebuffers queue.
|
|
|
|
*/
|
|
|
|
@Override
|
|
|
|
public MessageEvent poll() {
|
2009-06-11 08:10:46 +02:00
|
|
|
MessageEvent e = super.poll();
|
2009-06-11 02:48:49 +02:00
|
|
|
if (e != null) {
|
2009-06-11 08:10:46 +02:00
|
|
|
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
|
|
|
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
|
|
|
|
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
2009-06-11 02:48:49 +02:00
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
|
2009-06-11 02:48:49 +02:00
|
|
|
if (newWriteBufferSize + messageSize >= lowWaterMark) {
|
|
|
|
highWaterMarkCounter.decrementAndGet();
|
|
|
|
if (!notifying.get()) {
|
|
|
|
notifying.set(Boolean.TRUE);
|
|
|
|
fireChannelInterestChanged(NioDatagramChannel.this);
|
|
|
|
notifying.set(Boolean.FALSE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return e;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
2009-06-12 04:47:57 +02:00
|
|
|
* WriteTask is a simple runnable performs writes by delegating the {@link NioDatagramWorker}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
|
|
|
*/
|
|
|
|
private final class WriteTask implements Runnable {
|
|
|
|
WriteTask() {
|
|
|
|
super();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void run() {
|
|
|
|
writeTaskInTaskQueue.set(false);
|
2009-06-12 04:47:57 +02:00
|
|
|
NioDatagramWorker.write(NioDatagramChannel.this, false);
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
}
|
2009-06-11 08:10:46 +02:00
|
|
|
|
|
|
|
public void joinGroup(InetAddress multicastAddress) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void joinGroup(InetSocketAddress multicastAddress,
|
|
|
|
NetworkInterface networkInterface) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void leaveGroup(InetAddress multicastAddress) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void leaveGroup(InetSocketAddress multicastAddress,
|
|
|
|
NetworkInterface networkInterface) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|