OSM PBF format spark datasource.
Opestreetmap is a literally planet scale database storing features found on a Earth surfaces. Commonly used OSM processing software either uses single host processing or converts and stores OSM data to some database engine and uses it as a source of truth for distributed processing. With Spark abilities to automatically distribute huge dataset over hundreds of nodes and process more data, that a single node could fit, it sounds pretty logical to implement native Spark OSM PBF data source
Supported scala versions are 2.11 and 2.12
<dependency>
<groupId>com.wolt.osm</groupId>
<artifactId>spark-osm-datasource_2.11</artifactId>
<version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.wolt.osm</groupId>
<artifactId>spark-osm-datasource_2.12</artifactId>
<version>0.3.0</version>
</dependency>
compile group: 'com.wolt.osm', name: 'spark-osm-datasource_2.11', version: '0.3.0'
compile group: 'com.wolt.osm', name: 'spark-osm-datasource_2.12', version: '0.3.0'
libraryDependencies += "com.wolt.osm" % "spark-osm-datasource" % "0.3.0"
https://github.com/woltapp/spark-osm-datasource/releases/tag/v0.31.0
OSM PBF data source uses parallel OSM PBF parser internally, therefore it needs to provide it with a input stream. OSM DataSource internally uses HDFS driver, so it will accept any HDFS compatible path, like local file, http link, s3 link etc and convert it to the inputstream. The stream will be opened and read as many times, as many partitions are requested:
val spark = SparkSession
.builder()
.appName("OsmReader")
.config("spark.master", "local[4]")
.getOrCreate()
val osm = spark.read.option("partitions", 256).format(OsmSource.OSM_SOURCE_NAME).load("s3://maps/extract-cesko-brno.osm.pbf").drop("INFO").persist(StorageLevel.MEMORY_AND_DISK)
There are two options for the reader:
partitions
- Number of partitions to split OSM PBF file. Pay attentions, that full planet file is about 1.2TB of uncompressed data, so plan your partitioning accordingly.threads
- Number of threads to use by parallel OSM PBF parser. Keep it1
for a local deployments or set to number of cores per worker node.useLocalFile
- Enables Spark file distribution mechanics, see below.
Another one option is to use Sparks ability to cache file on the nodes and read it locally. With that approach under some circumstances (repeated re-reading of same set of big files in different steps) a better performance can be achieved.
spark.sparkContext.addFile("s3://maps/extract-cesko-brno.osm.pbf")
val osm = spark.read.option("partitions", 256).option("useLocalFile", "true").format(OsmSource.OSM_SOURCE_NAME).load("extract-cesko-brno.osm.pbf")
Please keep in mind, that you should specify just a file name as the load
parameter, not a full path.
Due to Spark processing paradigm, all types of the OSM entities are mapped to the same dataframe of the following structure:
StructField("ID", LongType, nullable = false),
StructField("TAG", MapType(StringType, StringType, valueContainsNull = false), nullable = false),
StructField("INFO", StructType( StructField("UID", IntegerType, nullable = true),
StructField("USERNAME", StringType, nullable = true),
StructField("VERSION", IntegerType, nullable = true),
StructField("TIMESTAMP", LongType, nullable = true),
StructField("CHANGESET", LongType, nullable = true),
StructField("VISIBLE", BooleanType, nullable = false)
), nullable = true),
StructField("TYPE", IntegerType, nullable = false),
StructField("LAT", DoubleType, nullable = true),
StructField("LON", DoubleType, nullable = true),
StructField("WAY", ArrayType(LongType, containsNull = false), nullable = true),
StructField("RELATION", ArrayType(StructType( StructField("ID", LongType, nullable = false),
StructField("ROLE", StringType, nullable = true),
StructField("TYPE", IntegerType, nullable = false)
), containsNull = false), nullable = true)
OSM nodes will have LAT
and LON
fields set and WAY
and RELATION
fields will be null.
OSM nodes will have WAY
fields set and LAT
/LON
and RELATION
fields will be null.
OSM relations will have RELATION
fields set and LAT
/LON
and WAYS
fields will be null.
TYPE
field in both RELATION
structure and in dataframe structure indicates type of the OSM entity. It is an
integer field with following values:
- 0 - Item is a OSM Node entity
- 1 - Item is a OSM Way entity
- 2 - Item is a OSM Relation entity
Reader supports column pruning during read, so dropped columnts will not be loaded from the OSM PBF file. Even more,
if both LAT
and LON
columns will be dropped, no node processing will be started at all. Same applies for
WAY
and RELATION
columnn.
OSM PBF file can be sorted and stored in a ordered way. Unfortunately, due to parallel nature of the Spark, that ordering will be broken during parsing and several consequent dataframe loads may return data in a different order for each run. In case order is important for you, you can sort the dataframe after loading. |
We use SemVer for versioning. For the versions available, see the tags on this repository.
- Denis Chaplygin - Initial work - akashihi
This project is licensed under the GPLv3 License - see the LICENSE file for details.