Skip to content

Commit

Permalink
Merge pull request #2 from lsst/tickets/DM-44129
Browse files Browse the repository at this point in the history
DM-44129: Improvements to replication script
  • Loading branch information
andy-slac authored Jul 30, 2024
2 parents f5ce8fd + d2553b5 commit 608ee7a
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 113 deletions.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ ignore_missing_imports = True
[mypy-lsst.sphgeom]
ignore_missing_imports = True

[mypy-testing.*]
ignore_missing_imports = True

[mypy-lsst.dax.ppdb.*]
ignore_missing_imports = False
ignore_errors = False
Expand Down
1 change: 1 addition & 0 deletions python/lsst/dax/ppdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@

from .config import *
from .ppdb import *
from .replicator import *
from .version import * # Generated by sconsUtils
21 changes: 21 additions & 0 deletions python/lsst/dax/ppdb/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,24 @@ def replication_options(parser: argparse.ArgumentParser) -> None:
group.add_argument(
"--update", help="Allow updates to already replicated data.", default=False, action="store_true"
)
group.add_argument(
"--min-wait-time",
type=int,
default=300,
metavar="SECONDS",
help="Minimum time to wait for replicating a chunk after a next chunk appears, default: %(default)s.",
)
group.add_argument(
"--max-wait-time",
type=int,
default=900,
metavar="SECONDS",
help="Maximum time to wait for replicating a chunk if no chunk appears, default: %(default)s.",
)
group.add_argument(
"--check-interval",
type=int,
default=360,
metavar="SECONDS",
help="Time to wait before next check if there was no replicated chunks, default: %(default)s.",
)
2 changes: 1 addition & 1 deletion python/lsst/dax/ppdb/cli/ppdb_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def main() -> None:
def _create_sql_subcommand(subparsers: argparse._SubParsersAction) -> None:
parser = subparsers.add_parser("create-sql", help="Create new PPDB instance in SQL database.")
parser.add_argument("db_url", help="Database URL in SQLAlchemy format for PPDB instance.")
parser.add_argument("config_path", help="Name of the new configuration file for created PPDB instance.")
parser.add_argument("output_config", help="Name of the new configuration file for created PPDB instance.")
options.felis_schema_options(parser)
options.sql_db_options(parser)
parser.add_argument(
Expand Down
20 changes: 20 additions & 0 deletions python/lsst/dax/ppdb/cli/ppdb_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import argparse

from lsst.dax.apdb import monitor
from lsst.dax.apdb.cli.logging_cli import LoggingCli

from .. import scripts
Expand All @@ -37,6 +38,15 @@ def main() -> None:
"""Commands for managing APDB-to-PPDB replication."""
parser = argparse.ArgumentParser(description="PPDB command line tools")
log_cli = LoggingCli(parser)
parser.add_argument(
"--mon-logger", help="Name of the logger to output monitoring metrics.", metavar="LOGGER"
)
parser.add_argument(
"--mon-rules",
help="Comma-separated list of monitoring filter rules.",
default="",
metavar="RULE[,RULE...]",
)

subparsers = parser.add_subparsers(title="available subcommands", required=True)
_list_chunks_apdb_subcommand(subparsers)
Expand All @@ -46,6 +56,16 @@ def main() -> None:
args = parser.parse_args()
log_cli.process_args(args)
kwargs = vars(args)

# Setup monitoring output.
mon_logger = kwargs.pop("mon_logger", None)
mon_rules = kwargs.pop("mon_rules", None)
if mon_logger is not None:
mon_handler = monitor.LoggingMonHandler(mon_logger)
monitor.MonService().add_handler(mon_handler)
if mon_rules:
monitor.MonService().set_filters(mon_rules.split(","))

method = kwargs.pop("method")
method(**kwargs)

Expand Down
8 changes: 7 additions & 1 deletion python/lsst/dax/ppdb/ppdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ def metadata(self) -> ApdbMetadata:
raise NotImplementedError()

@abstractmethod
def get_replica_chunks(self) -> list[PpdbReplicaChunk] | None:
def get_replica_chunks(self, start_chunk_id: int | None = None) -> list[PpdbReplicaChunk] | None:
"""Return collection of replica chunks known to the database.
Parameters
----------
start_chunk_id : `int`, optional
If specified this will be the starting chunk ID to return. If not
specified then all chunks are returned.
Returns
-------
chunks : `list` [`PpdbReplicaChunk`] or `None`
Expand Down
174 changes: 174 additions & 0 deletions python/lsst/dax/ppdb/replicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# This file is part of dax_ppdb
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["Replicator"]

import logging
from collections.abc import Iterable
from typing import TYPE_CHECKING

import astropy.time
from lsst.dax.apdb import monitor
from lsst.dax.apdb.timer import Timer

if TYPE_CHECKING:
from lsst.dax.apdb import ApdbReplica, ReplicaChunk

from .ppdb import Ppdb, PpdbReplicaChunk

_LOG = logging.getLogger(__name__)
_MON = monitor.MonAgent(__name__)


class Replicator:
    """Implementation of APDB-to-PPDB replication methods.

    Parameters
    ----------
    apdb : `~lsst.dax.apdb.ApdbReplica`
        Object providing access to APDB replica management.
    ppdb : `Ppdb`
        Object providing access to PPDB operations.
    update : `bool`
        If `True` then allow updates to previously replicated data.
    min_wait_time : `int`
        Minimum time in seconds to wait for replicating a chunk after a next
        chunk appears.
    max_wait_time : `int`
        Maximum time in seconds to wait for replicating a chunk if no chunk
        appears.
    """

    def __init__(
        self,
        apdb: ApdbReplica,
        ppdb: Ppdb,
        update: bool,
        min_wait_time: int,
        max_wait_time: int,
    ):
        self._apdb = apdb
        self._ppdb = ppdb
        self._update = update
        self._min_wait_time = min_wait_time
        self._max_wait_time = max_wait_time

    def copy_chunks(
        self,
        apdb_chunks: Iterable[ReplicaChunk],
        ppdb_chunks: Iterable[PpdbReplicaChunk],
        count: int | None = None,
    ) -> list[ReplicaChunk]:
        """Copy chunks of APDB data to PPDB.

        Chunks are processed in the order of increasing chunk ID. Processing
        stops at the first chunk that is not yet ready for replication, so
        that chunks are never copied out of order.

        Parameters
        ----------
        apdb_chunks : `~collections.abc.Iterable` [`ReplicaChunk`]
            List of APDB chunks.
        ppdb_chunks : `~collections.abc.Iterable` [`PpdbReplicaChunk`]
            List of PPDB chunks.
        count : `int`, optional
            Maximum number of chunks to copy, if not specified then copy all
            chunks that can be copied.

        Returns
        -------
        copied : `list` [`ReplicaChunk`]
            Chunks that were replicated to PPDB.
        """
        existing_ppdb_ids = {ppdb_chunk.id for ppdb_chunk in ppdb_chunks}
        chunks_to_copy = sorted(
            (apdb_chunk for apdb_chunk in apdb_chunks if apdb_chunk.id not in existing_ppdb_ids),
            key=lambda apdb_chunk: apdb_chunk.id,
        )
        _LOG.info("Replica chunks list contains %s chunks.", len(chunks_to_copy))

        copied: list[ReplicaChunk] = []
        last_index = len(chunks_to_copy) - 1
        for index, apdb_chunk in enumerate(chunks_to_copy):
            # A chunk followed by newer chunks only needs the shorter wait.
            if not self._can_replicate(apdb_chunk, index < last_index):
                break

            _LOG.info("Will replicate chunk %s", apdb_chunk)
            with Timer("replicate_chunk_time", _MON, tags={"chunk_id": apdb_chunk.id}):
                self._replicate_one(apdb_chunk)
            copied.append(apdb_chunk)
            if count is not None and len(copied) >= count:
                break

        return copied

    def _can_replicate(self, apdb_chunk: ReplicaChunk, more_chunks: bool) -> bool:
        """Decide whether chunk can be copied.

        Parameters
        ----------
        apdb_chunk : `ReplicaChunk`
            APDB chunk to copy.
        more_chunks : `bool`
            If True then there are more chunks to copy after this one.

        Returns
        -------
        can_copy : `bool`
            If True then chunk is OK to copy.
        """
        now = astropy.time.Time.now()
        # Age of the chunk in seconds, counted from its last update time.
        delta = (now - apdb_chunk.last_update_time).to_value("sec")
        if more_chunks and delta >= self._min_wait_time:
            # There are newer chunks, wait `min_wait_time` before copy.
            _LOG.info(
                "Chunk %s can be copied, it is older than %s seconds and newer chunks exist.",
                apdb_chunk.id,
                self._min_wait_time,
            )
            return True
        if delta >= self._max_wait_time:
            # Otherwise wait `max_wait_time` before copy.
            _LOG.info(
                "Chunk %s can be copied, it is older than %s seconds.",
                apdb_chunk.id,
                self._max_wait_time,
            )
            return True
        return False

    def _replicate_one(self, replica_chunk: ReplicaChunk) -> None:
        """Copy single chunk from APDB to PPDB.

        Parameters
        ----------
        replica_chunk : `ReplicaChunk`
            APDB chunk to copy.
        """
        chunk_ids = [replica_chunk.id]

        # Each timer records the row count of the table that it has just
        # read, so that the metrics are reported per table.
        with Timer("get_chunks_time", _MON, tags={"table": "DiaObject"}) as timer:
            dia_objects = self._apdb.getDiaObjectsChunks(chunk_ids)
            timer.add_values(row_count=len(dia_objects.rows()))
        _LOG.info("Selected %s DiaObjects for replication", len(dia_objects.rows()))

        with Timer("get_chunks_time", _MON, tags={"table": "DiaSource"}) as timer:
            dia_sources = self._apdb.getDiaSourcesChunks(chunk_ids)
            timer.add_values(row_count=len(dia_sources.rows()))
        _LOG.info("Selected %s DiaSources for replication", len(dia_sources.rows()))

        with Timer("get_chunks_time", _MON, tags={"table": "DiaForcedSource"}) as timer:
            dia_forced_sources = self._apdb.getDiaForcedSourcesChunks(chunk_ids)
            timer.add_values(row_count=len(dia_forced_sources.rows()))
        _LOG.info("Selected %s DiaForcedSources for replication", len(dia_forced_sources.rows()))

        with Timer("store_chunks_time", _MON):
            self._ppdb.store(replica_chunk, dia_objects, dia_sources, dia_forced_sources, update=self._update)
6 changes: 3 additions & 3 deletions python/lsst/dax/ppdb/scripts/create_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
def create_sql(
db_url: str,
schema: str | None,
config_path: str,
output_config: str,
felis_path: str,
felis_schema: str,
connection_pool: bool,
Expand All @@ -47,7 +47,7 @@ def create_sql(
SQLAlchemy connection string.
schema : `str` or `None`
Database schema name, `None` to use default schema.
config_path : `str`
output_config : `str`
Name of the file to write PPDB configuration.
felis_path : `str`
Path to the Felis YAML file with table schema definition.
Expand Down Expand Up @@ -75,5 +75,5 @@ def create_sql(
)
config_dict = config.model_dump(exclude_unset=True, exclude_defaults=True)
config_dict["implementation_type"] = "sql"
with open(config_path, "w") as config_file:
with open(output_config, "w") as config_file:
yaml.dump(config_dict, config_file)
Loading

0 comments on commit 608ee7a

Please sign in to comment.