A basic and powerful approach to distributed parallel processing is Single Program Multiple Data (SPMD). This is a model in which multiple processors execute the same program, and each processor processes different data. By dividing large-scale data into units that can be processed independently and processing the subdivided data in parallel by multiple processors, the processing time of the entire data can be significantly reduced.
For example, consider executing the command `job` with `a01.txt`, `a02.txt`, `a03.txt`, and `a04.txt` as input files, and storing the execution results (output) in `b01.txt`, `b02.txt`, `b03.txt`, and `b04.txt`, respectively. The following bash shell script implements this process.
#!/bin/bash
# Run `job` on each input file aNN.txt, writing the output to bNN.txt.
for src in "$@"
do
    job < "$src" > "${src/a/b}"
done
Since the iterations of this for loop are independent of each other, they can easily be parallelized, shortening the overall processing time.
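Before bringing in a cluster, the same idea can be parallelized on a single machine. A minimal sketch with Python's multiprocessing module, assuming the `job` command and the file naming used above:

import subprocess
from multiprocessing import Pool

def run(src):
    # Run `job` with src on stdin and the corresponding b*.txt on stdout.
    with open(src) as fi, open(src.replace('a', 'b', 1), 'w') as fo:
        return subprocess.call(['job'], stdin=fi, stdout=fo)

if __name__ == '__main__':
    pool = Pool(4)  # four worker processes, one per input file
    print pool.map(run, ['a01.txt', 'a02.txt', 'a03.txt', 'a04.txt'])

Each iteration is independent, so `Pool.map` can distribute the files across the worker processes; the limitation, of course, is that this only uses the cores of one machine.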
In natural language processing and machine learning, there are many processes, such as morphological analysis and feature extraction, that can easily be parallelized by splitting the data. In Japan, such (distributed) parallel processing is informally called **bakapara** ("stupid parallel", i.e., embarrassingly parallel). This article introduces how to realize embarrassingly parallel command execution with IPython cluster, and [Bakapara](https://github.com/chokkan/bakapara), a library specializing in it.
According to the Architecture overview in the official documentation, an IPython cluster consists of four elements: the client, the hub, the schedulers, and the engines (the hub and the schedulers together form the controller).
To build a parallel execution environment with IPython, you need to start one controller and multiple engines. There are two ways to do this: start the controller with the `ipcontroller` command and each engine with the `ipengine` command individually, or start the controller and the engines together with the `ipcluster` command.
This time, I would like to build the cluster environment the easy way, using the `ipcluster` command. This article assumes the following about the server group:

- You can log in to each server via `ssh` without a password (using `ssh-agent` etc.)
- The home directory is shared between the controller host and the engine hosts

To make the explanation concrete, this article uses a server group of twelve engine hosts, `mezcal01.cl.ecei.tohoku.ac.jp` through `mezcal12.cl.ecei.tohoku.ac.jp`, as an example.
Create and edit the configuration file by referring to the official document Using ipcluster in SSH mode. IPython cluster conveniently manages cluster execution environments in units called **profiles**. The profile name is arbitrary; here we create a profile named "mezcal", after the name of the engines' server group.
$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'
A directory called `$HOME/.ipython/profile_{profile name}` is created, containing the IPython cluster configuration file `ipcluster_config.py`. Open this file in an editor and configure the engines on each node to be started via SSH. The places to edit are marked with comments. Set `c.IPClusterStart.engine_launcher_class` to `'SSH'`.
ipcluster_config.py
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
# Local : start engines locally as subprocesses [default]
# MPI : use mpiexec to launch engines in an MPI environment
# PBS : use PBS (qsub) to submit engines to a batch queue
# SGE : use SGE (qsub) to submit engines to a batch queue
# LSF : use LSF (bsub) to submit engines to a batch queue
# SSH : use SSH to start the controller
# Note that SSH does *not* move the connection files
# around, so you will likely have to do this manually
# unless the machines are on a shared file system.
# HTCondor : use HTCondor to submit engines to a batch queue
# WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
# c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
# ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'
How this differs from the `c.IPClusterStart.engine_launcher_class` set earlier is unclear, but set `c.IPClusterEngines.engine_launcher_class` to `'SSH'` as well. In addition, specify the host names and the number of engines (the degree of parallelism) in the dictionary object `c.SSHEngineSetLauncher.engines`: the keys of `engines` are host names and the values are numbers of engines. The following example starts 4 engines on each of `mezcal01.cl.ecei.tohoku.ac.jp` through `mezcal12.cl.ecei.tohoku.ac.jp`, for up to 48-way parallel processing.
ipcluster_config.py
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
# Local : start engines locally as subprocesses [default]
# MPI : use mpiexec to launch engines in an MPI environment
# PBS : use PBS (qsub) to submit engines to a batch queue
# SGE : use SGE (qsub) to submit engines to a batch queue
# LSF : use LSF (bsub) to submit engines to a batch queue
# SSH : use SSH to start the controller
# Note that SSH does *not* move the connection files
# around, so you will likely have to do this manually
# unless the machines are on a shared file system.
# HTCondor : use HTCondor to submit engines to a batch queue
# WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
# c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
# ipcluster start --engines=MPI
c.IPClusterEngines.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.engines = {
'mezcal01.cl.ecei.tohoku.ac.jp': 4,
'mezcal02.cl.ecei.tohoku.ac.jp': 4,
'mezcal03.cl.ecei.tohoku.ac.jp': 4,
'mezcal04.cl.ecei.tohoku.ac.jp': 4,
'mezcal05.cl.ecei.tohoku.ac.jp': 4,
'mezcal06.cl.ecei.tohoku.ac.jp': 4,
'mezcal07.cl.ecei.tohoku.ac.jp': 4,
'mezcal08.cl.ecei.tohoku.ac.jp': 4,
'mezcal09.cl.ecei.tohoku.ac.jp': 4,
'mezcal10.cl.ecei.tohoku.ac.jp': 4,
'mezcal11.cl.ecei.tohoku.ac.jp': 4,
'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}
If the server on which you run the terminal or IPython notebook differs from the controller server, you need to allow connections to the controller from other servers. If the servers are on a trusted LAN, it is convenient to allow all hosts to connect to the controller. Add `"--ip='*'"` to the startup options of `ipcontroller` (by default, only localhost can connect).
ipcluster_config.py
#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------
# Launch a controller as a regular external process.
# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']
This time, the home directory is shared between the controller host and the engine hosts, so no files need to be copied around; add the following settings.
ipcluster_config.py
#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------
# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables. There could be other things this needs as well.
# hostname on which to launch the program
# c.SSHLauncher.hostname = ''
# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']
# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''
# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []
# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']
# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []
# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']
# username for ssh
# c.SSHLauncher.user = ''
To understand the meaning of this setting, it helps to know how an IPython cluster starts up: `ipcluster` launches the controller on the local host, then logs in to each engine host via SSH and launches the engines, which read the controller's connection file; since the home directory is shared, nothing needs to be sent or fetched. Execute the `ipcluster` command on the host where you want to run the controller, and the controller and engines start together. Specify the profile name with the `--profile` option.
$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully
If "Engines appear to have started successfully" is displayed, it is successful. From the message "Starting 48 Engines with SSH", you can confirm that $ 12 \ times 4 = 48 $ of engines have been started.
Import the `IPython.parallel` module.
In [1]: from IPython.parallel import Client
Create a `Client` object for operating the controller and engines, specifying the profile name in the `profile` argument.
In [2]: rc = Client(profile='mezcal')
Checking the IDs of the engines the `Client` object is connected to confirms the connection to all 48 engines.
In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]
The following procedure is not specific to embarrassingly parallel processing, but I will explain it as general usage of IPython cluster. A `DirectView` instance executes code directly on each engine, without going through the task scheduler.
In [4]: dview = rc[:]
The following code calculates $x^2$ for $x \in \{0, 1, \ldots, 49\}$ (without parallelization).
In [5]: serial_result = map(lambda x: x**2, range(50))
Let's parallelize this calculation over the elements $x \in \{0, 1, \ldots, 49\}$.
In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))
The results computed by the engines are aggregated and stored in `parallel_result`.
In [7]: parallel_result
Out[7]:
[0,
1,
4,
9,
...
2401]
Naturally, the calculation result is the same with or without parallelization.
In [8]: serial_result == parallel_result
Out[8]: True
You can use the `remote` decorator to define a function (a remote function) that is executed on each engine. The following `gethostname` function obtains and returns the host name with `socket.getfqdn()`; note that the `socket` module is imported inside the function. Importing a module on the client does not mean the IPython process on each engine has imported it, so the import must be done inside the function.
In [9]: @dview.remote(block=True)
   ...: def gethostname():
   ...:     import socket
   ...:     return socket.getfqdn()
   ...:
You can get the host name of every engine by calling the `gethostname` function from the client. The order is arbitrary, but you can see that four engines are running on each of the hosts mezcal01 through mezcal12.
In [10]: gethostname()
Out[10]:
['mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
...
'mezcal01.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp']
There is also a `parallel` decorator for defining functions whose input sequences are split across the engines. For more information, see the Direct interface section of IPython's documentation.
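A minimal sketch of the `parallel` decorator, following the pattern in IPython's documentation; calling `map` on the decorated function splits the input into chunks that are evaluated on the engines:

@dview.parallel(block=True)
def f(x):
    return x ** 2

f.map(range(50))   # same result as the map_sync example above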
A `LoadBalancedView` instance executes jobs with dynamic load balancing. You cannot access individual engines directly, but you can implement a job queue over the cluster engines, much like the job queue of `multiprocessing.Pool`.
As a simple example, let's run the command `sleep 10` on the engines. Create a list containing the jobs you want to execute.
In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]
Each element of this list is a dictionary storing the command to execute under the `cmd` key. This time every job runs `sleep 10`, but in real embarrassingly parallel work the command would change according to the input data, as sketched below. The first 5 jobs look like this.
In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'}]
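A hypothetical sketch of jobs whose commands vary with the input, reusing the a*.txt/b*.txt naming of the opening example:

jobs = [dict(cmd='job < a%02d.txt > b%02d.txt' % (i, i)) for i in range(1, 5)]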
Implement a function `runjob` that executes the job (command) represented by such a dictionary object: it runs the value of the `cmd` key on the shell and returns the exit code wrapped in a dictionary.
In [13]: def runjob(job):
   ....:     import subprocess
   ....:     try:
   ....:         returncode = subprocess.call(job['cmd'], shell=True)
   ....:         return dict(code=returncode)
   ....:     except OSError as e:
   ....:         return dict(error=str(e))
   ....:
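Since `runjob` is an ordinary function, it can be sanity-checked on the client before being dispatched to the engines; a quick check using the standard `true` and `false` commands:

runjob(dict(cmd='true'))    # -> {'code': 0}
runjob(dict(cmd='false'))   # -> {'code': 1}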
To execute this `runjob` function on the jobs one by one through a job queue, obtain a `LoadBalancedView` instance from the client.
In [14]: lview = rc.load_balanced_view()
Then execute the `runjob` function asynchronously on every element of the `jobs` list.
In [15]: ar = lview.map_async(runjob, jobs)
This call does not block; it immediately returns an `AsyncResult` object, through which you can check the execution results and the progress of the jobs. For example, let's display the job execution status on the interactive shell (this also works on IPython notebook).
In [16]: ar.wait_interactive()
48/100 tasks finished after 15 s
When the `wait_interactive` function is called, the job execution status is displayed on the shell every second. The display above shows that 48 of the 100 jobs finished within 15 seconds of starting. Since each job takes 10 seconds and 48 engines run concurrently, 48 jobs finish within the first 10 seconds of execution and the next 48 run between 10 and 20 seconds. When all the jobs are complete, the following is displayed.
In [16]: ar.wait_interactive()
100/100 tasks finished after 30 s
done
Although each job executes asynchronously, the `wait_interactive` function does not return until all jobs are complete. If you want to stop displaying the progress, interrupt it with [Ctrl]+[c] ("Kernel" - "Interrupt" on IPython notebook). The jobs keep running even if the display is interrupted.
You can examine the job execution results through the `AsyncResult` object. Let's check the results of the first 5 jobs.
In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]
A `code` of `0` means that the shell (`/bin/sh`) that executed the `sleep` command returned `0`.
Bakapara modularizes this pattern of executing a queue of shell commands on an IPython cluster. It is designed to be used from Python's interactive shell and IPython notebook, and it can also be run standalone from the command line.
To use it as a module, download bakapara.py and place it in a directory on your `PYTHONPATH`. When running it from the command line, it is convenient to place it in a directory on your `PATH`.
A Bakapara object receives a list of jobs and runs them on the cluster engines. Each job can be in any format as long as it is a dictionary object that meets the following specification.

Key | Value | Example |
---|---|---|
`cmd` | The command to execute on the engine. The command is executed via the shell, so pipes and redirects can be used. | `wc -l /work/001.txt` |
`cd` | (Optional) Working directory in which the engine executes the command. If this key does not exist, the engine's working directory is not changed; each engine's working directory is initialized to the working directory at the time the Bakapara object was constructed. | `/home/okazaki/projects/bakapara` |
`out` | (Optional) File name to which the standard output of the command is saved. If this key does not exist, the standard output is not saved. | `/work/001.txt.out` |
`err` | (Optional) File name to which the standard error output of the command is saved. If this key does not exist, the standard error output is not saved. | `/work/001.txt.err` |
`host` | (Optional) List of hosts (engines) allowed to execute the command. If this key does not exist, the job is considered runnable on every engine. | `['mezcal03.cl.ecei.tohoku.ac.jp',]` |
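Putting the keys together, a single job assembled from the table's example values (the paths and host name are the table's illustrative values, not required names) would look like this:

job = dict(
    cmd='wc -l /work/001.txt',              # executed via the shell
    cd='/home/okazaki/projects/bakapara',   # optional working directory
    out='/work/001.txt.out',                # optional file for stdout
    err='/work/001.txt.err',                # optional file for stderr
    host=['mezcal03.cl.ecei.tohoku.ac.jp'], # optional list of allowed engines
)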
`host` is used when specific jobs must run on specific hosts. For example, if the data to be processed is distributed over the local disks of the servers, you can use `host` to name the hosts that hold the data a job needs. The distributed file system GFarm, for instance, lets you check which host holds each file, so by specifying the host where the data resides, you can realize distributed parallel processing that respects data locality, as HDFS + Hadoop does; a sketch follows.
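For illustration, a hypothetical sketch that pins each job to the host holding its input (the path scheme and host assignment are made up):

# Suppose /data/001.txt ... /data/012.txt live on the local disks of
# mezcal01 ... mezcal12, respectively.
jobs = [dict(cmd='wc -l /data/%03d.txt' % i,
             host=['mezcal%02d.cl.ecei.tohoku.ac.jp' % i])
        for i in range(1, 13)]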
The execution result of each job is stored (overwriting any previous value) under the `result` key of the job's dictionary object. The value of `result` is a dictionary object with the following specification.

Key | Value | Example |
---|---|---|
`code` | Exit code | `0` |
`submitted` | Date and time the job was submitted by the client | `'2014-12-13T01:17:05.593718'` |
`started` | Date and time the job started running on the engine | `'2014-12-13T01:17:04.559970'` |
`completed` | Date and time the job execution completed on the engine | `'2014-12-13T01:17:14.566251'` |
`received` | Date and time the client received the job execution result | `'2014-12-13T01:17:15.614301'` |
`elapsed` | Time required to execute the job (`completed` - `started`) | `'0:00:10.006281'` |
`engine_id` | ID (index) of the engine that executed the job | `3` |
`host` | Host name of the engine that executed the job | `'mezcal06.cl.ecei.tohoku.ac.jp'` |
`pyerr` | Python exception (if any). When you specify host names for a job, an `UnmetDependency` exception may appear here, but this is normal. | `None` |
`pyout` | Python interpreter output (if any) | `None` |
`status` | `'ok'` or `'error'` | `'ok'` |
`msg_id` | UUID of the messages exchanged between the client and the engine | `u'48fbc58b-ef73-4815-9f32-1932c01421ad'` |
`error` | (Present only when a fatal error occurs) Error message | `''` |
Note that command execution can fail even when `status` is `'ok'`. For example, if you misspell the command in `cmd` so that it cannot be executed, `status` will be `'ok'`, but a message like `'/bin/bash: 1: hoge: not found\n'` remains in the standard error output. Job commands are executed via `/bin/bash -o pipefail`, so if any command in a pipeline exits with a non-zero status code, that value is stored in `code`. It is therefore important to check the exit code in `code`.
First, import the bakapara module and create a `Bakapara` instance. The constructor of the `Bakapara` class has the same specification as [IPython.parallel.Client](http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.client.html#IPython.parallel.client.client.Client). Normally, you specify the profile name as follows.
In [1]: from bakapara import Bakapara
In [2]: bp = Bakapara(profile='mezcal')
Create a list of the jobs you want to run. The following code creates 100 jobs, each executing the command `sleep 10`.
In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]
For the sake of explanation, let's check the first job.
In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}
Start the jobs with the `run` method. The return value `True` indicates that job registration succeeded.
In [5]: bp.run(jobs)
Out[5]: True
You can check the progress of the jobs with the `status` method, which displays the progress every second.
In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905
The `status` method does not return until all jobs are complete. If you want to do something else, interrupt it with [Ctrl]+[c] ("Kernel" - "Interrupt" in IPython notebook).
When the job execution is completed, the following will be displayed.
In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117
When the jobs complete, the results are written back into the job list (the list passed to the `run` method of the `Bakapara` class). To be precise, the results are written to the `jobs` list when the `status` or `wait` method is called after the jobs finish; even before all jobs have finished, the results available at the time of the call are written to the list. To obtain the job execution results, call the `status` method and confirm that the message "xx/xxx tasks finished" is displayed.
Let's check an execution result. You can access information such as the host that executed the job, the elapsed time, and the exit code.
In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
'result': {'code': 0,
'completed': '2014-12-13T12:26:12.665525',
'elapsed': '0:00:10.005647',
'engine_id': 11,
'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
'pyerr': None,
'pyout': None,
'received': '2014-12-13T12:24:38.638917',
'started': '2014-12-13T12:26:02.659878',
'status': u'ok',
'submitted': '2014-12-13T12:23:58.320781'}}
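As the result dictionaries accumulate in the job list, failures can be found by scanning the exit codes; a minimal sketch, assuming all jobs have completed:

failed = [job for job in jobs if job['result']['code'] != 0]
len(failed)   # 0 if every command exited normally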
As we have seen, the `Bakapara` module is easy to use. However, some people may want to do embarrassingly parallel distributed processing without knowing any Python, so a command line interface is provided as well. When `bakapara.py` is executed by itself, it reads job JSON from standard input and writes job result JSON to standard output. The input and output hold one job per line, each line being the JSON encoding of a job's dictionary object. The jobs are not stored as a single JSON list so that each result can be written out as soon as its job completes.
For example, a JSON job file that executes `sleep 10` 100 times looks like this.
$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
To execute these jobs and write the results to `sleep.result.json`, run the following command, specifying the IPython cluster profile name with the `-p` option.
$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed
On standard output, each executed job is returned as a JSON-encoded dictionary object, with the execution result stored under the `result` key.
$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}
While watching the execution results of jobs running on the cluster, you may notice that the contents of a job are wrong or that a job has run away. In such cases you will want to cancel the execution. There are two ways to cancel jobs run with Bakapara: (1) abort the jobs still waiting in the queue, or (2) shut down the cluster itself. With method 1, running jobs are not cancelled; only the jobs remaining in the job queue are. It therefore does not help when a very long job is already running or a job has run away.
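In terms of the underlying IPython.parallel API, method 1 corresponds to aborting the pending tasks. A hedged sketch, assuming you kept the AsyncResult returned by map_async (this reflects my reading of the API, not a dedicated Bakapara method):

# Ask the scheduler to drop tasks that have not started yet;
# tasks already running on an engine continue to completion.
ar.abort()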
Method 2 forcibly terminates the running jobs by stopping the engines and the controller of the cluster execution environment. Concretely, press the [Ctrl]+[c] keys on the console where `ipcluster` is running.
^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
Unfortunately, the current version of the `LoadBalancedView` interface does not provide a way to stop a running job (see: [[IPython-dev] Fwd: interrupt/abort parallel jobs](http://mail.scipy.org/pipermail/ipython-dev/2014-March/013426.html)). In an emergency, you have to restart `ipcluster` itself, as in method 2.
When I started doing distributed parallel processing in a cluster environment (around 2005), I used GXP Grid & Cluster Shell. GXP can run workflows in the style of make, but I used it exclusively for embarrassingly parallel work. It is a useful tool, but its development currently appears to have stopped. GNU Parallel might have been sufficient for my purposes.
Around 2011, I seriously considered using Hadoop, but managing and operating Hadoop was difficult at the scale of a university laboratory. Moreover, parallelizing a small program implemented in Python in an embarrassingly parallel fashion requires extra work for Hadoop, so I felt the hurdle was high for students. However, I found the distributed file system (HDFS) very convenient, so I started using GFarm as a distributed file system. To process data placed on the distributed file system with locality in mind, I implemented my own job queue system using Paramiko. The remains of that job queue system are gfqsub.
Recently, however, I have begun running experiments and data analysis on IPython notebook. IPython notebook is very convenient, but its affinity with command line tools is poor, so I looked for a Python library for embarrassingly parallel processing. I found that IPython cluster supports distributed parallel processing, but there was little information beyond the official documentation, so I wrote Bakapara while reading the documentation myself.
This time I realized embarrassingly parallel processing over SSH, but by configuring `ipcluster_config.py`, IPython cluster can apparently also use MPI, PBS (qsub), Windows HPC Server, and Amazon EC2 in a unified manner. Perhaps the Bakapara module will work in those environments as well (I have not tested this myself). IPython cluster can also realize fairly advanced distributed parallel processing, such as exchanging code and data between the controller and the engines. I would like to keep exploring the interesting things that can be done with these features.