I wanted to use multiprocessing to parallelize a process that calls pd.read_sql many times.
Versions
| Name | Version |
|---|---|
| python | 3.7.3 |
| pandas | 0.24.2 |
| numpy | 1.16.2 |
| psycopg2-binary | 2.8.4 |
| PostgreSQL | 11.5 |
import multiprocessing
import pandas as pd
import numpy as np
import psycopg2
def get_connection():
    connection = psycopg2.connect(
        host='hostname',
        user='username',
        database='databasename',
        password='password')
    return connection
def function():
    for fuga in hoge:
        # Some processing that uses fuga and pd.read_sql(sql='SELECT yyy...', con=conn)
        pass
    return
if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
        function()
The code above works. When I rewrote its bottom part to use multiprocessing, and changed function() to take an argument accordingly, an error was raised.
def function(i):
    fuga = hoge[i]
    # Some processing that uses fuga and pd.read_sql(sql='SELECT yyy...', con=conn)
    return
if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
        with multiprocessing.Pool(processes=64) as pool:
            for _ in pool.imap_unordered(function, range(len(hoge))):
                pass
multiprocessing.pool.RemoteTraceback:
# (omitted)
psycopg2.OperationalError: lost synchronization with server: got message type
# (omitted)
psycopg2.InterfaceError: connection already closed
# (omitted)
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
# (omitted)
The failure above can be avoided by opening the connection inside function(), so that each worker process uses its own connection instead of sharing the one inherited from the parent process.
def function(i):
    with get_connection() as conn:
        fuga = hoge[i]
        # Some processing that uses fuga and pd.read_sql(sql='SELECT yyy...', con=conn)
    return
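Opening a connection on every call works, but it pays the connection cost once per task, and in psycopg2 leaving the `with` block only ends the transaction and does not close the connection. One possible variation is to open a single connection per worker process through the Pool's `initializer` argument. The following is only a sketch under stated assumptions: it uses the standard-library `sqlite3` as a stand-in so that it runs without a PostgreSQL server, it requests the `fork` start method explicitly (POSIX only), and the names `init_worker`, `query_one`, and `run_parallel` are hypothetical. With psycopg2, the body of `init_worker` would call `get_connection()` instead.

```python
import multiprocessing
import sqlite3

_conn = None  # one connection per worker process, set by init_worker


def init_worker(db_path):
    """Runs once in each worker, after the process has been created."""
    global _conn
    # Stand-in for get_connection(); sqlite3 is an assumption for this sketch.
    _conn = sqlite3.connect(db_path)


def query_one(i):
    """Runs one query on the connection that belongs to this worker."""
    row = _conn.execute("SELECT ? * 2", (i,)).fetchone()
    return row[0]


def run_parallel(db_path, n_items, processes=4):
    # Assumption: the 'fork' start method, which is only available on POSIX.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=processes,
                  initializer=init_worker,
                  initargs=(db_path,)) as pool:
        # imap_unordered yields results in completion order, so sort them.
        return sorted(pool.imap_unordered(query_one, range(n_items)))


if __name__ == "__main__":
    print(run_parallel(":memory:", 5, processes=2))
```

The connections are not closed explicitly here; they go away when the worker processes exit at the end of the `with` block.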
Depending on the number of parallel processes and on what function() does, the following error may occur. It can be avoided by running `export OMP_NUM_THREADS=1` beforehand.
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.
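The same limit can probably also be set from inside the script instead of the shell. A minimal sketch, assuming the OpenMP runtime reads `OMP_NUM_THREADS` when the library is first loaded, so the assignment has to come before numpy and pandas are imported:

```python
import os

# Set the limit before any OpenMP-backed library is imported.
# Assumption: the runtime reads OMP_NUM_THREADS once, at load time.
os.environ["OMP_NUM_THREADS"] = "1"

# Heavy imports would go after this point, for example:
# import numpy as np
# import pandas as pd
```

Worker processes started by multiprocessing inherit the parent's environment, so the value applies to them as well.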