Skip to content

Commit

Permalink
types, better compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard C. Burhans committed Apr 23, 2024
1 parent 1a55aab commit b8aa943
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
51 changes: 30 additions & 21 deletions tools/segalign/diagonal_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,20 @@

import os
import sys
import typing

T = typing.TypeVar("T", bound="_Sliceable")

def chunks(lst, n):

class _Sliceable(typing.Protocol):
    """Structural type for objects that have a length and can be sliced.

    Serves as the bound of the ``T`` type variable, so ``chunks`` can accept
    any such object and yield slices of that same type.
    """

    def __len__(self) -> int:
        """Return the number of items in the object."""
        ...

    def __getitem__(self: T, i: slice) -> T:
        """Return the slice ``i`` as an object of the same type as ``self``."""
        ...


def chunks(lst: T, n: int) -> typing.Iterator[T]:
    """Yield successive n-sized chunks from a sliceable sequence.

    Args:
        lst: Any object supporting ``len()`` and slice indexing
            (list, tuple, str, ...).
        n: Maximum number of items per chunk; must be positive.

    Yields:
        Consecutive slices ``lst[i:i + n]``. The final chunk may hold fewer
        than ``n`` items; an empty ``lst`` yields nothing.

    Raises:
        ValueError: If ``n`` is not positive. Because this is a generator,
            the error surfaces when iteration starts.
    """
    if n <= 0:
        # A zero step makes range() fail with an unrelated message, and a
        # negative step makes it empty, silently dropping every item.
        raise ValueError(f"chunk size must be positive, got {n}")
    for start in range(0, len(lst), n):
        yield lst[start : start + n]
Expand All @@ -29,7 +40,7 @@ def chunks(lst, n):
# Parsing command output from SegAlign
segment_key = "--segments="
segment_index = None
input_file = None
input_file: typing.Optional[str] = None

for index, value in enumerate(params):
if value[: len(segment_key)] == segment_key:
Expand All @@ -40,7 +51,7 @@ def chunks(lst, n):
print(f"Error: could not segment key {segment_key} in parameters {params}")
exit(0)

if not os.path.isfile(input_file):
if input_file is None or not os.path.isfile(input_file):
print(f"Error: File {input_file} does not exist")
exit(0)

Expand All @@ -52,9 +63,9 @@ def chunks(lst, n):

# Find rest of relevant parameters
output_key = "--output="
output_index = None
output_index: typing.Optional[int] = None
output_alignment_file = None
output_alignment_file_base = None
output_alignment_file_base: typing.Optional[str] = None
output_format = None

strand_key = "--strand="
Expand All @@ -78,7 +89,9 @@ def chunks(lst, n):
err_index = -1 # error file is at very end
err_name_base = params[-1].split(".err", 1)[0]

data = {} # dict of list of tuple (x, y, str)
data: typing.Dict[
typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]]
] = {} # dict of list of tuple (x, y, str)

direction = None
if "plus" in params[strand_index]:
Expand Down Expand Up @@ -153,13 +166,10 @@ def chunks(lst, n):
# update segment file in command
params[segment_index] = segment_key + fname
# update output file in command
params[output_index] = (
output_key
+ output_alignment_file_base
+ name_addition
+ "."
+ output_format
)
if output_index is not None:
params[output_index] = (
f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}"
)
# update error file in command
params[-1] = err_name_base + name_addition + ".err"
print(" ".join(params), flush=True)
Expand All @@ -169,7 +179,9 @@ def chunks(lst, n):
skip_pairs_with_len = sorted(
[(len(data[p]), p) for p in skip_pairs]
) # list of tuples of (pair length, pair)
aggregated_skip_pairs = [] # list of list of pair names
aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = (
[]
) # list of list of pair names
current_count = 0
aggregated_skip_pairs.append([])
for count, pair in skip_pairs_with_len:
Expand All @@ -192,13 +204,10 @@ def chunks(lst, n):
# update segment file in command
params[segment_index] = segment_key + fname
# update output file in command
params[output_index] = (
output_key
+ output_alignment_file_base
+ name_addition
+ "."
+ output_format
)
if output_index is not None:
params[output_index] = (
f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}"
)
# update error file in command
params[-1] = err_name_base + name_addition + ".err"
print(" ".join(params), flush=True)
Expand Down
14 changes: 7 additions & 7 deletions tools/segalign/package_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def _initialize(self) -> None:
name=self.pathname,
mode="w:gz",
format=tarfile.GNU_FORMAT,
compresslevel=1,
compresslevel=6,
)

def add_config(self, pathname: str) -> None:
Expand Down Expand Up @@ -152,7 +152,7 @@ def _parse_lines(self) -> None:

def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
# resolve shell redirects
trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False) # type: ignore[attr-defined]
trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)
positions: typing.List[typing.Tuple[int, int]] = []

for tree in trees:
Expand All @@ -176,7 +176,7 @@ def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
return command_dict

def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
argv: typing.List[str] = list(bashlex.split(line)) # type: ignore[attr-defined]
argv: typing.List[str] = list(bashlex.split(line))
self.executable = argv.pop(0)

parser: argparse.ArgumentParser = argparse.ArgumentParser(add_help=False)
Expand Down Expand Up @@ -236,7 +236,7 @@ def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
return command_dict


class nodevisitor(bashlex.ast.nodevisitor): # type: ignore[name-defined,misc]
class nodevisitor(bashlex.ast.nodevisitor): # type: ignore[misc]
def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
self.positions = positions
self.stdin = None
Expand All @@ -245,14 +245,14 @@ def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:

def visitredirect(
self,
n: bashlex.ast.node, # type: ignore[name-defined]
n: bashlex.ast.node,
n_input: int,
n_type: str,
output: typing.Any,
heredoc: typing.Any,
) -> None:
if isinstance(n_input, int) and 0 <= n_input <= 2:
if isinstance(output, bashlex.ast.node) and output.kind == "word": # type: ignore[attr-defined]
if isinstance(output, bashlex.ast.node) and output.kind == "word":
self.positions.append(n.pos)
if n_input == 0:
self.stdin = output.word
Expand All @@ -265,7 +265,7 @@ def visitredirect(
else:
sys.exit(f"oops 2: {type(n_input)}")

def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None: # type: ignore[name-defined]
def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:
    # No-op: this visitor does nothing with here-document nodes.
    pass


Expand Down

0 comments on commit b8aa943

Please sign in to comment.