Lorsque je traite des données GeoSpatial en Python, j'utilise souvent GeoPandas pour les données de petite à moyenne taille, mais il y a des limites lorsqu'il s'agit de données à grande échelle. .. Ensuite, il semble que la fonction d'extension de PostgreSQL PostGIS soit souvent utilisée, et s'il s'agit de NoSQL, le type Geometry peut être utilisé même dans MongoDB, par exemple. Oui (référence), mais j'ai démarré une base de données, préparé un schéma et créé une table ou une collection. C'est assez gênant, et je m'intéresse à ce domaine, mais je n'y ai pas encore touché.
Personnellement, j'utilise souvent pyspark pour le traitement de données à grande échelle, puis-je faire quelque chose avec ça? Quand je cherchais, j'ai trouvé quelque chose qui s'appelait GeoSpark.
Il semble que la librairie soit encore en développement, mais comme vous pouvez le voir, dans 2020-07-19 (* date de rédaction de l'article: 2020-08-08), sous le nom d'Apache Sedona, [Apache Incubator](https: //) Il est enregistré sur incubator.apache.org/). (Je ne suis pas très familier avec cela, mais il semble que s'il prend un bon départ, il deviendra un projet Apache formel. De plus, étant donné que la direction passera du côté Apache, je pense que les documents du référentiel ci-dessus seront également déplacés bientôt. .)
Il semble assez intéressant de pouvoir traiter des données avec un sentiment relativement proche des pandas et des géopandas (cela peut être utilisé avec un sentiment approprié sans conception de schéma) tout en ayant la possibilité de traiter des données à grande échelle, j'ai donc joué un peu avec. est.
Environnement à portée de main: Linux(Ubuntu20.04) (Bien que omis, vous pouvez le faire dans presque la même atmosphère sous Windows)
J'ai préparé l'environnement Python avec pyenv
+ miniconda3
(c'est-à-dire conda), mais je pense que tout va bien.
Par exemple, préparez le fichier YAML suivant:
create_env.yml
name: geospark_demo
channels:
- conda-forge
- defaults
dependencies:
- python==3.7.*
- pip
- jupyterlab # for test
- pyspark==2.4.*
- openjdk==8.*
- pyarrow
- pandas
- geopandas
- folium # for test
- matplotlib # for test
- descartes # for test
- pip:
- geospark
--folium
, matplotlib
, descartes
, jupyterlab
ne sont pas nécessaires pour geospark, mais ils sont inclus à des fins de visualisation pour les tests.
--pyspark
et java8
ne sont pas nécessaires si vous avez préparé les vôtres
geospark (1.3.1)
est 2.2 - jusqu'à 2.4 séries au moment de la rédaction (août 2020). geospark-sql-python / # apache-spark), donc pyspark
spécifie la série 2.4avec ça
conda env create -f create_env.yml
#Entrez dans l'environnement virtuel créé
conda activate geospark_demo
Ensuite, vous pouvez créer un environnement virtuel conda nommé geospark_demo
.
(Pour divers ajustements tels que les packages et les noms d'environnement virtuel, reportez-vous à here, par exemple. )
(Je pense que vous pouvez faire la même chose sans utiliser conda)
Dans l'exemple ci-dessus (en utilisant l'environnement virtuel conda), les paramètres PATH
et JAVA_HOME
seront effectués sans autorisation, mais certaines variables d'environnement supplémentaires doivent être définies.
Tout d'abord, geospark fait parfois référence à SPARK_HOME
en interne, alors définissez l'emplacement d'installation d'Apache Spark avec la variable d'environnement.
De plus, lorsqu'Apache Spark est installé avec conda etc. comme dans cet exemple, il peut être difficile de savoir où se trouve le corps principal de Spark, par exemple, ici comment trouver le répertoire d'installation des étincelles)
#Vérifiez l'emplacement d'installation d'Apache Spark
echo 'sc.getConf.get("spark.home")' | spark-shell
# SPARK_Paramètres HOME
export SPARK_HOME=<Le chemin qui est sorti ci-dessus>
Réglez comme ça. Je me sens comme SPARK_HOME = / home / <username> /. Pyenv / versions / miniconda3-latest / envs / geospark_demo / lib / python3.7 / site-packages / pyspark
.
De plus, si la version installée de pyarrow
est 0.15 ou ultérieure, [ici](https://spark.apache.org/docs/2.4.6/sql-pyspark-pandas-with-arrow.html#compatibiliy" -réglage-pour-pyarrow - 0150-et-spark-23x-24x)
export ARROW_PRE_0_15_IPC_FORMAT=1
Il est nécessaire de définir (paramètre requis pour la série pyspark 2.x).
Vous pouvez également spécifier pyarrow = = 0.14. *
Pour l'installer.
C'est gênant de le faire à la main, donc je l'écris personnellement dans un fichier et le rend source
, ou le configure avec Docker en utilisant ʻENV` etc.
Jupyter Notebook pour python et les données de test nécessaires (stockées dans python / data /
) sont placés sur Official GitHub Alors utilisez-les pour vous assurer que cela fonctionne bien.
Par exemple
#Déplacer vers le répertoire de travail
cd /path/to/workdir
#Télécharger le notebook depuis github
wget https://raw.githubusercontent.com/DataSystemsLab/GeoSpark/master/python
/GeoSparkCore.ipynb
wget https://raw.githubusercontent.com/DataSystemsLab/GeoSpark/master/python/GeoSparkSQL.ipynb
#Téléchargez uniquement des répertoires spécifiques depuis github en utilisant svn
svn export https://github.com/DataSystemsLab/GeoSpark/trunk/python/data/
Vous pouvez l'obtenir comme ça. Vous pouvez télécharger le répertoire depuis GitHub en utilisant svn ici ou ici. Je l'ai mentionné.
Ensuite, lancez jupyter lab ou jupyter notebook et exécutez ↑ notebook. Je pense que ce sera une référence pour le type d'atmosphère que vous pouvez utiliser lors de la vérification du fonctionnement.
Le cahier et le document officiel Tutoriel utilisés dans le contrôle de fonctionnement de ↑ sont plus utiles, mais c'est un gros problème. Je jouerai un peu moi-même.
Utilisez les [National Municipal Boundary Data] d'esri Japan (https://www.esrij.com/products/japan-shp/).
Si vous cliquez sur "Télécharger le fichier" à la destination du lien et cochez "J'accepte", vous pouvez obtenir le fichier shp sous la forme de japan_ver821.zip
, alors décompressez-le dans le répertoire de travail et placez-le.
Essayez ce qui suit:
Ci-dessous, nous avons confirmé l'opération sur jupyterlab.
#Importer les bibliothèques requises
import os
import folium
import geopandas as gpd
from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator
from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer
from geospark.register import upload_jars
#Générer une session Spark
upload_jars()
spark = SparkSession.builder.\
master("local[*]").\
appName("TestApp").\
config("spark.serializer", KryoSerializer.getName).\
config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName) .\
getOrCreate()
GeoSparkRegistrator.registerAll(spark)
sdf_japan = spark.createDataFrame(
#Lisez les données téléchargées sur les limites des villes / quartiers / villages / quartiers nationaux d'esri Japan avec les géopandas
gpd.read_file("japan_ver821/japan_ver821.shp")
)
#Vérification
sdf_japan.show(5)
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# |JCODE| KEN| SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# |01101|Hokkaido|Bureau de promotion d'Ishikari|null|Sapporo|Chuo-ku|Sapporo-shi, Chuo-ku|235449.0|141734.0|POLYGON ((141.342...|
# |01102|Hokkaido|Bureau de promotion d'Ishikari|null|Sapporo|Kita Ward|Sapporo-shi, Kita-ku|286112.0|151891.0|POLYGON ((141.408...|
# |01103|Hokkaido|Bureau de promotion d'Ishikari|null|Sapporo|Quartier Higashi|Sapporo-shi, Higa...|261777.0|142078.0|POLYGON ((141.446...|
# |01104|Hokkaido|Bureau de promotion d'Ishikari|null|Sapporo|Quartier Shiraishi|Sapporo-shi, Shir...|212671.0|122062.0|POLYGON ((141.465...|
# |01105|Hokkaido|Bureau de promotion d'Ishikari|null|Sapporo|Quartier Toyohei|Sapporo-shi, Toyo...|222504.0|126579.0|POLYGON ((141.384...|
# +-----+------+----------+----+------+----------+--------------------+--------+--------+--------------------+
# only showing top 5 rows
#Enregistrer en tant que fichier (snappy par défaut).format parquet)
sdf_japan.write.save("esri_japan")
#Enregistrer au format de table de ruche (le fichier réel est accrocheur par défaut).parquet)
spark.sql("CREATE DATABASE IF NOT EXISTS geo_test") #Non requis, mais création de base de données
sdf_japan.write.saveAsTable("geo_test.esri_japan") #Géo de la base de données_Table esri sur test_Sauver au Japon
↑ Vous pouvez changer format
et compression
avec les options save et saveAsTable, et il semble que vous pouvez aussi sauvegarder avec zlib.orc
et json.gzip
. (Mis à part à quel point c'est heureux)
#Lecture de fichier
#Spécifiez le répertoire dans lequel le fichier réel est enregistré. Lors de l'enregistrement dans un format autre que le parquet, spécifiez le format dans l'option de chargement.
sdf_from_file = spark.read.load("esri_japan")
sdf_from_file.show(5)
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |32207|Préfecture de Shimane| null| null| null|Ville d'Ezu| Gotsu-shi|23664.0|11513.0|MULTIPOLYGON (((1...|
# |32209|Préfecture de Shimane| null| null| null|Ville du Yunnan| Unnan-shi|38479.0|13786.0|MULTIPOLYGON (((1...|
# |32343|Préfecture de Shimane| null|Nita-gun| null|Ville d'Oku Izumo| Okuizumo-cho|12694.0| 4782.0|POLYGON ((133.078...|
# |32386|Préfecture de Shimane| null|Pistolet Iiishi| null|Ville d'Iinan| Iinan-cho| 4898.0| 2072.0|POLYGON ((132.678...|
# |32441|Préfecture de Shimane| null|Echi-gun| null|Ville de Kawamoto|Kawamoto-machi| 3317.0| 1672.0|POLYGON ((132.487...|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# only showing top 5 rows
#lecture de table
sdf_from_table = spark.table("geo_test.esri_japan") #Spécifiez le nom de la table à lire
sdf_from_table.show(5)
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# |32207|Préfecture de Shimane| null| null| null|Ville d'Ezu| Gotsu-shi|23664.0|11513.0|MULTIPOLYGON (((1...|
# |32209|Préfecture de Shimane| null| null| null|Ville du Yunnan| Unnan-shi|38479.0|13786.0|MULTIPOLYGON (((1...|
# |32343|Préfecture de Shimane| null|Nita-gun| null|Ville d'Oku Izumo| Okuizumo-cho|12694.0| 4782.0|POLYGON ((133.078...|
# |32386|Préfecture de Shimane| null|Pistolet Iiishi| null|Ville d'Iinan| Iinan-cho| 4898.0| 2072.0|POLYGON ((132.678...|
# |32441|Préfecture de Shimane| null|Echi-gun| null|Ville de Kawamoto|Kawamoto-machi| 3317.0| 1672.0|POLYGON ((132.487...|
# +-----+------+-----+------+------+----------+--------------+-------+-------+--------------------+
# only showing top 5 rows
Il a été confirmé qu'il peut être enregistré et lu sous forme de fichier simple ou sous forme de tableau. De plus, j'ai senti que la conversion de geopandas en DataFrame de pyspark était lente, j'ai donc pensé qu'il valait mieux minimiser le nombre de conversions de geopandas <-> pyspark.
※dtypes
sdf = sdf_from_file #Ci-dessous, celui lu dans le fichier est appelé sdf.
display(sdf.dtypes)
# [('JCODE', 'string'),
# ('KEN', 'string'),
# ('SICHO', 'string'),
# ('GUN', 'string'),
# ('SEIREI', 'string'),
# ('SIKUCHOSON', 'string'),
# ('CITY_ENG', 'string'),
# ('P_NUM', 'double'),
# ('H_NUM', 'double'),
# ('geometry', 'udt')]
Il semble que geometry
soit traité comme un type ʻudt` défini dans la bibliothèque geospark.
(Donc, si vous essayez de lire un fichier ou une table sans paramètres geospark, vous obtiendrez une erreur)
Voir notamment dans la documentation officielle:
#Enregistrez le DataFrame en tant que TEMP VIEW afin de pouvoir utiliser Spark SQL
sdf.createOrReplaceTempView('esri_japan')
#Confirmer le nombre de données d'origine
sdf.count() # 1907
#Longitude: 135-140, latitude: 35-Filtrer dans la gamme de 40
sdf_filtered = spark.sql("""
SELECT * FROM esri_japan
WHERE ST_Contains(ST_PolygonFromEnvelope(135., 35., 140., 40.), esri_japan.geometry)
""")
sdf_filtered.show(5)
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# |JCODE| KEN|SICHO| GUN|SEIREI|SIKUCHOSON| CITY_ENG| P_NUM| H_NUM| geometry|
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# |06401|Préfecture de Yamagata| null|Nishioki-gun| null|Ville d'Oguni| Oguni-machi| 7612.0|3076.0|POLYGON ((139.911...|
# |06426|Préfecture de Yamagata| null|Canon Higashidagawa| null|Ville de Mikawa| Mikawa-machi| 7400.0|2387.0|POLYGON ((139.842...|
# |07364|Préfecture de Fukushima| null|Minami Aizu-gun| null|Village de Hiedagi| Hinoemata-mura| 557.0| 202.0|POLYGON ((139.259...|
# |07367|Préfecture de Fukushima| null|Minami Aizu-gun| null|Ville de Tadami| Tadami-machi| 4366.0|1906.0|POLYGON ((139.366...|
# |07368|Préfecture de Fukushima| null|Minami Aizu-gun| null|Ville de Minami Aizu|Minamiaizu-machi|15679.0|6707.0|POLYGON ((139.530...|
# +-----+------+-----+--------+------+----------+----------------+-------+------+--------------------+
# only showing top 5 rows
sdf_filtered.count() # 573 <- original: 1907
Le nombre de DataFrames a également diminué (1907-> 573), et il semble que le filtre a été correctement complété, mais je vais le visualiser et le vérifier au cas où.
# matplotlib
gdf_filtered = gpd.GeoDataFrame( #Convertir en géopandas
sdf_filtered.toPandas(),
geometry='geometry'
)
gdf_filtered.plot()
(résultat du tracé)
À propos, si vous tracez l'intégralité du fichier japan_ver821.shp d'origine
gpd.read_file('japan_ver821/japan_ver821.shp') \
.plot()
Il semble donc que le filtrage soit fait correctement.
Vous pouvez également utiliser folium
pour une visualisation interactive.
m = folium.Map(
location=(37.5, 137.5), #Notez que l'ordre est la latitude et la longitude.
zoom_start=7,
control_scale=True,
)
m.add_child(folium.LatLngPopup()) #Cliquez pour vérifier la latitude et la longitude dans une fenêtre contextuelle
#Convertissez le DataFrame filtré en GeoJSON et transmettez-le au folium
m.add_child(
folium.GeoJson(gdf_filtered.to_json())
)
folium.LayerControl().add_to(m) #Ajout de LayerControl
m.save('df_filterd.html') #sauvegarder
m #Affiché sur jupyter
J'ai également pu le visualiser sur le folium.
―― Je n'étais pas très content cette fois car je viens de faire une requête très simple sur de petites données, mais c'était intéressant de pouvoir créer une atmosphère de type PostGIS sur Spark.
Recommended Posts