Source code for VAPr.vcf_merging

import os
import shlex
import subprocess
import shutil

__author__ = 'Adam Mark<a1mark@ucsd.edu>'

VCF_EXTENSION = ".vcf"
BGZIP_EXTENSION = ".gz"
BGZIPPED_VCF_EXTENSION = VCF_EXTENSION + BGZIP_EXTENSION

[docs]def bgzip_and_index_vcf(vcf_path): """bgzip and index each vcf so it can be merged with bcftools.""" if vcf_path.endswith(BGZIPPED_VCF_EXTENSION): # TODO: someday: check that the input is *really* bgzipped, rather than just gzipped # TODO: someday: check to for an index file for this compressed file, rather than assuming is there # something like if not os.path.isfile(vcf_path + ".gz.tbi")): raise ValueError bgzipped_vcf_path = vcf_path else: bgzipped_vcf_path = vcf_path + BGZIP_EXTENSION bgzip_cmd_string = _build_bgzip_vcf_command_str(vcf_path) bgzip_args = shlex.split(bgzip_cmd_string) with open(bgzipped_vcf_path, "w") as outfile: p = subprocess.Popen(bgzip_args, stdout=outfile, stderr=subprocess.PIPE) p.communicate() index_cmd_string = _build_index_vcf_command_str(bgzipped_vcf_path) index_args = shlex.split(index_cmd_string) subprocess.call(index_args) return bgzipped_vcf_path
# TODO: someday: refactor since raw_vcf_path_list and vcfs_gzipped are really mutually exclusive
[docs]def merge_vcfs(input_dir, output_dir, project_name, raw_vcf_path_list=None, vcfs_gzipped=False): """Merge vcf files into single multisample vcf, bgzip and index merged vcf file.""" if raw_vcf_path_list is None: vcf_file_extension = BGZIPPED_VCF_EXTENSION if vcfs_gzipped else VCF_EXTENSION raw_vcf_path_list = _get_vcf_file_paths_list_in_directory(input_dir, vcf_file_extension) if len(raw_vcf_path_list) == 0: raise ValueError("No VCFs found with extension '{0}'.".format(vcf_file_extension)) elif len(raw_vcf_path_list) == 0: raise ValueError("Input list of VCF files is empty.") if len(raw_vcf_path_list) > 1: bgzipped_vcf_path_list = set([bgzip_and_index_vcf(vcf_fp) for vcf_fp in raw_vcf_path_list]) single_vcf_path = os.path.join(output_dir, project_name + VCF_EXTENSION) _merge_bgzipped_indexed_vcfs(bgzipped_vcf_path_list, single_vcf_path) else: file_name = os.path.basename(raw_vcf_path_list[0]) # w/o path single_vcf_path = os.path.join(output_dir, file_name) try: # move to output dir with same file name shutil.copyfile(raw_vcf_path_list[0], single_vcf_path) except shutil.SameFileError: # I ran into a case where there was a single input file, AND the input and output dirs were the same so it # was already where it needed to be. In this case, an error is thrown because you can't copy a file to # itself, but that's cool, so just ignore it. pass return single_vcf_path
def _get_vcf_file_paths_list_in_directory(base_dir, vcf_file_extension): vcf_file_paths_list = [] walker = os.walk(base_dir) for folder, _, files in walker: for curr_file in files: if curr_file.endswith(vcf_file_extension): full_path_single_file = os.path.join(os.path.abspath(folder), curr_file) vcf_file_paths_list.append(full_path_single_file) return sorted(vcf_file_paths_list) def _merge_bgzipped_indexed_vcfs(bgzipped_vcf_path_list, output_vcf_fp): merge_cmd_string = _build_merge_vcf_command_str(bgzipped_vcf_path_list) merge_args = shlex.split(merge_cmd_string) with open(output_vcf_fp, 'w') as outfile: p = subprocess.Popen(merge_args, stdout=outfile, stderr=subprocess.PIPE) p.communicate() def _build_merge_vcf_command_str(raw_vcf_path_list): """Generate command string to merge vcf files into single multisample vcf.""" command = " ".join([ 'bcftools merge', " ".join(raw_vcf_path_list) ]) return command def _build_bgzip_vcf_command_str(vcf_path): """Generate command string to bgzip vcf file.""" command = " ".join([ 'bgzip -c', vcf_path ]) return command def _build_index_vcf_command_str(bgzipped_vcf): """Generate command string to index vcf file.""" command = " ".join([ 'tabix -p vcf', bgzipped_vcf ]) return command