from pathlib import Path
import numpy as np
import polars as pl
import seqpro as sp
from einops import repeat
from genoray._types import POS_TYPE
from genoray._utils import ContigNormalizer
from natsort import natsorted
from ._dataset._impl import RaggedDataset
from ._dataset._indexing import DatasetIndexer, SpliceIndexer
from ._dataset._intervals import tracks_to_intervals
from ._dataset._reconstruct import Haps, HapsTracks, Tracks, TrackType, _Variants
from ._dataset._reference import Reference
from ._dataset._splice import SpliceMap
from ._dataset._utils import bed_to_regions
from ._ragged import Ragged, RaggedIntervals, RaggedSeqs, RaggedTracks
from ._utils import lengths_to_offsets
from ._variants._records import RaggedAlleles
[docs]
def get_dummy_dataset(spliced: bool = False):
"""Return a dummy :class:`Dataset <genvarloader.Dataset>` with 4 regions, 4 samples, max jitter of 2, a reference genome of all :code:`"N"`, genotypes, and
1 track "read-depth" where each track is :code:`[1, 2, 3, 4, 5, 6]` in the reference coordinate system, where :code:`3` is aligned
with each region's start coordinate. Is initialized to return ragged haplotypes and tracks with no jitter and deterministic reconstruction algorithms.
Parameters
----------
spliced
If :code:`True`, the dataset will be setup for splicing with all regions moved to chromosome 1 and
a splice indexer with 2 genes, "tp53" and "shh", corresponding to regions:
.. code-block:: python
{
"tp53": [3, 0, 2],
"shh": [1],
}
"""
max_jitter = 2
dummy_samples = ["Aang", "Katara", "Sokka", "Toph"]
n_samples = len(dummy_samples)
dummy_contigs = [str(i) for i in range(1, 23)] + ["X", "Y", "MT"]
dummy_bed = pl.DataFrame(
{
"chrom": ["8", "3", "6", "2"],
"chromStart": [5, 13, 8, 2],
"chromEnd": [8, 16, 11, 5],
"strand": ["+", "-", "+", "+"],
"gene": ["tp53", "shh", "tp53", "tp53"],
"exon": [3, 1, 1, 2],
}
)
n_regions = len(dummy_bed)
with pl.StringCache():
pl.Series(natsorted(dummy_contigs), dtype=pl.Categorical())
sorted_bed = dummy_bed.with_row_index().sort(
pl.col("chrom").cast(pl.Categorical())
)
r_idx_map = np.argsort(sorted_bed["index"])
sorted_bed = sorted_bed.drop("index")
dummy_idxer = DatasetIndexer.from_region_and_sample_idxs(
r_idx_map, np.arange(len(dummy_samples)), dummy_samples
)
dummy_regions = bed_to_regions(sorted_bed, ContigNormalizer(dummy_contigs))
ref_len = 20
ref_lens = np.full(len(dummy_contigs), ref_len, dtype=np.int32)
ref = np.full(ref_len * len(dummy_contigs), b"N", dtype="S1").view(np.uint8)
dummy_ref = Reference(
path=Path("dummy"),
reference=ref,
offsets=lengths_to_offsets(ref_lens, np.int64),
pad_char=ord(b"N"),
c_map=ContigNormalizer(dummy_contigs),
)
dummy_vars = _Variants(
path=Path("dummy"),
start=repeat(dummy_regions[:, 1].astype(POS_TYPE), "r -> (r s)", s=n_samples),
ilen=repeat(np.array([-2, -1, 0, 1], np.int32), "s -> (r s)", r=n_regions),
ref=None,
alt=RaggedAlleles.from_offsets( # type: ignore[bad-argument-type] # RaggedAlleles is a Phantom subclass; from_offsets returns base Ragged[bytes_]
data=repeat(sp.cast_seqs("ACGTT"), "a -> (r a)", r=n_regions),
shape=(n_regions * n_samples, None),
offsets=lengths_to_offsets(
repeat(np.array([1, 1, 1, 2]), "s -> (r s)", r=n_regions)
),
),
info={},
)
v_idxs = (
np.array([[3, 2, 4, 1], [1, 3, 2, 4], [2, 1, 4, 3], [4, 2, 3, 1]])[
[3, 1, 2, 0]
] # target lengths
- 1 # 0-based idx within region
+ 4 * np.arange(4)[:, None] # adjust by region/contig offset
).astype(np.int32)
shape = (4, 4, 1, None)
dummy_genos = Ragged.from_offsets(
data=v_idxs.ravel(),
shape=shape,
offsets=np.arange(0, 4 * 4 + 1, dtype=np.int64), # every entry has 1 variant
)
dummy_haps = Haps(
path=Path("dummy"),
reference=dummy_ref,
variants=dummy_vars,
genotypes=dummy_genos,
dosages=None,
kind=RaggedSeqs,
filter=None,
min_af=None,
max_af=None,
)
# (r s), want tracks of [1, 2, 3, 4, 5] for each region so that pad values of 0 are obvious
track_regions = dummy_regions.copy()
track_regions[:, 1] -= max_jitter
track_regions[:, 2] = track_regions[:, 1] + 5 + max_jitter
t_len = 5
starts, ends, values, offsets = tracks_to_intervals(
regions=track_regions,
tracks=repeat(
np.arange(1, 1 + t_len + 2 * max_jitter, dtype=np.float32),
"l -> (r l)",
r=4,
),
track_offsets=lengths_to_offsets(np.full(4, t_len + 2 * max_jitter)),
)
starts = repeat(starts, "(r i) -> (r s i)", r=4, s=4)
ends = repeat(ends, "(r i) -> (r s i)", r=4, s=4)
values = repeat(values, "(r i) -> (r s i)", r=4, s=4)
lengths = np.diff(offsets)
offsets = lengths_to_offsets(repeat(lengths, "r -> (r s)", s=4))
starts = Ragged.from_offsets(starts, (4, 4, None), offsets)
ends = Ragged.from_offsets(ends, (4, 4, None), offsets)
values = Ragged.from_offsets(values, (4, 4, None), offsets)
dummy_itvs = {"read-depth": RaggedIntervals(starts, ends, values)}
# (r), want tracks of [0, 0, 1, 0, 0] for each region so that pad values of 0 are obvious
track_regions = dummy_regions.copy()
track_regions[:, 1] -= max_jitter
track_regions[:, 2] = track_regions[:, 1] + 5 + max_jitter
t_len = 5
one_track = np.zeros(t_len + 2 * max_jitter, dtype=np.float32)
one_track[2] = 1
starts, ends, values, offsets = tracks_to_intervals(
regions=track_regions,
tracks=repeat(one_track, "l -> (r l)", r=len(dummy_regions)),
track_offsets=lengths_to_offsets(np.full(4, t_len + 2 * max_jitter)),
)
lengths = np.diff(offsets)
offsets = lengths_to_offsets(lengths)
shape = (4, None)
starts = Ragged.from_offsets(starts, shape, offsets)
ends = Ragged.from_offsets(ends, shape, offsets)
values = Ragged.from_offsets(values, shape, offsets)
dummy_itvs["annot"] = RaggedIntervals(starts, ends, values)
avail_tracks = {"read-depth": TrackType.SAMPLE, "annot": TrackType.ANNOT}
dummy_tracks = Tracks(
dummy_itvs,
avail_tracks,
avail_tracks,
kind=RaggedTracks,
n_regions=4,
n_samples=4,
)
dummy_recon = HapsTracks(dummy_haps, dummy_tracks)
if spliced:
dummy_bed = dummy_bed.with_columns(chrom=pl.lit("chr1"))
sm, sp_bed = SpliceMap.from_bed(("gene", "exon"), dummy_bed)
dummy_spi = SpliceIndexer(map=sm, dsi=dummy_idxer)
else:
dummy_spi = None
sp_bed = None
dummy_dataset: RaggedDataset[RaggedSeqs, Ragged[np.float32]] = RaggedDataset(
path=Path("dummy"),
output_length="ragged",
max_jitter=max_jitter,
return_indices=False,
contigs=dummy_contigs,
jitter=0,
deterministic=True,
rc_neg=True,
_full_bed=dummy_bed,
_spliced_bed=sp_bed,
_full_regions=dummy_regions,
_idxer=dummy_idxer,
_sp_idxer=dummy_spi,
_seqs=dummy_haps,
_tracks=dummy_tracks,
_seqs_kind="haplotypes",
_recon=dummy_recon,
_rng=np.random.default_rng(),
)
return dummy_dataset