Dans l'analyse des données, il existe souvent des options à implémenter en combinaison avec Apache Spark + Cassandra.
Apache Spark est un outil d'analyse de données très connu.
Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.
RDD (Resilient Distributed Dataset), DataFrame et DataSet. .. ..
Source: https://spark.apache.org/
Cassandra est une base de données de colonnes NoSQL.
Manage massive amounts of data, fast, without losing sleep
Source: http://cassandra.apache.org/
En particulier, nous avons considéré l'évolutivité depuis le début, de sorte que le clustering est facile.
Spark a diverses fonctions, mais créons un exemple pour enregistrer le CSV dans Cassandra.
build.gradle
dependencies {
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'
}
CsvReader.java
package com.test.spark;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CsvReader {
private static final Logger logger = Logger.getLogger(CsvReader.class);
public static void main(String[] args) {
//Paramètres Spark
SparkConf conf = new SparkConf();
conf.setAppName("CSVReader");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", "192.168.10.248");
conf.set("spark.cassandra.connection.port", "9042");
//Espace de clés et nom de table Cassandra
String keyspace = "sample";
String tableUser = "user";
String userCsv = "C:\\data\\spark\\users.csv";
JavaSparkContext sc = new JavaSparkContext(conf);
try {
SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();
//Connexion Cassandra
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
//Supprimer l'espace de clés s'il existe
session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
//Créer un espace de clés
session.execute("CREATE KEYSPACE " + keyspace
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
//Créer une table
session.execute("CREATE TABLE " + keyspace + "." + tableUser
+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
}
//Obtenir des données à partir de CSV
//La colonne AS est également importante pour correspondre à la définition de la table
Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
.option("encoding", "UTF-8").load(userCsv).select(new Column("Identifiant d'utilisateur").as("user_id"),
new Column("Nom complet").as("user_name"),
new Column("adresse mail").as("email_address"),
new Column("Remarques").as("memo"));
//Sauvegarder à Cassandra
csv.write().format("org.apache.spark.sql.cassandra")
.option("header", "true")
.option("keyspace", keyspace)
.option("table", tableUser)
.option("column", "user_id")
.option("column", "user_name")
.option("column", "email_address")
.option("column", "memo")
.mode(SaveMode.Append)
.save();
//Lire les données de Cassandra
Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace", keyspace)
.option("table", tableUser).load();
//Obtenir un tableau à partir d'un ensemble de données
List<Row> asList = dataset.collectAsList();
for (Row r : asList) {
logger.info(r);
}
} catch (Exception e) {
logger.error(e);
} finally {
sc.stop();
sc.close();
}
}
}
19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 ans après avoir rejoint l'entreprise,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3ème année après avoir rejoint l'entreprise,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5e année après avoir rejoint l'entreprise,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1ère année après avoir rejoint l'entreprise,Yamada Taro]
Des informations détaillées telles que les opérations de base peuvent être trouvées dans le guide. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
c'est tout
Recommended Posts