Storage service: Fix 3.x style notifications (repair)

This commit is contained in:
Calle Wilund 2016-10-17 11:34:21 +00:00
parent 4ec7d58249
commit 4ed049739a
1 changed files with 119 additions and 24 deletions

View File

@ -22,6 +22,8 @@
*/
package org.apache.cassandra.service;
import static java.util.Arrays.asList;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -672,10 +674,12 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
private String message;
private MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<String, String>();
private int cmd;
private final boolean legacy;
public CheckRepair(int id, String keyspace) {
public CheckRepair(int id, String keyspace, boolean legacy) {
this.id = id;
this.keyspace = keyspace;
this.legacy = legacy;
APIClient.set_query_param(queryParams, "id", Integer.toString(id));
message = String.format("Repair session %d ", id);
// The returned id is the command number
@ -687,11 +691,12 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
String status = client.getStringValue("/storage_service/repair_async/" + keyspace, queryParams);
if (!status.equals("RUNNING")) {
cancel();
if (!status.equals("SUCCESSFUL")) {
sendNotification("repair", message + "failed",
new int[] { cmd, RepairStatus.SESSION_FAILED.ordinal() });
if (status.equals("SUCCESSFUL")) {
sendMessage(cmd, RepairStatus.SESSION_SUCCESS, message, legacy);
} else {
sendMessage(cmd, RepairStatus.SESSION_FAILED, message + "failed", legacy);
}
sendNotification("repair", message + "finished", new int[] { cmd, RepairStatus.FINISHED.ordinal() });
sendMessage(cmd, RepairStatus.FINISHED, message + "finished", legacy);
}
}
@ -725,14 +730,57 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
*
* @param repair
*/
public int waitAndNotifyRepair(int cmd, String keyspace, String message) {
private int waitAndNotifyRepair(int cmd, String keyspace, String message, boolean legacy) {
logger.finest(message);
sendNotification("repair", message, new int[] { cmd, RepairStatus.STARTED.ordinal() });
TimerTask taskToExecute = new CheckRepair(cmd, keyspace);
sendMessage(cmd, RepairStatus.STARTED, message, legacy);
TimerTask taskToExecute = new CheckRepair(cmd, keyspace, legacy);
timer.schedule(taskToExecute, 100, 1000);
return cmd;
}
// See org.apache.cassandra.utils.progress.ProgressEventType
private static enum ProgressEventType {
START, PROGRESS, ERROR, ABORT, SUCCESS, COMPLETE, NOTIFICATION
}
private void sendMessage(int cmd, RepairStatus status, String message, boolean legacy) {
String tag = "repair:" + cmd;
ProgressEventType type = ProgressEventType.ERROR;
int total = 100;
int count = 0;
switch (status) {
case STARTED:
type = ProgressEventType.START;
break;
case FINISHED:
type = ProgressEventType.COMPLETE;
count = 100;
break;
case SESSION_SUCCESS:
type = ProgressEventType.SUCCESS;
count = 100;
break;
default:
break;
}
Notification jmxNotification = new Notification("progress", tag, notificationSerialNumber.incrementAndGet(),
message);
Map<String, Integer> userData = new HashMap<>();
userData.put("type", type.ordinal());
userData.put("progressCount", count);
userData.put("total", total);
jmxNotification.setUserData(userData);
sendNotification(jmxNotification);
if (legacy) {
sendNotification("repair", message, new int[] { cmd, status.ordinal() });
}
}
/**
* Invoke repair asynchronously. You can track repair progress by
* subscribing JMX notification sent from this StorageServiceMBean.
@ -747,6 +795,24 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
*/
@Override
public int repairAsync(String keyspace, Map<String, String> options) {
return repairAsync(keyspace, options, false);
}
@SuppressWarnings("unused")
private static final String PARALLELISM_KEY = "parallelism";
private static final String PRIMARY_RANGE_KEY = "primaryRange";
@SuppressWarnings("unused")
private static final String INCREMENTAL_KEY = "incremental";
@SuppressWarnings("unused")
private static final String JOB_THREADS_KEY = "jobThreads";
private static final String RANGES_KEY = "ranges";
private static final String COLUMNFAMILIES_KEY = "columnFamilies";
private static final String DATACENTERS_KEY = "dataCenters";
private static final String HOSTS_KEY = "hosts";
@SuppressWarnings("unused")
private static final String TRACE_KEY = "trace";
private int repairAsync(String keyspace, Map<String, String> options, boolean legacy) {
log(" repairAsync(String keyspace, Map<String, String> options)");
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<String, String>();
for (String op : options.keySet()) {
@ -754,38 +820,66 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
}
int cmd = client.postInt("/storage_service/repair_async/" + keyspace, queryParams);
waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true));
waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true),
legacy);
return cmd;
}
private static String commaSeparated(Collection<?> c) {
String s = c.toString();
return s.substring(1, s.length() - 1);
}
private int repairRangeAsync(String beginToken, String endToken, String keyspaceName, Boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, Boolean primaryRange, Boolean repairedAt,
String... columnFamilies) {
log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws IOException");
Map<String, String> options = new HashMap<String, String>();
if (beginToken != null && endToken != null) {
options.put(RANGES_KEY, beginToken + ":" + endToken);
}
if (dataCenters != null) {
options.put(DATACENTERS_KEY, commaSeparated(dataCenters));
}
if (hosts != null) {
options.put(HOSTS_KEY, commaSeparated(hosts));
}
if (columnFamilies != null && columnFamilies.length != 0) {
options.put(COLUMNFAMILIES_KEY, commaSeparated(asList(columnFamilies)));
}
if (primaryRange != null) {
options.put(PRIMARY_RANGE_KEY, primaryRange.toString());
}
return repairAsync(keyspaceName, options, true);
}
@Override
@Deprecated
public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies)
throws IOException {
throws IOException {
log(" forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
public int forceRepairAsync(String keyspace) {
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
return repairRangeAsync(null, null, keyspace, isSequential, dataCenters, hosts, primaryRange, repairedAt,
columnFamilies);
}
@Override
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies)
throws IOException {
Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) {
log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws IOException");
return client.getIntValue("");
return repairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, hosts, null, repairedAt,
columnFamilies);
}
@Override
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange,
@Deprecated
public int forceRepairAsync(String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange,
boolean fullRepair, String... columnFamilies) {
log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
return repairRangeAsync(null, null, keyspaceName, isSequential, null, null, primaryRange, null, columnFamilies);
}
@Override
@ -793,7 +887,8 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean,
public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential,
boolean isLocal, boolean repairedAt, String... columnFamilies) {
log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies)");
return client.getIntValue("");
return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, null, null, repairedAt,
columnFamilies);
}
@Override