Using Elasticsearch Java-API (BulkRequest registration) [Additional notes]

Elasticsearch provides a Java API.

Elasticsearch Java API Official Document

It is quite convenient, but there does not seem to be much information about it, so I have summarized what I learned.

[2018/01/12] Addendum

The TransportClient used in this article is deprecated as of the next major version, 7, and will be removed in 8. Migrating to the `High Level REST Client` is therefore recommended. Please see the comments section for details. Thank you, johtani, for pointing this out.

Requirements

This is the kind of result I am aiming for.

Development environment

Data structure

In an RDB, the usual approach would be to keep branch information and title information in separate tables, create a sales table, and join them. In ELS, however, each record is a self-contained document, so the same branch or title values are registered in many records. I find ELS records easy to design if you picture them as rows of a view.
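As a rough illustration of this view-like flattening, the following sketch resolves foreign keys from a sales table into self-contained records. The class `FlattenExample`, the `SalesRow` type, and the ID-based lookup maps are all hypothetical names introduced for this example; they do not appear in the original project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlattenExample {

    /** One row of a hypothetical RDB sales table: foreign keys plus the ranking. */
    static final class SalesRow {
        final int branchId;
        final int titleId;
        final int ranking;

        SalesRow(int branchId, int titleId, int ranking) {
            this.branchId = branchId;
            this.titleId = titleId;
            this.ranking = ranking;
        }
    }

    /** Resolves the foreign keys so that every output record is self-contained. */
    static List<Map<String, Object>> flatten(List<SalesRow> sales,
                                             Map<Integer, String> branches,
                                             Map<Integer, String> titles) {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (SalesRow row : sales) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("branch", branches.get(row.branchId));
            doc.put("title", titles.get(row.titleId));
            doc.put("ranking", row.ranking);
            docs.add(doc);
        }
        return docs;
    }

    public static void main(String[] args) {
        Map<Integer, String> branches = new HashMap<>();
        branches.put(1, "Shinjuku");
        branches.put(2, "Marunouchi");

        Map<Integer, String> titles = new HashMap<>();
        titles.put(10, "Title A");

        List<SalesRow> sales = new ArrayList<>();
        sales.add(new SalesRow(1, 10, 1));
        sales.add(new SalesRow(2, 10, 2));

        // "Title A" is repeated in both records instead of being referenced by ID.
        for (Map<String, Object> doc : flatten(sales, branches, titles)) {
            System.out.println(doc);
        }
    }
}
```

The repetition of branch and title values is intentional here: each record can be searched and aggregated without a join.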

The rate (weight) is probably the distinctive field in this record. In a ranking, 1 is the top position, and the position gets worse as the number increases. If you plot the ranking itself on the Y-axis of a line graph, smaller values would have to appear higher up, which is the reverse of a normal axis and is difficult to express in ELS. Reference (or rather, my own earlier question): Ranking display in line graph

Therefore, to make the graph easier to display, the rate is the ranking value converted into a weight.

Rate calculation: (total number of ranks) - (own rank) + 1

If the total number of ranks varies, the graph will shift with it, so always use a fixed value such as 100 or 1000.
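The calculation above can be sketched in a few lines of Java. The class name `RateCalculator`, the method `toRate`, and the constant `FIXED_TOTAL` are names made up for this example, and the range check is an added assumption rather than something the original code is known to do.

```java
/** Converts a ranking position into a rate (weight) so that better ranks plot higher. */
final class RateCalculator {

    // Fixed upper bound for ranking positions (assumed value; the article suggests 100 or 1000).
    static final int FIXED_TOTAL = 100;

    /** rate = (total number of ranks) - (own rank) + 1 */
    static int toRate(int ranking, int total) {
        if (ranking < 1 || ranking > total) {
            throw new IllegalArgumentException(
                    "ranking must be between 1 and " + total + ": " + ranking);
        }
        return total - ranking + 1;
    }

    public static void main(String[] args) {
        // With total = 3 this reproduces the sample data: ranks 1, 2, 3 give rates 3, 2, 1.
        for (int ranking = 1; ranking <= 3; ranking++) {
            System.out.println("ranking " + ranking + " -> rate " + toRate(ranking, 3));
        }
        // With the fixed total, rank 1 always maps to the same rate.
        System.out.println("ranking 1 -> rate " + toRate(1, FIXED_TOTAL));
    }
}
```

The sample data in this article uses a total of 3 to keep the numbers small; with a fixed total of 100, rank 1 would be stored as rate 100.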

Create JSON that can be imported into ELS, by whatever method you like.

JSON

branch	title	author	genre	ranking	rate	execDate
Shinjuku	Title A	Author Ahn	comic-girl	1	3	2018/01/11T13:00:00+09:00
Shinjuku	Title B	From the author	comic-male	2	2	2018/01/11T13:00:00+09:00
Shinjuku	Title C	Author U	comic-girl	3	1	2018/01/11T13:00:00+09:00
Marunouchi	Title C	Author U	comic-girl	1	3	2018/01/11T13:00:00+09:00
Marunouchi	Title A	Author Ahn	comic-girl	2	2	2018/01/11T13:00:00+09:00
Marunouchi	Title D	Author clothing	comic-boy	3	1	2018/01/11T13:00:00+09:00
Shinagawa	Title B	From the author	comic-male	1	3	2018/01/11T13:00:00+09:00
Shinagawa	Title C	Author U	comic-girl	2	2	2018/01/11T13:00:00+09:00
Shinagawa	Title E	At the author	comic-femail	3	1	2018/01/11T13:00:00+09:00

ranking.json


[{"branch":"Shinjuku","title":"Title A","author":"Author Ahn","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Shinjuku","title":"Title B","author":"From the author","genre":"comic-male","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Shinjuku","title":"Title C","author":"Author U","genre":"comic-girl","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Marunouchi","title":"Title C","author":"Author U","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Marunouchi","title":"Title A","author":"Author Ahn","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Marunouchi","title":"Title D","author":"Author clothing","genre":"comic-boy","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Shinagawa","title":"Title B","author":"From the author","genre":"comic-male","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Shinagawa","title":"Title C","author":"Author U","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"Shinagawa","title":"Title E","author":"At the author","genre":"comic-femail","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"}]

Bean

We will also create a ranking information bean corresponding to this.

RankingDataBean.java


import java.util.Date;

public class RankingDataBean extends ElsDataBaseBean {

	// Branch name
	private String branch;
	// Title
	private String title;
	// Author name
	private String author;
	// Genre
	private String genre;
	// Ranking
	private int ranking;
	// Rate (weight derived from the ranking)
	private int rate;
	// Registration date
	private Date execDate;

	// (rest omitted)
}

The notable point is that it extends ElsDataBaseBean, whose contents are as follows.

ElsDataBaseBean.java


public class ElsDataBaseBean {
	// Index (used as a sub-index when building the index name)
	private String index;

	// (rest omitted)
}

It holds only the index. We will write the Importer so that any bean extending this class can be registered.

Import process to ELS

ElsImporter.java


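import java.io.File;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.apache.commons.io.FileUtils;
// Assumption: StringUtils comes from Commons Lang 3; adjust if the project uses another library.
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Imports data into ELS with a bulk request
 */
public class ElsImporter<T extends ElsDataBaseBean> {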

	private static Logger logger = LoggerFactory.getLogger(ElsImporter.class);

	//Year / month index format(yyyyMM)
	private static DateTimeFormatter YM_INDEX_FORMATTER;

	//Various setting information
	SettingBaseBean setting;

	//Class of data to import
	Class<T> clazz;

	/**
	 *constructor
	 * @param setting Various setting information
	 * @param clazz Class of data to import
	 */
	public ElsImporter(SettingBaseBean setting, Class<T> clazz) {
		this.setting = setting;
		this.clazz = clazz;

		YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
	}

	/**
	 * Run the import
	 * @return true if every bulk item succeeded, false otherwise
	 */
	public boolean execute() throws Exception {

		logger.info("ElsImporter Import process started-----------------------------------------");

		// Create the client used to talk to ELS from Java.
		// setting.getElasticearch().getAddress(): IP address
		// setting.getElasticearch().getPort(): port number (usually 9300)
		// try-with-resources closes the client even if an exception is thrown.
		try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
				.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(
						setting.getElasticearch().getAddress()), setting.getElasticearch().getPort()))) {

			// Prepare the bulk request
			BulkRequestBuilder bulkRequest = client.prepareBulk();

			// Prepare Gson
			// setting.getElasticearch().getExecDateFormat():
			// registration date format (yyyy/MM/dd'T'HH:mm:ss+09:00)
			Gson gson = new GsonBuilder().setDateFormat(setting.getElasticearch().getExecDateFormat()).create();

			// Process every file in the directory that holds the JSON
			File dir = new File(setting.getRootDir(), setting.getSubDir().getJsonDir());
			File[] files = dir.listFiles();
			if (files == null) {
				// listFiles() returns null when the path is not a readable directory
				throw new IllegalStateException("Cannot list JSON directory: " + dir);
			}
			for (File file : files) {

				logger.debug("■■ Import: " + file.getName());

				// Convert the JSON data into a list
				List<T> list = gson.fromJson(FileUtils.readFileToString(file, "UTF-8"), new ListOfSomething<T>(clazz));

				// Add the records to the bulk request one by one
				for (T data : list) {
					// Destination index
					// setting.getElasticearch().getIndex(): ELS index (common part)
					// setting.getNow(): LocalDateTime at the time the process runs
					// Example 1: setting.getElasticearch().getIndex() : ranking
					//            data.getIndex()                      : comic-girl
					//            setting.getNow()                     : 2018/01/18 12:00:00
					//            Generated index: ranking-comic-girl-201801
					// Example 2: setting.getElasticearch().getIndex() : ranking
					//            data.getIndex()                      : null
					//            setting.getNow()                     : 2018/01/18 12:00:00
					//            Generated index: ranking-201801
					String index;
					if (StringUtils.isEmpty(data.getIndex())) {
						// Without a sub-index
						index = setting.getElasticearch().getIndex()
								+ "-" + YM_INDEX_FORMATTER.format(setting.getNow());
					}
					else {
						// With a sub-index
						index = setting.getElasticearch().getIndex()
								+ "-" + data.getIndex()
								+ "-" + YM_INDEX_FORMATTER.format(setting.getNow());
					}

					// Add the record to the bulk request as JSON
					// setting.getElasticearch().getType(): ELS type
					bulkRequest.add(client.prepareIndex(index
							, setting.getElasticearch().getType())
						.setSource(gson.toJson(data), XContentType.JSON));
				}
			}

			// Execute the bulk request
			BulkResponse bulkResponse = bulkRequest.execute().get();

			// hasFailures() returns true if at least one item failed
			if (bulkResponse.hasFailures()) {
				logger.error("ElsImporter bulk failed: " + bulkResponse.buildFailureMessage());
				return false;
			}

			logger.info("ElsImporter Import process finished-----------------------------------------");

			return true;
		}
	}

	/**
	 * Describes the type List<X> so that Gson can deserialize a JSON array
	 * into a list of the specified element type
	 */
	private static class ListOfSomething<X> implements ParameterizedType {

		// Element type of the list
		private final Class<?> wrapped;

		public ListOfSomething(Class<X> wrapped) {
			this.wrapped = wrapped;
		}

		@Override
		public Type[] getActualTypeArguments() {
			return new Type[] {wrapped};
		}

		@Override
		public Type getRawType() {
			return List.class;
		}

		@Override
		public Type getOwnerType() {
			// List is a top-level type, so it has no owner
			return null;
		}

	}
}

Reference: Java Type Generic as Argument for GSON
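To see what such a hand-written ParameterizedType reports at runtime, here is a small standalone sketch that uses only the JDK. The names `ListTypeExample` and `ListOf` are made up for this example; the structure mirrors ListOfSomething above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

final class ListTypeExample {

    /** Describes List<X> at runtime; same idea as ListOfSomething in the importer. */
    static final class ListOf implements ParameterizedType {
        private final Class<?> elementType;

        ListOf(Class<?> elementType) {
            this.elementType = elementType;
        }

        @Override
        public Type[] getActualTypeArguments() {
            return new Type[] {elementType};
        }

        @Override
        public Type getRawType() {
            return List.class;
        }

        @Override
        public Type getOwnerType() {
            return null; // List is a top-level type
        }
    }

    public static void main(String[] args) {
        ParameterizedType type = new ListOf(String.class);
        // A library that accepts a java.lang.reflect.Type reads these two values
        // to learn "this is a List, and its elements are Strings".
        System.out.println("raw type:     " + type.getRawType());
        System.out.println("element type: " + type.getActualTypeArguments()[0]);
    }
}
```

Because generic type arguments are erased at runtime, `List<T>` on its own cannot tell Gson which element class to build; passing a Type object like this one supplies the missing information. Recent Gson versions also offer `TypeToken.getParameterized(List.class, clazz).getType()`, which may be a simpler alternative if your Gson version includes it.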

The Importer itself holds no knowledge of the concrete data class being imported. It works only from the generic type parameter and the Class object, so it can be reused for data other than this ranking information. The index field held by ElsDataBaseBean acts like a sub-index. I think it is best to assign it according to the granularity at which you will display the data in Kibana or fetch it through the Java API. As for the time granularity of ELS indices, Logstash creates one index per day by default, but it is a good idea to decide this based on the period you most often query.
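The index naming rule used by the Importer can be isolated into a small helper, sketched below with only the JDK. The class `IndexNameExample` and the method `buildIndexName` are hypothetical names for this example; the "yyyyMM" pattern is the year-month format mentioned in the Importer's comments, hard-coded here instead of being read from the settings.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class IndexNameExample {

    // Year-month suffix; hard-coded here, whereas the Importer reads it from its settings.
    private static final DateTimeFormatter YM = DateTimeFormatter.ofPattern("yyyyMM");

    /** Builds "base-sub-yyyyMM", or "base-yyyyMM" when no sub-index is given. */
    static String buildIndexName(String base, String subIndex, LocalDateTime now) {
        StringBuilder name = new StringBuilder(base);
        if (subIndex != null && !subIndex.isEmpty()) {
            name.append('-').append(subIndex);
        }
        return name.append('-').append(YM.format(now)).toString();
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2018, 1, 18, 12, 0, 0);
        System.out.println(buildIndexName("ranking", "comic-girl", now)); // ranking-comic-girl-201801
        System.out.println(buildIndexName("ranking", null, now));         // ranking-201801
    }
}
```

Switching to daily or yearly indices would then only require changing the pattern, for example to "yyyyMMdd" or "yyyy".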

exec.java


ElsImporter<RankingDataBean> importer 
	= new ElsImporter<RankingDataBean>(setting, RankingDataBean.class);
boolean result = importer.execute();

The Importer is called like this. It is a bit awkward that RankingDataBean has to be written in three places, but I accepted that as the cost of making Gson work.

Impressions

I tried to make the BulkRequest import process as general-purpose as possible. Next time, I would like to dig into the data registered here.
