Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [NEXMANAGE-737] Enable sota cancel mode. #559

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

yengliong93
Copy link
Contributor

PULL DESCRIPTION

This PR implements the sota cancel mode. When the thread is created, it will be added to a thread list.
When the dispatcher receives the sota cancel request, the dispatcher checks the current running thread and retrieves its id. Next, it sends a termination signal to the thread.

Impact Analysis

Info Please fill out this column
Root Cause Specifically for bugs, empty in case of no variants
Jira ticket Add the name to the Jira ticket eg: "NEXMANAGE-622". Automation will do the linking to Jira

CODE MAINTAINABILITY

  • Added required new tests relevant to the changes
  • Updated Documentation as relevant to the changes
  • PR change contains code related to security
  • PR introduces changes that break compatibility with other modules/services (If YES, please provide description)
  • Run go fmt or format-python.sh as applicable
  • Update Changelog
  • Integration tests are passing
  • If Cloudadapter changes, check Azure connectivity manually

Code must act as a teacher for future developers

This PR implements the sota cancel mode. When the thread is created, it
will be added  to a thread list.
When the dispatcher receives the sota cancel request, the dispatcher
checks the current running thread and retrieves its id. Next, it sends a
termination signal to the thread.

Signed-off-by: yengliong <[email protected]>
@gblewis1
Copy link
Contributor

gblewis1 commented Oct 2, 2024

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

@yengliong93
Copy link
Contributor Author

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

Hi @gblewis1, the current implementation will terminate any OTA that's running on the active thread. I'm thinking whether we should check the OTA type and only terminate the thread if it's the SOTA download-only mode.

@gblewis1
Copy link
Contributor

gblewis1 commented Oct 3, 2024

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

Hi @gblewis1, the current implementation will terminate any OTA that's running on the active thread. I'm thinking whether we should check the OTA type and only terminate the thread if it's the SOTA download-only mode.

another consideration is, there may be an inbc process waiting on the thread to report back to it, so we want to give the thread a chance to clean up gracefully.

here's an example of sending a signal to a thread that's downloading a file

import threading
import requests
import time
import os

def download_file(url, destination, stop_event, chunk_size=1024):
    """
    Downloads a file from the given URL to the destination path.
    Periodically checks if a stop_event is set to cancel the download gracefully.
    
    :param url: URL of the file to download
    :param destination: Local path to save the downloaded file
    :param stop_event: threading.Event to signal cancellation
    :param chunk_size: Size of each chunk to read (in bytes)
    """
    try:
        with requests.get(url, stream=True, timeout=10) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            total_length = response.headers.get('content-length')

            if total_length is None:
                print("Unable to determine the file size.")
                total_length = 0
            else:
                total_length = int(total_length)
                print(f"Total file size: {total_length / (1024 * 1024):.2f} MB")

            downloaded = 0
            with open(destination, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if stop_event.is_set():
                        print("Download cancellation requested. Stopping download...")
                        break
                    if chunk:  # Filter out keep-alive chunks
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_length:
                            percent = (downloaded / total_length) * 100
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB "
                                  f"({percent:.2f}%)")
                        else:
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB")
        if stop_event.is_set():
            # Optionally, delete the incomplete file
            if os.path.exists(destination):
                os.remove(destination)
                print(f"Incomplete file '{destination}' has been deleted.")
            print("Download was cancelled gracefully.")
        else:
            print(f"Download completed successfully and saved to '{destination}'.")
    except (
        requests.exceptions.RequestException,  # Catches all requests-related exceptions
        IOError  # Catches file I/O related errors
    ) as e:
        print(f"An error occurred during download: {e}")
        cleanup(destination)

def cleanup(destination):
    """
    Cleans up the incomplete download file if it exists.
    
    :param destination: Path to the incomplete file
    """
    if os.path.exists(destination):
        try:
            os.remove(destination)
            print(f"Incomplete file '{destination}' has been deleted.")
        except OSError as e:
            print(f"Error deleting incomplete file: {e}")

def controller(stop_event, delay):
    """
    Simulates an external trigger to cancel the download after a certain delay.
    
    :param stop_event: threading.Event to signal cancellation
    :param delay: Time in seconds before triggering cancellation
    """
    print(f"Controller will request cancellation in {delay} seconds.")
    time.sleep(delay)
    print("Controller is signaling the download thread to stop.")
    stop_event.set()

def main():
    # URL of a large file for demonstration purposes
    # Replace this with any large file URL for testing
    url = "https://speed.hetzner.de/100MB.bin"  # Example: 100 MB file
    destination = "downloaded_file.bin"

    # Create an Event object to signal the worker thread to stop
    stop_event = threading.Event()

    # Create and start the download thread
    download_thread = threading.Thread(target=download_file, args=(url, destination, stop_event), name="DownloadThread")
    download_thread.start()

    # Create and start the controller thread
    # For example, cancel the download after 5 seconds
    cancel_delay = 5  # seconds
    controller_thread = threading.Thread(target=controller, args=(stop_event, cancel_delay), name="ControllerThread")
    controller_thread.start()

    # Wait for both threads to complete
    download_thread.join()
    controller_thread.join()

    print("Download process has been handled.")

if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants