Cet article est l'article du 9ème jour du MicroAd Advent Calendar 2019.
Tout le monde aime la recherche élastique, Un outil appelé [Elast Alert](https://github.com/ Yelp / elastalert) est utile lorsque vous souhaitez surveiller en permanence les données sur Elasticsearch et ignorer les alertes lorsqu'un modèle spécifique se produit.
ElastAlert est livré avec un certain nombre de modèles de surveillance appelés «Règle» par défaut, mais il y a des moments où cela ne suffit pas pour répondre à vos besoins. Dans un tel cas, résolvons le problème en créant un nouveau modèle de surveillance appelé règle personnalisée.
Ces deux ont été utilisés comme référence lors de la création de règles personnalisées. Surtout ce dernier est à voir car il a un code pratique.
Dans cet article
Créez une règle qui peut l'être.
Par exemple, s'il y a field_a
et field_b
dans un index d'Elasticsearch,
-0.001 < sum(field_a) - sum(field_b) < 0.001
Dans cet état, je veux faire quelque chose comme ** alerte lorsque ** disparaît.
À titre d’exemple de l’utilisation spécifique de l’auteur, «Ce devrait être la même chose, mais c’est mauvais si les valeurs sont différentes les unes des autres.» Définissez cette règle pour deux données, et s’il s’agit de valeurs différentes, immédiatement C'est comme sauter les notifications vers.
Parmi les règles par défaut, «MetricAggregationRule» a une fonction similaire, mais comme cette règle ne peut utiliser que le résultat de l'agrégation d'un seul champ, il n'est pas possible de «surveiller le résultat du calcul entre les résultats agrégés».
Tout en se référant à la classe MetricAggregationRule
, elle est la base des règles qui gèrent les données agrégées [BaseAggregationRule](https://github.com/ Yelp / elastalert / blob / 325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead / elastalert / ruletypes.py" ) Est hérité pour créer une classe appelée BinaryOperationOnAggregatedMetricRule
.
elastalert/elastalert_modules/custum_rules.py
import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule
class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):
required_options = frozenset([
'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
'binary_operator'
])
allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
'subtract': {'func': op.sub, 'sign': '-'},
'multiply': {'func': op.mul, 'sign': '*'},
'divide': {'func': op.truediv, 'sign': '/'}}
def __init__(self, *args):
super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
self.ts_field = self.rules.get('timestamp_field', '@timestamp')
self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
self.rules['aggregation_query_element'] = self.generate_aggregation_query()
if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))
if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))
if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")
def get_match_str(self, match):
message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
self.rules['metric_agg_key_first'],
self.rules['metric_agg_type_first'],
self.binary_operator['sign'],
self.rules['metric_agg_key_second'],
self.rules['metric_agg_type_second'],
str(self.binary_operator['func'](*[match[self.metric_key_first],match[self.metric_key_second]])),
self.rules.get('min_threshold'),
self.rules.get('max_threshold')
)
if self.rules.get('delete_ruletype_text'):
message = ''
top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]
def events_to_message(items):
message = ''
items = sorted(items, key=lambda x: x[1], reverse=True)
for term, count in items:
message += '%s : %s\n' % (term, count)
return message
for key, counts in top_events:
message += '%s:\n' % (key)
message += '%s\n' % (events_to_message(counts.items()))
return message
def generate_aggregation_query(self):
"""
custom_top_count_keys: A list of fields.
ElastAlert will perform a terms query for the top X most common values for each of the fields,
where X is 5 by default, or custom_top_count_number if it exists.
custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
"""
query = {
"all_matching_docs": {
"filters": {
"filters": {
"all": {
"match_all": {}
}
}
},
'aggs': {
'topx_match_aggs': {
"filter": {
"bool": {
"must": []
}
},
'aggregations': {
}
},
self.metric_key_first: {
self.rules['metric_agg_type_first']: {
'field': self.rules['metric_agg_key_first']
}
},
self.metric_key_second: {
self.rules['metric_agg_type_second']: {
'field': self.rules['metric_agg_key_second']
}
},
'binary_operation': {
'bucket_script': {
'buckets_path': {
'first': self.metric_key_first,
'second': self.metric_key_second
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
}
if self.rules.get('custom_top_count_keys'):
number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
for key in keys:
child_query = {
'terms': {
'field': key,
'order': {'_count': 'desc'},
'size': number
},
'aggs': {
'metric_aggregation_first': {
self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
},
'metric_aggregation_second': {
self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
},
'metric_aggregation': {
'bucket_script': {
'buckets_path': {
'first': 'metric_aggregation_first',
'second': 'metric_aggregation_second'
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
return query
def check_matches(self, timestamp, query_key, aggregation_data):
if "compound_query_key" in self.rules:
self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match = {
self.rules['timestamp_field']: timestamp,
self.metric_key_first: metric_val_first,
self.metric_key_second: metric_val_second,
'binary_operation': binary_operation
}
if query_key is not None:
match[self.rules['query_key']] = query_key
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match.update(counts)
self.add_match(match)
def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
if len(compound_keys) < 1:
# shouldn't get to this point, but checking for safety
return
match_data[compound_keys[0]] = aggregation_data['key']
if 'bucket_aggs' in aggregation_data:
for result in aggregation_data['bucket_aggs']['buckets']:
self.check_matches_recursive(timestamp,
query_key,
result,
compound_keys[1:],
match_data)
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match_data[self.rules['timestamp_field']] = timestamp
match_data[self.metric_key_first] = metric_val_first
match_data[self.metric_key_second] = metric_val_second
match_data['binary_operation'] = binary_operation
# add compound key to payload to allow alerts to trigger for every unique occurrence
compound_value = [match_data[key] for key in self.rules['compound_query_key']]
match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match_data.update(counts)
self.add_match(match_data)
def get_top_counts(self, aggregation_data):
"""
Counts the number of events for each unique value for each key field.
Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
"""
all_counts = {}
number = self.top_count_number
keys = self.top_count_keys
for key in keys:
hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
if hits_terms is None:
top_events_count = {}
else:
buckets = hits_terms.get('buckets')
terms = {}
for bucket in buckets:
terms[bucket['key']] = bucket['metric_aggregation']['value']
counts = terms.items()
counts = sorted(counts, key=lambda x: x[1], reverse=True)
top_events_count = dict(counts[:number])
# Save a dict with the top 5 events by key
all_counts['top_events_%s' % (key)] = top_events_count
return all_counts
def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
if metric_val_first is None or metric_val_second is None:
return False
if metric_val_second == 0 and binary_operator == op.truediv:
return False
if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
return True
if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
return True
return False
Le code est en python, mais l'essentiel est de comprendre la structure des «requêtes Elasticsearch» émises en interne.
Dans cette requête, il était nécessaire de comprendre le contenu de «bucket_script» et de «buckets_path».
Les éléments spécifiés dans le fichier de configuration qui utilise cette règle sont les suivants.
+, -, ×, ÷
Peut être utilisé respectivement.
rule_setting.yaml
es_host: <host_name>
es_port: <port_number>
name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"
index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>
# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract
min_threshold: -0.0001
max_threshold: 0.00001
query_key:
- xxxxxx_id
custom_top_count_keys:
- zzzzzz_id
Si les paramètres et les programmes fonctionnent, vous devriez recevoir une alerte comme celle-ci:
your rule name
Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)
zzzzzz_id:
19 : 0.25
binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1
La plupart des modèles de surveillance sont réalisables tant que vous pouvez émettre des requêtes qui fonctionnent comme prévu.
Cependant, il y a des cas où cela n'est un peu gênant que lors de la personnalisation de la pièce qui spécifie l'heure. En effet, la requête est construite à un emplacement différent de la règle personnalisée pour la partie qui spécifie l'heure, il est donc difficile de personnaliser la spécification d'heure telle quelle.
mais c'est d'accord. Comme c'est joli. Je voudrais vous présenter la situation qui peut être gérée à la prochaine occasion.
Recommended Posts