This article is a continuation of Using Elasticsearch Java-API (BulkRequest Registration). See above for requirements, data structures, etc.
Now, I would like to put the data registered with the Bulk API to use in various ways.
- I want to display the top 10 daily sales rankings.
- I want to be able to switch the date range by day, week, month, etc.
- I want to be able to change the number of entries shown in the ranking.
- I want to be able to view the ranking both overall and by genre.
Kibana
You can now view the TOP 5 of a particular day.
The above configuration can be expressed as a query as follows.
GET sample-ranking-qiita-*/_search
{
  "aggs": {
    "range": {
      "date_range": {
        "field": "execDate",
        "ranges": [
          {
            "from": "2018/01/15T00:00:00+09:00",
            "to": "2018/01/15T23:59:59+09:00"
          }
        ]
      },
      "aggs": {
        "title_rate": {
          "terms": {
            "field": "title.keyword",
            "size": 5,
            "order": {
              "sum_rate": "desc"
            }
          },
          "aggs": {
            "sum_rate": {
              "sum": {
                "field": "rate"
              }
            }
          }
        }
      }
    }
  }
}
Isn't the nesting of aggs a little hard to read? The following three aggs are defined.
1. Sum the `rate` values under the name `sum_rate`.
2. Group by title and sort by `sum_rate` in descending order under the name `title_rate`.
3. Restrict `title_rate` to documents whose `execDate` is between `from` and `to`.
SQL
For reference, the equivalent SQL would look roughly like this. (I haven't actually run it, so apologies if it contains mistakes.)
ranking.sql
-- Top 5 titles by total rate for a single day (SQL counterpart of the aggs query).
SELECT
    tb2.ranking
  , tb2.title
  , tb2.sum_rate
FROM (
    -- 2. Number the titles in descending order of their total rate
    SELECT
        ROW_NUMBER() OVER (ORDER BY tb1.sum_rate DESC) AS ranking
      , tb1.title
      , tb1.sum_rate
    FROM (
        -- 1. Total rate per title, 3. limited to the date range
        SELECT
            title
          , SUM(rate) AS sum_rate
        FROM
            tbl_ranking
        WHERE
            -- date_range includes "from" and excludes "to", hence >= and <
            execDate >= '2018/01/15T00:00:00+09:00'
            AND execDate < '2018/01/15T23:59:59+09:00'
        GROUP BY
            title
    ) tb1
) tb2  -- most databases require an alias on every derived table
WHERE
    tb2.ranking <= 5
When you run the query, the results of the aggs come back nested inside the `aggregations` tag. The list in each `buckets` tag is the result list for the corresponding aggs.
You can see the same titles displayed in Kibana. The total rate value also matches.
{
"took": 426,
"timed_out": false,
"_shards": {
"total": 15,
"successful": 15,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 24800,
"max_score": 1,
"hits": [
(Omitted)
]
},
"aggregations": {
"range": {
"buckets": [
{
// 3. title_Aggregate date range of rate------------------------------
"key": "2018/01/14T15:00:00+0000-2018/01/15T14:59:59+0000",
"from": 1515942000000,
"from_as_string": "2018/01/14T15:00:00+0000",
"to": 1516028399000,
"to_as_string": "2018/01/15T14:59:59+0000",
"doc_count": 400,
// 2.title another sum_rate descending order------------------------------
"title_rate": {
"doc_count_error_upper_bound": -1,
"sum_other_doc_count": 380,
"buckets": [
{
"key": "New opening",
"doc_count": 4,
"sum_rate": {
// 1.rate total------------
"value": 355
}
},
{
"key": "High school student",
"doc_count": 4,
"sum_rate": {
"value": 337
}
},
{
"key": "Intermediate test",
"doc_count": 4,
"sum_rate": {
"value": 333
}
},
{
"key": "The truth of the host",
"doc_count": 4,
"sum_rate": {
"value": 292
}
},
{
"key": "A moment of the match",
"doc_count": 4,
"sum_rate": {
"value": 292
}
}
]
}
}
]
}
}
}
Java-API
Now, let's run the above query with the Java API.
AggsSampleViewer.java
/**
 * Sample that runs the nested ranking aggregation (date_range &gt; terms &gt; sum)
 * through the Elasticsearch Java API and writes every level of the response
 * to the debug log.
 */
public class AggsSampleViewer {
    private static final Logger logger = LoggerFactory.getLogger(AggsSampleViewer.class);

    // Both formatters are built from the settings handed to the constructor, so they are
    // per-instance finals rather than statics that every new instance would overwrite.
    // The upper-case names are kept for consistency with the other samples.

    // Year / month index format (yyyyMM); not referenced by the code in this class
    private final DateTimeFormatter YM_INDEX_FORMATTER;
    // Execution date format (yyyy/MM/dd'T'HH:mm:ss+09:00)
    private final DateTimeFormatter DT_INDEX_FORMATTER;
    // Various setting information
    QiitaSettingBean setting;

    /**
     * Creates a viewer and builds the date formatters from the given settings.
     *
     * @param setting setting information
     */
    public AggsSampleViewer(QiitaSettingBean setting) {
        this.setting = setting;
        DT_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getExecDateFormat());
        YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
    }

    /**
     * Runs the ranking aggregation and logs the result.
     *
     * @param subIndex sub-index name (null or empty to search across all sub-indices)
     * @throws Exception if the configured address cannot be resolved, or if any other
     *                   failure propagates from the client
     */
    public void execute(String subIndex) throws Exception {
        // Create a client to handle Elasticsearch from Java.
        // try-with-resources closes the client (and its connections) even when the search fails.
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)) {
            // setting.getElasticearch().getAddress(): IP address
            // setting.getElasticearch().getPort(): port number (usually 9300)
            client.addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(setting.getElasticearch().getAddress()),
                    setting.getElasticearch().getPort()));

            String searchIndex = buildSearchIndex(subIndex);
            logger.debug("■■■ Search target index: {}", searchIndex);

            // Whole query: only the aggregation is added, no query condition
            SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
                    .addAggregation(buildDateRangeAggs());

            // Get search results
            SearchResponse res = requestBuilder.get();
            logResponse(res);
        }
    }

    /** Builds the index pattern to search; the sub-index is appended when one is specified. */
    private String buildSearchIndex(String subIndex) {
        String searchIndex = setting.getElsImport().getIndex();
        if (StringUtils.isNotEmpty(subIndex)) {
            searchIndex = searchIndex + "-" + subIndex;
        }
        return searchIndex + "*";
    }

    /** Builds the aggregation tree: date range, then per-title terms, then the sum of rate. */
    private AggregationBuilder buildDateRangeAggs() {
        // Aggs for sum_rate: calculate the total value of rate
        AggregationBuilder sumRateAggs = AggregationBuilders.sum("sum_rate").field("rate");

        // Aggs for title_rate: per title, in descending order of the total rate
        AggregationBuilder titleRateAggs = AggregationBuilders.terms("title_rate")
                // Aggregate by title
                .field("title.keyword")
                // setting.getRankingLimit(): number of entries in the ranking
                .size(setting.getRankingLimit())
                // Display order is descending sum_rate
                .order(Order.aggregation("sum_rate", false))
                // Add the sum_rate aggs as a sub-aggregation
                .subAggregation(sumRateAggs);

        // Aggs for the date range
        return AggregationBuilders.dateRange("range")
                // The date range is applied to the processing execution date
                .field("execDate")
                // Range runs from 00:00 of the day before the run date to 00:00 of the run date.
                // Example) setting.getNow(): 2018/01/16 13:20:35
                //   from: 2018/01/15 00:00:00
                //   to  : 2018/01/16 00:00:00
                .addRange(
                        setting.getNow().minusDays(1).truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER),
                        setting.getNow().truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER))
                // Add the title_rate aggs as a sub-aggregation
                .subAggregation(titleRateAggs);
    }

    /** Walks the response level by level (date range, title terms, sum) and logs each bucket. */
    private void logResponse(SearchResponse res) {
        // Names of the top-level aggs in the response
        for (Aggregation aggs : res.getAggregations()) {
            logger.debug("aggs name: {}", aggs.getName());
        }

        // [Aggs for date range] (type: date_range) ----------------
        InternalDateRange resDateRangeAggs = res.getAggregations().get("range");
        for (InternalDateRange.Bucket dateRangeBucket : resDateRangeAggs.getBuckets()) {
            // key and doc_count can be obtained from any bucket
            logger.debug("* resDateRangeAggs bucket key: {}", dateRangeBucket.getKey());
            logger.debug("* resDateRangeAggs bucket doc_count: {}", dateRangeBucket.getDocCount());
            // A date range bucket additionally exposes its From and To
            logger.debug("* resDateRangeAggs bucket from: {}", dateRangeBucket.getFromAsString());
            logger.debug("* resDateRangeAggs bucket to: {}", dateRangeBucket.getToAsString());
            // Names of the aggs nested in the date range bucket
            for (Aggregation aggs : dateRangeBucket.getAggregations()) {
                logger.debug("* resDateRangeAggs bucket aggs: {}", aggs.getName());
            }

            // [Aggs for title_rate] (type: terms) ----------------
            Terms resTitleRateAggs = dateRangeBucket.getAggregations().get("title_rate");
            for (Terms.Bucket termBucket : resTitleRateAggs.getBuckets()) {
                logger.debug("** resTitleRateAggs bucket key: {}", termBucket.getKey());
                logger.debug("** resTitleRateAggs bucket doc_count: {}", termBucket.getDocCount());
                // Names of the aggs nested in the title bucket
                for (Aggregation aggs : termBucket.getAggregations()) {
                    logger.debug("** resTitleRateAggs bucket aggs: {}", aggs.getName());
                }

                // [Aggs for sum_rate] (type: sum) ----------------
                Sum resSumRateAggs = termBucket.getAggregations().get("sum_rate");
                // Sum has no nested aggs; the total value is read directly
                logger.debug("*** resSumRateAggs sum name: {}", resSumRateAggs.getName());
                logger.debug("*** resSumRateAggs sum value: {}", resSumRateAggs.getValueAsString());
            }
        }
    }
}
We define the Aggs with the same nested structure as the query. It is a little hard to work out which Bucket type is returned by which Aggs ^^; It is best to check the official documentation for this.
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
Execution is done like this.
main.java
AggsSampleViewer viewer = new AggsSampleViewer(setting);
// Each entry pairs a log heading with the sub-index handed to execute();
// a null sub-index produces the overall ranking.
String[][] rankingTargets = {
        {"■ Overall ranking----------------------------------------", null},
        {"■ Boy ranking----------------------------------------", "boy"},
};
for (String[] target : rankingTargets) {
    // Log the heading, then generate the ranking information for that target
    logger.debug(target[0]);
    viewer.execute(target[1]);
}
■ Overall ranking----------------------------------------
■■■ Search target index: sample-ranking-qiita*
aggs name: range
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000
* resDateRangeAggs bucket doc_count: 400
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000
* resDateRangeAggs bucket aggs: title_rate
** resTitleRateAggs bucket key:New opening
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 355.0
** resTitleRateAggs bucket key:High school student
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 337.0
** resTitleRateAggs bucket key:Intermediate test
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 333.0
** resTitleRateAggs bucket key:The truth of the host
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 292.0
** resTitleRateAggs bucket key:A moment of the match
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 292.0
■ Boy ranking----------------------------------------
■■■ Search target index: sample-ranking-qiita-boy*
aggs name: range
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000
* resDateRangeAggs bucket doc_count: 141
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000
* resDateRangeAggs bucket aggs: title_rate
** resTitleRateAggs bucket key:High school student
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 337.0
** resTitleRateAggs bucket key:Knockout and episode 2
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 287.0
** resTitleRateAggs bucket key:Results forward and appearance
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 278.0
** resTitleRateAggs bucket key:heart
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 267.0
** resTitleRateAggs bucket key:Championship
** resTitleRateAggs bucket doc_count: 3
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 260.0
I was able to get the same results as Kibana and queries!
Ah, and as is natural for an aggregation query, `aggs` only returns the fields used for the aggregation; other fields (the author, etc. in this case) cannot be retrieved. It would be nice to get the fields associated with the `aggs` result in one shot together with the `query`, but that doesn't seem to be possible.
If you increase the `size` of the `query`, the chance of the documents you need being included in the hits goes up, but I believe the upper limit of `size` for a `query` is `10000`, so this only works when you can be sure everything fits within that range. If you are not sure, it is better to simply fetch them with a separate query.
The genre of books is defined as a sub-index to make it easier to narrow down the search range. I wondered if narrowing down as an index would reduce the burden as much as possible when searching for a long-term range. Of course, it can be included in the query condition.
AggsSampleViewer2.java
//Whole query
SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
        .addAggregation(dateRangeAggs);
//Add the genre filter only if a sub-index is specified.
//The term query is built inside the check: without a sub-index it would be
//unused, and termQuery rejects a null value.
if (StringUtils.isNotEmpty(subIndex)) {
    requestBuilder.setQuery(QueryBuilders.termQuery("index", subIndex));
}
You can also narrow down the results with a `QueryBuilder` like this.
If you want to change the date range, for example to weekly or monthly, change the values passed to `addRange` of `dateRangeAggs`.
I also created range definition information because I want to be able to change it freely from the settings.
AggsSampleViewer3.java
//Date range bounds are derived from the search range settings.
//Example) From 10 days before the execution date to midnight on the execution date
//"searchRange": {
//  "from": "-10",
//  "fromChrono": "days",
//  "to": "0",
//  "toChrono": "days"
//}

//Lower bound: execution date plus the From offset in its configured unit,
//truncated to the day (exactly 0 o'clock)
String rangeFrom = DT_INDEX_FORMATTER.format(
        setting.getNow()
                .plus(setting.getSearchRange().getFrom(),
                        setting.getSearchRange().findFromChronoUnit())
                .truncatedTo(ChronoUnit.DAYS));

//Upper bound: execution date plus the To offset in its configured unit,
//truncated to the day (exactly 0 o'clock)
String rangeTo = DT_INDEX_FORMATTER.format(
        setting.getNow()
                .plus(setting.getSearchRange().getTo(),
                        setting.getSearchRange().findToChronoUnit())
                .truncatedTo(ChronoUnit.DAYS));

//Aggs for date range
AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
        //The target of the date range is the processing execution date
        .field("execDate")
        //Range from the computed From to the computed To
        .addRange(rangeFrom, rangeTo)
        //Add the title_rate aggs as a sub-aggregation
        .subAggregation(titleRateAggs);
RangeBean.java
/**
 * Search range definition: a From and a To value, each paired with the name of
 * the unit ("days", "weeks", "months" or "years") it is expressed in.
 */
public class RangeBean {
    // From value of the range, expressed in the unit named by fromChrono
    private int from;
    private String fromChrono;
    // To value of the range, expressed in the unit named by toChrono
    private int to;
    private String toChrono;

    /**
     * Returns the unit of the From value.
     *
     * @return the unit named by fromChrono
     * @throws IllegalArgumentException if fromChrono is null or not a supported unit name
     */
    public ChronoUnit findFromChronoUnit() {
        return findChronoUnit(fromChrono);
    }

    /**
     * Returns the unit of the To value.
     *
     * @return the unit named by toChrono
     * @throws IllegalArgumentException if toChrono is null or not a supported unit name
     */
    public ChronoUnit findToChronoUnit() {
        return findChronoUnit(toChrono);
    }

    /**
     * Converts a unit name to its ChronoUnit.
     *
     * @param chrono unit name: "days", "weeks", "months" or "years"
     * @return the matching unit
     * @throws IllegalArgumentException if chrono is null or not one of the supported names;
     *         failing here gives a clear message instead of a NullPointerException
     *         later, when the unit is used in a date calculation
     */
    public ChronoUnit findChronoUnit(String chrono) {
        if (chrono == null) {
            // switch on a null String would throw NullPointerException
            throw new IllegalArgumentException("Chrono unit is not set (expected days, weeks, months or years)");
        }
        switch (chrono) {
            case "days":
                return ChronoUnit.DAYS;
            case "weeks":
                return ChronoUnit.WEEKS;
            case "months":
                return ChronoUnit.MONTHS;
            case "years":
                return ChronoUnit.YEARS;
            default:
                throw new IllegalArgumentException(
                        "Unsupported chrono unit: " + chrono + " (expected days, weeks, months or years)");
        }
    }
    //(Omitted)
}
Now you can specify the range as you like, either monthly or three years.
Next time, I would like to write how to specify and get a query in Java-API.
Recommended Posts