Merge "Adding file information to stream" from Amnon

"This series depends on scylla patch fixing the stream information.

Now that the API report on file information in the stream they need to be
populated to the jmx.

After this patch the nodetool netstats report about file information:

$ nodetool  netstats
Mode: NORMAL
Bootstrap ee150e80-dcef-11e5-bee0-000000000000
    /127.0.0.2
        Sending 1 files, 0 bytes total. Already sent 1 files, 8391192 bytes total
            txnofile 8391192/8391192 bytes(100%) sent to idx:0/127.0.0.2
Read Repair Statistics:
Attempted: 6
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0          16268
Responses                       n/a         0              2

Fixes scylladb/scylla#948"
This commit is contained in:
Pekka Enberg 2016-02-27 20:26:09 +02:00
commit 00a62ca126
3 changed files with 47 additions and 8 deletions

View File

@ -26,6 +26,11 @@ package org.apache.cassandra.streaming;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.json.JsonArray;
import javax.json.JsonObject;
import com.google.common.base.Objects;
@ -74,6 +79,31 @@ public class ProgressInfo implements Serializable
this.totalBytes = totalBytes;
}
static public ProgressInfo fromJsonObject(JsonObject obj) {
try {
return new ProgressInfo(InetAddress.getByName(obj.getString("peer")),
obj.getInt("session_index"),
obj.getString("file_name"),
Direction.valueOf(obj.getString("direction")),
obj.getJsonNumber("current_bytes").longValue(),
obj.getJsonNumber("total_bytes").longValue());
} catch (UnknownHostException e) {
// Not suppose to get here
}
return null;
}
static public Map<String, ProgressInfo> fromJArrray(JsonArray arr) {
Map<String, ProgressInfo> res = new HashMap<String, ProgressInfo>();
if (arr != null) {
for (int i = 0; i < arr.size(); i++) {
ProgressInfo obj = fromJsonObject(arr.getJsonObject(i).getJsonObject("value"));
res.put(obj.fileName, obj);
}
}
return res;
}
/**
* @return true if file transfer is completed
*/

View File

@ -28,6 +28,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -72,14 +73,16 @@ public final class SessionInfo implements Serializable
InetAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state) {
StreamSession.State state,
Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this.peer = peer;
this.sessionIndex = sessionIndex;
this.connecting = connecting;
this.receivingSummaries = receivingSummaries;
this.sendingSummaries = sendingSummaries;
this.receivingFiles = new ConcurrentHashMap<>();
this.sendingFiles = new ConcurrentHashMap<>();
this.receivingFiles = receivingFiles;
this.sendingFiles = sendingFiles;
this.state = state;
}
@ -88,17 +91,21 @@ public final class SessionInfo implements Serializable
String connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
String state) {
String state,
Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries,
StreamSession.State.valueOf(state));
StreamSession.State.valueOf(state), receivingFiles, sendingFiles);
}
ProgressInfo in;
public static SessionInfo fromJsonObject(JsonObject obj) {
return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"),
obj.getString("connecting"),
StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")),
StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")),
obj.getString("state"));
obj.getString("state"),
ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")),
ProgressInfo.fromJArrray(obj.getJsonArray("sending_files")));
}
public static Set<SessionInfo> fromJsonArr(JsonArray arr) {

View File

@ -39,6 +39,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
import java.util.HashMap;
public class SessionInfoCompositeData
{
@ -150,7 +151,8 @@ public class SessionInfoCompositeData
connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
StreamSession.State.valueOf((String) values[5]));
StreamSession.State.valueOf((String) values[5]),
new HashMap<String, ProgressInfo>(), new HashMap<String, ProgressInfo>());
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)