Using Elasticsearch Java-API (Aggregation)

This article is a continuation of Using Elasticsearch Java-API (BulkRequest Registration). See that article for the requirements, data structures, and so on.

This time, I would like to aggregate the various data that was registered with Bulk.

Requirements

- I want to display the top 10 daily sales rankings.
- I want to be able to switch the date range by day, week, month, etc.
- I want to be able to change the number of entries in the sales ranking.
- I want to be able to view it both overall and by genre.

Acquisition method

Kibana

(Screenshot of the Kibana visualization settings: image.png)

With this configuration, Kibana displays the TOP 5 for a particular day.

Query

The above configuration can be expressed as a query as follows.

GET sample-ranking-qiita-*/_search
{
  "aggs": {
    "range": {
      "date_range": {
        "field": "execDate",
        "ranges": [
          {
            "from": "2018/01/15T00:00:00+09:00",
            "to": "2018/01/15T23:59:59+09:00"
          }
        ]
      },
      "aggs": {
        "title_rate": {
          "terms": {
            "field": "title.keyword",
            "size": 5,
            "order": {
              "sum_rate": "desc"
            }
          },
          "aggs": {
            "sum_rate": {
              "sum": {
                "field": "rate"
              }
            }
          }
        }
      }
    }
  }
}

The nesting of aggs is a little hard to follow, isn't it? The following three aggs are defined, listed from the innermost outward.

  1. Sum rate under the name sum_rate.
  2. Group by title and sort by sum_rate in descending order, under the name title_rate.
  3. Restrict the aggregation range of title_rate to between from and to on `execDate`.
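To make the three aggs concrete, here is a minimal plain-Java sketch of the same computation on an in-memory list. It does not use Elasticsearch at all; the Sale and Ranked classes and the topTitles method are names made up for this illustration. The one behavior borrowed from Elasticsearch is that date_range includes the from value and excludes the to value.

```java
import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class RankingSketch {

    // Hypothetical stand-in for one indexed document (only the fields the aggs use)
    static final class Sale {
        final String title;
        final OffsetDateTime execDate;
        final double rate;

        Sale(String title, OffsetDateTime execDate, double rate) {
            this.title = title;
            this.execDate = execDate;
            this.rate = rate;
        }
    }

    // Hypothetical result row: one title_rate bucket with its sum_rate value
    static final class Ranked {
        final String title;
        final double sumRate;

        Ranked(String title, double sumRate) {
            this.title = title;
            this.sumRate = sumRate;
        }
    }

    static List<Ranked> topTitles(List<Sale> sales, OffsetDateTime from, OffsetDateTime to, int size) {
        return sales.stream()
                // 3. date_range: keep documents with from <= execDate < to
                .filter(s -> !s.execDate.isBefore(from) && s.execDate.isBefore(to))
                // 2. terms on title, combined with 1. the sum of rate per title
                .collect(Collectors.groupingBy((Sale s) -> s.title,
                        Collectors.summingDouble((Sale s) -> s.rate)))
                .entrySet().stream()
                .map(e -> new Ranked(e.getKey(), e.getValue()))
                // order by sum_rate descending, then cut the list to "size" entries
                .sorted(Comparator.comparingDouble((Ranked r) -> r.sumRate).reversed())
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        OffsetDateTime from = OffsetDateTime.parse("2018-01-15T00:00:00+09:00");
        OffsetDateTime to = from.plusDays(1);
        List<Sale> sales = List.of(
                new Sale("A", from.plusHours(1), 10),
                new Sale("A", from.plusHours(2), 5),
                new Sale("B", from.plusHours(3), 20),
                new Sale("C", from.minusHours(1), 99)); // outside the range, ignored
        // Prints "B 20.0" and then "A 15.0"
        topTitles(sales, from, to, 5).forEach(r -> System.out.println(r.title + " " + r.sumRate));
    }
}
```

The order of the steps mirrors the nesting: the date filter is the outermost aggs, and the sum is the innermost.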

SQL

For reference, the equivalent SQL would look roughly like this. (I have not run it, so apologies if it contains mistakes.)

ranking.sql


SELECT
	ranking
	, title
	, sum_rate
FROM (
	SELECT
		ROW_NUMBER() OVER (ORDER BY sum_rate DESC) AS ranking
		, tb1.title
		, tb1.sum_rate 
	FROM (
		SELECT
			title
			, SUM(rate) as sum_rate 
		FROM
			tbl_ranking
		WHERE
			execDate >= '2018/01/15T00:00:00+09:00'
			AND execDate <= '2018/01/15T23:59:59+09:00'
		GROUP BY
			title
	) tb1
) tb2
WHERE
	ranking <= 5

Acquisition result

In the query response, the aggs results are nested inside the `aggregations` tag. The list in each `buckets` tag holds the results of that aggs. You can see the same titles that Kibana displayed, and the rate totals also match.

{
  "took": 426,
  "timed_out": false,
  "_shards": {
    "total": 15,
    "successful": 15,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 24800,
    "max_score": 1,
    "hits": [
(Omitted)
    ]
  },
  "aggregations": {
    "range": {
      "buckets": [
        {
          // 3. Date range over which title_rate is aggregated ------------------------------
          "key": "2018/01/14T15:00:00+0000-2018/01/15T14:59:59+0000",
          "from": 1515942000000,
          "from_as_string": "2018/01/14T15:00:00+0000",
          "to": 1516028399000,
          "to_as_string": "2018/01/15T14:59:59+0000",
          "doc_count": 400,
          // 2. sum_rate per title, in descending order ------------------------------
          "title_rate": {
            "doc_count_error_upper_bound": -1,
            "sum_other_doc_count": 380,
            "buckets": [
              {
                "key": "New opening",
                "doc_count": 4,
                "sum_rate": {
                  // 1. Sum of rate ------------
                  "value": 355
                }
              },
              {
                "key": "High school student",
                "doc_count": 4,
                "sum_rate": {
                  "value": 337
                }
              },
              {
                "key": "Intermediate test",
                "doc_count": 4,
                "sum_rate": {
                  "value": 333
                }
              },
              {
                "key": "The truth of the host",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              },
              {
                "key": "A moment of the match",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              }
            ]
          }
        }
      ]
    }
  }
}
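The bucket key is shown in UTC, which is why it starts at 2018/01/14T15:00:00 even though the query asked for 2018/01/15 in +09:00. The following is a small sketch, using only java.time, that converts the from and to epoch milliseconds in the response back to +09:00. The class and method names are just for illustration, and the pattern uses a real offset field instead of a fixed suffix.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class BucketTimeSketch {

    // Same layout as the dates in the query; "xxx" prints the offset as +HH:mm
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/dd'T'HH:mm:ssxxx");

    // Converts a bucket's epoch-millisecond from/to value into the given offset
    static String format(long epochMillis, ZoneOffset offset) {
        OffsetDateTime time = Instant.ofEpochMilli(epochMillis).atOffset(offset);
        return FORMAT.format(time);
    }

    public static void main(String[] args) {
        ZoneOffset jst = ZoneOffset.ofHours(9);
        System.out.println(format(1515942000000L, ZoneOffset.UTC)); // 2018/01/14T15:00:00+00:00
        System.out.println(format(1515942000000L, jst));            // 2018/01/15T00:00:00+09:00
        System.out.println(format(1516028399000L, jst));            // 2018/01/15T23:59:59+09:00
    }
}
```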

Java-API

Now, let's run the above query through the Java-API.

AggsSampleViewer.java


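// Imports assume the Elasticsearch 5.x transport client, SLF4J and Apache Commons Lang 3;
// adjust the package paths for the versions you actually use.
import java.net.InetAddress;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggsSampleViewer {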

	private static Logger logger = LoggerFactory.getLogger(AggsSampleViewer.class);

	//Year / month index format(yyyyMM)
	private static DateTimeFormatter YM_INDEX_FORMATTER;
	//Execution date format(yyyy/MM/dd'T'HH:mm:ss+09:00)
	private static DateTimeFormatter DT_INDEX_FORMATTER;

	//Various setting information
	QiitaSettingBean setting;


	/**
	 * Constructor
	 * @param setting setting information
	 */
	public AggsSampleViewer(QiitaSettingBean setting) {
		super();
		this.setting = setting;

		DT_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getExecDateFormat());
		YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
	}

	/**
	 * Runs the aggregation and logs the results.
	 *
	 * @param subIndex sub-index name (null to get the overall ranking)
	 * @throws Exception
	 */
	public void execute(String subIndex) throws Exception {

		// Create a client for accessing Elasticsearch from Java
		// setting.getElasticearch().getAddress(): IP address
		// setting.getElasticearch().getPort(): port number (usually 9300)
		TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
				new InetSocketTransportAddress(InetAddress.getByName(setting.getElasticearch().getAddress()),
						setting.getElasticearch().getPort()));

		// Build the index pattern to search (the sub-index is appended when one is specified)
		String searchIndex = setting.getElsImport().getIndex();
		if (StringUtils.isNotEmpty(subIndex)) {
			searchIndex = searchIndex + "-" + subIndex;
		}
		searchIndex += "*";

		logger.debug("■■■ Search target index: " + searchIndex);

		// Aggs for sum_rate: calculate the total of rate
		AggregationBuilder sumRateAggs = AggregationBuilders.sum("sum_rate").field("rate");

		// Aggs for title_rate: rate totals by title, in descending order
		AggregationBuilder titleRateAggs = AggregationBuilders.terms("title_rate")
				// Aggregate by title
				.field("title.keyword")
				// setting.getRankingLimit(): number of ranking entries
				.size(setting.getRankingLimit())
				// Sort by sum_rate in descending order
				.order(Order.aggregation("sum_rate", false))
				// Add the aggs for sum_rate as a sub-aggregation
				.subAggregation(sumRateAggs)
				;

		// Aggs for the date range
		AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
				// The date range applies to the processing execution date
				.field("execDate")
				// Range: from 0:00 on the day before the processing date up to (but not including) 0:00 on the processing date
				// Example) setting.getNow(): 2018/01/16 13:20:35
				//		from: 2018/01/15 00:00:00
				//		to  : 2018/01/16 00:00:00
				.addRange(
						setting.getNow().minusDays(1).truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
						, setting.getNow().truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
				)
				// Add the aggs for title_rate as a sub-aggregation
				.subAggregation(titleRateAggs)
				;

		//Whole query
		SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
				.addAggregation(dateRangeAggs);


		//Get search results
		SearchResponse res = requestBuilder.get();

		//Get the result of Aggs of the whole query
		for (Aggregation aggs : res.getAggregations()) {
			logger.debug("aggs name: "+ aggs.getName());
		}

		// [Aggs for the date range] (type: date_range) ----------------
		InternalDateRange resDateRangeAggs = res.getAggregations().get("range");
		// Walk the buckets of the date range aggs
		for (InternalDateRange.Bucket dateRangeBucket : resDateRangeAggs.getBuckets()) {
			// key and doc_count are available on every kind of bucket
			logger.debug("* resDateRangeAggs bucket key: "+ dateRangeBucket.getKey());
			logger.debug("* resDateRangeAggs bucket doc_count: "+ dateRangeBucket.getDocCount());
			// This is a date range bucket, so from and to are also available
			logger.debug("* resDateRangeAggs bucket from: "+ dateRangeBucket.getFromAsString());
			logger.debug("* resDateRangeAggs bucket to: "+ dateRangeBucket.getToAsString());

			// List the sub-aggs inside the date range bucket
			for (Aggregation aggs : dateRangeBucket.getAggregations()) {
				logger.debug("* resDateRangeAggs bucket aggs: "+ aggs.getName());
			}

			// [Aggs for title_rate] (type: terms) ----------------
			Terms resTitleRateAggs = dateRangeBucket.getAggregations().get("title_rate");

			// Walk the buckets of the title_rate aggs
			for (Terms.Bucket termBucket : resTitleRateAggs.getBuckets()) {
				logger.debug("** resTitleRateAggs bucket key: "+ termBucket.getKey());
				logger.debug("** resTitleRateAggs bucket doc_count: "+ termBucket.getDocCount());

				// List the sub-aggs inside the title_rate bucket
				for (Aggregation aggs : termBucket.getAggregations()) {
					logger.debug("** resTitleRateAggs bucket aggs: "+ aggs.getName());
				}

				// [Aggs for sum_rate] (type: sum) ----------------
				Sum resSumRateAggs = termBucket.getAggregations().get("sum_rate");

				// Sum has no sub-aggs; the summed value can be read directly
				logger.debug("*** resSumRateAggs sum name: "+ resSumRateAggs.getName());
				logger.debug("*** resSumRateAggs sum value: "+ resSumRateAggs.getValueAsString());
			}
		}

		// Release the client's connections once the results have been read
		client.close();
	}
}

The aggs are defined with the same nested structure as the query. It is a little difficult to work out which Bucket type is returned by which Aggs ^^; For this it is best to check the official documentation.

The official documentation describes how to define each aggregation, but it does not say what kind of information comes back as a result ... I suppose you have to work that out by making full use of auto-completion and the like.

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
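The date range passed to addRange in AggsSampleViewer.java can be checked on its own without a cluster. This is a minimal sketch under two assumptions that the article does not confirm: that setting.getNow() returns a LocalDateTime, and that the execDate format setting is the pattern shown in the comment in the class (with the +09:00 suffix as a literal). The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class DailyRangeSketch {

    // Assumed value of the execDate format setting; the offset is a quoted literal
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd'T'HH:mm:ss'+09:00'");

    // 0:00 on the day before "now" (date_range includes this value)
    static String from(LocalDateTime now) {
        return now.minusDays(1).truncatedTo(ChronoUnit.DAYS).format(FORMAT);
    }

    // 0:00 on the day of "now" (date_range excludes this value)
    static String to(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.DAYS).format(FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2018, 1, 16, 13, 20, 35);
        System.out.println(from(now)); // 2018/01/15T00:00:00+09:00
        System.out.println(to(now));   // 2018/01/16T00:00:00+09:00
    }
}
```

Because date_range excludes the to value, using 0:00 of the next day covers the whole day without having to write 23:59:59.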

Run it like this.

main.java


AggsSampleViewer viewer = new AggsSampleViewer(setting);

//Overall ranking information generation
logger.debug("■ Overall ranking----------------------------------------");
viewer.execute(null);

//Boy ranking information generation
logger.debug("■ Boy ranking----------------------------------------");
viewer.execute("boy");

Execution result

■ Overall ranking---------------------------------------- 
■■■ Search target index: sample-ranking-qiita* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 400 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key:New opening
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 355.0 
** resTitleRateAggs bucket key:High school student
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key:Intermediate test
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 333.0 
** resTitleRateAggs bucket key:The truth of the host
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
** resTitleRateAggs bucket key:A moment of the match
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
■ Boy ranking---------------------------------------- 
■■■ Search target index: sample-ranking-qiita-boy* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 141 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key:High school student
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key:Knockout and episode 2
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 287.0 
** resTitleRateAggs bucket key:Results forward and appearance
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 278.0 
** resTitleRateAggs bucket key:heart
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 267.0 
** resTitleRateAggs bucket key:Championship
** resTitleRateAggs bucket doc_count: 3 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 260.0 

I was able to get the same results as with Kibana and the query! As you would expect from an aggregation query, `aggs` only returns the fields used for the aggregation; other fields (the author and so on, in this case) are not returned. It would be nice to get the documents tied to the `aggs` result in one shot with the same query, but as far as I could tell that is not how it works. Increasing the size of the query raises the chance that the documents you need are included in the hits, but I believe the upper limit of size is 10000 by default, so this only works when the documents are certain to fall within that range. If you cannot be sure, it is safer to simply fetch them with a separate query.

The genre of books is defined as a sub-index to make it easier to narrow down the search range. My thinking was that narrowing down by index would keep the load as low as possible when searching over a long date range. Of course, the genre can also be included as a query condition instead.

AggsSampleViewer2.java


		// Whole query
		SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
				.addAggregation(dateRangeAggs);

		// Add the genre filter only when a sub-index is specified
		// (built inside the if because termQuery does not accept a null value)
		if (StringUtils.isNotEmpty(subIndex)) {
			QueryBuilder queryBuilder = QueryBuilders.termQuery("index", subIndex);
			requestBuilder.setQuery(queryBuilder);
		}

You can also narrow down the results with a QueryBuilder like this.
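For comparison, the index-based narrowing used in execute comes down to building a wildcard index pattern. Here is a minimal sketch of just that step in plain Java; the class and method names are hypothetical, and the base index name is taken from the execution log above.

```java
class SearchIndexSketch {

    // Appends "-<subIndex>" when a sub-index is given, then a trailing wildcard
    static String searchIndex(String baseIndex, String subIndex) {
        String index = baseIndex;
        if (subIndex != null && !subIndex.isEmpty()) {
            index = index + "-" + subIndex;
        }
        return index + "*";
    }

    public static void main(String[] args) {
        System.out.println(searchIndex("sample-ranking-qiita", null));  // sample-ranking-qiita*
        System.out.println(searchIndex("sample-ranking-qiita", "boy")); // sample-ranking-qiita-boy*
    }
}
```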

Switching date range

If you want to change the date range, for example to weekly or monthly, change the arguments passed to `addRange` of `dateRangeAggs`. I also created a bean holding the range definition, because I want to be able to change it freely from the settings.

AggsSampleViewer3.java


		// Aggs for the date range
		AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
				// The date range applies to the processing execution date
				.field("execDate")
				// Set the date range from the From to the To of the search range setting
				// Example) From 0:00 ten days before the execution date to 0:00 on the execution date
				//"searchRange": {
				//		"from": "-10",
				//		"fromChrono": "days",
				//		"to": "0",
				//		"toChrono": "days"
				//}
				.addRange(
						DT_INDEX_FORMATTER.format(
								// Add the From of the search range setting to the execution date, in the specified unit
								setting.getNow()
									.plus(setting.getSearchRange().getFrom()
											, setting.getSearchRange().findFromChronoUnit())
								// Truncate to the day (exactly 0:00)
								.truncatedTo(ChronoUnit.DAYS))
						, DT_INDEX_FORMATTER.format(
								// Add the To of the search range setting to the execution date, in the specified unit
								setting.getNow()
									.plus(setting.getSearchRange().getTo()
											, setting.getSearchRange().findToChronoUnit())
								// Truncate to the day (exactly 0:00)
								.truncatedTo(ChronoUnit.DAYS))
				)
				// Add the aggs for title_rate as a sub-aggregation
				.subAggregation(titleRateAggs)
				;

RangeBean.java


public class RangeBean {
	private int from;
	private String fromChrono;
	private int to;
	private String toChrono;

	public ChronoUnit findFromChronoUnit() {
		return findChronoUnit(fromChrono);
	}

	public ChronoUnit findToChronoUnit() {
		return findChronoUnit(toChrono);
	}

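	/**
	 * Converts a chrono string from the settings into a ChronoUnit.
	 *
	 * @param chrono unit name ("days", "weeks", "months" or "years")
	 * @return the matching ChronoUnit, or null if the name is not recognized
	 */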
	public ChronoUnit findChronoUnit(String chrono) {
		if (StringUtils.equals(chrono, "days")) {
			return ChronoUnit.DAYS;
		}
		if (StringUtils.equals(chrono, "weeks")) {
			return ChronoUnit.WEEKS;
		}
		if (StringUtils.equals(chrono, "months")) {
			return ChronoUnit.MONTHS;
		}
		if (StringUtils.equals(chrono, "years")) {
			return ChronoUnit.YEARS;
		}

		return null;
	}

	//(Omitted)
}

Now you can specify whatever range you like, whether one month or three years.
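To see what boundaries a given setting produces, the calculation can be tried without Elasticsearch. This is a minimal sketch, assuming that setting.getNow() is a LocalDateTime; the class and method names are hypothetical. Note that this variant rejects an unknown unit with an exception, whereas RangeBean returns null in that case.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class SearchRangeSketch {

    // Maps a settings string to a ChronoUnit, failing fast on unknown values
    static ChronoUnit toChronoUnit(String chrono) {
        switch (chrono) {
            case "days": return ChronoUnit.DAYS;
            case "weeks": return ChronoUnit.WEEKS;
            case "months": return ChronoUnit.MONTHS;
            case "years": return ChronoUnit.YEARS;
            default: throw new IllegalArgumentException("Unsupported unit: " + chrono);
        }
    }

    // Shifts "now" by the configured amount, then cuts off the time part (0:00)
    static LocalDateTime boundary(LocalDateTime now, int amount, String chrono) {
        return now.plus(amount, toChronoUnit(chrono)).truncatedTo(ChronoUnit.DAYS);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2018, 1, 16, 13, 20, 35);
        System.out.println(boundary(now, -10, "days"));  // 2018-01-06T00:00
        System.out.println(boundary(now, 0, "days"));    // 2018-01-16T00:00
        System.out.println(boundary(now, -1, "months")); // 2017-12-16T00:00
        System.out.println(boundary(now, -3, "years"));  // 2015-01-16T00:00
    }
}
```

The truncation is always to the day, so a range of "-1 months" starts on the same day of the previous month, not on the first day of that month.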

Next time, I would like to write about how to build a search query and get its results with the Java-API.
