Je voulais paralléliser le processus d'exécution de pd.read_sql
plusieurs fois avec multiprocessing
.
Versions
Name | Version |
---|---|
python | 3.7.3 |
pandas | 0.24.2 |
numpy | 1.16.2 |
psycopg2-binary | 2.8.4 |
PostgreSQL | 11.5 |
import multiprocessing
import pandas as pd
import numpy as np
import psycopg2
def get_connection():
connection = psycopg2.connect(
host='hostname',
user='username',
database='databasename',
password='password')
return connection
def function():
for fuga in hoge:
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Certains traitements, y compris
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
function
Lorsque j'ai réécrit le bas du code dans l'exemple de réussite ci-dessus avec multiprocessing
et modifié function ()
afin que les variables puissent être mangées en conséquence, une erreur a été générée.
def function(i):
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Certains traitements, y compris
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
with multiprocessing.Pool(processes=64) as pool:
for _ in pool.imap_unordered(function, range(len(hoge))):
pass
multiprocessing.pool.RemoteTraceback:
#Omission
psycopg2.OperationalError: lost synchronization with server: got message type
#Omission
psycopg2.InterfaceError: connection already closed
#Omission
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
#Omission
Dans le code de l'exemple d'échec ci-dessus, cela pourrait être évité en écrivant les informations de connexion dans function ()
.
def function(i):
with get_connection() as conn:
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)Certains traitements, y compris
return
L'erreur suivante se produira en fonction du nombre de parallèles et du contenu de traitement de function ()
, mais elle peut être évitée en définissant ʻexport OMP_NUM_THREADS = 1`.
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.
Recommended Posts