[JAVA] Datenverknüpfung mit Spark und Cassandra

Einführung

Bei der Datenanalyse gibt es häufig Optionen, die in Kombination mit Apache Spark + Cassandra implementiert werden können.

Was ist Apache Spark?

Apache Spark ist ein sehr bekanntes Datenanalysetool. image.png

Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.

RDD (Resilient Distributed Dataset), DataFrame und DataSet. .. ..

Quelle: https://spark.apache.org/

Was ist Cassandra?

Cassandra ist eine NoSQL-breite Spaltendatenbank. image.png

Manage massive amounts of data, fast, without losing sleep

Quelle: http://cassandra.apache.org/

Insbesondere haben wir von Anfang an die Skalierbarkeit berücksichtigt, sodass das Clustering einfach ist.

Beispiel zum Speichern von CSV-Dateidaten in Cassandra

Spark hat verschiedene Funktionen, aber lassen Sie uns ein Beispiel erstellen, um CSV in Cassandra zu speichern.

Erstellen Sie eine Beispieldatei mit dem Namen users.csv

image.png

Einführung einer Bibliothek in ein Gradle-Projekt

build.gradle


dependencies {
	// https://mvnrepository.com/artifact/org.scala-lang/scala-library
	compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-core
	compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
	compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
	
	// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
	compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
	compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'

}

Speichern Sie es von CSV in Cassandra und versuchen Sie, es von DB zu erhalten.

CsvReader.java


package com.test.spark;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CsvReader {

	private static final Logger logger = Logger.getLogger(CsvReader.class);

	public static void main(String[] args) {

		//Funkeneinstellungen
		SparkConf conf = new SparkConf();
		conf.setAppName("CSVReader");
		conf.setMaster("local[*]");
		conf.set("spark.cassandra.connection.host", "192.168.10.248");
		conf.set("spark.cassandra.connection.port", "9042");

		//Cassandra-Schlüsselraum und Tabellenname
		String keyspace = "sample";
		String tableUser = "user";
		String userCsv = "C:\\data\\spark\\users.csv";

		JavaSparkContext sc = new JavaSparkContext(conf);
		try {
			SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
					.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();

			//Cassandra-Verbindung
			CassandraConnector connector = CassandraConnector.apply(sc.getConf());
			
			try (Session session = connector.openSession()) {
				//Löschen Sie den Schlüsselraum, falls vorhanden
				session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
				
				//Erstellen Sie einen Schlüsselraum
				session.execute("CREATE KEYSPACE " + keyspace
						+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
				
				//Erstellen Sie eine Tabelle
				session.execute("CREATE TABLE " + keyspace + "." + tableUser
						+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
			}

			//Holen Sie sich Daten von CSV
			//Die Spalte AS ist auch wichtig, um mit der Tabellendefinition übereinzustimmen
			Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
					.option("encoding", "UTF-8").load(userCsv).select(new Column("Benutzeridentifikation").as("user_id"),
							new Column("Vollständiger Name").as("user_name"), 
							new Column("Mail Adresse").as("email_address"),
							new Column("Bemerkungen").as("memo"));

			//Speichern Sie zu Cassandra
			csv.write().format("org.apache.spark.sql.cassandra")
					.option("header", "true")
					.option("keyspace", keyspace)
					.option("table", tableUser)
					.option("column", "user_id")
					.option("column", "user_name")
					.option("column", "email_address")
					.option("column", "memo")
					.mode(SaveMode.Append)
					.save();

			//Lesen Sie Daten von Cassandra
			Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
					.option("keyspace", keyspace)
					.option("table", tableUser).load();
			
			//Holen Sie sich ein Array aus einem Datensatz
			List<Row> asList = dataset.collectAsList();
			for (Row r : asList) {
				logger.info(r);
			}
		} catch (Exception e) {
			logger.error(e);
		} finally {
			sc.stop();
			sc.close();
		}
	}
}

Cassandra-Daten

image.png

Von JAVA erfasste Benutzerdaten

19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 Jahre nach dem Eintritt in das Unternehmen,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3. Jahr nach dem Eintritt in das Unternehmen,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5. Jahr nach dem Eintritt in das Unternehmen,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1. Jahr nach dem Eintritt in das Unternehmen,Yamada Taro]

Detaillierte Materialien wie grundlegende Vorgänge finden Sie im Handbuch. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

das ist alles

Recommended Posts

Datenverknüpfung mit Spark und Cassandra
Kompatibilität von Spring JDBC und My Batis mit Spring Data JDBC (vorläufig)
[Maschinelles Lernen mit Apache Spark] Sparse Vector (spärlicher Vektor) und Dense Vector (dichter Vektor)
Fensteraggregation von Sensordaten mit Apache Flink und Java 8
Einfacher Mikroservice mit Spark Framework!
Basisdatentyp und Referenztyp
Java-Programmierung (Variablen und Daten)
Rails Posts und User Linkage
URLSession mit URLSession und Normal kombinieren
Führen Sie logstash mit Docker aus und versuchen Sie, Daten in Elastic Cloud hochzuladen
Das Erstellen einer REST-API mit Spring JPA-Daten mit REST und Lombok ist unglaublich einfach.