[RUBY] So zählen Sie UTF-8-Codepunkte schnell

Führt einen Algorithmus ein, der die Anzahl der Codepunkte aus einer UTF-8-Zeichenfolge berechnet. Das Zählen von Codepunkten ist nicht allzu einfach zu schreiben, aber seine hocheffiziente Implementierung ist überraschend verwirrend.

Der Inhalt ist doppelt gefasst.

Als Bonus habe ich eine einfache Leistungsbewertung versucht. Es wird angenommen, dass die UTF-8-Zeichenfolge validiert wurde (es ist bekannt, dass es sich nicht um eine ungültige Sequenz handelt).

Wie es mit der internen Implementierung von Ruby funktioniert

Definieren Sie zunächst "is_utf8_lead_byte", um festzustellen, ob es sich um das führende Byte des Codepunkts handelt. Es ist ein Präprozessor-Makro.

#define is_utf8_lead_byte(c) (((c)&0xC0) != 0x80)

https://github.com/ruby/ruby/blob/v2_6_2/string.c#L1629

Beim Maskieren von "0xC0" oder "0b11000000" bleiben nur die beiden oberen Bits übrig. Da das nicht erste Byte von UTF-8 in der Reihenfolge "0b10xxxxxx" liegt, ist es nach dem Maskieren "0b10000000", wenn es sich um ein nicht erstes Byte handelt. Dies bedeutet jedoch, dass die Vergleichsoperation "! = 0x80" bestimmt, ob es das erste Byte ist Es bedeutet, dass Sie können.

Dieses Makro erleichtert die Implementierung der Codepunktzählung.

#define is_utf8_lead_byte(c) (((c)&0xC0) != 0x80)
int64_t count_utf8_codepoint(const char *p,  const char *e) {
    while (p < e) {
        if (is_utf8_lead_byte(*p)) len++;
        p++;
    }
    return len;
}

Übrigens ist es bisher einfach, aber die Implementierung im Ruby-Verarbeitungssystem wurde etwas weiter ausgearbeitet. In der obigen einfachen Implementierung wird die Zeichenfolgenverarbeitung in Byteeinheiten durchgeführt, aber in einer typischen Umgebung wie einem modernen PC / Server können 32/64-Bit-Ganzzahlen ohne Unannehmlichkeiten verarbeitet werden, sodass die Verarbeitung für jedes Byte möglich ist. Es ist relativ ineffizient. Daher werden mehrere Bytes im Typ "uintptr_t" zusammen gelesen, und die Anzahl aller ersten enthaltenen Bytes wird gleichzeitig gezählt. ** Die Anzahl der Codepunkte entspricht der Anzahl der ersten Bytes **, sodass Sie nur die ersten Bytes zählen müssen, um die Codepunkte zu zählen.

#define NONASCII_MASK UINT64_C(0x8080808080808080)
static inline uintptr_t
count_utf8_lead_bytes_with_word(const uintptr_t *s)
{
    uintptr_t d = *s;
    d = (d>>6) | (~d>>7);
    d &= NONASCII_MASK >> 7;
    return __builtin_popcountll(d);
}

https://github.com/ruby/ruby/blob/v2_6_2/string.c#L1631-L1665

Es ist etwas verwirrend, weil alles zusammen berechnet wird, aber bitte beachten Sie die folgenden Punkte.

--NONASCII_MASK >> 7 ist eine Maske, in der nur das niedrigstwertige Bit jedes Bytes gesetzt ist. -- d ist Bit und damit bleibt nur das niedrigstwertige Bit übrig

Nach der Bitoperation zeigt "d" an, dass es das erste Byte ist, wenn für jedes Byte das niedrigstwertige Bit gesetzt ist. Die anderen Bits sind maskiert und immer 0. Um die Anzahl der ersten enthaltenen Bytes zu zählen, können Sie popcnt ausführen.

Zum Schluss gebe ich Ihnen einen Überblick über die Implementierung mit count_utf8_lead_bytes_with_word.

int64_t ruby_count_utf8_codepoint(const char *p, const char *e)
{
    uintptr_t len = 0;
    if ((int)sizeof(uintptr_t) * 2 < e - p) {
        const uintptr_t *s, *t;
        const uintptr_t lowbits = sizeof(uintptr_t) - 1;
        s = (const uintptr_t*)(~lowbits & ((uintptr_t)p + lowbits));
        t = (const uintptr_t*)(~lowbits & (uintptr_t)e);
        while (p < (const char *)s) {
            if (is_utf8_lead_byte(*p)) len++;
            p++;
        }
        while (s < t) {
            len += count_utf8_lead_bytes_with_word(s);
            s++;
        }
        p = (const char *)s;
    }
    while (p < e) {
        if (is_utf8_lead_byte(*p)) len++;
        p++;
    }
    return (long)len;
}

https://github.com/ruby/ruby/blob/v2_6_2/string.c#L1680-L1700

Ich spreche darüber, warum es eine so komplizierte Implementierung ist, aber damit count_utf8_lead_bytes_with_word richtig funktioniert, ein Zeiger, der an der n-Byte-Grenze von uintptr_t ausgerichtet ist (normalerweise denke ich, n = 4 oder 8). Weil es gelesen werden muss. Die spezifische Methode ist wie folgt.

Sie können "count_utf8_lead_bytes_with_word" für die vorverarbeiteten "s" und "t" verwenden. Natürlich kann es eine Lücke zwischen "s", "t" und "p", "e" geben, daher verwenden wir die normale Schleifenverarbeitung, um die Unterschiede vorher und nachher auszufüllen.

AVX-Algorithmus / Implementierung

(Hinzugefügt am 18.04.2019) @umezawatakeshi hat die Implementierung hier verbessert. Wenn Sie an der Implementierung von AVX interessiert sind, lesen Sie auch diesen Artikel. https://qiita.com/umezawatakeshi/items/ed23935788756c800b86

Da es sich nicht um einen Algorithmus handelt, werde ich ihn parallel zur Implementierung erläutern.

Als grundlegende Richtlinie verwenden wir wie UTF-8-Validierungsalgorithmus die Eigenschaft, die Sie anhand des oberen Halbbytes des Bytes erkennen können, ob es sich um das erste Byte handelt. .. Sie können vpshufb verwenden, um 1 nur an der Position zu setzen, an der sich das erste Byte befand. Obwohl es eine Möglichkeit ist, 1 zu zählen, ist es kostspielig, in SIMD in horizontaler Richtung zu addieren, daher werden wir die in Einheiten von 32 Bytes ausgeschnittenen Vektoren so wie sie sind addieren. Da der Wertebereich, der durch Bytes dargestellt werden kann, 0 bis 255 beträgt, kann ein Überlauf auftreten, wenn die Vektoraddition mehr als 255 Mal durchgeführt wird. Die Problemumgehung besteht darin, die Schleife zu teilen und die Werte alle 255 Mal durch horizontale Addition zu aggregieren (wenn keine weiteren Eingaben vorhanden sind, endet die Schleife natürlich vor 255 Mal). Auf diese Weise wird die horizontale Addition nur einmal im Minimum und einmal alle 255 Mal im Maximum durchgeführt, so dass zu erwarten ist, dass die Kosten relativ niedrig sind.

Jetzt ist es Implementierung. Definieren Sie zunächst eine Funktion, die als Hilfsfunktion horizontal in Byteeinheiten hinzugefügt wird.

inline int32_t avx2_horizontal_sum_epi8(__m256i x)
{
    __m256i sumhi = _mm256_unpackhi_epi8(x, _mm256_setzero_si256());
    __m256i sumlo = _mm256_unpacklo_epi8(x, _mm256_setzero_si256());
    __m256i sum16x16 = _mm256_add_epi16(sumhi, sumlo);
    __m256i sum16x8 = _mm256_add_epi16(sum16x16, _mm256_permute2x128_si256(sum16x16, sum16x16, 1));
    __m256i sum16x4 = _mm256_add_epi16(sum16x8, _mm256_shuffle_epi32(sum16x8, _MM_SHUFFLE(0, 0, 2, 3)));
    uint64_t tmp = _mm256_extract_epi64(sum16x4, 0);
    int32_t result = 0;
    result += (tmp >> 0 ) & 0xffff;
    result += (tmp >> 16) & 0xffff;
    result += (tmp >> 32) & 0xffff;
    result += (tmp >> 48) & 0xffff;
    return result;
}

Mit avx2_horizontal_sum_epi8 ist es relativ einfach, eine Funktion zu schreiben, die Codepunkte für Vielfache von 32 zählt.

int64_t avx_count_utf8_codepoint(const char *p,  const char *e)
{
    // `p` must be 32B-aligned pointer
    p = static_cast<const char *>(__builtin_assume_aligned(p, 32));
    const size_t size = e - p;
    int64_t result = 0;
    for (size_t i = 0; i + 31 < size;) {
        __m256i sum = _mm256_setzero_si256();
        size_t j = 0;
        for (; j < 255 * 32 && (i + 31) + j < size; j += 32) {
            const __m256i table = _mm256_setr_epi8(
                    1, 1, 1, 1, 1, 1, 1, 1, //     .. 0x7
                    0, 0, 0, 0,             // 0x8 .. 0xB
                    1, 1, 1, 1,             // 0xC .. 0xF
                    1, 1, 1, 1, 1, 1, 1, 1, //     .. 0x7
                    0, 0, 0, 0,             // 0x8 .. 0xB
                    1, 1, 1, 1              // 0xC .. 0xF
                    );
            __m256i s = _mm256_load_si256(reinterpret_cast<const __m256i *>(p + i + j));
            s = _mm256_and_si256(_mm256_srli_epi16(s, 4), _mm256_set1_epi8(0x0F));
            s = _mm256_shuffle_epi8(table, s);
            sum = _mm256_add_epi8(sum, s);
        }
        i += j;
        result += avx2_horizontal_sum_epi8(sum);
    }
    return result;
}

Die innere Schleife fügt jeden Vektor hinzu, und die äußere Schleife aggregiert die Elementwerte des hinzugefügten Vektors und fügt sie zu "Ergebnis" hinzu. table ist eine Konvertierungstabelle zum Erkennen des ersten Bytes aus dem oberen Halbbyte von UTF-8 und zum Setzen nur des ersten Bytes auf 1.

Der bedingte Ausdruck "j <255 * 32 && (i + 31) + j <Größe" für die innere Schleife ist etwas verwirrend. Die Schleifenvariable "j" wird um 32 erhöht, so dass die erste Hälfte des bedingten Ausdrucks "j <255 * 32" bedeutet, dass die Schleife 255 Mal unterbrochen wird. Die zweite Hälfte, "(i + 31) + j <Größe", ist ein Schutz, um zu verhindern, dass die Eingangslänge "Größe" ein Vielfaches von 32 überschreitet. Wenn die innere Schleife endet, wird "j" zu "i" hinzugefügt, so dass "i + 31 <Größe" der äußeren Schleife ebenfalls 0 wird und die Schleife endet.

Implementierung der Zusammenarbeit

Der Teil, der über das Vielfache von 32 hinausgeht, kann mit der Ruby-Implementierung kombiniert werden. Dieses Mal habe ich versucht, den hervorstehenden Teil mit einer Implementierung zu füllen, die jeweils ein Byte verarbeitet. Da es möglich ist, dass "p" nicht an der 32B-Grenze ausgerichtet ist, habe ich auch einen Prozess eingefügt, um vor der Vektorverarbeitung skalar zur 32B-Grenze vorzurücken.

_ * (Hinzugefügt am 07.04.2019 00:30) Der Code wurde behoben, weil ein Fehler aufgetreten ist. _

int64_t count_utf8_codepoint(const char *p, const char *e)
{
    int64_t count = 0;
#if defined(__AVX2__)
    if (32 <= e - p) {
        // increment `p` to 32B boundary
        while (((uintptr_t)p % 32) != 0) {
            if (is_utf8_lead_byte(*p)) count++;
            p++;
        }
        // vectorized count
        count += avx_count_utf8_codepoint(p, e);
        p += static_cast<uintptr_t>(e - p) / 32 * 32;
    }
#endif
    while (p < e) {
        if (is_utf8_lead_byte(*p)) count++;
        p++;
    }
    return count;
}

Leistungsbeurteilung

Dieses Mal werden wir untersuchen, ob es einen ausreichenden Durchsatz für lange Saiten gibt und wie viel er quantitativ ist.

Ich werde kurz die Bedingungen für die Leistungsbewertung beschreiben.

--Umwelt: Ubuntu 18.04.1 auf VirtualBox auf MBP Ende 2013

Die Messziele sind die eingeführte skalare Version, die Ruby-Version und der "count_utf8_codepoint" (im Folgenden als AVX-Version bezeichnet), die im Abschnitt über die Implementierung der Zusammenarbeit beschrieben werden.

Messergebnis

Implementierung Compiler Anzahl der Taktzyklen(Mclk)
Skalare Version GCC 7323
Skalare Version Clang 8793
Ruby-Version GCC 4573
Ruby-Version Clang 2643
AVX-Version GCC 2361
AVX-Version Clang 2345

Warum ist das so?

Das Gesamtergebnis ist, dass die skalare Version die langsamste und die AVX-Version die schnellste ist.

Die Ruby-Implementierung hat einen großen Unterschied zwischen GCC und Clang, was anscheinend darauf zurückzuführen ist, dass die automatische Vektorisierung von Clang gut funktioniert. Die automatische Vektorisierung von GCC funktioniert nicht für den schwersten Teil der Ruby-Version (die Schleife von while (s <t)), bei der es einen großen Unterschied gibt.

Interessanterweise scheint GCC bei der Codegenerierung effizienter zu sein, wenn die skalare Version automatisch vektorisiert wird. Hat Clang einen Hinweis von der Ruby-Version des Algorithmus bekommen ... Ich hoffe, es ist kein Messfehler.

Beim Vergleich der Ruby-Version und der AVX-Version für dasselbe Verarbeitungssystem (Clang) kann die Geschwindigkeit durch Handschrift des AVX-Codes um etwa 10% erhöht werden. Mit der Unterstützung des Compilers kann jedoch gesagt werden, dass etwa 90% der Leistung der intrinsischen Handschrift relativ leicht erzielt werden konnten, so dass das Problem der Kostenleistung wahrscheinlich auftritt.

In Echtzeit betrug die AVX-Version Clang übrigens etwa 0,838 Sekunden. Mit anderen Worten, ungefähr 12 GiB / s sind aus. Da die Zeichenfolge groß genug ist, funktioniert der Cache nicht, und wenn man bedenkt, dass der an diesen MBP angeschlossene 2-Kanal-Speicher DDR3-1600 ein theoretisches Band von 25,6 GiB / s hat, können etwa 50% des Bandes verwendet werden. Werden. Ich denke, es ist mehr als genug für ein Single-Thread-Programm.

Zusammenfassung

Appendix

Quellcode

https://gist.github.com/saka1/5bdd0f0ad4e498d22258982f6b9699c5

Recommended Posts

So zählen Sie UTF-8-Codepunkte schnell
So löschen Sie Stücklisten (UTF-8)
So rufen Sie Swift 5.3-Code von Objective-C auf
Rubinlänge, -größe, -anzahl Verwendung
Farbcodierung der Konsolenausgabe in Eclipse
Verwendung von Segmented Control und zu notierenden Punkten
Wie man Code schreibt, der objektorientiertes Ruby denkt
Codeüberprüfungspunkte
So schreiben Sie Testcode mit Basic-Zertifizierung
So implementieren Sie UICollectionView mit Code nur in Swift
So erstellen Sie eine Java-Entwicklungsumgebung mit VS Code
So entwickeln Sie OpenSPIFe
So rufen Sie AmazonSQSAsync auf
Klasse zu zählen
Wie schreibe ich Rails
Wie benutzt man rbenv?
Verwendung mit_option
Verwendung von java.util.logging
Verwendung der Karte
Wie benutzt man Twitter4J
Wie benutzt man active_hash! !!
So installieren Sie Docker
So deinstallieren Sie Rails
So installieren Sie Docker-Maschine
[Verwendung des Etiketts]
Wie man ein schattiertes Glas macht
Wie schreibe ich Docker-Compose
Wie man Identität benutzt
Wie man Hash benutzt
Wie schreibe ich Mockito
So erstellen Sie Docker-Compose
So installieren Sie MySQL
So schreiben Sie eine Migrationsdatei
Wie man android-midi-lib baut
Verwendung von Dozer.mapper
Wie benutzt man Gradle?
Verwendung von org.immutables
Verwendung von java.util.stream.Collector
Verwendung von VisualVM
Verwendung von Map
Wie man einen Schrägstrich zurückschlägt \
So verketten Sie Zeichenfolgen
So legen Sie mit JAXB Zeichencode und Zeilenvorschubcode fest
So kehren Sie die Kompilierung der Apk-Datei in Java-Quellcode mit MAC um
So legen Sie den Zeichencode und den Zeilenvorschubcode in Eclipse fest
So wählen Sie ein bestimmtes Datum anhand des Codes im FS-Kalender aus
[Integrationstestcode] So wählen Sie ein Element aus date_select aus
So wenden Sie das C-Code-Format über die Befehlszeile an