In this article, we introduce "**Alibaba Cloud LOG Java Producer**", an easy-to-use and highly configurable **Java** library that supports sending data to Log Service.
Logs are everywhere. As a carrier that records changes in the world, logs are widely used in many fields such as marketing, R&D, operations, security, BI, and auditing.
Alibaba Log Service is an all-in-one service platform for log data. Its core component, LogHub, is an infrastructure for big data processing, especially real-time data processing, with features such as high throughput, low latency, and auto scaling. Jobs running on big data computing engines such as Flink, Spark, and Storm write their processing results and intermediate results to LogHub in real time. Using data from LogHub, downstream systems can provide many services such as query analysis, alert monitoring, machine learning, and iterative computation. LogHub's big data processing architecture looks like the following figure.
For such a system to work properly, you need a convenient and efficient way to write data. Directly using the API or SDK is not enough to meet the data-write requirements of big data scenarios. That is why "Alibaba Cloud LOG Java Producer" was developed.
Alibaba Cloud LOG Java Producer is an easy-to-use and highly configurable Java class library. It provides the following features.
Writing data to LogHub using Producer has the following advantages over using API or SDK.
With large data volumes and limited resources, achieving the desired throughput requires implementing complex logic such as multithreading, caching policies, batching, and retries on failure. Producer implements this logic for you, improving application performance and simplifying the application development process.
As long as there is sufficient cache memory, Producer caches the data you send to LogHub. When you call the send method, the specified data is accepted immediately without blocking, which decouples application logic from I/O logic. Later, you can retrieve the transmission result from the returned future object or from registered callbacks.
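The decoupling described above can be sketched with plain JDK classes. This is an illustrative model only, not the actual Producer API; the class and method names here are invented for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the send/I-O separation: send() returns a future immediately,
// a background I/O thread does the actual transmission, and the caller
// reads the outcome later via the future or a registered callback.
public class AsyncSendSketch {
    // Daemon thread so the JVM can exit without an explicit shutdown.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "io-thread");
        t.setDaemon(true);
        return t;
    });

    public CompletableFuture<String> send(String logLine) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Hand the work to the I/O thread; the caller is not blocked.
        ioThread.submit(() -> result.complete("sent: " + logLine));
        return result;
    }

    public static void main(String[] args) throws Exception {
        AsyncSendSketch producer = new AsyncSendSketch();
        CompletableFuture<String> f = producer.send("page_view user=42");
        f.thenAccept(r -> System.out.println("callback saw: " + r)); // callback style
        System.out.println(f.get());                                 // future style
    }
}
```

The real Producer works the same way in spirit: the send call only loads data into a cache, while dedicated I/O threads perform the network round trips.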
The amount of memory Producer uses to cache pending data, as well as the number of threads used for send tasks, can be controlled by parameters. This prevents Producer from consuming unlimited resources and lets you balance resource consumption against write throughput as needed.
In summary, Producer offers many benefits by automatically handling complex underlying details and exposing a simple interface. In addition, it does not affect the normal operation of higher layer services and can significantly reduce the threshold for data access.
To better understand Producer performance, this section describes how Producer works, including data write logic, core component implementation, and graceful shutdown. The overall architecture of Producer is shown in the figure below.
Producer data writing logic:
When you call the `producer.send()` method to send data to the specified logstore, the data is first loaded into a ProducerBatch in the LogAccumulator. Normally, the send method returns immediately. However, if the Producer instance does not have enough space to store the data, the call blocks until one of the following conditions is met:

1. Previously cached data is processed by the batch handlers and the memory it occupied is released, so the Producer has enough space for the new data.
2. The specified blocking time is exceeded, in which case an exception is thrown.
When you call `producer.send()`, the number of logs in the target batch may exceed maxBatchCount, or the batch may not have enough space for the data. In that case, Producer first hands the target batch to IOThreadPool and then creates a new batch to store the data. To avoid blocking threads, IOThreadPool uses an unbounded blocking queue; because the number of logs a Producer instance can cache is limited, the queue length cannot grow indefinitely.
Mover traverses each ProducerBatch in LogAccumulator and sends batches that have exceeded the maximum cache time to expiredBatches, recording the earliest expiration time (t) among the batches that have not yet expired. The expired batches are then sent from LogAccumulator to IOThreadPool.
After that, Mover retrieves from RetryQueue the ProducerBatches that meet the send conditions. If no batch qualifies, it waits for a period of t.
It then sends the expired batches from RetryQueue to IOThreadPool. After this step, Mover repeats the whole cycle.
IOThreadPool worker threads send batches from the blocking queue to the target logstore. After a batch is successfully sent, it moves to the success queue.
If transmission fails, the batch goes to the failure queue when one of the following conditions is met:

1. The failed batch cannot be retried.
2. The RetryQueue has been closed.
3. The specified number of retries has been reached, and the number of batches in the failure queue does not exceed half of the total number of batches in transit.
Otherwise, the worker thread calculates the next send time for the failed batch and puts it into the RetryQueue.
The SuccessBatchHandler thread takes the batch from the success queue and executes all the callbacks registered in this batch.
The FailureBatchHandler thread takes the batch from the failure queue and executes all the callbacks registered in this batch.
The core components of Producer are [LogAccumulator](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/LogAccumulator.java), [RetryQueue](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java), [Mover](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/Mover.java), [IOThreadPool](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/IOThreadPool.java), [SendProducerBatchTask](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java), and [BatchHandler](https://github.com/aliyun/aliyun-log-java-producer/blob/master/src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/BatchHandler.java).
**LogAccumulator**: To improve throughput, it is common to accumulate data into larger batches and send it in bulk. The main role of LogAccumulator is to merge data into such batches. To be merged into one large batch, data must share the same project, logstore, topic, source, and shardHash properties. LogAccumulator caches data at different positions in an internal map based on these properties: the map key is the 5-tuple of the five properties above, and the value is a ProducerBatch. A ConcurrentMap is used to ensure thread safety and high concurrency.
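A minimal sketch of this grouping idea using only JDK classes follows. The `GroupKey` record and method names are invented for illustration; only the 5-tuple key and the ConcurrentMap come from the description above:

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of LogAccumulator's grouping: logs sharing the same
// project/logstore/topic/source/shardHash land in the same batch,
// keyed in a ConcurrentMap for thread-safe concurrent appends.
public class AccumulatorSketch {
    // The map key is the 5-tuple of grouping properties.
    public record GroupKey(String project, String logstore, String topic,
                           String source, String shardHash) {}

    private final ConcurrentMap<GroupKey, List<String>> batches = new ConcurrentHashMap<>();

    public void append(GroupKey key, String log) {
        // computeIfAbsent creates the batch atomically on first use.
        batches.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<>()))
               .add(log);
    }

    public int batchSize(GroupKey key) {
        return batches.getOrDefault(key, List.of()).size();
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch();
        GroupKey k = new GroupKey("my-project", "my-logstore", "", "10.0.0.1", "");
        acc.append(k, "log-1");
        acc.append(k, "log-2");  // same 5-tuple, so merged into the same batch
        System.out.println(acc.batchSize(k));  // 2
    }
}
```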
Another responsibility of LogAccumulator is to control the total size of the cached data; this control logic is implemented with a Semaphore. Semaphore is a high-performance synchronization tool based on AbstractQueuedSynchronizer (AQS). It first attempts to acquire shared resources by spinning, which reduces context-switch overhead.
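The memory-control idea can be sketched as follows, where Semaphore permits stand for bytes of cache. The `reserve`/`release` names are invented for the example; in the real Producer, an exception is thrown when the blocking time is exceeded:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch of Semaphore-based cache-size control: a send reserves bytes
// and may wait up to maxBlockMs; freeing a handled batch releases them.
public class MemoryLimitSketch {
    private final Semaphore memory;

    public MemoryLimitSketch(int totalSizeInBytes) {
        this.memory = new Semaphore(totalSizeInBytes);
    }

    /** Returns false if the bytes could not be reserved within maxBlockMs. */
    public boolean reserve(int bytes, long maxBlockMs) throws InterruptedException {
        return memory.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
    }

    /** Called after a batch has been handled and its memory can be reused. */
    public void release(int bytes) {
        memory.release(bytes);
    }

    public int availableBytes() {
        return memory.availablePermits();
    }
}
```

Because the total permits are fixed at construction time, the cache can never consume more memory than the configured budget, which is exactly the bounded-resource guarantee described above.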
**RetryQueue**: RetryQueue stores batches that failed to send and are waiting to be retried. Each such batch has a field indicating when it should next be sent. To retrieve expired batches efficiently, Producer stores them in a DelayQueue. DelayQueue is a time-based priority queue that hands out the earliest-expiring batch first, and it is thread safe.
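The DelayQueue behavior can be demonstrated with a small sketch (the `RetryBatch` class is invented for illustration; the real RetryQueue wraps a similar structure internally):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of the RetryQueue idea: batches carry an absolute next-send time,
// and a DelayQueue hands back the earliest-expiring batch first.
public class RetryQueueSketch {
    public static class RetryBatch implements Delayed {
        public final String id;
        public final long nextRetryAtMs;  // absolute time of the next attempt

        public RetryBatch(String id, long nextRetryAtMs) {
            this.id = id;
            this.nextRetryAtMs = nextRetryAtMs;
        }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(nextRetryAtMs - System.currentTimeMillis(),
                                TimeUnit.MILLISECONDS);
        }

        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<RetryBatch> retryQueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        retryQueue.put(new RetryBatch("batch-late", now + 60));
        retryQueue.put(new RetryBatch("batch-early", now + 10));
        // take() blocks until a batch's delay expires; earliest comes first.
        System.out.println(retryQueue.take().id);  // batch-early
        System.out.println(retryQueue.take().id);  // batch-late
    }
}
```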
**Mover**: Mover is an independent thread that periodically sends expired batches from LogAccumulator and RetryQueue to IOThreadPool. An idle thread still occupies CPU resources; to avoid wasting them, when Mover finds no eligible batches in LogAccumulator or RetryQueue, it blocks waiting for expired batches on RetryQueue for at most the configured maximum cache time, lingerMs.
**IOThreadPool**: Worker threads in IOThreadPool send data to the target logstore. The size of IOThreadPool can be specified with the ioThreadCount parameter; the default value is twice the number of available processors.
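A sketch of that sizing rule with the JDK's standard executor (the method names are invented; only the ioThreadCount parameter and its default come from the text above):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: an I/O pool sized to the documented default of
// twice the number of available processors.
public class IoPoolSketch {
    public static int defaultIoThreadCount() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }

    public static ExecutorService newIoThreadPool(int ioThreadCount) {
        return Executors.newFixedThreadPool(ioThreadCount);
    }
}
```

Doubling the processor count is a common choice for I/O-bound pools, since threads spend much of their time blocked on network calls rather than consuming CPU.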
**SendProducerBatchTask**: SendProducerBatchTask encapsulates the batch send logic. To avoid blocking the I/O thread, it hands the target batch to a separate queue for callback execution, regardless of whether the batch was sent successfully. In addition, a failed batch that meets the retry criteria is not resent immediately on the current I/O thread, where it would usually fail again; instead, SendProducerBatchTask puts it into RetryQueue according to an exponential backoff policy.
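An exponential backoff schedule of this kind can be sketched as follows. The parameter names `baseMs` and `maxMs` are assumptions for the example, not the library's exact configuration fields:

```java
// Sketch of an exponential backoff schedule: each failed attempt
// doubles the wait, capped at a configured maximum.
public class BackoffSketch {
    public static long nextRetryBackoffMs(int retries, long baseMs, long maxMs) {
        long backoff = baseMs << Math.min(retries, 40);  // base * 2^retries, guarding the shift
        return Math.min(backoff, maxMs);
    }

    public static void main(String[] args) {
        for (int retries = 0; retries < 5; retries++) {
            System.out.println(nextRetryBackoffMs(retries, 100, 50_000));
        }
        // prints 100, 200, 400, 800, 1600 (always capped at 50 000 ms)
    }
}
```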
**BatchHandler**: Producer launches a SuccessBatchHandler and a FailureBatchHandler to handle the batches that succeeded or failed to send. After a handler executes a batch's callbacks and sets its future, it releases the memory occupied by that batch for new data. Handling the two cases separately keeps successfully sent and failed batches isolated, which ensures the smooth operation of Producer.
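The handler loop can be sketched with a blocking queue and a dedicated thread (the `FinishedBatch` record and method names are invented for illustration):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Sketch of the BatchHandler idea: a dedicated thread drains a queue of
// finished batches and runs every callback registered on each batch, so
// callback execution never blocks the I/O threads.
public class BatchHandlerSketch {
    public record FinishedBatch(String id, List<Runnable> callbacks) {}

    public static Thread startHandler(BlockingQueue<FinishedBatch> queue) {
        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    FinishedBatch batch = queue.take();
                    batch.callbacks().forEach(Runnable::run);  // run registered callbacks
                    // After the callbacks run, the batch's memory can be
                    // released for new data.
                }
            } catch (InterruptedException e) {
                // Exit on interrupt (e.g., during close).
            }
        });
        handler.setDaemon(true);
        handler.start();
        return handler;
    }
}
```

In the real Producer there are two such threads, one for the success queue and one for the failure queue, so that a slow failure callback cannot delay the processing of successful batches.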
**GracefulShutdown**: To implement graceful shutdown, the following requirements must be met:
To meet the above requirements, the producer's close logic is designed as follows:
In this way, by closing queues and threads one by one based on the direction of data flow, graceful shutdown and safe termination are achieved.
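This close order can be sketched as follows. The class and method names are invented for the example; only the principle of shutting down in the direction of data flow comes from the text above:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of a graceful close: stop components in the direction of data
// flow so no in-flight batch is dropped: first stop accepting new data,
// then drain the send stage, then give up after a timeout.
public class GracefulCloseSketch {
    private volatile boolean closed = false;
    private final ExecutorService ioThreadPool = Executors.newFixedThreadPool(2);

    public void send(String log) {
        if (closed) throw new IllegalStateException("producer is closed");
        ioThreadPool.submit(() -> { /* transmit the batch */ });
    }

    public void close(long timeoutMs) throws InterruptedException {
        closed = true;                         // 1. reject new data
        ioThreadPool.shutdown();               // 2. let queued send tasks drain
        if (!ioThreadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            ioThreadPool.shutdownNow();        // 3. give up after the timeout
        }
        // 4. In the real Producer, the success/failure handlers are closed
        //    last, after callbacks for already-submitted batches have run.
    }
}
```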
Alibaba Cloud LOG Java Producer is a comprehensive upgrade of earlier Producer versions. It resolves many issues found in those versions, such as high CPU utilization during network exceptions and data loss when closing Producer. Its fault-tolerance mechanism has also been strengthened: even when errors occur, Producer ensures controlled resource usage, high throughput, and strict isolation.