Code formatting + source cleanup (eclipse)

elcallio 2016-10-11 14:17:06 +02:00 committed by Calle Wilund
parent 9c2d6cec51
commit 434ce947b0
30 changed files with 575 additions and 711 deletions

SCYLLA-VERSION-GEN Executable file → Normal file
scripts/git-archive-all Executable file → Normal file

@@ -24,69 +24,57 @@
package com.scylladb.jmx.utils;
import java.io.*;
import java.io.File;
import java.text.DecimalFormat;
public class FileUtils
{
public class FileUtils {
private static final double KB = 1024d;
private static final double MB = 1024*1024d;
private static final double GB = 1024*1024*1024d;
private static final double TB = 1024*1024*1024*1024d;
private static final double MB = 1024 * 1024d;
private static final double GB = 1024 * 1024 * 1024d;
private static final double TB = 1024 * 1024 * 1024 * 1024d;
private static final DecimalFormat df = new DecimalFormat("#.##");
public static String stringifyFileSize(double value)
{
public static String stringifyFileSize(double value) {
double d;
if ( value >= TB )
{
if (value >= TB) {
d = value / TB;
String val = df.format(d);
return val + " TB";
}
else if ( value >= GB )
{
} else if (value >= GB) {
d = value / GB;
String val = df.format(d);
return val + " GB";
}
else if ( value >= MB )
{
} else if (value >= MB) {
d = value / MB;
String val = df.format(d);
return val + " MB";
}
else if ( value >= KB )
{
} else if (value >= KB) {
d = value / KB;
String val = df.format(d);
return val + " KB";
}
else
{
} else {
String val = df.format(value);
return val + " bytes";
}
}
/**
* Get the size of a directory in bytes
* @param directory The directory for which we need size.
*
* @param directory
* The directory for which we need size.
* @return The size of the directory
*/
public static long folderSize(File directory)
{
public static long folderSize(File directory) {
long length = 0;
for (File file : directory.listFiles())
{
if (file.isFile())
for (File file : directory.listFiles()) {
if (file.isFile()) {
length += file.length();
else
} else {
length += folderSize(file);
}
}
return length;
}
}
}
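
For reference, a minimal usage sketch of the two utilities in the reformatted FileUtils above; the path and byte counts are made up for illustration:

import java.io.File;
import com.scylladb.jmx.utils.FileUtils;

public class FileUtilsDemo {
    public static void main(String[] args) {
        // 2,500,000 / (1024 * 1024) ≈ 2.38, so the "#.##" format yields "2.38 MB"
        System.out.println(FileUtils.stringifyFileSize(2_500_000d)); // 2.38 MB
        System.out.println(FileUtils.stringifyFileSize(512d)); // 512 bytes
        // folderSize() recurses into subdirectories, summing plain-file lengths
        long bytes = FileUtils.folderSize(new File("/var/lib/scylla")); // hypothetical path
        System.out.println(FileUtils.stringifyFileSize(bytes));
    }
}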


@@ -26,43 +26,38 @@ package com.scylladb.jmx.utils;
import com.google.common.base.Objects;
public class Pair<T1, T2>
{
public class Pair<T1, T2> {
public final T1 left;
public final T2 right;
protected Pair(T1 left, T2 right)
{
protected Pair(T1 left, T2 right) {
this.left = left;
this.right = right;
}
@Override
public final int hashCode()
{
public final int hashCode() {
int hashCode = 31 + (left == null ? 0 : left.hashCode());
return 31*hashCode + (right == null ? 0 : right.hashCode());
return 31 * hashCode + (right == null ? 0 : right.hashCode());
}
@Override
public final boolean equals(Object o)
{
if(!(o instanceof Pair))
public final boolean equals(Object o) {
if (!(o instanceof Pair)) {
return false;
}
@SuppressWarnings("rawtypes")
Pair that = (Pair)o;
Pair that = (Pair) o;
// handles nulls properly
return Objects.equal(left, that.left) && Objects.equal(right, that.right);
}
@Override
public String toString()
{
public String toString() {
return "(" + left + "," + right + ")";
}
public static <X, Y> Pair<X, Y> create(X x, Y y)
{
public static <X, Y> Pair<X, Y> create(X x, Y y) {
return new Pair<X, Y>(x, y);
}
}
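
Since equals/hashCode on this Pair are what let it key the snapshot maps used by SnapshotDetailsTabularData below, here is a tiny made-up sketch of the value semantics:

static void pairDemo() {
    Pair<Long, Long> sizes = Pair.create(1024L, 512L); // (true size, size on disk), illustrative
    // equality is value-based on both components, so structurally equal pairs match
    boolean same = sizes.equals(Pair.create(1024L, 512L)); // true
    System.out.println(sizes + " equal: " + same); // prints (1024,512) equal: true
}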


@@ -23,18 +23,24 @@
package com.scylladb.jmx.utils;
import java.util.Map;
import javax.management.openmbean.*;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import com.google.common.base.Throwables;
public class SnapshotDetailsTabularData {
private static final String[] ITEM_NAMES = new String[] { "Snapshot name",
"Keyspace name", "Column family name", "True size", "Size on disk" };
private static final String[] ITEM_NAMES = new String[] { "Snapshot name", "Keyspace name", "Column family name",
"True size", "Size on disk" };
private static final String[] ITEM_DESCS = new String[] { "snapshot_name",
"keyspace_name", "columnfamily_name", "TrueDiskSpaceUsed",
"TotalDiskSpaceUsed" };
private static final String[] ITEM_DESCS = new String[] { "snapshot_name", "keyspace_name", "columnfamily_name",
"TrueDiskSpaceUsed", "TotalDiskSpaceUsed" };
private static final String TYPE_NAME = "SnapshotDetails";
@@ -48,28 +54,22 @@ public class SnapshotDetailsTabularData {
static {
try {
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
SimpleType.STRING, SimpleType.STRING, SimpleType.STRING };
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING,
SimpleType.STRING };
COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES,
ITEM_DESCS, ITEM_TYPES);
COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE,
ITEM_NAMES);
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static void from(final String snapshot, final String ks,
final String cf,
Map.Entry<String, Pair<Long, Long>> snapshotDetail,
TabularDataSupport result) {
public static void from(final String snapshot, final String ks, final String cf,
Map.Entry<String, Pair<Long, Long>> snapshotDetail, TabularDataSupport result) {
try {
final String totalSize = FileUtils.stringifyFileSize(snapshotDetail
.getValue().left);
final String liveSize = FileUtils.stringifyFileSize(snapshotDetail
.getValue().right);
final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().left);
final String liveSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().right);
result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
new Object[] { snapshot, ks, cf, liveSize, totalSize }));
} catch (OpenDataException e) {
@@ -77,8 +77,8 @@ public class SnapshotDetailsTabularData {
}
}
public static void from(final String snapshot, final String ks,
final String cf, long total, long live, TabularDataSupport result) {
public static void from(final String snapshot, final String ks, final String cf, long total, long live,
TabularDataSupport result) {
try {
final String totalSize = FileUtils.stringifyFileSize(total);
final String liveSize = FileUtils.stringifyFileSize(live);
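
To make the open-type layout above concrete, here is a hedged client-side sketch that walks a TabularDataSupport filled by from(); the method name and data source are illustrative, not part of this class:

import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularDataSupport;

static void printSnapshots(TabularDataSupport snapshots) {
    for (Object row : snapshots.values()) {
        CompositeData cd = (CompositeData) row;
        // lookup keys must match ITEM_NAMES exactly
        System.out.printf("%s %s/%s true=%s disk=%s%n",
                cd.get("Snapshot name"), cd.get("Keyspace name"),
                cd.get("Column family name"), cd.get("True size"),
                cd.get("Size on disk"));
    }
}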


@@ -27,8 +27,7 @@ import javax.management.openmbean.OpenDataException;
/**
* The MBean interface for ColumnFamilyStore
*/
public interface ColumnFamilyStoreMBean
{
public interface ColumnFamilyStoreMBean {
/**
* @return the name of the column family
*/
@@ -40,7 +39,9 @@ public interface ColumnFamilyStoreMBean
/**
* force a major compaction of this column family
*
* @param splitOutput true if the output of the major compaction should be split in several sstables
* @param splitOutput
* true if the output of the major compaction should be split in
* several sstables
*/
public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException;
@@ -60,7 +61,8 @@ public interface ColumnFamilyStoreMBean
public int getMaximumCompactionThreshold();
/**
Sets the minimum and maximum number of SSTables in queue before compaction kicks off
Sets the minimum and maximum number of SSTables in queue before
compaction kicks off
*/
public void setCompactionThresholds(int minThreshold, int maxThreshold);
@@ -72,33 +74,42 @@ public interface ColumnFamilyStoreMBean
/**
* Sets the compaction parameters locally for this node
*
* Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
* Note that this will be set until an ALTER with compaction = {..} is
* executed or the node is restarted
*
* @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..}
* @param options
* compaction options with the same syntax as when doing ALTER
* ... WITH compaction = {..}
*/
public void setCompactionParametersJson(String options);
public String getCompactionParametersJson();
/**
* Sets the compaction parameters locally for this node
*
* Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
* Note that this will be set until an ALTER with compaction = {..} is
* executed or the node is restarted
*
* @param options compaction options map
* @param options
* compaction options map
*/
public void setCompactionParameters(Map<String, String> options);
public Map<String, String> getCompactionParameters();
/**
* Get the compression parameters
*/
public Map<String,String> getCompressionParameters();
public Map<String, String> getCompressionParameters();
/**
* Set the compression parameters
* @param opts map of string names to values
*
* @param opts
* map of string names to values
*/
public void setCompressionParameters(Map<String,String> opts);
public void setCompressionParameters(Map<String, String> opts);
/**
* Set new crc check chance
@@ -109,66 +120,74 @@ public interface ColumnFamilyStoreMBean
public long estimateKeys();
/**
Returns a list of the names of the built column indexes for the current store
*
* @return list of the index names
*/
public List<String> getBuiltIndexes();
/**
* Returns a list of filenames that contain the given key on this node
*
* @param key
* @return list of filenames containing the key
*/
public List<String> getSSTablesForKey(String key);
/**
Scan through Keyspace/ColumnFamily's data directory,
determine which SSTables should be loaded, and load them
Scan through Keyspace/ColumnFamily's data directory, determine which
SSTables should be loaded, and load them
*/
public void loadNewSSTables();
/**
* @return the number of SSTables in L0. Always return 0 if Leveled compaction is not enabled.
* @return the number of SSTables in L0. Always return 0 if Leveled
* compaction is not enabled.
*/
public int getUnleveledSSTables();
/**
* @return sstable count for each level. null unless leveled compaction is used.
* array index corresponds to level(int[0] is for level 0, ...).
* @return sstable count for each level. null unless leveled compaction is
* used. array index corresponds to level(int[0] is for level 0,
* ...).
*/
public int[] getSSTableCountPerLevel();
/**
* Get the ratio of droppable tombstones to real columns (and non-droppable tombstones)
* Get the ratio of droppable tombstones to real columns (and non-droppable
* tombstones)
*
* @return ratio
*/
public double getDroppableTombstoneRatio();
/**
* @return the size of SSTables in "snapshots" subdirectory which aren't live anymore
* @return the size of SSTables in "snapshots" subdirectory which aren't
* live anymore
*/
public long trueSnapshotsSize();
/**
* begin sampling for a specific sampler with a given capacity. The cardinality may
* be larger than the capacity, but depending on the use case it may affect its accuracy
* begin sampling for a specific sampler with a given capacity. The
* cardinality may be larger than the capacity, but depending on the use
* case it may affect its accuracy
*/
public void beginLocalSampling(String sampler, int capacity);
/**
* @return top <i>count</i> items for the sampler since beginLocalSampling was called
* @return top <i>count</i> items for the sampler since beginLocalSampling
* was called
*/
public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException;
/*
Is Compaction space check enabled
* Is Compaction space check enabled
*/
public boolean isCompactionDiskSpaceCheckEnabled();
/*
Enable/Disable compaction space check
* Enable/Disable compaction space check
*/
public void compactionDiskSpaceCheck(boolean enable);
}
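
A hedged sketch of driving the sampling methods declared above through a JMX proxy; the connection, object-name pattern, and sampler name are assumptions for illustration, not definitions from this commit:

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

static CompositeData topSampled(MBeanServerConnection conn) throws Exception {
    // object-name pattern is illustrative; adjust keyspace/columnfamily as needed
    ObjectName name = new ObjectName(
            "org.apache.cassandra.db:type=ColumnFamilies,keyspace=ks1,columnfamily=cf1");
    ColumnFamilyStoreMBean cfs = JMX.newMBeanProxy(conn, name, ColumnFamilyStoreMBean.class);
    cfs.beginLocalSampling("READS", 256); // capacity bounds the sampler's size
    Thread.sleep(10_000);                 // let samples accumulate
    return cfs.finishLocalSampling("READS", 10); // top 10 items since begin
}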


@@ -17,14 +17,13 @@
*/
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public interface CommitLogMBean {
/**
Command to execute to archive a commitlog segment. Blank to disable.
Command to execute to archive a commitlog segment. Blank to disable.
*/
public String getArchiveCommand();
@@ -66,12 +65,14 @@ public interface CommitLogMBean {
public List<String> getActiveSegmentNames();
/**
* @return Files which are pending for archival attempt. Does NOT include failed archive attempts.
* @return Files which are pending for archival attempt. Does NOT include
* failed archive attempts.
*/
public List<String> getArchivingSegmentNames();
/**
* @return The size of the mutations in all active commit log segments (uncompressed).
* @return The size of the mutations in all active commit log segments
* (uncompressed).
*/
public long getActiveContentSize();
@@ -81,7 +82,8 @@
public long getActiveOnDiskSize();
/**
* @return A map between active log segments and the compression ratio achieved for each.
* @return A map between active log segments and the compression ratio
* achieved for each.
*/
public Map<String, Double> getActiveSegmentCompressionRatios();
}


@@ -36,13 +36,11 @@ import javax.management.openmbean.TabularType;
import com.google.common.base.Throwables;
public class CompactionHistoryTabularData {
private static final String[] ITEM_NAMES = new String[] { "id",
"keyspace_name", "columnfamily_name", "compacted_at", "bytes_in",
"bytes_out", "rows_merged" };
private static final String[] ITEM_NAMES = new String[] { "id", "keyspace_name", "columnfamily_name",
"compacted_at", "bytes_in", "bytes_out", "rows_merged" };
private static final String[] ITEM_DESCS = new String[] { "time uuid",
"keyspace name", "column family name", "compaction finished at",
"total bytes in", "total bytes out", "total rows merged" };
private static final String[] ITEM_DESCS = new String[] { "time uuid", "keyspace name", "column family name",
"compaction finished at", "total bytes in", "total bytes out", "total rows merged" };
private static final String TYPE_NAME = "CompactionHistory";
@@ -56,22 +54,18 @@
static {
try {
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
SimpleType.STRING, SimpleType.LONG, SimpleType.LONG,
SimpleType.LONG, SimpleType.STRING };
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG,
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES,
ITEM_DESCS, ITEM_TYPES);
COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE,
ITEM_NAMES);
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static TabularData from(JsonArray resultSet)
throws OpenDataException {
public static TabularData from(JsonArray resultSet) throws OpenDataException {
TabularDataSupport result = new TabularDataSupport(TABULAR_TYPE);
for (int i = 0; i < resultSet.size(); i++) {
JsonObject row = resultSet.getJsonObject(i);
@@ -91,15 +85,13 @@ public class CompactionHistoryTabularData {
if (m > 0) {
sb.append(',');
}
sb.append(entry.getString("key")).append(':')
.append(entry.getString("value"));
sb.append(entry.getString("key")).append(':').append(entry.getString("value"));
}
sb.append('}');
}
result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
new Object[] { id, ksName, cfName, compactedAt, bytesIn,
bytesOut, sb.toString() }));
new Object[] { id, ksName, cfName, compactedAt, bytesIn, bytesOut, sb.toString() }));
}
return result;
}


@@ -19,10 +19,10 @@ package org.apache.cassandra.db.compaction;
import java.util.List;
import java.util.Map;
import javax.management.openmbean.TabularData;
public interface CompactionManagerMBean
{
public interface CompactionManagerMBean {
/** List of running compaction objects. */
public List<Map<String, String>> getCompactions();
@@ -45,7 +45,7 @@ public interface CompactionManagerMBean
/**
* Stop all running compaction-like tasks having the provided {@code type}.
*
*
* @param type
* the type of compaction to stop. Can be one of: - COMPACTION -
* VALIDATION - CLEANUP - SCRUB - INDEX_BUILD
@@ -54,9 +54,11 @@
/**
* Stop an individual running compaction using the compactionId.
* @param compactionId Compaction ID of compaction to stop. Such IDs can be found in
* the transaction log files whose name starts with compaction_,
* located in the table transactions folder.
*
* @param compactionId
* Compaction ID of compaction to stop. Such IDs can be found in
* the transaction log files whose name starts with compaction_,
* located in the table transactions folder.
*/
public void stopCompactionById(String compactionId);
@@ -67,7 +69,7 @@
/**
* Allows user to resize maximum size of the compaction thread pool.
*
*
* @param number
* New maximum of compaction threads
*/
@@ -80,7 +82,7 @@
/**
* Allows user to resize maximum size of the compaction thread pool.
*
*
* @param number
* New maximum of compaction threads
*/
@@ -93,7 +95,7 @@
/**
* Allows user to resize maximum size of the compaction thread pool.
*
*
* @param number
* New maximum of compaction threads
*/
@@ -106,7 +108,7 @@
/**
* Allows user to resize maximum size of the validator thread pool.
*
*
* @param number
* New maximum of validator threads
*/


@@ -24,31 +24,12 @@
package org.apache.cassandra.gms;
public enum ApplicationState
{
STATUS,
LOAD,
SCHEMA,
DC,
RACK,
RELEASE_VERSION,
REMOVAL_COORDINATOR,
INTERNAL_IP,
RPC_ADDRESS,
X_11_PADDING, // padding specifically for 1.1
SEVERITY,
NET_VERSION,
HOST_ID,
TOKENS,
public enum ApplicationState {
STATUS, LOAD, SCHEMA, DC, RACK, RELEASE_VERSION, REMOVAL_COORDINATOR, INTERNAL_IP, RPC_ADDRESS, X_11_PADDING, // padding
// specifically
// for
// 1.1
SEVERITY, NET_VERSION, HOST_ID, TOKENS,
// pad to allow adding new states to existing cluster
X1,
X2,
X3,
X4,
X5,
X6,
X7,
X8,
X9,
X10,
X1, X2, X3, X4, X5, X6, X7, X8, X9, X10,
}


@@ -42,6 +42,7 @@ public class EndpointState {
ApplicationState[] applicationValues;
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(EndpointState.class.getName());
EndpointState(HeartBeatState initialHbState) {
applicationValues = ApplicationState.values();
hbState = initialHbState;
@@ -101,8 +102,8 @@ public class EndpointState {
isAlive = alive;
}
@Override
public String toString() {
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = "
+ applicationState;
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
}
}


@@ -23,8 +23,7 @@ import java.util.Map;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
public interface FailureDetectorMBean
{
public interface FailureDetectorMBean {
public void dumpInterArrivalTimes();
public void setPhiConvictThreshold(double phi);


@@ -19,8 +19,7 @@ package org.apache.cassandra.gms;
import java.net.UnknownHostException;
public interface GossiperMBean
{
public interface GossiperMBean {
public long getEndpointDowntime(String address) throws UnknownHostException;
public int getCurrentGenerationNumber(String address) throws UnknownHostException;


@@ -58,8 +58,8 @@ class HeartBeatState {
version = Integer.MAX_VALUE;
}
@Override
public String toString() {
return String.format("HeartBeat: generation = %d, version = %d",
generation, version);
return String.format("HeartBeat: generation = %d, version = %d", generation, version);
}
}


@@ -22,34 +22,40 @@ import java.net.UnknownHostException;
/**
* MBean exposing standard Snitch info
*/
public interface EndpointSnitchInfoMBean
{
public interface EndpointSnitchInfoMBean {
/**
* Provides the Rack name depending on the respective snitch used, given the host name/ip
* Provides the Rack name depending on the respective snitch used, given the
* host name/ip
*
* @param host
* @throws UnknownHostException
*/
public String getRack(String host) throws UnknownHostException;
/**
* Provides the Datacenter name depending on the respective snitch used, given the hostname/ip
* Provides the Datacenter name depending on the respective snitch used,
* given the hostname/ip
*
* @param host
* @throws UnknownHostException
*/
public String getDatacenter(String host) throws UnknownHostException;
/**
* Provides the Rack name depending on the respective snitch used for this node
* Provides the Rack name depending on the respective snitch used for this
* node
*/
public String getRack();
/**
* Provides the Datacenter name depending on the respective snitch used for this node
* Provides the Datacenter name depending on the respective snitch used for
* this node
*/
public String getDatacenter();
/**
* Provides the snitch name of the cluster
*
* @return Snitch name
*/
public String getSnitchName();


@@ -24,8 +24,6 @@
package org.apache.cassandra.net;
import java.net.UnknownHostException;
import java.util.Map;
@@ -133,6 +131,6 @@ public interface MessagingServiceMBean {
* Number of timeouts since last check per host.
*/
public Map<String, Long> getRecentTimeoutsPerHost();
public int getVersion(String address) throws UnknownHostException;
}


@@ -22,30 +22,33 @@
* Modified by Cloudius Systems
*/
package org.apache.cassandra.service;
import java.util.concurrent.ExecutionException;
public interface CacheServiceMBean
{
public interface CacheServiceMBean {
public int getRowCacheSavePeriodInSeconds();
public void setRowCacheSavePeriodInSeconds(int rcspis);
public int getKeyCacheSavePeriodInSeconds();
public void setKeyCacheSavePeriodInSeconds(int kcspis);
public int getCounterCacheSavePeriodInSeconds();
public void setCounterCacheSavePeriodInSeconds(int ccspis);
public int getRowCacheKeysToSave();
public void setRowCacheKeysToSave(int rckts);
public int getKeyCacheKeysToSave();
public void setKeyCacheKeysToSave(int kckts);
public int getCounterCacheKeysToSave();
public void setCounterCacheKeysToSave(int cckts);
/**
@@ -69,8 +72,13 @@ public interface CacheServiceMBean
/**
* save row and key caches
*
* @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception
* @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity.
* @throws ExecutionException
* when attempting to retrieve the result of a task that aborted
* by throwing an exception
* @throws InterruptedException
* when a thread is waiting, sleeping, or otherwise occupied,
* and the thread is interrupted, either before or during the
* activity.
*/
public void saveCaches() throws ExecutionException, InterruptedException;
}


@@ -18,8 +18,8 @@
*/
package org.apache.cassandra.service;
public interface GCInspectorMXBean
{
// returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)), sum((gc real time (ms))^2), sum(gc bytes), count(gc) }
public interface GCInspectorMXBean {
// returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)),
// sum((gc real time (ms))^2), sum(gc bytes), count(gc) }
public double[] getAndResetStats();
}
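
A small sketch unpacking that return contract; the index meanings are read directly from the comment above, and the proxy parameter is assumed:

static void logGcStats(GCInspectorMXBean gcInspector) {
    double[] s = gcInspector.getAndResetStats();
    double intervalMs = s[0]; // elapsed interval since the previous call
    double maxGcMs    = s[1]; // longest single GC pause in the interval
    double sumGcMs    = s[2]; // total GC real time
    double sumGcMsSq  = s[3]; // sum of squares, usable for variance estimates
    double gcBytes    = s[4]; // bytes reclaimed
    double gcCount    = s[5]; // number of collections
    System.out.printf("%.0f GCs, %.0f ms total over %.0f ms%n", gcCount, sumGcMs, intervalMs);
}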


@@ -83,8 +83,7 @@ public interface StorageProxyMBean {
public void setTruncateRpcTimeout(Long timeoutInMillis);
public void setNativeTransportMaxConcurrentConnections(
Long nativeTransportMaxConcurrentConnections);
public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections);
public Long getNativeTransportMaxConcurrentConnections();


@@ -132,8 +132,7 @@ public interface StorageServiceMBean extends NotificationEmitter {
*
* @return mapping of ranges to end points
*/
public Map<List<String>, List<String>> getRangeToEndpointMap(
String keyspace);
public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace);
/**
* Retrieve a map of range to rpc addresses that describe the ring topology
@@ -141,8 +140,7 @@
*
* @return mapping of ranges to rpc addresses
*/
public Map<List<String>, List<String>> getRangeToRpcaddressMap(
String keyspace);
public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace);
/**
* The same as {@code describeRing(String)} but converts TokenRange to the
@@ -164,8 +162,7 @@
* the keyspace to get the pending range map for.
* @return a map of pending ranges to endpoints
*/
public Map<List<String>, List<String>> getPendingRangeToEndpointMap(
String keyspace);
public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace);
/**
* Retrieve a map of tokens to endpoints, including the bootstrapping ones.
@@ -211,11 +208,9 @@
* - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf,
String key);
public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key);
public List<InetAddress> getNaturalEndpoints(String keyspaceName,
ByteBuffer key);
public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key);
/**
* Takes the snapshot for the given keyspaces. A snapshot name must be
@@ -226,8 +221,7 @@
* @param keyspaceNames
* the name of the keyspaces to snapshot; empty means "all."
*/
public void takeSnapshot(String tag, String... keyspaceNames)
throws IOException;
public void takeSnapshot(String tag, String... keyspaceNames) throws IOException;
/**
* Takes the snapshot of a specific column family. A snapshot name must be specified.
@@ -251,8 +245,7 @@
* @param tag
* the tag given to the snapshot; may not be null or empty
*/
public void takeColumnFamilySnapshot(String keyspaceName,
String columnFamilyName, String tag) throws IOException;
public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException;
/**
* Takes the snapshot of a multiple column family from different keyspaces.
@@ -264,15 +257,13 @@
* list of columnfamily from different keyspace in the form of
* ks1.cf1 ks2.cf2
*/
public void takeMultipleColumnFamilySnapshot(String tag,
String... columnFamilyList) throws IOException;
public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList) throws IOException;
/**
* Remove the snapshot with the given name from the given keyspaces. If no
* tag is specified we will remove all snapshots.
*/
public void clearSnapshot(String tag, String... keyspaceNames)
throws IOException;
public void clearSnapshot(String tag, String... keyspaceNames) throws IOException;
/**
* Get the details of all the snapshot
@@ -289,21 +280,26 @@
public long trueSnapshotsSize();
/**
* Forces refresh of values stored in system.size_estimates of all column families.
* Forces refresh of values stored in system.size_estimates of all column
* families.
*/
public void refreshSizeEstimates() throws ExecutionException;
/**
* Forces major compaction of a single keyspace
*/
public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames)
throws IOException, ExecutionException, InterruptedException;
/**
* Trigger a cleanup of keys on a single keyspace
*/
@Deprecated
public int forceKeyspaceCleanup(String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException;
public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException;
public int forceKeyspaceCleanup(String keyspaceName, String... tables)
throws IOException, ExecutionException, InterruptedException;
public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables)
throws IOException, ExecutionException, InterruptedException;
/**
* Scrub (deserialize + reserialize at the latest version, skipping bad rows
@@ -313,26 +309,36 @@
* Scrubbed CFs will be snapshotted first, if disableSnapshot is false
*/
@Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames)
throws IOException, ExecutionException, InterruptedException;
@Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName,
String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Verify (checksums of) the given keyspace.
* If tableNames array is empty, all CFs are verified.
* Verify (checksums of) the given keyspace. If tableNames array is empty,
* all CFs are verified.
*
* The entire sstable will be read to ensure each cell validates if extendedVerify is true
* The entire sstable will be read to ensure each cell validates if
* extendedVerify is true
*/
public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int verify(boolean extendedVerify, String keyspaceName, String... tableNames)
throws IOException, ExecutionException, InterruptedException;
/**
Rewrite all sstables to the latest version. Unlike scrub, it doesn't skip
bad rows and does not snapshot sstables first.
*/
@Deprecated
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames)
throws IOException, ExecutionException, InterruptedException;
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames)
throws IOException, ExecutionException, InterruptedException;
/**
* Flush all memtables for the given column families, or all columnfamilies
@@ -342,71 +348,86 @@
* @param columnFamilies
* @throws IOException
*/
public void forceKeyspaceFlush(String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException,
InterruptedException;
public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException;
/**
Invoke repair asynchronously.
You can track repair progress by subscribing to the JMX notifications sent from this StorageServiceMBean.
Notification format is:
type: "repair"
userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
Invoke repair asynchronously. You can track repair progress by
subscribing to the JMX notifications sent from this StorageServiceMBean.
Notification format is: type: "repair" userObject: int array of length 2,
[0]=command number, [1]=ordinal of ActiveRepairService.Status
*
* @param keyspace Keyspace name to repair. Should not be null.
* @param options repair option.
* @param keyspace
* Keyspace name to repair. Should not be null.
* @param options
* repair option.
* @return Repair command number, or 0 if nothing to repair
*/
public int repairAsync(String keyspace, Map<String, String> options);
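
As one concrete reading of the notification contract documented above, a subscriber could look like the following sketch; the listener registration and MBean connection setup are assumed and not part of this interface:

import javax.management.Notification;
import javax.management.NotificationListener;

NotificationListener repairListener = (Notification n, Object handback) -> {
    if ("repair".equals(n.getType())) {
        int[] status = (int[]) n.getUserData(); // length 2, per the javadoc above
        int command = status[0];                // the number returned by repairAsync
        int ordinal = status[1];                // ActiveRepairService.Status ordinal
        System.out.println("repair #" + command + " status ordinal " + ordinal);
    }
};
// then: connection.addNotificationListener(storageServiceObjectName, repairListener, null, null)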
/**
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*/
@Deprecated
public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames) throws IOException;
public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames)
throws IOException;
/**
Invoke repair asynchronously.
You can track repair progress by subscribing to the JMX notifications sent from this StorageServiceMBean.
Notification format is:
type: "repair"
userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
Invoke repair asynchronously. You can track repair progress by
subscribing to the JMX notifications sent from this StorageServiceMBean.
Notification format is: type: "repair" userObject: int array of length 2,
[0]=command number, [1]=ordinal of ActiveRepairService.Status
*
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*
* @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
* @param parallelismDegree
* 0: sequential, 1: parallel, 2: DC parallel
* @return Repair command number, or 0 if nothing to repair
*/
@Deprecated
public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames);
public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames);
/**
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*/
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames) throws IOException;
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames)
throws IOException;
/**
* Same as forceRepairAsync, but handles a specified range
*
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*
* @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
* @param parallelismDegree
* 0: sequential, 1: parallel, 2: DC parallel
*/
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames);
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree,
Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames);
/**
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*/
@Deprecated
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... tableNames);
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange,
boolean fullRepair, String... tableNames);
/**
* @deprecated use {@link #repairAsync(String keyspace, Map options)} instead.
* @deprecated use {@link #repairAsync(String keyspace, Map options)}
* instead.
*/
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... tableNames);
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential,
boolean isLocal, boolean fullRepair, String... tableNames);
public void forceTerminateAllRepairSessions();
@@ -457,8 +478,7 @@
*
* @see ch.qos.logback.classic.Level#toLevel(String)
*/
public void setLoggingLevel(String classQualifier, String level)
throws Exception;
public void setLoggingLevel(String classQualifier, String level) throws Exception;
/** get the runtime logging levels */
public Map<String, String> getLoggingLevels();
@@ -479,8 +499,7 @@
* makes node unavailable for writes, flushes memtables and replays
* commitlog.
*/
public void drain()
throws IOException, InterruptedException, ExecutionException;
public void drain() throws IOException, InterruptedException, ExecutionException;
/**
* Truncates (deletes) the given columnFamily from the provided keyspace.
@@ -494,8 +513,7 @@
* @param columnFamily
* The column family to delete data from.
*/
public void truncate(String keyspace, String columnFamily)
throws TimeoutException, IOException;
public void truncate(String keyspace, String columnFamily) throws TimeoutException, IOException;
/**
* given a list of tokens (representing the nodes in the cluster), returns a
@@ -510,8 +528,7 @@
* the same replication strategies and if yes then we will use the first
else an empty Map is returned.
*/
public Map<InetAddress, Float> effectiveOwnership(String keyspace)
throws IllegalStateException;
public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException;
public List<String> getKeyspaces();
@@ -535,9 +552,8 @@
* @param dynamicBadnessThreshold
* double, (default 0.0)
*/
public void updateSnitch(String epSnitchClassName, Boolean dynamic,
Integer dynamicUpdateInterval, Integer dynamicResetInterval,
Double dynamicBadnessThreshold) throws ClassNotFoundException;
public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval,
Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException;
// allows a user to forcibly 'kill' a sick node
public void stopGossiping();
@@ -579,6 +595,7 @@
public int getStreamThroughputMbPerSec();
public void setInterDCStreamThroughputMbPerSec(int value);
public int getInterDCStreamThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
@@ -635,8 +652,7 @@
/**
* rebuild the specified indexes
*/
public void rebuildSecondaryIndex(String ksName, String cfName,
String... idxNames);
public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames);
public void resetLocalSchema() throws IOException;
@@ -657,11 +673,9 @@
*/
public double getTraceProbability();
void disableAutoCompaction(String ks, String... columnFamilies)
throws IOException;
void disableAutoCompaction(String ks, String... columnFamilies) throws IOException;
void enableAutoCompaction(String ks, String... columnFamilies)
throws IOException;
void enableAutoCompaction(String ks, String... columnFamilies) throws IOException;
public void deliverHints(String host) throws UnknownHostException;
@@ -685,10 +699,13 @@
/** Returns the threshold for rejecting queries due to a large batch size */
public int getBatchSizeFailureThreshold();
/** Sets the threshold for rejecting queries due to a large batch size */
public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold);
/** Sets the hinted handoff throttle in kb per second, per delivery thread. */
/**
* Sets the hinted handoff throttle in kb per second, per delivery thread.
*/
public void setHintedHandoffThrottleInKB(int throttleInKB);
/**


@@ -29,6 +29,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.json.JsonArray;
import javax.json.JsonObject;
@@ -37,25 +38,21 @@ import com.google.common.base.Objects;
/**
* ProgressInfo contains file transfer progress.
*/
public class ProgressInfo implements Serializable
{
@SuppressWarnings("serial")
public class ProgressInfo implements Serializable {
/**
* Direction of the stream.
*/
public static enum Direction
{
OUT(0),
IN(1);
public static enum Direction {
OUT(0), IN(1);
public final byte code;
private Direction(int code)
{
private Direction(int code) {
this.code = (byte) code;
}
public static Direction fromByte(byte direction)
{
public static Direction fromByte(byte direction) {
return direction == 0 ? OUT : IN;
}
}
@@ -67,8 +64,8 @@ public class ProgressInfo implements Serializable
public final long currentBytes;
public final long totalBytes;
public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
{
public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes,
long totalBytes) {
assert totalBytes > 0;
this.peer = peer;
@@ -81,12 +78,9 @@ public class ProgressInfo implements Serializable
static public ProgressInfo fromJsonObject(JsonObject obj) {
try {
return new ProgressInfo(InetAddress.getByName(obj.getString("peer")),
obj.getInt("session_index"),
obj.getString("file_name"),
Direction.valueOf(obj.getString("direction")),
obj.getJsonNumber("current_bytes").longValue(),
obj.getJsonNumber("total_bytes").longValue());
return new ProgressInfo(InetAddress.getByName(obj.getString("peer")), obj.getInt("session_index"),
obj.getString("file_name"), Direction.valueOf(obj.getString("direction")),
obj.getJsonNumber("current_bytes").longValue(), obj.getJsonNumber("total_bytes").longValue());
} catch (UnknownHostException e) {
// Not supposed to get here
}
@@ -104,45 +98,55 @@ public class ProgressInfo implements Serializable
}
return res;
}
/**
* @return true if file transfer is completed
*/
public boolean isCompleted()
{
public boolean isCompleted() {
return currentBytes >= totalBytes;
}
/**
* ProgressInfo is considered to be equal only when all attributes except currentBytes are equal.
* ProgressInfo is considered to be equal only when all attributes except
* currentBytes are equal.
*/
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProgressInfo that = (ProgressInfo) o;
if (totalBytes != that.totalBytes) return false;
if (direction != that.direction) return false;
if (!fileName.equals(that.fileName)) return false;
if (sessionIndex != that.sessionIndex) return false;
if (totalBytes != that.totalBytes) {
return false;
}
if (direction != that.direction) {
return false;
}
if (!fileName.equals(that.fileName)) {
return false;
}
if (sessionIndex != that.sessionIndex) {
return false;
}
return peer.equals(that.peer);
}
@Override
public int hashCode()
{
public int hashCode() {
return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
}
@Override
public String toString()
{
public String toString() {
StringBuilder sb = new StringBuilder(fileName);
sb.append(" ").append(currentBytes);
sb.append("/").append(totalBytes).append(" bytes");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append("(").append(currentBytes * 100 / totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
sb.append("idx:").append(sessionIndex);
sb.append(peer);


@@ -28,30 +28,26 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.json.JsonArray;
import javax.json.JsonObject;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
/**
* Stream session info.
*/
public final class SessionInfo implements Serializable
{
@SuppressWarnings("serial")
public final class SessionInfo implements Serializable {
public final InetAddress peer;
public final int sessionIndex;
public final InetAddress connecting;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
/** Immutable collection of sending summaries */
public final Collection<StreamSummary> sendingSummaries;
/** Current session state */
public final StreamSession.State state;
@@ -67,15 +63,10 @@ public final class SessionInfo implements Serializable
return null;
}
public SessionInfo(InetAddress peer,
int sessionIndex,
InetAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state,
Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
public SessionInfo(InetAddress peer, int sessionIndex, InetAddress connecting,
Collection<StreamSummary> receivingSummaries, Collection<StreamSummary> sendingSummaries,
StreamSession.State state, Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this.peer = peer;
this.sessionIndex = sessionIndex;
this.connecting = connecting;
@@ -86,24 +77,19 @@ public final class SessionInfo implements Serializable
this.state = state;
}
public SessionInfo(String peer,
int sessionIndex,
String connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
String state,
Map<String, ProgressInfo> receivingFiles,
public SessionInfo(String peer, int sessionIndex, String connecting, Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries, String state, Map<String, ProgressInfo> receivingFiles,
Map<String, ProgressInfo> sendingFiles) {
this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries,
StreamSession.State.valueOf(state), receivingFiles, sendingFiles);
}
ProgressInfo in;
public static SessionInfo fromJsonObject(JsonObject obj) {
return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"),
obj.getString("connecting"),
return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"),
StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")),
StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")),
obj.getString("state"),
StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), obj.getString("state"),
ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")),
ProgressInfo.fromJArrray(obj.getJsonArray("sending_files")));
}
@@ -118,135 +104,117 @@ public final class SessionInfo implements Serializable
return res;
}
public boolean isFailed()
{
public boolean isFailed() {
return state == StreamSession.State.FAILED;
}
/**
* Update progress of receiving/sending file.
*
* @param newProgress new progress info
* @param newProgress
* new progress info
*/
public void updateProgress(ProgressInfo newProgress)
{
public void updateProgress(ProgressInfo newProgress) {
assert peer.equals(newProgress.peer);
Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN
? receivingFiles : sendingFiles;
Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN ? receivingFiles
: sendingFiles;
currentFiles.put(newProgress.fileName, newProgress);
}
public Collection<ProgressInfo> getReceivingFiles()
{
public Collection<ProgressInfo> getReceivingFiles() {
return receivingFiles.values();
}
public Collection<ProgressInfo> getSendingFiles()
{
public Collection<ProgressInfo> getSendingFiles() {
return sendingFiles.values();
}
/**
* @return total number of files already received.
*/
public long getTotalFilesReceived()
{
public long getTotalFilesReceived() {
return getTotalFilesCompleted(receivingFiles.values());
}
/**
* @return total number of files already sent.
*/
public long getTotalFilesSent()
{
public long getTotalFilesSent() {
return getTotalFilesCompleted(sendingFiles.values());
}
/**
* @return total size(in bytes) already received.
*/
public long getTotalSizeReceived()
{
public long getTotalSizeReceived() {
return getTotalSizeInProgress(receivingFiles.values());
}
/**
* @return total size(in bytes) already sent.
*/
public long getTotalSizeSent()
{
public long getTotalSizeSent() {
return getTotalSizeInProgress(sendingFiles.values());
}
/**
* @return total number of files to receive in the session
*/
public long getTotalFilesToReceive()
{
public long getTotalFilesToReceive() {
return getTotalFiles(receivingSummaries);
}
/**
* @return total number of files to send in the session
*/
public long getTotalFilesToSend()
{
public long getTotalFilesToSend() {
return getTotalFiles(sendingSummaries);
}
/**
* @return total size(in bytes) to receive in the session
*/
public long getTotalSizeToReceive()
{
public long getTotalSizeToReceive() {
return getTotalSizes(receivingSummaries);
}
/**
* @return total size(in bytes) to send in the session
*/
public long getTotalSizeToSend()
{
public long getTotalSizeToSend() {
return getTotalSizes(sendingSummaries);
}
private long getTotalSizeInProgress(Collection<ProgressInfo> files)
{
private long getTotalSizeInProgress(Collection<ProgressInfo> files) {
long total = 0;
for (ProgressInfo file : files)
for (ProgressInfo file : files) {
total += file.currentBytes;
}
return total;
}
private long getTotalFiles(Collection<StreamSummary> summaries)
{
private long getTotalFiles(Collection<StreamSummary> summaries) {
long total = 0;
for (StreamSummary summary : summaries)
for (StreamSummary summary : summaries) {
total += summary.files;
}
return total;
}
private long getTotalSizes(Collection<StreamSummary> summaries)
{
private long getTotalSizes(Collection<StreamSummary> summaries) {
if (summaries == null) {
return 0;
}
long total = 0;
for (StreamSummary summary : summaries)
for (StreamSummary summary : summaries) {
total += summary.totalSize;
}
return total;
}
private long getTotalFilesCompleted(Collection<ProgressInfo> files)
{
Iterable<ProgressInfo> completed = Iterables.filter(files, new Predicate<ProgressInfo>()
{
public boolean apply(ProgressInfo input)
{
return input.isCompleted();
}
});
private long getTotalFilesCompleted(Collection<ProgressInfo> files) {
Iterable<ProgressInfo> completed = Iterables.filter(files, input -> input.isCompleted());
return Iterables.size(completed);
}
}
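
The lambda introduced above still goes through Guava's Iterables.filter; a behaviorally equivalent java.util.stream version, shown here only for comparison, would be:

private long getTotalFilesCompleted(Collection<ProgressInfo> files) {
    // count() returns long directly, matching the int from Iterables.size() after widening
    return files.stream().filter(ProgressInfo::isCompleted).count();
}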


@@ -25,6 +25,7 @@
package org.apache.cassandra.streaming;
import java.util.Set;
import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;


@@ -25,81 +25,80 @@
package org.apache.cassandra.streaming;
/**
Handles the streaming of one or more sections of one or more sstables to and from a specific
remote node.
Handles the streaming of one or more sections of one or more sstables to and
from a specific remote node.
*
* Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
* session has the following life-cycle:
* Both this node and the remote one will create a similar symmetrical
* StreamSession. A streaming session has the following life-cycle:
*
* 1. Connections Initialization
*
(a) A node (the initiator in the following) creates a new StreamSession, initializes it (init())
and then starts it (start()). Start will create a {@link ConnectionHandler} that will create
two connections to the remote node (the follower in the following) with whom to stream and send
a StreamInit message. The first connection will be the incoming connection for the
initiator, and the second connection will be the outgoing.
(b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
initializes it if it does not yet exist, and attaches the connecting socket to its ConnectionHandler
according to the StreamInit message's isForOutgoing flag.
(c) When both the incoming and outgoing connections are established, StreamSession calls
the StreamSession#onInitializationComplete method to start the streaming prepare phase
(StreamResultFuture.startStreaming()).
(a) A node (the initiator in the following) creates a new StreamSession,
initializes it (init()) and then starts it (start()). Start will create a
{@link ConnectionHandler} that will create two connections to the remote node
(the follower in the following) with whom to stream and send a StreamInit
message. The first connection will be the incoming connection for the
initiator, and the second connection will be the outgoing. (b) Upon reception
of that StreamInit message, the follower creates its own StreamSession,
initializes it if it does not yet exist, and attaches the connecting socket to its
ConnectionHandler according to the StreamInit message's isForOutgoing flag. (c)
When both the incoming and outgoing connections are established,
StreamSession calls the StreamSession#onInitializationComplete method to start
the streaming prepare phase (StreamResultFuture.startStreaming()).
*
* 2. Streaming preparation phase
*
(a) This phase is started when the initiator's onInitializationComplete() method is called. This method sends a
PrepareMessage that includes what files/sections this node will stream to the follower
(stored in a StreamTransferTask; each column family has its own transfer task) and what
the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
it waits for the follower's PrepareMessage.
(b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
and sends back its own PrepareMessage with a summary of the files/sections that will be sent to
the initiator (prepare()). After having sent that message, the follower goes to its Streaming
phase.
(c) When the initiator receives the follower's PrepareMessage, it records which files/sections it will
receive and then goes to its own Streaming phase.
(a) This phase is started when the initiator's onInitializationComplete()
method is called. This method sends a PrepareMessage that includes what
files/sections this node will stream to the follower (stored in a
StreamTransferTask; each column family has its own transfer task) and what
the follower needs to stream back (StreamReceiveTask, same as above). If the
initiator has nothing to receive from the follower, it goes directly to its
Streaming phase. Otherwise, it waits for the follower's PrepareMessage. (b)
Upon reception of the PrepareMessage, the follower records which
files/sections it will receive and sends back its own PrepareMessage with a
summary of the files/sections that will be sent to the initiator (prepare()).
After having sent that message, the follower goes to its Streaming phase.
(c) When the initiator receives the follower's PrepareMessage, it records which
files/sections it will receive and then goes to its own Streaming phase.
*
* 3. Streaming phase
*
(a) The streaming phase is started by each node involved (the sender in the follower, but note that each side
of the StreamSession may be sender for some of the files) by calling startStreamingFiles().
This will sequentially send a FileMessage for each file of each StreamTransferTask. Each FileMessage
consists of a FileMessageHeader that indicates which file is coming, and then starts streaming the
content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
(StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
(b) On the receiving side, an SSTable will be written for the incoming file (StreamReader in
FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
complete (received()). When all files for the StreamReceiveTask have been received, the sstables
are added to the CFS (and 2ndary indexes are built, StreamReceiveTask.complete()) and the task
is marked complete (taskCompleted()).
(c) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
(FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issues a new
FileMessage for that file.
(d) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
(maybeCompleted()).
* (a) The streaming phase is started by each node involved (the sender in the
* following, but note that each side of the StreamSession may be sender for
* some of the files) by calling startStreamingFiles(). This will sequentially
* send a FileMessage for each file of each StreamTransferTask. Each FileMessage
* consists of a FileMessageHeader that indicates which file is coming, followed
* by the streamed content of that file (StreamWriter in
* FileMessage.serialize()). When a file is fully sent, the fileSent() method is
* called for that file. If all the files for a StreamTransferTask are sent
* (StreamTransferTask.complete()), the task is marked complete
* (taskCompleted()). (b) On the receiving side, an SSTable will be written for
* the incoming file (StreamReader in FileMessage.deserialize()) and once the
* FileMessage is fully received, the file will be marked as complete
* (received()). When all files for the StreamReceiveTask have been received,
* the sstables are added to the CFS (and secondary indexes are built,
* StreamReceiveTask.complete()) and the task is marked complete
* (taskCompleted()). (c) If during the streaming of a particular file an I/O
* error occurs on the receiving end of a stream (FileMessage.deserialize), the
* node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
* by sending a RetryMessage to the sender. On receiving a RetryMessage, the
* sender simply issues a new FileMessage for that file. (d) When all transfer
* and receive tasks for a session are complete, the session moves to the
* Completion phase (maybeCompleted()).
*
* 4. Completion phase
*
* (a) When a node has finished all its transfer and receive tasks, it enters the completion phase
* (maybeCompleted()). If it had already received a CompleteMessage from the other side (it is in the
* WAIT_COMPLETE state), the session is done and is closed (closeSession()). Otherwise, the node switches
* to the WAIT_COMPLETE state and sends a CompleteMessage to the other side.
* (a) When a node has finished all its transfer and receive tasks, it enters
* the completion phase (maybeCompleted()). If it had already received a
* CompleteMessage from the other side (it is in the WAIT_COMPLETE state), the
* session is done and is closed (closeSession()). Otherwise, the node switches
* to the WAIT_COMPLETE state and sends a CompleteMessage to the other side.
*/
public class StreamSession
{
public class StreamSession {
public static enum State
{
INITIALIZED,
PREPARING,
STREAMING,
WAIT_COMPLETE,
COMPLETE,
FAILED,
public static enum State {
INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE, FAILED,
}
}
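Editorial note: taken together, the four phases in the comment above define a small state machine over this enum. The sketch below is not part of the commit; the class name, the NEXT table and isValidTransition() are my own restatement of the happy path plus the FAILED escape hatch.

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class StreamStateMachine {
    public enum State {
        INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE, FAILED,
    }

    // Transition table inferred from the javadoc: init -> prepare -> stream
    // -> (wait for) complete; FAILED is reachable from any non-terminal state.
    private static final Map<State, Set<State>> NEXT = new EnumMap<>(State.class);
    static {
        NEXT.put(State.INITIALIZED, EnumSet.of(State.PREPARING, State.FAILED));
        NEXT.put(State.PREPARING, EnumSet.of(State.STREAMING, State.FAILED));
        NEXT.put(State.STREAMING, EnumSet.of(State.WAIT_COMPLETE, State.COMPLETE, State.FAILED));
        NEXT.put(State.WAIT_COMPLETE, EnumSet.of(State.COMPLETE, State.FAILED));
        NEXT.put(State.COMPLETE, EnumSet.noneOf(State.class));
        NEXT.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    public static boolean isValidTransition(State from, State to) {
        return NEXT.get(from).contains(to);
    }
}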

View File

@@ -28,14 +28,12 @@ import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
* Current snapshot of streaming progress.
*/
public class StreamState implements Serializable
{
public class StreamState implements Serializable {
/**
*
*/
@@ -49,19 +47,12 @@ public class StreamState implements Serializable
this.description = description;
this.sessions = sessions;
}
public StreamState(String planId, String description, Set<SessionInfo> sessions)
{
public StreamState(String planId, String description, Set<SessionInfo> sessions) {
this(UUID.fromString(planId), description, sessions);
}
public boolean hasFailedSession()
{
return Iterables.any(sessions, new Predicate<SessionInfo>()
{
public boolean apply(SessionInfo session)
{
return session.isFailed();
}
});
public boolean hasFailedSession() {
return Iterables.any(sessions, session -> session.isFailed());
}
}
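Editorial note: the lambda above is a drop-in replacement for the removed anonymous Predicate. For comparison, a minimal sketch of the same check without Guava, assuming only that SessionInfo.isFailed() exists as shown; the class name is hypothetical.

import java.util.Set;

import org.apache.cassandra.streaming.SessionInfo;

public class StreamStateCheck {
    // java.util.stream equivalent of Iterables.any(sessions, session -> session.isFailed())
    public static boolean hasFailedSession(Set<SessionInfo> sessions) {
        return sessions.stream().anyMatch(SessionInfo::isFailed);
    }
}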

View File

@@ -36,18 +36,17 @@ import com.google.common.base.Objects;
/**
* Summary of streaming.
*/
public class StreamSummary
{
public class StreamSummary {
public final UUID cfId;
/**
* Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
* Number of files to transfer. Can be 0 if nothing to transfer for some
* streaming request.
*/
public final int files;
public final long totalSize;
public StreamSummary(UUID cfId, int files, long totalSize)
{
public StreamSummary(UUID cfId, int files, long totalSize) {
this.cfId = cfId;
this.files = files;
this.totalSize = totalSize;
@@ -58,7 +57,8 @@ public class StreamSummary
}
public static StreamSummary fromJsonObject(JsonObject obj) {
return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), obj.getJsonNumber("total_size").longValue());
return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"),
obj.getJsonNumber("total_size").longValue());
}
public static Collection<StreamSummary> fromJsonArr(JsonArray arr) {
@@ -71,24 +71,26 @@ public class StreamSummary
}
return res;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StreamSummary summary = (StreamSummary) o;
return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
}
@Override
public int hashCode()
{
public int hashCode() {
return Objects.hashCode(cfId, files, totalSize);
}
@Override
public String toString()
{
public String toString() {
final StringBuilder sb = new StringBuilder("StreamSummary{");
sb.append("path=").append(cfId);
sb.append(", files=").append(files);

View File

@@ -29,54 +29,38 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.management.openmbean.*;
import com.google.common.base.Throwables;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import org.apache.cassandra.streaming.ProgressInfo;
public class ProgressInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
"sessionIndex",
"fileName",
"direction",
"currentBytes",
"totalBytes"};
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
"Session peer",
"Index of session",
"Name of the file",
"Direction('IN' or 'OUT')",
"Current bytes transferred",
"Total bytes to transfer"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
SimpleType.LONG};
import com.google.common.base.Throwables;
public class ProgressInfoCompositeData {
private static final String[] ITEM_NAMES = new String[] { "planId", "peer", "sessionIndex", "fileName", "direction",
"currentBytes", "totalBytes" };
private static final String[] ITEM_DESCS = new String[] { "String representation of Plan ID", "Session peer",
"Index of session", "Name of the file", "Direction('IN' or 'OUT')", "Current bytes transferred",
"Total bytes to transfer" };
private static final OpenType<?>[] ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
SimpleType.INTEGER, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG };
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
"ProgressInfo",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
static {
try {
COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(), "ProgressInfo", ITEM_NAMES, ITEM_DESCS,
ITEM_TYPES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
{
public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
@@ -85,30 +69,19 @@ public class ProgressInfoCompositeData
valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
try
{
try {
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static ProgressInfo fromCompositeData(CompositeData cd)
{
public static ProgressInfo fromCompositeData(CompositeData cd) {
Object[] values = cd.getAll(ITEM_NAMES);
try
{
return new ProgressInfo(InetAddress.getByName((String) values[1]),
(int) values[2],
(String) values[3],
ProgressInfo.Direction.valueOf((String)values[4]),
(long) values[5],
(long) values[6]);
}
catch (UnknownHostException e)
{
try {
return new ProgressInfo(InetAddress.getByName((String) values[1]), (int) values[2], (String) values[3],
ProgressInfo.Direction.valueOf((String) values[4]), (long) values[5], (long) values[6]);
} catch (UnknownHostException e) {
throw Throwables.propagate(e);
}
}
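Editorial note: a round-trip sketch for the two converters above. This is not from the commit; the peer address, file name and byte counts are invented, and the constructor argument order mirrors what fromCompositeData() uses.

import java.net.InetAddress;
import java.util.UUID;

import javax.management.openmbean.CompositeData;

import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.management.ProgressInfoCompositeData;

public class ProgressInfoRoundTrip {
    public static void main(String[] args) throws Exception {
        // Sample values only; planId travels in the composite data but is
        // not part of ProgressInfo itself.
        ProgressInfo in = new ProgressInfo(InetAddress.getByName("127.0.0.1"), 0,
                "ks/cf-tmp-ka-1-Data.db", ProgressInfo.Direction.OUT, 1024L, 4096L);
        CompositeData cd = ProgressInfoCompositeData.toCompositeData(UUID.randomUUID(), in);
        ProgressInfo out = ProgressInfoCompositeData.fromCompositeData(cd);
        System.out.println(out); // same peer/file/byte counts as 'in'
    }
}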

View File

@@ -26,8 +26,24 @@ package org.apache.cassandra.streaming.management;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import javax.management.openmbean.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
@@ -35,149 +51,86 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
import java.util.HashMap;
public class SessionInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
"connecting",
"receivingSummaries",
"sendingSummaries",
"state",
"receivingFiles",
"sendingFiles",
"sessionIndex"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
"Session peer",
"Connecting address",
"Summaries of receiving data",
"Summaries of sending data",
"Current session state",
"Receiving files",
"Sending files",
"Session index"};
public class SessionInfoCompositeData {
private static final String[] ITEM_NAMES = new String[] { "planId", "peer", "connecting", "receivingSummaries",
"sendingSummaries", "state", "receivingFiles", "sendingFiles", "sessionIndex" };
private static final String[] ITEM_DESCS = new String[] { "Plan ID", "Session peer", "Connecting address",
"Summaries of receiving data", "Summaries of sending data", "Current session state", "Receiving files",
"Sending files", "Session index" };
private static final OpenType<?>[] ITEM_TYPES;
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
SimpleType.STRING,
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
SimpleType.STRING,
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
SimpleType.INTEGER};
COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
"SessionInfo",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
static {
try {
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING,
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), SimpleType.STRING,
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), SimpleType.INTEGER };
COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(), "SessionInfo", ITEM_NAMES, ITEM_DESCS,
ITEM_TYPES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
{
public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
{
public CompositeData apply(StreamSummary input)
{
return StreamSummaryCompositeData.toCompositeData(input);
}
};
Function<StreamSummary, CompositeData> fromStreamSummary = input -> StreamSummaryCompositeData
.toCompositeData(input);
valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
{
public CompositeData apply(ProgressInfo input)
{
return ProgressInfoCompositeData.toCompositeData(planId, input);
}
};
Function<ProgressInfo, CompositeData> fromProgressInfo = input -> ProgressInfoCompositeData
.toCompositeData(planId, input);
valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
try
{
try {
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static SessionInfo fromCompositeData(CompositeData cd)
{
public static SessionInfo fromCompositeData(CompositeData cd) {
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
Object[] values = cd.getAll(ITEM_NAMES);
InetAddress peer, connecting;
try
{
try {
peer = InetAddress.getByName((String) values[1]);
connecting = InetAddress.getByName((String) values[2]);
}
catch (UnknownHostException e)
{
} catch (UnknownHostException e) {
throw Throwables.propagate(e);
}
Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
{
public StreamSummary apply(CompositeData input)
{
return StreamSummaryCompositeData.fromCompositeData(input);
}
};
SessionInfo info = new SessionInfo(peer,
(int)values[8],
connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
StreamSession.State.valueOf((String) values[5]),
new HashMap<String, ProgressInfo>(), new HashMap<String, ProgressInfo>());
Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)
{
return ProgressInfoCompositeData.fromCompositeData(input);
}
};
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
{
Function<CompositeData, StreamSummary> toStreamSummary = input -> StreamSummaryCompositeData
.fromCompositeData(input);
SessionInfo info = new SessionInfo(peer, (int) values[8], connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
StreamSession.State.valueOf((String) values[5]), new HashMap<String, ProgressInfo>(),
new HashMap<String, ProgressInfo>());
Function<CompositeData, ProgressInfo> toProgressInfo = input -> ProgressInfoCompositeData
.fromCompositeData(input);
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) {
info.updateProgress(progress);
}
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
{
for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) {
info.updateProgress(progress);
}
return info;
}
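Editorial note: the pattern this hunk applies throughout the file, anonymous com.google.common.base.Function instances rewritten as lambdas, works because Function has a single abstract method, apply(). A standalone before/after sketch; the hex-formatting function and class name are made-up stand-ins.

import com.google.common.base.Function;

public class LambdaRefactorDemo {
    public static void main(String[] args) {
        // Before: pre-Java-8 anonymous inner class.
        Function<Integer, String> oldStyle = new Function<Integer, String>() {
            @Override
            public String apply(Integer input) {
                return Integer.toHexString(input);
            }
        };
        // After: lambda targeting the same single abstract method.
        Function<Integer, String> newStyle = input -> Integer.toHexString(input);
        System.out.println(oldStyle.apply(255).equals(newStyle.apply(255))); // true
    }
}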
private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
{
private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func) {
return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
}
private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
{
private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert,
Function<T, CompositeData> func) {
if (toConvert == null) {
toConvert = Sets.newHashSet();
}

View File

@@ -24,79 +24,68 @@
package org.apache.cassandra.streaming.management;
import java.util.*;
import javax.management.openmbean.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamState;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamState;
/**
*/
public class StreamStateCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions",
"currentRxBytes", "totalRxBytes", "rxPercentage",
"currentTxBytes", "totalTxBytes", "txPercentage"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
"Stream plan description",
"Active stream sessions",
"Number of bytes received across all streams",
"Total bytes available to receive across all streams",
"Percentage received across all streams",
"Number of bytes sent across all streams",
"Total bytes available to send across all streams",
"Percentage sent across all streams"};
public class StreamStateCompositeData {
private static final String[] ITEM_NAMES = new String[] { "planId", "description", "sessions", "currentRxBytes",
"totalRxBytes", "rxPercentage", "currentTxBytes", "totalTxBytes", "txPercentage" };
private static final String[] ITEM_DESCS = new String[] { "Plan ID of this stream", "Stream plan description",
"Active stream sessions", "Number of bytes received across all streams",
"Total bytes available to receive across all streams", "Percentage received across all streams",
"Number of bytes sent across all streams", "Total bytes available to send across all streams",
"Percentage sent across all streams" };
private static final OpenType<?>[] ITEM_TYPES;
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE),
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE,
SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE};
COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
"StreamState",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
static {
try {
ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE), SimpleType.LONG, SimpleType.LONG,
SimpleType.DOUBLE, SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE };
COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(), "StreamState", ITEM_NAMES, ITEM_DESCS,
ITEM_TYPES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(final StreamState streamState)
{
public static CompositeData toCompositeData(final StreamState streamState) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
valueMap.put(ITEM_NAMES[1], streamState.description);
CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
{
public CompositeData apply(SessionInfo input)
{
return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
}
})).toArray(sessions);
Lists.newArrayList(Iterables.transform(streamState.sessions,
input -> SessionInfoCompositeData.toCompositeData(streamState.planId, input))).toArray(sessions);
valueMap.put(ITEM_NAMES[2], sessions);
long currentRxBytes = 0;
long totalRxBytes = 0;
long currentTxBytes = 0;
long totalTxBytes = 0;
for (SessionInfo sessInfo : streamState.sessions)
{
for (SessionInfo sessInfo : streamState.sessions) {
currentRxBytes += sessInfo.getTotalSizeReceived();
totalRxBytes += sessInfo.getTotalSizeToReceive();
currentTxBytes += sessInfo.getTotalSizeSent();
@@ -112,30 +101,20 @@ public class StreamStateCompositeData
valueMap.put(ITEM_NAMES[7], totalTxBytes);
valueMap.put(ITEM_NAMES[8], txPercentage);
try
{
try {
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static StreamState fromCompositeData(CompositeData cd)
{
public static StreamState fromCompositeData(CompositeData cd) {
assert cd.getCompositeType().equals(COMPOSITE_TYPE);
Object[] values = cd.getAll(ITEM_NAMES);
UUID planId = UUID.fromString((String) values[0]);
String description = (String) values[1];
Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
new Function<CompositeData, SessionInfo>()
{
public SessionInfo apply(CompositeData input)
{
return SessionInfoCompositeData.fromCompositeData(input);
}
}));
input -> SessionInfoCompositeData.fromCompositeData(input)));
return new StreamState(planId, description, sessions);
}
}
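Editorial note: the loop above sums per-session byte counts into the currentRx/TxBytes and totalRx/TxBytes attributes, and the percentage attributes are derived from those sums. A standalone sketch of that arithmetic; the divide-by-zero guard is an assumption, since the elided lines of the original may handle the empty case differently.

public class ProgressPercentage {
    // Percentage for the {rx,tx}Percentage attributes: current over total,
    // treating "nothing to transfer" as fully transferred.
    public static double percentage(long currentBytes, long totalBytes) {
        return totalBytes == 0 ? 100.0 : (double) currentBytes / totalBytes * 100.0;
    }

    public static void main(String[] args) {
        System.out.println(percentage(512L, 2048L)); // 25.0
    }
}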

View File

@@ -27,63 +27,51 @@ package org.apache.cassandra.streaming.management;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.management.openmbean.*;
import com.google.common.base.Throwables;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import org.apache.cassandra.streaming.StreamSummary;
import com.google.common.base.Throwables;
/**
*/
public class StreamSummaryCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"cfId",
"files",
"totalSize"};
private static final String[] ITEM_DESCS = new String[]{"ColumnFamily ID",
"Number of files",
"Total bytes of the files"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.INTEGER,
SimpleType.LONG};
public class StreamSummaryCompositeData {
private static final String[] ITEM_NAMES = new String[] { "cfId", "files", "totalSize" };
private static final String[] ITEM_DESCS = new String[] { "ColumnFamily ID", "Number of files",
"Total bytes of the files" };
private static final OpenType<?>[] ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.INTEGER,
SimpleType.LONG };
public static final CompositeType COMPOSITE_TYPE;
static {
try
{
COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
"StreamSummary",
ITEM_NAMES,
ITEM_DESCS,
ITEM_TYPES);
}
catch (OpenDataException e)
{
static {
try {
COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(), "StreamSummary", ITEM_NAMES, ITEM_DESCS,
ITEM_TYPES);
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static CompositeData toCompositeData(StreamSummary streamSummary)
{
public static CompositeData toCompositeData(StreamSummary streamSummary) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
valueMap.put(ITEM_NAMES[1], streamSummary.files);
valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
try
{
try {
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
}
catch (OpenDataException e)
{
} catch (OpenDataException e) {
throw Throwables.propagate(e);
}
}
public static StreamSummary fromCompositeData(CompositeData cd)
{
public static StreamSummary fromCompositeData(CompositeData cd) {
Object[] values = cd.getAll(ITEM_NAMES);
return new StreamSummary(UUID.fromString((String) values[0]),
(int) values[1],
(long) values[2]);
return new StreamSummary(UUID.fromString((String) values[0]), (int) values[1], (long) values[2]);
}
}
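Editorial note: finally, a round-trip sketch for this converter (not part of the commit; the cfId, file count and size are invented, and the class name is hypothetical). StreamSummary.equals(), shown earlier in this diff, compares exactly the three fields carried by the composite type, so the round trip should compare equal.

import java.util.UUID;

import javax.management.openmbean.CompositeData;

import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.management.StreamSummaryCompositeData;

public class StreamSummaryRoundTrip {
    public static void main(String[] args) {
        StreamSummary in = new StreamSummary(UUID.randomUUID(), 3, 1048576L);
        CompositeData cd = StreamSummaryCompositeData.toCompositeData(in);
        StreamSummary out = StreamSummaryCompositeData.fromCompositeData(cd);
        System.out.println(in.equals(out)); // expected: true
    }
}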