Java Library: Alibaba Cloud LOG Java Producer helps send data to Log Service

This article introduces **Alibaba Cloud LOG Java Producer**, an easy-to-use and highly configurable **Java** library for sending data to Log Service.

Background

Logs are everywhere. As a carrier to record changes in the world, logs are widely used in many fields such as marketing, R & D, operations, security, BI, and auditing.

Alibaba Cloud Log Service is an all-in-one service platform for log data. Its core component, LogHub, is an infrastructure for big data processing, especially real-time data processing, and offers high throughput, low latency, and autoscaling. Jobs running on big data computing engines such as Flink, Spark, and Storm write data processing results and intermediate results to LogHub in real time. Using data from LogHub, downstream systems can provide services such as query analysis, alarm monitoring, machine learning, and iterative calculations. LogHub's big data processing architecture is shown in the following figure.

[Figure: LogHub big data processing architecture]

For such a system to work well, you need a convenient and efficient way to write data. Using the API or SDK directly is not enough to meet the write-throughput requirements of big data scenarios, which is why Alibaba Cloud LOG Java Producer was developed.

Features

Alibaba Cloud LOG Java Producer is an easy-to-use and highly configurable Java class library. It has the following features.

  1. Thread safety: All methods exposed by Alibaba Cloud LOG Java Producer ("Producer") are thread-safe.
  2. Asynchronous send: A call to Producer's send method usually returns immediately, without waiting for the data to be sent or for a response from the server. Producer has an internal cache (LogAccumulator) that collects the data to be sent into batches, and sending data in batches improves throughput.
  3. Automatic retry: Producer provides a configurable automatic retry mechanism (RetryQueue) for retryable exceptions. You can set the maximum number of retries and the backoff period for RetryQueue.
  4. Traceability: You can use callbacks and futures to find out whether the data of interest was sent successfully and which attempts were made to send it. You can use this information to trace problems and decide how to resolve them.
  5. Context restore: Logs generated by the same Producer instance share the same context, so you can view the logs before and after a given log on the server side.
  6. Graceful shutdown: When the close method returns, all data cached by Producer has been processed and you have been notified of the results.

Advantages

Writing data to LogHub with Producer has the following advantages over calling the API or SDK directly.

High performance

With large amounts of data and limited resources, complex logic such as multithreading, cache policy, batch processing, and retry in the event of a failure must be implemented to achieve the desired throughput. Producer implements the above logic to improve application performance and simplify the application development process.

Asynchronous and non-blocking task execution

If enough cache memory is available, Producer caches the data that you want to send to LogHub. The send method returns immediately without blocking, which separates computation from I/O logic. Later, you can obtain the send result from the returned future or from a registered callback.
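The future-plus-callback pattern described here can be sketched with the JDK alone. The class below is a hypothetical stand-in, not the library's API: `AsyncSendSketch`, its `send` method, and the use of `CompletableFuture` are assumptions chosen only to show a call that returns at once while the work completes on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous sender; not the library's API.
class AsyncSendSketch {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Returns immediately; the simulated I/O runs later on the background thread.
    CompletableFuture<String> send(String log) {
        return CompletableFuture.supplyAsync(() -> "sent:" + log, ioThread);
    }

    // Stops the background thread so the JVM can exit.
    void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncSendSketch sender = new AsyncSendSketch();
        CompletableFuture<String> future = sender.send("hello");
        // Callback style: runs once the result is available.
        future.thenAccept(result -> System.out.println("callback got " + result));
        // Future style: fetch the result later, with a bounded wait.
        System.out.println("future got " + future.get(1, TimeUnit.SECONDS));
        sender.close();
    }
}
```

The caller's thread never performs the simulated I/O itself; it only decides when, or whether, to look at the result.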

Controllable resource usage

Parameters control both the amount of memory that Producer uses to cache data waiting to be sent and the number of threads used for sending. This prevents Producer from consuming unlimited resources and lets you balance resource consumption against write throughput for your situation.

Summary

In summary, Producer offers many benefits by automatically handling complex underlying details and exposing a simple interface. It does not interfere with the normal operation of upper-layer services, and it significantly lowers the barrier to data ingestion.

How it works

To better understand Producer performance, this section describes how Producer works, including data write logic, core component implementation, and graceful shutdown. The overall architecture of Producer is shown in the figure below.

[Figure: overall architecture of Producer]

Data writing

Producer data writing logic:

  1. After you call producer.send() to send data to the specified logstore, the data is loaded into a Producer batch in LogAccumulator. Normally, the send method returns immediately. However, if the Producer instance does not have enough space to store the data, the send method blocks until one of the following conditions is met:

     1. Previously cached data is processed by the batch handler and the memory it occupied is released, so Producer has enough space to store the new data.
     2. The specified blocking time is exceeded, in which case an exception is thrown.

  2. When you call producer.send(), the number of logs in the target batch may exceed maxBatchCount, or the target batch may not have enough space to store the data. In this case, Producer first sends the target batch to IOThreadPool and then creates a new batch to store the data. To avoid blocking threads, IOThreadPool uses an unbounded blocking queue. Because the number of logs that can be cached in a Producer instance is limited, the queue does not grow indefinitely.

  3. Mover traverses each Producer batch in LogAccumulator and moves batches that have exceeded the maximum cache time into expiredBatches. It also records the earliest expiration time (t) among the unexpired batches.

  4. The expired batches obtained from LogAccumulator are then sent to IOThreadPool.

  5. After that, Mover gets the Producer batches that meet the send conditions from RetryQueue. If no batch meets the conditions, it waits for a period of t.

  6. The expired batches obtained from RetryQueue are then sent to IOThreadPool. At the end of step 6, Mover repeats steps 3-6.

  7. IOThreadPool worker threads take batches from the blocking queue and send them to the target logstore.

  8. After a batch is sent to the logstore successfully, it goes to the success queue.

  9. If the send fails and one of the following conditions is met, the batch goes to the failure queue:

     1. The failed batch cannot be retried.
     2. RetryQueue has been closed.
     3. The specified number of retries has been reached, and the number of batches in the failure queue does not exceed 1/2 of the total number of batches sent.

  10. Otherwise, the worker thread calculates the next send time for the failed batch and sends it to RetryQueue.

  11. The SuccessBatchHandler thread takes batches from the success queue and executes all the callbacks registered on each batch.

  12. The FailureBatchHandler thread takes batches from the failure queue and executes all the callbacks registered on each batch.
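The rollover described in the data writing steps, where a full batch is handed to the I/O stage and a fresh batch is created for the new data, can be illustrated with a minimal sketch. Everything here is hypothetical: `BatchRolloverSketch`, the `handedToIo` list standing in for the IOThreadPool queue, and the count-only check (the size-in-bytes check is left out) are assumptions, not the library's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: roll over to a new batch when the current one is full.
class BatchRolloverSketch {
    private final int maxBatchCount;
    private List<String> current = new ArrayList<>();
    // Stands in for the queue in front of the I/O threads.
    final List<List<String>> handedToIo = new ArrayList<>();

    BatchRolloverSketch(int maxBatchCount) {
        this.maxBatchCount = maxBatchCount;
    }

    synchronized void append(String log) {
        // Adding one more log would exceed the limit, so ship the batch first.
        if (current.size() >= maxBatchCount) {
            handedToIo.add(current);
            current = new ArrayList<>(); // then start a fresh batch
        }
        current.add(log);
    }

    // Number of logs still waiting in the open batch.
    synchronized int pendingCount() {
        return current.size();
    }
}
```

With `maxBatchCount` set to 2, appending three logs hands one two-log batch to the I/O stage and leaves one log pending in the new batch.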

Core components

The core components of Producer are [LogAccumulator](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/LogAccumulator.java), [RetryQueue](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java), [Mover](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/Mover.java), [IOThreadPool](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/IOThreadPool.java), [SendProducerBatchTask](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java), and [BatchHandler](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/BatchHandler.java).

LogAccumulator

Collecting data into larger batches and sending it batch by batch is a common way to improve throughput. The main role of LogAccumulator is to merge data into batches. Data can be merged into the same batch only if it has the same project, logstore, topic, source, and shardHash properties. LogAccumulator caches data in different slots of an internal map based on these properties: the key of the map is the 5-tuple of the properties above, and the value is a ProducerBatch. A ConcurrentMap is used to ensure thread safety and high concurrency.
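As an illustration of grouping by a 5-property key in a ConcurrentMap, here is a minimal sketch. `GroupKey` and `GroupingSketch` are hypothetical names, and a `CopyOnWriteArrayList` of strings stands in for ProducerBatch; the library's own classes are not reproduced here.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical 5-tuple key: two logs share a batch only if all five fields match.
class GroupKey {
    final String project, logstore, topic, source, shardHash;

    GroupKey(String project, String logstore, String topic, String source, String shardHash) {
        this.project = project;
        this.logstore = logstore;
        this.topic = topic;
        this.source = source;
        this.shardHash = shardHash;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupKey)) return false;
        GroupKey k = (GroupKey) o;
        return Objects.equals(project, k.project) && Objects.equals(logstore, k.logstore)
                && Objects.equals(topic, k.topic) && Objects.equals(source, k.source)
                && Objects.equals(shardHash, k.shardHash);
    }

    @Override
    public int hashCode() {
        return Objects.hash(project, logstore, topic, source, shardHash);
    }
}

class GroupingSketch {
    // A thread-safe list stands in for ProducerBatch.
    private final ConcurrentMap<GroupKey, List<String>> batches = new ConcurrentHashMap<>();

    void append(GroupKey key, String log) {
        // computeIfAbsent creates the batch atomically on first use of a key.
        batches.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(log);
    }

    int batchCount() {
        return batches.size();
    }

    int sizeOf(GroupKey key) {
        List<String> batch = batches.get(key);
        return batch == null ? 0 : batch.size();
    }
}
```

Two logs whose keys differ only in topic end up in two separate batches, while logs with identical keys accumulate in one.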

Another role of LogAccumulator is to control the total size of the cached data. This control logic is implemented with a Semaphore, a high-performance synchronization tool based on AbstractQueuedSynchronizer (AQS). A Semaphore first attempts to acquire the shared resource by spinning, which reduces context-switch overhead.
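A Semaphore-based memory budget can be sketched as follows, with one permit per byte. `MemoryBudgetSketch`, `totalBytes`, and the `maxBlockMs` argument are hypothetical names for illustration, and this sketch returns false on timeout where the text says Producer throws an exception.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one semaphore permit represents one byte of cache.
class MemoryBudgetSketch {
    private final Semaphore freeBytes;

    MemoryBudgetSketch(int totalBytes) {
        this.freeBytes = new Semaphore(totalBytes);
    }

    // Reserves space, waiting up to maxBlockMs; false means the wait timed out.
    boolean reserve(int bytes, long maxBlockMs) {
        try {
            return freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Called after a batch has been handled, to make room for new data.
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    int available() {
        return freeBytes.availablePermits();
    }
}
```

With a 100-byte budget, a second 60-byte reservation fails until the first one is released.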

RetryQueue

RetryQueue stores batches that failed to send and are waiting to be retried. Each of these batches has a field that indicates when it should next be sent. To retrieve expired batches efficiently, Producer stores them in a DelayQueue, a thread-safe, time-based priority queue that returns the earliest expired batch first.
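The behavior of a DelayQueue holding retry batches can be sketched with the JDK class directly. `RetryBatch` and `RetryQueueSketch` are hypothetical names; only `java.util.concurrent.DelayQueue` and `Delayed` are real JDK types.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical batch that becomes eligible for resending at nextSendAtMs.
class RetryBatch implements Delayed {
    final String name;
    final long nextSendAtMs;

    RetryBatch(String name, long delayMs) {
        this.name = name;
        this.nextSendAtMs = System.currentTimeMillis() + delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Zero or negative means the batch is due.
        return unit.convert(nextSendAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // This sketch only ever stores RetryBatch instances in the queue.
        return Long.compare(nextSendAtMs, ((RetryBatch) other).nextSendAtMs);
    }
}

class RetryQueueSketch {
    private final DelayQueue<RetryBatch> queue = new DelayQueue<>();

    void add(String name, long delayMs) {
        queue.put(new RetryBatch(name, delayMs));
    }

    // Returns the name of the earliest batch that is already due, or null if none is due.
    String pollDue() {
        RetryBatch batch = queue.poll();
        return batch == null ? null : batch.name;
    }
}
```

`poll()` never returns a batch whose send time is still in the future, so a consumer can drain the due batches without checking timestamps itself.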

Mover

Mover is an independent thread that periodically sends expired batches from LogAccumulator and RetryQueue to IOThreadPool. A Mover that kept polling while idle would waste CPU resources. To avoid this, when Mover finds no batch in LogAccumulator or RetryQueue that meets the send conditions, it waits on RetryQueue for expired batches. The wait period is the configured maximum cache time, lingerMs.

IOThreadPool

Worker threads in IOThreadPool send data to the logstore. The size of IOThreadPool is set with the ioThreadCount parameter, and the default value is twice the number of processors.
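A fixed-size pool fed by an unbounded queue can be sketched with the JDK's `Executors.newFixedThreadPool`, which queues tasks in an unbounded `LinkedBlockingQueue`. `IoPoolSketch` and its methods are hypothetical; the default of twice the processor count is taken from the text above, and the "send" is simulated by incrementing a counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a fixed-size I/O pool; sending is simulated.
class IoPoolSketch {
    // Default described in the text: twice the number of processors.
    static int defaultIoThreadCount() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }

    // Submits one simulated send task per batch and returns how many completed.
    static int sendAll(int batchCount, int ioThreadCount) {
        // Tasks beyond ioThreadCount wait in the pool's unbounded queue.
        ExecutorService pool = Executors.newFixedThreadPool(ioThreadCount);
        AtomicInteger sent = new AtomicInteger();
        for (int i = 0; i < batchCount; i++) {
            pool.execute(sent::incrementAndGet);
        }
        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent.get();
    }
}
```

Submitting never blocks the caller here, which is why the number of cached logs has to be bounded elsewhere.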

SendProducerBatchTask

SendProducerBatchTask encapsulates the batch send logic. To avoid blocking the I/O thread, SendProducerBatchTask hands the target batch to a separate queue for callback execution, regardless of whether the batch was sent successfully. In addition, a failed batch that meets the retry criteria is not resent immediately in the current I/O thread, because an immediate resend usually fails again. Instead, SendProducerBatchTask sends it to RetryQueue according to an exponential backoff policy.
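One common form of exponential backoff doubles the delay on each retry and caps it at a maximum. The sketch below shows that form only; the exact formula the library uses is not given in this article, and `BackoffSketch`, `baseMs`, and `maxMs` are hypothetical names.

```java
// Hypothetical sketch of capped exponential backoff.
class BackoffSketch {
    // Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
    static long backoffMs(long baseMs, long maxMs, int attempt) {
        long delay = baseMs;
        // Stop doubling once the cap is reached, so the value cannot grow without bound.
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }
}
```

For example, with a base of 100 ms and a cap of 50,000 ms, the delays for attempts 0 to 3 are 100, 200, 400, and 800 ms, and later attempts level off at the cap.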

BatchHandler

Producer starts a SuccessBatchHandler and a FailureBatchHandler to handle the batches that were sent successfully and the batches that failed. After a handler has executed the callbacks and set the futures of a batch, it frees the memory occupied by that batch for new data. Handling successful and failed batches in separate threads isolates them from each other, which keeps Producer running smoothly.

Graceful shutdown

To implement graceful shutdown, the following requirements must be met:

  1. When the close method returns, all Producer threads must have stopped, the cached data must have been properly processed, all registered callbacks must have been executed, and all returned futures must have been set.
  2. You must be able to set a maximum wait time for the close method. Once this time is exceeded, the method must return immediately, regardless of whether the threads have terminated or the cached data has been processed.
  3. The close method can be called multiple times, even in a multithreaded environment, and still works correctly.
  4. Calling the close method from a callback is safe and does not cause a deadlock in the application.

To meet the above requirements, the producer's close logic is designed as follows:

  1. Close LogAccumulator. Any further attempt to write data to LogAccumulator throws an exception.
  2. Close RetryQueue. Any further attempt to send batches to RetryQueue throws an exception.
  3. Close Mover and wait for it to exit completely. After detecting the close signal, Mover sends all remaining batches from LogAccumulator and RetryQueue to IOThreadPool, regardless of whether the send conditions are met. To avoid data loss, Mover keeps pulling batches from LogAccumulator and RetryQueue until no other thread is writing to them.
  4. Close IOThreadPool and wait for all submitted tasks to complete. Because RetryQueue is already closed, failed batches are sent directly to the failure queue.
  5. Close SuccessBatchHandler and wait for it to exit completely. If the close method is called from a callback, the wait is skipped. After detecting the close signal, SuccessBatchHandler retrieves all batches from the success queue and processes them one by one.
  6. Close FailureBatchHandler and wait for it to exit completely. If the close method is called from a callback, the wait is skipped. After detecting the close signal, FailureBatchHandler retrieves all batches from the failure queue and processes them one by one.

In this way, by closing queues and threads one by one based on the direction of data flow, graceful shutdown and safe termination are achieved.
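Two of the ideas above, closing stages in data-flow order and making repeated close calls harmless, can be shown in a minimal sketch. `OrderedCloseSketch` is hypothetical and only records stage names; it does not stop real threads, wait with a timeout, or handle calls from a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: idempotent close that walks the stages in data-flow order.
class OrderedCloseSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Records the order in which stages were closed, for illustration only.
    final List<String> closeOrder = new ArrayList<>();

    // Safe to call repeatedly and from several threads: only the first call does the work.
    void close() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        // Upstream stages close first, so nothing new flows into a stage that is already closed.
        String[] stages = {"LogAccumulator", "RetryQueue", "Mover", "IOThreadPool",
                "SuccessBatchHandler", "FailureBatchHandler"};
        for (String stage : stages) {
            closeOrder.add(stage);
        }
    }
}
```

The `compareAndSet` guard is what makes a second call a no-op; in this simplified version a later caller returns at once rather than waiting for the first call to finish.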

Summary

Alibaba Cloud LOG Java Producer is a comprehensive upgrade to previous versions of Producer. It resolves many issues in those versions, such as high CPU utilization when a network exception occurs and the loss of a small amount of data when Producer exits. In addition, the fault tolerance mechanism has been strengthened: Producer can ensure proper resource usage, high throughput, and strict isolation even when it is used incorrectly.
