[JAVA] Liaison de données avec Spark et Cassandra

introduction

Dans l'analyse des données, il existe souvent des options à implémenter en combinaison avec Apache Spark + Cassandra.

Qu'est-ce qu'Apache Spark?

Apache Spark est un outil d'analyse de données très connu. image.png

Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.

RDD (Resilient Distributed Dataset), DataFrame et DataSet. .. ..

Source: https://spark.apache.org/

Qu'est-ce que Cassandra

Cassandra est une base de données de colonnes NoSQL. image.png

Manage massive amounts of data, fast, without losing sleep

Source: http://cassandra.apache.org/

En particulier, nous avons considéré l'évolutivité depuis le début, de sorte que le clustering est facile.

Exemple pour enregistrer les données de fichier CSV dans Cassandra

Spark a diverses fonctions, mais créons un exemple pour enregistrer le CSV dans Cassandra.

Créez un exemple de fichier appelé users.csv

image.png

Présentation d'une bibliothèque à un projet Gradle

build.gradle


dependencies {
	// https://mvnrepository.com/artifact/org.scala-lang/scala-library
	compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-core
	compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
	compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
	
	// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
	compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
	compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'

}

Enregistrez-le de CSV vers Cassandra et essayez de l'obtenir à partir de DB.

CsvReader.java


package com.test.spark;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CsvReader {

	private static final Logger logger = Logger.getLogger(CsvReader.class);

	public static void main(String[] args) {

		//Paramètres Spark
		SparkConf conf = new SparkConf();
		conf.setAppName("CSVReader");
		conf.setMaster("local[*]");
		conf.set("spark.cassandra.connection.host", "192.168.10.248");
		conf.set("spark.cassandra.connection.port", "9042");

		//Espace de clés et nom de table Cassandra
		String keyspace = "sample";
		String tableUser = "user";
		String userCsv = "C:\\data\\spark\\users.csv";

		JavaSparkContext sc = new JavaSparkContext(conf);
		try {
			SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
					.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();

			//Connexion Cassandra
			CassandraConnector connector = CassandraConnector.apply(sc.getConf());
			
			try (Session session = connector.openSession()) {
				//Supprimer l'espace de clés s'il existe
				session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
				
				//Créer un espace de clés
				session.execute("CREATE KEYSPACE " + keyspace
						+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
				
				//Créer une table
				session.execute("CREATE TABLE " + keyspace + "." + tableUser
						+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
			}

			//Obtenir des données à partir de CSV
			//La colonne AS est également importante pour correspondre à la définition de la table
			Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
					.option("encoding", "UTF-8").load(userCsv).select(new Column("Identifiant d'utilisateur").as("user_id"),
							new Column("Nom complet").as("user_name"), 
							new Column("adresse mail").as("email_address"),
							new Column("Remarques").as("memo"));

			//Sauvegarder à Cassandra
			csv.write().format("org.apache.spark.sql.cassandra")
					.option("header", "true")
					.option("keyspace", keyspace)
					.option("table", tableUser)
					.option("column", "user_id")
					.option("column", "user_name")
					.option("column", "email_address")
					.option("column", "memo")
					.mode(SaveMode.Append)
					.save();

			//Lire les données de Cassandra
			Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
					.option("keyspace", keyspace)
					.option("table", tableUser).load();
			
			//Obtenir un tableau à partir d'un ensemble de données
			List<Row> asList = dataset.collectAsList();
			for (Row r : asList) {
				logger.info(r);
			}
		} catch (Exception e) {
			logger.error(e);
		} finally {
			sc.stop();
			sc.close();
		}
	}
}

Données Cassandra

image.png

Données utilisateur acquises par JAVA

19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 ans après avoir rejoint l'entreprise,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3ème année après avoir rejoint l'entreprise,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5e année après avoir rejoint l'entreprise,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1ère année après avoir rejoint l'entreprise,Yamada Taro]

Des informations détaillées telles que les opérations de base peuvent être trouvées dans le guide. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

c'est tout

Recommended Posts

Liaison de données avec Spark et Cassandra
Compatibilité de Spring JDBC et My Batis avec Spring Data JDBC (provisoire)
[Apprentissage automatique avec Apache Spark] Vecteur fragmenté (vecteur fragmenté) et vecteur dense (vecteur dense)
Agrégation de fenêtres de données de capteurs avec Apache Flink et Java 8
Micro service facile avec Spark Framework!
Type de données de base et type de référence
Programmation Java (variables et données)
Poteaux Rails et liaison utilisateur
URLSession avec URLSession et Combine normalement
Exécutez logstash avec Docker et essayez de télécharger des données sur Elastic Cloud
Créer une API REST avec Spring JPA Data avec REST et Lombok incroyablement facile.