I wondered whether I could easily write a MapReduce application in Python, which I'm used to, and run it on **Amazon EMR**. That's how I learned about **mrjob**, a Python framework for exactly this. There are other options such as PySpark, but my impression is that mrjob has a lower learning cost and is easier to implement with. So, in this article, I describe how to run an application created with mrjob on Amazon EMR. I'll also try running it on GCP's **Cloud Dataproc**.
mrjob is a framework that lets you write applications in Python and run them on a Hadoop cluster. **You can easily run MapReduce applications locally or in the cloud without any Hadoop expertise.** Internally, mrjob runs on top of Hadoop Streaming. If you're wondering what Hadoop is in the first place, an overview is given in another article, so please refer to it if you like.
Normally, a MapReduce application splits the input dataset and processes it through **Map processing**, **(Shuffle processing)**, and **Reduce processing**. You can also add **Combine processing**, which performs an intermediate aggregation on each mapper's output before it is passed to Reduce processing.
The application implemented this time is a program that counts how many times each word appears in a document. It expects input like the following.
input.txt
```
We are the world, we are the children
We are the ones who make a brighter day
```
**Map processing** takes a key-value pair per input line and returns zero or more key-value pairs.
Input: (None, "we are the world, we are the children")

Output:
"we", 1
"are", 1
"the", 1
"world", 1
"we", 1
"are", 1
"the", 1
"children", 1
**Combine processing** takes a key and the list of its values from a single mapper's output (here, one line) and returns zero or more key-value pairs.
Input: ("we", [1, 1])

Output:
"we", 2
**Reduce processing** takes a key and the list of all its values as input and returns zero or more key-value pairs as output.
Input: ("we", [2, 1])  # "we" appeared twice in line 1 and once in line 2

Output:
"we", 3
Build an environment for creating MapReduce applications in Python.
You can install it from PyPI. This time I'm using version **0.7.1**.

```
pip install mrjob==0.7.1
```
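To check that the installation succeeded, you can print the installed version from Python (mrjob defines a `__version__` attribute):

```
python -c "import mrjob; print(mrjob.__version__)"
# 0.7.1
```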
Now, let's implement the above process with mrjob. If the process is simple and fits in a single step, create a class that inherits from **mrjob.job.MRJob** as follows.
mr_word_count.py
```python
import re

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRWordCount(MRJob):

    # Process the input line by line and emit a (word, 1) key-value pair per word
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    # Pre-aggregate a word's counts within a single mapper's output
    def combiner(self, word, counts):
        yield word, sum(counts)

    # Take a word and the list of all its counts and sum them
    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCount.run()
```
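You can also drive the job programmatically instead of from the console, which is convenient for testing. Here is a minimal sketch using mrjob's documented runner API (`make_runner`, `cat_output`, and `parse_output`); the import path assumes the package layout shown later in this article:

```python
from src.mr_word_count import MRWordCount

# Run the job in-process with the default (inline) runner
job = MRWordCount(args=['input.txt'])
with job.make_runner() as runner:
    runner.run()
    # parse_output() turns the raw output stream back into key-value pairs
    for word, count in job.parse_output(runner.cat_output()):
        print(word, count)
```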
If the process is more complicated and needs multiple steps, you can define them by returning a list of **mrjob.step.MRStep** objects from MRJob's **steps** method, as follows.
mr_word_count.py
```python
import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRWordCount(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=...,
                   combiner=...,
                   reducer=...),
            ...
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCount.run()
```
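As a concrete example of a second step, here is a sketch adapted from the most-used-word example in the mrjob documentation: the first step counts words as above, and the second step's reducer picks the word with the highest count.

```python
import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    # Emit every (count, word) pair under the same None key,
    # so a single reducer sees all of them in the next step
    def reducer_count_words(self, word, counts):
        yield None, (sum(counts), word)

    # max() compares the (count, word) tuples by count first
    def reducer_find_max_word(self, _, count_word_pairs):
        yield max(count_word_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
```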
Let's run the application locally and in the Amazon EMR and Cloud Dataproc environments. The package structure looks like this.
Package configuration
```
.
├── config.yaml
├── input.txt
└── src
    ├── __init__.py
    └── mr_word_count.py
```
When running the application, you can specify options such as the input path. There are two ways to do this: **writing them in a Config file in advance** or **passing them from the console**. If the same option is specified in both places, **the value passed from the console takes precedence**, as illustrated below. Config files can be written in yaml or json format.
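For example, with a config like the following (a sketch; the `local` runner section and `output_dir` value are hypothetical, chosen just to illustrate the precedence rule):

```yaml
# config.yaml -- hypothetical values for illustration
runners:
  local:
    output_dir: out/   # used unless --output-dir is passed on the console
```

Running the job with `--output-dir=other_out/` on the console would then write to `other_out/`, because the console value overrides the config file.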
To run the application created by mrjob, enter the following from the console:
```
python {Application path} -r {Execution environment} -c {Config file path} < {Input path} > {Output path}
```
When running locally, you can omit the execution environment and the Config file path if you don't need them. If you don't specify an output path, the results are written to standard output.
```
python src/mr_word_count.py < input.txt

# If you need to pass the execution environment or the Config file path
python src/mr_word_count.py -r local -c config.yaml < input.txt
```
When you run the above command, the application will run and you will see results similar to the following in standard output:
"world" 1
"day" 1
"make" 1
"ones" 1
"the" 3
"we" 3
"who" 1
"brighter" 1
"children" 1
"a" 1
"are" 3
**Amazon EMR**

To run on Amazon EMR, you must set **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in environment variables or in the Config file. You can also set the instance type and the number of core instances there. The application and its associated files are uploaded to S3 before the job runs.
config.yaml
```yaml
runners:
  emr:
    aws_access_key_id: <your key ID>
    aws_secret_access_key: <your secret>
    instance_type: c1.medium
    num_core_instances: 4
```
Other options can be found here (https://mrjob.readthedocs.io/en/latest/guides/emr-opts.html).
When you run it, specify **emr** as the execution environment.

```
python src/mr_word_count.py -r emr -c config.yaml input.txt --output-dir=s3://emr-test/output/
```
**Cloud Dataproc**

To use Cloud Dataproc, first enable the Dataproc API on GCP. Then specify the path of your credentials file in the environment variable **GOOGLE_APPLICATION_CREDENTIALS**. In the Config file, set the instance zone, type, and number of core instances. Other options can be found here (https://mrjob.readthedocs.io/en/latest/guides/dataproc-opts.html).
config.yaml
```yaml
runners:
  dataproc:
    zone: us-central1-a
    instance_type: n1-standard-1
    num_core_instances: 2
```
When you run it, specify **dataproc** as the execution environment.

```
python src/mr_word_count.py -r dataproc -c config.yaml input.txt --output-dir=gs://dataproc-test/output/
```
With mrjob, I was able to easily create a MapReduce application in Python and just as easily run it in a cloud environment. mrjob is also well documented, so it's easy to get started with; if you want a simple way to run MapReduce jobs in Python, it's a useful framework.