Source code for VAPr.vcf_genotype_fields_parsing

# coding: utf-8
# standard libraries
import logging
import re
import warnings

# project libraries
import VAPr.validation


def _warn_of_unparseable_format_field(field_desc, field_tag, field_value, failure_desc):
    """Raise a warning for the input field's issue using a standard wording and format.

    Args:
        field_desc (str): A brief human-readable phrase identifying the info indicated by the tag.
        field_tag (str): The two- or three-character key for this field in the VCF format line (e.g., AD, PL, etc).
        field_value (object): The value associated with the field in the VCF format value line; may have been parsed
            to a non-string data type.
        failure_desc (str): A brief human-readable phrase describing the problem identified with the tag_value.
    """
    warn_msg = "The {0} tag value {1} {2} so {3} information could not be captured for the current variant.".format(
        field_tag, field_value, failure_desc, field_desc)
    warnings.warn(warn_msg)


def _capture_unprocessed_field(field_tag, field_value, genotype_info_to_fill):
    """Attempt basic delimiter splitting and/or numeric casting for input field, and store results in VCFGenotypeInfo.

    Args:
        field_tag (str): The two- or three-character key for this field in the VCF format line (e.g., AD, PL, etc).
        field_value (str): The string value associated with the field in the VCF format value line.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with additional entries added to unprocessed_info dictionary.

    """
    try:
        genotype_info_to_fill.unprocessed_info[field_tag] = float(field_value)
    except ValueError:  # if the value can't be converted to a float
        split_list = field_value.split(",")
        cast_split_list = []

        for value in split_list:
            try:
                cast_split_list.append(float(value))
            except ValueError:  # if the value can't be converted to a float
                cast_split_list.append(value)

        if len(cast_split_list) > 1:
            genotype_info_to_fill.unprocessed_info[field_tag] = cast_split_list
        else:
            genotype_info_to_fill.unprocessed_info[field_tag] = cast_split_list[0]

    return genotype_info_to_fill


def _fill_genotype_class(alleles, genotype_info_to_fill):
    """Id genotype class (homozygous/heterozygous) and subclass (reference, alt, compound) and store in VCFGenotypeInfo.

    Args:
        alleles (List[str]): Results of splitting the value for the GT (genotype) format tag.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with genotype_subclass_by_class value set.

    """
    genotype_class = "homozygous"
    genotype_subclass = "reference"
    alt_subclass_name = "alt"

    if alleles[0] != alleles[1]:
        genotype_class = "heterozygous"
        alt_subclass_name = "compound"

    if "0" not in alleles:
        genotype_subclass = alt_subclass_name

    result = {genotype_class: genotype_subclass}
    genotype_info_to_fill.genotype_subclass_by_class = result
    return result


def _fill_genotype(field_value, genotype_info_to_fill):
    """Parse the genotype of this sample at this site and store in the VCFGenotypeInfo.

    Args:
        field_value (str): The value associated with the GT tag in the format value string.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: the input VCFGenotypeInfo with additional fields filled in.

    From https://gatkforums.broadinstitute.org/gatk/discussion/1268/what-is-a-vcf-and-how-should-i-interpret-it :
    " GT : The genotype of this sample at this site.
        For a diploid organism, the GT field indicates the two alleles carried by the sample, encoded by a 0 for the REF
         allele, 1 for the first ALT allele, 2 for the second ALT allele, etc. When there's a single ALT allele (by far
         the more common case), GT will be either:

        0/0 - the sample is homozygous reference
        0/1 - the sample is heterozygous, carrying 1 copy of each of the REF and ALT alleles
        1/1 - the sample is homozygous alternate

      In the three sites shown in the example above, NA12878 is observed with the allele combinations T/G, G/G, and C/T
      respectively.

      For non-diploids, the same pattern applies; in the haploid case there will be just a single value in GT; for
      polyploids there will be more, e.g. 4 values for a tetraploid organism."
    """

    alleles = field_value.split('/')
    if len(alleles) == 1:
        alleles = field_value.split('|')

    if len(alleles) != 2:
        _warn_of_unparseable_format_field("genotype", VCFGenotypeParser.GENOTYPE_TAG, field_value,
                                          "does not split into exactly two values")
        return genotype_info_to_fill
    genotype_info_to_fill.genotype = field_value
    _fill_genotype_class(alleles, genotype_info_to_fill)
    return genotype_info_to_fill


def _fill_unfiltered_reads_counts(field_value, genotype_info_to_fill):
    """Parse the unfiltered reads counts for this sample at this site and store in the VCFGenotypeInfo.

    Args:
        field_value (str): The value associated with the AD tag in the format value string.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with new entries added to the alleles list.

    From https://gatkforums.broadinstitute.org/gatk/discussion/1268/what-is-a-vcf-and-how-should-i-interpret-it  :
    " AD ... : Allele depth ....
      AD is the unfiltered allele depth, i.e. the number of reads that support each of the reported alleles. All reads
      at the position (including reads that did not pass the variant caller's filters) are included in this number,
      except reads that were considered uninformative. Reads are considered uninformative when they do not provide
      enough statistical evidence to support one allele over another."
    """
    delimiter = ','
    counts = field_value.split(delimiter)
    if len(counts) < 2:
        _warn_of_unparseable_format_field("unfiltered allele depth", VCFGenotypeParser.UNFILTERED_ALLELE_DEPTH_TAG,
                                          field_value, "does not split into at least two values")
    else:
        for curr_count in counts:
            new_allele = Allele(curr_count)
            genotype_info_to_fill.alleles.append(new_allele)

    return genotype_info_to_fill


def _fill_filtered_reads_count(field_value, genotype_info_to_fill):
    """Parse the filtered depth of coverage of this sample at this site and store in the VCFGenotypeInfo.

    Args:
        field_value (str): The value associated with the DP tag in the format value string.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with filter_passing_reads_count filled in.

    From https://gatkforums.broadinstitute.org/gatk/discussion/1268/what-is-a-vcf-and-how-should-i-interpret-it :
    " DP : ... depth of coverage
      DP is the filtered depth, at the sample level. This gives you the number of filtered reads that support each of
      the reported alleles. You can check the variant caller’s documentation to see which filters are applied by
      default. Only reads that passed the variant caller’s filters are included in this number. However, unlike the AD
      calculation, uninformative reads are included in DP."
    """
    genotype_info_to_fill.filter_passing_reads_count = field_value
    return genotype_info_to_fill


def _fill_genotype_confidence(field_value, genotype_info_to_fill):
    """Parse the genotype quality (confidence) of this sample at this site and store in the VCFGenotypeInfo.

    Args:
        field_value (str): The value associated with the GQ tag in the format value string.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with genotype_confidence filled in.

    From https://gatkforums.broadinstitute.org/gatk/discussion/1268/what-is-a-vcf-and-how-should-i-interpret-it :
    " GQ : Quality of the assigned genotype.
      The Genotype Quality represents the Phred-scaled confidence that the genotype assignment (GT) is correct, derived
      from the genotype PLs. Specifically, the GQ is the difference between the PL of the second most likely genotype,
      and the PL of the most likely genotype. As noted above, the values of the PLs are normalized so that the most
      likely PL is always 0, so the GQ ends up being equal to the second smallest PL, unless that PL is greater than 99.
      In GATK, the value of GQ is capped at 99 because larger values are not more informative, but they take more space
      in the file. So if the second most likely PL is greater than 99, we still assign a GQ of 99.

      Basically the GQ gives you the difference between the likelihoods of the two most likely genotypes. If it is low,
      you can tell there is not much confidence in the genotype, i.e. there was not enough evidence to confidently
      choose one genotype over another. See the FAQ article on the Phred scale to get a sense of what would be
      considered low.

      Not to be confused with the site-level annotation QUAL; see this FAQ article for an explanation of the differences
      in what they mean and how they should be used."
    """
    genotype_info_to_fill.genotype_confidence = field_value
    return genotype_info_to_fill


def _fill_genotype_likelihoods(field_value, genotype_info_to_fill):
    """Parse the "normalized" Phred-scaled likelihoods of possible genotypes of this sample at this site and store.

    Args:
        field_value (str): The value associated with the PL tag in the format value string.
        genotype_info_to_fill (VCFGenotypeInfo): A partially filled VCFGenotypeInfo object.

    Returns:
        VCFGenotypeInfo: The input VCFGenotypeInfo with additional fields filled in.

    Note that this function will MAKE the number of Allele objects implied by the likelihood string if no alleles have
    been filled into the genotype_info_to_fill by the time this function is called. The reason for this is that there
    ARE valid VCF format strings (e.g., 'GT:GQ:PL') that have the PL tag (likelihood) but no AD tag in them, and since
    Allele objects are usually created in the processing of the AD tag, some back-up approach was needed to infer
    Alleles in this situation.  Of course, the Alleles created in this situation will all have None as their
    read_counts, since the read_counts value comes from the AD tag.

    From https://gatkforums.broadinstitute.org/gatk/discussion/1268/what-is-a-vcf-and-how-should-i-interpret-it:
    " PL : "Normalized" Phred-scaled likelihoods of the possible genotypes.
      For the typical case of a monomorphic site (where there is only one ALT allele) in a diploid organism, the PL
      field will contain three numbers, corresponding to the three possible genotypes (0/0, 0/1, and 1/1). The PL values
      are "normalized" so that the PL of the most likely genotype (assigned in the GT field) is 0 in the Phred scale. We
      use "normalized" in quotes because these are not probabilities. We set the most likely genotype PL to 0 for easy
      reading purpose.The other values are scaled relative to this most likely genotype.

      Keep in mind, if you're not familiar with the statistical lingo, that when we say PL is the "Phred-scaled
      likelihood of the genotype", we mean it is "How much less likely that genotype is compared to the best one".
    "
    """
    generate_alleles = False
    delimiter = ','
    likelihoods = field_value.split(delimiter)

    num_expected_alleles = len(genotype_info_to_fill.alleles)
    if num_expected_alleles == 0:
        generate_alleles = True
        genotype_info_to_fill.alleles.append(Allele(None))

    allele_number = 0
    likelihood_number = 0
    for index in range(len(likelihoods)):
        if likelihood_number > allele_number:
            allele_number += 1
            likelihood_number = 0

            if generate_alleles:
                genotype_info_to_fill.alleles.append(Allele(None))

            # NB: The check below looks at the *current* number of alleles in genotype_info_to_fill, which may be
            # different than the value found above and stored in num_expected_alleles due to the alleles.append
            # statements at various points above.
            if allele_number >= len(genotype_info_to_fill.alleles):
                _warn_of_unparseable_format_field("'normalized' Phred-scaled likelihoods of possible genotypes",
                                                  VCFGenotypeParser.NORMALIZED_SCALED_LIKELIHOODS_TAG,
                                                  field_value,
                                                  "appears to contain information for more alleles than expected")

                # in case of warning, clear any already-parsed genotype likelihoods (they probably can't be trusted)
                # and return out of function
                genotype_info_to_fill.genotype_likelihoods = []
                return genotype_info_to_fill
            # end warn
        # end if likelihood_number > allele_number

        new_likelihood = GenotypeLikelihood(likelihood_number, allele_number, likelihoods[index])
        genotype_info_to_fill.genotype_likelihoods.append(new_likelihood)
        likelihood_number += 1

    if allele_number < (num_expected_alleles-1) or likelihood_number < num_expected_alleles:
        _warn_of_unparseable_format_field("'normalized' Phred-scaled likelihoods of possible genotypes",
                                          VCFGenotypeParser.NORMALIZED_SCALED_LIKELIHOODS_TAG,
                                          field_value, "appears to contain information for fewer alleles than expected")

        # in case of warning, clear any already-parsed genotype likelihoods (they probably can't be trusted)
        genotype_info_to_fill.genotype_likelihoods = []

    return genotype_info_to_fill


[docs]class VCFGenotypeInfo(object): """Store parsed info from VCF genotype fields for a single sample. Attributes: _raw_string (str): The genotype fields values string from a VCF file (e.g., '0/1:173,141:282:99:255,0,255'). genotype (Optional[`str`]): The type of each of the sample's two alleles, such as 0/0, 0/1, etc. alleles (List[Allele]): One Allele object for each allele detected for this variant (this can be across samples, so there can be more than 2 alleles). genotype_likelihoods (List[GenotypeLikelihood]): The GenotypeLikelihood object for each allele. unprocessed_info (Dict[str, Any]): Dictionary of field tag and value(s) for any fields not stored in dedicated attributes of VCFGenotypeInfo. Values are parsed to lists and/or floats if possible. genotype_subclass_by_class (Dict[str, str]): Genotype subclass (reference, alt, compound) keyed by genotype class (homozygous/heterozygous). """ def __init__(self, raw_string): """Create VCFGenotypeInfo object. Args: raw_string (str): The genotype fields values string from a VCF file (e.g., '0/1:173,141:282:99:255,0,255'). """ self._raw_string = raw_string self._genotype_confidence = None self._filter_passing_reads_count = None # TODO: someday: Probably these should become properties so they are protected from user resetting them, etc. self.genotype = None self.alleles = [] # 0 is ref, 1 is first alt, etc self.genotype_likelihoods = [] self.unprocessed_info = {} self.genotype_subclass_by_class = None @property def genotype_confidence(self): """str: Genotype quality (confidence) of this sample at this site, from the GQ field.""" return self._genotype_confidence @genotype_confidence.setter def genotype_confidence(self, value): # TODO: someday: Determine if genotype confidence value is limited to being a positive or non-negative number self._genotype_confidence = VAPr.validation.convert_to_nullable(value, float) @property def filter_passing_reads_count(self): """int or None: Filtered depth of coverage of this sample at this site from the DP field.""" return self._filter_passing_reads_count @filter_passing_reads_count.setter def filter_passing_reads_count(self, value): self._filter_passing_reads_count = VAPr.validation.convert_to_nonneg_int(value, nullable=True)
# TODO: someday: refactor to remove Allele object and replace with single value # One could argue pretty convincingly that there is no longer a need for a whole Allele object now that # (with the removal of database-related properties) it has only one property--surely that could be represented by # a single value rather than a class! But this would be a non-trivial refactor, so I'm leaving it for some misty future # date.
[docs]class Allele(object): """Store unfiltered read counts, if any, for a particular allele.""" def __init__(self, unfiltered_read_counts=None): """Create Allele object. Args: unfiltered_read_counts (Optional[str]): Number of unfiltered reads counts for this sample at this site, from AD field. """ self._unfiltered_read_counts = None if unfiltered_read_counts is not None: self.unfiltered_read_counts = unfiltered_read_counts @property def unfiltered_read_counts(self): """int or None: Number of unfiltered reads counts for this sample at this site, from AD field.""" return self._unfiltered_read_counts @unfiltered_read_counts.setter def unfiltered_read_counts(self, value): self._unfiltered_read_counts = VAPr.validation.convert_to_nonneg_int(value, nullable=True)
[docs]class GenotypeLikelihood(object): """Store parsed info from VCF genotype likelihood field for a single sample.""" @staticmethod def _validate_allele_relationship(allele1_number, allele2_number): """Ensure that allele1_number is not greater than allele2_number. Args: allele1_number (int): The allele identifier (0 for reference, 1 for first alternate, etc) for the left-hand allele inferred for this genotype likelihood. allele2_number (int): The allele identifier (0 for reference, 1 for first alternate, etc) for the right-hand allele inferred for this genotype likelihood. Raises: ValueError: If `allele1_number` is greater than `allele2_number`. Genotype likelihood strings contain one likelihood for each possible allele combination for this variant at this site. The likelihood string does not explicity state the allele combination associated with each likelihood; rather, the allele combinations are inferred based on the convention that they are always listed in ascending order from lowest allele number to highest allele number. For example, for the likelihood string '495,162,123,213,129,175,67,0,46,28.1', the implied allele combinations are 0/0, 0/1, 1/1, 0/2, 1/2, 2/2, 0/3, 1/3, 2/3, 3/3 . Note that these are COMBINATIONS, not PERMUTATIONS, so each pair of values occurs only once and, again by convention, the representation expected is the one where the allele number on the left (arbitrarily referred to here as allele1) has a value less than (or equal to) the allele on the right (arbitrarily referred to here as allele2). IF, in parsing the likelihood string and inferring the allele combinations, the code somehow ended up with an inferred combination in which the left-hand allele number is GREATER than the right-hand allele number, that means something has gone tragically wrong! """ if allele1_number > allele2_number: raise ValueError("VCF-format genotypes must have allele 2 number ({0}) " "greater than or equal to allele 1 number ({1})".format(allele2_number, allele1_number)) def __init__(self, allele1_number, allele2_number, likelihood_neg_exponent): """Create GenotypeLikelihood object. Args: allele1_number (int or str): The allele id for the right-hand allele inferred for this genotype likelihood. allele2_number (int or str): The allele id for the left-hand allele inferred for this genotype likelihood. likelihood_neg_exponent (float or str): The "normalized" Phred-scaled likelihood of the genotype represented by allele1 and allele2, as a string. """ self._allele1_number = None self._allele2_number = None self._likelihood_neg_exponent = None self.allele1_number = allele1_number self.allele2_number = allele2_number self.likelihood_neg_exponent = likelihood_neg_exponent @property def allele1_number(self): """int: The allele identifier for the left-hand allele inferred for this genotype likelihood.""" return self._allele1_number @allele1_number.setter def allele1_number(self, value): int_value = VAPr.validation.convert_to_nonneg_int(value, nullable=True) if self.allele2_number is not None: self._validate_allele_relationship(int_value, self.allele2_number) self._allele1_number = int_value @property def allele2_number(self): """int: The allele identifier for the right-hand allele inferred for this genotype likelihood. """ return self._allele2_number @allele2_number.setter def allele2_number(self, value): int_value = VAPr.validation.convert_to_nonneg_int(value, nullable=True) if self.allele1_number is not None: self._validate_allele_relationship(self.allele1_number, int_value) self._allele2_number = int_value @property def likelihood_neg_exponent(self): """float: The "normalized" Phred-scaled likelihood of the genotype represented by allele1 and allele2.""" return self._likelihood_neg_exponent @likelihood_neg_exponent.setter def likelihood_neg_exponent(self, value): self._likelihood_neg_exponent = VAPr.validation.convert_to_nullable(value, float)
[docs]class VCFGenotypeParser(object): """Mine format string and genotype fields string to create a filled VCFGenotypeInfo object."""
[docs] @staticmethod def is_valid_genotype_fields_string(genotype_fields_string): """Return true if input has any real genotype fields content, false if is just periods, zeroes, and delimiters. Args: genotype_fields_string (str): A VCF-style genotype fields string, such as 1/1:0,2:2:6:89,6,0 or ./.:.:.:.:. Returns bool: true if input has any real genotype fields content, false if is just periods, zeroes, and delimiters. """ result = False # this regex means "one or more characters that is not a comma, period, colon, zero, or forward slash" content_char_match = re.search(r"[^,.:0\/]+", genotype_fields_string) # NB: necessary to ALSO check first character of string, even if no match to above regex is found, because # "0/0" should be a valid genotype (even though all the characters it contains could signal null content in # other configurations), and a genotype fields string with nothing but a genotype in it should be legal. if content_char_match is not None or not genotype_fields_string.startswith("."): result = True return result
GENOTYPE_TAG = "GT" # str: VCF tag for the genotype of this sample at this site. UNFILTERED_ALLELE_DEPTH_TAG = "AD" # str: VCF tag for the unfiltered allele depth of this sample at this site. FILTERED_ALLELE_DEPTH_TAG = "DP" # str: VCF tag for the filtered depth of coverage of this sample at this site. GENOTYPE_QUALITY_TAG = "GQ" # str: VCF tag for the genotype quality of this sample at this site. NORMALIZED_SCALED_LIKELIHOODS_TAG = "PL" # str: VCF tag for the genotype likelihoods of this sample at this site. _DELIMITER = ':' # str: Delimiter between fields in format and genotype fields strings. # Dict(str, Callable[str, VCFGenotypeInfo]): Special parsing functions by the VCF tag whose value they parse. _PARSER_FUNCS = {GENOTYPE_TAG: _fill_genotype, # GT UNFILTERED_ALLELE_DEPTH_TAG: _fill_unfiltered_reads_counts, # AD FILTERED_ALLELE_DEPTH_TAG: _fill_filtered_reads_count, # DP GENOTYPE_QUALITY_TAG: _fill_genotype_confidence, # GQ NORMALIZED_SCALED_LIKELIHOODS_TAG: _fill_genotype_likelihoods} # PL
[docs] @classmethod def parse(cls, format_key_string, format_value_string): """Parse the input format string and genotype fields string into a filled VCFGenotypeInfo object. Args: format_key_string (str): The VCF format string (e.g., 'GT:AD:DP:GQ:PL') for this sample at this site. format_value_string (str): The VCF genotype fields values string (e.g., '1/1:0,34:34:99:1187.2,101,0') corresponding to the format_key_string for this sample at this site. Returns: VCFGenotypeInfo or None: A filled VCFGenotypeInfo for this sample at this site unless an error was encountered, in which case None is returned. encountered, in which case None is returned. """ result = None try: if cls.is_valid_genotype_fields_string(format_value_string): result = VCFGenotypeInfo(format_value_string) format_subkeys = format_key_string.split(cls._DELIMITER) format_values = format_value_string.split(cls._DELIMITER) for index, curr_key in enumerate(format_subkeys): curr_value = format_values[index] if curr_key in cls._PARSER_FUNCS: # if this key has a special parsing function associated with it, use that function parse_func = cls._PARSER_FUNCS[curr_key] result = parse_func(curr_value, result) else: # otherwise, capture this key/value with minimal processing to a catch-all dictionary result = _capture_unprocessed_field(curr_key, curr_value, result) except Exception as e: warn_msg = "Encountered error '{0}' so genotype fields information could not be captured for the " \ "current variant.".format(e) warnings.warn(warn_msg) result = None # reset result to None, as contents can't be trusted return result