From 7f88d10270f8786bd38535f3374a6bad51a249a4 Mon Sep 17 00:00:00 2001 From: Patrick Leary Date: Wed, 20 Dec 2023 12:22:00 -0500 Subject: [PATCH] test data exporter and testing process use async/await for faster overall processing --- export_data.py | 9 +- lib/model_test_data_export_manager.py | 81 +++++++ lib/model_test_data_exporter.py | 311 ++++++++++++-------------- lib/vision_testing.py | 114 ++++++---- test_model.py | 7 +- 5 files changed, 312 insertions(+), 210 deletions(-) create mode 100644 lib/model_test_data_export_manager.py diff --git a/export_data.py b/export_data.py index 96559b1..26d3ba3 100644 --- a/export_data.py +++ b/export_data.py @@ -1,5 +1,6 @@ import click import json +import asyncio @click.command() @@ -13,16 +14,16 @@ @click.option("--taxon_id", type=int, help="Export observations in this taxon.") def test(**args): # some libraries are slow to import, so wait until command is validated and properly invoked - from lib.model_test_data_exporter import ModelTestDataExporter + from lib.model_test_data_export_manager import ModelTestDataExportManager print("\nArguments:") print(json.dumps(args, indent=4)) print("\nInitializing ModelTestDataExporter...\n") - model_test_data_exporter = ModelTestDataExporter(**args) + model_test_data_exporter = ModelTestDataExportManager(**args) print("Exporting data...\n") if "standard_set" in args and args["standard_set"]: - model_test_data_exporter.generate_standard_set() + asyncio.run(model_test_data_exporter.generate_standard_set()) else: - model_test_data_exporter.generate_from_cmd_args() + asyncio.run(model_test_data_exporter.generate_from_cmd_args()) print("\nDone\n") diff --git a/lib/model_test_data_export_manager.py b/lib/model_test_data_export_manager.py new file mode 100644 index 0000000..4c3ee2e --- /dev/null +++ b/lib/model_test_data_export_manager.py @@ -0,0 +1,81 @@ +import pandas as pd +from datetime import datetime +from lib.model_test_data_exporter import ModelTestDataExporter + + +class ModelTestDataExportManager: + def __init__(self, **args): + self.cmd_args = args + self.load_train_data_photo_ids() + + def load_train_data_photo_ids(self): + if not self.cmd_args["exclude_train_photos_path"]: + self.train_data_photo_ids = [] + return + + self.train_data_photo_ids = pd.concat( + map(lambda x: pd.read_csv(x, usecols=["photo_id"]), + self.cmd_args["exclude_train_photos_path"]) + ).drop_duplicates("photo_id").set_index("photo_id").sort_index().index + + def export_path(self, filename_addition): + currentDatetime = datetime.now() + timestamp = currentDatetime.strftime("%Y%m%d") + export_path = f'test-obs-{timestamp}' + if filename_addition: + export_path += f'-{filename_addition}' + if "filename_suffix" in self.cmd_args and self.cmd_args["filename_suffix"]: + export_path += "-" + self.cmd_args["filename_suffix"] + export_path += ".csv" + return export_path + + async def generate_from_cmd_args(self): + api_parameters = {} + if self.cmd_args["place_id"]: + api_parameters["place_id"] = self.cmd_args["place_id"] + if self.cmd_args["taxon_id"]: + api_parameters["taxon_id"] = self.cmd_args["taxon_id"] + + parameters_string = None + if api_parameters: + parameters_string = "-".join(map(lambda key: f'{key}-{api_parameters[key]}', + api_parameters)) + export_path = self.export_path(parameters_string) + exporter = ModelTestDataExporter( + export_path=export_path, + max_results=self.cmd_args["limit"], + parameters=api_parameters, + train_data_photo_ids=self.train_data_photo_ids + ) + await exporter.generate() + + async def 
generate_standard_set(self): + files_to_generate = { + "global": {}, + "fungi": {"taxon_id": 47170}, + "insecta": {"taxon_id": 47158}, + "mammalia": {"taxon_id": 40151}, + "plantae": {"taxon_id": 47126}, + "actinopterygii": {"taxon_id": 47178}, + "reptilia": {"taxon_id": 26036}, + "amphibia": {"taxon_id": 20978}, + "arachnida": {"taxon_id": 47119}, + "aves": {"taxon_id": 3}, + "animalia": {"taxon_id": 1}, + "mollusca": {"taxon_id": 47115}, + "north-america": {"place_id": 97394}, + "south-america": {"place_id": 97389}, + "europe": {"place_id": 97391}, + "asia": {"place_id": 97395}, + "africa": {"place_id": 97392}, + "oceania": {"place_id": 97393} + } + for key in files_to_generate: + export_path = self.export_path(key) + exporter = ModelTestDataExporter( + export_path=export_path, + max_results=self.cmd_args["limit"], + parameters=files_to_generate[key], + train_data_photo_ids=self.train_data_photo_ids + ) + await exporter.generate() diff --git a/lib/model_test_data_exporter.py b/lib/model_test_data_exporter.py index df60bad..5013bd7 100644 --- a/lib/model_test_data_exporter.py +++ b/lib/model_test_data_exporter.py @@ -1,56 +1,56 @@ -import pandas as pd -import requests -import json import prison import re -from datetime import datetime +import asyncio +import aiohttp +import math class ModelTestDataExporter: + API_BASE_URL = "https://api.inaturalist.org/v2/observations" + API_REQUEST_PER_PAGE = 200 + N_WORKERS = 3 + + def __init__(self, export_path, max_results=5000, parameters={}, train_data_photo_ids=[]): + self.export_path = export_path + self.max_results = max_results + self.parameters = parameters + self.train_data_photo_ids = train_data_photo_ids + self.rows_written = 0 + self.iterations_with_zero_results = 0 + self.used_observations = {} + self.prepare_test_data_parameters() + self.create_output_file() + + async def generate(self): + async with aiohttp.ClientSession() as self.session: + await self.generate_test_data() + + def create_output_file(self): + columns = [ + "observation_id", + "observed_on", + "iconic_taxon_id", + "taxon_id", + "taxon_ancestry", + "lat", + "lng", + "photo_url" + ] + self.output_file = open(self.export_path, "w") + self.output_file.write(",".join(columns) + "\n") - def __init__(self, **args): - self.cmd_args = args - self.load_train_data_photo_ids() - - def load_train_data_photo_ids(self, train_data_paths=[]): - if not self.cmd_args["exclude_train_photos_path"]: - self.train_data_photo_ids = [] - return - self.train_data_photo_ids = pd.concat( - map(lambda x: pd.read_csv(x, usecols=["photo_id"]), - self.cmd_args["exclude_train_photos_path"]) - ).drop_duplicates("photo_id").set_index("photo_id").sort_index().index - - def generate_from_cmd_args(self): - additional_parameters = {} - if self.cmd_args["place_id"]: - additional_parameters["place_id"] = self.cmd_args["place_id"] - if self.cmd_args["taxon_id"]: - additional_parameters["taxon_id"] = self.cmd_args["taxon_id"] - currentDatetime = datetime.now() - timestamp = currentDatetime.strftime("%Y%m%d") - export_path = f'test-obs-{timestamp}' - if additional_parameters: - parameter_string = "-".join(map(lambda index: f'{index}-{additional_parameters[index]}', - additional_parameters)) - export_path += "-" + parameter_string - if "filename_suffix" in self.cmd_args and self.cmd_args["filename_suffix"]: - export_path += "-" + self.cmd_args["filename_suffix"] - export_path += ".csv" - self.generate_test_data(export_path, self.cmd_args["limit"], additional_parameters) - - def export_test_data_parameters(self, 
additional_parameters={}): + def prepare_test_data_parameters(self): api_parameters = {} api_parameters["quality_grade"] = "research,casual" api_parameters["rank"] = "species" api_parameters["photos"] = "true" api_parameters["geo"] = "true" api_parameters["identified"] = "true" - api_parameters["per_page"] = 200 + api_parameters["per_page"] = ModelTestDataExporter.API_REQUEST_PER_PAGE api_parameters["order_by"] = "random" api_parameters["identifications"] = "most_agree" api_parameters["ttl"] = -1 - api_parameters.update(additional_parameters) + api_parameters.update(self.parameters) print(api_parameters) fields = { @@ -74,131 +74,116 @@ def export_test_data_parameters(self, additional_parameters={}): } } api_parameters["fields"] = prison.dumps(fields) - return api_parameters - - def process_api_response(self, api_parameters, used_observations): # noqa: C901 - response = requests.get("https://api.inaturalist.org/v2/observations", - params=api_parameters) - json_object = response.json() - useable_rows = [] - for row in json_object["results"]: - if row["uuid"] in used_observations: - continue - - # must have a taxon and observed_on_details - if not row["taxon"] or "observed_on_details" not in row \ - or not row["observed_on_details"] or "taxon" not in row \ - or "iconic_taxon_id" not in row["taxon"] or not row["taxon"]["iconic_taxon_id"]: - used_observations[row["uuid"]] = True - continue - - # must pass quality metrics except wild - metric_counts = {} - for metric in row["quality_metrics"]: - if metric["metric"] not in metric_counts: - metric_counts[metric["metric"]] = 0 - if metric["agree"]: - metric_counts[metric["metric"]] += 1 - else: - metric_counts[metric["metric"]] -= 1 - if ("location" in metric_counts and metric_counts["location"] < 0) \ - or ("evidence" in metric_counts and metric_counts["evidence"] < 0) \ - or ("date" in metric_counts and metric_counts["date"] < 0) \ - or ("recent" in metric_counts and metric_counts["recent"] < 0): - used_observations[row["uuid"]] = True - continue - - # check if any photos are included in the test data - photo_in_training_data = False - for photo in row["photos"]: - if photo["id"] in self.train_data_photo_ids: - photo_in_training_data = True - break - if photo_in_training_data is True: - used_observations[row["uuid"]] = True - continue - if re.search(r"\.jpe?g", row["photos"][0]["url"]) is None: - used_observations[row["uuid"]] = True - continue - - if row["quality_grade"] == "casual" and not (row["community_taxon_id"] and row["community_taxon_id"] == row["taxon"]["id"]): - used_observations[row["uuid"]] = True - continue - - useable_rows.append(row) - used_observations[row["uuid"]] = True - return useable_rows - - def generate_test_data(self, filename, max_results=5000, additional_parameters={}): - iterations_with_zero_results = 0 - rows_to_use = [] - used_observations = {} - api_parameters = self.export_test_data_parameters(additional_parameters) - - while len(rows_to_use) < max_results and iterations_with_zero_results < 5: - print(f'Fetching more results... 
{len(rows_to_use)} so far')
-            useable_rows = self.process_api_response(api_parameters, used_observations)
-            print(f'{len(useable_rows)} this batch')
-            if not useable_rows:
-                iterations_with_zero_results += 1
-                continue
-            iterations_with_zero_results = 0
-            rows_to_use += useable_rows
+        self.api_parameters = api_parameters
+
+    async def worker_task(self):
+        while not self.queue.empty():
+            await self.queue.get()
+            try:
+                await self.process_api_response()
+            finally:
+                self.queue.task_done()
+
+    async def generate_test_data(self):
+        while not self.finished():
+            await self.fetch_more_data()
+
+    async def fetch_more_data(self):
+        self.queue = asyncio.Queue(ModelTestDataExporter.N_WORKERS)
+        self.workers = [asyncio.create_task(self.worker_task())
+                        for _ in range(ModelTestDataExporter.N_WORKERS)]
+        min_pages_remaining = math.ceil(
+            (self.max_results / ModelTestDataExporter.API_REQUEST_PER_PAGE)
+        )
+        print(f'Queueing {min_pages_remaining} page requests')
+        for i in range(min_pages_remaining):
+            await self.queue.put(i)
+        await self.queue.join()
+        for worker in self.workers:
+            worker.cancel()
+
+    def finished(self):
+        return (self.rows_written >= self.max_results) or \
+            (self.iterations_with_zero_results >= 5)
+
+    async def process_api_response(self):
+        if self.finished():
+            return

-        columns = [
-            "observation_id",
-            "observed_on",
-            "iconic_taxon_id",
-            "taxon_id",
-            "taxon_ancestry",
-            "lat",
-            "lng",
-            "photo_url"
-        ]
-        output_file = open(filename, "w")
-        output_file.write(",".join(columns) + "\n")
-        for row in rows_to_use[:max_results]:
-            [latitude, longitude] = row["location"].split(",")
-            columns_to_write = [
-                row["uuid"],
-                row["observed_on_details"]["date"],
-                row["taxon"]["iconic_taxon_id"],
-                row["taxon"]["id"],
-                "/".join(map(str, row["taxon"]["ancestor_ids"])),
-                latitude,
-                longitude,
-                row["photos"][0]["url"].replace("square", "medium").replace("http://", "https://")
-            ]
-            output_file.write(",".join(map(str, columns_to_write)) + "\n")
-
-    def generate_standard_set(self):
-        files_to_generate = {
-            "global": {},
-            "fungi": {"taxon_id": 47170},
-            "insecta": {"taxon_id": 47158},
-            "mammalia": {"taxon_id": 40151},
-            "plantae": {"taxon_id": 47126},
-            "actinopterygii": {"taxon_id": 47178},
-            "reptilia": {"taxon_id": 26036},
-            "amphibia": {"taxon_id": 20978},
-            "arachnida": {"taxon_id": 47119},
-            "aves": {"taxon_id": 3},
-            "animalia": {"taxon_id": 1},
-            "mollusca": {"taxon_id": 47115},
-            "north-america": {"place_id": 97394},
-            "south-america": {"place_id": 97389},
-            "europe": {"place_id": 97391},
-            "asia": {"place_id": 97395},
-            "africa": {"place_id": 97392},
-            "oceania": {"place_id": 97393}
-        }
+        print(f'Fetching more results... 
{self.rows_written} so far')
+        starting_rows_written = self.rows_written
+        async with self.session.get(ModelTestDataExporter.API_BASE_URL,
+                                    params=self.api_parameters) as response:
+            json_object = await response.json()
+            for row in json_object["results"]:
+                self.process_api_response_row(row)
+        if self.rows_written == starting_rows_written:
+            self.iterations_with_zero_results += 1
+            return
+
+        self.iterations_with_zero_results = 0
+
+    def process_api_response_row(self, row):
+        if row["uuid"] in self.used_observations:
+            return
+
+        # must have a taxon and observed_on_details
+        if not row["taxon"] or "observed_on_details" not in row \
+                or not row["observed_on_details"] or "taxon" not in row \
+                or "iconic_taxon_id" not in row["taxon"] or not row["taxon"]["iconic_taxon_id"]:
+            self.used_observations[row["uuid"]] = True
+            return

-        currentDatetime = datetime.now()
-        timestamp = currentDatetime.strftime("%Y%m%d")
+        # must pass quality metrics except wild
+        metric_counts = {}
+        for metric in row["quality_metrics"]:
+            if metric["metric"] not in metric_counts:
+                metric_counts[metric["metric"]] = 0
+            if metric["agree"]:
+                metric_counts[metric["metric"]] += 1
+            else:
+                metric_counts[metric["metric"]] -= 1
+        if ("location" in metric_counts and metric_counts["location"] < 0) \
+                or ("evidence" in metric_counts and metric_counts["evidence"] < 0) \
+                or ("date" in metric_counts and metric_counts["date"] < 0) \
+                or ("recent" in metric_counts and metric_counts["recent"] < 0):
+            self.used_observations[row["uuid"]] = True
+            return
+
+        # check if any photos are included in the training data
+        if self.photo_in_training_data(row) is True:
+            self.used_observations[row["uuid"]] = True
+            return
+        if re.search(r"\.jpe?g", row["photos"][0]["url"]) is None:
+            self.used_observations[row["uuid"]] = True
+            return
+
+        if row["quality_grade"] == "casual" and not (row["community_taxon_id"] and row["community_taxon_id"] == row["taxon"]["id"]):
+            self.used_observations[row["uuid"]] = True
+            return
+
+        self.used_observations[row["uuid"]] = True
+        self.write_row(row)
+        self.rows_written += 1
+
+    def photo_in_training_data(self, row):
+        for photo in row["photos"]:
+            if photo["id"] in self.train_data_photo_ids:
+                return True
+        return False

-        for key in files_to_generate:
-            export_path = f'test-obs-{timestamp}-{key}'
-            if "filename_suffix" in self.cmd_args and self.cmd_args["filename_suffix"]:
-                export_path += "-" + self.cmd_args["filename_suffix"]
-            export_path += ".csv"
-            self.generate_test_data(export_path, self.cmd_args["limit"], files_to_generate[key])
+    def write_row(self, row):
+        if self.rows_written >= self.max_results:
+            return
+        [latitude, longitude] = row["location"].split(",")
+        columns_to_write = [
+            row["uuid"],
+            row["observed_on_details"]["date"],
+            row["taxon"]["iconic_taxon_id"],
+            row["taxon"]["id"],
+            "/".join(map(str, row["taxon"]["ancestor_ids"])),
+            latitude,
+            longitude,
+            row["photos"][0]["url"].replace("square", "medium").replace("http://", "https://")
+        ]
+        self.output_file.write(",".join(map(str, columns_to_write)) + "\n")
diff --git a/lib/vision_testing.py b/lib/vision_testing.py
index b45bb4d..73f21ce 100644
--- a/lib/vision_testing.py
+++ b/lib/vision_testing.py
@@ -5,7 +5,12 @@
 import magic
 import time
 import json
+import pandas as pd
 import tensorflow as tf
+import asyncio
+import aiohttp
+import aiofiles
+import aiofiles.os
 from statistics import mean
 from PIL import Image
 from lib.test_observation import TestObservation
@@ -39,41 +44,58 @@ def __init__(self, config, **args):
         print("\n") 
self.upload_folder = "static/" - def run(self): - count = 0 - limit = self.cmd_args["limit"] or 100 + async def worker_task(self): + while not self.queue.empty(): + observation = await self.queue.get() + try: + if self.processed_counter >= self.limit: + continue + obs = TestObservation(observation.to_dict()) + inferrer_results = await self.test_observation_async(obs) + if inferrer_results is False: + continue + self.append_to_aggregate_results(obs, inferrer_results) + self.processed_counter += 1 + self.report_progress() + + except Exception as err: + print(f'\nObservation: {observation["observation_id"]} failed') + print(err) + + finally: + self.queue.task_done() + + async def run_async(self): + N_WORKERS = 5 + # queue = asyncio.Queue(N_WORKERS) + self.limit = self.cmd_args["limit"] or 100 target_observation_id = self.cmd_args["observation_id"] - start_time = time.time() - try: - with open(self.cmd_args["path"], "r") as csv_file: - csv_reader = csv.DictReader(csv_file, delimiter=",") - for row in csv_reader: - observation = TestObservation(row) - if target_observation_id: - # if only one target observation was requested, test this row if it - # matches the request, otherwise skip it - if int(observation.observation_id) == target_observation_id: - inferrer_results = self.test_observation(observation) - else: - continue + self.start_time = time.time() + self.queued_counter = 0 + self.processed_counter = 0 + + async with aiohttp.ClientSession() as self.session: + self.queue = asyncio.Queue(N_WORKERS) + self.workers = [asyncio.create_task(self.worker_task()) + for _ in range(N_WORKERS)] + df = pd.read_csv(self.cmd_args["path"]) + for index, observation in df.iterrows(): + if self.processed_counter >= self.limit: + break + if target_observation_id: + if observation.observation_id == target_observation_id: + await self.queue.put(observation) else: - inferrer_results = self.test_observation(observation) - if inferrer_results is False: - # there was some problem processing this test observation. Continue but - # don't increment the counter so the requested number of observations - # will still be tested continue - count += 1 - self.append_to_aggregate_results(observation, inferrer_results) - if count % 10 == 0: - total_time = round(time.time() - start_time, 3) - remaining_time = round((limit - count) / (count / total_time), 3) - print(f'Processed {count} in {total_time} sec\testimated {remaining_time} sec remaining') - if count >= limit: - return - except IOError as e: - print(e) - print("Testing run failed") + else: + await self.queue.put(observation) + + # processes the remaining queue + await self.queue.join() + # stop the workers + for worker in self.workers: + worker.cancel() + return # given an x, return the number of scores less than x. 
Otherwise return the number
    # of scores that are empty or greater than or equal to 100 (essentially the fails)
@@ -168,8 +190,8 @@ def assess_top_results(self, observation, top_results):
             distance_scores.append(0)
         return match_index, distance_scores

-    def test_observation(self, observation):
-        cache_path = self.download_photo(observation.photo_url)
+    async def test_observation_async(self, observation):
+        cache_path = await self.download_photo_async(observation.photo_url)
         if cache_path is None or not os.path.exists(cache_path):
             return False
         if observation.lat == '' or observation.lng == '':
@@ -179,7 +201,6 @@ def test_observation(self, observation):
         if observation.iconic_taxon_id != "" and self.cmd_args["filter_iconic"] is not False:
             iconic_taxon_id = int(observation.iconic_taxon_id)
-
         inferrer_scores = {}
         for index, inferrer in self.inferrers.items():
             lat = None
@@ -220,6 +241,8 @@ def ancestor_distance_scores(self, observation, inferrer, results):
         return ancestor_distance_scores

     def append_to_aggregate_results(self, observation, inferrer_scores):
+        if self.processed_counter >= self.limit:
+            return
         vision_indices = set()
         combined_indices = set()
         for index, results in inferrer_scores.items():
@@ -269,15 +292,18 @@
             self.scores["top10_distance_scores"]["combined"][index].append(
                 max(combined_taxon_distance_scores[0:10]))

-        # if len(combined_indices) > 1:
-        #     print(f'Results of Observation: {observation.observation_id}: {combined_indices}')
-
-    def download_photo(self, photo_url):
+    async def download_photo_async(self, photo_url):
         checksum = hashlib.md5(photo_url.encode()).hexdigest()
         cache_path = os.path.join(self.upload_folder, "obs-" + checksum) + ".jpg"
-        if os.path.exists(cache_path):
+        if await aiofiles.os.path.exists(cache_path):
             return cache_path
-        urllib.request.urlretrieve(photo_url, cache_path)
+        async with self.session.get(photo_url) as resp:
+            if resp.status == 200:
+                f = await aiofiles.open(cache_path, mode="wb")
+                await f.write(await resp.read())
+                await f.close()
+        if not os.path.exists(cache_path):
+            return
         mime_type = magic.from_file(cache_path, mime=True)
         if mime_type != "image/jpeg":
             im = Image.open(cache_path)
@@ -288,3 +314,9 @@ def debug(self, message):
         if self.cmd_args["debug"]:
             print(message)
+
+    def report_progress(self):
+        if self.processed_counter % 10 == 0:
+            total_time = round(time.time() - self.start_time, 3)
+            remaining_time = round((self.limit - self.processed_counter) / (self.processed_counter / total_time), 3)
+            print(f'Processed {self.processed_counter} in {total_time} sec\testimated {remaining_time} sec remaining')
diff --git a/test_model.py b/test_model.py
index 4a555dc..5bb98fe 100644
--- a/test_model.py
+++ b/test_model.py
@@ -1,6 +1,7 @@
 import click
 import yaml
 import json
+import asyncio

 CONFIG = yaml.safe_load(open("config.yml"))
@@ -16,7 +17,7 @@
               help="Use vision results cache.")
 @click.option("--cache-key", type=str, show_default=True, default="default",
               help="Salt to use when caching vision results.")
-@click.option("--observation_id", type=int, help="Single observation ID to test.")
+@click.option("--observation_id", type=str, help="Single observation UUID to test.")
 @click.option("--filter-iconic/--no-filter-iconic", show_default=True, default=True,
               help="Use iconic taxon for filtering.")
 @click.option("--print-tree", is_flag=True, show_default=True, default=False,
@@ -30,7 +31,9 @@ def test(**args):
     print(json.dumps(args, indent=4)) 
print("\nInitializing VisionTesting...\n") testing = VisionTesting(CONFIG, **args) - testing.run() + + asyncio.run(testing.run_async()) + testing.print_scores() print("\nDone\n")