Source code for VAPr.vapr_core

from __future__ import division, print_function

# built-in libraries
import logging
import multiprocessing
import os
import pymongo
import re
import tqdm
import warnings

# third-party libraries
import pandas
import vcf

# project libraries
import VAPr.vcf_merging
import VAPr.annovar_running
import VAPr.filtering
import VAPr.chunk_processing
from VAPr.annovar_output_parsing import AnnovarTxtParser

[docs]class VaprDataset(object): def __init__(self, mongo_db_name, mongo_collection_name, merged_vcf_path=None): """Class that contains methods to interact with a parsed database of variants Args: mongo_db_name(str): database name mongo_collection_name(str): collection name merged_vcf_path(str): path to merged vcf """ self._mongo_db_name = mongo_db_name self._mongo_collection_name = mongo_collection_name self._merged_vcf_path = merged_vcf_path self._mongo_client = pymongo.MongoClient(maxPoolSize=None, waitQueueTimeoutMS=200) self._mongo_db = getattr(self._mongo_client, self._mongo_db_name) self._mongo_db_collection = getattr(self._mongo_db, self._mongo_collection_name) @property def full_name(self): """Full name of database and collection Args: Returns: str: Full name of database and collection """ return self._mongo_db_collection.full_name @property def is_empty(self): """If there are no records in the collection, returns True Args: Returns: bool: if there are no records in the collection, returns True """ return self._mongo_db_collection.count() == 0 @property def num_records(self): """Number of records in MongoDB collection Args: Returns: int: Number of records in MongoDB collection """ return self._mongo_db_collection.count()
[docs] def get_rare_deleterious_variants(self, sample_names_list=None): """See :ref:`rare-del-variants` for more information on how this is implemented Args: sample_names_list(list: list, optional): list of samples to draw variants from (Default value = None) Returns: list: list of variants """ return self._get_filtered_variants_by_sample(VAPr.filtering.make_rare_deleterious_variants_filter, sample_names_list)
[docs] def get_known_disease_variants(self, sample_names_list=None): """See :ref:`known-disease` for more information on how this is implemented Args: sample_names_list(list: list, optional): list of samples to draw variants from (Default value = None) Returns: list: list of variants """ return self._get_filtered_variants_by_sample(VAPr.filtering.make_known_disease_variants_filter, sample_names_list)
[docs] def get_deleterious_compound_heterozygous_variants(self, sample_names_list=None): """See :ref:`del-compound` for more information on how this is implemented Args: sample_names_list(list: list, optional): list of samples to draw variants from (Default value = None) Returns: list: list of variants """ return self._get_filtered_variants_by_sample( VAPr.filtering.make_deleterious_compound_heterozygous_variants_filter, sample_names_list)
[docs] def get_de_novo_variants(self, proband, ancestor1, ancestor2): """See :ref:`de-novo` for more information on how this is implemented Args: proband(str): proband variant ancestor1(str): ancestor #1 variant ancestor2(str): ancestor #2 variant Returns: list: list of variants """ filter_dict = VAPr.filtering.make_de_novo_variants_filter(proband, ancestor1, ancestor2) return self.get_custom_filtered_variants(filter_dict)
[docs] def get_custom_filtered_variants(self, filter_dictionary): """See :ref:`custom-filter` for more information on how to implement Args: filter_dictionary(dictionary: dict): mongodb custom filter Returns: list: list of variants """ if self.is_empty: warnings.warn("Dataset '{0}' is empty, so all filters return an empty list.".format(self.full_name)) return list(self._mongo_db_collection.find(filter_dictionary))
[docs] def get_distinct_sample_ids(self): """Self-explanatory Args: Returns: list: list of sample ids """ result = self._mongo_db_collection.distinct(VAPr.filtering.SAMPLE_ID_SELECTOR) return result
[docs] def get_all_variants(self): """Self-explanatory Args: Returns: list: list of variants """ return self.get_custom_filtered_variants({})
[docs] def get_variants_for_sample(self, sample_name): """Return variants for a specific sample Args: sample_name(str): name of sample Returns: list: list of variants """ filter_dict = VAPr.filtering.get_sample_id_filter(sample_name) return self.get_custom_filtered_variants(filter_dict)
[docs] def get_variants_for_samples(self, specific_sample_names): """Return variants from multiple samples Args: specific_sample_names(list): name of samples Returns: list: list of variants """ filter_dict = VAPr.filtering.get_any_of_sample_ids_filter(specific_sample_names) return self.get_custom_filtered_variants(filter_dict)
[docs] def get_variants_as_dataframe(self, filtered_variants=None): """Utility to get a dataframe from variants, either all of them or a filtered subset Args: filtered_variants: a list of variants (Default value = None) Returns: pandas.DataFrame """ if filtered_variants is None: filtered_variants = self.get_all_variants() result = pandas.DataFrame(filtered_variants) # remove the object id as it is different every time and internal to database result.drop('_id', axis=1, inplace=True) return result
[docs] def write_unfiltered_annotated_csv(self, output_fp): """Full csv file containing annotations from both annovar and Args: output_fp(str): Output file path Returns: None """ all_variants = self.get_all_variants() self._write_annotated_csv("write_unfiltered_annotated_csv", all_variants, output_fp)
[docs] def write_filtered_annotated_csv(self, filtered_variants, output_fp): """Filtered csv file containing annotations from a list passed to it, coming from MongoDB Args: filtered_variants(list): variants coming from MongoDB output_fp(str): Output file path Returns: None """ self._write_annotated_csv("write_filtered_annotated_csv", filtered_variants, output_fp)
[docs] def write_unfiltered_annotated_vcf(self, vcf_output_path, info_out=True): """Filtered vcf file containing annotations from a list passed to it, coming from MongoDB Args: vcf_output_path(str): Output file path info_out: if True, extra annotation information will be written to the vcf file (Default value = True) info_out: bool (Default value = True) Returns: None """ filtered_variants = self.get_all_variants() self._write_annotated_vcf(filtered_variants, vcf_output_path, info_out=info_out)
[docs] def write_filtered_annotated_vcf(self, filtered_variants, vcf_output_path, info_out=True): """ Args: filtered_variants(list): variants coming from MongoDB vcf_output_path(str): Output file path info_out: if True, extra annotation information will be written to the vcf file (Default value = True) info_out: bool (Default value = True) Returns: None """ self._write_annotated_vcf(filtered_variants, vcf_output_path, info_out=info_out)
[docs] def write_unfiltered_annotated_csvs_per_sample(self, output_dir): """ Args: output_dir: return: None Returns: None """ sample_ids_list = self.get_distinct_sample_ids() for curr_sample_id in sample_ids_list: variant_dicts_list = self.get_variants_for_sample(curr_sample_id) curr_output_fp = os.path.join(output_dir, curr_sample_id + 'unfiltered_annotated_variants.csv') self.write_filtered_annotated_csv(variant_dicts_list, curr_output_fp) self._warn_if_no_output("write_unfiltered_annotated_csvs_per_sample", sample_ids_list)
def _write_annotated_csv(self, func_name, filtered_variants, output_fp): no_output = self._warn_if_no_output(func_name, filtered_variants) if not no_output: dataframe = self.get_variants_as_dataframe(filtered_variants) dataframe.to_csv(output_fp) def _get_filtered_variants_by_sample(self, filter_builder_func, sample_names=None): if sample_names is not None and not isinstance(sample_names, list): sample_names = [sample_names] filter_dict = filter_builder_func(sample_names) return self.get_custom_filtered_variants(filter_dict) def _write_annotated_vcf(self, filtered_variants_dicts_list, vcf_output_path, info_out=True): if self._merged_vcf_path is None: raise ValueError("Original vcf file (to be used as template for output vcf) is not set.") # match at least one character of anything but a : followed by :g. followed by at least one digit followed # by at least one NOT digit followed by the end of the line hgvs_regex = r"^([^:]+):g\.(\d+)[^\d].*$" vcf_template_path = VAPr.vcf_merging.bgzip_and_index_vcf(self._merged_vcf_path) # This open is done using the filename rather than passing a file handle directly (as is done elsewhere) # because compressed files must be opened with 'rb' while regular files must be opened with 'r'; # vcf.Reader will work this out for itself if you pass the file name and let it do the opening. # The slight drawback here is that vcf.Reader doesn't clean up after itself well: it leaves its file # handle open after use, causing a niggling ResourceWarning: unclosed file warning. vcf_reader = vcf.Reader(filename=vcf_template_path) vcf_writer = vcf.Writer(open(vcf_output_path, 'w'), vcf_reader) for curr_record_dict in filtered_variants_dicts_list: curr_hgvs_id = curr_record_dict["hgvs_id"] match_obj = re.match(hgvs_regex, curr_hgvs_id) curr_chrom =, "") if curr_chrom == AnnovarTxtParser.STANDARDIZED_CHR_MT_VAL: curr_chrom = AnnovarTxtParser.RAW_CHR_MT_VAL curr_start = int( for record in vcf_reader.fetch(curr_chrom, curr_start - 1, curr_start + 1): if info_out is True: record.INFO.update(curr_record_dict) vcf_writer.write_record(record) vcf_writer.close() self._warn_if_no_output("write_unfiltered_annotated_csvs_per_sample", filtered_variants_dicts_list) def _warn_if_no_output(self, output_func_name, items_list): no_output = False if len(items_list) == 0: no_output = True warnings.warn("{0} wrote no file(s) because no relevant samples were found in dataset '{1}'.".format( output_func_name, self._mongo_db_collection.full_name)) return no_output
[docs]class VaprAnnotator(object): """Class in charge of gathering requirements, finding files, downloading databases required to run the annotation Args: input_dir(str): Input directory to vcf files output_dir(str): Output directory to annotated vcf files mongo_db_name(str): Name of the database to which you'll store the collection of variants mongo_collection_name(str): Name of the collection to which you'd store the annotated variants annovar_install_path(str): Path to locally installed annovar scripts design_file(str): path to csv design file build_ver(str): genome build version to which annotation will be done against. Either `hg19` or `hg38` vcfs_gzipped(bool): if the vcf files are gzipped, set to True Returns: """ SAMPLE_NAMES_KEY = "Sample_Names" HG19_VERSION = "hg19" HG38_VERSION = "hg38" DEFAULT_GENOME_VERSION = HG19_VERSION SUPPORTED_GENOME_BUILD_VERSIONS = [HG19_VERSION, HG38_VERSION] @staticmethod def _get_num_lines_in_file(file_path): with open(file_path) as file_obj: result = sum(1 for _ in file_obj) return result @staticmethod def _make_jobs_params_tuples_list(file_path, num_file_lines, chunk_size, db_name, collection_name, genome_build_version, sample_names_list=None, verbose_level=1): num_params = VAPr.chunk_processing.AnnotationJobParamsIndices.get_num_possible_indices() if sample_names_list is not None: shared_job_params = [None] * num_params shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.SAMPLE_LIST_INDEX] = sample_names_list else: shared_job_params = [None] * (num_params - 1) shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.CHUNK_SIZE_INDEX] = chunk_size shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.FILE_PATH_INDEX] = file_path shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.DB_NAME_INDEX] = db_name shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.COLLECTION_NAME_INDEX] = collection_name shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.GENOME_BUILD_VERSION_INDEX] = \ genome_build_version shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.VERBOSE_LEVEL_INDEX] = verbose_level jobs_params_tuples_list = [] num_steps = int(num_file_lines / chunk_size) + 1 for curr_chunk_index in range(num_steps): shared_job_params[VAPr.chunk_processing.AnnotationJobParamsIndices.CHUNK_INDEX_INDEX] = curr_chunk_index curr_job_params_tuple = tuple(shared_job_params) jobs_params_tuples_list.append(curr_job_params_tuple) return jobs_params_tuples_list @classmethod def _get_validated_genome_version(cls, input_genome_build_version): if input_genome_build_version is None: result = cls.DEFAULT_GENOME_VERSION elif input_genome_build_version not in cls.SUPPORTED_GENOME_BUILD_VERSIONS: str_of_acceptable_versions = ", ".join(cls.SUPPORTED_GENOME_BUILD_VERSIONS) raise ValueError("Input genome build version '{0}' is not recognized. Supported builds are {1}".format( input_genome_build_version, str_of_acceptable_versions)) else: result = input_genome_build_version return result @classmethod def _make_merged_vcf(cls, input_dir, output_dir, analysis_name, design_file, vcfs_gzipped): vcf_file_paths_list = None if design_file is not None: design_df = pandas.read_csv(design_file) vcf_file_paths_list = design_df[cls.SAMPLE_NAMES_KEY].tolist() result = VAPr.vcf_merging.merge_vcfs(input_dir, output_dir, analysis_name, vcf_file_paths_list, vcfs_gzipped) return result def __init__(self, input_dir, output_dir, mongo_db_name, mongo_collection_name, annovar_install_path=None, design_file=None, build_ver=None, vcfs_gzipped=False): self._input_dir = input_dir self._output_dir = output_dir self._mongo_db_name = mongo_db_name self._mongo_collection_name = mongo_collection_name self._analysis_name = mongo_db_name + "_" + mongo_collection_name self._path_to_annovar_install = annovar_install_path self._design_file = design_file self._vcfs_gzipped = vcfs_gzipped self._genome_build_version = self._get_validated_genome_version(build_ver) self._single_vcf_path = self._make_merged_vcf(self._input_dir, self._output_dir, self._analysis_name, self._design_file, self._vcfs_gzipped) self._output_basename = os.path.splitext(os.path.basename(self._single_vcf_path))[0] # This open is done using the filename rather than passing a file handle directly (as is done elsewhere) # because compressed files must be opened with 'rb' while regular files must be opened with 'r'; # vcf.Reader will work this out for itself if you pass the file name and let it do the opening. # The slight drawback here is that vcf.Reader doesn't clean up after itself well: it leaves its file # handle open after use, causing a niggling ResourceWarning: unclosed file warning. vcf_reader = vcf.Reader(filename=self._single_vcf_path) self._sample_names_list = vcf_reader.samples # TODO: someday: put back the functionality for custom annovar dbs? self._annovar_wrapper = None if self._path_to_annovar_install is not None: self._annovar_wrapper = VAPr.annovar_running.AnnovarWrapper( self._path_to_annovar_install, genome_build_version=self._genome_build_version, custom_annovar_dbs_to_use=None) try: os.mkdir(output_dir) except OSError:'Output directory %s for analysis already exists; using existing directory' % output_dir)
[docs] def download_annovar_databases(self): """ Needed for ANNOVAR to run, it will download the required databases Args: Returns: """ if self._path_to_annovar_install is None: raise ValueError("No ANNOVAR install path provided.") self._annovar_wrapper.download_databases()
[docs] def annotate_lite(self, num_processes=8, chunk_size=2000, verbose_level=1, allow_adds=False): """'Lite' Annotation: it will query ` <>`_ only, without generating annotations from Annovar. It requires solely VAPr to be installed. The execution will grab the HGVS ids from the vcf files and query the variant data from .. warnings:: It is subject to the issue of potentially having completely empty data for some of the variants, and inability to run native VAPr queries on the data. It will return the class :class:`~VAPr.vapr_core.VaprDataset`, which can then be used for downstream filtering and analysis. Args: num_processes(int, optional): number of parallel processes. Defaults to 8 chunk_size(int, optional): int number of variants to be processed at once. Defaults to 2000 verbose_level(int, optional): int higher verbosity will give more feedback, raise to 2 or 3 when debugging. Defaults to 1 allow_adds(bool, optional): bool Allow adding new variants to a pre-existing Mongo collection, or overwrite it (Default value = False) Returns: class:`~VAPr.vapr_core.VaprDataset` """ result = self._make_dataset_for_results("annotate_lite", allow_adds) self._collect_annotations_and_store(self._single_vcf_path, chunk_size, num_processes, sample_names_list=None, verbose_level=verbose_level) return result
[docs] def annotate(self, num_processes=4, chunk_size=2000, verbose_level=1, allow_adds=False): """This is the main function of the package. It will run Annovar beforehand, and will kick-start the full annotation functionality. Namely, it will collect all the variant data from Annovar annotations, combine it with data coming from, and parse it to MongoDB, in the database and collection specified in project_data. It will return the class :class:`~VAPr.vapr_core.VaprDataset`, which can then be used for downstream filtering and analysis. Args: num_processes(int, optional): number of parallel processes. Defaults to 8 chunk_size(int, optional): int number of variants to be processed at once. Defaults to 2000 verbose_level(int, optional): int higher verbosity will give more feedback, raise to 2 or 3 when debugging. Defaults to 1 allow_adds(bool, optional): bool Allow adding new variants to a pre-existing Mongo collection, or overwrite it (Default value = False) Returns: class: class:`~VAPr.vapr_core.VaprDataset` """ if self._path_to_annovar_install is None: raise ValueError("No ANNOVAR install path provided.") result = self._make_dataset_for_results("annotate", allow_adds) annovar_output_fp = self._annovar_wrapper.run_annotation(self._single_vcf_path, self._output_basename, self._output_dir) self._collect_annotations_and_store(annovar_output_fp, chunk_size, num_processes, sample_names_list=self._sample_names_list, verbose_level=verbose_level) return result
def _make_dataset_for_results(self, func_name, allow_adds): result = VaprDataset(self._mongo_db_name, self._mongo_collection_name, self._single_vcf_path) if not result.is_empty: msg_prefix = "Dataset '{0}' already contains {1} records".format(result.full_name, result.num_records) if allow_adds:"{0}; adding to this dataset.".format(msg_prefix)) else: error_msg = "{0}, but writing into an already-filled dataset is disallowed by default. " \ "Either create a VaprAnnotator with a new collection name, clear your existing collection " \ "manually, or (if you definitely wish to add to an existing dataset), rerun {1} with the " \ "'allow_adds' parameter set to True.".format(msg_prefix, func_name) raise ValueError(error_msg) return result # TODO: someday: extra_data from design file needs to come back in here def _collect_annotations_and_store(self, file_path, chunk_size, num_processes, sample_names_list=None, verbose_level=1): num_file_lines = self._get_num_lines_in_file(file_path) jobs_params_tuples_list = self._make_jobs_params_tuples_list( file_path, num_file_lines, chunk_size, self._mongo_db_name, self._mongo_collection_name, self._genome_build_version, sample_names_list, verbose_level) pool = multiprocessing.Pool(num_processes) for _ in tqdm.tqdm( pool.imap_unordered( VAPr.chunk_processing.collect_chunk_annotations_and_store, jobs_params_tuples_list), total=len(jobs_params_tuples_list)): pass pool.close() pool.join()