Maintenant que j'étudie Elastic Search, je me suis demandé si je pouvais faire quelque chose avec ES, j'ai donc écrit un robot d'exploration Web basé sur les événements utilisant Kinesis + Lambda.
Le flux général est le suivant.
Étant donné que vous avez besoin d'une autorisation pour utiliser Kinesis et Elastic Search, Avoir ** ID de clé d'accès ** et ** clé d'accès secrète ** pour chacun.
De plus, l'ARN de l'utilisateur est également requis, alors gardez-le à l'esprit (arn: aws: iam :: ********: user / ********)
Commencez par créer un flux Kinesis.
Ensuite, créez un ES avec Amazon Elastic Search Service.
Create a new domain
Entrez un nom de domaine approprié dans le champ Nom de domaine Elasticsearch (provisoirement archives Web)
Sélectionnez [5.1] pour la version Elasticsearch. Appuyez sur [Suivant]
Définissez le type d'instance sur [t2.small] dans Configurer le cluster. Appuyez sur Suivant (pour tester, donc avec une petite instance)
Sélectionnez Autoriser ou refuser l'accès à un ou plusieurs comptes AWS ou utilisateurs IAM dans la stratégie d'accès Configurer
Entrez l'ARN de l'utilisateur que vous souhaitez autoriser dans l'ID de compte ou l'ARN *
Créer avec [Confirmer et créer]
Après un certain temps, ES démarrera, vérifiez donc [Endpoint] et gardez-le à l'esprit car il sera utilisé dans Lambda.
Créez des données de mappage pour enregistrer l'URL, le titre et le contenu de l'article pour le stockage des articles.
mapping.json
{
"mappings": {
"article": {
"properties" : {
"url" : {
"type": "string",
"index" : "not_analyzed"
},
"title" : {
"type": "string",
"index" : "analyzed"
},
"contents" : {
"type": "string",
"index" : "analyzed"
}
}
}
}
}
Ensuite, créez les données de mappage ci-dessus et un script pour créer un index.
Installez les packages suivants localement à l'avance
$ pip install requests_aws4auth elasticsearch
es-mapping.py
# -*- coding: utf-8 -*-
import elasticsearch
from requests_aws4auth import AWS4Auth
import json
if __name__ == '__main__':
#Spécifiez le point de terminaison ES
host='search-***************.ap-northeast-1.es.amazonaws.com'
awsauth = AWS4Auth(
#ID de clé d'accès utilisateur AWS et clé d'accès secrète
'ACCESS_KRY_ID',
'SECRET_ACCESS_KEY',
'ap-northeast-1', 'es')
es = elasticsearch.Elasticsearch(
hosts=[{'host': host, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=elasticsearch.connection.RequestsHttpConnection
)
f = open('mapping.json', 'r')
mapping = json.load(f)
es.indices.create(index='website')
es.indices.put_mapping(index='website', doc_type='article', body=mapping['mappings'])
$ python es-mapping.py
Lorsque vous exécutez le script, il doit être indexé sur AWS ES.
Maintenant que nous avons créé ElasticSearch, il est temps de créer la fonction Lambda.
Créez une fonction Lambda localement.
$ mkdir web_crawler
$ cd web_crawler
$ vim lambda_function.py
lambda_function.py
# -*- coding: utf-8 -*-
import os
import base64
from readability import Document
import html2text
import requests
import elasticsearch
from elasticsearch import helpers
from requests_aws4auth import AWS4Auth
def lambda_handler(event, context):
host = os.environ['ES_HOST']
#Utiliser le rôle IAM pour s'authentifier auprès d'ElasticSearch Service
awsauth = AWS4Auth(
os.environ['ACCESS_ID'],
os.environ['SECRET_KEY'], 'ap-northeast-1', 'es')
es = elasticsearch.Elasticsearch(
hosts=[{'host': host, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=elasticsearch.connection.RequestsHttpConnection
)
articles = []
#Obtenir des événements de Kinesis Stream
for record in event['Records']:
payload = base64.b64decode(record['kinesis']['data'])
try:
response = requests.get(payload)
if response.ok:
article = Document(response.content).summary()
titleText = html2text.html2text(Document(response.content).title())
contentsText = html2text.html2text(article)
res = es.search(index="website", body={"query": {"match": {"url": payload}}})
#L'URL est-elle déjà enregistrée dans ES?
if res['hits']['total'] is 0:
doc = {
'url': payload,
'title': titleText.encode('utf-8'),
'contents': contentsText.encode('utf-8')
}
articles.append({'_index':'website', '_type':'scraper', '_source':doc})
except requests.exceptions.HTTPError as err:
print("HTTPError: " + err)
# Bulk Insert
helpers.bulk(es, articles)
Après avoir créé la fonction Lambda, installez les bibliothèques requises dans la même hiérarchie
$ pip install readability-lxml html2text elasticsearch requests_aws4auth requests -t /path/to/web_crawler
Durcir avec zip
$ zip -r web_crawler.zip .
[Créer une fonction Lambda]
Sélectionnez [Fonction vierge]
Sélectionnez le flux Kinesis créé précédemment dans [Paramètres de déclenchement].
[Taille du lot] est d'environ 10
[Position de départ] est une coupe horizontale
Vérifiez l'activation de la gâchette
Entrez [Nom] dans [Paramètres de fonction](provisoirement WebCrawler ici)
Sélectionnez Python 2.7 pour Runtime
Sous Type d'entrée de code, sélectionnez Télécharger un fichier .ZIP
Spécifiez le fichier zip créé précédemment à partir de [Function Package]
Définissez [Variables d'environnement] sur 3 pour accéder à Elastic Search.
Accédez à l'ID de clé dans ACCESS_ID
Clé d'accès secrète à SECRET_KEY
Point de terminaison Elastic Search sur ES_HOST
[Handler] reste lambda_function.lambda_handler
Créez les rôles appropriés
Réglez [Délai] dans [Paramètres détaillés] sur environ 2 minutes.
[Créer une fonction]
Ensuite, dans la dernière étape, utilisez Scrapy pour extraire l'URL de la page de liste et envoyer les données au flux Kinesis.
La page de liste utilise l'entrée chaude de Hatena Bookmark. RSS semble être plus facile à obtenir des données si vous utilisez Scrapy, mais j'ai osé gratter la page Web. Scrapy est un framework utile et puissant pour créer des robots d'exploration avancés, donc si vous êtes intéressé, n'hésitez pas à le toucher.
Installez d'abord Scrapy
$ pip install scrapy
$ scrapy startproject hotentry
$ vim hotentry/hotentry/spiders/hotentry.py
Entrez le code ci-dessous.
hotentry.py
# -*- coding: utf-8 -*-
import scrapy
from scrapy.conf import settings
import boto3
import json
kinesis = boto3.client(
'kinesis',
aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'],
region_name='ap-northeast-1')
class HotEntrySpider(scrapy.Spider):
name = "hotentry"
allowed_domains = ["b.hatena.ne.jp"]
start_urls = ['http://b.hatena.ne.jp/hotentry/general']
def parse(self, response):
for sel in response.css("li.hb-entry-unit-with-favorites"):
url = sel.css("a.entry-link::attr('href')").extract_first()
if url is None:
continue
kinesis.put_record(
StreamName = "scraping_url",
Data = sel.css("a.entry-link::attr('href')").extract_first(),
PartitionKey = "scraper"
)
$ vim hotentry/hotentry/settings.py
Ajoutez l'ID de la clé d'accès et la clé d'accès secrète à settings.py
AWS_ACCESS_KEY_ID = 'AKI******************'
AWS_SECRET_ACCESS_KEY = '************************************'
Vous pouvez maintenant METTRE dans le flux Kinesis. Essayons d'exécuter ce code.
$ scrapy crawl hotenty
Vous devriez maintenant être en mesure de remplir les données avec "Scrapy-> Kinesis-> AWS Lambda-> Elastic Search".
J'ai pu extraire l'URL avec Scrapy et l'envoyer à Kinesis, mais tel quel, ce sera un lot local, alors déployez le code Scrapy sur un service cloud appelé Scrapinghub.
Veuillez consulter l'article suivant pour savoir comment l'installer.
C'est facile à faire de l'enregistrement des utilisateurs au déploiement, je vais donc le décomposer.
Au départ, j'ai utilisé SQS et DynamoDB et divisé la fonction Lambda en plusieurs parties, mais cela est devenu compliqué et j'étais frustré car je ne pouvais pas suivre l'erreur. Après tout, c'est simple, c'est mieux. Je souhaite que davantage de déclencheurs Lambda prennent en charge plus de services.
** * Étant donné que ce code a été écrit dans un test, la gestion des erreurs, etc. n'est pas strictement effectuée. Si vous rencontrez des inconvénients avec ce code, veuillez le faire à vos propres risques. ** **
Recommended Posts