diff --git a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java index 12c83c8..8b6f99d 100644 --- a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -26,6 +26,11 @@ package org.apache.cassandra.streaming; import java.io.Serializable; import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import javax.json.JsonArray; +import javax.json.JsonObject; import com.google.common.base.Objects; @@ -74,6 +79,31 @@ public class ProgressInfo implements Serializable this.totalBytes = totalBytes; } + static public ProgressInfo fromJsonObject(JsonObject obj) { + try { + return new ProgressInfo(InetAddress.getByName(obj.getString("peer")), + obj.getInt("session_index"), + obj.getString("file_name"), + Direction.valueOf(obj.getString("direction")), + obj.getJsonNumber("current_bytes").longValue(), + obj.getJsonNumber("total_bytes").longValue()); + } catch (UnknownHostException e) { + // Not suppose to get here + } + + return null; + } + + static public Map fromJArrray(JsonArray arr) { + Map res = new HashMap(); + if (arr != null) { + for (int i = 0; i < arr.size(); i++) { + ProgressInfo obj = fromJsonObject(arr.getJsonObject(i).getJsonObject("value")); + res.put(obj.fileName, obj); + } + } + return res; + } /** * @return true if file transfer is completed */ diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java index 4dbb415..6d44484 100644 --- a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -72,14 +73,16 @@ public final class SessionInfo implements Serializable InetAddress connecting, Collection receivingSummaries, Collection sendingSummaries, - StreamSession.State state) { + StreamSession.State state, + Map receivingFiles, + Map sendingFiles) { this.peer = peer; this.sessionIndex = sessionIndex; this.connecting = connecting; this.receivingSummaries = receivingSummaries; this.sendingSummaries = sendingSummaries; - this.receivingFiles = new ConcurrentHashMap<>(); - this.sendingFiles = new ConcurrentHashMap<>(); + this.receivingFiles = receivingFiles; + this.sendingFiles = sendingFiles; this.state = state; } @@ -88,17 +91,21 @@ public final class SessionInfo implements Serializable String connecting, Collection receivingSummaries, Collection sendingSummaries, - String state) { + String state, + Map receivingFiles, + Map sendingFiles) { this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries, - StreamSession.State.valueOf(state)); + StreamSession.State.valueOf(state), receivingFiles, sendingFiles); } - + ProgressInfo in; public static SessionInfo fromJsonObject(JsonObject obj) { return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"), StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")), StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), - obj.getString("state")); + obj.getString("state"), + ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")), + ProgressInfo.fromJArrray(obj.getJsonArray("sending_files"))); } public static Set fromJsonArr(JsonArray arr) { diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index 8f41992..ef5d955 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -39,6 +39,7 @@ import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamSummary; +import java.util.HashMap; public class SessionInfoCompositeData { @@ -150,7 +151,8 @@ public class SessionInfoCompositeData connecting, fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), - StreamSession.State.valueOf((String) values[5])); + StreamSession.State.valueOf((String) values[5]), + new HashMap(), new HashMap()); Function toProgressInfo = new Function() { public ProgressInfo apply(CompositeData input)