From 67792445b24d1ad801475569f09234b0d29aa1b2 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Mon, 30 Oct 2023 11:28:53 +0100 Subject: [PATCH 01/14] Recursively retry smaller batch when OOM error is given --- src/api/batch_predictor.py | 87 +++++++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 5 deletions(-) diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index ea4eacb7..fd50a8b1 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -118,10 +118,8 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, batch_info = list(zip(batch_groups, batch_identifiers)) # Here, make the batch prediction - # TODO: if OOM, split the batch into halves and try again for each - # half try: - predictions = batch_predict( + predictions = safe_batch_predict( model, batch_images, batch_info, utils, decode_batch_predictions, output_path, normalize_confidence) @@ -185,7 +183,6 @@ def create_model(model_path: str, from custom_layers import ResidualBlock from model import CERMetric, WERMetric, CTCLoss - from custom_layers import ResidualBlock from utils import Utils, load_model_from_directory logger = logging.getLogger(__name__) @@ -211,6 +208,86 @@ def create_model(model_path: str, return model, utils +def safe_batch_predict(model: tf.keras.Model, + batch_images: List[tf.Tensor], + batch_info: List[Tuple[str, str]], + utils: object, + decode_batch_predictions: Callable, + output_path: str, + normalize_confidence: Callable) -> List[str]: + """ + Attempt to predict on a batch of images using the provided model. If a + TensorFlow Out of Memory (OOM) error occurs, the batch is split in half and + each half is attempted again, recursively. If an OOM error occurs with a + batch of size 1, the offending image is logged and skipped. + + Parameters + ---------- + model : TensorFlow model + The model used for making predictions. + batch_images : List or ndarray + A list or numpy array of images for which predictions need to be made. + batch_info : List of tuples + A list of tuples containing additional information (e.g., group and + identifier) for each image in `batch_images`. + utils : module or object + Utility module/object containing necessary utility functions or + settings. + decode_batch_predictions : function + A function to decode the predictions made by the model. + output_path : str + Path where any output files should be saved. + normalize_confidence : function + A function to normalize the confidence of the predictions. + logger : Logger + A logging.Logger object for logging messages. + + Returns + ------- + List + A list of predictions made by the model. If an image causes an OOM + error, it is skipped, and no prediction is returned for it. + """ + + logger = logging.getLogger(__name__) + try: + return batch_predict( + model, batch_images, batch_info, utils, + decode_batch_predictions, output_path, + normalize_confidence) + except tf.errors.ResourceExhaustedError: + # If the batch size is 1 and still causing OOM, then skip the image and + # return an empty list + if len(batch_images) == 1: + logger.error( + "OOM error with single image. Skipping image" + f"{batch_info[0][1]}.") + return [] + + logger.warning( + f"OOM error with batch size {len(batch_images)}. 
Splitting batch " + "in half and retrying.") + + # Splitting batch in half + mid_index = len(batch_images) // 2 + first_half_images = batch_images[:mid_index] + second_half_images = batch_images[mid_index:] + first_half_info = batch_info[:mid_index] + second_half_info = batch_info[mid_index:] + + # Recursive calls for each half + first_half_predictions = safe_batch_predict( + model, first_half_images, first_half_info, utils, + decode_batch_predictions, output_path, + normalize_confidence) + second_half_predictions = safe_batch_predict( + model, second_half_images, second_half_info, utils, + decode_batch_predictions, output_path, + normalize_confidence) + + return first_half_predictions + second_half_predictions + + def batch_predict(model: tf.keras.Model, images: List[Tuple[tf.Tensor, str, str]], batch_info: List[Tuple[str, str]], @@ -255,7 +332,7 @@ def batch_predict(model: tf.keras.Model, # Unpack the batch groups, identifiers = zip(*batch_info) - logger.info("Making predictions...") + logger.info(f"Making {len(images)} predictions...") encoded_predictions = model.predict_on_batch(images) logger.debug("Predictions made") From 2a8a8dcb3781328d2933e818dd9b4e7c527ab7b1 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Tue, 31 Oct 2023 10:57:31 +0100 Subject: [PATCH 02/14] Readd standard +50 width padding --- src/api/image_preparator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index 3bab9a7d..b381cfc1 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -191,6 +191,10 @@ def prepare_image(identifier: str, image = tf.image.resize(image, [target_height, target_width]) + image = tf.image.resize_with_pad(image, + target_height, + target_width + 50) + # Normalize the image and something else image = 0.5 - (image / 255) From 62ecf71ef0bb7503f688a43542943dfba32d817f Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Wed, 8 Nov 2023 15:58:42 +0100 Subject: [PATCH 03/14] Require charlist.txt in model directory --- src/api/app_utils.py | 5 ++--- src/api/batch_predictor.py | 23 +++++++++++------------ src/api/flask_app.py | 2 -- src/api/gunicorn_app.py | 2 -- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/api/app_utils.py b/src/api/app_utils.py index 61316796..a1702a2e 100644 --- a/src/api/app_utils.py +++ b/src/api/app_utils.py @@ -151,7 +151,7 @@ def get_env_variable(var_name: str, default_value: str = None) -> str: def start_processes(batch_size: int, max_queue_size: int, model_path: str, - charlist_path: str, output_path: str, gpus: str): + output_path: str, gpus: str): logger = logging.getLogger(__name__) # Create a thread-safe Queue @@ -179,8 +179,7 @@ def start_processes(batch_size: int, max_queue_size: int, model_path: str, prediction_process = Process( target=batch_prediction_worker, args=(prepared_queue, model_path, - charlist_path, output_path, - gpus), + output_path, gpus), name="Batch Prediction Process") prediction_process.daemon = True prediction_process.start() diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index fd50a8b1..77dcd7ff 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -17,7 +17,6 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, model_path: str, - charlist_path: str, output_path: str, gpus: str = '0'): """ @@ -34,8 +33,6 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, Queue from which preprocessed images are fetched. 
model_path : str Path to the model file. - charlist_path : str - Path to the character list file. output_path : str Path where predictions should be saved. gpus : str, optional @@ -56,7 +53,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, os.environ['CUDA_VISIBLE_DEVICES'] = str(gpus) physical_devices = tf.config.experimental.list_physical_devices('GPU') - logger.debug(f"Number of GPUs available: {len(physical_devices)}") + logger.info(f"Number of GPUs available: {len(physical_devices)}") if physical_devices: all_gpus_support_mixed_precision = True @@ -99,7 +96,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, try: with strategy.scope(): - model, utils = create_model(model_path, charlist_path) + model, utils = create_model(model_path) logger.info("Model created and utilities initialized") except Exception as e: logger.error(e) @@ -155,8 +152,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, "Batch Prediction Worker process interrupted. Exiting...") -def create_model(model_path: str, - charlist_path: str) -> Tuple[tf.keras.Model, object]: +def create_model(model_path: str) -> Tuple[tf.keras.Model, object]: """ Load a pre-trained model and create utility methods. @@ -164,8 +160,6 @@ def create_model(model_path: str, ---------- model_path : str Path to the pre-trained model file. - charlist_path : str - Path to the character list file. Returns ------- @@ -195,13 +189,18 @@ def create_model(model_path: str, 'ResidualBlock': ResidualBlock } model = load_model_from_directory(model_path, custom_objects) - logger.info("Model loaded successfully") + logger.info(f"Model {model.name} loaded successfully") if logger.isEnabledFor(logging.DEBUG): model.summary() - with open(charlist_path) as file: - charlist = list(char for char in file.read()) + try: + with open(f"{model_path}/charlist.txt") as file: + charlist = list(char for char in file.read()) + except FileNotFoundError: + logger.error("charlist.txt not found at {model_path}. 
Exiting...") + sys.exit(1) + utils = Utils(charlist, use_mask=True) logger.debug("Utilities initialized") diff --git a/src/api/flask_app.py b/src/api/flask_app.py index 431a5d71..c6c65183 100644 --- a/src/api/flask_app.py +++ b/src/api/flask_app.py @@ -56,7 +56,6 @@ def create_app(request_queue) -> Flask: # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") model_path = get_env_variable("LOGHI_MODEL_PATH") - charlist_path = get_env_variable("LOGHI_CHARLIST_PATH") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -71,7 +70,6 @@ def create_app(request_queue) -> Flask: batch_size, max_queue_size, model_path, - charlist_path, output_path, gpus ) diff --git a/src/api/gunicorn_app.py b/src/api/gunicorn_app.py index 7cc58bcf..db1a887a 100644 --- a/src/api/gunicorn_app.py +++ b/src/api/gunicorn_app.py @@ -85,7 +85,6 @@ def load(self): # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") model_path = get_env_variable("LOGHI_MODEL_PATH") - charlist_path = get_env_variable("LOGHI_CHARLIST_PATH") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -100,7 +99,6 @@ def load(self): batch_size, max_queue_size, model_path, - charlist_path, output_path, gpus ) From b7d6f88e39e1460b3d425e8b80087182dc54f36d Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Wed, 8 Nov 2023 16:01:07 +0100 Subject: [PATCH 04/14] Missing 'f' in f-string --- src/api/batch_predictor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 77dcd7ff..540d622b 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -198,7 +198,7 @@ def create_model(model_path: str) -> Tuple[tf.keras.Model, object]: with open(f"{model_path}/charlist.txt") as file: charlist = list(char for char in file.read()) except FileNotFoundError: - logger.error("charlist.txt not found at {model_path}. Exiting...") + logger.error(f"charlist.txt not found at {model_path}. Exiting...") sys.exit(1) utils = Utils(charlist, use_mask=True) From 1e45b6e96614d904e0153b894ff90941c821ef4c Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Thu, 9 Nov 2023 10:32:02 +0100 Subject: [PATCH 05/14] Add model switching support Model can now be switched dynamically using the "model" field. Model path is no longer required as env variable. --- src/api/app_utils.py | 26 ++++++++------ src/api/batch_predictor.py | 27 ++++++++------- src/api/flask_app.py | 2 -- src/api/gunicorn_app.py | 2 -- src/api/image_preparator.py | 67 +++++++++++++++++++++++++------------ src/api/routes.py | 7 ++-- src/api/start_local_app.sh | 2 -- 7 files changed, 80 insertions(+), 53 deletions(-) diff --git a/src/api/app_utils.py b/src/api/app_utils.py index a1702a2e..bb4ce740 100644 --- a/src/api/app_utils.py +++ b/src/api/app_utils.py @@ -62,25 +62,27 @@ def setup_logging(level: str = "INFO") -> logging.Logger: return logging.getLogger(__name__) -def extract_request_data() -> Tuple[bytes, str, str]: +def extract_request_data() -> Tuple[bytes, str, str, str]: """ Extract image and other form data from the current request. 
Returns ------- - tuple of (bytes, str, str) + tuple of (bytes, str, str, str) image_content : bytes Content of the uploaded image. group_id : str ID of the group from form data. identifier : str Identifier from form data. + model : str + Location of the model to use for prediction. Raises ------ ValueError - If required data (image, group_id, identifier) is missing or if the - image format is invalid. + If required data (image, group_id, identifier, model) is missing or if + the image format is invalid. """ # Extract the uploaded image @@ -106,7 +108,13 @@ def extract_request_data() -> Tuple[bytes, str, str]: if not identifier: raise ValueError("No identifier provided.") - return image_content, group_id, identifier + model = request.form.get('model') + if not model: + raise ValueError("No model provided.") + if not os.path.exists(model): + raise ValueError(f"Model directory {model} does not exist.") + + return image_content, group_id, identifier, model def get_env_variable(var_name: str, default_value: str = None) -> str: @@ -150,7 +158,7 @@ def get_env_variable(var_name: str, default_value: str = None) -> str: return value -def start_processes(batch_size: int, max_queue_size: int, model_path: str, +def start_processes(batch_size: int, max_queue_size: int, output_path: str, gpus: str): logger = logging.getLogger(__name__) @@ -168,8 +176,7 @@ def start_processes(batch_size: int, max_queue_size: int, model_path: str, logger.info("Starting image preparation process") preparation_process = Process( target=image_preparation_worker, - args=(batch_size, request_queue, - prepared_queue, model_path), + args=(batch_size, request_queue, prepared_queue), name="Image Preparation Process") preparation_process.daemon = True preparation_process.start() @@ -178,8 +185,7 @@ def start_processes(batch_size: int, max_queue_size: int, model_path: str, logger.info("Starting batch prediction process") prediction_process = Process( target=batch_prediction_worker, - args=(prepared_queue, model_path, - output_path, gpus), + args=(prepared_queue, output_path, gpus), name="Batch Prediction Process") prediction_process.daemon = True prediction_process.start() diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 540d622b..24f49219 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -16,7 +16,6 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, - model_path: str, output_path: str, gpus: str = '0'): """ @@ -31,8 +30,6 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, ---------- prepared_queue : multiprocessing.JoinableQueue Queue from which preprocessed images are fetched. - model_path : str - Path to the model file. output_path : str Path where predictions should be saved. gpus : str, optional @@ -94,26 +91,30 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, strategy = tf.distribute.MirroredStrategy() - try: - with strategy.scope(): - model, utils = create_model(model_path) - logger.info("Model created and utilities initialized") - except Exception as e: - logger.error(e) - logger.error("Error creating model. 
Exiting...") - return - total_predictions = 0 + old_model = None try: while True: - batch_images, batch_groups, batch_identifiers = \ + batch_images, batch_groups, batch_identifiers, model_path = \ prepared_queue.get() logger.debug(f"Retrieved batch of size {len(batch_images)} from " "prepared_queue") batch_info = list(zip(batch_groups, batch_identifiers)) + if model_path != old_model: + old_model = model_path + try: + logger.info("Model changed, adjusting batch prediction") + with strategy.scope(): + model, utils = create_model(model_path) + logger.info("Model created and utilities initialized") + except Exception as e: + logger.error(e) + logger.error("Error creating model. Exiting...") + return + # Here, make the batch prediction try: predictions = safe_batch_predict( diff --git a/src/api/flask_app.py b/src/api/flask_app.py index c6c65183..1cdf7106 100644 --- a/src/api/flask_app.py +++ b/src/api/flask_app.py @@ -55,7 +55,6 @@ def create_app(request_queue) -> Flask: # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") - model_path = get_env_variable("LOGHI_MODEL_PATH") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -69,7 +68,6 @@ def create_app(request_queue) -> Flask: request_queue, preparation_process, prediction_process = start_processes( batch_size, max_queue_size, - model_path, output_path, gpus ) diff --git a/src/api/gunicorn_app.py b/src/api/gunicorn_app.py index db1a887a..f5c37b0c 100644 --- a/src/api/gunicorn_app.py +++ b/src/api/gunicorn_app.py @@ -84,7 +84,6 @@ def load(self): # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") - model_path = get_env_variable("LOGHI_MODEL_PATH") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -98,7 +97,6 @@ def load(self): request_queue, preparation_process, prediction_process = start_processes( batch_size, max_queue_size, - model_path, output_path, gpus ) diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index b381cfc1..6b4b2016 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -14,8 +14,7 @@ def image_preparation_worker(batch_size: int, request_queue: multiprocessing.Queue, - prepared_queue: multiprocessing.Queue, - model_path: str): + prepared_queue: multiprocessing.Queue): """ Worker process to prepare images for batch processing. @@ -30,8 +29,6 @@ def image_preparation_worker(batch_size: int, Queue from which raw images are fetched. prepared_queue : multiprocessing.Queue Queue to which prepared images are pushed. - model_path : str - Path to the model. Side Effects ------------ @@ -44,19 +41,12 @@ def image_preparation_worker(batch_size: int, # Disable GPU visibility to prevent memory allocation issues tf.config.set_visible_devices([], 'GPU') - try: - num_channels = get_model_channels(model_path) - except Exception as e: - logger.error(f"Error: {e}") - logger.error("Error retrieving number of channels. 
Exiting...") - return - logger.debug(f"Input channels: {num_channels}") - # Define the maximum time to wait for new images TIMEOUT_DURATION = 1 MAX_WAIT_COUNT = 1 wait_count = 0 + old_model = None try: while True: @@ -64,10 +54,40 @@ def image_preparation_worker(batch_size: int, while len(batch_images) < batch_size: try: - image, group, identifier = request_queue.get( + image, group, identifier, model_path = request_queue.get( timeout=TIMEOUT_DURATION) logger.debug(f"Retrieved {identifier} from request_queue") + if model_path != old_model: + old_model = model_path + + logger.info( + "Model changed, adjusting image preparation") + + if batch_images: + # Add the existing batch to the prepared_queue + logger.info( + f"Sending old batch of {len(batch_images)} " + "images") + pad_batch(batch_images) + prepared_queue.put( + (np.array(batch_images), batch_groups, + batch_identifiers, old_model)) + # Reset the batches after sending + batch_images, batch_groups, batch_identifiers = \ + [], [], [] + try: + num_channels = get_model_channels(model_path) + logger.debug( + f"New number of channels: " + f"{num_channels}") + except Exception as e: + logger.error(f"Error: {e}") + logger.error( + "Error retrieving number of channels. " + "Exiting...") + return + image = prepare_image(identifier, image, num_channels) logger.debug( @@ -89,19 +109,14 @@ def image_preparation_worker(batch_size: int, if wait_count > MAX_WAIT_COUNT and len(batch_images) > 0: break - # Determine the maximum width among all images in the batch - max_width = max(image.shape[0] for image in batch_images) - - # Resize each image in the batch to the maximum width - for i in range(len(batch_images)): - batch_images[i] = pad_to_width( - batch_images[i], max_width, -10) + pad_batch(batch_images) logger.info(f"Prepared batch of {len(batch_images)} images") # Push the prepared batch to the prepared_queue prepared_queue.put( - (np.array(batch_images), batch_groups, batch_identifiers)) + (np.array(batch_images), batch_groups, batch_identifiers, + old_model)) logger.debug("Pushed prepared batch to prepared_queue") logger.debug( f"{request_queue.qsize()} images waiting to be processed") @@ -115,6 +130,16 @@ def image_preparation_worker(batch_size: int, logger.error(f"Error: {e}") +def pad_batch(batch_images): + # Determine the maximum width among all images in the batch + max_width = max(image.shape[0] for image in batch_images) + + # Resize each image in the batch to the maximum width + for i in range(len(batch_images)): + batch_images[i] = pad_to_width( + batch_images[i], max_width, -10) + + def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): """ Pads a transposed image (where the first dimension is width) to a specified diff --git a/src/api/routes.py b/src/api/routes.py index 8b1c3fd3..5a8d906e 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -46,16 +46,17 @@ def predict() -> flask.Response: # Add incoming request to queue # Here, we're just queuing the raw data. 
- image_file, group_id, identifier = extract_request_data() + image_file, group_id, identifier, model = extract_request_data() logger = logging.getLogger(__name__) logger.debug(f"Data received: {group_id}, {identifier}") logger.debug(f"Adding {identifier} to queue") + logger.debug(f"Using model {model}") try: - app.request_queue.put((image_file, group_id, identifier), block=True, - timeout=30) + app.request_queue.put((image_file, group_id, identifier, model), + block=True, timeout=30) except Full: response = jsonify({ "status": "error", diff --git a/src/api/start_local_app.sh b/src/api/start_local_app.sh index 4f132bc7..00637609 100755 --- a/src/api/start_local_app.sh +++ b/src/api/start_local_app.sh @@ -3,8 +3,6 @@ export GUNICORN_WORKERS=1 export GUNICORN_THREADS=1 export GUNICORN_ACCESSLOG='-' -export LOGHI_MODEL_PATH="/home/tim/Downloads/new_model/" -export LOGHI_CHARLIST_PATH="/home/tim/Downloads/new_model/charlist.txt" export LOGHI_BATCH_SIZE=300 export LOGHI_OUTPUT_PATH="/home/tim/Documents/development/loghi-htr/output/" export LOGHI_MAX_QUEUE_SIZE=50000 From d8cc285588903f5d46bd5bdfb462d29b580ef12e Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Thu, 9 Nov 2023 10:38:05 +0100 Subject: [PATCH 06/14] Fix model switching channels bug --- src/api/image_preparator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index 6b4b2016..b1a20566 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -59,11 +59,8 @@ def image_preparation_worker(batch_size: int, logger.debug(f"Retrieved {identifier} from request_queue") if model_path != old_model: - old_model = model_path - logger.info( "Model changed, adjusting image preparation") - if batch_images: # Add the existing batch to the prepared_queue logger.info( @@ -76,6 +73,8 @@ def image_preparation_worker(batch_size: int, # Reset the batches after sending batch_images, batch_groups, batch_identifiers = \ [], [], [] + + old_model = model_path try: num_channels = get_model_channels(model_path) logger.debug( From 7436b1a62af04f19beb1c0c5cf2e8d776ca3a268 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Thu, 9 Nov 2023 16:27:14 +0100 Subject: [PATCH 07/14] Improve code readability --- src/api/app_utils.py | 2 + src/api/batch_predictor.py | 90 ++++++++++++------------ src/api/image_preparator.py | 132 +++++++++++++++++++++++++++--------- src/api/routes.py | 2 +- 4 files changed, 150 insertions(+), 76 deletions(-) diff --git a/src/api/app_utils.py b/src/api/app_utils.py index bb4ce740..6e624c43 100644 --- a/src/api/app_utils.py +++ b/src/api/app_utils.py @@ -166,11 +166,13 @@ def start_processes(batch_size: int, max_queue_size: int, logger.info("Initializing request queue") manager = Manager() request_queue = manager.JoinableQueue(maxsize=max_queue_size//2) + logger.info(f"Request queue size: {max_queue_size//2}") # Max size of prepared queue is half of the max size of request queue # expressed in number of batches max_prepared_queue_size = max_queue_size // 2 // batch_size prepared_queue = manager.JoinableQueue(maxsize=max_prepared_queue_size) + logger.info(f"Prediction queue size: {max_prepared_queue_size}") # Start the image preparation process logger.info("Starting image preparation process") diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 24f49219..06890d63 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -8,8 +8,6 @@ from typing import Callable, List, Tuple import gc -# > Local 
dependencies - # > Third-party dependencies import tensorflow as tf from tensorflow.keras import mixed_precision @@ -43,56 +41,30 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, - Logs various messages regarding the batch processing status. """ - logger = logging.getLogger(__name__) - logger.info("Batch Prediction Worker process started") - - # Only use the specified GPUs - os.environ['CUDA_VISIBLE_DEVICES'] = str(gpus) - - physical_devices = tf.config.experimental.list_physical_devices('GPU') - logger.info(f"Number of GPUs available: {len(physical_devices)}") - if physical_devices: - all_gpus_support_mixed_precision = True - - for device in physical_devices: - tf.config.experimental.set_memory_growth(device, True) - logger.debug(device) - - # Get the compute capability of the GPU - details = tf.config.experimental.get_device_details(device) - major = details.get('compute_capability')[0] - - # Check if the compute capability is less than 7.0 - if int(major) < 7: - all_gpus_support_mixed_precision = False - logger.debug( - f"GPU {device} does not support efficient mixed precision." - ) - break - - # If all GPUs support mixed precision, enable it - if all_gpus_support_mixed_precision: - mixed_precision.set_global_policy('mixed_float16') - logger.debug("Mixed precision set to 'mixed_float16'") - else: - logger.debug( - "Not all GPUs support efficient mixed precision. Running in " - "standard mode.") - else: - logger.warning("No GPUs available") - # Add parent directory to path for imports current_path = os.path.dirname(os.path.realpath(__file__)) parent_path = os.path.dirname(current_path) - sys.path.append(parent_path) from utils import decode_batch_predictions, normalize_confidence + logger = logging.getLogger(__name__) + logger.info("Batch Prediction Worker process started") + + # If all GPUs support mixed precision, enable it + gpus_support_mixed_precision = setup_gpu_environment(gpus, logger) + if gpus_support_mixed_precision: + mixed_precision.set_global_policy('mixed_float16') + logger.debug("Mixed precision set to 'mixed_float16'") + else: + logger.debug( + "Not all GPUs support efficient mixed precision. Running in " + "standard mode.") + strategy = tf.distribute.MirroredStrategy() total_predictions = 0 - old_model = None + old_model_path = None try: while True: @@ -103,8 +75,8 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, batch_info = list(zip(batch_groups, batch_identifiers)) - if model_path != old_model: - old_model = model_path + if model_path != old_model_path: + old_model_path = model_path try: logger.info("Model changed, adjusting batch prediction") with strategy.scope(): @@ -153,6 +125,36 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, "Batch Prediction Worker process interrupted. Exiting...") +def setup_gpu_environment(gpus: str, logger: logging.Logger): + """ + Setup the GPU environment for batch prediction. + + Parameters + ---------- + gpus : str + IDs of GPUs to be used (comma-separated). + logger : logging.Logger + A logging.Logger object for logging messages. + + Returns + ------- + bool + True if all GPUs support mixed precision, False otherwise. + """ + + os.environ['CUDA_VISIBLE_DEVICES'] = gpus + physical_devices = tf.config.list_physical_devices('GPU') + if not physical_devices: + logger.warning("No GPUs found. 
Running in CPU mode.") + return False + for device in physical_devices: + tf.config.experimental.set_memory_growth(device, True) + if tf.config.experimental.\ + get_device_details(device)['compute_capability'][0] < 7: + return False + return True + + def create_model(model_path: str) -> Tuple[tf.keras.Model, object]: """ Load a pre-trained model and create utility methods. diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index b1a20566..2f75a156 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -47,17 +47,17 @@ def image_preparation_worker(batch_size: int, wait_count = 0 old_model = None + batch_images, batch_groups, batch_identifiers = [], [], [] try: while True: - batch_images, batch_groups, batch_identifiers = [], [], [] - while len(batch_images) < batch_size: try: image, group, identifier, model_path = request_queue.get( timeout=TIMEOUT_DURATION) logger.debug(f"Retrieved {identifier} from request_queue") + # Check if the model has changed if model_path != old_model: logger.info( "Model changed, adjusting image preparation") @@ -66,39 +66,30 @@ def image_preparation_worker(batch_size: int, logger.info( f"Sending old batch of {len(batch_images)} " "images") - pad_batch(batch_images) - prepared_queue.put( - (np.array(batch_images), batch_groups, - batch_identifiers, old_model)) + # Reset the batches after sending batch_images, batch_groups, batch_identifiers = \ - [], [], [] + pad_and_queue_batch(batch_images, batch_groups, + batch_identifiers, + prepared_queue, old_model) old_model = model_path - try: - num_channels = get_model_channels(model_path) - logger.debug( - f"New number of channels: " - f"{num_channels}") - except Exception as e: - logger.error(f"Error: {e}") - logger.error( - "Error retrieving number of channels. " - "Exiting...") - return + num_channels = update_channels(model_path, logger) image = prepare_image(identifier, image, num_channels) - logger.debug( f"Prepared image {identifier} with shape: " f"{image.shape}") + # Append the image to the batch batch_images.append(image) batch_groups.append(group) batch_identifiers.append(identifier) request_queue.task_done() wait_count = 0 + + # If no new images are available, wait for a while except Empty: wait_count += 1 logger.debug( @@ -108,19 +99,13 @@ def image_preparation_worker(batch_size: int, if wait_count > MAX_WAIT_COUNT and len(batch_images) > 0: break - pad_batch(batch_images) - - logger.info(f"Prepared batch of {len(batch_images)} images") - - # Push the prepared batch to the prepared_queue - prepared_queue.put( - (np.array(batch_images), batch_groups, batch_identifiers, - old_model)) - logger.debug("Pushed prepared batch to prepared_queue") + # Add the existing batch to the prepared_queue + batch_images, batch_groups, batch_identifiers = \ + pad_and_queue_batch(batch_images, batch_groups, + batch_identifiers, prepared_queue, + old_model) logger.debug( f"{request_queue.qsize()} images waiting to be processed") - logger.debug( - f"{prepared_queue.qsize()} batches ready for prediction") except KeyboardInterrupt: logger.warning( @@ -129,7 +114,90 @@ def image_preparation_worker(batch_size: int, logger.error(f"Error: {e}") -def pad_batch(batch_images): +def pad_and_queue_batch(batch_images: np.ndarray, + batch_groups: list, + batch_identifiers: list, + prepared_queue: multiprocessing.Queue, + model_path: str) -> tuple: + """ + Pad and queue a batch of images for prediction. + + Parameters + ---------- + batch_images : np.ndarray + Batch of images to be padded and queued. 
+ batch_groups : list + List of groups to which the images belong. + batch_identifiers : list + List of identifiers for the images. + prepared_queue : multiprocessing.Queue + Queue to which the padded batch should be pushed. + model_path : str + Path to the model used for image preparation. + + Returns + ------- + tuple + Tuple containing the empty batch images, groups, and identifiers. + """ + + logger = logging.getLogger(__name__) + + # Pad the batch + padded_batch = pad_batch(batch_images) + + # Push the prepared batch to the prepared_queue + prepared_queue.put( + (np.array(padded_batch), batch_groups, batch_identifiers, model_path)) + logger.debug("Pushed prepared batch to prepared_queue") + logger.debug( + f"{prepared_queue.qsize()} batches ready for prediction") + + return [], [], [] + + +def update_channels(model_path: str, logger): + """ + Update the model used for image preparation. + + Parameters + ---------- + model_path : str + The path to the directory containing the 'config.json' file. + The function will append "/config.json" to this path. + logger : logging.Logger + Logger object to log messages. + """ + + try: + num_channels = get_model_channels(model_path) + logger.debug( + f"New number of channels: " + f"{num_channels}") + return num_channels + except Exception as e: + logger.error(f"Error: {e}") + logger.error( + "Error retrieving number of channels. " + "Exiting...") + return + + +def pad_batch(batch_images: np.ndarray) -> np.ndarray: + """ + Pad a batch of images to the same width. + + Parameters + ---------- + batch_images : np.ndarray + Batch of images to be padded. + + Returns + ------- + np.ndarray + Batch of padded images. + """ + # Determine the maximum width among all images in the batch max_width = max(image.shape[0] for image in batch_images) @@ -138,6 +206,8 @@ def pad_batch(batch_images): batch_images[i] = pad_to_width( batch_images[i], max_width, -10) + return batch_images + def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): """ diff --git a/src/api/routes.py b/src/api/routes.py index 5a8d906e..166ba773 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -56,7 +56,7 @@ def predict() -> flask.Response: try: app.request_queue.put((image_file, group_id, identifier, model), - block=True, timeout=30) + block=True, timeout=15) except Full: response = jsonify({ "status": "error", From 4ace30da8cc042b3d929f0d5b3213c59e1dc6756 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Thu, 9 Nov 2023 16:32:50 +0100 Subject: [PATCH 08/14] Update README for API model switching --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3cad8c5f..6140fff6 100644 --- a/README.md +++ b/README.md @@ -303,8 +303,6 @@ GUNICORN_ACCESSLOG # Default: "-": Access log settings. **Loghi-HTR Options:** ```bash -LOGHI_MODEL_PATH # Path to the model. -LOGHI_CHARLIST_PATH # Path to the character list. LOGHI_BATCH_SIZE # Default: "256": Batch size for processing. LOGHI_OUTPUT_PATH # Directory where predictions are saved. LOGHI_MAX_QUEUE_SIZE # Default: "10000": Maximum size of the processing queue. @@ -323,10 +321,10 @@ You can set these variables in your shell or use a script. An example script to Once the API is up and running, you can send HTR requests using curl. 
Here's how: ```bash -curl -X POST -F "image=@$input_path" -F "group_id=$group_id" -F "identifier=$filename" http://localhost:5000/predict +curl -X POST -F "image=@$input_path" -F "group_id=$group_id" -F "identifier=$filename" -F "model=$model_path" http://localhost:5000/predict ``` -Replace `$input_path`, `$group_id`, and `$filename` with your specific values. The model processes the image, predicts the handwritten text, and saves the predictions in the specified output path (from the `LOGHI_OUTPUT_PATH` environment variable). +Replace `$input_path`, `$group_id`, `$filename`, and `$model_path` with your specific values. The model processes the image, predicts the handwritten text, and saves the predictions in the specified output path (from the `LOGHI_OUTPUT_PATH` environment variable). --- From 9ae95fcb28d1f6845e6a7cc38802f13fed2da175 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Fri, 10 Nov 2023 11:13:02 +0100 Subject: [PATCH 09/14] Write .error file on uncatched prediction errors --- src/api/batch_predictor.py | 10 +++++++ src/api/image_preparator.py | 59 ++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 06890d63..78355b66 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -99,6 +99,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, logger.error("Failed batch:") for id in batch_identifiers: logger.error(id) + output_error(output_path, id, e) predictions = [] # Update the total number of predictions made @@ -264,6 +265,8 @@ def safe_batch_predict(model: tf.keras.Model, logger.error( "OOM error with single image. Skipping image" f"{batch_info[0][1]}.") + + output_error(output_path, batch_info[0][1], "OOM error") return [] logger.warning( @@ -406,3 +409,10 @@ def output_predictions(predictions: List[Tuple[float, str]], f.write(text + "\n") return outputs + + +def output_error(output_dir: str, identifier: str, text: str): + if not os.path.exists(output_dir): + os.makedirs(output_dir) + with open(os.path.join(output_dir, identifier + ".error"), "w") as f: + f.write(text + "\n") diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index 2f75a156..a36e82b5 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -76,7 +76,7 @@ def image_preparation_worker(batch_size: int, old_model = model_path num_channels = update_channels(model_path, logger) - image = prepare_image(identifier, image, num_channels) + image = prepare_image(image, num_channels) logger.debug( f"Prepared image {identifier} with shape: " f"{image.shape}") @@ -156,6 +156,32 @@ def pad_and_queue_batch(batch_images: np.ndarray, return [], [], [] +def pad_batch(batch_images: np.ndarray) -> np.ndarray: + """ + Pad a batch of images to the same width. + + Parameters + ---------- + batch_images : np.ndarray + Batch of images to be padded. + + Returns + ------- + np.ndarray + Batch of padded images. + """ + + # Determine the maximum width among all images in the batch + max_width = max(image.shape[0] for image in batch_images) + + # Resize each image in the batch to the maximum width + for i in range(len(batch_images)): + batch_images[i] = pad_to_width( + batch_images[i], max_width, -10) + + return batch_images + + def update_channels(model_path: str, logger): """ Update the model used for image preparation. 
@@ -183,32 +209,6 @@ def update_channels(model_path: str, logger): return -def pad_batch(batch_images: np.ndarray) -> np.ndarray: - """ - Pad a batch of images to the same width. - - Parameters - ---------- - batch_images : np.ndarray - Batch of images to be padded. - - Returns - ------- - np.ndarray - Batch of padded images. - """ - - # Determine the maximum width among all images in the batch - max_width = max(image.shape[0] for image in batch_images) - - # Resize each image in the batch to the maximum width - for i in range(len(batch_images)): - batch_images[i] = pad_to_width( - batch_images[i], max_width, -10) - - return batch_images - - def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): """ Pads a transposed image (where the first dimension is width) to a specified @@ -251,8 +251,7 @@ def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): return tf.pad(image, padding, "CONSTANT", constant_values=pad_value) -def prepare_image(identifier: str, - image_bytes: bytes, +def prepare_image(image_bytes: bytes, num_channels: int) -> tf.Tensor: """ Prepare a raw image for batch processing. @@ -262,8 +261,6 @@ def prepare_image(identifier: str, Parameters ---------- - identifier : str - Identifier of the image (used for logging). image_bytes : bytes Raw bytes of the image. num_channels : int From ffaeb67c6ecc7af77aa514ea0e1a5947a0a8498c Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Fri, 10 Nov 2023 11:23:31 +0100 Subject: [PATCH 10/14] Improve error output --- src/api/batch_predictor.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 78355b66..55dcf83f 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -97,9 +97,9 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, logger.error(e) logger.error("Error making predictions. Skipping batch.") logger.error("Failed batch:") - for id in batch_identifiers: + for group, id in batch_info: logger.error(id) - output_error(output_path, id, e) + output_prediction_error(output_path, group, id, e) predictions = [] # Update the total number of predictions made @@ -258,7 +258,7 @@ def safe_batch_predict(model: tf.keras.Model, model, batch_images, batch_info, utils, decode_batch_predictions, output_path, normalize_confidence) - except tf.errors.ResourceExhaustedError: + except tf.errors.ResourceExhaustedError as e: # If the batch size is 1 and still causing OOM, then skip the image and # return an empty list if len(batch_images) == 1: @@ -266,7 +266,8 @@ def safe_batch_predict(model: tf.keras.Model, "OOM error with single image. 
Skipping image" f"{batch_info[0][1]}.") - output_error(output_path, batch_info[0][1], "OOM error") + output_prediction_error( + output_path, batch_info[0][0], batch_info[0][1], e) return [] logger.warning( @@ -411,8 +412,9 @@ def output_predictions(predictions: List[Tuple[float, str]], return outputs -def output_error(output_dir: str, identifier: str, text: str): +def output_prediction_error(output_path: str, group_id: str, identifier: str, text: str): + output_dir = os.path.join(output_path, group_id) if not os.path.exists(output_dir): os.makedirs(output_dir) with open(os.path.join(output_dir, identifier + ".error"), "w") as f: - f.write(text + "\n") + f.write(str(text) + "\n") From affd9f97ed8268a1b5c0bd7c4539b5c8201cc981 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Fri, 10 Nov 2023 11:41:18 +0100 Subject: [PATCH 11/14] Formatting of output_prediction_error code --- src/api/batch_predictor.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 55dcf83f..468ffe33 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -412,7 +412,25 @@ def output_predictions(predictions: List[Tuple[float, str]], return outputs -def output_prediction_error(output_path: str, group_id: str, identifier: str, text: str): +def output_prediction_error(output_path: str, + group_id: str, + identifier: str, + text: str): + """ + Output an error message to a file. + + Parameters + ---------- + output_path : str + Base path where prediction outputs should be saved. + group_id : str + Group ID of the image. + identifier : str + Identifier of the image. + text : str + Error message to be saved. + """ + output_dir = os.path.join(output_path, group_id) if not os.path.exists(output_dir): os.makedirs(output_dir) From 1f1ca34f5d6cab39c718b205ef688b0a027a4afe Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Fri, 10 Nov 2023 14:18:20 +0100 Subject: [PATCH 12/14] Switch functions for readability --- src/api/image_preparator.py | 54 ++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index a36e82b5..df4ff52e 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -182,33 +182,6 @@ def pad_batch(batch_images: np.ndarray) -> np.ndarray: return batch_images -def update_channels(model_path: str, logger): - """ - Update the model used for image preparation. - - Parameters - ---------- - model_path : str - The path to the directory containing the 'config.json' file. - The function will append "/config.json" to this path. - logger : logging.Logger - Logger object to log messages. - """ - - try: - num_channels = get_model_channels(model_path) - logger.debug( - f"New number of channels: " - f"{num_channels}") - return num_channels - except Exception as e: - logger.error(f"Error: {e}") - logger.error( - "Error retrieving number of channels. " - "Exiting...") - return - - def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): """ Pads a transposed image (where the first dimension is width) to a specified @@ -251,6 +224,33 @@ def pad_to_width(image: tf.Tensor, target_width: int, pad_value: float): return tf.pad(image, padding, "CONSTANT", constant_values=pad_value) +def update_channels(model_path: str, logger): + """ + Update the model used for image preparation. + + Parameters + ---------- + model_path : str + The path to the directory containing the 'config.json' file. 
+ The function will append "/config.json" to this path. + logger : logging.Logger + Logger object to log messages. + """ + + try: + num_channels = get_model_channels(model_path) + logger.debug( + f"New number of channels: " + f"{num_channels}") + return num_channels + except Exception as e: + logger.error(f"Error: {e}") + logger.error( + "Error retrieving number of channels. " + "Exiting...") + return + + def prepare_image(image_bytes: bytes, num_channels: int) -> tf.Tensor: """ From ccf4595365651db33af0b39ec504e44d41640801 Mon Sep 17 00:00:00 2001 From: Tim Koornstra Date: Fri, 10 Nov 2023 16:54:07 +0100 Subject: [PATCH 13/14] Readd LOGHI_MODEL_PATH default model --- README.md | 3 ++- src/api/app_utils.py | 14 +++++++------- src/api/batch_predictor.py | 15 ++++++++++++++- src/api/flask_app.py | 4 +++- src/api/gunicorn_app.py | 4 +++- src/api/image_preparator.py | 12 +++++++++--- src/api/start_local_app.sh | 1 + 7 files changed, 39 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 6140fff6..521ee289 100644 --- a/README.md +++ b/README.md @@ -303,6 +303,7 @@ GUNICORN_ACCESSLOG # Default: "-": Access log settings. **Loghi-HTR Options:** ```bash +LOGHI_MODEL_PATH # Path to the model. LOGHI_BATCH_SIZE # Default: "256": Batch size for processing. LOGHI_OUTPUT_PATH # Directory where predictions are saved. LOGHI_MAX_QUEUE_SIZE # Default: "10000": Maximum size of the processing queue. @@ -324,7 +325,7 @@ Once the API is up and running, you can send HTR requests using curl. Here's how curl -X POST -F "image=@$input_path" -F "group_id=$group_id" -F "identifier=$filename" -F "model=$model_path" http://localhost:5000/predict ``` -Replace `$input_path`, `$group_id`, `$filename`, and `$model_path` with your specific values. The model processes the image, predicts the handwritten text, and saves the predictions in the specified output path (from the `LOGHI_OUTPUT_PATH` environment variable). +Replace `$input_path`, `$group_id`, `$filename`, and `$model_path` with your specific values. The model processes the image, predicts the handwritten text, and saves the predictions in the specified output path (from the `LOGHI_OUTPUT_PATH` environment variable). The `model` field is optional, and allows you to dynamically switch the model used. 
--- diff --git a/src/api/app_utils.py b/src/api/app_utils.py index 6e624c43..bf21039e 100644 --- a/src/api/app_utils.py +++ b/src/api/app_utils.py @@ -109,10 +109,9 @@ def extract_request_data() -> Tuple[bytes, str, str, str]: raise ValueError("No identifier provided.") model = request.form.get('model') - if not model: - raise ValueError("No model provided.") - if not os.path.exists(model): - raise ValueError(f"Model directory {model} does not exist.") + if model: + if not os.path.exists(model): + raise ValueError(f"Model directory {model} does not exist.") return image_content, group_id, identifier, model @@ -159,7 +158,7 @@ def get_env_variable(var_name: str, default_value: str = None) -> str: def start_processes(batch_size: int, max_queue_size: int, - output_path: str, gpus: str): + output_path: str, gpus: str, model_path: str): logger = logging.getLogger(__name__) # Create a thread-safe Queue @@ -178,7 +177,8 @@ def start_processes(batch_size: int, max_queue_size: int, logger.info("Starting image preparation process") preparation_process = Process( target=image_preparation_worker, - args=(batch_size, request_queue, prepared_queue), + args=(batch_size, request_queue, + prepared_queue, model_path), name="Image Preparation Process") preparation_process.daemon = True preparation_process.start() @@ -187,7 +187,7 @@ def start_processes(batch_size: int, max_queue_size: int, logger.info("Starting batch prediction process") prediction_process = Process( target=batch_prediction_worker, - args=(prepared_queue, output_path, gpus), + args=(prepared_queue, output_path, model_path, gpus), name="Batch Prediction Process") prediction_process.daemon = True prediction_process.start() diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 468ffe33..1f6d4e52 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -15,6 +15,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, output_path: str, + model_path: str, gpus: str = '0'): """ Worker process for batch prediction on images. @@ -30,6 +31,8 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, Queue from which preprocessed images are fetched. output_path : str Path where predictions should be saved. + model_path : str + Path to the initial model file. gpus : str, optional IDs of GPUs to be used (comma-separated). Default is '0'. @@ -63,8 +66,18 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue, strategy = tf.distribute.MirroredStrategy() + # Create the model and utilities + try: + with strategy.scope(): + model, utils = create_model(model_path) + logger.info("Model created and utilities initialized") + except Exception as e: + logger.error(e) + logger.error("Error creating model. 
Exiting...") + return + total_predictions = 0 - old_model_path = None + old_model_path = model_path try: while True: diff --git a/src/api/flask_app.py b/src/api/flask_app.py index 1cdf7106..19041342 100644 --- a/src/api/flask_app.py +++ b/src/api/flask_app.py @@ -56,6 +56,7 @@ def create_app(request_queue) -> Flask: # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) + model_path = get_env_variable("LOGHI_MODEL_PATH") output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -69,7 +70,8 @@ def create_app(request_queue) -> Flask: batch_size, max_queue_size, output_path, - gpus + gpus, + model_path ) # Create and run the Flask app diff --git a/src/api/gunicorn_app.py b/src/api/gunicorn_app.py index f5c37b0c..aad27192 100644 --- a/src/api/gunicorn_app.py +++ b/src/api/gunicorn_app.py @@ -85,6 +85,7 @@ def load(self): # Get Loghi-HTR options from environment variables logger.info("Getting Loghi-HTR options from environment variables") batch_size = int(get_env_variable("LOGHI_BATCH_SIZE", "256")) + model_path = get_env_variable("LOGHI_MODEL_PATH") output_path = get_env_variable("LOGHI_OUTPUT_PATH") max_queue_size = int(get_env_variable("LOGHI_MAX_QUEUE_SIZE", "10000")) @@ -98,7 +99,8 @@ def load(self): batch_size, max_queue_size, output_path, - gpus + gpus, + model_path ) options = { diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index df4ff52e..b362b913 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -14,7 +14,8 @@ def image_preparation_worker(batch_size: int, request_queue: multiprocessing.Queue, - prepared_queue: multiprocessing.Queue): + prepared_queue: multiprocessing.Queue, + model_path: str): """ Worker process to prepare images for batch processing. @@ -29,6 +30,8 @@ def image_preparation_worker(batch_size: int, Queue from which raw images are fetched. prepared_queue : multiprocessing.Queue Queue to which prepared images are pushed. + model_path : str + Path to the initial model used for image preparation. 
 
     Side Effects
     ------------
@@ -41,12 +44,15 @@ def image_preparation_worker(batch_size: int,
     # Disable GPU visibility to prevent memory allocation issues
     tf.config.set_visible_devices([], 'GPU')
 
+    # Define the number of channels for the images
+    num_channels = update_channels(model_path, logger)
+
     # Define the maximum time to wait for new images
     TIMEOUT_DURATION = 1
     MAX_WAIT_COUNT = 1
 
     wait_count = 0
-    old_model = None
+    old_model = model_path
     batch_images, batch_groups, batch_identifiers = [], [], []
 
     try:
@@ -58,7 +64,7 @@ def image_preparation_worker(batch_size: int,
                     logger.debug(f"Retrieved {identifier} from request_queue")
 
                     # Check if the model has changed
-                    if model_path != old_model:
+                    if model_path and model_path != old_model:
                         logger.info(
                             "Model changed, adjusting image preparation")
                         if batch_images:
diff --git a/src/api/start_local_app.sh b/src/api/start_local_app.sh
index 00637609..8ed5dacd 100755
--- a/src/api/start_local_app.sh
+++ b/src/api/start_local_app.sh
@@ -4,6 +4,7 @@ export GUNICORN_THREADS=1
 export GUNICORN_ACCESSLOG='-'
 
 export LOGHI_BATCH_SIZE=300
+export LOGHI_MODEL_PATH="/home/tim/Downloads/new_model"
 export LOGHI_OUTPUT_PATH="/home/tim/Documents/development/loghi-htr/output/"
 export LOGHI_MAX_QUEUE_SIZE=50000
 
From c1f8ddb52ce01eee948ba7932d5283cd80b21298 Mon Sep 17 00:00:00 2001
From: Tim Koornstra
Date: Fri, 10 Nov 2023 17:35:09 +0100
Subject: [PATCH 14/14] Provide extra warnings and info for model switching

---
 README.md                   | 11 +++++++++--
 src/api/batch_predictor.py  |  2 +-
 src/api/image_preparator.py |  2 +-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 521ee289..ae373b52 100644
--- a/README.md
+++ b/README.md
@@ -322,10 +322,17 @@ You can set these variables in your shell or use a script. An example script to
 Once the API is up and running, you can send HTR requests using curl. Here's how:
 
 ```bash
-curl -X POST -F "image=@$input_path" -F "group_id=$group_id" -F "identifier=$filename" -F "model=$model_path" http://localhost:5000/predict
+curl -X POST -F "image=@$input_path" -F "group_id=$group_id" -F "identifier=$filename" http://localhost:5000/predict
 ```
 
-Replace `$input_path`, `$group_id`, `$filename`, and `$model_path` with your specific values. The model processes the image, predicts the handwritten text, and saves the predictions in the specified output path (from the `LOGHI_OUTPUT_PATH` environment variable). The `model` field is optional, and allows you to dynamically switch the model used.
+Replace `$input_path`, `$group_id`, and `$filename` with your respective file paths and identifiers. If you're considering switching the recognition model, use the `model` field cautiously:
+
+- The `model` field (`-F "model=$model_path"`) allows for specifying which handwritten text recognition model the API should use for the current request.
+- To avoid the slowdown associated with loading different models for each request, it is preferable to set a specific model before starting your API by using the `LOGHI_MODEL_PATH` environment variable.
+- Only use the `model` field if you are certain that a different model is needed for a particular task and you understand its performance characteristics.
+
+> [!WARNING]
+> Continuous model switching with `$model_path` can lead to severe processing delays. For most users, it's best to set the `LOGHI_MODEL_PATH` once and use the same model consistently, restarting the API with a new variable only when necessary.
 
 ---
 
diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py
index 1f6d4e52..85101a3b 100644
--- a/src/api/batch_predictor.py
+++ b/src/api/batch_predictor.py
@@ -91,7 +91,7 @@ def batch_prediction_worker(prepared_queue: multiprocessing.JoinableQueue,
             if model_path != old_model_path:
                 old_model_path = model_path
                 try:
-                    logger.info("Model changed, adjusting batch prediction")
+                    logger.warning("Model changed, adjusting batch prediction")
                     with strategy.scope():
                         model, utils = create_model(model_path)
                         logger.info("Model created and utilities initialized")
diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py
index b362b913..bcaa10ea 100644
--- a/src/api/image_preparator.py
+++ b/src/api/image_preparator.py
@@ -65,7 +65,7 @@ def image_preparation_worker(batch_size: int,
 
                     # Check if the model has changed
                     if model_path and model_path != old_model:
-                        logger.info(
+                        logger.warning(
                             "Model changed, adjusting image preparation")
                         if batch_images:
                             # Add the existing batch to the prepared_queue
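For reference, the request that these patches expect can also be issued from Python rather than curl. The following is a minimal sketch, not part of the patch series: it assumes the API is reachable at `http://localhost:5000` (as in the README example), that the third-party `requests` package is installed, and the helper name `send_line_image` is hypothetical. The form fields mirror `extract_request_data()` in `app_utils.py`; `model` is optional and, per the README warning above, should not be switched on every request.

```python
# Minimal client sketch (assumptions: API at localhost:5000, `requests` installed,
# helper name is illustrative only).
from typing import Optional

import requests


def send_line_image(image_path: str, group_id: str, identifier: str,
                    model_path: Optional[str] = None) -> dict:
    """Post one line image to the /predict endpoint and return its JSON reply."""
    form = {"group_id": group_id, "identifier": identifier}
    if model_path is not None:
        # Optional per-request model switch; repeated switching is slow.
        form["model"] = model_path

    with open(image_path, "rb") as image_file:
        response = requests.post(
            "http://localhost:5000/predict",
            files={"image": image_file},
            data=form,
            timeout=30,
        )
    response.raise_for_status()
    return response.json()
```

Whatever directory is passed as `model` (or set via `LOGHI_MODEL_PATH`) must contain the saved model together with `charlist.txt` and `config.json`, since the workers read the character list and the number of input channels from there.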