Merge pull request #76 from Agri-Hub/75-harvester-hotfix-database-corrections

75 harvester hotfix database corrections
fbalaban authored Nov 21, 2024
2 parents 092f951 + c459469 commit 204c2be
Showing 16 changed files with 373 additions and 21 deletions.
9 changes: 9 additions & 0 deletions noa-harvester/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.9.0] - 2024-11-21
### Added
- Publishing of the successful/failed id lists to a kafka topic (#75)
- Consuming of id lists for download from a kafka topic (#75)
- New cli command (`noa_harvester_service`) (#75)

### Changed
- Downloading from id (products table, column id) now returns successful and failed lists (#75)

## [0.8.0] - 2024-11-08
### Added
- Support for uuid list download of CDSE. Gateway CLI support and postgres read/update (#67)
Expand Down
24 changes: 18 additions & 6 deletions noa-harvester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ It can be used as a standalone cli application or be built in a Docker container
The noaharvester processor can be executed as:
- Standalone [**Cli application**](#standalone-cli-execution) or
- Inside a [**Container**](#docker-execution)
- As a container inside a Kubernetes environment with kafka and a postgres database. This is a Beyond-specific setup, where a user can instantiate Harvester and request the download of Products based on an id from the postgres Products table. This table includes uuid and title fields, which are necessary to construct the request to CDSE (for now). Harvester then updates the products table with the download path, and posts the result for each of these ids to a kafka topic.
- As a microservice inside a Kubernetes environment with kafka and a postgres database. Same as above, but deployed as a long-running service: it always listens on the appointed kafka topic for messages carrying uuid lists.
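
Judging from the consumer code introduced in this PR, the message payload for the service mode above is a JSON object whose `uuid` key holds the list of Products-table ids. A minimal parsing sketch (the sample ids are made up):

```python
import json

# Example payload as it might arrive on the "harvester.order.requested" topic.
# The "uuid" key holds Products-table ids (the sample values are hypothetical).
raw_message = (
    '{"uuid": ["caf8620d-974d-5841-b315-7489ffdd853b",'
    ' "0f2e3d4c-aaaa-bbbb-cccc-111122223333"]}'
)

payload = json.loads(raw_message)
ids_to_download = payload["uuid"]

for product_id in ids_to_download:
    # Harvester would look each id up in the Products table and download it.
    print(f"Would download product {product_id}")
```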

In either case, a user must have credentials for accessing one or more data hubs:
- [Copernicus]
Expand Down Expand Up @@ -200,12 +202,14 @@ Cli can be executed with the following:

- Commands
* `download` - The main option. Downloads according with the config file parameters.
* `from-uuid-list` - Download from uuid (e.g. id in Sentinel 2 metadata products) list. Needs to be combined with -u option. Necessary a db connection (TODO: optional)
* `from-uuid-list` - Download from a uuid db list. Must be combined with the -u option. A db connection is required (TODO: make it optional)
* `noa_harvester_service` - Deploy a service that always listens to a specific kafka topic (which can also be defined in the config file - see config/config_service.json).
* `query` - Queries the collection(s) for products according to the parameters present in the config file.
* `describe` (Copernicus only) - Describes the available query parameters for the collections as defined in the config file.
- Options
* `--output_path` (download only) Custom download location. Default is `.data`
* `-u, --uuid` [**multiple**] (from-uuid-list only). Uuid option; can be given multiple times.
* `-t, --test` **Service only**: test the kafka consumer functionality.
* `-bb, --bbox_only` Draw total bbox, not individual polygons in multipolygon shapefile.
* `-v`, `--verbose` Shows the progress indicator when downloading (Copernicus - only for download command)
* `--log LEVEL (INFO, DEBUG, WARNING, ERROR)` Shows the logs depending on the selected `LEVEL`
Expand All @@ -224,7 +228,7 @@ The necessary env vars are:
`DB_PORT`
`DB_NAME`

Moreover, Harvester will query the db to get the UUID (to query based on the input uuid) and Title of the product to be downloaded (it does not query CDSE for metadata - it only downloads).
Moreover, Harvester will query the db to get the UUID and Title of the Product to be downloaded (it does not query CDSE for metadata - it only downloads).
So make sure that your postgres instance has a table named "Products" that includes at least a `uuid` and a `name` field.
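
As a sketch of the minimal schema Harvester expects (sqlite3 is used here purely for illustration; the real setup is postgres, and any column beyond `uuid` and `name` is an assumption based on this diff):

```python
import sqlite3

# Illustrative only: the real deployment uses postgres. Harvester looks a
# product up by its table id, then uses its uuid and name (title) to build
# the download request to CDSE.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        uuid TEXT NOT NULL,
        name TEXT NOT NULL,
        path TEXT
    )
""")
conn.execute(
    "INSERT INTO products (uuid, name) VALUES (?, ?)",
    ("caf8620d-974d-5841-b315-7489ffdd853b", "S2A_MSIL2A_example.SAFE"),
)

row = conn.execute(
    "SELECT uuid, name FROM products WHERE id = ?", (1,)
).fetchone()
print(row)  # the uuid and title used to construct the CDSE request
```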

## Examples
Expand All @@ -237,15 +241,23 @@ docker run -it \
noaharvester describe config/config.json
```

* Download (with download indicator) from Copernicus providing a uuid list and store in mnt point:
* Download (with download indicator) from Copernicus providing an id list (each id corresponding to an entry in the Products db table) and store in a mnt point:

```
docker run -it \
-v ./config/config.json:/app/config/config.json \
-v ./config/config.json:/app/config/config_from_id.json \
-v /mnt/data:/app/data \
noaharvester from-uuid-list -v -u caf8620d-974d-5841-b315-7489ffdd853b config/config.json
noaharvester from-uuid-list -v -u caf8620d-974d-5841-b315-7489ffdd853b config/config_from_id.json
```

* Deploying Harvester as a service (for kafka testing; omit the -t flag for normal operation):

```
docker run -it \
-v ./config/config_service.json:/app/config/config_service.json \
-v /mnt/data:/app/data \
noaharvester noa-harvester-service -v -t config/config_service.json
```
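
To trigger the service manually you can publish a request message to the input topic. A sketch using the `kafka-python` package (the broker address and topic name must match your deployment; the send itself is commented out so the snippet runs without a broker):

```python
import json

# from kafka import KafkaProducer  # pip install kafka-python

topic = "harvester.order.requested"
payload = {"uuid": ["caf8620d-974d-5841-b315-7489ffdd853b"]}
message = json.dumps(payload).encode("utf-8")

# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# producer.send(topic, message)
# producer.flush()

print(f"Would send {message!r} to topic {topic}")
```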

* Download (with download indicator) from Copernicus and Earthdata as defined in the config file, for an area provided by the shapefile files (`area.shp` and `area.prj`) located in folder `/home/user/project/strange_area`:

Expand Down
File renamed without changes.
6 changes: 6 additions & 0 deletions noa-harvester/config/config_service.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"group_id": "harvester-group-request",
"topics": ["harvester.order.requested"],
"num_partitions": 2,
"replication_factor": 3
}
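
The service resolves these keys with environment-variable fallbacks; the sketch below mirrors the lookup pattern in `cli.py` (the env var names are taken from that file):

```python
import os

# Config as loaded from config/config_service.json
config = {
    "group_id": "harvester-group-request",
    "topics": ["harvester.order.requested"],
    "num_partitions": 2,
    "replication_factor": 3,
}

# Same fallback order as cli.py: config file first, then env var, then default.
topics = config.get(
    "topics", [os.environ.get("KAFKA_INPUT_TOPIC", "harvester.order.requested")]
)
num_partitions = int(
    config.get("num_partitions", os.environ.get("KAFKA_NUM_PARTITIONS", 2))
)
replication_factor = int(
    config.get("replication_factor", os.environ.get("KAFKA_REPLICATION_FACTOR", 3))
)

print(topics, num_partitions, replication_factor)
```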
2 changes: 2 additions & 0 deletions noa-harvester/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ services:
- DB_HOST
- DB_PORT
- DB_NAME
environment:
KAFKA_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092,kafka3:9092"
working_dir: /app
volumes:
- ./config:/app/config
Expand Down
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# pylint:disable=missing-module-docstring
__version__ = "0.8.0"
__version__ = "0.9.0"
125 changes: 120 additions & 5 deletions noa-harvester/noaharvester/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
"""

from __future__ import annotations
import os
import sys
from datetime import datetime
import json
from time import sleep
import logging
from pathlib import Path

import click
from click import Argument, Option

from kafka import KafkaConsumer as k_KafkaConsumer
# Appending the module path in order to have a kind of cli "dry execution"
sys.path.append(str(Path(__file__).parent / ".."))

from noaharvester import harvester # noqa:402 pylint:disable=wrong-import-position
from noaharvester.utils import Message
from noaharvester.messaging import AbstractConsumer
from noaharvester.messaging.kafka_consumer import KafkaConsumer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -112,6 +119,112 @@ def download(
click.echo("Done.\n")


@cli.command(
help=(
"""
Microservice facilitating EO data downloads. Implemented by
using a kafka producer/consumer pattern.
"""
)
)
@click.option(
"--verbose",
"-v",
is_flag=True,
help="Shows the progress indicator (for Copernicus only)",
)
@click.option(
"--test",
"-t",
is_flag=True,
help="Testing kafka receiving requests. No other functionality",
)
@click.argument("config_file", required=True)
@click.option("--output_path", default="./data", help="Output path")
def noa_harvester_service(
config_file: Argument | str,
output_path: Option | str,
verbose: Option | bool,
test: Option | bool
) -> None:
"""
Instantiate Harvester class activate service, listening to kafka topic.
When triggered, downloads all ids from Products table, based on the
list argument from kafka message.
Parameters:
output_path (click.Option | str): where to download to
verbose (click.Option | bool): to show download progress indicator or not
"""
logger.debug("Starting NOA-Harvester service...")

harvest = harvester.Harvester(
config_file=config_file,
output_path=output_path,
verbose=verbose,
is_service=True
)

consumer: AbstractConsumer | k_KafkaConsumer = None

# Warning: topics is a list, even if there is only one topic
topics = harvest._config.get(
"topics", [os.environ.get(
"KAFKA_INPUT_TOPIC", "harvester.order.requested"
)]
)
schema_def = Message.schema_request()
num_partitions = int(harvest._config.get(
"num_partitions", os.environ.get(
"KAFKA_NUM_PARTITIONS", 2
)
))
replication_factor = int(harvest._config.get(
"replication_factor", os.environ.get(
"KAFKA_REPLICATION_FACTOR", 3
)
))

while consumer is None:
consumer = KafkaConsumer(
bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
group_id=harvest._config.get("group_id", "harvester-group-request"),
# Topics are none in the constructor, and then it subscribes/
# If harvester is responsible for also creating them,
# it should be done here.
# topics=None,
topics=topics,
schema=schema_def
)
consumer.create_topics(
topics=topics, num_partitions=num_partitions, replication_factor=replication_factor)
if consumer is None:
sleep(5)

# consumer.subscribe(topics=topics)
click.echo(f"NOA-Harvester service started. Output path: {output_path}\n")

while True:
for message in consumer.read():
item = message.value
now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
msg = f"Digesting Item from Topic {message.topic} ({now_time})..."
msg += "\n> Item: " + json.dumps(item)
logger.debug(msg)
click.echo("Received list to download")
if test:
downloaded_uuids = "Some downloaded ids"
failed_uuids = "Some failed ids"
else:
uuid_list = json.loads(item)
downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(uuid_list["uuid"])
logger.debug("Downloaded items: %s", downloaded_uuids)
if failed_uuids:
logger.error("Failed uuids: %s", failed_uuids)
sleep(1)


# TODO v2: integrate functionality in download command
@cli.command(
help=(
Expand All @@ -128,7 +241,7 @@ def download(
)
@click.argument("config_file", required=True)
@click.option("--output_path", default="./data", help="Output path")
@click.option("--uuid", "-u", multiple=True, help="Uuid. Can be set multiple times")
@click.option("--uuid", "-u", multiple=True, help="Id field of products table. Can be set multiple times")
def from_uuid_list(
config_file: Argument | str,
output_path: Option | str,
Expand All @@ -137,7 +250,7 @@ def from_uuid_list(
) -> None:
"""
Instantiate Harvester class and call download function.
Downloads all relevant data as defined in the config file.
Downloads all ids from Products table, based on the --uuid multiple option.
Parameters:
config_file (click.Argument | str): config json file listing
Expand All @@ -154,7 +267,9 @@ def from_uuid_list(
config_file=config_file,
output_path=output_path,
verbose=verbose,
from_uri=True
# TODO is not a service, but needs refactoring of Harvester and logic
# since we have changed the functionality
is_service=True
)
downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(uuid)
if failed_uuids:
Expand All @@ -163,7 +278,7 @@ def from_uuid_list(
# harvest.test_db_connection()
print(downloaded_uuids)
click.echo("Done.\n")
return downloaded_uuids
return downloaded_uuids, failed_uuids


@cli.command(
Expand Down
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def update_uuid(config, table, uuid, column, value):
sql = f"""
UPDATE {table}
SET {column} = %s
WHERE products.uuid = %s;
WHERE products.id = %s;
"""
if column == "id":
# Do not explain
Expand Down
27 changes: 19 additions & 8 deletions noa-harvester/noaharvester/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations
from copy import deepcopy

import os
import logging
import json
import zipfile
Expand All @@ -26,12 +27,12 @@ class Harvester:

def __init__(
self,
config_file: str,
config_file: str = None,
output_path: str = None,
shape_file: str = None,
verbose: bool = False,
bbox_only: bool = False,
from_uri: bool = False
is_service: bool = False
) -> Harvester:
"""
Harvester class. Constructor reads and loads the search items json file.
Expand All @@ -41,6 +42,7 @@ def __init__(
shape_file (str - Optional): Read and use shapefile instead of config coordinates.
verbose (bool - Optional): Indicate if Copernicus download progress is verbose.
"""
self._config = {}
self._config_filename = config_file
self._output_path = output_path
self._verbose = verbose
Expand All @@ -56,7 +58,7 @@ def __init__(
with open(config_file, encoding="utf8") as f:
self._config = json.load(f)

if from_uri:
if is_service:
pass
else:
for item in self._config:
Expand All @@ -71,7 +73,7 @@ def __init__(
logger.debug("Appending search item: %s", item)
logger.debug("Total search items: %s", len(self._search_items))

def download_from_uuid_list(self, uuid_list) -> tuple[list, list]:
def download_from_uuid_list(self, uuid_list: list[str]) -> tuple[list, list]:
"""
Utilize the minimum provider interface for downloading single items
"""
Expand All @@ -88,19 +90,20 @@ def download_from_uuid_list(self, uuid_list) -> tuple[list, list]:

for single_uuid in uuid_list:
uuid_db_entry = db_utils.query_all_from_table_column_value(
db_config, "products", "uuid", single_uuid)
db_config, "products", "id", single_uuid)
provider = db_utils.query_all_from_table_column_value(
db_config, "providers", "id", uuid_db_entry.get("provider_id", None)
).get("name")
# TODO coupling of db with module
if provider == "cdse":
provider = "copernicus"
print(provider)
download_provider = self._resolve_provider_instance(provider)
# Check for uuid as passed from request. It should replace uri
# uuid = None # Test uuid: "83c19de3-e045-40bd-9277-836325b4b64e"
if uuid_db_entry:
logger.debug("Found db entry with uuid: %s", single_uuid)
uuid_title = (single_uuid, uuid_db_entry.get("name"))
logger.debug("Found db entry in Products table with id: %s", single_uuid)
uuid_title = (uuid_db_entry.get("uuid"), uuid_db_entry.get("name"))
print(uuid_title)
downloaded_item_path = download_provider.single_download(*uuid_title)
# Unfortunately, need to distinguish cases:
Expand All @@ -117,12 +120,20 @@ def download_from_uuid_list(self, uuid_list) -> tuple[list, list]:

update_item_path = db_utils.update_uuid(
db_config, "products", single_uuid, "path", str(downloaded_item_path))

# TODO delete order_id
if update_status & update_item_path:
downloaded_items.append(single_uuid)
else:
failed_items.append(single_uuid)
logger.error("Could not update uuid: %s", single_uuid)

kafka_topic = os.environ.get("KAFKA_INPUT_TOPIC", "harvester.order.completed")
try:
utils.send_kafka_message(kafka_topic, downloaded_items, failed_items)
logger.info("Kafka message sent")
except Exception as e:
logger.error(f"Error sending kafka message: {e}")

return (downloaded_items, failed_items)

def test_db_connection(self):
Expand Down
