Source code for VAPr.chunk_processing

from __future__ import division, print_function

# built-in libraries
import itertools
import logging
import time

# third-party libraries
import myvariant
import pymongo
import vcf

# project libraries
from VAPr.annovar_output_parsing import AnnovarTxtParser, AnnovarAnnotatedVariant


[docs]class AnnotationJobParamsIndices: CHUNK_INDEX_INDEX = 0 FILE_PATH_INDEX = 1 CHUNK_SIZE_INDEX = 2 DB_NAME_INDEX = 3 COLLECTION_NAME_INDEX = 4 GENOME_BUILD_VERSION_INDEX = 5 VERBOSE_LEVEL_INDEX = 6 SAMPLE_LIST_INDEX = 7 # TODO: someday: refactor so one doesn't have to remember to add new indices to the below function
[docs] @classmethod def get_num_possible_indices(cls): max_index = max(cls.CHUNK_INDEX_INDEX, cls.FILE_PATH_INDEX, cls.CHUNK_SIZE_INDEX, cls.DB_NAME_INDEX, cls.COLLECTION_NAME_INDEX, cls.GENOME_BUILD_VERSION_INDEX, cls.VERBOSE_LEVEL_INDEX, cls.SAMPLE_LIST_INDEX) return max_index+1
[docs]def collect_chunk_annotations_and_store(job_params_tuple): db_name = job_params_tuple[AnnotationJobParamsIndices.DB_NAME_INDEX] collection_name = job_params_tuple[AnnotationJobParamsIndices.COLLECTION_NAME_INDEX] variant_dicts_to_store = _collect_chunk_annotations(job_params_tuple) _store_annotations_to_db(variant_dicts_to_store, db_name, collection_name)
def _collect_chunk_annotations(job_params_tuple): chunk_index = job_params_tuple[AnnotationJobParamsIndices.CHUNK_INDEX_INDEX] chunk_size = job_params_tuple[AnnotationJobParamsIndices.CHUNK_SIZE_INDEX] file_path = job_params_tuple[AnnotationJobParamsIndices.FILE_PATH_INDEX] genome_build_version = job_params_tuple[AnnotationJobParamsIndices.GENOME_BUILD_VERSION_INDEX] verbose_level = job_params_tuple[AnnotationJobParamsIndices.VERBOSE_LEVEL_INDEX] with open(file_path, 'r') as input_file_obj: if len(job_params_tuple) > AnnotationJobParamsIndices.SAMPLE_LIST_INDEX: merge_variants = True sample_names_list = job_params_tuple[AnnotationJobParamsIndices.SAMPLE_LIST_INDEX] hgvs_ids_list, annovar_variants = AnnovarTxtParser.read_chunk_of_annotations_to_dicts_list( input_file_obj, sample_names_list, chunk_index, chunk_size) else: merge_variants = False annovar_variants = None hgvs_ids_list = _get_hgvs_ids_from_vcf(input_file_obj, chunk_index, chunk_size) myvariants_variants = _get_myvariantinfo_annotations_dict(hgvs_ids_list, genome_build_version, verbose_level) result = myvariants_variants if merge_variants: result = [] for i in range(0, len(hgvs_ids_list)): result.append(_merge_annovar_and_myvariant_dicts(myvariants_variants[i], annovar_variants[i])) return result def _get_hgvs_ids_from_vcf(vcf_file_obj, chunk_index, chunk_size): reader = vcf.Reader(vcf_file_obj) hgvs_ids = [] for record in itertools.islice(reader, chunk_index * chunk_size, (chunk_index + 1) * chunk_size): hgvs_id = myvariant.format_hgvs(record.CHROM, record.POS, record.REF, str(record.ALT[0])) # ensure syntax consistency for chromosome M variants if AnnovarTxtParser.RAW_CHR_MT_SUFFIX_VAL in hgvs_id: one = hgvs_id.split(':')[0] two = hgvs_id.split(':')[1] if AnnovarTxtParser.STANDARDIZED_CHR_MT_SUFFIX_VAL not in one: one = AnnovarTxtParser.STANDARDIZED_CHR_MT_VAL hgvs_id = "".join([one, ':', two]) hgvs_ids.append(hgvs_id) return hgvs_ids # TODO: someday: refactor myvariant fields into external file so easy to modify which are pulled def _get_myvariantinfo_annotations_dict(hgvs_ids_list, genome_build_version, verbose_level, num_failed_attempts=0): """ Retrieve variants from MyVariant.info""" max_failed_attempts = 5 myvariant_fields = [ 'cadd.1000g', 'cadd.esp', 'cadd.phred', 'cadd.gerp', 'cadd.polyphen', 'cadd.sift', 'dbsnp.rsid', 'cosmic.cosmic_id', 'cosmic.tumor_site', 'clinvar.rcv.accession', 'clinvar.rcv.clinical_significance', 'clinvar.rcv.conditions', 'civic.description', 'civic.evidence_items', 'cgi', 'gwassnps', 'wellderly.alleles' ] be_verbose = verbose_level >= 2 mv = myvariant.MyVariantInfo() try: myvariantinfo_dicts_list = mv.getvariants(hgvs_ids_list, verbose=int(be_verbose), as_dataframe=False, fields=myvariant_fields, assembly=genome_build_version) except ValueError as unrecoverable_error: # If myvariant.info returned a value error, recalling with the same values won't help so error out now raise unrecoverable_error except Exception as error: # If we got something other than a value error, problem may be with internet connection or myvariant.info # availability, so try again a couple of times just in case we can recover logging.info('Error: ' + str(error) + 'while fetching from MyVariant') num_failed_attempts += 1 if num_failed_attempts < max_failed_attempts: time.sleep(5) logging.info("Retrying MyVariant.info fetch") myvariantinfo_dicts_list = _get_myvariantinfo_annotations_dict(hgvs_ids_list, genome_build_version, verbose_level, num_failed_attempts) else: # give up and raise error raise error myvariantinfo_dicts_list = _remove_unwanted_keys(myvariantinfo_dicts_list) return myvariantinfo_dicts_list def _remove_unwanted_keys(myvariantinfo_dicts_list): for curr_myvariantinfo_dict in myvariantinfo_dicts_list: # Put the contents of the query key into a new hgvs_id key; NB, use AnnovarAnnotatedVariant.HGVS_ID_KEY # as the value of this key because later, when we combine this dict with the annovar-generated dict (if any), # we want to make sure the keys are duplicates and thus collapse into one. curr_myvariantinfo_dict[AnnovarAnnotatedVariant.HGVS_ID_KEY] = curr_myvariantinfo_dict.pop("query", None) # Also, just drop the _id key--we want mongo db to create its own _id key. curr_myvariantinfo_dict.pop("_id", None) return myvariantinfo_dicts_list def _merge_annovar_and_myvariant_dicts(myvariantinfo_annotations_dict, annovar_annotations_dict): hgvs_id_key = AnnovarAnnotatedVariant.HGVS_ID_KEY if myvariantinfo_annotations_dict[hgvs_id_key] != annovar_annotations_dict[hgvs_id_key]: raise ValueError( "myvariant HGVS id '{0}' not equal to annovar HGVS id '{1}'".format( myvariantinfo_annotations_dict[hgvs_id_key], annovar_annotations_dict[hgvs_id_key])) annovar_annotations_dict.update(myvariantinfo_annotations_dict) return annovar_annotations_dict def _store_annotations_to_db(annotation_dicts_list, db_name, collection_name, client=None, num_failed_attempts=0): max_failed_attempts = 5 if client is None: client = pymongo.MongoClient(maxPoolSize=None, waitQueueTimeoutMS=200) db = getattr(client, db_name) collection = getattr(db, collection_name) logging.info('Parsing Buffer...') if len(annotation_dicts_list) == 0: logging.info('List of annotations to store is empty; continuing.') return try: collection.insert_many(annotation_dicts_list, ordered=False) except Exception as error: if "Connection refused" in str(error) and num_failed_attempts < max_failed_attempts: num_failed_attempts += 1 logging.error('Error connecting to mongodb: ' + str(error)) logging.info('Retrying connection to mongodb') time.sleep(4) _store_annotations_to_db(annotation_dicts_list, db_name, collection_name, client=client, num_failed_attempts=num_failed_attempts) # Recurse! else: error_msg = "Encountered error '{0}' when attempting to store the following annotations " \ "to mongo db collection '{1}': '{2}'".format(str(error), collection.full_name, annotation_dicts_list) raise RuntimeError(error_msg) try: client.close() except: pass # if the client is already closed, just move along