From b50a23464ba281f67f8a3b964c1744dd1df7f698 Mon Sep 17 00:00:00 2001 From: Adrian Klink Date: Thu, 8 Mar 2018 22:07:44 +0100 Subject: [PATCH] LayerToGeotiff / MultibandLayerToGeotiff use foreachPartition with serial hadoopConfiguration --- .../spark/examples/LayerToGeotiff.scala | 40 +++++++++++++------ .../examples/MultibandLayerToGeotiff.scala | 39 ++++++++++++------ 2 files changed, 54 insertions(+), 25 deletions(-) diff --git a/src/main/scala/biggis/landuse/spark/examples/LayerToGeotiff.scala b/src/main/scala/biggis/landuse/spark/examples/LayerToGeotiff.scala index 7629e92..2370ff8 100644 --- a/src/main/scala/biggis/landuse/spark/examples/LayerToGeotiff.scala +++ b/src/main/scala/biggis/landuse/spark/examples/LayerToGeotiff.scala @@ -97,20 +97,34 @@ object LayerToGeotiff extends LazyLogging { //.tileToLayout(metadata.cellType, metadata.layout, Utils.RESAMPLING_METHOD) //.repartition(Utils.RDD_PARTITIONS) - /* - outputRdd.foreachPartition{ partition => - partition.map(_.write(new Path("hdfs://..."), serConf.value)) - } // */ - outputRdd.foreach(mbtile => { - val (key, tile) = mbtile - val (col, row) = (key.col, key.row) - val tileextent: Extent = metadata.layout.mapTransform(key) - val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") - logger info s" writing: '${filename.toString}'" - GeoTiff(tile, tileextent, crs) - .write(filename, serConf.value) + val useSerializedHadoopConfig = true + if(useSerializedHadoopConfig){ + // ToDo: test Spark Cluster version + outputRdd.foreachPartition { partition => + partition.foreach { tuple => + val (key, tile) = tuple + val (col, row) = (key.col, key.row) + val tileextent: Extent = metadata.layout.mapTransform(key) + val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") + logger info s" writing: '${filename.toString}'" + GeoTiff(tile, tileextent, crs) + .write(filename, serConf.value) + } + } + } else { + // only for local debugging - do not use in cloud // ToDo: delete after testing + outputRdd.foreach(mbtile => { + val (key, tile) = mbtile + val (col, row) = (key.col, key.row) + val tileextent: Extent = metadata.layout.mapTransform(key) + //val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") + //logger info s" writing: '${filename.toString}'" + GeoTiff(tile, tileextent, crs) + //.write(filename.toString) //.write(filename, serConf.value) + .write(outputPath + "_" + col + "_" + row + ".tif") + } + ) } - ) } //sc.stop() diff --git a/src/main/scala/biggis/landuse/spark/examples/MultibandLayerToGeotiff.scala b/src/main/scala/biggis/landuse/spark/examples/MultibandLayerToGeotiff.scala index da8b22b..ddf88d3 100644 --- a/src/main/scala/biggis/landuse/spark/examples/MultibandLayerToGeotiff.scala +++ b/src/main/scala/biggis/landuse/spark/examples/MultibandLayerToGeotiff.scala @@ -153,19 +153,34 @@ object MultibandLayerToGeotiff extends LazyLogging{ //.repartition(myRDD_PARTITIONS) //.tileToLayout(myMetadata.cellType, myMetadata.layout, myRESAMPLING_METHOD) - /* - outputRdd.foreachPartition{ partition => - partition.map(_.write(new Path("hdfs://..."), serConf.value)) - } // */ - outputRdd.foreach(mbtile => { - val (key, tile) = mbtile - val (col, row) = (key.col, key.row) - val tileextent: Extent = metadata.layout.mapTransform(key) - val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") - MultibandGeoTiff(tile, tileextent, crs) - .write(filename, serConf.value) + val useSerializedHadoopConfig = true + if (useSerializedHadoopConfig) { + // ToDo: test Spark Cluster version + outputRdd.foreachPartition { partition => + partition.foreach { tuple => + val (key, tile) = tuple + val (col, row) = (key.col, key.row) + val tileextent: Extent = metadata.layout.mapTransform(key) + val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") + logger info s" writing: '${filename.toString}'" + MultibandGeoTiff(tile, tileextent, crs) + .write(filename, serConf.value) + } + } + } else { + // only for local debugging - do not use in cloud // ToDo: delete after testing + outputRdd.foreach(mbtile => { + val (key, tile) = mbtile + val (col, row) = (key.col, key.row) + val tileextent: Extent = metadata.layout.mapTransform(key) + //val filename = new Path(outputPath + "_" + col + "_" + row + ".tif") + //logger info s" writing: '${filename.toString}'" + MultibandGeoTiff(tile, tileextent, crs) + //.write(filename.toString) //.write(filename, serConf.value) + .write(outputPath + "_" + col + "_" + row + ".tif") + } + ) } - ) } ////val raster: Raster[MultibandTile] = tile.reproject(metadata.extent, metadata.crs, metadata.crs)