Java Library-Alibi Cloud's LOG Java Producer aide à envoyer des données aux services de journalisation

Dans cet article, nous présenterons la bibliothèque ** Java ** facile à utiliser et hautement configurable «** Alibaba Cloud LOG Java Producer **» qui prend en charge l'envoi de données au service de journalisation.

Contexte

Les journaux sont partout. En tant que support pour enregistrer les changements dans le monde, les journaux sont largement utilisés dans de nombreux domaines tels que le marketing, la recherche et développement, l'exploitation, la sécurité, la BI et l'audit.

image.png

Alibaba Log Service est une plate-forme de service tout-en-un pour les données de journal. Son composant principal, LogHub, est une infrastructure pour le traitement du Big Data, en particulier le traitement des données en temps réel, avec des fonctionnalités telles qu'un débit élevé, une faible latence et une mise à l'échelle automatique. Flink, Spark, Les travaux exécutés sur des moteurs de calcul Big Data tels que Storm écrivent les résultats du traitement des données et les résultats intermédiaires dans LogHub en temps réel. Je vais. En utilisant les données de LogHub, les systèmes en aval peuvent fournir de nombreux services tels que l'analyse des requêtes, la surveillance des alarmes, l'apprentissage automatique et les calculs itératifs. L'architecture de traitement de Big Data de LogHub ressemble à la figure suivante.

image.png

Pour que le système fonctionne correctement, vous devez utiliser une méthode d'écriture de données pratique et efficace. L'utilisation directe d'API et de SDK ne suffit pas pour répondre aux exigences de capacité d'écriture de données dans les scénarios Big Data. "Alibaba Cloud LOG Java Producer" y a été développé.

Fonctionnalité

Alibaba Cloud LOG Java Producer est une bibliothèque de classes Java facile à utiliser et hautement configurable. Il a les fonctions suivantes.

  1. Thread Safe: Toutes les méthodes exposées par Alibaba Cloud LOG Java Producer («Producer») sont thread-safe.
  2. Envoi asynchrone: les appels à la méthode SEND du producteur sont généralement renvoyés immédiatement, sans attendre l'envoi de données ou une réponse du serveur. Producer dispose d'un mécanisme de cache interne (LogAcccumulator) pour mettre en cache les données à envoyer dans un lot et améliore le débit en envoyant les données dans un lot.
  3. Réessai automatique: Producer fournit un mécanisme de relance configurable automatiquement (RetryQueue) pour les exceptions réessayables. Vous pouvez définir la durée maximale de nouvelle tentative et la période d'attente pour RetryQueue. 4, traçabilité: vous pouvez utiliser des rappels et des contrats à terme pour savoir si les données en question ont été envoyées avec succès et les tentatives faites pour envoyer les données. Vous pouvez utiliser cette fonction pour suivre les problèmes et prendre des décisions pour les résoudre. 5, restauration de contexte: les journaux générés par le même producteur sont dans le même contexte, et les journaux associés avant et après un certain journal peuvent être vérifiés côté serveur.
  4. Arrêt: lorsque la méthode close renvoie un résultat, toutes les données mises en cache par Producer seront traitées et vous en serez informé.

mérite

L'écriture de données sur LogHub à l'aide de Producer présente les avantages suivants par rapport à l'utilisation de l'API ou du SDK.

Haute performance

Avec de grandes quantités de données et des ressources limitées, une logique complexe telle que le multithreading, la politique de cache, le traitement par lots et la nouvelle tentative en cas d'échec doit être mise en œuvre pour atteindre le débit souhaité. Producer met en œuvre la logique ci-dessus pour améliorer les performances des applications et simplifier le processus de développement des applications.

Exécution de tâches asynchrone et non bloquante

Si vous disposez de suffisamment de mémoire cache, Producer mettra en cache les données que vous envoyez à LogHub. Lorsque vous appelez la méthode d'envoi, les données spécifiées seront envoyées immédiatement sans bloquer le traitement. Ceci réalise la séparation du calcul et de la logique d'E / S. À une date ultérieure, vous pouvez récupérer les résultats de transmission de données à partir des futurs objets renvoyés et des rappels enregistrés.

Utilisation de ressources contrôlables

La taille de la mémoire utilisée par Producer pour mettre en cache les données à envoyer peut être contrôlée par des paramètres ainsi que par le nombre de threads utilisés pour la tâche d'envoi de données. Cela évite au producteur de consommer des ressources illimitées. Vous pouvez également équilibrer la consommation de ressources et le débit d'écriture en fonction de la situation réelle.

Résumé

En résumé, Producer offre de nombreux avantages en traitant automatiquement les détails sous-jacents complexes et en exposant une interface simple. De plus, cela n'affecte pas le fonctionnement normal des services de couche supérieure et peut réduire considérablement le seuil d'accès aux données.

Explication du mécanisme

Pour mieux comprendre les performances de Producer, cette section décrit le fonctionnement de Producer, notamment la logique d'écriture des données, l'implémentation des composants principaux et l'arrêt progressif. L'architecture globale de Producer est illustrée dans la figure ci-dessous.

image.png

Écriture de données

Logique d'écriture des données du producteur:

  1. Après avoir appelé la méthode producteur.send () pour envoyer les données au magasin de journaux spécifié, les données seront chargées dans le lot Producer dans le LogAccumulator. La méthode send renvoie généralement le résultat immédiatement. Cependant, si l'instance Producer ne dispose pas de suffisamment d'espace pour stocker les données souhaitées, la méthode d'envoi se bloquera jusqu'à ce que l'une des conditions suivantes soit remplie:

------ 1, les données précédemment mises en cache sont traitées par le gestionnaire de lots et la mémoire occupée par ces données est libérée. En conséquence, le producteur disposera de suffisamment d'espace pour stocker les données d'intérêt. ------ 2, une exception sera déclenchée si le temps de blocage spécifié est dépassé.

  1. Lors de l'appel de Producer.send (), le nombre de journaux du lot cible peut dépasser maxBatchCount, ou le lot cible peut ne pas avoir assez d'espace pour stocker les données cible. Dans ce cas, Producer envoie d'abord le lot cible à IOThreadPool, puis crée un nouveau lot pour stocker les données cible. Pour éviter de bloquer les threads, IOThreadPool utilise une file d'attente de blocage illimitée. Le nombre de journaux pouvant être mis en cache dans une instance Producer est limité, de sorte que la longueur de la file d'attente n'augmentera pas indéfiniment.

  2. Mover parcourt chaque lot Producer dans LogAccumulator et envoie les lots qui dépassent la durée maximale du cache à expiredBatches. Il enregistre également le premier délai d'expiration (t) pour les lots non expirés. 4, puis LogAccumulator envoie un lot expiré à IOThreadPool.

  3. Après cela, Mover obtient le lot Producer qui correspond aux conditions de transmission de RetryQueue. Si aucun lot ne remplit les conditions, attendez une période de t.

  4. Envoyez ensuite le lot expiré de RetryQueue à IOThreadPool. À la fin de l'étape 6, Mover répète les étapes 3 à 6.

  5. Les threads de travail IOThreadPool envoient des lots des files d'attente bloquées au journal cible. 8, une fois le lot envoyé au magasin de journaux, il va dans la file d'attente de réussite.

  6. Si la transmission échoue et que l'une des conditions suivantes est remplie, accédez à la file d'attente des échecs. ------- 1, le lot ayant échoué ne peut pas être retenté. ------- 2, RetryQueue sera fermé. ------- 3, Le nombre de tentatives spécifié a été atteint et le nombre de lots dans la file d'attente des échecs ne dépasse pas la moitié du nombre total de lots envoyés.

  7. Sinon, le thread de travail calculera la prochaine heure d'envoi du lot ayant échoué et l'envoyera à RetryQueue.

  8. Le thread SuccessBatchHandler prend le lot de la file d'attente de réussite et exécute tous les rappels enregistrés dans ce lot.

  9. Le thread FailureBatchHandler prend le lot de la file d'attente des échecs et exécute tous les rappels enregistrés dans ce lot.

Composants principaux

Le composant principal de Producer est [LogAccumulator](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer /internals/LogAccumulator.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=LogAccumulator.java), [RetryQueue](https://github.com/aliyun/aliyun-log-java-producer/blob/master/service /java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=RetryQueue.java), Mover -log-java-producteur / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / producteur / internals / Mover.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = Mover.java), [ IOThreadPool](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/IOThreadPool.java?spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = IOThreadPool.java), [SendProducerBatchTask](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main /java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=SendProducerBatchTask.java), [BatchHandler/](httyps: // -log-java-producteur / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / producteur / internals / BatchHandler.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = BatchHandler.java) ..

LogAccumulator Il est courant de stocker des données par lots plus importants et d'envoyer les données par lots pour améliorer le débit. Le rôle principal du LogAccumulator décrit ici est de fusionner les données en lots. Pour fusionner différentes données dans un lot plus volumineux, les données doivent avoir les mêmes propriétés de projet, de magasin de journaux, de rubrique, de source et de shardHash. LogAccumulator met ces données en cache à différents emplacements sur la carte interne en fonction de ces propriétés. La clé de la carte est constituée des 5 éléments des 5 propriétés ci-dessus et la valeur est ProducerBatch. ConcurrentMap est utilisé pour garantir la sécurité des threads et une concurrence élevée.

Une autre fonctionnalité de LogAccumulator est de contrôler la taille totale des données mises en cache. J'utilise Semaphore pour implémenter cette logique de contrôle. Semaphore est un outil de synchronisation hautes performances basé sur AbstractQueuedSynchronizer (basé sur AQS). Semaphore tente d'abord d'acquérir des ressources partagées en tournant, ce qui réduit la surcharge de changement de contexte.

RetryQueue RetryQueue est utilisé pour stocker les lots dont l'envoi a échoué et qui attendent d'être retentés. Chacun de ces lots a un champ qui indique quand envoyer le lot. Pour récupérer efficacement les lots expirés, le producteur dispose d'une file d'attente de retard pour stocker ces lots. DelayQueue est une file d'attente à priorité élevée basée sur le temps qui traite en premier le premier lot expiré. Cette file d'attente est thread-safe.

Mover Mover est un fil distinct. LogAccumulator et RetryQueue envoient régulièrement des lots expirés à IOThreadPool. Mover occupe les ressources du processeur même lorsqu'il est inactif. Pour éviter de gaspiller les ressources du processeur, Mover attend les lots expirés de RetryQueue alors qu'il ne peut pas trouver un lot qualifié envoyé par LogAccumulator et RetryQueue. Cette période correspond à la durée maximale du cache configurée.

IOThreadPool Le thread de travail dans IOThreadPool envoie des données au magasin de journaux. La taille de IOThreadPool peut être spécifiée avec le paramètre ioThreadCount et la valeur par défaut est le double du nombre de processeurs.

SendProducerBatchTask SendProducerBatchTask est encapsulé dans la logique d'envoi par lots. Pour éviter de bloquer le thread d'E / S, SendProducerBatchTask envoie le lot cible à une autre file d'attente pour une exécution de rappel, que le lot cible ait été envoyé ou non avec succès. En outre, si le lot ayant échoué répond aux critères de nouvelle tentative, il ne sera pas renvoyé immédiatement dans le thread d'E / S actuel. S'il est renvoyé immédiatement, il échoue généralement à nouveau. Au lieu de cela, SendProducerBatchTask l'envoie à RetryQueue selon une stratégie d'interruption exponentielle.

BatchHandler Le producteur lance SuccessBatchHandler et FailureBatchHandler pour gérer les lots réussis et non réussis. Une fois que le gestionnaire a terminé l'exécution du rappel et la configuration future du lot, il libère la mémoire occupée par ce lot pour de nouvelles données. Un traitement séparé garantit que les lots envoyés avec succès et les lots infructueux sont séparés. Cela garantit le bon fonctionnement du producteur.

GracefulShutdown Pour implémenter GracefulShutdown, les conditions suivantes doivent être remplies:

  1. Lorsque la méthode close vous renvoie le résultat, tous les threads du producteur doivent être fermés. Vous devez également que les données mises en cache soient correctement traitées, que tous les rappels que vous avez enregistrés soient exécutés et que tous les contrats à terme à vous renvoyer soient définis.
  2. Vous devez également pouvoir définir le temps d'attente maximal pour la méthode de fermeture. La méthode doit vous renvoyer le résultat immédiatement après le dépassement de cette période, que le thread se soit terminé ou que les données mises en cache aient été traitées.
  3. La méthode close peut être appelée plusieurs fois même dans un environnement multi-thread et fonctionne normalement.
  4. Il est sûr d'appeler la méthode close dans le rappel et cela ne provoquera pas de blocage dans l'application.

Pour répondre aux exigences ci-dessus, la logique de fermeture du producteur est conçue comme suit:

  1. Fermez l'accumulateur de journaux. Si vous continuez à écrire des données dans le LogAccumulator, vous obtiendrez une exception.
  2. Fermez le RetryQueue. Si vous continuez à envoyer des lots à RetryQueue, une exception sera levée.
  3. Fermez Mover et attendez qu'il se termine complètement. Après avoir détecté le signal de fermeture, Mover envoie tous les lots restants de LogAccumulator et RetryQueue à IOThreadPool, que les conditions d'envoi soient satisfaites ou non. Pour éviter la perte de données, Mover extrait toujours les lots de LogAccumulator et RetryQueue jusqu'à ce qu'aucun autre thread n'écrit.
  4. Fermez IOThreadPool et attendez que toutes les tâches soumises soient terminées. Si le RetryQueue est déjà fermé, le lot ayant échoué sera envoyé directement à la file d'attente des échecs.
  5. Fermez SuccessBatchHandler et attendez qu'il se termine complètement. Si la méthode close est appelée dans le rappel, le processus d'attente est ignoré. Après avoir détecté le signal de fermeture, SuccessBatchHandler récupère tous les lots de la file d'attente de réussite et les traite un par un.
  6. Fermez FailureBatchHandler et attendez qu'il se termine complètement. Si la méthode close est appelée dans le rappel, le processus d'attente est ignoré. Après avoir détecté le signal de fermeture, FailureBatchHandler récupère tous les lots de la file d'attente des échecs et les traite un par un.

De cette manière, en fermant les files d'attente et les threads un par un en fonction du sens du flux de données, un arrêt progressif et une terminaison sûre sont obtenus.

Résumé

Alibaba Cloud LOG Java Producer est une mise à niveau complète des versions précédentes de Producer. Il résout de nombreux problèmes avec les versions précédentes, tels qu'une utilisation élevée du processeur en cas d'exception de réseau et une faible perte de données lors de la sortie de Producer. De plus, le mécanisme de tolérance aux pannes a été renforcé. Le producteur peut garantir une utilisation appropriée des ressources, un débit élevé et une isolation étroite même après une erreur.

Recommended Posts

Java Library-Alibi Cloud's LOG Java Producer aide à envoyer des données aux services de journalisation
Comment utiliser LOG Java Producer d'Alibaba Cloud
Classe Kotlin à envoyer aux développeurs Java
Enregistrer la sortie dans un fichier en Java
Kotlin's Class part.2 à envoyer aux développeurs Java
[Java] Comment ajouter des données à la liste (add, addAll)
Fonctions de portée Kotlin à envoyer aux développeurs Java
Java: Comment envoyer des valeurs du servlet au servlet
La sécurité Null de Kotlin à envoyer aux développeurs Java