[Java] PostgreSQL time series data case: automatic compression of time series data


This post is a translation of the English original, which you can find here. Some machine translation was used, so if you spot any mistakes we would appreciate it if you could point them out.

Background

One of the most important features of a time-series database is compression over time: for example, the most recent day's data is compressed into 5-minute intervals, and the most recent week's data into 30-minute intervals.

The compression algorithm is customizable in PostgreSQL: for example, simple average compression, maximum/minimum compression, or compression based on the revolving-door compression algorithm.

Implementation of the revolving-door data compression algorithm in PostgreSQL - application of streaming compression in IoT, monitoring, and sensor scenarios

In this article, I introduce a simple compression scenario: compressing data, RRD-style, into dimensions such as average, maximum, minimum, sum, and record count along the time dimension.

It also introduces advanced SQL techniques such as window queries, year-over-year and period-over-period comparison UDFs (including a KNN calculation), and writing data evenly distributed over time.

Design


Detail table

create table tbl (
  id serial8 primary key, --primary key
  sid int, --sensor ID
  hid int, --indicator ID
  val float8, --collected value
  ts timestamp --acquisition time
);
  
create index idx_tbl on tbl(ts);

Compressed table

1. 5-minute compression table

create table tbl_5min (
  id serial8 primary key, --primary key
  sid int, --sensor ID
  hid int, --indicator ID
  val float8, --inherited: average; convenient for period-over-period analysis
  ts timestamp, --inherited: interval start time; convenient for period-over-period analysis
  val_min float8, --minimum
  val_max float8, --maximum
  val_sum float8, --sum
  val_count float8, --number of points
  ts_start timestamp, --interval start time
  ts_end timestamp --interval end time
);
  
alter table tbl_5min inherit tbl;

2. 30-minute compression table

create table tbl_30min (
  id serial8 primary key, --primary key
  sid int, --sensor ID
  hid int, --indicator ID
  val float8, --inherited: average; convenient for period-over-period analysis
  ts timestamp, --inherited: interval start time; convenient for period-over-period analysis
  val_min float8, --minimum
  val_max float8, --maximum
  val_sum float8, --sum
  val_count float8, --number of points
  ts_start timestamp, --interval start time
  ts_end timestamp --interval end time
);
  
alter table tbl_30min inherit tbl;

3. 5-minute compression statement

with tmp1 as (
  delete from only tbl where ts <= now()-interval '1 day' returning *
)
insert into tbl_5min
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)
select sid, hid, avg(val) as val, min(ts) as ts, min(val) as val_min, max(val) as val_max, sum(val) as val_sum, count(*) as val_count, min(ts) as ts_start, max(ts) as ts_end
from tmp1
group by sid, hid, substring(to_char(ts,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts,'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');

4. 30-minute compression statement

with tmp1 as (
  delete from only tbl_5min where ts_start <= now()-interval '1 week' returning *
)
insert into tbl_30min
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)
select sid, hid, sum(val_sum)/sum(val_count) as val, min(ts) as ts, min(val_min) as val_min, max(val_max) as val_max, sum(val_sum) as val_sum, sum(val_count) as val_count, min(ts_start) as ts_start, max(ts_end) as ts_end
from tmp1
group by sid, hid, substring(to_char(ts_start,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start,'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0');

Demo

1. Write 100 million rows of detail test data, spread evenly over 10 days.

insert into tbl (sid, hid, val, ts)
select
  random()*1000, random()*5, random()*100,  -- 1000 sensors, 5 indicators per sensor
  now() - interval '10 day' + (id * ((10*24*60*60/100000000.0)||' sec')::interval)  -- start 10 days back + (id * time taken per record)
from generate_series(1,100000000) t(id);

2. 5-minute compression scheduling. For data older than one day, the following SQL is scheduled hourly.

with tmp1 as (
  delete from only tbl where ts <= now()-interval '1 day' returning *
)
insert into tbl_5min
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)
select sid, hid, avg(val) as val, min(ts) as ts, min(val) as val_min, max(val) as val_max, sum(val) as val_sum, count(*) as val_count, min(ts) as ts_start, max(ts) as ts_end
from tmp1
group by sid, hid, substring(to_char(ts,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts,'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');

3. 30-minute compression scheduling. For data older than one week, the following SQL is scheduled hourly.

with tmp1 as (
  delete from only tbl_5min where ts_start <= now()-interval '1 week' returning *
)
insert into tbl_30min
  (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)
select sid, hid, sum(val_sum)/sum(val_count) as val, min(ts) as ts, min(val_min) as val_min, max(val_max) as val_max, sum(val_sum) as val_sum, sum(val_count) as val_count, min(ts_start) as ts_start, max(ts_end) as ts_end
from tmp1
group by sid, hid, substring(to_char(ts_start,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start,'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0');
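
After a compression pass, a quick sanity check is to compare row counts per tier; only restricts each count to a single physical table:

-- row counts per storage tier (detail vs. 5-minute vs. 30-minute)
select 'detail' as tier, count(*) from only tbl
union all
select '5min', count(*) from only tbl_5min
union all
select '30min', count(*) from only tbl_30min;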

Overview

1. To group timestamps into intervals, use integer division followed by multiplication.

For example, for 5 minutes:

substring(to_char(ts,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts,'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0')

For 30 minutes:

substring(to_char(ts_start,'yyyymmddhh24mi'), 1, 10) || lpad(((substring(to_char(ts_start,'yyyymmddhh24mi'), 11, 2)::int / 30) * 30)::text, 2, '0')
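
As a quick sanity check of the bucketing expression (the timestamp below is a hypothetical value), 10:23 falls into the 10:20 bucket:

select substring(to_char(ts,'yyyymmddhh24mi'), 1, 10) ||
       lpad(((substring(to_char(ts,'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0') as bucket_5min
from (values (timestamp '2018-01-01 10:23:45')) t(ts);
-- => '201801011020' (the 10:20-10:25 interval)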
2. Generate evenly distributed time-series data. Using the PG interval type and generate_series, write times can be spread evenly across the target period:
insert into tbl (sid, hid, val, ts)
select
  random()*1000, random()*5, random()*100,  -- 1000 sensors, 5 indicators per sensor
  now() - interval '10 day' + (id * ((10*24*60*60/100000000.0)||' sec')::interval)  -- start 10 days back + (id * time taken per record)
from generate_series(1,100000000) t(id);
3. One of the most important features of a time-series database is compression over time: for example, the most recent day's data is kept as 5-minute points and the most recent week's data as 30-minute points.

The compression algorithm is customizable in PostgreSQL: simple average, maximum/minimum, or the revolving-door compression algorithm.

This article introduced a simple scenario: compressing data, RRD-style, into average, maximum, minimum, sum, record-count, and other dimensions along the time dimension.

4. Add scheduling. See the following references; a sketch follows them.

PostgreSQL scheduled task method 2

[PostgreSQL Oracle Compatibility-DBMS_JOBS-Daily Maintenance-Timing Task (pgagent)](https://github.com/digoal/blog/blob/master/201305/20130531_01.md?spm=a2c65.11461447.0.0.36311fe3ZBBoT1&file=20130531_01.md)
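
The references above use pgagent / DBMS_JOBS. As an alternative, here is a minimal sketch assuming the pg_cron extension is installed (an assumption; the original relies on the tools above), scheduling the 5-minute compression hourly:

-- assumes: create extension pg_cron; the job name is hypothetical
select cron.schedule('compress_to_5min', '0 * * * *', $$
  with tmp1 as (
    delete from only tbl where ts <= now() - interval '1 day' returning *
  )
  insert into tbl_5min (sid, hid, val, ts, val_min, val_max, val_sum, val_count, ts_start, ts_end)
  select sid, hid, avg(val), min(ts), min(val), max(val), sum(val), count(*), min(ts), max(ts)
  from tmp1
  group by sid, hid,
    substring(to_char(ts,'yyyymmddhh24mi'), 1, 10) ||
    lpad(((substring(to_char(ts,'yyyymmddhh24mi'), 11, 2)::int / 5) * 5)::text, 2, '0');
$$);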

5. After compression, each row retains the interval (start/end), maximum, minimum, average, and point count. These values can be used to draw charts; a sketch follows.
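
For example, a minimal sketch of a query that could feed such a chart (sensor 1 / indicator 1 are hypothetical IDs):

-- min/avg/max band per 30-minute interval for one series
select ts_start, val_min, val, val_max
from only tbl_30min
where sid = 1 and hid = 1
order by ts_start;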

6. By combining with PG window functions, you can easily draw year-on-year and period-over-period comparison graphs. An SQL example follows.

Indexes and acceleration

create index idx_tbl_2 on tbl using btree (sid, hid, ts);
create index idx_tbl_5min_2 on tbl_5min using btree (sid, hid, ts);
create index idx_tbl_30min_2 on tbl_30min using btree (sid, hid, ts);

A composite type for returning period-comparison values:

create type tp as (id int8, sid int, hid int, val float8, ts timestamp);

A function that fetches the record for a given SID and HID nearest to a specified point in time, using a KNN-style search:

create or replace function get_val(v_sid int, v_hid int, v_ts timestamp) returns tp as
$$
  
select t.tp from
(
select
(select (id, sid, hid, val, ts)::tp tp from only tbl where sid=v_sid and hid=v_hid and ts >= v_ts order by ts limit 1)
union all
select
(select (id, sid, hid, val, ts)::tp tp from only tbl where sid=v_sid and hid=v_hid and ts < v_ts order by ts desc limit 1)
) t
order by abs(extract(epoch from (t.tp).ts - v_ts)) limit 1;

$$
 language sql strict;
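
A usage sketch, with hypothetical IDs: fetch the reading of sensor 1 / indicator 1 nearest to 24 hours ago.

select * from get_val(1, 1, (now() - interval '1 day')::timestamp);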

Year-on-year, week-over-week, and month-over-month comparisons (these values can also be generated ahead of time to avoid computing them in every query):

select
  sid,
  hid,
  val,
  lag(val) over w1,                            -- previous point (period-over-period)
  get_val(sid, hid, ts - interval '1 week'),   -- same time last week
  get_val(sid, hid, ts - interval '1 month')   -- same time last month
from tbl  -- where ..., restrict to the sensors and time range of interest
window w1 as (partition by sid, hid order by ts);
7. By combining with PG's linear regression, you can plot predicted values. The references below illustrate this in more detail, and a minimal sketch follows them.

PostgreSQL Multiple Linear Regression-Stock Price Forecast 2

[Forecast using PostgreSQL linear regression analysis-Example 2 predicts the closing price of the next few days](https://github.com/digoal/blog/blob/master/201503/20150305_01.md?spm=a2c65.11461447.0.0.36311fe3ZBBoT1&file=20150305_01.md)

PostgreSQL linear regression-stock price forecast 1

Using linear regression analysis with PostgreSQL-data prediction implementation
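
The referenced articles build complete models; as a minimal sketch using only PG's built-in regression aggregates (the sensor ID, indicator ID, and one-hour horizon are illustrative assumptions):

-- fit val = slope * epoch + intercept over the last day,
-- then extrapolate one hour ahead
select regr_slope(val, extract(epoch from ts))
       * extract(epoch from now() + interval '1 hour')
     + regr_intercept(val, extract(epoch from ts)) as predicted_val
from tbl
where sid = 1 and hid = 1
  and ts > now() - interval '1 day';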

8. The compressed tables inherit from the detail table to simplify development. This eliminates the need to write UNION SQL: you can retrieve all the data (including compressed data) just by querying the detail table, as sketched below.
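
For example, querying tbl traverses the whole inheritance tree, while only restricts the scan to the raw detail rows:

select count(*) from tbl;       -- detail + 5-minute + 30-minute rows
select count(*) from only tbl;  -- raw detail rows only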

Related cases

Timeout Streaming - Detecting Anomalies When No Message Data Is Received

Practice of Alibaba Cloud RDS PostgreSQL varbitx - Stream Tags (Asynchronous Batch Consumption with Atomic Stream/Batch Computing) - Tag Any Person in Milliseconds (Up to Several Trillion Tags)

PostgreSQL Streaming Statistics-“insert on conflict” Implement Streaming Unique Visitors (Individual), Min, Max, Average, Sum, Count

[HTAP Database PostgreSQL Scenarios and Performance Tests - #32 (OLTP) High Throughput Data Input/Output (No Storage, No Row Scan, No Index) - Asynchronous Batch Consumption Using Atomicity (JSON + Functional Stream Computing)](https://github.com/digoal/blog/blob/master/201711/20171107_33.md?spm=a2c65.11461447.0.0.36311fe3ZBBoT1&file=20171107_33.md)

HTAP Database PostgreSQL Scenarios and Performance Tests - #31 (OLTP) High Throughput Data Input/Output (Storage, Row Scan, No Index) - Asynchronous Batch Consumption (Parallel Test for High Throughput Read/Write)

HTAP database PostgreSQL scenario and performance test-27th (OLTP) IoT-FEED log, stream computing, asynchronous batch consumption with atomicity (CTE)

Real-time statistics of 10 million/s is achieved with PostgreSQL-based streaming pipeline DB
