[Python] Notes on accelerating genetic algorithms using multiprocessing

Introduction

If you google with Python and parallel processing, you will see many articles about the module called multiprocessing, so I tried it with the article I wrote earlier.

[Python] Try optimizing FX systole parameters with a genetic algorithm

The genetic algorithm evaluates many individuals and performs genetic processing such as selection and crossing according to their fitness, but since the evaluation of each individual is completely independent, it is suitable for parallel processing. .. This time as well, I will apply parallel processing to the individual evaluation part.

Original code

Code for the genetic algorithm before parallelization. It is not a general purpose because it was used for optimizing the systole parameters in the above article. The code before and after is omitted.

def Optimize(ohlc, Prange):
    def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #Shift function

    SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #Long-term moving average
    for i in range(len(Prange[0])):
        SlowMA[i] = ind.iMA(ohlc, Prange[0][i])

    FastMA = np.empty([len(Prange[1]), len(ohlc)]) #Short-term moving average
    for i in range(len(Prange[1])):
        FastMA[i] = ind.iMA(ohlc, Prange[1][i])
    
    ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #Moving average for payment
    for i in range(len(Prange[2])):
        ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
    
    Close = ohlc['Close'].values #closing price
    
    M = 20 #Population
    Eval = np.zeros([M, 6])  #Evaluation item
    Param = InitParam(Prange, M) #Parameter initialization
    gens = 0 #Number of generations
    while gens < 100:
        for k in range(M):
            i0 = Param[k,0]
            i1 = Param[k,1]
            i2 = Param[k,2]
            #Buy entry signal
            BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
            #Sell entry signal
            SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
            #Buy exit signal
            BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
            #Sell exit signal
            SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
            #Backtest
            Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit) 
            Eval[k] = BacktestReport(Trade, PL)
        #Alternation of generations
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

Let it run as it is and measure the execution time.

import time

start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec

Replace with map function

In the original code, it is between the for statements that are parallelized. It takes some time to perform backtesting and evaluation with the parameters of each individual. When using multiprocessing, it seems easy to use the map method, so first try replacing the for statement with the map function.

For that, we need to make the repeating part a function, but here is a little note. If you just want to make it a map function, it is more convenient to define the function in the ʻOptimize function, but if you use multiprocessing, an error will occur. So, I defined the function with the name ʻevaluate outside the ʻOptimize` function.

For convenience of passing to map, I want to make only k the argument of the ʻevaluatefunction. Therefore, the variables of technical indicators such asSlowMA and FastMAare global variables. However,Param` is an argument of the function.

SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #Long-term moving average
for i in range(len(SlowMAperiod)):
    SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])

FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #Short-term moving average
for i in range(len(FastMAperiod)):
    FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])

ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #Moving average for payment
for i in range(len(ExitMAperiod)):
    ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])

Close = ohlc['Close'].values #closing price

#Shift function
def shift(x, n=1):
    return np.concatenate((np.zeros(n), x[:-n]))

#Function to process in parallel
def evaluate(k,Param):
    i0 = Param[k,0]
    i1 = Param[k,1]
    i2 = Param[k,2]
    #Buy entry signal
    BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
    #Sell entry signal
    SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
    #Buy exit signal
    BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
    #Sell exit signal
    SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
    #Backtest
    Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit) 
    return BacktestReport(Trade, PL)

The following code is replaced with the map function instead of the for statement.

import functools

def Optimize(ohlc, Prange):
    M = 20 #Population
    Eval = np.zeros([M, 4])  #Evaluation item
    Param = InitParam(Prange, M) #Parameter initialization
    gens = 0 #Number of generations
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
        #Alternation of generations
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

Actually, it wasn't easy to write with just map. I put the repeating function ʻevaluate in the first argument of the map function, but since there are two arguments of the ʻevaluate function, the second argument Param should be fixed to Param``. I'm using functools.partial.

Also, the return value of map is converted to NumPy array, but it seems that it must be converted to list before that. (It seems that it depends on the version of Python. I tried Python 3.5.1 this time.)

When I did this, I got the following results:

elapsed_time = 11.157917446009389 sec

Even if you change the for statement to map, the execution time does not change much.

multiprocessing

It's easy to introduce multiprocessing if it is replaced by the map function.

import functools
import multiprocessing as mp

def Optimize(ohlc, Prange):
    M = 20 #Population
    Eval = np.zeros([M, 4])  #Evaluation item
    Param = InitParam(Prange, M) #Parameter initialization
    pool = mp.Pool() #Creating a process pool
    gens = 0 #Number of generations
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
        #Alternation of generations
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

Just create a process pool with the Pool class and replace the map part with pool.map. Specify the number of processes with the argument of Pool. If no argument is written, all CPU threads will be used.

You can use all threads for simple processing, but since there is other code, using a little over half of the threads was the fastest.

Since it was 8 threads of Core i7, the result of running with Pool (5) was

elapsed_time = 5.766524394366197 sec

is. It's about twice as fast. I expected it to be a little faster, but probably because there was genetic processing other than repeating the individual. For systems that take longer to backtest, parallelization may be a little more effective.

Recommended Posts

[Python] Notes on accelerating genetic algorithms using multiprocessing
Notes on using MeCab from Python
Notes on installing Python using PyEnv
Notes on using rstrip with python.
Notes on using code formatter in Python
Notes on installing Python3 and using pip on Windows7
Notes on using dict in python [Competition Pro]
Notes using Python subprocesses
Notes on using Alembic
Minimum notes when using Python on Mac (Homebrew edition)
Python notes using perl-ternary operator
Python notes using perl-special variables
[Django] Notes on using django-debug-toolbar
[Python] Notes on data analysis
Notes on optimization using Pytorch
Notes on installing Python on Mac
Broadcast on LINE using python
Notes on installing Python on CentOS
How to know the number of GPUs from python ~ Notes on using multiprocessing with pytorch ~
Notes on Python and dictionary types
Introducing Python using pyenv on Ubuntu 20.04
Preparing python using vscode on ubuntu
Notes on using post-receive and post-merge
Algorithm generation automation using genetic algorithms
Study on Tokyo Rent Using Python (3-2)
Notes on accessing dashDB from python
Study on Tokyo Rent Using Python (3-3)
Getting Started with Python Genetic Algorithms
Notes on using matplotlib on the server
Install Python on CentOS using pyenv
(Beginner) Notes on using pyenv on Mac
Notes using cChardet and python3-chardet in Python 3.3.1.
Execute Python code on C ++ (using Boost.Python)
Notes on PyQ machine learning python grammar
Detect "brightness" using python on Raspberry Pi 3!
Notes on nfc.ContactlessFrontend () for nfcpy in python
Install python library on Lambda using [/ tmp]
Notes on doing Japanese OCR with Python
Notes on building Python and pyenv on Mac
Notes on implementing APNs tests using Pytest
Notes on using OpenCL on Linux on the RX6800
Run servomotor on Raspberry Pi 3 using python
Notes on setting pyenv and python environment using Homebrew on Mac OS Marvericks
Notes for using python (pydev) in eclipse
Video processing using Python + OpenCV on Mac
Build Python3.5 + matplotlib environment on Ubuntu 12 using Anaconda
Detect slide switches using python on Raspberry Pi 3!
Python: Try using the UI on Pythonista 3 on iPad
Python development on Ubuntu on AWS EC2 (using JupyterLab)
Sound the buzzer using python on Raspberry Pi 3!
Notes for using TensorFlow on Bash on Ubuntu on Windows
Notes on oct2py calling Octave scripts from Python
Python scraping notes
Python study notes _000
Python learning notes
Python on Windows
Python beginner notes
Python study notes_006
Notes on Flask
Start using Python
python C ++ notes