Si vous recherchez sur Google avec Python et le traitement parallèle, vous verrez de nombreux articles sur le module appelé multiprocessing, alors je l'ai essayé avec l'article que j'ai écrit plus tôt.
[Python] Essayez d'optimiser les paramètres de systole FX avec un algorithme génétique
L'algorithme génétique évalue de nombreux individus et effectue un traitement génétique tel que la sélection et le croisement en fonction de leur adaptabilité, mais comme l'évaluation de chaque individu est complètement indépendante, il convient au traitement parallèle. .. Cette fois également, j'appliquerai un traitement parallèle à la partie évaluation individuelle.
Code pour l'algorithme génétique avant la parallélisation. Ce n'est pas un usage général car il a été utilisé pour optimiser les paramètres de systole dans l'article ci-dessus. Le code avant et après est omis.
def Optimize(ohlc, Prange):
def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #Fonction Shift
SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #Moyenne mobile à long terme
for i in range(len(Prange[0])):
SlowMA[i] = ind.iMA(ohlc, Prange[0][i])
FastMA = np.empty([len(Prange[1]), len(ohlc)]) #Moyenne mobile à court terme
for i in range(len(Prange[1])):
FastMA[i] = ind.iMA(ohlc, Prange[1][i])
ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #Moyenne mobile pour le paiement
for i in range(len(Prange[2])):
ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
Close = ohlc['Close'].values #le dernier prix
M = 20 #Nombre d'individus
Eval = np.zeros([M, 6]) #Élément d'évaluation
Param = InitParam(Prange, M) #Initialisation des paramètres
gens = 0 #Nombre de générations
while gens < 100:
for k in range(M):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#Acheter le signal d'entrée
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#Vendre un signal d'entrée
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#Acheter un signal de sortie
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#Vendre un signal de sortie
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#Backtest
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
Eval[k] = BacktestReport(Trade, PL)
#Changement générationnel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Laissez-le fonctionner tel quel et mesurez le temps d'exécution.
import time
start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec
Dans le code d'origine, c'est entre les instructions for
qui sont parallélisées. Il faut un certain temps pour effectuer un backtesting et une évaluation avec les paramètres de chaque individu. Lorsque vous utilisez le multitraitement, il semble facile d'utiliser la méthode map
, alors essayez d'abord de remplacer l'instruction for
par la fonction map
.
Pour cela, nous devons faire de la partie répétitive une fonction, mais il y a une petite note ici. Si vous voulez juste en faire une fonction map
, il est plus pratique de définir la fonction dans la fonction ʻOptimiser, mais si vous utilisez le multitraitement, une erreur se produira. Donc, j'ai défini la fonction avec le nom ʻevaluate
en dehors de la fonction ʻOptimize`.
Pour faciliter le passage à map
, je veux utiliser uniquement k
l'argument de la fonction ʻevaluate. Par conséquent, les variables des indicateurs techniques tels que «SlowMA» et «FastMA» sont des variables globales. Cependant,
Param` est un argument de la fonction.
SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #Moyenne mobile à long terme
for i in range(len(SlowMAperiod)):
SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])
FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #Moyenne mobile à court terme
for i in range(len(FastMAperiod)):
FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])
ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #Moyenne mobile pour le paiement
for i in range(len(ExitMAperiod)):
ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])
Close = ohlc['Close'].values #le dernier prix
#Fonction Shift
def shift(x, n=1):
return np.concatenate((np.zeros(n), x[:-n]))
#Fonction à traiter en parallèle
def evaluate(k,Param):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#Acheter le signal d'entrée
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#Vendre un signal d'entrée
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#Acheter un signal de sortie
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#Vendre un signal de sortie
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#Backtest
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
return BacktestReport(Trade, PL)
Le code suivant est remplacé par la fonction map
au lieu de l'instruction for
.
import functools
def Optimize(ohlc, Prange):
M = 20 #Nombre d'individus
Eval = np.zeros([M, 4]) #Élément d'évaluation
Param = InitParam(Prange, M) #Initialisation des paramètres
gens = 0 #Nombre de générations
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
#Changement générationnel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
En fait, ce n'était pas facile d'écrire avec juste «map». J'ai mis la fonction répétitive ʻevaluate dans le premier argument de la fonction
map, mais comme il y a deux arguments de la fonction ʻevaluate
, le second argument Param
doit être fixé à Param``. J'utilise functools.partial
.
De plus, la valeur de retour de map
est convertie en tableau NumPy, mais il semble qu'elle doive être convertie en liste avant cela. (Il semble que cela dépend de la version de Python. J'ai essayé Python 3.5.1 cette fois.)
Quand j'ai fait cela, j'ai obtenu les résultats suivants:
elapsed_time = 11.157917446009389 sec
Même si vous changez l'instruction for
en map
, le temps d'exécution ne change pas beaucoup.
multiprocessing
Il est facile d'introduire le multitraitement s'il est remplacé par la fonction map
.
import functools
import multiprocessing as mp
def Optimize(ohlc, Prange):
M = 20 #Nombre d'individus
Eval = np.zeros([M, 4]) #Élément d'évaluation
Param = InitParam(Prange, M) #Initialisation des paramètres
pool = mp.Pool() #Créer un pool de processus
gens = 0 #Nombre de générations
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
#Changement générationnel
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Créez simplement un pool de processus avec la classe Pool
et remplacez la partie map
par pool.map
. Spécifiez le nombre de processus avec l'argument Pool
. Si aucun argument n'est écrit, tous les threads du processeur seront utilisés.
Vous pouvez utiliser tous les threads pour un traitement simple, mais comme il existe un autre code, utiliser un peu plus de la moitié des threads a été le plus rapide cette fois.
Puisqu'il s'agissait de 8 threads de Corei7, le résultat de l'exécution avec Pool (5)
elapsed_time = 5.766524394366197 sec
est. C'est environ deux fois plus rapide. Je m'attendais à ce que ce soit un peu plus rapide, mais probablement parce qu'il y avait un traitement génétique autre que la répétition de l'individu. Pour les systèmes dont le backtest est plus long, la parallélisation peut être un peu plus efficace.
Recommended Posts