Skip to content

Commit

Permalink
♻ [REF] Move GPX/KML handling methods to Parsers utils (refs #3947)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chatewgne committed Nov 21, 2024
1 parent 9c6c755 commit f41030e
Show file tree
Hide file tree
Showing 6 changed files with 576 additions and 270 deletions.
4 changes: 2 additions & 2 deletions geotrek/cirkwi/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from django.contrib.gis.geos import Point, MultiPoint, GEOSGeometry
from django.utils.translation import gettext as _

from geotrek.common.utils.parsers import get_geom_from_gpx
from geotrek.trekking.models import DifficultyLevel
from geotrek.cirkwi.models import CirkwiLocomotion
from geotrek.common.parsers import AttachmentParserMixin, GlobalImportError, Parser, RowImportError
from geotrek.tourism.models import TouristicContent, TouristicContentType1
from geotrek.trekking.models import Trek, Practice
from geotrek.trekking.parsers import ApidaeTrekParser


class CirkwiParser(AttachmentParserMixin, Parser):
Expand Down Expand Up @@ -154,7 +154,7 @@ class CirkwiTrekParser(CirkwiParser):

def filter_geom(self, src, val):
response = self.request_or_retry(url=val)
return ApidaeTrekParser._get_geom_from_gpx(response.content)
return get_geom_from_gpx(response.content)

def filter_practice(self, src, val):
"""
Expand Down
152 changes: 149 additions & 3 deletions geotrek/common/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import os
from shutil import copy as copyfile

from django.conf import settings
from django.contrib.gis.geos import Point
from django.test import SimpleTestCase, TestCase, override_settings
from django.test import SimpleTestCase, TestCase
from django.test.utils import override_settings

from ..parsers import Parser
from ..utils import uniquify, format_coordinates, spatial_reference, simplify_coords
from ..utils import (format_coordinates, simplify_coords, spatial_reference,
uniquify)
from ..utils.file_infos import get_encoding_file
from ..utils.import_celery import create_tmp_destination, subclasses
from ..utils.parsers import add_http_prefix
from ..utils.parsers import (add_http_prefix, get_geom_from_gpx,
get_geom_from_kml, maybe_fix_encoding_to_utf8)


class UtilsTest(TestCase):
Expand Down Expand Up @@ -100,3 +105,144 @@ def test_add_http_prefix_without_prefix(self):

def test_add_http_prefix_with_prefix(self):
self.assertEqual('http://test.com', add_http_prefix('http://test.com'))


class GpxToGeomTests(SimpleTestCase):

@staticmethod
def _get_gpx_from(filename):
with open(filename, 'r') as f:
gpx = f.read()
return bytes(gpx, 'utf-8')

def test_gpx_with_waypoint_can_be_converted(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/apidae_test_trek.gpx')

geom = get_geom_from_gpx(gpx)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 13)
first_point = geom.coords[0]
self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1)
self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1)

def test_gpx_with_route_points_can_be_converted(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_route_points.gpx')

geom = get_geom_from_gpx(gpx)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 13)
first_point = geom.coords[0]
self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1)
self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1)

def test_it_raises_an_error_on_not_continuous_segments(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_not_continuous_segments.gpx')

with self.assertRaises(ValueError):
get_geom_from_gpx(gpx)

def test_it_handles_segment_with_single_point(self):
gpx = self._get_gpx_from(
'geotrek/trekking/tests/data/apidae_trek_parser/trace_with_single_point_segment.gpx'
)
geom = get_geom_from_gpx(gpx)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 13)

def test_it_raises_an_error_when_no_linestring(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_feature.gpx')

with self.assertRaises(ValueError):
get_geom_from_gpx(gpx)

def test_it_handles_multiple_continuous_features(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features.gpx')
geom = get_geom_from_gpx(gpx)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 12)
first_point = geom.coords[0]
self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1)
self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1)

def test_it_handles_multiple_continuous_features_with_one_empty(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features_and_one_empty.gpx')
geom = get_geom_from_gpx(gpx)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 12)
first_point = geom.coords[0]
self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1)
self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1)

def test_it_raises_error_on_multiple_not_continuous_features(self):
gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_not_continuous_features.gpx')
with self.assertRaises(ValueError):
get_geom_from_gpx(gpx)


class KmlToGeomTests(SimpleTestCase):

@staticmethod
def _get_kml_from(filename):
with open(filename, 'r') as f:
kml = f.read()
return bytes(kml, 'utf-8')

def test_kml_can_be_converted(self):
kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace.kml')

geom = get_geom_from_kml(kml)

self.assertEqual(geom.srid, 2154)
self.assertEqual(geom.geom_type, 'LineString')
self.assertEqual(len(geom.coords), 61)
first_point = geom.coords[0]
self.assertAlmostEqual(first_point[0], 973160.8, delta=0.1)
self.assertAlmostEqual(first_point[1], 6529320.1, delta=0.1)

def test_it_raises_exception_when_no_linear_data(self):
kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_line.kml')

with self.assertRaises(ValueError):
get_geom_from_kml(kml)


class TestConvertEncodingFiles(TestCase):
data_dir = "geotrek/trekking/tests/data"

def setUp(self):
if not os.path.exists(settings.TMP_DIR):
os.mkdir(settings.TMP_DIR)

def test_fix_encoding_to_utf8(self):
file_name = f'{settings.TMP_DIR}/file_bad_encoding_tmp.kml'
copyfile(f'{self.data_dir}/file_bad_encoding.kml', file_name)

encoding = get_encoding_file(file_name)
self.assertNotEqual(encoding, "utf-8")

new_file_name = maybe_fix_encoding_to_utf8(file_name)

encoding = get_encoding_file(new_file_name)
self.assertEqual(encoding, "utf-8")

def test_not_fix_encoding_to_utf8(self):
file_name = f'{settings.TMP_DIR}/file_good_encoding_tmp.kml'
copyfile(f'{self.data_dir}/file_good_encoding.kml', file_name)

encoding = get_encoding_file(file_name)
self.assertEqual(encoding, "utf-8")

new_file_name = maybe_fix_encoding_to_utf8(file_name)

encoding = get_encoding_file(new_file_name)
self.assertEqual(encoding, "utf-8")
121 changes: 121 additions & 0 deletions geotrek/common/utils/parsers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,126 @@
import codecs
import os
from datetime import datetime
from tempfile import NamedTemporaryFile

from django.conf import settings
from django.contrib.gis.gdal import DataSource
from django.contrib.gis.geos import MultiLineString
from django.utils.translation import gettext as _

from geotrek.common.utils.file_infos import get_encoding_file


def add_http_prefix(url):
if url.startswith('http'):
return url
else:
return 'http://' + url


def maybe_fix_encoding_to_utf8(file_name):
encoding = get_encoding_file(file_name)

# If not utf-8, convert file to utf-8
if encoding != "utf-8":
tmp_file_path = os.path.join(settings.TMP_DIR, 'fileNameTmp_' + str(datetime.now().timestamp()))
BLOCKSIZE = 9_048_576
with codecs.open(file_name, "r", encoding) as sourceFile:
with codecs.open(tmp_file_path, "w", "utf-8") as targetFile:
while True:
contents = sourceFile.read(BLOCKSIZE)
if not contents:
break
targetFile.write(contents)
os.replace(tmp_file_path, file_name)
return file_name


def get_geom_from_gpx(data):
def convert_to_geos(geom):
# FIXME: is it right to try to correct input geometries?
# FIXME: how to log that info/spread errors?
if geom.geom_type == 'MultiLineString' and any([ls for ls in geom if ls.num_points == 1]):
# Handles that framework conversion fails when there are LineStrings of length 1
geos_mls = MultiLineString([ls.geos for ls in geom if ls.num_points > 1])
geos_mls.srid = geom.srid
return geos_mls

return geom.geos

def get_layer(datasource, layer_name):
for layer in datasource:
if layer.name == layer_name:
return layer

def maybe_get_linestring_from_layer(layer):
if layer.num_feat == 0:
return None
geoms = []
for feat in layer:
if feat.geom.num_coords == 0:
continue
geos = convert_to_geos(feat.geom)
if geos.geom_type == 'MultiLineString':
geos = geos.merged # If possible we merge the MultiLineString into a LineString
if geos.geom_type == 'MultiLineString':
raise ValueError(
_("Feature geometry cannot be converted to a single continuous LineString feature"))
geoms.append(geos)

full_geom = MultiLineString(geoms)
full_geom.srid = geoms[0].srid
full_geom = full_geom.merged # If possible we merge the MultiLineString into a LineString
if full_geom.geom_type == 'MultiLineString':
raise ValueError(
_("Geometries from various features cannot be converted to a single continuous LineString feature"))

return full_geom

"""Given GPX data as bytes it returns a geom."""
# FIXME: is there another way than the temporary file? It seems not. `DataSource` really expects a filename.
with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf:
ntf.write(data)
ntf.flush()

file_path = maybe_fix_encoding_to_utf8(ntf.name)
ds = DataSource(file_path)
for layer_name in ('tracks', 'routes'):
layer = get_layer(ds, layer_name)
geos = maybe_get_linestring_from_layer(layer)
if geos:
break
else:
raise ValueError("No LineString feature found in GPX layers tracks or routes")
geos.transform(settings.SRID)
return geos


def get_geom_from_kml(data):
"""Given KML data as bytes it returns a geom."""

def get_geos_linestring(datasource):
layer = datasource[0]
geom = get_first_geom_with_type_in(types=['MultiLineString', 'LineString'], geoms=layer.get_geoms())
geom.coord_dim = 2
geos = geom.geos
if geos.geom_type == 'MultiLineString':
geos = geos.merged
return geos

def get_first_geom_with_type_in(types, geoms):
for g in geoms:
for t in types:
if g.geom_type.name.startswith(t):
return g
raise ValueError('The attached KML geometry does not have any LineString or MultiLineString data')

with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf:
ntf.write(data)
ntf.flush()

file_path = maybe_fix_encoding_to_utf8(ntf.name)
ds = DataSource(file_path)
geos = get_geos_linestring(ds)
geos.transform(settings.SRID)
return geos
Loading

0 comments on commit f41030e

Please sign in to comment.