From aa40427403018bb3aac91133dcf6ad6954391081 Mon Sep 17 00:00:00 2001
From: khaledk2
Date: Wed, 14 Aug 2024 17:21:16 +0100
Subject: [PATCH] Support indexing from multiple data sources and fix unit
 tests and queries

---
 configurations/configuration.py               |  32 ++--
 manage.py                                     |  69 ++++---
 omero_search_engine/__init__.py               |   9 +-
 .../api/v1/resources/query_handler.py         |   2 +-
 .../api/v1/resources/resource_analyser.py     | 179 ++++++++++++------
 omero_search_engine/api/v1/resources/utils.py |   1 +
 .../elasticsearch/elasticsearch_templates.py  |  20 ++
 .../elasticsearch/transform_data.py           | 115 +++++++----
 omero_search_engine/database/utils.py         |   7 +
 .../validation/results_validator.py           |  25 ++-
 unit_tests/test_app.py                        |   6 +-
 11 files changed, 316 insertions(+), 149 deletions(-)

diff --git a/configurations/configuration.py b/configurations/configuration.py
index e7e8f5e5..d84edf15 100644
--- a/configurations/configuration.py
+++ b/configurations/configuration.py
@@ -26,21 +26,17 @@ def load_configuration_variables_from_file(config):
     # loading application configuration variables from a file
     print("Injecting config variables from :%s" % app_config.INSTANCE_CONFIG)
-    with open(app_config.INSTANCE_CONFIG, 'rt') as f:
+    with open(app_config.INSTANCE_CONFIG, "rt") as f:
-    #with open(app_config.INSTANCE_CONFIG) as f:
+    # with open(app_config.INSTANCE_CONFIG) as f:
         cofg = yaml.safe_load(f.read())
-
-
-    print (cofg)
     for x, y in cofg.items():
         setattr(config, x, y)
     if hasattr(config, "verify_certs"):
         try:
             verify_certs = json.load(config.verify_certs)
         except Exception as ex:
-            print (x)
-            print("Error %s"%str(ex))
+            print("Error %s" % str(ex))
             verify_certs = False
     else:
         verify_certs = False
@@ -61,22 +57,24 @@ def set_database_connection_variables(config):
     """
     from omero_search_engine.database.database_connector import DatabaseConnector

-    config.database_connectors={}
+    config.database_connectors = {}
     for source in config.DATA_SOURCES:
         if source.get("DATABASE").get("DATABASE_PORT"):
-            address = source.get("DATABASE").get("DATABASE_SERVER_URI") + ":%s" % source.get("DATABASE").get("DATABASE_PORT")
+            address = source.get("DATABASE").get(
+                "DATABASE_SERVER_URI"
+            ) + ":%s" % source.get("DATABASE").get("DATABASE_PORT")
         else:
             address = source.get("DATABASE").get("DATABASE_SERVER_URI")
         DATABASE_URI = "postgresql://%s:%s@%s/%s" % (
             source.get("DATABASE").get("DATABASE_USER"),
             source.get("DATABASE").get("DATABASE_PASSWORD"),
             address,
-            source.get("DATABASE").get("DATABASE_NAME")
+            source.get("DATABASE").get("DATABASE_NAME"),
         )
         database_connector = DatabaseConnector(
             source.get("DATABASE").get("DATABASE_NAME"), DATABASE_URI
         )
-        config.database_connectors[source.get("name")]= database_connector
+        config.database_connectors[source.get("name")] = database_connector
@@ -106,22 +104,24 @@ def update_config_file(updated_configuration, configure_database=False):
         with open(app_config.INSTANCE_CONFIG, "w") as f:
             yaml.dump(configuration, f)

+
 def config_database(configuration, updated_configuration):
     for data_source in configuration.get("DATA_SOURCES"):
-        changed=False
+        changed = False
         Found = False
-        if data_source["name"].lower()==updated_configuration["name"].lower():
+        if data_source["name"].lower() == updated_configuration["name"].lower():
             Found = True
             for k, v in updated_configuration["DATABASE"].items():
-                if data_source["DATABASE"][k] !=v:
-                    data_source["DATABASE"][k]=v
-                    changed=True
+                if data_source["DATABASE"][k] != v:
+                    data_source["DATABASE"][k] = v
+                    changed = True
             break
     if not Found:
         configuration.get("DATA_SOURCES").append(updated_configuration)
         changed = True
     return changed

+
 class app_config(object):
     # the configuration can be loaded from yml file later
     home_folder = os.path.expanduser("~")
diff --git a/manage.py b/manage.py
index 9386e987..fe64a939 100644
--- a/manage.py
+++ b/manage.py
@@ -32,9 +32,7 @@ def show_saved_indices():
     )

     all_indexes = get_all_indexes()
-    for index in all_indexes:
-        print("Index: ==>>>", index)
-    # return (all_indexes)
+    return all_indexes


 @manager.command
@@ -118,6 +116,8 @@ def sql_results_to_panda():
 def restore_postgresql_database():
     from omero_search_engine.database.utils import restore_database

+    print("HI")
+
     restore_database()


@@ -150,31 +150,47 @@ def get_index_data_from_database(resource="all", source="all", backup="True"):
         save_key_value_buckets,
     )
     import json
+
     backup = json.loads(backup.lower())
+    if not source:
+        print("Data source is required to proceed")
+        return
+    elif source == "all":
+        clean_index = True
+
+    else:
+        clean_index = False
+
     for data_source in search_omero_app.config.database_connectors.keys():
-        if source.lower()!="all" and data_source.lower() != source.lower():
+        if source.lower() != "all" and data_source.lower() != source.lower():
             continue
-        #if resource != "all":
+        # if resource != "all":
         #    sql_st = sqls_resources.get(resource)
         #    if not sql_st:
         #        return
-        #    get_insert_data_to_index(sql_st, resource)
-        # else:
+        #    get_insert_data_to_index(sql_st, resource)
+        # else:
        for res, sql_st in sqls_resources.items():
-            if resource.lower()!="all" and resource.lower() != res.lower():
+            if resource.lower() != "all" and resource.lower() != res.lower():
                 continue
-            get_insert_data_to_index(sql_st, res, data_source)
-
+            get_insert_data_to_index(sql_st, res, data_source, clean_index)
         save_key_value_buckets(
-            resource_table_=None, data_source=data_source, re_create_index=True ,only_values=False
+            resource_table_=None,
+            data_source=data_source,
+            clean_index=clean_index,
+            only_values=False,
         )
+
+        if clean_index:
+            clean_index = False
         # validate the indexing
-        test_indexing_search_query(source=data_source, deep_check=False, check_studies=True)
+        # test_indexing_search_query(source=data_source, deep_check=False,
+        # check_studies=True)
     # backup the index data
-    if backup:
-        backup_elasticsearch_data()
+    # if backup:
+    #     backup_elasticsearch_data()


 # set configurations
@@ -186,14 +202,19 @@ def get_index_data_from_database(resource="all", source="all", backup="True"):
 @manager.option("-p", "--password", help="database username password")
 @manager.option("-w", "--working_data_source", help="data source")
 def set_database_configuration(
-    working_data_source=None,url=None, server_port_number=None, database=None, name=None, password=None
+    working_data_source=None,
+    url=None,
+    server_port_number=None,
+    database=None,
+    name=None,
+    password=None,
 ):
-    if not working_data_source:
-        print ("Data source is required to process")
+    if not working_data_source:
+        print("Data source is required to proceed")
     database_attrs = {}
-    databse_config={}
-    databse_config["name"]=working_data_source
-    databse_config["DATABASE"]=database_attrs
+    databse_config = {}
+    databse_config["name"] = working_data_source
+    databse_config["DATABASE"] = database_attrs
     if database:
         database_attrs["DATABASE_NAME"] = database
     if url:
@@ -342,7 +363,10 @@ def cache_key_value_index(resource=None, create_index=None, only_values=None):
     help="data source name, indexing all the data sources is the default",  # noqa
 )
 def test_indexing_search_query(
-    json_file="app_data/test_index_data.json", source=None, deep_check=False, check_studies=False
+    json_file="app_data/test_index_data.json",
+    source=None,
+    deep_check=False,
+    check_studies=False,
 ):
     """
     test the indexing and the searchengine query functions
@@ -360,8 +384,9 @@ def test_indexing_search_query(
         get_omero_stats,
         get_no_images_sql_containers,
     )
+
     if not source:
-        print ("Data source is required to process ")
+        print("Data source is required to proceed")
         return

     validate_queries(json_file, source, deep_check)
diff --git a/omero_search_engine/__init__.py b/omero_search_engine/__init__.py
index 33be0da2..40ebca63 100644
--- a/omero_search_engine/__init__.py
+++ b/omero_search_engine/__init__.py
@@ -22,7 +22,8 @@ import logging
 from elasticsearch import Elasticsearch
 from flasgger import Swagger, LazyString, LazyJSONEncoder
-from omero_search_engine.database.database_connector import DatabaseConnector
+
+# from omero_search_engine.database.database_connector import DatabaseConnector
 from configurations.configuration import (
     configLooader,
     load_configuration_variables_from_file,
@@ -62,8 +63,7 @@ def create_app(config_name="development"):
     app_config = configLooader.get(config_name)
     load_configuration_variables_from_file(app_config)
     set_database_connection_variables(app_config)
-    print ("config.database_connectors::::::", app_config.database_connectors)
-    #atabase_connector = DatabaseConnector(
+    # database_connector = DatabaseConnector(
     #    app_config.DATABASE_NAME, app_config.DATABASE_URI

     search_omero_app.config.from_object(app_config)
@@ -82,7 +82,7 @@ def create_app(config_name="development"):
         scheme="https",
         http_auth=("elastic", ELASTIC_PASSWORD),
     )
-    search_omero_app.config.database_connectors= app_config.database_connectors
+    search_omero_app.config.database_connectors = app_config.database_connectors
     print(search_omero_app.config.database_connectors)
     search_omero_app.config["es_connector"] = es_connector
     log_folder = os.path.join(os.path.expanduser("~"), "logs")
@@ -117,6 +117,7 @@ def create_app(config_name="development"):
         resources_routers_blueprint_v1, url_prefix="/api/v1/resources"
     )

+
     # add it to account for CORS
     @search_omero_app.after_request
     def after_request(response):
diff --git a/omero_search_engine/api/v1/resources/query_handler.py b/omero_search_engine/api/v1/resources/query_handler.py
index f4b4435a..b3023175 100644
--- a/omero_search_engine/api/v1/resources/query_handler.py
+++ b/omero_search_engine/api/v1/resources/query_handler.py
@@ -110,7 +110,7 @@ def adjust_resource(self):
                 )
                 if ac_value and len(ac_value) == 1:
                     self.value = ac_value[0]
-                elif not ac_value or len(ac_value) == 0:
+                elif not ac_value or len(ac_value) == 0:
                     self.value = -1
                 else:
                     self.value = ac_value
diff --git a/omero_search_engine/api/v1/resources/resource_analyser.py b/omero_search_engine/api/v1/resources/resource_analyser.py
index c58ff259..64ab9c21 100644
--- a/omero_search_engine/api/v1/resources/resource_analyser.py
+++ b/omero_search_engine/api/v1/resources/resource_analyser.py
@@ -30,7 +30,7 @@
 import math
 from flask import jsonify, Response

-key_number_search_template = Template(
+key_number_search_template_ = Template(
     """
 {"size":0,"aggs":{"value_search":{"nested":{"path":"key_values"},
 "aggs":{"value_filter":{"filter":{"terms":
@@ -39,6 +39,46 @@
 {"field":"key_values.value.keyvalue","precision_threshold":4000 }}}}}}}}"""
 )

+key_number_search_template = Template(
+    """
+{
+   "size":0,
+   "query":{ "bool" : {"must": {
+        "match":{
+           "data_source.keyvalue":"$data_source"
+        }
+      }
+    }
+   },
+   "aggs":{
+      "value_search":{
+         "nested":{
+            "path":"key_values"
+         },
+         "aggs":{
+            "value_filter":{
+               "filter":{
+                  "terms":{
+                     "key_values.name.keyword":[
+                        "$key"
+                     ]
+                  }
+               },
+               "aggs":{
+                  "required_values":{
+                     "cardinality":{
+                        "field":"key_values.value.keyvalue",
+                        "precision_threshold":4000
+                     }
+                  }
+               }
+            }
+         }
+      }
+   }
+}
+"""
+)

 search_by_value_only = Template(
     """
@@ -87,6 +127,13 @@
 values_for_key_template = Template(
     """
 {"size":0,
+ "query":{ "bool" : {"must": {
+        "match":{
+           "data_source.keyvalue":"$data_source"
+        }
+      }
+    }
+  },
 "aggs":{"name_search":{"nested":{
 "path":"key_values"},
 "aggs":{"value_filter":{"filter":{
 "terms":{"key_values.name.keyword":["$key"]}},"aggs":{"required_values":{
@@ -193,8 +240,8 @@ def search_value_for_resource_(table_, value):
     return {"data": returned_results, "total_number": total_number}


-def get_number_of_buckets(key, res_index):
-    query = key_number_search_template.substitute(key=key)
+def get_number_of_buckets(key, data_source, res_index):
+    query = key_number_search_template.substitute(key=key, data_source=data_source)
     res = search_index_for_value(res_index, query)
     number_of_buckets = (
         res.get("aggregations")
@@ -210,9 +257,9 @@
     return number_of_buckets, number_of_images


-def get_all_values_for_a_key(table_, key):
+def get_all_values_for_a_key(table_, data_source, key):
     res_index = resource_elasticsearchindex.get(table_)
-    query = key_number_search_template.substitute(key=key)
+    query = key_number_search_template.substitute(key=key, data_source=data_source)
     res = search_index_for_value(res_index, query)
     number_of_buckets = (
         res.get("aggregations")
@@ -232,7 +279,9 @@
     total_ret = 0
     while co < total:
         search_omero_app.logger.info("processing: %s / %s" % ((co + 1), total))
-        query = values_for_key_template.substitute(key=key, total=total, cur=co)
+        query = values_for_key_template.substitute(
+            key=key, total=total, cur=co, data_source=data_source
+        )
         res = search_index_for_value(res_index, query)
         results.append(res)
         total_ret += len(
@@ -269,14 +318,16 @@
     }


-def get_values_for_a_key(table_, key):
+def get_values_for_a_key(table_, key, data_source):
     """
     search the index to get the available values for a key and
     get values number for the key
     """
     total_number = 0
     res_index = resource_elasticsearchindex.get(table_)
-    number_of_buckets, number_of_images = get_number_of_buckets(key, res_index)
+    number_of_buckets, number_of_images = get_number_of_buckets(
+        key, data_source, res_index
+    )
     query = key_search_template.substitute(key=key)
     start_time = time.time()
     res = search_index_for_value(res_index, query)
@@ -298,6 +349,7 @@
             returned_results.append(singe_row)
             singe_row["Key"] = key
             singe_row["Value"] = value
+            singe_row["data_source"] = data_source
             singe_row["Number of %ss" % table_] = value_no
     return {
         "data": returned_results,
@@ -364,6 +416,7 @@
             res = hit["_source"]
             row["Key"] = res["Attribute"]
             row["Value"] = res["Value"]
+            row["data_resource"] = res["data_source"]
             resource = res.get("resource")
             row["Number of %ss" % resource] = res.get("items_in_the_bucket")
             total_number += res["items_in_the_bucket"]
@@ -761,6 +814,9 @@ def get_resource_names(resource, name=None, description=False):
         for res in ress:
             returned_results[res] = get_the_results(res, name, description)

+    print("#############################################")
+    print(returned_results)
+    print("#############################################")
     return returned_results
@@ -773,68 +829,82 @@
 def get_the_results(resource, name, description, es_index="key_values_resource_cach"):
     hits = results_["hits"]["hits"]

     if len(hits) > 0:
-        # print (hits[0]["_source"])
-        if name and not description:
-            returned_results = [
-                item
-                for item in hits[0]["_source"]["resourcename"]
-                if item.get("name") and name.lower() in item.get("name").lower()
-            ]
-        elif name and description:
-            returned_results = [
-                item
-                for item in hits[0]["_source"]["resourcename"]
-                if (item.get("name") and name.lower() in item.get("name").lower())
-                or (
-                    item.get("description")
-                    and name.lower() in item.get("description").lower()
-                )
-            ]
-        elif "resourcename" in hits[0]["_source"]:
-            print("==================================")
-            print ("========>>>>",hits[0]["_source"])
-            print ("==================================")
-            returned_results = [item for item in hits[0]["_source"]["resourcename"]]
-        else:
-            return returned_results
+        for hit in hits:
+            if len(hits) > 0:
+                if name and not description:
+                    returned_results[hit["_source"]["data_resource"]] = [
+                        item
+                        for item in hit["_source"]["resourcename"]
+                        if item.get("name") and name.lower() in item.get("name").lower()
+                    ]
+                elif name and description:
+                    returned_results[hit["_source"]["data_resource"]] = [
+                        item
+                        for item in hit["_source"]["resourcename"]
+                        if (
+                            item.get("name")
+                            and name.lower() in item.get("name").lower()
+                        )
+                        or (
+                            item.get("description")
+                            and name.lower() in item.get("description").lower()
+                        )
+                    ]
+                elif "resourcename" in hit["_source"]:
+                    returned_results[hit["_source"]["data_resource"]] = [
+                        item for item in hit["_source"]["resourcename"]
+                    ]
+                else:
+                    return returned_results
     # remove container description from the results,
     # should be added again later after cleaning up the description
-    for item in returned_results:
-        del item["description"]
-
+    for k, item in returned_results.items():
+        del item[0]["description"]
     return returned_results


 def get_container_values_for_key(table_, container_name, csv, key=None):
     returned_results = []
     pr_names = get_resource_names("all")
-    for resourse, names in pr_names.items():
-        act_name = [
-            {"id": name["id"], "name": name["name"]}
-            for name in names
-            if name["name"] and container_name.lower() in name["name"].lower()
-        ]
-        if len(act_name) > 0:
-            for id in act_name:
-                if resourse != table_:
-                    res = process_container_query(
-                        table_, resourse + "_id", id["id"], key, table_
-                    )
-                else:
-                    res = process_container_query(table_, "id", id["id"], key, table_)
-                if len(res) > 0:
-                    returned_results.append(
-                        {"name": id["name"], "type": resourse, "results": res}
-                    )
+    for resourse, names_ in pr_names.items():
+        for data_source, names in names_.items():
+            act_name = [
+                {"id": name["id"], "name": name["name"]}
+                for name in names
+                if name["name"] and container_name.lower() in name["name"].lower()
+            ]
+            if len(act_name) > 0:
+                for id in act_name:
+                    if resourse != table_:
+                        res = process_container_query(
+                            table_, resourse + "_id", id["id"], key, table_
+                        )
+                    else:
+                        res = process_container_query(
+                            table_, "id", id["id"], key, table_
+                        )
+                    if len(res) > 0:
+                        returned_results.append(
+                            {
+                                "name": id["name"],
+                                "type": resourse,
+                                "data_source": data_source,
+                                "results": res,
+                            }
+                        )
     if csv:
         if key:
             containers = [
                 ",".join(["Container", "Type", "Key", "Value", "No of %s" % table_])
             ]
         else:
-            containers = [",".join(["Container", "Type", "Key", "No of %s" % table_])]
+            containers = [
+                ",".join(
+                    ["Container", "Type", "data_source", "Key", "No of %s" % table_]
+                )
+            ]
         for r_results in returned_results:
             reso = r_results.get("name")
             type = r_results.get("type")
@@ -845,6 +915,7 @@
                     [
                         reso,
                         type,
+                        data_source,
                         res.get("key"),
                         res.get("value"),
                         str(res.get("no_%s" % table_)),
diff --git a/omero_search_engine/api/v1/resources/utils.py b/omero_search_engine/api/v1/resources/utils.py
index c5f89266..23c69bd3 100644
--- a/omero_search_engine/api/v1/resources/utils.py
+++ b/omero_search_engine/api/v1/resources/utils.py
@@ -1026,6 +1026,7 @@ def search_index_using_search_after(
         bookmark_ = get_bookmark(pagination_dict)
     if not bookmark_:
         result = es.search(index=e_index, body=query)
+        # print (result)
         if len(result["hits"]["hits"]) == 0:
             search_omero_app.logger.info("No result is found")
             return returned_results
diff --git a/omero_search_engine/cache_functions/elasticsearch/elasticsearch_templates.py b/omero_search_engine/cache_functions/elasticsearch/elasticsearch_templates.py
index 0fa76da9..bdd01784 100644
--- a/omero_search_engine/cache_functions/elasticsearch/elasticsearch_templates.py
+++ b/omero_search_engine/cache_functions/elasticsearch/elasticsearch_templates.py
@@ -37,6 +37,10 @@
             "properties": {
                 "doc_type": {"type": "keyword"},
                 "id": {"type": "long"},
+                "data_source": {
+                    "type": "text",
+                    "fields": {"keyvalue": {"type": "keyword"}},
+                },
                 "name": {
                     "type": "text",
                     "fields": {"keyvalue": {"type": "keyword"}},
@@ -108,6 +112,10 @@
                 "plate_id": {"type": "long"},
                 "well_id": {"type": "long"},
                 "wellsample_id": {"type": "long"},
+                "data_source": {
+                    "type": "text",
+                    "fields": {"keyvalue": {"type": "keyword"}},
+                },
                 "name": {
                     "type": "text",
                     "fields": {"keyvalue": {"type": "keyword"}},
@@ -177,6 +185,10 @@
                     "type": "text",
                     "fields": {"keyresource": {"type": "keyword"}},
                 },  # noqa
+                "data_source": {
+                    "type": "text",
+                    "fields": {"keyvalue": {"type": "keyword"}},
+                },
                 "name": {"type": "text", "fields": {"keyname": {"type": "keyword"}}},
             },
         }
@@ -197,6 +209,10 @@
                 "id": {
                     "type": "keyword",
                 },
+                "data_source": {
+                    "type": "text",
+                    "fields": {"keyvalue": {"type": "keyword"}},
+                },
                 "resource": {
                     "type": "text",
                     "fields": {
@@ -250,6 +266,10 @@
                     "type": "text",
                     "fields": {"keyresource": {"type": "keyword"}},
                 },
+                "data_source": {
+                    "type": "text",
+                    "fields": {"keyvalue": {"type": "keyword"}},
+                },
                 "name": {
                     "type": "text",
                     "fields": {"keyname": {"type": "keyword"}},
diff --git a/omero_search_engine/cache_functions/elasticsearch/transform_data.py b/omero_search_engine/cache_functions/elasticsearch/transform_data.py
index 1d7cf060..4f2ed04c 100644
--- a/omero_search_engine/cache_functions/elasticsearch/transform_data.py
+++ b/omero_search_engine/cache_functions/elasticsearch/transform_data.py
@@ -70,6 +70,7 @@ def create_index(es_index, template):
                 es_index=es_index, error=str(ex)
             )
         )
+        raise ex
         return False
     return True
@@ -188,9 +189,10 @@
     return False


-def prepare_images_data(data, doc_type):
+def prepare_images_data(data, data_source, doc_type):
     data_record = [
         "id",
+        "data_source",
         "owner_id",
         "experiment",
         "group_id",
@@ -230,7 +232,10 @@
             for rcd in data_record:
                 if rcd in ["mapvalue_name", "mapvalue_value"]:
                     continue
-                row_to_insert[rcd] = row[rcd]
+                elif rcd == "data_source":
+                    row_to_insert[rcd] = data_source
+                else:
+                    row_to_insert[rcd] = row[rcd]
             row_to_insert["key_values"] = []
             data_to_be_inserted[row["id"]] = row_to_insert
@@ -246,9 +251,10 @@
     return data_to_be_inserted


-def prepare_data(data, doc_type):
+def prepare_data(data, data_source, doc_type):
     data_record = [
         "id",
+        "data_source",
         "owner_id",
         "group_id",
         "name",
@@ -277,7 +283,10 @@
             for rcd in data_record:
                 if rcd in ["mapvalue_name", "mapvalue_value"]:
                     continue
-                row_to_insert[rcd] = row.get(rcd)
+                elif rcd == "data_source":
+                    row_to_insert[rcd] = data_source
+                else:
+                    row_to_insert[rcd] = row.get(rcd)
             row_to_insert["key_values"] = []
             data_to_be_inserted[row["id"]] = row_to_insert
             key_value = row_to_insert["key_values"]
@@ -292,7 +301,7 @@
     return data_to_be_inserted


-def handle_file(file_name, es_index, cols, is_image, from_json):
+def handle_file(file_name, es_index, cols, is_image, data_source, from_json):
     co = 0
     search_omero_app.logger.info("Reading the csv file")
     if not from_json:
@@ -301,9 +310,9 @@
         df.columns = cols
         search_omero_app.logger.info("Prepare the data...")
         if not is_image:
-            data_to_be_inserted = prepare_data(df, es_index)
+            data_to_be_inserted = prepare_data(df, data_source, es_index)
         else:
-            data_to_be_inserted = prepare_images_data(df, es_index)
+            data_to_be_inserted = prepare_images_data(df, data_source, es_index)
         # print (data_to_be_inserted)
         search_omero_app.logger.info(len(data_to_be_inserted))
         with open(file_name + ".json", "w") as outfile:
@@ -344,7 +353,7 @@
     return f


-def insert_resource_data(folder, resource, from_json):
+def insert_resource_data(folder, resource, data_source, from_json):
     search_omero_app.logger.info(
         "Adding data to\
         {} using {}".format(
@@ -364,6 +373,7 @@
         is_image = True
         cols = [
             "id",
+            "data_source",
             "owner_id",
             "experiment",
             "group_id",
@@ -388,6 +398,7 @@
         if resource == "well":
             cols = [
                 "id",
+                "data_source",
                 "owner_id",
                 "group_id",
                 "mapvalue_name",
@@ -397,6 +408,7 @@
         else:
             cols = [
                 "id",
+                "data_source",
                 "owner_id",
                 "group_id",
                 "name",
@@ -422,20 +434,21 @@
         n = len(files_list)
         search_omero_app.logger.info("%s==%s == %s" % (f_con, fil, n))
         file_name = os.path.join(folder, fil)
-        handle_file(file_name, es_index, cols, is_image, from_json)
+        handle_file(file_name, es_index, cols, is_image, data_source, from_json)
         search_omero_app.logger.info("File: %s has been processed" % fil)
         try:
             with open(file_name + ".done", "w") as outfile:
                 json.dump(f_con, outfile)
-        except Exception:
+        except Exception as ex:
             search_omero_app.logger.info("Error .... writing Done file ...")
+            raise ex
         f_con += 1


 total_process = 0


-def get_insert_data_to_index(sql_st, resource, data_source):
+def get_insert_data_to_index(sql_st, resource, data_source, clean_index=True):
     """
     - Query the postgreSQL database server and get metadata (key-value pair)
     - Process the results data
@@ -446,11 +459,12 @@
     """
     from datetime import datetime

-    delete_index(resource)
-    create_omero_indexes(resource)
+    if clean_index:
+        delete_index(resource)
+        create_omero_indexes(resource)
     sql_ = "select max (id) from %s" % resource
     res2 = search_omero_app.config.database_connectors[data_source].execute_query(sql_)
-    #res2 = search_omero_app.config["database_connector"].execute_query(sql_)
+    # res2 = search_omero_app.config["database_connector"].execute_query(sql_)
     max_id = res2[0]["max"]
     if not max_id:
         return
@@ -467,7 +481,7 @@
     global total_process
     total_process = len(vals)
     # Determine the number of processes inside the multiprocessing pool,
-    # i.e the number of allowed processors to run at the same time
+    # i.e. the number of allowed processors to run at the same time
     # It depends on the number of the processors that the hosting machine has
     no_processors = search_omero_app.config.get("NO_PROCESSES")
     if not no_processors:
@@ -487,11 +501,14 @@
         counter_val = manager.Value("i", 0)
         func = partial(processor_work, lock, counter_val)
         # map the data which will be consumed by the processes inside the pool
-        res = pool.map(func, vals)
+        res = pool.map(func, vals)  # noqa
         search_omero_app.logger.info(cur_max_id)
         delta = str(datetime.now() - start_time)
         search_omero_app.logger.info("Total time=%s" % delta)
-        #print(res)
+        # print(res)
+    except Exception as ex:
+        print("Error is : %s" % ex)
+        raise ex
     finally:
         pool.close()
@@ -513,6 +530,9 @@
     try:
         lock.acquire()
         global_counter.value += 1
+    except Exception as ex:
+        print("Error is %s" % ex)
+        raise ex
     finally:
         lock.release()
     whereclause = " where %s.id < %s and %s.id >= %s" % (
@@ -529,18 +549,18 @@
     conn = search_omero_app.config.database_connectors[data_source]
     results = conn.execute_query(mod_sql)
     search_omero_app.logger.info("Processing the results...")
-    process_results(results, resource, lock)
+    process_results(results, resource, data_source, lock)
     average_time = (datetime.now() - st) / 2
     search_omero_app.logger.info("Done")
     search_omero_app.logger.info("elapsed time:%s" % average_time)


-def process_results(results, resource, lock=None):
+def process_results(results, resource, data_source, lock=None):
     df = pd.DataFrame(results).replace({np.nan: None})
-    insert_resource_data_from_df(df, resource, lock)
+    insert_resource_data_from_df(df, resource, data_source, lock)


-def insert_resource_data_from_df(df, resource, lock=None):
+def insert_resource_data_from_df(df, resource, data_source, lock=None):
     if resource == "image":
         is_image = True
     else:
@@ -548,9 +568,9 @@
     es_index = resource_elasticsearchindex.get(resource)
     search_omero_app.logger.info("Prepare the data...")
     if not is_image:
-        data_to_be_inserted = prepare_data(df, es_index)
+        data_to_be_inserted = prepare_data(df, data_source, es_index)
     else:
-        data_to_be_inserted = prepare_images_data(df, es_index)
+        data_to_be_inserted = prepare_images_data(df, data_source, es_index)

     # print (data_to_be_inserted)
     search_omero_app.logger.info(len(data_to_be_inserted))
@@ -636,7 +656,7 @@ def insert_plate_data(folder, plate_file):


 def save_key_value_buckets(
-    resource_table_=None, data_source=None, re_create_index=False, only_values=False
+    resource_table_=None, data_source=None, clean_index=False, only_values=False
 ):
     """
     Query the database and get all available keys and values for
@@ -645,12 +665,12 @@
     It will use multiprocessing pool to use parallel processing

     """
-    if data_source==None:
-        return
+    if data_source is None:
+        return "No data source is provided"
     es_index = "key_value_buckets_information"
     es_index_2 = "key_values_resource_cach"

-    if re_create_index:
+    if clean_index:
         if not only_values:
             search_omero_app.logger.info(
                 "Try to delete if exist: %s " % delete_es_index(es_index)
@@ -699,6 +719,7 @@
             name_results = [
                 {
                     "id": res["id"],
+                    # "data_source": data_source,
                     "description": res["description"],
                     "name": res["name"],
                     "no_images": res["no_images"],
@@ -706,7 +727,10 @@
                 for res in name_result
             ]

-        push_keys_cache_index(resource_keys, resource_table, es_index_2, name_results)
+        push_keys_cache_index(
+            resource_keys, resource_table, data_source, es_index_2, name_results
+        )
+        print(type(resource_keys), type(resource_table), es_index_2, type(name_results))
         if only_values:
             continue
         search_omero_app.logger.info(
@@ -718,7 +742,9 @@
         # prepare the data which will be consumed by the processes
         # inside the multiprocessing Pool
         for key in resource_keys:
-            vals.append((key, resource_table, es_index, len(resource_keys)))
+            vals.append(
+                (key, resource_table, es_index, len(resource_keys), data_source)
+            )
         # determine the number of processes inside the process pool
         no_processors = search_omero_app.config.get("NO_PROCESSES")
         if not no_processors:
@@ -747,6 +773,7 @@
     resource_table = vals[1]
     es_index = vals[2]
     total = vals[3]
+    data_source = vals[4]
     wrong_keys = {}
     try:
         lock.acquire()
@@ -760,7 +787,10 @@
             % (global_counter.value, total)
         )
         search_omero_app.logger.info("Checking {key}".format(key=key))
-        data_to_be_pushed = get_buckets(key, resource_table, es_index, lock)
+        data_to_be_pushed = get_buckets(
+            key, data_source, resource_table, es_index, lock
+        )
+
         actions = []
         search_omero_app.logger.info(
             "data_to_be_pushed:\
@@ -776,6 +806,7 @@
                 search_omero_app.logger.info(helpers.bulk(es, actions))
             except Exception as e:
                 search_omero_app.logger.info("Error: %s" % str(e))
+                raise e
                 # raise e
             finally:
                 lock.release()
@@ -785,6 +816,7 @@
             Error:%s "
             % (global_counter.value, str(e))
         )
+        raise e
         if wrong_keys.get(resource_table):
             wrong_keys[resource_table] = wrong_keys[resource_table].append(key)
         else:
@@ -798,17 +830,22 @@
     annotation_mapvalue.annotation_id".format(
         res_table=res_table
     )
-    results = search_omero_app.config.database_connectors[data_source].execute_query(sql)
-    #results = search_omero_app.config["database_connector"].execute_query(sql)
+    results = search_omero_app.config.database_connectors[data_source].execute_query(
+        sql
+    )
+    # results = search_omero_app.config["database_connector"].execute_query(sql)
     results = [res["name"] for res in results]
     return results


-def push_keys_cache_index(results, resource, es_index, resourcename=None):
+def push_keys_cache_index(
+    results, resource, data_resource, es_index, resourcename=None
+):
     row = {}
     row["name"] = results
     row["doc_type"] = es_index
     row["resource"] = resource
+    row["data_resource"] = data_resource
     if resourcename:
         row["resourcename"] = resourcename
@@ -819,20 +856,23 @@
     search_omero_app.logger.info(helpers.bulk(es, actions))


-def get_buckets(key, resourcse, es_index, lock=None):
+def get_buckets(key, data_source, resourcse, es_index, lock=None):
     try:
         lock.acquire()
-        res = get_all_values_for_a_key(resourcse, key)
+        res = get_all_values_for_a_key(resourcse, data_source, key)
+    except Exception as ex:
+        print("Error %s" % ex)
+        raise ex
     finally:
         lock.release()
     search_omero_app.logger.info(
         "number of bucket: %s" % res.get("total_number_of_buckets")
     )
-    data_to_be_pushed = prepare_bucket_index_data(res, resourcse, es_index)
+    data_to_be_pushed = prepare_bucket_index_data(res, resourcse, data_source, es_index)
     return data_to_be_pushed


-def prepare_bucket_index_data(results, res_table, es_index):
+def prepare_bucket_index_data(results, res_table, data_source, es_index):
     data_to_be_inserted = []
     for result in results.get("data"):
         row = {}
@@ -844,6 +884,7 @@
         row["Value"] = result["Value"]
         row["items_in_the_bucket"] = result["Number of %ss" % res_table]
         row["total_buckets"] = results["total_number_of_buckets"]
+        row["data_source"] = data_source
         row["total_items_in_saved_buckets"] = results["total_number"]
         row["total_items"] = results["total_number_of_%s" % res_table]
     return data_to_be_inserted
diff --git a/omero_search_engine/database/utils.py b/omero_search_engine/database/utils.py
index 8f6e58da..ce6f2199 100644
--- a/omero_search_engine/database/utils.py
+++ b/omero_search_engine/database/utils.py
@@ -28,10 +28,16 @@ def restore_database():
     """
     from omero_search_engine import search_omero_app

+    print("1111111111")
     main_dir = os.path.abspath(os.path.dirname(__file__))
+    print(main_dir)
     mm = main_dir.replace("omero_search_engine/database", "")
+    print(mm)
     sys.path.append(mm)
+    print("3333333333333")
+
     dat_file_name = os.path.join(mm, "app_data/omero.pgdump")
+    print(dat_file_name)
     restore_command = "psql --username %s --host %s --port %s -d %s -f %s" % (
         search_omero_app.config.get("DATABASE_USER"),
         search_omero_app.config.get("DATABASE_SERVER_URI"),
@@ -39,6 +45,7 @@ def restore_database():
         search_omero_app.config.get("DATABASE_NAME"),
         dat_file_name,
     )
+    print(restore_command)
     try:
         proc = subprocess.Popen(
             restore_command,
diff --git a/omero_search_engine/validation/results_validator.py b/omero_search_engine/validation/results_validator.py
index af6b3f71..9feaef77 100644
--- a/omero_search_engine/validation/results_validator.py
+++ b/omero_search_engine/validation/results_validator.py
@@ -75,7 +75,7 @@ class Validator(object):

     def __init__(self, data_source, deep_check=False):
         self.deep_check = deep_check
-        self.data_source=data_source
+        self.data_source = data_source
         self.identical = True

     def set_simple_query(self, resource, name, value, type="keyvalue"):
@@ -139,7 +139,7 @@ def get_in_sql(self, clauses, name="in_clause"):
                 names=names, values=values, operator="not in"
             )
         # sql = query_methods[name].substitute(names=names, values=values)
-        #conn = search_omero_app.config["database_connector"]
+        # conn = search_omero_app.config["database_connector"]
         conn = search_omero_app.config.database_connectors[self.data_source]
         postgres_results = conn.execute_query(sql)
         results = [item["id"] for item in postgres_results]
@@ -160,7 +160,7 @@ def get_or_sql(self, clauses, name="query_image_or"):
             values = "'%s'" % claus[1].lower()
         # sql = query_methods[name].substitute(names=names, values=values)
         sql = query_methods[name].substitute(names=names, values=values, operator="in")
-        #conn = search_omero_app.config["database_connector"]
+        # conn = search_omero_app.config["database_connector"]
         conn = search_omero_app.config.database_connectors[self.data_source]
         postgres_results = conn.execute_query(sql)
         results = [item["id"] for item in postgres_results]
@@ -175,7 +175,7 @@ def get_sql_value_only(self, clauses):
         else:
             operator = "and"
         conn = search_omero_app.config.database_connectors[self.data_source]
-        #conn = search_omero_app.config["database_connector"]
+        # conn = search_omero_app.config["database_connector"]
         all_res = []
         for val in clauses:
             sql = query_methods["image_value_only"].substitute(value=val)
@@ -201,7 +201,7 @@ def get_and_sql(self, clauses):
                 value=claus[1].lower(),
             )
             conn = search_omero_app.config.database_connectors[self.data_source]
-            #conn = search_omero_app.config["database_connector"]
+            # conn = search_omero_app.config["database_connector"]
             postgres_results = conn.execute_query(sql)
             res = [item["id"] for item in postgres_results]
             search_omero_app.logger.info("results for 'and' received %s" % len(res))
@@ -223,12 +223,12 @@ def get_results_db(self, operator=None):
                 name=self.name
             )
             conn = search_omero_app.config.database_connectors[self.data_source]
-            #conn = search_omero_app.config["database_connector"]
+            # conn = search_omero_app.config["database_connector"]
             self.postgres_results = conn.execute_query(sql)
         elif self.value:
             sql = query_methods["search_any_value"].substitute(val_part=self.value)
             conn = search_omero_app.config.database_connectors[self.data_source]
-            #conn = search_omero_app.config["database_connector"]
+            # conn = search_omero_app.config["database_connector"]
             self.postgres_results = conn.execute_query(sql)
             return
         if self.type == "in_clause":
@@ -280,7 +280,7 @@ def get_results_db(self, operator=None):
             sql = sql + " and %s.group_id=%s" % (self.resource, self.group_id)
         print(sql)
         # search_omero_app.logger.info ("sql: %s"%sql)
-        #conn = search_omero_app.config["database_connector"]
+        # conn = search_omero_app.config["database_connector"]
         conn = search_omero_app.config.database_connectors[self.data_source]
         postgres_results = conn.execute_query(sql)
         self.postgres_results = [item["id"] for item in postgres_results]
@@ -499,7 +499,7 @@ def get_containers_test_cases(self):
             projects_count_sql = query_methods["projects_count"].substitute(
                 key=self.name, value=self.value
             )
-            #conn = search_omero_app.config["database_connector"]
+            # conn = search_omero_app.config["database_connector"]
             conn = search_omero_app.config.database_connectors[self.data_source]
             screens_results = conn.execute_query(screens_count_sql)
             projects_results = conn.execute_query(projects_count_sql)
@@ -727,7 +727,7 @@ def validate_queries(json_file, data_source, deep_check):
         search_omero_app.logger.info("Total time=%s" % str(end_in - start_time))
         # test the same but change the operator to not in
         search_omero_app.logger.info("Total time=%s" % str(end_in - start_time))
-        validator_not_in = Validator(data_source,deep_check)
+        validator_not_in = Validator(data_source, deep_check)
         validator_not_in.set_in_query(case, resource, type="not_in_clause")
         res = validator_not_in.compare_results()
         messages.append(
@@ -930,7 +930,7 @@ def check_number_images_sql_containers_using_ids(data_source):

     dd = True

-    #conn = search_omero_app.config["database_connector"]
+    # conn = search_omero_app.config["database_connector"]
     conn = search_omero_app.config.database_connectors[data_source]
     all_names = get_resource_names("all")
     for resource in all_names:
@@ -1020,10 +1020,9 @@ def get_no_images_sql_containers(data_source, write_report=True):
     )
     from omero_search_engine.api.v1.resources.utils import adjust_query_for_container

-    #conn = search_omero_app.config["database_connector"]
+    # conn = search_omero_app.config["database_connector"]
     conn = search_omero_app.config.database_connectors[data_source]
-
     all_names = get_resource_names("all")
     messages = []
     for resource in all_names:
diff --git a/unit_tests/test_app.py b/unit_tests/test_app.py
index 6090436b..4ad9994b 100644
--- a/unit_tests/test_app.py
+++ b/unit_tests/test_app.py
@@ -69,7 +69,7 @@
 # deep_check should be a configuration item
 deep_check = True

-#for data_source in search_omero_app.config.database_connectors.keys():
+# for data_source in search_omero_app.config.database_connectors.keys():


 class BasicTestCase(unittest.TestCase):
@@ -110,7 +110,9 @@ def test_query_database(self):
         test connection with postgresql database
         """
         for data_source in search_omero_app.config.database_connectors.keys():
-            res = search_omero_app.config.database_connectors[data_source].execute_query(sql)
+            res = search_omero_app.config.database_connectors[
+                data_source
+            ].execute_query(sql)
             self.assertIsNotNone(res)
             self.assertEqual(res[0]["current_database"], "omero")
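A note on the configuration shape the connector wiring relies on: set_database_connection_variables() in configurations/configuration.py expects every DATA_SOURCES entry to carry a "name" and a "DATABASE" block, and it keys one DatabaseConnector per source name. A minimal sketch of that mapping; the source name, host and credentials below are placeholders, not values from this patch:

# Sketch: how one DATA_SOURCES entry maps to a connector key and a
# postgresql URI, mirroring set_database_connection_variables().
# All values are illustrative.
source = {
    "name": "omero1",
    "DATABASE": {
        "DATABASE_SERVER_URI": "localhost",
        "DATABASE_PORT": "5432",
        "DATABASE_NAME": "omero",
        "DATABASE_USER": "postgres",
        "DATABASE_PASSWORD": "secret",
    },
}

db = source["DATABASE"]
# The port is appended only when one is configured, as in the patch.
address = db["DATABASE_SERVER_URI"]
if db.get("DATABASE_PORT"):
    address += ":%s" % db["DATABASE_PORT"]
uri = "postgresql://%s:%s@%s/%s" % (
    db["DATABASE_USER"],
    db["DATABASE_PASSWORD"],
    address,
    db["DATABASE_NAME"],
)
# Connectors end up keyed by the source name ("omero1" here), which is
# how manage.py later selects a single source via --source.
print(uri)  # postgresql://postgres:secret@localhost:5432/omero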
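All five index templates in elasticsearch_templates.py gain the same field, so every indexed document records where it came from. The added mapping, copied from the patch, pairs full-text search on data_source with an exact-match keyword subfield:

# Added to the "properties" of each index template: free-text matching
# on data_source plus a "keyvalue" keyword subfield for exact filtering.
data_source_field = {
    "data_source": {
        "type": "text",
        "fields": {"keyvalue": {"type": "keyword"}},
    }
}
# Term-level clauses then target the subfield, e.g.
# {"match": {"data_source.keyvalue": "omero1"}} as in the new templates.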
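The reworked key_number_search_template scopes the bucket count to one source by wrapping the nested cardinality aggregation in a bool/must match on that subfield. A condensed, runnable version of the substitution step; the key and source names are invented:

import json
from string import Template

# Reduced form of key_number_search_template: a match on the new
# data_source.keyvalue field plus a cardinality aggregation over the
# nested key_values path, as in the patch.
template = Template(
    '{"size":0,'
    '"query":{"bool":{"must":{"match":{"data_source.keyvalue":"$data_source"}}}},'
    '"aggs":{"value_search":{"nested":{"path":"key_values"},'
    '"aggs":{"value_filter":{"filter":{"terms":{"key_values.name.keyword":["$key"]}},'
    '"aggs":{"required_values":{"cardinality":'
    '{"field":"key_values.value.keyvalue","precision_threshold":4000}}}}}}}}'
)

query = json.loads(template.substitute(key="Gene Symbol", data_source="omero1"))
# The query clause now restricts the count to a single source's documents.
print(json.dumps(query["query"], indent=2))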
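On the indexing side, prepare_data() and prepare_images_data() now receive the data_source and stamp it into every document before the key-value pairs are grouped. A simplified sketch of that per-row transform, with the column set reduced and the sample rows invented:

# Simplified transform in the spirit of prepare_data(): group metadata
# rows by id, tag each document with its data_source, and collect the
# mapvalue pairs under key_values.
rows = [
    {"id": 1, "owner_id": 2, "group_id": 3, "name": "img.tiff",
     "mapvalue_name": "Organism", "mapvalue_value": "Homo sapiens"},
    {"id": 1, "owner_id": 2, "group_id": 3, "name": "img.tiff",
     "mapvalue_name": "Gene Symbol", "mapvalue_value": "TP53"},
]

def prepare(rows, data_source):
    docs = {}
    for row in rows:
        doc = docs.setdefault(row["id"], {
            "id": row["id"],
            "data_source": data_source,  # the new per-source tag
            "owner_id": row["owner_id"],
            "group_id": row["group_id"],
            "name": row["name"],
            "key_values": [],
        })
        doc["key_values"].append({
            "name": row["mapvalue_name"],
            "value": row["mapvalue_value"],
        })
    return docs

print(prepare(rows, "omero1")[1]["data_source"])  # omero1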
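The clean_index flag threaded through get_index_data_from_database() and get_insert_data_to_index() implements a simple rotation: when indexing all sources, the Elasticsearch indices are dropped and recreated once, before the first source, and every following source is appended to the same indices. Reduced to its core, with an invented source list:

# Reduced control flow of the clean_index flag: recreate the indices
# once, then index the remaining sources additively.
def index_all(sources):
    clean_index = True  # "--source all" starts from a clean index
    for data_source in sources:
        if clean_index:
            print("recreating indices before indexing %s" % data_source)
        print("indexing %s" % data_source)
        # after the first pass the indices must be preserved so that
        # documents from the other sources accumulate
        if clean_index:
            clean_index = False

index_all(["omero1", "omero2"])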
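Because get_the_results() now keys its hits by the source field, get_resource_names() returns one container list per data source, and get_container_values_for_key() iterates both levels. An illustrative post-patch shape, with the source and container names invented:

# Illustrative shape of get_resource_names("all") after this patch:
# resource -> data source -> containers.
names = {
    "project": {
        "omero1": [{"id": 101, "name": "idr0001-projectA"}],
        "omero2": [{"id": 7, "name": "local-projectB"}],
    },
    "screen": {
        "omero1": [{"id": 51, "name": "idr0002-screenA"}],
    },
}

# Consumers walk both levels, as get_container_values_for_key() does:
for resource, per_source in names.items():
    for data_source, containers in per_source.items():
        for c in containers:
            print(resource, data_source, c["name"])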