[Python] Faites de votre mieux pour accélérer SQL Alchemy

introduction

Qu'utilisez-vous pour frapper MySQL à partir de python? SQLAlchemy, Django, [peewee](http://docs.peewee-orm.com/en/latest/ Je pense qu'il y a beaucoup de gens qui utilisent autour). Je l'ai beaucoup utilisé et j'ai fini avec SQL Alchemy, mais quand il s'agit de dizaines de millions ou de centaines de millions de commandes de données, c'est lent et très ennuyeux.

Donc cette fois (à part l'histoire de ne pas utiliser sql pour les données volumineuses ou python pour le traitement à grande vitesse), je laisserai un mémorandum divisé en plusieurs éléments sur la façon d'accélérer le traitement des données à l'aide de SQL Alchemy. Mettre.

SQLAlchemy est écrit comme ~, mais il s'agit d'un désordre de conseils d'accélération de python lors de l'INSERT et de la sélection de données dans la base de données à l'aide de SQLAlchemy. En fait, il vaut mieux séparer les articles, mais je suis heureux qu'ils soient organisés quand je me repense, alors je l'ai fait.

Tout le code utilisé cette fois-ci est en github, alors jetez-y également un œil.

Environnement d'exécution

Structure de la base de données

La structure DB utilisée dans ce test est la suivante.

user table

id name age team_id created_at updated_at
1 John1 12 4 1486030539 1486030539
2 Kevin2 54 12 1486030539 1486030539
...

team table

id name created_at updated_at
1 A 1486030539 1486030539
2 B 1486030539 1486030539
...

l'utilisateur dispose d'une clé externe pour l'équipe.

INSÉRER

Tout d'abord, enregistrez les données dans les tables des équipes et des utilisateurs.

team_list = ['A', 'B',...,'Z']
user_list = [('John', 14, 'C'), ...]

Commencez à partir de l'endroit où vous avez les données. Le nombre d'équipes est de 26 de A à Z et le nombre d'utilisateurs est de 100 000.

0. Créez les objets SQLAlchemy Table un par un et insérez-les un par un

Comme prévu, je ne ferai pas cela depuis le début, mais je partirai de là pour comparaison.

class Base(object):

    def __iter__(self):
        return iter(self.__dict__.items())

    def dict(self):
        return self.__dict__

    @classmethod
    def query(cls):
        if not hasattr(cls, "_query"):
            cls._query = database.session().query_property()
        return cls._query
class User(Base):

    def __repr__(self):
        return '<User %r>' % (self.id)

    def __init__(self, name, age, team):
        self.name = name
        self.age = age
        self.team = team
        self.updated_at = time.time()
        self.created_at = time.time()

    @staticmethod
    def create_dict(name, age, team_id):
        return {'name': name, 'age': age, 'team_id': team_id,
                'updated_at': time.time(), 'created_at': time.time()}


signup_user = Table('user', metadata,
                    Column('id', BigInteger, nullable=False,
                           primary_key=True, autoincrement=True),
                    Column('name', Unicode(255), nullable=False),
                    Column('age', Integer, nullable=False),
                    Column('team_id', ForeignKey('team.id'), nullable=False),
                    Column('updated_at', BigInteger, nullable=False),
                    Column('created_at', BigInteger, nullable=False))

mapper(User, signup_user,
       properties={
           'id': signup_user.c.id,
           'name': signup_user.c.name,
           'age': signup_user.c.age,
           'team': relationship(Team),
           'updated_at': signup_user.c.updated_at,
           'created_at': signup_user.c.created_at
       })

User.__table__ = signup_user

Créez un objet Table comme celui-ci. La préparation de la session côté DB est comme ça

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker

metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
                                               autoflush=True,
                                               expire_on_commit=False,
                                               bind=_engine))

metadata.create_all(bind=_engine)

La partie pour enregistrer l'équipe est omise,

def insert_user(name, age, team):
    u = User(name, age, team)
    session.add(u)
    session.commit()

teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]

Créez des utilisateurs un par un comme celui-ci, ajoutez et validez.

1. Insérez plusieurs éléments ensemble.

Évidemment, 0. est inefficace, alors utilisez ʻadd_all`, qui vous permet d'ajouter plusieurs enregistrements à la fois.

users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
database.session().add_all(users)
database.session().commit()

C'était beaucoup plus propre comme code.

  1. bulk insert

Il y a bulk_save_objects dans l'ORM de SQLAlchemy.

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserTable(Base):
    __tablename__ = "user"
    id = Column(BigInteger,  nullable=False,
                primary_key=True, autoincrement=True)
    name = Column(Unicode(255), nullable=False)
    age = Column(Integer, nullable=False)
    team_id = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
    created_at = Column(BigInteger, nullable=False)

Créez un objet Table comme

session.bulk_save_objects(
    [UserTable(name=d[0],
               age=d[1],
               team_id=team_dict[d[2]].id,
               updated_at = time.time(),
               created_at = time.time())
     for d in data_list], return_defaults=True)
session.commit()

Insérez-le comme ça. Vous pouvez voir que la gestion des clés externes, etc. a changé.

3. Utilisez sqlalchemy.core

ORM présente divers avantages, tels que la facilité d'utilisation, le raccourcissement du code et l'exécution de contrôles fins tels que la restauration en arrière-plan, mais la surcharge de la génération de requêtes est importante et cela devient un goulot d'étranglement lorsqu'on envisage d'accélérer. Utiliser sqlalchemy.core peut être un problème, mais il peut émettre des requêtes plus rapidement qu'avec ORM. ..

users = [{'name':d[0], 'age': d[1], 'team_id': team_dict[d[2]]['id'],
          'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()

Comparaison

Comparons ensemble les vitesses de 0 à 3.

SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]

sqlalchemy.core ... Accablant ... !! Il est 5 ou 6 fois plus rapide que l'insert en vrac.

Même si vous utilisez ORM, il semble bon d'utiliser l'insert en vrac.

Bonus. Pas besoin de fractionner l'insertion groupée?

Au moment de l'insertion en bloc, j'ai senti qu'il était plus rapide de se diviser lors de l'insertion d'une certaine quantité de données, j'ai donc essayé d'utiliser sqlalchemy.core (1 million de cas) ..

SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]

Eh bien, il semble que c'était à cause de mon esprit ...

SELECT édition

Utilisez les données d'équipe et d'utilisateur enregistrées précédemment. Le nombre d'équipes est de 26 de A à Z et le nombre d'utilisateurs est de 1 million.

Tout d'abord, simplement, par ordre d'âge, [{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...] Créez un processus qui renvoie une liste de dictionnaires limite (100 cette fois) comme.

0. Réglage du côté MySQL (Index etc.)

Évidemment, réglons d'abord le côté MySQL. Dans cet exemple, il suffit de coller l'index dans user.age pour accélérer le traitement environ 10 fois. Puisqu'il existe déjà divers articles sur le réglage, je vais l'omettre cette fois.

1. Utilisez ORM

users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
          for u in users]

court! C'est bon. Je suis reconnaissant que le code soit particulièrement court lors du chevauchement de tables connectées par des clés externes.

2. Utilisez sqlalchemy.core

Utilisez la fonction de sélection de sqlalchemy.

from sqlalchemy import select, desc, and_, func

u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
       .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
       .order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
          for r in session.execute(sel)]

C'est devenu assez long.

3. Combiné avec le multitraitement

Dans cet exemple, il n'est pas directement lié au traitement SQL, mais si le nombre de données augmente, la vitesse de traitement s'améliorera considérablement si un traitement parallèle est effectué. Pour savoir comment l'utiliser, j'ai déjà écrit un article, alors jetez un œil là-bas.

from multiprocessing import Pool
import multiprocessing as multi


def get_user(r):
    return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}


def select_user_multi():
    u = User.__table__.c
    t = Team.__table__.c
    sel = select([u.id, u.name, u.age, t.name])\
          .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
          .order_by(desc(u.age)).limit(limit)
    p = Pool(multi.cpu_count())
    result = p.map(get_user, session.execute(sel))
    p.close()
    return result

Comparaison

sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]

C'est ... il est tard. Cela peut être dû au fait que la requête était simple. Le résultat du multitraitement mordant est beaucoup plus rapide.

Bonus 1. compte

Ensuite, comptons le nombre d'utilisateurs pour chaque équipe. Compliquez un peu la requête et comptez le nombre d'utilisateurs de moins de 50 ans appartenant à chaque équipe et créez des données comme {'A': 1400, 'B': 2122, ....} Effectuer le traitement.

Veuillez vous reporter ici (Notes sur la gestion de COUNT () dans InnoDB) pour accélérer le processus de comptage lui-même. Devenir

ORM

def select_teams_orm():
    return Team.query().all()

teams = select_teams_orm()
counts = {tm.name: User.query()\
              .filter(and_(User.team == tm, User.age < 50)).count()\
              for tm in teams}

Aussi court que jamais!

sqlalchemy.core

def select_teams_core():
    t = Team.__table__.c
    sel = select([t.id, t.name]).select_from(Team.__table__)
    res = session.execute(sel)
    result = [{'id': r[0], 'name': r[1]} for r in res]
    return result

teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
        select([func.count()]).select_from(User.__table__)\
        .where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}

Difficile à comprendre! Il est écrit comme ci-dessus pour finir avec une boucle, mais une fois démonté

def create_query(team_id): #Création de requêtes Affiner par ID d'équipe et âge de l'utilisateur
    u = User.__table__.c
    return select([func.count()]).select_from(User.__table__)\
             .where(add_(u.team_id == team_id, u.age < 50))

queries = [create_query(tm['id']) for tm in teams] #Créer une requête pour chaque équipe
counts = [session.execute(q) for q in queries] #Question de problème
result = [{tm['name']: c.scalar()} for tm,c in zip(teams,counts)] # {'A': count, ...}Créer un dictionnaire

C'est comme ça. En outre, la quantité de code a considérablement augmenté.

multiprocessing

Ensuite, mettons en parallèle à partir de l'endroit où la requête SELECT est lancée. Il y a deux points à noter: ** La parallélisation de Joblib ne peut pas être utilisée, et la session créée avec scoped_session ne peut pas être utilisée en traitement parallèle **.

session = sessionmaker(autocommit=False, # scoped_la session n'est pas bonne
                       autoflush=True,
                       expire_on_commit=False,
                       bind=_engine)

def count_user(team):
    u = User.__table__.c
    sel = select([func.count()]).select_from(User.__table__)\
          .where(and_(u.team_id == team['id'], u.age < 50))
    result = session.execute(sel).scalar()
    return result


def count_user_multi():
    teams = select_teams_core()
    p = Pool(multi.cpu_count())
    counts = p.map(count_user, teams)
    counts = {t['name']: c for t, c in zip(teams, counts)}
    p.close()
    session.close()
    return counts

Améliorations des requêtes

Cette fois, je lance une requête qui compte pour chaque équipe, mais en premier lieu

SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;

Si tel est le cas, vous n'avez besoin de lancer la requête qu'une seule fois, alors modifiez-la.

u = User.__table__.c
t = User.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
      .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
      .where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in database.session().execute(sel)}

Comparaison

sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]

Cette fois, l'utilisation du noyau est un peu plus rapide que l'ORM. Il semble que la parallélisation soit environ 10 fois plus rapide. De plus, lorsque la requête a été améliorée pour pouvoir être lancée une fois, elle était un peu plus de trois fois plus rapide que la requête d'origine, mais elle n'était pas loin de la requête parallélisée.

Bonus 2. Une requête un peu plus compliquée

Il était plus lent d'utiliser le noyau auparavant, mais pour chaque équipe, 100 personnes sont acquises par ordre décroissant d'âge et les données suivantes sont renvoyées.

[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
 {'id': 2, 'name': 'B', 'users': [...]},
  ...]

Je vais omettre le code (voir github pour plus de détails), mais le résultat est le suivant.

sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]

Dans le cas de cet exemple, la différence entre ORM et core est d'environ 0,1 s, vous n'aurez donc peut-être pas à vous en soucier, mais comme elle est d'environ 1 million, la requête devient plus compliquée et le nombre de cas augmente. Dans ce cas, il semble utile d'utiliser sqlalchemy.core et de le paralléliser.

en conclusion

Comme mentionné ci-dessus, je l'ai vu comme INSERT et SELECT, mais en résumé,

était. La bonté d'ORM est presque tuée, et il est dit que vous devriez plus utiliser un autre langage, mais lorsque vous devez utiliser python, il semble bon de l'utiliser correctement en fonction de la taille des données et du contenu de l'application.

Les références

Recommended Posts

[Python] Faites de votre mieux pour accélérer SQL Alchemy
Numba pour accélérer en Python
Comment accélérer les calculs Python
[Python] Comment faire PCA avec Python
Pour faire une récursion avec Python2
Que faire avec la sortie de PYTHON?
Accélérez grossièrement Python avec numba
Projet Euler 4 Tentative d'accélération
[DRF] Extrait pour accélérer PrimaryKeyRelatedField
N'écrivez pas Python si vous voulez l'accélérer avec Python
[Python] Hit Keras depuis TensorFlow et TensorFlow depuis c ++ pour accélérer l'exécution.
Indispensable si vous utilisez Python! Comment utiliser Numpy pour accélérer les calculs!
Comment accélérer la belle instanciation de soupe
Comment faire R chartr () en Python
Que faire si ipython et python démarrent avec des versions différentes
Comment faire un test de sac avec python
[Python] Présentez UIKit3 au projet Django
Entrée où les débutants en Python font de leur mieux pour frapper petit à petit 100 processus de langage
Appelez Rust de Python pour accélérer! Tutoriel PyO3: Partie des classes d'habillage ➀
Organiser les outils Python pour accélérer le mouvement initial des compétitions d'analyse de données
Appelez Rust de Python pour accélérer! Tutoriel PyO3: partie des classes d'emballage ➁
Le programme Python est lent! Je veux accélérer! Dans ce cas ...
Vous voulez ajouter des indices de type aux décorateurs Python?
Je veux faire le test de Dunnett en Python
Comment faire un traitement parallèle multicœur avec python
[Python] Ce que j'ai fait pour faire un test unitaire
Comment accélérer Scicit-Learn comme Conda Numpy
Implémentation minimale d'Union Find en Python
Vitesse: ajouter un élément à la fin du tableau Python
Que faire pour obtenir une feuille de calcul Google en Python
"Backport" vers python 2
Le meilleur outil pour protéger votre vie privée des photos ...!
De la configuration du Raspberry Pi à l'installation de l'environnement Python
Essais et erreurs pour accélérer la génération de cartes thermiques
Mémo pour créer votre propre Box avec le Python de Pepper
Essai et erreur pour accélérer les captures d'écran Android
Comment configurer un environnement Python à l'aide de pyenv
[Introduction à l'application Udemy Python3 +] 66. Création de votre propre exception
Comment faire un calcul de hachage avec Salt en Python
Essayez d'améliorer votre propre quiz d'introduction avec Python
Étapes pour installer le dernier Python sur votre Mac
[Python débutant] Si __name__ == Déplacez votre main pour comprendre '__ main__'.
Examen de atcoder ABC158, jusqu'à la question E (Python)
[Road to Intermediate Python] Définissez dans votre propre classe
Que faire lorsque "impossible d'importer le nom xxx" [Python]
Temps de multiplication à plusieurs chiffres jusqu'à 300 millions de chiffres en python
Comment configurer et compiler l'environnement Cython
Pour faire l'équivalent de Ruby ObjectSpace._id2ref en Python
Je veux faire quelque chose avec Python à la fin
Appelez Rust de Python pour accélérer! Tutoriel PyO3: Emballage d'une partie de fonction simple ➁