Utilisez SQL Alchemy et le multitraitement

Échantillon utilisant une combinaison de multitraitement et de SQLAlchemy, qui sont des bibliothèques multi-processus pratiques de Python. Les données sont échangées entre les processus dans la file d'attente et la base de données est accessible à l'aide de la session SQLALchemy.

from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)


engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()

####################
#Définition de la table omise(Tableau des articles)
####################

class Hoge(object):
    def __init__(self):
        '''Cue pour alimenter en multi-processus et cue pour cracher'''
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.session = Session()

    def __del__(self):
        self.session.commit()
        self.session.close()

    def get_items(self, worker_num=4):
        '''Obtenir des données de DB en multi-processus
Stockez l'identifiant que vous souhaitez obtenir dans la file d'attente, accédez à la base de données dans plusieurs processus et récupérez-le
        '''
        item_ids = self.session.query(Item.id).all()

        for item_id in item_ids:
            self.in_queue.put(item_id[0])

        jobs = []
        for i in xrange(worker_num):           
            p = Process(target=self.worker, args=[])
            p.daemon = True
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()
            #Message de fin de processus
            print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)

        return True

    def worker(self):
        '''Travailleur multi-processus
        self.in_Prenez l'identifiant de la file d'attente, récupérez les données de la base de données et de vous-même.out_Stocker dans la file d'attente
        '''
        while not self.in_queue.empty():
            id = self.in_queue.get()
            print id,
            try:
                item = self.session.query(Item).filter(Item.id == id).first()
            except Exception as e:
                print 'error =>', e
                continue
            self.out_queue.put(item.name)
            print 'in_queue: {0} (out_queue: {1})'\
                    .format(self.in_queue.qsize(), self.out_queue.qsize())
        return True  #Ça ne se terminera pas sans ça

point important

Si vous créez une session avec scoped_session ici

Could not locate column in row for column …

Veuillez noter que vous pouvez obtenir une erreur telle que

Aussi,

Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe

J'ai eu une erreur en disant, mais je ne l'ai pas compris car elle n'a pas été reproduite ...

Recommended Posts

Utilisez SQL Alchemy et le multitraitement
Utiliser Enum avec SQLAlchemy
Utilisez pyrtm et RTM CLI
Utiliser DATE_FORMAT avec le filtre SQLAlchemy
métaclasse python et déclaration sqlalchemy
Utilisation et intégration de "Shodan"
Group_by avec sqlalchemy et sum
Utilisation de la base de données SQL d'Azure avec SQL Alchemy
Utilisation correcte de la méthode d'instance et de la méthode de classe
Différence entre SQLAlchemy filter () et filter_by ()
Traitement parallèle Python (multitraitement et Joblib)
Comment installer et utiliser Tesseract-OCR
[Python / matplotlib] Comprendre et utiliser FuncAnimation
Lire et utiliser des fichiers Python à partir de Python
Jetez curl et utilisez httpie
[Note] Construction et utilisation du noyau WSL2
Utilisation de sessions et de réflexions avec SQL Alchemy
Comment utiliser .bash_profile et .bashrc
Comment installer et utiliser Graphviz
Différence entre SQLAlchemy flush () et commit ()
sqlalchemy
Utilisez pytest-qt lors du pytesting PySdie et PyQt
[Python] Utiliser et et ou lors de la création de variables
Comment installer et utiliser pandas_datareader [Python]
Utilisez xticks () pour pyplot et set_xticklabels () pour Axes.
Utiliser Jupyter Lab et Jupyter Notebook avec EC2
Utilisez PIL ou Pillow avec Cygwin Python
python: Comment utiliser les locals () et globals ()
Utilisez TPU et Keras avec Google Colaboratory
Comment utiliser le zip Python et énumérer
Facile! Utilisez gensim et word2vec avec MAMP.
Compréhension complète du threading Python et du multitraitement
Comment utiliser is et == en Python
Comment utiliser SQLAlchemy / Connect avec aiomysql
Utiliser Python et MeCab avec Azure Functions
Comment utiliser les pandas Timestamp et date_range