diff --git a/src/main/java/com/scylladb/jmx/main/Main.java b/src/main/java/com/scylladb/jmx/main/Main.java index ba4dac3..4661874 100644 --- a/src/main/java/com/scylladb/jmx/main/Main.java +++ b/src/main/java/com/scylladb/jmx/main/Main.java @@ -11,6 +11,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.EndpointSnitchInfo; +import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.GCInspector; @@ -34,6 +35,7 @@ public class Main { CacheService.getInstance(); CompactionManager.getInstance(); GCInspector.register(); + StreamingMetrics.register_mbeans(); Thread.sleep(Long.MAX_VALUE); } diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java new file mode 100644 index 0000000..e70421b --- /dev/null +++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ +package org.apache.cassandra.metrics; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import javax.json.JsonArray; + +import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.APIMetrics; +import com.scylladb.jmx.metrics.DefaultNameFactory; +import com.scylladb.jmx.metrics.MetricNameFactory; +import com.yammer.metrics.core.Counter; + +/** + * Metrics for streaming. + */ +public class StreamingMetrics +{ + public static final String TYPE_NAME = "Streaming"; + private static final Map instances = new HashMap(); + static final int INTERVAL = 1000; //update every 1second + + private static Timer timer = new Timer("Streaming Metrics"); + + public static final Counter activeStreamsOutbound = APIMetrics.newCounter("/stream_manager/metrics/outbound", DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null)); + public static final Counter totalIncomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null)); + public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null)); + public final Counter incomingBytes; + public final Counter outgoingBytes; + + public static void register_mbeans() { + TimerTask taskToExecute = new CheckRegistration(); + timer.scheduleAtFixedRate(taskToExecute, 100, INTERVAL); + } + + public StreamingMetrics(final InetAddress peer) + { + MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replaceAll(":", ".")); + incomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming/" + peer,factory.createMetricName("IncomingBytes")); + outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes")); + } + + private static final class CheckRegistration extends TimerTask { + private APIClient c = new APIClient(); + + @Override + public void run() { + try { + JsonArray streams = c.getJsonArray("/stream_manager/"); + Set all = new HashSet(); + for (int i = 0; i < streams.size(); i ++) { + JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); + for (int j = 0; j < sessions.size(); j++) { + String name = sessions.getJsonObject(j).getString("peer"); + System.out.println("adding stream" + name); + if (!instances.containsKey(name)) { + StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name)); + instances.put(name, metrics); + } + all.add(name); + } + } + //removing deleted stream + for (String n : instances.keySet()) { + if (! all.contains(n)) { + instances.remove(n); + } + } + } catch (Exception e) { + // ignoring exceptions, will retry on the next interval + } + } + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java index 597b26e..4dbb415 100644 --- a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -76,8 +76,8 @@ public final class SessionInfo implements Serializable this.peer = peer; this.sessionIndex = sessionIndex; this.connecting = connecting; - this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); - this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); + this.receivingSummaries = receivingSummaries; + this.sendingSummaries = sendingSummaries; this.receivingFiles = new ConcurrentHashMap<>(); this.sendingFiles = new ConcurrentHashMap<>(); this.state = state; @@ -94,10 +94,10 @@ public final class SessionInfo implements Serializable } public static SessionInfo fromJsonObject(JsonObject obj) { - return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"), + return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"), - StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")), - StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")), + StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")), + StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), obj.getString("state")); } @@ -222,6 +222,9 @@ public final class SessionInfo implements Serializable private long getTotalSizes(Collection summaries) { + if (summaries == null) { + return 0; + } long total = 0; for (StreamSummary summary : summaries) total += summary.totalSize; diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java index b70a815..dbda53e 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java @@ -62,6 +62,9 @@ public class StreamSummary } public static Collection fromJsonArr(JsonArray arr) { + if (arr == null) { + return null; + } Collection res = new ArrayList(); for (int i = 0; i < arr.size(); i++) { res.add(fromJsonObject(arr.getJsonObject(i))); diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index eb1414f..8f41992 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -33,6 +33,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; @@ -175,6 +176,9 @@ public class SessionInfoCompositeData private static CompositeData[] toArrayOfCompositeData(Collection toConvert, Function func) { + if (toConvert == null) { + toConvert = Sets.newHashSet(); + } CompositeData[] composites = new CompositeData[toConvert.size()]; return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites); }