Comment activer python3 pour exécuter des tâches lors de l'envoi de tâches de GCP Cloud Composer vers Dataflow

Que faire dans cet article

Lors de l'envoi d'une tâche depuis Cloud Composer (apache-airflow = = 11.03) à l'aide de «Dataflow Python Operator», la version du SDK côté Dataflow est désormais «Google Cloud Dataflow SDK for Python 2.5.0», dont la fin du support est prévue. Puisqu'il finira, je passerai la version de l'environnement d'exécution en Python2 à l'environnement d'exécution de Python3 de ʻApache Beam Python3.x SDK xxx`.

スクリーンショット 2020-03-04 12.19.23.png

Public cible

Environnement d'exécution

Causes possibles

Lors de l'envoi d'une tâche de Cloud Compposer à Dataflow, je pense que la cause possible de l'exécution avec Google Cloud Dataflow SDK for Python 2.5.0 est l'implémentation de Dataflow Python Operator du côté du flux d'air.

Jetez un œil à la mise en œuvre

--Initialisez la classe DataFlowHook dans la fonction ʻexecute de DataflowPythonOperator et exécutez la fonction start_python_dataflow.

class DataFlowHook(GoogleCloudBaseHook):

    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                              append_job_name=True):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]
        # "python2"Est codé en dur
        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)

Dans la future implémentation, nous allons créer une commande pour envoyer une tâche à Dataflow, mais le préfixe de cette commande est toujours python2 et nous essaierons d'exécuter le fichier Dataflow tel quel, donc l'environnement d'exécution du côté Dataflow est Google Cloud Je me demande si ce sera le SDK Dataflow pour Python 2.5.0.

Solution (à partir du 03/09/2020)

Procédez comme suit dans l'ordre:

1. Installez ʻapache-beam` dans l'environnement Cloud Composer

Pour installer apache-beam, installez les quatre dépendances suivantes.

apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3

Pour l'installer, exécutez la commande suivante. (Mettez requirements.txt dans un répertoire approprié)

environment=your_composer_environment_name
location=your_location

gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}

2. Créez une classe qui hérite de DataflowPythonOperator``` DataFlowHook

Créez une classe qui hérite du flux d'air DataflowPythonOperator et DataFlowHook afin que le fichier de flux de données puisse être exécuté avec la commande python3.

Lien de référence https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': YOUR_PROJECT,
        'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
        'runner': 'DataflowRunner'
    }
}


class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        self,
        job_name: str,
        variables: Dict,
        dataflow: str,
        py_options: List[str],
        append_job_name: bool = True,
        py_interpreter: str = "python3"
    ):

        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)


class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")


with airflow.DAG(
        dag_id="airflow_test_dataflow",
        default_args=default_args,
        schedule_interval=None) as dag:

    t1 = DummyOperator(task_id="start")
    t2 = DataFlowPython3Operator(
        py_file=DATAFLOW_PY_FILE,
        task_id="test_job",
        dag=dag)

En spécifiant py_interpreter =" python3 " dans l'argument de start_python_dataflow exécuté dans la fonction ʻexecute de la classe DataFlowPython3Operator, vous pouvez exécuter le fichier Dataflow avec la commande python3`. Je vais.

C'est correct si vous pouvez confirmer qu'il a été exécuté avec la version de ʻApache Beam Python3.6 SDK 2.15.0` comme indiqué ci-dessous.

スクリーンショット 2020-03-05 11.51.34.png スクリーンショット 2020-03-05 12.26.27.png

Remarques

Un PR modifié a été créé et fusionné dans airflow 2.0 et versions ultérieures afin que la commande python3 puisse être exécutée à l'aide de DataflowPythonOperator de airflow.

-issue jira

Recommended Posts

Comment activer python3 pour exécuter des tâches lors de l'envoi de tâches de GCP Cloud Composer vers Dataflow
Exécutez Cloud Dataflow (Python) depuis AppEngine
Comment appeler l'API Cloud à partir de GCP Cloud Functions
Comment exécuter un programme Python à partir d'un script shell
[GCP] Comment générer des journaux Cloud Functions vers Cloud Logging (Stackdriver Logging) (Python)
Comment accéder à wikipedia depuis python
Comment installer OpenCV sur Cloud9 et l'exécuter en Python
[GCP] Comment publier une URL signée Cloud Storage (URL temporaire) en Python
Comment éviter la duplication des données lors de la saisie de Python vers SQLite.
Comment se connecter à Cloud Firestore à partir de Google Cloud Functions avec du code Python
Comment mettre à jour la version Python de Cloud Shell dans GCP
Comment mettre à jour Google Sheets à partir de Python
Comment gérer l'erreur OAuth2 lors de l'utilisation des API Google à partir de Python
Comment accéder à RDS depuis Lambda (python)
Comment changer de version de Python dans cloud9
Comment exécuter des scripts Maya Python
Comment démarrer Python (Flask) au démarrage d'EC2
Étude de Python Hour7: Comment utiliser les classes
[Python] Comment lire les données de CIFAR-10 et CIFAR-100
GCP: répétez de Pub / Sub vers Cloud Functions et de Cloud Functions vers Pub / Sub
Comment exécuter MeCab sur Ubuntu 18.04 LTS Python
Comment générer un objet Python à partir de JSON
Comment bien gérer les commandes Linux à partir de Python
Comment exécuter LeapMotion avec Python non-Apple
Ce que j'ai fait lors de la mise à jour de Python 2.6 vers 2.7
Comment utiliser Ruby's PyCall pour activer Pyenv Python
Comment passer des arguments lors de l'appel d'un script python depuis Blender sur la ligne de commande
[Python] Comment exécuter Jupyter-notebook + pandas + multiprocessing (Pool) [pandas] Memo
[Python] Comment supprimer les valeurs en double de la liste
Comment récupérer des données d'image de Flickr avec Python
Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Comment exécuter python dans l'espace virtuel (pour MacOS)
Comment exécuter des tests avec Python unittest
Python - Remarques lors de la conversion du type str en type int
Comment exécuter setUp une seule fois dans Python Unittest
Comment télécharger des fichiers depuis Selenium of Python dans Chrome
Comment quitter lors de l'utilisation de Python dans Terminal (Mac)
Exécuter la fonction Python à partir de Powershell (comment passer des arguments)
[Python] Comment appeler une fonction de c depuis python (édition ctypes)
Comment installer Python
Changements de Python 3.0 à Python 3.5
Changements de Python 2 à Python 3.0
Comment installer python
Exécutez Python à partir d'Excel
Tutoriel Cloud Run (python)
Comment découper un bloc de plusieurs tableaux à partir d'un multiple en Python
Comment exécuter un fichier Python à une invite de commande Windows 10
Ne perdez pas contre Ruby! Comment exécuter Python (Django) sur Heroku
Comment lancer AWS Batch à partir de l'application cliente Python
Comment se connecter à diverses bases de données à partir de Python (PEP 249) et SQL Alchemy
Comment télécharger des fichiers sur Cloud Storage avec le SDK Python de Firebase
Comment exécuter automatiquement la fonction d'exportation de GCP Datastore
[GCP] Un mémorandum lors de l'exécution d'un programme Python avec Cloud Functions
Comment échantillonner à partir de n'importe quelle fonction de densité de probabilité en Python
Comment exécuter une application construite avec Python + py2app construite avec Anaconda
Autoriser l'exécution rapide des scripts Python dans Cloud Run à l'aide du répondeur
Comment appeler Python ou Julia à partir de Ruby (implémentation expérimentale)