This article introduces PyFlink's architecture and provides a quick demo of using PyFlink to analyze CDN logs.
So what exactly is PyFlink? As the name implies, PyFlink is simply a combination of Apache Flink and Python, or rather Flink on Python. But what does combining the two actually mean? First, it lets you use all of Flink's features in Python. More importantly, it lets the analysis and computing capabilities of Python's extensive ecosystem run on Flink, which in turn helps that ecosystem grow. In other words, it's a win-win for both sides. If you dig a little deeper, you'll find that the integration of the Flink framework with the Python language is by no means a coincidence.
Python is closely tied to big data. To see why, consider the problems people actually solve with Python. User surveys show that most people use Python for data analytics and machine learning, and these are problems that the big data space also aims to solve. Besides expanding the audience for big data products, integrating Python with big data greatly strengthens the Python ecosystem by extending its stand-alone architecture to a distributed one. This also explains the strong demand for using Python to analyze large amounts of data.
Integrating Python with big data is in line with other recent trends. But again, why does Flink now support Python rather than Go, R, or another language? And why do most users choose PyFlink over PySpark or PyHive?
To understand why, let's first consider the benefits of using the Flink framework.
- **Advantageous architecture**: Flink is a pure stream computing engine with unified stream and batch processing capabilities.
- **Strong vitality**: According to ASF statistics, Flink was the most active open-source project in 2019.
- **High reliability**: As an open-source project, Flink has been tested over a long period and is widely used in the production environments of big data companies.
Next, let's see why Flink supports Python rather than other languages. According to statistics, Python is the most popular language after Java and C, and it has been growing rapidly since 2018. Java and Scala are Flink's default languages, so adding Python support is a natural next step.
PyFlink is an inevitable product of these technology trends. However, understanding PyFlink's significance is not enough, because the ultimate goal is to benefit Flink and Python users and solve real problems. So we need to dig deeper into how PyFlink can be implemented.
To implement PyFlink, we need to know the main goals to achieve and the core challenges to solve. What are PyFlink's main goals? In short, there are two: to make all of Flink's features available to Python users, and to run Python's analysis and computing functions on Flink.
Then, let's analyze the important issues that need to be solved to achieve these goals.
Do we need to build a Python engine on Flink, like the existing Java engine, to implement PyFlink? The answer is no. This was tried in Flink 1.8 and earlier, but it didn't work well. A basic design principle is to achieve a given goal at the lowest cost. The simplest and best approach is to provide a single Python API layer and reuse the existing computing engine.
So what kind of Python API should Flink provide? Users are familiar with the high-level Table API and SQL as well as the stateful DataStream API, which sit progressively closer to Flink's internal logic. It therefore makes sense to provide the Table API and the DataStream API for Python. But what key issues remain after that?
Obviously, a key issue is establishing a handshake between the Python virtual machine (PyVM) and the Java virtual machine (JVM), which is essential for Flink to support multiple languages. Solving it requires choosing an appropriate communication technology.
Currently, there are two solutions for communication between the PyVM and the JVM: Apache Beam and [Py4J](https://www.py4j.org/?spm=a2c65.11461447.0.0.464e694fRAGzkI). The former is a well-known project that supports multiple languages and multiple engines, and the latter specializes in communication between the PyVM and the JVM. To understand the difference between Apache Beam and Py4J, you can compare them from several perspectives. First, consider this analogy: to get through a wall, Py4J digs a hole like a mole, while Apache Beam knocks down the entire wall like a big bear. From this point of view, implementing VM communication with Apache Beam is somewhat complicated. In short, Apache Beam emphasizes universality and lacks flexibility in special cases.
In addition, Flink requires interactive programming, as described in FLIP-36. Flink also needs semantic consistency in its API design, especially for multi-language support. Because Apache Beam's existing architecture cannot meet these requirements, Py4J is clearly the best option for communication between the PyVM and the JVM.
After establishing communication between the PyVM and the JVM, we achieved our first goal: making Flink's features available to Python users. This was done in Flink 1.9. Now let's take a look at the architecture of the PyFlink API in Flink 1.9.
Flink 1.9 uses Py4J for VM communication. A gateway is started in the PyVM, and a gateway server is started in the JVM to accept Python requests. The Python API also provides objects such as TableEnvironment and Table, which mirror those in the Java API. Writing the Python API therefore essentially comes down to calling the Java API. Flink 1.9 also addresses job deployment: you can submit jobs in various ways, such as running Python commands or using the Python shell or the CLI.
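To illustrate why writing the Python API essentially means calling the Java API, here is a toy sketch of the wrapper pattern. The `FakeJava*` classes are hypothetical stand-ins for JVM objects so that the example runs without Flink; in PyFlink 1.9, the Python objects instead hold references to real Java objects obtained through the Py4J gateway, and each method call is forwarded synchronously to the JVM.

```python
# Toy model of the PyFlink 1.9 API layer: each Python method forwards the call to a
# "Java" object and wraps the returned object. The JVM side is simulated here by
# plain Python classes (hypothetical); in PyFlink the forwarding goes through Py4J.


class FakeJavaTable:
    """Hypothetical stand-in for a Java Table object living in the JVM."""

    def __init__(self, plan):
        self.plan = plan

    def select(self, fields):
        return FakeJavaTable(self.plan + [f"select({fields})"])

    def explain(self):
        return " -> ".join(self.plan)


class FakeJavaTableEnvironment:
    """Hypothetical stand-in for the Java TableEnvironment."""

    def from_path(self, path):
        return FakeJavaTable([f"scan({path})"])


class Table:
    """Python-side wrapper: holds a reference to the Java object and delegates to it."""

    def __init__(self, j_table):
        self._j_table = j_table

    def select(self, fields):
        # Call the Java method and wrap the Java object it returns.
        return Table(self._j_table.select(fields))

    def explain(self):
        return self._j_table.explain()


class TableEnvironment:
    def __init__(self, j_tenv):
        self._j_tenv = j_tenv

    def from_path(self, path):
        return Table(self._j_tenv.from_path(path))


t_env = TableEnvironment(FakeJavaTableEnvironment())
plan = t_env.from_path("cdn_access_log").select("uuid, client_ip").explain()
print(plan)  # scan(cdn_access_log) -> select(uuid, client_ip)
```

Because the Python layer adds no logic of its own, the semantics and the execution engine stay those of the Java API.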
What are the benefits of this architecture? First, it is simple and ensures semantic consistency between the Python API and the Java API. Second, because Python API calls are translated into Java API calls and run on the same engine, Python jobs achieve processing performance comparable to Java jobs. For example, Flink's Java API processed 2.551 billion data records per second during last year's Double 11.
The previous section described how to make Flink's features available to Python users. This section describes how to run Python functions on Flink. In general, there are two ways to do this:
1. **Select typical Python libraries and add their APIs to PyFlink.** This approach is time-consuming because Python has so many libraries. Before the APIs can be embedded, Python execution itself also needs to be streamlined.
2. **Treat all the functions of existing Python libraries as user-defined functions and integrate them into Flink, based on the characteristics of the existing Flink Table API and Python libraries.** This is supported from Flink 1.10. What is the key issue in integrating these functions? Again, it is executing Python user-defined functions.
Next, let's choose a technology to address this key issue.
Executing a Python user-defined function is actually quite complicated. It involves not only communication between the VMs, but also managing the Python execution environment, parsing the business data exchanged between Java and Python, passing Flink's state backend to Python, and monitoring execution status. This is where Apache Beam comes in. As a big bear that supports multiple engines and languages, Apache Beam can help a lot here, so let's see how Apache Beam handles the execution of Python user-defined functions.
Shown below is the Portability Framework, a highly abstracted Apache Beam architecture designed to support multiple languages and engines. Apache Beam currently supports several languages, including Java, Go, and Python. Beam Fn Runners and Execution at the bottom of the figure represent the engines and the user-defined function execution environments. Apache Beam uses Protocol Buffers (protobuf) to abstract data structures, uses the [gRPC](https://grpc.io/?spm=a2c65.11461447.0.0.464e694fRAGzkI) protocol for communication, and encapsulates the core gRPC services. In this respect, Apache Beam is more like a firefly that lights the way for user-defined functions in PyFlink. Interestingly, the firefly has become Apache Beam's mascot, so perhaps this is no coincidence.
Next, let's take a look at the gRPC service provided by Apache Beam.
In the figure below, the runner represents Flink's Java operator and maps to the SDK worker in the Python runtime environment. Apache Beam abstracts services such as Control, Data, State, and Logging. In fact, these services have long run stably and efficiently in Beam's Flink runner, which makes it easier to run PyFlink UDFs. In short, PyFlink now has a solution for both API calls and user-defined function execution: it uses Py4J for VM communication at the API level and Apache Beam's Portability Framework to set up the execution environment for user-defined functions.
This shows that, in selecting technologies, PyFlink strictly follows the principle of achieving a given goal at the lowest cost and always adopts the architecture best suited for long-term development. While collaborating with Apache Beam, I also submitted more than 20 optimization patches to the Beam community.
The UDF architecture must not only implement communication between the PyVM and the JVM, but also meet different requirements at the compilation and execution stages. In the PyFlink user-defined function architecture diagram below, behavior in the JVM is shown in green and behavior in the PyVM is shown in blue. First, let's look at the local design during compilation. It relies on pure API mapping calls and uses Py4J for VM communication: each time you call a Python API, the corresponding Java API is called synchronously.
To support user-defined functions, a UDF registration API (`register_function`) is required. You may also need third-party libraries when defining Python user-defined functions, so a set of methods for adding dependencies, such as `add_python_file()`, is also required. When you write a Python job, the Java API is also called to create a JobGraph before the job is submitted. You can then submit the job to the cluster in several ways, such as through the CLI.
See image https://yqintl.alicdn.com/a72ad37ed976e62edc9ba8dcb027bf61be8fe3f3.gif
Now let's see how the Python and Java sides work together at runtime in this architecture. On the Java side, the JobMaster assigns jobs to TaskManagers just as it does for ordinary Java jobs, and the TaskManagers execute tasks that involve operators in both the JVM and the PyVM. The Python user-defined function operator uses various gRPC services for communication between the JVM and the PyVM, such as DataService for exchanging business data and StateService, which lets Python UDFs call the Java state backend. Many other services, such as logging and metrics, are also provided.
These services are built on Beam's Fn API. User-defined functions ultimately run in Python workers, and the corresponding gRPC services return the results to the Python user-defined function operator in the JVM. Python workers can run as processes, inside Docker containers, or even in external service clusters. This extension mechanism lays a solid foundation for integrating PyFlink with other Python frameworks. Now that you have a basic understanding of the Python user-defined function architecture introduced in PyFlink 1.10, let's look at its benefits.
First, it is built on a mature multi-language framework: the Beam-based architecture can easily be extended to support other languages. Second, it supports stateful user-defined functions: Beam abstracts stateful services, which makes it easy for PyFlink to support them. Third, it is easy to maintain: two active communities, Apache Beam and Apache Flink, maintain and optimize the same framework.
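To make the data path described above more concrete (the Python UDF operator in the JVM sends rows to a Python worker, which applies the function and returns the results), here is a simplified, single-process simulation. It assumes a made-up wire format, JSON rows with a 4-byte length prefix, and hypothetical function names. The real exchange goes through Beam's Fn API data service over gRPC with Beam's own coders, and the worker runs in a separate process, container, or cluster.

```python
import json
import struct
from io import BytesIO


def write_batch(stream, rows):
    """Encode each row as JSON preceded by a 4-byte big-endian length prefix."""
    for row in rows:
        payload = json.dumps(row).encode("utf-8")
        stream.write(struct.pack(">I", len(payload)))
        stream.write(payload)


def read_batch(stream):
    """Decode length-prefixed JSON rows until the stream is exhausted."""
    rows = []
    while True:
        header = stream.read(4)
        if len(header) < 4:
            break
        (length,) = struct.unpack(">I", header)
        rows.append(json.loads(stream.read(length).decode("utf-8")))
    return rows


def python_worker(request_bytes, udf):
    """Simulated Python worker: decode input rows, apply the UDF, encode the results."""
    inputs = read_batch(BytesIO(request_bytes))
    out = BytesIO()
    write_batch(out, [udf(*row) for row in inputs])
    return out.getvalue()


# "JVM side": serialize a batch of UDF arguments and send it to the worker.
request = BytesIO()
write_batch(request, [[1, 2], [10, 20], [100, 200]])
response = python_worker(request.getvalue(), lambda a, b: a + b)

# "JVM side" again: decode the results returned by the worker.
results = read_batch(BytesIO(response))
print(results)  # [3, 30, 300]
```

Sending rows in batches rather than one call per row is what keeps the cross-language overhead manageable; the framing here only illustrates that idea.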
Now that you understand PyFlink's architecture and the ideas behind it, let's take a look at PyFlink's specific application scenarios.
What business scenarios does PyFlink support? They can be analyzed from two perspectives: Python and Java. Keep in mind that PyFlink is suitable for all scenarios where Java is applicable.
You need to install PyFlink before you can use the API. Currently, you can install PyFlink by running `python -m pip install apache-flink`.
The PyFlink API is fully aligned with the Java Table API and supports a variety of relational and window operations. Some easy-to-use PyFlink APIs, such as those for column operations, are even more powerful than the SQL API. In addition to the API, PyFlink provides multiple ways to define Python UDFs.
First, you can extend `ScalarFunction` (for example, to add metrics) to get more auxiliary functionality. In addition, PyFlink user-defined functions support every kind of function definition that Python supports, including lambda functions, named functions, and callable functions.
After defining these functions, use PyFlink decorators to mark them and describe their input and output data types. In later versions, Python type hints may further simplify this by letting PyFlink derive the types. The following example will give you a better understanding of how to define a user-defined function.
The example adds two numbers: it imports the required classes and defines the function. This is straightforward, so let's move on to a real-world case.
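For readers without a Flink installation, the four definition styles listed above (a `ScalarFunction` subclass, a named function, a lambda, and a callable object) can be illustrated with a toy sketch. `toy_udf` and `ToyScalarFunction` are hypothetical stand-ins, not PyFlink's `udf` and `ScalarFunction`: the real decorator takes `DataTypes` objects such as `DataTypes.BIGINT()`, whereas plain strings are used here as placeholders.

```python
class ToyScalarFunction:
    """Hypothetical base class mimicking the idea of ScalarFunction: subclasses implement eval()."""

    def eval(self, *args):
        raise NotImplementedError


def toy_udf(f=None, input_types=None, result_type=None):
    """Hypothetical UDF wrapper: records the declared types and normalizes
    every supported callable style into a plain function."""

    def wrap(func):
        call = func.eval if isinstance(func, ToyScalarFunction) else func

        def wrapper(*args):
            return call(*args)

        wrapper.input_types = input_types
        wrapper.result_type = result_type
        return wrapper

    # Works both as @toy_udf(input_types=..., result_type=...) and as toy_udf(f, ...).
    return wrap if f is None else wrap(f)


class Add(ToyScalarFunction):  # 1. ScalarFunction-style subclass
    def eval(self, i, j):
        return i + j


class CallableAdd:  # 2. callable object
    def __call__(self, i, j):
        return i + j


@toy_udf(input_types=["BIGINT", "BIGINT"], result_type="BIGINT")
def add_named(i, j):  # 3. named function with a decorator
    return i + j


types = dict(input_types=["BIGINT", "BIGINT"], result_type="BIGINT")
add_class = toy_udf(Add(), **types)
add_callable = toy_udf(CallableAdd(), **types)
add_lambda = toy_udf(lambda i, j: i + j, **types)  # 4. lambda function

for fn in (add_class, add_callable, add_named, add_lambda):
    print(fn(1, 2), fn.result_type)  # prints "3 BIGINT" for each style
```

Whatever the definition style, the framework only needs a callable plus declared input and output types, which is why all four styles can be supported uniformly.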
Here, we use the real-time log analysis of Alibaba Cloud Content Delivery Network (CDN) as an example of solving practical problems with PyFlink. Alibaba Cloud CDN is used to accelerate resource downloads. CDN logs are generally processed in a common pattern. First, log data is collected from edge nodes and saved to a message queue. Second, the message queue is connected to a real-time computing cluster for real-time log analysis. Third, the analysis results are written to a storage system. In this example, the architecture is instantiated as follows: Kafka serves as the message queue, Flink performs the real-time computing, and the final data is stored in a MySQL database.
For convenience, we have simplified the actual business statistics requirements. This example collects page view, download, and download speed statistics by region. Only the core fields are selected as the data format: `uuid` is the unique log ID, `client_ip` is the access source, `request_time` is the resource download time, and `response_size` is the resource data size. Although we need regional statistics, the original log does not contain a region field. Therefore, we need to define a Python UDF that looks up the region of each record based on `client_ip`. Let's analyze how to define this user-defined function.
Here, the user-defined function is defined as a named function, `ip_to_province()`. Its input is an IP address and its output is a region name string, so both the input and output types are defined as strings. The query service used here is for demo purposes only; in a production environment, replace it with a reliable region query service.
```python
import re
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
    """
    format:
    {
        'ip': '27.184.139.25',
        'pro': 'Hebei',
        'proCode': '130000',
        'city': 'Shijiazhuang',
        'cityCode': '130100',
        'region': 'Lingshou County',
        'regionCode': '130126',
        'addr': 'Lingshou County, Shijiazhuang City, Hebei Province, China Telecom',
        'regionNames': '',
        'err': ''
    }
    """
    try:
        # Demo-only lookup service; replace it with a reliable service in production.
        urlobj = urlopen(
            'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip),
            timeout=5)
        data = str(urlobj.read(), "gbk")
        # The JSON object is wrapped in extra text; extract the first flat {...} block.
        pos = re.search(r"\{[^{}]+\}", data).span()
        geo_data = json.loads(data[pos[0]:pos[1]])
        if geo_data['pro']:
            return geo_data['pro']
        else:
            return geo_data['err']
    except Exception:
        # Network errors, a missing JSON block, or missing keys all map to a default.
        return "Unknown"
```
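The network lookup makes `ip_to_province` hard to test in isolation, so the sketch below pulls out its parsing step. `parse_province` and the sample response are hypothetical; the sample only assumes, as the regex in the UDF does, that the service wraps a flat JSON object in extra text. Unlike the UDF, it also falls back to `"Unknown"` when both `pro` and `err` are empty.

```python
import json
import re

# Matches the first brace-delimited block that contains no nested braces.
GEO_JSON = re.compile(r"\{[^{}]+\}")


def parse_province(response_text):
    """Extract the JSON object from a wrapped response and return the province.

    Mirrors the parsing steps of ip_to_province, without the HTTP call.
    """
    match = GEO_JSON.search(response_text)
    if match is None:
        return "Unknown"
    geo_data = json.loads(match.group(0))
    return geo_data.get("pro") or geo_data.get("err") or "Unknown"


# Hypothetical wrapped response; the outer text is made up for illustration.
sample = ('if(window.IPCallBack) {IPCallBack('
          '{"ip":"27.184.139.25","pro":"Hebei","city":"Shijiazhuang","err":""});}')
print(parse_province(sample))          # Hebei
print(parse_province("no json here"))  # Unknown
```

Note that the regex skips the outer `{IPCallBack(...` block because it contains a nested brace, and matches the inner JSON object instead.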
Now that we've analyzed the requirements and defined the user-defined function, let's move on to job development. A typical job needs a source connector that reads data from Kafka and a sink connector that stores the results in a MySQL database. Finally, we also need to write the statistical logic.
PyFlink also supports SQL DDL statements, so you can define a source connector with a simple DDL statement. Be sure to set `connector.type` to `kafka`. You can also define a sink connector with a DDL statement and set `connector.type` to `jdbc`. As you can see, defining connectors is very simple. Next, let's look at the core statistical logic.
```python
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
    uuid VARCHAR,
    client_ip VARCHAR,
    request_time BIGINT,
    response_size BIGINT,
    uri VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'access_log',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'csv',
    'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
    province VARCHAR,
    access_count BIGINT,
    total_download BIGINT,
    download_speed DOUBLE
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
    'connector.table' = 'access_statistic',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.write.flush.interval' = '1s'
)
"""
```
In this part, you first read data from the source and then use `ip_to_province(ip)` to convert `client_ip` into a specific region. Then you compute page view, download, and download speed statistics by region. Finally, you store the results in the result table. This logic uses the Python user-defined function as well as two built-in Java aggregate functions in Flink, `sum` and `count`.
```python
# Core statistics
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # convert IP to region name
            "response_size, request_time")\
    .group_by("province")\
    .select(  # page views (access count)
            "province, count(uuid) as access_count, "
            # total download volume
            "sum(response_size) as total_download, "
            # download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")
```
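To check the semantics of these statistics, the sketch below reproduces the `group_by("province")` aggregation in plain Python for a few records that have already passed through `ip_to_province`. `ProvinceStats` and the sample records are hypothetical. Flink keeps such running totals as managed state and, in a streaming job, emits an updated row for a province as new records arrive, roughly like an upsert into the MySQL table.

```python
class ProvinceStats:
    """Simplified model of the group_by("province") aggregation in the job above."""

    def __init__(self):
        # province -> [access_count, total_download, total_request_time]
        self.state = {}

    def update(self, province, response_size, request_time):
        acc = self.state.setdefault(province, [0, 0, 0])
        acc[0] += 1              # count(uuid)
        acc[1] += response_size  # sum(response_size)
        acc[2] += request_time   # sum(request_time)
        # download_speed = sum(response_size) * 1.0 / sum(request_time)
        return (province, acc[0], acc[1], acc[1] * 1.0 / acc[2])


# (province, response_size, request_time) after the IP-to-region conversion.
records = [("Hebei", 1000, 10), ("Hebei", 3000, 10), ("Zhejiang", 500, 5)]
stats = ProvinceStats()
emitted = [stats.update(*r) for r in records]
for row in emitted:
    print(row)
# ('Hebei', 1, 1000, 100.0)
# ('Hebei', 2, 4000, 200.0)
# ('Zhejiang', 1, 500, 100.0)
```

The second Hebei row replaces the first, so the result table always holds the latest aggregate per province.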
Now let's look at the complete code. First, import the core dependencies, then create the table environment and set the planner. Flink currently supports two planners, the legacy Flink planner and the Blink planner; we recommend the Blink planner.
Next, execute the DDL statements to register the Kafka source table and the MySQL result table defined earlier. Third, register the Python UDF. Note that you can specify other files the UDF depends on through the API, and they are sent to the cluster along with the job. Finally, write the core statistical logic and call `execute` to submit the job. At this point, you have created a real-time log analysis job for Alibaba Cloud CDN. Let's check the actual statistical results.
```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# Create the Table Environment and use the Blink planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# Create the Kafka source table
t_env.sql_update(kafka_source_ddl)
# Create the MySQL result table
t_env.sql_update(mysql_sink_ddl)

# Register the UDF that converts an IP address to a region name
t_env.register_function("ip_to_province", ip_to_province)

# Add the dependent Python files so they are shipped to the cluster with the job
t_env.add_python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

# Core statistics
t_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "  # convert IP to region name
            "response_size, request_time")\
    .group_by("province")\
    .select(  # page views (access count)
            "province, count(uuid) as access_count, "
            # total download volume
            "sum(response_size) as total_download, "
            # download speed
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")

# Submit the job
t_env.execute("pyFlink_parse_cdn_log")
```
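To try the job, the `access_log` topic needs CSV records whose columns match `cdn_access_log` (`uuid, client_ip, request_time, response_size, uri`). The sketch below generates such mock lines; `mock_cdn_log_lines`, the value ranges, the units, and the URI pattern are made-up assumptions. You could then publish the lines to Kafka, for example with Kafka's console producer or a Kafka client library.

```python
import csv
import io
import random
import uuid


def mock_cdn_log_lines(n, seed=42):
    """Generate n CSV lines matching the cdn_access_log columns:
    uuid, client_ip, request_time, response_size, uri."""
    rng = random.Random(seed)  # seeded so the output is reproducible
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for _ in range(n):
        writer.writerow([
            str(uuid.UUID(int=rng.getrandbits(128))),              # uuid
            ".".join(str(rng.randint(1, 254)) for _ in range(4)),  # client_ip
            rng.randint(1, 1000),                                  # request_time (assumed unit)
            rng.randint(1_000, 5_000_000),                         # response_size (assumed bytes)
            f"/assets/file_{rng.randint(1, 100)}.bin",             # uri (hypothetical)
        ])
    return buf.getvalue().splitlines()


for line in mock_cdn_log_lines(3):
    print(line)
```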
I sent mock data to Kafka as CDN log data. On the right side of the figure below, page view, download, and download speed statistics are computed by region in real time.
Refer to the analysis result https://yqintl.alicdn.com/e05da15f039d8331896ee1e7f294585416809ad9.gif
In general, developing business logic with PyFlink is easy. You can write it with SQL and the Table API without understanding the underlying implementation. Next, let's look at the overall outlook for PyFlink.
PyFlink has been developed with two aims: making Flink functionality available to Python users and integrating Python functions into Flink. As the PyFlink roadmap below shows, we first established communication between the PyVM and the JVM. Flink 1.9 then provided the Python Table API, which opened up the functionality of the existing Flink Table API to Python users. Flink 1.10 integrated Apache Beam, set up the execution environment for Python user-defined functions, added management of dependencies on other Python libraries, and defined a user-defined function API to support Python UDFs, preparing for the integration of Python functions into Flink.
To extend distributed Python functionality, PyFlink supports Pandas Series and [DataFrame](https://pandas.pydata.org/pandas-docs/stable/getting_started/dsintro.html?spm=a2c65.11461447.0.0.464e694fRAGzkI), so you can use Pandas user-defined functions directly in PyFlink. We also plan to enable Python user-defined functions in the SQL Client to make PyFlink easier to use, and to provide a Python ML Pipeline API so that Python users can use PyFlink for machine learning. Monitoring the execution of Python user-defined functions is very important in production, so PyFlink also provides metric management for Python user-defined functions. These features are included in Flink 1.11.
However, these are only part of PyFlink's development plans. Much more remains to be done, including optimizing PyFlink's performance, providing a graph computing API, and supporting a Pandas-native API for Pandas on Flink. We will continue to make existing Flink features available to Python users and integrate Python's powerful capabilities into Flink to achieve our original goal of expanding the Python ecosystem.
See image https://yqintl.alicdn.com/f85ba5bd5d24a01558e751bcdc8887b3f5d565ca.gif
Let's take a quick look at PyFlink's highlights in Flink 1.11.
Let's take a closer look at PyFlink's core features in Flink 1.11. Focusing on functionality, performance, and ease of use, we plan to support Pandas user-defined functions in PyFlink 1.11. This way, practical Pandas library functions, such as a cumulative distribution function, can be used directly in PyFlink.
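To illustrate the difference between an ordinary (row-at-a-time) Python UDF and a vectorized, Pandas-style UDF, here is a stdlib-only sketch built on a normal cumulative distribution function. A plain list stands in for `pandas.Series`, and the function names are hypothetical. In PyFlink's Pandas UDFs the batch arrives as Pandas data, and the gain comes largely from fewer cross-language calls plus column-wise computation, which this list comprehension does not reproduce; the sketch only shows the change in call shape.

```python
from statistics import NormalDist

dist = NormalDist(mu=0.0, sigma=1.0)  # standard normal distribution


def cdf_scalar(x):
    """Row-at-a-time UDF: invoked once per record."""
    return dist.cdf(x)


def cdf_vectorized(batch):
    """Batch UDF: invoked once per batch (a list stands in for pandas.Series)."""
    return [dist.cdf(x) for x in batch]


values = [-1.0, 0.0, 1.0]
row_results = [cdf_scalar(v) for v in values]  # three calls
batch_results = cdf_vectorized(values)         # one call
assert row_results == batch_results
print([round(r, 4) for r in batch_results])  # [0.1587, 0.5, 0.8413]
```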
PyFlink 1.11 also integrates the ML Pipeline API to meet business needs in machine learning scenarios. One example is implementing the KMeans algorithm with PyFlink.
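The KMeans example itself is not reproduced here. As a reminder of what a KMeans estimator computes, the sketch below implements plain Lloyd's algorithm in Python. It is not PyFlink's ML Pipeline API; seeding the centroids with the first k points and representing points as 2-D tuples are simplifying assumptions.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two 2-D points."""
    return (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2


def kmeans(points, k, max_iter=100):
    """Lloyd's algorithm on 2-D points; the first k points seed the centroids."""
    centroids = [tuple(p) for p in points[:k]]
    for _ in range(max_iter):
        # Assignment step: attach each point to its nearest centroid.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda i: sq_dist(p, centroids[i]))
            clusters[nearest].append(p)
        # Update step: move each centroid to the mean of its cluster (keep it if empty).
        new_centroids = [
            (sum(x for x, _ in c) / len(c), sum(y for _, y in c) / len(c)) if c else centroids[i]
            for i, c in enumerate(clusters)
        ]
        if new_centroids == centroids:  # converged: assignments no longer change
            break
        centroids = new_centroids
    return centroids


print(kmeans([(0, 0), (0, 1), (10, 10), (10, 11)], k=2))  # [(0.0, 0.5), (10.0, 10.5)]
```

In a pipeline setting, the assignment and update steps are the parts a distributed engine would parallelize across the data.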
We will also focus on improving PyFlink's performance, trying to speed up Python UDF execution through code generation (Codegen), CPython, and optimized serialization and deserialization. Preliminary comparisons show that PyFlink 1.11 performs about 15 times better than PyFlink 1.10.
To make PyFlink easier to use, we support Python user-defined functions in SQL DDL and the SQL Client, so you can use PyFlink through more channels.
So far, we've defined PyFlink and explained its significance, its API architecture, its user-defined function architecture, and the trade-offs behind the architecture and their benefits. We've also walked through the CDN case, the PyFlink roadmap, and the PyFlink highlights in Flink 1.11. But what else is there?
Finally, let's look at the future of PyFlink. Driven by the mission of making Flink functionality available to Python users and running Python functions on Flink, what is PyFlink's outlook? As you may know, PyFlink is part of Apache Flink and includes a runtime layer and an API layer.
How will PyFlink evolve in these two layers? At the runtime layer, PyFlink builds general gRPC services (Control, Data, State, and so on) for communication between the JVM and the PyVM. This framework abstracts the Java operators for Python user-defined functions and builds Python execution containers that support Python execution in multiple ways. For example, PyFlink can run as a process, inside a Docker container, or even in an external service cluster. Especially when running in an external service cluster, communication over sockets makes virtually unlimited extension possible. All of this plays an important role in the subsequent integration with Python.
At the API layer, we will make Python-based APIs available in Flink to fulfill the mission. This also depends on the Py4J VM communication framework. PyFlink will gradually support more APIs, including Python versions of Flink's Java APIs (Table API, UDX, ML Pipeline, DataStream, CEP, Gelly, State API, and so on) and the Pandas API, which is the most popular among Python users. Based on these APIs, PyFlink will continue to integrate with other ecosystems to make development easier, for example Notebook, Zeppelin, Jupyter, and Alink (/alink-is-now-open-source_595847?spm=a2c65.11461447.0.0.464e694fRAGzkI), Alibaba's open-source machine learning platform built on Flink. PyAlink's functionality is already fully integrated. PyFlink will also be integrated with existing AI platforms such as the well-known TensorFlow.
In short, this mission is what keeps PyFlink alive. To restate it, PyFlink's mission is to make Flink's functionality available to Python users and to run Python analysis and computing functions on Flink. PyFlink's core committers are currently working hard in the community toward this mission.
See image https://yqintl.alicdn.com/908ea3ff2a2fc93d3fe2797bbe9c302ad83c0581.gif
Finally, I would like to introduce PyFlink's core committers.
- **Fu Dian**: A committer of Flink and two other top-level Apache projects, and a major contributor to PyFlink.
- **Huang Xingbo**: Dedicated to optimizing PyFlink UDF performance. He once won the Alibaba Security Algorithm Challenge and has achieved many results in AI and middleware performance competitions.
- **Cheng Hequn**: A well-known committer in the Flink community who has repeatedly shared very useful material. Many users may still remember his Flink Knowledge Map.
- **Zhong Wei**: A committer focused on managing the dependencies of PyFlink's user-defined functions and improving their ease of use. He has contributed a lot of code.
The last committer is me. My introduction is at the end of this post. If you have any questions about PyFlink, feel free to contact our committer team.
For general issues, we encourage you to email Flink's user mailing list so that the answers are shared with everyone. For urgent problems, you can email the committers directly. For effective archiving and sharing, however, you can also ask questions on Stack Overflow. Before asking, search for your question to see whether it has already been answered. If not, state your question clearly, and don't forget to add the PyFlink tag.
In this article, I analyzed PyFlink in depth. The PyFlink API architecture uses Py4J for communication between the PyVM and the JVM and is designed to keep the Python and Java APIs semantically consistent. The Python user-defined function architecture integrates Apache Beam's Portability Framework to provide efficient and stable Python user-defined functions. I also explained the ideas behind the architecture, the technical trade-offs, and the advantages of the existing architecture.
Next, I introduced the business scenarios to which PyFlink applies and showed how PyFlink works in practice, using real-time log analysis for Alibaba Cloud CDN as an example.
After that, I looked at the PyFlink roadmap and previewed the PyFlink highlights in Flink 1.11. With PyFlink 1.11, you can expect about a 15-fold performance improvement over PyFlink 1.10. Finally, I analyzed PyFlink's mission: "making Flink's functionality available to Python users" and "running Python analysis and computing functions on Flink."
The author of this article, Sun Jincheng, joined Alibaba in 2011. During his nine years at Alibaba, Sun has led the development of many core internal systems, such as Alibaba Group's behavior log management system, Alilang, the cloud transcoding system, and the document conversion system. He got to know the Apache Flink community in early 2016. He first took part in community development as a developer, then led the development of specific modules, and was in charge of building the Apache Flink Python API (PyFlink). He is currently a PMC member of Apache Flink and ALC (Beijing), and a committer of Apache Flink, Apache Beam, and Apache IoTDB.