From 434ce947b09a07dea0733858926a1fa22b761dc7 Mon Sep 17 00:00:00 2001 From: elcallio Date: Tue, 11 Oct 2016 14:17:06 +0200 Subject: [PATCH] Code formatting + source cleanup (eclipse) --- SCYLLA-VERSION-GEN | 0 scripts/git-archive-all | 0 .../com/scylladb/jmx/utils/FileUtils.java | 52 ++---- .../java/com/scylladb/jmx/utils/Pair.java | 25 +-- .../jmx/utils/SnapshotDetailsTabularData.java | 44 ++--- .../cassandra/db/ColumnFamilyStoreMBean.java | 67 ++++--- .../db/commitlog/CommitLogMBean.java | 12 +- .../CompactionHistoryTabularData.java | 30 ++- .../db/compaction/CompactionManagerMBean.java | 22 ++- .../cassandra/gms/ApplicationState.java | 33 +--- .../apache/cassandra/gms/EndpointState.java | 5 +- .../cassandra/gms/FailureDetectorMBean.java | 3 +- .../apache/cassandra/gms/GossiperMBean.java | 3 +- .../apache/cassandra/gms/HeartBeatState.java | 4 +- .../locator/EndpointSnitchInfoMBean.java | 18 +- .../cassandra/net/MessagingServiceMBean.java | 4 +- .../cassandra/service/CacheServiceMBean.java | 20 +- .../cassandra/service/GCInspectorMXBean.java | 6 +- .../cassandra/service/StorageProxyMBean.java | 3 +- .../service/StorageServiceMBean.java | 173 ++++++++++-------- .../cassandra/streaming/ProgressInfo.java | 72 ++++---- .../cassandra/streaming/SessionInfo.java | 112 ++++-------- .../streaming/StreamManagerMBean.java | 1 + .../cassandra/streaming/StreamSession.java | 119 ++++++------ .../cassandra/streaming/StreamState.java | 19 +- .../cassandra/streaming/StreamSummary.java | 30 +-- .../management/ProgressInfoCompositeData.java | 85 +++------ .../management/SessionInfoCompositeData.java | 163 ++++++----------- .../management/StreamStateCompositeData.java | 101 ++++------ .../StreamSummaryCompositeData.java | 60 +++--- 30 files changed, 575 insertions(+), 711 deletions(-) mode change 100755 => 100644 SCYLLA-VERSION-GEN mode change 100755 => 100644 scripts/git-archive-all diff --git a/SCYLLA-VERSION-GEN b/SCYLLA-VERSION-GEN old mode 100755 new mode 100644 diff --git a/scripts/git-archive-all b/scripts/git-archive-all old mode 100755 new mode 100644 diff --git a/src/main/java/com/scylladb/jmx/utils/FileUtils.java b/src/main/java/com/scylladb/jmx/utils/FileUtils.java index 11ffba1..b71c055 100644 --- a/src/main/java/com/scylladb/jmx/utils/FileUtils.java +++ b/src/main/java/com/scylladb/jmx/utils/FileUtils.java @@ -24,69 +24,57 @@ package com.scylladb.jmx.utils; -import java.io.*; +import java.io.File; import java.text.DecimalFormat; -public class FileUtils -{ +public class FileUtils { private static final double KB = 1024d; - private static final double MB = 1024*1024d; - private static final double GB = 1024*1024*1024d; - private static final double TB = 1024*1024*1024*1024d; + private static final double MB = 1024 * 1024d; + private static final double GB = 1024 * 1024 * 1024d; + private static final double TB = 1024 * 1024 * 1024 * 1024d; private static final DecimalFormat df = new DecimalFormat("#.##"); - - public static String stringifyFileSize(double value) - { + public static String stringifyFileSize(double value) { double d; - if ( value >= TB ) - { + if (value >= TB) { d = value / TB; String val = df.format(d); return val + " TB"; - } - else if ( value >= GB ) - { + } else if (value >= GB) { d = value / GB; String val = df.format(d); return val + " GB"; - } - else if ( value >= MB ) - { + } else if (value >= MB) { d = value / MB; String val = df.format(d); return val + " MB"; - } - else if ( value >= KB ) - { + } else if (value >= KB) { d = value / KB; String val = df.format(d); return 
val + " KB"; - } - else - { + } else { String val = df.format(value); return val + " bytes"; } } - /** * Get the size of a directory in bytes - * @param directory The directory for which we need size. + * + * @param directory + * The directory for which we need size. * @return The size of the directory */ - public static long folderSize(File directory) - { + public static long folderSize(File directory) { long length = 0; - for (File file : directory.listFiles()) - { - if (file.isFile()) + for (File file : directory.listFiles()) { + if (file.isFile()) { length += file.length(); - else + } else { length += folderSize(file); + } } return length; } - } +} diff --git a/src/main/java/com/scylladb/jmx/utils/Pair.java b/src/main/java/com/scylladb/jmx/utils/Pair.java index c5aa795..9644cc9 100644 --- a/src/main/java/com/scylladb/jmx/utils/Pair.java +++ b/src/main/java/com/scylladb/jmx/utils/Pair.java @@ -26,43 +26,38 @@ package com.scylladb.jmx.utils; import com.google.common.base.Objects; -public class Pair -{ +public class Pair { public final T1 left; public final T2 right; - protected Pair(T1 left, T2 right) - { + protected Pair(T1 left, T2 right) { this.left = left; this.right = right; } @Override - public final int hashCode() - { + public final int hashCode() { int hashCode = 31 + (left == null ? 0 : left.hashCode()); - return 31*hashCode + (right == null ? 0 : right.hashCode()); + return 31 * hashCode + (right == null ? 0 : right.hashCode()); } @Override - public final boolean equals(Object o) - { - if(!(o instanceof Pair)) + public final boolean equals(Object o) { + if (!(o instanceof Pair)) { return false; + } @SuppressWarnings("rawtypes") - Pair that = (Pair)o; + Pair that = (Pair) o; // handles nulls properly return Objects.equal(left, that.left) && Objects.equal(right, that.right); } @Override - public String toString() - { + public String toString() { return "(" + left + "," + right + ")"; } - public static Pair create(X x, Y y) - { + public static Pair create(X x, Y y) { return new Pair(x, y); } } diff --git a/src/main/java/com/scylladb/jmx/utils/SnapshotDetailsTabularData.java b/src/main/java/com/scylladb/jmx/utils/SnapshotDetailsTabularData.java index 403e7c4..3624ee4 100644 --- a/src/main/java/com/scylladb/jmx/utils/SnapshotDetailsTabularData.java +++ b/src/main/java/com/scylladb/jmx/utils/SnapshotDetailsTabularData.java @@ -23,18 +23,24 @@ package com.scylladb.jmx.utils; import java.util.Map; -import javax.management.openmbean.*; + +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import com.google.common.base.Throwables; public class SnapshotDetailsTabularData { - private static final String[] ITEM_NAMES = new String[] { "Snapshot name", - "Keyspace name", "Column family name", "True size", "Size on disk" }; + private static final String[] ITEM_NAMES = new String[] { "Snapshot name", "Keyspace name", "Column family name", + "True size", "Size on disk" }; - private static final String[] ITEM_DESCS = new String[] { "snapshot_name", - "keyspace_name", "columnfamily_name", "TrueDiskSpaceUsed", - "TotalDiskSpaceUsed" }; + private static final String[] ITEM_DESCS = new String[] { "snapshot_name", "keyspace_name", "columnfamily_name", + "TrueDiskSpaceUsed", "TotalDiskSpaceUsed" }; 
private static final String TYPE_NAME = "SnapshotDetails"; @@ -48,28 +54,22 @@ public class SnapshotDetailsTabularData { static { try { - ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, - SimpleType.STRING, SimpleType.STRING, SimpleType.STRING }; + ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, + SimpleType.STRING }; - COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, - ITEM_DESCS, ITEM_TYPES); + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); - TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, - ITEM_NAMES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static void from(final String snapshot, final String ks, - final String cf, - Map.Entry<String, Pair<Long, Long>> snapshotDetail, - TabularDataSupport result) { + public static void from(final String snapshot, final String ks, final String cf, + Map.Entry<String, Pair<Long, Long>> snapshotDetail, TabularDataSupport result) { try { - final String totalSize = FileUtils.stringifyFileSize(snapshotDetail - .getValue().left); - final String liveSize = FileUtils.stringifyFileSize(snapshotDetail - .getValue().right); + final String totalSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().left); + final String liveSize = FileUtils.stringifyFileSize(snapshotDetail.getValue().right); result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, new Object[] { snapshot, ks, cf, liveSize, totalSize })); } catch (OpenDataException e) { @@ -77,8 +77,8 @@ public class SnapshotDetailsTabularData { } } - public static void from(final String snapshot, final String ks, - final String cf, long total, long live, TabularDataSupport result) { + public static void from(final String snapshot, final String ks, final String cf, long total, long live, + TabularDataSupport result) { try { final String totalSize = FileUtils.stringifyFileSize(total); final String liveSize = FileUtils.stringifyFileSize(live); diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index a74316e..355b733 100644 --- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -27,8 +27,7 @@ import javax.management.openmbean.OpenDataException; /** * The MBean interface for ColumnFamilyStore */ -public interface ColumnFamilyStoreMBean -{ +public interface ColumnFamilyStoreMBean { /** * @return the name of the column family */ @@ -40,7 +39,9 @@ public interface ColumnFamilyStoreMBean /** * force a major compaction of this column family * - * @param splitOutput true if the output of the major compaction should be split in several sstables + * @param splitOutput + * true if the output of the major compaction should be split in + * several sstables */ public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException; @@ -60,7 +61,8 @@ public interface ColumnFamilyStoreMBean public int getMaximumCompactionThreshold(); /** - * Sets the maximum and maximum number of SSTables in queue before compaction kicks off + * Sets the minimum and maximum number of SSTables in queue before + * compaction kicks off */ public void setCompactionThresholds(int minThreshold, int maxThreshold); @@ -72,33 +74,42 @@ public interface ColumnFamilyStoreMBean /** * Sets the compaction
parameters locally for this node * - * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted + * Note that this will be set until an ALTER with compaction = {..} is + * executed or the node is restarted * - * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..} + * @param options + * compaction options with the same syntax as when doing ALTER + * ... WITH compaction = {..} */ public void setCompactionParametersJson(String options); + public String getCompactionParametersJson(); /** * Sets the compaction parameters locally for this node * - * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted + * Note that this will be set until an ALTER with compaction = {..} is + * executed or the node is restarted * - * @param options compaction options map + * @param options + * compaction options map */ public void setCompactionParameters(Map options); + public Map getCompactionParameters(); /** * Get the compression parameters */ - public Map getCompressionParameters(); + public Map getCompressionParameters(); /** * Set the compression parameters - * @param opts map of string names to values + * + * @param opts + * map of string names to values */ - public void setCompressionParameters(Map opts); + public void setCompressionParameters(Map opts); /** * Set new crc check chance @@ -109,66 +120,74 @@ public interface ColumnFamilyStoreMBean public long estimateKeys(); - /** * Returns a list of the names of the built column indexes for current store + * * @return list of the index names */ public List getBuiltIndexes(); /** * Returns a list of filenames that contain the given key on this node + * * @param key * @return list of filenames containing the key */ public List getSSTablesForKey(String key); /** - * Scan through Keyspace/ColumnFamily's data directory - * determine which SSTables should be loaded and load them + * Scan through Keyspace/ColumnFamily's data directory determine which + * SSTables should be loaded and load them */ public void loadNewSSTables(); /** - * @return the number of SSTables in L0. Always return 0 if Leveled compaction is not enabled. + * @return the number of SSTables in L0. Always return 0 if Leveled + * compaction is not enabled. */ public int getUnleveledSSTables(); /** - * @return sstable count for each level. null unless leveled compaction is used. - * array index corresponds to level(int[0] is for level 0, ...). + * @return sstable count for each level. null unless leveled compaction is + * used. array index corresponds to level(int[0] is for level 0, + * ...). */ public int[] getSSTableCountPerLevel(); /** - * Get the ratio of droppable tombstones to real columns (and non-droppable tombstones) + * Get the ratio of droppable tombstones to real columns (and non-droppable + * tombstones) + * * @return ratio */ public double getDroppableTombstoneRatio(); /** - * @return the size of SSTables in "snapshots" subdirectory which aren't live anymore + * @return the size of SSTables in "snapshots" subdirectory which aren't + * live anymore */ public long trueSnapshotsSize(); /** - * begin sampling for a specific sampler with a given capacity. The cardinality may - * be larger than the capacity, but depending on the use case it may affect its accuracy + * begin sampling for a specific sampler with a given capacity. 
The + * cardinality may be larger than the capacity, but depending on the use + * case it may affect its accuracy */ public void beginLocalSampling(String sampler, int capacity); /** - * @return top count items for the sampler since beginLocalSampling was called + * @return top count items for the sampler since beginLocalSampling + * was called */ public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException; /* - Is Compaction space check enabled + * Is Compaction space check enabled */ public boolean isCompactionDiskSpaceCheckEnabled(); /* - Enable/Disable compaction space check + * Enable/Disable compaction space check */ public void compactionDiskSpaceCheck(boolean enable); } diff --git a/src/main/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/main/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java index e0cfd3c..77ad028 100644 --- a/src/main/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java +++ b/src/main/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.db.commitlog; - import java.io.IOException; import java.util.List; import java.util.Map; public interface CommitLogMBean { /** - * Command to execute to archive a commitlog segment. Blank to disabled. + * Command to execute to archive a commitlog segment. Blank to disable. */ public String getArchiveCommand(); @@ -66,12 +65,14 @@ public interface CommitLogMBean { public List<String> getActiveSegmentNames(); /** - * @return Files which are pending for archival attempt. Does NOT include failed archive attempts. + * @return Files which are pending for archival attempt. Does NOT include + * failed archive attempts. */ public List<String> getArchivingSegmentNames(); /** - * @return The size of the mutations in all active commit log segments (uncompressed). + * @return The size of the mutations in all active commit log segments + * (uncompressed). */ public long getActiveContentSize(); @@ -81,7 +82,8 @@ public interface CommitLogMBean { public long getActiveOnDiskSize(); /** - * @return A map between active log segments and the compression ratio achieved for each. + * @return A map between active log segments and the compression ratio + * achieved for each.
*/ public Map getActiveSegmentCompressionRatios(); } diff --git a/src/main/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java b/src/main/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java index ab3fa2b..07a27ee 100644 --- a/src/main/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java +++ b/src/main/java/org/apache/cassandra/db/compaction/CompactionHistoryTabularData.java @@ -36,13 +36,11 @@ import javax.management.openmbean.TabularType; import com.google.common.base.Throwables; public class CompactionHistoryTabularData { - private static final String[] ITEM_NAMES = new String[] { "id", - "keyspace_name", "columnfamily_name", "compacted_at", "bytes_in", - "bytes_out", "rows_merged" }; + private static final String[] ITEM_NAMES = new String[] { "id", "keyspace_name", "columnfamily_name", + "compacted_at", "bytes_in", "bytes_out", "rows_merged" }; - private static final String[] ITEM_DESCS = new String[] { "time uuid", - "keyspace name", "column family name", "compaction finished at", - "total bytes in", "total bytes out", "total rows merged" }; + private static final String[] ITEM_DESCS = new String[] { "time uuid", "keyspace name", "column family name", + "compaction finished at", "total bytes in", "total bytes out", "total rows merged" }; private static final String TYPE_NAME = "CompactionHistory"; @@ -56,22 +54,18 @@ public class CompactionHistoryTabularData { static { try { - ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, - SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, - SimpleType.LONG, SimpleType.STRING }; + ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; - COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, - ITEM_DESCS, ITEM_TYPES); + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); - TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, - ITEM_NAMES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static TabularData from(JsonArray resultSet) - throws OpenDataException { + public static TabularData from(JsonArray resultSet) throws OpenDataException { TabularDataSupport result = new TabularDataSupport(TABULAR_TYPE); for (int i = 0; i < resultSet.size(); i++) { JsonObject row = resultSet.getJsonObject(i); @@ -91,15 +85,13 @@ public class CompactionHistoryTabularData { if (m > 0) { sb.append(','); } - sb.append(entry.getString("key")).append(':') - .append(entry.getString("value")); + sb.append(entry.getString("key")).append(':').append(entry.getString("value")); } sb.append('}'); } result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES, - new Object[] { id, ksName, cfName, compactedAt, bytesIn, - bytesOut, sb.toString() })); + new Object[] { id, ksName, cfName, compactedAt, bytesIn, bytesOut, sb.toString() })); } return result; } diff --git a/src/main/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/main/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java index f101245..3e0d650 100644 --- a/src/main/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java +++ b/src/main/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java @@ -19,10 +19,10 @@ package org.apache.cassandra.db.compaction; import java.util.List; import 
java.util.Map; + import javax.management.openmbean.TabularData; -public interface CompactionManagerMBean -{ +public interface CompactionManagerMBean { /** List of running compaction objects. */ public List> getCompactions(); @@ -45,7 +45,7 @@ public interface CompactionManagerMBean /** * Stop all running compaction-like tasks having the provided {@code type}. - * + * * @param type * the type of compaction to stop. Can be one of: - COMPACTION - * VALIDATION - CLEANUP - SCRUB - INDEX_BUILD @@ -54,9 +54,11 @@ public interface CompactionManagerMBean /** * Stop an individual running compaction using the compactionId. - * @param compactionId Compaction ID of compaction to stop. Such IDs can be found in - * the transaction log files whose name starts with compaction_, - * located in the table transactions folder. + * + * @param compactionId + * Compaction ID of compaction to stop. Such IDs can be found in + * the transaction log files whose name starts with compaction_, + * located in the table transactions folder. */ public void stopCompactionById(String compactionId); @@ -67,7 +69,7 @@ public interface CompactionManagerMBean /** * Allows user to resize maximum size of the compaction thread pool. - * + * * @param number * New maximum of compaction threads */ @@ -80,7 +82,7 @@ public interface CompactionManagerMBean /** * Allows user to resize maximum size of the compaction thread pool. - * + * * @param number * New maximum of compaction threads */ @@ -93,7 +95,7 @@ public interface CompactionManagerMBean /** * Allows user to resize maximum size of the compaction thread pool. - * + * * @param number * New maximum of compaction threads */ @@ -106,7 +108,7 @@ public interface CompactionManagerMBean /** * Allows user to resize maximum size of the validator thread pool. 
- * + * * @param number * New maximum of validator threads */ diff --git a/src/main/java/org/apache/cassandra/gms/ApplicationState.java b/src/main/java/org/apache/cassandra/gms/ApplicationState.java index 31958cf..cb9ecea 100644 --- a/src/main/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/main/java/org/apache/cassandra/gms/ApplicationState.java @@ -24,31 +24,12 @@ package org.apache.cassandra.gms; -public enum ApplicationState -{ - STATUS, - LOAD, - SCHEMA, - DC, - RACK, - RELEASE_VERSION, - REMOVAL_COORDINATOR, - INTERNAL_IP, - RPC_ADDRESS, - X_11_PADDING, // padding specifically for 1.1 - SEVERITY, - NET_VERSION, - HOST_ID, - TOKENS, +public enum ApplicationState { + STATUS, LOAD, SCHEMA, DC, RACK, RELEASE_VERSION, REMOVAL_COORDINATOR, INTERNAL_IP, RPC_ADDRESS, X_11_PADDING, // padding + // specifically + // for + // 1.1 + SEVERITY, NET_VERSION, HOST_ID, TOKENS, // pad to allow adding new states to existing cluster - X1, - X2, - X3, - X4, - X5, - X6, - X7, - X8, - X9, - X10, + X1, X2, X3, X4, X5, X6, X7, X8, X9, X10, } diff --git a/src/main/java/org/apache/cassandra/gms/EndpointState.java b/src/main/java/org/apache/cassandra/gms/EndpointState.java index c30eff2..d352910 100644 --- a/src/main/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/main/java/org/apache/cassandra/gms/EndpointState.java @@ -42,6 +42,7 @@ public class EndpointState { ApplicationState[] applicationValues; private static final java.util.logging.Logger logger = java.util.logging.Logger .getLogger(EndpointState.class.getName()); + EndpointState(HeartBeatState initialHbState) { applicationValues = ApplicationState.values(); hbState = initialHbState; @@ -101,8 +102,8 @@ public class EndpointState { isAlive = alive; } + @Override public String toString() { - return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " - + applicationState; + return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; } } diff --git a/src/main/java/org/apache/cassandra/gms/FailureDetectorMBean.java b/src/main/java/org/apache/cassandra/gms/FailureDetectorMBean.java index 23fae3a..15ce293 100644 --- a/src/main/java/org/apache/cassandra/gms/FailureDetectorMBean.java +++ b/src/main/java/org/apache/cassandra/gms/FailureDetectorMBean.java @@ -23,8 +23,7 @@ import java.util.Map; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -public interface FailureDetectorMBean -{ +public interface FailureDetectorMBean { public void dumpInterArrivalTimes(); public void setPhiConvictThreshold(double phi); diff --git a/src/main/java/org/apache/cassandra/gms/GossiperMBean.java b/src/main/java/org/apache/cassandra/gms/GossiperMBean.java index c4b244c..9f8e567 100644 --- a/src/main/java/org/apache/cassandra/gms/GossiperMBean.java +++ b/src/main/java/org/apache/cassandra/gms/GossiperMBean.java @@ -19,8 +19,7 @@ package org.apache.cassandra.gms; import java.net.UnknownHostException; -public interface GossiperMBean -{ +public interface GossiperMBean { public long getEndpointDowntime(String address) throws UnknownHostException; public int getCurrentGenerationNumber(String address) throws UnknownHostException; diff --git a/src/main/java/org/apache/cassandra/gms/HeartBeatState.java b/src/main/java/org/apache/cassandra/gms/HeartBeatState.java index 0af0ef6..be9efbd 100644 --- a/src/main/java/org/apache/cassandra/gms/HeartBeatState.java +++ b/src/main/java/org/apache/cassandra/gms/HeartBeatState.java @@ -58,8 +58,8 @@ class HeartBeatState { version = 
Integer.MAX_VALUE; } + @Override public String toString() { - return String.format("HeartBeat: generation = %d, version = %d", - generation, version); + return String.format("HeartBeat: generation = %d, version = %d", generation, version); } } diff --git a/src/main/java/org/apache/cassandra/locator/EndpointSnitchInfoMBean.java b/src/main/java/org/apache/cassandra/locator/EndpointSnitchInfoMBean.java index 6de5022..2f5bc5f 100644 --- a/src/main/java/org/apache/cassandra/locator/EndpointSnitchInfoMBean.java +++ b/src/main/java/org/apache/cassandra/locator/EndpointSnitchInfoMBean.java @@ -22,34 +22,40 @@ import java.net.UnknownHostException; /** * MBean exposing standard Snitch info */ -public interface EndpointSnitchInfoMBean -{ +public interface EndpointSnitchInfoMBean { /** - * Provides the Rack name depending on the respective snitch used, given the host name/ip + * Provides the Rack name depending on the respective snitch used, given the + * host name/ip + * * @param host * @throws UnknownHostException */ public String getRack(String host) throws UnknownHostException; /** - * Provides the Datacenter name depending on the respective snitch used, given the hostname/ip + * Provides the Datacenter name depending on the respective snitch used, + * given the hostname/ip + * * @param host * @throws UnknownHostException */ public String getDatacenter(String host) throws UnknownHostException; /** - * Provides the Rack name depending on the respective snitch used for this node + * Provides the Rack name depending on the respective snitch used for this + * node */ public String getRack(); /** - * Provides the Datacenter name depending on the respective snitch used for this node + * Provides the Datacenter name depending on the respective snitch used for + * this node */ public String getDatacenter(); /** * Provides the snitch name of the cluster + * * @return Snitch name */ public String getSnitchName(); diff --git a/src/main/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/main/java/org/apache/cassandra/net/MessagingServiceMBean.java index 5a508e0..be98145 100644 --- a/src/main/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/main/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -24,8 +24,6 @@ package org.apache.cassandra.net; - - import java.net.UnknownHostException; import java.util.Map; @@ -133,6 +131,6 @@ public interface MessagingServiceMBean { * Number of timeouts since last check per host. 
*/ public Map getRecentTimeoutsPerHost(); - + public int getVersion(String address) throws UnknownHostException; } diff --git a/src/main/java/org/apache/cassandra/service/CacheServiceMBean.java b/src/main/java/org/apache/cassandra/service/CacheServiceMBean.java index a28d2d1..bcb0cfb 100644 --- a/src/main/java/org/apache/cassandra/service/CacheServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/CacheServiceMBean.java @@ -22,30 +22,33 @@ * Modified by Cloudius Systems */ - - package org.apache.cassandra.service; import java.util.concurrent.ExecutionException; -public interface CacheServiceMBean -{ +public interface CacheServiceMBean { public int getRowCacheSavePeriodInSeconds(); + public void setRowCacheSavePeriodInSeconds(int rcspis); public int getKeyCacheSavePeriodInSeconds(); + public void setKeyCacheSavePeriodInSeconds(int kcspis); public int getCounterCacheSavePeriodInSeconds(); + public void setCounterCacheSavePeriodInSeconds(int ccspis); public int getRowCacheKeysToSave(); + public void setRowCacheKeysToSave(int rckts); public int getKeyCacheKeysToSave(); + public void setKeyCacheKeysToSave(int kckts); public int getCounterCacheKeysToSave(); + public void setCounterCacheKeysToSave(int cckts); /** @@ -69,8 +72,13 @@ public interface CacheServiceMBean /** * save row and key caches * - * @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception - * @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity. + * @throws ExecutionException + * when attempting to retrieve the result of a task that aborted + * by throwing an exception + * @throws InterruptedException + * when a thread is waiting, sleeping, or otherwise occupied, + * and the thread is interrupted, either before or during the + * activity. 
*/ public void saveCaches() throws ExecutionException, InterruptedException; } diff --git a/src/main/java/org/apache/cassandra/service/GCInspectorMXBean.java b/src/main/java/org/apache/cassandra/service/GCInspectorMXBean.java index c26a67c..fae5724 100644 --- a/src/main/java/org/apache/cassandra/service/GCInspectorMXBean.java +++ b/src/main/java/org/apache/cassandra/service/GCInspectorMXBean.java @@ -18,8 +18,8 @@ */ package org.apache.cassandra.service; -public interface GCInspectorMXBean -{ - // returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)), sum((gc real time (ms))^2), sum(gc bytes), count(gc) } +public interface GCInspectorMXBean { + // returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)), + // sum((gc real time (ms))^2), sum(gc bytes), count(gc) } public double[] getAndResetStats(); } diff --git a/src/main/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/main/java/org/apache/cassandra/service/StorageProxyMBean.java index fd086fa..404821c 100644 --- a/src/main/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -83,8 +83,7 @@ public interface StorageProxyMBean { public void setTruncateRpcTimeout(Long timeoutInMillis); - public void setNativeTransportMaxConcurrentConnections( - Long nativeTransportMaxConcurrentConnections); + public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections); public Long getNativeTransportMaxConcurrentConnections(); diff --git a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java index 4234a88..f89ee91 100644 --- a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -132,8 +132,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * * @return mapping of ranges to end points */ - public Map, List> getRangeToEndpointMap( - String keyspace); + public Map, List> getRangeToEndpointMap(String keyspace); /** * Retrieve a map of range to rpc addresses that describe the ring topology @@ -141,8 +140,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * * @return mapping of ranges to rpc addresses */ - public Map, List> getRangeToRpcaddressMap( - String keyspace); + public Map, List> getRangeToRpcaddressMap(String keyspace); /** * The same as {@code describeRing(String)} but converts TokenRange to the @@ -164,8 +162,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * the keyspace to get the pending range map for. * @return a map of pending ranges to endpoints */ - public Map, List> getPendingRangeToEndpointMap( - String keyspace); + public Map, List> getPendingRangeToEndpointMap(String keyspace); /** * Retrieve a map of tokens to endpoints, including the bootstrapping ones. @@ -211,11 +208,9 @@ public interface StorageServiceMBean extends NotificationEmitter { * - key for which we need to find the endpoint return value - * the endpoint responsible for this key */ - public List getNaturalEndpoints(String keyspaceName, String cf, - String key); + public List getNaturalEndpoints(String keyspaceName, String cf, String key); - public List getNaturalEndpoints(String keyspaceName, - ByteBuffer key); + public List getNaturalEndpoints(String keyspaceName, ByteBuffer key); /** * Takes the snapshot for the given keyspaces. 
A snapshot name must be @@ -226,8 +221,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * @param keyspaceNames * the name of the keyspaces to snapshot; empty means "all." */ - public void takeSnapshot(String tag, String... keyspaceNames) - throws IOException; + public void takeSnapshot(String tag, String... keyspaceNames) throws IOException; /** * Takes the snapshot of a specific column family. A snapshot name must be specified. @@ -251,8 +245,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * @param tag * the tag given to the snapshot; may not be null or empty */ - public void takeColumnFamilySnapshot(String keyspaceName, - String columnFamilyName, String tag) throws IOException; + public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException; /** * Takes the snapshot of a multiple column family from different keyspaces. @@ -264,15 +257,13 @@ public interface StorageServiceMBean extends NotificationEmitter { * list of columnfamily from different keyspace in the form of * ks1.cf1 ks2.cf2 */ - public void takeMultipleColumnFamilySnapshot(String tag, - String... columnFamilyList) throws IOException; + public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList) throws IOException; /** * Remove the snapshot with the given name from the given keyspaces. If no * tag is specified we will remove all snapshots. */ - public void clearSnapshot(String tag, String... keyspaceNames) - throws IOException; + public void clearSnapshot(String tag, String... keyspaceNames) throws IOException; /** * Get the details of all the snapshot @@ -289,21 +280,26 @@ public interface StorageServiceMBean extends NotificationEmitter { public long trueSnapshotsSize(); /** - * Forces refresh of values stored in system.size_estimates of all column families. + * Forces refresh of values stored in system.size_estimates of all column + * families. */ public void refreshSizeEstimates() throws ExecutionException; /** * Forces major compaction of a single keyspace */ - public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) + throws IOException, ExecutionException, InterruptedException; /** * Trigger a cleanup of keys on a single keyspace */ @Deprecated - public int forceKeyspaceCleanup(String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException; - public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException; + public int forceKeyspaceCleanup(String keyspaceName, String... tables) + throws IOException, ExecutionException, InterruptedException; + + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) + throws IOException, ExecutionException, InterruptedException; /** * Scrub (deserialize + reserialize at the latest version, skipping bad rows @@ -313,26 +309,36 @@ public interface StorageServiceMBean extends NotificationEmitter { * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ @Deprecated - public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames) + throws IOException, ExecutionException, InterruptedException; + @Deprecated - public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; - public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, + String... tableNames) throws IOException, ExecutionException, InterruptedException; + + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, + String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** - * Verify (checksums of) the given keyspace. - * If tableNames array is empty, all CFs are verified. + * Verify (checksums of) the given keyspace. If tableNames array is empty, + * all CFs are verified. * - * The entire sstable will be read to ensure each cell validates if extendedVerify is true + * The entire sstable will be read to ensure each cell validates if + * extendedVerify is true */ - public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) + throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. Unlike scrub, it doesn't skip * bad rows and do not snapshot sstables first. */ @Deprecated - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) throws IOException, ExecutionException, InterruptedException; - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) + throws IOException, ExecutionException, InterruptedException; + + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) + throws IOException, ExecutionException, InterruptedException; /** * Flush all memtables for the given column families, or all columnfamilies * @@ -342,71 +348,86 @@ public interface StorageServiceMBean extends NotificationEmitter { * @param columnFamilies * @throws IOException */ - public void forceKeyspaceFlush(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException; + public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException; /** - * Invoke repair asynchronously. - * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: - * type: "repair" - * userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status + * Invoke repair asynchronously. You can track repair progress by + * subscribing to JMX notifications sent from this StorageServiceMBean.
+ * Notification format is: type: "repair" userObject: int array of length 2, + * [0]=command number, [1]=ordinal of ActiveRepairService.Status * - * @param keyspace Keyspace name to repair. Should not be null. - * @param options repair option. + * @param keyspace + * Keyspace name to repair. Should not be null. + * @param options + * repair options. * @return Repair command number, or 0 if nothing to repair */ public int repairAsync(String keyspace, Map<String, String> options); /** - * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead. */ @Deprecated - public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames) throws IOException; + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, + Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames) + throws IOException; /** - * Invoke repair asynchronously. - * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: - * type: "repair" - * userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status + * Invoke repair asynchronously. You can track repair progress by + * subscribing to JMX notifications sent from this StorageServiceMBean. + * Notification format is: type: "repair" userObject: int array of length 2, + * [0]=command number, [1]=ordinal of ActiveRepairService.Status * - * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead. * - * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel + * @param parallelismDegree + * 0: sequential, 1: parallel, 2: DC parallel * @return Repair command number, or 0 if nothing to repair */ @Deprecated - public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames); + public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, + Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... tableNames); /** - * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead. */ @Deprecated - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames) throws IOException; + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, + Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... tableNames) + throws IOException; /** * Same as forceRepairAsync, but handles a specified range * - * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead.
* - * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel + * @param parallelismDegree + * 0: sequential, 1: parallel, 2: DC parallel */ @Deprecated - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair, String... tableNames); + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, + Collection dataCenters, Collection hosts, boolean fullRepair, String... tableNames); /** - * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead. */ @Deprecated - public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... tableNames); + public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, + boolean fullRepair, String... tableNames); /** - * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * @deprecated use {@link #repairAsync(String keyspace, Map options)} + * instead. */ @Deprecated - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... tableNames); + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, + boolean isLocal, boolean fullRepair, String... tableNames); public void forceTerminateAllRepairSessions(); @@ -457,8 +478,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * * @see ch.qos.logback.classic.Level#toLevel(String) */ - public void setLoggingLevel(String classQualifier, String level) - throws Exception; + public void setLoggingLevel(String classQualifier, String level) throws Exception; /** get the runtime logging levels */ public Map getLoggingLevels(); @@ -479,8 +499,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * makes node unavailable for writes, flushes memtables and replays * commitlog. */ - public void drain() - throws IOException, InterruptedException, ExecutionException; + public void drain() throws IOException, InterruptedException, ExecutionException; /** * Truncates (deletes) the given columnFamily from the provided keyspace. @@ -494,8 +513,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * @param columnFamily * The column family to delete data from. */ - public void truncate(String keyspace, String columnFamily) - throws TimeoutException, IOException; + public void truncate(String keyspace, String columnFamily) throws TimeoutException, IOException; /** * given a list of tokens (representing the nodes in the cluster), returns a @@ -510,8 +528,7 @@ public interface StorageServiceMBean extends NotificationEmitter { * the same replication strategies and if yes then we will use the first * else a empty Map is returned. 
*/ - public Map effectiveOwnership(String keyspace) - throws IllegalStateException; + public Map effectiveOwnership(String keyspace) throws IllegalStateException; public List getKeyspaces(); @@ -535,9 +552,8 @@ public interface StorageServiceMBean extends NotificationEmitter { * @param dynamicBadnessThreshold * double, (default 0.0) */ - public void updateSnitch(String epSnitchClassName, Boolean dynamic, - Integer dynamicUpdateInterval, Integer dynamicResetInterval, - Double dynamicBadnessThreshold) throws ClassNotFoundException; + public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, + Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException; // allows a user to forcibly 'kill' a sick node public void stopGossiping(); @@ -579,6 +595,7 @@ public interface StorageServiceMBean extends NotificationEmitter { public int getStreamThroughputMbPerSec(); public void setInterDCStreamThroughputMbPerSec(int value); + public int getInterDCStreamThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); @@ -635,8 +652,7 @@ public interface StorageServiceMBean extends NotificationEmitter { /** * rebuild the specified indexes */ - public void rebuildSecondaryIndex(String ksName, String cfName, - String... idxNames); + public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames); public void resetLocalSchema() throws IOException; @@ -657,11 +673,9 @@ public interface StorageServiceMBean extends NotificationEmitter { */ public double getTraceProbability(); - void disableAutoCompaction(String ks, String... columnFamilies) - throws IOException; + void disableAutoCompaction(String ks, String... columnFamilies) throws IOException; - void enableAutoCompaction(String ks, String... columnFamilies) - throws IOException; + void enableAutoCompaction(String ks, String... columnFamilies) throws IOException; public void deliverHints(String host) throws UnknownHostException; @@ -685,10 +699,13 @@ public interface StorageServiceMBean extends NotificationEmitter { /** Returns the threshold for rejecting queries due to a large batch size */ public int getBatchSizeFailureThreshold(); + /** Sets the threshold for rejecting queries due to a large batch size */ public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold); - /** Sets the hinted handoff throttle in kb per second, per delivery thread. */ + /** + * Sets the hinted handoff throttle in kb per second, per delivery thread. + */ public void setHintedHandoffThrottleInKB(int throttleInKB); /** diff --git a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java index 8b6f99d..cdb52ea 100644 --- a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -29,6 +29,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; + import javax.json.JsonArray; import javax.json.JsonObject; @@ -37,25 +38,21 @@ import com.google.common.base.Objects; /** * ProgressInfo contains file transfer progress. */ -public class ProgressInfo implements Serializable -{ +@SuppressWarnings("serial") +public class ProgressInfo implements Serializable { /** * Direction of the stream. 
*/ - public static enum Direction - { - OUT(0), - IN(1); + public static enum Direction { + OUT(0), IN(1); public final byte code; - private Direction(int code) - { + private Direction(int code) { this.code = (byte) code; } - public static Direction fromByte(byte direction) - { + public static Direction fromByte(byte direction) { return direction == 0 ? OUT : IN; } } @@ -67,8 +64,8 @@ public class ProgressInfo implements Serializable public final long currentBytes; public final long totalBytes; - public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes) - { + public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, + long totalBytes) { assert totalBytes > 0; this.peer = peer; @@ -81,12 +78,9 @@ public class ProgressInfo implements Serializable static public ProgressInfo fromJsonObject(JsonObject obj) { try { - return new ProgressInfo(InetAddress.getByName(obj.getString("peer")), - obj.getInt("session_index"), - obj.getString("file_name"), - Direction.valueOf(obj.getString("direction")), - obj.getJsonNumber("current_bytes").longValue(), - obj.getJsonNumber("total_bytes").longValue()); + return new ProgressInfo(InetAddress.getByName(obj.getString("peer")), obj.getInt("session_index"), + obj.getString("file_name"), Direction.valueOf(obj.getString("direction")), + obj.getJsonNumber("current_bytes").longValue(), obj.getJsonNumber("total_bytes").longValue()); } catch (UnknownHostException e) { // Not suppose to get here } @@ -104,45 +98,55 @@ public class ProgressInfo implements Serializable } return res; } + /** * @return true if file transfer is completed */ - public boolean isCompleted() - { + public boolean isCompleted() { return currentBytes >= totalBytes; } /** - * ProgressInfo is considered to be equal only when all attributes except currentBytes are equal. + * ProgressInfo is considered to be equal only when all attributes except + * currentBytes are equal. */ @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } ProgressInfo that = (ProgressInfo) o; - if (totalBytes != that.totalBytes) return false; - if (direction != that.direction) return false; - if (!fileName.equals(that.fileName)) return false; - if (sessionIndex != that.sessionIndex) return false; + if (totalBytes != that.totalBytes) { + return false; + } + if (direction != that.direction) { + return false; + } + if (!fileName.equals(that.fileName)) { + return false; + } + if (sessionIndex != that.sessionIndex) { + return false; + } return peer.equals(that.peer); } @Override - public int hashCode() - { + public int hashCode() { return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes); } @Override - public String toString() - { + public String toString() { StringBuilder sb = new StringBuilder(fileName); sb.append(" ").append(currentBytes); sb.append("/").append(totalBytes).append(" bytes"); - sb.append("(").append(currentBytes*100/totalBytes).append("%) "); + sb.append("(").append(currentBytes * 100 / totalBytes).append("%) "); sb.append(direction == Direction.OUT ? 
"sent to " : "received from "); sb.append("idx:").append(sessionIndex); sb.append(peer); diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java index 6d44484..9222b3d 100644 --- a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -28,30 +28,26 @@ import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import javax.json.JsonArray; import javax.json.JsonObject; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; /** * Stream session info. */ -public final class SessionInfo implements Serializable -{ +@SuppressWarnings("serial") +public final class SessionInfo implements Serializable { public final InetAddress peer; public final int sessionIndex; public final InetAddress connecting; /** Immutable collection of receiving summaries */ public final Collection receivingSummaries; - /** Immutable collection of sending summaries*/ + /** Immutable collection of sending summaries */ public final Collection sendingSummaries; /** Current session state */ public final StreamSession.State state; @@ -67,15 +63,10 @@ public final class SessionInfo implements Serializable return null; } - - public SessionInfo(InetAddress peer, - int sessionIndex, - InetAddress connecting, - Collection receivingSummaries, - Collection sendingSummaries, - StreamSession.State state, - Map receivingFiles, - Map sendingFiles) { + public SessionInfo(InetAddress peer, int sessionIndex, InetAddress connecting, + Collection receivingSummaries, Collection sendingSummaries, + StreamSession.State state, Map receivingFiles, + Map sendingFiles) { this.peer = peer; this.sessionIndex = sessionIndex; this.connecting = connecting; @@ -86,24 +77,19 @@ public final class SessionInfo implements Serializable this.state = state; } - public SessionInfo(String peer, - int sessionIndex, - String connecting, - Collection receivingSummaries, - Collection sendingSummaries, - String state, - Map receivingFiles, + public SessionInfo(String peer, int sessionIndex, String connecting, Collection receivingSummaries, + Collection sendingSummaries, String state, Map receivingFiles, Map sendingFiles) { this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries, StreamSession.State.valueOf(state), receivingFiles, sendingFiles); } + ProgressInfo in; + public static SessionInfo fromJsonObject(JsonObject obj) { - return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), - obj.getString("connecting"), + return new SessionInfo(obj.getString("peer"), obj.getInt("session_index"), obj.getString("connecting"), StreamSummary.fromJsonArr(obj.getJsonArray("receiving_summaries")), - StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), - obj.getString("state"), + StreamSummary.fromJsonArr(obj.getJsonArray("sending_summaries")), obj.getString("state"), ProgressInfo.fromJArrray(obj.getJsonArray("receiving_files")), ProgressInfo.fromJArrray(obj.getJsonArray("sending_files"))); } @@ -118,135 +104,117 @@ public final class SessionInfo implements Serializable return res; } - public boolean isFailed() - { + public boolean isFailed() { return state == 
StreamSession.State.FAILED; } /** * Update progress of receiving/sending file. * - * @param newProgress new progress info + * @param newProgress + * new progress info */ - public void updateProgress(ProgressInfo newProgress) - { + public void updateProgress(ProgressInfo newProgress) { assert peer.equals(newProgress.peer); - Map currentFiles = newProgress.direction == ProgressInfo.Direction.IN - ? receivingFiles : sendingFiles; + Map currentFiles = newProgress.direction == ProgressInfo.Direction.IN ? receivingFiles + : sendingFiles; currentFiles.put(newProgress.fileName, newProgress); } - public Collection getReceivingFiles() - { + public Collection getReceivingFiles() { return receivingFiles.values(); } - public Collection getSendingFiles() - { + public Collection getSendingFiles() { return sendingFiles.values(); } /** * @return total number of files already received. */ - public long getTotalFilesReceived() - { + public long getTotalFilesReceived() { return getTotalFilesCompleted(receivingFiles.values()); } /** * @return total number of files already sent. */ - public long getTotalFilesSent() - { + public long getTotalFilesSent() { return getTotalFilesCompleted(sendingFiles.values()); } /** * @return total size(in bytes) already received. */ - public long getTotalSizeReceived() - { + public long getTotalSizeReceived() { return getTotalSizeInProgress(receivingFiles.values()); } /** * @return total size(in bytes) already sent. */ - public long getTotalSizeSent() - { + public long getTotalSizeSent() { return getTotalSizeInProgress(sendingFiles.values()); } /** * @return total number of files to receive in the session */ - public long getTotalFilesToReceive() - { + public long getTotalFilesToReceive() { return getTotalFiles(receivingSummaries); } /** * @return total number of files to send in the session */ - public long getTotalFilesToSend() - { + public long getTotalFilesToSend() { return getTotalFiles(sendingSummaries); } /** * @return total size(in bytes) to receive in the session */ - public long getTotalSizeToReceive() - { + public long getTotalSizeToReceive() { return getTotalSizes(receivingSummaries); } /** * @return total size(in bytes) to send in the session */ - public long getTotalSizeToSend() - { + public long getTotalSizeToSend() { return getTotalSizes(sendingSummaries); } - private long getTotalSizeInProgress(Collection files) - { + private long getTotalSizeInProgress(Collection files) { long total = 0; - for (ProgressInfo file : files) + for (ProgressInfo file : files) { total += file.currentBytes; + } return total; } - private long getTotalFiles(Collection summaries) - { + private long getTotalFiles(Collection summaries) { long total = 0; - for (StreamSummary summary : summaries) + for (StreamSummary summary : summaries) { total += summary.files; + } return total; } - private long getTotalSizes(Collection summaries) - { + private long getTotalSizes(Collection summaries) { if (summaries == null) { return 0; } long total = 0; - for (StreamSummary summary : summaries) + for (StreamSummary summary : summaries) { total += summary.totalSize; + } return total; } - private long getTotalFilesCompleted(Collection files) - { - Iterable completed = Iterables.filter(files, new Predicate() - { - public boolean apply(ProgressInfo input) - { - return input.isCompleted(); - } - }); + private long getTotalFilesCompleted(Collection files) { + Iterable completed = Iterables.filter(files, input -> input.isCompleted()); return Iterables.size(completed); } } diff --git 
diff --git a/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java index 28b25db..17965d9 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java @@ -25,6 +25,7 @@ package org.apache.cassandra.streaming; import java.util.Set; + import javax.management.NotificationEmitter; import javax.management.openmbean.CompositeData; diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSession.java b/src/main/java/org/apache/cassandra/streaming/StreamSession.java index 7646dc6..90d2b2a 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamSession.java @@ -25,81 +25,80 @@ package org.apache.cassandra.streaming; /** - * Handles the streaming a one or more section of one of more sstables to and from a specific - * remote node. + * Handles the streaming of one or more sections of one or more sstables to and + * from a specific remote node. * - * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming - * session has the following life-cycle: + * Both this node and the remote one will create a similar symmetrical + * StreamSession. A streaming session has the following life-cycle: * * 1. Connections Initialization * - * (a) A node (the initiator in the following) create a new StreamSession, initialize it (init()) - * and then start it (start()). Start will create a {@link ConnectionHandler} that will create - * two connections to the remote node (the follower in the following) with whom to stream and send - * a StreamInit message. The first connection will be the incoming connection for the - * initiator, and the second connection will be the outgoing. - * (b) Upon reception of that StreamInit message, the follower creates its own StreamSession, - * initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler - * according to StreamInit message's isForOutgoing flag. - * (d) When the both incoming and outgoing connections are established, StreamSession calls - * StreamSession#onInitializationComplete method to start the streaming prepare phase - * (StreamResultFuture.startStreaming()). + * (a) A node (the initiator in the following) creates a new StreamSession, + * initializes it (init()) and then starts it (start()). Start will create a + * {@link ConnectionHandler} that will create two connections to the remote node + * (the follower in the following) with whom to stream and send a StreamInit + * message. The first connection will be the incoming connection for the + * initiator, and the second connection will be the outgoing. (b) Upon reception + * of that StreamInit message, the follower creates its own StreamSession, + * initializes it if it does not yet exist, and attaches the connecting socket to + * its ConnectionHandler according to the StreamInit message's isForOutgoing flag. + * (c) When both the incoming and outgoing connections are established, + * StreamSession calls the StreamSession#onInitializationComplete method to start + * the streaming prepare phase (StreamResultFuture.startStreaming()). * * 2. Streaming preparation phase * - * (a) This phase is started when the initiator onInitializationComplete() method is called.
This method sends a - * PrepareMessage that includes what files/sections this node will stream to the follower - * (stored in a StreamTransferTask, each column family has it's own transfer task) and what - * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has - * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise, - * it waits for the follower PrepareMessage. - * (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive - * and send back its own PrepareMessage with a summary of the files/sections that will be sent to - * the initiator (prepare()). After having sent that message, the follower goes to its Streamning - * phase. - * (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will - * receive and then goes to his own Streaming phase. + * (a) This phase is started when the initiator onInitializationComplete() + * method is called. This method sends a PrepareMessage that includes what + * files/sections this node will stream to the follower (stored in a + * StreamTransferTask, each column family has its own transfer task) and what + * the follower needs to stream back (StreamReceiveTask, same as above). If the + * initiator has nothing to receive from the follower, it goes directly to its + * Streaming phase. Otherwise, it waits for the follower PrepareMessage. (b) + * Upon reception of the PrepareMessage, the follower records which + * files/sections it will receive and sends back its own PrepareMessage with a + * summary of the files/sections that will be sent to the initiator (prepare()). + * After having sent that message, the follower goes to its Streaming phase. + * (c) When the initiator receives the follower PrepareMessage, it records which + * files/sections it will receive and then goes to its own Streaming phase. * * 3. Streaming phase * - * (a) The streaming phase is started by each node (the sender in the follower, but note that each side - * of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles(). - * This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage - * consists of a FileMessageHeader that indicates which file is coming and then start streaming the - * content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the - * fileSent() method is called for that file. If all the files for a StreamTransferTask are sent - * (StreamTransferTask.complete()), the task is marked complete (taskCompleted()). - * (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in - * FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as - * complete (received()). When all files for the StreamReceiveTask have been received, the sstables - * are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task - * is marked complete (taskCompleted()) - * (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream - * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) - * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new - * FileMessage for that file.
- * (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase - * (maybeCompleted()). + * (a) The streaming phase is started by each node (the sender in the following, + * but note that each side of the StreamSession may be sender for some of the + * files) involved by calling startStreamingFiles(). This will sequentially send + * a FileMessage for each file of each StreamTransferTask. Each FileMessage + * consists of a FileMessageHeader that indicates which file is coming and then + * starts streaming the content for that file (StreamWriter in + * FileMessage.serialize()). When a file is fully sent, the fileSent() method is + * called for that file. If all the files for a StreamTransferTask are sent + * (StreamTransferTask.complete()), the task is marked complete + * (taskCompleted()). (b) On the receiving side, an SSTable will be written for + * the incoming file (StreamReader in FileMessage.deserialize()) and once the + * FileMessage is fully received, the file will be marked as complete + * (received()). When all files for the StreamReceiveTask have been received, + * the sstables are added to the CFS (and secondary indexes are built, + * StreamReceiveTask.complete()) and the task is marked complete + * (taskCompleted()). (c) If during the streaming of a particular file an I/O + * error occurs on the receiving end of a stream (FileMessage.deserialize), the + * node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) + * by sending a RetryMessage to the sender. On receiving a RetryMessage, the + * sender simply issues a new FileMessage for that file. (d) When all transfer + * and receive tasks for a session are complete, the session moves to the + * Completion phase (maybeCompleted()). * * 4. Completion phase * - * (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()). - * If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that - * session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and - * send a CompleteMessage to the other side. + * (a) When a node has finished all transfer and receive tasks, it enters the + * completion phase (maybeCompleted()). If it had already received a + * CompleteMessage from the other side (it is in the WAIT_COMPLETE state), the + * session is done and is closed (closeSession()). Otherwise, the node switches + * to the WAIT_COMPLETE state and sends a CompleteMessage to the other side. */ -public class StreamSession -{ +public class StreamSession { - public static enum State - { - INITIALIZED, - PREPARING, - STREAMING, - WAIT_COMPLETE, - COMPLETE, - FAILED, + public static enum State { + INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE, FAILED, } - }
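The completion handshake in step 4 boils down to one decision when the local tasks finish. A sketch of that decision follows (illustrative only: this repository merely mirrors the State enum for JMX, the state machine itself runs on the Scylla side, and receivedComplete is a hypothetical flag meaning "the peer's CompleteMessage already arrived"):

import org.apache.cassandra.streaming.StreamSession;

public class CompletionSketch {
    // Step 4(a): if the peer already completed, the session closes;
    // otherwise we announce completion and wait for the peer.
    static StreamSession.State onAllTasksFinished(boolean receivedComplete) {
        return receivedComplete ? StreamSession.State.COMPLETE : StreamSession.State.WAIT_COMPLETE;
    }
}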
diff --git a/src/main/java/org/apache/cassandra/streaming/StreamState.java b/src/main/java/org/apache/cassandra/streaming/StreamState.java index fbaa3fb..aa09143 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamState.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamState.java @@ -28,14 +28,12 @@ import java.io.Serializable; import java.util.Set; import java.util.UUID; -import com.google.common.base.Predicate; import com.google.common.collect.Iterables; /** * Current snapshot of streaming progress. */ -public class StreamState implements Serializable -{ +public class StreamState implements Serializable { /** * */ @@ -49,19 +47,12 @@ public class StreamState implements Serializable this.description = description; this.sessions = sessions; } - public StreamState(String planId, String description, Set<SessionInfo> sessions) - { + + public StreamState(String planId, String description, Set<SessionInfo> sessions) { this(UUID.fromString(planId), description, sessions); } - public boolean hasFailedSession() - { - return Iterables.any(sessions, new Predicate<SessionInfo>() - { - public boolean apply(SessionInfo session) - { - return session.isFailed(); - } - }); + public boolean hasFailedSession() { + return Iterables.any(sessions, session -> session.isFailed()); } } diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java index dbda53e..2e90810 100644 --- a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java @@ -36,18 +36,17 @@ import com.google.common.base.Objects; /** * Summary of streaming. */ -public class StreamSummary -{ +public class StreamSummary { public final UUID cfId; /** - * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + * Number of files to transfer. Can be 0 if nothing to transfer for some + * streaming request. */ public final int files; public final long totalSize; - public StreamSummary(UUID cfId, int files, long totalSize) - { + public StreamSummary(UUID cfId, int files, long totalSize) { this.cfId = cfId; this.files = files; this.totalSize = totalSize; @@ -58,7 +57,8 @@ public class StreamSummary } public static StreamSummary fromJsonObject(JsonObject obj) { - return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), obj.getJsonNumber("total_size").longValue()); + return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), + obj.getJsonNumber("total_size").longValue()); } public static Collection<StreamSummary> fromJsonArr(JsonArray arr) { @@ -71,24 +71,26 @@ public class StreamSummary } return res; } + @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } StreamSummary summary = (StreamSummary) o; return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId); } @Override - public int hashCode() - { + public int hashCode() { return Objects.hashCode(cfId, files, totalSize); } @Override - public String toString() - { + public String toString() { final StringBuilder sb = new StringBuilder("StreamSummary{"); sb.append("path=").append(cfId); sb.append(", files=").append(files); diff --git a/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java index b722c13..1642e1c 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java @@ -29,54 +29,38 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import javax.management.openmbean.*; -import com.google.common.base.Throwables; +import javax.management.openmbean.CompositeData; +import
javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; import org.apache.cassandra.streaming.ProgressInfo; -public class ProgressInfoCompositeData -{ - private static final String[] ITEM_NAMES = new String[]{"planId", - "peer", - "sessionIndex", - "fileName", - "direction", - "currentBytes", - "totalBytes"}; - private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID", - "Session peer", - "Index of session", - "Name of the file", - "Direction('IN' or 'OUT')", - "Current bytes transferred", - "Total bytes to transfer"}; - private static final OpenType<?>[] ITEM_TYPES = new OpenType<?>[]{SimpleType.STRING, - SimpleType.STRING, - SimpleType.INTEGER, - SimpleType.STRING, - SimpleType.STRING, - SimpleType.LONG, - SimpleType.LONG}; +import com.google.common.base.Throwables; + +public class ProgressInfoCompositeData { + private static final String[] ITEM_NAMES = new String[] { "planId", "peer", "sessionIndex", "fileName", "direction", + "currentBytes", "totalBytes" }; + private static final String[] ITEM_DESCS = new String[] { "String representation of Plan ID", "Session peer", + "Index of session", "Name of the file", "Direction('IN' or 'OUT')", "Current bytes transferred", + "Total bytes to transfer" }; + private static final OpenType<?>[] ITEM_TYPES = new OpenType<?>[] { SimpleType.STRING, SimpleType.STRING, + SimpleType.INTEGER, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG }; public static final CompositeType COMPOSITE_TYPE; - static { - try - { - COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(), - "ProgressInfo", - ITEM_NAMES, - ITEM_DESCS, - ITEM_TYPES); - } - catch (OpenDataException e) - { + static { + try { + COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(), "ProgressInfo", ITEM_NAMES, ITEM_DESCS, + ITEM_TYPES); + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo) - { + public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo) { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress()); @@ -85,30 +69,19 @@ public class ProgressInfoCompositeData valueMap.put(ITEM_NAMES[4], progressInfo.direction.name()); valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes); valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes); - try - { + try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); - } - catch (OpenDataException e) - { + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static ProgressInfo fromCompositeData(CompositeData cd) - { + public static ProgressInfo fromCompositeData(CompositeData cd) { Object[] values = cd.getAll(ITEM_NAMES); - try - { - return new ProgressInfo(InetAddress.getByName((String) values[1]), - (int) values[2], - (String) values[3], - ProgressInfo.Direction.valueOf((String)values[4]), - (long) values[5], - (long) values[6]); - } - catch (UnknownHostException e) - { + try { + return new ProgressInfo(InetAddress.getByName((String) values[1]), (int) values[2], (String) values[3], + ProgressInfo.Direction.valueOf((String) values[4]), (long) values[5], (long) values[6]); + } catch (UnknownHostException e) { throw Throwables.propagate(e); } }
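The two conversions above are inverses of each other, modulo the planId, which is carried in the composite but not stored in ProgressInfo itself. A quick round-trip sketch, using the same ProgressInfo(InetAddress, int, String, Direction, long, long) constructor that fromCompositeData() uses (the class name and sample values below are made up):

import java.net.InetAddress;
import java.util.UUID;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.management.ProgressInfoCompositeData;

public class ProgressInfoRoundTrip {
    public static void main(String[] args) throws Exception {
        ProgressInfo original = new ProgressInfo(InetAddress.getByName("127.0.0.1"), 0,
                "example-Data.db", ProgressInfo.Direction.IN, 1024L, 4096L);
        CompositeData cd = ProgressInfoCompositeData.toCompositeData(UUID.randomUUID(), original);
        ProgressInfo copy = ProgressInfoCompositeData.fromCompositeData(cd);
        // planId is not part of ProgressInfo, so compare the payload fields.
        assert copy.currentBytes == original.currentBytes && copy.totalBytes == original.totalBytes;
    }
}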
diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index ef5d955..bcda9c0 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -26,8 +26,24 @@ package org.apache.cassandra.streaming.management; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.*; -import javax.management.openmbean.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamSummary; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -35,149 +51,86 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.SessionInfo; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.StreamSummary; -import java.util.HashMap; - -public class SessionInfoCompositeData -{ - private static final String[] ITEM_NAMES = new String[]{"planId", - "peer", - "connecting", - "receivingSummaries", - "sendingSummaries", - "state", - "receivingFiles", - "sendingFiles", - "sessionIndex"}; - private static final String[] ITEM_DESCS = new String[]{"Plan ID", - "Session peer", - "Connecting address", - "Summaries of receiving data", - "Summaries of sending data", - "Current session state", - "Receiving files", - "Sending files", - "Session index"}; +public class SessionInfoCompositeData { + private static final String[] ITEM_NAMES = new String[] { "planId", "peer", "connecting", "receivingSummaries", + "sendingSummaries", "state", "receivingFiles", "sendingFiles", "sessionIndex" }; + private static final String[] ITEM_DESCS = new String[] { "Plan ID", "Session peer", "Connecting address", + "Summaries of receiving data", "Summaries of sending data", "Current session state", "Receiving files", + "Sending files", "Session index" }; private static final OpenType<?>[] ITEM_TYPES; public static final CompositeType COMPOSITE_TYPE; - static { - try - { - ITEM_TYPES = new OpenType<?>[]{SimpleType.STRING, - SimpleType.STRING, - SimpleType.STRING, - ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), - ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), - SimpleType.STRING, - ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), - ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), - SimpleType.INTEGER}; - COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(), - "SessionInfo", - ITEM_NAMES, - ITEM_DESCS, - ITEM_TYPES); - } - catch (OpenDataException e) - { + static { + try { + ITEM_TYPES = new OpenType<?>[] { SimpleType.STRING, SimpleType.STRING, SimpleType.STRING,
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), + ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), SimpleType.STRING, + ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), + ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), SimpleType.INTEGER }; + COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(), "SessionInfo", ITEM_NAMES, ITEM_DESCS, + ITEM_TYPES); + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo) - { + public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo) { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress()); valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress()); - Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>() - { - public CompositeData apply(StreamSummary input) - { - return StreamSummaryCompositeData.toCompositeData(input); - } - }; + Function<StreamSummary, CompositeData> fromStreamSummary = input -> StreamSummaryCompositeData + .toCompositeData(input); valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); valueMap.put(ITEM_NAMES[5], sessionInfo.state.name()); - Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>() - { - public CompositeData apply(ProgressInfo input) - { - return ProgressInfoCompositeData.toCompositeData(planId, input); - } - }; + Function<ProgressInfo, CompositeData> fromProgressInfo = input -> ProgressInfoCompositeData + .toCompositeData(planId, input); valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex); - try - { + try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); - } - catch (OpenDataException e) - { + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static SessionInfo fromCompositeData(CompositeData cd) - { + public static SessionInfo fromCompositeData(CompositeData cd) { assert cd.getCompositeType().equals(COMPOSITE_TYPE); Object[] values = cd.getAll(ITEM_NAMES); InetAddress peer, connecting; - try - { + try { peer = InetAddress.getByName((String) values[1]); connecting = InetAddress.getByName((String) values[2]); - } - catch (UnknownHostException e) - { + } catch (UnknownHostException e) { throw Throwables.propagate(e); } - Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>() - { - public StreamSummary apply(CompositeData input) - { - return StreamSummaryCompositeData.fromCompositeData(input); - } - }; - SessionInfo info = new SessionInfo(peer, - (int)values[8], - connecting, - fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), - fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), - StreamSession.State.valueOf((String) values[5]), - new HashMap<String, ProgressInfo>(), new HashMap<String, ProgressInfo>()); - Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>() - { - public ProgressInfo apply(CompositeData input) - { - return ProgressInfoCompositeData.fromCompositeData(input); - } - }; - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) - { + Function<CompositeData, StreamSummary> toStreamSummary = input -> StreamSummaryCompositeData + .fromCompositeData(input); + SessionInfo info = new SessionInfo(peer, (int)
values[8], connecting, + fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), + fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), + StreamSession.State.valueOf((String) values[5]), new HashMap<String, ProgressInfo>(), + new HashMap<String, ProgressInfo>()); + Function<CompositeData, ProgressInfo> toProgressInfo = input -> ProgressInfoCompositeData + .fromCompositeData(input); + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) { info.updateProgress(progress); } - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) - { + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) { info.updateProgress(progress); } return info; } - private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func) - { + private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func) { return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func)); } - private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func) - { + private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, + Function<T, CompositeData> func) { if (toConvert == null) { toConvert = Sets.newHashSet(); } diff --git a/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java index 3f57608..daa1f57 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java @@ -24,79 +24,68 @@ package org.apache.cassandra.streaming.management; -import java.util.*; -import javax.management.openmbean.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamState; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.cassandra.streaming.SessionInfo; -import org.apache.cassandra.streaming.StreamState; - /** */ -public class StreamStateCompositeData -{ - private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions", - "currentRxBytes", "totalRxBytes", "rxPercentage", - "currentTxBytes", "totalTxBytes", "txPercentage"}; - private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream", - "Stream plan description", - "Active stream sessions", - "Number of bytes received across all streams", - "Total bytes available to receive across all streams", - "Percentage received across all streams", - "Number of bytes sent across all streams", - "Total bytes available to send across all streams", - "Percentage sent across all streams"}; +public class StreamStateCompositeData { + private static final String[] ITEM_NAMES = new String[] { "planId", "description", "sessions", "currentRxBytes", + "totalRxBytes", "rxPercentage",
"currentTxBytes", "totalTxBytes", "txPercentage" }; + private static final String[] ITEM_DESCS = new String[] { "Plan ID of this stream", "Stream plan description", + "Active stream sessions", "Number of bytes received across all streams", + "Total bytes available to receive across all streams", "Percentage received across all streams", + "Number of bytes sent across all streams", "Total bytes available to send across all streams", + "Percentage sent across all streams" }; private static final OpenType[] ITEM_TYPES; public static final CompositeType COMPOSITE_TYPE; - static { - try - { - ITEM_TYPES = new OpenType[]{SimpleType.STRING, - SimpleType.STRING, - ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE), - SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE, - SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE}; - COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(), - "StreamState", - ITEM_NAMES, - ITEM_DESCS, - ITEM_TYPES); - } - catch (OpenDataException e) - { + static { + try { + ITEM_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, + ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE), SimpleType.LONG, SimpleType.LONG, + SimpleType.DOUBLE, SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE }; + COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(), "StreamState", ITEM_NAMES, ITEM_DESCS, + ITEM_TYPES); + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static CompositeData toCompositeData(final StreamState streamState) - { + public static CompositeData toCompositeData(final StreamState streamState) { Map valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], streamState.planId.toString()); valueMap.put(ITEM_NAMES[1], streamState.description); CompositeData[] sessions = new CompositeData[streamState.sessions.size()]; - Lists.newArrayList(Iterables.transform(streamState.sessions, new Function() - { - public CompositeData apply(SessionInfo input) - { - return SessionInfoCompositeData.toCompositeData(streamState.planId, input); - } - })).toArray(sessions); + Lists.newArrayList(Iterables.transform(streamState.sessions, + input -> SessionInfoCompositeData.toCompositeData(streamState.planId, input))).toArray(sessions); valueMap.put(ITEM_NAMES[2], sessions); long currentRxBytes = 0; long totalRxBytes = 0; long currentTxBytes = 0; long totalTxBytes = 0; - for (SessionInfo sessInfo : streamState.sessions) - { + for (SessionInfo sessInfo : streamState.sessions) { currentRxBytes += sessInfo.getTotalSizeReceived(); totalRxBytes += sessInfo.getTotalSizeToReceive(); currentTxBytes += sessInfo.getTotalSizeSent(); @@ -112,30 +101,20 @@ public class StreamStateCompositeData valueMap.put(ITEM_NAMES[7], totalTxBytes); valueMap.put(ITEM_NAMES[8], txPercentage); - try - { + try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); - } - catch (OpenDataException e) - { + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static StreamState fromCompositeData(CompositeData cd) - { + public static StreamState fromCompositeData(CompositeData cd) { assert cd.getCompositeType().equals(COMPOSITE_TYPE); Object[] values = cd.getAll(ITEM_NAMES); UUID planId = UUID.fromString((String) values[0]); String description = (String) values[1]; Set sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]), - new Function() - { - public SessionInfo apply(CompositeData input) - { - return SessionInfoCompositeData.fromCompositeData(input); - } - })); + input -> 
SessionInfoCompositeData.fromCompositeData(input))); return new StreamState(planId, description, sessions); } } diff --git a/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java index d649aff..93f39c0 100644 --- a/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java +++ b/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java @@ -27,63 +27,51 @@ package org.apache.cassandra.streaming.management; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import javax.management.openmbean.*; -import com.google.common.base.Throwables; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; import org.apache.cassandra.streaming.StreamSummary; +import com.google.common.base.Throwables; + /** */ -public class StreamSummaryCompositeData -{ - private static final String[] ITEM_NAMES = new String[]{"cfId", - "files", - "totalSize"}; - private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID", - "Number of files", - "Total bytes of the files"}; - private static final OpenType<?>[] ITEM_TYPES = new OpenType<?>[]{SimpleType.STRING, - SimpleType.INTEGER, - SimpleType.LONG}; +public class StreamSummaryCompositeData { + private static final String[] ITEM_NAMES = new String[] { "cfId", "files", "totalSize" }; + private static final String[] ITEM_DESCS = new String[] { "ColumnFamily ID", "Number of files", + "Total bytes of the files" }; + private static final OpenType<?>[] ITEM_TYPES = new OpenType<?>[] { SimpleType.STRING, SimpleType.INTEGER, + SimpleType.LONG }; public static final CompositeType COMPOSITE_TYPE; - static { - try - { - COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(), - "StreamSummary", - ITEM_NAMES, - ITEM_DESCS, - ITEM_TYPES); - } - catch (OpenDataException e) - { + static { + try { + COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(), "StreamSummary", ITEM_NAMES, ITEM_DESCS, + ITEM_TYPES); + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static CompositeData toCompositeData(StreamSummary streamSummary) - { + public static CompositeData toCompositeData(StreamSummary streamSummary) { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString()); valueMap.put(ITEM_NAMES[1], streamSummary.files); valueMap.put(ITEM_NAMES[2], streamSummary.totalSize); - try - { + try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); - } - catch (OpenDataException e) - { + } catch (OpenDataException e) { throw Throwables.propagate(e); } } - public static StreamSummary fromCompositeData(CompositeData cd) - { + public static StreamSummary fromCompositeData(CompositeData cd) { Object[] values = cd.getAll(ITEM_NAMES); - return new StreamSummary(UUID.fromString((String) values[0]), - (int) values[1], - (long) values[2]); + return new StreamSummary(UUID.fromString((String) values[0]), (int) values[1], (long) values[2]); } }
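Finally, the summary conversions round-trip losslessly: StreamSummary.equals() (see the StreamSummary diff above) compares exactly the three items stored in the composite. A small sketch (the test scaffolding class and sample values are made up):

import java.util.UUID;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.management.StreamSummaryCompositeData;

public class StreamSummaryRoundTrip {
    public static void main(String[] args) {
        StreamSummary summary = new StreamSummary(UUID.randomUUID(), 3, 12345L);
        CompositeData cd = StreamSummaryCompositeData.toCompositeData(summary);
        // equals() compares cfId, files and totalSize, the full composite payload.
        assert summary.equals(StreamSummaryCompositeData.fromCompositeData(cd));
    }
}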