I'm currently studying ElasticSearch and wanted to build something with it, so I wrote an event-driven web crawler using Kinesis + Lambda.
Execution environment
- CentOS 7
- Python 2.7
The general flow is: Scrapy -> Kinesis -> AWS Lambda -> ElasticSearch.
Since you need permission to use Kinesis and ElasticSearch, have the **access key ID** and **secret access key** for each ready.
The user's ARN is also required, so make a note of it (arn:aws:iam::********:user/************).
First, create a Kinesis stream.
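If you prefer to script this step, the stream can also be created with boto3. This is a minimal sketch, assuming the stream name scraping_url (the name the Scrapy spider writes to later in this article) and a single shard for testing:

# -*- coding: utf-8 -*-
# Minimal sketch: create the Kinesis stream with boto3.
# "scraping_url" is the stream name used by the Scrapy spider later;
# one shard is enough for a test setup.
import boto3

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
kinesis.create_stream(StreamName='scraping_url', ShardCount=1)

# Wait until the stream becomes ACTIVE before sending records to it
kinesis.get_waiter('stream_exists').wait(StreamName='scraping_url')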
Next, create an ES with Amazon ElasticSearch Service.
Create a new domain
Enter an appropriate domain name in [Elasticsearch domain name] (tentatively web-archives here)
Select [5.1] for Elasticsearch version. Press [Next]
Set [Instance type] to [t2.small] in [Configure cluster], then press [Next] (a small instance is fine since this is just for testing)
In [Set up access policy], select [Allow or deny access to one or more AWS accounts or IAM users]
Enter the ARN of the user you want to allow in [Account ID or ARN*] (an example of the generated policy is shown after these steps)
Create with [Confirm and create]
After a while the ES domain will start up, so check [Endpoint] and make a note of it; Lambda will use it later.
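For reference, the access policy generated by this step should look roughly like the following (the account ID and user name are placeholders, in the same style as above):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::********:user/************"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:********:domain/web-archives/*"
    }
  ]
}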
Create mapping data to store the URL, title, and contents of each article.
mapping.json
{
  "mappings": {
    "article": {
      "properties": {
        "url": {
          "type": "string",
          "index": "not_analyzed"
        },
        "title": {
          "type": "string",
          "index": "analyzed"
        },
        "contents": {
          "type": "string",
          "index": "analyzed"
        }
      }
    }
  }
}
Next, write a script that creates the index and applies the above mapping.
Install the following packages locally in advance
$ pip install requests_aws4auth elasticsearch
es-mapping.py
# -*- coding: utf-8 -*-
import json

import elasticsearch
from requests_aws4auth import AWS4Auth

if __name__ == '__main__':
    # Specify the ES endpoint
    host = 'search-***************.ap-northeast-1.es.amazonaws.com'

    # AWS user access key ID and secret access key
    awsauth = AWS4Auth(
        'ACCESS_KEY_ID',
        'SECRET_ACCESS_KEY',
        'ap-northeast-1', 'es')

    es = elasticsearch.Elasticsearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=elasticsearch.connection.RequestsHttpConnection
    )

    # Create the index and apply the mapping
    with open('mapping.json', 'r') as f:
        mapping = json.load(f)

    es.indices.create(index='website')
    es.indices.put_mapping(index='website', doc_type='article', body=mapping['mappings'])
$ python es-mapping.py
When you run the script, the index should be created on AWS ES.
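If you want to confirm that the mapping was applied, you can add a line like the following to the end of es-mapping.py (a minimal check, reusing the `es` client defined in the script):

    # Fetch the mapping back and print it to verify it was applied
    print(es.indices.get_mapping(index='website', doc_type='article'))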
Now that we've created ElasticSearch, let's create a Lambda Function.
Create a Lambda function locally.
$ mkdir web_crawler
$ cd web_crawler
$ vim lambda_function.py
lambda_function.py
# -*- coding: utf-8 -*-
import base64
import os

import elasticsearch
import html2text
import requests
from elasticsearch import helpers
from readability import Document
from requests_aws4auth import AWS4Auth


def lambda_handler(event, context):
    host = os.environ['ES_HOST']
    # Authenticate to the ElasticSearch Service with the access keys
    awsauth = AWS4Auth(
        os.environ['ACCESS_ID'],
        os.environ['SECRET_KEY'], 'ap-northeast-1', 'es')

    es = elasticsearch.Elasticsearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=elasticsearch.connection.RequestsHttpConnection
    )

    articles = []

    # Get the records sent from the Kinesis stream
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        try:
            response = requests.get(payload)
            if response.ok:
                article = Document(response.content).summary()
                titleText = html2text.html2text(Document(response.content).title())
                contentsText = html2text.html2text(article)

                # Is the URL already registered in ES?
                res = es.search(index="website", body={"query": {"match": {"url": payload}}})
                if res['hits']['total'] == 0:
                    doc = {
                        'url': payload,
                        'title': titleText.encode('utf-8'),
                        'contents': contentsText.encode('utf-8')
                    }
                    # Use the 'article' type that the mapping was created for
                    articles.append({'_index': 'website', '_type': 'article', '_source': doc})
        except requests.exceptions.HTTPError as err:
            print("HTTPError: " + str(err))

    # Bulk insert
    helpers.bulk(es, articles)
After writing the Lambda function, install the required libraries into the same directory
$ pip install readability-lxml html2text elasticsearch requests_aws4auth requests -t /path/to/web_crawler
Zip it up
$ zip -r web_crawler.zip .
[Create Lambda function]
Select [Blank Function]
Select the Kinesis stream created earlier in [Trigger Settings].
Set [Batch size] to about 10
Set [Starting position] to [Trim horizon]
Check [Enable trigger]
Enter a [Name] in [Function Settings] (tentatively WebCrawler here)
Select Python 2.7 for Runtime
Under Code Entry Type, select Upload .ZIP file
Specify the zip file created earlier from [Function Package]
Set three [Environment variables] for accessing ElasticSearch:
- ACCESS_ID: the access key ID
- SECRET_KEY: the secret access key
- ES_HOST: the ElasticSearch endpoint
[Handler] remains lambda_function.lambda_handler
Create roles as appropriate
Set [Timeout] in [Detailed Settings] to about 2 minutes.
[Create Function]
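Before wiring up the crawler, you can check the Kinesis -> Lambda -> ElasticSearch path by putting a single test URL onto the stream. A minimal sketch (the stream name scraping_url matches the stream created earlier; the test URL is just an example):

# -*- coding: utf-8 -*-
# Minimal sketch: push one test URL into the stream to confirm that the
# Lambda trigger fires and a document shows up in ElasticSearch.
import boto3

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
kinesis.put_record(
    StreamName='scraping_url',
    Data='http://example.com/',
    PartitionKey='scraper'
)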
Finally, I will use Scrapy to extract URLs from a list page and send them to the Kinesis stream.
The list page is Hatena Bookmark's hot entries. It would probably be easier to get the data from the RSS feed, but I deliberately scraped the web page instead. Scrapy is a useful and powerful framework for building advanced web crawlers, so give it a try if you're interested.
First install Scrapy
$ pip install scrapy
$ scrapy startproject hotentry
$ vim hotentry/hotentry/spiders/hotentry.py
Enter the code below.
hotentry.py
# -*- coding: utf-8 -*-
import scrapy
from scrapy.conf import settings
import boto3

kinesis = boto3.client(
    'kinesis',
    aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'],
    region_name='ap-northeast-1')


class HotEntrySpider(scrapy.Spider):
    name = "hotentry"
    allowed_domains = ["b.hatena.ne.jp"]
    start_urls = ['http://b.hatena.ne.jp/hotentry/general']

    def parse(self, response):
        for sel in response.css("li.hb-entry-unit-with-favorites"):
            # Extract each entry's link and send it to the Kinesis stream
            url = sel.css("a.entry-link::attr('href')").extract_first()
            if url is None:
                continue

            kinesis.put_record(
                StreamName="scraping_url",
                Data=url,
                PartitionKey="scraper"
            )
$ vim hotentry/hotentry/settings.py
Add access key ID and secret access key to settings.py
AWS_ACCESS_KEY_ID = 'AKI******************'
AWS_SECRET_ACCESS_KEY = '************************************'
You can now PUT records into your Kinesis stream. Let's try running this code.
$ scrapy crawl hotentry
You should now be able to populate the data through "Scrapy -> Kinesis -> AWS Lambda -> ElasticSearch".
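To confirm that documents actually arrived, you can query the index. A minimal sketch, reusing the same placeholder host and credentials as before:

# -*- coding: utf-8 -*-
# Minimal sketch: count the crawled documents and print a few URLs.
# Host and credentials are placeholders, the same as in es-mapping.py.
import elasticsearch
from requests_aws4auth import AWS4Auth

host = 'search-***************.ap-northeast-1.es.amazonaws.com'
awsauth = AWS4Auth('ACCESS_KEY_ID', 'SECRET_ACCESS_KEY', 'ap-northeast-1', 'es')
es = elasticsearch.Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=elasticsearch.connection.RequestsHttpConnection
)

res = es.search(index='website', body={'query': {'match_all': {}}, 'size': 3})
print("total hits: %s" % res['hits']['total'])
for hit in res['hits']['hits']:
    print(hit['_source']['url'])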
Scrapy can now extract the URLs and send them to Kinesis, but as it stands it is just a batch job running locally, so I deployed the Scrapy code to a cloud service called Scrapinghub.
Please see the following article for details on how to set it up.
Everything from user registration to deployment is straightforward, so I'll omit the details here.
Initially I used SQS and DynamoDB and split the work across multiple Lambda functions, but it became complicated and frustrating because I could not trace errors. In the end, simple is best. I hope Lambda triggers come to support even more services.
**Note: this code was written as an experiment, so error handling and the like are not rigorous. If you use it, you do so at your own risk.**