From ad4076a17d2726348bec86a6a9b16db8a0e344f9 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Mon, 14 Oct 2024 11:40:21 -0700 Subject: [PATCH] Improve performance of validity time updates (DM-46812) Adds a special Postgres-only partial index that reduces the amount of data generated by sub-queries in the update query. Modified subquery to use that index. --- .pre-commit-config.yaml | 6 ++--- python/lsst/dax/ppdb/sql/_ppdb_sql.py | 36 +++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a57c469..01f70aa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v5.0.0 hooks: - id: check-yaml args: @@ -8,7 +8,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/psf/black-pre-commit-mirror - rev: 24.2.0 + rev: 24.10.0 hooks: - id: black # It is recommended to specify the latest version of Python @@ -23,6 +23,6 @@ repos: name: isort (python) - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.3.1 + rev: v0.6.9 hooks: - id: ruff diff --git a/python/lsst/dax/ppdb/sql/_ppdb_sql.py b/python/lsst/dax/ppdb/sql/_ppdb_sql.py index eff3a98..c09064b 100644 --- a/python/lsst/dax/ppdb/sql/_ppdb_sql.py +++ b/python/lsst/dax/ppdb/sql/_ppdb_sql.py @@ -60,7 +60,7 @@ _MON = monitor.MonAgent(__name__) -VERSION = VersionTuple(0, 1, 0) +VERSION = VersionTuple(0, 1, 1) """Version for the code defined in this module. This needs to be updated (following compatibility rules) when schema produced by this code changes. 
""" @@ -118,7 +118,7 @@ def __init__(self, config: PpdbConfig): self.config = config self._sa_metadata, schema_version = self._read_schema( - config.felis_path, config.schema_name, config.felis_schema + config.felis_path, config.schema_name, config.felis_schema, config.db_url ) self._engine = self._make_engine(config) @@ -170,7 +170,7 @@ def init_database( drop : `bool` If `True` then drop existing tables. """ - sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema) + sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema, db_url) config = PpdbSqlConfig( db_url=db_url, schema_name=schema_name, @@ -185,7 +185,7 @@ def init_database( @classmethod def _read_schema( - cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None + cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None, db_url: str ) -> tuple[sqlalchemy.schema.MetaData, VersionTuple]: """Read felis schema definitions for PPDB. @@ -199,6 +199,8 @@ def _read_schema( felis_schema : `str`, optional Name of the schema in YAML file, if `None` then file has to contain single schema. + db_url : `str` + Database URL. Returns ------- @@ -297,6 +299,25 @@ def _read_schema( converter = ModelToSql(metadata=metadata) converter.make_tables(schema.tables) + # Add an additional index to DiaObject table to speed up replication. + # This is a partial index (Postgres-only), we do not have support for + # partial indices in ModelToSql, so we have to do it using sqlalchemy. 
+ url = sqlalchemy.engine.make_url(db_url) + if url.get_backend_name() == "postgresql": + table: sqlalchemy.schema.Table | None = None + for table in metadata.tables.values(): + if table.name == "DiaObject": + name = "IDX_DiaObject_diaObjectId_validityEnd_IS_NULL" + sqlalchemy.schema.Index( + name, + table.columns["diaObjectId"], + postgresql_where=table.columns["validityEnd"].is_(None), + ) + break + else: + # Cannot find table, odd, but what do I know. + pass + return metadata, version @classmethod @@ -547,7 +568,12 @@ def _store_objects( order_by=table.columns["validityStart"], ) .label("rank"), - ).where(table.columns["diaObjectId"].in_(chunk)) + ).where( + sqlalchemy.and_( + table.columns["diaObjectId"].in_(chunk), + table.columns["validityEnd"] == None, # noqa: E711 + ) + ) ) sub1 = select_cte.alias("s1") sub2 = select_cte.alias("s2")