In data analysis work, combining Apache Spark with Apache Cassandra is a common option.
Apache Spark is a well-known engine for large-scale data analysis.
Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.
It provides several data abstractions, such as RDD (Resilient Distributed Dataset), DataFrame, and Dataset.
Source: https://spark.apache.org/
Cassandra is a NoSQL wide column database.
Manage massive amounts of data, fast, without losing sleep
Source: http://cassandra.apache.org/
In particular, it was designed with scalability in mind from the start, so setting up a cluster is easy.
Spark offers a wide range of features; here, let's build a sample that loads a CSV file and saves it into Cassandra.
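For this sample, assume an input file like the following, saved as C:\data\spark\users.csv. The header names must match the columns selected in the code below; the rows are reconstructed from the log output at the end of this post, and the mail addresses are placeholders.
users.csv
User ID,Full name,mail address,Remarks
A000001,Yamada Taro,taro@example.com,1st year after joining the company
A000002,Saburo Yamada,saburo@example.com,10 years after joining the company
A000003,Ichiro Tanaka,ichiro@example.com,5th year after joining the company
A000004,Jiro Tanaka,jiro@example.com,3rd year after joining the company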
build.gradle
dependencies {
    // https://mvnrepository.com/artifact/org.scala-lang/scala-library
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'
    // https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'
}
CsvReader.java
package com.test.spark;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CsvReader {

    private static final Logger logger = Logger.getLogger(CsvReader.class);

    public static void main(String[] args) {
        // Spark settings
        SparkConf conf = new SparkConf();
        conf.setAppName("CSVReader");
        conf.setMaster("local[*]");
        conf.set("spark.cassandra.connection.host", "192.168.10.248");
        conf.set("spark.cassandra.connection.port", "9042");

        // Cassandra keyspace and table name
        String keyspace = "sample";
        String tableUser = "user";
        String userCsv = "C:\\data\\spark\\users.csv";

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
                    .config("spark.sql.warehouse.dir", "file:///C:/data/spark").getOrCreate();

            // Cassandra connection
            CassandraConnector connector = CassandraConnector.apply(sc.getConf());
            try (Session session = connector.openSession()) {
                // Delete the keyspace if it already exists
                session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
                // Create the keyspace
                session.execute("CREATE KEYSPACE " + keyspace
                        + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
                // Create the table
                session.execute("CREATE TABLE " + keyspace + "." + tableUser
                        + "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
            }

            // Load the data from the CSV file
            // Aliasing the columns (AS) is important so that the names match the table definition
            Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
                    .option("encoding", "UTF-8").load(userCsv).select(new Column("User ID").as("user_id"),
                            new Column("Full name").as("user_name"),
                            new Column("mail address").as("email_address"),
                            new Column("Remarks").as("memo"));

            // Save to Cassandra; the DataFrame columns are mapped to the table columns by name
            csv.write().format("org.apache.spark.sql.cassandra")
                    .option("keyspace", keyspace)
                    .option("table", tableUser)
                    .mode(SaveMode.Append)
                    .save();

            // Read the data back from Cassandra
            Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
                    .option("keyspace", keyspace)
                    .option("table", tableUser).load();

            // Collect the rows and log them
            List<Row> asList = dataset.collectAsList();
            for (Row r : asList) {
                logger.info(r);
            }
        } catch (Exception e) {
            logger.error(e);
        } finally {
            sc.stop();
            sc.close();
        }
    }
}
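As a side note, the Dataset read back from Cassandra can be narrowed down with ordinary Spark SQL operations, and the connector pushes simple filters (such as equality on the partition key) down to Cassandra where it can. A minimal sketch, reusing sparkSession, keyspace, and tableUser from the listing above:

// Minimal sketch: read a single user back from Cassandra (reuses variables from CsvReader above)
Dataset<Row> oneUser = sparkSession.read().format("org.apache.spark.sql.cassandra")
        .option("keyspace", keyspace)
        .option("table", tableUser)
        .load()
        .filter(org.apache.spark.sql.functions.col("user_id").equalTo("A000001"));
oneUser.show();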
19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 years after joining the company,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3rd year after joining the company,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5th year after joining the company,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1st year after joining the company,Yamada Taro]
Detailed material on basic operations and more can be found in the official guide. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
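For example, the basic RDD operations described there look roughly like this in Java (a minimal sketch; it assumes the JavaSparkContext sc from the sample above and an import of org.apache.spark.api.java.JavaRDD):

// Distribute a local list, square each element, and collect the results back to the driver
JavaRDD<Integer> numbers = sc.parallelize(java.util.Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> squared = numbers.map(n -> n * n);
logger.info(squared.collect()); // [1, 4, 9, 16, 25]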
That's all.