Skip to content

Commit

Permalink
🚧 Split run() into separate functions, add types
Browse files Browse the repository at this point in the history
Sequence support requires the ability to load metadata into the database
without actually merging (i.e. when --output-metadata is not specified).
  • Loading branch information
victorlin committed Aug 30, 2024
1 parent 5c93e2d commit 96211b9
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions augur/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from shutil import which
from tempfile import mkstemp
from textwrap import dedent
from typing import Iterable, Tuple, TypeVar
from typing import Dict, Iterable, List, Sequence, Tuple, TypeVar

from augur.argparse_ import ExtendOverwriteDefault, SKIP_AUTO_DEFAULT_IN_HELP
from augur.errors import AugurError
Expand All @@ -55,6 +55,9 @@

T = TypeVar('T')

# Maps each output column name to the list of (table name, column name) pairs that supply it
Columns = Dict[str, List[Tuple[str, str]]]


print_info = print_err

Expand Down Expand Up @@ -146,14 +149,27 @@ def run(args):
if args.quiet:
print_info = lambda *_: None

db = Database()

metadata = get_metadata(args.metadata, args.metadata_id_columns, args.metadata_delimiters)
output_columns = get_output_columns(metadata)
load_metadata(db, metadata)
merge_metadata(db, metadata, output_columns, args.output_metadata)


def get_metadata(
input_metadata: Sequence[str],
input_metadata_id_columns: Sequence[str],
input_metadata_delimiters: Sequence[str],
) -> List[NamedMetadata]:
# Validate --metadata arguments
metadata = parse_named_inputs(args.metadata)
metadata = parse_named_inputs(input_metadata)

# Parse --metadata-id-columns and --metadata-delimiters
metadata_names = set(name for name, _ in metadata)

metadata_id_columns = pairs(args.metadata_id_columns)
metadata_delimiters = pairs(args.metadata_delimiters)
metadata_id_columns = pairs(input_metadata_id_columns)
metadata_delimiters = pairs(input_metadata_delimiters)

if unknown_names := [repr(name) for name, _ in metadata_id_columns if name and name not in metadata_names]:
raise AugurError(dedent(f"""\
Expand All @@ -175,18 +191,17 @@ def run(args):


# Infer delimiters and id columns
metadata = [
return [
NamedMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS,
[column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS)
for name, path in metadata]


db = Database()

def get_output_columns(metadata: List[NamedMetadata]):
# Track columns as we see them, in order. The first metadata's id column
# is always the first output column of the merge, so insert it now.
output_id_column = metadata[0].id_column
output_columns = { output_id_column: [] }
output_columns: Columns = { output_id_column: [] }

if conflicting_columns := [f"{c!r} in metadata table {m.name!r} (id column: {m.id_column!r})"
for m in metadata
Expand All @@ -205,6 +220,28 @@ def run(args):
Renaming may be done with `augur curate rename`.
"""))

for m in metadata:
# Track which columns appear in which metadata inputs, preserving
# the order of both.
for column in m.columns:
# Match different id column names in different metadata files
# since they're logically equivalent. Any non-id columns that
# match the output_id_column (i.e. first table's id column) and
# would thus overwrite it with this logic are already a fatal
# error above.
output_column = output_id_column if column == m.id_column else column

output_columns.setdefault(output_column, [])
output_columns[output_column] += [(m.table_name, column)]

return output_columns


def load_metadata(
db: Database,
metadata: List[NamedMetadata],
):

# Read all metadata files into a SQLite db
for m in metadata:
# All other metadata reading in Augur (i.e. via the csv module)
Expand Down Expand Up @@ -234,20 +271,15 @@ def run(args):
assert m.columns == (table_columns := sqlite3_table_columns(db.path, m.table_name)), \
f"{m.columns!r} == {table_columns!r}"

# Track which columns appear in which metadata inputs, preserving
# the order of both.
for column in m.columns:
# Match different id column names in different metadata files
# since they're logically equivalent. Any non-id columns that
# match the output_id_column (i.e. first table's id column) and
# would thus overwrite it with this logic are already a fatal
# error above.
output_column = output_id_column if column == m.id_column else column

output_columns.setdefault(output_column, [])
output_columns[output_column] += [(m.table_name, column)]
return metadata


def merge_metadata(
db: Database,
metadata: List[NamedMetadata],
output_columns: Columns,
output_metadata: str,
):
# Construct query to produce merged metadata.
select_list = [
# Output metadata columns coalesced across input metadata columns
Expand Down Expand Up @@ -279,13 +311,13 @@ def run(args):
# Write merged metadata as export from SQLite db.
#
# Assume TSV like nearly all other extant --output-metadata options.
print_info(f"Merging metadata and writing to {args.output_metadata!r}…")
print_info(f"Merging metadata and writing to {output_metadata!r}…")
print_debug(query)
db.run(
f'.mode csv',
f'.separator "\\t" "\\n"',
f'.headers on',
f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(args.output_metadata)}")}',
f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(output_metadata)}")}',
query)

db.cleanup()
Expand Down

0 comments on commit 96211b9

Please sign in to comment.