209 lines
9.5 KiB
Python
209 lines
9.5 KiB
Python
|
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||
|
# This source code is licensed under both the GPLv2 (found in the
|
||
|
# COPYING file in the root directory) and Apache 2.0 License
|
||
|
# (found in the LICENSE.Apache file in the root directory).
|
||
|
|
||
|
from abc import abstractmethod
|
||
|
from advisor.db_log_parser import DataSource
|
||
|
from enum import Enum
|
||
|
import math
|
||
|
|
||
|
|
||
|
NO_ENTITY = 'ENTITY_PLACEHOLDER'
|
||
|
|
||
|
|
||
|
class TimeSeriesData(DataSource):
|
||
|
class Behavior(Enum):
|
||
|
bursty = 1
|
||
|
evaluate_expression = 2
|
||
|
|
||
|
class AggregationOperator(Enum):
|
||
|
avg = 1
|
||
|
max = 2
|
||
|
min = 3
|
||
|
latest = 4
|
||
|
oldest = 5
|
||
|
|
||
|
def __init__(self):
|
||
|
super().__init__(DataSource.Type.TIME_SERIES)
|
||
|
self.keys_ts = None # Dict[entity, Dict[key, Dict[timestamp, value]]]
|
||
|
self.stats_freq_sec = None
|
||
|
|
||
|
@abstractmethod
|
||
|
def get_keys_from_conditions(self, conditions):
|
||
|
# This method takes in a list of time-series conditions; for each
|
||
|
# condition it manipulates the 'keys' in the way that is supported by
|
||
|
# the subclass implementing this method
|
||
|
pass
|
||
|
|
||
|
@abstractmethod
|
||
|
def fetch_timeseries(self, required_statistics):
|
||
|
# this method takes in a list of statistics and fetches the timeseries
|
||
|
# for each of them and populates the 'keys_ts' dictionary
|
||
|
pass
|
||
|
|
||
|
def fetch_burst_epochs(
|
||
|
self, entities, statistic, window_sec, threshold, percent
|
||
|
):
|
||
|
# type: (str, int, float, bool) -> Dict[str, Dict[int, float]]
|
||
|
# this method calculates the (percent) rate change in the 'statistic'
|
||
|
# for each entity (over 'window_sec' seconds) and returns the epochs
|
||
|
# where this rate change is greater than or equal to the 'threshold'
|
||
|
# value
|
||
|
if self.stats_freq_sec == 0:
|
||
|
# not time series data, cannot check for bursty behavior
|
||
|
return
|
||
|
if window_sec < self.stats_freq_sec:
|
||
|
window_sec = self.stats_freq_sec
|
||
|
# 'window_samples' is the number of windows to go back to
|
||
|
# compare the current window with, while calculating rate change.
|
||
|
window_samples = math.ceil(window_sec / self.stats_freq_sec)
|
||
|
burst_epochs = {}
|
||
|
# if percent = False:
|
||
|
# curr_val = value at window for which rate change is being calculated
|
||
|
# prev_val = value at window that is window_samples behind curr_window
|
||
|
# Then rate_without_percent =
|
||
|
# ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
|
||
|
# if percent = True:
|
||
|
# rate_with_percent = (rate_without_percent * 100) / prev_val
|
||
|
# These calculations are in line with the rate() transform supported
|
||
|
# by ODS
|
||
|
for entity in entities:
|
||
|
if statistic not in self.keys_ts[entity]:
|
||
|
continue
|
||
|
timestamps = sorted(list(self.keys_ts[entity][statistic].keys()))
|
||
|
for ix in range(window_samples, len(timestamps), 1):
|
||
|
first_ts = timestamps[ix - window_samples]
|
||
|
last_ts = timestamps[ix]
|
||
|
first_val = self.keys_ts[entity][statistic][first_ts]
|
||
|
last_val = self.keys_ts[entity][statistic][last_ts]
|
||
|
diff = last_val - first_val
|
||
|
if percent:
|
||
|
diff = diff * 100 / first_val
|
||
|
rate = (diff * self.duration_sec) / (last_ts - first_ts)
|
||
|
# if the rate change is greater than the provided threshold,
|
||
|
# then the condition is triggered for entity at time 'last_ts'
|
||
|
if rate >= threshold:
|
||
|
if entity not in burst_epochs:
|
||
|
burst_epochs[entity] = {}
|
||
|
burst_epochs[entity][last_ts] = rate
|
||
|
return burst_epochs
|
||
|
|
||
|
def fetch_aggregated_values(self, entity, statistics, aggregation_op):
|
||
|
# type: (str, AggregationOperator) -> Dict[str, float]
|
||
|
# this method performs the aggregation specified by 'aggregation_op'
|
||
|
# on the timeseries of 'statistics' for 'entity' and returns:
|
||
|
# Dict[statistic, aggregated_value]
|
||
|
result = {}
|
||
|
for stat in statistics:
|
||
|
if stat not in self.keys_ts[entity]:
|
||
|
continue
|
||
|
agg_val = None
|
||
|
if aggregation_op is self.AggregationOperator.latest:
|
||
|
latest_timestamp = max(list(self.keys_ts[entity][stat].keys()))
|
||
|
agg_val = self.keys_ts[entity][stat][latest_timestamp]
|
||
|
elif aggregation_op is self.AggregationOperator.oldest:
|
||
|
oldest_timestamp = min(list(self.keys_ts[entity][stat].keys()))
|
||
|
agg_val = self.keys_ts[entity][stat][oldest_timestamp]
|
||
|
elif aggregation_op is self.AggregationOperator.max:
|
||
|
agg_val = max(list(self.keys_ts[entity][stat].values()))
|
||
|
elif aggregation_op is self.AggregationOperator.min:
|
||
|
agg_val = min(list(self.keys_ts[entity][stat].values()))
|
||
|
elif aggregation_op is self.AggregationOperator.avg:
|
||
|
values = list(self.keys_ts[entity][stat].values())
|
||
|
agg_val = sum(values) / len(values)
|
||
|
result[stat] = agg_val
|
||
|
return result
|
||
|
|
||
|
def check_and_trigger_conditions(self, conditions):
|
||
|
# get the list of statistics that need to be fetched
|
||
|
reqd_keys = self.get_keys_from_conditions(conditions)
|
||
|
# fetch the required statistics and populate the map 'keys_ts'
|
||
|
self.fetch_timeseries(reqd_keys)
|
||
|
# Trigger the appropriate conditions
|
||
|
for cond in conditions:
|
||
|
complete_keys = self.get_keys_from_conditions([cond])
|
||
|
# Get the entities that have all statistics required by 'cond':
|
||
|
# an entity is checked for a given condition only if we possess all
|
||
|
# of the condition's 'keys' for that entity
|
||
|
entities_with_stats = []
|
||
|
for entity in self.keys_ts:
|
||
|
stat_missing = False
|
||
|
for stat in complete_keys:
|
||
|
if stat not in self.keys_ts[entity]:
|
||
|
stat_missing = True
|
||
|
break
|
||
|
if not stat_missing:
|
||
|
entities_with_stats.append(entity)
|
||
|
if not entities_with_stats:
|
||
|
continue
|
||
|
if cond.behavior is self.Behavior.bursty:
|
||
|
# for a condition that checks for bursty behavior, only one key
|
||
|
# should be present in the condition's 'keys' field
|
||
|
result = self.fetch_burst_epochs(
|
||
|
entities_with_stats,
|
||
|
complete_keys[0], # there should be only one key
|
||
|
cond.window_sec,
|
||
|
cond.rate_threshold,
|
||
|
True
|
||
|
)
|
||
|
# Trigger in this case is:
|
||
|
# Dict[entity_name, Dict[timestamp, rate_change]]
|
||
|
# where the inner dictionary contains rate_change values when
|
||
|
# the rate_change >= threshold provided, with the
|
||
|
# corresponding timestamps
|
||
|
if result:
|
||
|
cond.set_trigger(result)
|
||
|
elif cond.behavior is self.Behavior.evaluate_expression:
|
||
|
self.handle_evaluate_expression(
|
||
|
cond,
|
||
|
complete_keys,
|
||
|
entities_with_stats
|
||
|
)
|
||
|
|
||
|
def handle_evaluate_expression(self, condition, statistics, entities):
|
||
|
trigger = {}
|
||
|
# check 'condition' for each of these entities
|
||
|
for entity in entities:
|
||
|
if hasattr(condition, 'aggregation_op'):
|
||
|
# in this case, the aggregation operation is performed on each
|
||
|
# of the condition's 'keys' and then with aggregated values
|
||
|
# condition's 'expression' is evaluated; if it evaluates to
|
||
|
# True, then list of the keys values is added to the
|
||
|
# condition's trigger: Dict[entity_name, List[stats]]
|
||
|
result = self.fetch_aggregated_values(
|
||
|
entity, statistics, condition.aggregation_op
|
||
|
)
|
||
|
keys = [result[key] for key in statistics]
|
||
|
try:
|
||
|
if eval(condition.expression):
|
||
|
trigger[entity] = keys
|
||
|
except Exception as e:
|
||
|
print(
|
||
|
'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
|
||
|
)
|
||
|
else:
|
||
|
# assumption: all stats have same series of timestamps
|
||
|
# this is similar to the above but 'expression' is evaluated at
|
||
|
# each timestamp, since there is no aggregation, and all the
|
||
|
# epochs are added to the trigger when the condition's
|
||
|
# 'expression' evaluated to true; so trigger is:
|
||
|
# Dict[entity, Dict[timestamp, List[stats]]]
|
||
|
for epoch in self.keys_ts[entity][statistics[0]].keys():
|
||
|
keys = [
|
||
|
self.keys_ts[entity][key][epoch]
|
||
|
for key in statistics
|
||
|
]
|
||
|
try:
|
||
|
if eval(condition.expression):
|
||
|
if entity not in trigger:
|
||
|
trigger[entity] = {}
|
||
|
trigger[entity][epoch] = keys
|
||
|
except Exception as e:
|
||
|
print(
|
||
|
'WARNING(TimeSeriesData) check_and_trigger: ' +
|
||
|
str(e)
|
||
|
)
|
||
|
if trigger:
|
||
|
condition.set_trigger(trigger)
|