SessionInfo: Add receiving_files and sending_files support

This patch adds the streaming session files receiving and sending
information. It is needed for the streaming information.

The constructor now expect the file information, so the
sessionInfoCompositeData was changed to add an empty value for them.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
This commit is contained in:
Amnon Heiman 2016-02-27 03:31:01 +02:00
parent afd49d7bd4
commit 767517f6be
2 changed files with 17 additions and 8 deletions

View File

@ -28,6 +28,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -72,14 +73,16 @@ public final class SessionInfo implements Serializable
InetAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state) {
StreamSession.State state,
Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this.peer = peer;
this.sessionIndex = sessionIndex;
this.connecting = connecting;
this.receivingSummaries = receivingSummaries;
this.sendingSummaries = sendingSummaries;
this.receivingFiles = new ConcurrentHashMap<>();
this.sendingFiles = new ConcurrentHashMap<>();
this.receivingFiles = receivingFiles;
this.sendingFiles = sendingFiles;
this.state = state;
}
@ -88,17 +91,21 @@ public final class SessionInfo implements Serializable
String connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
String state) {
String state,
Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries,
StreamSession.State.valueOf(state));
StreamSession.State.valueOf(state), receivingFiles, sendingFiles);
}
ProgressInfo in;
public static SessionInfo fromJsonObject(JsonObject obj) {
return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"),
obj.getString("connecting"),
StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")),
StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")),
obj.getString("state"));
obj.getString("state"),
ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")),
ProgressInfo.fromJArrray(obj.getJsonArray("sending_files")));
}
public static Set<SessionInfo> fromJsonArr(JsonArray arr) {

View File

@ -39,6 +39,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
import java.util.HashMap;
public class SessionInfoCompositeData
{
@ -150,7 +151,8 @@ public class SessionInfoCompositeData
connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
StreamSession.State.valueOf((String) values[5]));
StreamSession.State.valueOf((String) values[5]),
new HashMap<String, ProgressInfo>(), new HashMap<String, ProgressInfo>());
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)