# standard libraries
import os
import tempfile
import unittest
import warnings
# project-specific libraries
import VAPr.vcf_merging as ns_merge
import VAPr.tests.test_vcf_merging as ns_merge_test
import VAPr.filtering as ns_filter
import VAPr.vapr_core as ns_test
def help_make_filter(sample_names=None):
    """Build the MongoDB filter document used by the filtering tests.

    The base filter matches documents whose ``1000g2015aug_all`` value is greater than 0.05.

    :param sample_names: optional list of sample names; when given, the base filter is wrapped in a list and
        passed, together with the names, to ``ns_filter._append_sample_id_constraint_if_needed``
    :return: the base filter dict when ``sample_names`` is None, otherwise whatever the ``ns_filter`` helper
        returns
    """
    frequency_filter = {"1000g2015aug_all": {"$gt": 0.05}}
    if sample_names is None:
        return frequency_filter
    return ns_filter._append_sample_id_constraint_if_needed([frequency_filter], sample_names)
class TestVaprDataset(unittest.TestCase):
    """Tests for ``VaprDataset``, run against the ``queries_test.collect`` collection in mongo."""

    # NB: These tests create a test database in mongo and fill it with the documents each test needs;
    # not much else needs to be tested here.

    @classmethod
    def setUpClass(cls):
        """Create the fixtures shared by all tests: test file info, three variant documents, and db names."""
        warnings.simplefilter("always")
        cls.test_file_dir, cls.test_bgzipped_fps = ns_merge_test.help_get_test_file_info()

        cls.var1 = {"hgvs_id": "chr1:g.1000A>C",
                    "cadd":
                        {"esp": {"af": 0.05},
                         "phred": 11},
                    "func_knowngene": "exonic",
                    "1000g2015aug_all": 0.05,
                    "exonicfunc_knowngene": "nonsynonymous SNV",
                    "clinvar":
                        {"rcv":
                            {"accession": "ABC123",
                             "clinical_significance": "Pathogenic"}},
                    "cosmic": {"cosmic_id": "XYZ789"},
                    "samples": {"sample_id": "sample1"}}

        cls.var2 = {"hgvs_id": "chr1:g.2000G>T",
                    "cadd":
                        {"esp": {"af": 0.05}},
                    "func_knowngene": "intronic",
                    "1000g2015aug_all": 0.06,
                    "exonicfunc_knowngene": "nonsynonymous SNV",
                    "cosmic": {"cosmic_id": "XYZ789"},
                    "samples": {"sample_id": "sample2"}}

        cls.var3 = {"hgvs_id": "chr1:g.3000T>A",
                    "cadd":
                        {"esp": {"af": 0.95},
                         "phred": 40},
                    "func_knowngene": "exonic",
                    "1000g2015aug_all": 0.05,
                    "exonicfunc_knowngene": "synonymous SNV",
                    "genotype_subclass_by_class": {"heterozygous": "compound"},
                    "samples": {"sample_id": "sample3"}}

        cls._db_name = "queries_test"
        cls._collection_name = "collect"

    def _make_dataset(self, variants=None):
        """Return a ``VaprDataset`` on the test collection, optionally resetting the collection's contents.

        :param variants: None to leave the collection untouched; otherwise the collection is emptied and,
            if the list is non-empty, the given documents are inserted
        :return: the new ``VaprDataset``
        """
        dataset = ns_test.VaprDataset(self._db_name, self._collection_name)
        if variants is not None:
            dataset._mongo_db_collection.delete_many({})
            # pymongo's insert_many rejects an empty document list, so only insert when there is something
            if variants:
                dataset._mongo_db_collection.insert_many(variants)
        return dataset

    # region _get_filtered_variants_by_sample tests
    # NB: each of these tests loads its own documents so the outcome does not depend on whatever a
    # previously-run test happened to leave in the collection
    def test__get_filtered_variants_by_sample_all(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset._get_filtered_variants_by_sample(help_make_filter)
        self.assertListEqual([self.var2["hgvs_id"]], [x['hgvs_id'] for x in real_output])

    def test__get_filtered_variants_by_sample_some(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset._get_filtered_variants_by_sample(help_make_filter, ["sample1", "sample3"])
        self.assertListEqual([], real_output)

    def test__get_filtered_variants_by_sample_single(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset._get_filtered_variants_by_sample(help_make_filter, ["sample2"])
        self.assertListEqual([self.var2["hgvs_id"]], [x['hgvs_id'] for x in real_output])
    # endregion

    # region _warn_if_no_output tests
    def test__warn_if_no_output_true(self):
        test_dataset = self._make_dataset()
        with warnings.catch_warnings(record=True) as warning_set:
            # make sure the warning is recorded regardless of the filters in force outside this block
            warnings.simplefilter("always")
            real_output = test_dataset._warn_if_no_output("test", [])

        self.assertTrue(real_output)
        self.assertEqual(1, len(warning_set))
        warn_msg = str(warning_set[-1].message)
        expected_msg = "test wrote no file(s) because no relevant samples were found in dataset " \
                       "'queries_test.collect'."
        self.assertEqual(expected_msg, warn_msg)

    def test__warn_if_no_output_false(self):
        test_dataset = self._make_dataset()
        with warnings.catch_warnings(record=True) as warning_set:
            # record everything, so that an unexpected warning cannot be hidden by an outer filter
            warnings.simplefilter("always")
            real_output = test_dataset._warn_if_no_output("test", ["hi", "there"])

        self.assertFalse(real_output)
        self.assertEqual(0, len(warning_set))
    # endregion

    def test__write_annotated_vcf(self):
        # NOTE(review): VCF columns are normally tab-separated; confirm that the whitespace in the header and
        # record lines of this literal matches what _write_annotated_vcf actually emits.
        expected_contents = """##fileformat=VCFv4.1
##fileDate=20150218
##reference=ftp://ftp.1000genomes.ebi.ac.uk//vol1/ftp/technical/reference/phase2_reference_assembly_sequence/hs37d5.fa.gz
##source=1000GenomesPhase3Pipeline
##bcftools_viewVersion=1.6+htslib-1.6
##bcftools_viewCommand=view -c1 -Oz -s HG00096 -o G1000_chr1_10000_20000.HG00096.vcf.gz G1000_chr1_10000_20000.vcf.gz; Date=Mon Nov 6 15:48:17 2017
##INFO=<ID=CIEND,Number=2,Type=Integer,Description="Confidence interval around END for imprecise variants">
##INFO=<ID=CIPOS,Number=2,Type=Integer,Description="Confidence interval around POS for imprecise variants">
##INFO=<ID=CS,Number=1,Type=String,Description="Source call set.">
##INFO=<ID=END,Number=1,Type=Integer,Description="End coordinate of this variant">
##INFO=<ID=IMPRECISE,Number=0,Type=Flag,Description="Imprecise structural variation">
##INFO=<ID=MC,Number=.,Type=String,Description="Merged calls.">
##INFO=<ID=MEINFO,Number=4,Type=String,Description="Mobile element info of the form NAME,START,END<POLARITY; If there is only 5' OR 3' support for this call, will be NULL NULL for START and END">
##INFO=<ID=MEND,Number=1,Type=Integer,Description="Mitochondrial end coordinate of inserted sequence">
##INFO=<ID=MLEN,Number=1,Type=Integer,Description="Estimated length of mitochondrial insert">
##INFO=<ID=MSTART,Number=1,Type=Integer,Description="Mitochondrial start coordinate of inserted sequence">
##INFO=<ID=SVLEN,Number=.,Type=Integer,Description="SV length. It is only calculated for structural variation MEIs. For other types of SVs; one may calculate the SV length by INFO:END-START+1, or by finding the difference between lengthes of REF and ALT alleles">
##INFO=<ID=SVTYPE,Number=1,Type=String,Description="Type of structural variant">
##INFO=<ID=TSD,Number=1,Type=String,Description="Precise Target Site Duplication for bases, if unknown, value will be NULL">
##INFO=<ID=AC,Number=A,Type=Integer,Description="Total number of alternate alleles in called genotypes">
##INFO=<ID=AF,Number=A,Type=Float,Description="Estimated allele frequency in the range (0,1)">
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of samples with data">
##INFO=<ID=AN,Number=1,Type=Integer,Description="Total number of alleles in called genotypes">
##INFO=<ID=EAS_AF,Number=A,Type=Float,Description="Allele frequency in the EAS populations calculated from AC and AN, in the range (0,1)">
##INFO=<ID=EUR_AF,Number=A,Type=Float,Description="Allele frequency in the EUR populations calculated from AC and AN, in the range (0,1)">
##INFO=<ID=AFR_AF,Number=A,Type=Float,Description="Allele frequency in the AFR populations calculated from AC and AN, in the range (0,1)">
##INFO=<ID=AMR_AF,Number=A,Type=Float,Description="Allele frequency in the AMR populations calculated from AC and AN, in the range (0,1)">
##INFO=<ID=SAS_AF,Number=A,Type=Float,Description="Allele frequency in the SAS populations calculated from AC and AN, in the range (0,1)">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total read depth; only low coverage data were counted towards the DP, exome data were not used">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele. Format: AA|REF|ALT|IndelType. AA: Ancestral allele, REF:Reference Allele, ALT:Alternate Allele, IndelType:Type of Indel (REF, ALT and IndelType are only defined for indels)">
##INFO=<ID=VT,Number=.,Type=String,Description="indicates what type of variant the line represents">
##INFO=<ID=EX_TARGET,Number=0,Type=Flag,Description="indicates whether a variant is within the exon pull down target boundaries">
##INFO=<ID=MULTI_ALLELIC,Number=0,Type=Flag,Description="indicates whether a site is multi-allelic">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FILTER=<ID=PASS,Description="All filters passed">
##ALT=<ID=CNV,Description="Copy Number Polymorphism">
##ALT=<ID=DEL,Description="Deletion">
##ALT=<ID=DUP,Description="Duplication">
##ALT=<ID=INS:ME:ALU,Description="Insertion of ALU element">
##ALT=<ID=INS:ME:LINE1,Description="Insertion of LINE1 element">
##ALT=<ID=INS:ME:SVA,Description="Insertion of SVA element">
##ALT=<ID=INS:MT,Description="Nuclear Mitochondrial Insertion">
##ALT=<ID=INV,Description="Inversion">
##ALT=<ID=CN0,Description="Copy number allele: 0 copies">
##ALT=<ID=CN1,Description="Copy number allele: 1 copy">
##ALT=<ID=CN2,Description="Copy number allele: 2 copies">
##ALT=<ID=CN3,Description="Copy number allele: 3 copies">
##ALT=<ID=CN4,Description="Copy number allele: 4 copies">
##ALT=<ID=CN5,Description="Copy number allele: 5 copies">
##ALT=<ID=CN6,Description="Copy number allele: 6 copies">
##ALT=<ID=CN7,Description="Copy number allele: 7 copies">
##ALT=<ID=CN8,Description="Copy number allele: 8 copies">
##ALT=<ID=CN9,Description="Copy number allele: 9 copies">
##ALT=<ID=CN10,Description="Copy number allele: 10 copies">
##ALT=<ID=CN11,Description="Copy number allele: 11 copies">
##ALT=<ID=CN12,Description="Copy number allele: 12 copies">
##ALT=<ID=CN13,Description="Copy number allele: 13 copies">
##ALT=<ID=CN14,Description="Copy number allele: 14 copies">
##ALT=<ID=CN15,Description="Copy number allele: 15 copies">
##ALT=<ID=CN16,Description="Copy number allele: 16 copies">
##ALT=<ID=CN17,Description="Copy number allele: 17 copies">
##ALT=<ID=CN18,Description="Copy number allele: 18 copies">
##ALT=<ID=CN19,Description="Copy number allele: 19 copies">
##ALT=<ID=CN20,Description="Copy number allele: 20 copies">
##ALT=<ID=CN21,Description="Copy number allele: 21 copies">
##ALT=<ID=CN22,Description="Copy number allele: 22 copies">
##ALT=<ID=CN23,Description="Copy number allele: 23 copies">
##ALT=<ID=CN24,Description="Copy number allele: 24 copies">
##ALT=<ID=CN25,Description="Copy number allele: 25 copies">
##ALT=<ID=CN26,Description="Copy number allele: 26 copies">
##ALT=<ID=CN27,Description="Copy number allele: 27 copies">
##ALT=<ID=CN28,Description="Copy number allele: 28 copies">
##ALT=<ID=CN29,Description="Copy number allele: 29 copies">
##ALT=<ID=CN30,Description="Copy number allele: 30 copies">
##ALT=<ID=CN31,Description="Copy number allele: 31 copies">
##ALT=<ID=CN32,Description="Copy number allele: 32 copies">
##ALT=<ID=CN33,Description="Copy number allele: 33 copies">
##ALT=<ID=CN34,Description="Copy number allele: 34 copies">
##ALT=<ID=CN35,Description="Copy number allele: 35 copies">
##ALT=<ID=CN36,Description="Copy number allele: 36 copies">
##ALT=<ID=CN37,Description="Copy number allele: 37 copies">
##ALT=<ID=CN38,Description="Copy number allele: 38 copies">
##ALT=<ID=CN39,Description="Copy number allele: 39 copies">
##ALT=<ID=CN40,Description="Copy number allele: 40 copies">
##ALT=<ID=CN41,Description="Copy number allele: 41 copies">
##ALT=<ID=CN42,Description="Copy number allele: 42 copies">
##ALT=<ID=CN43,Description="Copy number allele: 43 copies">
##ALT=<ID=CN44,Description="Copy number allele: 44 copies">
##ALT=<ID=CN45,Description="Copy number allele: 45 copies">
##ALT=<ID=CN46,Description="Copy number allele: 46 copies">
##ALT=<ID=CN47,Description="Copy number allele: 47 copies">
##ALT=<ID=CN48,Description="Copy number allele: 48 copies">
##ALT=<ID=CN49,Description="Copy number allele: 49 copies">
##ALT=<ID=CN50,Description="Copy number allele: 50 copies">
##ALT=<ID=CN51,Description="Copy number allele: 51 copies">
##ALT=<ID=CN52,Description="Copy number allele: 52 copies">
##ALT=<ID=CN53,Description="Copy number allele: 53 copies">
##ALT=<ID=CN54,Description="Copy number allele: 54 copies">
##ALT=<ID=CN55,Description="Copy number allele: 55 copies">
##ALT=<ID=CN56,Description="Copy number allele: 56 copies">
##ALT=<ID=CN57,Description="Copy number allele: 57 copies">
##ALT=<ID=CN58,Description="Copy number allele: 58 copies">
##ALT=<ID=CN59,Description="Copy number allele: 59 copies">
##ALT=<ID=CN60,Description="Copy number allele: 60 copies">
##ALT=<ID=CN61,Description="Copy number allele: 61 copies">
##ALT=<ID=CN62,Description="Copy number allele: 62 copies">
##ALT=<ID=CN63,Description="Copy number allele: 63 copies">
##ALT=<ID=CN64,Description="Copy number allele: 64 copies">
##ALT=<ID=CN65,Description="Copy number allele: 65 copies">
##ALT=<ID=CN66,Description="Copy number allele: 66 copies">
##ALT=<ID=CN67,Description="Copy number allele: 67 copies">
##ALT=<ID=CN68,Description="Copy number allele: 68 copies">
##ALT=<ID=CN69,Description="Copy number allele: 69 copies">
##ALT=<ID=CN70,Description="Copy number allele: 70 copies">
##ALT=<ID=CN71,Description="Copy number allele: 71 copies">
##ALT=<ID=CN72,Description="Copy number allele: 72 copies">
##ALT=<ID=CN73,Description="Copy number allele: 73 copies">
##ALT=<ID=CN74,Description="Copy number allele: 74 copies">
##ALT=<ID=CN75,Description="Copy number allele: 75 copies">
##ALT=<ID=CN76,Description="Copy number allele: 76 copies">
##ALT=<ID=CN77,Description="Copy number allele: 77 copies">
##ALT=<ID=CN78,Description="Copy number allele: 78 copies">
##ALT=<ID=CN79,Description="Copy number allele: 79 copies">
##ALT=<ID=CN80,Description="Copy number allele: 80 copies">
##ALT=<ID=CN81,Description="Copy number allele: 81 copies">
##ALT=<ID=CN82,Description="Copy number allele: 82 copies">
##ALT=<ID=CN83,Description="Copy number allele: 83 copies">
##ALT=<ID=CN84,Description="Copy number allele: 84 copies">
##ALT=<ID=CN85,Description="Copy number allele: 85 copies">
##ALT=<ID=CN86,Description="Copy number allele: 86 copies">
##ALT=<ID=CN87,Description="Copy number allele: 87 copies">
##ALT=<ID=CN88,Description="Copy number allele: 88 copies">
##ALT=<ID=CN89,Description="Copy number allele: 89 copies">
##ALT=<ID=CN90,Description="Copy number allele: 90 copies">
##ALT=<ID=CN91,Description="Copy number allele: 91 copies">
##ALT=<ID=CN92,Description="Copy number allele: 92 copies">
##ALT=<ID=CN93,Description="Copy number allele: 93 copies">
##ALT=<ID=CN94,Description="Copy number allele: 94 copies">
##ALT=<ID=CN95,Description="Copy number allele: 95 copies">
##ALT=<ID=CN96,Description="Copy number allele: 96 copies">
##ALT=<ID=CN97,Description="Copy number allele: 97 copies">
##ALT=<ID=CN98,Description="Copy number allele: 98 copies">
##ALT=<ID=CN99,Description="Copy number allele: 99 copies">
##ALT=<ID=CN100,Description="Copy number allele: 100 copies">
##ALT=<ID=CN101,Description="Copy number allele: 101 copies">
##ALT=<ID=CN102,Description="Copy number allele: 102 copies">
##ALT=<ID=CN103,Description="Copy number allele: 103 copies">
##ALT=<ID=CN104,Description="Copy number allele: 104 copies">
##ALT=<ID=CN105,Description="Copy number allele: 105 copies">
##ALT=<ID=CN106,Description="Copy number allele: 106 copies">
##ALT=<ID=CN107,Description="Copy number allele: 107 copies">
##ALT=<ID=CN108,Description="Copy number allele: 108 copies">
##ALT=<ID=CN109,Description="Copy number allele: 109 copies">
##ALT=<ID=CN110,Description="Copy number allele: 110 copies">
##ALT=<ID=CN111,Description="Copy number allele: 111 copies">
##ALT=<ID=CN112,Description="Copy number allele: 112 copies">
##ALT=<ID=CN113,Description="Copy number allele: 113 copies">
##ALT=<ID=CN114,Description="Copy number allele: 114 copies">
##ALT=<ID=CN115,Description="Copy number allele: 115 copies">
##ALT=<ID=CN116,Description="Copy number allele: 116 copies">
##ALT=<ID=CN117,Description="Copy number allele: 117 copies">
##ALT=<ID=CN118,Description="Copy number allele: 118 copies">
##ALT=<ID=CN119,Description="Copy number allele: 119 copies">
##ALT=<ID=CN120,Description="Copy number allele: 120 copies">
##ALT=<ID=CN121,Description="Copy number allele: 121 copies">
##ALT=<ID=CN122,Description="Copy number allele: 122 copies">
##ALT=<ID=CN123,Description="Copy number allele: 123 copies">
##ALT=<ID=CN124,Description="Copy number allele: 124 copies">
##contig=<ID=1,length=249250621>
##contig=<ID=2,length=243199373>
##contig=<ID=3,length=198022430>
##contig=<ID=4,length=191154276>
##contig=<ID=5,length=180915260>
##contig=<ID=6,length=171115067>
##contig=<ID=7,length=159138663>
##contig=<ID=8,length=146364022>
##contig=<ID=9,length=141213431>
##contig=<ID=10,length=135534747>
##contig=<ID=11,length=135006516>
##contig=<ID=12,length=133851895>
##contig=<ID=13,length=115169878>
##contig=<ID=14,length=107349540>
##contig=<ID=15,length=102531392>
##contig=<ID=16,length=90354753>
##contig=<ID=17,length=81195210>
##contig=<ID=18,length=78077248>
##contig=<ID=19,length=59128983>
##contig=<ID=20,length=63025520>
##contig=<ID=21,length=48129895>
##contig=<ID=22,length=51304566>
##contig=<ID=GL000191.1,length=106433>
##contig=<ID=GL000192.1,length=547496>
##contig=<ID=GL000193.1,length=189789>
##contig=<ID=GL000194.1,length=191469>
##contig=<ID=GL000195.1,length=182896>
##contig=<ID=GL000196.1,length=38914>
##contig=<ID=GL000197.1,length=37175>
##contig=<ID=GL000198.1,length=90085>
##contig=<ID=GL000199.1,length=169874>
##contig=<ID=GL000200.1,length=187035>
##contig=<ID=GL000201.1,length=36148>
##contig=<ID=GL000202.1,length=40103>
##contig=<ID=GL000203.1,length=37498>
##contig=<ID=GL000204.1,length=81310>
##contig=<ID=GL000205.1,length=174588>
##contig=<ID=GL000206.1,length=41001>
##contig=<ID=GL000207.1,length=4262>
##contig=<ID=GL000208.1,length=92689>
##contig=<ID=GL000209.1,length=159169>
##contig=<ID=GL000210.1,length=27682>
##contig=<ID=GL000211.1,length=166566>
##contig=<ID=GL000212.1,length=186858>
##contig=<ID=GL000213.1,length=164239>
##contig=<ID=GL000214.1,length=137718>
##contig=<ID=GL000215.1,length=172545>
##contig=<ID=GL000216.1,length=172294>
##contig=<ID=GL000217.1,length=172149>
##contig=<ID=GL000218.1,length=161147>
##contig=<ID=GL000219.1,length=179198>
##contig=<ID=GL000220.1,length=161802>
##contig=<ID=GL000221.1,length=155397>
##contig=<ID=GL000222.1,length=186861>
##contig=<ID=GL000223.1,length=180455>
##contig=<ID=GL000224.1,length=179693>
##contig=<ID=GL000225.1,length=211173>
##contig=<ID=GL000226.1,length=15008>
##contig=<ID=GL000227.1,length=128374>
##contig=<ID=GL000228.1,length=129120>
##contig=<ID=GL000229.1,length=19913>
##contig=<ID=GL000230.1,length=43691>
##contig=<ID=GL000231.1,length=27386>
##contig=<ID=GL000232.1,length=40652>
##contig=<ID=GL000233.1,length=45941>
##contig=<ID=GL000234.1,length=40531>
##contig=<ID=GL000235.1,length=34474>
##contig=<ID=GL000236.1,length=41934>
##contig=<ID=GL000237.1,length=45867>
##contig=<ID=GL000238.1,length=39939>
##contig=<ID=GL000239.1,length=33824>
##contig=<ID=GL000240.1,length=41933>
##contig=<ID=GL000241.1,length=42152>
##contig=<ID=GL000242.1,length=43523>
##contig=<ID=GL000243.1,length=43341>
##contig=<ID=GL000244.1,length=39929>
##contig=<ID=GL000245.1,length=36651>
##contig=<ID=GL000246.1,length=38154>
##contig=<ID=GL000247.1,length=36422>
##contig=<ID=GL000248.1,length=39786>
##contig=<ID=GL000249.1,length=38502>
##contig=<ID=MT,length=16569>
##contig=<ID=NC_007605,length=171823>
##contig=<ID=X,length=155270560>
##contig=<ID=Y,length=59373566>
##contig=<ID=hs37d5,length=35477943>
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT HG00096
1 14464 rs546169444 A T 100 PASS AC=2;AF=0.0958466;NS=2504;AN=2;EAS_AF=0.005;EUR_AF=0.1859;AFR_AF=0.0144;AMR_AF=0.1138;SAS_AF=0.1943;DP=26761;AA=a|||;VT=SNP;cadd={'_license': 'http://goo.gl/bkpNhq', 'gerp': {'n': 0.848, 's': -1.7}, 'phred': 0.603};dbsnp={'_license': 'https://goo.gl/Ztr5rl', 'rsid': 'rs546169444'};hgvs_id=chr1:g.14464A>T;wellderly={'_license': 'https://goo.gl/e8OO17', 'alleles': [{'allele': 'A', 'freq': 0.87}, {'allele': 'T', 'freq': 0.13}]} GT 1|1
1 18849 rs533090414 C G 100 PASS AC=2;AF=0.951877;NS=2504;AN=2;EAS_AF=1.0;EUR_AF=0.9911;AFR_AF=0.8411;AMR_AF=0.9769;SAS_AF=0.9939;DP=4700;AA=g|||;VT=SNP;dbsnp={'_license': 'https://goo.gl/Ztr5rl', 'rsid': 'rs533090414'};hgvs_id=chr1:g.18849C>G GT 1|1
"""
        variant_input = [{'cadd': {'_license': 'http://goo.gl/bkpNhq', 'gerp': {'n': 0.848, 's': -1.7}, 'phred': 0.603},
                          'dbsnp': {'_license': 'https://goo.gl/Ztr5rl', 'rsid': 'rs546169444'},
                          'wellderly': {'_license': 'https://goo.gl/e8OO17',
                                        'alleles': [{'allele': 'A', 'freq': 0.87}, {'allele': 'T', 'freq': 0.13}]},
                          'hgvs_id': 'chr1:g.14464A>T'},
                         {'dbsnp': {'_license': 'https://goo.gl/Ztr5rl', 'rsid': 'rs533090414'},
                          'hgvs_id': 'chr1:g.18849C>G'}]

        bgzip_filepath = self.test_bgzipped_fps[0]
        test_dataset = ns_test.VaprDataset(self._db_name, self._collection_name, bgzip_filepath)

        # NB: output .vcf.gz file and .vcf.gz.tbi files are placed in the same directory as the input file.
        # To ensure they are cleaned up after the test is over, place everything in a temporary directory;
        # the context manager removes that directory even if an assertion fails.
        with tempfile.TemporaryDirectory() as temp_dir_path:
            out_path = os.path.join(temp_dir_path, "test_out.vcf")
            test_dataset._write_annotated_vcf(variant_input, out_path)

            self.assertTrue(os.path.isfile(out_path))
            with open(out_path, 'r') as file_handle:
                real_output_contents = file_handle.read()

        self.assertEqual(expected_contents, real_output_contents)

    def test__write_annotated_csv(self):
        expected_output_csv = """,1000g2015aug_all,cadd,clinvar,cosmic,exonicfunc_knowngene,func_knowngene,genotype_subclass_by_class,hgvs_id,samples
0,0.05,"{'esp': {'af': 0.05}, 'phred': 11}","{'rcv': {'accession': 'ABC123', 'clinical_significance': 'Pathogenic'}}",{'cosmic_id': 'XYZ789'},nonsynonymous SNV,exonic,,chr1:g.1000A>C,{'sample_id': 'sample1'}
1,0.06,{'esp': {'af': 0.05}},,{'cosmic_id': 'XYZ789'},nonsynonymous SNV,intronic,,chr1:g.2000G>T,{'sample_id': 'sample2'}
2,0.05,"{'esp': {'af': 0.95}, 'phred': 40}",,,synonymous SNV,exonic,{'heterozygous': 'compound'},chr1:g.3000T>A,{'sample_id': 'sample3'}
"""
        input_list = [self.var1, self.var2, self.var3]
        test_dataset = self._make_dataset(input_list)

        # Write into a temporary directory so the output file is removed once the test is over
        with tempfile.TemporaryDirectory() as temp_dir_path:
            out_path = os.path.join(temp_dir_path, "test_out.csv")
            test_dataset._write_annotated_csv("test__write_annotated_csv", input_list, out_path)

            self.assertTrue(os.path.isfile(out_path))
            with open(out_path, 'r') as file_handle:
                real_output_contents = file_handle.read()

        self.assertEqual(expected_output_csv, real_output_contents)

    def test_de_novo_variants(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        dnv = test_dataset.get_de_novo_variants("sample1", "sample2", "sample3")
        self.assertListEqual([var['hgvs_id'] for var in dnv], [self.var1['hgvs_id']])

    def test_deleterious_compound_heterozygous_variants_all_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        dch = test_dataset.get_deleterious_compound_heterozygous_variants()
        self.assertListEqual([var['hgvs_id'] for var in dch], [self.var3['hgvs_id']])

    def test_deleterious_compound_heterozygous_variants_specific_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var2])
        dch = test_dataset.get_deleterious_compound_heterozygous_variants()
        self.assertEqual(0, len(dch))

    def test_known_disease_variants_all_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        kdv = test_dataset.get_known_disease_variants()
        self.assertListEqual([var['hgvs_id'] for var in kdv], [self.var1['hgvs_id'], self.var2['hgvs_id']])

    def test_known_disease_variants_specific_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var3])
        kdv = test_dataset.get_known_disease_variants()
        self.assertListEqual([var['hgvs_id'] for var in kdv], [self.var1['hgvs_id']])

    def test_rare_deleterious_variants_all_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        rdv = test_dataset.get_rare_deleterious_variants()
        self.assertEqual(rdv[0]['samples']['sample_id'], self.var1['samples']['sample_id'])

    def test_rare_deleterious_variants_specific_samples(self):
        test_dataset = self._make_dataset([self.var2, self.var3])
        rdv = test_dataset.get_rare_deleterious_variants()
        self.assertEqual(0, len(rdv))

    def test_get_custom_filtered_variants(self):
        test_dataset = self._make_dataset([self.var2, self.var3])
        real_output = test_dataset.get_custom_filtered_variants({"1000g2015aug_all": {"$gt": 0.05}})
        self.assertListEqual([self.var2["hgvs_id"]], [x['hgvs_id'] for x in real_output])

    def test_get_custom_filtered_variants_empty_db(self):
        # an empty list empties the collection without inserting anything
        test_dataset = self._make_dataset([])
        with warnings.catch_warnings(record=True) as warning_set:
            # make sure the warning is recorded regardless of the filters in force outside this block
            warnings.simplefilter("always")
            real_output = test_dataset.get_custom_filtered_variants({"1000g2015aug_all": {"$gt": 0.05}})

        self.assertEqual(0, len(real_output))
        self.assertEqual(1, len(warning_set))
        warn_msg = str(warning_set[-1].message)
        expected_msg = "Dataset 'queries_test.collect' is empty, so all filters return an empty list."
        self.assertEqual(expected_msg, warn_msg)

    def test_get_distinct_sample_ids(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset.get_distinct_sample_ids()
        self.assertListEqual(['sample1', 'sample2', 'sample3'], real_output)

    def test_get_all_variants(self):
        # the very same dict objects are inserted and then used as the expected output
        input_list = expected_output = [self.var1, self.var2, self.var3]
        test_dataset = self._make_dataset(input_list)
        real_output = test_dataset.get_all_variants()
        self.assertListEqual(expected_output, real_output)

    def test_get_variants_for_sample(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        sample1_var = test_dataset.get_variants_for_sample("sample1")
        self.assertEqual(1, len(sample1_var))
        self.assertEqual(sample1_var[0]['hgvs_id'], self.var1['hgvs_id'])

    def test_get_variants_for_samples(self):
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        sample_var = test_dataset.get_variants_for_samples(["sample1", "sample2"])
        self.assertEqual(2, len(sample_var))
        self.assertListEqual([var['hgvs_id'] for var in sample_var], [self.var1['hgvs_id'], self.var2['hgvs_id']])

    def test_get_variants_as_dataframe_all(self):
        expected_output_csv = """,1000g2015aug_all,cadd,clinvar,cosmic,exonicfunc_knowngene,func_knowngene,genotype_subclass_by_class,hgvs_id,samples
0,0.05,"{'esp': {'af': 0.05}, 'phred': 11}","{'rcv': {'accession': 'ABC123', 'clinical_significance': 'Pathogenic'}}",{'cosmic_id': 'XYZ789'},nonsynonymous SNV,exonic,,chr1:g.1000A>C,{'sample_id': 'sample1'}
1,0.06,{'esp': {'af': 0.05}},,{'cosmic_id': 'XYZ789'},nonsynonymous SNV,intronic,,chr1:g.2000G>T,{'sample_id': 'sample2'}
2,0.05,"{'esp': {'af': 0.95}, 'phred': 40}",,,synonymous SNV,exonic,{'heterozygous': 'compound'},chr1:g.3000T>A,{'sample_id': 'sample3'}
"""
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset.get_variants_as_dataframe()
        self.assertEqual(expected_output_csv, real_output.to_csv())

    def test_get_variants_as_dataframe_some(self):
        # NB: output includes ONLY fields that are in at least one of the variants being output so, for example, in
        # this test case, genotype_subclass_by_class is not included as a field
        expected_output_csv = """,1000g2015aug_all,cadd,clinvar,cosmic,exonicfunc_knowngene,func_knowngene,hgvs_id,samples
0,0.05,"{'esp': {'af': 0.05}, 'phred': 11}","{'rcv': {'accession': 'ABC123', 'clinical_significance': 'Pathogenic'}}",{'cosmic_id': 'XYZ789'},nonsynonymous SNV,exonic,chr1:g.1000A>C,{'sample_id': 'sample1'}
1,0.06,{'esp': {'af': 0.05}},,{'cosmic_id': 'XYZ789'},nonsynonymous SNV,intronic,chr1:g.2000G>T,{'sample_id': 'sample2'}
"""
        test_dataset = self._make_dataset([self.var1, self.var2, self.var3])
        real_output = test_dataset.get_variants_as_dataframe([self.var1, self.var2])
        self.assertEqual(expected_output_csv, real_output.to_csv())
class TestVaprAnnotator(unittest.TestCase):
    """Tests for ``VaprAnnotator``: its class-level helper methods and its constructor."""

    @staticmethod
    def _write_temp_file(dir_path, contents, suffix=None):
        """Write ASCII text to a new temporary file that is closed but NOT deleted.

        :param dir_path: directory in which the file is created; the caller is responsible for removing it
        :param contents: text to write, encoded as ASCII
        :param suffix: optional file name suffix (e.g., a file extension)
        :return: path of the file that was written
        """
        with tempfile.NamedTemporaryFile(dir=dir_path, suffix=suffix, delete=False) as temp_file:
            temp_file.write(contents.encode('ascii'))
        return temp_file.name

    def test__get_num_lines_in_file(self):
        # create a temporary file with 10,000 lines and ensure that is how many lines we get back
        num_lines = 10000
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b'test line\n' * num_lines)
        # file is now closed but, as delete=False, still exists; remove it ourselves when done

        try:
            real_output = ns_test.VaprAnnotator._get_num_lines_in_file(temp_file.name)
        finally:
            os.remove(temp_file.name)
        self.assertEqual(num_lines, real_output)

    # region _make_jobs_params_tuples_list tests
    def test__make_jobs_params_tuples_list_no_samples_default_verbose(self):
        input_file_path = "my/path/to/file.txt"
        input_num_file_lines = 21
        input_chunk_size = 10
        input_db_name = "mydb"
        input_collection_name = "mycol"
        input_build_version = "hg19"
        default_verbose_level = 1

        # 21 lines in chunks of 10 are expected to yield three jobs, numbered 0 through 2
        expected_output = [(job_index, input_file_path, input_chunk_size, input_db_name, input_collection_name,
                            input_build_version, default_verbose_level) for job_index in range(3)]

        real_output = ns_test.VaprAnnotator._make_jobs_params_tuples_list(
            input_file_path, input_num_file_lines, input_chunk_size, input_db_name, input_collection_name,
            input_build_version)
        self.assertListEqual(expected_output, real_output)

    def test__make_jobs_params_tuples_list_no_samples_default_verbose_less_than_chunk(self):
        input_file_path = "my/path/to/file.txt"
        input_num_file_lines = 2
        input_chunk_size = 10
        input_db_name = "mydb"
        input_collection_name = "mycol"
        input_build_version = "hg19"
        default_verbose_level = 1

        # fewer lines than one chunk are expected to yield a single job
        expected_output = [(0, input_file_path, input_chunk_size, input_db_name, input_collection_name,
                            input_build_version, default_verbose_level)]

        real_output = ns_test.VaprAnnotator._make_jobs_params_tuples_list(
            input_file_path, input_num_file_lines, input_chunk_size, input_db_name, input_collection_name,
            input_build_version)
        self.assertListEqual(expected_output, real_output)

    def test__make_jobs_params_tuples_list_samples_with_verbose(self):
        input_file_path = "my/path/to/file.txt"
        input_num_file_lines = 21
        input_chunk_size = 10
        input_db_name = "mydb"
        input_collection_name = "mycol"
        input_build_version = "hg19"
        input_verbose_level = 2
        input_sample_names_list = ["sample_1", "sample_2"]

        # when sample names are given, they are expected as the last element of every job tuple
        expected_output = [(job_index, input_file_path, input_chunk_size, input_db_name, input_collection_name,
                            input_build_version, input_verbose_level, input_sample_names_list)
                           for job_index in range(3)]

        real_output = ns_test.VaprAnnotator._make_jobs_params_tuples_list(
            input_file_path, input_num_file_lines, input_chunk_size, input_db_name, input_collection_name,
            input_build_version, sample_names_list=input_sample_names_list, verbose_level=input_verbose_level)
        self.assertListEqual(expected_output, real_output)
    # endregion

    # region _get_validated_genome_version tests
    def test__get_validated_genome_version_default(self):
        real_output = ns_test.VaprAnnotator._get_validated_genome_version(None)
        self.assertEqual(ns_test.VaprAnnotator.DEFAULT_GENOME_VERSION, real_output)

    def test__get_validated_genome_version_error(self):
        with self.assertRaises(ValueError):
            ns_test.VaprAnnotator._get_validated_genome_version("blue")

    def test__get_validated_genome_version(self):
        real_output = ns_test.VaprAnnotator._get_validated_genome_version(ns_test.VaprAnnotator.HG38_VERSION)
        self.assertEqual(ns_test.VaprAnnotator.HG38_VERSION, real_output)
    # endregion

    # region _make_merged_vcf tests
    def test__make_merged_vcf_w_design_file(self):
        # everything lives in one temporary directory, which the context manager removes afterwards
        with tempfile.TemporaryDirectory() as temp_dir_path:
            hg00096_vcf_fp = self._write_temp_file(temp_dir_path, ns_merge_test.TestFunctions.HG00096_VCF_CONTENTS,
                                                   suffix=ns_merge.VCF_EXTENSION)
            hg00097_vcf_fp = self._write_temp_file(temp_dir_path, ns_merge_test.TestFunctions.HG00097_VCF_CONTENTS,
                                                   suffix=ns_merge.VCF_EXTENSION)

            design_file_contents = """Sample_Names
{0}
{1}
""".format(hg00096_vcf_fp, hg00097_vcf_fp)
            design_file_fp = self._write_temp_file(temp_dir_path, design_file_contents)

            expected_output = os.path.join(temp_dir_path, "test.vcf")
            real_output = ns_test.VaprAnnotator._make_merged_vcf(temp_dir_path, temp_dir_path, "test",
                                                                 design_file_fp, True)
            self.assertEqual(expected_output, real_output)
            self.assertTrue(os.path.isfile(real_output))
            self.assertTrue(os.stat(real_output).st_size > 0)  # file size > 0

    def test__make_merged_vcf_wo_design_file(self):
        # everything lives in one temporary directory, which the context manager removes afterwards
        with tempfile.TemporaryDirectory() as temp_dir_path:
            self._write_temp_file(temp_dir_path, ns_merge_test.TestFunctions.HG00096_VCF_CONTENTS,
                                  suffix=ns_merge.VCF_EXTENSION)
            self._write_temp_file(temp_dir_path, ns_merge_test.TestFunctions.HG00097_VCF_CONTENTS,
                                  suffix=ns_merge.VCF_EXTENSION)

            expected_output = os.path.join(temp_dir_path, "test.vcf")
            real_output = ns_test.VaprAnnotator._make_merged_vcf(temp_dir_path, temp_dir_path, "test",
                                                                 None, False)
            self.assertEqual(expected_output, real_output)
            self.assertTrue(os.path.isfile(real_output))
            self.assertTrue(os.stat(real_output).st_size > 0)  # file size > 0
    # endregion

    def test___init__single_bzipped_file(self):
        test_file_dir, test_bgzipped_fps = ns_merge_test.help_get_test_file_info()

        # the design file and any annotator output go in a temporary directory that is removed afterwards
        with tempfile.TemporaryDirectory() as temp_dir_path:
            design_file_contents = """Sample_Names
{0}
""".format(test_bgzipped_fps[0])
            design_file_fp = self._write_temp_file(temp_dir_path, design_file_contents)

            test_annotator = ns_test.VaprAnnotator(test_file_dir, temp_dir_path, "test_db", "test_col",
                                                   design_file=design_file_fp)
            self.assertListEqual(['HG00096'], test_annotator._sample_names_list)