diff --git a/src/main/java/org/apache/cassandra/streaming/StreamManager.java b/src/main/java/org/apache/cassandra/streaming/StreamManager.java index 57c8285..7d45034 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamManager.java @@ -26,21 +26,23 @@ package org.apache.cassandra.streaming; import java.util.HashSet; import java.util.Set; +import java.util.logging.Logger; import javax.json.JsonArray; import javax.json.JsonObject; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; +import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.openmbean.CompositeData; import org.apache.cassandra.streaming.management.StreamStateCompositeData; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.APIMBean; /** * StreamManager manages currently running {@link StreamResultFuture}s and @@ -49,62 +51,51 @@ import com.scylladb.jmx.api.APIClient; * All stream operation should be created through this class to track streaming * status and progress. */ -public class StreamManager implements StreamManagerMBean { - public static final StreamManager instance = new StreamManager(); - private static final java.util.logging.Logger logger = java.util.logging.Logger - .getLogger(StreamManager.class.getName()); - private APIClient c = new APIClient(); +public class StreamManager extends APIMBean implements StreamManagerMBean { + private static final Logger logger = Logger.getLogger(StreamManager.class.getName()); + + private final NotificationBroadcasterSupport notifier = new NotificationBroadcasterSupport(); + + public StreamManager(APIClient c) { + super(c); + } public Set getState() { - JsonArray arr = c.getJsonArray("/stream_manager/"); + JsonArray arr = client.getJsonArray("/stream_manager/"); Set res = new HashSet(); for (int i = 0; i < arr.size(); i++) { JsonObject obj = arr.getJsonObject(i); - res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), SessionInfo.fromJsonArr(obj.getJsonArray("sessions")))); + res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), + SessionInfo.fromJsonArr(obj.getJsonArray("sessions")))); } return res; } - public static StreamManager getInstance() { - return instance; - } + @Override public Set getCurrentStreams() { logger.finest("getCurrentStreams"); - return Sets.newHashSet(Iterables.transform(getState(), new Function() - { - public CompositeData apply(StreamState input) - { - return StreamStateCompositeData.toCompositeData(input); - } - })); + return Sets + .newHashSet(Iterables.transform(getState(), input -> StreamStateCompositeData.toCompositeData(input))); } @Override - public void removeNotificationListener(NotificationListener arg0, - NotificationFilter arg1, Object arg2) - throws ListenerNotFoundException { - // TODO Auto-generated method stub - + public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) { + notifier.addNotificationListener(listener, filter, handback); } @Override - public void addNotificationListener(NotificationListener arg0, - NotificationFilter arg1, Object arg2) - throws IllegalArgumentException { - // TODO Auto-generated method stub + public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException { + notifier.removeNotificationListener(listener); + } + @Override + public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) + throws ListenerNotFoundException { + notifier.removeNotificationListener(listener, filter, handback); } @Override public MBeanNotificationInfo[] getNotificationInfo() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void removeNotificationListener(NotificationListener arg0) - throws ListenerNotFoundException { - // TODO Auto-generated method stub - + return notifier.getNotificationInfo(); } }