# built-in libraries
import csv
import itertools
import logging
import sys

# third-party libraries
import myvariant

# project libraries
from VAPr.vcf_genotype_fields_parsing import VCFGenotypeParser

[docs]class AnnovarTxtParser(object): """ Class that processes an Annovar-created tab-delimited text file.""" CHR_HEADER = 'chr' START_HEADER = 'start' END_HEADER = 'end' REF_HEADER = 'ref' ALT_HEADER = 'alt' OTHERINFO_HEADER = 'otherinfo' THOU_G_2015_ALL_HEADER = '1000g2015aug_all' ESP6500_ALL_HEADER = 'esp6500siv2_all' NCI60_HEADER = 'nci60' CYTOBAND_HEADER = 'cytoband' GENOMIC_SUPERDUPS_HEADER = 'genomicsuperdups' TFBS_CONS_SITES_HEADER = 'tfbsconssites' FUNC_KNOWNGENE_HEADER = 'func_knowngene' GENE_KNOWNGENE_HEADER = 'gene_knowngene' GENEDETAIL_KNOWNGENE_HEADER = 'genedetail_knowngene' EXONICFUNC_KNOWNGENE_HEADER = 'exonicfunc_knowngene' SCORE_KEY = "Score" RAW_CHR_MT_VAL = "chrM" RAW_CHR_MT_SUFFIX_VAL = RAW_CHR_MT_VAL.replace(CHR_HEADER, "") STANDARDIZED_CHR_MT_VAL = "chrMT" STANDARDIZED_CHR_MT_SUFFIX_VAL = STANDARDIZED_CHR_MT_VAL.replace(CHR_HEADER, "") _ANNOVAR_OUTPUT_COLS = [CHR_HEADER, START_HEADER, END_HEADER, REF_HEADER, ALT_HEADER, FUNC_KNOWNGENE_HEADER, GENE_KNOWNGENE_HEADER, GENEDETAIL_KNOWNGENE_HEADER, EXONICFUNC_KNOWNGENE_HEADER, # CYTOBAND_HEADER, # GENOMIC_SUPERDUPS_HEADER, THOU_G_2015_ALL_HEADER] #, # ESP6500_ALL_HEADER, # 'cosmic70', # NCI60_HEADER, #OTHERINFO_HEADER] @staticmethod def _normalize_header(raw_headers_list): """Lower-case all header strings and replace periods with underscores. Args: raw_headers_list (List[str]): A list of the headers from the Annovar txt output file. Returns: List[str]: A list of the normalized headers, in the same order as the input list. """ normalized_headers_list = [] for curr_header in raw_headers_list: normalized_headers_list.append(curr_header.lower().replace('.', '_')) return normalized_headers_list
[docs] @classmethod def read_chunk_of_annotations_to_dicts_list(cls, annovar_txt_file_like_obj, sample_names_list, chunk_index, chunk_size): annotations_dict_per_variant_list = [] hgvsid_list = [] # read in the header line and normalize the header names reader = csv.reader(annovar_txt_file_like_obj, delimiter='\t') normed_headers_list = cls._normalize_header(next(reader)) # for each row in this chunk--which is to say, each variant for curr_line_fields_list in itertools.islice(reader, (chunk_index * chunk_size), ((chunk_index + 1) * chunk_size)): hgvs_id, annotations_dict_for_curr_variant = cls._parse_single_variant_record( normed_headers_list, curr_line_fields_list, sample_names_list) hgvsid_list.append(hgvs_id) annotations_dict_per_variant_list.append(annotations_dict_for_curr_variant) return hgvsid_list, annotations_dict_per_variant_list
@classmethod def _parse_single_variant_record(cls, normed_headers_list, curr_line_fields_list, sample_names_list): # This code assumes that the VCF-produced format string and the genotype fields string(s) for the sample(s) # will be the last fields on every line and that they will NOT all have their own headers--rather, it # assumes the last header will indicate that the rest of the fields are "other info". Here is a simplified # example: # chr start end ref alt func_knowngene otherinfo # chrM 146 146 T C upstream;downstream 1 61.74 AC=2;AF=1.00;AN=2;DP=2;FS=0.000 GT:AD:DP:GQ:PL 1/1:0,22:22:66:794,66,0 ./.:0,0 1/1:0,40:40:99:1494,119,0 # Note that the content at and after the position of the otherinfo header may list additional information # before the format string and genotype fields info (which are required to be at the end of the line); # any such extra info is ignored. # make a dictionary that pairs every named header *except* the last one with its content in this line last_field_index = len(normed_headers_list) - 1 raw_fields_dict = dict(zip(normed_headers_list[0:last_field_index], curr_line_fields_list[0:last_field_index])) # TODO: someday: perhaps stop limiting this to only the fields in ANNOVAR_OUTPUT_COLS instead of all # For only a limited subset of columns, look those columns up in raw_fields_dict; if they hold real content, # do any clean-up necessary to their values and write them into a new dict cleaned_fields_dict = {} for curr_header in cls._ANNOVAR_OUTPUT_COLS: curr_value = raw_fields_dict[curr_header] if curr_value != ".": curr_value = cls._rewrite_value_if_special_header(curr_header, curr_value) cleaned_fields_dict[curr_header] = curr_value # generate the hgvs id for this variant hgvs_id = myvariant.format_hgvs(cleaned_fields_dict[cls.CHR_HEADER], cleaned_fields_dict[cls.START_HEADER], cleaned_fields_dict[cls.REF_HEADER], cleaned_fields_dict[cls.ALT_HEADER]) # now grab the number-of-samples-plus-one-th field from the *end* of the line--this holds the format # string--and also grab a list of the number-of-samples fields from the *end* of the line--these are # the genotype fields strings for each sample. num_samples_plus_one = len(sample_names_list) + 1 format_string = curr_line_fields_list[-num_samples_plus_one] genotype_field_strings_per_sample = curr_line_fields_list[-len(sample_names_list)::] genotype_field_strings_by_sample_name = dict(zip(sample_names_list, genotype_field_strings_per_sample)) # turn the dictionary of annovar fields into a dictionary of annotations for the variant, including # nested structures containing sample-specific genotype-related info annotations_dict_for_curr_variant = AnnovarAnnotatedVariant.make_per_variant_annotation_dict( cleaned_fields_dict, hgvs_id, format_string, genotype_field_strings_by_sample_name) return hgvs_id, annotations_dict_for_curr_variant @classmethod def _rewrite_value_if_special_header(cls, format_header, field_value): result = field_value # by default, assume no special coddling needed if format_header == cls.CHR_HEADER: if field_value == cls.RAW_CHR_MT_VAL: result = cls.STANDARDIZED_CHR_MT_VAL elif format_header in [cls.THOU_G_2015_ALL_HEADER, cls.ESP6500_ALL_HEADER, cls.NCI60_HEADER]: result = float(field_value) elif format_header in [cls.START_HEADER, cls.END_HEADER]: result = int(field_value) # elif curr_header == cls.CYTOBAND_HEADER: # cytoband_data = CytoBand(curr_value) # result = cytoband_data.fill() elif format_header in [cls.GENOMIC_SUPERDUPS_HEADER, cls.TFBS_CONS_SITES_HEADER]: result = cls._parse_to_dict_with_score_key(field_value) # end checking if this is a field that needs special coddling return result @classmethod def _parse_to_dict_with_score_key(cls, delimited_str): """Parse delimited string of key/value pairs to a dictionary, with Score key's value cast to a float. Args: delimited_str (str): A string of key/value pairs, with each pair delimited from the next by a ';' and, with each pair, the key delimited from the value by an '='. A key named 'Score' must be present. Returns: dict(str, Any): Dictionary of key/value pairs from the input string; the value for the 'Score' key is cast to a float. """ key_val_pairs_as_str = delimited_str.split(";") result = dict(curr_key_val_pair_str.split("=") for curr_key_val_pair_str in key_val_pairs_as_str) result[cls.SCORE_KEY] = float(result[cls.SCORE_KEY]) return result
[docs]class AnnovarAnnotatedVariant(object): GENOTYPE_KEY = 'genotype' FILTER_PASSING_READS_COUNT_KEY = 'filter_passing_reads_count' GENOTYPE_LIKELIHOODS_KEY = 'genotype_likelihoods' SAMPLES_KEY = 'samples' SAMPLE_ID_KEY = 'sample_id' GENOTYPE_SUBCLASS_BY_CLASS_KEY = 'genotype_subclass_by_class' HGVS_ID_KEY = 'hgvs_id' ALLELE_DEPTH_KEY = 'AD' @staticmethod def _list_has_valid_content(a_list): """Return true if list has at least one non-None entry, false otherwise. Args: a_list (List[Any]): The list to evaluate for validity. Returns: bool: True if list has at least one non-None entry, false otherwise. """ is_valid = len(a_list) > 0 and not all(curr_item is None for curr_item in a_list) return is_valid
[docs] @classmethod def make_per_variant_annotation_dict(cls, fields_by_annovar_header, hgvs_id, format_string, genotype_field_strings_by_sample_name): result = fields_by_annovar_header result[cls.HGVS_ID_KEY] = hgvs_id # parse sample-level info into a list of per-sample dicts and add that list to the variant-level dict sample_specific_dicts_list = [] for curr_sample_name, genotype_fields_string_for_curr_sample in genotype_field_strings_by_sample_name.items(): sample_specific_dict = cls._make_per_sample_annotation_dict(curr_sample_name, format_string, genotype_fields_string_for_curr_sample) if sample_specific_dict is not None: sample_specific_dicts_list.append(sample_specific_dict) result[cls.SAMPLES_KEY] = sample_specific_dicts_list return result
@classmethod def _make_per_sample_annotation_dict(cls, sample_name, format_string, genotype_fields_string): if not VCFGenotypeParser.is_valid_genotype_fields_string(genotype_fields_string): return None genotype_info = VCFGenotypeParser.parse(format_string, genotype_fields_string) if genotype_info is None: return None # Always include a genotype key, EVEN IF the value for that key is None result = {cls.SAMPLE_ID_KEY: sample_name, cls.GENOTYPE_KEY: genotype_info.genotype} # for all other keys, include them only if they have meaningful values if genotype_info.genotype is not None: result[cls.GENOTYPE_SUBCLASS_BY_CLASS_KEY] = genotype_info.genotype_subclass_by_class if genotype_info.filter_passing_reads_count is not None: result[cls.FILTER_PASSING_READS_COUNT_KEY] = genotype_info.filter_passing_reads_count genotype_likelihoods_list = [i.likelihood_neg_exponent for i in genotype_info.genotype_likelihoods] if cls._list_has_valid_content(genotype_likelihoods_list): result[cls.GENOTYPE_LIKELIHOODS_KEY] = genotype_likelihoods_list unfiltered_read_depths_list = [i.unfiltered_read_counts for i in genotype_info.alleles] if cls._list_has_valid_content(unfiltered_read_depths_list): result[cls.ALLELE_DEPTH_KEY] = unfiltered_read_depths_list return result
# # Gets its own class because it is particularly pesky to parse # class CytoBand(object): # NAME_KEY = "Name" # CHROMOSOME_KEY = "Chromosome" # BAND_KEY = "Band" # REGION_KEY = "Region" # SUBBAND_KEY = "Sub_Band" # # def __init__(self, cyto_band_name): # # self.letters = set('XY') # = cyto_band_name # self.processed = self.fill() # # def fill(self): # # processed = {self.NAME_KEY:} # spliced = re.split('(\D+)', # \D means "any non-digit", so \D+ means 1 or more non-digit # # if any((c in self.letters) for c in # processed[self.CHROMOSOME_KEY] = spliced[1][0] # processed[self.BAND_KEY] = spliced[1][1] # else: # processed[self.CHROMOSOME_KEY] = int(spliced[0]) # processed[self.BAND_KEY] = spliced[1] # # processed[self.REGION_KEY] = spliced[2] # # if '.' in spliced: # processed[self.SUBBAND_KEY] = spliced[-1] # return processed