[JAVA] C'était une vie que je voulais réinitialiser le compteur associatif thread-safe

J'ai eu du mal ces derniers temps parce que je voulais mettre un compteur associatif thread-safe dans un microservice. Les exigences sont comme ça.

--Incrémenter la valeur de comptage des différentes clés pour chaque demande

J'ai d'abord examiné la fonction de métriques de Finagle, que j'utilise comme framework, mais il ne semblait pas y avoir de moyen de le réinitialiser, et cela ne ressemblait pas à enregistrer des centaines de milliers de clés, alors j'aimerais le faire moi-même. J'ai décidé de le faire avec. Donc, si vous jetez un œil à la documentation du tableau associatif thread-safe de Java ConcurrentHashMap

ConcurrentHashMap peut être utilisé comme carte de fréquence évolutive (au format histogramme ou multi-ensembles) en utilisant la valeur de LongAdder et en l'initialisant avec computeIfAbsent. Par exemple, pour ajouter des décomptes aux fréquences ConcurrentHashMap <String, LongAdder>, vous pouvez utiliser freqs.computeIfAbsent (k-> new LongAdder ()). Increment ();.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html

Il semble que c'est exactement ce que je recherchais. Je pensais que LongAdder n'était pas AtomicLong, mais il semble qu'il soit utilisé correctement en fonction du but.

Cette classe est généralement préférée à AtomicLong lorsque les sommes courantes utilisées à des fins telles que la collecte de statistiques plutôt que pour un contrôle de synchronisation précis sont mises à jour par plusieurs threads. Les caractéristiques des deux classes sont similaires lorsqu'il y a moins de conflits pour les mises à jour. Lorsqu'il y a beaucoup de concurrence, le débit attendu est beaucoup plus élevé dans cette classe. Cependant, il consomme également plus de capacité.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html

Et en fait, Scala a également un tableau associatif thread-safe appelé TrieMap, donc cette fois j'ai décidé de l'implémenter en utilisant TrieMap et LongAdder. Autrement dit, déclarez un compteur associatif comme celui-ci,

val counter = TrieMap[Key, LongAdder]()

Incrémenter partout dans chaque processus de requête, allouer de la mémoire si la clé n'existe pas,

counter.getOrElseUpdate(key, new LongAdder()).increment()

Séparément, définissez une minuterie et supprimez des éléments tout en produisant périodiquement des journaux,

for (key <- counter.keys) {
  val sum = counter.remove(key).sum()
  logger.debug(s"$key: $sum")
}

J'avais l'habitude de l'implémenter avec l'idée que c'est si facile, mais ce n'est pas vraiment un compteur précis, n'est-ce pas? Toutes les méthodes utilisées sont atomiques, mais même si vous combinez des méthodes atomiques, cela ne devient pas atomique, donc je n'ai pas confirmé d'exemple réel, mais il devrait y avoir un tel cas.

  1. Thread A: getOrElseUpdate (clé, nouveau LongAdder ())
  2. Filetage B: remove (key)
  3. Thread B: sum ()
  4. Thread A: ʻincrement () `

Si les mêmes clés sont traitées dans cet ordre, le dernier ʻincrement () `longAdder a déjà été supprimé de TrieMap et la valeur de sortie du journal a été évaluée, donc les informations incrémentées seront bues dans l'obscurité. .. En outre, du point de vue de la charge de calcul, TrieMap devient vide à la fois lorsque le journal est sorti, puis se développe à chaque fois qu'il est incrémenté, il y a donc un inconvénient que la charge augmente avant et après la sortie du journal.

Il semble difficile de réinitialiser et de supprimer des éléments en même temps, donc pour le moment, lors de la sortie du journal,

for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val sum = value.sumThenReset()
  logger.debug(s"$key: $sum")
}

J'ai pensé que je devrais simplement le réinitialiser

public long sumThenReset() L'effet est le même que lorsque reset () est exécuté après sum (). Par exemple, cette méthode peut être appliquée pendant une période de repos entre des calculs multithreads. Si des mises à jour sont effectuées en parallèle avec cette méthode, la valeur renvoyée n'est pas garantie comme étant la dernière valeur survenue avant la réinitialisation.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html#sumThenReset--

Comment, cette information également incrémentée peut être bue. Cela semble inévitable en raison de la structure de données de LongAdder, donc si vous regardez AtomicLong,

public final long getAndSet(long newValue) Amplifie la valeur spécifiée et renvoie la valeur précédente.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#getAndSet-long-

C'est atomique. Maintenant, changez la structure de données du compteur associatif,

val counter = TrieMap[Key, AtomicLong]()
for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val v = value.getAndSet(0)
  logger.debug(s"$key: $v")
}

C'est une minuterie différente de la sortie du journal, et si la valeur de comptage est 0, il semble bon de supprimer l'élément. Cette implémentation

for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val v = value.get()
  if (v == 0) counter.remove(key)
}

Bien sûr, s'il est incrémenté de get () à remove (key), oui, il fait sombre! (Bon travail!) En d'autres termes, il y a une chose tellement pratique que le jugement de condition de la valeur et la suppression de l'élément doivent être effectués de manière atomique.

def remove(k: K, v: V): Boolean Removes the entry for the specified key if it's currently mapped to the specified value.

https://www.scala-lang.org/api/2.12.7/scala/collection/concurrent/TrieMap.html#remove(k:K,v:V):Boolean

Il y en a, mais c'est un jugement de match. Comme AtomicLong est un type référence, il peut être utilisé comme valeur de comptage mais comme instance.

scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false

Ce n'est pas égal à ça. C'est la même chose avec ConcurrentHashMap.

public boolean remove(Object key, Object value) Supprime l'entrée de la clé uniquement si la clé est actuellement mappée à la valeur spécifiée. Ceci équivaut à la description suivante.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#remove-java.lang.Object-java.lang.Object-

Par conséquent, pour utiliser cette suppression, le type de valeur doit être défini sur la primitive Long, et le contrôle exclusif qui incrémente atomic doit s'appuyer sur ConcurrentHashMap.compute ou similaire.

compute public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Tente de calculer le mappage pour la clé spécifiée et la valeur actuellement mappée (null si le mappage actuel n'existe pas). L'appel de méthode entier est exécuté de manière atomique. Gardez le calcul court et facile, car certaines des opérations de mise à jour que d'autres threads tentent sur cette carte peuvent être bloquées pendant que le calcul est en cours. N'essayez pas non plus de mettre à jour d'autres mappages de cette carte dans le calcul

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-

Et, en fait, j'ai remarqué quand je me suis tordu la tête un peu avant cela, mais il y avait un exemple d'une telle implémentation à Gauva.

public final class AtomicLongMap<K> implements Serializable {
  private final ConcurrentHashMap<K, Long> map;

https://github.com/google/guava/blob/v27.0.1/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L59

Cela semble également avoir été implémenté dans AtomicLong dans le passé,

public final class AtomicLongMap<K> {
  private final ConcurrentHashMap<K, AtomicLong> map;

https://github.com/google/guava/blob/v20.0/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L55

Je pense qu'il y avait quelque chose qui n'allait pas avec le commutateur.

En conclusion, il semble que l'AtomicLongMap de Gauva devrait être utilisé ou imité et implémenté. Le traitement parallèle est difficile.

Recommended Posts

C'était une vie que je voulais réinitialiser le compteur associatif thread-safe
Quand j'ai voulu créer une méthode pour Premium Friday, c'était déjà dans l'API standard Java 8
L'histoire que je voulais développer Zip
Sauvegarde grammaire-RealmObject de transformation générique, ce que j'ai pensé parce que je suis fatigué d'écrire RealmMigration pendant le développement mais que je veux reprendre les données
Je voulais ajouter @VisibleForTesting à la méthode
J'étais accro à la méthode du rouleau
J'étais accro au test Spring-Batch
Une histoire à laquelle j'étais accro lors du test de l'API à l'aide de MockMVC
Ce que j'ai essayé quand je voulais obtenir tous les champs d'un haricot
Je voulais que (a == 1 && a == 2 && a == 3) vrai en Java
J'étais un peu accro à la comparaison S3 Checksum, alors prenez note.
Je voulais faire un diaporama du fond d'écran car l'image de l'écran de verrouillage de Windows 10 est magnifique
Notez que j'étais accro aux paramètres du projet Android d'IntelliJ IDEA
J'étais accro à NoSuchMethodError dans Cloud Endpoints
J'étais accro au record du modèle associé
Je souhaite ajouter une fonction de suppression à la fonction de commentaire
Ce à quoi j'étais accro lors de l'introduction de la bibliothèque JNI
J'étais accro à la mise à jour de la déclaration dans MyBatis
Je voulais juste créer une propriété réactive en Java
Je souhaite créer un formulaire pour sélectionner la catégorie [Rails]
J'étais accro au réglage de laradock + VSCode + xdebug
Ce à quoi j'étais accro avec l'API REST Redmine
Je veux donner un nom de classe à l'attribut select
J'étais confus parce qu'il y avait une scission dans le tableau
L'histoire à laquelle j'étais accro lors de la création de STS
J'ai créé une action GitHub qui facilite la compréhension du résultat de l'exécution de RSpec
[Circle CI] Une histoire à laquelle j'étais accro chez Start Building
Une note quand j'étais accro à la conversion d'Ubuntu sur WSL1 en WSL2
J'ai fait un petit bijou pour poster le texte du mode org sur qiita
À propos de la question pour laquelle j'étais accro à l'utilisation de hashmap
Je voulais faciliter la programmation JavaFX avec Spring Framework
[Java] J'ai essayé de faire un labyrinthe par la méthode de creusage ♪
[Introduction à JSP + Servlet] J'ai joué avec pendant un moment ♬
J'ai créé un outil pour afficher la différence du fichier CSV
"RSpec ne fonctionne pas!" La cause était le printemps, alors je l'ai étudiée.
J'étais accro au paramètre API version min23 de registerTorchCallback
J'ai utilisé le modèle Mediator pour exprimer un puzzle de traversée de rivière.
J'ai pu déployer l'application Docker + laravel + MySQL sur Heroku!
Une histoire à laquelle j'étais accro à deux reprises avec le paramètre de démarrage automatique de Tomcat 8 sur CentOS 8