Skip to content

Commit

Permalink
Add the flag for null replacement #68 (#331)
Browse files Browse the repository at this point in the history
- Add the flag for null numeric replacement.
- Add an integration test for this flag.

Issue: 68
Tested: unit tests & integration tests.
  • Loading branch information
allieychen authored Aug 16, 2018
1 parent e767449 commit 6420e96
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 211 deletions.
9 changes: 4 additions & 5 deletions docs/bigquery_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ In addition, the schema from Variant Transforms has the following properties:
does not allow null values in repeated fields (the entire record can be null,
but values within the record must each have a value). For instance, if a
VCF INFO field is `1,.,2`, we cannot load `1,null,2` to BigQuery and need to
use a numeric replacement for the null value. The replacement value is
currently set to `-2^31` (equal to `-2147483648`).
[Issue #68](https://github.com/googlegenomics/gcp-variant-transforms/issues/68)
tracks the feature to make this value configurable. The alternative is to
convert such values to a string and use `.` to represent the null value.
use a numeric replacement for the null value. By default, the replacement
value is set to `-2^31` (equal to `-2147483648`). You can also use
`--null_numeric_value_replacement` to customize this value. The alternative is
to convert such values to a string and use `.` to represent the null value.
To do this, please change the header to specify the type as `String`.

22 changes: 14 additions & 8 deletions gcp_variant_transforms/libs/bigquery_row_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

import copy
import json
from typing import Dict, Any # pylint: disable=unused-import
from typing import Any, Dict # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import bigquery_schema_descriptor # pylint: disable=unused-import
from gcp_variant_transforms.libs import bigquery_sanitizer
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import processed_variant # pylint: disable=unused-import
from gcp_variant_transforms.libs import vcf_field_conflict_resolver # pylint: disable=unused-import
Expand All @@ -35,6 +36,7 @@
# Number of bytes to add to the object size when concatenating calls (i.e.
# to account for ", "). We use 5 bytes to be conservative.
_JSON_CONCATENATION_OVERHEAD_BYTES = 5
_BigQuerySchemaSanitizer = bigquery_sanitizer.SchemaSanitizer


class BigQueryRowGenerator(object):
Expand All @@ -45,10 +47,13 @@ def __init__(
schema_descriptor, # type: bigquery_schema_descriptor.SchemaDescriptor
conflict_resolver=None,
# type: vcf_field_conflict_resolver.ConflictResolver
null_numeric_value_replacement=None # type: int
):
# type: (...) -> None
self._schema_descriptor = schema_descriptor
self._conflict_resolver = conflict_resolver
self._bigquery_field_sanitizer = bigquery_sanitizer.FieldSanitizer(
null_numeric_value_replacement)

def get_rows(self,
variant,
Expand Down Expand Up @@ -124,7 +129,7 @@ def _get_call_record(
"""
call_record = {
bigquery_util.ColumnKeyConstants.CALLS_NAME:
bigquery_util.get_bigquery_sanitized_field(call.name),
self._bigquery_field_sanitizer.get_sanitized_field(call.name),
bigquery_util.ColumnKeyConstants.CALLS_PHASESET: call.phaseset,
bigquery_util.ColumnKeyConstants.CALLS_GENOTYPE: call.genotype or []
}
Expand All @@ -150,21 +155,21 @@ def _get_base_row_from_variant(self, variant, allow_incompatible_records):
} # type: Dict[str, Any]
if variant.names:
row[bigquery_util.ColumnKeyConstants.NAMES] = (
bigquery_util.get_bigquery_sanitized_field(variant.names))
self._bigquery_field_sanitizer.get_sanitized_field(variant.names))
if variant.quality is not None:
row[bigquery_util.ColumnKeyConstants.QUALITY] = variant.quality
if variant.filters:
row[bigquery_util.ColumnKeyConstants.FILTER] = (
bigquery_util.get_bigquery_sanitized_field(variant.filters))
self._bigquery_field_sanitizer.get_sanitized_field(variant.filters))
# Add alternate bases.
row[bigquery_util.ColumnKeyConstants.ALTERNATE_BASES] = []
for alt in variant.alternate_data_list:
alt_record = {bigquery_util.ColumnKeyConstants.ALTERNATE_BASES_ALT:
alt.alternate_bases}
for key, data in alt.info.iteritems():
alt_record[bigquery_util.get_bigquery_sanitized_field_name(key)] = (
alt_record[_BigQuerySchemaSanitizer.get_sanitized_field_name(key)] = (
data if key in alt.annotation_field_names else
bigquery_util.get_bigquery_sanitized_field(data))
self._bigquery_field_sanitizer.get_sanitized_field(data))
row[bigquery_util.ColumnKeyConstants.ALTERNATE_BASES].append(alt_record)
# Add info.
for key, data in variant.non_alt_info.iteritems():
Expand All @@ -187,14 +192,15 @@ def _get_bigquery_field_entry(
# type: (...) -> (str, Any)
if data is None:
return None, None
field_name = bigquery_util.get_bigquery_sanitized_field_name(key)
field_name = _BigQuerySchemaSanitizer.get_sanitized_field_name(key)
if not schema_descriptor.has_simple_field(field_name):
raise ValueError('BigQuery schema has no such field: {}.\n'
'This can happen if the field is not defined in '
'the VCF headers, or is not inferred automatically. '
'Retry pipeline with --infer_headers.'
.format(field_name))
sanitized_field_data = bigquery_util.get_bigquery_sanitized_field(data)
sanitized_field_data = self._bigquery_field_sanitizer.get_sanitized_field(
data)
field_schema = schema_descriptor.get_field_descriptor(field_name)
field_data, is_compatible = self._check_and_resolve_schema_compatibility(
field_schema, sanitized_field_data)
Expand Down
22 changes: 13 additions & 9 deletions gcp_variant_transforms/libs/bigquery_row_generator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import bigquery_schema_descriptor
from gcp_variant_transforms.libs import bigquery_row_generator
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import bigquery_sanitizer
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
from gcp_variant_transforms.libs.bigquery_util import ColumnKeyConstants
Expand Down Expand Up @@ -281,9 +281,13 @@ def test_null_repeated_fields(self):
ColumnKeyConstants.ALTERNATE_BASES: [],
ColumnKeyConstants.FILTER: ['q10'],
ColumnKeyConstants.CALLS: [],
'IIR': [0, 1, bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT],
'IIR': [0,
1,
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT],
'IBR': [True, False, False],
'IFR': [0.1, 0.2, bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
'IFR': [0.1,
0.2,
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
0.4],
'ISR': ['.', 'data1', 'data2']}
self.assertEqual([expected_row],
Expand Down Expand Up @@ -329,22 +333,22 @@ def test_nonstandard_float_values(self):
ColumnKeyConstants.END_POSITION: 12,
ColumnKeyConstants.REFERENCE_BASES: 'CT',
ColumnKeyConstants.ALTERNATE_BASES: [
{'IF3': -bigquery_util._INF_FLOAT_VALUE, 'alt': 'A'},
{'IF3': -bigquery_sanitizer._INF_FLOAT_VALUE, 'alt': 'A'},
{'IF3': None, 'alt': 'C'},
{'IF3': bigquery_util._INF_FLOAT_VALUE, 'alt': 'T'},
{'IF3': bigquery_sanitizer._INF_FLOAT_VALUE, 'alt': 'T'},
{'IF3': 1.2, 'alt': 'TC'}
],
ColumnKeyConstants.CALLS: [
{
ColumnKeyConstants.CALLS_NAME: 'Sample1',
ColumnKeyConstants.CALLS_GENOTYPE: [0, 1],
ColumnKeyConstants.CALLS_PHASESET: '*',
'GQ': bigquery_util._INF_FLOAT_VALUE
'GQ': bigquery_sanitizer._INF_FLOAT_VALUE
}
],
'IF': bigquery_util._INF_FLOAT_VALUE,
'IFR': [-bigquery_util._INF_FLOAT_VALUE,
bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
'IF': bigquery_sanitizer._INF_FLOAT_VALUE,
'IFR': [-bigquery_sanitizer._INF_FLOAT_VALUE,
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
1.2],
'IF2': None
}
Expand Down
199 changes: 199 additions & 0 deletions gcp_variant_transforms/libs/bigquery_sanitizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sanitizes BigQuery schema and field according to BigQuery restrictions."""

import math
import re
import sys
from typing import List, Optional # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcfio

# Prefix to use when the first character of the field name is not [a-zA-Z]
# as required by BigQuery.
_FALLBACK_FIELD_NAME_PREFIX = 'field_'

# A big number to represent infinite float values. The division by 10 is to
# prevent unintentional overflows when doing subsequent operations.
_INF_FLOAT_VALUE = sys.float_info.max / 10
_DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT = -2 ^ 31


class SchemaSanitizer(object):
"""Class to sanitize BigQuery schema according to BigQuery restrictions."""

@staticmethod
def get_sanitized_string(input_str):
# type: (str) -> unicode
"""Returns a unicode as BigQuery API does not support UTF-8 strings."""
return _decode_utf8_string(input_str)

@staticmethod
def get_sanitized_field_name(field_name):
# type: (str) -> str
"""Returns the sanitized field name according to BigQuery restrictions.
BigQuery field names must follow `[a-zA-Z][a-zA-Z0-9_]*`. This method
converts any unsupported characters to an underscore. Also, if the first
character does not match `[a-zA-Z]`, it prepends
`_FALLBACK_FIELD_NAME_PREFIX` to the name.
Args:
field_name: Name of the field to sanitize.
Returns:
Sanitized field name with unsupported characters replaced with an
underscore. It also prepends the name with `_FALLBACK_FIELD_NAME_PREFIX`
if the first character does not match `[a-zA-Z]`.
"""
assert field_name # field_name must not be empty by this stage.
if not re.match('[a-zA-Z]', field_name[0]):
field_name = _FALLBACK_FIELD_NAME_PREFIX + field_name
return re.sub('[^a-zA-Z0-9_]', '_', field_name)


class FieldSanitizer(object):
"""Class to sanitize field values according to BigQuery restrictions."""

def __init__(self, null_numeric_value_replacement):
# type: (Optional[int]) -> None
"""Initializes a `BigQueryFieldSanitizer`.
Args:
null_numeric_value_replacement: Value to use instead of null for
numeric (float/int/long) lists. For instance, [0, None, 1] will become
[0, `null_numeric_value_replacement`, 1].
"""
self._null_numeric_value_replacement = (
null_numeric_value_replacement or
_DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT)

def get_sanitized_field(self, field):
# type: (Any) -> Any
"""Returns sanitized field according to BigQuery restrictions.
This method only sanitizes lists and strings. It returns the same `field`
for all other types (including None).
For lists, null values are replaced with reasonable defaults since the
BigQuery API does not allow null values in lists (note that the entire
list is allowed to be null). For instance, [0, None, 1] becomes
[0, `null_numeric_value_replacement`, 1].
Null value replacements are:
- `False` for bool.
- `.` for string (null string values should not exist in Variants parsed
using PyVCF though).
- `null_numeric_value_replacement` for float/int/long.
For strings, it returns its unicode representation. The BigQuery API does
not support strings that are UTF-8 encoded.
Args:
field: Field to sanitize. It can be of any type.
Raises:
ValueError: If the field could not be sanitized (e.g. unsupported types in
lists).
"""
if not field:
return field
if isinstance(field, basestring):
return self._get_sanitized_string(field)
elif isinstance(field, float):
return self._get_sanitized_float(field)
elif isinstance(field, list):
return self._get_sanitized_list(field)
else:
return field

def _get_sanitized_list(self, input_list):
# type: (List) -> List
"""Returns sanitized list according to BigQuery restrictions.
Null values are replaced with reasonable defaults since the
BigQuery API does not allow null values in lists (note that the entire
list is allowed to be null). For instance, [0, None, 1] becomes
[0, `null_numeric_value_replacement`, 1].
Null value replacements are:
- `False` for bool.
- `.` for string (null string values should not exist in Variants parsed
using PyVCF though).
- `null_numeric_value_replacement` for float/int/long.
Lists that contain strings are also sanitized according to the
`_get_sanitized_string` method.
Args:
input_list: List to sanitize.
Raises:
ValueError: If a list contains unsupported values. Supported types are
basestring, bool, int, long, and float.
"""
null_replacement_value = None
for i in input_list:
if i is None:
continue
if isinstance(i, basestring):
null_replacement_value = vcfio.MISSING_FIELD_VALUE
elif isinstance(i, bool):
null_replacement_value = False
elif isinstance(i, (int, long, float)):
null_replacement_value = self._null_numeric_value_replacement
else:
raise ValueError('Unsupported value for input: %s' % str(i))
break # Assumption is that all fields have the same type.
if null_replacement_value is None: # Implies everything was None.
return []
sanitized_list = []
for i in input_list:
if i is None:
i = null_replacement_value
elif isinstance(i, basestring):
i = self._get_sanitized_string(i)
elif isinstance(i, float):
sanitized_float = self._get_sanitized_float(i)
i = (sanitized_float if sanitized_float is not None
else null_replacement_value)
sanitized_list.append(i)
return sanitized_list

def _get_sanitized_float(self, input_float):
"""Returns a sanitized float for BigQuery.
This method replaces INF and -INF with positive and negative numbers with
huge absolute values, and replaces NaN with None. It returns the same value
for all other values.
"""
if input_float == float('inf'):
return _INF_FLOAT_VALUE
elif input_float == float('-inf'):
return -_INF_FLOAT_VALUE
elif math.isnan(input_float):
return None
else:
return input_float

def _get_sanitized_string(self, input_str):
# type: (str) -> unicode
"""Returns a unicode as BigQuery API does not support UTF-8 strings."""
return _decode_utf8_string(input_str)


def _decode_utf8_string(input_str):
# type: (str) -> unicode
try:
return (input_str if isinstance(input_str, unicode)
else input_str.decode('utf-8'))
except UnicodeDecodeError:
raise ValueError('input_str is not UTF-8: %s ' % (input_str))
Loading

0 comments on commit 6420e96

Please sign in to comment.