Merge "Enabling nodetool netstats" from Amnon
"This series adds the jmx implementation to enable netstats. After this series netstats should complete successfuly. A run example: $ ./bin/nodetool netstats Mode: NORMAL repair 397c91a0-8205-11e5-83e4-000000000001 repair 3977d5ba-8205-11e5-83e4-000000000001 repair 3977d624-8205-11e5-83e4-000000000001 repair 397c8fc8-8205-11e5-83e4-000000000001 ....... ...... repair 3977d502-8205-11e5-83e4-000000000001 Read Repair Statistics: Attempted: 1 Mismatch (Blocking): 0 Mismatch (Background): 0 Pool Name Active Pending Completed Commands n/a 0 21182 Responses n/a 0 597"
This commit is contained in:
commit
72f6f5dab4
@ -486,9 +486,24 @@ public class APIClient {
|
||||
public Map<InetAddress, Float> getMapInetAddressFloatValue(String string) {
|
||||
return getMapInetAddressFloatValue(string, null);
|
||||
}
|
||||
|
||||
public Map<String, Long> getMapStringLongValue(String string,
|
||||
MultivaluedMap<String, String> queryParams) {
|
||||
Map<String, Long> res = new HashMap<String, Long>();
|
||||
|
||||
JsonReader reader = getReader(string, queryParams);
|
||||
|
||||
JsonArray arr = reader.readArray();
|
||||
JsonObject obj = null;
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
obj = arr.getJsonObject(i);
|
||||
res.put(obj.getString("key"), obj.getJsonNumber("value").longValue());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public Map<String, Long> getMapStringLongValue(String string) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
return getMapStringLongValue(string, null);
|
||||
}
|
||||
|
||||
public long[] getLongArrValue(String string,
|
||||
@ -507,9 +522,23 @@ public class APIClient {
|
||||
return getLongArrValue(string, null);
|
||||
}
|
||||
|
||||
public Map<String, Integer> getMapStringIntegerValue(String string,
|
||||
MultivaluedMap<String, String> queryParams) {
|
||||
Map<String, Integer> res = new HashMap<String, Integer>();
|
||||
|
||||
JsonReader reader = getReader(string, queryParams);
|
||||
|
||||
JsonArray arr = reader.readArray();
|
||||
JsonObject obj = null;
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
obj = arr.getJsonObject(i);
|
||||
res.put(obj.getString("key"), obj.getInt("value"));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public Map<String, Integer> getMapStringIntegerValue(String string) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
return getMapStringIntegerValue(string, null);
|
||||
}
|
||||
|
||||
public int[] getIntArrValue(String string,
|
||||
|
@ -94,7 +94,7 @@ public final class MessagingService implements MessagingServiceMBean {
|
||||
*/
|
||||
public Map<String, Integer> getResponsePendingTasks() {
|
||||
log(" getResponsePendingTasks()");
|
||||
return c.getMapStringIntegerValue("");
|
||||
return c.getMapStringIntegerValue("/messaging_service/messages/respond_pending");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -102,7 +102,7 @@ public final class MessagingService implements MessagingServiceMBean {
|
||||
*/
|
||||
public Map<String, Long> getResponseCompletedTasks() {
|
||||
log(" getResponseCompletedTasks()");
|
||||
return c.getMapStringLongValue("");
|
||||
return c.getMapStringLongValue("/messaging_service/messages/respond_completed");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -110,7 +110,7 @@ public final class MessagingService implements MessagingServiceMBean {
|
||||
*/
|
||||
public Map<String, Integer> getDroppedMessages() {
|
||||
log(" getDroppedMessages()");
|
||||
return c.getMapStringIntegerValue("");
|
||||
return c.getMapStringIntegerValue("/messaging_service/messages/dropped");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -38,6 +38,8 @@ import javax.ws.rs.core.MultivaluedMap;
|
||||
|
||||
import org.apache.cassandra.metrics.StorageMetrics;
|
||||
import org.apache.cassandra.repair.RepairParallelism;
|
||||
import org.apache.cassandra.streaming.StreamManager;
|
||||
|
||||
import com.cloudius.urchin.api.APIClient;
|
||||
import com.cloudius.urchin.utils.FileUtils;
|
||||
|
||||
@ -77,8 +79,7 @@ public class StorageService extends NotificationBroadcasterSupport
|
||||
jmxObjectName = new ObjectName(
|
||||
"org.apache.cassandra.db:type=StorageService");
|
||||
mbs.registerMBean(this, jmxObjectName);
|
||||
// mbs.registerMBean(StreamManager.instance, new ObjectName(
|
||||
// StreamManager.OBJECT_NAME));
|
||||
mbs.registerMBean(StreamManager.getInstance(), new ObjectName(StreamManager.OBJECT_NAME));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
121
src/main/java/org/apache/cassandra/streaming/ProgressInfo.java
Normal file
121
src/main/java/org/apache/cassandra/streaming/ProgressInfo.java
Normal file
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.InetAddress;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
|
||||
/**
|
||||
* ProgressInfo contains file transfer progress.
|
||||
*/
|
||||
public class ProgressInfo implements Serializable
|
||||
{
|
||||
/**
|
||||
* Direction of the stream.
|
||||
*/
|
||||
public static enum Direction
|
||||
{
|
||||
OUT(0),
|
||||
IN(1);
|
||||
|
||||
public final byte code;
|
||||
|
||||
private Direction(int code)
|
||||
{
|
||||
this.code = (byte) code;
|
||||
}
|
||||
|
||||
public static Direction fromByte(byte direction)
|
||||
{
|
||||
return direction == 0 ? OUT : IN;
|
||||
}
|
||||
}
|
||||
|
||||
public final InetAddress peer;
|
||||
public final int sessionIndex;
|
||||
public final String fileName;
|
||||
public final Direction direction;
|
||||
public final long currentBytes;
|
||||
public final long totalBytes;
|
||||
|
||||
public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
|
||||
{
|
||||
assert totalBytes > 0;
|
||||
|
||||
this.peer = peer;
|
||||
this.sessionIndex = sessionIndex;
|
||||
this.fileName = fileName;
|
||||
this.direction = direction;
|
||||
this.currentBytes = currentBytes;
|
||||
this.totalBytes = totalBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if file transfer is completed
|
||||
*/
|
||||
public boolean isCompleted()
|
||||
{
|
||||
return currentBytes >= totalBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* ProgressInfo is considered to be equal only when all attributes except currentBytes are equal.
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
ProgressInfo that = (ProgressInfo) o;
|
||||
|
||||
if (totalBytes != that.totalBytes) return false;
|
||||
if (direction != that.direction) return false;
|
||||
if (!fileName.equals(that.fileName)) return false;
|
||||
if (sessionIndex != that.sessionIndex) return false;
|
||||
return peer.equals(that.peer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
StringBuilder sb = new StringBuilder(fileName);
|
||||
sb.append(" ").append(currentBytes);
|
||||
sb.append("/").append(totalBytes).append(" bytes");
|
||||
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
|
||||
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
|
||||
sb.append("idx:").append(sessionIndex);
|
||||
sb.append(peer);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
243
src/main/java/org/apache/cassandra/streaming/SessionInfo.java
Normal file
243
src/main/java/org/apache/cassandra/streaming/SessionInfo.java
Normal file
@ -0,0 +1,243 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* Stream session info.
|
||||
*/
|
||||
public final class SessionInfo implements Serializable
|
||||
{
|
||||
public final InetAddress peer;
|
||||
public final int sessionIndex;
|
||||
public final InetAddress connecting;
|
||||
/** Immutable collection of receiving summaries */
|
||||
public final Collection<StreamSummary> receivingSummaries;
|
||||
/** Immutable collection of sending summaries*/
|
||||
public final Collection<StreamSummary> sendingSummaries;
|
||||
/** Current session state */
|
||||
public final StreamSession.State state;
|
||||
|
||||
private final Map<String, ProgressInfo> receivingFiles;
|
||||
private final Map<String, ProgressInfo> sendingFiles;
|
||||
|
||||
static InetAddress address(String val) {
|
||||
try {
|
||||
return InetAddress.getByName(val);
|
||||
} catch (UnknownHostException e) {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public SessionInfo(InetAddress peer,
|
||||
int sessionIndex,
|
||||
InetAddress connecting,
|
||||
Collection<StreamSummary> receivingSummaries,
|
||||
Collection<StreamSummary> sendingSummaries,
|
||||
StreamSession.State state) {
|
||||
this.peer = peer;
|
||||
this.sessionIndex = sessionIndex;
|
||||
this.connecting = connecting;
|
||||
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
|
||||
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
|
||||
this.receivingFiles = new ConcurrentHashMap<>();
|
||||
this.sendingFiles = new ConcurrentHashMap<>();
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public SessionInfo(String peer,
|
||||
int sessionIndex,
|
||||
String connecting,
|
||||
Collection<StreamSummary> receivingSummaries,
|
||||
Collection<StreamSummary> sendingSummaries,
|
||||
String state) {
|
||||
this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries,
|
||||
StreamSession.State.valueOf(state));
|
||||
}
|
||||
|
||||
public static SessionInfo fromJsonObject(JsonObject obj) {
|
||||
return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"),
|
||||
obj.getString("connecting"),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")),
|
||||
StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")),
|
||||
obj.getString("state"));
|
||||
}
|
||||
|
||||
public static Set<SessionInfo> fromJsonArr(JsonArray arr) {
|
||||
Set<SessionInfo> res = new HashSet<SessionInfo>();
|
||||
if (arr != null) {
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
res.add(fromJsonObject(arr.getJsonObject(i)));
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public boolean isFailed()
|
||||
{
|
||||
return state == StreamSession.State.FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update progress of receiving/sending file.
|
||||
*
|
||||
* @param newProgress new progress info
|
||||
*/
|
||||
public void updateProgress(ProgressInfo newProgress)
|
||||
{
|
||||
assert peer.equals(newProgress.peer);
|
||||
|
||||
Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN
|
||||
? receivingFiles : sendingFiles;
|
||||
currentFiles.put(newProgress.fileName, newProgress);
|
||||
}
|
||||
|
||||
public Collection<ProgressInfo> getReceivingFiles()
|
||||
{
|
||||
return receivingFiles.values();
|
||||
}
|
||||
|
||||
public Collection<ProgressInfo> getSendingFiles()
|
||||
{
|
||||
return sendingFiles.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files already received.
|
||||
*/
|
||||
public long getTotalFilesReceived()
|
||||
{
|
||||
return getTotalFilesCompleted(receivingFiles.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files already sent.
|
||||
*/
|
||||
public long getTotalFilesSent()
|
||||
{
|
||||
return getTotalFilesCompleted(sendingFiles.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) already received.
|
||||
*/
|
||||
public long getTotalSizeReceived()
|
||||
{
|
||||
return getTotalSizeInProgress(receivingFiles.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) already sent.
|
||||
*/
|
||||
public long getTotalSizeSent()
|
||||
{
|
||||
return getTotalSizeInProgress(sendingFiles.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files to receive in the session
|
||||
*/
|
||||
public long getTotalFilesToReceive()
|
||||
{
|
||||
return getTotalFiles(receivingSummaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of files to send in the session
|
||||
*/
|
||||
public long getTotalFilesToSend()
|
||||
{
|
||||
return getTotalFiles(sendingSummaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) to receive in the session
|
||||
*/
|
||||
public long getTotalSizeToReceive()
|
||||
{
|
||||
return getTotalSizes(receivingSummaries);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total size(in bytes) to send in the session
|
||||
*/
|
||||
public long getTotalSizeToSend()
|
||||
{
|
||||
return getTotalSizes(sendingSummaries);
|
||||
}
|
||||
|
||||
private long getTotalSizeInProgress(Collection<ProgressInfo> files)
|
||||
{
|
||||
long total = 0;
|
||||
for (ProgressInfo file : files)
|
||||
total += file.currentBytes;
|
||||
return total;
|
||||
}
|
||||
|
||||
private long getTotalFiles(Collection<StreamSummary> summaries)
|
||||
{
|
||||
long total = 0;
|
||||
for (StreamSummary summary : summaries)
|
||||
total += summary.files;
|
||||
return total;
|
||||
}
|
||||
|
||||
private long getTotalSizes(Collection<StreamSummary> summaries)
|
||||
{
|
||||
long total = 0;
|
||||
for (StreamSummary summary : summaries)
|
||||
total += summary.totalSize;
|
||||
return total;
|
||||
}
|
||||
|
||||
private long getTotalFilesCompleted(Collection<ProgressInfo> files)
|
||||
{
|
||||
Iterable<ProgressInfo> completed = Iterables.filter(files, new Predicate<ProgressInfo>()
|
||||
{
|
||||
public boolean apply(ProgressInfo input)
|
||||
{
|
||||
return input.isCompleted();
|
||||
}
|
||||
});
|
||||
return Iterables.size(completed);
|
||||
}
|
||||
}
|
110
src/main/java/org/apache/cassandra/streaming/StreamManager.java
Normal file
110
src/main/java/org/apache/cassandra/streaming/StreamManager.java
Normal file
@ -0,0 +1,110 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
import javax.management.ListenerNotFoundException;
|
||||
import javax.management.MBeanNotificationInfo;
|
||||
import javax.management.NotificationFilter;
|
||||
import javax.management.NotificationListener;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
|
||||
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
|
||||
|
||||
import com.cloudius.urchin.api.APIClient;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* StreamManager manages currently running {@link StreamResultFuture}s and
|
||||
* provides status of all operation invoked.
|
||||
*
|
||||
* All stream operation should be created through this class to track streaming
|
||||
* status and progress.
|
||||
*/
|
||||
public class StreamManager implements StreamManagerMBean {
|
||||
public static final StreamManager instance = new StreamManager();
|
||||
private static final java.util.logging.Logger logger = java.util.logging.Logger
|
||||
.getLogger(StreamManager.class.getName());
|
||||
private APIClient c = new APIClient();
|
||||
|
||||
public Set<StreamState> getState() {
|
||||
JsonArray arr = c.getJsonArray("/stream_manager/");
|
||||
Set<StreamState> res = new HashSet<StreamState>();
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
JsonObject obj = arr.getJsonObject(i);
|
||||
res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), SessionInfo.fromJsonArr(obj.getJsonArray("sessions"))));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public static StreamManager getInstance() {
|
||||
return instance;
|
||||
}
|
||||
public Set<CompositeData> getCurrentStreams() {
|
||||
logger.info("getCurrentStreams");
|
||||
return Sets.newHashSet(Iterables.transform(getState(), new Function<StreamState, CompositeData>()
|
||||
{
|
||||
public CompositeData apply(StreamState input)
|
||||
{
|
||||
return StreamStateCompositeData.toCompositeData(input);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeNotificationListener(NotificationListener arg0,
|
||||
NotificationFilter arg1, Object arg2)
|
||||
throws ListenerNotFoundException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addNotificationListener(NotificationListener arg0,
|
||||
NotificationFilter arg1, Object arg2)
|
||||
throws IllegalArgumentException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MBeanNotificationInfo[] getNotificationInfo() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeNotificationListener(NotificationListener arg0)
|
||||
throws ListenerNotFoundException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.util.Set;
|
||||
import javax.management.NotificationEmitter;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
|
||||
public interface StreamManagerMBean extends NotificationEmitter {
|
||||
public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
|
||||
|
||||
/**
|
||||
* Returns the current state of all ongoing streams.
|
||||
*/
|
||||
Set<CompositeData> getCurrentStreams();
|
||||
}
|
105
src/main/java/org/apache/cassandra/streaming/StreamSession.java
Normal file
105
src/main/java/org/apache/cassandra/streaming/StreamSession.java
Normal file
@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
/**
|
||||
* Handles the streaming a one or more section of one of more sstables to and from a specific
|
||||
* remote node.
|
||||
*
|
||||
* Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
|
||||
* session has the following life-cycle:
|
||||
*
|
||||
* 1. Connections Initialization
|
||||
*
|
||||
* (a) A node (the initiator in the following) create a new StreamSession, initialize it (init())
|
||||
* and then start it (start()). Start will create a {@link ConnectionHandler} that will create
|
||||
* two connections to the remote node (the follower in the following) with whom to stream and send
|
||||
* a StreamInit message. The first connection will be the incoming connection for the
|
||||
* initiator, and the second connection will be the outgoing.
|
||||
* (b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
|
||||
* initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler
|
||||
* according to StreamInit message's isForOutgoing flag.
|
||||
* (d) When the both incoming and outgoing connections are established, StreamSession calls
|
||||
* StreamSession#onInitializationComplete method to start the streaming prepare phase
|
||||
* (StreamResultFuture.startStreaming()).
|
||||
*
|
||||
* 2. Streaming preparation phase
|
||||
*
|
||||
* (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a
|
||||
* PrepareMessage that includes what files/sections this node will stream to the follower
|
||||
* (stored in a StreamTransferTask, each column family has it's own transfer task) and what
|
||||
* the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
|
||||
* nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
|
||||
* it waits for the follower PrepareMessage.
|
||||
* (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
|
||||
* and send back its own PrepareMessage with a summary of the files/sections that will be sent to
|
||||
* the initiator (prepare()). After having sent that message, the follower goes to its Streamning
|
||||
* phase.
|
||||
* (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will
|
||||
* receive and then goes to his own Streaming phase.
|
||||
*
|
||||
* 3. Streaming phase
|
||||
*
|
||||
* (a) The streaming phase is started by each node (the sender in the follower, but note that each side
|
||||
* of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles().
|
||||
* This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage
|
||||
* consists of a FileMessageHeader that indicates which file is coming and then start streaming the
|
||||
* content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
|
||||
* fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
|
||||
* (StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
|
||||
* (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in
|
||||
* FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
|
||||
* complete (received()). When all files for the StreamReceiveTask have been received, the sstables
|
||||
* are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
|
||||
* is marked complete (taskCompleted())
|
||||
* (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
|
||||
* (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
|
||||
* by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new
|
||||
* FileMessage for that file.
|
||||
* (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
|
||||
* (maybeCompleted()).
|
||||
*
|
||||
* 4. Completion phase
|
||||
*
|
||||
* (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()).
|
||||
* If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that
|
||||
* session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
|
||||
* send a CompleteMessage to the other side.
|
||||
*/
|
||||
public class StreamSession
|
||||
{
|
||||
|
||||
public static enum State
|
||||
{
|
||||
INITIALIZED,
|
||||
PREPARING,
|
||||
STREAMING,
|
||||
WAIT_COMPLETE,
|
||||
COMPLETE,
|
||||
FAILED,
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* Current snapshot of streaming progress.
|
||||
*/
|
||||
public class StreamState implements Serializable
|
||||
{
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = 1L;
|
||||
public final UUID planId;
|
||||
public final String description;
|
||||
public final Set<SessionInfo> sessions;
|
||||
|
||||
public StreamState(UUID planId, String description, Set<SessionInfo> sessions) {
|
||||
this.planId = planId;
|
||||
this.description = description;
|
||||
this.sessions = sessions;
|
||||
}
|
||||
public StreamState(String planId, String description, Set<SessionInfo> sessions)
|
||||
{
|
||||
this(UUID.fromString(planId), description, sessions);
|
||||
}
|
||||
|
||||
public boolean hasFailedSession()
|
||||
{
|
||||
return Iterables.any(sessions, new Predicate<SessionInfo>()
|
||||
{
|
||||
public boolean apply(SessionInfo session)
|
||||
{
|
||||
return session.isFailed();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
|
||||
/**
|
||||
* Summary of streaming.
|
||||
*/
|
||||
public class StreamSummary
|
||||
{
|
||||
public final UUID cfId;
|
||||
|
||||
/**
|
||||
* Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
|
||||
*/
|
||||
public final int files;
|
||||
public final long totalSize;
|
||||
|
||||
public StreamSummary(UUID cfId, int files, long totalSize)
|
||||
{
|
||||
this.cfId = cfId;
|
||||
this.files = files;
|
||||
this.totalSize = totalSize;
|
||||
}
|
||||
|
||||
public StreamSummary(String cfId, int files, long totalSize) {
|
||||
this(UUID.fromString(cfId), files, totalSize);
|
||||
}
|
||||
|
||||
public static StreamSummary fromJsonObject(JsonObject obj) {
|
||||
return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), obj.getJsonNumber("total_size").longValue());
|
||||
}
|
||||
|
||||
public static Collection<StreamSummary> fromJsonArr(JsonArray arr) {
|
||||
Collection<StreamSummary> res = new ArrayList<StreamSummary>();
|
||||
for (int i = 0; i < arr.size(); i++) {
|
||||
res.add(fromJsonObject(arr.getJsonObject(i)));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
StreamSummary summary = (StreamSummary) o;
|
||||
return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hashCode(cfId, files, totalSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
final StringBuilder sb = new StringBuilder("StreamSummary{");
|
||||
sb.append("path=").append(cfId);
|
||||
sb.append(", files=").append(files);
|
||||
sb.append(", totalSize=").append(totalSize);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming.management;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import javax.management.openmbean.*;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import org.apache.cassandra.streaming.ProgressInfo;
|
||||
|
||||
public class ProgressInfoCompositeData
|
||||
{
|
||||
private static final String[] ITEM_NAMES = new String[]{"planId",
|
||||
"peer",
|
||||
"sessionIndex",
|
||||
"fileName",
|
||||
"direction",
|
||||
"currentBytes",
|
||||
"totalBytes"};
|
||||
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
|
||||
"Session peer",
|
||||
"Index of session",
|
||||
"Name of the file",
|
||||
"Direction('IN' or 'OUT')",
|
||||
"Current bytes transferred",
|
||||
"Total bytes to transfer"};
|
||||
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
|
||||
SimpleType.STRING,
|
||||
SimpleType.INTEGER,
|
||||
SimpleType.STRING,
|
||||
SimpleType.STRING,
|
||||
SimpleType.LONG,
|
||||
SimpleType.LONG};
|
||||
|
||||
public static final CompositeType COMPOSITE_TYPE;
|
||||
static {
|
||||
try
|
||||
{
|
||||
COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
|
||||
"ProgressInfo",
|
||||
ITEM_NAMES,
|
||||
ITEM_DESCS,
|
||||
ITEM_TYPES);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
|
||||
{
|
||||
Map<String, Object> valueMap = new HashMap<>();
|
||||
valueMap.put(ITEM_NAMES[0], planId.toString());
|
||||
valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
|
||||
valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
|
||||
valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
|
||||
valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
|
||||
valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
|
||||
valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
|
||||
try
|
||||
{
|
||||
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static ProgressInfo fromCompositeData(CompositeData cd)
|
||||
{
|
||||
Object[] values = cd.getAll(ITEM_NAMES);
|
||||
try
|
||||
{
|
||||
return new ProgressInfo(InetAddress.getByName((String) values[1]),
|
||||
(int) values[2],
|
||||
(String) values[3],
|
||||
ProgressInfo.Direction.valueOf((String)values[4]),
|
||||
(long) values[5],
|
||||
(long) values[6]);
|
||||
}
|
||||
catch (UnknownHostException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,181 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming.management;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import javax.management.openmbean.*;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.cassandra.streaming.ProgressInfo;
|
||||
import org.apache.cassandra.streaming.SessionInfo;
|
||||
import org.apache.cassandra.streaming.StreamSession;
|
||||
import org.apache.cassandra.streaming.StreamSummary;
|
||||
|
||||
public class SessionInfoCompositeData
|
||||
{
|
||||
private static final String[] ITEM_NAMES = new String[]{"planId",
|
||||
"peer",
|
||||
"connecting",
|
||||
"receivingSummaries",
|
||||
"sendingSummaries",
|
||||
"state",
|
||||
"receivingFiles",
|
||||
"sendingFiles",
|
||||
"sessionIndex"};
|
||||
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
|
||||
"Session peer",
|
||||
"Connecting address",
|
||||
"Summaries of receiving data",
|
||||
"Summaries of sending data",
|
||||
"Current session state",
|
||||
"Receiving files",
|
||||
"Sending files",
|
||||
"Session index"};
|
||||
private static final OpenType<?>[] ITEM_TYPES;
|
||||
|
||||
public static final CompositeType COMPOSITE_TYPE;
|
||||
static {
|
||||
try
|
||||
{
|
||||
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
|
||||
SimpleType.STRING,
|
||||
SimpleType.STRING,
|
||||
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
|
||||
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
|
||||
SimpleType.STRING,
|
||||
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
|
||||
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
|
||||
SimpleType.INTEGER};
|
||||
COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
|
||||
"SessionInfo",
|
||||
ITEM_NAMES,
|
||||
ITEM_DESCS,
|
||||
ITEM_TYPES);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
|
||||
{
|
||||
Map<String, Object> valueMap = new HashMap<>();
|
||||
valueMap.put(ITEM_NAMES[0], planId.toString());
|
||||
valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
|
||||
valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
|
||||
Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
|
||||
{
|
||||
public CompositeData apply(StreamSummary input)
|
||||
{
|
||||
return StreamSummaryCompositeData.toCompositeData(input);
|
||||
}
|
||||
};
|
||||
valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
|
||||
valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
|
||||
valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
|
||||
Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
|
||||
{
|
||||
public CompositeData apply(ProgressInfo input)
|
||||
{
|
||||
return ProgressInfoCompositeData.toCompositeData(planId, input);
|
||||
}
|
||||
};
|
||||
valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
|
||||
valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
|
||||
valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
|
||||
try
|
||||
{
|
||||
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static SessionInfo fromCompositeData(CompositeData cd)
|
||||
{
|
||||
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
|
||||
|
||||
Object[] values = cd.getAll(ITEM_NAMES);
|
||||
InetAddress peer, connecting;
|
||||
try
|
||||
{
|
||||
peer = InetAddress.getByName((String) values[1]);
|
||||
connecting = InetAddress.getByName((String) values[2]);
|
||||
}
|
||||
catch (UnknownHostException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
|
||||
{
|
||||
public StreamSummary apply(CompositeData input)
|
||||
{
|
||||
return StreamSummaryCompositeData.fromCompositeData(input);
|
||||
}
|
||||
};
|
||||
SessionInfo info = new SessionInfo(peer,
|
||||
(int)values[8],
|
||||
connecting,
|
||||
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
|
||||
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
|
||||
StreamSession.State.valueOf((String) values[5]));
|
||||
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
|
||||
{
|
||||
public ProgressInfo apply(CompositeData input)
|
||||
{
|
||||
return ProgressInfoCompositeData.fromCompositeData(input);
|
||||
}
|
||||
};
|
||||
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
|
||||
{
|
||||
info.updateProgress(progress);
|
||||
}
|
||||
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
|
||||
{
|
||||
info.updateProgress(progress);
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
|
||||
{
|
||||
return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
|
||||
}
|
||||
|
||||
private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
|
||||
{
|
||||
CompositeData[] composites = new CompositeData[toConvert.size()];
|
||||
return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
|
||||
}
|
||||
}
|
@ -0,0 +1,141 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming.management;
|
||||
|
||||
import java.util.*;
|
||||
import javax.management.openmbean.*;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.cassandra.streaming.SessionInfo;
|
||||
import org.apache.cassandra.streaming.StreamState;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class StreamStateCompositeData
|
||||
{
|
||||
private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions",
|
||||
"currentRxBytes", "totalRxBytes", "rxPercentage",
|
||||
"currentTxBytes", "totalTxBytes", "txPercentage"};
|
||||
private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
|
||||
"Stream plan description",
|
||||
"Active stream sessions",
|
||||
"Number of bytes received across all streams",
|
||||
"Total bytes available to receive across all streams",
|
||||
"Percentage received across all streams",
|
||||
"Number of bytes sent across all streams",
|
||||
"Total bytes available to send across all streams",
|
||||
"Percentage sent across all streams"};
|
||||
private static final OpenType<?>[] ITEM_TYPES;
|
||||
|
||||
public static final CompositeType COMPOSITE_TYPE;
|
||||
static {
|
||||
try
|
||||
{
|
||||
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
|
||||
SimpleType.STRING,
|
||||
ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE),
|
||||
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE,
|
||||
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE};
|
||||
COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
|
||||
"StreamState",
|
||||
ITEM_NAMES,
|
||||
ITEM_DESCS,
|
||||
ITEM_TYPES);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CompositeData toCompositeData(final StreamState streamState)
|
||||
{
|
||||
Map<String, Object> valueMap = new HashMap<>();
|
||||
valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
|
||||
valueMap.put(ITEM_NAMES[1], streamState.description);
|
||||
|
||||
CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
|
||||
Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
|
||||
{
|
||||
public CompositeData apply(SessionInfo input)
|
||||
{
|
||||
return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
|
||||
}
|
||||
})).toArray(sessions);
|
||||
valueMap.put(ITEM_NAMES[2], sessions);
|
||||
|
||||
long currentRxBytes = 0;
|
||||
long totalRxBytes = 0;
|
||||
long currentTxBytes = 0;
|
||||
long totalTxBytes = 0;
|
||||
for (SessionInfo sessInfo : streamState.sessions)
|
||||
{
|
||||
currentRxBytes += sessInfo.getTotalSizeReceived();
|
||||
totalRxBytes += sessInfo.getTotalSizeToReceive();
|
||||
currentTxBytes += sessInfo.getTotalSizeSent();
|
||||
totalTxBytes += sessInfo.getTotalSizeToSend();
|
||||
}
|
||||
double rxPercentage = (totalRxBytes == 0 ? 100L : currentRxBytes * 100L / totalRxBytes);
|
||||
double txPercentage = (totalTxBytes == 0 ? 100L : currentTxBytes * 100L / totalTxBytes);
|
||||
|
||||
valueMap.put(ITEM_NAMES[3], currentRxBytes);
|
||||
valueMap.put(ITEM_NAMES[4], totalRxBytes);
|
||||
valueMap.put(ITEM_NAMES[5], rxPercentage);
|
||||
valueMap.put(ITEM_NAMES[6], currentTxBytes);
|
||||
valueMap.put(ITEM_NAMES[7], totalTxBytes);
|
||||
valueMap.put(ITEM_NAMES[8], txPercentage);
|
||||
|
||||
try
|
||||
{
|
||||
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static StreamState fromCompositeData(CompositeData cd)
|
||||
{
|
||||
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
|
||||
Object[] values = cd.getAll(ITEM_NAMES);
|
||||
UUID planId = UUID.fromString((String) values[0]);
|
||||
String description = (String) values[1];
|
||||
Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
|
||||
new Function<CompositeData, SessionInfo>()
|
||||
{
|
||||
public SessionInfo apply(CompositeData input)
|
||||
{
|
||||
return SessionInfoCompositeData.fromCompositeData(input);
|
||||
}
|
||||
}));
|
||||
return new StreamState(planId, description, sessions);
|
||||
}
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
package org.apache.cassandra.streaming.management;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import javax.management.openmbean.*;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import org.apache.cassandra.streaming.StreamSummary;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class StreamSummaryCompositeData
|
||||
{
|
||||
private static final String[] ITEM_NAMES = new String[]{"cfId",
|
||||
"files",
|
||||
"totalSize"};
|
||||
private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
|
||||
"Number of files",
|
||||
"Total bytes of the files"};
|
||||
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
|
||||
SimpleType.INTEGER,
|
||||
SimpleType.LONG};
|
||||
|
||||
public static final CompositeType COMPOSITE_TYPE;
|
||||
static {
|
||||
try
|
||||
{
|
||||
COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
|
||||
"StreamSummary",
|
||||
ITEM_NAMES,
|
||||
ITEM_DESCS,
|
||||
ITEM_TYPES);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static CompositeData toCompositeData(StreamSummary streamSummary)
|
||||
{
|
||||
Map<String, Object> valueMap = new HashMap<>();
|
||||
valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
|
||||
valueMap.put(ITEM_NAMES[1], streamSummary.files);
|
||||
valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
|
||||
try
|
||||
{
|
||||
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
|
||||
}
|
||||
catch (OpenDataException e)
|
||||
{
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static StreamSummary fromCompositeData(CompositeData cd)
|
||||
{
|
||||
Object[] values = cd.getAll(ITEM_NAMES);
|
||||
return new StreamSummary(UUID.fromString((String) values[0]),
|
||||
(int) values[1],
|
||||
(long) values[2]);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user