[Java] Java library-help send data to logging service with Alibaba Cloud’s LOG Java Producer

5 minute read

This article introduces the easy-to-use and highly configurable Java library “Java Cloud LOG Java Producer” that supports sending data to a log service.

*This blog is a translation from the English version. The original is from here. You can check. Some machine translations are used. If you have any translation errors, we would appreciate it if you could point them out. *


Logs are everywhere. As a carrier of changes in the world, logs are widely used in many fields such as marketing, research and development, operations, security, BI, and audit.


Alibaba Log Serviceisanall-in-oneserviceplatformforlogdata.Itscorecomponent,LogHub,isaninfrastructureforbigdataprocessing,especiallyreal-timedataprocessing,withitsexcellentfeaturessuchashighthroughput,lowlatency,andautomaticscaling.Flink,Spark,JobsrunningonbigdatacomputingenginessuchasStorm write data processing results and intermediate results to LogHub in real time I will. Using data from LogHub, downstream systems can provide many services such as query analysis, alarm monitoring, machine learning, and iterative computing. The LogHub big data processing architecture looks like this:


In order for your system to work properly, you need to use convenient and efficient data writing methods. Direct use of APIs and SDKs is not enough to meet the data writeability requirements of big data scenarios. There, “Alibaba Cloud LOG Java Producer” was developed.


Alibaba Cloud LOG Java Producer is an easy-to-use, highly configurable Java class library. It has the following functions.

1, Thread Safe: All methods exposed by Alibaba Cloud LOG Java Producer (“Producer”) are thread safe. 2, Asynchronous Sending: Calls to the Producer’s SEND method typically return immediately, without waiting for data to be sent or a response from the server to be received. Producer has an internal cache mechanism (LogAcccumulator) to cache the data to be sent in batch, and it improves the throughput by sending the data in batch.

  1. Automatic retry: Producer provides a retry mechanism (RetryQueue) that can be set automatically for retryable exceptions. You can set the maximum retry time and backoff period for RetryQueue. 4, Traceability: Callbacks and futures can be used to know if the data of interest was successfully sent and the attempt made to send the data. You can use this feature to trace a problem and make a decision to resolve it. 5, Context restore: Logs generated by the same Producer are in the same context, and related logs before and after a certain log can be checked on the server side.
  2. Shutdown: When the close method returns the result, all the data cached by the Producer is processed and you can receive the corresponding notification.


Writing data to LogHub using Producer has the following advantages compared to using API or SDK.

High performance

With large amounts of data and limited resources, you need to implement complex logic such as multithreading, cache policies, batch processing, and retry in case of failure to achieve the desired throughput. Producer implements the above logic to improve application performance and simplify the application development process.

Asynchronous and non-blocking task execution

With sufficient cache memory, Producer caches the data it sends to LogHub. When you call the send method, the specified data is sent immediately without blocking processing. This realizes the separation of operation and I/O logic. At a later date, you can retrieve the data transmission result from the returned future object or the registered callback.

Controllable resource usage

The size of memory used by Producer to cache the data to be sent can be controlled by a parameter as well as the number of threads used for the data sending task. This prevents Producer from consuming unlimited resources. You can also balance resource consumption and write throughput depending on the actual situation.


In summary, Producer offers many benefits by automatically handling the complex underlying details and exposing a simple interface. In addition, the threshold of data access can be significantly reduced without affecting the normal operation of upper layer services.

Mechanism description

To better understand Producer performance, this section describes how Producer works, including data write logic, core component implementation, and graceful shutdown. The overall Producer architecture is shown in the diagram below.


Write data

Producer data write logic:

  1. After calling the producer.send() method to send the data to the specified log store, the data is loaded into the Producer batch in the LogAccumulator. Normally, the send method returns the result immediately. However, if the Producer instance does not have enough space to store the desired data, the send method will block until one of the following conditions is met:
  • —–1, the previously cached data is processed by the batch handler and the memory occupied by that data is released. As a result, the Producer will have enough space to store the data of interest.
  • —–2, Exception will occur if the specified blocking time is exceeded.
  1. If you call Producer.send(), the number of logs in the target batch may exceed maxBatchCount, or there may not be enough space to store the target data in the target batch. In this case, the Producer first sends the target batch to the IOThreadPool and then creates a new batch to store the target data. To avoid blocking threads, IOThreadPool uses an unlimited blocking queue. The queue length does not grow indefinitely because the Producer instance can only cache a limited number of logs. 3, Mover traverses each Producer batch of LogAccumulator and sends the batch that exceeds the maximum cache time to expiredBatches. Also, record the earliest expiration time (t) for unexpired batches. 4, Then send the expired batch from LogAccumulator to IOThreadPool. 5, After that, Mover gets the Producer batch that matches the sending condition from RetryQueue. If no batch meets the criteria, wait for t periods. 6, and then send the expired batch from RetryQueue to IOThreadPool. When step 6 is complete, the Mover repeats steps 3-6. 7, IOThreadPool worker threads send batches from the blocked queue to the target log store.
  2. After batch is sent to log store, it goes to success queue.
  3. If the transmission fails and one of the following conditions is met, go to the failure queue.
    • ——1, failed batch cannot be retried.
    • ——2, RetryQueue is closed.
    • ——3, The specified number of retries is reached, and the number of batches in the failure queue does not exceed 1/2 of the total number of batches to be sent.

10, otherwise the worker thread will calculate the next send time for the failed batch and send it to RetryQueue. 11, SuccessBatchHandler thread takes a batch from the success queue and executes all callbacks registered for this batch. 12, FailureBatchHandler thread takes a batch from the failure queue and executes all callbacks registered for this batch.

Core componentsProducer のコアコンポーネントには、LogAccumulatorRetryQueueMoverIOThreadPoolSendProducerBatchTaskBatchHandler があります。


スループットを向上させるために、より大きなバッチにデータを蓄積し、バッチでデータを送信するのが一般的です。ここで説明するLogAccumulatorの主な役割は、データをバッチにマージすることです。異なるデータを大きなバッチにマージするには、データが同じプロジェクト、ログストア、トピック、ソース、およびshardHashプロパティを持っている必要があります。LogAccumulator は、これらのプロパティに基づいて、これらのデータを内部マップの異なる位置にキャッシュします。マップのキーは、上記の5つのプロパティの5つの要素であり、値はProducerBatchです。スレッドの安全性と高い並行性を確保するために、ConcurrentMapが使用されます。



RetryQueue は、送信に失敗して再試行を待っているバッチを保存するために使用されます。これらの各バッチは、バッチを送信する時間を示すフィールドを持っています。期限切れのバッチを効率的に引き出すために、プロデューサーはこれらのバッチを保存するためのDelayQueueを持っています。DelayQueue は時間ベースの優先度の高いキューで、最も早い期限切れのバッチが最初に処理されます。このキューはスレッドセーフです。


Mover は独立したスレッドです。LogAccumulator と RetryQueue から期限切れのバッチを定期的に IOThreadPool に送信します。Mover はアイドル状態でも CPU リソースを占有します。CPU リソースの無駄遣いを避けるために、Mover は、LogAccumulator および RetryQueue から送信される適格なバッチが見つからない間、RetryQueue からの期限切れバッチを待ちます。この期間は、構成した最大キャッシュ時間 lingerMs です。


IOThreadPool内のワーカースレッドは、ログストアにデータを送信します。IOThreadPool のサイズは ioThreadCount パラメータで指定でき、デフォルト値はプロセッサ数の 2 倍です。


SendProducerBatchTaskは、バッチ送信ロジックでカプセル化されています。I/O スレッドのブロックを避けるために、SendProducerBatchTask は、ターゲットのバッチが正常に送信されたかどうかにかかわらず、コールバック実行のために別のキューにターゲットのバッチを送信します。さらに、失敗したバッチがリトライ条件を満たした場合、現在のI/Oスレッドではすぐに再送されません。すぐに再送された場合、通常は再び失敗します。その代わりに、SendProducerBatchTask は、指数的なバックオフポリシーに従って RetryQueue に送ります。


プロデューサーは、送信に成功したバッチと失敗したバッチを処理するために、SuccessBatchHandler と FailureBatchHandler を起動します。ハンドラがコールバックの実行やバッチの未来の設定を完了した後、新しいデータを使用するために、このバッチが占有しているメモリを解放します。別々の処理は、正常に送信されたバッチと失敗したバッチが分離されていることを確実にします。これは Producer のスムーズな操作を保証します。



1、close メソッドが結果をあなたに返すとき、プロデューサーのすべてのスレッドが終了している必要があります。また、キャッシュされたデータが適切に処理されていること、自分で登録したコールバックがすべて実行されていること、自分に返す先物がすべて設定されていることが必要になります。 2、また、closeメソッドの最大待ち時間を設定できるようにしておく必要があります。メソッドは、スレッドが終了したかどうか、キャッシュされたデータが処理されたかどうかに関わらず、この期間を超えた後、直ちに結果をあなたに返さなければなりません。 3、closeメソッドは、マルチスレッド環境でも複数回呼び出すことができ、正常に動作します。 4、コールバックでcloseメソッドを呼び出すことは安全であり、アプリケーションにデッドロックを起こすことはありません。


1、LogAccumulatorを閉じる。LogAccumulator にデータを書き続けると例外が発生します。 2、RetryQueue を閉じます。RetryQueue にバッチを送り続けると、例外がスローされます。 3、Mover を閉じて、完全に終了するのを待ちます。クローズシグナルを検出した後、Mover は送信条件を満たしているかどうかに関わらず、LogAccumulator と RetryQueue から残っているすべてのバッチを IOThreadPool に送信します。データ損失を避けるために、Mover は、他のスレッドが書き込みをしなくなるまで、常に LogAccumulator と RetryQueue からバッチを引っ張ってきます。 4、IOThreadPool を閉じて、送信されたすべてのタスクが完了するのを待ちます。RetryQueue が既に閉じられている場合、失敗したバッチは直接失敗キューに送られます。 5、SuccessBatchHandlerを閉じて、完全に終了するのを待ちます。コールバックでcloseメソッドが呼び出された場合、待機処理はスキップされます。Closeシグナルを検出した後、SuccessBatchHandlerは成功キューからすべてのバッチを取り出し、1つずつ処理します。 6、FailureBatchHandlerを閉じて、完全に終了するのを待つ。コールバックでcloseメソッドが呼び出された場合、待機処理はスキップされます。Close シグナルを検出した後、FailureBatchHandler は失敗キューからすべてのバッチを取り出し、1 つずつ処理します。



Alibaba Cloud LOG Java Producerは、以前のバージョンのProducerの包括的なアップグレードです。ネットワーク例外が発生した場合のCPU使用率の高さや、Producerを終了する際のデータ損失の少なさなど、以前のバージョンでの多くの問題点を解決しています。さらに、フォールトトレランス機構が強化されました。Producerは、操作ミスをした後でも、適切なリソース使用量、高いスループット、厳密な隔離を確保することができます。

アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。 アリババクラウドの詳細は、こちらからご覧ください。 アリババクラウドジャパン公式ページ