I wanted to parallelize running `pd.read_sql` many times by using `multiprocessing`.
Versions
Name | Version |
---|---|
python | 3.7.3 |
pandas | 0.24.2 |
numpy | 1.16.2 |
psycopg2-binary | 2.8.4 |
PostgreSQL | 11.5 |
The following sequential version works:

```python
import multiprocessing

import numpy as np
import pandas as pd
import psycopg2


def get_connection():
    connection = psycopg2.connect(
        host='hostname',
        user='username',
        database='databasename',
        password='password')
    return connection


def function():
    for fuga in hoge:
        # Some processing that uses fuga, including pd.read_sql(sql='SELECT yyy...', con=conn)
        pass
    return


if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
        function()
```
I then rewrote the bottom of the working example above to use `multiprocessing` and changed `function()` to take an index argument. With that change, it failed with the errors below.
```python
def function(i):
    fuga = hoge[i]
    # Some processing that uses fuga, including pd.read_sql(sql='SELECT yyy...', con=conn)
    return


if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
        with multiprocessing.Pool(processes=64) as pool:
            for _ in pool.imap_unordered(function, range(len(hoge))):
                pass
```
```
multiprocessing.pool.RemoteTraceback:
# (omitted)
psycopg2.OperationalError: lost synchronization with server: got message type
# (omitted)
psycopg2.InterfaceError: connection already closed
# (omitted)
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
# (omitted)
```
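In the failing code above, the errors went away once the connection was created inside `function()`. Each worker process then opens its own connection instead of reusing the one the parent process created before the fork. The psycopg2 documentation also states that libpq connections should not be used by forked processes, and that with `multiprocessing` the connections should be created after the fork.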
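```python
def function(i):
    # Open a separate connection inside the worker instead of using the parent's conn.
    # Note: psycopg2's `with conn:` ends the transaction but does not close the connection.
    with get_connection() as conn:
        fuga = hoge[i]
        # Some processing that uses fuga, including pd.read_sql(sql='SELECT yyy...', con=conn)
    return
```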
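The fix above opens a new connection for every task, which means `len(hoge)` connections in total. One variant worth considering, though it is not what this post used, is to open a single connection per worker process through the `initializer` argument of `multiprocessing.Pool`. The following is a minimal sketch. It uses the standard-library `sqlite3` module as a stand-in for PostgreSQL so that it runs without a server. The names `init_worker`, `query_one`, `make_demo_db`, `run`, and the `items` table are all hypothetical. With psycopg2, you would presumably call `get_connection()` inside `init_worker` instead.

```python
import multiprocessing
import os
import sqlite3
import tempfile
from contextlib import closing

# Per-process connection (hypothetical name); each worker sets it in init_worker.
_conn = None


def init_worker(db_path):
    """Runs once in each worker process, after the fork: open a private connection."""
    global _conn
    _conn = sqlite3.connect(db_path)


def query_one(key):
    """Worker task: uses only this process's own connection."""
    row = _conn.execute("SELECT value FROM items WHERE key = ?", (key,)).fetchone()
    return key, row[0]


def make_demo_db(path):
    """Create a tiny demo table (stand-in for the PostgreSQL data in the post)."""
    with closing(sqlite3.connect(path)) as conn:
        with conn:  # commits the inserts on success
            conn.execute("CREATE TABLE items (key INTEGER PRIMARY KEY, value TEXT)")
            conn.executemany("INSERT INTO items VALUES (?, ?)",
                             [(i, f"v{i}") for i in range(10)])


def run(db_path, keys, processes=4):
    """Fan keys out to a pool whose workers each hold one connection."""
    with multiprocessing.Pool(processes=processes,
                              initializer=init_worker,
                              initargs=(db_path,)) as pool:
        return dict(pool.imap_unordered(query_one, keys))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        make_demo_db(db_path)
        print(run(db_path, range(10)))
```

Each worker connects once, after it has been forked, so no connection object is ever shared between processes. The parent's own connection, if it has one, is used only in the parent.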
Depending on the number of parallel processes and on what `function()` does, the following error may also occur. It can be avoided by running `export OMP_NUM_THREADS=1` before starting the script.
```
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.
```
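If exporting the variable in the shell is inconvenient, it can also be set from Python. The following is a minimal sketch, assuming it runs at the very top of the script. `limit_omp_threads` is a hypothetical helper. The sketch also assumes that the OpenMP/BLAS runtime that numpy loads reads `OMP_NUM_THREADS` when it initializes, so the variable has to be set before numpy or pandas is imported. Worker processes created by `multiprocessing` inherit the parent's environment, so they see the same setting.

```python
import os
import sys
import warnings


def limit_omp_threads(n=1):
    """Python-side equivalent of `export OMP_NUM_THREADS=1` (hypothetical helper)."""
    if "numpy" in sys.modules:
        # The OpenMP/BLAS runtime may already be initialized, so the setting may be ignored.
        warnings.warn("numpy is already imported; OMP_NUM_THREADS may not take effect")
    os.environ["OMP_NUM_THREADS"] = str(n)


limit_omp_threads(1)

# import numpy as np   # import numpy/pandas only after the call above
# import pandas as pd
```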