import requests
import logging
import gzip
import shutil
import os
import re
import pandas as pd
from pathlib import Path
from typing import Optional
from gtfparse import read_gtf
from bs4 import BeautifulSoup, Tag
from .executor import run_plink2
from .utils import get_available_memory, get_optimal_threads
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
[docs]
class Fetcher1000Genome:
[docs]
def __init__(self, destination: Optional[Path] = None, build: str = '38'):
"""Initialize a reference data handler.
This class manages reference data files from 1000 Genomes Project.
Parameters
----------
destination : Path, optional
Path where reference files will be stored. If not provided, defaults to '../data/1000genomes_build_{build}'.
build : str, optional
Human genome build version. Defaults to '38'.
Attributes
----------
destination : Path
Directory path where reference files are stored
build : str
Human genome build version being used
pgen_file : Path
Path to PGEN format file
pvar_file : Path
Path to PVAR format file
psam_file : Path
Path to PSAM format file
bed_file : Path
Path to BED format file
bim_file : Path
Path to BIM format file
fam_file : Path
Path to FAM format file
"""
if not isinstance(build, str):
raise TypeError("Build must be a string representing the genome build version (e.g., '37' or '38').")
if build not in ['37', '38']:
raise ValueError("Build must be either '37' or '38'.")
if not destination:
destination = Path(__file__).resolve().parent.parent / "data" / f"1000genomes_build_{build}"
logger.info(f"Destination folder: {destination}")
self.destination = destination
self.build = build
self.pgen_file = None
self.pvar_file = None
self.psam_file = None
self.bed_file = None
self.bim_file = None
self.fam_file = None
[docs]
def get_1000genomes(self, url_pgen: Optional[str] = None, url_pvar: Optional[str] = None, url_psam: Optional[str] = None)-> Path:
"""
Download and decompress 1000 Genomes reference data.
This method downloads the PLINK2 binary files (.pgen, .pvar, .psam) for the 1000 Genomes
reference dataset, corresponding to the specified genome build (37 or 38). If the files
already exist in the destination directory, the download is skipped.
Parameters:
-----------
url_pgen (str, optional): Custom URL for downloading the .pgen file.
If None, uses default URL based on genome build.
url_pvar (str, optional): Custom URL for downloading the .pvar file.
If None, uses default URL based on genome build.
url_psam (str, optional): Custom URL for downloading the .psam file.
If None, uses default URL based on genome build.
Returns:
--------
Path: Path object pointing to the decompressed .pgen file location.
Note:
-----
The method requires plink2 to be installed and accessible in the system path
for decompressing the .pgen file.
"""
self.destination.mkdir(parents=True, exist_ok=True)
if self.build == '38':
if url_pgen is None:
url_pgen = r"https://www.dropbox.com/s/j72j6uciq5zuzii/all_hg38.pgen.zst?dl=1"
if url_pvar is None:
url_pvar = r"https://www.dropbox.com/scl/fi/fn0bcm5oseyuawxfvkcpb/all_hg38_rs.pvar.zst?rlkey=przncwb78rhz4g4ukovocdxaz&dl=1"
if url_psam is None:
url_psam = r"https://www.dropbox.com/scl/fi/u5udzzaibgyvxzfnjcvjc/hg38_corrected.psam?rlkey=oecjnk4vmbhc8b1p202l0ih4x&dl=1"
elif self.build == '37':
if url_pgen is None:
url_pgen = r"https://www.dropbox.com/s/y6ytfoybz48dc0u/all_phase3.pgen.zst?dl=1"
if url_pvar is None:
url_pvar = r"https://www.dropbox.com/s/odlexvo8fummcvt/all_phase3.pvar.zst?dl=1"
if url_psam is None:
url_psam = r"https://www.dropbox.com/scl/fi/haqvrumpuzfutklstazwk/phase3_corrected.psam?rlkey=0yyifzj2fb863ddbmsv4jkeq6&dl=1"
# Check if final binaries already exist
if self._check_if_binaries_exist():
logger.info("1000 Genomes binaries already exist. Skipping download.")
self.pvar_file = self.destination / "all_phase3.pvar.zst"
self.psam_file = self.destination / "all_phase3.psam"
self.pgen_decompressed = self.destination / "all_phase3.pgen"
return self.pgen_decompressed
# Step 1: Download files if not already downloaded
if not self._check_downloaded_files_exist():
logger.info("Downloading 1000 Genomes data...")
try:
if url_pgen is not None:
self.pgen_file = self._download_file(url_pgen, self.destination / "all_phase3.pgen.zst")
if url_pvar is not None:
self.pvar_file = self._download_file(url_pvar, self.destination / "all_phase3.pvar.zst")
if url_psam is not None:
self.psam_file = self._download_file(url_psam, self.destination / "all_phase3.psam")
logger.info("Download completed successfully.")
except Exception as e:
logger.error(f"Download failed: {e}")
raise
else:
logger.info("Downloaded files already exist. Skipping download.")
# Step 2: Decompress pgen file if not already decompressed
pgen_file = self.destination / "all_phase3.pgen.zst"
pgen_decompressed = self.destination / "all_phase3.pgen"
if not self._check_decompressed_files_exist():
logger.info("Decompressing pgen file from 1000 Genomes data...")
try:
# Execute plink2 command to decompress
run_plink2([
'--zst-decompress', str(pgen_file), str(pgen_decompressed)
])
logger.info("Decompression completed successfully.")
# Clean up compressed pgen file after successful decompression
pgen_file.unlink(missing_ok=True)
logger.info("Compressed pgen file cleaned up.")
except Exception as e:
logger.error(f"Decompression failed: {e}")
# Keep compressed file for retry
raise
else:
logger.info("Decompressed pgen file already exists. Skipping decompression.")
# Clean up compressed file if decompressed version exists
pgen_file.unlink(missing_ok=True)
self.pgen_file = pgen_decompressed
return pgen_decompressed
def _download_file(self, url: str, destination: Path) -> Path:
"""Downloads a file from a given URL using the base class method."""
with requests.get(url, stream=True) as response:
response.raise_for_status()
with open(destination, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
logger.info(f"Downloaded file to: {destination}")
return destination
[docs]
def get_1000genomes_binaries(self) -> Path:
"""
Convert downloaded 1000 Genomes data into PLINK binary files (.bed, .bim, .fam).
This method processes the downloaded 1000 Genomes data files and converts them into PLINK binary format.
If the binary files already exist, it skips the conversion process. The method handles file cleanup
and proper renaming of output files.
The conversion is done in two steps:
1. Convert pfile to binary format including only SNPs from chromosomes 1-22,X,Y,MT
2. Update variant IDs and create final binary files
Returns
-------
Path
Path object pointing to the generated binary files (without extension)
The actual files created will be .bed, .bim, .fam and .psam with the same prefix
"""
# Check if final binaries already exist
if self._check_if_binaries_exist():
logger.info("1000 Genomes binaries already exist. Skipping conversion into bfiles...")
# Clean up any remaining intermediate files
(self.destination / "all_phase3.pgen").unlink(missing_ok=True)
(self.destination / "all_phase3.pgen.zst").unlink(missing_ok=True)
(self.destination / "all_phase3.pvar.zst").unlink(missing_ok=True)
(self.destination / "all_phase3.bed").unlink(missing_ok=True)
(self.destination / "all_phase3.bim").unlink(missing_ok=True)
(self.destination / "all_phase3.fam").unlink(missing_ok=True)
self.bed_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bed')
self.bim_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bim')
self.fam_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.fam')
# Rename psam file to match the final naming convention
original_psam = self.destination / "all_phase3.psam"
final_psam = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.psam')
if original_psam.exists():
self.psam_file = original_psam.rename(final_psam)
else:
self.psam_file = final_psam
return self.destination / f'1kG_phase3_GRCh{self.build}'
memory = get_available_memory()
threads = get_optimal_threads()
# Step 1: Convert pfile to intermediate binary format
if not self._check_intermediate_binaries_exist():
logger.info("Converting pfile to intermediate binary format...")
try:
run_plink2([
'--pfile', str(self.destination / 'all_phase3'), 'vzs',
'--chr', '1-22,X,Y,MT',
'--snps-only',
'--max-alleles', '2',
'--memory', str(memory),
'--threads', str(threads),
'--make-bed',
'--out', str(self.destination / 'all_phase3')
])
logger.info("First conversion step completed successfully.")
# Clean up source files after successful first conversion
(self.destination / "all_phase3.pgen").unlink(missing_ok=True)
(self.destination / "all_phase3.pvar.zst").unlink(missing_ok=True)
logger.info("Source pfile data cleaned up after first conversion.")
except Exception as e:
logger.error(f"First conversion step failed: {e}")
logger.info("Keeping source files for retry.")
raise
else:
logger.info("Intermediate binary files already exist. Skipping first conversion.")
# Clean up source files if intermediate files exist
(self.destination / "all_phase3.pgen").unlink(missing_ok=True)
(self.destination / "all_phase3.pvar.zst").unlink(missing_ok=True)
# Step 2: Create final binary files with updated variant IDs
logger.info("Creating final binary files with updated variant IDs...")
try:
run_plink2([
'--bfile', str(self.destination / 'all_phase3'),
'--set-all-var-ids', '@:#:$r:$a',
'--memory', str(memory),
'--make-bed',
'--out', str(self.destination / f'1kG_phase3_GRCh{self.build}')
])
logger.info("Final conversion step completed successfully.")
# Rename psam file to match the final naming convention
original_psam = self.destination / "all_phase3.psam"
final_psam = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.psam')
if original_psam.exists():
self.psam_file = original_psam.rename(final_psam)
else:
self.psam_file = final_psam
# Verify final files were created successfully
if not self._check_if_binaries_exist():
raise RuntimeError("Final binary files were not created successfully")
# Clean up intermediate files after successful final conversion
(self.destination / "all_phase3.bed").unlink(missing_ok=True)
(self.destination / "all_phase3.bim").unlink(missing_ok=True)
(self.destination / "all_phase3.fam").unlink(missing_ok=True)
logger.info("Intermediate binary files cleaned up after successful final conversion.")
except Exception as e:
logger.error(f"Final conversion step failed: {e}")
logger.info("Keeping intermediate files for retry.")
raise
self.bed_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bed')
self.bim_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bim')
self.fam_file = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.fam')
return self.destination / f'1kG_phase3_GRCh{self.build}'
def _check_if_binaries_exist(self) -> bool:
"""
Checks if all required binary files exist in the destination directory.
This method verifies the existence of .bed, .bim, .fam, and .psam files
for the 1000 Genomes Phase 3 reference panel in the specified genome build.
Returns:
--------
bool: True if all required files exist, False otherwise.
"""
check_bed = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bed').exists()
check_bim = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.bim').exists()
check_fam = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.fam').exists()
check_psam = (self.destination / f'1kG_phase3_GRCh{self.build}').with_suffix('.psam').exists()
return check_bed and check_bim and check_fam and check_psam
def _check_downloaded_files_exist(self) -> bool:
"""
Check if the downloaded raw files exist.
Returns:
--------
bool: True if all downloaded files exist, False otherwise.
"""
pgen_compressed = (self.destination / "all_phase3.pgen.zst").exists()
pvar_compressed = (self.destination / "all_phase3.pvar.zst").exists()
psam_file = (self.destination / "all_phase3.psam").exists()
return pgen_compressed and pvar_compressed and psam_file
def _check_decompressed_files_exist(self) -> bool:
"""
Check if the decompressed pgen file exists.
Returns:
--------
bool: True if decompressed pgen exists, False otherwise.
"""
return (self.destination / "all_phase3.pgen").exists()
def _check_intermediate_binaries_exist(self) -> bool:
"""
Check if intermediate binary files exist (after first conversion).
Returns:
--------
bool: True if intermediate binaries exist, False otherwise.
"""
check_bed = (self.destination / "all_phase3.bed").exists()
check_bim = (self.destination / "all_phase3.bim").exists()
check_fam = (self.destination / "all_phase3.fam").exists()
return check_bed and check_bim and check_fam
[docs]
class ReferenceDataFetcher:
"""A class for fetching, downloading, and processing reference genome data.
This class provides a framework for retrieving genomic reference data from various
sources. It handles downloading compressed files, unzipping them, and extracting
gene information from GTF files.
Attributes
----------
build : str
The genome build (e.g., 'hg38', 'GRCh38').
source : str
The data source (e.g., 'ensembl', 'ucsc').
base_url : str
The base URL to fetch data from.
destination_folder : Optional[str]
The directory to save downloaded files. If None, defaults to project_root/data/{source}_latest.
latest_url : Optional[str]
The URL of the latest release after calling get_latest_release().
gz_file : Optional[str]
Path to the downloaded compressed file.
gtf_file : Optional[str]
Path to the uncompressed GTF file.
Notes
-----
This is an abstract base class that requires subclasses to implement the
get_latest_release() method for specific data sources.
"""
[docs]
def __init__(self, base_url: str, build: str, source: str, destination_folder: Optional[str] = None) -> None:
self.build = build
self.source = source
self.base_url = base_url
self.destination_folder = destination_folder
self.latest_url = None
self.gz_file = None
self.gtf_file = None
pass
[docs]
def get_latest_release(self) -> None:
"""Determine the specific URL for fetching data."""
raise NotImplementedError("Subclasses must implement this method.")
[docs]
def download_latest(self) -> str:
"""Downloads the latest file from `self.latest_url` to `self.destination_folder`.
Raises
------
AttributeError
If `self.latest_url` is not set.
requests.exceptions.RequestException
If the HTTP request fails.
"""
if not self.latest_url:
raise AttributeError("`self.latest_url` is not set. Call `get_latest_release` first.")
self.destination_folder = self.get_destination_folder()
file_path = self.destination_folder / Path(self.latest_url).name
if file_path.exists():
self.gz_file = str(file_path)
logger.info(f"File already exists: {file_path}")
return str(file_path)
self._download_file(self.latest_url, file_path)
self.gz_file = str(file_path)
return str(file_path)
[docs]
def get_destination_folder(self) -> Path:
"""Determine the destination folder for downloads."""
if self.destination_folder:
destination = Path(self.destination_folder)
else:
# Determine project root and default `data` directory
project_root = Path(__file__).resolve().parent.parent
destination = project_root / "data" / f"{self.source}_latest"
destination.mkdir(parents=True, exist_ok=True)
return destination
def _download_file(self, url: str, file_path: Path) -> None:
"""Download a file from the given URL and save it to `file_path`."""
with requests.get(url, stream=True) as response:
response.raise_for_status()
with open(file_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
logger.info(f"Downloaded file to: {file_path}")
def _find_file_in_directory(self, url: str, build_pattern: str, extension: str, avoid_substring: Optional[str] = None) -> Optional[str]:
"""Generic method to find files in HTML directory listings.
Parameters
----------
url : str
URL of the directory to search
build_pattern : str
Pattern to match in filenames (e.g., 'GRCh38', 'Homo_sapiens')
extension : str
File extension to look for (e.g., '.gtf.gz')
avoid_substring : str, optional
Substring to avoid in filenames
Returns
-------
str or None
Filename if found, None otherwise
"""
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to access {url}")
soup = BeautifulSoup(response.text, "html.parser")
for link in soup.find_all("a"):
if isinstance(link, Tag): # Check if it's a Tag element
href = link.get("href")
if href and isinstance(href, str) and build_pattern in href and href.endswith(extension):
if avoid_substring is None or avoid_substring not in href:
return href
return None
[docs]
def unzip_latest(self) -> str:
"""Unzips the latest downloaded file and stores it as a GTF file."""
if not self.latest_url:
raise AttributeError("`self.latest_url` is not set. Call `get_latest_release` first.")
self.destination_folder = self.get_destination_folder()
if not hasattr(self, 'gz_file') or self.gz_file is None or not Path(self.gz_file).is_file():
raise FileNotFoundError("Reference file not found")
gtf_file = self.destination_folder / (Path(self.gz_file).stem) # Removes .gz extension
if gtf_file.exists():
self.gtf_file = str(gtf_file)
logger.info(f"File already exists: {gtf_file}")
return str(gtf_file)
try:
with gzip.open(self.gz_file, 'rb') as f_in:
with open(gtf_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
logger.info(f"Successfully unzipped file to: {gtf_file}")
except OSError as e:
logger.info(f"Error occurred while unzipping the file: {e}")
raise
self.gtf_file = str(gtf_file)
return str(gtf_file)
[docs]
def get_all_genes(self) -> str:
"""Extract all genes from the GTF file and save them to a new compressed file.
This method reads the GTF file specified in self.gtf_file, filters for gene features,
and creates a new GTF file containing only the gene entries. If the output file
already exists, it will return the path without reprocessing.
Returns
-------
str:
Path to the compressed GTF file containing all genes
Raises
------
FileNotFoundError
If the reference GTF file (self.gtf_file) is not found
TypeError
If read_gtf does not return a pandas DataFrame
Note
----
The output file will be named based on the input GTF file with "-all_genes.gtf.gz" suffix
"""
if not hasattr(self, 'gtf_file') or self.gtf_file is None or not os.path.isfile(self.gtf_file):
raise FileNotFoundError("Reference file not found")
if os.path.isfile(self.gtf_file[:-4]+"-all_genes.gtf.gz"):
self.all_genes_path = self.gtf_file[:-4] + "-all_genes.gtf.gz"
logger.info(f"File already exists: {self.all_genes_path}")
return self.all_genes_path
gtf = read_gtf(
self.gtf_file,
usecols =["feature", "gene_biotype", "gene_id", "gene_name"],
result_type='pandas'
)
if not isinstance(gtf, pd.DataFrame):
raise TypeError("read_gtf did not return a pandas DataFrame")
gene_list = gtf[gtf["feature"]=="gene"]["gene_id"].values
gene_list = gtf.loc[gtf["feature"]=="gene", "gene_id"].values
gtf_raw = pd.read_csv(
self.gtf_file,
sep="\t",
header=None,
comment="#",
dtype="string"
)
gtf_raw["_gene_id"] = gtf_raw[8].str.extract(r'gene_id "([\w\.-]+)"')
gtf_raw = gtf_raw.loc[ gtf_raw["_gene_id"].isin(gene_list) ,:]
gtf_raw = gtf_raw.drop("_gene_id",axis=1)
all_genes_path = self.gtf_file[:-4]+"-all_genes.gtf.gz"
gtf_raw.to_csv(all_genes_path, header=False, index=False, sep="\t")
logger.info(f"Saved all genes to: {all_genes_path}")
self.all_genes_path = all_genes_path
return all_genes_path
[docs]
class FetcherLDRegions(ReferenceDataFetcher):
[docs]
def __init__(self, destination: Optional[Path] = None, build: str = '38'):
"""
Initialize LDRegions object.
This initializer sets up the destination path for LD regions files and the genome build version.
If no destination is provided, it defaults to a 'data/ld_regions_files' directory relative to
the parent directory of the current file.
Parameters
----------
destination : Path, optional
Path where LD region files will be stored. If None, uses default path.
built : str, optional
Genome build version, defaults to '38'.
Attributes
----------
destination : Path
Directory path where LD region files are stored
built : str
Genome build version being used
ld_regions : None
Placeholder for LD regions data, initially set to None
"""
if not destination:
destination = Path(__file__).resolve().parent.parent / "data" / "ld_regions_files"
super().__init__(
base_url="", # Not used for LD regions
build=build,
source="ld_regions",
destination_folder=str(destination)
)
self.destination = destination
self.ld_regions = None
[docs]
def get_ld_regions(self)-> Path:
"""Download or create high LD regions file based on genome build version.
This method handles the retrieval of high Linkage Disequilibrium (LD) regions for
different genome builds (37 or 38). For build 37, it downloads the regions from a
GitHub repository. For build 38, it creates the file from predefined coordinates.
Returns
-------
Path
Path to the created/downloaded LD regions file. Returns empty Path if
download fails for build 37.
Raises
------
None
Explicitly, but may raise standard I/O related exceptions.
Notes
-----
- For build 37: Downloads from genepi-freiburg/gwas repository
- For build 38: Creates file from hardcoded coordinates from GWAS-pipeline
- Files are named as 'high-LD-regions_GRCh{build}.txt'
- Creates destination directory if it doesn't exist
"""
self.destination.mkdir(parents=True, exist_ok=True)
out_dir = self.destination
if self.build == '37':
url_ld_regions = r"https://raw.githubusercontent.com/genepi-freiburg/gwas/refs/heads/master/single-pca/high-LD-regions.txt"
ld = requests.get(url_ld_regions)
if ld.status_code == 200:
with open((out_dir / f"high-LD-regions_GRCh{self.build}.txt"), "wb") as f:
f.write(ld.content)
logger.info(f"LD regions file for built {self.build} downloaded successfully to {out_dir}")
self.ld_regions = out_dir / f"high-LD-regions_GRCh{self.build}.txt"
return out_dir / f"high-LD-regions_GRCh{self.build}.txt"
else:
logger.info(f"Failed to download .bim file: {ld.status_code}")
return Path()
elif self.build == '38':
# extracted from
# https://github.com/neurogenetics/GWAS-pipeline
data = [
(1, 47534328, 51534328, "r1"),
(2, 133742429, 137242430, "r2"),
(2, 182135273, 189135274, "r3"),
(3, 47458510, 49962567, "r4"),
(3, 83450849, 86950850, "r5"),
(5, 98664296, 101164296, "r6"),
(5, 129664307, 132664308, "r7"),
(5, 136164311, 139164311, "r8"),
(6, 24999772, 35032223, "r9"),
(6, 139678863, 142178863, "r10"),
(8, 7142478, 13142491, "r11"),
(8, 110987771, 113987771, "r12"),
(11, 87789108, 90766832, "r13"),
(12, 109062195, 111562196, "r14"),
(20, 33412194, 35912078, "r15")
]
with open(out_dir / f'high-LD-regions_GRCH{self.build}.txt', 'w') as file:
for line in data:
file.write(f"{line[0]}\t{line[1]}\t{line[2]}\t{line[3]}\n")
self.ld_regions = out_dir / f"high-LD-regions_GRCH{self.build}.txt"
return out_dir / f'high-LD-regions_GRCH{self.build}.txt'
else:
logger.error(f"Unsupported genome build: {self.build}. Only '37' and '38' are supported.")
return Path()
[docs]
class Ensembl38Fetcher(ReferenceDataFetcher):
"""A class for fetching human genome reference data from Ensembl based on GRCh38 build.
This class extends ReferenceDataFetcher to specifically handle Ensembl's human
genome data with build 38. It provides functionality to find and retrieve the
latest GTF file from Ensembl's FTP server.
Attributes
----------
base_url : str
Base URL for Ensembl FTP server where GTF files are stored
build : str
Genome build version ('38')
source : str
Data source ('ensembl')
destination_folder : str
Local folder to store downloaded files
latest_url : str
URL of the latest GTF file after calling get_latest_release()
Raises
------
Exception
If the Ensembl FTP server cannot be accessed
FileNotFoundError
If no matching GTF file is found
"""
[docs]
def __init__(self, destination_folder = None):
super().__init__(
base_url = "https://ftp.ensembl.org/pub/current_gtf/homo_sapiens/",
build ='38',
source = 'ensembl',
destination_folder = destination_folder
)
[docs]
def get_latest_release(self) -> None:
"""Retrieves the URL of the latest GTF file for human genome (GRCh38) from the base URL.
This method scrapes the base URL to find the most recent Homo_sapiens GRCh38 GTF file
available for download. Upon finding the file, it constructs the complete URL and
stores it in the instance variable `latest_url`.
Returns
-------
None
Raises
------
Exception
If the base URL cannot be accessed (non-200 response)
FileNotFoundError
If no GTF file matching the criteria is found
"""
# Get the latest file dynamically using base class method
latest_gtf = self._find_file_in_directory(
self.base_url,
"Homo_sapiens.GRCh38",
".chr.gtf.gz"
)
if latest_gtf:
latest_url = self.base_url + str(latest_gtf)
logger.info(f"Latest GTF file: {latest_gtf}")
logger.info(f"Download URL: {latest_url}")
self.latest_url = latest_url
else:
raise FileNotFoundError("GTF file not found")
pass
[docs]
class Ensembl37Fetcher(ReferenceDataFetcher):
"""A class for fetching reference genome data from Ensembl's GRCh37 (hg19) repository.
This class specializes the ReferenceDataFetcher to work specifically with
Ensembl's GRCh37 human genome build. It provides functionality to automatically
detect and download the latest available GTF file for Homo sapiens from the
Ensembl GRCh37 archive.
The fetcher connects to Ensembl's FTP server, identifies the most recent release
available for GRCh37, and locates the chromosome GTF file for human genome data.
Attributes
----------
base_url : str
The base URL for Ensembl's GRCh37 repository
build : str
The genome build identifier ('37')
source : str
The data source identifier ('ensembl')
latest_url : str
The complete URL to the latest GTF file, populated after calling get_latest_release()
"""
[docs]
def __init__(self, destination_folder = None):
"""
Initialize a reference genome downloader for Ensembl GRCh37.
This constructor configures the downloader to retrieve data from Ensembl's GRCh37
repository.
Parameters
----------
destination_folder : str, optional
The folder where downloaded files will be stored. If None, a default
location will be used based on the parent class implementation.
"""
super().__init__(
base_url = 'https://ftp.ensembl.org/pub/grch37/',
build ='37',
source = 'ensembl',
destination_folder = destination_folder
)
[docs]
def get_latest_release(self) -> None:
"""Fetches the URL of the latest GTF file for Homo sapiens GRCh37 from Ensembl.
This method:
1. Connects to the base URL and identifies all available release folders
2. Determines the latest release by finding the highest release number
3. Navigates to the GTF directory for that release
4. Locates the Homo sapiens GRCh37 chromosome GTF file
5. Stores the complete download URL in self.latest_url
Raises
------
Exception
If the base URL cannot be accessed
Exception
If no release folders are found
Exception
If the latest release folder cannot be accessed
FileNotFoundError
If the GTF file is not found in the latest release
Returns
-------
None
"""
response = requests.get(self.base_url)
if response.status_code != 200:
raise Exception(f"Failed to access {self.base_url}")
# Parse the HTML content
soup = BeautifulSoup(response.text, "html.parser")
# Find all folder names matching 'release-*'
releases = []
for link in soup.find_all("a"):
href = link.get("href") # type: ignore
if href and isinstance(href, str):
match = re.match(r"release-(\d+)", href)
if match:
releases.append(int(match.group(1))) # Extract the release number as integer
if not releases:
raise Exception("No release folders found.")
latest_release = max(releases) # Get the highest release number
latest_folder = self.base_url + f"release-{latest_release}/" + 'gtf/homo_sapiens/'
response = requests.get(latest_folder)
if response.status_code != 200:
raise Exception(f"Failed to access {latest_folder}")
# Parse the HTML content
soup = BeautifulSoup(response.text, "html.parser")
latest_gtf = None
for link in soup.find_all("a"):
href = link.get("href") # type: ignore
if href and "Homo_sapiens.GRCh37" in href and href.endswith(".chr.gtf.gz"): # type: ignore
latest_gtf = href
break # Assuming the first match is the latest
if latest_gtf:
latest_url = latest_folder + str(latest_gtf)
logger.info(f"Latest GTF file: {latest_gtf}")
logger.info(f"Download URL: {latest_url}")
self.latest_url = latest_url
else:
raise FileNotFoundError("GTF file not found")
pass
[docs]
class RefSeqFetcher(ReferenceDataFetcher):
"""A class for fetching and downloading reference genome data from NCBI's RefSeq repository.
This class extends ReferenceDataFetcher to specifically handle downloading human
genome reference files from the RefSeq database. It supports different genome
builds (e.g., 'GRCh37', 'GRCh38') and automatically identifies the latest version
available for the specified build.
The class handles navigating the NCBI FTP directory structure, finding the
appropriate GTF files for the requested genome build, and managing the
download process.
Attributes
----------
base_url : str
The base URL for the NCBI RefSeq FTP server directory.
build : str
The genome build version ('37' for GRCh37, '38' for GRCh38).
source : str
The source of the reference data (set to 'refseq').
latest_url : str
URL to the latest GTF file, set after calling get_latest_release().
"""
[docs]
def __init__(self, build: str, destination_folder: Optional[str] = None):
super().__init__(
base_url = "https://ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Homo_sapiens/all_assembly_versions/",
build = build,
source = 'refseq',
destination_folder = destination_folder
)
[docs]
def get_latest_release(self) -> None:
"""Fetches the latest GTF file dynamically from the specified base URL.
This method sends a GET request to the base URL, parses the HTML response
to find the latest GTF file link, and sets the `latest_url` attribute
to the full URL of the latest GTF file.
Raises
------
FileNotFoundError
If no GTF file is found in the HTML response.
Returns
-------
None
"""
# Get the latest file dynamically
response = requests.get(self.base_url)
if response.status_code != 200:
raise Exception(f"Failed to access {self.base_url}")
soup = BeautifulSoup(response.text, "html.parser")
if self.build == "38":
version_name = "GRCh38"
elif self.build == "37":
version_name = "GRCh37"
else:
raise ValueError("Unsupported build version. Only 'GRCh37' and 'GRCh38' are supported.")
# Find all folder names matching 'release-*'
latest_release = ''
latest_release_num = 0
for link in soup.find_all("a"):
href = link.get("href") # type: ignore
if href and isinstance(href, str) and version_name in href:
version = href.split('.')[-1][1:-1]
if version.isdigit():
version_num = int(version)
if version_num > latest_release_num:
latest_release_num = version_num
latest_release = href
if len(latest_release)==0:
raise Exception("No release folders found.")
latest_folder = self.base_url + latest_release
response = requests.get(latest_folder)
if response.status_code != 200:
raise Exception(f"Failed to access {latest_folder}")
# Parse the HTML content
soup = BeautifulSoup(response.text, "html.parser")
latest_gtf = None
for link in soup.find_all("a"):
href = link.get("href") # type: ignore
if href and version_name in href and href.endswith(".gtf.gz"): # type: ignore
latest_gtf = href
break # Assuming the first match is the latest
if latest_gtf:
latest_url = latest_folder + str(latest_gtf)
logger.info(f"Latest GTF file: {latest_gtf}")
logger.info(f"Download URL: {latest_url}")
self.latest_url = latest_url
else:
raise FileNotFoundError("GTF file not found")
return
[docs]
class AssemblyReferenceFetcher(ReferenceDataFetcher):
"""A class for fetching and preparing genomic reference files from online repositories.
This class handles the process of:
1. Finding the appropriate reference file URL based on build parameters
2. Downloading the reference file
3. Unzipping compressed reference files if necessary
Parameters
----------
base_url : str
The base URL where reference files are hosted
build : str
The genome build identifier (e.g., 'GRCh38', 'hg19')
extension : str
File extension to look for (e.g., '.gtf.gz', '.fa.gz')
destination_folder : Optional[str], default=None
Path where files should be downloaded. If None, uses project_root/data/assembly_references
avoid_substring : str, default='extra'
Substring to avoid when selecting reference files
Attributes
----------
reference_url : str or None
URL of the identified reference file
reference_file : str or None
Filename of the identified reference file
file_path : Path or None
Local path to the downloaded reference file
Raises
------
Exception
If the base URL cannot be accessed
FileNotFoundError
If no matching reference file is found
AttributeError
If methods are called out of sequence
ValueError
If required attributes are None when needed
"""
[docs]
def __init__(self, base_url: str, build: str, extension: str, destination_folder: Optional[str] = None, avoid_substring: str = 'extra') -> None:
super().__init__(
base_url=base_url,
build=build,
source="assembly_references",
destination_folder=destination_folder
)
self.extension = extension
self.avoid_substring = avoid_substring
self.file_path = None
self.reference_url = None
self.reference_file = None
[docs]
def get_reference_url(self) -> str:
"""Retrieves the URL for the reference file from the base URL.
This method performs an HTTP GET request to the base URL, parses the HTML content,
and searches for links matching specific criteria:
- Contains the build version string
- Ends with the specified extension
- Does not contain the specified substring to avoid
The first matching link is considered the reference file.
Returns
-------
str: The complete URL to the reference file
Raises
------
Exception
If the base URL cannot be accessed
FileNotFoundError
If no matching reference file is found
Notes
-----
- Sets self.reference_file to the name of the found file
- Sets self.reference_url to the complete URL
- Logs information about the found file and URL
"""
# Use base class method to find the reference file
reference_file = self._find_file_in_directory(
self.base_url,
self.build,
self.extension,
self.avoid_substring
)
self.reference_file = reference_file
if reference_file:
reference_url = self.base_url + str(reference_file)
logger.info(f"Latest GTF file: {reference_file}")
logger.info(f"Download URL: {reference_url}")
self.reference_url = reference_url
else:
raise FileNotFoundError("Reference file not found")
return str(reference_url)
[docs]
def download_reference_file(self) -> str:
"""
Downloads a reference file from the specified URL to the destination folder.
This method first checks whether the reference file already exists locally.
If not found, it also looks for an alternative version with a '.fa' extension.
If neither is present, it downloads the file from the given URL.
Raises
------
AttributeError
If `self.reference_url` or `self.reference_file` are not set.
ValueError
If `self.reference_url` or `self.reference_file` are set to None.
Returns
-------
str
The path to the downloaded or existing reference file.
Note
----
`self.reference_url` and `self.reference_file` must be set by calling `get_reference_url()` before using this method.
"""
if not getattr(self, 'reference_url', None):
raise AttributeError("`self.reference_url` is not set. Call `get_reference_url` first.")
if not getattr(self, 'reference_file', None):
raise AttributeError("`self.reference_file` is not set. Call `get_reference_url` first.")
if self.reference_file is None:
raise ValueError("reference_file is None. Cannot construct file path.")
if self.reference_url is None:
raise ValueError("reference_url is None. Cannot download file.")
self.destination_folder = self.get_destination_folder()
file_path = self.destination_folder / Path(str(self.reference_file)).name
if file_path.exists():
logger.info(f"File already exists: {file_path}")
self.file_path = file_path
return str(file_path)
fa_file = file_path.with_suffix('.fa')
if fa_file.exists():
logger.info(f"File already exists: {fa_file}")
self.file_path = file_path
return str(fa_file)
self._download_file(self.reference_url, file_path)
self.file_path = file_path
return str(file_path)
[docs]
def unzip_reference_file(self) -> str:
"""Unzips a reference genome file (typically .fa.gz to .fa) and returns the path to the unzipped file.
This method checks if the file is already unzipped, and if not, unzips it using gzip.
After successful unzipping, the original compressed file is deleted.
Returns
-------
str
Path to the unzipped reference file (.fa)
Raises
------
AttributeError
If self.reference_file is not set (get_reference_url should be called first)
AttributeError
If self.file_path is not set or None (download_reference_file should be called first)
OSError
If an error occurs during the unzipping process
"""
if not getattr(self, 'reference_file', None):
raise AttributeError("`self.reference_file` is not set. Call `get_reference_url` first.")
if not getattr(self, 'file_path', None) or self.file_path is None:
raise AttributeError("`self.file_path` is not set. Call `download_reference_file` first.")
if self.file_path.suffix == '.fa':
logger.info(f"File already unzipped: {self.file_path}")
return str(self.file_path)
fa_file = self.get_destination_folder() / Path(self.file_path.stem)
if fa_file.exists():
logger.info(f"File already exists: {fa_file}")
return str(fa_file)
try:
with gzip.open(self.file_path, 'rb') as f_in:
with open(fa_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
logger.info(f"Successfully unzipped file to: {fa_file}")
except OSError as e:
logger.info(f"Error occurred while unzipping the file: {e}")
raise
self.file_path.unlink()
self.file_path = fa_file
return str(fa_file)