From afd49d7bd4d12d1f7a0e04417684618a75990278 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Sat, 27 Feb 2016 03:28:19 +0200 Subject: [PATCH 1/2] ProgressInfo: Add creation from json object and json array This will allow to creat ProgressInfo object from json object and json Array it needed to report stream file information. Signed-off-by: Amnon Heiman --- .../cassandra/streaming/ProgressInfo.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java index 12c83c8..8b6f99d 100644 --- a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -26,6 +26,11 @@ package org.apache.cassandra.streaming; import java.io.Serializable; import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import javax.json.JsonArray; +import javax.json.JsonObject; import com.google.common.base.Objects; @@ -74,6 +79,31 @@ public class ProgressInfo implements Serializable this.totalBytes = totalBytes; } + static public ProgressInfo fromJsonObject(JsonObject obj) { + try { + return new ProgressInfo(InetAddress.getByName(obj.getString("peer")), + obj.getInt("session_index"), + obj.getString("file_name"), + Direction.valueOf(obj.getString("direction")), + obj.getJsonNumber("current_bytes").longValue(), + obj.getJsonNumber("total_bytes").longValue()); + } catch (UnknownHostException e) { + // Not suppose to get here + } + + return null; + } + + static public Map fromJArrray(JsonArray arr) { + Map res = new HashMap(); + if (arr != null) { + for (int i = 0; i < arr.size(); i++) { + ProgressInfo obj = fromJsonObject(arr.getJsonObject(i).getJsonObject("value")); + res.put(obj.fileName, obj); + } + } + return res; + } /** * @return true if file transfer is completed */ From 767517f6bec8e32f24dae416e31e7cf798ee73cf Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Sat, 27 Feb 2016 03:31:01 +0200 Subject: [PATCH 2/2] SessionInfo: Add receiving_files and sending_files support This patch adds the streaming session files receiving and sending information. It is needed for the streaming information. The constructor now expect the file information, so the sessionInfoCompositeData was changed to add an empty value for them. Signed-off-by: Amnon Heiman --- .../cassandra/streaming/SessionInfo.java | 21 ++++++++++++------- .../management/SessionInfoCompositeData.java | 4 +++- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java index 4dbb415..6d44484 100644 --- a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -72,14 +73,16 @@ public final class SessionInfo implements Serializable InetAddress connecting, Collection receivingSummaries, Collection sendingSummaries, - StreamSession.State state) { + StreamSession.State state, + Map receivingFiles, + Map sendingFiles) { this.peer = peer; this.sessionIndex = sessionIndex; this.connecting = connecting; this.receivingSummaries = receivingSummaries; this.sendingSummaries = sendingSummaries; - this.receivingFiles = new ConcurrentHashMap<>(); - this.sendingFiles = new ConcurrentHashMap<>(); + this.receivingFiles = receivingFiles; + this.sendingFiles = sendingFiles; this.state = state; } @@ -88,17 +91,21 @@ public final class SessionInfo implements Serializable String connecting, Collection receivingSummaries, Collection sendingSummaries, - String state) { + String state, + Map receivingFiles, + Map sendingFiles) { this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries, - StreamSession.State.valueOf(state)); + StreamSession.State.valueOf(state), receivingFiles, sendingFiles); } - + ProgressInfo in; public static SessionInfo fromJsonObject(JsonObject obj) { return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"), StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")), StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), - obj.getString("state")); + obj.getString("state"), + ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")), + ProgressInfo.fromJArrray(obj.getJsonArray("sending_files"))); } public static Set fromJsonArr(JsonArray arr) { diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index 8f41992..ef5d955 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -39,6 +39,7 @@ import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamSummary; +import java.util.HashMap; public class SessionInfoCompositeData { @@ -150,7 +151,8 @@ public class SessionInfoCompositeData connecting, fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), - StreamSession.State.valueOf((String) values[5])); + StreamSession.State.valueOf((String) values[5]), + new HashMap(), new HashMap()); Function toProgressInfo = new Function() { public ProgressInfo apply(CompositeData input)