J'ai eu du mal ces derniers temps parce que je voulais mettre un compteur associatif thread-safe dans un microservice. Les exigences sont comme ça.
--Incrémenter la valeur de comptage des différentes clés pour chaque demande
J'ai d'abord examiné la fonction de métriques de Finagle, que j'utilise comme framework, mais il ne semblait pas y avoir de moyen de le réinitialiser, et cela ne ressemblait pas à enregistrer des centaines de milliers de clés, alors j'aimerais le faire moi-même. J'ai décidé de le faire avec. Donc, si vous jetez un œil à la documentation du tableau associatif thread-safe de Java ConcurrentHashMap
ConcurrentHashMap peut être utilisé comme carte de fréquence évolutive (au format histogramme ou multi-ensembles) en utilisant la valeur de LongAdder et en l'initialisant avec computeIfAbsent. Par exemple, pour ajouter des décomptes aux fréquences ConcurrentHashMap <String, LongAdder>, vous pouvez utiliser freqs.computeIfAbsent (k-> new LongAdder ()). Increment ();.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
Il semble que c'est exactement ce que je recherchais. Je pensais que LongAdder n'était pas AtomicLong, mais il semble qu'il soit utilisé correctement en fonction du but.
Cette classe est généralement préférée à AtomicLong lorsque les sommes courantes utilisées à des fins telles que la collecte de statistiques plutôt que pour un contrôle de synchronisation précis sont mises à jour par plusieurs threads. Les caractéristiques des deux classes sont similaires lorsqu'il y a moins de conflits pour les mises à jour. Lorsqu'il y a beaucoup de concurrence, le débit attendu est beaucoup plus élevé dans cette classe. Cependant, il consomme également plus de capacité.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html
Et en fait, Scala a également un tableau associatif thread-safe appelé TrieMap, donc cette fois j'ai décidé de l'implémenter en utilisant TrieMap et LongAdder. Autrement dit, déclarez un compteur associatif comme celui-ci,
val counter = TrieMap[Key, LongAdder]()
Incrémenter partout dans chaque processus de requête, allouer de la mémoire si la clé n'existe pas,
counter.getOrElseUpdate(key, new LongAdder()).increment()
Séparément, définissez une minuterie et supprimez des éléments tout en produisant périodiquement des journaux,
for (key <- counter.keys) {
val sum = counter.remove(key).sum()
logger.debug(s"$key: $sum")
}
J'avais l'habitude de l'implémenter avec l'idée que c'est si facile, mais ce n'est pas vraiment un compteur précis, n'est-ce pas? Toutes les méthodes utilisées sont atomiques, mais même si vous combinez des méthodes atomiques, cela ne devient pas atomique, donc je n'ai pas confirmé d'exemple réel, mais il devrait y avoir un tel cas.
getOrElseUpdate (clé, nouveau LongAdder ())
remove (key)
sum ()
Si les mêmes clés sont traitées dans cet ordre, le dernier ʻincrement () `longAdder a déjà été supprimé de TrieMap et la valeur de sortie du journal a été évaluée, donc les informations incrémentées seront bues dans l'obscurité. .. En outre, du point de vue de la charge de calcul, TrieMap devient vide à la fois lorsque le journal est sorti, puis se développe à chaque fois qu'il est incrémenté, il y a donc un inconvénient que la charge augmente avant et après la sortie du journal.
Il semble difficile de réinitialiser et de supprimer des éléments en même temps, donc pour le moment, lors de la sortie du journal,
for {
key <- counter.keys
value <- counter.get(key)
} {
val sum = value.sumThenReset()
logger.debug(s"$key: $sum")
}
J'ai pensé que je devrais simplement le réinitialiser
public long sumThenReset() L'effet est le même que lorsque reset () est exécuté après sum (). Par exemple, cette méthode peut être appliquée pendant une période de repos entre des calculs multithreads. Si des mises à jour sont effectuées en parallèle avec cette méthode, la valeur renvoyée n'est pas garantie comme étant la dernière valeur survenue avant la réinitialisation.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html#sumThenReset--
Comment, cette information également incrémentée peut être bue. Cela semble inévitable en raison de la structure de données de LongAdder, donc si vous regardez AtomicLong,
public final long getAndSet(long newValue) Amplifie la valeur spécifiée et renvoie la valeur précédente.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#getAndSet-long-
C'est atomique. Maintenant, changez la structure de données du compteur associatif,
val counter = TrieMap[Key, AtomicLong]()
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.getAndSet(0)
logger.debug(s"$key: $v")
}
C'est une minuterie différente de la sortie du journal, et si la valeur de comptage est 0, il semble bon de supprimer l'élément. Cette implémentation
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.get()
if (v == 0) counter.remove(key)
}
Bien sûr, s'il est incrémenté de get ()
à remove (key)
, oui, il fait sombre! (Bon travail!)
En d'autres termes, il y a une chose tellement pratique que le jugement de condition de la valeur et la suppression de l'élément doivent être effectués de manière atomique.
def remove(k: K, v: V): Boolean Removes the entry for the specified key if it's currently mapped to the specified value.
https://www.scala-lang.org/api/2.12.7/scala/collection/concurrent/TrieMap.html#remove(k:K,v:V):Boolean
Il y en a, mais c'est un jugement de match. Comme AtomicLong est un type référence, il peut être utilisé comme valeur de comptage mais comme instance.
scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false
Ce n'est pas égal à ça. C'est la même chose avec ConcurrentHashMap.
public boolean remove(Object key, Object value) Supprime l'entrée de la clé uniquement si la clé est actuellement mappée à la valeur spécifiée. Ceci équivaut à la description suivante.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#remove-java.lang.Object-java.lang.Object-
Par conséquent, pour utiliser cette suppression, le type de valeur doit être défini sur la primitive Long, et le contrôle exclusif qui incrémente atomic doit s'appuyer sur ConcurrentHashMap.compute ou similaire.
compute public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Tente de calculer le mappage pour la clé spécifiée et la valeur actuellement mappée (null si le mappage actuel n'existe pas). L'appel de méthode entier est exécuté de manière atomique. Gardez le calcul court et facile, car certaines des opérations de mise à jour que d'autres threads tentent sur cette carte peuvent être bloquées pendant que le calcul est en cours. N'essayez pas non plus de mettre à jour d'autres mappages de cette carte dans le calcul
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-
Et, en fait, j'ai remarqué quand je me suis tordu la tête un peu avant cela, mais il y avait un exemple d'une telle implémentation à Gauva.
public final class AtomicLongMap<K> implements Serializable {
private final ConcurrentHashMap<K, Long> map;
https://github.com/google/guava/blob/v27.0.1/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L59
Cela semble également avoir été implémenté dans AtomicLong dans le passé,
public final class AtomicLongMap<K> {
private final ConcurrentHashMap<K, AtomicLong> map;
https://github.com/google/guava/blob/v20.0/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L55
Je pense qu'il y avait quelque chose qui n'allait pas avec le commutateur.
En conclusion, il semble que l'AtomicLongMap de Gauva devrait être utilisé ou imité et implémenté. Le traitement parallèle est difficile.
Recommended Posts