From 5a324268914fd810787258d3a4df5683254604c0 Mon Sep 17 00:00:00 2001
From: khaledk2
Date: Sun, 27 Oct 2024 18:20:34 +0000
Subject: [PATCH] Fix CSV import and add support for using it with multosearch

---
 configurations/configuration.py               |  96 +++++----
 manage.py                                     |  60 +++++-
 .../api/v1/resources/query_handler.py         |   2 +-
 .../api/v1/resources/resource_analyser.py     | 110 +++++-----
 omero_search_engine/api/v1/resources/utils.py |  50 +++++
 .../elasticsearch/transform_data.py           | 197 ++++++++++--------
 omero_search_engine/database/utils.py         |   1 -
 .../validation/results_validator.py           |   2 +-
 8 files changed, 335 insertions(+), 183 deletions(-)

diff --git a/configurations/configuration.py b/configurations/configuration.py
index d84edf1..49b4f8c 100644
--- a/configurations/configuration.py
+++ b/configurations/configuration.py
@@ -58,30 +58,41 @@ def set_database_connection_variables(config):
     from omero_search_engine.database.database_connector import DatabaseConnector

     config.database_connectors = {}
+    config.FILES = {}
     for source in config.DATA_SOURCES:
-        if source.get("DATABASE").get("DATABASE_PORT"):
-            address = source.get("DATABASE").get(
-                "DATABASE_SERVER_URI"
-            ) + ":%s" % source.get("DATABASE").get("DATABASE_PORT")
-        else:
-            address = source.get("DATABASE").get("DATABASE_SERVER_URI")
-        DATABASE_URI = "postgresql://%s:%s@%s/%s" % (
-            source.get("DATABASE").get("DATABASE_USER"),
-            source.get("DATABASE").get("DATABASE_PASSWORD"),
-            address,
-            source.get("DATABASE").get("DATABASE_NAME"),
-        )
-        database_connector = DatabaseConnector(
-            source.get("DATABASE").get("DATABASE_NAME"), DATABASE_URI
-        )
-        config.database_connectors[source.get("name")] = database_connector
-
-
-def update_config_file(updated_configuration, configure_database=False):
+        if source.get("DATABASE"):
+            if source.get("DATABASE").get("DATABASE_PORT"):
+                address = source.get("DATABASE").get(
+                    "DATABASE_SERVER_URI"
+                ) + ":%s" % source.get("DATABASE").get("DATABASE_PORT")
+            else:
+                address = source.get("DATABASE").get("DATABASE_SERVER_URI")
+            DATABASE_URI = "postgresql://%s:%s@%s/%s" % (
+                source.get("DATABASE").get("DATABASE_USER"),
+                source.get("DATABASE").get("DATABASE_PASSWORD"),
+                address,
+                source.get("DATABASE").get("DATABASE_NAME"),
+            )
+            database_connector = DatabaseConnector(
+                source.get("DATABASE").get("DATABASE_NAME"), DATABASE_URI
+            )
+            config.database_connectors[source.get("name")] = database_connector
+        elif source.get("CSV"):
+            csv_config = {"Type": "CSV"}
+            config.FILES[source.get("name")] = csv_config
+            if source.get("CSV").get("images_folder"):
+                csv_config["images_folder"] = source.get("CSV").get("images_folder")
+            if source.get("CSV").get("projects_file"):
+                csv_config["projects_file"] = source.get("CSV").get("projects_file")
+            if source.get("CSV").get("screens_file"):
+                csv_config["screens_file"] = source.get("CSV").get("screens_file")
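With this change a DATA_SOURCES entry in the instance configuration can describe
either a PostgreSQL database or a set of CSV files. A sketch of what the two
kinds of entries might look like in the YAML config (key names are taken from
the code above; the source names, credentials, and paths are hypothetical):

    DATA_SOURCES:
      - name: idr
        DATABASE:
          DATABASE_SERVER_URI: localhost
          DATABASE_PORT: 5432
          DATABASE_NAME: omero
          DATABASE_USER: postgres
          DATABASE_PASSWORD: secret
      - name: my_csv_source
        CSV:
          type: CSV
          images_folder: /data/csv/images
          projects_file: /data/csv/projects.csv
          screens_file: /data/csv/screens.csv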
+
+
+def update_config_file(updated_configuration, data_source=False):
     is_changed = False
     with open(app_config.INSTANCE_CONFIG) as f:
         configuration = yaml.load(f)
-    if not configure_database:
+    if not data_source:
         found = []
         for key, value in updated_configuration.items():
             if key in configuration:
@@ -98,27 +109,40 @@ def update_config_file(updated_configuration, configure_database=False):
                 print("%s value is added with value %s " % (key, value))
                 is_changed = True
     else:
-        is_changed = config_database(configuration, updated_configuration)
+        is_changed = config_datasource(configuration, updated_configuration)
     if is_changed:
         with open(app_config.INSTANCE_CONFIG, "w") as f:
             yaml.dump(configuration, f)

-
-def config_database(configuration, updated_configuration):
-    for data_source in configuration.get("DATA_SOURCES"):
-        changed = False
-        Found = False
-        if data_source["name"].lower() == updated_configuration["name"].lower():
-            Found = True
-            for k, v in updated_configuration["DATABASE"].items():
-                if data_source["DATABASE"][k] != v:
-                    data_source["DATABASE"][k] = v
-                    changed = True
-            break
-    if not Found:
-        configuration.get("DATA_SOURCES").append(updated_configuration)
-        changed = True
+
+def config_datasource(configuration, updated_configuration):
+    changed = False
+    Found = False
+    if updated_configuration.get("CSV") and (
+        updated_configuration.get("CSV").get("type") == "CSV"
+    ):
+        for data_source in configuration.get("DATA_SOURCES"):
+            if (
+                data_source.get("name").lower()
+                == updated_configuration.get("name").lower()
+            ):
+                Found = True
+                for k, v in updated_configuration["CSV"].items():
+                    if v == "CSV":
+                        continue
+                    if data_source["CSV"].get(k) != v:
+                        data_source["CSV"][k] = v
+                        changed = True
+        if not Found:
+            configuration.get("DATA_SOURCES").append(updated_configuration)
+            changed = True
+    else:
+        for data_source in configuration.get("DATA_SOURCES"):
+            if data_source["name"].lower() == updated_configuration["name"].lower():
+                Found = True
+                for k, v in updated_configuration["DATABASE"].items():
+                    if data_source["DATABASE"][k] != v:
+                        data_source["DATABASE"][k] = v
+                        changed = True
+                break
+        if not Found:
+            configuration.get("DATA_SOURCES").append(updated_configuration)
+            changed = True
     return changed

diff --git a/manage.py b/manage.py
index 14aa37b..4b3dab5 100644
--- a/manage.py
+++ b/manage.py
@@ -172,6 +172,7 @@ def get_index_data_from_database(resource="all", source="all", backup="True"):
     #        return
     #    get_insert_data_to_index(sql_st, resource)
     # else:
+
     for res, sql_st in sqls_resources.items():
         if resource.lower() != "all" and resource.lower() != res.lower():
             continue
@@ -198,7 +199,6 @@ def get_index_data_from_database(resource="all", source="all", backup="True"):
     if backup:
         backup_elasticsearch_data()

-
 # set configurations
 @manager.command
 @manager.option("-u", "--url", help="database server url")
@@ -237,7 +237,7 @@ def set_database_configuration(
         database_attrs["DATABASE_BACKUP_FILE"] = backup_filename

     if len(database_attrs) > 0:
-        update_config_file(databse_config, configure_database=True)
+        update_config_file(databse_config, data_source=True)
     else:
         search_omero_app.logger.info(
             "At least one database attribute\
@@ -246,6 +246,31 @@ def set_database_configuration(
         )

+@manager.command
+@manager.option("-n", "--name", help="data source name")
+@manager.option(
+    "-i",
+    "--images_folder",
+    help="path to a folder containing the csv files which hold the image data",
+)
+@manager.option(
+    "-p", "--projects_file", help="path to a file containing the projects data"
+)
+@manager.option(
+    "-s", "--screens_file", help="path to a file containing the screens data"
+)
+@manager.option(
+    "-d", "--datasource_type", help="data source type; only CSV is supported"
+)
+def set_data_source_files(
+    name=None,
+    images_folder=None,
+    projects_file=None,
+    screens_file=None,
+    datasource_type="CSV",
+):
+    source = {}
+    if not name:
+        print("Source name attribute is missing")
+        return
+    source["name"] = name
+    source_attrs = {}
+    source["CSV"] = source_attrs
+    source_attrs["type"] = datasource_type
+    if images_folder:
+        source_attrs["images_folder"] = images_folder
+    if projects_file:
+        source_attrs["projects_file"] = projects_file
+    if screens_file:
+        source_attrs["screens_file"] = screens_file
+
+    update_config_file(source, data_source=True)
+
+
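The new command is invoked through Flask-Script like the existing manage.py
commands; a hypothetical example (the source name and paths are illustrative
only):

    python manage.py set_data_source_files -n my_csv_source \
        -i /data/csv/images -p /data/csv/projects.csv -s /data/csv/screens.csv

This writes (or updates) the matching DATA_SOURCES entry in the instance
configuration via update_config_file/config_datasource above.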
 @manager.command
 @manager.option("-e", "--elasticsearch_url", help="elasticsearch url")
 def set_elasticsearch_configuration(elasticsearch_url=None):
@@ -479,6 +504,37 @@ def test_container_key_value():
     check_container_keys_vakues()


+@manager.command
+@manager.option(
+    "-s",
+    "--source",
+    help="data source name; indexing all the data sources is the default",  # noqa
+)
+@manager.option(
+    "-f",
+    "--folder",
+    help="data folder which contains the csv files",  # noqa
+)
+@manager.option(
+    "-r",
+    "--resource",
+    help="resource name; the default is image",  # noqa
+)
+def get_index_data_from_csv_files(source=None, folder=None, resource="image"):
+    from omero_search_engine.cache_functions.elasticsearch.transform_data import (
+        insert_resource_data,
+        save_key_value_buckets,
+    )
+
+    insert_resource_data(
+        folder=folder, resource=resource, data_source=source, from_json=False
+    )
+    save_key_value_buckets(
+        resource_table_=resource,
+        data_source=source,
+        clean_index=False,
+        only_values=False,
+    )
+

 if __name__ == "__main__":
     from flask_script import Command

diff --git a/omero_search_engine/api/v1/resources/query_handler.py b/omero_search_engine/api/v1/resources/query_handler.py
index 8da6f4c..ad57341 100644
--- a/omero_search_engine/api/v1/resources/query_handler.py
+++ b/omero_search_engine/api/v1/resources/query_handler.py
@@ -53,7 +53,7 @@ def check_get_names(idr_, resource, attribute, return_exact=False):
             for name in pr_names_
             if name[attribute] and idr_.lower() in name[attribute].lower()
         ]
-        print(act_name, data_source)
+        # print(act_name, data_source)
         all_act_names = all_act_names + act_name
     else:
         # This should be modified to query specific data source specific

diff --git a/omero_search_engine/api/v1/resources/resource_analyser.py b/omero_search_engine/api/v1/resources/resource_analyser.py
index 107908a..8d233a6 100644
--- a/omero_search_engine/api/v1/resources/resource_analyser.py
+++ b/omero_search_engine/api/v1/resources/resource_analyser.py
@@ -41,44 +41,11 @@
 }}}}}}}}"""
 )
 key_number_search_template = Template(
-    r"""
-{
-    "size":0,
-    "query":{ "bool" : {"must": {
-        "match":{
-            "data_source.keyvalue":"$data_source"
-        }
-    }
-    }
-    },
-    "aggs":{
-        "value_search":{
-            "nested":{
-                "path":"key_values"
-            },
-            "aggs":{
-                "value_filter":{
-                    "filter":{
-                        "terms":{
-                            "key_values.name.keyword":[
-                                "$key"
-                            ]
-                        }
-                    },
-                    "aggs":{
-                        "required_values":{
-                            "cardinality":{
-                                "field":"key_values.value.keyvalue",
-                                "precision_threshold":4000
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
-"""
+    """
+{"size":0,"query":{"bool":{"must":{"match":{"data_source.keyvalue":"$data_source"}}}},
+"aggs":{"value_search":{"nested":{"path":"key_values"},"aggs":{"value_filter":{"filter":{"terms":{
+"key_values.name.keyword":["$key"]}},"aggs":{"required_values":{"cardinality":{
+"field":"key_values.value.keyvalue","precision_threshold":4000}}}}}}}}"""
 )
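The compacted template above is still valid JSON once the placeholders are
substituted; a minimal sketch of how such a template is typically consumed
(the data source name and key are illustrative, and the target index is left
as a comment because it depends on the deployment):

    import json

    query = json.loads(
        key_number_search_template.substitute(data_source="idr", key="Gene Symbol")
    )
    # then, e.g.: search_index_for_value(<image key-value index>, query)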
+"aggs":{"value_search":{"nested":{"path":"key_values"},"aggs":{"value_filter":{"filter":{"terms":{ +"key_values.name.keyword":["$key"]}},"aggs":{"required_values":{"cardinality":{ +"field":"key_values.value.keyvalue","precision_threshold":4000}}}}}}}}""" ) search_by_value_only = Template( @@ -535,8 +502,8 @@ def query_cashed_bucket( if name: name = name.strip() - if resource != "all": + if resource != "all": query = key_values_buckets_template.substitute( name=name, resource=resource, data_source=json.dumps(data_source) ) @@ -886,6 +853,8 @@ def get_the_results( es_index, query ) # .search(index=es_index, body=query) hits = results_["hits"]["hits"] + print ("===>>> Hist %s"%hits) + if len(hits) > 0: for hit in hits: if len(hits) > 0: @@ -912,6 +881,8 @@ def get_the_results( returned_results[hit["_source"]["data_source"]] = [ item for item in hit["_source"]["resourcename"] ] + else: + return returned_results # remove container description from the results, # should be added again later after cleaning up the description @@ -921,6 +892,7 @@ def get_the_results( return returned_results + def get_container_values_for_key( table_, container_name, csv, ret_data_source=None, key=None ): @@ -1053,22 +1025,50 @@ def process_container_query(table_, attribute_name, container_id, key, resourse) {"terms": {"field": "key_values.value.keyvalue","size": 10000}}}}}}}""" ) - """ Get all the keys bucket""" -container_project_keys_template = { - "keys_search": { - "nested": {"path": "key_values"}, - "aggs": { - "required_values": { - "cardinality": { - "field": "key_values.name.keynamenormalize", - "precision_threshold": 4000, - }, - }, - "uniquesTerms": { - "terms": {"field": "key_values.name.keynamenormalize", "size": 10000} - }, - }, - } +container_project_keys_template = Template( + """ +{"keys_search": {"nested": {"path": "key_values"}, +"aggs": {"required_values": {"cardinality": {"field": "key_values.name.keynamenormalize","precision_threshold": 4000, +},},"uniquesTerms": {"terms": {"field": "key_values.name.keynamenormalize", "size": 10000}},},}} +""" +) +resource_keys_template= Template( + ''' + { + "size":0, + "query":{ "bool" : {"must": { + "match":{ + "data_source.keyvalue":"$data_source" + } + } + } + }, + "aggs":{ + "value_search":{ + "nested":{ + "path":"key_values" + }, + "aggs":{ + "required_values":{ + "cardinality":{ + "field":"key_values.name.keyword", + "precision_threshold":4000 + } + } + }, +"aggs": {"required_name": { +"terms": {"field": "key_values.name.keyword","size": 9999}}} + + } + } } +''' +) + + +def get_resource_keys(resource, data_source): + res_index = resource_elasticsearchindex.get(resource) + res = search_index_for_value(res_index, json.loads(resource_keys_template.substitute(data_source=data_source))) + return res["aggregations"]["value_search"]["required_name"]["buckets"] diff --git a/omero_search_engine/api/v1/resources/utils.py b/omero_search_engine/api/v1/resources/utils.py index 8991439..ea7a851 100644 --- a/omero_search_engine/api/v1/resources/utils.py +++ b/omero_search_engine/api/v1/resources/utils.py @@ -1338,6 +1338,8 @@ def get_data_sources(): data_sources = [] for data_source in search_omero_app.config.database_connectors.keys(): data_sources.append(data_source) + for data_source in search_omero_app.config.get("FILES").keys(): + data_sources.append(data_source) return data_sources @@ -1345,3 +1347,51 @@ def check_empty_string(string_to_check): if string_to_check: string_to_check = string_to_check.strip() return string_to_check + + +def 
diff --git a/omero_search_engine/api/v1/resources/utils.py b/omero_search_engine/api/v1/resources/utils.py
index 8991439..ea7a851 100644
--- a/omero_search_engine/api/v1/resources/utils.py
+++ b/omero_search_engine/api/v1/resources/utils.py
@@ -1338,6 +1338,8 @@ def get_data_sources():
     data_sources = []
     for data_source in search_omero_app.config.database_connectors.keys():
         data_sources.append(data_source)
+    for data_source in search_omero_app.config.get("FILES").keys():
+        data_sources.append(data_source)
     return data_sources

@@ -1345,3 +1347,51 @@ def check_empty_string(string_to_check):
     if string_to_check:
         string_to_check = string_to_check.strip()
     return string_to_check
+
+
+def get_all_index_data(res_table, data_source):
+    query_return_all_data = {
+        "query_details": {
+            "and_filters": [],
+            "or_filters": [],
+            "case_sensitive": False,
+        }
+    }
+    res = search_resource_annotation(
+        res_table,
+        query_return_all_data,
+        return_containers=False,
+        data_source=data_source,
+    )
+    return res
+
+
+def get_number_image_inside_container(resource, res_id, data_source):
+    and_filters = []
+    main_attributes = {
+        "and_main_attributes": [
+            {
+                "name": "%s_id" % resource,
+                "value": res_id,
+                "operator": "equals",
+                "resource": "image",
+            },
+            {
+                "name": "data_source",
+                "value": data_source,
+                "operator": "equals",
+                "resource": "image",
+            },
+        ]
+    }
+    or_filters = []
+    query = {"and_filters": and_filters, "or_filters": or_filters}
+
+    query_data = {
+        "query_details": query,
+        "main_attributes": main_attributes,
+    }
+
+    searchengine_results = 0
+    returned_results = search_resource_annotation("image", query_data)
+    if returned_results.get("results"):
+        if returned_results.get("results").get("size"):
+            searchengine_results = returned_results["results"]["size"]
+    return searchengine_results

diff --git a/omero_search_engine/cache_functions/elasticsearch/transform_data.py b/omero_search_engine/cache_functions/elasticsearch/transform_data.py
index 4fe769e..cebd488 100644
--- a/omero_search_engine/cache_functions/elasticsearch/transform_data.py
+++ b/omero_search_engine/cache_functions/elasticsearch/transform_data.py
@@ -354,7 +354,42 @@ def get_file_list(path_name):
     return f


+def handle_file_2(lock, global_counter, val):
+    file_name = val[0]
+    resource = val[1]
+    data_source = val[2]
+    total_files = val[3]
+    # update the shared counter under the lock so the progress number is accurate
+    with lock:
+        global_counter.value += 1
+    search_omero_app.logger.info(
+        "%s/%s Reading the csv file %s"
+        % (global_counter.value, total_files, file_name)
+    )
+    df = pd.read_csv(file_name, low_memory=False).replace({np.nan: None})
+    insert_resource_data_from_df(df, resource, data_source, lock)
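handle_file_2 is designed to be mapped over a list of (file_name, resource,
data_source, total_files) tuples by a process pool, with the lock and counter
pre-bound via functools.partial. A minimal, self-contained sketch of that
pattern (the worker body is simplified to a print; file names are illustrative):

    import multiprocessing
    from functools import partial

    def worker(lock, counter, val):
        with lock:
            counter.value += 1
        print("%s/%s %s" % (counter.value, val[3], val[0]))

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        lock = manager.Lock()
        counter = manager.Value("i", 0)
        vals = [("a.csv", "image", "src", 2), ("b.csv", "image", "src", 2)]
        with multiprocessing.Pool(2) as pool:
            pool.map(partial(worker, lock, counter), vals)

insert_resource_data below uses exactly this arrangement.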
"mapvalue_index", - ] f_con = 0 if os.path.isfile(folder): files_list = [folder] @@ -428,6 +414,7 @@ def insert_resource_data(folder, resource, data_source, from_json): "No valid folder ({folder}) is provided ".format(folder=folder) ) return + vals = [] for fil in files_list: fil = fil.strip() if from_json and not fil.endswith(".json"): @@ -435,20 +422,38 @@ def insert_resource_data(folder, resource, data_source, from_json): n = len(files_list) search_omero_app.logger.info("%s==%s == %s" % (f_con, fil, n)) file_name = os.path.join(folder, fil) - handle_file(file_name, es_index, cols, is_image, data_source, from_json) - search_omero_app.logger.info("File: %s has been processed" % fil) - try: - with open(file_name + ".done", "w") as outfile: - json.dump(f_con, outfile) - except Exception as ex: - search_omero_app.logger.info("Error .... writing Done file ...") - raise ex - f_con += 1 - + vals.append((file_name, resource, data_source, len(files_list))) + # vals.append((cur_max_id, (cur_max_id - page_size), resource, data_source)) + # handle_file(file_name, es_index, cols, is_image, data_source, from_json) + # handle_file_2(file_name, resource, data_source) + # search_omero_app.logger.info("File: %s has been processed" % fil) + # try: + # #with open(file_name + ".done", "w") as outfile: + # json.dump(f_con, outfile) + # print ("======") + # except Exception as ex: + # search_omero_app.logger.info("Error .... writing Done file ...") + # raise ex + # f_con += 1 + try: + manager = multiprocessing.Manager() + # a lock which will be used between the processes in the pool + lock = manager.Lock() + # a counter which will be used by the processes in the pool + counter_val = manager.Value("i", 0) + func = partial(handle_file_2, lock, counter_val) + # map the data which will be consumed by the processes inside the pool + res = pool.map(func, vals) # noqa + delta = str(datetime.now() - start_time) + search_omero_app.logger.info("Total time=%s" % delta) + # print(res) + except Exception as ex: + print("Error is : %s" % ex) + raise ex + finally: + pool.close() total_process = 0 - - def get_insert_data_to_index(sql_st, resource, data_source, clean_index=True): """ - Query the postgreSQL database server and get metadata (key-value pair) @@ -656,7 +661,6 @@ def insert_plate_data(folder, plate_file): ] handle_file(file_name, es_index, cols) - def save_key_value_buckets( resource_table_=None, data_source=None, clean_index=False, only_values=False ): @@ -699,35 +703,51 @@ def save_key_value_buckets( %s ......." 
 def save_key_value_buckets(
     resource_table_=None, data_source=None, clean_index=False, only_values=False
 ):
@@ -699,35 +703,51 @@ def save_key_value_buckets(
                 %s ......."
                 % resource_table
            )
-            resource_keys = get_keys(resource_table, data_source)
+            from omero_search_engine.api.v1.resources.resource_analyser import (
+                get_resource_keys,
+            )
+            from omero_search_engine.api.v1.resources.utils import (
+                get_all_index_data,
+                get_number_image_inside_container,
+            )
+
+            res = get_resource_keys(resource_table, data_source)
+            resource_keys = [item["key"] for item in res]
             name_results = None
             if resource_table in ["project", "screen"]:
-                sql = "select id, name,description from {resource}".format(
-                    resource=resource_table
-                )
-                conn = search_omero_app.config.database_connectors[data_source]
-                name_result = conn.execute_query(sql)
                 # name_results = [res["name"] for res in name_results]
                 # Determine the number of images for each container
-                for res in name_result:
-                    id = res.get("id")
-                    if resource_table == "project":
-                        sql_n = query_images_in_project_id.substitute(project_id=id)
-                    elif resource_table == "screen":
-                        sql_n = query_images_in_screen_id.substitute(screen_id=id)
-                    no_images_co = conn.execute_query(sql_n)
-                    res["no_images"] = len(no_images_co)
-
-                name_results = [
-                    {
-                        "id": res["id"],
-                        # "data_source": data_source,
-                        "description": res["description"],
-                        "name": res["name"],
-                        "no_images": res["no_images"],
-                    }
-                    for res in name_result
-                ]
+                name_result = get_all_index_data(resource_table, data_source)
+                try:
+                    for res in name_result["results"]["results"]:
+                        id = res.get("id")
+                        no_images_co = get_number_image_inside_container(
+                            resource_table, id, data_source
+                        )
+                        res["no_images"] = no_images_co
+
+                    name_results = [
+                        {
+                            "id": res["id"],
+                            # "data_source": data_source,
+                            "description": res["description"],
+                            "name": res["name"],
+                            "no_images": res["no_images"],
+                        }
+                        for res in name_result["results"]["results"]
+                    ]
+                except Exception:
+                    search_omero_app.logger.info(
+                        "%s error, results: %s" % (resource_table, name_result)
+                    )

             push_keys_cache_index(
                 resource_keys, resource_table, data_source, es_index_2, name_results
@@ -749,6 +769,9 @@ def save_key_value_buckets(
             vals.append(
                 (key, resource_table, es_index, len(resource_keys), data_source)
             )
+
     # determine the number of processes inside the process pool
     no_processors = search_omero_app.config.get("NO_PROCESSES")
     if not no_processors:
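With these changes, container metadata for projects and screens no longer
comes from SQL; everything is resolved against the search index itself, so the
same code path works for CSV-backed sources. A sketch of the resulting flow
(resource and data source names are illustrative):

    keys = [b["key"] for b in get_resource_keys("project", "idr")]
    containers = get_all_index_data("project", "idr")["results"]["results"]
    for container in containers:
        container["no_images"] = get_number_image_inside_container(
            "project", container["id"], "idr"
        )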
diff --git a/omero_search_engine/database/utils.py b/omero_search_engine/database/utils.py
index b9a8862..2e76b84 100644
--- a/omero_search_engine/database/utils.py
+++ b/omero_search_engine/database/utils.py
@@ -38,7 +38,6 @@ def restore_database(source):
     print(search_omero_app.config.get("DATA_SOURCES"))
     print(search_omero_app.config.database_connectors.keys())
     for data_source in search_omero_app.config.get("DATA_SOURCES"):
-        print(data_source["name"])
         if (
             source
             and source.lower() != "all"

diff --git a/omero_search_engine/validation/results_validator.py b/omero_search_engine/validation/results_validator.py
index 70b51ce..34186dc 100644
--- a/omero_search_engine/validation/results_validator.py
+++ b/omero_search_engine/validation/results_validator.py
@@ -1073,7 +1073,7 @@ def get_no_images_sql_containers(data_source, write_report=True):
     for res_name_ in res_name__:
         res_name = res_name_.get("name")
         res_id = res_name_.get("id")
-        print(res_name)
+        # print(res_name)
         message1 = "Checking %s name: %s" % (resource, res_name)
         messages.append(message1)
         search_omero_app.logger.info(message1)