diff --git a/noa-harvester/CHANGELOG.md b/noa-harvester/CHANGELOG.md
index 965aefb..b83be28 100644
--- a/noa-harvester/CHANGELOG.md
+++ b/noa-harvester/CHANGELOG.md
@@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [0.9.0] - 2024-11-21
+### Added
+- Publishing of the successful/failed id lists to a kafka topic (#75)
+- Consuming id lists for download from a kafka topic (#75)
+- New cli command (noa_harvester_service) (#75)
+
+### Changed
+- Downloading from id (products table, column id) now returns successful and failed lists (#75)
+
 ## [0.8.0] - 2024-11-08
 ### Added
 - Support for uuid list download of CDSE. Gateway CLI support and postgres read/update (#67)
diff --git a/noa-harvester/README.md b/noa-harvester/README.md
index a7d62a3..53be61e 100644
--- a/noa-harvester/README.md
+++ b/noa-harvester/README.md
@@ -16,6 +16,8 @@ It can be used as a standalone cli application or be built in a Docker container
 The noaharvester processor can be executed as:
 - Standalone [**Cli application**](#standalone-cli-execution) or
 - Inside a [**Container**](#docker-execution)
+- As a container, inside a Kubernetes environment with kafka and a postgres database. This is a Beyond-specific setup, where a user can instantiate Harvester and request the download of Products based on an id from the postgres Products table. This table includes the uuid and title fields necessary to construct the request to CDSE (for now). Harvester then updates the products table with the downloaded path, and posts the result for each of these ids to a kafka topic.
+- As a microservice inside a Kubernetes environment with kafka and a postgres database. Same as above, but deployed as a service: it continuously listens to the appointed kafka topic for messages containing uuid lists.
 In either case, a user must have credentials for accessing one or more data hubs:
 - [Copernicus]
@@ -200,12 +202,14 @@
 Cli can be executed with the following:
 - Commands
    * `download` - The main option. Downloads according with the config file parameters.
-    * `from-uuid-list` - Download from uuid (e.g. id in Sentinel 2 metadata products) list. Needs to be combined with -u option. Necessary a db connection (TODO: optional)
+    * `from-uuid-list` - Download from a uuid db list. Needs to be combined with the -u option. A db connection is necessary (TODO: make it optional).
+    * `noa-harvester-service` - Deploy a service that continuously listens to a specific kafka topic (the topic can also be defined in the config file - look at config/config_service.json).
    * `query` - Queries the collection(s) for products according to the parameters present in the config file.
    * `describe` (Copernicus only) - Describes the available query parameters for the collections as defined in the config file.
 - Options
    * `--output_path` (download only) Custom download location. Default is `.data`
-    * `-u, --uuid` [**multiple**] (from-uuid-list only). Multiple option of uuids.
+    * `-u, --uuid` [**multiple**] (from-uuid-list only). Can be repeated to pass multiple uuids.
+    * `-t, --test` **Service only**: test the kafka consumer functionality.
    * `-bb, --bbox_only` Draw total bbox, not individual polygons in multipolygon shapefile.
    * `-v`, `--verbose` Shows the progress indicator when downloading (Copernicus - only for download command)
    * `--log LEVEL (INFO, DEBUG, WARNING, ERROR)` Shows the logs depending on the selected `LEVEL`
@@ -224,7 +228,7 @@
 The necessary env vars are:
 `DB_PORT`
 `DB_NAME`
 
-Moreover, Harvester will query the db to get the UUID (to query based on the input uuid) and Title of the product to be downloaded (it does not query CDSE for metadata - it only downloads).
+Moreover, Harvester will query the db to get the UUID and Title of the Product to be downloaded (it does not query CDSE for metadata - it only downloads).
 So make sure that a postgres with a table named "Products", includes at least a `uuid` field and a `name` field.
 
 ## Examples
 
@@ -237,15 +241,23 @@
 docker run -it \
 noaharvester describe config/config.json
 ```
 
-* Download (with download indicator) from Copernicus providing a uuid list and store in mnt point:
+* Download (with download indicator) from Copernicus, providing an id list (each id corresponds to an entry in the Products db table), and store in a mnt point:
 
 ```
 docker run -it \
--v ./config/config.json:/app/config/config.json \
+-v ./config/config_from_id.json:/app/config/config_from_id.json \
 -v /mnt/data:/app/data \
-noaharvester from-uuid-list -v -u caf8620d-974d-5841-b315-7489ffdd853b config/config.json
+noaharvester from-uuid-list -v -u caf8620d-974d-5841-b315-7489ffdd853b config/config_from_id.json
 ```
 
+* Deploy Harvester as a service (the -t flag only tests kafka consumption; omit it for normal operation):
+
+```
+docker run -it \
+-v ./config/config_service.json:/app/config/config_service.json \
+-v /mnt/data:/app/data \
+noaharvester noa-harvester-service -v -t config/config_service.json
+```
 
 * Download (with download indicator) from Copernicus and Earthdata as defined in the config file, for an area provided by the shapefile files (`area.shp` and `area.prj`) located in folder `/home/user/project/strange_area`:
 
diff --git a/noa-harvester/config/config_from_uri.json b/noa-harvester/config/config_from_id.json
similarity index 100%
rename from noa-harvester/config/config_from_uri.json
rename to noa-harvester/config/config_from_id.json
diff --git a/noa-harvester/config/config_service.json b/noa-harvester/config/config_service.json
new file mode 100644
index 0000000..28889bb
--- /dev/null
+++ b/noa-harvester/config/config_service.json
@@ -0,0 +1,6 @@
+{
+    "group_id": "harvester-group-request",
+    "topics": ["harvester.order.requested"],
+    "num_partitions": 2,
+    "replication_factor": 3
+}
\ No newline at end of file
diff --git a/noa-harvester/docker-compose.yml b/noa-harvester/docker-compose.yml
index 1dd62fc..c16840c 100644
--- a/noa-harvester/docker-compose.yml
+++ b/noa-harvester/docker-compose.yml
@@ -14,6 +14,7 @@ services:
       - DB_HOST
       - DB_PORT
       - DB_NAME
+      - KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
     working_dir: /app
     volumes:
       - ./config:/app/config
diff --git a/noa-harvester/noaharvester/__init__.py b/noa-harvester/noaharvester/__init__.py
index 029b8d1..4a3c0a8 100644
--- a/noa-harvester/noaharvester/__init__.py
+++ b/noa-harvester/noaharvester/__init__.py
@@ -1,2 +1,2 @@
 # pylint:disable=missing-module-docstring
-__version__ = "0.8.0"
+__version__ = "0.9.0"
diff --git a/noa-harvester/noaharvester/cli.py b/noa-harvester/noaharvester/cli.py
index 9aeb494..73f4996 100644
--- a/noa-harvester/noaharvester/cli.py
+++ b/noa-harvester/noaharvester/cli.py
@@ -5,17 +5,24 @@
 """
 from __future__ import annotations
 
+import os
 import sys
+import datetime
+import json
+from time import sleep
 import logging
 from pathlib import Path
 
 import click
 from click import Argument, Option
-
+from kafka import KafkaConsumer as k_KafkaConsumer
 
 # Appending the module path in order to have a kind of cli "dry execution"
 sys.path.append(str(Path(__file__).parent / ".."))
 
 from noaharvester import harvester  # noqa:402 pylint:disable=wrong-import-position
+from noaharvester.utils import Message
+from noaharvester.messaging import AbstractConsumer
+from noaharvester.messaging.kafka_consumer import KafkaConsumer
 
 logger = logging.getLogger(__name__)
 
@@ -112,6 +119,112 @@ def download(
     click.echo("Done.\n")
 
 
+@cli.command(
+    help=(
+        """
+        Microservice facilitating EO data downloads. Implemented by
+        using a kafka producer/consumer pattern.
+        """
+    )
+)
+@click.option(
+    "--verbose",
+    "-v",
+    is_flag=True,
+    help="Shows the progress indicator (for Copernicus only)",
+)
+@click.option(
+    "--test",
+    "-t",
+    is_flag=True,
+    help="Test kafka request consumption. No other functionality",
+)
+@click.argument("config_file", required=True)
+@click.option("--output_path", default="./data", help="Output path")
+def noa_harvester_service(
+    config_file: Argument | str,
+    output_path: Option | str,
+    verbose: Option | bool,
+    test: Option | bool
+) -> None:
+    """
+    Instantiate Harvester class and activate the service, listening to a kafka topic.
+    When triggered, download all ids from the Products table, based on the
+    list argument of the kafka message.
+
+    Parameters:
+        config_file (click.Argument | str): config json file with kafka service options
+        output_path (click.Option | str): where to download to
+        verbose (click.Option | bool): to show download progress indicator or not
+        test (click.Option | bool): only test kafka consumption; do not download
+    """
+    logger.debug("Starting NOA-Harvester service...")
+
+    harvest = harvester.Harvester(
+        config_file=config_file,
+        output_path=output_path,
+        verbose=verbose,
+        is_service=True
+    )
+
+    consumer: AbstractConsumer | k_KafkaConsumer = None
+
+    # Warning: topics is a list, even if there is only one topic
+    topics = harvest._config.get(
+        "topics", [os.environ.get(
+            "KAFKA_INPUT_TOPIC", "harvester.order.requested"
+        )]
+    )
+    schema_def = Message.schema_request()
+    num_partitions = int(harvest._config.get(
+        "num_partitions", os.environ.get(
+            "KAFKA_NUM_PARTITIONS", 2
+        )
+    ))
+    replication_factor = int(harvest._config.get(
+        "replication_factor", os.environ.get(
+            "KAFKA_REPLICATION_FACTOR", 3
+        )
+    ))
+
+    while consumer is None:
+        # If harvester is also responsible for creating the topics,
+        # it should be done here, before subscribing.
+        consumer = KafkaConsumer(
+            bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
+            group_id=harvest._config.get("group_id", "harvester-group-request"),
+            topics=topics,
+            schema=schema_def
+        )
+        consumer.create_topics(
+            topics=topics, num_partitions=num_partitions, replication_factor=replication_factor)
+        if consumer is None:
+            sleep(5)
+
+    click.echo(f"NOA-Harvester service started. Output path: {output_path}\n")
+
+    while True:
+        for message in consumer.read():
+            # message.value is already deserialized to a dict by the consumer
+            item = message.value
+            now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            msg = f"Digesting Item from Topic {message.topic} ({now_time})..."
+            msg += "\n> Item: " + json.dumps(item)
+            logger.debug(msg)
+            click.echo("Received list to download")
+            if test:
+                downloaded_uuids = "Some downloaded ids"
+                failed_uuids = "Some failed ids"
+            else:
+                downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(item["uuid"])
+            logger.debug("Downloaded items: %s", downloaded_uuids)
+            if failed_uuids:
+                logger.error("Failed uuids: %s", failed_uuids)
+        sleep(1)
+
+
 # TODO v2: integrate functionality in download command
 @cli.command(
     help=(
@@ -128,7 +241,7 @@ def download(
 )
 @click.argument("config_file", required=True)
 @click.option("--output_path", default="./data", help="Output path")
-@click.option("--uuid", "-u", multiple=True, help="Uuid. Can be set multiple times")
+@click.option("--uuid", "-u", multiple=True, help="Id field of products table. Can be set multiple times")
 def from_uuid_list(
     config_file: Argument | str,
     output_path: Option | str,
@@ -137,7 +250,7 @@ def from_uuid_list(
 ) -> None:
     """
     Instantiate Harvester class and call download function.
-    Downloads all relevant data as defined in the config file.
+    Downloads all ids from the Products table, based on the --uuid multiple option.
 
     Parameters:
         config_file (click.Argument | str): config json file listing
@@ -154,7 +267,9 @@ def from_uuid_list(
         config_file=config_file,
         output_path=output_path,
         verbose=verbose,
-        from_uri=True
+        # TODO: this is not a service, but it needs refactoring of Harvester
+        # and its logic, since the functionality has changed
+        is_service=True
     )
     downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(uuid)
     if failed_uuids:
@@ -163,7 +278,7 @@ def from_uuid_list(
     # harvest.test_db_connection()
     print(downloaded_uuids)
     click.echo("Done.\n")
-    return downloaded_uuids
+    return downloaded_uuids, failed_uuids
 
 
 @cli.command(
diff --git a/noa-harvester/noaharvester/db/utils.py b/noa-harvester/noaharvester/db/utils.py
index 2c557c2..aee8481 100644
--- a/noa-harvester/noaharvester/db/utils.py
+++ b/noa-harvester/noaharvester/db/utils.py
@@ -82,7 +82,7 @@ def update_uuid(config, table, uuid, column, value):
     sql = f"""
         UPDATE {table}
         SET {column} = %s
-        WHERE products.uuid = %s;
+        WHERE products.id = %s;
     """
     if column == "id":
         # Do not explain
diff --git a/noa-harvester/noaharvester/harvester.py b/noa-harvester/noaharvester/harvester.py
index 969c7a1..dce1aac 100644
--- a/noa-harvester/noaharvester/harvester.py
+++ b/noa-harvester/noaharvester/harvester.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from copy import deepcopy
+import os
 import logging
 import json
 import zipfile
@@ -26,12 +27,12 @@ class Harvester:
 
     def __init__(
         self,
-        config_file: str,
+        config_file: str = None,
         output_path: str = None,
         shape_file: str = None,
         verbose: bool = False,
         bbox_only: bool = False,
-        from_uri: bool = False
+        is_service: bool = False
     ) -> Harvester:
         """
         Harvester class. Constructor reads and loads the search items json file.
@@ -41,6 +42,7 @@ def __init__(
             shape_file (str - Optional): Read and use shapefile instead of config coordinates.
             verbose (bool - Optional): Indicate if Copernicus download progress is verbose.
""" + self._config = {} self._config_filename = config_file self._output_path = output_path self._verbose = verbose @@ -56,7 +58,7 @@ def __init__( with open(config_file, encoding="utf8") as f: self._config = json.load(f) - if from_uri: + if is_service: pass else: for item in self._config: @@ -71,7 +73,7 @@ def __init__( logger.debug("Appending search item: %s", item) logger.debug("Total search items: %s", len(self._search_items)) - def download_from_uuid_list(self, uuid_list) -> tuple[list, list]: + def download_from_uuid_list(self, uuid_list: list[str]) -> tuple[list, list]: """ Utilize the minimum provider interface for downloading single items """ @@ -88,10 +90,11 @@ def download_from_uuid_list(self, uuid_list) -> tuple[list, list]: for single_uuid in uuid_list: uuid_db_entry = db_utils.query_all_from_table_column_value( - db_config, "products", "uuid", single_uuid) + db_config, "products", "id", single_uuid) provider = db_utils.query_all_from_table_column_value( db_config, "providers", "id", uuid_db_entry.get("provider_id", None) ).get("name") + # TODO coupling of db with module if provider == "cdse": provider = "copernicus" print(provider) @@ -99,8 +102,8 @@ def download_from_uuid_list(self, uuid_list) -> tuple[list, list]: # Check for uuid as passed from request. 
It should replace uri # uuid = None # Test uuid: "83c19de3-e045-40bd-9277-836325b4b64e" if uuid_db_entry: - logger.debug("Found db entry with uuid: %s", single_uuid) - uuid_title = (single_uuid, uuid_db_entry.get("name")) + logger.debug("Found db entry in Products table with id: %s", single_uuid) + uuid_title = (uuid_db_entry.get("uuid"), uuid_db_entry.get("name")) print(uuid_title) downloaded_item_path = download_provider.single_download(*uuid_title) # Unfortunately, need to distinguish cases: @@ -117,12 +120,20 @@ def download_from_uuid_list(self, uuid_list) -> tuple[list, list]: update_item_path = db_utils.update_uuid( db_config, "products", single_uuid, "path", str(downloaded_item_path)) - + # TODO delete order_id if update_status & update_item_path: downloaded_items.append(single_uuid) else: failed_items.append(single_uuid) logger.error("Could not update uuid: %s", single_uuid) + + kafka_topic = os.environ.get("KAFKA_INPUT_TOPIC", "harvester.order.completed") + try: + utils.send_kafka_message(kafka_topic, downloaded_items, failed_items) + logger.info("Kafka message sent") + except Exception as e: + logger.error(f"Error sending kafka message: {e}") + return (downloaded_items, failed_items) def test_db_connection(self): diff --git a/noa-harvester/noaharvester/messaging/__init__.py b/noa-harvester/noaharvester/messaging/__init__.py new file mode 100644 index 0000000..a52dcc7 --- /dev/null +++ b/noa-harvester/noaharvester/messaging/__init__.py @@ -0,0 +1,40 @@ +class AbstractProducer(object): + """ + Abstract Kafka Producer doing nothing. + """ + def __init__(self, bootstrap_servers: list, schema: dict) -> None: + """ + Create the Producer instance. + """ + self.bootstrap_servers = bootstrap_servers + self.schema = schema + + def send(self, topic: str, key: str, value: dict) -> None: + """ + Send the specified Value to a Kafka Topic. + """ + +class AbstractConsumer(object): + """ + Abstract Kafka Consumer doing nothing. 
+ """ + def __init__(self, bootstrap_servers: list, group_id: str, topics: list, schema: dict) -> None: + """ + Create the Consumer instance. + """ + self.bootstrap_servers = bootstrap_servers + self.group_id = group_id + self.topics = topics + self.schema = schema + + def create_topics(self, topics: list, num_partitions: int = 2, replication_factor: int = 1): + """ + Create the specified list of Topics. + """ + return [] + + def read(self): + """ + Read messages from the configured Kafka Topics. + """ + return [] diff --git a/noa-harvester/noaharvester/messaging/kafka_consumer.py b/noa-harvester/noaharvester/messaging/kafka_consumer.py new file mode 100644 index 0000000..fd7c3cf --- /dev/null +++ b/noa-harvester/noaharvester/messaging/kafka_consumer.py @@ -0,0 +1,55 @@ +""" +Simple kafka producer schema +""" + +import json +from kafka import KafkaConsumer as k_KafkaConsumer +from kafka.admin import KafkaAdminClient, NewTopic + +from noaharvester import messaging as noa_messaging + + +class KafkaConsumer(noa_messaging.AbstractConsumer): + """ + Kafka Consumer using https://kafka-python.readthedocs.io/ with JSON deserialization. + """ + def __init__(self, bootstrap_servers: list, group_id: str, topics: list, schema: dict) -> k_KafkaConsumer: + """ + Create the Consumer instance. + """ + super(KafkaConsumer, self).__init__( + bootstrap_servers=bootstrap_servers, group_id=group_id, topics=topics, schema=schema + ) + self.consumer = k_KafkaConsumer( + *topics, + bootstrap_servers=bootstrap_servers, + group_id=group_id, + auto_offset_reset='earliest', + value_deserializer=lambda x: json.loads(x.decode('utf-8')) + ) + + def create_topics(self, topics: list, num_partitions: int = 2, replication_factor: int = 1): + """ + Create the specified list of Topics. 
+ """ + admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers) + topic_list = [] + + new_topics = set(topics) - set(self.consumer.topics()) + for topic in new_topics: + try: + t = NewTopic(name=topic, num_partitions=num_partitions, replication_factor=replication_factor) + admin_client.create_topics(new_topics=[t], validate_only=False) + topic_list.append(t) + # Ignore the error when the Topic already exists. + except: + pass + + return topic_list + + def read(self): + """ + Read messages from the configured Kafka Topics. + """ + for message in self.consumer: + yield message \ No newline at end of file diff --git a/noa-harvester/noaharvester/messaging/kafka_producer.py b/noa-harvester/noaharvester/messaging/kafka_producer.py new file mode 100644 index 0000000..b36b223 --- /dev/null +++ b/noa-harvester/noaharvester/messaging/kafka_producer.py @@ -0,0 +1,32 @@ +""" +Simple kafka producer schema +""" + +import json +from kafka import KafkaProducer as k_KafkaProducer + +from noaharvester import messaging as noa_messaging + + +class KafkaProducer(noa_messaging.AbstractProducer): + """ + Kafka Producer using https://kafka-python.readthedocs.io/ with JSON serialization. + """ + def __init__(self, bootstrap_servers: list, schema: dict) -> None: + """ + Create the Producer instance. + """ + super(KafkaProducer, self).__init__( + bootstrap_servers=bootstrap_servers, schema=schema + ) + self.producer = k_KafkaProducer( + bootstrap_servers=bootstrap_servers, + value_serializer=lambda x: json.dumps(x).encode('utf-8') + ) + + def send(self, topic: str, key: str, value: dict) -> None: + """ + Send the specified Value to a Kafka Topic. 
+ """ + self.producer.send(topic, value) + self.producer.flush() diff --git a/noa-harvester/noaharvester/messaging/message.py b/noa-harvester/noaharvester/messaging/message.py new file mode 100644 index 0000000..3e89efb --- /dev/null +++ b/noa-harvester/noaharvester/messaging/message.py @@ -0,0 +1,55 @@ +class Message: + """ + Defines the message. + """ + + _schema_request_def = { + "namespace": "noa.harvester.order", + "type": "object", + "properties": { + "uuid": { + "type": "array", + "items": { + "type": "string", + "uniqueItems": True, + } + } + }, + "required": ["uuid"] + } + + _schema_response_def = { + "namespace": "noa.harvester.order", + "type": "object", + "properties": { + "succeeded": { + "type": "array", + "items": { + "type": "string", + "uniqueItems": True, + } + }, + "failed": { + "type": "array", + "items": { + "type": "string", + "uniqueItems": True, + } + } + }, + "required": ["succeeded", "failed"] + } + + @staticmethod + def schema_request() -> dict: + """ + Returns the Schema definition of this type. + """ + return Message._schema_request_def + + @staticmethod + def schema_response() -> dict: + """ + Returns the Schema definition of this type. 
+ """ + return Message._schema_response_def \ No newline at end of file diff --git a/noa-harvester/noaharvester/utils.py b/noa-harvester/noaharvester/utils.py index d537036..be76dba 100644 --- a/noa-harvester/noaharvester/utils.py +++ b/noa-harvester/noaharvester/utils.py @@ -1,9 +1,13 @@ """Utility functions""" +import os import logging from pyproj import Transformer, CRS import shapefile +from noaharvester.messaging.kafka_producer import KafkaProducer +from noaharvester.messaging.message import Message + logger = logging.getLogger(__name__) @@ -56,3 +60,12 @@ def get_bbox_from_shp(shp_path: str, bbox_only: bool) -> list: bboxes.append([west, south, east, north]) return bboxes + + +def send_kafka_message(topic, succeeded, failed): + schema_def = Message.schema_response() + bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092") + + producer = KafkaProducer(bootstrap_servers=bootstrap_servers, schema=schema_def) + kafka_message = {"succeeded": succeeded, "failed": failed} + producer.send(topic=topic, value=kafka_message) diff --git a/noa-harvester/pyproject.toml b/noa-harvester/pyproject.toml index f46d42a..70dbdbd 100644 --- a/noa-harvester/pyproject.toml +++ b/noa-harvester/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "click>=8.1.7", "cdsetool>=0.2.13", "earthaccess>=0.9.0", + "kafka-python-ng==2.2.3", "psycopg>=3.2.3", "pyproj>=3.6.1", "pyshp>=2.3.1", diff --git a/noa-harvester/requirements.txt b/noa-harvester/requirements.txt index 59dd0da..5c7af37 100644 --- a/noa-harvester/requirements.txt +++ b/noa-harvester/requirements.txt @@ -1,6 +1,7 @@ cdsetool==0.2.13 click==8.1.7 earthaccess==0.9.0 +kafka-python-ng==2.2.3 psycopg==3.2.3 pyproj==3.6.1 pyshp==2.3.1