StreamingMetrics: Preparation for removing pull mode
This patch expose the check stream registration as an external static method. Signed-off-by: Amnon Heiman <amnon@scylladb.com>
This commit is contained in:
parent
5c33a8afa7
commit
0bfaba0a82
@ -55,6 +55,7 @@ public class StreamingMetrics
|
|||||||
public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null));
|
public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null));
|
||||||
public final Counter incomingBytes;
|
public final Counter incomingBytes;
|
||||||
public final Counter outgoingBytes;
|
public final Counter outgoingBytes;
|
||||||
|
private static APIClient s_c = new APIClient();
|
||||||
|
|
||||||
public static void register_mbeans() {
|
public static void register_mbeans() {
|
||||||
TimerTask taskToExecute = new CheckRegistration();
|
TimerTask taskToExecute = new CheckRegistration();
|
||||||
@ -68,34 +69,38 @@ public class StreamingMetrics
|
|||||||
outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes"));
|
outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class CheckRegistration extends TimerTask {
|
public static boolean checkRegistration() {
|
||||||
private APIClient c = new APIClient();
|
try {
|
||||||
|
JsonArray streams = s_c.getJsonArray("/stream_manager/");
|
||||||
|
Set<String> all = new HashSet<String>();
|
||||||
|
for (int i = 0; i < streams.size(); i ++) {
|
||||||
|
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
|
||||||
|
for (int j = 0; j < sessions.size(); j++) {
|
||||||
|
String name = sessions.getJsonObject(j).getString("peer");
|
||||||
|
if (!instances.containsKey(name)) {
|
||||||
|
StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
|
||||||
|
instances.put(name, metrics);
|
||||||
|
}
|
||||||
|
all.add(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//removing deleted stream
|
||||||
|
for (String n : instances.keySet()) {
|
||||||
|
if (! all.contains(n)) {
|
||||||
|
instances.remove(n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignoring exceptions, will retry on the next interval
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class CheckRegistration extends TimerTask {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
checkRegistration();
|
||||||
JsonArray streams = c.getJsonArray("/stream_manager/");
|
|
||||||
Set<String> all = new HashSet<String>();
|
|
||||||
for (int i = 0; i < streams.size(); i ++) {
|
|
||||||
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
|
|
||||||
for (int j = 0; j < sessions.size(); j++) {
|
|
||||||
String name = sessions.getJsonObject(j).getString("peer");
|
|
||||||
if (!instances.containsKey(name)) {
|
|
||||||
StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
|
|
||||||
instances.put(name, metrics);
|
|
||||||
}
|
|
||||||
all.add(name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//removing deleted stream
|
|
||||||
for (String n : instances.keySet()) {
|
|
||||||
if (! all.contains(n)) {
|
|
||||||
instances.remove(n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
// ignoring exceptions, will retry on the next interval
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user