2009-06-11 02:48:49 +02:00
|
|
|
/*
|
2009-08-28 09:15:49 +02:00
|
|
|
* Copyright 2009 Red Hat, Inc.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-08-28 09:15:49 +02:00
|
|
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
|
|
|
* (the "License"); you may not use this file except in compliance with the
|
|
|
|
* License. You may obtain a copy of the License at:
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-08-28 09:15:49 +02:00
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2009-08-28 09:15:49 +02:00
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
package org.jboss.netty.channel.socket.nio;
|
|
|
|
|
2010-02-01 07:21:49 +01:00
|
|
|
import static org.jboss.netty.channel.Channels.*;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
import java.io.IOException;
|
2009-06-11 08:10:46 +02:00
|
|
|
import java.net.InetAddress;
|
2009-06-11 02:48:49 +02:00
|
|
|
import java.net.InetSocketAddress;
|
2009-06-11 08:10:46 +02:00
|
|
|
import java.net.NetworkInterface;
|
2009-06-11 02:48:49 +02:00
|
|
|
import java.net.SocketAddress;
|
2010-01-27 06:07:32 +01:00
|
|
|
import java.nio.ByteBuffer;
|
2009-06-11 02:48:49 +02:00
|
|
|
import java.nio.channels.DatagramChannel;
|
|
|
|
import java.util.Queue;
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
|
import org.jboss.netty.channel.AbstractChannel;
|
2009-06-11 08:10:46 +02:00
|
|
|
import org.jboss.netty.channel.Channel;
|
2009-06-11 02:48:49 +02:00
|
|
|
import org.jboss.netty.channel.ChannelException;
|
|
|
|
import org.jboss.netty.channel.ChannelFactory;
|
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
|
import org.jboss.netty.channel.ChannelPipeline;
|
|
|
|
import org.jboss.netty.channel.ChannelSink;
|
|
|
|
import org.jboss.netty.channel.MessageEvent;
|
|
|
|
import org.jboss.netty.channel.socket.DatagramChannelConfig;
|
|
|
|
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
|
|
|
import org.jboss.netty.util.internal.ThreadLocalBoolean;
|
|
|
|
|
|
|
|
/**
|
2009-06-18 09:33:37 +02:00
|
|
|
* Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
2010-01-26 10:04:19 +01:00
|
|
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
|
|
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
2009-06-11 06:33:07 +02:00
|
|
|
* @author Daniel Bevenius (dbevenius@jboss.com)
|
2009-06-11 07:06:14 +02:00
|
|
|
*
|
2009-06-11 06:33:07 +02:00
|
|
|
* @version $Rev$, $Date$
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
2009-06-11 08:10:46 +02:00
|
|
|
class NioDatagramChannel extends AbstractChannel
|
|
|
|
implements org.jboss.netty.channel.socket.DatagramChannel {
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
|
|
|
* The {@link DatagramChannelConfig}.
|
|
|
|
*/
|
|
|
|
private final NioDatagramChannelConfig config;
|
|
|
|
|
|
|
|
/**
|
2009-06-12 04:47:57 +02:00
|
|
|
* The {@link NioDatagramWorker} for this NioDatagramChannel.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
2009-06-12 04:47:57 +02:00
|
|
|
final NioDatagramWorker worker;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
|
|
|
* The {@link DatagramChannel} that this channel uses.
|
|
|
|
*/
|
2009-06-11 08:10:46 +02:00
|
|
|
private final java.nio.channels.DatagramChannel datagramChannel;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Monitor object used to synchronize access to the interestOps.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Object interestOpsLock = new Object();
|
|
|
|
|
|
|
|
/**
|
2010-02-03 11:06:46 +01:00
|
|
|
* Monitor object for synchronizing access to the {@link WriteBufferQueue}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
2010-02-03 11:06:46 +01:00
|
|
|
final Object writeLock = new Object();
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* WriteTask that performs write operations.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Runnable writeTask = new WriteTask();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Indicates if there is a {@link WriteTask} in the task queue.
|
|
|
|
*/
|
|
|
|
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* Queue of write {@link MessageEvent}s.
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final Queue<MessageEvent> writeBufferQueue = new WriteBufferQueue();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Keeps track of the number of bytes that the {@link WriteBufferQueue} currently
|
|
|
|
* contains.
|
|
|
|
*/
|
|
|
|
final AtomicInteger writeBufferSize = new AtomicInteger();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
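* Counts high water mark crossings: incremented when the queued bytes reach
* the high water mark, decremented when they drop back below the low water mark.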
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
|
|
|
|
|
|
|
|
/**
|
2009-06-11 06:33:07 +02:00
|
|
|
* The current write {@link MessageEvent}
|
2009-06-11 02:48:49 +02:00
|
|
|
*/
|
|
|
|
MessageEvent currentWriteEvent;
|
2010-01-27 06:07:32 +01:00
|
|
|
ByteBuffer currentWriteBuffer;
|
|
|
|
boolean currentWriteBufferIsPooled;
|
2009-06-11 02:48:49 +02:00
|
|
|
|
2010-02-03 11:06:46 +01:00
|
|
|
/**
|
|
|
|
* Boolean that indicates that write operation is in progress.
|
|
|
|
*/
|
|
|
|
volatile boolean inWriteNowLoop;
|
|
|
|
|
2009-06-23 09:52:12 +02:00
|
|
|
private volatile InetSocketAddress localAddress;
|
2009-06-30 11:17:50 +02:00
|
|
|
volatile InetSocketAddress remoteAddress;
|
2009-06-23 09:52:12 +02:00
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
/**
 * Creates a new datagram channel backed by a freshly opened, non-blocking
 * NIO {@link DatagramChannel}, builds its configuration from the channel's
 * socket and finally fires the "channel open" event.
 *
 * @param factory  the factory passed on to the superclass
 * @param pipeline the pipeline passed on to the superclass
 * @param sink     the sink passed on to the superclass
 * @param worker   the worker that will perform the writes for this channel
 */
NioDatagramChannel(
        final ChannelFactory factory,
        final ChannelPipeline pipeline,
        final ChannelSink sink,
        final NioDatagramWorker worker) {

    // The first superclass argument is null (presumably "no parent channel" -- confirm).
    super(null, factory, pipeline, sink);

    this.worker = worker;
    this.datagramChannel = openNonBlockingChannel();
    this.config = new DefaultNioDatagramChannelConfig(this.datagramChannel.socket());

    // Fired last, once every field of this channel has been assigned.
    fireChannelOpen(this);
}
|
|
|
|
|
|
|
|
private DatagramChannel openNonBlockingChannel() {
|
|
|
|
try {
|
|
|
|
final DatagramChannel channel = DatagramChannel.open();
|
|
|
|
channel.configureBlocking(false);
|
|
|
|
return channel;
|
|
|
|
} catch (final IOException e) {
|
|
|
|
throw new ChannelException("Failed to open a DatagramChannel.", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
public InetSocketAddress getLocalAddress() {
|
2009-06-23 09:52:12 +02:00
|
|
|
InetSocketAddress localAddress = this.localAddress;
|
|
|
|
if (localAddress == null) {
|
|
|
|
try {
|
|
|
|
this.localAddress = localAddress =
|
|
|
|
(InetSocketAddress) datagramChannel.socket().getLocalSocketAddress();
|
|
|
|
} catch (Throwable t) {
|
|
|
|
// Sometimes fails on a closed socket in Windows.
|
|
|
|
return null;
|
|
|
|
}
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
2009-06-23 09:52:12 +02:00
|
|
|
return localAddress;
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
public InetSocketAddress getRemoteAddress() {
|
2009-06-30 11:17:50 +02:00
|
|
|
InetSocketAddress remoteAddress = this.remoteAddress;
|
|
|
|
if (remoteAddress == null) {
|
|
|
|
try {
|
|
|
|
this.remoteAddress = remoteAddress =
|
|
|
|
(InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress();
|
|
|
|
} catch (Throwable t) {
|
|
|
|
// Sometimes fails on a closed socket in Windows.
|
|
|
|
return null;
|
|
|
|
}
|
2009-06-11 08:10:46 +02:00
|
|
|
}
|
2009-06-30 11:17:50 +02:00
|
|
|
return remoteAddress;
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public boolean isBound() {
|
|
|
|
return isOpen() && datagramChannel.socket().isBound();
|
|
|
|
}
|
|
|
|
|
|
|
|
public boolean isConnected() {
|
2009-09-30 09:00:07 +02:00
|
|
|
return datagramChannel.isConnected();
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected boolean setClosed() {
|
|
|
|
return super.setClosed();
|
|
|
|
}
|
|
|
|
|
|
|
|
public NioDatagramChannelConfig getConfig() {
|
|
|
|
return config;
|
|
|
|
}
|
|
|
|
|
|
|
|
DatagramChannel getDatagramChannel() {
|
|
|
|
return datagramChannel;
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
@Override
|
|
|
|
public int getInterestOps() {
|
|
|
|
if (!isOpen()) {
|
|
|
|
return Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
|
|
|
|
int interestOps = getRawInterestOps();
|
|
|
|
int writeBufferSize = this.writeBufferSize.get();
|
|
|
|
if (writeBufferSize != 0) {
|
|
|
|
if (highWaterMarkCounter.get() > 0) {
|
|
|
|
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
|
|
|
if (writeBufferSize >= lowWaterMark) {
|
|
|
|
interestOps |= Channel.OP_WRITE;
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
|
|
|
if (writeBufferSize >= highWaterMark) {
|
|
|
|
interestOps |= Channel.OP_WRITE;
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
interestOps &= ~Channel.OP_WRITE;
|
|
|
|
}
|
|
|
|
|
|
|
|
return interestOps;
|
|
|
|
}
|
|
|
|
|
2009-06-11 02:48:49 +02:00
|
|
|
int getRawInterestOps() {
|
|
|
|
return super.getInterestOps();
|
|
|
|
}
|
|
|
|
|
|
|
|
void setRawInterestOpsNow(int interestOps) {
|
|
|
|
super.setInterestOpsNow(interestOps);
|
|
|
|
}
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
|
|
|
|
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
|
|
|
|
return super.write(message, null);
|
|
|
|
} else {
|
|
|
|
return super.write(message, remoteAddress);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-06-11 02:48:49 +02:00
|
|
|
/**
|
|
|
|
* WriteBuffer is an extension of {@link LinkedTransferQueue} that adds
|
|
|
|
* support for highWaterMark checking of the write buffer size.
|
|
|
|
*/
|
|
|
|
private final class WriteBufferQueue extends
|
|
|
|
LinkedTransferQueue<MessageEvent> {
|
2009-12-24 04:10:34 +01:00
|
|
|
|
|
|
|
private static final long serialVersionUID = 5057413071460766376L;
|
|
|
|
|
2009-06-11 02:48:49 +02:00
|
|
|
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
|
|
|
|
|
|
|
|
WriteBufferQueue() {
|
|
|
|
super();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This method first delegates to {@link LinkedTransferQueue#offer(Object)} and
|
|
|
|
* adds support for keeping track of the size of the this write buffer.
|
|
|
|
*/
|
|
|
|
@Override
|
2009-06-11 08:10:46 +02:00
|
|
|
public boolean offer(MessageEvent e) {
|
|
|
|
boolean success = super.offer(e);
|
2009-06-11 02:48:49 +02:00
|
|
|
assert success;
|
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
|
|
|
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
|
|
|
|
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
|
2009-06-11 02:48:49 +02:00
|
|
|
|
|
|
|
if (newWriteBufferSize >= highWaterMark) {
|
|
|
|
if (newWriteBufferSize - messageSize < highWaterMark) {
|
|
|
|
highWaterMarkCounter.incrementAndGet();
|
|
|
|
if (!notifying.get()) {
|
|
|
|
notifying.set(Boolean.TRUE);
|
|
|
|
fireChannelInterestChanged(NioDatagramChannel.this);
|
|
|
|
notifying.set(Boolean.FALSE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This method first delegates to {@link LinkedTransferQueue#poll()} and
|
|
|
|
* adds support for keeping track of the size of the this writebuffers queue.
|
|
|
|
*/
|
|
|
|
@Override
|
|
|
|
public MessageEvent poll() {
|
2009-06-11 08:10:46 +02:00
|
|
|
MessageEvent e = super.poll();
|
2009-06-11 02:48:49 +02:00
|
|
|
if (e != null) {
|
2009-06-11 08:10:46 +02:00
|
|
|
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
|
|
|
|
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
|
|
|
|
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
|
2009-06-11 02:48:49 +02:00
|
|
|
|
2009-06-11 08:10:46 +02:00
|
|
|
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
|
2009-06-11 02:48:49 +02:00
|
|
|
if (newWriteBufferSize + messageSize >= lowWaterMark) {
|
|
|
|
highWaterMarkCounter.decrementAndGet();
|
|
|
|
if (!notifying.get()) {
|
|
|
|
notifying.set(Boolean.TRUE);
|
|
|
|
fireChannelInterestChanged(NioDatagramChannel.this);
|
|
|
|
notifying.set(Boolean.FALSE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return e;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
2009-06-12 04:47:57 +02:00
|
|
|
* WriteTask is a simple runnable performs writes by delegating the {@link NioDatagramWorker}.
|
2009-06-11 02:48:49 +02:00
|
|
|
*
|
|
|
|
*/
|
|
|
|
private final class WriteTask implements Runnable {
|
|
|
|
WriteTask() {
|
|
|
|
super();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void run() {
|
|
|
|
writeTaskInTaskQueue.set(false);
|
2010-02-03 11:06:46 +01:00
|
|
|
worker.write(NioDatagramChannel.this, false);
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|
|
|
|
}
|
2009-06-11 08:10:46 +02:00
|
|
|
|
|
|
|
public void joinGroup(InetAddress multicastAddress) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void joinGroup(InetSocketAddress multicastAddress,
|
|
|
|
NetworkInterface networkInterface) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void leaveGroup(InetAddress multicastAddress) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void leaveGroup(InetSocketAddress multicastAddress,
|
|
|
|
NetworkInterface networkInterface) {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
2009-06-11 02:48:49 +02:00
|
|
|
}
|