This article is the entry for day 18 of the NTT TechnoCross Advent Calendar 2020.
Hi, my name is @yuyhiraka (Hirakawa) from NTT TechnoCross. I am usually in charge of virtualization/container/cloud infrastructure and PoCs of advanced technology for small networks.
The content of this article is my personal view and is not related to my organization.
I tried to see whether Koalas and Elasticsearch can work together. Since there are already people linking Apache Spark™ and Elasticsearch, this is an applied verification of that work.
- Spark on elasticsearch-hadoop trial
- Process Elasticsearch data with Apache Spark
- Simple procedure to build an Elasticsearch cluster and create an Index in Spark
Elasticsearch is a search engine, database and ecosystem for search and analysis.
- What is Elasticsearch?
- Elastic Stack Subscription & Deployment Support - NTT TechnoCross
Apache Spark™ is a fast distributed processing framework for big data. It also supports Python; the Python API is commonly called PySpark.
- Apache Spark™
- What is Apache Spark? An introduction for those new to distributed processing
pandas is a powerful data analysis library for Python.
Koalas is a wrapper library that enables pandas-like data manipulation with Apache Spark™. Apache Spark™ has its own concept similar to the pandas DataFrame, the Spark Dataset/DataFrame, but converting objects back and forth between pandas and Spark Dataset/DataFrame is confusing because the two APIs differ in many ways. Koalas is an approach to solving this, as sketched below.
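As a small illustration (my own sketch, not part of the verification below), the following shows the kind of pandas-style code Koalas accepts and how explicit conversions look. It assumes a working PySpark environment with Koalas installed; the column names are made up for the example.
import pandas as pd
import databricks.koalas as ks

# A plain pandas DataFrame (single machine).
pdf = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

# The same data as a Koalas DataFrame (backed by Spark under the hood).
kdf = ks.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

# Familiar pandas-style operations work on both objects.
print(pdf["value"].mean())
print(kdf["value"].mean())

# Conversions between the two worlds are explicit when needed.
kdf_from_pandas = ks.from_pandas(pdf)   # pandas -> Koalas
sdf = kdf.to_spark()                    # Koalas -> Spark DataFrame
pdf_back = kdf.to_pandas()              # Koalas -> pandas (collects to the driver)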
Because a matching version is distributed, it appears that Elasticsearch 7.10 can be linked with the Hadoop ecosystem (including Apache Spark™ and Koalas) by using the Elasticsearch-Hadoop plugin (elasticsearch-hadoop 7.10). On the other hand, as of December 2020, Elasticsearch cannot be linked with Apache Spark™ 3.0.x via the Elasticsearch-Hadoop plugin.
Therefore, this time I will use Apache Spark™ 2.4.7, which satisfies both of the following conditions:
- A version supported by Koalas
- A version supported by the Elasticsearch-Hadoop plugin
- Elastic Product EOL / End of Support Dates
- Ubuntu 20.04 LTS on VirtualBox 6.1.10
- 6 vCPU cores
# docker version
Client: Docker Engine - Community
Version: 20.10.0
Building an environment for verification

To save effort, create a container image with PySpark 2.4.7 and JupyterLab installed, referring to "Build an Image with a Different Version of Spark".
# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
-t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
--build-arg spark_version=2.4.7 \
--build-arg hadoop_version=2.7 \
--build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
--build-arg openjdk_version=8
Create a Dockerfile that installs the Elasticsearch-Hadoop plugin and Koalas on top of the base image above. However, PySpark 2.4 does not work with Python 3.8.x as-is, so as a workaround, create a conda virtual environment with Python 3.7.x.
# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7

# Install curl as root, then switch back to the default notebook user.
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan

# Download the elasticsearch-hadoop connector JAR into the user's home directory.
RUN mkdir ~/jars
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"

# PySpark 2.4 does not run on Python 3.8, so create a Python 3.7 conda environment
# with PySpark 2.4, Koalas 1.5, and OpenJDK 8.
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y
Create a container image using the created Dockerfile.
# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/
Since the Elasticsearch container image is large, pull it in advance.
# docker pull elasticsearch:7.10.1
Create the required directories and docker-compose.yaml.
# mkdir /opt/es
# mkdir /opt/koalas-spark/
# Loosen permissions so the directories can be accessed from the containers (details omitted)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.10.1
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200
    volumes:
      - /opt/es/:/usr/share/elasticsearch/data
    networks:
      - devnet
  koalas-spark:
    build: ./koalas-spark
    container_name: koalas-spark
    working_dir: '/home/jovyan/work/'
    tty: true
    volumes:
      - /opt/koalas-spark/:/home/jovyan/work/
    networks:
      - devnet
networks:
  devnet:
Launch the Koalas container and the Elasticsearch container using Docker Compose, and confirm that the Elasticsearch container has started successfully.
# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
Check the container ID of the Koalas container in the list of running containers, then specify that container ID to enter the Koalas container. Alternatively, you can use docker-compose exec.
# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e33681a37aea root_koalas-spark "tini -g -- start-no…" 2 minutes ago Up 2 minutes 8888/tcp koalas-spark
fe65e3351bea elasticsearch:7.10.1 "/tini -- /usr/local…" 16 minutes ago Up 16 minutes 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch
# docker exec -it e33681a37aea bash
Use the curl command to check the communication from the Koalas container to the Elasticsearch container.
$ curl -X GET http://elasticsearch:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
Continue working inside the Koalas container. Switch to the Python 3.7 environment, start PySpark (IPython), and write data to Elasticsearch.
This time, I create 4 rows and 4 columns of data using Spark's RDD functionality, convert it to a Spark DataFrame, and then convert that to a Koalas DataFrame.
$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *

# Hostname of the Elasticsearch container (the service name in docker-compose.yaml).
esURL = "elasticsearch"

# Create 4 rows x 4 columns of sample data as an RDD of Row objects.
rdd1 = sc.parallelize([
    Row(col1=1, col2=1, col3=1, col4=1),
    Row(col1=2, col2=2, col3=2, col4=2),
    Row(col1=3, col2=3, col3=3, col4=3),
    Row(col1=4, col2=4, col3=4, col4=4)
])

# RDD -> Spark DataFrame
df1 = rdd1.toDF()
df1.show()

# Spark DataFrame -> Koalas DataFrame
kdf1 = ks.DataFrame(df1)
print(kdf1)

# Write the Koalas DataFrame to the Elasticsearch index "sample/test"
# through the elasticsearch-hadoop connector.
kdf1.to_spark_io(path="sample/test",
                 format="org.elasticsearch.spark.sql",
                 options={"es.nodes.wan.only": "false",
                          "es.port": 9200,
                          "es.net.ssl": "false",
                          "es.nodes": esURL},
                 mode="Overwrite")
Exit PySpark (IPython) with Ctrl + D or similar, and then confirm that Elasticsearch contains the data.
$ curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kaTbZXYBpKFycpUDLgjO",
        "_score" : 1.0,
        "_source" : {
          "col1" : 4,
          "col2" : 4,
          "col3" : 4,
          "col4" : 4
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kKTbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 2,
          "col2" : 2,
          "col3" : 2,
          "col4" : 2
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "j6TbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 3,
          "col2" : 3,
          "col3" : 3,
          "col4" : 3
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "jqTbZXYBpKFycpUDLgjD",
        "_score" : 1.0,
        "_source" : {
          "col1" : 1,
          "col2" : 1,
          "col3" : 1,
          "col4" : 1
        }
      }
    ]
  }
}
As shown above, I was able to confirm that data can be loaded from Koalas into Elasticsearch using the Elasticsearch-Hadoop plugin.
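For reference, it should also be possible to read the index back into a Koalas DataFrame with read_spark_io. The following is an untested sketch, not part of the verification above; it simply mirrors the connector options used in the write call.
import databricks.koalas as ks

# Read the "sample/test" index back through the elasticsearch-hadoop connector.
# Assumes the same PySpark 2.4 session started with --jars elasticsearch-hadoop-7.10.1.jar.
kdf2 = ks.read_spark_io(path="sample/test",
                        format="org.elasticsearch.spark.sql",
                        options={"es.nodes.wan.only": "false",
                                 "es.port": 9200,
                                 "es.net.ssl": "false",
                                 "es.nodes": "elasticsearch"})
print(kdf2.sort_values("col1"))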
Initially, I planned to verify this with:
- PySpark (IPython) ⇒ JupyterLab
- Docker Compose ⇒ Kubernetes
but I could not afford to spend time on things that were not essential, so I compromised this time.
Presumably, even when running on JupyterLab/Kubernetes, data can be loaded from Koalas into Elasticsearch without problems, so I would like to try that in the future. Also, since there are many requests, it seems the Elasticsearch-Hadoop plugin will support Apache Spark™ 3.0.x in the near future, and I strongly hope it becomes available.
Tomorrow's entry in the NTT TechnoCross Advent Calendar 2020 is by @y-ohnuki. Looking forward to it!