from __future__ import annotations
from collections.abc import Mapping
from pathlib import Path
import numpy as np
import polars as pl
import pyBigWig
from numpy.typing import ArrayLike, NDArray
from seqpro.rag import Ragged
from ._ragged import RaggedIntervals
from ._types import Reader
from ._utils import get_rel_starts, lengths_to_offsets, normalize_contig_name
from .genvarloader import count_intervals as bw_count_intervals
from .genvarloader import intervals as bw_intervals
[docs]
class BigWigs(Reader):
dtype = np.float32 # pyBigWig always returns f32
chunked = False
name: str
paths: dict[str, str]
readers: dict[str, pyBigWig.BigWig] | None
samples: list[str]
coords: dict[str, NDArray]
sizes: dict[str, int]
contigs: Mapping[str, int]
[docs]
def __init__(self, name: str, paths: dict[str, str]) -> None:
"""Read data from bigWig files.
Parameters
----------
name
Name of the reader, for example `'signal'`.
paths
Dictionary of sample names and paths to bigWig files for those samples.
"""
self.name = name
self.paths = paths
self.readers = None
self.samples = list(self.paths)
self.coords = {"sample": np.asarray(self.samples)}
self.sizes = {"sample": len(self.samples)}
contigs: dict[str, int] | None = None
for path in self.paths.values():
with pyBigWig.open(path, "r") as f:
if contigs is None:
contigs = {
contig: int(length) for contig, length in f.chroms().items()
}
else:
common_contigs = contigs.keys() & f.chroms()
contigs = {k: v for k, v in contigs.items() if k in common_contigs}
if contigs is None:
raise ValueError("No bigWig files provided.")
self.contigs = contigs
[docs]
@classmethod
def from_table(cls, name: str, table: str | Path | pl.DataFrame):
"""Read data from bigWig files.
Parameters
----------
name
Name of the reader, for example `'signal'`.
table
Path to a table or a DataFrame containing sample names and paths to bigWig files for those samples.
It must have columns "sample" and "path".
"""
if isinstance(table, (str, Path)):
table = Path(table)
if table.suffix == ".csv":
table = pl.read_csv(table)
elif table.suffix == ".tsv" or table.suffix == ".txt":
table = pl.read_csv(table, separator="\t")
else:
raise ValueError("Table should be a csv or tsv file.")
paths = dict(zip(table["sample"], table["path"]))
return cls(name, paths)
@staticmethod
def rev_strand_fn(data: NDArray[np.float32]) -> NDArray[np.float32]:
return data[..., ::-1]
def close(self):
if self.readers is not None:
for reader in self.readers.values():
reader.close()
self.readers = None
def read(
self,
contig: str,
starts: ArrayLike,
ends: ArrayLike,
**kwargs,
) -> NDArray[np.float32]:
"""Read data corresponding to given genomic coordinates. The output shape will
have length as the final dimension/axis i.e. (..., length).
Parameters
----------
contig
Name of the contig/chromosome.
starts : ArrayLike
Start coordinates, 0-based.
ends : ArrayLike
End coordinates, 0-based, exclusive.
sample
Name of the samples to read data from.
Returns
-------
Shape: (samples length). Data corresponding to the given genomic coordinates and samples.
"""
_contig = normalize_contig_name(contig, self.contigs)
if _contig is None:
raise ValueError(f"Contig {contig} not found.")
else:
contig = _contig
if self.readers is None:
self.readers = {s: pyBigWig.open(p, "r") for s, p in self.paths.items()}
samples = kwargs.get("sample", self.samples)
if isinstance(samples, str):
samples = [samples]
if not set(samples).issubset(self.samples):
raise ValueError(f"Sample {samples} not found in bigWig paths.")
starts = np.atleast_1d(np.asarray(starts, dtype=int))
ends = np.atleast_1d(np.asarray(ends, dtype=int))
rel_starts = get_rel_starts(starts, ends)
rel_ends = rel_starts + (ends - starts)
out = np.empty((len(samples), (ends - starts).sum()), dtype=np.float32)
for s, e, r_s, r_e in zip(starts, ends, rel_starts, rel_ends):
for i, sample in enumerate(samples):
out[i, r_s:r_e] = self.readers[sample].values(contig, s, e, numpy=True)
out = np.nan_to_num(out, copy=False)
return out
def count_intervals(
self,
contig: str,
starts: ArrayLike,
ends: ArrayLike,
sample: str | list[str] | None = None,
**kwargs,
) -> NDArray[np.int32]:
"""Count the number of intervals corresponding to given genomic coordinates.
Parameters
----------
contig
Name of the contig/chromosome.
starts : ArrayLike
Start coordinates, 0-based.
ends : ArrayLike
End coordinates, 0-based, exclusive.
sample
Name of the samples to read data from.
Returns
-------
Shape: (regions, samples). Number of intervals that overlap with each region and sample.
"""
_contig = normalize_contig_name(contig, self.contigs)
if _contig is None:
raise ValueError(f"Contig {contig} not found.")
else:
contig = _contig
if sample is None:
samples = self.samples
elif isinstance(sample, str):
samples = [sample]
else:
samples = sample
if not set(samples).issubset(self.samples):
raise ValueError(f"Sample {samples} not found in bigWig paths.")
starts = np.atleast_1d(np.asarray(starts, dtype=np.int32))
ends = np.atleast_1d(np.asarray(ends, dtype=np.int32))
paths = [self.paths[s] for s in samples]
# (regions, samples)
n_per_query = bw_count_intervals(paths, contig, starts, ends)
return n_per_query
def intervals(
self,
contig: str,
starts: ArrayLike,
ends: ArrayLike,
sample: str | list[str] | None = None,
**kwargs,
) -> RaggedIntervals:
"""Read intervals corresponding to given genomic coordinates. The output data
will be a 2D Ragged array of :code:`struct{start, end, value}` with shape (regions, samples).
Parameters
----------
contig
Name of the contig/chromosome.
starts : ArrayLike
Start coordinates, 0-based.
ends : ArrayLike
End coordinates, 0-based, exclusive.
sample
Name of the samples to read data from.
"""
_contig = normalize_contig_name(contig, self.contigs)
if _contig is None:
raise ValueError(f"Contig {contig} not found.")
else:
contig = _contig
if sample is None:
samples = self.samples
elif isinstance(sample, str):
samples = [sample]
else:
samples = sample
if not set(samples).issubset(self.samples):
raise ValueError(f"Sample {samples} not found in bigWig paths.")
starts = np.atleast_1d(np.asarray(starts, dtype=np.int32))
ends = np.atleast_1d(np.asarray(ends, dtype=np.int32))
paths = [self.paths[s] for s in samples]
# (regions, samples)
n_per_query = bw_count_intervals(paths, contig, starts, ends)
offsets = lengths_to_offsets(n_per_query.ravel())
intervals = self._intervals_from_offsets(contig, starts, ends, offsets, sample)
return intervals
def _intervals_from_offsets(
self,
contig: str,
starts: ArrayLike,
ends: ArrayLike,
offsets: NDArray[np.int64],
sample: str | list[str] | None = None,
**kwargs,
) -> RaggedIntervals:
"""This function is unsafe! Reads intervals corresponding to given genomic coordinates
using provided offsets. If the offsets are incorrect this function is undefined behavior.
To ensure offsets are correct, use the count_intervals function (see :meth:`intervals()`).
The output data will be a 2D Ragged array of :code:`struct{start, end, value}` with shape (regions, samples).
Parameters
----------
contig
Name of the contig/chromosome.
starts : ArrayLike
Start coordinates, 0-based.
ends : ArrayLike
End coordinates, 0-based, exclusive.
offsets
Offsets corresponding to the returned interval data of shape (regions, samples). Can be
computed from the number of intervals per query, e.g. with the count_intervals function.
sample
Name of the samples to read data from.
"""
_contig = normalize_contig_name(contig, self.contigs)
if _contig is None:
raise ValueError(f"Contig {contig} not found.")
else:
contig = _contig
if sample is None:
samples = self.samples
elif isinstance(sample, str):
samples = [sample]
else:
samples = sample
if not set(samples).issubset(self.samples):
raise ValueError(f"Sample {samples} not found in bigWig paths.")
starts = np.atleast_1d(np.asarray(starts, dtype=np.int32))
ends = np.atleast_1d(np.asarray(ends, dtype=np.int32))
paths = [self.paths[s] for s in samples]
# (intervals, 2) (intervals)
coordinates, values = bw_intervals(paths, contig, starts, ends, offsets)
coordinates = coordinates.astype(np.int32)
shape = (len(starts), len(samples), None)
starts = Ragged.from_offsets(coordinates[:, 0], shape, offsets)
ends = Ragged.from_offsets(coordinates[:, 1], shape, offsets)
values = Ragged.from_offsets(values, shape, offsets)
return RaggedIntervals(starts, ends, values)