Bei der Datenanalyse gibt es häufig Optionen, die in Kombination mit Apache Spark + Cassandra implementiert werden können.
Apache Spark ist ein sehr bekanntes Datenanalysetool.
Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.
RDD (Resilient Distributed Dataset), DataFrame und DataSet. .. ..
Quelle: https://spark.apache.org/
Cassandra ist eine NoSQL-breite Spaltendatenbank.
Manage massive amounts of data, fast, without losing sleep
Quelle: http://cassandra.apache.org/
Insbesondere haben wir von Anfang an die Skalierbarkeit berücksichtigt, sodass das Clustering einfach ist.
Spark hat verschiedene Funktionen, aber lassen Sie uns ein Beispiel erstellen, um CSV in Cassandra zu speichern.
build.gradle
dependencies {
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'
}
CsvReader.java
package com.test.spark;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CsvReader {
private static final Logger logger = Logger.getLogger(CsvReader.class);
public static void main(String[] args) {
//Funkeneinstellungen
SparkConf conf = new SparkConf();
conf.setAppName("CSVReader");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", "192.168.10.248");
conf.set("spark.cassandra.connection.port", "9042");
//Cassandra-Schlüsselraum und Tabellenname
String keyspace = "sample";
String tableUser = "user";
String userCsv = "C:\\data\\spark\\users.csv";
JavaSparkContext sc = new JavaSparkContext(conf);
try {
SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();
//Cassandra-Verbindung
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
//Löschen Sie den Schlüsselraum, falls vorhanden
session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
//Erstellen Sie einen Schlüsselraum
session.execute("CREATE KEYSPACE " + keyspace
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
//Erstellen Sie eine Tabelle
session.execute("CREATE TABLE " + keyspace + "." + tableUser
+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
}
//Holen Sie sich Daten von CSV
//Die Spalte AS ist auch wichtig, um mit der Tabellendefinition übereinzustimmen
Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
.option("encoding", "UTF-8").load(userCsv).select(new Column("Benutzeridentifikation").as("user_id"),
new Column("Vollständiger Name").as("user_name"),
new Column("Mail Adresse").as("email_address"),
new Column("Bemerkungen").as("memo"));
//Speichern Sie zu Cassandra
csv.write().format("org.apache.spark.sql.cassandra")
.option("header", "true")
.option("keyspace", keyspace)
.option("table", tableUser)
.option("column", "user_id")
.option("column", "user_name")
.option("column", "email_address")
.option("column", "memo")
.mode(SaveMode.Append)
.save();
//Lesen Sie Daten von Cassandra
Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace", keyspace)
.option("table", tableUser).load();
//Holen Sie sich ein Array aus einem Datensatz
List<Row> asList = dataset.collectAsList();
for (Row r : asList) {
logger.info(r);
}
} catch (Exception e) {
logger.error(e);
} finally {
sc.stop();
sc.close();
}
}
}
19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 Jahre nach dem Eintritt in das Unternehmen,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3. Jahr nach dem Eintritt in das Unternehmen,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5. Jahr nach dem Eintritt in das Unternehmen,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1. Jahr nach dem Eintritt in das Unternehmen,Yamada Taro]
Detaillierte Materialien wie grundlegende Vorgänge finden Sie im Handbuch. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
das ist alles
Recommended Posts