Ich habe in letzter Zeit Probleme, weil ich einen thread-sicheren assoziativen Zähler in einen Microservice einbauen wollte. Die Anforderungen sind wie folgt.
Ich habe zuerst die Mterics-Funktion von Finagle in Betracht gezogen, die ich als Framework verwende, aber es schien nicht möglich zu sein, sie zurückzusetzen, und es schien nicht so, als würde ich Hunderttausende von Schlüsseln registrieren, also würde ich es gerne selbst tun. Ich habe beschlossen, es mit zu machen. Schauen Sie sich also die Dokumentation zum Thread-sicheren assoziativen Array ConcurrentHashMap von Java an
ConcurrentHashMap kann als skalierbare Frequenzkarte (im Histogramm- oder Multi-Set-Format) verwendet werden, indem der Wert von LongAdder verwendet und mit computeIfAbsent initialisiert wird. Um beispielsweise ConcurrentHashMap <String, LongAdder> freqs Zählwerte hinzuzufügen, können Sie freqs.computeIfAbsent (k-> new LongAdder ()) verwenden. Increment ();
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
Es scheint, dass dies genau das ist, wonach ich gesucht habe. Ich dachte, dass LongAdder nicht AtomicLong ist, aber es scheint, dass es je nach Zweck richtig verwendet wird.
Diese Klasse wird normalerweise AtomicLong vorgezogen, wenn allgemeine Summen, die für Zwecke wie die Erfassung von Statistiken und nicht für eine fein abgestimmte Synchronisationssteuerung verwendet werden, von mehreren Threads aktualisiert werden. Die Eigenschaften der beiden Klassen sind ähnlich, wenn weniger um Aktualisierungen gekämpft wird. Bei starkem Wettbewerb ist der erwartete Durchsatz in dieser Klasse viel höher. Es verbraucht jedoch auch mehr Kapazität.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html
Tatsächlich hat Scala auch ein thread-sicheres assoziatives Array namens TrieMap, daher habe ich mich dieses Mal entschlossen, es mit TrieMap und LongAdder zu implementieren. Das heißt, deklarieren Sie einen assoziativen Zähler wie diesen,
val counter = TrieMap[Key, LongAdder]()
Inkrementieren Sie überall in jedem Anforderungsprozess, weisen Sie Speicher zu, wenn der Schlüssel nicht vorhanden ist.
counter.getOrElseUpdate(key, new LongAdder()).increment()
Stellen Sie separat einen Timer ein und löschen Sie Elemente, während Sie regelmäßig Protokolle ausgeben.
for (key <- counter.keys) {
val sum = counter.remove(key).sum()
logger.debug(s"$key: $sum")
}
Früher habe ich es mit der Idee implementiert, dass es so einfach ist, aber es ist eigentlich kein genauer Zähler, nicht wahr? Orz Alle verwendeten Methoden sind atomar, aber selbst wenn Sie atomare kombinieren, wird es nicht atomar, daher habe ich kein tatsächliches Beispiel bestätigt, aber es sollte einen solchen Fall geben.
getOrElseUpdate (Schlüssel, neuer LongAdder ())
remove (key)
sum ()
increment ()
Wenn dieselben Schlüssel in dieser Reihenfolge verarbeitet werden, wurde der letzte increment ()
LongAdder bereits aus TrieMap entfernt und die Protokollausgabe wurde ausgewertet, sodass die inkrementierten Informationen im Dunkeln ertrinken. ..
Unter dem Gesichtspunkt der Rechenlast wird TrieMap bei der Ausgabe des Protokolls sofort leer und wird dann bei jeder Inkrementierung erweitert, sodass der Nachteil besteht, dass die Last vor und nach der Protokollausgabe zunimmt.
Es scheint umständlich zu sein, Elemente gleichzeitig zurückzusetzen und zu löschen. Wenn Sie also das Protokoll ausgeben,
for {
key <- counter.keys
value <- counter.get(key)
} {
val sum = value.sumThenReset()
logger.debug(s"$key: $sum")
}
Ich dachte, ich sollte es einfach zurücksetzen,
public long sumThenReset() Der Effekt ist der gleiche wie wenn reset () nach sum () ausgeführt wird. Beispielsweise kann dieses Verfahren während der Ruhephase zwischen Multithread-Berechnungen angewendet werden. Wenn parallel zu dieser Methode Aktualisierungen vorgenommen werden, ist nicht garantiert, dass der zurückgegebene Wert der letzte Wert ist, der vor dem Zurücksetzen aufgetreten ist.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html#sumThenReset--
Wie kann diese auch inkrementierte Information getrunken werden. Dies scheint aufgrund der Datenstruktur von LongAdder unvermeidlich zu sein. Wenn Sie sich also AtomicLong ansehen,
public final long getAndSet(long newValue) Verstärkt den angegebenen Wert und gibt den vorherigen Wert zurück.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#getAndSet-long-
Das ist atomar. Wechseln Sie nun die Datenstruktur des assoziativen Zählers.
val counter = TrieMap[Key, AtomicLong]()
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.getAndSet(0)
logger.debug(s"$key: $v")
}
Es ist ein Timer, der sich von der Protokollausgabe unterscheidet. Wenn der Zählwert 0 ist, scheint es gut, das Element zu löschen. Diese Implementierung
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.get()
if (v == 0) counter.remove(key)
}
Natürlich, wenn es von "get ()" auf "remove (key)" erhöht wird, ist es natürlich dunkel! (Gute Arbeit!) Mit anderen Worten, es gibt eine so bequeme Sache, dass die Bedingungsbeurteilung des Wertes und die Elementlöschung atomar durchgeführt werden müssen.
def remove(k: K, v: V): Boolean Removes the entry for the specified key if it's currently mapped to the specified value.
https://www.scala-lang.org/api/2.12.7/scala/collection/concurrent/TrieMap.html#remove(k:K,v:V):Boolean
Es gibt einige, aber es ist ein Match-Urteil. Da AtomicLong ein Referenztyp ist, kann er als Zählwert, aber als Instanz verwendet werden.
scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false
Das ist nicht gleich. Dies gilt auch für ConcurrentHashMap.
public boolean remove(Object key, Object value) Löscht den Eintrag für den Schlüssel nur, wenn der Schlüssel aktuell dem angegebenen Wert zugeordnet ist. Dies entspricht der folgenden Beschreibung.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#remove-java.lang.Object-java.lang.Object-
Um diese Entfernung zu verwenden, muss der Werttyp daher das primitive Long sein, und die ausschließliche Steuerung, mit der Atomic inkrementiert wird, muss auf ConcurrentHashMap.compute oder dergleichen beruhen.
compute public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Es wird versucht, die Zuordnung für den angegebenen Schlüssel und den aktuell zugeordneten Wert zu berechnen (null, wenn die aktuelle Zuordnung nicht vorhanden ist). Der gesamte Methodenaufruf wird atomar ausgeführt. Halten Sie die Berechnung kurz und einfach, da einige der Aktualisierungsvorgänge, die andere Threads auf dieser Karte ausführen, möglicherweise blockiert werden, während die Berechnung ausgeführt wird. Versuchen Sie auch nicht, andere Zuordnungen in dieser Karte in der Berechnung zu aktualisieren
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-
Tatsächlich bemerkte ich, dass ich kurz zuvor meinen Kopf verdreht hatte, aber es gab ein Beispiel für eine solche Implementierung in Gauva.
public final class AtomicLongMap<K> implements Serializable {
private final ConcurrentHashMap<K, Long> map;
https://github.com/google/guava/blob/v27.0.1/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L59
Dies scheint auch in der Vergangenheit in AtomicLong implementiert worden zu sein.
public final class AtomicLongMap<K> {
private final ConcurrentHashMap<K, AtomicLong> map;
https://github.com/google/guava/blob/v20.0/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L55
Ich habe das Gefühl, dass mit dem Schalter etwas nicht stimmt.
Zusammenfassend scheint es, dass Gauvas AtomicLongMap verwendet oder imitiert und implementiert werden sollte. Die parallele Verarbeitung ist schwierig.
Recommended Posts