In diesem Artikel stellen wir die benutzerfreundliche und hoch konfigurierbare ** Java ** -Bibliothek "** Alibaba Cloud LOG Java Producer **" vor, die das Senden von Daten an den Protokolldienst unterstützt.
Protokolle sind überall. Als Träger zur Aufzeichnung von Veränderungen in der Welt werden Protokolle in vielen Bereichen wie Marketing, Forschung und Entwicklung, Betrieb, Sicherheit, BI und Auditing häufig verwendet.
Alibaba Log Service ist eine All-in-One-Serviceplattform für Protokolldaten. Die Kernkomponente, LogHub, ist eine Infrastruktur für die Big-Data-Verarbeitung, insbesondere für die Echtzeit-Datenverarbeitung, mit großartigen Funktionen wie hohem Durchsatz, geringer Latenz und automatischer Skalierung. Flink, Spark, Jobs, die auf Big-Data-Computing-Engines wie Storm ausgeführt werden, schreiben Datenverarbeitungsergebnisse und Zwischenergebnisse in Echtzeit in LogHub. Ich werde. Mithilfe von Daten aus LogHub können nachgeschaltete Systeme viele Dienste wie Abfrageanalyse, Alarmüberwachung, maschinelles Lernen und iterative Berechnungen bereitstellen. Die Big-Data-Verarbeitungsarchitektur von LogHub sieht wie in der folgenden Abbildung aus.
Damit das System ordnungsgemäß funktioniert, müssen Sie eine bequeme und effiziente Datenschreibmethode verwenden. Die direkte Verwendung von APIs und SDKs reicht nicht aus, um die Anforderungen an die Datenschreibfähigkeit in Big-Data-Szenarien zu erfüllen. "Alibaba Cloud LOG Java Producer" wurde dort entwickelt.
Alibaba Cloud LOG Java Producer ist eine benutzerfreundliche und hoch konfigurierbare Java-Klassenbibliothek. Es hat die folgenden Funktionen.
Das Schreiben von Daten in LogHub mit Producer bietet gegenüber API oder SDK die folgenden Vorteile.
Bei großen Datenmengen und begrenzten Ressourcen muss eine komplexe Logik wie Multithreading, Cache-Richtlinien, Stapelverarbeitung und Wiederholungsversuche im Fehlerfall implementiert werden, um den gewünschten Durchsatz zu erzielen. Der Hersteller implementiert die obige Logik, um die Anwendungsleistung zu verbessern und den Anwendungsentwicklungsprozess zu vereinfachen.
Wenn Sie über genügend Cache-Speicher verfügen, speichert Producer die Daten, die Sie an LogHub senden, zwischen. Wenn Sie die Sendemethode aufrufen, werden die angegebenen Daten sofort gesendet, ohne die Verarbeitung zu blockieren. Dies realisiert die Trennung von Berechnungs- und E / A-Logik. Zu einem späteren Zeitpunkt können Sie die Datenübertragungsergebnisse von den zurückgegebenen zukünftigen Objekten und registrierten Rückrufen abrufen.
Die Größe des Speichers, der vom Produzenten zum Zwischenspeichern der zu sendenden Daten verwendet wird, kann durch Parameter sowie die Anzahl der für die Datenübertragungsaufgabe verwendeten Threads gesteuert werden. Dies vermeidet, dass der Produzent unbegrenzte Ressourcen verbraucht. Sie können auch den Ressourcenverbrauch und den Schreibdurchsatz je nach tatsächlicher Situation ausgleichen.
Zusammenfassend bietet Producer viele Vorteile, indem komplexe zugrunde liegende Details automatisch verarbeitet und eine einfache Benutzeroberfläche verfügbar gemacht werden. Darüber hinaus hat dies keine Auswirkungen auf den normalen Betrieb von Diensten der oberen Schicht und kann den Schwellenwert für den Datenzugriff erheblich senken.
Um die Leistung von Producer besser zu verstehen, wird in diesem Abschnitt die Funktionsweise von Producer beschrieben, einschließlich Datenschreiblogik, Implementierung der Kernkomponenten und ordnungsgemäßem Herunterfahren. Die Gesamtarchitektur von Producer ist in der folgenden Abbildung dargestellt.
Logik zum Schreiben von Produzentendaten:
------ 1 werden die zuvor zwischengespeicherten Daten vom Batch-Handler verarbeitet und der von diesen Daten belegte Speicher wird freigegeben. Infolgedessen verfügt der Produzent über genügend Speicherplatz, um die interessierenden Daten zu speichern. ------ 2 wird eine Ausnahme ausgelöst, wenn die angegebene Sperrzeit überschritten wird.
Wenn Sie Producer.send () aufrufen, kann die Anzahl der Protokolle des Zielstapels maxBatchCount überschreiten, oder der Zielstapel verfügt möglicherweise nicht über genügend Speicherplatz zum Speichern der Zieldaten. In diesem Fall sendet Producer zuerst den Zielstapel an IOThreadPool und erstellt dann einen neuen Stapel zum Speichern der Zieldaten. Um das Blockieren von Threads zu vermeiden, verwendet IOThreadPool eine unbegrenzte Blockierungswarteschlange. Die Anzahl der Protokolle, die in einer Producer-Instanz zwischengespeichert werden können, ist begrenzt, sodass die Warteschlangenlänge nicht unbegrenzt zunimmt.
Mover durchläuft jeden Producer-Stapel in LogAccumulator und sendet Stapel, die die maximale Cache-Zeit überschreiten, an expiredBatches. Außerdem wird die früheste Ablaufzeit (t) für nicht abgelaufene Chargen aufgezeichnet. 4, dann sendet LogAccumulator einen abgelaufenen Stapel an IOThreadPool. 5, dann erhält Mover den Producer-Stapel von RetryQueue, der den Sendekriterien entspricht. Wenn keine Charge die Bedingungen erfüllt, warten Sie einen Zeitraum von t.
Senden Sie dann den abgelaufenen Stapel von RetryQueue an IOThreadPool. Am Ende von Schritt 6 wiederholt Mover die Schritte 3-6.
IOThreadPool-Worker-Threads senden Stapel aus blockierten Warteschlangen an den Zielprotokollspeicher. 8, nachdem der Stapel an den Protokollspeicher gesendet wurde, geht er in die Erfolgswarteschlange.
Wenn die Übertragung fehlschlägt und eine der folgenden Bedingungen erfüllt ist, wechseln Sie in die Fehlerwarteschlange. ------- 1, der fehlgeschlagene Stapel kann nicht wiederholt werden. ------- 2, RetryQueue wird geschlossen. ------- 3, Die angegebene Anzahl von Wiederholungsversuchen wurde erreicht, und die Anzahl der Stapel in der Fehlerwarteschlange überschreitet nicht die Hälfte der Gesamtzahl der gesendeten Stapel.
Andernfalls berechnet der Worker-Thread die nächste Sendezeit für den fehlgeschlagenen Stapel und sendet sie an die RetryQueue.
Der SuccessBatchHandler-Thread nimmt den Stapel aus der Erfolgswarteschlange und führt alle in diesem Stapel registrierten Rückrufe aus.
Der FailureBatchHandler-Thread nimmt den Stapel aus der Fehlerwarteschlange und führt alle in diesem Stapel registrierten Rückrufe aus.
Die Kernkomponente von Producer ist LogAccumulator /internals/LogAccumulator.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=LogAccumulator.java), [RetryQueue](https://github.com/aliyun/aliyun-log-java-producer/blob/master/r /java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=RetryQueue.java), [Mover](https://github.com/aliyun/ -log-java-Produzent / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / Produzent / internals / Mover.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = Mover.java), [ IOThreadPool](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/IOThreadPool.java?spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = IOThreadPool.java), [SendProducerBatchTask](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main /java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java?spm=a2c65.11461447.0.0.7c3a1eddqF17Kl&file=SendProducerBatchTask.java), [BatchHandler](https:// -log-java-Produzent / blob / master / src / main / java / com / aliyun / openservices / aliyun / log / Produzent / internals / BatchHandler.java? Spm = a2c65.11461447.0.0.7c3a1eddqF17Kl & file = BatchHandler.java) ..
LogAccumulator Es ist üblich, Daten in größeren Stapeln zu speichern und die Daten in Stapeln zu senden, um den Durchsatz zu verbessern. Die hier beschriebene Hauptaufgabe des LogAccumulators besteht darin, die Daten zu Stapeln zusammenzuführen. Um verschiedene Daten zu einem größeren Stapel zusammenzuführen, müssen die Daten dieselben Projekt-, Protokollspeicher-, Themen-, Quell- und shardHash-Eigenschaften haben. LogAccumulator speichert diese Daten basierend auf diesen Eigenschaften an verschiedenen Stellen auf der internen Karte zwischen. Der Schlüssel der Karte sind die 5 Elemente der obigen 5 Eigenschaften und der Wert ist ProducerBatch. ConcurrentMap wird verwendet, um Thread-Sicherheit und hohe Parallelität zu gewährleisten.
Eine weitere Funktion von LogAccumulator ist die Steuerung der Gesamtgröße der zwischengespeicherten Daten. Ich verwende Semaphore, um diese Steuerlogik zu implementieren. Semaphore ist ein auf AbstractQueuedSynchronizer basierendes (AQS-basiertes) Hochleistungssynchronisationstool. Semaphore versucht zunächst, gemeinsam genutzte Ressourcen durch Drehen zu erwerben, wodurch der Overhead für Kontextwechsel verringert wird.
RetryQueue RetryQueue wird zum Speichern von Stapeln verwendet, die nicht gesendet werden konnten und darauf warten, erneut versucht zu werden. Jeder dieser Stapel verfügt über ein Feld, das angibt, wann der Stapel gesendet werden soll. Um abgelaufene Stapel effizient abzurufen, verfügt der Hersteller über eine Verzögerungswarteschlange zum Speichern dieser Stapel. DelayQueue ist eine zeitbasierte Warteschlange mit hoher Priorität, die den frühesten abgelaufenen Stapel zuerst verarbeitet. Diese Warteschlange ist threadsicher.
Mover Mover ist ein separater Thread. LogAccumulator und RetryQueue senden regelmäßig abgelaufene Stapel an IOThreadPool. Mover belegt CPU-Ressourcen auch im Leerlauf. Um die Verschwendung von CPU-Ressourcen zu vermeiden, wartet Mover auf abgelaufene Stapel von RetryQueue, während er keinen qualifizierten Stapel findet, der von LogAccumulator und RetryQueue gesendet wurde. Dieser Zeitraum ist die maximal konfigurierte Cache-Zeit.
IOThreadPool Arbeitsthreads in IOThreadPool senden Daten an den Protokollspeicher. Die Größe von IOThreadPool kann mit dem Parameter ioThreadCount angegeben werden, und der Standardwert ist doppelt so groß wie die Anzahl der Prozessoren.
SendProducerBatchTask SendProducerBatchTask ist in der Batch-Sendelogik gekapselt. Um zu vermeiden, dass der E / A-Thread blockiert wird, sendet SendProducerBatchTask den Zielstapel zur Rückrufausführung an eine andere Warteschlange, unabhängig davon, ob der Zielstapel erfolgreich gesendet wurde. Wenn der fehlgeschlagene Stapel die Wiederholungskriterien erfüllt, wird er nicht sofort im aktuellen E / A-Thread erneut gesendet. Wenn es sofort erneut gesendet wird, schlägt es normalerweise erneut fehl. Stattdessen sendet SendProducerBatchTask es gemäß einer exponentiellen Backoff-Richtlinie an RetryQueue.
BatchHandler Der Hersteller startet SuccessBatchHandler und FailureBatchHandler, um erfolgreiche und nicht erfolgreiche Stapel zu verarbeiten. Nachdem der Handler die Rückrufausführung und die zukünftige Konfiguration des Stapels abgeschlossen hat, gibt er den von diesem Stapel belegten Speicher für neue Daten frei. Durch die getrennte Verarbeitung wird sichergestellt, dass erfolgreich gesendete und nicht erfolgreiche Stapel getrennt werden. Dies gewährleistet einen reibungslosen Betrieb des Herstellers.
GracefulShutdown Um GracefulShutdown zu implementieren, müssen die folgenden Anforderungen erfüllt sein:
Um die oben genannten Anforderungen zu erfüllen, ist die enge Logik des Herstellers wie folgt aufgebaut:
Auf diese Weise werden durch das Schließen von Warteschlangen und Threads nacheinander basierend auf der Richtung des Datenflusses ein ordnungsgemäßes Herunterfahren und eine sichere Beendigung erreicht.
Alibaba Cloud LOG Java Producer ist ein umfassendes Upgrade auf frühere Versionen von Producer. Es behebt viele Probleme mit früheren Versionen, wie z. B. eine hohe CPU-Auslastung im Falle einer Netzwerkausnahme und einen geringen Datenverlust beim Beenden von Producer. Zusätzlich wurde der Fehlertoleranzmechanismus verstärkt. Der Hersteller kann auch nach einem Fehler eine ordnungsgemäße Ressourcennutzung, einen hohen Durchsatz und eine enge Isolierung sicherstellen.
Recommended Posts