Source code for genvarloader._dataset._write

import gc
import json
import warnings
from collections.abc import Iterator, Sequence
from importlib.metadata import version
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

if TYPE_CHECKING:
    from .._types import IntervalTrack

import awkward as ak
import numpy as np
import polars as pl
import seqpro as sp
from genoray import PGEN, VCF, Reader, SparseVar
from genoray._svar import dense2sparse
from genoray._svar import _dense2sparse_with_length  # type: ignore[missing-module-attribute]
from genoray._types import V_IDX_TYPE
from genoray._utils import ContigNormalizer, format_memory, parse_memory
from loguru import logger
from more_itertools import mark_ends
from natsort import natsorted
from numpy.typing import NDArray
from pydantic import BaseModel
from pydantic_extra_types.semantic_version import SemanticVersion
from seqpro.rag import Ragged
from tqdm.auto import tqdm

from .._atomic import atomic_dir
from .._ragged import INTERVAL_DTYPE
from .._utils import lengths_to_offsets, normalize_contig_name
from .._variants._utils import path_is_pgen, path_is_vcf
from ._svar_link import SvarLink
from ._utils import bed_to_regions, splits_sum_le_value


DATASET_FORMAT_VERSION = SemanticVersion.parse("1.0.0")
"""On-disk layout version for a gvl.write dataset directory. Bump MAJOR only when
an existing dataset can no longer be read correctly by new code."""


class Metadata(BaseModel, arbitrary_types_allowed=True):
    samples: list[str]
    contigs: list[str]
    n_regions: int
    ploidy: int | None = None
    max_jitter: int = 0
    version: SemanticVersion | None = None
    format_version: SemanticVersion | None = None
    svar_link: SvarLink | None = None

    @property
    def n_samples(self) -> int:
        return len(self.samples)


[docs] def write( path: str | Path, bed: str | Path | pl.DataFrame, variants: str | Path | Reader | None = None, tracks: "IntervalTrack | Sequence[IntervalTrack] | None" = None, samples: list[str] | None = None, max_jitter: int | None = None, overwrite: bool = False, max_mem: int | str = "4g", extend_to_length: bool = True, ): """Write a GVL dataset. Parameters ---------- path Path to write the dataset to. bed :func:`BED-like <genvarloader.read_bedlike()>` file or DataFrame of regions satisfying the BED3+ specification. Specifically, it must have columns 'chrom', 'chromStart', and 'chromEnd'. If 'strand' is present, its values must be either '+' or '-'. Negative stranded regions will be reverse complemented during sequence and/or track reconstruction. variants A :code:`genoray` VCF or PGEN instance (:code:`genoray` is a GVL dependency so it will be import-able). All variants must be left-aligned, bi-allelic, and atomized. Multi-allelic variants can be included by splitting them into bi-allelic half-calls. For VCFs, the `bcftools norm <https://samtools.github.io/bcftools/bcftools.html#norm>`_ command can do all of this normalization. Likewise, see the `PLINK2 documentation <https://www.cog-genomics.org/plink/2.0>`_ for PGEN files. Commands of interest include :code:`--make-bpgen` for splitting variants, :code:`--normalize` for left-aligning and atomizing overlapping variants, and :code:`--ref-from-fa` for REF allele correction. tracks An :class:`IntervalTrack` (e.g. :class:`BigWigs`, :class:`Table`) or a sequence of them. Each track must have a unique ``name``; the on-disk layout writes to ``<path>/intervals/<track.name>/``. samples Samples to include in the dataset max_jitter Maximum jitter to add to the regions overwrite Whether to overwrite an existing dataset max_mem Approximate maximum total memory to use, including the genoray variant index. The reader's index is loaded eagerly at the start of :func:`write` (for :class:`~genoray.VCF` and :class:`~genoray.PGEN`) so that :attr:`~genoray.VCF.nbytes` reflects its true size; that value is subtracted from ``max_mem`` to determine the budget available for genotype chunking. A :class:`ValueError` is raised if the remaining budget is too small to fit even a single variant chunk. Otherwise ``max_mem`` is a soft limit on overall usage and may be exceeded by a small amount. extend_to_length Whether to continue reading/writing variants until all haplotypes have a length at least as long as the intervals in `bed`. Otherwise, deletions can cause the length of haplotypes to be less than the intervals in `bed`. This can be disabled if having haplotypes shorter than the intervals is acceptable, in which case they will be padded with reference bases when appropriate. Disabling this also reduces the amount of data read/written and is faster to run. Notes ----- The dataset directory is built atomically: all data is written to a private sibling temp directory and published via :func:`os.replace`. A best-effort ``filelock`` prevents redundant parallel rebuilds, but correctness relies on the atomic rename — the lock is advisory only. Out of scope: ``genoray`` ``.gvi`` index files and ``pysam`` ``.fai``/``.gzi`` index files are created by those libraries and are not covered by gvl's atomic/locked creation. Concurrent jobs that trigger index creation for those files depend on the upstream libraries' behavior. """ # ignore polars warning about os.fork which is caused by using joblib's loky backend warnings.simplefilter("ignore", RuntimeWarning) try: if variants is None and tracks is None: raise ValueError("At least one of `variants` or `tracks` must be provided.") if tracks is not None and not isinstance(tracks, (list, tuple)): tracks = [tracks] elif tracks is not None: tracks = list(tracks) if tracks is not None: names = [t.name for t in tracks] if len(set(names)) != len(names): raise ValueError( f"Duplicate track names: {names}. Each track must have a unique `name`." ) logger.info(f"Writing dataset to {path}") max_mem = parse_memory(max_mem) metadata: dict[str, Any] = { "version": SemanticVersion.parse(version("genvarloader")), "format_version": DATASET_FORMAT_VERSION, } dest = Path(path) with atomic_dir(dest, overwrite=overwrite) as path: if isinstance(bed, (str, Path)): bed = sp.bed.read(bed) gvl_bed, contigs, input_to_sorted_idx_map = _prep_bed(bed, max_jitter) bed.with_columns(r_idx_map=pl.Series(input_to_sorted_idx_map)).write_ipc( path / "input_regions.arrow" ) metadata["contigs"] = contigs if max_jitter is not None: metadata["max_jitter"] = max_jitter available_samples: set[str] | None = None if variants is not None: if isinstance(variants, (str, Path)): variants = Path(variants) if path_is_pgen(variants): if variants.suffix == "": variants = variants.with_suffix(".pgen") variants = PGEN(variants) elif path_is_vcf(variants): variants = VCF(variants) elif variants.is_dir() and variants.suffix == ".svar": variants = SparseVar(variants) else: raise ValueError( f"File {variants} has an unrecognized file extension. Please provide either a VCF or PGEN file.`" ) if available_samples is None: available_samples = set(variants.available_samples) # Eagerly load the variant index so max_mem accounting is honest. # VCF and PGEN both support lazy-index construction; without this, # variants.nbytes returns 0 and the budget overcounts memory. if isinstance(variants, VCF): if variants._index is None: if not variants._valid_index(): logger.info("VCF genoray index is invalid, writing") variants._write_gvi_index() variants._load_index() elif isinstance(variants, PGEN): variants._init_index() if tracks is not None: unavail = [] for tr in tracks: if unavailable_contigs := set(contigs) - { normalize_contig_name(c, contigs) for c in tr.contigs }: unavail.append(unavailable_contigs) if available_samples is None: available_samples = set(tr.samples) else: available_samples.intersection_update(tr.samples) if unavail: logger.warning( f"Contigs in queries {set().union(*unavail)} are not found in one or more tracks." ) if available_samples is None: raise ValueError( "No samples available across all variant file(s) and/or tracks." ) if samples is None: samples = list(available_samples) elif missing := (set(samples) - available_samples): raise ValueError(f"Samples {missing} not found in variants or tracks.") samples.sort() logger.info(f"Using {len(samples)} samples.") metadata["samples"] = samples metadata["n_regions"] = gvl_bed.height if variants is not None: logger.info("Writing genotypes.") effective_max_mem = max_mem if isinstance(variants, (VCF, PGEN)): idx_bytes = variants.nbytes effective_max_mem = max_mem - idx_bytes logger.info( f"Variant reader resident size: {format_memory(idx_bytes)}; " f"max_mem budget: {format_memory(max_mem)}; " f"available for chunking: {format_memory(max(effective_max_mem, 0))}" ) if isinstance(variants, VCF): bytes_per_var = ( variants.n_samples * variants.ploidy ) # Genos8: 1 byte else: bytes_per_var = ( variants.n_samples * variants.ploidy * 4 ) # int32 if effective_max_mem < bytes_per_var: raise ValueError( f"max_mem ({format_memory(max_mem)}) is too small: the variant " f"index alone consumes {format_memory(idx_bytes)}, leaving " f"{format_memory(max(effective_max_mem, 0))} for chunking, but " f"at least {format_memory(bytes_per_var)} is needed per variant. " f"Increase max_mem." ) if isinstance(variants, VCF): variants.set_samples(samples) gvl_bed = _write_from_vcf( path, gvl_bed, variants, effective_max_mem, extend_to_length ) elif isinstance(variants, PGEN): variants.set_samples(samples) gvl_bed = _write_from_pgen( path, gvl_bed, variants, effective_max_mem, extend_to_length ) elif isinstance(variants, SparseVar): gvl_bed, _svar_link = _write_from_svar( path, gvl_bed, variants, samples, extend_to_length ) metadata["svar_link"] = _svar_link metadata["ploidy"] = variants.ploidy # free memory del variants gc.collect() _write_regions(path, gvl_bed, contigs) if tracks is not None: logger.info("Writing track intervals.") for tr in tracks: _write_track(path, gvl_bed, tr, samples, max_mem) _metadata = Metadata(**metadata) with open(path / "metadata.json", "w") as f: f.write(_metadata.model_dump_json()) logger.info("Finished writing.") finally: warnings.simplefilter("default")
[docs] def get_splice_bed( gtf: str | Path, contigs: list[str] | None = None, transcript_support_level: str | None = "1", require_multiple_of_3: bool = True, ) -> pl.DataFrame: """Process a GTF into a BED-compatible DataFrame for splicing datasets. The result has columns ``chrom``, ``chromStart`` (0-based), ``chromEnd``, ``strand``, ``gene_name``, ``transcript_id``, and ``exon_number``, sorted by chromosome (natural order) and ``chromStart``. Pass it directly to :func:`gvl.write` for splicing datasets. Parameters ---------- gtf Path to a GTF file (gzipped or plain) accepted by :func:`seqpro.gtf.scan`. contigs If provided, keep only rows whose ``seqname`` is in this list. transcript_support_level If a string, require the GTF ``transcript_support_level`` attribute to equal it. ``None`` disables the filter. require_multiple_of_3 If ``True``, keep only transcripts whose summed CDS length is a multiple of 3. """ lf = sp.gtf.scan(gtf) if contigs is not None: lf = lf.filter(pl.col("seqname").is_in(contigs)) lf = lf.filter(pl.col("feature") == "CDS").rename( { "seqname": "chrom", "start": "chromStart", "end": "chromEnd", } ) lf = lf.with_columns( pl.col("chrom").cast(pl.Utf8), pl.col("chromStart") - 1, pl.col("strand").cast(pl.Utf8), sp.gtf.attr("gene_name"), sp.gtf.attr("transcript_id"), sp.gtf.attr("exon_number").cast(pl.Int32), ) drop_cols = ["source", "score", "frame", "feature", "attribute"] if require_multiple_of_3: lf = lf.with_columns( transcript_len=(pl.col("chromEnd") - pl.col("chromStart")) .sum() .over("transcript_id") ).filter(pl.col("transcript_len") % 3 == 0) drop_cols.append("transcript_len") if transcript_support_level is not None: lf = lf.filter( sp.gtf.attr("transcript_support_level") == transcript_support_level ) df = lf.drop(drop_cols).collect() return sp.bed.sort(df)
def _prep_bed( bed: pl.DataFrame, max_jitter: int | None = None, ) -> tuple[pl.DataFrame, list[str], NDArray[np.intp]]: if bed.height == 0: raise ValueError("No regions found in the BED file.") with pl.StringCache(): if "strand" not in bed: bed = bed.with_columns(strand=pl.lit(1, pl.Int32)) else: bed = bed.with_columns( pl.col("strand") .cast(pl.Utf8) .replace_strict({"+": 1, "-": -1, ".": 1}, return_dtype=pl.Int32) ) bed = bed.select("chrom", "chromStart", "chromEnd", "strand") contigs = natsorted(bed["chrom"].unique()) bed = sp.bed.sort(bed.with_row_index()) input_to_sorted_idx_map = np.argsort(bed["index"]) bed = bed.drop("index") if max_jitter is not None: bed = bed.with_columns( chromStart=pl.col("chromStart") - max_jitter, chromEnd=pl.col("chromEnd") + max_jitter, ) return bed, contigs, input_to_sorted_idx_map def _write_regions(path: Path, bed: pl.DataFrame, contigs: list[str]): regions = bed_to_regions(bed, ContigNormalizer(contigs)) np.save(path / "regions.npy", regions) def _write_from_vcf( path: Path, bed: pl.DataFrame, vcf: VCF, max_mem: int, extend_to_length: bool ): out_dir = path / "genotypes" out_dir.mkdir(parents=True, exist_ok=True) assert vcf._index is not None, ( "caller must load the VCF index before _write_from_vcf" ) if vcf._index.select((pl.col("ALT").list.len() > 1).any()).item(): raise ValueError( "VCF with filtering applied still contains multi-allelic variants. Please filter or split them." ) (out_dir / "variants.arrow").hardlink_to(vcf._index_path()) return _write_phased_chunked( out_dir, bed, _vcf_region_chunks(bed, vcf, max_mem, extend_to_length) ) def _window_to_sparse( genos: NDArray[np.integer], var_idxs: NDArray[V_IDX_TYPE], q_start: int, q_end: int, v_starts: NDArray[np.int32], ilens: NDArray[np.int32], extend_to_length: bool, ) -> Ragged: """Convert a full dense region window into per-haplotype sparse genotypes. ``genos`` has shape ``(samples, ploidy, variants)`` and must cover the entire region window (all genoray memory-chunks concatenated along the variant axis). ``var_idxs`` are the window's global variant indices. ``v_starts`` (``POS - 1``) and ``ilens`` (``ILEN``) are window-aligned, positionally aligned with ``var_idxs``. When ``extend_to_length`` is ``True`` this defers to genoray's ``_dense2sparse_with_length``, which walks each haplotype's length and keeps only the variants it needs to reach ``q_end`` (per-haplotype-minimal, identical to ``SparseVar.read_ranges_with_length``). When ``False`` it falls back to plain ``dense2sparse`` (every haplotype keeps exactly the variants it carries within the window, with no length extension). """ if extend_to_length: return _dense2sparse_with_length( genos, var_idxs, q_start, q_end, v_starts, ilens ) return dense2sparse(genos, var_idxs) def _region_end(rag: Ragged, v_ends: NDArray, fallback_end: int) -> int: """Per-region chromEnd = end position of the furthest retained variant. ``rag`` is a sparse ``(samples, ploidy, ~variants)`` Ragged of global variant indices. Returns ``v_ends[max idx]`` across all haplotypes, or ``fallback_end`` when no variant is retained (mirrors _write_from_svar). """ if rag.data.size == 0: return int(fallback_end) return int(v_ends[int(rag.data.max())]) def _region_ends_from_list( ls_sparse: list[Ragged], v_ends: NDArray, fallback_end: int ) -> int: """Same as `_region_end` but over a list of per-chunk Ragged arrays.""" max_idx = -1 for rag in ls_sparse: if rag.data.size: max_idx = max(max_idx, int(rag.data.max())) if max_idx < 0: return int(fallback_end) return int(v_ends[max_idx]) def _vcf_region_chunks( bed: pl.DataFrame, vcf: VCF, max_mem: int, extend_to_length: bool ) -> Iterator[tuple[list[Ragged], Any, str | None]]: assert vcf._index is not None pos = vcf._index["POS"].to_numpy() ilen_all = vcf._index["ILEN"].list.first().to_numpy() # end position of each variant = POS + deletion length (matches _write_from_svar) v_ends = pos - np.clip(ilen_all, a_min=None, a_max=0) for (contig,), df in bed.partition_by( "chrom", as_dict=True, maintain_order=True ).items(): contig = cast(str, contig) starts = df["chromStart"].to_numpy() ends = df["chromEnd"].to_numpy() # unextended in-range variant indices, split per region v_idx, v_offsets = vcf._var_idxs(contig, starts, ends) unextended_idxs = np.array_split(v_idx.astype(V_IDX_TYPE), v_offsets[1:-1]) contig_desc = f"Processing genotypes for {df.height} regions on contig {contig}" first_in_contig = True if extend_to_length: region_iter = vcf._chunk_ranges_with_length( contig, starts, ends, max_mem, VCF.Genos8 ) else: # one generator per region; VCF.chunk takes a single range region_iter = ( vcf.chunk(contig, s, e, max_mem, VCF.Genos8) for s, e in zip(starts, ends) ) for ri, range_ in enumerate(region_iter): q_start = int(starts[ri]) q_end = int(ends[ri]) reg_unext = unextended_idxs[ri] desc = contig_desc if first_in_contig else None first_in_contig = False if extend_to_length: # assemble the full window across memory-chunks chunk_genos_list: list[NDArray] = [] n_ext_total = 0 for _, is_last, (chunk_genos, _chunk_end, n_ext) in mark_ends(range_): chunk_genos_list.append(chunk_genos) if is_last: n_ext_total = n_ext genos = np.concatenate(chunk_genos_list, axis=-1) if reg_unext.size == 0 and n_ext_total == 0: # empty region: no variants for any sample yield [dense2sparse(genos, reg_unext)], q_end, desc continue if n_ext_total > 0: ext_start = int(reg_unext[-1]) + 1 ext_idxs = np.arange( ext_start, ext_start + n_ext_total, dtype=V_IDX_TYPE ) var_idxs = np.concatenate([reg_unext, ext_idxs]) else: var_idxs = reg_unext v_starts = (pos[var_idxs] - 1).astype(np.int32) ilens = ilen_all[var_idxs].astype(np.int32) rag = _window_to_sparse( genos, var_idxs, q_start, q_end, v_starts, ilens, True ) region_end = _region_end(rag, v_ends, q_end) yield [rag], region_end, desc else: # no extension: convert each chunk independently with plain # dense2sparse; var_idxs are exactly the unextended in-range ones ls_sparse: list[Ragged] = [] offset = 0 for genos in range_: n_vars = genos.shape[-1] chunk_idxs = reg_unext[offset : offset + n_vars] offset += n_vars ls_sparse.append(dense2sparse(genos, chunk_idxs)) assert offset == reg_unext.size, ( f"VCF.chunk variant count ({offset}) != _var_idxs count ({reg_unext.size}) " f"for region [{q_start}, {q_end})" ) if not ls_sparse: empty_genos = np.empty( (vcf.n_samples, vcf.ploidy, 0), dtype=np.int8 ) ls_sparse = [dense2sparse(empty_genos, reg_unext)] region_end = _region_ends_from_list(ls_sparse, v_ends, q_end) yield ls_sparse, region_end, desc def _write_from_pgen( path: Path, bed: pl.DataFrame, pgen: PGEN, max_mem: int, extend_to_length: bool ): if pgen._sei is None: raise ValueError( "PGEN with filtering has multi-allelic variants. Please filter or split them." ) assert pgen._sei is not None out_dir = path / "genotypes" out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "variants.arrow").hardlink_to(pgen._index_path()) return _write_phased_chunked( out_dir, bed, _pgen_region_chunks(bed, pgen, max_mem, extend_to_length) ) def _pgen_region_chunks( bed: pl.DataFrame, pgen: PGEN, max_mem: int, extend_to_length: bool ) -> Iterator[tuple[list[Ragged], Any, str | None]]: assert pgen._index is not None pos = pgen._index["POS"].to_numpy() ilen_all = pgen._index["ILEN"].list.first().to_numpy() v_ends = pos - np.clip(ilen_all, a_min=None, a_max=0) for (contig,), df in bed.partition_by( "chrom", as_dict=True, maintain_order=True ).items(): contig = cast(str, contig) starts = df["chromStart"].to_numpy() ends = df["chromEnd"].to_numpy() contig_desc = f"Processing genotypes for {df.height} regions on contig {contig}" first_in_contig = True unextended_idxs: list[NDArray] = [] if extend_to_length: region_iter = pgen._chunk_ranges_with_length(contig, starts, ends, max_mem) else: v_idx, v_offsets = pgen.var_idxs(contig, starts, ends) unextended_idxs = np.array_split(v_idx.astype(V_IDX_TYPE), v_offsets[1:-1]) region_iter = ( pgen.chunk(contig, int(s), int(e), max_mem) for s, e in zip(starts, ends) ) for ri, range_ in enumerate(region_iter): q_start = int(starts[ri]) q_end = int(ends[ri]) desc = contig_desc if first_in_contig else None first_in_contig = False if extend_to_length: genos_list: list[NDArray] = [] idx_list: list[NDArray] = [] for genos, _chunk_end, chunk_idxs in range_: genos_list.append(genos.astype(np.int8)) idx_list.append(chunk_idxs.astype(V_IDX_TYPE)) genos = np.concatenate(genos_list, axis=-1) var_idxs = ( np.concatenate(idx_list) if idx_list else np.empty(0, dtype=V_IDX_TYPE) ) if var_idxs.size == 0: yield [dense2sparse(genos, var_idxs)], q_end, desc continue v_starts = (pos[var_idxs] - 1).astype(np.int32) ilens = ilen_all[var_idxs].astype(np.int32) rag = _window_to_sparse( genos, var_idxs, q_start, q_end, v_starts, ilens, True ) region_end = _region_end(rag, v_ends, q_end) yield [rag], region_end, desc else: reg_unext = unextended_idxs[ri] ls_sparse: list[Ragged] = [] offset = 0 for genos in range_: n_vars = genos.shape[-1] chunk_idxs = reg_unext[offset : offset + n_vars] offset += n_vars ls_sparse.append(dense2sparse(genos.astype(np.int8), chunk_idxs)) assert offset == reg_unext.size, ( f"PGEN.chunk variant count ({offset}) != var_idxs count " f"({reg_unext.size}) for region [{q_start}, {q_end})" ) if not ls_sparse: empty_genos = np.empty( (pgen.n_samples, pgen.ploidy, 0), dtype=np.int8 ) ls_sparse = [dense2sparse(empty_genos, reg_unext)] region_end = _region_ends_from_list(ls_sparse, v_ends, q_end) yield ls_sparse, region_end, desc def _write_phased_chunked( out_dir: Path, bed: pl.DataFrame, region_iter: Iterator[tuple[list[Ragged], Any, str | None]], ) -> pl.DataFrame: """Aggregate per-region sparse genotype chunks and write them to ``out_dir``. ``region_iter`` yields one ``(ls_sparse, region_end, pbar_desc)`` per region in ``bed`` order. ``pbar_desc`` updates the progress bar description (used at the first region of a new contig); ``None`` leaves it unchanged. """ v_idx_memmap_offsets = 0 offset_memmap_offsets = 0 last_offset = 0 max_ends: list[Any] = [] pbar = tqdm(total=bed.height, unit=" region") first_no_variant_warning = True for ls_sparse, region_end, desc in region_iter: if desc is not None: pbar.set_description(desc) max_ends.append(region_end) var_idxs = ak.flatten( ak.concatenate(ls_sparse, -1), None, ).to_numpy() # (s p) lengths = np.stack([a.lengths for a in ls_sparse], 0).sum(0) if first_no_variant_warning and (lengths == 0).all(): first_no_variant_warning = False logger.warning( "A region has no variants for any sample. This could be expected depending on the region lengths" " and source of variants. However, this can also be caused by a mismatch between the" " reference genome used for the BED file coordinates and the one used for the variants." ) sp_genos = Ragged.from_lengths(var_idxs, lengths) ( v_idx_memmap_offsets, offset_memmap_offsets, last_offset, ) = _write_phased_variants_chunk( out_dir, sp_genos, v_idx_memmap_offsets, offset_memmap_offsets, last_offset, ) pbar.update() pbar.close() out = np.memmap( out_dir / "offsets.npy", dtype=np.int64, mode="r+", shape=1, offset=offset_memmap_offsets, ) out[-1] = last_offset out.flush() return bed.with_columns(chromEnd=pl.Series(max_ends)) def _write_from_svar( path: Path, bed: pl.DataFrame, svar: SparseVar, samples: list[str], extend_to_length: bool, ) -> tuple[pl.DataFrame, SvarLink]: out_dir = path / "genotypes" out_dir.mkdir(parents=True, exist_ok=True) offsets = np.memmap( out_dir / "offsets.npy", np.int64, "w+", shape=(2, bed.height, len(samples), svar.ploidy), ) with open(out_dir / "svar_meta.json", "w") as f: json.dump({"shape": offsets.shape, "dtype": offsets.dtype.str}, f) v_ends = svar.index.select( end=pl.col("POS") - pl.col("ILEN").list.first().clip(upper_bound=0) )["end"].to_numpy() max_ends = np.empty(bed.height, np.int32) contig_offset = 0 pbar = tqdm(total=bed.height, unit=" region") first_no_variant_warning = True for (c,), df in bed.partition_by( "chrom", as_dict=True, maintain_order=True ).items(): c = cast(str, c) pbar.set_description( f"Processing genotypes for {df.height} regions on contig {c}" ) # set offsets # (2 r s p) out = offsets[:, contig_offset : contig_offset + df.height] if extend_to_length: svar._find_starts_ends_with_length( c, df["chromStart"], df["chromEnd"], samples=samples, out=out ) else: svar._find_starts_ends( c, df["chromStart"], df["chromEnd"], samples=samples, out=out ) if first_no_variant_warning and (out == 0).all((1, 2, 3)).any(): first_no_variant_warning = False logger.warning( "Some regions have no variants for any sample. This could be expected depending on the region lengths" " and source of variants. However, this can also be caused by a mismatch between the" " reference genome used for the BED file coordinates and the one used for the variants." ) # compute max_ends for the bed shape = (df.height, len(samples), svar.ploidy, None) # (r s p ~v) sp_genos = Ragged.from_offsets(svar.genos.data, shape, out.reshape(2, -1)) # this is fine if there aren't any overlapping variants that could make a v_idx < -1 # have a further end than v_idx == -1 # * calling ak.max() means v_idxs is not a view of svar.genos.data # (r s p ~v) -> (r) v_idxs = ak.max(sp_genos, -1).to_numpy().max((1, 2)) c_max_ends = max_ends[contig_offset : contig_offset + df.height] if v_idxs.mask is np.ma.nomask: c_max_ends[:] = v_ends[v_idxs.data] else: c_max_ends[~v_idxs.mask] = v_ends[v_idxs.data[~v_idxs.mask]] c_max_ends[v_idxs.mask] = df.filter(v_idxs.mask)["chromEnd"] contig_offset += df.height pbar.update(df.height) pbar.close() offsets.flush() import os from ._svar_link import SvarFingerprint svar_resolved = svar.path.resolve() variant_idxs_path = svar_resolved / "variant_idxs.npy" svar_link = SvarLink( relative_path=os.path.relpath(svar_resolved, start=path).replace(os.sep, "/"), absolute_path=str(svar_resolved), fingerprint=SvarFingerprint( n_variants=svar.index.height, variant_idxs_bytes=variant_idxs_path.stat().st_size, ), ) return bed.with_columns(chromEnd=pl.Series(max_ends)), svar_link def _write_phased_variants_chunk( out_dir: Path, genos: Ragged, v_idx_memmap_offset: int, offsets_memmap_offset: int, last_offset: int, ): if not genos.is_empty: out = np.memmap( out_dir / "variant_idxs.npy", dtype=genos.data.dtype, mode="w+" if v_idx_memmap_offset == 0 else "r+", shape=genos.data.shape, offset=v_idx_memmap_offset, ) out[:] = genos.data[:] out.flush() v_idx_memmap_offset += out.nbytes offsets = genos.offsets offsets += last_offset last_offset = offsets[-1] out = np.memmap( out_dir / "offsets.npy", dtype=offsets.dtype, mode="w+" if offsets_memmap_offset == 0 else "r+", shape=len(offsets) - 1, offset=offsets_memmap_offset, ) out[:] = offsets[:-1] out.flush() offsets_memmap_offset += out.nbytes return v_idx_memmap_offset, offsets_memmap_offset, last_offset def _write_track( path: Path, bed: pl.DataFrame, track: "IntervalTrack", samples: list[str] | None, max_mem: int, ): if samples is None: _samples = track.samples else: if missing := (set(samples) - set(track.samples)): raise ValueError(f"Samples {missing} not found in track.") _samples = samples MEM_PER_INTERVAL = ( 12 * 2 ) # start u32, end u32, value f32, times 2 for intermediate copies chunk_labels = np.empty(bed.height, np.uint32) chunk_offsets: dict[int, NDArray[np.int64]] = {} n_chunks = 0 last_chunk_offset = 0 pbar = tqdm(total=bed["chrom"].n_unique()) for (contig,), part in bed.partition_by( "chrom", as_dict=True, include_key=False, maintain_order=True ).items(): pbar.set_description(f"Calculating memory usage for {part.height} regions") contig = cast(str, contig) _contig = normalize_contig_name(contig, track.contigs) if _contig is not None: starts = part["chromStart"].to_numpy() ends = part["chromEnd"].to_numpy() # (regions, samples) n_per_query = track.count_intervals(contig, starts, ends, sample=_samples) # (regions) mem_per_r = n_per_query.sum(1) * MEM_PER_INTERVAL if np.any(mem_per_r > max_mem): # TODO subset by samples as well if needed raise NotImplementedError( f"""Memory usage per region exceeds maximum of {max_mem / 1e9} GB. Largest amount needed for a single region is {mem_per_r.max() / 1e9} GB, set `max_mem` to this value or higher. Otherwise, chunking by region and sample is not yet implemented.""" ) split_offsets = splits_sum_le_value(mem_per_r, max_mem) split_lengths = np.diff(split_offsets) for i in range(len(split_lengths)): o_s, o_e = split_offsets[i], split_offsets[i + 1] chunk_idx = n_chunks + i chunk_offsets[chunk_idx] = lengths_to_offsets( n_per_query[o_s:o_e].ravel() ) first_chunk_idx = n_chunks last_chunk_idx = n_chunks + len(split_lengths) _chunk_labels = np.arange( first_chunk_idx, last_chunk_idx, dtype=np.uint32 ).repeat(split_lengths) chunk_labels[last_chunk_offset : last_chunk_offset + len(_chunk_labels)] = ( _chunk_labels ) n_chunks += len(split_lengths) last_chunk_offset += len(_chunk_labels) pbar.update() pbar.close() bed = bed.with_columns(chunk=pl.lit(chunk_labels)) out_dir = path / "intervals" / track.name out_dir.mkdir(parents=True, exist_ok=True) interval_offset = 0 offset_offset = 0 last_offset = 0 pbar = tqdm(total=bed["chunk"].n_unique()) for (chunk_idx,), part in bed.partition_by( "chunk", as_dict=True, include_key=False, maintain_order=True ).items(): chunk_idx = cast(int, chunk_idx) contig = cast(str, part[0, "chrom"]) pbar.set_description(f"Reading intervals for {part.height} regions on {contig}") starts = part["chromStart"].to_numpy() ends = part["chromEnd"].to_numpy() _offsets = chunk_offsets[chunk_idx] intervals = track._intervals_from_offsets( contig, starts, ends, _offsets, sample=_samples ) pbar.set_description(f"Writing intervals for {part.height} regions on {contig}") out = np.memmap( out_dir / "intervals.npy", dtype=INTERVAL_DTYPE, mode="w+" if interval_offset == 0 else "r+", shape=intervals.values.data.shape, offset=interval_offset, ) out["start"] = intervals.starts.data out["end"] = intervals.ends.data out["value"] = intervals.values.data out.flush() interval_offset += out.nbytes offsets = intervals.values.offsets offsets += last_offset last_offset = offsets[-1] out = np.memmap( out_dir / "offsets.npy", dtype=offsets.dtype, mode="w+" if offset_offset == 0 else "r+", shape=len(offsets) - 1, offset=offset_offset, ) out[:] = offsets[:-1] out.flush() offset_offset += out.nbytes pbar.update() pbar.close() out = np.memmap( out_dir / "offsets.npy", dtype=offsets.dtype, mode="r+", shape=1, offset=offset_offset, ) out[-1] = offsets[-1] out.flush()