This is the article for day 9 of the MicroAd Advent Calendar 2019.
Everyone loves Elasticsearch. When you want to continuously monitor data in Elasticsearch and send an alert whenever a specific pattern occurs, a tool called [ElastAlert](https://github.com/Yelp/elastalert) comes in handy.
ElastAlert ships with a number of monitoring patterns called rules, but sometimes they alone cannot meet your needs. In that case, you can solve the problem by creating a new monitoring pattern: a custom rule.
I used two resources as references when creating the custom rule; the latter in particular is a must-see because it contains practical code.
In this article, I create a rule that alerts on the result of a binary operation between two aggregated fields.

For example, suppose an index in Elasticsearch has the fields `field_a` and `field_b`, and that the following normally holds:

`-0.001 < sum(field_a) - sum(field_b) < 0.001`

I want the rule to alert when this condition stops holding. For instance, if `sum(field_a) = 1.0` and `sum(field_b) = 0.75`, the difference 0.25 falls outside the band and an alert should fire.
My concrete use case was: "these two sets of data should be identical, and it is bad if their values diverge." I set this rule on the two data sets so that a notification is sent immediately when the values differ.
Among the default rules, `MetricAggregationRule` has similar functionality, but it can only use the result of aggregating a single field, so it cannot monitor a calculation between aggregation results.

Referring to the `MetricAggregationRule` class, I created a class called `BinaryOperationOnAggregatedMetricRule` that inherits from [BaseAggregationRule](https://github.com/Yelp/elastalert/blob/325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead/elastalert/ruletypes.py#L972), the base class for rules that handle aggregated data.
elastalert/elastalert_modules/custom_rules.py
```python
import operator as op

from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule


class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):
    """Alert when a binary operation between two aggregated metrics violates min_threshold / max_threshold."""
    required_options = frozenset([
        'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
        'binary_operator'
    ])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
    allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
                                'subtract': {'func': op.sub, 'sign': '-'},
                                'multiply': {'func': op.mul, 'sign': '*'},
                                'divide': {'func': op.truediv, 'sign': '/'}}

    def __init__(self, *args):
        super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
        self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
        self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

        if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
                or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))
        if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
            raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))
        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")

    def get_match_str(self, match):
        message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
            self.rules['metric_agg_key_first'],
            self.rules['metric_agg_type_first'],
            self.binary_operator['sign'],
            self.rules['metric_agg_key_second'],
            self.rules['metric_agg_type_second'],
            str(self.binary_operator['func'](*[match[self.metric_key_first], match[self.metric_key_second]])),
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        if self.rules.get('delete_ruletype_text'):
            message = ''

        # Collect the per-key term counts stored under 'top_events_<key>' ('top_events_' is 11 chars).
        top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]

        def events_to_message(items):
            message = ''
            items = sorted(items, key=lambda x: x[1], reverse=True)
            for term, count in items:
                message += '%s : %s\n' % (term, count)
            return message

        for key, counts in top_events:
            message += '%s:\n' % (key)
            message += '%s\n' % (events_to_message(counts.items()))
        return message

    def generate_aggregation_query(self):
        """
        custom_top_count_keys: A list of fields.
        ElastAlert will perform a terms query for the top X most common values for each of the fields,
        where X is 5 by default, or custom_top_count_number if it exists.
        custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
        """
        # bucket_script must live under a multi-bucket aggregation, so everything is
        # wrapped in a filters aggregation with a single 'all' bucket that matches every document.
        query = {
            "all_matching_docs": {
                "filters": {
                    "filters": {
                        "all": {
                            "match_all": {}
                        }
                    }
                },
                'aggs': {
                    'topx_match_aggs': {
                        "filter": {
                            "bool": {
                                "must": []
                            }
                        },
                        'aggregations': {
                        }
                    },
                    # The two metric aggregations referenced by buckets_path below.
                    self.metric_key_first: {
                        self.rules['metric_agg_type_first']: {
                            'field': self.rules['metric_agg_key_first']
                        }
                    },
                    self.metric_key_second: {
                        self.rules['metric_agg_type_second']: {
                            'field': self.rules['metric_agg_key_second']
                        }
                    },
                    'binary_operation': {
                        'bucket_script': {
                            'buckets_path': {
                                'first': self.metric_key_first,
                                'second': self.metric_key_second
                            },
                            'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                        }
                    }
                }
            }
        }
        if self.rules.get('custom_top_count_keys'):
            number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
            keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
            for key in keys:
                child_query = {
                    'terms': {
                        'field': key,
                        'order': {'_count': 'desc'},
                        'size': number
                    },
                    'aggs': {
                        'metric_aggregation_first': {
                            self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
                        },
                        'metric_aggregation_second': {
                            self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
                        },
                        'metric_aggregation': {
                            'bucket_script': {
                                'buckets_path': {
                                    'first': 'metric_aggregation_first',
                                    'second': 'metric_aggregation_second'
                                },
                                'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                            }
                        }
                    }
                }
                query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
        return query

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match = {
                    self.rules['timestamp_field']: timestamp,
                    self.metric_key_first: metric_val_first,
                    self.metric_key_second: metric_val_second,
                    'binary_operation': binary_operation
                }
                if query_key is not None:
                    match[self.rules['query_key']] = query_key
                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match.update(counts)
                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key_first] = metric_val_first
                match_data[self.metric_key_second] = metric_val_second
                match_data['binary_operation'] = binary_operation

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match_data.update(counts)

                self.add_match(match_data)

    def get_top_counts(self, aggregation_data):
        """
        Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
        """
        all_counts = {}
        number = self.top_count_number
        keys = self.top_count_keys
        for key in keys:
            hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = hits_terms.get('buckets')
                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['metric_aggregation']['value']
                counts = terms.items()
                counts = sorted(counts, key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])
            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count
        return all_counts

    def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
        if metric_val_first is None or metric_val_second is None:
            return False
        # Avoid division by zero.
        if metric_val_second == 0 and binary_operator == op.truediv:
            return False
        if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
            return True
        return False
```
The code is Python, but the main point is understanding the structure of the Elasticsearch query that is issued internally. For this query, I needed to understand how `bucket_script` and `buckets_path` work.
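To make the structure concrete, here is roughly the aggregation element that `generate_aggregation_query` builds for the settings used in this article (`field_a` / `field_b`, both aggregated with `sum`, operator `subtract`); the `topx_match_aggs` sub-aggregation is omitted for brevity:

```python
# Sketch of the dict returned by generate_aggregation_query() for
# field_a / field_b with sum + subtract (topx_match_aggs omitted).
aggregation_query_element = {
    "all_matching_docs": {
        # A filters aggregation with a single "all" bucket that matches every
        # document; bucket_script needs a multi-bucket parent like this.
        "filters": {"filters": {"all": {"match_all": {}}}},
        "aggs": {
            # The two metric aggregations the script reads from.
            "metric_field_a_sum": {"sum": {"field": "field_a"}},
            "metric_field_b_sum": {"sum": {"field": "field_b"}},
            # bucket_script computes sum(field_a) - sum(field_b) per bucket;
            # buckets_path maps the script variables to the sibling aggregations.
            "binary_operation": {
                "bucket_script": {
                    "buckets_path": {
                        "first": "metric_field_a_sum",
                        "second": "metric_field_b_sum",
                    },
                    "script": "params.first-params.second",
                }
            },
        },
    }
}
```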
The items to specify in a configuration file that uses this rule are shown below. For `metric_agg_type_first` / `metric_agg_type_second`, any of `'min'`, `'max'`, `'avg'`, `'sum'`, `'cardinality'`, `'value_count'` can be used; for `binary_operator`, any of `'add'`, `'subtract'`, `'multiply'`, `'divide'` (+, -, ×, ÷).
rule_setting.yaml
```yaml
es_host: <host_name>
es_port: <port_number>

name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"
index: <index_name>

timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>

# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: field_a
metric_agg_type_first: sum
metric_agg_key_second: field_b
metric_agg_type_second: sum
binary_operator: subtract

min_threshold: -0.0001
max_threshold: 0.00001

query_key:
  - xxxxxx_id

custom_top_count_keys:
  - zzzzzz_id
```
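Before wiring the rule up to production alerting, a dry run may help. Assuming ElastAlert's bundled tools are installed, `elastalert-test-rule rule_setting.yaml` runs the rule against recent data and reports whether it would have matched, and `python -m elastalert.elastalert --verbose --rule rule_setting.yaml` runs ElastAlert with only this rule enabled. Note that the `elastalert_modules` package must be importable from where ElastAlert runs (e.g. on `PYTHONPATH`) for the custom `type` to load.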
If the settings and the code work, you should receive an alert like the following:
```
your rule name

Threshold violation, field_a_sum - field_b_sum = 0.25 (min: -0.0001 max : 0.00001)

zzzzzz_id:
19 : 0.25

binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_field_a_sum: 1.0
metric_field_b_sum: 0.75
num_hits: 5000
num_matches: 1
```
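For orientation: the "Threshold violation" line and the `zzzzzz_id` section are produced by `get_match_str`, while the remaining key/value pairs are the match payload assembled in `check_matches` (the two metric values, the `bucket_script` result under `binary_operation`, and the `query_key` value); `num_hits` and `num_matches` are added by ElastAlert itself.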
Most monitoring patterns can be implemented this way, as long as you can issue a query that does what you intend.
However, customizing the part of the query that specifies the time range is a little more troublesome. That part of the query is built in a different place from the custom rule, so it is hard to change the time specification from within the rule alone. But that's okay, it can be worked around; I'd like to show how at the next opportunity.