import logging
import pandas as pd
from pathlib import Path
from ..core.executor import shell_do
from ..core.get_references import FetcherLDRegions, Fetcher1000Genome
from ..qc.ancestry_qc import ReferenceGenomicMerger
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
[docs]
class FstSummary:
[docs]
def __init__(self, input_path: Path, input_name: str, output_path: Path, high_ld_file: Path=Path(), build: str = '38', recompute_merge: bool = True, reference_files: dict = dict()) -> None:
"""
Initialize FstSummary object for Fst analysis.
Parameters
----------
input_path : Path
Path to the directory containing input files
input_name : str
Name of the input file
output_path : Path
Path to the directory where results will be saved
Raises
------
TypeError
If input types are incorrect for any parameter
FileNotFoundError
If input_path or output_path do not exist
"""
if not isinstance(input_path, Path):
raise TypeError("input_path should be a Path object")
if not isinstance(output_path, Path):
raise TypeError("output_path should be a Path object")
if not isinstance(input_name, str):
raise TypeError("input_name should be a string")
if not input_path.exists():
raise FileNotFoundError("input_path does not exist")
if not output_path.exists():
raise FileNotFoundError("output_path does not exist")
if not isinstance(build, str):
raise TypeError("build should be a string")
if build not in ['37', '38']:
raise ValueError("build should be either '37' or '38'")
if not high_ld_file.is_file():
logger.info(f"High LD file not found at {high_ld_file}")
logger.info('High LD file will be fetched from the package')
ld_fetcher = FetcherLDRegions()
ld_fetcher.get_ld_regions()
if ld_fetcher.ld_regions is None:
raise FileNotFoundError("Could not fetch LD regions file.")
high_ld_file = ld_fetcher.ld_regions
logger.info(f"High LD file fetched from the package and saved at {high_ld_file}")
self.input_path = input_path
self.input_name = input_name
self.output_path= output_path
self.recompute_merge = recompute_merge
self.high_ld_regions = high_ld_file
self.build = build
if not reference_files:
logger.info(f"No reference files provided. Fetching 1000 Genomes reference data for build {self.build}")
fetcher = Fetcher1000Genome(build=self.build)
fetcher.get_1000genomes()
fetcher.get_1000genomes_binaries()
self.reference_files = {
'bim': fetcher.bim_file,
'bed': fetcher.bed_file,
'fam': fetcher.fam_file,
'psam': fetcher.psam_file
}
self.results_dir = self.output_path / 'fst_results'
self.results_dir.mkdir(parents=True, exist_ok=True)
self.merging_dir = self.results_dir / 'merging'
self.merging_dir.mkdir(parents=True, exist_ok=True)
pass
[docs]
def merge_reference_study(self, ind_pair: list = [50, 5, 0.2]) -> None:
"""
Merge reference and study data by applying quality control filters and merging steps.
This method performs a series of quality control steps to merge study data with reference data:
1. Filters problematic SNPs
2. Performs LD pruning
3. Fixes chromosome mismatches
4. Fixes position mismatches
5. Fixes allele flips
6. Removes remaining mismatches
7. Merges the datasets
Parameters
----------
ind_pair : list, default [50, 5, 0.2]
Parameters for LD pruning: [window size, step size, r2 threshold]
Returns
-------
None
Notes
-----
If recompute_merge is False, the method will skip the merging process and expect
merged data to already exist in the merging directory.
Raises
------
TypeError
If ind_pair is not a list
"""
if not isinstance(ind_pair, list):
raise TypeError("ind_pair should be a list")
if not self.recompute_merge:
logger.info("STEP: Merging study and reference data: recompute_merge is set to False. Skipping merging step")
logger.info(f"STEP: Merging study and reference data: merged data is expected to be in {self.merging_dir}")
return
rgm = ReferenceGenomicMerger(
input_path= self.input_path,
input_name= self.input_name,
output_path= self.merging_dir,
output_name= 'cleaned-with-ref',
high_ld_regions =self.high_ld_regions,
reference_files = self.reference_files,
)
rgm.execute_rename_snpid()
rgm.execute_filter_prob_snps()
rgm.execute_ld_pruning(ind_pair=ind_pair)
rgm.execute_fix_chromosome_mismatch()
rgm.execute_fix_position_mismatch()
rgm.execute_fix_allele_flip()
rgm.execute_remove_mismatches()
rgm.execute_merge_data()
for file in self.merging_dir.iterdir():
if file.is_file() and '-merged' not in file.name and file.suffix != '.log':
file.unlink()
return
[docs]
def compute_fst(self) -> None:
"""
Compute FST (fixation index) statistics between populations.
This method calculates FST statistics between each super-population in the dataset
and a study population ('StPop'). The process involves:
1. Reading population tags from the specified file
2. For each unique super-population (except 'StPop'):
- Creating population filter files (keep and within files)
- Running PLINK commands to filter the dataset and compute FST statistics
The method requires the following instance variables to be set:
- population_tags: Path to a file containing population information
- results_dir: Directory where results will be stored
- merging_dir: Directory containing the merged genotype data
Returns:
--------
None
"""
df_tags = pd.read_csv(self.population_tags, sep=r"\s+", engine='python')
files = dict()
for pop in df_tags['SuperPop'].unique():
if pop != 'StPop':
df_temp = df_tags[(df_tags['SuperPop'] == pop) | (df_tags['SuperPop'] == 'StPop')].reset_index(drop=True)
df_temp[['ID1', 'ID2']].to_csv(self.results_dir / f'keep-{pop}_StPop.txt', sep='\t', index=False, header=False)
df_temp.to_csv(self.results_dir / f'within-{pop}_StPop.txt', index=False, header=False, sep='\t',)
files[pop] = (self.results_dir / f'keep-{pop}_StPop.txt', self.results_dir / f'within-{pop}_StPop.txt')
logger.info(f"Created keep and within files for population {pop}")
input_file = self.merging_dir / 'cleaned-with-ref-merged'
for key in files.keys():
keep_file, within_file = files[key]
output_file = self.results_dir / f'keep-{key}-StPop'
plink_cmd1 = f"plink --bfile {input_file} --keep {keep_file} --make-bed --out {output_file}"
plink_cmd2 = f"plink --bfile {output_file} --fst --within {within_file} --out {self.results_dir / f'fst-{key}-StPop'}"
plink_cmds = [plink_cmd1, plink_cmd2]
for cmd in plink_cmds:
shell_do(cmd, log=True)
logger.info("Fst computation completed for all populations.")
return
[docs]
def report_fst(self) -> pd.DataFrame:
"""
Generate a report of Fst results.
This method reads the Fst results from the results directory and generates a summary report.
Returns
-------
pd.DataFrame
DataFrame containing the Fst results summary
Raises
------
FileNotFoundError
If no Fst result files are found in the results directory.
"""
df_summary = pd.DataFrame(columns=['SuperPop', 'Fst', 'WeightedFst'])
# Get a list of all log files in the results directory
log_files = [f for f in self.results_dir.iterdir() if f.is_file() and f.suffix == '.log']
if not log_files:
raise FileNotFoundError(f"No log files found in {self.results_dir}")
logger.info(f"Found {len(log_files)} log files in {self.results_dir}")
# Extract the population names from log file names
# Assuming log files follow the pattern 'fst-{population}-StPop.log'
files = {}
for log_file in log_files:
if log_file.stem.startswith('fst-') and log_file.stem.endswith('-StPop'):
pop = log_file.stem.split('-')[1]
files[pop] = log_file
logger.info(f"Found Fst result file for population {pop} at {log_file}")
if not files:
raise FileNotFoundError(f"No Fst result files found in {self.results_dir}")
for key in files.keys():
log_file = files[key]
with open(log_file, 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('Mean Fst'):
fst = line.split(':')[1].strip()
if line.startswith('Weighted Fst'):
weighted_fst = line.split(':')[1].strip()
df_summary = pd.concat([df_summary, pd.DataFrame({'SuperPop': [key], 'Fst': [fst], 'WeightedFst': [weighted_fst]})], ignore_index=True)
df_summary.to_csv(
self.results_dir / 'fst_summary.csv',
index=False,
sep='\t'
)
logger.info(f"Fst summary report generated at {self.results_dir / 'fst_summary.csv'}")
for file in self.results_dir.iterdir():
if file.is_file() and (file.suffix == '.bed' or file.suffix == '.bim' or file.suffix == '.fam'):
file.unlink()
return df_summary