Add StreamManagerMBean with its subclasses

The StreamManager getStreams returns an hirarchy of classes. This patch
import StreamManagerMBean with the class hirarchy and add an
implementation to StreamManager.

The implementation is based on the stream_manager API.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
This commit is contained in:
Amnon Heiman 2015-11-03 10:53:21 +02:00
parent 1db077618d
commit 20778b2df6
11 changed files with 1306 additions and 0 deletions

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.io.Serializable;
import java.net.InetAddress;
import com.google.common.base.Objects;
/**
* ProgressInfo contains file transfer progress.
*/
public class ProgressInfo implements Serializable
{
/**
* Direction of the stream.
*/
public static enum Direction
{
OUT(0),
IN(1);
public final byte code;
private Direction(int code)
{
this.code = (byte) code;
}
public static Direction fromByte(byte direction)
{
return direction == 0 ? OUT : IN;
}
}
public final InetAddress peer;
public final int sessionIndex;
public final String fileName;
public final Direction direction;
public final long currentBytes;
public final long totalBytes;
public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
{
assert totalBytes > 0;
this.peer = peer;
this.sessionIndex = sessionIndex;
this.fileName = fileName;
this.direction = direction;
this.currentBytes = currentBytes;
this.totalBytes = totalBytes;
}
/**
* @return true if file transfer is completed
*/
public boolean isCompleted()
{
return currentBytes >= totalBytes;
}
/**
* ProgressInfo is considered to be equal only when all attributes except currentBytes are equal.
*/
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProgressInfo that = (ProgressInfo) o;
if (totalBytes != that.totalBytes) return false;
if (direction != that.direction) return false;
if (!fileName.equals(that.fileName)) return false;
if (sessionIndex != that.sessionIndex) return false;
return peer.equals(that.peer);
}
@Override
public int hashCode()
{
return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder(fileName);
sb.append(" ").append(currentBytes);
sb.append("/").append(totalBytes).append(" bytes");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
sb.append("idx:").append(sessionIndex);
sb.append(peer);
return sb.toString();
}
}

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.json.JsonArray;
import javax.json.JsonObject;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
/**
* Stream session info.
*/
public final class SessionInfo implements Serializable
{
public final InetAddress peer;
public final int sessionIndex;
public final InetAddress connecting;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
public final Collection<StreamSummary> sendingSummaries;
/** Current session state */
public final StreamSession.State state;
private final Map<String, ProgressInfo> receivingFiles;
private final Map<String, ProgressInfo> sendingFiles;
static InetAddress address(String val) {
try {
return InetAddress.getByName(val);
} catch (UnknownHostException e) {
}
return null;
}
public SessionInfo(InetAddress peer,
int sessionIndex,
InetAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state) {
this.peer = peer;
this.sessionIndex = sessionIndex;
this.connecting = connecting;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.receivingFiles = new ConcurrentHashMap<>();
this.sendingFiles = new ConcurrentHashMap<>();
this.state = state;
}
public SessionInfo(String peer,
int sessionIndex,
String connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
String state) {
this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries,
StreamSession.State.valueOf(state));
}
public static SessionInfo fromJsonObject(JsonObject obj) {
return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"),
obj.getString("connecting"),
StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")),
StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")),
obj.getString("state"));
}
public static Set<SessionInfo> fromJsonArr(JsonArray arr) {
Set<SessionInfo> res = new HashSet<SessionInfo>();
if (arr != null) {
for (int i = 0; i < arr.size(); i++) {
res.add(fromJsonObject(arr.getJsonObject(i)));
}
}
return res;
}
public boolean isFailed()
{
return state == StreamSession.State.FAILED;
}
/**
* Update progress of receiving/sending file.
*
* @param newProgress new progress info
*/
public void updateProgress(ProgressInfo newProgress)
{
assert peer.equals(newProgress.peer);
Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN
? receivingFiles : sendingFiles;
currentFiles.put(newProgress.fileName, newProgress);
}
public Collection<ProgressInfo> getReceivingFiles()
{
return receivingFiles.values();
}
public Collection<ProgressInfo> getSendingFiles()
{
return sendingFiles.values();
}
/**
* @return total number of files already received.
*/
public long getTotalFilesReceived()
{
return getTotalFilesCompleted(receivingFiles.values());
}
/**
* @return total number of files already sent.
*/
public long getTotalFilesSent()
{
return getTotalFilesCompleted(sendingFiles.values());
}
/**
* @return total size(in bytes) already received.
*/
public long getTotalSizeReceived()
{
return getTotalSizeInProgress(receivingFiles.values());
}
/**
* @return total size(in bytes) already sent.
*/
public long getTotalSizeSent()
{
return getTotalSizeInProgress(sendingFiles.values());
}
/**
* @return total number of files to receive in the session
*/
public long getTotalFilesToReceive()
{
return getTotalFiles(receivingSummaries);
}
/**
* @return total number of files to send in the session
*/
public long getTotalFilesToSend()
{
return getTotalFiles(sendingSummaries);
}
/**
* @return total size(in bytes) to receive in the session
*/
public long getTotalSizeToReceive()
{
return getTotalSizes(receivingSummaries);
}
/**
* @return total size(in bytes) to send in the session
*/
public long getTotalSizeToSend()
{
return getTotalSizes(sendingSummaries);
}
private long getTotalSizeInProgress(Collection<ProgressInfo> files)
{
long total = 0;
for (ProgressInfo file : files)
total += file.currentBytes;
return total;
}
private long getTotalFiles(Collection<StreamSummary> summaries)
{
long total = 0;
for (StreamSummary summary : summaries)
total += summary.files;
return total;
}
private long getTotalSizes(Collection<StreamSummary> summaries)
{
long total = 0;
for (StreamSummary summary : summaries)
total += summary.totalSize;
return total;
}
private long getTotalFilesCompleted(Collection<ProgressInfo> files)
{
Iterable<ProgressInfo> completed = Iterables.filter(files, new Predicate<ProgressInfo>()
{
public boolean apply(ProgressInfo input)
{
return input.isCompleted();
}
});
return Iterables.size(completed);
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.util.HashSet;
import java.util.Set;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import com.cloudius.urchin.api.APIClient;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
/**
* StreamManager manages currently running {@link StreamResultFuture}s and
* provides status of all operation invoked.
*
* All stream operation should be created through this class to track streaming
* status and progress.
*/
public class StreamManager implements StreamManagerMBean {
public static final StreamManager instance = new StreamManager();
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(StreamManager.class.getName());
private APIClient c = new APIClient();
public Set<StreamState> getState() {
JsonArray arr = c.getJsonArray("/stream_manager/");
Set<StreamState> res = new HashSet<StreamState>();
for (int i = 0; i < arr.size(); i++) {
JsonObject obj = arr.getJsonObject(i);
res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), SessionInfo.fromJsonArr(obj.getJsonArray("sessions"))));
}
return res;
}
public static StreamManager getInstance() {
return instance;
}
public Set<CompositeData> getCurrentStreams() {
logger.info("getCurrentStreams");
return Sets.newHashSet(Iterables.transform(getState(), new Function<StreamState, CompositeData>()
{
public CompositeData apply(StreamState input)
{
return StreamStateCompositeData.toCompositeData(input);
}
}));
}
@Override
public void removeNotificationListener(NotificationListener arg0,
NotificationFilter arg1, Object arg2)
throws ListenerNotFoundException {
// TODO Auto-generated method stub
}
@Override
public void addNotificationListener(NotificationListener arg0,
NotificationFilter arg1, Object arg2)
throws IllegalArgumentException {
// TODO Auto-generated method stub
}
@Override
public MBeanNotificationInfo[] getNotificationInfo() {
// TODO Auto-generated method stub
return null;
}
@Override
public void removeNotificationListener(NotificationListener arg0)
throws ListenerNotFoundException {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.util.Set;
import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;
public interface StreamManagerMBean extends NotificationEmitter {
public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
/**
* Returns the current state of all ongoing streams.
*/
Set<CompositeData> getCurrentStreams();
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
/**
* Handles the streaming a one or more section of one of more sstables to and from a specific
* remote node.
*
* Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
* session has the following life-cycle:
*
* 1. Connections Initialization
*
* (a) A node (the initiator in the following) create a new StreamSession, initialize it (init())
* and then start it (start()). Start will create a {@link ConnectionHandler} that will create
* two connections to the remote node (the follower in the following) with whom to stream and send
* a StreamInit message. The first connection will be the incoming connection for the
* initiator, and the second connection will be the outgoing.
* (b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
* initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler
* according to StreamInit message's isForOutgoing flag.
* (d) When the both incoming and outgoing connections are established, StreamSession calls
* StreamSession#onInitializationComplete method to start the streaming prepare phase
* (StreamResultFuture.startStreaming()).
*
* 2. Streaming preparation phase
*
* (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a
* PrepareMessage that includes what files/sections this node will stream to the follower
* (stored in a StreamTransferTask, each column family has it's own transfer task) and what
* the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
* nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
* it waits for the follower PrepareMessage.
* (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
* and send back its own PrepareMessage with a summary of the files/sections that will be sent to
* the initiator (prepare()). After having sent that message, the follower goes to its Streamning
* phase.
* (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will
* receive and then goes to his own Streaming phase.
*
* 3. Streaming phase
*
* (a) The streaming phase is started by each node (the sender in the follower, but note that each side
* of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles().
* This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage
* consists of a FileMessageHeader that indicates which file is coming and then start streaming the
* content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
* fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
* (StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
* (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in
* FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
* complete (received()). When all files for the StreamReceiveTask have been received, the sstables
* are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
* is marked complete (taskCompleted())
* (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
* (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
* by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new
* FileMessage for that file.
* (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
* (maybeCompleted()).
*
* 4. Completion phase
*
* (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()).
* If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that
* session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
* send a CompleteMessage to the other side.
*/
public class StreamSession
{
public static enum State
{
INITIALIZED,
PREPARING,
STREAMING,
WAIT_COMPLETE,
COMPLETE,
FAILED,
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
* Current snapshot of streaming progress.
*/
public class StreamState implements Serializable
{
/**
*
*/
private static final long serialVersionUID = 1L;
public final UUID planId;
public final String description;
public final Set<SessionInfo> sessions;
public StreamState(UUID planId, String description, Set<SessionInfo> sessions) {
this.planId = planId;
this.description = description;
this.sessions = sessions;
}
public StreamState(String planId, String description, Set<SessionInfo> sessions)
{
this(UUID.fromString(planId), description, sessions);
}
public boolean hasFailedSession()
{
return Iterables.any(sessions, new Predicate<SessionInfo>()
{
public boolean apply(SessionInfo session)
{
return session.isFailed();
}
});
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import javax.json.JsonArray;
import javax.json.JsonObject;
import com.google.common.base.Objects;
/**
* Summary of streaming.
*/
public class StreamSummary
{
public final UUID cfId;
/**
* Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
*/
public final int files;
public final long totalSize;
public StreamSummary(UUID cfId, int files, long totalSize)
{
this.cfId = cfId;
this.files = files;
this.totalSize = totalSize;
}
public StreamSummary(String cfId, int files, long totalSize) {
this(UUID.fromString(cfId), files, totalSize);
}
public static StreamSummary fromJsonObject(JsonObject obj) {
return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), obj.getJsonNumber("total_size").longValue());
}
public static Collection<StreamSummary> fromJsonArr(JsonArray arr) {
Collection<StreamSummary> res = new ArrayList<StreamSummary>();
for (int i = 0; i < arr.size(); i++) {
res.add(fromJsonObject(arr.getJsonObject(i)));
}
return res;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamSummary summary = (StreamSummary) o;
return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
}
@Override
public int hashCode()
{
return Objects.hashCode(cfId, files, totalSize);
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder("StreamSummary{");
sb.append("path=").append(cfId);
sb.append(", files=").append(files);
sb.append(", totalSize=").append(totalSize);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming.management;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.management.openmbean.*;
import com.google.common.base.Throwables;
import org.apache.cassandra.streaming.ProgressInfo;
public class ProgressInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
"sessionIndex",
"fileName",
"direction",
"currentBytes",
"totalBytes"};
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
"Session peer",
"Index of session",
"Name of the file",
"Direction('IN' or 'OUT')",
"Current bytes transferred",
"Total bytes to transfer"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
SimpleType.LONG};
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
"ProgressInfo",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static ProgressInfo fromCompositeData(CompositeData cd)
{
Object[] values = cd.getAll(ITEM_NAMES);
try
{
return new ProgressInfo(InetAddress.getByName((String) values[1]),
(int) values[2],
(String) values[3],
ProgressInfo.Direction.valueOf((String)values[4]),
(long) values[5],
(long) values[6]);
}
catch (UnknownHostException e)
{
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming.management;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import javax.management.openmbean.*;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
public class SessionInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
"connecting",
"receivingSummaries",
"sendingSummaries",
"state",
"receivingFiles",
"sendingFiles",
"sessionIndex"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
"Session peer",
"Connecting address",
"Summaries of receiving data",
"Summaries of sending data",
"Current session state",
"Receiving files",
"Sending files",
"Session index"};
private static final OpenType<?>[] ITEM_TYPES;
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
SimpleType.STRING,
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
SimpleType.STRING,
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
SimpleType.INTEGER};
COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
"SessionInfo",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
{
public CompositeData apply(StreamSummary input)
{
return StreamSummaryCompositeData.toCompositeData(input);
}
};
valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
{
public CompositeData apply(ProgressInfo input)
{
return ProgressInfoCompositeData.toCompositeData(planId, input);
}
};
valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static SessionInfo fromCompositeData(CompositeData cd)
{
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
Object[] values = cd.getAll(ITEM_NAMES);
InetAddress peer, connecting;
try
{
peer = InetAddress.getByName((String) values[1]);
connecting = InetAddress.getByName((String) values[2]);
}
catch (UnknownHostException e)
{
throw Throwables.propagate(e);
}
Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
{
public StreamSummary apply(CompositeData input)
{
return StreamSummaryCompositeData.fromCompositeData(input);
}
};
SessionInfo info = new SessionInfo(peer,
(int)values[8],
connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
StreamSession.State.valueOf((String) values[5]));
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)
{
return ProgressInfoCompositeData.fromCompositeData(input);
}
};
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
{
info.updateProgress(progress);
}
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
{
info.updateProgress(progress);
}
return info;
}
private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
{
return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
}
private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
{
CompositeData[] composites = new CompositeData[toConvert.size()];
return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming.management;
import java.util.*;
import javax.management.openmbean.*;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamState;
/**
*/
public class StreamStateCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions",
"currentRxBytes", "totalRxBytes", "rxPercentage",
"currentTxBytes", "totalTxBytes", "txPercentage"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
"Stream plan description",
"Active stream sessions",
"Number of bytes received across all streams",
"Total bytes available to receive across all streams",
"Percentage received across all streams",
"Number of bytes sent across all streams",
"Total bytes available to send across all streams",
"Percentage sent across all streams"};
private static final OpenType<?>[] ITEM_TYPES;
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE),
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE,
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE};
COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
"StreamState",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(final StreamState streamState)
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
valueMap.put(ITEM_NAMES[1], streamState.description);
CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
{
public CompositeData apply(SessionInfo input)
{
return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
}
})).toArray(sessions);
valueMap.put(ITEM_NAMES[2], sessions);
long currentRxBytes = 0;
long totalRxBytes = 0;
long currentTxBytes = 0;
long totalTxBytes = 0;
for (SessionInfo sessInfo : streamState.sessions)
{
currentRxBytes += sessInfo.getTotalSizeReceived();
totalRxBytes += sessInfo.getTotalSizeToReceive();
currentTxBytes += sessInfo.getTotalSizeSent();
totalTxBytes += sessInfo.getTotalSizeToSend();
}
double rxPercentage = (totalRxBytes == 0 ? 100L : currentRxBytes * 100L / totalRxBytes);
double txPercentage = (totalTxBytes == 0 ? 100L : currentTxBytes * 100L / totalTxBytes);
valueMap.put(ITEM_NAMES[3], currentRxBytes);
valueMap.put(ITEM_NAMES[4], totalRxBytes);
valueMap.put(ITEM_NAMES[5], rxPercentage);
valueMap.put(ITEM_NAMES[6], currentTxBytes);
valueMap.put(ITEM_NAMES[7], totalTxBytes);
valueMap.put(ITEM_NAMES[8], txPercentage);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static StreamState fromCompositeData(CompositeData cd)
{
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
Object[] values = cd.getAll(ITEM_NAMES);
UUID planId = UUID.fromString((String) values[0]);
String description = (String) values[1];
Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
new Function<CompositeData, SessionInfo>()
{
public SessionInfo apply(CompositeData input)
{
return SessionInfoCompositeData.fromCompositeData(input);
}
}));
return new StreamState(planId, description, sessions);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
package org.apache.cassandra.streaming.management;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.management.openmbean.*;
import com.google.common.base.Throwables;
import org.apache.cassandra.streaming.StreamSummary;
/**
*/
public class StreamSummaryCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"cfId",
"files",
"totalSize"};
private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
"Number of files",
"Total bytes of the files"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.INTEGER,
SimpleType.LONG};
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
"StreamSummary",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(StreamSummary streamSummary)
{
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
valueMap.put(ITEM_NAMES[1], streamSummary.files);
valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
public static StreamSummary fromCompositeData(CompositeData cd)
{
Object[] values = cd.getAll(ITEM_NAMES);
return new StreamSummary(UUID.fromString((String) values[0]),
(int) values[1],
(long) values[2]);
}
}