diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index effff96..a10880c 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -22,21 +22,33 @@ */ package org.apache.cassandra.service; -import java.io.*; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import javax.json.Json; import javax.json.JsonArray; import javax.json.JsonObject; -import javax.json.JsonWriter; -import javax.management.*; +import javax.management.MBeanServer; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; import javax.management.openmbean.TabularData; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; @@ -45,11 +57,9 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.streaming.StreamManager; -import com.scylladb.jmx.api.APIClient; -import com.scylladb.jmx.api.APIConfig; -import com.scylladb.jmx.utils.FileUtils; - import com.google.common.base.Joiner; +import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.utils.FileUtils; /** * This abstraction contains the token/identifier of this node on the identifier @@ -63,6 +73,7 @@ public class StorageService extends NotificationBroadcasterSupport private APIClient c = new APIClient(); private static Timer timer = new Timer("Storage Service Repair"); + @SuppressWarnings("unused") private StorageMetrics metrics = new StorageMetrics(); public static final StorageService instance = new StorageService(); @@ -104,6 +115,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of IP addresses, as Strings */ + @Override public List getLiveNodes() { log(" getLiveNodes()"); return c.getListStrValue("/gossiper/endpoint/live"); @@ -115,6 +127,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of IP addresses, as Strings */ + @Override public List getUnreachableNodes() { log(" getUnreachableNodes()"); return c.getListStrValue("/gossiper/endpoint/down"); @@ -125,6 +138,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of IP addresses, as Strings */ + @Override public List getJoiningNodes() { log(" getJoiningNodes()"); return c.getListStrValue("/storage_service/nodes/joining"); @@ -135,6 +149,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of IP addresses, as Strings */ + @Override public List getLeavingNodes() { log(" getLeavingNodes()"); return c.getListStrValue("/storage_service/nodes/leaving"); @@ -145,6 +160,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of IP addresses, as Strings */ + @Override public List getMovingNodes() { log(" getMovingNodes()"); return c.getListStrValue("/storage_service/nodes/moving"); @@ -155,6 +171,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return a collection of tokens formatted as strings */ + @Override public List getTokens() { log(" getTokens()"); try { @@ -173,6 +190,7 @@ public class StorageService extends NotificationBroadcasterSupport * string representation of an node * @return a collection of tokens formatted as strings */ + @Override public List getTokens(String endpoint) throws UnknownHostException { log(" getTokens(String endpoint) throws UnknownHostException"); return c.getListStrValue("/storage_service/tokens/" + endpoint); @@ -183,6 +201,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return A string representation of the Cassandra version. */ + @Override public String getReleaseVersion() { log(" getReleaseVersion()"); return c.getStringValue("/storage_service/release_version"); @@ -193,6 +212,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return A string representation of the Schema version. */ + @Override public String getSchemaVersion() { log(" getSchemaVersion()"); return c.getStringValue("/storage_service/schema_version"); @@ -203,6 +223,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return String array of all locations */ + @Override public String[] getAllDataFileLocations() { log(" getAllDataFileLocations()"); return c.getStringArrValue("/storage_service/data_file/locations"); @@ -213,6 +234,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return a string path */ + @Override public String getCommitLogLocation() { log(" getCommitLogLocation()"); return c.getStringValue("/storage_service/commitlog"); @@ -223,6 +245,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return a string path */ + @Override public String getSavedCachesLocation() { log(" getSavedCachesLocation()"); return c.getStringValue("/storage_service/saved_caches/location"); @@ -234,6 +257,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return mapping of ranges to end points */ + @Override public Map, List> getRangeToEndpointMap( String keyspace) { log(" getRangeToEndpointMap(String keyspace)"); @@ -246,6 +270,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return mapping of ranges to rpc addresses */ + @Override public Map, List> getRangeToRpcaddressMap( String keyspace) { log(" getRangeToRpcaddressMap(String keyspace)"); @@ -265,6 +290,7 @@ public class StorageService extends NotificationBroadcasterSupport * @return a List of TokenRange(s) converted to String for the given * keyspace */ + @Override public List describeRingJMX(String keyspace) throws IOException { log(" describeRingJMX(String keyspace) throws IOException"); JsonArray arr = c.getJsonArray("/storage_service/describe_ring/" + keyspace); @@ -325,6 +351,7 @@ public class StorageService extends NotificationBroadcasterSupport * the keyspace to get the pending range map for. * @return a map of pending ranges to endpoints */ + @Override public Map, List> getPendingRangeToEndpointMap( String keyspace) { log(" getPendingRangeToEndpointMap(String keyspace)"); @@ -337,6 +364,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return a map of tokens to endpoints in ascending order */ + @Override public Map getTokenToEndpointMap() { log(" getTokenToEndpointMap()"); Map mapInetAddress = c.getMapStrValue("/storage_service/tokens_endpoint"); @@ -352,6 +380,7 @@ public class StorageService extends NotificationBroadcasterSupport } /** Retrieve this hosts unique ID */ + @Override public String getLocalHostId() { log(" getLocalHostId()"); return c.getStringValue("/storage_service/hostid/local"); @@ -365,6 +394,7 @@ public class StorageService extends NotificationBroadcasterSupport return getHostIdToAddressMap().get(getLocalHostId()); } /** Retrieve the mapping of endpoint to host ID */ + @Override public Map getHostIdMap() { log(" getHostIdMap()"); return c.getMapStrValue("/storage_service/host_id"); @@ -388,12 +418,14 @@ public class StorageService extends NotificationBroadcasterSupport } /** Human-readable load value */ + @Override public String getLoadString() { log(" getLoadString()"); return FileUtils.stringifyFileSize(getLoad()); } /** Human-readable load value. Keys are IP addresses. */ + @Override public Map getLoadMap() { log(" getLoadMap()"); Map load = getLoadMapAsDouble(); @@ -415,6 +447,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return generation number */ + @Override public int getCurrentGenerationNumber() { log(" getCurrentGenerationNumber()"); return c.getIntValue("/storage_service/generation_number"); @@ -432,6 +465,7 @@ public class StorageService extends NotificationBroadcasterSupport * - key for which we need to find the endpoint return value - * the endpoint responsible for this key */ + @Override public List getNaturalEndpoints(String keyspaceName, String cf, String key) { log(" getNaturalEndpoints(String keyspaceName, String cf, String key)"); @@ -443,6 +477,7 @@ public class StorageService extends NotificationBroadcasterSupport queryParams); } + @Override public List getNaturalEndpoints(String keyspaceName, ByteBuffer key) { log(" getNaturalEndpoints(String keyspaceName, ByteBuffer key)"); @@ -458,6 +493,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param keyspaceNames * the name of the keyspaces to snapshot; empty means "all." */ + @Override public void takeSnapshot(String tag, String... keyspaceNames) throws IOException { log(" takeSnapshot(String tag, String... keyspaceNames) throws IOException"); @@ -479,6 +515,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param tag * the tag given to the snapshot; may not be null or empty */ + @Override public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException { log(" takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException"); @@ -499,6 +536,7 @@ public class StorageService extends NotificationBroadcasterSupport * Remove the snapshot with the given name from the given keyspaces. If no * tag is specified we will remove all snapshots. */ + @Override public void clearSnapshot(String tag, String... keyspaceNames) throws IOException { log(" clearSnapshot(String tag, String... keyspaceNames) throws IOException"); @@ -514,6 +552,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return A map of snapshotName to all its details in Tabular form. */ + @Override public Map getSnapshotDetails() { log(" getSnapshotDetails()"); return c.getMapStringSnapshotTabularDataValue( @@ -546,6 +585,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return True size taken by all the snapshots. */ + @Override public long trueSnapshotsSize() { log(" trueSnapshotsSize()"); return c.getLongValue("/storage_service/snapshots/size/true"); @@ -568,6 +608,7 @@ public class StorageService extends NotificationBroadcasterSupport /** * Trigger a cleanup of keys on a single keyspace */ + @Override public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { @@ -586,6 +627,7 @@ public class StorageService extends NotificationBroadcasterSupport * * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ + @Override public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { @@ -613,6 +655,7 @@ public class StorageService extends NotificationBroadcasterSupport * Rewrite all sstables to the latest version. Unlike scrub, it doesn't skip * bad rows and do not snapshot sstables first. */ + @Override public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, @@ -635,6 +678,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param columnFamilies * @throws IOException */ + @Override public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { @@ -721,6 +765,7 @@ public class StorageService extends NotificationBroadcasterSupport * repair option. * @return Repair command number, or 0 if nothing to repair */ + @Override public int repairAsync(String keyspace, Map options) { log(" repairAsync(String keyspace, Map options)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -748,6 +793,7 @@ public class StorageService extends NotificationBroadcasterSupport return repairAsync(keyspace, options); } + @Override public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, @@ -756,15 +802,6 @@ public class StorageService extends NotificationBroadcasterSupport return c.getIntValue(""); } - @Deprecated - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, RepairParallelism parallelismDegree, - Collection dataCenters, Collection hosts, - boolean fullRepair, String... columnFamilies) { - log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair, String... columnFamilies)"); - return c.getIntValue(""); - } - @Override public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, @@ -774,6 +811,7 @@ public class StorageService extends NotificationBroadcasterSupport return repairAsync(keyspace, options); } + @Override @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, @@ -782,6 +820,7 @@ public class StorageService extends NotificationBroadcasterSupport return c.getIntValue(""); } + @Override public void forceTerminateAllRepairSessions() { log(" forceTerminateAllRepairSessions()"); c.post("/storage_service/force_terminate"); @@ -790,6 +829,7 @@ public class StorageService extends NotificationBroadcasterSupport /** * transfer this node's data to other machines and remove it from service. */ + @Override public void decommission() throws InterruptedException { log(" decommission() throws InterruptedException"); c.post("/storage_service/decommission"); @@ -800,6 +840,7 @@ public class StorageService extends NotificationBroadcasterSupport * token to move this node to. This node will unload its data * onto its neighbors, and bootstrap to the new token. */ + @Override public void move(String newToken) throws IOException { log(" move(String newToken) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -812,6 +853,7 @@ public class StorageService extends NotificationBroadcasterSupport * it) from the ring * @param hostIdString the host id to remove */ + @Override public void removeNode(String hostIdString) { log(" removeNode(String token)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -822,6 +864,7 @@ public class StorageService extends NotificationBroadcasterSupport /** * Get the status of a token removal. */ + @Override public String getRemovalStatus() { log(" getRemovalStatus()"); return c.getStringValue("/storage_service/removal_status"); @@ -830,6 +873,7 @@ public class StorageService extends NotificationBroadcasterSupport /** * Force a remove operation to finish. */ + @Override public void forceRemoveCompletion() { log(" forceRemoveCompletion()"); c.post("/storage_service/force_remove_completion"); @@ -854,6 +898,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @see ch.qos.logback.classic.Level#toLevel(String) */ + @Override public void setLoggingLevel(String classQualifier, String level) throws Exception { log(" setLoggingLevel(String classQualifier, String level) throws Exception"); @@ -863,6 +908,7 @@ public class StorageService extends NotificationBroadcasterSupport } /** get the runtime logging levels */ + @Override public Map getLoggingLevels() { log(" getLoggingLevels()"); return c.getMapStrValue("/storage_service/logging_level"); @@ -872,18 +918,21 @@ public class StorageService extends NotificationBroadcasterSupport * get the operational mode (leaving, joining, normal, decommissioned, * client) **/ + @Override public String getOperationMode() { log(" getOperationMode()"); return c.getStringValue("/storage_service/operation_mode"); } /** Returns whether the storage service is starting or not */ + @Override public boolean isStarting() { log(" isStarting()"); return c.getBooleanValue("/storage_service/is_starting"); } /** get the progress of a drain operation */ + @Override public String getDrainProgress() { log(" getDrainProgress()"); // FIXME @@ -897,6 +946,7 @@ public class StorageService extends NotificationBroadcasterSupport * makes node unavailable for writes, flushes memtables and replays * commitlog. */ + @Override public void drain() throws IOException, InterruptedException, ExecutionException { log(" drain() throws IOException, InterruptedException, ExecutionException"); @@ -915,6 +965,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param columnFamily * The column family to delete data from. */ + @Override public void truncate(String keyspace, String columnFamily) throws TimeoutException, IOException { log(" truncate(String keyspace, String columnFamily)throws TimeoutException, IOException"); @@ -927,6 +978,7 @@ public class StorageService extends NotificationBroadcasterSupport * given a list of tokens (representing the nodes in the cluster), returns a * mapping from "token -> %age of cluster owned by that token" */ + @Override public Map getOwnership() { log(" getOwnership()"); return c.getMapInetAddressFloatValue("/storage_service/ownership/"); @@ -939,6 +991,7 @@ public class StorageService extends NotificationBroadcasterSupport * the same replication strategies and if yes then we will use the first * else a empty Map is returned. */ + @Override public Map effectiveOwnership(String keyspace) throws IllegalStateException { log(" effectiveOwnership(String keyspace) throws IllegalStateException"); @@ -949,11 +1002,10 @@ public class StorageService extends NotificationBroadcasterSupport } } + @Override public List getKeyspaces() { log(" getKeyspaces()"); - MultivaluedMap queryParams = new MultivaluedHashMap(); - queryParams.add("non_system", "true"); - return c.getListStrValue("/storage_service/keyspaces", queryParams); + return c.getListStrValue("/storage_service/keyspaces"); } public Map> getColumnFamilyPerKeyspace() { @@ -973,9 +1025,12 @@ public class StorageService extends NotificationBroadcasterSupport return res; } + @Override public List getNonSystemKeyspaces() { log(" getNonSystemKeyspaces()"); - return c.getListStrValue("/storage_service/keyspaces"); + MultivaluedMap queryParams = new MultivaluedHashMap(); + queryParams.add("type", "user"); + return c.getListStrValue("/storage_service/keyspaces", queryParams); } /** @@ -994,6 +1049,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param dynamicBadnessThreshold * double, (default 0.0) */ + @Override public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException { @@ -1018,85 +1074,93 @@ public class StorageService extends NotificationBroadcasterSupport } // allows a user to forcibly 'kill' a sick node + @Override public void stopGossiping() { log(" stopGossiping()"); c.delete("/storage_service/gossiping"); } // allows a user to recover a forcibly 'killed' node + @Override public void startGossiping() { log(" startGossiping()"); c.post("/storage_service/gossiping"); } // allows a user to see whether gossip is running or not + @Override public boolean isGossipRunning() { log(" isGossipRunning()"); return c.getBooleanValue("/storage_service/gossiping"); } // allows a user to forcibly completely stop cassandra + @Override public void stopDaemon() { log(" stopDaemon()"); c.post("/storage_service/stop_daemon"); } // to determine if gossip is disabled + @Override public boolean isInitialized() { log(" isInitialized()"); return c.getBooleanValue("/storage_service/is_initialized"); } // allows a user to disable thrift + @Override public void stopRPCServer() { log(" stopRPCServer()"); c.delete("/storage_service/rpc_server"); } // allows a user to reenable thrift + @Override public void startRPCServer() { log(" startRPCServer()"); c.post("/storage_service/rpc_server"); } // to determine if thrift is running + @Override public boolean isRPCServerRunning() { log(" isRPCServerRunning()"); return c.getBooleanValue("/storage_service/rpc_server"); } + @Override public void stopNativeTransport() { log(" stopNativeTransport()"); c.delete("/storage_service/native_transport"); } + @Override public void startNativeTransport() { log(" startNativeTransport()"); c.post("/storage_service/native_transport"); } + @Override public boolean isNativeTransportRunning() { log(" isNativeTransportRunning()"); return c.getBooleanValue("/storage_service/native_transport"); } // allows a node that have been started without joining the ring to join it + @Override public void joinRing() throws IOException { log(" joinRing() throws IOException"); c.post("/storage_service/join_ring"); } + @Override public boolean isJoined() { log(" isJoined()"); return c.getBooleanValue("/storage_service/join_ring"); } - @Deprecated - public int getExceptionCount() { - log(" getExceptionCount()"); - return c.getIntValue(""); - } - + @Override public void setStreamThroughputMbPerSec(int value) { log(" setStreamThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1104,6 +1168,7 @@ public class StorageService extends NotificationBroadcasterSupport c.post("/storage_service/stream_throughput", queryParams); } + @Override public int getStreamThroughputMbPerSec() { log(" getStreamThroughputMbPerSec()"); return c.getIntValue("/storage_service/stream_throughput"); @@ -1114,6 +1179,7 @@ public class StorageService extends NotificationBroadcasterSupport return c.getIntValue("/storage_service/compaction_throughput"); } + @Override public void setCompactionThroughputMbPerSec(int value) { log(" setCompactionThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1121,11 +1187,13 @@ public class StorageService extends NotificationBroadcasterSupport c.post("/storage_service/compaction_throughput", queryParams); } + @Override public boolean isIncrementalBackupsEnabled() { log(" isIncrementalBackupsEnabled()"); return c.getBooleanValue("/storage_service/incremental_backups"); } + @Override public void setIncrementalBackupsEnabled(boolean value) { log(" setIncrementalBackupsEnabled(boolean value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1143,6 +1211,7 @@ public class StorageService extends NotificationBroadcasterSupport * Name of DC from which to select sources for streaming or null * to pick any node */ + @Override public void rebuild(String sourceDc) { log(" rebuild(String sourceDc)"); if (sourceDc != null) { @@ -1155,6 +1224,7 @@ public class StorageService extends NotificationBroadcasterSupport } /** Starts a bulk load and blocks until it completes. */ + @Override public void bulkLoad(String directory) { log(" bulkLoad(String directory)"); c.post("/storage_service/bulk_load/" + directory); @@ -1164,12 +1234,14 @@ public class StorageService extends NotificationBroadcasterSupport * Starts a bulk load asynchronously and returns the String representation * of the planID for the new streaming session. */ + @Override public String bulkLoadAsync(String directory) { log(" bulkLoadAsync(String directory)"); return c.getStringValue( "/storage_service/bulk_load_async/" + directory); } + @Override public void rescheduleFailedDeletions() { log(" rescheduleFailedDeletions()"); c.post("/storage_service/reschedule_failed_deletions"); @@ -1183,6 +1255,7 @@ public class StorageService extends NotificationBroadcasterSupport * @param cfName * The ColumnFamily name where SSTables belong */ + @Override public void loadNewSSTables(String ksName, String cfName) { log(" loadNewSSTables(String ksName, String cfName)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1200,6 +1273,7 @@ public class StorageService extends NotificationBroadcasterSupport * * @return set of Tokens as Strings */ + @Override public List sampleKeyRange() { log(" sampleKeyRange()"); return c.getListStrValue("/storage_service/sample_key_range"); @@ -1208,11 +1282,13 @@ public class StorageService extends NotificationBroadcasterSupport /** * rebuild the specified indexes */ + @Override public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames) { log(" rebuildSecondaryIndex(String ksName, String cfName, String... idxNames)"); } + @Override public void resetLocalSchema() throws IOException { log(" resetLocalSchema() throws IOException"); c.post("/storage_service/relocal_schema"); @@ -1228,6 +1304,7 @@ public class StorageService extends NotificationBroadcasterSupport * enable tracing for all requests (which mich severely cripple * the system) */ + @Override public void setTraceProbability(double probability) { log(" setTraceProbability(double probability)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1238,11 +1315,13 @@ public class StorageService extends NotificationBroadcasterSupport /** * Returns the configured tracing probability. */ + @Override public double getTraceProbability() { log(" getTraceProbability()"); return c.getDoubleValue("/storage_service/trace_probability"); } + @Override public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException { log("disableAutoCompaction(String ks, String... columnFamilies)"); @@ -1252,6 +1331,7 @@ public class StorageService extends NotificationBroadcasterSupport c.delete("/storage_service/auto_compaction/", queryParams); } + @Override public void enableAutoCompaction(String ks, String... columnFamilies) throws IOException { log("enableAutoCompaction(String ks, String... columnFamilies)"); @@ -1267,6 +1347,7 @@ public class StorageService extends NotificationBroadcasterSupport } + @Override public void deliverHints(String host) throws UnknownHostException { log(" deliverHints(String host) throws UnknownHostException"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1275,24 +1356,28 @@ public class StorageService extends NotificationBroadcasterSupport } /** Returns the name of the cluster */ + @Override public String getClusterName() { log(" getClusterName()"); return c.getStringValue("/storage_service/cluster_name"); } /** Returns the cluster partitioner */ + @Override public String getPartitionerName() { log(" getPartitionerName()"); return c.getStringValue("/storage_service/partitioner_name"); } /** Returns the threshold for warning of queries with many tombstones */ + @Override public int getTombstoneWarnThreshold() { log(" getTombstoneWarnThreshold()"); return c.getIntValue("/storage_service/tombstone_warn_threshold"); } /** Sets the threshold for warning queries with many tombstones */ + @Override public void setTombstoneWarnThreshold(int tombstoneDebugThreshold) { log(" setTombstoneWarnThreshold(int tombstoneDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1302,12 +1387,14 @@ public class StorageService extends NotificationBroadcasterSupport } /** Returns the threshold for abandoning queries with many tombstones */ + @Override public int getTombstoneFailureThreshold() { log(" getTombstoneFailureThreshold()"); return c.getIntValue("/storage_service/tombstone_failure_threshold"); } /** Sets the threshold for abandoning queries with many tombstones */ + @Override public void setTombstoneFailureThreshold(int tombstoneDebugThreshold) { log(" setTombstoneFailureThreshold(int tombstoneDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1317,12 +1404,14 @@ public class StorageService extends NotificationBroadcasterSupport } /** Returns the threshold for rejecting queries due to a large batch size */ + @Override public int getBatchSizeFailureThreshold() { log(" getBatchSizeFailureThreshold()"); return c.getIntValue("/storage_service/batch_size_failure_threshold"); } /** Sets the threshold for rejecting queries due to a large batch size */ + @Override public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold) { log(" setBatchSizeFailureThreshold(int batchSizeDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1333,6 +1422,7 @@ public class StorageService extends NotificationBroadcasterSupport /** * Sets the hinted handoff throttle in kb per second, per delivery thread. */ + @Override public void setHintedHandoffThrottleInKB(int throttleInKB) { log(" setHintedHandoffThrottleInKB(int throttleInKB)"); MultivaluedMap queryParams = new MultivaluedHashMap(); @@ -1445,9 +1535,81 @@ public class StorageService extends NotificationBroadcasterSupport } @Override - public double getTracingProbability() { + public Map getEndpointToHostId() { + return getHostIdMap(); + } + + @Override + public Map getHostIdToEndpoint() { + return getHostIdToAddressMap(); + } + + @Override + public void refreshSizeEstimates() throws ExecutionException { + // TODO Auto-generated method stub + log(" refreshSizeEstimates"); + } + + @Override + public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) + throws IOException, ExecutionException, InterruptedException { + // "splitOutput" afaik not relevant for scylla (yet?...) + forceKeyspaceCompaction(keyspaceName, tableNames); + } + + @Override + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) + throws IOException, ExecutionException, InterruptedException { + // "jobs" not (yet) relevant for scylla. (though possibly useful...) + return forceKeyspaceCleanup(keyspaceName, tables); + } + + @Override + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, + String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + // "jobs" not (yet) relevant for scylla. (though possibly useful...) + return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies); + } + + @Override + public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) + throws IOException, ExecutionException, InterruptedException { // TODO Auto-generated method stub - log(" getTracingProbability()"); - return c.getDoubleValue(""); + log(" verify"); + return 0; + } + + @Override + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) + throws IOException, ExecutionException, InterruptedException { + // "jobs" not (yet) relevant for scylla. (though possibly useful...) + return upgradeSSTables(keyspaceName, excludeCurrentVersion, tableNames); + } + + @Override + public List getNonLocalStrategyKeyspaces() { + log(" getNonLocalStrategyKeyspaces"); + MultivaluedMap queryParams = new MultivaluedHashMap(); + queryParams.add("type", "non_local_strategy"); + return c.getListStrValue("/storage_service/keyspaces", queryParams); + } + + @Override + public void setInterDCStreamThroughputMbPerSec(int value) { + // TODO Auto-generated method stub + log(" setInterDCStreamThroughputMbPerSec"); + } + + @Override + public int getInterDCStreamThroughputMbPerSec() { + // TODO Auto-generated method stub + log(" getInterDCStreamThroughputMbPerSec"); + return 0; + } + + @Override + public boolean resumeBootstrap() { + log(" resumeBootstrap"); + return false; } } diff --git a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java index 9acbbc8..4234a88 100644 --- a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -15,6 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + package org.apache.cassandra.service; import java.io.IOException; @@ -173,13 +180,11 @@ public interface StorageServiceMBean extends NotificationEmitter { /** Retrieve the mapping of endpoint to host ID */ public Map getHostIdMap(); - /** - * Numeric load value. - * - * @see org.apache.cassandra.metrics.StorageMetrics#load - */ - @Deprecated - public double getLoad(); + /** Retrieve the mapping of endpoint to host ID */ + public Map getEndpointToHostId(); + + /** Retrieve the mapping of host ID to endpoint */ + public Map getHostIdToEndpoint(); /** Human-readable load value */ public String getLoadString(); @@ -224,6 +229,17 @@ public interface StorageServiceMBean extends NotificationEmitter { public void takeSnapshot(String tag, String... keyspaceNames) throws IOException; + /** + * Takes the snapshot of a specific column family. A snapshot name must be specified. + * + * @param keyspaceName the keyspace which holds the specified column family + * @param tableName the table to snapshot + * @param tag the tag given to the snapshot; may not be null or empty + */ + default void takeTableSnapshot(String keyspaceName, String tableName, String tag) throws IOException { + takeColumnFamilySnapshot(keyspaceName, tableName, tag); + } + /** * Takes the snapshot of a specific column family. A snapshot name must be * specified. @@ -272,19 +288,22 @@ public interface StorageServiceMBean extends NotificationEmitter { */ public long trueSnapshotsSize(); + /** + * Forces refresh of values stored in system.size_estimates of all column families. + */ + public void refreshSizeEstimates() throws ExecutionException; + /** * Forces major compaction of a single keyspace */ - public void forceKeyspaceCompaction(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException; + public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** * Trigger a cleanup of keys on a single keyspace */ - public int forceKeyspaceCleanup(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException; + @Deprecated + public int forceKeyspaceCleanup(String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException; + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException; /** * Scrub (deserialize + reserialize at the latest version, skipping bad rows @@ -294,23 +313,26 @@ public interface StorageServiceMBean extends NotificationEmitter { * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ @Deprecated - public int scrub(boolean disableSnapshot, boolean skipCorrupted, - String keyspaceName, String... columnFamilies) throws IOException, - ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + @Deprecated + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; - public int scrub(boolean disableSnapshot, boolean skipCorrupted, - boolean checkData, String keyspaceName, String... columnFamilies) - throws IOException, ExecutionException, - InterruptedException; + /** + * Verify (checksums of) the given keyspace. + * If tableNames array is empty, all CFs are verified. + * + * The entire sstable will be read to ensure each cell validates if extendedVerify is true + */ + public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. Unlike scrub, it doesn't skip * bad rows and do not snapshot sstables first. */ - public int upgradeSSTables(String keyspaceName, - boolean excludeCurrentVersion, String... columnFamilies) - throws IOException, ExecutionException, - InterruptedException; + @Deprecated + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** * Flush all memtables for the given column families, or all columnfamilies @@ -325,70 +347,66 @@ public interface StorageServiceMBean extends NotificationEmitter { InterruptedException; /** - * Invoke repair asynchronously. You can track repair progress by - * subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: type: "repair" userObject: int array of length 2, - * [0]=command number, [1]=ordinal of ActiveRepairService.Status + * Invoke repair asynchronously. + * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. + * Notification format is: + * type: "repair" + * userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status * + * @param keyspace Keyspace name to repair. Should not be null. + * @param options repair option. * @return Repair command number, or 0 if nothing to repair */ - public int forceRepairAsync(String keyspace, boolean isSequential, - Collection dataCenters, Collection hosts, - boolean primaryRange, boolean repairedAt, String... columnFamilies) - throws IOException; + public int repairAsync(String keyspace, Map options); /** - * Invoke repair asynchronously. You can track repair progress by - * subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: type: "repair" userObject: int array of length 2, - * [0]=command number, [1]=ordinal of ActiveRepairService.Status + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + */ + @Deprecated + public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... tableNames) throws IOException; + + /** + * Invoke repair asynchronously. + * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. + * Notification format is: + * type: "repair" + * userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status * - * @param parallelismDegree - * 0: sequential, 1: parallel, 2: DC parallel + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + * + * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel * @return Repair command number, or 0 if nothing to repair */ - public int forceRepairAsync(String keyspace, int parallelismDegree, - Collection dataCenters, Collection hosts, - boolean primaryRange, boolean fullRepair, String... columnFamilies); + @Deprecated + public int forceRepairAsync(String keyspace, int parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... tableNames); - public int forceRepairAsync(String keyspace); /** - * Same as forceRepairAsync, but handles a specified range + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. */ - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, boolean isSequential, - Collection dataCenters, Collection hosts, - boolean repairedAt, String... columnFamilies) throws IOException; + @Deprecated + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean fullRepair, String... tableNames) throws IOException; /** * Same as forceRepairAsync, but handles a specified range * - * @param parallelismDegree - * 0: sequential, 1: parallel, 2: DC parallel - */ - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, int parallelismDegree, - Collection dataCenters, Collection hosts, - boolean fullRepair, String... columnFamilies); - - /** - * Invoke repair asynchronously. You can track repair progress by - * subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: type: "repair" userObject: int array of length 2, - * [0]=command number, [1]=ordinal of ActiveRepairService.Status + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. * - * @return Repair command number, or 0 if nothing to repair + * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel */ - public int forceRepairAsync(String keyspace, boolean isSequential, - boolean isLocal, boolean primaryRange, boolean fullRepair, - String... columnFamilies); + @Deprecated + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair, String... tableNames); /** - * Same as forceRepairAsync, but handles a specified range + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. */ - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, boolean isSequential, boolean isLocal, - boolean repairedAt, String... columnFamilies); + @Deprecated + public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... tableNames); + + /** + * @deprecated use {@link #repairAsync(String keyspace, Map options)} instead. + */ + @Deprecated + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... tableNames); public void forceTerminateAllRepairSessions(); @@ -497,6 +515,10 @@ public interface StorageServiceMBean extends NotificationEmitter { public List getKeyspaces(); + public List getNonSystemKeyspaces(); + + public List getNonLocalStrategyKeyspaces(); + /** * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at * runtime @@ -552,14 +574,12 @@ public interface StorageServiceMBean extends NotificationEmitter { public boolean isJoined(); - @Deprecated - public int getExceptionCount(); - public void setStreamThroughputMbPerSec(int value); public int getStreamThroughputMbPerSec(); - public int getCompactionThroughputMbPerSec(); + public void setInterDCStreamThroughputMbPerSec(int value); + public int getInterDCStreamThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); @@ -635,7 +655,7 @@ public interface StorageServiceMBean extends NotificationEmitter { /** * Returns the configured tracing probability. */ - public double getTracingProbability(); + public double getTraceProbability(); void disableAutoCompaction(String ks, String... columnFamilies) throws IOException; @@ -663,8 +683,16 @@ public interface StorageServiceMBean extends NotificationEmitter { /** Sets the threshold for abandoning queries with many tombstones */ public void setTombstoneFailureThreshold(int tombstoneDebugThreshold); + /** Returns the threshold for rejecting queries due to a large batch size */ + public int getBatchSizeFailureThreshold(); + /** Sets the threshold for rejecting queries due to a large batch size */ + public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold); + + /** Sets the hinted handoff throttle in kb per second, per delivery thread. */ + public void setHintedHandoffThrottleInKB(int throttleInKB); + /** * Sets the hinted handoff throttle in kb per second, per delivery thread. */ - public void setHintedHandoffThrottleInKB(int throttleInKB); + public boolean resumeBootstrap(); }