When submitting a job from Cloud Composer (apache-airflow==1.10.3) using `DataflowPythonOperator`, the SDK version on the Dataflow side is currently Google Cloud Dataflow SDK for Python 2.5.0, which is scheduled to reach end of support. Since support will end eventually, I will upgrade from the Python 2 execution environment to the Python 3 execution environment of `Apache Beam Python3.x SDK x.x.x`.
- Those who have used Cloud Composer / Airflow
When submitting a job from Cloud Composer to Dataflow, I think the reason it is executed with Google Cloud Dataflow SDK for Python 2.5.0 lies in the implementation of `DataflowPythonOperator` on the airflow side.
Take a look at the implementation
- The `execute` function of `DataflowPythonOperator` initializes the `DataFlowHook` class and calls its `start_python_dataflow` function.
- In the `start_python_dataflow` function of `DataFlowHook`, `python2` is hard-coded as part of the `command_prefix` argument passed to the `_start_dataflow` function.
```python
class DataFlowHook(GoogleCloudBaseHook):

    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                              append_job_name=True):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        # "python2" is hard-coded
        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)
```
In the implementation that follows, the command that submits the job to Dataflow is assembled, but because the prefix of this command remains `python2`, the Dataflow file is executed with Python 2 as-is. That is presumably why the execution environment on the Dataflow side ends up being Google Cloud Dataflow SDK for Python 2.5.0.
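To make the effect of that hard-coded prefix concrete, here is a minimal sketch (not the actual airflow source; the file name and option values are hypothetical) of the command list that ends up being built:

```python
# Minimal sketch of how the hard-coded prefix decides which interpreter
# submits the job. "my_pipeline.py" is a hypothetical pipeline file.
py_options = []                    # e.g. ["-m"] when running the file as a module
dataflow = "my_pipeline.py"        # local path of the Dataflow pipeline file

command = ["python2"] + py_options + [dataflow]
print(command)                     # ['python2', 'my_pipeline.py']
# _start_dataflow then appends options such as --runner and --labels and runs
# this command, so the job is always submitted from a Python 2 interpreter.
```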
Do the following in order:
To install apache-beam, install the following four dependencies (`requirements.txt`):

```
apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3
```
To install them, run the following command (place `requirements.txt` in a suitable directory).
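For example, a layout like the following (hypothetical, but matching the path passed to `gcloud` below) works:

```
airflow/
└── config/
    └── requirements.txt
```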
```bash
environment=your_composer_environment_name
location=your_location

gcloud composer environments update ${environment} \
    --update-pypi-packages-from-file airflow/config/requirements.txt \
    --location ${location}
```
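As an optional check, you can list the PyPI packages registered on the environment (assuming your Cloud SDK version supports this field path; `config.softwareConfig.pypiPackages` comes from the Composer Environment resource):

```bash
# Optional: confirm that the packages were registered on the Composer environment.
gcloud composer environments describe ${environment} \
    --location ${location} \
    --format="value(config.softwareConfig.pypiPackages)"
```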
Create classes that inherit airflow's `DataflowPythonOperator` and `DataFlowHook` so that the Dataflow file can be executed with the `python3` command.
Reference link https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655
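The snippets below assume imports roughly along these lines (module paths based on the Airflow 1.10.x contrib packages; `YOUR_PROJECT`, `BUCKET`, `DATAFLOW_TEMP_LOCATION` and `DATAFLOW_PY_FILE` are placeholders you define yourself):

```python
# Assumed imports for the snippets below (Airflow 1.10.x contrib module paths;
# adjust if your Composer image lays them out differently).
import re
from datetime import timedelta
from typing import Dict, List

import airflow
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.contrib.operators.dataflow_operator import (
    DataFlowPythonOperator,
    GoogleCloudBucketHelper,
)
from airflow.operators.dummy_operator import DummyOperator
```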
```python
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': YOUR_PROJECT,
        'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
        'runner': 'DataflowRunner'
    }
}


class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        self,
        job_name: str,
        variables: Dict,
        dataflow: str,
        py_options: List[str],
        append_job_name: bool = True,
        py_interpreter: str = "python3"
    ):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)


class DataFlowPython3Operator(DataFlowPythonOperator):
    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")


with airflow.DAG(
        dag_id="airflow_test_dataflow",
        default_args=default_args,
        schedule_interval=None) as dag:
    t1 = DummyOperator(task_id="start")
    t2 = DataFlowPython3Operator(
        py_file=DATAFLOW_PY_FILE,
        task_id="test_job",
        dag=dag)
```
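If you want the dummy `start` task to run before the Dataflow task, you can chain them explicitly:

```python
t1 >> t2  # run the Dataflow job after the dummy start task
```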
By specifying `py_interpreter="python3"` in the call to `start_python_dataflow` made from the `execute` function of the `DataFlowPython3Operator` class, the Dataflow file can be executed with the `python3` command.
It is OK if you can confirm that the job was executed with `Apache Beam Python3.6 SDK 2.15.0`.
A PR that allows the `python3` command to be used with airflow's `DataflowPythonOperator` has been created and merged, and is included in airflow 2.0 and later.
- JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-4983
- Pull Request