From 0ada9d1527f27503e60b278fcaf19bb4fecb6f2b Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Tue, 5 Nov 2024 13:50:26 +0300 Subject: [PATCH 1/7] Implement canonical domain update script --- bin/run-elastic-canonical-domain-update.sh | 5 + .../elastic-canonical-domain-update.py | 288 ++++++++++++++++++ 2 files changed, 293 insertions(+) create mode 100755 bin/run-elastic-canonical-domain-update.sh create mode 100644 indexer/scripts/elastic-canonical-domain-update.py diff --git a/bin/run-elastic-canonical-domain-update.sh b/bin/run-elastic-canonical-domain-update.sh new file mode 100755 index 00000000..92165b81 --- /dev/null +++ b/bin/run-elastic-canonical-domain-update.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +. bin/func.sh + +run_python indexer.scripts.elastic-canonical-domain-update "$@" diff --git a/indexer/scripts/elastic-canonical-domain-update.py b/indexer/scripts/elastic-canonical-domain-update.py new file mode 100644 index 00000000..78d56b03 --- /dev/null +++ b/indexer/scripts/elastic-canonical-domain-update.py @@ -0,0 +1,288 @@ +import argparse +import json +from logging import getLogger +from typing import Any, Dict, Generator, List, Optional, Tuple + +import mcmetadata +from elasticsearch.helpers import bulk, scan + +from indexer.app import App +from indexer.elastic import ElasticMixin + +logger = getLogger("elastic-canonical-domain-updater") + + +class CanonicalDomainUpdater(ElasticMixin, App): + + def __init__(self, process_name: str, descr: str) -> None: + super().__init__(process_name, descr) + self.batch_size: int = 0 + self.updates_buffer: List[Dict[str, Any]] = [] + self.buffer_size: int = 0 + self.index_name: str = "" + self.query: str = "" + self.total_matched_docs: Optional[int] = None + + def define_options(self, ap: argparse.ArgumentParser) -> None: + """ + Define command line arguments for the script. + Extends the parent class argument definitions. + + Args: + ap (argparse.ArgumentParser): The argument parser instance to add arguments to + + Adds the following arguments: + --index: Name of the Elasticsearch index to update + --batch: Batch size for document fetching (default: 1000) + --buffer: Size of update operation buffer (default: 2000) + --query: Elasticsearch query string for filtering documents + Returns: + None + """ + super().define_options(ap) + ap.add_argument( + "--index", + dest="index", + help="The name of the Elasticsearch index to update", + ) + + ap.add_argument( + "--batch", + dest="batch_size", + default=1000, + help="The number of documents to fetch from Elasticsearch in each batch (default: 1000)", + ) + + ap.add_argument( + "--buffer", + dest="buffer_size", + default=2000, + help="The maximum number of update operations to buffer before flushing to Elasticsearch", + ) + + ap.add_argument( + "--query", + dest="query", + help="Elasticsearch query string to filter documents for processing", + ) + + def process_args(self) -> None: + """ + Process command line arguments and set instance variables. + + Sets the following instance attributes: + index_name (str): Name of the Elasticsearch index to operate on + batch_size (int): Number of documents to retrieve per batch from Elasticsearch + buffer_size (int): Maximum number of update operations to buffer before flushing to Elasticsearch + query (str): Elasticsearch query string to filter documents for processing + + Note: + Calls the parent class's process_args() first to handle any inherited argument processing. + """ + super().process_args() + + args = self.args + assert args + + self.index_name = args.index + self.batch_size = args.batch_size + self.buffer_size = int(args.buffer_size) + self.query = args.query + + def initialize(self) -> None: + """ + Initializes the Elasticsearch instance and sets up application arguments. + Returns: + None + """ + parser = argparse.ArgumentParser() + app.define_options(parser) + app.args = parser.parse_args() + app.process_args() + self.es_client = self.elasticsearch_client() + + def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: + """ + Builds a query for searching documents whose canonical domain must be updated, the query is validated by + the Elasticsearch _validate API, before execution + + Returns: + Tuple of (is_valid, parsed_query, error_message) + """ + try: + # Try to parse the string as JSON first + query = json.loads(self.query) + + # Extract just the query part if it exists + query_body = query.get("query", query) + + # Use the Elasticsearch validate API to ensure the query is ok + validation_result = self.es_client.indices.validate_query( + index=self.index_name, body={"query": query_body}, explain=True + ) + + if validation_result["valid"]: + return True, query, None + else: + error_msg = "No detailed explanation is available" + if "error" in validation_result: + error_msg = f"Invalid Query - {validation_result['error']}" + return False, None, error_msg + except json.JSONDecodeError as e: + return False, None, f"Invalid Query: Invalid JSON format - {str(e)}" + except Exception as e: + return False, None, f"Invalid Query: Validation error - {str(e)}" + + def get_document_count(self, query: Dict[str, Any]) -> Optional[int]: + """ + Get the total number of documents matching the query + + Args: + query: Elasticsearch query + + Returns: + Total number of matching documents + """ + try: + count_response = self.es_client.count(index=self.index_name, body=query) + count = count_response.get("count") + if count is not None: + return int(count) + except Exception as e: + logger.error(f"Error getting document count: {e}") + return None + + def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: + """ + Get documents that need to be updated using scroll API + + Yields: + Document dictionaries + """ + try: + # Build the query from the string passed by the user as an arg, the build process also validates the query + success, query, error = self.build_query() + + if success: + assert query + self.total_matched_docs = self.get_document_count(query) + logger.info( + f"Found a total of [{self.total_matched_docs}] documents to update." + ) + + for hit in scan( + client=self.es_client, + query=query, + index=self.index_name, + size=self.batch_size, + preserve_order=True, + ): + yield { + "index": hit["_index"], + "source": hit["_source"], + "id": hit["_id"], + } + else: + raise Exception(error) + + except Exception as e: + logger.error(e) + + def queue_canonical_domain_update(self, doc_data: Dict[str, Any]) -> None: + """ + Extracts canonical domain from document URL and buffers an update action. + When the buffer is full, updates are flushed to Elasticsearch. + + Args: + doc_data: Dictionary containing document data with 'source.url', 'index', and 'id' fields + """ + try: + # Determine the canonical domain from mcmetadata, the URL we have on these document is a canonical URL + canonical_domain = mcmetadata.urls.canonical_domain( + doc_data["source"]["url"] + ) + + # Create an update action where we update only the canonical domain by Document ID + update_action = { + "_op_type": "update", + "_index": doc_data["index"], + "_id": doc_data["id"], + "doc": { + "canonical_domain": canonical_domain, + }, + } + + self.updates_buffer.append(update_action) + + # Check if the buffer is full and flush updates + if len(self.updates_buffer) >= self.buffer_size: + self.flush_updates() + except Exception as e: + logger.error(f"Error processing document {doc_data.get('id')}: {e}") + + def flush_updates(self) -> None: + """ + Flush the buffered updates to Elasticsearch + """ + if not self.updates_buffer: + return + try: + # Perform bulk update + success, failed = bulk( + client=self.es_client, + actions=self.updates_buffer, + refresh=False, + raise_on_error=False, + ) + + is_failed_list = isinstance(failed, list) + if is_failed_list: + assert isinstance(failed, list) + failed_count = len(failed) + else: + assert isinstance(failed, int) + failed_count = failed + logger.info(f"Bulk update: {success} successful, {failed_count} failed") + + if is_failed_list: + assert isinstance(failed, list) + for error in failed: + logger.error(f"Failed to update: {error}") + except Exception as e: + logger.error(f"Bulk update failed: {e}") + finally: + # Clear the buffer + self.updates_buffer = [] + + def main(self) -> None: + """ + Main execution method for processing canonical domain updates to documents. + + This method serves as the entry point for the application logic. It initializes the + application, retrieves documents that require updates, and processes them by queuing + updates for the canonical domain. Any exceptions encountered during execution are + logged as fatal errors. Finally, it ensures that any remaining updates in the buffer + are flushed before execution completes. + + Returns: + None + """ + try: + # Initialize the app + self.initialize() + # Get a buffer of documents that need to be updated + for doc in self.get_documents_to_update(): + self.queue_canonical_domain_update(doc) + except Exception as e: + logger.fatal(e) + finally: + # Ensure the buffer is flushed if it's not empty before finishing execution + if self.updates_buffer: + self.flush_updates() + + +if __name__ == "__main__": + app = CanonicalDomainUpdater( + "elastic-canonical-domain-updater", "Updates canonical domain" + ) + app.main() From 91c600760370989b6ee81e734b382cbbcb5cf7b3 Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Wed, 6 Nov 2024 15:25:32 +0300 Subject: [PATCH 2/7] Replace f-string in logging with positional arguments --- .../elastic-canonical-domain-update.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/indexer/scripts/elastic-canonical-domain-update.py b/indexer/scripts/elastic-canonical-domain-update.py index 78d56b03..cf0936c7 100644 --- a/indexer/scripts/elastic-canonical-domain-update.py +++ b/indexer/scripts/elastic-canonical-domain-update.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Generator, List, Optional, Tuple import mcmetadata +from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk, scan from indexer.app import App @@ -16,6 +17,7 @@ class CanonicalDomainUpdater(ElasticMixin, App): def __init__(self, process_name: str, descr: str) -> None: super().__init__(process_name, descr) + self.es_client: Optional[Elasticsearch] = None self.batch_size: int = 0 self.updates_buffer: List[Dict[str, Any]] = [] self.buffer_size: int = 0 @@ -110,6 +112,7 @@ def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: Tuple of (is_valid, parsed_query, error_message) """ try: + assert self.es_client # Try to parse the string as JSON first query = json.loads(self.query) @@ -144,12 +147,13 @@ def get_document_count(self, query: Dict[str, Any]) -> Optional[int]: Total number of matching documents """ try: + assert self.es_client count_response = self.es_client.count(index=self.index_name, body=query) count = count_response.get("count") if count is not None: return int(count) except Exception as e: - logger.error(f"Error getting document count: {e}") + logger.error("Error getting document count: %s", e) return None def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: @@ -164,10 +168,10 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: success, query, error = self.build_query() if success: - assert query + assert query and self.es_client self.total_matched_docs = self.get_document_count(query) logger.info( - f"Found a total of [{self.total_matched_docs}] documents to update." + "Found a total of [%s] documents to update", self.total_matched_docs ) for hit in scan( @@ -218,7 +222,7 @@ def queue_canonical_domain_update(self, doc_data: Dict[str, Any]) -> None: if len(self.updates_buffer) >= self.buffer_size: self.flush_updates() except Exception as e: - logger.error(f"Error processing document {doc_data.get('id')}: {e}") + logger.error("Error processing document %s: %s", doc_data.get("id"), e) def flush_updates(self) -> None: """ @@ -227,6 +231,7 @@ def flush_updates(self) -> None: if not self.updates_buffer: return try: + assert self.es_client # Perform bulk update success, failed = bulk( client=self.es_client, @@ -242,14 +247,14 @@ def flush_updates(self) -> None: else: assert isinstance(failed, int) failed_count = failed - logger.info(f"Bulk update: {success} successful, {failed_count} failed") + logger.info("Bulk update: %s successful, %s failed", success, failed_count) if is_failed_list: assert isinstance(failed, list) for error in failed: - logger.error(f"Failed to update: {error}") + logger.error("Failed to update: %s", error) except Exception as e: - logger.error(f"Bulk update failed: {e}") + logger.error("Bulk update failed: %s", e) finally: # Clear the buffer self.updates_buffer = [] From caf47fae324e78b1af4c08001f0f2400cd995459 Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Thu, 7 Nov 2024 16:40:29 +0300 Subject: [PATCH 3/7] Add support for query string queries --- .../elastic-canonical-domain-update.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/indexer/scripts/elastic-canonical-domain-update.py b/indexer/scripts/elastic-canonical-domain-update.py index cf0936c7..07ad19b8 100644 --- a/indexer/scripts/elastic-canonical-domain-update.py +++ b/indexer/scripts/elastic-canonical-domain-update.py @@ -1,7 +1,7 @@ import argparse import json from logging import getLogger -from typing import Any, Dict, Generator, List, Optional, Tuple +from typing import Any, Dict, Generator, List, Literal, Optional, Tuple import mcmetadata from elasticsearch import Elasticsearch @@ -23,6 +23,7 @@ def __init__(self, process_name: str, descr: str) -> None: self.buffer_size: int = 0 self.index_name: str = "" self.query: str = "" + self.query_format: Literal["DLS", "query_string"] = "query_string" self.total_matched_docs: Optional[int] = None def define_options(self, ap: argparse.ArgumentParser) -> None: @@ -68,6 +69,14 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="Elasticsearch query string to filter documents for processing", ) + ap.add_argument( + "--format", + dest="query_format", + default="query_string", + choices=["DSL", "query_string"], + help="The elasticsearch query format (supported values are: [DSL, query_string])", + ) + def process_args(self) -> None: """ Process command line arguments and set instance variables. @@ -90,6 +99,7 @@ def process_args(self) -> None: self.batch_size = args.batch_size self.buffer_size = int(args.buffer_size) self.query = args.query + self.query_format = args.query_format def initialize(self) -> None: """ @@ -113,8 +123,15 @@ def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ try: assert self.es_client - # Try to parse the string as JSON first - query = json.loads(self.query) + + if ( + self.query_format == "query_string" + ): # This is query in the format canonical_domain:mediacloud.org + # Construct the query in the "query_string" format based on the value passed by the user + query = {"query": {"query_string": {"query": self.query}}} + else: + # Try to parse the string as JSON first + query = json.loads(self.query) # Extract just the query part if it exists query_body = query.get("query", query) From 9a6dc44057f1499137e1ca8e4af527f96b096bf9 Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Fri, 8 Nov 2024 10:27:01 +0300 Subject: [PATCH 4/7] Implement document retrieval using search_after and point_in_time to enhance efficiency and avoid performance bottlenecks --- .../elastic-canonical-domain-update.py | 75 +++++++++++++++---- 1 file changed, 60 insertions(+), 15 deletions(-) diff --git a/indexer/scripts/elastic-canonical-domain-update.py b/indexer/scripts/elastic-canonical-domain-update.py index 07ad19b8..7e919bc4 100644 --- a/indexer/scripts/elastic-canonical-domain-update.py +++ b/indexer/scripts/elastic-canonical-domain-update.py @@ -5,7 +5,7 @@ import mcmetadata from elasticsearch import Elasticsearch -from elasticsearch.helpers import bulk, scan +from elasticsearch.helpers import bulk from indexer.app import App from indexer.elastic import ElasticMixin @@ -17,6 +17,8 @@ class CanonicalDomainUpdater(ElasticMixin, App): def __init__(self, process_name: str, descr: str) -> None: super().__init__(process_name, descr) + self.pit_id: Optional[str] = None + self.keep_alive: str = "" self.es_client: Optional[Elasticsearch] = None self.batch_size: int = 0 self.updates_buffer: List[Dict[str, Any]] = [] @@ -77,6 +79,13 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="The elasticsearch query format (supported values are: [DSL, query_string])", ) + ap.add_argument( + "--keep_alive", + dest="keep_alive", + default="1m", + help="How long should Elasticsearch keep the PIT alive", + ) + def process_args(self) -> None: """ Process command line arguments and set instance variables. @@ -100,6 +109,7 @@ def process_args(self) -> None: self.buffer_size = int(args.buffer_size) self.query = args.query self.query_format = args.query_format + self.keep_alive = args.keep_alive def initialize(self) -> None: """ @@ -112,6 +122,11 @@ def initialize(self) -> None: app.args = parser.parse_args() app.process_args() self.es_client = self.elasticsearch_client() + self.pit_id = self.es_client.open_point_in_time( + index=self.index_name, + keep_alive=self.keep_alive, + ).get("id") + logger.info("Successfully opened Point-in-Time with ID %s", self.pit_id) def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ @@ -175,7 +190,7 @@ def get_document_count(self, query: Dict[str, Any]) -> Optional[int]: def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: """ - Get documents that need to be updated using scroll API + Get documents that need to be updated using search_after Yields: Document dictionaries @@ -190,19 +205,42 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: logger.info( "Found a total of [%s] documents to update", self.total_matched_docs ) - - for hit in scan( - client=self.es_client, - query=query, - index=self.index_name, - size=self.batch_size, - preserve_order=True, - ): - yield { - "index": hit["_index"], - "source": hit["_source"], - "id": hit["_id"], - } + # Add a sort by "_doc" (the most efficient sort order) for "search_after" tracking + # See https://www.elastic.co/guide/en/elasticsearch/reference/current/sort-search-results.html + query["sort"] = [{"_doc": "asc"}] + + # Limit the number of results returned per search query + query["size"] = self.batch_size + + # Add PIT to the query + query["pit"] = {"id": self.pit_id, "keep_alive": self.keep_alive} + + search_after = None + + while True: + if search_after: + # Update the query with the last sort values to continue the pagination + query["search_after"] = search_after + + # Fetch the next batch of documents + response = self.es_client.search(body=query) + hits = response["hits"]["hits"] + + # Each result will return a PIT ID which may change, thus we just need to update it + self.pit_id = response.get("pit_id") + + if not hits: + # No more documents to process, exit the loop + break + + for hit in hits: + yield { + "index": hit["_index"], + "source": hit["_source"], + "id": hit["_id"], + } + # Since we are sorting in ascending order, lets get the last sort value to use for "search_after" + search_after = hits[-1]["sort"] else: raise Exception(error) @@ -301,6 +339,13 @@ def main(self) -> None: # Ensure the buffer is flushed if it's not empty before finishing execution if self.updates_buffer: self.flush_updates() + # Ensure we can close the PIT opened + if isinstance(self.es_client, Elasticsearch) and self.pit_id: + response = self.es_client.close_point_in_time(id=self.pit_id) + if response.get("succeeded"): + logger.info( + "Successfully closed Point-in-Time with ID %s", self.pit_id + ) if __name__ == "__main__": From bf2cbea37ae55e9f30532caa7a36c0a02d13acbb Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Wed, 13 Nov 2024 17:39:57 +0300 Subject: [PATCH 5/7] fix: Improve application implementation after review --- bin/run-elastic-canonical-domain-update.sh | 5 - bin/run-elastic-update-canonical-domain.sh | 5 + ....py => elastic-update-canonical-domain.py} | 101 +++++------------- 3 files changed, 33 insertions(+), 78 deletions(-) delete mode 100755 bin/run-elastic-canonical-domain-update.sh create mode 100755 bin/run-elastic-update-canonical-domain.sh rename indexer/scripts/{elastic-canonical-domain-update.py => elastic-update-canonical-domain.py} (72%) diff --git a/bin/run-elastic-canonical-domain-update.sh b/bin/run-elastic-canonical-domain-update.sh deleted file mode 100755 index 92165b81..00000000 --- a/bin/run-elastic-canonical-domain-update.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -. bin/func.sh - -run_python indexer.scripts.elastic-canonical-domain-update "$@" diff --git a/bin/run-elastic-update-canonical-domain.sh b/bin/run-elastic-update-canonical-domain.sh new file mode 100755 index 00000000..4e3e92aa --- /dev/null +++ b/bin/run-elastic-update-canonical-domain.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +. bin/func.sh + +run_python indexer.scripts.elastic-update-canonical-domain "$@" diff --git a/indexer/scripts/elastic-canonical-domain-update.py b/indexer/scripts/elastic-update-canonical-domain.py similarity index 72% rename from indexer/scripts/elastic-canonical-domain-update.py rename to indexer/scripts/elastic-update-canonical-domain.py index 7e919bc4..dcc63502 100644 --- a/indexer/scripts/elastic-canonical-domain-update.py +++ b/indexer/scripts/elastic-update-canonical-domain.py @@ -10,10 +10,10 @@ from indexer.app import App from indexer.elastic import ElasticMixin -logger = getLogger("elastic-canonical-domain-updater") +logger = getLogger("elastic-update-canonical-domain") -class CanonicalDomainUpdater(ElasticMixin, App): +class CanonicalDomainUpdate(ElasticMixin, App): def __init__(self, process_name: str, descr: str) -> None: super().__init__(process_name, descr) @@ -89,15 +89,6 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: def process_args(self) -> None: """ Process command line arguments and set instance variables. - - Sets the following instance attributes: - index_name (str): Name of the Elasticsearch index to operate on - batch_size (int): Number of documents to retrieve per batch from Elasticsearch - buffer_size (int): Maximum number of update operations to buffer before flushing to Elasticsearch - query (str): Elasticsearch query string to filter documents for processing - - Note: - Calls the parent class's process_args() first to handle any inherited argument processing. """ super().process_args() @@ -111,23 +102,6 @@ def process_args(self) -> None: self.query_format = args.query_format self.keep_alive = args.keep_alive - def initialize(self) -> None: - """ - Initializes the Elasticsearch instance and sets up application arguments. - Returns: - None - """ - parser = argparse.ArgumentParser() - app.define_options(parser) - app.args = parser.parse_args() - app.process_args() - self.es_client = self.elasticsearch_client() - self.pit_id = self.es_client.open_point_in_time( - index=self.index_name, - keep_alive=self.keep_alive, - ).get("id") - logger.info("Successfully opened Point-in-Time with ID %s", self.pit_id) - def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ Builds a query for searching documents whose canonical domain must be updated, the query is validated by @@ -139,24 +113,23 @@ def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: try: assert self.es_client - if ( - self.query_format == "query_string" - ): # This is query in the format canonical_domain:mediacloud.org - # Construct the query in the "query_string" format based on the value passed by the user + if self.query_format == "query_string": query = {"query": {"query_string": {"query": self.query}}} else: - # Try to parse the string as JSON first query = json.loads(self.query) - # Extract just the query part if it exists query_body = query.get("query", query) - # Use the Elasticsearch validate API to ensure the query is ok validation_result = self.es_client.indices.validate_query( index=self.index_name, body={"query": query_body}, explain=True ) if validation_result["valid"]: + self.pit_id = self.es_client.open_point_in_time( + index=self.index_name, + keep_alive=self.keep_alive, + ).get("id") + logger.info("Successfully opened Point-in-Time with ID %s", self.pit_id) return True, query, None else: error_msg = "No detailed explanation is available" @@ -185,10 +158,10 @@ def get_document_count(self, query: Dict[str, Any]) -> Optional[int]: if count is not None: return int(count) except Exception as e: - logger.error("Error getting document count: %s", e) + logger.exception("Error getting document count: %s", e) return None - def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: + def fetch_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: """ Get documents that need to be updated using search_after @@ -196,7 +169,6 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: Document dictionaries """ try: - # Build the query from the string passed by the user as an arg, the build process also validates the query success, query, error = self.build_query() if success: @@ -209,10 +181,7 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: # See https://www.elastic.co/guide/en/elasticsearch/reference/current/sort-search-results.html query["sort"] = [{"_doc": "asc"}] - # Limit the number of results returned per search query query["size"] = self.batch_size - - # Add PIT to the query query["pit"] = {"id": self.pit_id, "keep_alive": self.keep_alive} search_after = None @@ -230,7 +199,6 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: self.pit_id = response.get("pit_id") if not hits: - # No more documents to process, exit the loop break for hit in hits: @@ -245,7 +213,7 @@ def get_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: raise Exception(error) except Exception as e: - logger.error(e) + logger.exception(e) def queue_canonical_domain_update(self, doc_data: Dict[str, Any]) -> None: """ @@ -256,12 +224,10 @@ def queue_canonical_domain_update(self, doc_data: Dict[str, Any]) -> None: doc_data: Dictionary containing document data with 'source.url', 'index', and 'id' fields """ try: - # Determine the canonical domain from mcmetadata, the URL we have on these document is a canonical URL canonical_domain = mcmetadata.urls.canonical_domain( doc_data["source"]["url"] ) - # Create an update action where we update only the canonical domain by Document ID update_action = { "_op_type": "update", "_index": doc_data["index"], @@ -273,13 +239,12 @@ def queue_canonical_domain_update(self, doc_data: Dict[str, Any]) -> None: self.updates_buffer.append(update_action) - # Check if the buffer is full and flush updates if len(self.updates_buffer) >= self.buffer_size: - self.flush_updates() + self.bulk_update() except Exception as e: - logger.error("Error processing document %s: %s", doc_data.get("id"), e) + logger.exception("Error processing document %s: %s", doc_data.get("id"), e) - def flush_updates(self) -> None: + def bulk_update(self) -> None: """ Flush the buffered updates to Elasticsearch """ @@ -287,36 +252,30 @@ def flush_updates(self) -> None: return try: assert self.es_client - # Perform bulk update success, failed = bulk( client=self.es_client, actions=self.updates_buffer, refresh=False, raise_on_error=False, ) - - is_failed_list = isinstance(failed, list) - if is_failed_list: - assert isinstance(failed, list) + if isinstance(failed, list): failed_count = len(failed) + for error in failed: + logger.error("Failed to update: [%s]", error) else: - assert isinstance(failed, int) failed_count = failed - logger.info("Bulk update: %s successful, %s failed", success, failed_count) - - if is_failed_list: - assert isinstance(failed, list) - for error in failed: - logger.error("Failed to update: %s", error) + logger.info( + "Bulk update summary: %s successful, %s failed", success, failed_count + ) except Exception as e: - logger.error("Bulk update failed: %s", e) + logger.exception("Bulk update failed: %s", e) finally: # Clear the buffer self.updates_buffer = [] - def main(self) -> None: + def main_loop(self) -> None: """ - Main execution method for processing canonical domain updates to documents. + Main loop execution method for processing canonical domain updates to documents. This method serves as the entry point for the application logic. It initializes the application, retrieves documents that require updates, and processes them by queuing @@ -328,18 +287,14 @@ def main(self) -> None: None """ try: - # Initialize the app - self.initialize() - # Get a buffer of documents that need to be updated - for doc in self.get_documents_to_update(): + self.es_client = self.elasticsearch_client() + for doc in self.fetch_documents_to_update(): self.queue_canonical_domain_update(doc) except Exception as e: logger.fatal(e) finally: - # Ensure the buffer is flushed if it's not empty before finishing execution if self.updates_buffer: - self.flush_updates() - # Ensure we can close the PIT opened + self.bulk_update() if isinstance(self.es_client, Elasticsearch) and self.pit_id: response = self.es_client.close_point_in_time(id=self.pit_id) if response.get("succeeded"): @@ -349,7 +304,7 @@ def main(self) -> None: if __name__ == "__main__": - app = CanonicalDomainUpdater( - "elastic-canonical-domain-updater", "Updates canonical domain" + app = CanonicalDomainUpdate( + "elastic-update-canonical-domain", "Updates canonical domain" ) app.main() From 311ae4c63a1f5f1a6c62a54bf6996ec010108520 Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Fri, 15 Nov 2024 13:57:11 +0300 Subject: [PATCH 6/7] Refractor implementation to validate query when parsing args --- .../elastic-update-canonical-domain.py | 171 +++++++----------- 1 file changed, 68 insertions(+), 103 deletions(-) diff --git a/indexer/scripts/elastic-update-canonical-domain.py b/indexer/scripts/elastic-update-canonical-domain.py index dcc63502..b58400d5 100644 --- a/indexer/scripts/elastic-update-canonical-domain.py +++ b/indexer/scripts/elastic-update-canonical-domain.py @@ -1,7 +1,7 @@ import argparse import json from logging import getLogger -from typing import Any, Dict, Generator, List, Literal, Optional, Tuple +from typing import Any, Dict, Generator, List, Literal, Optional import mcmetadata from elasticsearch import Elasticsearch @@ -18,32 +18,17 @@ class CanonicalDomainUpdate(ElasticMixin, App): def __init__(self, process_name: str, descr: str) -> None: super().__init__(process_name, descr) self.pit_id: Optional[str] = None + self.query: Optional[Dict[str, Any]] = {} self.keep_alive: str = "" - self.es_client: Optional[Elasticsearch] = None + self._es_client: Optional[Elasticsearch] = None self.batch_size: int = 0 self.updates_buffer: List[Dict[str, Any]] = [] self.buffer_size: int = 0 self.index_name: str = "" - self.query: str = "" self.query_format: Literal["DLS", "query_string"] = "query_string" self.total_matched_docs: Optional[int] = None def define_options(self, ap: argparse.ArgumentParser) -> None: - """ - Define command line arguments for the script. - Extends the parent class argument definitions. - - Args: - ap (argparse.ArgumentParser): The argument parser instance to add arguments to - - Adds the following arguments: - --index: Name of the Elasticsearch index to update - --batch: Batch size for document fetching (default: 1000) - --buffer: Size of update operation buffer (default: 2000) - --query: Elasticsearch query string for filtering documents - Returns: - None - """ super().define_options(ap) ap.add_argument( "--index", @@ -91,69 +76,67 @@ def process_args(self) -> None: Process command line arguments and set instance variables. """ super().process_args() - + assert self.args args = self.args - assert args - self.index_name = args.index self.batch_size = args.batch_size self.buffer_size = int(args.buffer_size) - self.query = args.query - self.query_format = args.query_format self.keep_alive = args.keep_alive + self.query_format = args.query_format + self.query = self.validate_query(args.query) - def build_query(self) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: + @property + def es_client(self) -> Elasticsearch: + if self._es_client is None: + self._es_client = self.elasticsearch_client() + return self._es_client + + def validate_query(self, query: str) -> Optional[Dict[str, Any]]: """ - Builds a query for searching documents whose canonical domain must be updated, the query is validated by - the Elasticsearch _validate API, before execution + Validates the query using the Elasticsearch _validate API, opens Point in time Returns: - Tuple of (is_valid, parsed_query, error_message) + Validated query """ + validated_query = None try: - assert self.es_client - if self.query_format == "query_string": - query = {"query": {"query_string": {"query": self.query}}} + query_dict = {"query": {"query_string": {"query": query}}} else: - query = json.loads(self.query) - - query_body = query.get("query", query) - + query_dict = json.loads(query) validation_result = self.es_client.indices.validate_query( - index=self.index_name, body={"query": query_body}, explain=True + index=self.index_name, body=query_dict, explain=True ) - if validation_result["valid"]: self.pit_id = self.es_client.open_point_in_time( - index=self.index_name, - keep_alive=self.keep_alive, + index=self.index_name, keep_alive=self.keep_alive ).get("id") logger.info("Successfully opened Point-in-Time with ID %s", self.pit_id) - return True, query, None + validated_query = query_dict else: error_msg = "No detailed explanation is available" if "error" in validation_result: error_msg = f"Invalid Query - {validation_result['error']}" - return False, None, error_msg + logger.error(error_msg) except json.JSONDecodeError as e: - return False, None, f"Invalid Query: Invalid JSON format - {str(e)}" + logger.exception("Invalid Query: Invalid JSON format - {%s}", e) except Exception as e: - return False, None, f"Invalid Query: Validation error - {str(e)}" + logger.exception("Invalid Query: Validation error - {%s}", e) + finally: + return validated_query - def get_document_count(self, query: Dict[str, Any]) -> Optional[int]: + def fetch_document_count(self) -> Optional[int]: """ Get the total number of documents matching the query - Args: - query: Elasticsearch query - Returns: Total number of matching documents """ try: assert self.es_client - count_response = self.es_client.count(index=self.index_name, body=query) + count_response = self.es_client.count( + index=self.index_name, body=self.query + ) count = count_response.get("count") if count is not None: return int(count) @@ -169,49 +152,43 @@ def fetch_documents_to_update(self) -> Generator[Dict[str, Any], None, None]: Document dictionaries """ try: - success, query, error = self.build_query() - - if success: - assert query and self.es_client - self.total_matched_docs = self.get_document_count(query) - logger.info( - "Found a total of [%s] documents to update", self.total_matched_docs - ) - # Add a sort by "_doc" (the most efficient sort order) for "search_after" tracking - # See https://www.elastic.co/guide/en/elasticsearch/reference/current/sort-search-results.html - query["sort"] = [{"_doc": "asc"}] - - query["size"] = self.batch_size - query["pit"] = {"id": self.pit_id, "keep_alive": self.keep_alive} - - search_after = None - - while True: - if search_after: - # Update the query with the last sort values to continue the pagination - query["search_after"] = search_after - - # Fetch the next batch of documents - response = self.es_client.search(body=query) - hits = response["hits"]["hits"] - - # Each result will return a PIT ID which may change, thus we just need to update it - self.pit_id = response.get("pit_id") - - if not hits: - break - - for hit in hits: - yield { - "index": hit["_index"], - "source": hit["_source"], - "id": hit["_id"], - } - # Since we are sorting in ascending order, lets get the last sort value to use for "search_after" - search_after = hits[-1]["sort"] - else: - raise Exception(error) - + assert self.query and self.es_client + self.total_matched_docs = self.fetch_document_count() + logger.info( + "Found a total of [%s] documents to update", self.total_matched_docs + ) + # Add a sort by "_doc" (the most efficient sort order) for "search_after" tracking + # See https://www.elastic.co/guide/en/elasticsearch/reference/current/sort-search-results.html + self.query["sort"] = [{"_doc": "asc"}] + + self.query["size"] = self.batch_size + self.query["pit"] = {"id": self.pit_id, "keep_alive": self.keep_alive} + + search_after = None + + while True: + if search_after: + # Update the query with the last sort values to continue the pagination + self.query["search_after"] = search_after + + # Fetch the next batch of documents + response = self.es_client.search(body=self.query) + hits = response["hits"]["hits"] + + # Each result will return a PIT ID which may change, thus we just need to update it + self.pit_id = response.get("pit_id") + + if not hits: + break + + for hit in hits: + yield { + "index": hit["_index"], + "source": hit["_source"], + "id": hit["_id"], + } + # Since we are sorting in ascending order, lets get the last sort value to use for "search_after" + search_after = hits[-1]["sort"] except Exception as e: logger.exception(e) @@ -274,20 +251,8 @@ def bulk_update(self) -> None: self.updates_buffer = [] def main_loop(self) -> None: - """ - Main loop execution method for processing canonical domain updates to documents. - - This method serves as the entry point for the application logic. It initializes the - application, retrieves documents that require updates, and processes them by queuing - updates for the canonical domain. Any exceptions encountered during execution are - logged as fatal errors. Finally, it ensures that any remaining updates in the buffer - are flushed before execution completes. - - Returns: - None - """ try: - self.es_client = self.elasticsearch_client() + assert self.query and self.es_client for doc in self.fetch_documents_to_update(): self.queue_canonical_domain_update(doc) except Exception as e: From 26f4afcfe5a124d2f64e3c273fe642e52f0fe406 Mon Sep 17 00:00:00 2001 From: Michael Hudson Nkotagu Date: Fri, 15 Nov 2024 16:35:20 +0300 Subject: [PATCH 7/7] Update args to include query_string | simple_query_string as options --- .../scripts/elastic-update-canonical-domain.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/indexer/scripts/elastic-update-canonical-domain.py b/indexer/scripts/elastic-update-canonical-domain.py index b58400d5..ca525333 100644 --- a/indexer/scripts/elastic-update-canonical-domain.py +++ b/indexer/scripts/elastic-update-canonical-domain.py @@ -25,7 +25,7 @@ def __init__(self, process_name: str, descr: str) -> None: self.updates_buffer: List[Dict[str, Any]] = [] self.buffer_size: int = 0 self.index_name: str = "" - self.query_format: Literal["DLS", "query_string"] = "query_string" + self.query_type: Optional[Literal["query_string", "simple_query_string"]] = None self.total_matched_docs: Optional[int] = None def define_options(self, ap: argparse.ArgumentParser) -> None: @@ -57,11 +57,10 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: ) ap.add_argument( - "--format", - dest="query_format", - default="query_string", - choices=["DSL", "query_string"], - help="The elasticsearch query format (supported values are: [DSL, query_string])", + "--type", + dest="query_type", + choices=["query_string", "simple_query_string"], + help="The elasticsearch query format (supported values are: [query_string, simple_query_string])", ) ap.add_argument( @@ -82,7 +81,7 @@ def process_args(self) -> None: self.batch_size = args.batch_size self.buffer_size = int(args.buffer_size) self.keep_alive = args.keep_alive - self.query_format = args.query_format + self.query_type = args.query_type self.query = self.validate_query(args.query) @property @@ -100,8 +99,10 @@ def validate_query(self, query: str) -> Optional[Dict[str, Any]]: """ validated_query = None try: - if self.query_format == "query_string": + if self.query_type == "query_string": query_dict = {"query": {"query_string": {"query": query}}} + elif self.query_type == "simple_query_string": + query_dict = {"query": {"simple_query_string": {"query": query}}} else: query_dict = json.loads(query) validation_result = self.es_client.indices.validate_query(