Rework StreamManager

This commit is contained in:
elcallio 2016-10-11 14:07:25 +02:00 committed by Calle Wilund
parent 4ed049739a
commit f4f3c44dc1

View File

@ -26,21 +26,23 @@ package org.apache.cassandra.streaming;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.logging.Logger;
import javax.json.JsonArray; import javax.json.JsonArray;
import javax.json.JsonObject; import javax.json.JsonObject;
import javax.management.ListenerNotFoundException; import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo; import javax.management.MBeanNotificationInfo;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter; import javax.management.NotificationFilter;
import javax.management.NotificationListener; import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import org.apache.cassandra.streaming.management.StreamStateCompositeData; import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.APIMBean;
/** /**
* StreamManager manages currently running {@link StreamResultFuture}s and * StreamManager manages currently running {@link StreamResultFuture}s and
@ -49,62 +51,51 @@ import com.scylladb.jmx.api.APIClient;
* All stream operation should be created through this class to track streaming * All stream operation should be created through this class to track streaming
* status and progress. * status and progress.
*/ */
public class StreamManager implements StreamManagerMBean { public class StreamManager extends APIMBean implements StreamManagerMBean {
public static final StreamManager instance = new StreamManager(); private static final Logger logger = Logger.getLogger(StreamManager.class.getName());
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(StreamManager.class.getName()); private final NotificationBroadcasterSupport notifier = new NotificationBroadcasterSupport();
private APIClient c = new APIClient();
public StreamManager(APIClient c) {
super(c);
}
public Set<StreamState> getState() { public Set<StreamState> getState() {
JsonArray arr = c.getJsonArray("/stream_manager/"); JsonArray arr = client.getJsonArray("/stream_manager/");
Set<StreamState> res = new HashSet<StreamState>(); Set<StreamState> res = new HashSet<StreamState>();
for (int i = 0; i < arr.size(); i++) { for (int i = 0; i < arr.size(); i++) {
JsonObject obj = arr.getJsonObject(i); JsonObject obj = arr.getJsonObject(i);
res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), SessionInfo.fromJsonArr(obj.getJsonArray("sessions")))); res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"),
SessionInfo.fromJsonArr(obj.getJsonArray("sessions"))));
} }
return res; return res;
} }
public static StreamManager getInstance() { @Override
return instance;
}
public Set<CompositeData> getCurrentStreams() { public Set<CompositeData> getCurrentStreams() {
logger.finest("getCurrentStreams"); logger.finest("getCurrentStreams");
return Sets.newHashSet(Iterables.transform(getState(), new Function<StreamState, CompositeData>() return Sets
{ .newHashSet(Iterables.transform(getState(), input -> StreamStateCompositeData.toCompositeData(input)));
public CompositeData apply(StreamState input)
{
return StreamStateCompositeData.toCompositeData(input);
}
}));
} }
@Override @Override
public void removeNotificationListener(NotificationListener arg0, public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) {
NotificationFilter arg1, Object arg2) notifier.addNotificationListener(listener, filter, handback);
throws ListenerNotFoundException {
// TODO Auto-generated method stub
} }
@Override @Override
public void addNotificationListener(NotificationListener arg0, public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException {
NotificationFilter arg1, Object arg2) notifier.removeNotificationListener(listener);
throws IllegalArgumentException { }
// TODO Auto-generated method stub
@Override
public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
throws ListenerNotFoundException {
notifier.removeNotificationListener(listener, filter, handback);
} }
@Override @Override
public MBeanNotificationInfo[] getNotificationInfo() { public MBeanNotificationInfo[] getNotificationInfo() {
// TODO Auto-generated method stub return notifier.getNotificationInfo();
return null;
}
@Override
public void removeNotificationListener(NotificationListener arg0)
throws ListenerNotFoundException {
// TODO Auto-generated method stub
} }
} }