Merge "Adding streaming metrics support" from Amnon
"This series will enable straming support and the nodetool netstats command. After this series: $ nodetool netstats Mode: NORMAL Bootstrap 331955a0-aeff-11e5-895c-000000000000 /127.0.0.2 Sending 1 files, 140724545317112 bytes total. Already sent 0 files, 0 bytes total Read Repair Statistics: Attempted: 6 Mismatch (Blocking): 0 Mismatch (Background): 0 Pool Name Active Pending Completed Commands n/a 0 121 Responses n/a 0 64 Fixes scylladb/scylla #731"
This commit is contained in:
commit
10caab8590
@ -11,6 +11,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
|
||||
import org.apache.cassandra.gms.Gossiper;
|
||||
import org.apache.cassandra.gms.FailureDetector;
|
||||
import org.apache.cassandra.locator.EndpointSnitchInfo;
|
||||
import org.apache.cassandra.metrics.StreamingMetrics;
|
||||
import org.apache.cassandra.net.MessagingService;
|
||||
import org.apache.cassandra.service.CacheService;
|
||||
import org.apache.cassandra.service.GCInspector;
|
||||
@ -34,6 +35,7 @@ public class Main {
|
||||
CacheService.getInstance();
|
||||
CompactionManager.getInstance();
|
||||
GCInspector.register();
|
||||
StreamingMetrics.register_mbeans();
|
||||
Thread.sleep(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
|
102
src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
Normal file
102
src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
Normal file
@ -0,0 +1,102 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
package org.apache.cassandra.metrics;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
|
||||
import com.scylladb.jmx.api.APIClient;
|
||||
import com.scylladb.jmx.metrics.APIMetrics;
|
||||
import com.scylladb.jmx.metrics.DefaultNameFactory;
|
||||
import com.scylladb.jmx.metrics.MetricNameFactory;
|
||||
import com.yammer.metrics.core.Counter;
|
||||
|
||||
/**
|
||||
* Metrics for streaming.
|
||||
*/
|
||||
public class StreamingMetrics
|
||||
{
|
||||
public static final String TYPE_NAME = "Streaming";
|
||||
private static final Map<String, StreamingMetrics> instances = new HashMap<String, StreamingMetrics>();
|
||||
static final int INTERVAL = 1000; //update every 1second
|
||||
|
||||
private static Timer timer = new Timer("Streaming Metrics");
|
||||
|
||||
public static final Counter activeStreamsOutbound = APIMetrics.newCounter("/stream_manager/metrics/outbound", DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null));
|
||||
public static final Counter totalIncomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null));
|
||||
public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null));
|
||||
public final Counter incomingBytes;
|
||||
public final Counter outgoingBytes;
|
||||
|
||||
public static void register_mbeans() {
|
||||
TimerTask taskToExecute = new CheckRegistration();
|
||||
timer.scheduleAtFixedRate(taskToExecute, 100, INTERVAL);
|
||||
}
|
||||
|
||||
public StreamingMetrics(final InetAddress peer)
|
||||
{
|
||||
MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replaceAll(":", "."));
|
||||
incomingBytes = APIMetrics.newCounter("/stream_manager/metrics/incoming/" + peer,factory.createMetricName("IncomingBytes"));
|
||||
outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes"));
|
||||
}
|
||||
|
||||
private static final class CheckRegistration extends TimerTask {
|
||||
private APIClient c = new APIClient();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
JsonArray streams = c.getJsonArray("/stream_manager/");
|
||||
Set<String> all = new HashSet<String>();
|
||||
for (int i = 0; i < streams.size(); i ++) {
|
||||
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
|
||||
for (int j = 0; j < sessions.size(); j++) {
|
||||
String name = sessions.getJsonObject(j).getString("peer");
|
||||
System.out.println("adding stream" + name);
|
||||
if (!instances.containsKey(name)) {
|
||||
StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
|
||||
instances.put(name, metrics);
|
||||
}
|
||||
all.add(name);
|
||||
}
|
||||
}
|
||||
//removing deleted stream
|
||||
for (String n : instances.keySet()) {
|
||||
if (! all.contains(n)) {
|
||||
instances.remove(n);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignoring exceptions, will retry on the next interval
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -76,8 +76,8 @@ public final class SessionInfo implements Serializable
|
||||
this.peer = peer;
|
||||
this.sessionIndex = sessionIndex;
|
||||
this.connecting = connecting;
|
||||
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
|
||||
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
|
||||
this.receivingSummaries = receivingSummaries;
|
||||
this.sendingSummaries = sendingSummaries;
|
||||
this.receivingFiles = new ConcurrentHashMap<>();
|
||||
this.sendingFiles = new ConcurrentHashMap<>();
|
||||
this.state = state;
|
||||
@ -94,10 +94,10 @@ public final class SessionInfo implements Serializable
|
||||
}
|
||||
|
||||
public static SessionInfo fromJsonObject(JsonObject obj) {
|
||||
return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"),
|
||||
return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"),
|
||||
obj.getString("connecting"),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")),
|
||||
obj.getString("state"));
|
||||
}
|
||||
|
||||
@ -222,6 +222,9 @@ public final class SessionInfo implements Serializable
|
||||
|
||||
private long getTotalSizes(Collection<StreamSummary> summaries)
|
||||
{
|
||||
if (summaries == null) {
|
||||
return 0;
|
||||
}
|
||||
long total = 0;
|
||||
for (StreamSummary summary : summaries)
|
||||
total += summary.totalSize;
|
||||
|
@ -62,6 +62,9 @@ public class StreamSummary
|
||||
}
|
||||
|
||||
public static Collection<StreamSummary> fromJsonArr(JsonArray arr) {
|
||||
if (arr == null) {
|
||||
return null;
|
||||
}
|
||||
Collection<StreamSummary> res = new ArrayList<StreamSummary>();
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
res.add(fromJsonObject(arr.getJsonObject(i)));
|
||||
|
@ -33,6 +33,7 @@ import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.cassandra.streaming.ProgressInfo;
|
||||
import org.apache.cassandra.streaming.SessionInfo;
|
||||
@ -175,6 +176,9 @@ public class SessionInfoCompositeData
|
||||
|
||||
private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
|
||||
{
|
||||
if (toConvert == null) {
|
||||
toConvert = Sets.newHashSet();
|
||||
}
|
||||
CompositeData[] composites = new CompositeData[toConvert.size()];
|
||||
return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user