From 36c4a7df276a4bcc0d35183abaf2704a45bddf80 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 31 Dec 2015 11:35:48 +0200 Subject: [PATCH 1/5] SessionInfo: allow null and modified API The API of the session info returns parameters in snake case instead of camel case. This patch chagne the expected field to match the API. It was also modified to accept empty fields and store them as null. Signed-off-by: Amnon Heiman --- .../org/apache/cassandra/streaming/SessionInfo.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java index 597b26e..4dbb415 100644 --- a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -76,8 +76,8 @@ public final class SessionInfo implements Serializable this.peer = peer; this.sessionIndex = sessionIndex; this.connecting = connecting; - this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); - this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); + this.receivingSummaries = receivingSummaries; + this.sendingSummaries = sendingSummaries; this.receivingFiles = new ConcurrentHashMap<>(); this.sendingFiles = new ConcurrentHashMap<>(); this.state = state; @@ -94,10 +94,10 @@ public final class SessionInfo implements Serializable } public static SessionInfo fromJsonObject(JsonObject obj) { - return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"), + return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"), - StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")), - StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")), + StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")), + StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), obj.getString("state")); } @@ -222,6 +222,9 @@ public final class SessionInfo implements Serializable private long getTotalSizes(Collection summaries) { + if (summaries == null) { + return 0; + } long total = 0; for (StreamSummary summary : summaries) total += summary.totalSize; From 2840880e951e8269488f3ab5c50fd8fe9dd8ce11 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 31 Dec 2015 12:16:46 +0200 Subject: [PATCH 2/5] SessionInfoCompositeData: to support null values This patch allows the SessionInfoCompositeData to accept null values. Signed-off-by: Amnon Heiman --- .../streaming/management/SessionInfoCompositeData.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index eb1414f..8f41992 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -33,6 +33,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; @@ -175,6 +176,9 @@ public class SessionInfoCompositeData private static CompositeData[] toArrayOfCompositeData(Collection toConvert, Function func) { + if (toConvert == null) { + toConvert = Sets.newHashSet(); + } CompositeData[] composites = new CompositeData[toConvert.size()]; return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites); } From cda744831491ff1a898515135dbb3a3c33f12a16 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 31 Dec 2015 12:19:11 +0200 Subject: [PATCH 3/5] StreamSummary: Accept null values This patch allows the StreamSummary to support missing values that return from the API. Signed-off-by: Amnon Heiman --- .../java/org/apache/cassandra/streaming/StreamSummary.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java index b70a815..dbda53e 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java @@ -62,6 +62,9 @@ public class StreamSummary } public static Collection fromJsonArr(JsonArray arr) { + if (arr == null) { + return null; + } Collection res = new ArrayList(); for (int i = 0; i < arr.size(); i++) { res.add(fromJsonObject(arr.getJsonObject(i))); From 686207b59a5ed3624b99968330cf6a3b18fc85a2 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 31 Dec 2015 12:39:47 +0200 Subject: [PATCH 4/5] Import the StreamingMetrics from origin This patch import and modify the StreamingMetrics from orgin. It will pull periodically the API to check for the current stream and when it will find any, it will register their MBean. After this patch during streaming (ie. node is adding to the cluster) it will be possible to check with jconsole and see the stream. A nodetool netstats example: $ nodetool netstats Mode: NORMAL Bootstrap 331955a0-aeff-11e5-895c-000000000000 /127.0.0.2 Sending 1 files, 140724545317112 bytes total. Already sent 0 files, 0 bytes total Read Repair Statistics: Attempted: 6 Mismatch (Blocking): 0 Mismatch (Background): 0 Pool Name Active Pending Completed Commands n/a 0 85 Responses n/a 0 46 Signed-off-by: Amnon Heiman --- .../cassandra/metrics/StreamingMetrics.java | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java new file mode 100644 index 0000000..e70421b --- /dev/null +++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ +package org.apache.cassandra.metrics; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import javax.json.JsonArray; + +import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.APIMetrics; +import com.scylladb.jmx.metrics.DefaultNameFactory; +import com.scylladb.jmx.metrics.MetricNameFactory; +import com.yammer.metrics.core.Counter; + +/** + * Metrics for streaming. + */ +public class StreamingMetrics +{ + public static final String TYPE_NAME = "Streaming"; + private static final Map instances = new HashMap(); + static final int INTERVAL = 1000; //update every 1second + + private static Timer timer = new Timer("Streaming Metrics"); + + public static final Counter activeStreamsOutbound = APIMetrics.newCounter("/stream_manager/metrics/outbound", DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null)); + public static final Counter totalIncomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null)); + public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null)); + public final Counter incomingBytes; + public final Counter outgoingBytes; + + public static void register_mbeans() { + TimerTask taskToExecute = new CheckRegistration(); + timer.scheduleAtFixedRate(taskToExecute, 100, INTERVAL); + } + + public StreamingMetrics(final InetAddress peer) + { + MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replaceAll(":", ".")); + incomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming/" + peer,factory.createMetricName("IncomingBytes")); + outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes")); + } + + private static final class CheckRegistration extends TimerTask { + private APIClient c = new APIClient(); + + @Override + public void run() { + try { + JsonArray streams = c.getJsonArray("/stream_manager/"); + Set all = new HashSet(); + for (int i = 0; i < streams.size(); i ++) { + JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); + for (int j = 0; j < sessions.size(); j++) { + String name = sessions.getJsonObject(j).getString("peer"); + System.out.println("adding stream" + name); + if (!instances.containsKey(name)) { + StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name)); + instances.put(name, metrics); + } + all.add(name); + } + } + //removing deleted stream + for (String n : instances.keySet()) { + if (! all.contains(n)) { + instances.remove(n); + } + } + } catch (Exception e) { + // ignoring exceptions, will retry on the next interval + } + } + } +} From 2ccb657fca7e8aafd026b0e9af8943859e35a768 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Sun, 5 Jul 2015 11:55:53 +0300 Subject: [PATCH 5/5] Main: start the stream metrics pulling This patch adds a call to main to start the stream metrics pulling. --- src/main/java/com/scylladb/jmx/main/Main.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/scylladb/jmx/main/Main.java b/src/main/java/com/scylladb/jmx/main/Main.java index ba4dac3..4661874 100644 --- a/src/main/java/com/scylladb/jmx/main/Main.java +++ b/src/main/java/com/scylladb/jmx/main/Main.java @@ -11,6 +11,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.EndpointSnitchInfo; +import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.GCInspector; @@ -34,6 +35,7 @@ public class Main { CacheService.getInstance(); CompactionManager.getInstance(); GCInspector.register(); + StreamingMetrics.register_mbeans(); Thread.sleep(Long.MAX_VALUE); }