Qu'utilisez-vous pour frapper MySQL à partir de python? SQLAlchemy, Django, [peewee](http://docs.peewee-orm.com/en/latest/ Je pense qu'il y a beaucoup de gens qui utilisent autour). Je l'ai beaucoup utilisé et j'ai fini avec SQL Alchemy, mais quand il s'agit de dizaines de millions ou de centaines de millions de commandes de données, c'est lent et très ennuyeux.
Donc cette fois (à part l'histoire de ne pas utiliser sql pour les données volumineuses ou python pour le traitement à grande vitesse), je laisserai un mémorandum divisé en plusieurs éléments sur la façon d'accélérer le traitement des données à l'aide de SQL Alchemy. Mettre.
SQLAlchemy est écrit comme ~, mais il s'agit d'un désordre de conseils d'accélération de python lors de l'INSERT et de la sélection de données dans la base de données à l'aide de SQLAlchemy. En fait, il vaut mieux séparer les articles, mais je suis heureux qu'ils soient organisés quand je me repense, alors je l'ai fait.
Tout le code utilisé cette fois-ci est en github, alors jetez-y également un œil.
La structure DB utilisée dans ce test est la suivante.
user table
id | name | age | team_id | created_at | updated_at |
---|---|---|---|---|---|
1 | John1 | 12 | 4 | 1486030539 | 1486030539 |
2 | Kevin2 | 54 | 12 | 1486030539 | 1486030539 |
... |
team table
id | name | created_at | updated_at |
---|---|---|---|
1 | A | 1486030539 | 1486030539 |
2 | B | 1486030539 | 1486030539 |
... |
l'utilisateur dispose d'une clé externe pour l'équipe.
Tout d'abord, enregistrez les données dans les tables des équipes et des utilisateurs.
team_list = ['A', 'B',...,'Z']
user_list = [('John', 14, 'C'), ...]
Commencez à partir de l'endroit où vous avez les données. Le nombre d'équipes est de 26 de A à Z et le nombre d'utilisateurs est de 100 000.
Comme prévu, je ne ferai pas cela depuis le début, mais je partirai de là pour comparaison.
class Base(object):
def __iter__(self):
return iter(self.__dict__.items())
def dict(self):
return self.__dict__
@classmethod
def query(cls):
if not hasattr(cls, "_query"):
cls._query = database.session().query_property()
return cls._query
class User(Base):
def __repr__(self):
return '<User %r>' % (self.id)
def __init__(self, name, age, team):
self.name = name
self.age = age
self.team = team
self.updated_at = time.time()
self.created_at = time.time()
@staticmethod
def create_dict(name, age, team_id):
return {'name': name, 'age': age, 'team_id': team_id,
'updated_at': time.time(), 'created_at': time.time()}
signup_user = Table('user', metadata,
Column('id', BigInteger, nullable=False,
primary_key=True, autoincrement=True),
Column('name', Unicode(255), nullable=False),
Column('age', Integer, nullable=False),
Column('team_id', ForeignKey('team.id'), nullable=False),
Column('updated_at', BigInteger, nullable=False),
Column('created_at', BigInteger, nullable=False))
mapper(User, signup_user,
properties={
'id': signup_user.c.id,
'name': signup_user.c.name,
'age': signup_user.c.age,
'team': relationship(Team),
'updated_at': signup_user.c.updated_at,
'created_at': signup_user.c.created_at
})
User.__table__ = signup_user
Créez un objet Table comme celui-ci. La préparation de la session côté DB est comme ça
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
autoflush=True,
expire_on_commit=False,
bind=_engine))
metadata.create_all(bind=_engine)
La partie pour enregistrer l'équipe est omise,
def insert_user(name, age, team):
u = User(name, age, team)
session.add(u)
session.commit()
teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]
Créez des utilisateurs un par un comme celui-ci, ajoutez et validez.
Évidemment, 0. est inefficace, alors utilisez ʻadd_all`, qui vous permet d'ajouter plusieurs enregistrements à la fois.
users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
database.session().add_all(users)
database.session().commit()
C'était beaucoup plus propre comme code.
Il y a bulk_save_objects dans l'ORM de SQLAlchemy.
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class UserTable(Base):
__tablename__ = "user"
id = Column(BigInteger, nullable=False,
primary_key=True, autoincrement=True)
name = Column(Unicode(255), nullable=False)
age = Column(Integer, nullable=False)
team_id = Column(BigInteger, nullable=False)
updated_at = Column(BigInteger, nullable=False)
created_at = Column(BigInteger, nullable=False)
Créez un objet Table comme
session.bulk_save_objects(
[UserTable(name=d[0],
age=d[1],
team_id=team_dict[d[2]].id,
updated_at = time.time(),
created_at = time.time())
for d in data_list], return_defaults=True)
session.commit()
Insérez-le comme ça. Vous pouvez voir que la gestion des clés externes, etc. a changé.
ORM présente divers avantages, tels que la facilité d'utilisation, le raccourcissement du code et l'exécution de contrôles fins tels que la restauration en arrière-plan, mais la surcharge de la génération de requêtes est importante et cela devient un goulot d'étranglement lorsqu'on envisage d'accélérer. Utiliser sqlalchemy.core peut être un problème, mais il peut émettre des requêtes plus rapidement qu'avec ORM. ..
users = [{'name':d[0], 'age': d[1], 'team_id': team_dict[d[2]]['id'],
'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()
Comparons ensemble les vitesses de 0 à 3.
SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]
sqlalchemy.core ... Accablant ... !! Il est 5 ou 6 fois plus rapide que l'insert en vrac.
Même si vous utilisez ORM, il semble bon d'utiliser l'insert en vrac.
Au moment de l'insertion en bloc, j'ai senti qu'il était plus rapide de se diviser lors de l'insertion d'une certaine quantité de données, j'ai donc essayé d'utiliser sqlalchemy.core (1 million de cas) ..
SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]
Eh bien, il semble que c'était à cause de mon esprit ...
Utilisez les données d'équipe et d'utilisateur enregistrées précédemment. Le nombre d'équipes est de 26 de A à Z et le nombre d'utilisateurs est de 1 million.
Tout d'abord, simplement, par ordre d'âge, [{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...]
Créez un processus qui renvoie une liste de dictionnaires limite (100 cette fois) comme.
Évidemment, réglons d'abord le côté MySQL. Dans cet exemple, il suffit de coller l'index dans user.age pour accélérer le traitement environ 10 fois. Puisqu'il existe déjà divers articles sur le réglage, je vais l'omettre cette fois.
users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
for u in users]
court! C'est bon. Je suis reconnaissant que le code soit particulièrement court lors du chevauchement de tables connectées par des clés externes.
Utilisez la fonction de sélection de sqlalchemy.
from sqlalchemy import select, desc, and_, func
u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
for r in session.execute(sel)]
C'est devenu assez long.
Dans cet exemple, il n'est pas directement lié au traitement SQL, mais si le nombre de données augmente, la vitesse de traitement s'améliorera considérablement si un traitement parallèle est effectué. Pour savoir comment l'utiliser, j'ai déjà écrit un article, alors jetez un œil là-bas.
from multiprocessing import Pool
import multiprocessing as multi
def get_user(r):
return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
def select_user_multi():
u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
p = Pool(multi.cpu_count())
result = p.map(get_user, session.execute(sel))
p.close()
return result
sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]
C'est ... il est tard. Cela peut être dû au fait que la requête était simple. Le résultat du multitraitement mordant est beaucoup plus rapide.
Ensuite, comptons le nombre d'utilisateurs pour chaque équipe. Compliquez un peu la requête et comptez le nombre d'utilisateurs de moins de 50 ans appartenant à chaque équipe et créez des données comme {'A': 1400, 'B': 2122, ....}
Effectuer le traitement.
Veuillez vous reporter ici (Notes sur la gestion de COUNT () dans InnoDB) pour accélérer le processus de comptage lui-même. Devenir
ORM
def select_teams_orm():
return Team.query().all()
teams = select_teams_orm()
counts = {tm.name: User.query()\
.filter(and_(User.team == tm, User.age < 50)).count()\
for tm in teams}
Aussi court que jamais!
sqlalchemy.core
def select_teams_core():
t = Team.__table__.c
sel = select([t.id, t.name]).select_from(Team.__table__)
res = session.execute(sel)
result = [{'id': r[0], 'name': r[1]} for r in res]
return result
teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}
Difficile à comprendre! Il est écrit comme ci-dessus pour finir avec une boucle, mais une fois démonté
def create_query(team_id): #Création de requêtes Affiner par ID d'équipe et âge de l'utilisateur
u = User.__table__.c
return select([func.count()]).select_from(User.__table__)\
.where(add_(u.team_id == team_id, u.age < 50))
queries = [create_query(tm['id']) for tm in teams] #Créer une requête pour chaque équipe
counts = [session.execute(q) for q in queries] #Question de problème
result = [{tm['name']: c.scalar()} for tm,c in zip(teams,counts)] # {'A': count, ...}Créer un dictionnaire
C'est comme ça. En outre, la quantité de code a considérablement augmenté.
multiprocessing
Ensuite, mettons en parallèle à partir de l'endroit où la requête SELECT est lancée.
Il y a deux points à noter: ** La parallélisation de Joblib ne peut pas être utilisée, et la session créée avec scoped_session
ne peut pas être utilisée en traitement parallèle **.
session = sessionmaker(autocommit=False, # scoped_la session n'est pas bonne
autoflush=True,
expire_on_commit=False,
bind=_engine)
def count_user(team):
u = User.__table__.c
sel = select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == team['id'], u.age < 50))
result = session.execute(sel).scalar()
return result
def count_user_multi():
teams = select_teams_core()
p = Pool(multi.cpu_count())
counts = p.map(count_user, teams)
counts = {t['name']: c for t, c in zip(teams, counts)}
p.close()
session.close()
return counts
Cette fois, je lance une requête qui compte pour chaque équipe, mais en premier lieu
SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;
Si tel est le cas, vous n'avez besoin de lancer la requête qu'une seule fois, alors modifiez-la.
u = User.__table__.c
t = User.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in database.session().execute(sel)}
sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]
Cette fois, l'utilisation du noyau est un peu plus rapide que l'ORM. Il semble que la parallélisation soit environ 10 fois plus rapide. De plus, lorsque la requête a été améliorée pour pouvoir être lancée une fois, elle était un peu plus de trois fois plus rapide que la requête d'origine, mais elle n'était pas loin de la requête parallélisée.
Il était plus lent d'utiliser le noyau auparavant, mais pour chaque équipe, 100 personnes sont acquises par ordre décroissant d'âge et les données suivantes sont renvoyées.
[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
{'id': 2, 'name': 'B', 'users': [...]},
...]
Je vais omettre le code (voir github pour plus de détails), mais le résultat est le suivant.
sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]
Dans le cas de cet exemple, la différence entre ORM et core est d'environ 0,1 s, vous n'aurez donc peut-être pas à vous en soucier, mais comme elle est d'environ 1 million, la requête devient plus compliquée et le nombre de cas augmente. Dans ce cas, il semble utile d'utiliser sqlalchemy.core et de le paralléliser.
Comme mentionné ci-dessus, je l'ai vu comme INSERT et SELECT, mais en résumé,
était. La bonté d'ORM est presque tuée, et il est dit que vous devriez plus utiliser un autre langage, mais lorsque vous devez utiliser python, il semble bon de l'utiliser correctement en fonction de la taille des données et du contenu de l'application.
Recommended Posts