From a39784483a288f03e7dd915de8da84610285e43d Mon Sep 17 00:00:00 2001
From: Andy Salnikov
Date: Fri, 8 Mar 2024 15:45:32 -0800
Subject: [PATCH] First version of PPDB classes and CLI tools.

The main purpose of this package is to implement management tools for
PPDB, including tools for migration of APDB data to PPDB. Regular
clients will access PPDB either via SQL or using TAP services.

What is implemented on this ticket:

- Added base Ppdb class with factory methods, and its SQL implementation.
- Two CLI tools - `ppdb-cli` for general commands and `ppdb-replication`
  for replication-related operations.
- Efficient bulk insert methods for Postgres.
---
 .github/workflows/formatting.yaml | 11 +
 .github/workflows/lint.yaml | 24 +-
 .github/workflows/mypy.yaml | 11 +
 .github/workflows/rebase_checker.yaml | 1 -
 .github/workflows/yamllint.yaml | 11 +
 .pre-commit-config.yaml | 28 +
 README.rst | 2 +-
 bin.src/SConscript | 1 +
 bin.src/ppdb-cli | 6 +
 bin.src/ppdb-replication | 6 +
 mypy.ini | 28 +
 pyproject.toml | 111 ++++
 python/lsst/dax/ppdb/__init__.py | 1 +
 python/lsst/dax/ppdb/_factory.py | 110 ++++
 python/lsst/dax/ppdb/cli/__init__.py | 0
 python/lsst/dax/ppdb/cli/options.py | 89 +++
 python/lsst/dax/ppdb/cli/ppdb_cli.py | 59 ++
 python/lsst/dax/ppdb/cli/ppdb_replication.py | 72 +++
 python/lsst/dax/ppdb/config.py | 64 +++
 python/lsst/dax/ppdb/ppdb.py | 139 +++++
 python/lsst/dax/ppdb/py.typed | 0
 python/lsst/dax/ppdb/scripts/__init__.py | 25 +
 python/lsst/dax/ppdb/scripts/create_sql.py | 79 +++
 .../scripts/replication_list_chunks_apdb.py | 50 ++
 .../scripts/replication_list_chunks_ppdb.py | 52 ++
 .../lsst/dax/ppdb/scripts/replication_run.py | 106 ++++
 python/lsst/dax/ppdb/sql/__init__.py | 22 +
 python/lsst/dax/ppdb/sql/_ppdb_sql.py | 530 ++++++++++++++++++
 python/lsst/dax/ppdb/sql/bulk_insert.py | 134 +++++
 python/lsst/dax/ppdb/sql/pg_dump.py | 274 +++++++++
 requirements.txt | 9 +
 setup.cfg | 4 -
 ups/dax_ppdb.table | 12 +-
 33 files changed, 2044 insertions(+), 27 deletions(-)
 create mode 100644 .github/workflows/formatting.yaml
 create mode 100644 .github/workflows/mypy.yaml
 create mode 100644 .github/workflows/yamllint.yaml
 create mode 100644 .pre-commit-config.yaml
 create mode 100644 bin.src/ppdb-cli
 create mode 100644 bin.src/ppdb-replication
 create mode 100644 mypy.ini
 create mode 100644 pyproject.toml
 create mode 100644 python/lsst/dax/ppdb/_factory.py
 create mode 100644 python/lsst/dax/ppdb/cli/__init__.py
 create mode 100644 python/lsst/dax/ppdb/cli/options.py
 create mode 100644 python/lsst/dax/ppdb/cli/ppdb_cli.py
 create mode 100644 python/lsst/dax/ppdb/cli/ppdb_replication.py
 create mode 100644 python/lsst/dax/ppdb/config.py
 create mode 100644 python/lsst/dax/ppdb/ppdb.py
 create mode 100644 python/lsst/dax/ppdb/py.typed
 create mode 100644 python/lsst/dax/ppdb/scripts/__init__.py
 create mode 100644 python/lsst/dax/ppdb/scripts/create_sql.py
 create mode 100644 python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py
 create mode 100644 python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py
 create mode 100644 python/lsst/dax/ppdb/scripts/replication_run.py
 create mode 100644 python/lsst/dax/ppdb/sql/__init__.py
 create mode 100644 python/lsst/dax/ppdb/sql/_ppdb_sql.py
 create mode 100644 python/lsst/dax/ppdb/sql/bulk_insert.py
 create mode 100644 python/lsst/dax/ppdb/sql/pg_dump.py
 create mode 100644 requirements.txt

diff --git a/.github/workflows/formatting.yaml b/.github/workflows/formatting.yaml
new file mode 100644
index 0000000..27f34a6
--- /dev/null
+++ b/.github/workflows/formatting.yaml
@@
-0,0 +1,11 @@ +name: Check Python formatting + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/formatting.yaml@main diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 2b20981..6c463ee 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -1,22 +1,16 @@ name: lint on: - - push - - pull_request + push: + branches: + - main + pull_request: jobs: - lint: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/lint.yaml@main + ruff: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: 3.7 - - - name: Install - run: pip install -r <(curl https://raw.githubusercontent.com/lsst/linting/main/requirements.txt) - - - name: Run linter - run: flake8 + - uses: actions/checkout@v3 + - uses: chartboost/ruff-action@v1 diff --git a/.github/workflows/mypy.yaml b/.github/workflows/mypy.yaml new file mode 100644 index 0000000..0849ea4 --- /dev/null +++ b/.github/workflows/mypy.yaml @@ -0,0 +1,11 @@ +name: Run mypy + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/mypy.yaml@main diff --git a/.github/workflows/rebase_checker.yaml b/.github/workflows/rebase_checker.yaml index 62aeca7..65516d9 100644 --- a/.github/workflows/rebase_checker.yaml +++ b/.github/workflows/rebase_checker.yaml @@ -1,4 +1,3 @@ ---- name: Check that 'main' is not merged into the development branch on: pull_request diff --git a/.github/workflows/yamllint.yaml b/.github/workflows/yamllint.yaml new file mode 100644 index 0000000..76ad875 --- /dev/null +++ b/.github/workflows/yamllint.yaml @@ -0,0 +1,11 @@ +name: Lint YAML Files + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/yamllint.yaml@main diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a57c469 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-yaml + args: + - "--unsafe" + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/psf/black-pre-commit-mirror + rev: 24.2.0 + hooks: + - id: black + # It is recommended to specify the latest version of Python + # supported by your project here, or alternatively use + # pre-commit's default_language_version, see + # https://pre-commit.com/#top_level-default_language_version + language_version: python3.11 + - repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + name: isort (python) + - repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.3.1 + hooks: + - id: ruff diff --git a/README.rst b/README.rst index 5357088..c40ee96 100644 --- a/README.rst +++ b/README.rst @@ -2,6 +2,6 @@ dax_ppdb ######## - +``dax_ppdb`` is a package in the `LSST Science Pipelines `_. .. Add a brief (few sentence) description of what this package provides. 
diff --git a/bin.src/SConscript b/bin.src/SConscript index e00724c..6d4fd52 100644 --- a/bin.src/SConscript +++ b/bin.src/SConscript @@ -1,3 +1,4 @@ # -*- python -*- from lsst.sconsUtils import scripts + scripts.BasicSConscript.shebang() diff --git a/bin.src/ppdb-cli b/bin.src/ppdb-cli new file mode 100644 index 0000000..c1b7bae --- /dev/null +++ b/bin.src/ppdb-cli @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from lsst.dax.ppdb.cli import ppdb_cli + +if __name__ == "__main__": + ppdb_cli.main() diff --git a/bin.src/ppdb-replication b/bin.src/ppdb-replication new file mode 100644 index 0000000..a0ecc5c --- /dev/null +++ b/bin.src/ppdb-replication @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from lsst.dax.ppdb.cli import ppdb_replication + +if __name__ == "__main__": + ppdb_replication.main() diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..02dd1d5 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,28 @@ +[mypy] +ignore_errors = False +warn_unused_configs = True +warn_redundant_casts = True +ignore_missing_imports = False +disallow_untyped_defs = True +disallow_incomplete_defs = True + +[mypy-astropy.*] +ignore_missing_imports = True + +[mypy-lsst.daf.*] +ignore_missing_imports = True + +[mypy-lsst.sphgeom] +ignore_missing_imports = True + +[mypy-lsst.dax.ppdb.*] +ignore_missing_imports = False +ignore_errors = False +disallow_untyped_defs = True +disallow_incomplete_defs = True +strict_equality = True +warn_unreachable = True +warn_unused_ignores = True + +[mypy-lsst.dax.ppdb.version] +ignore_errors = True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..15dd062 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,111 @@ +[build-system] +requires = ["setuptools", "lsst-versions >= 1.3.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "lsst-dax-ppdb" +description = "Prompt Products Database for LSST AP pipeline." 
+license = {text = "GPLv3+ License"} +readme = "README.md" +authors = [ + {name="Rubin Observatory Data Management", email="dm-admin@lists.lsst.org"}, +] +classifiers = [ + "Intended Audience :: Science/Research", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Astronomy", +] +keywords = ["lsst"] +dependencies = [ + "astropy", + "numpy", + "pandas", + "pyyaml >= 5.1", + "sqlalchemy", + "felis", + "lsst-utils", + "lsst-resources", + "lsst-dax-apdb", +] +dynamic = ["version"] + +[project.urls] +"Homepage" = "https://github.com/lsst/dax_ppdb" + +[project.optional-dependencies] +test = [ + "pytest >= 3.2", + "pytest-openfiles >= 0.5.0" +] + +[tool.setuptools.packages.find] +where = ["python"] + +[tool.setuptools] +zip-safe = true +license-files = ["COPYRIGHT", "LICENSE"] + +[tool.setuptools.package-data] +"lsst.dax.ppdb" = ["py.typed"] + +[tool.setuptools.dynamic] +version = { attr = "lsst_versions.get_lsst_version" } + +[tool.black] +line-length = 110 +target-version = ["py311"] + +[tool.isort] +profile = "black" +line_length = 110 + +[tool.lsst_versions] +write_to = "python/lsst/dax/ppdb/version.py" + +[tool.ruff] +exclude = [ + "__init__.py", + "doc/conf.py", +] +line-length = 110 +target-version = "py311" + +[tool.ruff.lint] +ignore = [ + "N802", + "N803", + "N806", + "N812", + "N815", + "N816", + "N999", + "D107", + "D105", + "D102", + "D104", + "D100", + "D200", + "D205", + "D400", +] +select = [ + "E", # pycodestyle + "F", # pycodestyle + "N", # pep8-naming + "W", # pycodestyle + "D", # pydocstyle +] +extend-select = [ + "RUF100", # Warn about unused noqa +] + +[tool.ruff.lint.pycodestyle] +max-doc-length = 79 + +[tool.ruff.lint.pydocstyle] +convention = "numpy" diff --git a/python/lsst/dax/ppdb/__init__.py b/python/lsst/dax/ppdb/__init__.py index 35961cf..2cb6bd2 100644 --- a/python/lsst/dax/ppdb/__init__.py +++ b/python/lsst/dax/ppdb/__init__.py @@ -19,4 +19,5 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from .ppdb import * from .version import * # Generated by sconsUtils diff --git a/python/lsst/dax/ppdb/_factory.py b/python/lsst/dax/ppdb/_factory.py new file mode 100644 index 0000000..ddbfb4e --- /dev/null +++ b/python/lsst/dax/ppdb/_factory.py @@ -0,0 +1,110 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +from __future__ import annotations + +__all__ = ["config_type_for_name", "ppdb_type", "ppdb_type_for_name"] + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .config import PpdbConfig + from .sql import PpdbSql + + +def ppdb_type(config: PpdbConfig) -> type[PpdbSql]: + """Return Ppdb class matching Ppdb configuration type. + + Parameters + ---------- + config : `PpdbConfig` + Configuration object, sub-class of PpdbConfig. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `Ppdb` class. + + Raises + ------ + TypeError + Raised if type of ``config`` does not match any known types. + """ + from .sql import PpdbSqlConfig + + if type(config) is PpdbSqlConfig: + from .sql import PpdbSql + + return PpdbSql + + raise TypeError(f"Unknown type of config object: {type(config)}") + + +def ppdb_type_for_name(type_name: str) -> type[PpdbSql]: + """Return Ppdb class matching type name. + + Parameters + ---------- + type_name : `str` + Short type name of Ppdb implement, for now only "sql" is supported. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `Ppdb` class. + + Raises + ------ + TypeError + Raised if ``type_name`` does not match any known types. + """ + if type_name == "sql": + from .sql import PpdbSql + + return PpdbSql + + raise TypeError(f"Unknown type name: {type_name}") + + +def config_type_for_name(type_name: str) -> type[PpdbConfig]: + """Return PpdbConfig class matching type name. + + Parameters + ---------- + type_name : `str` + Short type name of Ppdb implement, for now only "sql" is supported. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `PpdbConfig` class. + + Raises + ------ + TypeError + Raised if ``type_name`` does not match any known types. + """ + if type_name == "sql": + from .sql import PpdbSqlConfig + + return PpdbSqlConfig + + raise TypeError(f"Unknown type name: {type_name}") diff --git a/python/lsst/dax/ppdb/cli/__init__.py b/python/lsst/dax/ppdb/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/lsst/dax/ppdb/cli/options.py b/python/lsst/dax/ppdb/cli/options.py new file mode 100644 index 0000000..e7d1731 --- /dev/null +++ b/python/lsst/dax/ppdb/cli/options.py @@ -0,0 +1,89 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +from __future__ import annotations + +__all__ = ["felis_schema_options", "sql_db_options", "replication_options"] + +import argparse + + +def felis_schema_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for Felis schema file.""" + group = parser.add_argument_group("felis schema options") + group.add_argument( + "--felis-path", + help="YAML file with PPDB felis schema (can be same as APDB schema).", + metavar="PATH", + default=None, + ) + group.add_argument( + "--felis-schema", + help="Schema name used in felis YAML file.", + metavar="NAME", + default=None, + ) + + +def sql_db_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for database connection.""" + group = parser.add_argument_group("database options") + group.add_argument( + "-s", + "--schema", + help="Optional schema name.", + metavar="DB_SCHEMA", + default=None, + ) + + group.add_argument( + "--connection-pool", + help="Enable/disable use of connection pool.", + action=argparse.BooleanOptionalAction, + default=True, + ) + + group.add_argument( + "--isolation-level", + help="Transaction isolation level, allowed values: %(choices)s", + metavar="STRING", + choices=["READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", "SERIALIZABLE"], + default=None, + ) + + group.add_argument( + "--connection-timeout", + type=float, + help="Maximum connection timeout in seconds.", + metavar="SECONDS", + default=None, + ) + + +def replication_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for replication.""" + group = parser.add_argument_group("replication options") + group.add_argument( + "--single", help="Copy single replication item and stop.", default=False, action="store_true" + ) + group.add_argument( + "--update", help="Allow updates to already replicated data.", default=False, action="store_true" + ) diff --git a/python/lsst/dax/ppdb/cli/ppdb_cli.py b/python/lsst/dax/ppdb/cli/ppdb_cli.py new file mode 100644 index 0000000..82b82e3 --- /dev/null +++ b/python/lsst/dax/ppdb/cli/ppdb_cli.py @@ -0,0 +1,59 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["main"] + +import argparse + +from lsst.dax.apdb.cli.logging_cli import LoggingCli + +from .. import scripts +from . 
import options
+
+
+def main() -> None:
+    """PPDB command line tools."""
+    parser = argparse.ArgumentParser(description="PPDB command line tools")
+    log_cli = LoggingCli(parser)
+
+    subparsers = parser.add_subparsers(title="available subcommands", required=True)
+    _create_sql_subcommand(subparsers)
+
+    args = parser.parse_args()
+    log_cli.process_args(args)
+
+    kwargs = vars(args)
+    method = kwargs.pop("method")
+    method(**kwargs)
+
+
+def _create_sql_subcommand(subparsers: argparse._SubParsersAction) -> None:
+    parser = subparsers.add_parser("create-sql", help="Create new PPDB instance in SQL database.")
+    parser.add_argument("db_url", help="Database URL in SQLAlchemy format for PPDB instance.")
+    parser.add_argument("config_path", help="Name of the new configuration file for created PPDB instance.")
+    options.felis_schema_options(parser)
+    options.sql_db_options(parser)
+    parser.add_argument(
+        "--drop", help="If True then drop existing tables.", default=False, action="store_true"
+    )
+    parser.set_defaults(method=scripts.create_sql)
diff --git a/python/lsst/dax/ppdb/cli/ppdb_replication.py b/python/lsst/dax/ppdb/cli/ppdb_replication.py
new file mode 100644
index 0000000..ea3fe10
--- /dev/null
+++ b/python/lsst/dax/ppdb/cli/ppdb_replication.py
@@ -0,0 +1,72 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+__all__ = ["main"]
+
+import argparse
+
+from lsst.dax.apdb.cli.logging_cli import LoggingCli
+
+from .. import scripts
+from . import options
+
+_longLogFmt = "%(asctime)s %(levelname)s %(name)s - %(message)s"
+
+
+def main() -> None:
+    """Commands for managing APDB-to-PPDB replication."""
+    parser = argparse.ArgumentParser(description="PPDB command line tools")
+    log_cli = LoggingCli(parser)
+
+    subparsers = parser.add_subparsers(title="available subcommands", required=True)
+    _list_chunks_apdb_subcommand(subparsers)
+    _list_chunks_ppdb_subcommand(subparsers)
+    _run_subcommand(subparsers)
+
+    args = parser.parse_args()
+    log_cli.process_args(args)
+    kwargs = vars(args)
+    method = kwargs.pop("method")
+    method(**kwargs)
+
+
+def _list_chunks_apdb_subcommand(subparsers: argparse._SubParsersAction) -> None:
+    parser = subparsers.add_parser(
+        "list-chunks-apdb", help="Print full list of replica chunks existing on APDB side."
+ ) + parser.add_argument("apdb_config", help="Path to the APDB configuration.") + parser.set_defaults(method=scripts.replication_list_chunks_apdb) + + +def _list_chunks_ppdb_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("list-chunks-ppdb", help="List full set of replica chunks in PPDB.") + parser.add_argument("ppdb_config", help="Path to the PPDB configuration.") + parser.set_defaults(method=scripts.replication_list_chunks_ppdb) + + +def _run_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("run", help="Run replication from APDB to PPDB.") + parser.add_argument("apdb_config", help="Path to the APDB configuration.") + parser.add_argument("ppdb_config", help="Path to the PPDB configuration.") + options.replication_options(parser) + parser.set_defaults(method=scripts.replication_run) diff --git a/python/lsst/dax/ppdb/config.py b/python/lsst/dax/ppdb/config.py new file mode 100644 index 0000000..3263f49 --- /dev/null +++ b/python/lsst/dax/ppdb/config.py @@ -0,0 +1,64 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PpdbConfig"] + +from collections.abc import Mapping +from typing import Any + +import yaml +from lsst.resources import ResourcePath, ResourcePathExpression +from pydantic import BaseModel + +from ._factory import config_type_for_name + + +class PpdbConfig(BaseModel): + """Base class for PPDB configuration types.""" + + @classmethod + def from_uri(cls, uri: ResourcePathExpression) -> PpdbConfig: + """Load configuration object from external file. + + Parameters + ---------- + uri : `~lsst.resources.ResourcePathExpression` + Location of the file containing serialized configuration in YAML + format. + + Returns + ------- + config : `PpdbConfig` + PPD configuration object. + """ + path = ResourcePath(uri) + config_str = path.read() + config_object = yaml.safe_load(config_str) + if not isinstance(config_object, Mapping): + raise TypeError("YAML configuration file does not represent valid object") + config_dict: dict[str, Any] = dict(config_object) + type_name = config_dict.pop("implementation_type", None) + if not type_name: + raise LookupError("YAML configuration file does not have `implementation_type` key") + klass = config_type_for_name(type_name) + return klass(**config_dict) diff --git a/python/lsst/dax/ppdb/ppdb.py b/python/lsst/dax/ppdb/ppdb.py new file mode 100644 index 0000000..9177aaf --- /dev/null +++ b/python/lsst/dax/ppdb/ppdb.py @@ -0,0 +1,139 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. 
+# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["Ppdb"] + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +import astropy.time +from lsst.dax.apdb import ApdbMetadata, ApdbTableData, ReplicaChunk +from lsst.resources import ResourcePathExpression + +from ._factory import ppdb_type +from .config import PpdbConfig + + +@dataclass(frozen=True) +class PpdbReplicaChunk(ReplicaChunk): + """ReplicaChunk with additional PPDB-specific info.""" + + replica_time: astropy.time.Time + """Time when this bucket was replicated (`astropy.time.Time`).""" + + +class Ppdb(ABC): + """Class defining an interface for PPDB management operations.""" + + @classmethod + def from_config(cls, config: PpdbConfig) -> Ppdb: + """Create Ppdb instance from configuration object. + + Parameters + ---------- + config : `PpdbConfig` + Configuration object, type of this object determines type of the + Ppdb implementation. + + Returns + ------- + ppdb : `Ppdb` + Instance of `Ppdb` class. + """ + # Dispatch to actual implementation class based on config type. + + ppdb_class = ppdb_type(config) + return ppdb_class(config) + + @classmethod + def from_uri(cls, uri: ResourcePathExpression) -> Ppdb: + """Read PPDB configuration from URI and make a Ppdb instance. + + Parameters + ---------- + uri : `~lsst.resources.ResourcePathExpression` + Location of the file containing serialized configuration in YAML + format. + + Returns + ------- + ppdb : `Ppdb` + Instance of `Ppdb` class. + """ + config = PpdbConfig.from_uri(uri) + return cls.from_config(config) + + @property + @abstractmethod + def metadata(self) -> ApdbMetadata: + """Object controlling access to metadata + (`~lsst.dax.apdb.ApdbMetadata`). + """ + raise NotImplementedError() + + @abstractmethod + def get_replica_chunks(self) -> list[PpdbReplicaChunk] | None: + """Return collection of replica chunks known to the database. + + Returns + ------- + chunks : `list` [`PpdbReplicaChunk`] or `None` + List of chunks, they may be time-ordered if database supports + ordering. `None` is returned if database is not configured to store + chunk information. + """ + raise NotImplementedError() + + @abstractmethod + def store( + self, + replica_chunk: ReplicaChunk, + objects: ApdbTableData, + sources: ApdbTableData, + forced_sources: ApdbTableData, + *, + update: bool = False, + ) -> None: + """Copy APDB data to PPDB. + + Parameters + ---------- + replica_chunk : `~lsst.dax.apdb.ReplicaChunk` + Insertion ID for APDB data. + objects : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaObjects. + sources : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaSources. 
+ forced_sources : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaForcedSources. + update : `bool`, optional + If `True` then allow updates for existing data from the same + ``replica_chunk``. + + Notes + ----- + Replication from APDB to PPDB should happen in the same order as + insertion order for APDB, i.e. in the order of increasing + ``replica_chunk.id`` values. + """ + raise NotImplementedError() diff --git a/python/lsst/dax/ppdb/py.typed b/python/lsst/dax/ppdb/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/python/lsst/dax/ppdb/scripts/__init__.py b/python/lsst/dax/ppdb/scripts/__init__.py new file mode 100644 index 0000000..94b85c3 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/__init__.py @@ -0,0 +1,25 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from .create_sql import create_sql +from .replication_list_chunks_apdb import replication_list_chunks_apdb +from .replication_list_chunks_ppdb import replication_list_chunks_ppdb +from .replication_run import replication_run diff --git a/python/lsst/dax/ppdb/scripts/create_sql.py b/python/lsst/dax/ppdb/scripts/create_sql.py new file mode 100644 index 0000000..2dec70c --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/create_sql.py @@ -0,0 +1,79 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["create_sql"] + +import yaml + +from ..sql import PpdbSql + + +def create_sql( + db_url: str, + schema: str | None, + config_path: str, + felis_path: str, + felis_schema: str, + connection_pool: bool, + isolation_level: str | None, + connection_timeout: float | None, + drop: bool, +) -> None: + """Create new PPDB instance in SQL database. + + Parameters + ---------- + db_url : `str` + SQLAlchemy connection string. 
+ schema : `str` or `None` + Database schema name, `None` to use default schema. + config_path : `str` + Name of the file to write PPDB configuration. + felis_path : `str` + Path to the Felis YAML file with table schema definition. + felis_schema : `str` + Name of the schema defined in felis YAML file. + connection_pool : `bool` + If True then enable connection pool. + isolation_level : `str` or `None` + Transaction isolation level, if unset then backend-default value is + used. + connection_timeout: `float` or `None` + Maximum connection timeout in seconds. + drop : `bool` + If `True` then drop existing tables. + """ + config = PpdbSql.init_database( + db_url=db_url, + schema_name=schema, + schema_file=felis_path, + felis_schema=felis_schema, + use_connection_pool=connection_pool, + isolation_level=isolation_level, + connection_timeout=connection_timeout, + drop=drop, + ) + config_dict = config.model_dump(exclude_unset=True, exclude_defaults=True) + config_dict["implementation_type"] = "sql" + with open(config_path, "w") as config_file: + yaml.dump(config_dict, config_file) diff --git a/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py b/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py new file mode 100644 index 0000000..6cd1560 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py @@ -0,0 +1,50 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["replication_list_chunks_apdb"] + +from lsst.dax.apdb import ApdbReplica + + +def replication_list_chunks_apdb(apdb_config: str) -> None: + """Print full list of replica chunks existing on APDB side. + + Parameters + ---------- + apdb_config : `str` + URL for APDB configuration file. + """ + apdb = ApdbReplica.from_uri(apdb_config) + chunks = apdb.getReplicaChunks() + if chunks is not None: + print(" Chunk Id Update time Unique Id") + sep = "-" * 77 + print(sep) + chunks = sorted(chunks, key=lambda chunk: chunk.id) + for chunk in chunks: + insert_time = chunk.last_update_time + print(f"{chunk.id:10d} {insert_time.tai.isot}/tai {chunk.unique_id}") + print(sep) + print(f"Total: {len(chunks)}") + else: + print("APDB instance does not support InsertIds") diff --git a/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py b/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py new file mode 100644 index 0000000..ec5f9c1 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py @@ -0,0 +1,52 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). 
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+__all__ = ["replication_list_chunks_ppdb"]
+
+from ..ppdb import Ppdb
+
+
+def replication_list_chunks_ppdb(ppdb_config: str) -> None:
+    """Print list of replica chunks existing on PPDB side.
+
+    Parameters
+    ----------
+    ppdb_config : `str`
+        URL for PPDB configuration file.
+    """
+    ppdb = Ppdb.from_uri(ppdb_config)
+    chunks = ppdb.get_replica_chunks()
+    if chunks is not None:
+        print(" Chunk Id Update time Replica time Unique Id")
+        sep = "-" * 106
+        print(sep)
+        chunks = sorted(chunks, key=lambda chunk: chunk.id)
+        for insert in chunks:
+            print(
+                f"{insert.id:10d} {insert.last_update_time.tai.isot}/tai "
+                f"{insert.replica_time.tai.isot}/tai {insert.unique_id}"
+            )
+        print(sep)
+        print(f"Total: {len(chunks)}")
+    else:
+        print("PPDB instance does not support replica chunks")
diff --git a/python/lsst/dax/ppdb/scripts/replication_run.py b/python/lsst/dax/ppdb/scripts/replication_run.py
new file mode 100644
index 0000000..96c71db
--- /dev/null
+++ b/python/lsst/dax/ppdb/scripts/replication_run.py
@@ -0,0 +1,106 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+__all__ = ["replication_run"]
+
+import logging
+from typing import TYPE_CHECKING
+
+from lsst.dax.apdb import ApdbReplica
+
+from ..ppdb import Ppdb
+
+if TYPE_CHECKING:
+    from lsst.dax.apdb import ReplicaChunk
+
+    from ..ppdb import PpdbReplicaChunk
+
+_LOG = logging.getLogger(__name__)
+
+
+def replication_run(
+    apdb_config: str,
+    ppdb_config: str,
+    single: bool,
+    update: bool,
+) -> None:
+    """Execute replication process from APDB to PPDB.
+
+    Parameters
+    ----------
+    apdb_config : `str`
+        URL for APDB configuration file.
+    ppdb_config : `str`
+        URL for PPDB configuration file.
+    single : `bool`
+        Copy single bucket and stop.
+    update : `bool`
+        If `True` then allow updates to previously replicated data.
+ """ + apdb = ApdbReplica.from_uri(apdb_config) + ppdb = Ppdb.from_uri(ppdb_config) + + chunks = apdb.getReplicaChunks() + if chunks is None: + raise TypeError("APDB implementation does not support replication") + ppdb_chunks = ppdb.get_replica_chunks() + if ppdb_chunks is None: + raise TypeError("PPDB implementation does not support replication") + + ids = _merge_ids(chunks, ppdb_chunks) + + # Check existing PPDB ids for consistency. + for apdb_chunk, ppdb_chunk in ids: + if ppdb_chunk is not None: + if ppdb_chunk.unique_id != apdb_chunk.unique_id: + raise ValueError(f"Inconsistent values of unique ID - APDB: {apdb_chunk} PPDB: {ppdb_chunk}") + + ids_to_copy = [apdb_chunk for apdb_chunk, ppdb_chunk in ids if ppdb_chunk is None] + for apdb_chunk in ids_to_copy: + _LOG.info("Will replicate bucket %s", apdb_chunk) + _replicate_one(apdb, ppdb, apdb_chunk, update) + if single: + break + + +def _merge_ids( + chunks: list[ReplicaChunk], ppdb_chunks: list[PpdbReplicaChunk] +) -> list[tuple[ReplicaChunk, PpdbReplicaChunk | None]]: + """Make a list of pairs (apdb_chunk, ppdb_chunk), if ppdb_chunk does not + exist for apdb_chunk then it will be None. + """ + ppdb_id_map = {ppdb_chunk.id: ppdb_chunk for ppdb_chunk in ppdb_chunks} + apdb_ids = sorted(chunks, key=lambda apdb_chunk: apdb_chunk.id) + return [(apdb_chunk, ppdb_id_map.get(apdb_chunk.id)) for apdb_chunk in apdb_ids] + + +def _replicate_one(apdb: ApdbReplica, ppdb: Ppdb, replica_chunk: ReplicaChunk, update: bool) -> None: + + dia_objects = apdb.getDiaObjectsChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaObjects for replication", len(dia_objects.rows())) + dia_sources = apdb.getDiaSourcesChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaSources for replication", len(dia_sources.rows())) + dia_forced_sources = apdb.getDiaForcedSourcesChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaForcedSources for replication", len(dia_forced_sources.rows())) + + ppdb.store(replica_chunk, dia_objects, dia_sources, dia_forced_sources, update=update) diff --git a/python/lsst/dax/ppdb/sql/__init__.py b/python/lsst/dax/ppdb/sql/__init__.py new file mode 100644 index 0000000..d8ad126 --- /dev/null +++ b/python/lsst/dax/ppdb/sql/__init__.py @@ -0,0 +1,22 @@ +# This file is part of dax_ppdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from ._ppdb_sql import PpdbSql, PpdbSqlConfig diff --git a/python/lsst/dax/ppdb/sql/_ppdb_sql.py b/python/lsst/dax/ppdb/sql/_ppdb_sql.py new file mode 100644 index 0000000..590fae6 --- /dev/null +++ b/python/lsst/dax/ppdb/sql/_ppdb_sql.py @@ -0,0 +1,530 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. 
+# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PpdbSql", "PpdbSqlConfig"] + +import logging +import os +import sqlite3 +from collections.abc import MutableMapping +from contextlib import closing, suppress +from typing import Any + +import astropy.time +import sqlalchemy +import yaml +from felis.datamodel import Schema, SchemaVersion +from felis.metadata import MetaDataBuilder +from lsst.dax.apdb import ApdbMetadata, ApdbTableData, IncompatibleVersionError, ReplicaChunk, VersionTuple +from lsst.dax.apdb.sql.apdbMetadataSql import ApdbMetadataSql +from lsst.dax.apdb.sql.apdbSqlSchema import GUID +from lsst.resources import ResourcePath +from lsst.utils.iteration import chunk_iterable +from sqlalchemy import sql +from sqlalchemy.pool import NullPool + +from ..config import PpdbConfig +from ..ppdb import Ppdb, PpdbReplicaChunk +from .bulk_insert import make_inserter + +_LOG = logging.getLogger(__name__) + +VERSION = VersionTuple(0, 1, 0) +"""Version for the code defined in this module. This needs to be updated +(following compatibility rules) when schema produced by this code changes. +""" + + +def _onSqlite3Connect( + dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord +) -> None: + # Enable foreign keys + with closing(dbapiConnection.cursor()) as cursor: + cursor.execute("PRAGMA foreign_keys=ON;") + + +class PpdbSqlConfig(PpdbConfig): + db_url: str + """SQLAlchemy database connection URI.""" + + schema_name: str | None = None + """Database schema name, if `None` then default schema is used.""" + + felis_path: str | None = None + """Name of YAML file with ``felis`` schema, if `None` then default schema + file is used. + """ + + felis_schema: str | None = None + """Name of the schema in YAML file, if `None` then file has to contain + single schema. + """ + + use_connection_pool: bool = True + """If True then allow use of connection pool.""" + + isolation_level: str | None = None + """Transaction isolation level, if unset then backend-default value is + used. 
+ """ + + connection_timeout: float | None = None + """Maximum connection timeout in seconds.""" + + +class PpdbSql(Ppdb): + default_felis_schema_file = "${SDM_SCHEMAS_DIR}/yml/apdb.yaml" + + meta_schema_version_key = "version:schema" + """Name of the metadata key to store schema version number.""" + + meta_code_version_key = "version:PpdbSql" + """Name of the metadata key to store code version number.""" + + def __init__(self, config: PpdbConfig): + if not isinstance(config, PpdbSqlConfig): + raise TypeError("Expecting PpdbSqlConfig instance") + self.config = config + + self._sa_metadata, schema_version = self._read_schema( + config.felis_path, config.schema_name, config.felis_schema + ) + + self._engine = self._make_engine(config) + sa_metadata = sqlalchemy.MetaData(schema=config.schema_name) + + meta_table: sqlalchemy.schema.Table | None = None + with suppress(sqlalchemy.exc.NoSuchTableError): + meta_table = sqlalchemy.schema.Table("metadata", sa_metadata, autoload_with=self._engine) + + self._metadata = ApdbMetadataSql(self._engine, meta_table) + + # Check schema version compatibility + if self._metadata.table_exists(): + self._versionCheck(self._metadata, schema_version) + + @classmethod + def init_database( + cls, + db_url: str, + schema_name: str | None, + schema_file: str | None, + felis_schema: str | None, + use_connection_pool: bool, + isolation_level: str | None, + connection_timeout: float | None, + drop: bool, + ) -> PpdbConfig: + """Initialize PPDB database. + + Parameters + ---------- + db_url : `str` + SQLAlchemy database connection URI. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + schema_file : `str` or `None` + Name of YAML file with ``felis`` schema, if `None` then default + schema file is used. + felis_schema : `str` or `None` + Name of the schema in YAML file, if `None` then file has to contain + single schema. + use_connection_pool : `bool` + If True then allow use of connection pool. + isolation_level : `str` or `None` + Transaction isolation level, if unset then backend-default value is + used. + connection_timeout: `float` or `None` + Maximum connection timeout in seconds. + drop : `bool` + If `True` then drop existing tables. + """ + sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema) + config = PpdbSqlConfig( + db_url=db_url, + schema_name=schema_name, + felis_path=schema_file, + felis_schema=felis_schema, + use_connection_pool=use_connection_pool, + isolation_level=isolation_level, + connection_timeout=connection_timeout, + ) + cls._make_database(config, sa_metadata, schema_version, drop) + return config + + @classmethod + def _read_schema( + cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None + ) -> tuple[sqlalchemy.schema.MetaData, VersionTuple]: + """Read felis schema definitions for PPDB. + + Parameters + ---------- + schema_file : `str` or `None` + Name of YAML file with ``felis`` schema, if `None` then default + schema file is used. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + felis_schema : `str`, optional + Name of the schema in YAML file, if `None` then file has to contain + single schema. + + Returns + ------- + metadata : `sqlalchemy.schema.MetaData` + SQLAlchemy metadata instance containing information for all tables. + version : `lsst.dax.apdb.VersionTuple` or `None` + Schema version defined in schema or `None` if not defined. 
+ """ + if schema_file is None: + schema_file = os.path.expandvars(cls.default_felis_schema_file) + + res = ResourcePath(schema_file) + schemas_list = list(yaml.load_all(res.read(), Loader=yaml.SafeLoader)) + if not schemas_list: + raise ValueError(f"Schema file {schema_file!r} does not define any schema") + if felis_schema is not None: + schemas_list = [schema for schema in schemas_list if schema.get("name") == felis_schema] + if not schemas_list: + raise ValueError(f"Schema file {schema_file!r} does not define schema {felis_schema!r}") + elif len(schemas_list) > 1: + raise ValueError(f"Schema file {schema_file!r} defines multiple schemas") + schema_dict = schemas_list[0] + + # In case we use APDB schema drop tables that are not needed in PPDB. + filtered_tables = [ + table for table in schema_dict["tables"] if table["name"] not in ("DiaObjectLast",) + ] + schema_dict["tables"] = filtered_tables + schema = Schema.model_validate(schema_dict) + + # Replace schema name with a configured one, this helps in case we + # want to use default schema on database side. + if schema_name: + schema.name = schema_name + metadata = MetaDataBuilder(schema).build() + else: + builder = MetaDataBuilder(schema, apply_schema_to_metadata=False, apply_schema_to_tables=False) + metadata = builder.build() + + # Add table for replication support. + sqlalchemy.schema.Table( + "PpdbReplicaChunk", + metadata, + sqlalchemy.schema.Column( + "apdb_replica_chunk", sqlalchemy.BigInteger, primary_key=True, autoincrement=False + ), + sqlalchemy.schema.Column("last_update_time", sqlalchemy.types.TIMESTAMP, nullable=False), + sqlalchemy.schema.Column("unique_id", GUID, nullable=False), + sqlalchemy.schema.Column("replica_time", sqlalchemy.types.TIMESTAMP, nullable=False), + sqlalchemy.schema.Index("PpdbInsertId_idx_last_update_time", "last_update_time"), + sqlalchemy.schema.Index("PpdbInsertId_idx_replica_time", "replica_time"), + schema=schema_name, + ) + + if isinstance(schema.version, str): + version = VersionTuple.fromString(schema.version) + elif isinstance(schema.version, SchemaVersion): + version = VersionTuple.fromString(schema.version.current) + else: + # Missing schema version is identical to 0.1.0 + version = VersionTuple(0, 1, 0) + + return metadata, version + + @classmethod + def _make_database( + cls, + config: PpdbSqlConfig, + sa_metadata: sqlalchemy.schema.MetaData, + schema_version: VersionTuple | None, + drop: bool, + ) -> None: + """Initialize database schema. + + Parameters + ---------- + db_url : `str` + SQLAlchemy database connection URI. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + sa_metadata : `sqlalchemy.schema.MetaData` + Schema definition. + schema_version : `lsst.dax.apdb.VersionTuple` or `None` + Schema version defined in schema or `None` if not defined. + drop : `bool` + If `True` then drop existing tables before creating new ones. + """ + engine = cls._make_engine(config) + + if config.schema_name is not None: + dialect = engine.dialect + quoted_schema = dialect.preparer(dialect).quote_schema(config.schema_name) + create_schema = sqlalchemy.DDL( + "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema} + ).execute_if(dialect="postgresql") + sqlalchemy.event.listen(sa_metadata, "before_create", create_schema) + + if drop: + _LOG.info("dropping all tables") + sa_metadata.drop_all(engine) + _LOG.info("creating all tables") + sa_metadata.create_all(engine) + + # Need metadata table to store few items in it, if table exists. 
+ meta_table: sqlalchemy.schema.Table | None = None + for table in sa_metadata.tables.values(): + if table.name == "metadata": + meta_table = table + break + + apdb_meta = ApdbMetadataSql(engine, meta_table) + if apdb_meta.table_exists(): + # Fill version numbers, overwrite if they are already there. + if schema_version is not None: + _LOG.info("Store metadata %s = %s", cls.meta_schema_version_key, schema_version) + apdb_meta.set(cls.meta_schema_version_key, str(schema_version), force=True) + _LOG.info("Store metadata %s = %s", cls.meta_code_version_key, VERSION) + apdb_meta.set(cls.meta_code_version_key, str(VERSION), force=True) + + @classmethod + def _make_engine(cls, config: PpdbSqlConfig) -> sqlalchemy.engine.Engine: + """Make SQLALchemy engine based on configured parameters. + + Parameters + ---------- + config : `PpdbSqlConfig` + Configuration object. + """ + kw: MutableMapping[str, Any] = {} + conn_args: dict[str, Any] = dict() + if not config.use_connection_pool: + kw["poolclass"] = NullPool + if config.isolation_level is not None: + kw.update(isolation_level=config.isolation_level) + elif config.db_url.startswith("sqlite"): # type: ignore + # Use READ_UNCOMMITTED as default value for sqlite. + kw.update(isolation_level="READ_UNCOMMITTED") + if config.connection_timeout is not None: + if config.db_url.startswith("sqlite"): + conn_args.update(timeout=config.connection_timeout) + elif config.db_url.startswith(("postgresql", "mysql")): + conn_args.update(connect_timeout=config.connection_timeout) + kw = {"connect_args": conn_args} + engine = sqlalchemy.create_engine(config.db_url, **kw) + + if engine.dialect.name == "sqlite": + # Need to enable foreign keys on every new connection. + sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect) + + return engine + + def _versionCheck(self, metadata: ApdbMetadataSql, schema_version: VersionTuple) -> None: + """Check schema version compatibility.""" + + def _get_version(key: str, default: VersionTuple) -> VersionTuple: + """Retrieve version number from given metadata key.""" + if metadata.table_exists(): + version_str = metadata.get(key) + if version_str is None: + # Should not happen with existing metadata table. + raise RuntimeError(f"Version key {key!r} does not exist in metadata table.") + return VersionTuple.fromString(version_str) + return default + + # For old databases where metadata table does not exist we assume that + # version of both code and schema is 0.1.0. + initial_version = VersionTuple(0, 1, 0) + db_schema_version = _get_version(self.meta_schema_version_key, initial_version) + db_code_version = _get_version(self.meta_code_version_key, initial_version) + + # For now there is no way to make read-only APDB instances, assume that + # any access can do updates. 
+ if not schema_version.checkCompatibility(db_schema_version, True): + raise IncompatibleVersionError( + f"Configured schema version {schema_version} " + f"is not compatible with database version {db_schema_version}" + ) + if not VERSION.checkCompatibility(db_code_version, True): + raise IncompatibleVersionError( + f"Current code version {VERSION} " + f"is not compatible with database version {db_code_version}" + ) + + def _get_table(self, name: str) -> sqlalchemy.schema.Table: + for table in self._sa_metadata.tables.values(): + if table.name == name: + return table + raise LookupError(f"Unknown table {name}") + + @property + def metadata(self) -> ApdbMetadata: + # docstring is inherited from a base class + return self._metadata + + def get_replica_chunks(self) -> list[PpdbReplicaChunk] | None: + # docstring is inherited from a base class + table = self._get_table("PpdbReplicaChunk") + query = sql.select( + table.columns["apdb_replica_chunk"], + table.columns["last_update_time"], + table.columns["unique_id"], + table.columns["replica_time"], + ).order_by(table.columns["last_update_time"]) + with self._engine.connect() as conn: + result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query) + ids = [] + for row in result: + last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai") + replica_time = astropy.time.Time(row[3].timestamp(), format="unix_tai") + ids.append( + PpdbReplicaChunk( + id=row[0], + last_update_time=last_update_time, + unique_id=row[2], + replica_time=replica_time, + ) + ) + return ids + + def store( + self, + replica_chunk: ReplicaChunk, + objects: ApdbTableData, + sources: ApdbTableData, + forced_sources: ApdbTableData, + *, + update: bool = False, + ) -> None: + # docstring is inherited from a base class + + # We want to run all inserts in one transaction. + with self._engine.begin() as connection: + # Check for existing InsertId first, if it does not exist we can + # run more optimal queries. 
+    def store(
+        self,
+        replica_chunk: ReplicaChunk,
+        objects: ApdbTableData,
+        sources: ApdbTableData,
+        forced_sources: ApdbTableData,
+        *,
+        update: bool = False,
+    ) -> None:
+        # docstring is inherited from a base class
+
+        # We want to run all inserts in one transaction.
+        with self._engine.begin() as connection:
+            # Check for an existing replica chunk first; if it does not
+            # exist we can run more efficient queries.
+            if update:
+                table = self._get_table("PpdbReplicaChunk")
+                query = sql.select(sql.expression.literal(1)).where(
+                    table.columns["apdb_replica_chunk"] == replica_chunk.id
+                )
+                if connection.execute(query).one_or_none() is None:
+                    update = False
+
+            self._store_insert_id(replica_chunk, connection, update)
+            self._store_objects(objects, connection, update)
+            self._store_table_data(sources, connection, update, "DiaSource", 100)
+            self._store_table_data(forced_sources, connection, update, "DiaForcedSource", 1000)
+
+    def _store_insert_id(
+        self, replica_chunk: ReplicaChunk, connection: sqlalchemy.engine.Connection, update: bool
+    ) -> None:
+        """Insert or replace a single record in the PpdbReplicaChunk table."""
+        insert_dt = replica_chunk.last_update_time.tai.datetime
+        now = astropy.time.Time.now().tai.datetime
+
+        table = self._get_table("PpdbReplicaChunk")
+
+        values = {"last_update_time": insert_dt, "unique_id": replica_chunk.unique_id, "replica_time": now}
+        row = {"apdb_replica_chunk": replica_chunk.id} | values
+        if update:
+            # We need an UPSERT, which is a dialect-specific construct.
+            if connection.dialect.name == "sqlite":
+                insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
+                insert_sqlite = insert_sqlite.on_conflict_do_update(
+                    index_elements=table.primary_key, set_=values
+                )
+                connection.execute(insert_sqlite, row)
+            elif connection.dialect.name == "postgresql":
+                insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
+                insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
+                connection.execute(insert_pg, row)
+            else:
+                raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
+        else:
+            insert = table.insert()
+            connection.execute(insert, row)
+
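The UPSERT used by `_store_insert_id` is dialect specific; a standalone sketch of the PostgreSQL flavour against a toy stand-in table (not the real PpdbReplicaChunk schema) looks like this:

    import sqlalchemy
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    # Toy stand-in for the PpdbReplicaChunk table, illustrative only.
    metadata = sqlalchemy.MetaData()
    chunks = sqlalchemy.Table(
        "PpdbReplicaChunk",
        metadata,
        sqlalchemy.Column("apdb_replica_chunk", sqlalchemy.BigInteger, primary_key=True),
        sqlalchemy.Column("unique_id", sqlalchemy.String(64)),
    )

    values = {"unique_id": "chunk-42"}
    stmt = pg_insert(chunks).values(apdb_replica_chunk=42, **values)
    stmt = stmt.on_conflict_do_update(constraint=chunks.primary_key, set_=values)
    # connection.execute(stmt) inserts the row, or updates it if the id exists.
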
+    def _store_objects(
+        self, objects: ApdbTableData, connection: sqlalchemy.engine.Connection, update: bool
+    ) -> None:
+        """Store or replace DiaObjects."""
+        # Store all records.
+        self._store_table_data(objects, connection, update, "DiaObject", 100)
+
+        table = self._get_table("DiaObject")
+
+        # We need to fill the validityEnd column for previously stored objects
+        # that have new records. A window function is used here to find
+        # records with validityEnd=NULL, order them, and update the
+        # validityEnd of older records from the validityStart of newer
+        # records.
+        idx = objects.column_names().index("diaObjectId")
+        ids = sorted(set(row[idx] for row in objects.rows()))
+        count = 0
+        for chunk in chunk_iterable(ids, 1000):
+            select_cte = sqlalchemy.cte(
+                sqlalchemy.select(
+                    table.columns["diaObjectId"],
+                    table.columns["validityStart"],
+                    table.columns["validityEnd"],
+                    sqlalchemy.func.rank()
+                    .over(
+                        partition_by=table.columns["diaObjectId"],
+                        order_by=table.columns["validityStart"],
+                    )
+                    .label("rank"),
+                ).where(table.columns["diaObjectId"].in_(chunk))
+            )
+            sub1 = select_cte.alias("s1")
+            sub2 = select_cte.alias("s2")
+            new_end = sql.select(sub2.columns["validityStart"]).select_from(
+                sub1.join(
+                    sub2,
+                    sqlalchemy.and_(
+                        sub1.columns["diaObjectId"] == sub2.columns["diaObjectId"],
+                        sub1.columns["rank"] + sqlalchemy.literal(1) == sub2.columns["rank"],
+                        sub1.columns["validityStart"] == table.columns["validityStart"],
+                    ),
+                )
+            )
+            stmt = (
+                table.update()
+                .values(validityEnd=new_end.scalar_subquery())
+                .where(table.columns["validityEnd"] == None)  # noqa: E711
+            )
+            result = connection.execute(stmt)
+            count += result.rowcount
+        _LOG.info("Updated %d rows in DiaObject table with new validityEnd values", count)
+
+    def _store_table_data(
+        self,
+        table_data: ApdbTableData,
+        connection: sqlalchemy.engine.Connection,
+        update: bool,
+        table_name: str,
+        chunk_size: int,
+    ) -> None:
+        """Store or replace the contents of a single table."""
+        table = self._get_table(table_name)
+        inserter = make_inserter(connection)
+        count = inserter.insert(table, table_data, chunk_size=chunk_size)
+        _LOG.info("Inserted %d rows into %s table", count, table_name)
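To make the validityEnd update in `_store_objects` concrete: versions of a DiaObject are ranked by validityStart, each version is closed out by the start time of the next one, and the newest version keeps validityEnd = NULL. A toy illustration with made-up timestamps:

    # Validity intervals for one DiaObject, ordered by validityStart.
    starts = ["2024-03-01T00:00", "2024-03-05T00:00", "2024-03-09T00:00"]
    ends = starts[1:] + [None]  # the newest record stays open
    for start, end in zip(starts, ends):
        print(start, "->", end)
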
diff --git a/python/lsst/dax/ppdb/sql/bulk_insert.py b/python/lsst/dax/ppdb/sql/bulk_insert.py
new file mode 100644
index 0000000..dd416dd
--- /dev/null
+++ b/python/lsst/dax/ppdb/sql/bulk_insert.py
@@ -0,0 +1,134 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ["BulkInserter", "make_inserter"]
+
+import logging
+import tempfile
+from abc import ABC, abstractmethod
+from collections.abc import Sequence
+from typing import Any
+
+import sqlalchemy
+from lsst.dax.apdb import ApdbTableData
+from lsst.utils.iteration import chunk_iterable
+
+from .pg_dump import PgBinaryDumper
+
+_LOG = logging.getLogger(__name__)
+
+
+class BulkInserter(ABC):
+    """Interface for bulk insert operations into a table."""
+
+    @abstractmethod
+    def insert(self, table: sqlalchemy.schema.Table, data: ApdbTableData, *, chunk_size: int = 1000) -> int:
+        """Insert multiple rows into a single table.
+
+        Parameters
+        ----------
+        table : `sqlalchemy.schema.Table`
+            Table to insert data into.
+        data : `ApdbTableData`
+            Data to insert into the table.
+        chunk_size : `int`, optional
+            Number of rows for a single chunk for insertion.
+
+        Returns
+        -------
+        count : `int`
+            Number of rows inserted.
+        """
+        raise NotImplementedError()
+
+
+class _DefaultBulkInserter(BulkInserter):
+
+    def __init__(self, connection: sqlalchemy.engine.Connection):
+        self.connection = connection
+
+    def insert(self, table: sqlalchemy.schema.Table, data: ApdbTableData, *, chunk_size: int = 1000) -> int:
+        # Docstring inherited.
+        table_columns = set(column.name for column in table.columns)
+        data_columns = set(data.column_names())
+        drop_columns = data_columns - table_columns
+        insert = table.insert()
+        count = 0
+        # Insert data in chunks; callers pass a smaller chunk_size for very
+        # wide tables such as DiaObject.
+        for chunk in chunk_iterable(data.rows(), chunk_size):
+            insert_data = [self._row_to_dict(data.column_names(), row, drop_columns) for row in chunk]
+            result = self.connection.execute(insert.values(insert_data))
+            count += result.rowcount
+        return count
+
+    @staticmethod
+    def _row_to_dict(column_names: Sequence[str], row: tuple, drop_columns: set[str]) -> dict[str, Any]:
+        """Convert TableData row into dict."""
+        data = dict(zip(column_names, row))
+        for column in drop_columns:
+            del data[column]
+        return data
+
+
+class _Psycopg2BulkInserter(BulkInserter):
+
+    def __init__(self, connection: sqlalchemy.engine.Connection):
+        self.connection = connection
+        self.ident_prepare = sqlalchemy.sql.compiler.IdentifierPreparer(connection.dialect)
+
+    def insert(self, table: sqlalchemy.schema.Table, data: ApdbTableData, *, chunk_size: int = 1000) -> int:
+        # Docstring inherited.
+
+        # To avoid potential mismatch or ambiguity in column definitions,
+        # reflect the actual table definition from the database.
+        meta = sqlalchemy.schema.MetaData()
+        reflected = sqlalchemy.schema.Table(
+            table.name, meta, autoload_with=self.connection, resolve_fks=False, schema=table.schema
+        )
+
+        conn = self.connection.connection.dbapi_connection
+        assert conn is not None, "Connection cannot be None"
+        cursor = conn.cursor()
+
+        with tempfile.TemporaryFile() as stream:
+
+            _LOG.info("Writing %s data to a temporary file", table.name)
+
+            dumper = PgBinaryDumper(stream, reflected)
+            columns = dumper.dump(data)
+
+            # Build COPY query, may need to quote some column names.
+            columns_str = ", ".join(self.ident_prepare.quote(column) for column in columns)
+            table_name = self.ident_prepare.format_table(table)
+            sql = f"COPY {table_name} ({columns_str}) FROM STDIN WITH BINARY"
+            _LOG.debug("COPY query: %s", sql)
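`copy_expert` is psycopg2's generic entry point for `COPY ... FROM STDIN`; a tiny standalone sketch with a made-up DSN and a text-format COPY (the code above uses the binary variant produced by `PgBinaryDumper`):

    import io
    import psycopg2

    # Made-up DSN and table name, for illustration only.
    conn = psycopg2.connect("dbname=ppdb")
    with conn, conn.cursor() as cursor:
        stream = io.StringIO("1\tfoo\n2\tbar\n")
        cursor.copy_expert("COPY toy_table (id, name) FROM STDIN", stream)
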
+ _LOG.info("Ingesting %s data to Postgres table", table.name) + stream.seek(0) + cursor.copy_expert(sql, stream, 1024 * 1024) + _LOG.info("Successfully ingested %s data", table.name) + + return len(data.rows()) + + +def make_inserter(connection: sqlalchemy.engine.Connection) -> BulkInserter: + """Make instance of `BulkInserter` suitable for a given connection.""" + if connection.dialect.driver == "psycopg2": + return _Psycopg2BulkInserter(connection) + return _DefaultBulkInserter(connection) diff --git a/python/lsst/dax/ppdb/sql/pg_dump.py b/python/lsst/dax/ppdb/sql/pg_dump.py new file mode 100644 index 0000000..7b6f00b --- /dev/null +++ b/python/lsst/dax/ppdb/sql/pg_dump.py @@ -0,0 +1,274 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PgBinaryDumper"] + +import logging +import struct +from abc import ABC, abstractmethod +from datetime import datetime, timedelta, timezone +from typing import Any, BinaryIO, NamedTuple + +import sqlalchemy +import sqlalchemy.dialects.postgresql.types as pg_types +from lsst.dax.apdb import ApdbTableData +from sqlalchemy.sql import sqltypes + +_LOG = logging.getLogger(__name__) + + +class StructData(NamedTuple): + + size: int + format: str + value: Any + + +class _ColumnDataHandler(ABC): + + @abstractmethod + def to_struct(self, column_value: Any) -> StructData: + raise NotImplementedError() + + +class PgBinaryDumper: + """Class that knows how to dump ApdbTableData to binary PostgreSQL file.""" + + _HEADER = b"PGCOPY\n\377\r\n\0" + + def __init__(self, stream: BinaryIO, table: sqlalchemy.schema.Table): + self._stream = stream + self._table = table + + def dump(self, data: ApdbTableData) -> list[str]: + """Dump the whole contents of table data to a file.""" + # Only care about columns that exists in both table and data. + data_column_names = data.column_names() + table_column_names = set(column.name for column in self._table.columns) + _LOG.debug("table_column_names: %s", table_column_names) + + column_indices = [idx for idx, name in enumerate(data_column_names) if name in table_column_names] + types = [self._table.columns[data_column_names[idx]].type for idx in column_indices] + handlers = [_TYPE_MAP[column_type.__class__] for column_type in types] + + # Write PGDUMP header and flags (two 32-bit integers) + self._stream.write(self._HEADER + b"\0\0\0\0\0\0\0\0") + + # Dump all rows. + for row in data.rows(): + + # Buld row struct, it starts with the number of columns as 16-bit + # integer, all data is in network order. 
+ fmt = ["!h"] + args = [len(column_indices)] + for idx, handler in zip(column_indices, handlers): + struct_data = handler.to_struct(row[idx]) + if struct_data.value is None: + # Null is encoded as size=-1, without data + fmt.append("i") + args.append(-1) + else: + fmt.extend(["i", struct_data.format]) + args.extend([struct_data.size, struct_data.value]) + + row_bytes = struct.pack("".join(fmt), *args) + self._stream.write(row_bytes) + + return [data_column_names[idx] for idx in column_indices] + + +class _FixedColumnDataHandler(_ColumnDataHandler): + + def __init__(self, size: int, format: str): + self._size = size + self._format = format + + def to_struct(self, column_value: Any) -> StructData: + return StructData(size=self._size, format=self._format, value=column_value) + + +class _ByteArrayColumnDataHandler(_ColumnDataHandler): + + def __init__(self, format: str): + self._format = format + + def to_struct(self, column_value: Any) -> StructData: + if column_value is None: + return StructData(size=-1, format=self._format, value=None) + format = f"{len(column_value)}{self._format}" + return StructData(size=len(column_value), format=format, value=column_value) + + +class _StringColumnDataHandler(_ColumnDataHandler): + + def __init__(self, format: str): + self._format = format + + def to_struct(self, column_value: Any) -> StructData: + if column_value is None: + return StructData(size=-1, format=self._format, value=None) + # Assume that utf8 is OK for all string data + assert isinstance(column_value, str), "Expect string instance" + value = column_value.encode() + format = f"{len(value)}{self._format}" + return StructData(size=len(value), format=format, value=value) + + +class _TimestampColumnDataHandler(_ColumnDataHandler): + + epoch_utc = datetime(2000, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + epoch_naive = datetime(2000, 1, 1, 0, 0, 0) + + def to_struct(self, column_value: Any) -> StructData: + if column_value is None: + return StructData(size=-1, format="q", value=None) + assert isinstance(column_value, datetime), "Expect datetime instance" + # Timestamps are stored internally as microseconds since epoch + # (Jan 1, 2000) + if column_value.tzinfo is None: + delta = column_value - self.epoch_naive + else: + delta = column_value - self.epoch_utc + delta_usec = int(delta / timedelta(microseconds=1)) + return StructData(size=8, format="q", value=delta_usec) + + +_pg_types = [ + "BIT", + "BYTEA", # done + "CIDR", + "CITEXT", + "INET", + "INTERVAL", + "MACADDR", + "MACADDR8", + "MONEY", + "OID", + "PGBit", + "PGCidr", + "PGInet", + "PGInterval", + "PGMacAddr", + "PGMacAddr8", + "PGUuid", + "REGCLASS", + "REGCONFIG", + "TIME", + "TIMESTAMP", + "TSQUERY", + "TSVECTOR", +] + +_sqltypes = [ + "ARRAY", + "BIGINT", # done + "BINARY", + "BLOB", + "BOOLEAN", + "BigInteger", + "Boolean", + "CHAR", # done + "CLOB", + "DATE", + "DATETIME", + "DATETIME_TIMEZONE", + "DECIMAL", + "DOUBLE", + "DOUBLE_PRECISION", + "Date", + "DateTime", + "Double", + "FLOAT", + "Float", + "INT", # done + "INTEGER", # done + "Integer", + "Interval", + "JSON", + "LargeBinary", + "MATCHTYPE", + "MatchType", + "NCHAR", + "NULLTYPE", + "NUMERIC", + "NUMERICTYPE", + "NVARCHAR", + "Numeric", + "REAL", + "SMALLINT", # done + "STRINGTYPE", + "SmallInteger", + "String", + "TEXT", + "TIME", + "TIMESTAMP", + "TIME_TIMEZONE", + "Text", + "Time", + "Tuple", + "TupleType", + "Type", + "UUID", + "Unicode", + "UnicodeText", + "Uuid", + "VARBINARY", + "VARCHAR", +] + +_TYPE_MAP = { + sqltypes.SMALLINT: _FixedColumnDataHandler(2, "h"), + 
sqltypes.INT: _FixedColumnDataHandler(4, "i"), + sqltypes.INTEGER: _FixedColumnDataHandler(4, "i"), + sqltypes.BIGINT: _FixedColumnDataHandler(8, "q"), + sqltypes.DOUBLE: _FixedColumnDataHandler(8, "d"), + sqltypes.DOUBLE_PRECISION: _FixedColumnDataHandler(8, "d"), + sqltypes.BOOLEAN: _FixedColumnDataHandler(1, "?"), + sqltypes.FLOAT: _FixedColumnDataHandler(4, "f"), + sqltypes.CHAR: _StringColumnDataHandler("s"), + sqltypes.VARCHAR: _StringColumnDataHandler("s"), + pg_types.BYTEA: _ByteArrayColumnDataHandler("s"), + pg_types.TIMESTAMP: _TimestampColumnDataHandler(), +} + + +def _dump_pgdump(filename: str) -> None: + + with open(filename, "rb") as pgdump: + header = pgdump.read(11 + 4 + 4) + print("header:", header) + buffer = pgdump.read(2) + (count,) = struct.unpack("!h", buffer) + print("column count:", count) + for i in range(count): + buffer = pgdump.read(4) + (size,) = struct.unpack("!i", buffer) + print(f" {i}: {size}") + if size > 0: + buffer = pgdump.read(size) + + +if __name__ == "__main__": + import sys + + _dump_pgdump(sys.argv[1]) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2b69573 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +astropy +numpy +pandas +pyyaml >= 5.1 +sqlalchemy +lsst-dax-apdb @ git+https://github.com/lsst/dax_apdb@main +lsst-utils @ git+https://github.com/lsst/utils@main +lsst-resources[s3] @ git+https://github.com/lsst/resources@main +lsst-felis @ git+https://github.com/lsst/felis@main diff --git a/setup.cfg b/setup.cfg index 53fa4eb..bdd0dc0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,7 +8,3 @@ exclude = **/*/__init__.py, **/*/version.py, tests/.tests - -[tool:pytest] -addopts = --flake8 -flake8-ignore = E133 E226 E228 N802 N803 N806 N812 N813 N815 N816 W503 diff --git a/ups/dax_ppdb.table b/ups/dax_ppdb.table index e3b1923..f12decc 100644 --- a/ups/dax_ppdb.table +++ b/ups/dax_ppdb.table @@ -1,9 +1,9 @@ -# List EUPS dependencies of this package here. -# - Any package whose API is used directly should be listed explicitly. -# - Common third-party packages can be assumed to be recursively included by -# the "sconsUtils" package. +setupRequired(dax_apdb) +setupRequired(felis) +setupRequired(resources) setupRequired(sconsUtils) +setupRequired(sdm_schemas) +setupRequired(utils) -# The following is boilerplate for all packages. -# See https://dmtn-001.lsst.io for details on LSST_LIBRARY_PATH. +envPrepend(PATH, ${PRODUCT_DIR}/bin) envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)
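As a closing illustration, the per-row framing that `PgBinaryDumper.dump` writes and `_dump_pgdump` reads back can be exercised directly with `struct`; the values below are arbitrary:

    import struct

    # One row with two columns: an 8-byte bigint and a NULL (length -1, no payload).
    row = struct.pack("!hiqi", 2, 8, 12345, -1)

    (count,) = struct.unpack_from("!h", row, 0)
    size1, value1 = struct.unpack_from("!iq", row, 2)
    (size2,) = struct.unpack_from("!i", row, 14)
    print(count, size1, value1, size2)  # 2 8 12345 -1
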