Lors de l'envoi d'une tâche depuis Cloud Composer (apache-airflow = = 11.03) à l'aide de «Dataflow Python Operator», la version du SDK côté Dataflow est désormais «Google Cloud Dataflow SDK for Python 2.5.0», dont la fin du support est prévue. Puisqu'il finira, je passerai la version de l'environnement d'exécution en Python2 à l'environnement d'exécution de Python3 de ʻApache Beam Python3.x SDK xxx`.
Lors de l'envoi d'une tâche de Cloud Compposer à Dataflow, je pense que la cause possible de l'exécution avec Google Cloud Dataflow SDK for Python 2.5.0
est l'implémentation de Dataflow Python Operator
du côté du flux d'air.
Jetez un œil à la mise en œuvre
--Initialisez la classe DataFlowHook
dans la fonction ʻexecute
de DataflowPythonOperator
et exécutez la fonction start_python_dataflow
.
https://github.com/apache/airflow/blob/ffd65440a0b730dcf524934225a65676045ce1f8/airflow/contrib/operators/dataflow_operator.py#L379
La fonction start_python_dataflow
de DataFlowHook
est exécutée avec python2
codé en dur dans le cadre de l'argument command_prefix
de la fonction _start_dataflow
.
https://github.com/apache/airflow/blob/ffd65440a0b730dcf524934225a65676045ce1f8/airflow/contrib/hooks/gcp_dataflow_hook.py#L239
class DataFlowHook(GoogleCloudBaseHook):
def start_python_dataflow(self, job_name, variables, dataflow, py_options,
append_job_name=True):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
# "python2"Est codé en dur
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)
Dans la future implémentation, nous allons créer une commande pour envoyer une tâche à Dataflow, mais le préfixe de cette commande est toujours python2
et nous essaierons d'exécuter le fichier Dataflow tel quel, donc l'environnement d'exécution du côté Dataflow est Google Cloud Je me demande si ce sera le SDK Dataflow pour Python 2.5.0
.
Procédez comme suit dans l'ordre:
Pour installer apache-beam, installez les quatre dépendances suivantes.
apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3
Pour l'installer, exécutez la commande suivante.
(Mettez requirements.txt
dans un répertoire approprié)
environment=your_composer_environment_name
location=your_location
gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}
DataflowPythonOperator``` DataFlowHook
Créez une classe qui hérite du flux d'air DataflowPythonOperator
et DataFlowHook
afin que le fichier de flux de données puisse être exécuté avec la commande python3.
Lien de référence https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dataflow_default_options': {
'project': YOUR_PROJECT,
'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
'runner': 'DataflowRunner'
}
}
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
self,
job_name: str,
variables: Dict,
dataflow: str,
py_options: List[str],
append_job_name: bool = True,
py_interpreter: str = "python3"
):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
"""Execute the python dataflow job."""
bucket_helper = GoogleCloudBucketHelper(
self.gcp_conn_id, self.delegate_to)
self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
dataflow_options = self.dataflow_default_options.copy()
dataflow_options.update(self.options)
# Convert argument names from lowerCamelCase to snake case.
camel_to_snake = lambda name: re.sub(
r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
formatted_options = {camel_to_snake(key): dataflow_options[key]
for key in dataflow_options}
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
with airflow.DAG(
dag_id="airflow_test_dataflow",
default_args=default_args,
schedule_interval=None) as dag:
t1 = DummyOperator(task_id="start")
t2 = DataFlowPython3Operator(
py_file=DATAFLOW_PY_FILE,
task_id="test_job",
dag=dag)
En spécifiant py_interpreter =" python3 "
dans l'argument de start_python_dataflow
exécuté dans la fonction ʻexecute de la classe
DataFlowPython3Operator, vous pouvez exécuter le fichier Dataflow avec la commande
python3`. Je vais.
C'est correct si vous pouvez confirmer qu'il a été exécuté avec la version de ʻApache Beam Python3.6 SDK 2.15.0` comme indiqué ci-dessous.
Un PR modifié a été créé et fusionné dans airflow 2.0 et versions ultérieures afin que la commande python3
puisse être exécutée à l'aide de DataflowPythonOperator
de airflow.
-issue jira
https://issues.apache.org/jira/browse/AIRFLOW-4983
PullRrequest