Skip to content

Commit

Permalink
Enable users to store sample look up optimized tables
Browse files Browse the repository at this point in the history
Add input flag and validate its value.
  • Loading branch information
samanvp committed Mar 31, 2020
1 parent cae7031 commit 4d177c5
Showing 1 changed file with 81 additions and 51 deletions.
132 changes: 81 additions & 51 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,27 @@ class BigQueryWriteOptions(VariantTransformsOptions):

def add_arguments(self, parser):
# type: (argparse.ArgumentParser) -> None
parser.add_argument('--output_table',
default='',
help='BigQuery table to store the results.')
parser.add_argument(
'--output_table',
default='',
help=('Base name of the BigQuery tables which will store the results. '
'Note that sharded tables will be named as following: '
' * `output_table`__chr1 '
' * `output_table`__chr2 '
' * ... '
' * `output_table`__residual '
'where "chr1", "chr2", ..., and "residual" suffixes correspond '
'to the value of `table_name_suffix` in the sharding config file '
'(see --sharding_config_path).'))

parser.add_argument(
'--sample_lookup_optimized_output_table',
default='',
help=('In addition to the default output tables (which are optimized '
'for variant look up queries), you can store a second copy of '
'your data in BigQuery tables that are optimized for sample '
'look up queries. Note that setting this option will double your '
'BigQuery storage costs.'))

parser.add_argument(
'--sharding_config_path',
Expand Down Expand Up @@ -187,55 +205,67 @@ def validate(self, parsed_args, client=None):
not parsed_args.sharding_config_path.strip()):
raise ValueError(
'--sharding_config_path must point to a valid config file.')
# Ensuring (not) existence of output tables is aligned with --append value.
if parsed_args.output_table:
if (parsed_args.output_table !=
bigquery_util.get_table_base_name(parsed_args.output_table)):
raise ValueError(('Output table cannot contain "{}" we reserve this '

if not client:
credentials = GoogleCredentials.get_application_default().create_scoped(
['https://www.googleapis.com/auth/bigquery'])
client = bigquery.BigqueryV2(credentials=credentials)
if not parsed_args.output_table:
raise ValueError('--output_table must have a value.')
self._validate_output_tables(
client, parsed_args.output_table,
parsed_args.sharding_config_path, parsed_args.append)

if parsed_args.sample_lookup_optimized_output_table:
self._validate_output_tables(
client, parsed_args.sample_lookup_optimized_output_table,
parsed_args.sharding_config_path, parsed_args.append)

def _validate_output_tables(self, client,
output_table_base_name,
sharding_config_path, append):
if (output_table_base_name !=
bigquery_util.get_table_base_name(output_table_base_name)):
raise ValueError(('Output table cannot contain "{}" we reserve this '
'string to mark sharded output tables.').format(
bigquery_util.TABLE_SUFFIX_SEPARATOR))

project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
output_table_base_name)
bigquery_util.raise_error_if_dataset_not_exists(client, project_id,
dataset_id)
all_output_tables = []
all_output_tables.append(
bigquery_util.compose_table_name(
table_id, bigquery_util.TABLE_SUFFIX, is_sample=True))
sharding = variant_sharding.VariantSharding(sharding_config_path)
num_shards = sharding.get_num_shards()
# In case there is no residual in config we will ignore the last shard.
if not sharding.should_keep_shard(sharding.get_residual_index()):
num_shards -= 1
for i in range(num_shards):
table_suffix = sharding.get_output_table_suffix(i)
if table_suffix != bigquery_util.get_table_base_name(table_suffix):
raise ValueError(('Table suffix cannot contain "{}" we reserve this '
'string to mark sharded output tables.').format(
bigquery_util.TABLE_SUFFIX_SEPARATOR))
if not client:
credentials = GoogleCredentials.get_application_default().create_scoped(
['https://www.googleapis.com/auth/bigquery'])
client = bigquery.BigqueryV2(credentials=credentials)

project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
parsed_args.output_table)
bigquery_util.raise_error_if_dataset_not_exists(client, project_id,
dataset_id)
all_output_tables = []
all_output_tables.append(
bigquery_util.compose_table_name(
table_id, bigquery_util.TABLE_SUFFIX, is_sample=True))
sharding = variant_sharding.VariantSharding(
parsed_args.sharding_config_path)
num_shards = sharding.get_num_shards()
# In case there is no residual in config we will ignore the last shard.
if not sharding.should_keep_shard(sharding.get_residual_index()):
num_shards -= 1
for i in range(num_shards):
table_suffix = sharding.get_output_table_suffix(i)
if table_suffix != bigquery_util.get_table_base_name(table_suffix):
raise ValueError(('Table suffix cannot contain "{}" we reserve this '
'string to mark sharded output tables.').format(
bigquery_util.TABLE_SUFFIX_SEPARATOR))
all_output_tables.append(bigquery_util.compose_table_name(table_id,
table_suffix))

for output_table in all_output_tables:
if parsed_args.append:
if not bigquery_util.table_exist(client, project_id,
dataset_id, output_table):
raise ValueError(
'Table {}:{}.{} does not exist, cannot append to it.'.format(
project_id, dataset_id, output_table))
else:
if bigquery_util.table_exist(client, project_id,
dataset_id, output_table):
raise ValueError(
('Table {}:{}.{} already exists, cannot overwrite it. Please '
'set `--append True` if you want to append to it.').format(
project_id, dataset_id, output_table))
bigquery_util.TABLE_SUFFIX_SEPARATOR))
all_output_tables.append(bigquery_util.compose_table_name(table_id,
table_suffix))

for output_table in all_output_tables:
if append:
if not bigquery_util.table_exist(client, project_id,
dataset_id, output_table):
raise ValueError(
'Table {}:{}.{} does not exist, cannot append to it.'.format(
project_id, dataset_id, output_table))
else:
if bigquery_util.table_exist(client, project_id,
dataset_id, output_table):
raise ValueError(
('Table {}:{}.{} already exists, cannot overwrite it. Please '
'set `--append True` if you want to append to it.').format(
project_id, dataset_id, output_table))


class AnnotationOptions(VariantTransformsOptions):
Expand Down

0 comments on commit 4d177c5

Please sign in to comment.