Merge "Adding notification for the repair command" from Amnon

"The API is currently not supporting notification, but the repair
 command that perform via the nodetool relays on the notification to know
 when the command terminate.

 This series adds support for the repair notification, based on a timer
 and periodically check if a current repair command was terminated."
This commit is contained in:
Pekka Enberg 2015-08-31 09:41:01 +03:00
commit db6d48a695
4 changed files with 120 additions and 17 deletions

View File

@ -77,6 +77,21 @@ public class APIClient {
post(path, null);
}
public String postGetVal(String path, MultivaluedMap<String, String> queryParams) {
if (queryParams != null) {
return get(path, queryParams).post(String.class);
}
return get(path).post(String.class);
}
public int postInt(String path, MultivaluedMap<String, String> queryParams) {
return Integer.parseInt(postGetVal(path, queryParams));
}
public int postInt(String path) {
return postInt(path, null);
}
public void delete(String path, MultivaluedMap<String, String> queryParams) {
if (queryParams != null) {
get(path, queryParams).delete();

View File

@ -22,6 +22,19 @@
*/
package org.apache.cassandra.repair;
public class RepairParallelism {
public enum RepairParallelism {
/**
* One node at a time
*/
SEQUENTIAL,
/**
* All nodes at the same time
*/
PARALLEL,
/**
* One node per data center at a time
*/
DATACENTER_AWARE
}

View File

@ -29,6 +29,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.*;
import javax.management.openmbean.TabularData;
@ -36,7 +37,6 @@ import javax.ws.rs.core.MultivaluedMap;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.repair.RepairParallelism;
import com.cloudius.urchin.api.APIClient;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@ -51,7 +51,7 @@ public class StorageService extends NotificationBroadcasterSupport
.getLogger(StorageService.class.getName());
private APIClient c = new APIClient();
private static Timer timer = new Timer("Storage Service Repair");
private StorageMetrics metrics = new StorageMetrics();
public static final StorageService instance = new StorageService();
@ -60,6 +60,14 @@ public class StorageService extends NotificationBroadcasterSupport
return instance;
}
public static enum RepairStatus
{
STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
}
/* JMX notification serial number counter */
private final AtomicLong notificationSerialNumber = new AtomicLong();
private final ObjectName jmxObjectName;
public StorageService() {
@ -513,6 +521,72 @@ public class StorageService extends NotificationBroadcasterSupport
c.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams);
}
class CheckRepair extends TimerTask {
private APIClient c = new APIClient();
int id;
String keyspace;
String message;
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
int cmd;
public CheckRepair(int id, String keyspace) {
this.id = id;
this.keyspace = keyspace;
APIClient.set_query_param(queryParams, "id", Integer.toString(id));
message = String.format("Repair session %d ", id);
// The returned id is the command number
this.cmd = id;
}
@Override
public void run() {
String status = c.getStringValue("/storage_service/repair_async/" + keyspace, queryParams);
if (!status.equals("RUNNING")) {
cancel();
if (!status.equals("SUCCESSFUL")) {
sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.SESSION_SUCCESS.ordinal()});
} else {
sendNotification("repair", message + "failed", new int[]{cmd, RepairStatus.SESSION_FAILED.ordinal()});
}
sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.FINISHED.ordinal()});
}
}
}
/**
* Sends JMX notification to subscribers.
*
* @param type Message type
* @param message Message itself
* @param userObject Arbitrary object to attach to notification
*/
public void sendNotification(String type, String message, Object userObject)
{
Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
jmxNotification.setUserData(userObject);
sendNotification(jmxNotification);
}
public String getRepairMessage(final int cmd,
final String keyspace,
final int ranges_size,
final RepairParallelism parallelismDegree,
final boolean fullRepair) {
return String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)",
cmd, ranges_size, keyspace, parallelismDegree, fullRepair);
}
/**
*
* @param repair
*/
public int waitAndNotifyRepair(int cmd, String keyspace, String message) {
logger.info(message);
sendNotification("repair", message, new int[]{cmd, RepairStatus.STARTED.ordinal()});
TimerTask taskToExecute = new CheckRepair(cmd, keyspace);
timer.schedule(taskToExecute, 100, 1000);
return cmd;
}
/**
* Invoke repair asynchronously. You can track repair progress by
* subscribing JMX notification sent from this StorageServiceMBean.
@ -536,28 +610,26 @@ public class StorageService extends NotificationBroadcasterSupport
opts = opts + op + "=" + options.get(op);
}
APIClient.set_query_param(queryParams, "options", opts);
return c.getIntValue("/storage_service/repair_async/" + keyspace);
int cmd = c.postInt("/storage_service/repair_async/" + keyspace);
waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true));
return cmd;
}
@Deprecated
@Override
public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean repairedAt, String... columnFamilies)
throws IOException {
log(" forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
public int forceRepairAsync(String keyspace,
RepairParallelism parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRange, boolean fullRepair,
String... columnFamilies) {
log(" forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)");
return c.getIntValue("");
public int forceRepairAsync(String keyspace) {
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
@ -575,12 +647,13 @@ public class StorageService extends NotificationBroadcasterSupport
return c.getIntValue("");
}
@Deprecated
@Override
public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange, boolean fullRepair,
String... columnFamilies) {
log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
@ -1146,7 +1219,8 @@ public class StorageService extends NotificationBroadcasterSupport
String... columnFamilies) {
// TODO Auto-generated method stub
log(" forceRepairAsync()");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Override

View File

@ -351,6 +351,7 @@ public interface StorageServiceMBean extends NotificationEmitter {
Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies);
public int forceRepairAsync(String keyspace);
/**
* Same as forceRepairAsync, but handles a specified range
*/