Dans cet article, nous présenterons la bibliothèque ** Java ** facile à utiliser et hautement configurable «** Alibaba Cloud LOG Java Producer **» qui prend en charge l'envoi de données au service de journalisation.
Les journaux sont partout. En tant que support pour enregistrer les changements dans le monde, les journaux sont largement utilisés dans de nombreux domaines tels que le marketing, la recherche et développement, l'exploitation, la sécurité, la BI et l'audit.
Alibaba Log Service est une plate-forme de service tout-en-un pour les données de journal. Son composant principal, LogHub, est une infrastructure pour le traitement du Big Data, en particulier le traitement des données en temps réel, avec des fonctionnalités telles qu'un débit élevé, une faible latence et une mise à l'échelle automatique. Flink, Spark, Les travaux exécutés sur des moteurs de calcul Big Data tels que Storm écrivent les résultats du traitement des données et les résultats intermédiaires dans LogHub en temps réel. Je vais. En utilisant les données de LogHub, les systèmes en aval peuvent fournir de nombreux services tels que l'analyse des requêtes, la surveillance des alarmes, l'apprentissage automatique et les calculs itératifs. L'architecture de traitement de Big Data de LogHub ressemble à la figure suivante.
Pour que le système fonctionne correctement, vous devez utiliser une méthode d'écriture de données pratique et efficace. L'utilisation directe d'API et de SDK ne suffit pas pour répondre aux exigences de capacité d'écriture de données dans les scénarios Big Data. "Alibaba Cloud LOG Java Producer" y a été développé.
Alibaba Cloud LOG Java Producer est une bibliothèque de classes Java facile à utiliser et hautement configurable. Il a les fonctions suivantes.
L'écriture de données sur LogHub à l'aide de Producer présente les avantages suivants par rapport à l'utilisation de l'API ou du SDK.
Avec de grandes quantités de données et des ressources limitées, une logique complexe telle que le multithreading, la politique de cache, le traitement par lots et la nouvelle tentative en cas d'échec doit être mise en œuvre pour atteindre le débit souhaité. Producer met en œuvre la logique ci-dessus pour améliorer les performances des applications et simplifier le processus de développement des applications.
Si vous disposez de suffisamment de mémoire cache, Producer mettra en cache les données que vous envoyez à LogHub. Lorsque vous appelez la méthode d'envoi, les données spécifiées seront envoyées immédiatement sans bloquer le traitement. Ceci réalise la séparation du calcul et de la logique d'E / S. À une date ultérieure, vous pouvez récupérer les résultats de transmission de données à partir des futurs objets renvoyés et des rappels enregistrés.
La taille de la mémoire utilisée par Producer pour mettre en cache les données à envoyer peut être contrôlée par des paramètres ainsi que par le nombre de threads utilisés pour la tâche d'envoi de données. Cela évite au producteur de consommer des ressources illimitées. Vous pouvez également équilibrer la consommation de ressources et le débit d'écriture en fonction de la situation réelle.
En résumé, Producer offre de nombreux avantages en traitant automatiquement les détails sous-jacents complexes et en exposant une interface simple. De plus, cela n'affecte pas le fonctionnement normal des services de couche supérieure et peut réduire considérablement le seuil d'accès aux données.
Pour mieux comprendre les performances de Producer, cette section décrit le fonctionnement de Producer, notamment la logique d'écriture des données, l'implémentation des composants principaux et l'arrêt progressif. L'architecture globale de Producer est illustrée dans la figure ci-dessous.
Logique d'écriture des données du producteur:
producteur.send ()
pour envoyer les données au magasin de journaux spécifié, les données seront chargées dans le lot Producer dans le LogAccumulator. La méthode send renvoie généralement le résultat immédiatement. Cependant, si l'instance Producer ne dispose pas de suffisamment d'espace pour stocker les données souhaitées, la méthode d'envoi se bloquera jusqu'à ce que l'une des conditions suivantes soit remplie:------ 1, les données précédemment mises en cache sont traitées par le gestionnaire de lots et la mémoire occupée par ces données est libérée. En conséquence, le producteur disposera de suffisamment d'espace pour stocker les données d'intérêt. ------ 2, une exception sera déclenchée si le temps de blocage spécifié est dépassé.
Lors de l'appel de Producer.send (), le nombre de journaux du lot cible peut dépasser maxBatchCount, ou le lot cible peut ne pas avoir assez d'espace pour stocker les données cible. Dans ce cas, Producer envoie d'abord le lot cible à IOThreadPool, puis crée un nouveau lot pour stocker les données cible. Pour éviter de bloquer les threads, IOThreadPool utilise une file d'attente de blocage illimitée. Le nombre de journaux pouvant être mis en cache dans une instance Producer est limité, de sorte que la longueur de la file d'attente n'augmentera pas indéfiniment.
Mover parcourt chaque lot Producer dans LogAccumulator et envoie les lots qui dépassent la durée maximale du cache à expiredBatches. Il enregistre également le premier délai d'expiration (t) pour les lots non expirés. 4, puis LogAccumulator envoie un lot expiré à IOThreadPool.
Après cela, Mover obtient le lot Producer qui correspond aux conditions de transmission de RetryQueue. Si aucun lot ne remplit les conditions, attendez une période de t.
Envoyez ensuite le lot expiré de RetryQueue à IOThreadPool. À la fin de l'étape 6, Mover répète les étapes 3 à 6.
Les threads de travail IOThreadPool envoient des lots des files d'attente bloquées au journal cible. 8, une fois le lot envoyé au magasin de journaux, il va dans la file d'attente de réussite.
Si la transmission échoue et que l'une des conditions suivantes est remplie, accédez à la file d'attente des échecs. ------- 1, le lot ayant échoué ne peut pas être retenté. ------- 2, RetryQueue sera fermé. ------- 3, Le nombre de tentatives spécifié a été atteint et le nombre de lots dans la file d'attente des échecs ne dépasse pas la moitié du nombre total de lots envoyés.
Sinon, le thread de travail calculera la prochaine heure d'envoi du lot ayant échoué et l'envoyera à RetryQueue.
Le thread SuccessBatchHandler prend le lot de la file d'attente de réussite et exécute tous les rappels enregistrés dans ce lot.
Le thread FailureBatchHandler prend le lot de la file d'attente des échecs et exécute tous les rappels enregistrés dans ce lot.
Le composant principal de Producer est [LogAccumulator](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer /internals/LogAccumulator.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=LogAccumulator.java), [RetryQueue](https://github.com/aliyun/aliyun-log-java-producer/blob/master/service /java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=RetryQueue.java), Mover -log-java-producteur / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / producteur / internals / Mover.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = Mover.java), [ IOThreadPool](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/IOThreadPool.java?spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = IOThreadPool.java), [SendProducerBatchTask](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main /java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=SendProducerBatchTask.java), [BatchHandler/](httyps: // -log-java-producteur / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / producteur / internals / BatchHandler.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = BatchHandler.java) ..
LogAccumulator Il est courant de stocker des données par lots plus importants et d'envoyer les données par lots pour améliorer le débit. Le rôle principal du LogAccumulator décrit ici est de fusionner les données en lots. Pour fusionner différentes données dans un lot plus volumineux, les données doivent avoir les mêmes propriétés de projet, de magasin de journaux, de rubrique, de source et de shardHash. LogAccumulator met ces données en cache à différents emplacements sur la carte interne en fonction de ces propriétés. La clé de la carte est constituée des 5 éléments des 5 propriétés ci-dessus et la valeur est ProducerBatch. ConcurrentMap est utilisé pour garantir la sécurité des threads et une concurrence élevée.
Une autre fonctionnalité de LogAccumulator est de contrôler la taille totale des données mises en cache. J'utilise Semaphore pour implémenter cette logique de contrôle. Semaphore est un outil de synchronisation hautes performances basé sur AbstractQueuedSynchronizer (basé sur AQS). Semaphore tente d'abord d'acquérir des ressources partagées en tournant, ce qui réduit la surcharge de changement de contexte.
RetryQueue RetryQueue est utilisé pour stocker les lots dont l'envoi a échoué et qui attendent d'être retentés. Chacun de ces lots a un champ qui indique quand envoyer le lot. Pour récupérer efficacement les lots expirés, le producteur dispose d'une file d'attente de retard pour stocker ces lots. DelayQueue est une file d'attente à priorité élevée basée sur le temps qui traite en premier le premier lot expiré. Cette file d'attente est thread-safe.
Mover Mover est un fil distinct. LogAccumulator et RetryQueue envoient régulièrement des lots expirés à IOThreadPool. Mover occupe les ressources du processeur même lorsqu'il est inactif. Pour éviter de gaspiller les ressources du processeur, Mover attend les lots expirés de RetryQueue alors qu'il ne peut pas trouver un lot qualifié envoyé par LogAccumulator et RetryQueue. Cette période correspond à la durée maximale du cache configurée.
IOThreadPool Le thread de travail dans IOThreadPool envoie des données au magasin de journaux. La taille de IOThreadPool peut être spécifiée avec le paramètre ioThreadCount et la valeur par défaut est le double du nombre de processeurs.
SendProducerBatchTask SendProducerBatchTask est encapsulé dans la logique d'envoi par lots. Pour éviter de bloquer le thread d'E / S, SendProducerBatchTask envoie le lot cible à une autre file d'attente pour une exécution de rappel, que le lot cible ait été envoyé ou non avec succès. En outre, si le lot ayant échoué répond aux critères de nouvelle tentative, il ne sera pas renvoyé immédiatement dans le thread d'E / S actuel. S'il est renvoyé immédiatement, il échoue généralement à nouveau. Au lieu de cela, SendProducerBatchTask l'envoie à RetryQueue selon une stratégie d'interruption exponentielle.
BatchHandler Le producteur lance SuccessBatchHandler et FailureBatchHandler pour gérer les lots réussis et non réussis. Une fois que le gestionnaire a terminé l'exécution du rappel et la configuration future du lot, il libère la mémoire occupée par ce lot pour de nouvelles données. Un traitement séparé garantit que les lots envoyés avec succès et les lots infructueux sont séparés. Cela garantit le bon fonctionnement du producteur.
GracefulShutdown Pour implémenter GracefulShutdown, les conditions suivantes doivent être remplies:
Pour répondre aux exigences ci-dessus, la logique de fermeture du producteur est conçue comme suit:
De cette manière, en fermant les files d'attente et les threads un par un en fonction du sens du flux de données, un arrêt progressif et une terminaison sûre sont obtenus.
Alibaba Cloud LOG Java Producer est une mise à niveau complète des versions précédentes de Producer. Il résout de nombreux problèmes avec les versions précédentes, tels qu'une utilisation élevée du processeur en cas d'exception de réseau et une faible perte de données lors de la sortie de Producer. De plus, le mécanisme de tolérance aux pannes a été renforcé. Le producteur peut garantir une utilisation appropriée des ressources, un débit élevé et une isolation étroite même après une erreur.
Recommended Posts