StorageService: Add notification suport for the repair command

This patch adds a notification support for the repair in StorageService.

When a repair command starts a timer is set to check the status of the
repair, when the repair complets it sends notification for the
successful or fail of the repair.

Because jconsole doesn't run method with variable number of parameters
an additional MBean method was added for async repair with only a
keyspace as its parameter.

All the forceAsycRepair methods are mapped to the asyncRepair metod that
replaces them in 2.2, when options will be supported in the repair, it
will be added to the jmx.

After this patch it is possible to register in the jconsole for
notification, perform a forceAsyncRepair and get the notification that
the repair complets
This commit is contained in:
Amnon Heiman 2015-08-13 09:53:47 +03:00
parent e96e62b6bd
commit 67dca4da9d
2 changed files with 91 additions and 16 deletions

View File

@ -29,6 +29,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.*;
import javax.management.openmbean.TabularData;
@ -36,7 +37,6 @@ import javax.ws.rs.core.MultivaluedMap;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.repair.RepairParallelism;
import com.cloudius.urchin.api.APIClient;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@ -51,7 +51,7 @@ public class StorageService extends NotificationBroadcasterSupport
.getLogger(StorageService.class.getName());
private APIClient c = new APIClient();
private static Timer timer = new Timer("Storage Service Repair");
private StorageMetrics metrics = new StorageMetrics();
public static final StorageService instance = new StorageService();
@ -60,6 +60,14 @@ public class StorageService extends NotificationBroadcasterSupport
return instance;
}
public static enum RepairStatus
{
STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
}
/* JMX notification serial number counter */
private final AtomicLong notificationSerialNumber = new AtomicLong();
private final ObjectName jmxObjectName;
public StorageService() {
@ -513,6 +521,72 @@ public class StorageService extends NotificationBroadcasterSupport
c.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams);
}
class CheckRepair extends TimerTask {
private APIClient c = new APIClient();
int id;
String keyspace;
String message;
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
int cmd;
public CheckRepair(int id, String keyspace) {
this.id = id;
this.keyspace = keyspace;
APIClient.set_query_param(queryParams, "id", Integer.toString(id));
message = String.format("Repair session %d ", id);
// The returned id is the command number
this.cmd = id;
}
@Override
public void run() {
String status = c.getStringValue("/storage_service/repair_async/" + keyspace, queryParams);
if (!status.equals("RUNNING")) {
cancel();
if (!status.equals("SUCCESSFUL")) {
sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.SESSION_SUCCESS.ordinal()});
} else {
sendNotification("repair", message + "failed", new int[]{cmd, RepairStatus.SESSION_FAILED.ordinal()});
}
sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.FINISHED.ordinal()});
}
}
}
/**
* Sends JMX notification to subscribers.
*
* @param type Message type
* @param message Message itself
* @param userObject Arbitrary object to attach to notification
*/
public void sendNotification(String type, String message, Object userObject)
{
Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
jmxNotification.setUserData(userObject);
sendNotification(jmxNotification);
}
public String getRepairMessage(final int cmd,
final String keyspace,
final int ranges_size,
final RepairParallelism parallelismDegree,
final boolean fullRepair) {
return String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)",
cmd, ranges_size, keyspace, parallelismDegree, fullRepair);
}
/**
*
* @param repair
*/
public int waitAndNotifyRepair(int cmd, String keyspace, String message) {
logger.info(message);
sendNotification("repair", message, new int[]{cmd, RepairStatus.STARTED.ordinal()});
TimerTask taskToExecute = new CheckRepair(cmd, keyspace);
timer.schedule(taskToExecute, 100, 1000);
return cmd;
}
/**
* Invoke repair asynchronously. You can track repair progress by
* subscribing JMX notification sent from this StorageServiceMBean.
@ -536,28 +610,26 @@ public class StorageService extends NotificationBroadcasterSupport
opts = opts + op + "=" + options.get(op);
}
APIClient.set_query_param(queryParams, "options", opts);
return c.getIntValue("/storage_service/repair_async/" + keyspace);
int cmd = c.postInt("/storage_service/repair_async/" + keyspace);
waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true));
return cmd;
}
@Deprecated
@Override
public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean repairedAt, String... columnFamilies)
throws IOException {
log(" forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
public int forceRepairAsync(String keyspace,
RepairParallelism parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRange, boolean fullRepair,
String... columnFamilies) {
log(" forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)");
return c.getIntValue("");
public int forceRepairAsync(String keyspace) {
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
@ -575,12 +647,13 @@ public class StorageService extends NotificationBroadcasterSupport
return c.getIntValue("");
}
@Deprecated
@Override
public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange, boolean fullRepair,
String... columnFamilies) {
log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Deprecated
@ -1144,7 +1217,8 @@ public class StorageService extends NotificationBroadcasterSupport
String... columnFamilies) {
// TODO Auto-generated method stub
log(" forceRepairAsync()");
return c.getIntValue("");
Map<String, String> options = new HashMap<String, String>();
return repairAsync(keyspace, options);
}
@Override

View File

@ -351,6 +351,7 @@ public interface StorageServiceMBean extends NotificationEmitter {
Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies);
public int forceRepairAsync(String keyspace);
/**
* Same as forceRepairAsync, but handles a specified range
*/