Commit 48ba2f3
The required modifications to enable multi-data-source search
khaledk2 committed Sep 27, 2024
1 parent 2e083ad commit 48ba2f3
Showing 26 changed files with 1,394 additions and 620 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/main.yml
@@ -7,10 +7,10 @@ on:
     branches: [ main ]
 jobs:
   build:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-20.04
     services:
       postgres:
-        image: postgres:11
+        image: postgres:16
         env:
           POSTGRES_USER: postgress
           POSTGRES_PASSWORD: passwprd
@@ -45,7 +45,8 @@ jobs:
           python -m pip install --upgrade pip
           pip install -r requirements.txt;
           # Configure database url
-          python manage.py set_database_configuration -u localhost -s ${{ job.services.postgres.ports[5432] }} -n postgress -p passwprd
+          # python manage.py set_database_configuration -u localhost -s ${{ job.services.postgres.ports[5432] }} -n postgress -p passwprd
+          python manage.py set_database_configuration -u localhost -s ${{ job.services.postgres.ports[5432] }} -n postgress -p passwprd -w omero1 -d omero -b omero.pgdump
           # configure elasticsearch
           python manage.py set_elasticsearch_configuration -e localhost:${{ job.services.elasticsearch.ports[9200] }}
           # download and extract the database backup file
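The extended set_database_configuration call registers the CI database as a named data source: -w sets the source name (omero1), -d the database name, and -b the backup file that restore_postgresql_database will later load. Per the manage.py changes below, this writes a DATA_SOURCES entry into the instance configuration instead of the old flat DATABASE_* keys.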
19 changes: 12 additions & 7 deletions configurations/app_config.yml
@@ -1,17 +1,22 @@
-DATABASE_SERVER_URI : "127.0.0.1"
-DATABASE_USER : "database_user"
-DATABASE_PORT: 5432
-DATABASE_PASSWORD : "password"
-DATABASE_NAME : "omero"
 CACHE_FOLDER : "path/to/folder/app_data"
 SECRET_KEY : "fsdasdh3424vvcsd467fgh"
 ASYNCHRONOUS_SEARCH : True
 ELASTICSEARCH_URL : "https://localhost:9200"
 IDR_TEST_FILE_URL : "https://raw.githubusercontent.com/IDR/idr.openmicroscopy.org/master/_data/studies.tsv"
 PAGE_SIZE : 1000
-CACHE_ROWS : 10000
-MAX_RETUNED_ITEMS : 1700000
+CACHE_ROWS : 1000
+MAX_RETUNED_ITEMS : 1000
 ELASTICSEARCH_BACKUP_FOLDER: "path/to/elasticsearch/backup/folder"
 verify_certs: False
 ELASTIC_PASSWORD: elasticsearch_user_password
 BASE_FOLDER: /etc/searchengine/
+#NO_PROCESSES: 1
+DATA_SOURCES:
+  - name: omero1
+    DATABASE:
+      DATABASE_NAME: omero
+      DATABASE_USER: database_user
+      DATABASE_PASSWORD: password
+      DATABASE_SERVER_URI: 127.0.0.1
+      DATABASE_PORT: 5432
+      DATABASE_BACKUP_FILE: database.sql
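For orientation, a minimal sketch (assuming PyYAML and the sample layout above; the path is for illustration) of reading and iterating such a multi-source configuration:

import yaml

# Read the sample configuration shown above (path assumed for illustration).
with open("configurations/app_config.yml", "rt") as f:
    cfg = yaml.safe_load(f)

# Each DATA_SOURCES entry bundles one named PostgreSQL database.
for source in cfg.get("DATA_SOURCES", []):
    db = source["DATABASE"]
    print(source["name"], db["DATABASE_SERVER_URI"], db["DATABASE_NAME"])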
92 changes: 61 additions & 31 deletions configurations/configuration.py
@@ -26,15 +26,17 @@
 def load_configuration_variables_from_file(config):
     # loading application configuration variables from a file
     print("Injecting config variables from: %s" % app_config.INSTANCE_CONFIG)
-    with open(app_config.INSTANCE_CONFIG) as f:
-        cofg = yaml.load(f)
+    with open(app_config.INSTANCE_CONFIG, "rt") as f:
+        cofg = yaml.safe_load(f.read())
     for x, y in cofg.items():
         setattr(config, x, y)
     if hasattr(config, "verify_certs"):
         try:
             verify_certs = json.load(config.verify_certs)
         except Exception as ex:
-            print(str(ex))
+            print("Error %s" % str(ex))
             verify_certs = False
     else:
         verify_certs = False
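Switching from yaml.load to yaml.safe_load is more than style: safe_load only constructs plain Python objects, so a crafted configuration file cannot trigger arbitrary object instantiation, and calling yaml.load without an explicit Loader has been deprecated since PyYAML 5.1.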
@@ -53,43 +55,71 @@ def set_database_connection_variables(config):
     :param database: database name
     :return:
     """
-    if hasattr(config, "DATABASE_PORT"):
-        address = config.DATABASE_SERVER_URI + ":%s" % app_config.DATABASE_PORT
-    else:
-        address = config.DATABASE_SERVER_URI
-    app_config.database_connector = ""
-    app_config.DATABASE_URI = "postgresql://%s:%s@%s/%s" % (
-        config.DATABASE_USER,
-        config.DATABASE_PASSWORD,
-        address,
-        config.DATABASE_NAME,
-    )
-
-
-def update_config_file(updated_configuration):
+    from omero_search_engine.database.database_connector import DatabaseConnector
+
+    config.database_connectors = {}
+    for source in config.DATA_SOURCES:
+        if source.get("DATABASE").get("DATABASE_PORT"):
+            address = source.get("DATABASE").get(
+                "DATABASE_SERVER_URI"
+            ) + ":%s" % source.get("DATABASE").get("DATABASE_PORT")
+        else:
+            address = source.get("DATABASE").get("DATABASE_SERVER_URI")
+        DATABASE_URI = "postgresql://%s:%s@%s/%s" % (
+            source.get("DATABASE").get("DATABASE_USER"),
+            source.get("DATABASE").get("DATABASE_PASSWORD"),
+            address,
+            source.get("DATABASE").get("DATABASE_NAME"),
+        )
+        database_connector = DatabaseConnector(
+            source.get("DATABASE").get("DATABASE_NAME"), DATABASE_URI
+        )
+        config.database_connectors[source.get("name")] = database_connector
+
+
+def update_config_file(updated_configuration, configure_database=False):
     is_changed = False
     with open(app_config.INSTANCE_CONFIG) as f:
         configuration = yaml.load(f)
-    found = []
-    for key, value in updated_configuration.items():
-        if key in configuration:
-            if configuration[key] != value:
-                configuration[key] = value
-                is_changed = True
-                print("%s is updated, new value is %s" % (key, value))
-        else:
-            found.append(key)
-    if len(found) != len(updated_configuration):
+    if not configure_database:
+        found = []
         for key, value in updated_configuration.items():
-            if key not in found:
-                configuration[key] = value
-                print("%s value is added with value %s" % (key, value))
-                is_changed = True
+            if key in configuration:
+                if configuration[key] != value:
+                    configuration[key] = value
+                    is_changed = True
+                    print("%s is updated, new value is %s" % (key, value))
+            else:
+                found.append(key)
+        if len(found) != len(updated_configuration):
+            for key, value in updated_configuration.items():
+                if key not in found:
+                    configuration[key] = value
+                    print("%s is added with value %s" % (key, value))
+                    is_changed = True
+    else:
+        is_changed = config_database(configuration, updated_configuration)
 
     if is_changed:
         with open(app_config.INSTANCE_CONFIG, "w") as f:
             yaml.dump(configuration, f)
 
 
+def config_database(configuration, updated_configuration):
+    changed = False
+    found = False
+    for data_source in configuration.get("DATA_SOURCES"):
+        if data_source["name"].lower() == updated_configuration["name"].lower():
+            found = True
+            for k, v in updated_configuration["DATABASE"].items():
+                if data_source["DATABASE"].get(k) != v:
+                    data_source["DATABASE"][k] = v
+                    changed = True
+            break
+    if not found:
+        configuration.get("DATA_SOURCES").append(updated_configuration)
+        changed = True
+    return changed
+
+
 class app_config(object):
     # the configuration can be loaded from yml file later
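Two things are worth noting in this file: set_database_connection_variables now builds one DatabaseConnector per DATA_SOURCES entry and keys it by the source's name, and update_config_file(..., configure_database=True) delegates to config_database, which updates a matching source in place or appends a new one. A small illustration with hypothetical values (DatabaseConnector's wider API is outside this commit):

# After the configuration is loaded, connectors are looked up by source name;
# for the sample "omero1" entry above, the constructed URI has the form:
#   postgresql://database_user:password@127.0.0.1:5432/omero
connector = config.database_connectors["omero1"]

# config_database merge semantics (hypothetical values):
configuration = {"DATA_SOURCES": [
    {"name": "omero1", "DATABASE": {"DATABASE_NAME": "omero"}},
]}
update = {"name": "omero2", "DATABASE": {"DATABASE_NAME": "omero_2"}}
config_database(configuration, update)  # no name match -> appended, returns True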
116 changes: 89 additions & 27 deletions manage.py
@@ -32,9 +32,7 @@ def show_saved_indices():
     )
 
     all_indexes = get_all_indexes()
-    for index in all_indexes:
-        print("Index: ==>>>", index)
-    # return (all_indexes)
+    return all_indexes
 
 
 @manager.command
@@ -115,24 +113,33 @@ def sql_results_to_panda():
 
 
 @manager.command
-def restore_postgresql_database():
+@manager.option(
+    "-s",
+    "--source",
+    help="data source name; the default restores all data sources",  # noqa
+)
+def restore_postgresql_database(source="all"):
     from omero_search_engine.database.utils import restore_database
 
-    restore_database()
+    restore_database(source)
 
 
 @manager.command
 @manager.option(
     "-r",
     "--resource",
     help="resource name, creating all the indexes for all the resources is the default",  # noqa
 )
+@manager.option(
+    "-s",
+    "--source",
+    help="data source name, indexing all the data sources is the default",  # noqa
+)
 @manager.option(
     "-b",
     "--backup",
     help="if True, backup will be called",  # noqa
 )
-def get_index_data_from_database(resource="all", backup="True"):
+def get_index_data_from_database(resource="all", source="all", backup="True"):
     """
     insert data in Elasticsearch index for each resource
     It gets the data from postgres database server
@@ -147,23 +154,45 @@ def get_index_data_from_database(resource="all", backup="True"):
     import json
 
     backup = json.loads(backup.lower())
-    if resource != "all":
-        sql_st = sqls_resources.get(resource)
-        if not sql_st:
-            return
-        get_insert_data_to_index(sql_st, resource)
+    if not source:
+        print("Data source is required to process")
+        return
+    elif source == "all":
+        clean_index = True
     else:
+        clean_index = False
+
+    for data_source in search_omero_app.config.database_connectors.keys():
+        if source.lower() != "all" and data_source.lower() != source.lower():
+            continue
         for res, sql_st in sqls_resources.items():
-            get_insert_data_to_index(sql_st, res)
-    save_key_value_buckets(
-        resource_table_=None, re_create_index=True, only_values=False
-    )
+            if resource.lower() != "all" and resource.lower() != res.lower():
+                continue
+            get_insert_data_to_index(sql_st, res, data_source, clean_index)
+        save_key_value_buckets(
+            resource_table_=None,
+            data_source=data_source,
+            clean_index=clean_index,
+            only_values=False,
+        )
+        print(
+            "Done for data source: %s from %s"
+            % (data_source, search_omero_app.config.database_connectors.keys())
+        )
+        if clean_index:
+            clean_index = False
 
-    # validat ethe indexing
-    test_indexing_search_query(deep_check=False, check_studies=True)
+        # validate the indexing
+        test_indexing_search_query(
+            source=data_source, deep_check=False, check_studies=True
+        )
 
     # backup the index data
     if backup:
         backup_elasticsearch_data()
 
 
 # set configurations
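One behavioural detail worth calling out in the loop above: when indexing every source (-s all), clean_index starts as True and is flipped to False after the first source, so the Elasticsearch indexes are recreated once and every later source is appended into the same indexes rather than wiping out the previous one.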
@@ -172,24 +201,39 @@
 @manager.option("-s", "--server_port_number", help="database port number")
 @manager.option("-d", "--database", help="database name")
 @manager.option("-n", "--name", help="database username")
+@manager.option("-b", "--backup_filename", help="database backup filename")
 @manager.option("-p", "--password", help="database username password")
+@manager.option("-w", "--working_data_source", help="data source")
 def set_database_configuration(
-    url=None, server_port_number=None, database=None, name=None, password=None
+    working_data_source=None,
+    url=None,
+    server_port_number=None,
+    database=None,
+    backup_filename=None,
+    name=None,
+    password=None,
 ):
+    if not working_data_source:
+        print("Data source is required to process")
+        return
     database_attrs = {}
-    if url:
-        database_attrs["DATABASE_SERVER_URI"] = url
+    database_config = {}
+    database_config["name"] = working_data_source
+    database_config["DATABASE"] = database_attrs
     if database:
         database_attrs["DATABASE_NAME"] = database
+    if url:
+        database_attrs["DATABASE_SERVER_URI"] = url
     if name:
         database_attrs["DATABASE_USER"] = name
     if password:
         database_attrs["DATABASE_PASSWORD"] = password
     if server_port_number and server_port_number.isdigit():
         database_attrs["DATABASE_PORT"] = server_port_number
+    if backup_filename:
+        database_attrs["DATABASE_BACKUP_FILE"] = backup_filename
 
     if len(database_attrs) > 0:
-        update_config_file(database_attrs)
+        update_config_file(database_config, configure_database=True)
     else:
         search_omero_app.logger.info(
             "At least one database attribute\
@@ -289,6 +333,12 @@ def set_no_processes(no_processes=None):
 
 
 @manager.command
+@manager.option(
+    "-d",
+    "--data_source",
+    help="data source name, the default is all",  # noqa
+)
 @manager.option(
     "-r",
     "--resource",
@@ -300,15 +350,15 @@ def set_no_processes(no_processes=None):
     help="creating the elastic search index if set to True",  # noqa
 )
 @manager.option("-o", "--only_values", help="creating cached values only")
-def cache_key_value_index(resource=None, create_index=None, only_values=None):
+def cache_key_value_index(resource=None, data_source="all", create_index=None, only_values=None):
     """
     Cache the value bucket for each value for each resource
     """
     from omero_search_engine.cache_functions.elasticsearch.transform_data import (
         save_key_value_buckets,
     )
 
-    save_key_value_buckets(resource, create_index, only_values)
+    save_key_value_buckets(resource, data_source, create_index, only_values)
 
 
 @manager.command
@@ -319,8 +369,16 @@ def cache_key_value_index(resource=None, create_index=None, only_values=None):
     "--deep_check",
     help="compare all the images from both search engine and database server, default is False so it will compare the number of images and the first searchengine page",  # noqa
 )
+@manager.option(
+    "-s",
+    "--source",
+    help="data source name; required, validation runs against a single source",  # noqa
+)
 def test_indexing_search_query(
-    json_file="app_data/test_index_data.json", deep_check=False, check_studies=False
+    json_file="app_data/test_index_data.json",
+    source=None,
+    deep_check=False,
+    check_studies=False,
 ):
     """
     test the indexing and the searchengine query functions
@@ -339,11 +397,15 @@ def test_indexing_search_query(
         get_no_images_sql_containers,
     )
 
-    validate_queries(json_file, deep_check)
+    if not source:
+        print("Data source is required to process")
+        return
+
+    validate_queries(json_file, source, deep_check)
     if check_studies:
-        test_no_images()
+        test_no_images(source)
     get_omero_stats()
-    get_no_images_sql_containers()
+    get_no_images_sql_containers(data_source=source)
 
 
 @manager.command
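Taken together, the manage.py changes route indexing, restore, and validation through a named data source, and they all share the same per-source filtering. A condensed sketch of that pattern (hypothetical helper, not part of the commit):

def for_each_source(config, source, action):
    # Visit every configured connector, treating "all" as a wildcard.
    for name in config.database_connectors.keys():
        if source.lower() != "all" and name.lower() != source.lower():
            continue
        action(name)

Against the sample configuration, typical invocations would be python manage.py get_index_data_from_database -s omero1 -r image or python manage.py restore_postgresql_database (which restores every source by default), while test_indexing_search_query now exits early unless -s names a source.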