Un guide rapide de PyFlink qui combine Apache Flink et Python

Cet article présente l'architecture de PyFlink et fournit une démonstration rapide de l'utilisation de PyFlink pour analyser les journaux CDN.

Pourquoi avez-vous besoin de PyFlink?

Flink sur Python et Python sur Flink

Alors, qu'est-ce que PyFlink exactement? Comme son nom l'indique, PyFlink est simplement une combinaison d'Apache Flink et de Python, ou Flink sur Python. Mais que signifie Flink en Python? Tout d'abord, en combinant les deux, vous pourrez utiliser toutes les fonctionnalités de Flink en Python. Plus important encore, PyFlink peut tirer parti de la puissance de calcul du vaste écosystème de Python sur Flink, facilitant ainsi le développement de cet écosystème. En d'autres termes, c'est gagnant-gagnant pour les deux parties. Si vous approfondissez un peu ce sujet, vous constaterez que l'intégration du framework Flink avec le langage Python n'est en aucun cas une coïncidence.

image.png

Python et l'écosystème Big Data

Le langage python est étroitement lié au big data. Pour comprendre cela, examinons certains des problèmes que les gens résolvent dans la pratique en utilisant Python. Les enquêtes auprès des utilisateurs ont montré que la plupart des gens utilisent Python pour l'analyse de données et les applications d'apprentissage automatique. Pour ces types de scénarios, certaines solutions souhaitables sont également abordées dans l'espace du Big Data. Outre l'élargissement de l'audience des produits Big Data, l'intégration de Python et du Big Data améliore considérablement les capacités de l'écosystème Python en étendant l'architecture autonome à une architecture distribuée. Cela explique également qu'il existe une forte demande de Python pour analyser de grandes quantités de données.

image.png

Pourquoi Flink et Python?

L'intégration de Python et du big data est en ligne avec certaines autres tendances récentes. Mais encore une fois, pourquoi Flink prend-il désormais en charge Python plutôt que Go, R ou tout autre langage? Et pourquoi la plupart des utilisateurs choisissent PyFlink plutôt que PySpark ou PyHive?

Pour comprendre pourquoi, examinons d'abord les avantages de l'utilisation du framework Flink.

Ensuite, voyons pourquoi Flink prend en charge Python par rapport à d'autres langages. Selon les statistiques, Python est le deuxième langage le plus populaire après Java et C, et se développe rapidement depuis 2018. Java et Scala sont les langages par défaut de Flink, mais la prise en charge de Python semble raisonnable.

image.png

PyFlink est un produit inévitable du développement des technologies associées. Cependant, comprendre les implications de PyFlink ne suffit pas, car le but ultime est de profiter aux utilisateurs de Flink et de Python et de résoudre de vrais problèmes. Par conséquent, nous devons approfondir la manière dont PyFlink peut être implémenté.

image.png

Architecture PyFlink

Pour mettre en œuvre PyFlink, nous devons connaître les principaux objectifs à atteindre et les principaux défis à résoudre. Quel est le but principal de PyFlink? En bref, l'objectif principal de PyFlink est détaillé comme suit.

  1. Rendez toutes les fonctionnalités Flink disponibles aux utilisateurs de Python.
  2. En exécutant les fonctions d'analyse / calcul de Python sur Flink, vous pouvez améliorer la capacité de Python à résoudre les problèmes de Big Data.

Ensuite, analysons les problèmes importants qui doivent être résolus pour atteindre ces objectifs.

image.png

Mettre les fonctionnalités Flink à la disposition des utilisateurs Python

Est-il nécessaire de développer un moteur Python sur Flink comme le moteur Java existant pour implémenter PyFlink? La réponse est non. J'ai essayé avec Flink version 1.8 et antérieure, mais cela n'a pas fonctionné. Le principe de la conception de base est d'atteindre un objectif donné au moindre coût. C'est le plus simple, mais il est préférable de fournir une couche de l'API Python et de réutiliser votre moteur informatique existant.

Alors, quel type d'API Python devrions-nous fournir pour Flink? Les API de table de haut niveau, SQL, API DataStream avec état, etc. sont familiers. Maintenant que nous nous rapprochons de la logique interne de Flink, il est temps de fournir l'API Table et l'API DataStream pour Python. Mais quels sont les problèmes importants qui subsistent à ce moment-là?

image.png

Questions clés

De toute évidence, un problème important est l'établissement d'une poignée de main entre la machine virtuelle Python (PyVM) et la machine virtuelle Java (JVM), ce qui est essentiel pour que Flink prenne en charge plusieurs langues. Afin de résoudre ce problème, il est nécessaire de sélectionner une technologie de communication appropriée. d'accord allons-y.

image.png

Sélection de technologie de communication de machine virtuelle

Actuellement, les solutions pour implémenter la communication entre PyVM et JVM sont Apache Beam et [Py4J](https: // Il y en a deux (www.py4j.org/?spm=a2c65.11461447.0.0.464e694fRAGzkI). Le premier est un projet bien connu qui prend en charge plusieurs langages et multi-moteurs, et le second est une solution spécialisée dans la communication entre PyVM et JVM. Pour comprendre la différence entre Apache Beam et Py4J, vous pouvez comparer et contraster à partir de plusieurs perspectives différentes. Tout d'abord, considérons cette analogie. Pour traverser le mur, Py4J creuse un trou comme un mogura, et Apache Beam détruit tout le mur comme un gros ours. De ce point de vue, implémenter la communication VM avec Apache Beam est un peu compliqué. En bref, Apache Beam est universel et inflexible dans les cas extrêmes.

image.png

En plus de cela, Flink nécessite une programmation interactive comme FLIP-36. De plus, Flink doit être sémantiquement cohérent en ce qui concerne la conception d'API, en particulier le support multilingue, pour que cela fonctionne correctement. De toute évidence, Py4J est la meilleure option pour prendre en charge la communication entre PyVM et JVM, car l'architecture existante d'Apache Beam ne peut pas répondre à ces exigences.

image.png

Architecture technique

Après avoir établi la communication entre PyVM et JVM, nous avons atteint notre premier objectif de rendre les fonctionnalités de Flink disponibles aux utilisateurs de Python. Cela a déjà été réalisé dans la version Flink 1.9. Jetons maintenant un œil à l'architecture de l'API PyFlink dans Flink version 1.9.

Flink version 1.9 utilise Py4J pour implémenter la communication de machine virtuelle. Activation de la passerelle pour PyVM et du serveur de passerelle pour JVM pour accepter les requêtes Python. L'API Python fournit également des objets tels que TableENV et Table, qui sont identiques à ceux fournis par l'API Java. Par conséquent, l'essence de l'écriture de l'API Python est de savoir comment appeler l'API Java. La version 1.9 de Flink résout également les problèmes de placement. Vous pouvez soumettre des travaux de différentes manières, par exemple en exécutant des commandes Python ou en utilisant le shell Python ou la CLI.

image.png

Mais quels sont les avantages de cette architecture? Tout d'abord, il a une architecture simple qui assure une cohérence sémantique entre l'API Python et l'API Java. Deuxièmement, il offre d'excellentes performances de traitement des travaux Python comparables à celles des travaux Java. Par exemple, l'API Java de Flink a pu traiter 2 551 millions d'enregistrements de données par seconde lors du Double 11 de l'année dernière.

image.png

Exécuter des fonctions d'analyse / calcul Python sur Flink

La section précédente a décrit comment rendre les fonctionnalités de Flink disponibles aux utilisateurs de Python. Voici comment exécuter une fonction Python sur Flink. En général, il existe deux façons d'exécuter des fonctions Python sur Flink:

1, ** Sélectionnez une bibliothèque de classes Python typique et ajoutez son API à PyFlink. ** Cette méthode prend du temps car il y a trop de bibliothèques de classes pour Python. Avant d'intégrer l'API, vous devez rationaliser l'exécution de Python.

2, ** Sur la base des caractéristiques de l'API Flink Table et de la bibliothèque de classes Python existantes, toutes les fonctions de la bibliothèque de classes Python existante peuvent être traitées comme des fonctions définies par l'utilisateur et intégrées dans Flink. ** Pris en charge dans la version Flink 1.10. Quels sont les problèmes importants de l'intégration des fonctions? Encore une fois, il s'agit d'exécuter des fonctions définies par l'utilisateur Python.

Ensuite, sélectionnons la technologie pour cette question importante. image.png

Choix de la technologie pour exécuter les fonctions définies par l'utilisateur

L'exécution de fonctions définies par l'utilisateur Python est en fait assez compliquée. Non seulement la communication entre les machines virtuelles, mais aussi la gestion de l'environnement d'exécution Python, l'analyse des données métiers échangées entre Java et Python, le passage du backend d'état de Flink à Python, la surveillance de l'état d'exécution, etc. .. C'est à quel point Apache Beam est compliqué. En tant que grand ours prenant en charge plusieurs moteurs et langages, Apache Beam peut faire beaucoup pour aider dans cette situation, comment Apache Beam gère l'exécution des fonctions définies par l'utilisateur Python. Voyons si.

Vous trouverez ci-dessous le Portability Framework, une architecture hautement abstraite pour Apache Beam conçue pour prendre en charge plusieurs langages et moteurs. Apache Beam prend actuellement en charge plusieurs langages différents, notamment Java, Go et Python. Beam Fn Runners and Execution en bas de la figure montre l'environnement d'exécution du moteur et des fonctionnalités définies par l'utilisateur. Apache Beam utilise Protobuf pour résumer la structure des données et [gRPC](https :: //grpc.io/?spm=a2c65.11461447.0.0.464e694fRAGzkI) Active la communication sur le protocole et encapsule le service central de gRPC. À cet égard, Apache Beam ressemble plus à une luciole qui éclaire le chemin d'exécution des fonctions définies par l'utilisateur dans PyFlink. Fait intéressant, les lucioles sont devenues la mascotte d'Apache Beam, donc ce n'est peut-être pas une coïncidence.

Ensuite, jetons un œil au service gRPC fourni par Apache Beam. image.png

Dans la figure ci-dessous, le runner représente l'opérateur Java de Flink. L'exécuteur est mappé au worker SDK dans l'environnement d'exécution Python. Apache Beam propose des services abstraits tels que le contrôle, les données, l'état et les journaux. En fait, ces services ont longtemps été exploités de manière stable et efficace par Beam Flink Runner. Cela facilite l'exécution de PyFlink UDF. De plus, Apache Beam propose des solutions pour les appels d'API et l'exécution de fonctions définies par l'utilisateur. PyFlink utilise Py4J pour la communication entre les machines virtuelles au niveau de l'API et utilise le cadre de portabilité d'Apache Beam pour définir l'environnement d'exécution des fonctions définies par l'utilisateur.

Cela montre que PyFlink adhère strictement au principe de la réalisation d'un objectif donné au coût le plus bas dans le choix de la technologie et adopte toujours l'architecture technologique la plus adaptée au développement à long terme. .. En passant, tout en travaillant avec Apache Beam, j'ai soumis plus de 20 correctifs d'optimisation à la communauté Beam.

image.png

Architecture fonctionnelle définie par l'utilisateur

L'architecture UDF doit non seulement implémenter la communication entre PyVM et JVM, mais également répondre à différentes exigences lors des étapes de compilation et d'exécution. Dans le diagramme d'architecture des fonctions définies par l'utilisateur PyLink ci-dessous, le comportement sur JVM est affiché en vert et le comportement sur PyVM est affiché en bleu. Jetons un coup d'œil à la conception locale lors de la compilation. La conception locale repose sur des appels de mappage d'API purs. Py4J est utilisé pour la communication VM. Chaque fois que vous appelez une API Python, l'API Java correspondante est appelée de manière synchrone.

L'API d'enregistrement de fonction définie par l'utilisateur(register_function) ʻest nécessaire pour prendre en charge les fonctions définies par l'utilisateur. Vous aurez également besoin de certaines bibliothèques tierces lors de la définition des fonctions définies par l'utilisateur Python. Par conséquent, l'ajout d'une dépendance nécessite un ensemble de méthodes supplémentaires telles que ʻadd_Python_file (). Lors de l'écriture d'un travail Python, l'API Java est également appelée avant de soumettre le travail pour créer un JobGraph. Vous pouvez ensuite soumettre des travaux au cluster de plusieurs manières différentes, par exemple via l'interface de ligne de commande.

image.png Voir l'image https://yqintl.alicdn.com/a72ad37ed976e62edc9ba8dcb027bf61be8fe3f3.gif

Voyons maintenant comment les API Python et Java fonctionnent dans cette architecture. Du côté Java, JobMaster attribue des travaux à TaskManager de la même manière que les travaux Java généraux, et TaskManager exécute les tâches qui impliquent l'exécution d'opérateurs dans JVM et PyVM. Les opérateurs de fonction définis par l'utilisateur Python conçoivent divers services gRPC pour la communication entre JVM et PyVM, tels que DataService pour la communication de données métier et StateService pour Python UDF pour appeler des backends d'état Java. Faire. De nombreux autres services tels que la journalisation et les métriques sont fournis.

Ces services sont basés sur l'API Fn de Beam. La fonction définie par l'utilisateur sera finalement exécutée sur le worker Python et le service gRPC correspondant renverra le résultat à l'opérateur de fonction Python défini par l'utilisateur dans la JVM. Les travailleurs Python peuvent s'exécuter en tant que processus dans des conteneurs Docker et même des clusters de services externes. Ce mécanisme d'extension pose une base solide pour l'intégration de PyFlink avec d'autres frameworks Python. Maintenant que vous avez une compréhension de base de l'architecture des fonctions définies par l'utilisateur de Python introduite dans PyFlink 1.10, jetons un coup d'œil à ses avantages.

Premièrement, il doit s'agir d'un cadre multilingue mature. L'architecture basée sur les faisceaux peut être facilement étendue pour prendre en charge d'autres langues. Deuxièmement, la prise en charge des fonctions définies par l'utilisateur avec état. Beam résume les services avec état et permet à PyFlink de prendre en charge facilement les fonctions définies par l'utilisateur avec état. Le troisième est la simple maintenance. Deux communautés actives - Apache Beam et Apache Flink maintiennent et optimisent le même framework.

image.png

Comment utiliser PyFlink

Maintenant que vous comprenez l'architecture de PyFlink et les idées qui la sous-tendent, examinons les scénarios d'application spécifiques de PyFlink.

Scénario d'application PyFlink

Quels scénarios commerciaux sont pris en charge par PyFlink? Le scénario d'application peut être analysé sous deux angles. Python et Java. Gardez à l'esprit que PyFlink convient à tous les scénarios applicables à Java.

  1. ** Scénarios événementiels ** tels que plantations cliquables et surveillance.
  2. ** Analyse des données ** telles que la gestion des stocks et la visualisation des données.
  3. ** Pipeline de données **, également connu sous le nom de scénarios ETL tels que l'analyse des journaux. 4, ** Apprentissage automatique ** tel que des recommandations ciblées. Vous pouvez utiliser PyFlink dans tous ces scénarios. PyFlink s'applique également aux scénarios spécifiques à Python tels que le calcul scientifique. Avec autant de scénarios d'application, vous vous demandez peut-être exactement quelles API PyFlink sont disponibles. Examinons donc cette question maintenant.

image.png

Installez PyFlink

Vous devez installer PyFlink avant de pouvoir utiliser l'API. Actuellement, pour installer PyFlink, exécutez la commande suivante: image.png

PyFlink API L'API PyFlink est entièrement intégrée à l'API Java Table et prend en charge une variété d'opérations relationnelles et de fenêtre. Certaines des API PyFlink faciles à utiliser sont encore plus puissantes que l'API SQL, telles que les API spécifiques aux colonnes. En plus de l'API, PyFlink fournit plusieurs façons de définir les UDF en Python. image.png

Définition des fonctions définies par l'utilisateur dans PyFlink

ScalarFunction peut être étendu pour fournir plus de fonctionnalités auxiliaires (par exemple, en ajoutant des métriques). De plus, les fonctions utilisateur de PyFlink prennent en charge toutes les définitions de méthodes prises en charge par Python, y compris les fonctions lambda, les fonctions nommées et les fonctions appelables.

Après avoir défini ces méthodes, utilisez PyFlink Decorators pour les baliser et décrire les types de données d'entrée / sortie. Vous pouvez également profiter de la fonction d'indication de type de Python pour rationaliser davantage les versions ultérieures pour la dérivation de type. L'exemple suivant vous permettra de mieux comprendre comment définir des fonctions définies par l'utilisateur.

image.png

Un exemple de définition d'une fonction définie par l'utilisateur Python

Dans cet exemple, les deux nombres sont additionnés. Importez les classes nécessaires pour cela et définissez les fonctions ci-dessus. C'est assez simple, alors passons au cas réel.

image.png

Pour PyFlink: analyse des journaux en temps réel d'Alibaba Cloud CDN

Ici, nous allons présenter comment résoudre des problèmes pratiques à l'aide de PyFlink, en utilisant la fonction d'analyse des journaux en temps réel d'Alibaba Cloud Content Deliver Network (CDN) comme exemple. Nous utilisons Alibaba Cloud CDN pour accélérer les téléchargements de ressources. En général, les journaux CDN sont analysés selon un modèle commun. Tout d'abord, il collecte les données de journal du nœud de périphérie et enregistre ces données dans la file d'attente de messages. Deuxièmement, il combine des files d'attente de messages avec des clusters de calcul en temps réel pour effectuer une analyse des journaux en temps réel. La troisième consiste à écrire les résultats de l'analyse dans le système de stockage. Dans cet exemple, l'architecture est instanciée, Kafka est utilisé comme file d'attente de messages, Flink est utilisé pour le calcul en temps réel et les données finales sont stockées dans une base de données MySQL. image.png

Exigences

Pour plus de commodité, nous avons simplifié les exigences relatives aux statistiques commerciales réelles. Cet exemple collecte des statistiques de vitesse de consultation, de téléchargement et de téléchargement de page par région. Seuls les champs principaux sont sélectionnés comme format de données. Par exemple, ʻuuidest l'ID de journal unique,client_ip est la source d'accès, request_time est le temps de téléchargement de la ressource et response_sizeest la taille des données de la ressource. Ici, le journal d'origine ne contient pas de champ régional, même si nous devons collecter des statistiques régionales. Par conséquent, nous devons définir une UDF Python pour interroger la zone de chaque point de données en fonction declient_ip`. Analysons comment définir une fonction définie par l'utilisateur.

Définition de fonction définie par l'utilisateur

Ici, la fonction de nom de la fonction définie par l'utilisateur ʻip_to_province () ʻest définie. L'entrée est l'adresse IP et la sortie est la chaîne de nom de zone. Ici, les types d'entrée et de sortie sont définis comme des chaînes. Le service de requête ici est à des fins de démonstration. Dans un environnement de production, vous devez le remplacer par un service d'interrogation de région fiable.

image.png

import re
import json
from pyFlink.table import DataTypes
from pyFlink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
   """
   format:
       {
       'ip': '27.184.139.25',
       'pro': 'Province du Hebei',
       'proCode': '130000',
       'city': 'Ville d'Ishiya Sho',
       'cityCode': '130100',
       'region': 'Shouju',
       'regionCode': '130126',
       'addr': 'Province du Hebei, ville d'Ishiyasho',
       'regionNames': '',
       'err': ''
       }
   """
   try:
       urlobj = urlopen( \
        'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
       data = str(urlobj.read(), "gbk")
       pos = re.search("{[^{}]+\}", data).span()
       geo_data = json.loads(data[pos[0]:pos[1]])
       if geo_data['pro']:
           return geo_data['pro']
       else:
           return geo_data['err']
   except:
       return "UnKnow"

Définition du connecteur

Maintenant que nous avons analysé les exigences et défini les fonctionnalités définies par l'utilisateur, passons au développement des tâches. Dans une structure de travail typique, vous devez définir un connecteur source pour lire les données Kafka et un connecteur récepteur pour stocker les résultats de l'opération dans une base de données MySQL. Enfin, nous devons également écrire une logique statistique.

PyFlink prend également en charge les instructions SQL DDL, vous permettant de définir des connecteurs source à l'aide d'instructions DDL simples. Assurez-vous de définir connector.type sur Kafka. Vous pouvez également utiliser une instruction DDL pour définir un connecteur Sink et définir connector.type sur jdbc. Comme vous pouvez le voir, la logique de la définition d'un connecteur est très simple. Examinons ensuite la logique de base des statistiques.

image.png

kafka_source_ddl = """
CREATE TABLE cdn_access_log (
 uuid VARCHAR,
 client_ip VARCHAR,
 request_time BIGINT,
 response_size BIGINT,
 uri VARCHAR
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic' = 'access_log',
 'connector.properties.zookeeper.connect' = 'localhost:2181',
 'connector.properties.bootstrap.servers' = 'localhost:9092',
 'format.type' = 'csv',
 'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
 'connector.table' = 'access_statistic',
 'connector.username' = 'root',
 'connector.password' = 'root',
 'connector.write.flush.interval' = '1s'
)
"""

Logique statistique de base

Dans cette partie, vous devez d'abord lire les données de la source de données, puis utiliser ʻip_to_province (ip) pour convertir le client_ip` dans une région spécifique. Collectez ensuite les statistiques de vitesse de consultation, de téléchargement et de téléchargement des pages régionales. Enfin, stockez les résultats statistiques dans le tableau des résultats. Cette logique statistique utilise non seulement des fonctions Python définies par l'utilisateur, mais également deux fonctions Java AGG intégrées à Flink, «sum» et «count».

image.png

#Statistique de base
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " #Nom du district de conversion IP
           "response_size, request_time")\
   .group_by("province")\
   .select( #Nombre de questions calculé
           "province, count(uuid) as access_count, " 
           #Montant de chargement calculé
           "sum(response_size) as total_download,  " 
           #Vitesse de chargement calculée
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

Code complet pour l'analyse des journaux en temps réel

Maintenant, vérifions à nouveau le code. Vous devez d'abord importer les dépendances principales, puis créer l'ENV et enfin configurer le planificateur. Flink prend actuellement en charge les planificateurs Flink et Blink. Nous vous recommandons d'utiliser un planificateur de clignotement.

Ensuite, exécutez l'instruction DDL pour enregistrer la table source Kafka et la table de résultats MySQL définies précédemment. Le troisième est d'enregistrer l'UDF Python. Notez que vous pouvez spécifier d'autres dépendances UDF dans votre demande d'API et les envoyer au cluster avec le travail. Enfin, écrivez la logique statistique de base, appelez l'exécuteur et soumettez le travail. Jusqu'à présent, vous avez créé une tâche d'analyse des journaux en temps réel pour le CDN Alibaba Cloud. Vérifions les résultats statistiques réels.

image.png

import os

from pyFlink.datastream import StreamExecutionEnvironment
from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

#创 Environnement de table KEN, planificateur pour une utilisation parallèle
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
   env,
   environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

#Tableau des nombres Kafka
t_env.sql_update(kafka_source_ddl)
#创 Tableau des résultats KEN MySql
t_env.sql_update(mysql_sink_ddl)

#Remarque 册 Nom du district de conversion IP UDF
t_env.register_function("ip_to_province", ip_to_province)

#Texte Python dépendant de l'addition
t_env.add_Python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

#Statistique de base
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " #Nom du district de conversion IP
           "response_size, request_time")\
   .group_by("province")\
   .select( #Nombre de questions calculé
           "province, count(uuid) as access_count, " 
           #Montant de chargement calculé
           "sum(response_size) as total_download,  " 
           #Vitesse de chargement calculée
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

#Entreprise
t_env.execute("pyFlink_parse_cdn_log")

Résultat de sortie de l'analyse du journal en temps réel

J'ai envoyé les données factices à Kafka en tant que données de journal CDN. Sur le côté droit de la figure ci-dessous, les statistiques de vitesse de consultation, de téléchargement et de téléchargement sont collectées en temps réel par région.

image.png Reportez-vous au résultat de l'analyse https://yqintl.alicdn.com/e05da15f039d8331896ee1e7f294585416809ad9.gif

Quelles sont les perspectives d'avenir de PyFlink?

En général, le développement commercial avec PyFlink est facile. Vous pouvez facilement écrire une logique métier via SQL ou des API de table sans avoir à comprendre l'implémentation sous-jacente. Jetons un coup d'œil aux perspectives générales de PyFlink.

Feuille de route selon le but

Le développement de PyFlink visait à rendre les fonctionnalités de Flink disponibles aux utilisateurs de Python et à intégrer des fonctions Python dans Flink. Selon la feuille de route PyFlink ci-dessous, nous avons d'abord établi la communication entre PyVM et JVM. Et Flink 1.9 fournit l'API Python Table, qui ouvre les fonctionnalités de l'API Flink Table existante aux utilisateurs Python. Dans Flink 1.10, l'intégration Apache Beam, les paramètres d'environnement d'exécution de fonction définis par l'utilisateur Python, la gestion des dépendances Python avec d'autres bibliothèques de classes, la définition d'API de fonction définie par l'utilisateur pour prendre en charge les fonctions définies par l'utilisateur Python, etc. , Préparé pour intégrer les fonctions Python dans Flink.

Pour étendre les fonctionnalités de Python distribué, PyFlink utilise Pandas Series et [DataFrame](https: //pandas.pydata. org / pandas-docs / stable / getting_started / dsintro.html? Spm = a2c65.11461447.0.0.464e694fRAGzkI) est pris en charge, et les fonctions définies par l'utilisateur Pandas peuvent être utilisées directement avec PyFlink. À l'avenir, nous prévoyons d'activer les fonctions définies par l'utilisateur Python sur les clients SQL pour rendre PyFlink plus facile à utiliser. Il fournit également une API de pipeline Python ML pour rendre PyFlink disponible aux utilisateurs de Python pour l'apprentissage automatique. La surveillance de l'exécution des fonctions définies par l'utilisateur de Python est très importante en production et en entreprise. Par conséquent, PyFlink fournit une gestion de métrique supplémentaire pour les fonctions définies par l'utilisateur Python. Ces fonctionnalités sont intégrées à Flink 1.11.

Cependant, ce ne sont qu'une partie des futurs plans de développement de PyFlink. Il reste encore beaucoup à faire à l'avenir, notamment l'optimisation des performances de PyFlink, la fourniture d'API de calcul graphique et la prise en charge de Pandas sur l'API native Pandas de Flink. En rendant les fonctionnalités existantes de Flink disponibles en permanence aux utilisateurs de Python et en intégrant les fonctionnalités puissantes de Python dans Flink, nous atteindrons notre objectif initial d'élargir l'écosystème Python.

image.png Voir l'image https://yqintl.alicdn.com/f85ba5bd5d24a01558e751bcdc8887b3f5d565ca.gif

Aperçu de PyFlink 1.11

Jetons un coup d'œil rapide aux points de PyFlink dans la mise à jour Flink 1.11.

Fonctionnalité

Examinons de plus près les fonctionnalités principales de PyFlink, qui est basé sur Flink 1.11. Nous nous concentrons sur la fonctionnalité, les performances et la facilité d'utilisation de PyFlink et prendrons en charge les fonctions définies par l'utilisateur Pandas dans PyFlink 1.11. De cette façon, les fonctionnalités pratiques de la bibliothèque de classes de Pandas peuvent être utilisées directement dans PyFlink, comme les fonctions de distribution cumulative.

image.png

Il intègre également l'API ML Pipeline avec PyFlink pour répondre aux besoins de l'entreprise dans les scénarios d'apprentissage automatique. Voici un exemple d'implémentation de la méthode KMeans à l'aide de PyFlink.

image.png

performance

Nous nous concentrerons également sur l'amélioration des performances de PyFlink. Nous allons essayer d'améliorer les performances d'exécution de Python UDF en utilisant Codegen, CPython, une sérialisation et une désérialisation optimisées. Une comparaison préliminaire montre que PyFlink 1.11 fonctionne environ 15 fois mieux que PyFlink 1.10.

image.png

facilité d'utilisation

Pour faciliter l'utilisation de PyFlink, nous prenons en charge les fonctions définies par l'utilisateur Python dans les clients SQL DDL et SQL. Cela rend PyFlink disponible sur une variété de canaux.

image.png

Feuille de route PyFlink, mission, vision

Nous avons déjà défini PyFlink et expliqué ses implications, l'architecture d'API, l'architecture de fonction définie par l'utilisateur et les compromis derrière l'architecture et leurs avantages. Nous avons vu le cas CDN dans Flink 1.11, la feuille de route PyFlink, les points PyFlink, etc. Mais de quoi d'autre avez-vous besoin?

Enfin, regardons l'avenir de PyFlink. Quelles sont les perspectives de PyFlink, motivées par la mission de rendre la fonctionnalité Flink disponible pour les utilisateurs de Python et d'exécuter des fonctions Python sur Flink? Comme vous le savez peut-être, PyFlink fait partie d'Apache Flink et comprend une couche d'exécution et d'API.

Comment PyFlink va-t-il évoluer dans ces deux couches? Au moment de l'exécution, PyFlink crée des services généraux gRPC (contrôle, données, état, etc.) pour la communication entre JVM et PyVM. Ce framework résume les opérateurs des fonctions définies par l'utilisateur Java Python et crée un conteneur d'exécution Python pour prendre en charge l'exécution Python de plusieurs manières. Par exemple, PyFlink peut s'exécuter en tant que processus dans un conteneur Docker et même dans un cluster de services externe. Des extensions illimitées sont activées, en particulier sous la forme de sockets, lors de l'exécution dans un cluster de services externe. Tout cela joue un rôle important dans l'intégration ultérieure de Python.

En ce qui concerne l'API, nous rendrons l'API basée sur Python disponible dans Flink pour accomplir la mission. Cela dépend également du cadre de communication de la machine virtuelle Py4J. PyFlink prendra progressivement en charge plus d'API, y compris l'API Java de Flink (API de table Python, UDX, ML Pipeline, DataStream, CEP, Gelly, API d'état, etc.) et l'API Pandas, qui est la plus populaire auprès des utilisateurs de Python. C'est un horaire. Sur la base de ces API, PyFlink continuera à s'intégrer à d'autres écosystèmes et à faciliter le développement, par exemple Notebook, Zeppelin, Jupyter, Alink. / alink-is-now-open-source_595847? spm = a2c65.11461447.0.0.464e694fRAGzkI), et fonctionnera avec la version open source d'Alibaba Flink. Actuellement, la fonctionnalité de PyAlink est entièrement intégrée. PyFlink sera également intégré aux plates-formes système d'IA existantes telles que le bien connu TensorFlow.

À cette fin, nous pouvons voir que les forces basées sur la mission maintiennent PyFlink en vie. Encore une fois, la mission de PyFlink est de rendre les fonctionnalités de Flink disponibles aux utilisateurs de Python et d'exécuter des fonctions d'analyse et de calcul Python sur Flink. Actuellement, les principaux committers de PyFlink travaillent dur dans la communauté avec cette mission.

image.png Voir l'image https://yqintl.alicdn.com/908ea3ff2a2fc93d3fe2797bbe9c302ad83c0581.gif

Committer de base PyFlink

Enfin, je voudrais vous présenter le core committer de PyFlink.

--Fu Dian: commissaire pour Flink et deux autres projets Apache de haut niveau. Fu est un énorme contributeur à PyFlink. --Huang Xingbo: Optimiseur de performances PyFlink UDF dédié. Huang a déjà remporté le tournoi Alibaba Security Algorithm Challenge et a remporté de nombreux succès dans les tournois de performance IA et middleware. --Cheng Hequn: un committer bien connu dans la communauté Flink. Chen a partagé des informations très utiles encore et encore. De nombreux utilisateurs peuvent encore se souvenir de sa carte de connaissances Flink. --Zhong Wei: un committer qui s'est concentré sur la gestion des dépendances des fonctions définies par l'utilisateur de PyFlink et sur l'optimisation de la facilité d'utilisation. M. Naka a publié beaucoup de code.

Le dernier commetteur, c'est moi. Mon introduction est à la fin de cet article. Si vous avez des questions sur PyFlink, n'hésitez pas à contacter notre équipe de committer.

image.png

Pour les problèmes courants, nous vous encourageons à envoyer un e-mail à quelqu'un sur la liste des utilisateurs de Flink pour le partager. En cas de problème urgent, nous vous recommandons d'envoyer un mail au committer. Mais pour un stockage et un partage efficaces, vous pouvez poser des questions dans Stackoverflow. Avant de poser une question, commencez par rechercher le contenu de votre question pour voir si vous y répondez. Sinon, veuillez énoncer clairement votre question. Enfin, n'oubliez pas d'ajouter la balise PyFlink à votre question.

image.png

Aperçu

Dans cet article, j'ai examiné plus en détail PyFlink. L'architecture d'API PyFlink utilise Py4J pour la communication entre PyVM et JVM et est conçue pour maintenir la cohérence sémantique entre les API Python et Java. L'architecture de fonction définie par l'utilisateur Python s'intègre à l'infrastructure de portabilité d'Apache Beam pour fournir des fonctions définies par l'utilisateur Python efficaces et stables. Il interprète également les réflexions derrière l'architecture, les compromis techniques et les mérites des bâtiments existants.

Ensuite, j'ai présenté les scénarios commerciaux qui peuvent être appliqués à PyFlink et présenté le fonctionnement réel de PyFlink, en utilisant comme exemple l'analyse des journaux en temps réel d'Alibaba Cloud CDN.

Après cela, j'ai regardé la feuille de route PyFlink et prévisualisé les points PyFlink dans Flink 1.11. Avec PyFlink 1.11, vous pouvez vous attendre à une amélioration des performances de 15 fois ou plus par rapport à PyFlink 1.10. Enfin, nous avons analysé les missions de PyFlink, «Rendre PyFlink disponible pour les utilisateurs de Python» et «Exécuter les fonctions d'analyse et de calcul de Python sur Flink».

A propos de l'auteur

L'auteur de cet article, Son Kinjo, a rejoint Alibaba en 2011. Après neuf ans à Alibaba, M. Son a dirigé le développement de nombreux systèmes internes de base, y compris le système de gestion des journaux de comportement du groupe Alibaba, Arirang, le système de transcodage cloud et le système de conversion de documents. Il a découvert la communauté Apache Flink début 2016. Dans un premier temps, a participé au développement de la ville en tant que développeur. Après cela, il a dirigé le développement de modules spécifiques et a été en charge de la construction de l'API Apache Flink Python (PyFlink). Il est actuellement membre PMC d'Apache Flink et d'ALC (Pékin) et est commissaire d'Apache Flink, Apache Beam et Apache IoT DB.

Recommended Posts

Un guide rapide de PyFlink qui combine Apache Flink et Python
Comment écrire une classe méta qui prend en charge à la fois python2 et python3
Un joli nimporter qui connecte nim et python
Je veux exécuter et distribuer un programme qui redimensionne les images Python3 + pyinstaller
Script Python qui explore le flux RSS du statut Azure et le publie sur Hipchat
Un programme qui demande quelques kilogrammes pour atteindre l'IMC et le poids standard [Python]
Un script qui combine vos modules et binaires Python préférés en une seule couche Lambda
Une comparaison rapide des bibliothèques de test Python et node.js
[Python] Comment écrire une docstring conforme à PEP8
[Python] Une bibliothèque pratique qui convertit les kanji en hiragana
Création d'un toolver qui crache le système d'exploitation, Python, les modules et les versions d'outils à Markdown
[C / C ++] Passez la valeur calculée en C / C ++ à une fonction python pour exécuter le processus et utilisez cette valeur en C / C ++.
Les messages d'erreur Python sont spécifiques et faciles à comprendre "ga" (avant cela, deux points (:) et point-virgule (;))
Feuille de route d'apprentissage qui vous permet de développer et de publier des services à partir de zéro avec Python
Une histoire qui facilite l'estimation de la surface habitable à l'aide d'Elasticsearch et de Python
Une note qui déploie une application Python de Circle CI vers Elastic Beanstalk et avertit Slack
[Python] Un programme pour trouver le nombre de pommes et d'oranges qui peuvent être récoltées
Un script Python qui enregistre une image de presse-papiers (GTK) dans un fichier.
Un moyen standard de développer et de distribuer des packages en Python
Essayez d'ouvrir une sous-fenêtre avec PyQt5 et Python
Créer un environnement Python et transférer des données vers le serveur
Créez le code qui renvoie "A et prétendant B" en python
Essayez simplement de recevoir un webhook avec ngrok et Python
Un script python qui convertit les données Oracle Database en csv
[Python] Une histoire qui semblait tomber dans un piège à contourner
Apache mod_auth_tkt et Python AuthTkt
Défis et opportunités Apache Flink
Je souhaite utiliser un caractère générique que je souhaite décortiquer avec Python remove
Expressions régulières faciles et solides à apprendre en Python
l'expression régulière de python, str et unicode sont sobres et addictives
[Python] Comment ajouter des lignes et des colonnes à une table (pandas DataFrame)
[Python] Un mémo que j'ai essayé de démarrer avec asyncio
Comment créer une caméra de surveillance (caméra de sécurité) avec Opencv et Python
J'ai essayé de faire un processus d'exécution périodique avec Selenium et Python
Ecrire un programme qui abuse du programme et envoie 100 e-mails
Un codec Python spécial qui semble savoir mais ne sait pas
Un script python qui supprime les fichiers ._DS_Store et ._ * créés sur Mac
[python] Une note que j'ai commencé à comprendre le comportement de matplotlib.pyplot
[Python] Un programme qui fait pivoter le contenu de la liste vers la gauche
Python a + = b et a = a + b sont différents
Python 3.6 sous Windows ... et vers Xamarin.
[Introduction à Python3 Jour 1] Programmation et Python
Ceci et cela des propriétés python
[Python] renvoie A [ou / et] B
Journalisation Python et vidage vers json
5 façons de créer un chatbot Python
Sélénium et python pour ouvrir Google
Modèles Python qui ont été publiés dans le monde et ont été examinés plus tard
[Python] J'ai fait un décorateur qui ne semble pas avoir d'utilité.
[Introduction à Python] Quelle est la différence entre une liste et un taple?
Avec PEP8 et PEP257, un codage Python qui n'est pas gênant à montrer aux gens!
J'ai créé une application Web en Python qui convertit Markdown en HTML
[Python] Un programme qui calcule le nombre de chaussettes jumelées
Notez que l'environnement Python de Pineapple peut être modifié avec pyenv
[Python] Créez un linebot pour écrire le nom et l'âge sur l'image
Migration de Python2 vers Python3 (Python2 est reconstruit comme un environnement virtuel et coexiste)
Comment mettre un espace demi-largeur avant les lettres et les chiffres en Python.