diff --git a/msc_pygeoapi/loader/thunderstorm_outlook.py b/msc_pygeoapi/loader/thunderstorm_outlook.py index 2459527d..378524f3 100644 --- a/msc_pygeoapi/loader/thunderstorm_outlook.py +++ b/msc_pygeoapi/loader/thunderstorm_outlook.py @@ -107,52 +107,68 @@ def generate_geojson_features(self): data = json.load(f)['features'] features = [] - for feature in data: - - # flatten metobject properties - metobj = feature['properties']['metobject'] - for item, values in metobj.items(): - try: - metobj_flat_item = self.flatten_json(item, values, - 'metobject') - feature['properties'].update(metobj_flat_item) - filename = f.name.split('/')[-1] - file_id = re.sub(r'_v\d+\.json', '', filename) - feature['properties']['file_id'] = file_id - except Exception as err: - msg = f'Error while flattening Thunderstorm JSON {err}' - LOGGER.error(f'{msg}') - pass - - del feature['properties']['metobject'] - - feat_exp_datetime = feature['properties']['expiration_datetime'] - exp_datetime = datetime.strptime(feat_exp_datetime, - DATETIME_FORMAT) - - if exp_datetime > datetime.now(): - features.append(feature) - - # check if id is already in ES and if amendment is +=1 - amendment = features[0]['properties']['amendment'] - file_id = features[0]['properties']['file_id'] - is_newer = self.check_if_newer(file_id, amendment) - - if is_newer['update']: - for outlook in features: - action = { - '_id': outlook['properties']['id'], - '_index': INDEX_NAME, - '_op_type': 'update', - 'doc': outlook, - 'doc_as_upsert': True, + filename = f.name.split('/')[-1] + file_id = re.sub(r'_v\d+\.json', '', filename) + + if len(data) > 0: + for feature in data: + + # flatten metobject properties + metobj = feature['properties']['metobject'] + for item, values in metobj.items(): + try: + metobj_flat_item = self.flatten_json(item, + values, + 'metobject') + feature['properties'].update(metobj_flat_item) + feature['properties']['file_id'] = file_id + except Exception as err: + msg = f'Error while flattening Thunderstorm JSON {err}' + LOGGER.error(f'{msg}') + pass + + del feature['properties']['metobject'] + f_exp_datetime = feature['properties']['expiration_datetime'] + exp_datetime = datetime.strptime(f_exp_datetime, + DATETIME_FORMAT) + + if exp_datetime > datetime.now(): + features.append(feature) + + # check if id is already in ES and if amendment is +=1 + amendment = features[0]['properties']['amendment'] + is_newer = self.check_if_newer(file_id, amendment) + + if is_newer['update']: + for outlook in features: + action = { + '_id': outlook['properties']['id'], + '_index': INDEX_NAME, + '_op_type': 'update', + 'doc': outlook, + 'doc_as_upsert': True, + } + + yield action + + for id_ in is_newer['id_list']: + self.conn.Elasticsearch.delete(index=INDEX_NAME, + id=id_) + else: + LOGGER.warning(f'empty thunderstorm outlook json in {filename}') + + version = re.search(r'v(\d+)\.json$', filename).group(1) + if int(version) > 1: + # we need to delete the associated outlooks + query = { + "query": { + "match": { + "properties.file_id": file_id + } + } } - - yield action - - for id_ in is_newer['id_list']: - self.conn.Elasticsearch.delete(index=INDEX_NAME, - id=id_) + self.conn.Elasticsearch.delete_by_query(index=INDEX_NAME, + body=query) def flatten_json(self, key, values, parent_key=''): """