Skip to content

Commit

Permalink
Merge pull request #79 from Agri-Hub/77-harvester-hotfixes-for-poc
Browse files Browse the repository at this point in the history
77 harvester hotfixes for poc
  • Loading branch information
fbalaban authored Dec 2, 2024
2 parents e285a86 + 1c7b00e commit 232b1b1
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 46 deletions.
3 changes: 3 additions & 0 deletions noa-harvester/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.10.0] - 2024-12-02
### Fixed
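- Minor hot-fixes for the PoC: lint clean-ups, a public `Harvester.config` property, and the Kafka producer now forwards the message key (#77)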

## [0.9.0] - 2024-11-21
### Added
Expand Down
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# pylint:disable=missing-module-docstring
__version__ = "0.9.0"
__version__ = "0.10.0"
71 changes: 38 additions & 33 deletions noa-harvester/noaharvester/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations
import os
import sys
import datetime
from datetime import datetime
import json
from time import sleep
import logging
Expand All @@ -20,9 +20,9 @@
sys.path.append(str(Path(__file__).parent / ".."))

from noaharvester import harvester # noqa:402 pylint:disable=wrong-import-position
from noaharvester.utils import Message
from noaharvester.messaging import AbstractConsumer
from noaharvester.messaging.kafka_consumer import KafkaConsumer
from noaharvester.messaging.message import Message # noqa:402 pylint:disable=wrong-import-position
from noaharvester.messaging import AbstractConsumer # noqa:402 pylint:disable=wrong-import-position
from noaharvester.messaging.kafka_consumer import KafkaConsumer # noqa:402 pylint:disable=wrong-import-position

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,10 +121,10 @@ def download(

@cli.command(
help=(
"""
Microservice facilitating EO data downloads. Implemented by
using a kafka producer/consumer pattern.
"""
"""
Microservice facilitating EO data downloads. Implemented by
using a kafka producer/consumer pattern.
"""
)
)
@click.option(
Expand Down Expand Up @@ -169,18 +169,18 @@ def noa_harvester_service(
consumer: AbstractConsumer | k_KafkaConsumer = None

# Warning: topics is a list, even if there is only one topic
topics = [harvest._config.get(
topics = [harvest.config.get(
"topics", os.environ.get(
"KAFKA_INPUT_TOPIC", "harvester.order.requested"
)
)]
schema_def = Message.schema_request()
num_partitions = int(harvest._config.get(
num_partitions = int(harvest.config.get(
"num_partitions", os.environ.get(
"KAFKA_NUM_PARTITIONS", 2
)
))
replication_factor = int(harvest._config.get(
replication_factor = int(harvest.config.get(
"replication_factor", os.environ.get(
"KAFKA_REPLICATION_FACTOR", 3
)
Expand All @@ -189,7 +189,7 @@ def noa_harvester_service(
while consumer is None:
consumer = KafkaConsumer(
bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
group_id=harvest._config.get("group_id", "harvester-group-request"),
group_id=harvest.config.get("group_id", "harvester-group-request"),
# Topics are None in the constructor; the consumer subscribes to them afterwards.
# If the harvester is also responsible for creating the topics,
# it should be done here.
Expand Down Expand Up @@ -241,13 +241,18 @@ def noa_harvester_service(
)
@click.argument("config_file", required=True)
@click.option("--output_path", default="./data", help="Output path")
@click.option("--uuid", "-u", multiple=True, help="Id field of products table. Can be set multiple times")
@click.option(
"--uuid",
"-u",
multiple=True,
help="Id field of products table. Can be set multiple times"
)
def from_uuid_list(
config_file: Argument | str,
output_path: Option | str,
uuid: Option | tuple[str],
verbose: Option | bool,
) -> None:
) -> tuple:
"""
Instantiate Harvester class and call download function.
Downloads all ids from Products table, based on the --uuid multiple option.
Expand All @@ -259,26 +264,26 @@ def from_uuid_list(
uuid (click.Option | tuple[str]): A tuple of uuids to download
verbose (click.Option | bool): to show download progress indicator or not
"""
if config_file:
logger.debug("Cli download for config file: %s", config_file)

click.echo(f"Downloading at: {output_path}\n")
harvest = harvester.Harvester(
config_file=config_file,
output_path=output_path,
verbose=verbose,
# TODO is not a service, but needs refactoring of Harvester and logic
# since we have changed the functionality
is_service=True
)
downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(uuid)
if failed_uuids:
logger.error("Failed uuids: %s", failed_uuids)
# TODO The following is a dev test: to be converted to unit tests
# harvest.test_db_connection()
print(downloaded_uuids)
click.echo("Done.\n")
return downloaded_uuids, failed_uuids
logger.debug("Cli download for config file: %s", config_file)

click.echo(f"Downloading at: {output_path}\n")
harvest = harvester.Harvester(
config_file=config_file,
output_path=output_path,
verbose=verbose,
# TODO is not a service, but needs refactoring of Harvester and logic
# since we have changed the functionality
is_service=True
)
downloaded_uuids, failed_uuids = harvest.download_from_uuid_list(uuid)
if failed_uuids:
logger.error("Failed uuids: %s", failed_uuids)
# TODO The following is a dev test: to be converted to unit tests
# harvest.test_db_connection()
print(downloaded_uuids)
click.echo("Done.\n")
return downloaded_uuids, failed_uuids


@cli.command(
Expand Down
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def update_uuid(config, table, uuid, column, value):
curs.execute(sql, (value, uuid))
conn.commit()

# If updated succesfully, return True
# If updated successfully, return True
if curs.rowcount > 0:
return True
return False
Expand Down
10 changes: 8 additions & 2 deletions noa-harvester/noaharvester/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def __init__(
logger.debug("Appending search item: %s", item)
logger.debug("Total search items: %s", len(self._search_items))

@property
def config(self):
"""Get config"""
return self._config

def download_from_uuid_list(self, uuid_list: list[str]) -> tuple[list, list]:
"""
Utilize the minimum provider interface for downloading single items
Expand Down Expand Up @@ -131,12 +136,13 @@ def download_from_uuid_list(self, uuid_list: list[str]) -> tuple[list, list]:
try:
utils.send_kafka_message(kafka_topic, downloaded_items, failed_items)
logger.info("Kafka message sent")
except Exception as e:
logger.error(f"Error sending kafka message: {e}")
except BrokenPipeError as e:
logger.error("Error sending kafka message: %s ", e)

return (downloaded_items, failed_items)

def test_db_connection(self):
"""Out of place Testing"""
db_config = db_utils.get_env_config()
if not db_config:
db_config = db_utils.get_local_config()
Expand Down
4 changes: 2 additions & 2 deletions noa-harvester/noaharvester/messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class AbstractProducer(object):
class AbstractProducer():
"""
Abstract Kafka Producer doing nothing.
"""
Expand All @@ -14,7 +14,7 @@ def send(self, topic: str, key: str, value: dict) -> None:
Send the specified Value to a Kafka Topic.
"""

class AbstractConsumer(object):
class AbstractConsumer():
"""
Abstract Kafka Consumer doing nothing.
"""
Expand Down
11 changes: 7 additions & 4 deletions noa-harvester/noaharvester/messaging/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
"""

import json
import logging
from kafka import KafkaConsumer as k_KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic

from noaharvester import messaging as noa_messaging

logger = logging.getLogger(__name__)


class KafkaConsumer(noa_messaging.AbstractConsumer):
"""
Expand Down Expand Up @@ -42,14 +45,14 @@ def create_topics(self, topics: list, num_partitions: int = 2, replication_facto
admin_client.create_topics(new_topics=[t], validate_only=False)
topic_list.append(t)
# Ignore the error when the Topic already exists.
except:
pass
except RuntimeWarning:
logging.warning("Topic %s exists. Just a warning", topic)
continue

return topic_list

def read(self):
"""
Read messages from the configured Kafka Topics.
"""
for message in self.consumer:
yield message
yield from self.consumer
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/messaging/kafka_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ def send(self, topic: str, key: str, value: dict) -> None:
"""
Send the specified Value to a Kafka Topic.
"""
self.producer.send(topic, value)
self.producer.send(topic, key=key, value=value)
self.producer.flush()
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/messaging/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ def schema_response() -> dict:
"""
Returns the Schema definition of this type.
"""
return Message._schema_response_def
return Message._schema_response_def
2 changes: 1 addition & 1 deletion noa-harvester/noaharvester/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ def send_kafka_message(topic, succeeded, failed):

producer = KafkaProducer(bootstrap_servers=bootstrap_servers, schema=schema_def)
kafka_message = {"succeeded": succeeded, "failed": failed}
producer.send(topic=topic, value=kafka_message)
producer.send(topic=topic, key=None, value=kafka_message)

0 comments on commit 232b1b1

Please sign in to comment.