diff --git a/cloudbuild_CI.yaml b/cloudbuild_CI.yaml index 81f2cbb10..5ec4559f8 100644 --- a/cloudbuild_CI.yaml +++ b/cloudbuild_CI.yaml @@ -42,7 +42,9 @@ steps: - '--project ${PROJECT_ID}' - '--image_tag ${COMMIT_SHA}' - '--run_unit_tests' - - '--run_presubmit_tests' + - '--run_preprocessor_tests' + - '--run_bq_to_vcf_tests' + - '--run_all_tests' - '--test_name_prefix cloud-ci-' id: 'test-gcp-variant-transforms-docker' entrypoint: '/opt/gcp_variant_transforms/src/deploy_and_run_tests.sh' diff --git a/gcp_variant_transforms/libs/bigquery_util.py b/gcp_variant_transforms/libs/bigquery_util.py index eb3256799..0b7f54120 100644 --- a/gcp_variant_transforms/libs/bigquery_util.py +++ b/gcp_variant_transforms/libs/bigquery_util.py @@ -57,7 +57,7 @@ _BQ_EXTRACT_SCHEMA_COMMAND = ( 'bq show --schema --format=prettyjson {FULL_TABLE_ID} > {SCHEMA_FILE_PATH}') _GCS_DELETE_FILES_COMMAND = 'gsutil -m rm -f -R {ROOT_PATH}' -_BQ_NUM_RETRIES = 3 +_BQ_NUM_RETRIES = 5 _MAX_NUM_CONCURRENT_BQ_LOAD_JOBS = 4 _GET_COLUMN_NAMES_QUERY = ( @@ -502,7 +502,21 @@ def create_sample_info_table(output_table_id): _run_table_creation_command(bq_command) class FlattenCallColumn(object): + """Flattens call column to convert variant opt tables to sample opt tables.""" + def __init__(self, base_table_id, suffixes): + # type: (str, List[str]) -> None + """Initialize `FlattenCallColumn` object. + + In preparation to convert variant lookup optimized tables to sample lookup + optimized tables, we instantiate this class with the base table name of variant + opt table (set using --output_table flag) and the list of suffixes (which + are extracted from sharding config file). + + Args: + base_table_id: Base name of variant opt outputs (set by --output_table). + suffixes: List of suffixes (extracted from sharding config file). 
+ """ (self._project_id, self._dataset_id, self._base_table) = parse_table_reference(base_table_id) @@ -589,7 +603,7 @@ def _copy_to_flatten_table(self, output_table_id, cp_query): break logging.info('Copy to table query was successful: %s', output_table_id) - def _create_temp_flatten_table(self): + def _create_temp_flatten_table_with_1_row(self): temp_suffix = time.strftime('%Y%m%d_%H%M%S') temp_table_id = '{}{}'.format(self._schema_table_id, temp_suffix) full_output_table_id = '{}.{}.{}'.format( @@ -610,7 +624,21 @@ def _create_temp_flatten_table(self): return temp_table_id def get_flatten_table_schema(self, schema_file_path): - temp_table_id = self._create_temp_flatten_table() + # type: (str) -> bool + """Write the flatten table's schema to the given json file. + + This method basically performs the following tasks: + * composes a 'flattening query' based on _schema_table_id table's schema. + * Runs the 'flattening query' to read 1 row and writes it to a temp table. + * Extracts the schema of the temp table using _BQ_EXTRACT_SCHEMA_COMMAND. + + Args: + schema_file_path: The json schema will be written to this file. + + Returns; + A bool value indicating if the schema was successfully extracted. + """ + temp_table_id = self._create_temp_flatten_table_with_1_row() full_table_id = '{}:{}.{}'.format( self._project_id, self._dataset_id, temp_table_id) bq_command = _BQ_EXTRACT_SCHEMA_COMMAND.format( @@ -626,9 +654,22 @@ def get_flatten_table_schema(self, schema_file_path): logging.info('Successfully deleted temporary table: %s', full_table_id) else: logging.error('Was not able to delete temporary table: %s', full_table_id) - return result + return result == 0 def copy_to_flatten_table(self, output_base_table_id): + # type: (str) -> None + """Copies data from variant lookup optimized tables to sample lookup tables. + + Copies rows from _base_table_id__* to output_base_table_id__* for each value + of _suffixes. 
Here we assume destination tables are already created and are + partitioned based on call_sample_id column. The copying process is done via + a flattening query, so if the source tables have repeated sample_ids the + output table will have more rows. In fact: + # of output rows = # of input rows * # of sample_ids + + Args: + output_base_table_id: Base table name of output tables. + """ # Here we assume all output_table_base + suffixes[:] are already created. (output_project_id, output_dataset_id, diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index e0c8c495d..b60929e71 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -141,10 +141,14 @@ def add_arguments(self, parser): '--sample_lookup_optimized_output_table', default='', help=('In addition to the default output tables (which are optimized ' - 'for variant look up queries), you can store a second copy of ' + 'for variant lookup queries), you can store a second copy of ' 'your data in BigQuery tables that are optimized for sample ' - 'look up queries. Note that setting this option will double your ' - 'BigQuery storage costs.')) + 'lookup queries using this flag. ' + 'Note that setting this flag will *at least* double your ' + 'BigQuery storage costs. 
If your input VCF files are joint ' 'genotyped (say with n samples) then sample lookup tables will ' 'have n * the number of rows of their corresponding variant ' 'lookup table.')) parser.add_argument( '--output_avro_path', default='', @@ -236,6 +240,10 @@ def validate(self, parsed_args, client=None): parsed_args.sharding_config_path, parsed_args.append) if parsed_args.sample_lookup_optimized_output_table: + if (parsed_args.output_table == + parsed_args.sample_lookup_optimized_output_table): + raise ValueError('sample_lookup_optimized_output_table cannot be the ' + 'same as output_table.') self._validate_output_tables( client, parsed_args.sample_lookup_optimized_output_table, parsed_args.sharding_config_path, parsed_args.append) @@ -245,7 +253,7 @@ def _validate_output_tables(self, client, sharding_config_path, append): if (output_table_base_name != bigquery_util.get_table_base_name(output_table_base_name)): - raise ValueError(('Output table cannot contain "{}" we reserve this ' + raise ValueError(('Output table cannot contain "{}". We reserve this ' 'string to mark sharded output tables.').format( bigquery_util.TABLE_SUFFIX_SEPARATOR)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 7662534b1..ad4409b5c 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -563,7 +563,7 @@ def run(argv=None): known_args.output_table, suffixes) try: flatten_schema_file = tempfile.mkstemp(suffix=_BQ_SCHEMA_FILE_SUFFIX)[1] - if flatten_call_column.get_flatten_table_schema(flatten_schema_file) != 0: + if not flatten_call_column.get_flatten_table_schema(flatten_schema_file): raise ValueError('Failed to extract schema of flatten table') # Create output flatten tables if needed if not known_args.append: