Skip to content

Commit

Permalink
Merge pull request #5 from lsst/tickets/DM-46812
Browse files Browse the repository at this point in the history
DM-46812: Improve performance of validity time updates
  • Loading branch information
andy-slac authored Oct 14, 2024
2 parents d821f4e + ad4076a commit e7e506f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: check-yaml
args:
- "--unsafe"
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.2.0
rev: 24.10.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -23,6 +23,6 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.1
rev: v0.6.9
hooks:
- id: ruff
36 changes: 31 additions & 5 deletions python/lsst/dax/ppdb/sql/_ppdb_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
_MON = monitor.MonAgent(__name__)


VERSION = VersionTuple(0, 1, 0)
VERSION = VersionTuple(0, 1, 1)
"""Version for the code defined in this module. This needs to be updated
(following compatibility rules) when schema produced by this code changes.
"""
Expand Down Expand Up @@ -118,7 +118,7 @@ def __init__(self, config: PpdbConfig):
self.config = config

self._sa_metadata, schema_version = self._read_schema(
config.felis_path, config.schema_name, config.felis_schema
config.felis_path, config.schema_name, config.felis_schema, config.db_url
)

self._engine = self._make_engine(config)
Expand Down Expand Up @@ -170,7 +170,7 @@ def init_database(
drop : `bool`
If `True` then drop existing tables.
"""
sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema)
sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema, db_url)
config = PpdbSqlConfig(
db_url=db_url,
schema_name=schema_name,
Expand All @@ -185,7 +185,7 @@ def init_database(

@classmethod
def _read_schema(
cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None
cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None, db_url: str
) -> tuple[sqlalchemy.schema.MetaData, VersionTuple]:
"""Read felis schema definitions for PPDB.
Expand All @@ -199,6 +199,8 @@ def _read_schema(
felis_schema : `str`, optional
Name of the schema in YAML file, if `None` then file has to contain
single schema.
db_url : `str`
Database URL.
Returns
-------
Expand Down Expand Up @@ -297,6 +299,25 @@ def _read_schema(
converter = ModelToSql(metadata=metadata)
converter.make_tables(schema.tables)

# Add an additional index to DiaObject table to speed up replication.
# This is a partial index (Postgres-only), we do not have support for
# partial indices in ModelToSql, so we have to do it using sqlalchemy.
url = sqlalchemy.engine.make_url(db_url)
if url.get_backend_name() == "postgresql":
table: sqlalchemy.schema.Table | None = None
for table in metadata.tables.values():
if table.name == "DiaObject":
name = "IDX_DiaObject_diaObjectId_validityEnd_IS_NULL"
sqlalchemy.schema.Index(
name,
table.columns["diaObjectId"],
postgresql_where=table.columns["validityEnd"].is_(None),
)
break
else:
# Cannot find table, odd, but what do I know.
pass

return metadata, version

@classmethod
Expand Down Expand Up @@ -547,7 +568,12 @@ def _store_objects(
order_by=table.columns["validityStart"],
)
.label("rank"),
).where(table.columns["diaObjectId"].in_(chunk))
).where(
sqlalchemy.and_(
table.columns["diaObjectId"].in_(chunk),
table.columns["validityEnd"] == None, # noqa: E711
)
)
)
sub1 = select_cte.alias("s1")
sub2 = select_cte.alias("s2")
Expand Down

0 comments on commit e7e506f

Please sign in to comment.