/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * Copyright 2015 Cloudius Systems * * Modified by Cloudius Systems */ package org.apache.cassandra.service; import static java.util.Arrays.asList; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import java.util.stream.Collectors; import javax.json.JsonArray; import javax.json.JsonObject; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; import javax.management.NotificationBroadcaster; import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.openmbean.CompositeData; import 
javax.management.openmbean.TabularData; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.repair.RepairParallelism; import com.google.common.base.Joiner; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.MetricsMBean; import com.scylladb.jmx.utils.FileUtils; /** * This abstraction contains the token/identifier of this node on the identifier * space. This token gets gossiped around. This class will also maintain * histograms of the load information of other nodes in the cluster. */ public class StorageService extends MetricsMBean implements StorageServiceMBean, NotificationBroadcaster { private static final Logger logger = Logger.getLogger(StorageService.class.getName()); private static final Timer timer = new Timer("Storage Service Repair", true); private final NotificationBroadcasterSupport notificationBroadcasterSupport = new NotificationBroadcasterSupport(); @Override public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) { notificationBroadcasterSupport.addNotificationListener(listener, filter, handback); } @Override public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException { notificationBroadcasterSupport.removeNotificationListener(listener); } @Override public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException { notificationBroadcasterSupport.removeNotificationListener(listener, filter, handback); } @Override public MBeanNotificationInfo[] getNotificationInfo() { return notificationBroadcasterSupport.getNotificationInfo(); } public void sendNotification(Notification notification) { notificationBroadcasterSupport.sendNotification(notification); } public static enum RepairStatus { STARTED, SESSION_SUCCESS, 
SESSION_FAILED, FINISHED } /* JMX notification serial number counter */ private final AtomicLong notificationSerialNumber = new AtomicLong(); public StorageService(APIClient client) { super("org.apache.cassandra.db:type=StorageService", client, new StorageMetrics()); } public void log(String str) { logger.finest(str); } /** * Retrieve the list of live nodes in the cluster, where "liveness" is * determined by the failure detector of the node being queried. * * @return set of IP addresses, as Strings */ @Override public List getLiveNodes() { log(" getLiveNodes()"); return client.getListStrValue("/gossiper/endpoint/live"); } /** * Retrieve the list of unreachable nodes in the cluster, as determined by * this node's failure detector. * * @return set of IP addresses, as Strings */ @Override public List getUnreachableNodes() { log(" getUnreachableNodes()"); return client.getListStrValue("/gossiper/endpoint/down"); } /** * Retrieve the list of nodes currently bootstrapping into the ring. * * @return set of IP addresses, as Strings */ @Override public List getJoiningNodes() { log(" getJoiningNodes()"); return client.getListStrValue("/storage_service/nodes/joining"); } /** * Retrieve the list of nodes currently leaving the ring. * * @return set of IP addresses, as Strings */ @Override public List getLeavingNodes() { log(" getLeavingNodes()"); return client.getListStrValue("/storage_service/nodes/leaving"); } /** * Retrieve the list of nodes currently moving in the ring. * * @return set of IP addresses, as Strings */ @Override public List getMovingNodes() { log(" getMovingNodes()"); return client.getListStrValue("/storage_service/nodes/moving"); } /** * Fetch string representations of the tokens for this node. 
* * @return a collection of tokens formatted as strings */ @Override public List getTokens() { log(" getTokens()"); try { return getTokens(getLocalBroadCastingAddress()); } catch (UnknownHostException e) { // We should never reach here, // but it makes the compiler happy return null; } } /** * Fetch string representations of the tokens for a specified node. * * @param endpoint * string representation of an node * @return a collection of tokens formatted as strings */ @Override public List getTokens(String endpoint) throws UnknownHostException { log(" getTokens(String endpoint) throws UnknownHostException"); return client.getListStrValue("/storage_service/tokens/" + endpoint); } /** * Fetch a string representation of the Cassandra version. * * @return A string representation of the Cassandra version. */ @Override public String getReleaseVersion() { log(" getReleaseVersion()"); return client.getStringValue("/storage_service/release_version"); } /** * Fetch a string representation of the current Schema version. * * @return A string representation of the Schema version. 
*/ @Override public String getSchemaVersion() { log(" getSchemaVersion()"); return client.getStringValue("/storage_service/schema_version"); } /** * Get the list of all data file locations from conf * * @return String array of all locations */ @Override public String[] getAllDataFileLocations() { log(" getAllDataFileLocations()"); return client.getStringArrValue("/storage_service/data_file/locations"); } /** * Get location of the commit log * * @return a string path */ @Override public String getCommitLogLocation() { log(" getCommitLogLocation()"); return client.getStringValue("/storage_service/commitlog"); } /** * Get location of the saved caches dir * * @return a string path */ @Override public String getSavedCachesLocation() { log(" getSavedCachesLocation()"); return client.getStringValue("/storage_service/saved_caches/location"); } /** * Retrieve a map of range to end points that describe the ring topology of * a Cassandra cluster. * * @return mapping of ranges to end points */ @Override public Map, List> getRangeToEndpointMap(String keyspace) { log(" getRangeToEndpointMap(String keyspace)"); return client.getMapListStrValue("/storage_service/range_to_endpoint_map/" + keyspace); } /** * Retrieve a map of range to rpc addresses that describe the ring topology * of a Cassandra cluster. 
* * @return mapping of ranges to rpc addresses */ @Override public Map, List> getRangeToRpcaddressMap(String keyspace) { log(" getRangeToRpcaddressMap(String keyspace)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("rpc", "true"); return client.getMapListStrValue("/storage_service/range/" + keyspace, queryParams); } /** * The same as {@code describeRing(String)} but converts TokenRange to the * String for JMX compatibility * * @param keyspace * The keyspace to fetch information about * * @return a List of TokenRange(s) converted to String for the given * keyspace */ @Override public List describeRingJMX(String keyspace) throws IOException { log(" describeRingJMX(String keyspace) throws IOException"); JsonArray arr = client.getJsonArray("/storage_service/describe_ring/" + keyspace); List res = new ArrayList(); for (int i = 0; i < arr.size(); i++) { JsonObject obj = arr.getJsonObject(i); StringBuilder sb = new StringBuilder(); sb.append("TokenRange("); sb.append("start_token:"); sb.append(obj.getString("start_token")); sb.append(", end_token:"); sb.append(obj.getString("end_token")); sb.append(", endpoints:["); JsonArray endpoints = obj.getJsonArray("endpoints"); for (int j = 0; j < endpoints.size(); j++) { if (j > 0) { sb.append(", "); } sb.append(endpoints.getString(j)); } sb.append("], rpc_endpoints:["); JsonArray rpc_endpoints = obj.getJsonArray("rpc_endpoints"); for (int j = 0; j < rpc_endpoints.size(); j++) { if (j > 0) { sb.append(", "); } sb.append(rpc_endpoints.getString(j)); } sb.append("], endpoint_details:["); JsonArray endpoint_details = obj.getJsonArray("endpoint_details"); for (int j = 0; j < endpoint_details.size(); j++) { JsonObject detail = endpoint_details.getJsonObject(j); if (j > 0) { sb.append(", "); } sb.append("EndpointDetails("); sb.append("host:"); sb.append(detail.getString("host")); sb.append(", datacenter:"); sb.append(detail.getString("datacenter")); sb.append(", rack:"); sb.append(detail.getString("rack")); 
sb.append(')'); } sb.append("])"); res.add(sb.toString()); } return res; } /** * Retrieve a map of pending ranges to endpoints that describe the ring * topology * * @param keyspace * the keyspace to get the pending range map for. * @return a map of pending ranges to endpoints */ @Override public Map, List> getPendingRangeToEndpointMap(String keyspace) { log(" getPendingRangeToEndpointMap(String keyspace)"); return client.getMapListStrValue("/storage_service/pending_range/" + keyspace); } /** * Retrieve a map of tokens to endpoints, including the bootstrapping ones. * * @return a map of tokens to endpoints in ascending order */ @Override public Map getTokenToEndpointMap() { log(" getTokenToEndpointMap()"); return client.getMapStrValue("/storage_service/tokens_endpoint"); } /** Retrieve this hosts unique ID */ @Override public String getLocalHostId() { log(" getLocalHostId()"); return client.getStringValue("/storage_service/hostid/local"); } public String getLocalBroadCastingAddress() { // FIXME: // There is no straight API to get the broadcasting // address, instead of trying to figure it out from the configuration // we will use the getHostIdToAddressMap with the hostid return getHostIdToAddressMap().get(getLocalHostId()); } /** Retrieve the mapping of endpoint to host ID */ @Override public Map getHostIdMap() { log(" getHostIdMap()"); return client.getMapStrValue("/storage_service/host_id"); } /** Retrieve the mapping of endpoint to host ID */ public Map getHostIdToAddressMap() { log(" getHostIdToAddressMap()"); return client.getReverseMapStrValue("/storage_service/host_id"); } /** * Numeric load value. * * @see org.apache.cassandra.metrics.StorageMetrics#load */ @Deprecated public double getLoad() { log(" getLoad()"); return client.getDoubleValue("/storage_service/load"); } /** Human-readable load value */ @Override public String getLoadString() { log(" getLoadString()"); return FileUtils.stringifyFileSize(getLoad()); } /** Human-readable load value. 
Keys are IP addresses. */
    @Override
    public Map<String, String> getLoadMap() {
        log(" getLoadMap()");
        Map<String, Double> load = getLoadMapAsDouble();
        Map<String, String> map = new HashMap<>();
        for (Map.Entry<String, Double> entry : load.entrySet()) {
            map.put(entry.getKey(), FileUtils.stringifyFileSize(entry.getValue()));
        }
        return map;
    }

    /**
     * Numeric variant of {@link #getLoadMap()}.
     *
     * @return the load map as returned by the server, values as doubles
     */
    public Map<String, Double> getLoadMapAsDouble() {
        log(" getLoadMapAsDouble()");
        return client.getMapStringDouble("/storage_service/load_map");
    }

    /**
     * Return the generation value for this node.
     *
     * @return generation number
     */
    @Override
    public int getCurrentGenerationNumber() {
        log(" getCurrentGenerationNumber()");
        return client.getIntValue("/storage_service/generation_number");
    }

    /**
     * This method returns the N endpoints that are responsible for storing the
     * specified key i.e for replication.
     *
     * @param keyspaceName
     *            keyspace name
     * @param cf
     *            Column family name
     * @param key
     *            - key for which we need to find the endpoint return value -
     *            the endpoint responsible for this key
     */
    @Override
    public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key) {
        log(" getNaturalEndpoints(String keyspaceName, String cf, String key)");
        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<String, String>();
        queryParams.add("cf", cf);
        queryParams.add("key", key);
        return client.getListInetAddressValue("/storage_service/natural_endpoints/" + keyspaceName, queryParams);
    }

    /**
     * Binary-key variant of
     * {@link #getNaturalEndpoints(String, String, String)}.
     */
    @Override
    public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) {
        log(" getNaturalEndpoints(String keyspaceName, ByteBuffer key)");
        // NOTE(review): neither argument is used and the request path is
        // empty - this looks like an unimplemented stub; confirm before
        // relying on the result.
        return client.getListInetAddressValue("");
    }

    /** Asks the server to check and repair its CDC streams. */
    @Override
    public void checkAndRepairCdcStreams() throws IOException {
        log(" checkAndRepairCdcStreams() throws IOException");
        client.post("/storage_service/cdc_streams_check_and_repair");
    }

    /**
     * Takes the snapshot for the given keyspaces. A snapshot name must be
     * specified.
     *
     * @param tag
     *            the tag given to the snapshot; may not be null or empty
     * @param keyspaceNames
     *            the name of the keyspaces to snapshot; empty means "all."
*/ @Override public void takeSnapshot(String tag, String... keyspaceNames) throws IOException { takeSnapshot(tag, null, keyspaceNames); } @Override public void takeSnapshot(String tag, Map options, String... keyspaceNames) throws IOException { log(" takeSnapshot(String tag, String... keyspaceNames) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "tag", tag); if (keyspaceNames.length == 1 && keyspaceNames[0].indexOf('.') != -1) { String[] parts = keyspaceNames[0].split("\\."); keyspaceNames = new String[] { parts[0] }; APIClient.set_query_param(queryParams, "cf", parts[1]); } APIClient.set_query_param(queryParams, "kn", APIClient.join(keyspaceNames)); // TODO: origin has one recognized option: skip flush. We don't. client.post("/storage_service/snapshots", queryParams); } /** * Takes the snapshot of a specific column family. A snapshot name must be * specified. * * @param keyspaceName * the keyspace which holds the specified column family * @param columnFamilyName * the column family to snapshot * @param tag * the tag given to the snapshot; may not be null or empty */ @Override public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException { log(" takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); if (keyspaceName == null) { throw new IOException("You must supply a keyspace name"); } if (columnFamilyName == null) { throw new IOException("You must supply a table name"); } if (tag == null || tag.equals("")) { throw new IOException("You must supply a snapshot name."); } queryParams.add("tag", tag); queryParams.add("kn", keyspaceName); queryParams.add("cf", columnFamilyName); client.post("/storage_service/snapshots", queryParams); } /** * Remove the snapshot with the given name from the given keyspaces. 
If no * tag is specified we will remove all snapshots. */ @Override public void clearSnapshot(String tag, String... keyspaceNames) throws IOException { log(" clearSnapshot(String tag, String... keyspaceNames) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "tag", tag); APIClient.set_query_param(queryParams, "kn", APIClient.join(keyspaceNames)); client.delete("/storage_service/snapshots", queryParams); } /** * Get the details of all the snapshot * * @return A map of snapshotName to all its details in Tabular form. */ @Override public Map getSnapshotDetails() { log(" getSnapshotDetails()"); return client.getMapStringSnapshotTabularDataValue("/storage_service/snapshots", null); } public Map>> getSnapshotKeyspaceColumnFamily() { JsonArray arr = client.getJsonArray("/storage_service/snapshots"); Map>> res = new HashMap>>(); for (int i = 0; i < arr.size(); i++) { JsonObject obj = arr.getJsonObject(i); Map> kscf = new HashMap>(); JsonArray snapshots = obj.getJsonArray("value"); for (int j = 0; j < snapshots.size(); j++) { JsonObject s = snapshots.getJsonObject(j); String ks = s.getString("ks"); String cf = s.getString("cf"); if (!kscf.containsKey(ks)) { kscf.put(ks, new HashSet()); } kscf.get(ks).add(cf); } res.put(obj.getString("key"), kscf); } return res; } /** * Get the true size taken by all snapshots across all keyspaces. * * @return True size taken by all the snapshots. */ @Override public long trueSnapshotsSize() { log(" trueSnapshotsSize()"); return client.getLongValue("/storage_service/snapshots/size/true"); } /** * Forces major compaction of a single keyspace */ @Override public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceCompaction(String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); client.post("/storage_service/keyspace_compaction/" + keyspaceName, queryParams); } @Override public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException { // TODO: actually handle token ranges. forceKeyspaceCompaction(keyspaceName, tableNames); } /** * Trigger a cleanup of keys on a single keyspace */ @Override public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); return client.postInt("/storage_service/keyspace_cleanup/" + keyspaceName, queryParams); } /** * Scrub (deserialize + reserialize at the latest version, skipping bad rows * if any) the given keyspace. If columnFamilies array is empty, all CFs are * scrubbed. * * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ @Override public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); } @Override public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" scrub(boolean disableSnapshot, boolean skipCorrupted, bool checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_bool_query_param(queryParams, "disable_snapshot", disableSnapshot); APIClient.set_bool_query_param(queryParams, "skip_corrupted", skipCorrupted); APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); return client.getIntValue("/storage_service/keyspace_scrub/" + keyspaceName, queryParams); } /** * Rewrite all sstables to the latest version. Unlike scrub, it doesn't skip * bad rows and do not snapshot sstables first. */ @Override public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_bool_query_param(queryParams, "exclude_current_version", excludeCurrentVersion); APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); return client.getIntValue("/storage_service/keyspace_upgrade_sstables/" + keyspaceName, queryParams); } /** * Flush all memtables for the given column families, or all columnfamilies * for the given keyspace if none are explicitly listed. * * @param keyspaceName * @param columnFamilies * @throws IOException */ @Override public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceFlush(String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); client.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams); } private class CheckRepair extends TimerTask { @SuppressWarnings("unused") private int id; private String keyspace; private String message; private MultivaluedMap queryParams = new MultivaluedHashMap(); private int cmd; private final boolean legacy; public CheckRepair(int id, String keyspace, boolean legacy) { this.id = id; this.keyspace = keyspace; this.legacy = legacy; APIClient.set_query_param(queryParams, "id", Integer.toString(id)); message = String.format("Repair session %d ", id); // The returned id is the command number this.cmd = id; } @Override public void run() { String status = client.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); if (!status.equals("RUNNING")) { cancel(); if (status.equals("SUCCESSFUL")) { sendMessage(cmd, RepairStatus.SESSION_SUCCESS, message, legacy); } else { sendMessage(cmd, RepairStatus.SESSION_FAILED, message + "failed", legacy); } sendMessage(cmd, RepairStatus.FINISHED, message + "finished", legacy); } } } /** * Sends JMX notification to subscribers. 
* * @param type * Message type * @param message * Message itself * @param userObject * Arbitrary object to attach to notification */ public void sendNotification(String type, String message, Object userObject) { Notification jmxNotification = new Notification(type, getBoundName(), notificationSerialNumber.incrementAndGet(), message); jmxNotification.setUserData(userObject); sendNotification(jmxNotification); } public String getRepairMessage(final int cmd, final String keyspace, final int ranges_size, final RepairParallelism parallelismDegree, final boolean fullRepair) { return String.format( "Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", cmd, ranges_size, keyspace, parallelismDegree, fullRepair); } /** * * @param repair */ private int waitAndNotifyRepair(int cmd, String keyspace, String message, boolean legacy) { logger.finest(message); sendMessage(cmd, RepairStatus.STARTED, message, legacy); TimerTask taskToExecute = new CheckRepair(cmd, keyspace, legacy); timer.schedule(taskToExecute, 100, 1000); return cmd; } // See org.apache.cassandra.utils.progress.ProgressEventType private static enum ProgressEventType { START, PROGRESS, ERROR, ABORT, SUCCESS, COMPLETE, NOTIFICATION } private void sendMessage(int cmd, RepairStatus status, String message, boolean legacy) { String tag = "repair:" + cmd; ProgressEventType type = ProgressEventType.ERROR; int total = 100; int count = 0; switch (status) { case STARTED: type = ProgressEventType.START; break; case FINISHED: type = ProgressEventType.COMPLETE; count = 100; break; case SESSION_SUCCESS: type = ProgressEventType.SUCCESS; count = 100; break; default: break; } Notification jmxNotification = new Notification("progress", tag, notificationSerialNumber.incrementAndGet(), message); Map userData = new HashMap<>(); userData.put("type", type.ordinal()); userData.put("progressCount", count); userData.put("total", total); jmxNotification.setUserData(userData); 
sendNotification(jmxNotification); if (legacy) { sendNotification("repair", message, new int[] { cmd, status.ordinal() }); } } /** * Invoke repair asynchronously. You can track repair progress by * subscribing JMX notification sent from this StorageServiceMBean. * Notification format is: type: "repair" userObject: int array of length 2, * [0]=command number, [1]=ordinal of AntiEntropyService.Status * * @param keyspace * Keyspace name to repair. Should not be null. * @param options * repair option. * @return Repair command number, or 0 if nothing to repair */ @Override public int repairAsync(String keyspace, Map options) { return repairAsync(keyspace, options, false); } @SuppressWarnings("unused") private static final String PARALLELISM_KEY = "parallelism"; private static final String PRIMARY_RANGE_KEY = "primaryRange"; @SuppressWarnings("unused") private static final String INCREMENTAL_KEY = "incremental"; @SuppressWarnings("unused") private static final String JOB_THREADS_KEY = "jobThreads"; private static final String RANGES_KEY = "ranges"; private static final String COLUMNFAMILIES_KEY = "columnFamilies"; private static final String DATACENTERS_KEY = "dataCenters"; private static final String HOSTS_KEY = "hosts"; @SuppressWarnings("unused") private static final String TRACE_KEY = "trace"; private int repairAsync(String keyspace, Map options, boolean legacy) { log(" repairAsync(String keyspace, Map options)"); MultivaluedMap queryParams = new MultivaluedHashMap(); for (String op : options.keySet()) { APIClient.set_query_param(queryParams, op, options.get(op)); } int cmd = client.postInt("/storage_service/repair_async/" + keyspace, queryParams); waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true), legacy); return cmd; } private static String commaSeparated(Collection c) { String s = c.toString(); return s.substring(1, s.length() - 1); } private int repairRangeAsync(String beginToken, String endToken, String 
keyspaceName, Boolean isSequential, Collection dataCenters, Collection hosts, Boolean primaryRange, Boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) throws IOException"); Map options = new HashMap(); if (beginToken != null && endToken != null) { options.put(RANGES_KEY, beginToken + ":" + endToken); } if (dataCenters != null) { options.put(DATACENTERS_KEY, commaSeparated(dataCenters)); } if (hosts != null) { options.put(HOSTS_KEY, commaSeparated(hosts)); } if (columnFamilies != null && columnFamilies.length != 0) { options.put(COLUMNFAMILIES_KEY, commaSeparated(asList(columnFamilies))); } if (primaryRange != null) { options.put(PRIMARY_RANGE_KEY, primaryRange.toString()); } return repairAsync(keyspaceName, options, true); } @Override @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException { log(" forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException"); return repairRangeAsync(null, null, keyspace, isSequential, dataCenters, hosts, primaryRange, repairedAt, columnFamilies); } @Override @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... 
columnFamilies) throws IOException"); return repairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, hosts, null, repairedAt, columnFamilies); } @Override @Deprecated public int forceRepairAsync(String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); return repairRangeAsync(null, null, keyspaceName, isSequential, null, null, primaryRange, null, columnFamilies); } @Override @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies)"); return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, null, null, repairedAt, columnFamilies); } @Override public void forceTerminateAllRepairSessions() { log(" forceTerminateAllRepairSessions()"); client.post("/storage_service/force_terminate"); } /** * transfer this node's data to other machines and remove it from service. */ @Override public void decommission() throws InterruptedException { log(" decommission() throws InterruptedException"); client.post("/storage_service/decommission"); } /** * @param newToken * token to move this node to. This node will unload its data * onto its neighbors, and bootstrap to the new token. 
*/ @Override public void move(String newToken) throws IOException { log(" move(String newToken) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "new_token", newToken); client.post("/storage_service/move", queryParams); } /** * removeToken removes token (and all data associated with enpoint that had * it) from the ring * * @param hostIdString * the host id to remove */ @Override public void removeNode(String hostIdString) { log(" removeNode(String token)"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "host_id", hostIdString); client.post("/storage_service/remove_node", queryParams); } /** * Get the status of a token removal. */ @Override public String getRemovalStatus() { log(" getRemovalStatus()"); return client.getStringValue("/storage_service/removal_status"); } /** * Force a remove operation to finish. */ @Override public void forceRemoveCompletion() { log(" forceRemoveCompletion()"); client.post("/storage_service/force_remove_completion"); } /** * set the logging level at runtime
*
* If both classQualifer and level are empty/null, it will reload the * configuration to reset.
* If classQualifer is not empty but level is empty/null, it will set the * level to null for the defined classQualifer
* If level cannot be parsed, then the level will be defaulted to DEBUG
*
* The logback configuration should have < jmxConfigurator /> set * * @param classQualifier * The logger's classQualifer * @param level * The log level * @throws Exception * * @see ch.qos.logback.classic.Level#toLevel(String) */ @Override public void setLoggingLevel(String classQualifier, String level) throws Exception { log(" setLoggingLevel(String classQualifier, String level) throws Exception"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "level", level); client.post("/system/logger/" + classQualifier, queryParams); } /** get the runtime logging levels */ @Override public Map getLoggingLevels() { log(" getLoggingLevels()"); return client.getMapStrValue("/storage_service/logging_level"); } /** * get the operational mode (leaving, joining, normal, decommissioned, * client) **/ @Override public String getOperationMode() { log(" getOperationMode()"); return client.getStringValue("/storage_service/operation_mode"); } /** Returns whether the storage service is starting or not */ @Override public boolean isStarting() { log(" isStarting()"); return client.getBooleanValue("/storage_service/is_starting"); } /** get the progress of a drain operation */ @Override public String getDrainProgress() { log(" getDrainProgress()"); // FIXME // This is a workaround so the nodetool would work // it should be revert when the drain progress will be implemented // return c.getStringValue("/storage_service/drain"); return String.format("Drained %s/%s ColumnFamilies", 0, 0); } /** * makes node unavailable for writes, flushes memtables and replays * commitlog. */ @Override public void drain() throws IOException, InterruptedException, ExecutionException { log(" drain() throws IOException, InterruptedException, ExecutionException"); client.post("/storage_service/drain"); } /** * Truncates (deletes) the given columnFamily from the provided keyspace. 
* Calling truncate results in actual deletion of all data in the cluster
     * under the given columnFamily and it will fail unless all hosts are up.
     * All data in the given column family will be deleted, but its definition
     * will not be affected.
     *
     * @param keyspace
     *            The keyspace to delete from
     * @param columnFamily
     *            The column family to delete data from.
     */
    @Override
    public void truncate(String keyspace, String columnFamily) throws TimeoutException, IOException {
        log(" truncate(String keyspace, String columnFamily)throws TimeoutException, IOException");
        // The keyspace is part of the request path; the column family is handed
        // to APIClient.set_query_param under the key "cf".
        MultivaluedMap queryParams = new MultivaluedHashMap();
        APIClient.set_query_param(queryParams, "cf", columnFamily);
        client.post("/storage_service/truncate/" + keyspace, queryParams);
    }

    /**
     * Given a list of tokens (representing the nodes in the cluster), returns a
     * mapping from "token -> %age of cluster owned by that token".
     *
     * NOTE(review): the value is read with getMapInetAddressFloatValue, which
     * suggests the keys are addresses rather than tokens - confirm.
     *
     * @return the map fetched from {@code /storage_service/ownership/}
     */
    @Override
    public Map getOwnership() {
        log(" getOwnership()");
        return client.getMapInetAddressFloatValue("/storage_service/ownership/");
    }

    /**
     * Effective ownership is % of the data each node owns given the keyspace we
     * calculate the percentage using replication factor. If Keyspace == null,
     * this method will try to verify if all the keyspaces in the cluster have
     * the same replication strategies and if yes then we will use the first
     * else a empty Map is returned.
*/ @Override public Map effectiveOwnership(String keyspace) throws IllegalStateException { log(" effectiveOwnership(String keyspace) throws IllegalStateException"); try { return client.getMapInetAddressFloatValue("/storage_service/ownership/" + keyspace); } catch (Exception e) { throw new IllegalStateException( "Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless"); } } @Override public List getKeyspaces() { log(" getKeyspaces()"); return client.getListStrValue("/storage_service/keyspaces"); } public Map> getColumnFamilyPerKeyspace() { Map> res = new HashMap>(); JsonArray mbeans = client.getJsonArray("/column_family/"); for (int i = 0; i < mbeans.size(); i++) { JsonObject mbean = mbeans.getJsonObject(i); String ks = mbean.getString("ks"); String cf = mbean.getString("cf"); if (!res.containsKey(ks)) { res.put(ks, new HashSet()); } res.get(ks).add(cf); } return res; } @Override public List getNonSystemKeyspaces() { log(" getNonSystemKeyspaces()"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("type", "user"); return client.getListStrValue("/storage_service/keyspaces", queryParams); } @Override public Map getViewBuildStatuses(String keyspace, String view) { log(" getViewBuildStatuses()"); return client.getMapStrValue("storage_service/view_build_statuses/" + keyspace + "/" + view); } /** * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at * runtime * * @param epSnitchClassName * the canonical path name for a class implementing * IEndpointSnitch * @param dynamic * boolean that decides whether dynamicsnitch is used or not * @param dynamicUpdateInterval * integer, in ms (default 100) * @param dynamicResetInterval * integer, in ms (default 600,000) * @param dynamicBadnessThreshold * double, (default 0.0) */ @Override public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ClassNotFoundException { log(" updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_bool_query_param(queryParams, "dynamic", dynamic); APIClient.set_query_param(queryParams, "epSnitchClassName", epSnitchClassName); if (dynamicUpdateInterval != null) { queryParams.add("dynamic_update_interval", dynamicUpdateInterval.toString()); } if (dynamicResetInterval != null) { queryParams.add("dynamic_reset_interval", dynamicResetInterval.toString()); } if (dynamicBadnessThreshold != null) { queryParams.add("dynamic_badness_threshold", dynamicBadnessThreshold.toString()); } client.post("/storage_service/update_snitch", queryParams); } // allows a user to forcibly 'kill' a sick node @Override public void stopGossiping() { log(" stopGossiping()"); client.delete("/storage_service/gossiping"); } // allows a user to recover a forcibly 'killed' node @Override public void startGossiping() { log(" startGossiping()"); client.post("/storage_service/gossiping"); } // allows a user to see whether gossip is running or not @Override public boolean isGossipRunning() { log(" isGossipRunning()"); return client.getBooleanValue("/storage_service/gossiping"); } // allows a user to forcibly completely stop cassandra @Override public void stopDaemon() { log(" stopDaemon()"); client.post("/storage_service/stop_daemon"); } // to determine if gossip is disabled @Override public boolean isInitialized() { log(" isInitialized()"); return client.getBooleanValue("/storage_service/is_initialized"); } // allows a user to disable thrift @Override public void stopRPCServer() { log(" stopRPCServer()"); client.delete("/storage_service/rpc_server"); } // allows a user to reenable thrift @Override public void startRPCServer() { log(" startRPCServer()"); 
client.post("/storage_service/rpc_server"); } // to determine if thrift is running @Override public boolean isRPCServerRunning() { log(" isRPCServerRunning()"); return client.getBooleanValue("/storage_service/rpc_server"); } @Override public void stopNativeTransport() { log(" stopNativeTransport()"); client.delete("/storage_service/native_transport"); } @Override public void startNativeTransport() { log(" startNativeTransport()"); client.post("/storage_service/native_transport"); } @Override public boolean isNativeTransportRunning() { log(" isNativeTransportRunning()"); return client.getBooleanValue("/storage_service/native_transport"); } // allows a node that have been started without joining the ring to join it @Override public void joinRing() throws IOException { log(" joinRing() throws IOException"); client.post("/storage_service/join_ring"); } @Override public boolean isJoined() { log(" isJoined()"); return client.getBooleanValue("/storage_service/join_ring"); } @Override public void setStreamThroughputMbPerSec(int value) { log(" setStreamThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Integer.toString(value)); client.post("/storage_service/stream_throughput", queryParams); } @Override public int getStreamThroughputMbPerSec() { log(" getStreamThroughputMbPerSec()"); return client.getIntValue("/storage_service/stream_throughput"); } public int getCompactionThroughputMbPerSec() { log(" getCompactionThroughputMbPerSec()"); return client.getIntValue("/storage_service/compaction_throughput"); } @Override public void setCompactionThroughputMbPerSec(int value) { log(" setCompactionThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Integer.toString(value)); client.post("/storage_service/compaction_throughput", queryParams); } @Override public boolean isIncrementalBackupsEnabled() { log(" isIncrementalBackupsEnabled()"); return 
client.getBooleanValue("/storage_service/incremental_backups"); } @Override public void setIncrementalBackupsEnabled(boolean value) { log(" setIncrementalBackupsEnabled(boolean value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Boolean.toString(value)); client.post("/storage_service/incremental_backups", queryParams); } /** * Initiate a process of streaming data for which we are responsible from * other nodes. It is similar to bootstrap except meant to be used on a node * which is already in the cluster (typically containing no data) as an * alternative to running repair. * * @param sourceDc * Name of DC from which to select sources for streaming or null * to pick any node */ @Override public void rebuild(String sourceDc) { rebuild(sourceDc, null, null, null); } /** * Same as {@link #rebuild(String)}, but only for specified keyspace and * ranges. * * @param sourceDc * Name of DC from which to select sources for streaming or null * to pick any node * @param keyspace * Name of the keyspace which to rebuild or null to rebuild all * keyspaces. * @param tokens * Range of tokens to rebuild or null to rebuild all token * ranges. 
In the format of:
     *            "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
     */
    @Override
    public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources) {
        log(" rebuild(String sourceDc, String keyspace, String tokens, String specificSources)");
        // Only the source data-centre option is forwarded. Any other option
        // that is non-null is rejected up front, before a request is made.
        if (keyspace != null) {
            throw new UnsupportedOperationException("Rebuild: 'keyspace' not yet supported");
        }
        if (tokens != null) {
            throw new UnsupportedOperationException("Rebuild: 'token range' not yet supported");
        }
        if (specificSources != null) {
            throw new UnsupportedOperationException("Rebuild: 'specific sources' not yet supported");
        }
        if (sourceDc != null) {
            MultivaluedMap queryParams = new MultivaluedHashMap();
            APIClient.set_query_param(queryParams, "source_dc", sourceDc);
            client.post("/storage_service/rebuild", queryParams);
        } else {
            // No source DC given: post without any query parameters.
            client.post("/storage_service/rebuild");
        }
    }

    /**
     * Starts a bulk load and blocks until it completes.
     *
     * @param directory
     *            appended as-is to the {@code /storage_service/bulk_load/}
     *            request path
     */
    @Override
    public void bulkLoad(String directory) {
        log(" bulkLoad(String directory)");
        client.post("/storage_service/bulk_load/" + directory);
    }

    /**
     * Starts a bulk load asynchronously and returns the String representation
     * of the planID for the new streaming session.
*/
    @Override
    public String bulkLoadAsync(String directory) {
        log(" bulkLoadAsync(String directory)");
        // The directory is appended as-is to the request path.
        return client.getStringValue("/storage_service/bulk_load_async/" + directory);
    }

    /**
     * Reschedules failed deletions. Implemented as a POST to
     * {@code /storage_service/reschedule_failed_deletions}.
     */
    @Override
    public void rescheduleFailedDeletions() {
        log(" rescheduleFailedDeletions()");
        client.post("/storage_service/reschedule_failed_deletions");
    }

    /**
     * Load new SSTables to the given keyspace/columnFamily
     *
     * @param ksName
     *            The parent keyspace name (used as a path segment)
     * @param cfName
     *            The ColumnFamily name where SSTables belong (sent as the
     *            "cf" query parameter)
     */
    @Override
    public void loadNewSSTables(String ksName, String cfName) {
        log(" loadNewSSTables(String ksName, String cfName)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("cf", cfName);
        client.post("/storage_service/sstables/" + ksName, queryParams);
    }

    /**
     * Return a List of Tokens representing a sample of keys across all
     * ColumnFamilyStores.
     *
     * Note: this should be left as an operation, not an attribute (methods
     * starting with "get") to avoid sending potentially multiple MB of data
     * when accessing this mbean by default. See CASSANDRA-4452.
     *
     * @return set of Tokens as Strings
     */
    @Override
    public List sampleKeyRange() {
        log(" sampleKeyRange()");
        return client.getListStrValue("/storage_service/sample_key_range");
    }

    /**
     * Rebuild the specified indexes.
     *
     * NOTE(review): this implementation only logs the call; no request is
     * sent and the arguments are not used.
     */
    @Override
    public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames) {
        log(" rebuildSecondaryIndex(String ksName, String cfName, String... idxNames)");
    }

    /**
     * Resets the local schema. Implemented as a POST to
     * {@code /storage_service/relocal_schema}.
     */
    @Override
    public void resetLocalSchema() throws IOException {
        log(" resetLocalSchema() throws IOException");
        client.post("/storage_service/relocal_schema");
    }

    /**
     * Enables/Disables tracing for the whole system. Only thrift requests can
     * start tracing currently.
     *
     * @param probability
     *            ]0,1[ will enable tracing on a partial number of requests with
     *            the provided probability.
0 will disable tracing and 1 will
     *            enable tracing for all requests (which mich severely cripple
     *            the system)
     */
    @Override
    public void setTraceProbability(double probability) {
        log(" setTraceProbability(double probability)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("probability", Double.toString(probability));
        client.post("/storage_service/trace_probability", queryParams);
    }

    /**
     * Returns the configured tracing probability.
     *
     * @return the double fetched from {@code /storage_service/trace_probability}
     */
    @Override
    public double getTraceProbability() {
        log(" getTraceProbability()");
        return client.getDoubleValue("/storage_service/trace_probability");
    }

    /**
     * Disables auto compaction. Implemented as a DELETE on
     * {@code /storage_service/auto_compaction/<ks>}.
     *
     * @param ks
     *            keyspace name, used as a path segment
     * @param columnFamilies
     *            joined with APIClient.join and passed under the key "cf"
     */
    @Override
    public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException {
        log("disableAutoCompaction(String ks, String... columnFamilies)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies));
        client.delete("/storage_service/auto_compaction/" + ks, queryParams);
    }

    /**
     * Enables auto compaction. Implemented as a POST to
     * {@code /storage_service/auto_compaction/<ks>}.
     *
     * @param ks
     *            keyspace name, used as a path segment
     * @param columnFamilies
     *            joined with APIClient.join and passed under the key "cf"
     */
    @Override
    public void enableAutoCompaction(String ks, String... columnFamilies) throws IOException {
        log("enableAutoCompaction(String ks, String... columnFamilies)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies));
        client.post("/storage_service/auto_compaction/" + ks, queryParams);
    }

    /**
     * Requests hint delivery for a host.
     *
     * @param host
     *            sent as the "host" query parameter
     */
    @Override
    public void deliverHints(String host) throws UnknownHostException {
        log(" deliverHints(String host) throws UnknownHostException");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("host", host);
        client.post("/storage_service/deliver_hints", queryParams);
    }

    /** Returns the name of the cluster */
    @Override
    public String getClusterName() {
        log(" getClusterName()");
        return client.getStringValue("/storage_service/cluster_name");
    }

    /** Returns the cluster partitioner */
    @Override
    public String getPartitionerName() {
        log(" getPartitionerName()");
        return client.getStringValue("/storage_service/partitioner_name");
    }

    /** Returns the threshold for warning of queries with many tombstones */
    @Override
    public int getTombstoneWarnThreshold() {
        log(" getTombstoneWarnThreshold()");
        return client.getIntValue("/storage_service/tombstone_warn_threshold");
    }

    /**
     * Sets the threshold for warning queries with many tombstones. The value
     * is sent as the "debug_threshold" query parameter.
     */
    @Override
    public void setTombstoneWarnThreshold(int tombstoneDebugThreshold) {
        log(" setTombstoneWarnThreshold(int tombstoneDebugThreshold)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("debug_threshold", Integer.toString(tombstoneDebugThreshold));
        client.post("/storage_service/tombstone_warn_threshold", queryParams);
    }

    /** Returns the threshold for abandoning queries with many tombstones */
    @Override
    public int getTombstoneFailureThreshold() {
        log(" getTombstoneFailureThreshold()");
        return client.getIntValue("/storage_service/tombstone_failure_threshold");
    }

    /**
     * Sets the threshold for abandoning queries with many tombstones. The
     * value is sent as the "debug_threshold" query parameter.
     */
    @Override
    public void setTombstoneFailureThreshold(int tombstoneDebugThreshold) {
        log(" setTombstoneFailureThreshold(int tombstoneDebugThreshold)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("debug_threshold", Integer.toString(tombstoneDebugThreshold));
        client.post("/storage_service/tombstone_failure_threshold", queryParams);
    }

    /** Returns the threshold for rejecting queries due to a large batch size */
    @Override
    public int getBatchSizeFailureThreshold() {
        log(" getBatchSizeFailureThreshold()");
        return client.getIntValue("/storage_service/batch_size_failure_threshold");
    }

    /**
     * Sets the threshold for rejecting queries due to a large batch size. The
     * value is sent as the "threshold" query parameter.
     */
    @Override
    public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold) {
        log(" setBatchSizeFailureThreshold(int batchSizeDebugThreshold)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("threshold", Integer.toString(batchSizeDebugThreshold));
        client.post("/storage_service/batch_size_failure_threshold", queryParams);
    }

    /**
     * Sets the hinted handoff throttle in kb per second, per delivery thread.
     * The value is sent as the "throttle" query parameter.
     */
    @Override
    public void setHintedHandoffThrottleInKB(int throttleInKB) {
        log(" setHintedHandoffThrottleInKB(int throttleInKB)");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("throttle", Integer.toString(throttleInKB));
        client.post("/storage_service/hinted_handoff", queryParams);
    }

    /**
     * Takes a snapshot named {@code tag} of each listed column family.
     *
     * Every entry is validated first; snapshots are only taken once the whole
     * list has passed, so a bad entry prevents all snapshots.
     *
     * @param tag
     *            the snapshot name; must be neither null nor empty
     * @param columnFamilyList
     *            entries of the form {@code keyspace.columnfamily}
     * @throws IOException
     *             if the tag is missing, the keyspace is unknown, or a
     *             snapshot with this tag already exists for the column family
     * @throws IllegalArgumentException
     *             if an entry is not of the form keyspace.columnfamily or the
     *             column family is not known in that keyspace
     */
    @Override
    public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList) throws IOException {
        log(" takeMultipleColumnFamilySnapshot");
        // NOTE(review): the type arguments on the declarations below (and on
        // the Entry loop further down) look truncated - confirm against the
        // original file.
        Map> keyspaceColumnfamily = new HashMap>();
        Map> kss = getColumnFamilyPerKeyspace();
        Map>> snapshots = getSnapshotKeyspaceColumnFamily();

        for (String columnFamily : columnFamilyList) {
            String splittedString[] = columnFamily.split("\\.");
            if (splittedString.length == 2) {
                String keyspaceName = splittedString[0];
                String columnFamilyName = splittedString[1];

                // String.split never yields null elements, so these two
                // null checks are purely defensive.
                if (keyspaceName == null) {
                    throw new IOException("You must supply a keyspace name");
                }
                if (columnFamilyName == null) {
                    throw new IOException("You must supply a column family name");
                }
                if (tag == null || tag.equals("")) {
                    throw new IOException("You must supply a snapshot name.");
                }
                if (!kss.containsKey(keyspaceName)) {
                    throw new IOException("Keyspace " + keyspaceName + " does not exist");
                }
                if (!kss.get(keyspaceName).contains(columnFamilyName)) {
                    throw new IllegalArgumentException(
                            String.format("Unknown keyspace/cf pair (%s.%s)", keyspaceName, columnFamilyName));
                }
                // Several column families may come from the same keyspace, so
                // check whether the snapshot exists for this specific column
                // family rather than for the whole keyspace.
                if (snapshots.containsKey(tag) && snapshots.get(tag).containsKey(keyspaceName)
                        && snapshots.get(tag).get(keyspaceName).contains(columnFamilyName)) {
                    throw new IOException("Snapshot " + tag + " already exists.");
                }

                if (!keyspaceColumnfamily.containsKey(keyspaceName)) {
                    keyspaceColumnfamily.put(keyspaceName, new ArrayList());
                }

                // Collect the keyspace/column family pair instead of
                // snapshotting right away, so that no snapshot happens if any
                // of the checks above fails for any entry.
                keyspaceColumnfamily.get(keyspaceName).add(columnFamilyName);
            } else {
                throw new IllegalArgumentException(
                        "Cannot take a snapshot on secondary index or invalid column family name. You must supply a column family name in the form of keyspace.columnfamily");
            }
        }

        // Everything validated: take the snapshots one column family at a time.
        for (Entry> entry : keyspaceColumnfamily.entrySet()) {
            for (String columnFamily : entry.getValue()) {
                takeColumnFamilySnapshot(entry.getKey(), columnFamily, tag);
            }
        }
    }

    /**
     * Builds a repair option map from the arguments and hands it to
     * repairAsync.
     *
     * Null dataCenters/hosts and null or empty columnFamilies are left out of
     * the options; "incremental" is set to the negation of fullRepair.
     *
     * @return the value returned by repairAsync
     */
    @Override
    public int forceRepairAsync(String keyspace, int parallelismDegree, Collection dataCenters,
            Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) {
        log(" forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRange, fullRepair, columnFamilies)");
        Map options = new HashMap();
        Joiner commas = Joiner.on(",");
        options.put("parallelism", Integer.toString(parallelismDegree));
        if (dataCenters != null) {
            options.put("dataCenters", commas.join(dataCenters));
        }
        if (hosts != null) {
            options.put("hosts", commas.join(hosts));
        }
        options.put("primaryRange", Boolean.toString(primaryRange));
        options.put("incremental", Boolean.toString(!fullRepair));
        if (columnFamilies != null && columnFamilies.length > 0) {
            options.put("columnFamilies", commas.join(columnFamilies));
        }
        return repairAsync(keyspace, options);
    }

    /**
     * Builds a repair option map for a token range and hands it to
     * repairAsync.
     *
     * NOTE(review): columnFamilies is accepted but never added to the
     * options, unlike in forceRepairAsync - confirm whether this is intended.
     *
     * @return the value returned by repairAsync
     */
    @Override
    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName,
            int parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair,
            String... columnFamilies) {
        log(" forceRepairRangeAsync(beginToken, endToken, keyspaceName, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)");
        Map options = new HashMap();
        Joiner commas = Joiner.on(",");
        options.put("parallelism", Integer.toString(parallelismDegree));
        if (dataCenters != null) {
            options.put("dataCenters", commas.join(dataCenters));
        }
        if (hosts != null) {
            options.put("hosts", commas.join(hosts));
        }
        options.put("incremental", Boolean.toString(!fullRepair));
        options.put("startToken", beginToken);
        options.put("endToken", endToken);
        return repairAsync(keyspaceName, options);
    }

    /** Delegates to getHostIdMap(). */
    @Override
    public Map getEndpointToHostId() {
        return getHostIdMap();
    }

    /** Delegates to getHostIdToAddressMap(). */
    @Override
    public Map getHostIdToEndpoint() {
        return getHostIdToAddressMap();
    }

    /** Not implemented: only logs the call. */
    @Override
    public void refreshSizeEstimates() throws ExecutionException {
        // TODO Auto-generated method stub
        log(" refreshSizeEstimates");
    }

    /**
     * Delegates to forceKeyspaceCompaction(keyspaceName, tableNames);
     * splitOutput is ignored.
     */
    @Override
    public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames)
            throws IOException, ExecutionException, InterruptedException {
        // "splitOutput" afaik not relevant for scylla (yet?...)
        forceKeyspaceCompaction(keyspaceName, tableNames);
    }

    /**
     * Delegates to forceKeyspaceCleanup(keyspaceName, tables); jobs is
     * ignored.
     */
    @Override
    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables)
            throws IOException, ExecutionException, InterruptedException {
        // "jobs" not (yet) relevant for scylla. (though possibly useful...)
        return forceKeyspaceCleanup(keyspaceName, tables);
    }

    /**
     * Delegates to the scrub overload without the jobs argument; jobs is
     * ignored.
     */
    @Override
    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs,
            String keyspaceName, String... columnFamilies)
            throws IOException, ExecutionException, InterruptedException {
        // "jobs" not (yet) relevant for scylla. (though possibly useful...)
        return scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies);
    }

    /** Not implemented: logs the call and always returns 0. */
    @Override
    public int verify(boolean extendedVerify, String keyspaceName, String... tableNames)
            throws IOException, ExecutionException, InterruptedException {
        // TODO Auto-generated method stub
        log(" verify");
        return 0;
    }

    /**
     * Delegates to upgradeSSTables(keyspaceName, excludeCurrentVersion,
     * tableNames); jobs is ignored.
     */
    @Override
    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs,
            String... tableNames) throws IOException, ExecutionException, InterruptedException {
        // "jobs" not (yet) relevant for scylla. (though possibly useful...)
        return upgradeSSTables(keyspaceName, excludeCurrentVersion, tableNames);
    }

    /**
     * Lists the keyspaces, asking the server for
     * {@code type=non_local_strategy}.
     *
     * @return the list fetched from {@code /storage_service/keyspaces}
     */
    @Override
    public List getNonLocalStrategyKeyspaces() {
        log(" getNonLocalStrategyKeyspaces");
        MultivaluedMap queryParams = new MultivaluedHashMap();
        queryParams.add("type", "non_local_strategy");
        return client.getListStrValue("/storage_service/keyspaces", queryParams);
    }

    /** Not implemented: only logs the call; the value is not used. */
    @Override
    public void setInterDCStreamThroughputMbPerSec(int value) {
        // TODO Auto-generated method stub
        log(" setInterDCStreamThroughputMbPerSec");
    }

    /** Not implemented: logs the call and always returns 0. */
    @Override
    public int getInterDCStreamThroughputMbPerSec() {
        // TODO Auto-generated method stub
        log(" getInterDCStreamThroughputMbPerSec");
        return 0;
    }

    /** Not implemented: logs the call and always returns false. */
    @Override
    public boolean resumeBootstrap() {
        log(" resumeBootstrap");
        return false;
    }

    /**
     * Fetches SSTable information from {@code /storage_service/sstable_info}
     * and converts each element with toCompositeData().
     *
     * @param keyspace
     *            optional keyspace filter, sent as "keyspace" when non-null
     * @param table
     *            optional table filter, sent as "cf" when non-null; requires
     *            a keyspace
     * @throws IllegalArgumentException
     *             if a table is given without a keyspace
     */
    @Override
    public List getSSTableInfo(String keyspace, String table) {
        if (keyspace == null && table != null) {
            throw new IllegalArgumentException("Missing keyspace name");
        }
        // Stays null when no filter is given; the request is then made with a
        // null parameter map.
        MultivaluedMap queryParams = null;

        if (keyspace != null) {
            queryParams = new MultivaluedHashMap();
            queryParams.add("keyspace", keyspace);
        }
        // The guard above ensures a keyspace was given whenever table is
        // non-null, so queryParams has been created by this point.
        if (table != null) {
            queryParams.add("cf", table);
        }

        // NOTE(review): the GenericType type argument looks truncated here -
        // confirm against the original file.
        return client.get("/storage_service/sstable_info", queryParams)
                .get(new GenericType>() {
                }).stream().map((i) -> i.toCompositeData()).collect(Collectors.toList());
    }

    /** Same as getSSTableInfo(null, null): no keyspace or table filter. */
    @Override
    public List getSSTableInfo() {
        return getSSTableInfo(null, null);
    }

    /**
     * Scrubs a keyspace by reading an int from
     * {@code /storage_service/keyspace_scrub/<keyspaceName>}.
     *
     * Only disableSnapshot, skipCorrupted and the column family list are
     * placed in the request; checkData, reinsertOverflowedTTL and jobs are
     * accepted but not used here.
     *
     * @return the int value returned by the request
     */
    @Override
    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData,
            boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... columnFamilies)
            throws IOException, ExecutionException, InterruptedException {
        MultivaluedMap queryParams = new MultivaluedHashMap();
        APIClient.set_bool_query_param(queryParams, "disable_snapshot", disableSnapshot);
        APIClient.set_bool_query_param(queryParams, "skip_corrupted", skipCorrupted);
        APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies));
        return client.getIntValue("/storage_service/keyspace_scrub/" + keyspaceName, queryParams);
    }
}