diff --git a/dlg-code/wallaby_hires.py b/dlg-code/wallaby_hires.py
index b94c0ba..7fb1ece 100644
--- a/dlg-code/wallaby_hires.py
+++ b/dlg-code/wallaby_hires.py
@@ -1074,7 +1074,7 @@ def process_and_download(credentials, input_csv, processed_catalogue, timeout_se
     - input_csv (str): Path to the input CSV file with source names.
     - processed_catalogue (str): Path to the catalogue of already processed sources.
     - timeout_seconds (int): Timeout setting in seconds for download operations.
-    - project_code (str): Code of the project
+    - project_code (str): Code of the project.
 
     Returns:
     - None: Outputs the results to 'hipass_ms_file_details.csv' in the working directory.
@@ -1527,3 +1527,495 @@ def download_evaluation_files(filename, project_code, credentials):
             tar.extractall(path=tar_file_folder_name)
         print(f"Extracted '{tar_file}' to '{tar_file_folder_name}'")
 
+
+# 06/02/25 Further individual functions
+# Function to filter out un-processed sources
+def filter_unprocessed_data(input_csv, processed_catalogue):
+    """
+    Processes an input catalogue to filter out already processed sources
+    and save the unprocessed details to a new CSV.
+
+    Parameters:
+    - input_csv (str): Path to the input CSV file with source names.
+    - processed_catalogue (str): Path to the catalogue of already processed sources.
+
+    Returns:
+    - None: Outputs the unprocessed sources to 'test_catalogue_unprocessed.csv' in the working directory.
+    """
+    # Load the processed catalogue to check for already processed sources
+    processed_df = pd.read_csv(processed_catalogue)
+    processed_sources = set(processed_df['Name'])  # Convert to set for faster lookups
+
+    # Load the input CSV into a DataFrame
+    input_df = pd.read_csv(input_csv)
+
+    # Filter the input DataFrame to exclude processed sources
+    unprocessed_df = input_df[~input_df['Name'].isin(processed_sources)]
+
+    # Output the unprocessed sources to a new CSV file
+    output_csv = "test_catalogue_unprocessed.csv"
+    unprocessed_df.to_csv(output_csv, index=False)
+    print(f"Unprocessed sources saved to {output_csv}")
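+
+# Example usage (illustrative sketch only; the CSV paths are placeholders and both
+# files are assumed to contain a 'Name' column):
+#
+#     filter_unprocessed_data("hipass_catalogue.csv", "processed_catalogue.csv")
+#     # -> writes 'test_catalogue_unprocessed.csv' to the current working directory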
+
+
+def process_data(credentials, input_csv, processed_catalogue, timeout_seconds, project_code):
+    """
+    Processes an input catalogue of unprocessed sources to retrieve relevant data, and saves the processed details to a CSV.
+
+    Parameters:
+    - credentials (str): Path to the CASDA credentials file.
+    - input_csv (str): Path to the input CSV file with source names.
+    - processed_catalogue (str): Path to the catalogue of already processed sources.
+    - timeout_seconds (int): Timeout setting in seconds for download operations.
+    - project_code (str): Code of the project.
+
+    Returns:
+    - None: Outputs the results to 'hipass_ms_file_details.csv' in the working directory.
+    """
+
+    # Read credentials from the provided file
+    parser = configparser.ConfigParser()
+    parser.read(credentials)
+    username = parser["CASDA"]["username"]
+    password = parser["CASDA"]["password"]
+
+    # Initialize CASDA instance
+    casda = Casda(parser["CASDA"]["username"], parser["CASDA"]["password"])
+
+    # Prepare a list to store the output rows for .ms files
+    output_data = []
+
+    # Load the processed catalogue to check for already processed sources
+    processed_catalogue = pd.read_csv(processed_catalogue)
+    processed_sources = set(processed_catalogue['Name'])
+
+    with open(input_csv, mode='r') as csv_file:
+        csv_reader = csv.DictReader(csv_file)
+
+        # For every row i.e. HIPASS source
+        for row in csv_reader:
+            name = row['Name']
+
+            # Check if the source has already been processed
+            if name in processed_sources:
+                print(f"{name} already processed")
+                continue  # Skip to the next row if the source is processed
+
+            print(f"Querying for: {name}")
+
+            # Inserting the download_evaluation_files (code)
+            # Step 1: Create sbid_visibility_dict
+            sbid_visibility_dict = {}
+            res = tap_query_filename_visibility(name)
+
+            obs_id_list = list(res['obs_id'])
+            obs_id_list = [str(item) for item in obs_id_list]
+
+            visibility_list = list(res['filename'])
+            visibility_list = [str(item) for item in visibility_list]
+
+            for obs_id, visibility in zip(obs_id_list, visibility_list):
+                sbid_visibility_dict.setdefault(obs_id, []).append(visibility)
+
+            # Update the same dictionary by modifying keys
+            sbid_visibility_dict = {key.replace('ASKAP-', ''): value for key, value in sbid_visibility_dict.items()}
+
+            # Step 2: Create sbid_evaluation_dict from sbid_visibility_dict
+            # Initialize the dictionary to store results
+            sbid_evaluation_dict = {}
+
+            # Extract unique SBIDs from sbid_visibility_dict
+            unique_sbid_set = sbid_visibility_dict.keys()
+
+            for sbid in unique_sbid_set:
+                # Run the TAP query for the current SBID
+                res = tap_query_sbid_evaluation(sbid)
+
+                # Check if the result is not empty
+                if len(res) > 0:
+                    # Convert the result to an Astropy Table for easier processing
+                    table = Table(res)
+
+                    # Ensure the necessary columns exist
+                    if "filename" in table.colnames and "filesize" in table.colnames:
+                        # Find the row with the largest filesize
+                        largest_file_row = table[table['filesize'].argmax()]
+                        filename = largest_file_row['filename']  # Get the filename
+                    else:
+                        filename = None  # If columns are missing, set to None
+                else:
+                    filename = None  # If query result is empty, set to None
+
+                # Add the SBID and its corresponding filename to the dictionary
+                sbid_evaluation_dict[sbid] = filename
+
+            # Convert np.str_ values to plain strings in sbid_evaluation_dict
+            sbid_evaluation_dict = {key: str(value) for key, value in sbid_evaluation_dict.items()}
+
+            # Print the two dictionaries
+            # Print sbid_visibility_dict
+            print("sbid_visibility_dict:")
+            print(sbid_visibility_dict)
+
+            # Print sbid_evaluation_dict
+            print("sbid_evaluation_dict:")
+            print(sbid_evaluation_dict)
+
+            # Creating a new vis, eval dict based on the above two dictionaries
+            vis_eval_dict = {sbid_evaluation_dict[key]: value for key, value in sbid_visibility_dict.items()}
+
+            # Print vis_eval_dict
+            print("vis_eval_dict:")
+            print(vis_eval_dict)
+
+            # Rename the values of the dict accordingly
+            # Make a deep copy of the dictionary
+            updated_vis_eval_dict = copy.deepcopy(vis_eval_dict)
+
+            # Dictionary to track the occurrence of filenames
+            occurrence_count = {}
+
+            # Iterate through the copy and rename duplicates
+            for key, file_list in updated_vis_eval_dict.items():
+                for i, filename in enumerate(file_list):
+                    # If the filename has been seen before
+                    if filename in occurrence_count:
+                        occurrence_count[filename] += 1  # Increment the occurrence count
+                        # Rename the file by appending _N
+                        name_parts = filename.split('.ms.tar')  # Split to add suffix
+                        new_name = f"{name_parts[0]}_{occurrence_count[filename]}.ms.tar"
+                        file_list[i] = new_name  # Replace with the new name
+                    else:
+                        # If first occurrence, initialise count
+                        occurrence_count[filename] = 1
+
+            # Print updated_vis_eval_dict
+            print("updated_vis_eval_dict:")
+            print(updated_vis_eval_dict)
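+
+            # Illustration of the renaming above (hypothetical filenames): a value list such as
+            # ['scienceData.beam00.ms.tar', 'scienceData.beam00.ms.tar', 'scienceData.beam00.ms.tar']
+            # becomes ['scienceData.beam00.ms.tar', 'scienceData.beam00_2.ms.tar', 'scienceData.beam00_3.ms.tar'];
+            # the first occurrence keeps its name and later duplicates get an _N suffix before '.ms.tar'.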
+
+            # Get RA, DEC, and Vsys from the query
+            res = tap_query_RA_DEC_VSYS(name)
+
+            # Assuming res returns a DataFrame with the required values, extract them
+            if not res or len(res) == 0:
+                print(f"No results found for {name}. Skipping...")
+                continue
+
+            ra = res['RAJ2000'][0]
+            dec = res['DEJ2000'][0]
+            vsys = res['VSys'][0]
+            print(f"Retrieved RA={ra}, DEC={dec}, VSys={vsys} for {name}")
+
+            # Convert RA and DEC from degrees to hms and dms formats
+            ra_h, ra_m, ra_s = degrees_to_hms(ra)
+            dec_d, dec_m, dec_s = degrees_to_dms(dec)
+            print(f"Converted RA={ra_h}h {ra_m}m {ra_s:.2f}s, DEC={dec_d}° {dec_m}′ {dec_s:.2f}″ for {name}")
+
+            # Get filenames
+            res = tap_query(name)
+            url_list = casda.stage_data(res, verbose=True)
+            print(f"Staging data URLs for {name}")
+
+            files = res['filename']
+
+            # Dictionary to keep track of duplicate counts for each file
+            filename_counts = {}
+            for file in files:
+                # Remove the .tar extension from the filename
+                file_no_tar = file.replace('.ms.tar', '')
+
+                # Check if the filename already exists in the dictionary
+                if file_no_tar in filename_counts:
+                    # Increment the counter for this filename
+                    filename_counts[file_no_tar] += 1
+                    # Insert the counter before the .ms suffix
+                    new_filename = f"{file_no_tar}_{filename_counts[file_no_tar]}"
+                else:
+                    # First occurrence of the filename, set counter to 1
+                    filename_counts[file_no_tar] = 1
+                    # Keep the original filename on the first occurrence
+                    new_filename = file_no_tar
+
+                print(f"File {new_filename} added to i/p for pipeline part B")
+                output_data.append([new_filename, f"{ra_h}: {ra_m}: {ra_s:.2f}", f"{dec_d}: {dec_m}: {dec_s:.2f}", vsys])
+
+            # Creates a df with filename, RA, DEC and System Velocity
+            output_df = pd.DataFrame(output_data, columns=['Name', 'RA', 'DEC', 'Vsys'])
+
+            # Add an additional column i.e. the evaluation file
+            # Apply the function to create the new column
+            output_df['evaluation_file'] = output_df['Name'].apply(find_evaluation_file, args=(updated_vis_eval_dict,))
+
+            # Define the suffix to append to evaluation_file for creating evaluation_file_path
+            suffix = "LinmosBeamImages/akpb.iquv.square_6x6.54.1295MHz.SB32736.cube.fits"
+
+            # Create a new column evaluation_file_path by combining evaluation_file with the suffix
+            output_df['evaluation_file_path'] = output_df['evaluation_file'].apply(
+                lambda x: x.replace('.tar', f"/{suffix}") if pd.notnull(x) else None
+            )
+
+            output_csv = os.path.join('.', 'hipass_ms_file_details.csv')
+            output_df.to_csv(output_csv, index=False, header=False)
+            print(f"Output saved to {output_csv}")
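+
+# Example usage (illustrative sketch only; credentials path, catalogue paths, timeout and
+# project code below are placeholders):
+#
+#     process_data("casda_credentials.ini", "test_catalogue_unprocessed.csv",
+#                  "processed_catalogue.csv", timeout_seconds=3000, project_code="AS102")
+#     # -> writes 'hipass_ms_file_details.csv' with Name, RA, DEC, Vsys,
+#     #    evaluation_file and evaluation_file_path columns (no header row)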
+
+
+def download_data_ms(credentials, input_csv, processed_catalogue, timeout_seconds, project_code):
+    """
+    Downloads and untars the .ms files for a given HIPASS source.
+
+    Parameters:
+    - credentials (str): Path to the CASDA credentials file.
+    - input_csv (str): Path to the input CSV file with source names.
+    - processed_catalogue (str): Path to the catalogue of already processed sources.
+    - timeout_seconds (int): Timeout setting in seconds for download operations.
+    - project_code (str): Code of the project.
+
+    Returns:
+    - None
+    """
+
+    # Read credentials from the provided file
+    parser = configparser.ConfigParser()
+    parser.read(credentials)
+    username = parser["CASDA"]["username"]
+    password = parser["CASDA"]["password"]
+
+    # Initialize CASDA instance
+    casda = Casda(parser["CASDA"]["username"], parser["CASDA"]["password"])
+
+    # Prepare a list to store the output rows for .ms files
+    output_data = []
+
+    # Load the processed catalogue to check for already processed sources
+    processed_catalogue = pd.read_csv(processed_catalogue)
+    processed_sources = set(processed_catalogue['Name'])
+
+    with open(input_csv, mode='r') as csv_file:
+        csv_reader = csv.DictReader(csv_file)
+
+        # For every row i.e. HIPASS source
+        for row in csv_reader:
+            name = row['Name']
+
+            # Check if the source has already been processed
+            if name in processed_sources:
+                print(f"{name} already processed")
+                continue  # Skip to the next row if the source is processed
+
+            print(f"Querying for: {name}")
+
+            # Get filenames from the query
+            res = tap_query(name)
+            url_list = casda.stage_data(res, verbose=True)
+            print(f"url_list: {url_list}")
+
+            # Download files concurrently in the current working directory
+            # Empty list to store downloaded filenames
+            file_list = []
+
+            # ThreadPoolExecutor created with a maximum of 4 threads, meaning up to 4 file downloads can happen simultaneously
+            with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
+
+                # List of futures, where each future represents a download task submitted to the executor
+                futures = [
+                    executor.submit(download_file, url=url, check_exists=True, output='.', timeout=timeout_seconds)
+                    for url in url_list if not url.endswith('checksum')
+                ]
+
+                # For each completed future, save the filename to file_list
+                for future in concurrent.futures.as_completed(futures):
+                    file_list.append(future.result())
+
+            # Untar files in the current working directory
+            print(f"Untarring files for: {name}")
+            for file in file_list:
+                if file.endswith('.tar') and tarfile.is_tarfile(file):
+                    untar_file(file, '.')
+
+            print(".ms files downloaded")
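+
+# Example usage (illustrative sketch only; same placeholder arguments as above):
+#
+#     download_data_ms("casda_credentials.ini", "test_catalogue_unprocessed.csv",
+#                      "processed_catalogue.csv", timeout_seconds=3000, project_code="AS102")
+#     # -> stages, downloads (up to 4 concurrent downloads) and untars the .ms.tar
+#     #    visibility files for each unprocessed source into the working directory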
+
+def download_data_eval(credentials, input_csv, processed_catalogue, timeout_seconds, project_code):
+    """
+    Downloads and untars the evaluation files for a given HIPASS source.
+
+    Parameters:
+    - credentials (str): Path to the CASDA credentials file.
+    - input_csv (str): Path to the input CSV file with source names.
+    - processed_catalogue (str): Path to the catalogue of already processed sources.
+    - timeout_seconds (int): Timeout setting in seconds for download operations.
+    - project_code (str): Code of the project.
+
+    Returns:
+    - None
+    """
+
+    # Read credentials from the provided file
+    parser = configparser.ConfigParser()
+    parser.read(credentials)
+    username = parser["CASDA"]["username"]
+    password = parser["CASDA"]["password"]
+
+    # Initialize CASDA instance
+    casda = Casda(parser["CASDA"]["username"], parser["CASDA"]["password"])
+
+    # Prepare a list to store the output rows for .ms files
+    output_data = []
+
+    # Load the processed catalogue to check for already processed sources
+    processed_catalogue = pd.read_csv(processed_catalogue)
+    processed_sources = set(processed_catalogue['Name'])
+
+    with open(input_csv, mode='r') as csv_file:
+        csv_reader = csv.DictReader(csv_file)
+
+        # For every row i.e. HIPASS source
+        for row in csv_reader:
+            name = row['Name']
+
+            # Check if the source has already been processed
+            if name in processed_sources:
+                print(f"{name} already processed")
+                continue  # Skip to the next row if the source is processed
+
+            print(f"Querying for: {name}")
+
+            # Inserting the download_evaluation_files (code)
+            # Step 1: Create sbid_visibility_dict
+            sbid_visibility_dict = {}
+            res = tap_query_filename_visibility(name)
+
+            obs_id_list = list(res['obs_id'])
+            obs_id_list = [str(item) for item in obs_id_list]
+
+            visibility_list = list(res['filename'])
+            visibility_list = [str(item) for item in visibility_list]
+
+            for obs_id, visibility in zip(obs_id_list, visibility_list):
+                sbid_visibility_dict.setdefault(obs_id, []).append(visibility)
+
+            # Update the same dictionary by modifying keys
+            sbid_visibility_dict = {key.replace('ASKAP-', ''): value for key, value in sbid_visibility_dict.items()}
+
+            # Step 2: Create sbid_evaluation_dict from sbid_visibility_dict
+            # Initialize the dictionary to store results
+            sbid_evaluation_dict = {}
+
+            # Extract unique SBIDs from sbid_visibility_dict
+            unique_sbid_set = sbid_visibility_dict.keys()
+
+            for sbid in unique_sbid_set:
+                # Run the TAP query for the current SBID
+                res = tap_query_sbid_evaluation(sbid)
+
+                # Check if the result is not empty
+                if len(res) > 0:
+                    # Convert the result to an Astropy Table for easier processing
+                    table = Table(res)
+
+                    # Ensure the necessary columns exist
+                    if "filename" in table.colnames and "filesize" in table.colnames:
+                        # Find the row with the largest filesize
+                        largest_file_row = table[table['filesize'].argmax()]
+                        filename = largest_file_row['filename']  # Get the filename
+                    else:
+                        filename = None  # If columns are missing, set to None
+                else:
+                    filename = None  # If query result is empty, set to None
+
+                # Add the SBID and its corresponding filename to the dictionary
+                sbid_evaluation_dict[sbid] = filename
+
+            # Convert np.str_ values to plain strings in sbid_evaluation_dict
+            sbid_evaluation_dict = {key: str(value) for key, value in sbid_evaluation_dict.items()}
+
+            # Print the two dictionaries
+            # Print the updated sbid_visibility_dict
+            print("sbid_visibility_dict:")
+            print(sbid_visibility_dict)
+
+            # Print the updated dictionary
+            print("sbid_evaluation_dict:")
+            print(sbid_evaluation_dict)
+
+            # Step 3: Downloading the required evaluation files
+
+            # Iterate through the dictionaries
+            for sbid, required_filename in sbid_evaluation_dict.items():
+                print(f"Processing SBID: {sbid}")
+
+                # Remove 'ASKAP-' prefix if present
+                sbid = str(sbid).replace('ASKAP-', '')
+
+                # Fetch the DID (data identification) for the sbid and project code
+                url = f"{DID_URL}?projectCode={project_code}&sbid={sbid}"
+                logging.info(f"Requesting data from: {url}")
+                res = requests.get(url)
+                if res.status_code != 200:
+                    raise Exception(f"Error fetching data: {res.reason} (HTTP {res.status_code})")
+
+                logging.info(f"Response received: {res.json()}")
+
+                # Filter evaluation files
+                evaluation_files = [f for f in res.json() if "evaluation" in f]
+                evaluation_files.sort()
+
+                if not evaluation_files:
+                    logging.warning(f"No evaluation files found for projectCode={project_code} and sbid={sbid}.")
+                    return
+
+                logging.info(f"Found evaluation files: {evaluation_files}")
+
+                # Prepare the table for staging
+                t = Table()
+                t["access_url"] = [f"{EVAL_URL}{f}" for f in evaluation_files]
+
+                # Stage files for download
+                url_list = casda.stage_data(t)
+                logging.info(f"Staging files: {url_list}")
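+
+                # For illustration (hypothetical staged URL): a link such as
+                # 'https://example.casda.host/download/evaluationFiles_SB12345.tar?token=...'
+                # reduces to 'evaluationFiles_SB12345.tar' via the split('?')/rsplit('/') below,
+                # which is then compared against the required evaluation file for this SBID.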
+
+                # Check which files need to be downloaded and filter by required filename
+                download_url_list = []
+                for url in url_list:
+                    filename = url.split("?")[0].rsplit("/", 1)[1]
+                    if filename == required_filename:
+                        download_url_list.append(url)
+
+                # View the download_url_list
+                print("Files staged for download:")
+                for idx, url in enumerate(download_url_list, start=1):
+                    print(f"- link {idx}: {url}")
+
+                # Download the required files
+                # Define the download directory as the current working directory
+                download_dir = os.getcwd()
+
+                # Download the required files
+                if download_url_list:
+                    print(f"Downloading files to: {download_dir}")
+                    filelist = casda.download_files(download_url_list, savedir=download_dir)
+                    logging.info(f"Downloaded files: {filelist}")
+                    logging.info(f"All files have been downloaded to {download_dir}.")
+                else:
+                    logging.warning("No files staged for download.")
+
+            # Step 4: Untar all the evaluation files
+            # Download directory would be the current working directory
+            download_dir = os.getcwd()
+
+            # Iterating through dict values to untar each file
+            for sbid, tar_file in sbid_evaluation_dict.items():
+
+                tar_path = os.path.join(download_dir, tar_file)
+
+                tar_file_folder_name = os.path.splitext(tar_file)[0]
+
+                # Create the folder if it doesn't already exist
+                os.makedirs(tar_file_folder_name, exist_ok=True)
+
+                # Extract the .tar file into the folder
+                with tarfile.open(tar_path, "r") as tar:
+                    tar.extractall(path=tar_file_folder_name)
+
+                print(f"Extracted '{tar_file}' to '{tar_file_folder_name}'")
+
+            print("Evaluation files downloaded!")
\ No newline at end of file