When working with geospatial data in Python, I usually use GeoPandas for small to medium-sized data, but it hits its limits with large-scale data. In that case, the PostGIS extension for PostgreSQL seems to be a common choice, and on the NoSQL side MongoDB, for example, also supports geometry types (reference). However, having to start up a database, design a schema, and create tables or collections is quite a hassle, so although I'm interested in this area, I haven't touched it yet.
Personally, I often use PySpark for large-scale data processing, so I wondered whether I could do something with it, and while searching I came across something called GeoSpark.
- Official documentation
- Official GitHub
The library still seems to be under active development, and on 2020-07-19 (this article was written on 2020-08-08) it was accepted into the [Apache Incubator](https://incubator.apache.org/) under the name Apache Sedona. (I'm not very familiar with the process, but apparently if the project gets off to a good start it becomes a full Apache project. Since maintenance will move to the Apache side, I expect the repository and documentation linked above will also be relocated before long.)
Being able to process data with a feel fairly close to pandas and GeoPandas (you can use it casually, without schema design) while still having the potential to handle large-scale data seemed quite interesting, so I played with it a little.
Environment at hand: Linux (Ubuntu 20.04) (details omitted, but you should be able to do much the same thing on Windows)
I prepared the Python environment with pyenv + miniconda3 (i.e. conda), but I think anything will do.
For example, prepare the following YAML file:
create_env.yml
name: geospark_demo
channels:
- conda-forge
- defaults
dependencies:
- python==3.7.*
- pip
- jupyterlab # for test
- pyspark==2.4.*
- openjdk==8.*
- pyarrow
- pandas
- geopandas
- folium # for test
- matplotlib # for test
- descartes # for test
- pip:
- geospark
- `folium`, `matplotlib`, `descartes`, and `jupyterlab` are not required by geospark; they are only included for visualization during testing.
- `pyspark` and Java 8 are unnecessary if you have already prepared your own.
- At the time of writing (August 2020), the Apache Spark versions supported by geospark (1.3.1) are the 2.2 to 2.4 series (see the official documentation), so `pyspark` is pinned to the 2.4 series.
With this file in place, run
conda env create -f create_env.yml
#Enter the created virtual environment
conda activate geospark_demo
to create and enter a conda virtual environment named `geospark_demo`.
(For adjusting the packages, the virtual environment name, and so on, see here, etc.)
(I think you can do the same thing without using conda)
In the example above (using a conda virtual environment), `PATH` and `JAVA_HOME` are set up automatically, but a few additional environment variables still need to be set.
First, geospark refers to `SPARK_HOME` internally, so set the Apache Spark installation location in that environment variable.
When Apache Spark has been installed with conda as in this example, it can be hard to tell where Spark itself actually lives, so you can look it up as follows (see, for example, here):
#Check the installation location of Apache Spark
echo 'sc.getConf.get("spark.home")' | spark-shell
# SPARK_HOME settings
export SPARK_HOME=<the path printed above>
Set it like this. In my case it was `SPARK_HOME=/home/<user name>/.pyenv/versions/miniconda3-latest/envs/geospark_demo/lib/python3.7/site-packages/pyspark`.
Also, if the installed `pyarrow` version is 0.15 or later, as described [here](https://spark.apache.org/docs/2.4.6/sql-pyspark-pandas-with-arrow.html#compatibility-setting-for-pyarrow--0150-and-spark-23x-24x), you need to set
export ARROW_PRE_0_15_IPC_FORMAT=1
(this setting is required for the pyspark 2.x series). Alternatively, install `pyarrow==0.14.*` instead.
Since doing this by hand every time is tedious, I personally either write these settings in a file and `source` it, or set them with `ENV` in a Dockerfile.
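As an aside, here is a minimal sketch (my own, not part of the original setup; the path is a placeholder) of setting the same variables from Python at the top of a notebook, before pyspark/geospark are used:
import os

#Placeholder: replace with the SPARK_HOME path printed by the spark-shell command above
os.environ["SPARK_HOME"] = "/path/to/site-packages/pyspark"
#Only needed when pyarrow >= 0.15 is combined with the pyspark 2.x series
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"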
Sample Jupyter notebooks for Python and the test data they need (stored under `python/data/`) are available on the official GitHub repository, so let's use those to check that everything works.
For example
#Move to working directory
cd /path/to/workdir
#Download notebook from github
wget https://raw.githubusercontent.com/DataSystemsLab/GeoSpark/master/python/GeoSparkCore.ipynb
wget https://raw.githubusercontent.com/DataSystemsLab/GeoSpark/master/python/GeoSparkSQL.ipynb
#Download only specific directories from github using svn
svn export https://github.com/DataSystemsLab/GeoSpark/trunk/python/data/
You can fetch them like this. For downloading only a specific directory from GitHub with svn, I referred to here and here.
After that, launch JupyterLab or Jupyter Notebook and run the notebooks above. Working through them while checking the output should give you a good feel for how the library can be used.
The notebooks above and the tutorial in the official documentation are more instructive, but since I've come this far, I'll play with it a little myself.
I use Esri Japan's nationwide city/ward/town/village boundary data. At the linked page, click "Download File" and check "Agree" to get the shapefile as `japan_ver821.zip`; unzip it in the working directory.
Try the following (the steps below were checked in JupyterLab).
#Import required libraries
import os
import folium
import geopandas as gpd
from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator
from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer
from geospark.register import upload_jars
#Generate spark session
upload_jars()
spark = SparkSession.builder.\
master("local[*]").\
appName("TestApp").\
config("spark.serializer", KryoSerializer.getName).\
config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName) .\
getOrCreate()
GeoSparkRegistrator.registerAll(spark)
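#(Sketch, not in the original notebook) Quick sanity check that the GeoSparkSQL functions
#were registered on this session; ST_Point is one of the constructors listed in the
#GeoSparkSQL documentation, called here with Decimal casts as in its examples.
spark.sql("SELECT ST_Point(CAST(139.7 AS Decimal(24, 20)), CAST(35.7 AS Decimal(24, 20))) AS pt").show()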
sdf_japan = spark.createDataFrame(
    #Load the downloaded Esri Japan city/ward/town/village boundary data with geopandas
gpd.read_file("japan_ver821/japan_ver821.shp")
)
#Verification
sdf_japan.show(5)
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# |JCODE| KEN| SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# |01101|Hokkaido|Ishikari Promotion Bureau|null|Sapporo|Chuo-ku|Sapporo-shi, Chuo-ku|235449.0|141734.0|POLYGON ((141.342...|
# |01102|Hokkaido|Ishikari Promotion Bureau|null|Sapporo|Kita Ward|Sapporo-shi, Kita-ku|286112.0|151891.0|POLYGON ((141.408...|
# |01103|Hokkaido|Ishikari Promotion Bureau|null|Sapporo|Higashi Ward|Sapporo-shi, Higa...|261777.0|142078.0|POLYGON ((141.446...|
# |01104|Hokkaido|Ishikari Promotion Bureau|null|Sapporo|Shiroishi Ward|Sapporo-shi, Shir...|212671.0|122062.0|POLYGON ((141.465...|
# |01105|Hokkaido|Ishikari Promotion Bureau|null|Sapporo|Toyohira Ward|Sapporo-shi, Toyo...|222504.0|126579.0|POLYGON ((141.384...|
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# only showing top 5 rows
#Save as files (snappy.parquet format by default)
sdf_japan.write.save("esri_japan")
#Save as a Hive table (the underlying files are snappy.parquet by default)
spark.sql("CREATE DATABASE IF NOT EXISTS geo_test") #Not required, but create a database
sdf_japan.write.saveAsTable("geo_test.esri_japan") #Save as table esri_japan in database geo_test
↑ With the `format` and `compression` options of `save` and `saveAsTable`, you can apparently also save as, for example, zlib-compressed ORC or gzip-compressed JSON. (Whether that makes anyone happy is another matter.)
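For example, a minimal sketch of that (the output paths here are hypothetical):
#Sketch: write zlib-compressed ORC and gzip-compressed JSON instead of the default snappy parquet
sdf_japan.write.format("orc").option("compression", "zlib").save("esri_japan_orc")
sdf_japan.write.format("json").option("compression", "gzip").save("esri_japan_json")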
#Read from the saved files
#Specify the directory where the files were saved. If you saved in a format other than parquet, specify that format in the load options.
sdf_from_file = spark.read.load("esri_japan")
sdf_from_file.show(5)
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |32207|Shimane Prefecture| null| null| null|Gotsu City| Gotsu-shi|23664.0|11513.0|MULTIPOLYGON (((1...|
# |32209|Shimane Prefecture| null| null| null|Unnan City| Unnan-shi|38479.0|13786.0|MULTIPOLYGON (((1...|
# |32343|Shimane Prefecture| null|Nita-gun| null|Okuizumo Town| Okuizumo-cho|12694.0| 4782.0|POLYGON ((133.078...|
# |32386|Shimane Prefecture| null|Iishi-gun| null|Iinan Town| Iinan-cho| 4898.0| 2072.0|POLYGON ((132.678...|
# |32441|Shimane Prefecture| null|Ochi-gun| null|Kawamoto Town|Kawamoto-machi| 3317.0| 1672.0|POLYGON ((132.487...|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# only showing top 5 rows
#Read from the table
sdf_from_table = spark.table("geo_test.esri_japan") #Specify the table name to read
sdf_from_table.show(5)
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |32207|Shimane Prefecture| null| null| null|Gotsu City| Gotsu-shi|23664.0|11513.0|MULTIPOLYGON (((1...|
# |32209|Shimane Prefecture| null| null| null|Unnan City| Unnan-shi|38479.0|13786.0|MULTIPOLYGON (((1...|
# |32343|Shimane Prefecture| null|Nita-gun| null|Okuizumo Town| Okuizumo-cho|12694.0| 4782.0|POLYGON ((133.078...|
# |32386|Shimane Prefecture| null|Iishi-gun| null|Iinan Town| Iinan-cho| 4898.0| 2072.0|POLYGON ((132.678...|
# |32441|Shimane Prefecture| null|Ochi-gun| null|Kawamoto Town|Kawamoto-machi| 3317.0| 1672.0|POLYGON ((132.487...|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# only showing top 5 rows
I confirmed that the data can be saved and read back both as plain files and as a table. Also, the conversion from geopandas to a pyspark DataFrame felt slow, so it seems better to minimize geopandas <-> pyspark conversions.
※dtypes
sdf = sdf_from_file #Below, the one read from the file is referred to as sdf.
display(sdf.dtypes)
# [('JCODE', 'string'),
# ('KEN', 'string'),
# ('SICHO', 'string'),
# ('GUN', 'string'),
# ('SEIREI', 'string'),
# ('SIKUCHOSON', 'string'),
# ('CITY_ENG', 'string'),
# ('P_NUM', 'double'),
# ('H_NUM', 'double'),
# ('geometry', 'udt')]
The `geometry` column appears to be handled as a `udt` type defined by the geospark library. (So if you try to read the file or table without the geospark setup, you will get an error.)
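As an aside, if you wanted the saved data to be readable without geospark, one possible workaround is to store the geometry as plain WKT strings. A sketch of my own (it assumes `ST_AsText` from the GeoSparkSQL function list; the view name and output path are hypothetical):
#Sketch: export geometry as WKT text so the result can be read without the geospark registrations
sdf.createOrReplaceTempView("esri_japan_wkt_src")
spark.sql("""
    SELECT JCODE, KEN, SIKUCHOSON, ST_AsText(geometry) AS geometry_wkt
    FROM esri_japan_wkt_src
""").write.save("esri_japan_wkt") #geometry_wkt is a plain string column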
Referring especially to the official documentation:
#Register DataFrame as TEMP VIEW so that you can use spark sql
sdf.createOrReplaceTempView('esri_japan')
#Check the row count of the original data
sdf.count() # 1907
#Filter to the range longitude 135-140, latitude 35-40
sdf_filtered = spark.sql("""
SELECT * FROM esri_japan
WHERE ST_Contains(ST_PolygonFromEnvelope(135., 35., 140., 40.), esri_japan.geometry)
""")
sdf_filtered.show(5)
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# |06401|Yamagata Prefecture| null|Nishiokitama District| null|Oguni Town| Oguni-machi| 7612.0|3076.0|POLYGON ((139.911...|
# |06426|Yamagata Prefecture| null|Higashitagawa-gun| null|Mikawa Town| Mikawa-machi| 7400.0|2387.0|POLYGON ((139.842...|
# |07364|Fukushima Prefecture| null|Minamiaizu-gun| null|Hinoemata Village| Hinoemata-mura| 557.0| 202.0|POLYGON ((139.259...|
# |07367|Fukushima Prefecture| null|Minamiaizu-gun| null|Tadami Town| Tadami-machi| 4366.0|1906.0|POLYGON ((139.366...|
# |07368|Fukushima Prefecture| null|Minamiaizu-gun| null|Minamiaizu Town|Minamiaizu-machi|15679.0|6707.0|POLYGON ((139.530...|
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# only showing top 5 rows
sdf_filtered.count() # 573 <- original: 1907
The row count has indeed decreased (1907 -> 573), so the filter seems to have worked, but just in case, let's visualize it and check.
# matplotlib
gdf_filtered = gpd.GeoDataFrame( #Convert to geopandas
sdf_filtered.toPandas(),
geometry='geometry'
)
gdf_filtered.plot()
(plot result)
By the way, plotting the entire original `japan_ver821.shp` for comparison:
gpd.read_file('japan_ver821/japan_ver821.shp') \
.plot()
also confirms that the filtering was done properly.
You can also use `folium` for interactive visualization:
m = folium.Map(
location=(37.5, 137.5), #Note that the order is latitude and longitude.
zoom_start=7,
control_scale=True,
)
m.add_child(folium.LatLngPopup()) #Click to check the latitude and longitude in a pop-up
#Convert filtered DataFrame to GeoJSON and pass it to folium
m.add_child(
folium.GeoJson(gdf_filtered.to_json())
)
folium.LayerControl().add_to(m) #Added LayerControl
m.save('df_filterd.html') #Save
m #Display on jupyter
I was able to visualize it with folium as well.
- This time I only ran a very simple query against small data, so the benefit wasn't dramatic, but it was interesting to get a PostGIS-like environment on top of Spark.
- The visualization in the second half used geopandas + matplotlib and folium, which has nothing to do with geospark, but geospark also has its own visualization functionality (GeoSpark Viz), so I hope to try that when I have time.
- As mentioned at the beginning, entering the Apache Incubator suggests further development can be expected, but specifications and package names may change, so what I wrote here may soon stop working as-is.