import re
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numcodecs
import numpy as np
from bioio import BioImage
from bioio_ome_zarr.writers import Channel, OMEZarrWriter
from bioio_ome_zarr.writers.ome_zarr_writer import MultiResolutionShapeSpec
from bioio_ome_zarr.writers.utils import multiscale_chunk_size_from_memory_target
from zarr.codecs import BloscCodec
from ..cluster import Cluster
class OmeZarrConverter:
"""
    OmeZarrConverter handles conversion of any BioImage-supported format
(TIFF, CZI, etc.) into OME-Zarr stores. Supports exporting one, many, or
all scenes from a multi-scene file.
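
    Examples
    --------
    A minimal sketch (``"input.czi"`` and ``"out"`` are hypothetical paths)::

        converter = OmeZarrConverter(
            source="input.czi",
            destination="out",
            num_levels=3,
        )
        converter.convert()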
"""
def __init__(
self,
*,
source: str,
destination: Optional[str] = None,
scenes: Optional[Union[int, List[int]]] = None,
name: Optional[str] = None,
level_shapes: Optional[MultiResolutionShapeSpec] = None,
chunk_shape: Optional[MultiResolutionShapeSpec] = None,
shard_shape: Optional[MultiResolutionShapeSpec] = None,
compressor: Optional[Union[BloscCodec, numcodecs.abc.Codec]] = None,
zarr_format: Optional[int] = None,
image_name: Optional[str] = None,
channels: Optional[List[Channel]] = None,
rdefs: Optional[Dict[str, Any]] = None,
creator_info: Optional[Dict[str, Any]] = None,
root_transform: Optional[Dict[str, Any]] = None,
axes_names: Optional[List[str]] = None,
axes_types: Optional[List[str]] = None,
axes_units: Optional[List[Optional[str]]] = None,
physical_pixel_size: Optional[List[float]] = None,
num_levels: Optional[int] = None,
downsample_z: bool = False,
memory_target: Optional[int] = None,
start_T_src: Optional[int] = None,
start_T_dest: Optional[int] = None,
tbatch: Optional[int] = None,
dtype: Optional[Union[str, np.dtype]] = None,
auto_dask_cluster: bool = False,
) -> None:
"""
Initialize an OME-Zarr converter with flexible scene selection,
pyramid construction, and chunk-sizing.

        Parameters
----------
source : str
Path to the input image (any format supported by BioImage).
destination : Optional[str]
Directory in which to write the ``.ome.zarr`` output(s).
            If ``None``, the converter uses the current working directory.
scenes : Optional[Union[int, List[int]]]
Which scene(s) to export:
- ``None`` → export all scenes
- ``int`` → a single scene index
- ``List[int]`` → those specific scene indices
name : Optional[str]
Base name for output files (defaults to the source stem). When exporting
multiple scenes, each file name is suffixed with the scene’s name.
        level_shapes : Optional[MultiResolutionShapeSpec]
Explicit per-level, per-axis absolute shapes (level 0 first).
Each tuple length must match the native axis count.
If provided, convenience options like ``num_levels`` and ``downsample_z``
are ignored.
        chunk_shape : Optional[MultiResolutionShapeSpec]
Chunk shape for Zarr arrays. Either a single shape applied to all levels
(e.g., ``(1, 1, 16, 256, 256)``) or per-level shapes. Writer validates.
        shard_shape : Optional[MultiResolutionShapeSpec]
            Optional shard shape per axis (Zarr v3 only). Writer validates.
compressor : Optional[Union[zarr.codecs.BloscCodec, numcodecs.abc.Codec]]
Compression codec. For v2 use ``numcodecs.Blosc``; for v3 use
``zarr.codecs.BloscCodec``.
zarr_format : Optional[int]
Target Zarr array format (``2`` or ``3``). ``None`` lets the writer
choose its default.
image_name : Optional[str]
Image name to record in multiscales metadata. Defaults to the output base.
channels : Optional[List[Channel]]
Optional OMERO-style channel metadata. Only used when a ``'c'`` axis
exists. If omitted, minimal channel models are derived from the reader.
rdefs : Optional[Dict[str, Any]]
Optional OMERO rendering defaults.
creator_info : Optional[Dict[str, Any]]
Optional “creator” metadata block (e.g., tool/version).
root_transform : Optional[Dict[str, Any]]
Optional multiscale root coordinate transformation.
axes_names : Optional[List[str]]
Axis names to write; defaults to the native axis names from the reader.
axes_types : Optional[List[str]]
Axis types (e.g., ``["time","channel","space",...]``). Writer validates.
axes_units : Optional[List[Optional[str]]]
Physical units per axis. Writer validates.
physical_pixel_size : Optional[List[float]]
Physical scale at level 0 per axis. If omitted, values are derived from
``BioImage.scale`` for present axes.
num_levels : Optional[int]
Convenience: number of pyramid levels to generate (including level 0).
If set, an XY half-pyramid is built by default:
- ``1`` = only level 0
- ``2`` = level 0 + one XY half
- ``3`` = level 0 + two XY halves, etc.
If ``downsample_z`` is True, Z is downsampled along with XY at each level.
downsample_z : bool, default = False
Whether to include the Z axis in downsampling when building levels
via ``num_levels``. Ignored if ``level_shapes`` is provided.
        memory_target : Optional[int]
            If set (bytes), suggests per-level chunk shapes derived from the
            level shapes and ``dtype`` via
            ``multiscale_chunk_size_from_memory_target``. The writer may reuse
            or adjust them per level.
        start_T_src : Optional[int]
            Source T index at which to begin reading from the BioImage.
            Defaults to the writer's behavior.
        start_T_dest : Optional[int]
            Destination T index at which to begin writing into the store.
            Defaults to the writer's behavior.
tbatch : Optional[int]
Number of timepoints to transfer. If None, the converter writes as many
as available in both source and destination.
dtype : Optional[Union[str, np.dtype]]
Override output data type; defaults to the reader’s dtype.
        auto_dask_cluster : bool
            If True, automatically start a local Dask cluster with 8 workers
            (via ``Cluster(n_workers=8).start()``) before any conversion
            begins.
"""
self.source = source
self.destination = destination or str(Path.cwd())
self.output_basename = name or Path(source).stem
        # Optional local Dask cluster; kept on the instance so it is not
        # garbage-collected while conversions run.
        if auto_dask_cluster:
            self._cluster = Cluster(n_workers=8)
            self._cluster.start()
self.bioimage = BioImage(self.source)
self.scene_names = self.bioimage.scenes
nscenes = len(self.scene_names)
if scenes is None:
self.scene_indices = list(range(nscenes))
elif isinstance(scenes, int):
self.scene_indices = [scenes]
else:
self.scene_indices = list(scenes)
self.bioimage.set_scene(0)
self.output_dtype = (
np.dtype(dtype) if dtype is not None else self.bioimage.dtype
)
# Passthroughs
self._writer_level_shapes = level_shapes
self._writer_chunk_shape = chunk_shape
self._writer_shard_shape = shard_shape
self._writer_compressor = compressor
self._writer_zarr_format = zarr_format
self._writer_image_name = image_name
self._writer_channels = channels
self._writer_rdefs = rdefs
self._writer_creator_info = creator_info
self._writer_root_transform = root_transform
self._writer_axes_names = axes_names
self._writer_axes_types = axes_types
self._writer_axes_units = axes_units
self._writer_physical_pixel_size = physical_pixel_size
# Helpers
self._helper_num_levels = num_levels
self._helper_downsample_z = downsample_z
        # Chunk suggestion
        self._helper_memory_target_bytes = memory_target
        self._start_T_src = start_T_src
        self._start_T_dest = start_T_dest
        self._tbatch = tbatch
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _infer_physical_pixel_sizes(
self, axis_names: List[str]
) -> Optional[List[float]]:
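        """
        Return per-axis physical pixel sizes for ``axis_names``. An explicit
        ``physical_pixel_size`` override takes precedence; otherwise values
        are derived from ``BioImage.scale``, with missing axes defaulting
        to 1.0.
        """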
if self._writer_physical_pixel_size is not None:
return [float(x) for x in self._writer_physical_pixel_size]
# From BioImage.scale; include only present axes
scale_info = self.bioimage.scale
defaults = {"t": 1.0, "z": 1.0, "y": 1.0, "x": 1.0, "c": 1.0}
mapping = {
"t": getattr(scale_info, "T", None),
"z": getattr(scale_info, "Z", None),
"y": getattr(scale_info, "Y", None),
"x": getattr(scale_info, "X", None),
"c": 1.0,
}
        return [
            float(mapping.get(ax) or defaults.get(ax, 1.0)) for ax in axis_names
        ]
def _resolve_channels(
self, axis_names: List[str], channel_count: int
) -> Optional[List[Channel]]:
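        """
        Return OMERO-style channel models when a ``'c'`` axis exists: the
        user-supplied ``channels`` if given, otherwise minimal white channels
        labeled from ``BioImage.channel_names`` (or ``Channel:{i}`` fallbacks).
        """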
if "c" not in axis_names:
return None
if self._writer_channels is not None:
return self._writer_channels
labels = self.bioimage.channel_names or [
f"Channel:{i}" for i in range(channel_count)
]
return [Channel(label=lab, color="#FFFFFF") for lab in labels[:channel_count]]
def _native_axes_and_shape_for_scene(
self, scene_index: int
) -> Tuple[List[str], Tuple[int, ...]]:
"""
Use BioImage.reader (the actual format plugin) to discover true
axis order & shape. This reflects CYX, CZYX, TCZYX, etc., without
padding.
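
        For example, a 3-channel 2D image might yield
        ``(["c", "y", "x"], (3, 512, 512))``.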
"""
self.bioimage.set_scene(scene_index)
r = self.bioimage.reader
order = r.dims.order.upper()
axis_names = [c.lower() for c in order]
shape = tuple(int(getattr(r.dims, ax)) for ax in order)
return axis_names, shape
def _round_shape(
self, base_shape: Tuple[int, ...], factors: Tuple[float, ...]
) -> Tuple[int, ...]:
"""
Apply per-axis factors to `base_shape`; clamp each dim to >= 1.
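
        For example, ``base_shape=(10, 512)`` with ``factors=(0.5, 0.5)``
        yields ``(5, 256)``.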
"""
return tuple(max(1, int(round(d * f))) for d, f in zip(base_shape, factors))
def _build_level_shapes_simple(
self,
axis_names: List[str],
level0_shape: Tuple[int, ...],
) -> Optional[List[Tuple[int, ...]]]:
"""
Build per-level shapes from (num_levels, downsample_z) policy.
- If num_levels <= 1 or None → return None (single level).
- Else produce half-pyramid:
* XY always downsample by 0.5^level.
* If downsample_z=True and 'z' exists, Z also downsample by 0.5^level.
* t/c/other axes remain unchanged.
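
        For example, ``num_levels=3`` with ``downsample_z=False`` turns a
        level-0 shape of ``(1, 2, 16, 512, 512)`` (t, c, z, y, x) into
        ``[(1, 2, 16, 512, 512), (1, 2, 16, 256, 256), (1, 2, 16, 128, 128)]``.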
"""
if not self._helper_num_levels or self._helper_num_levels <= 1:
return None
result: List[Tuple[int, ...]] = [tuple(level0_shape)]
for lvl in range(1, self._helper_num_levels):
factors: List[float] = []
for ax in axis_names:
if ax in ("x", "y"):
factors.append(0.5**lvl)
elif ax == "z" and self._helper_downsample_z:
factors.append(0.5**lvl)
else:
factors.append(1.0)
result.append(self._round_shape(level0_shape, tuple(factors)))
return result
@staticmethod
def _ensure_per_level_shapes(
level_shapes_spec: MultiResolutionShapeSpec,
) -> List[Tuple[int, ...]]:
"""
Normalize a level-shape spec (single or per-level) into a per-level
list of tuples.
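
        For example, ``(1, 2, 512, 512)`` becomes ``[(1, 2, 512, 512)]``,
        while ``[(512, 512), (256, 256)]`` is returned as a list of tuples
        unchanged.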
"""
if len(level_shapes_spec) == 0:
raise ValueError("level_shapes cannot be empty")
first = level_shapes_spec[0]
if isinstance(first, (int, np.integer)):
# Single level-0 shape
return [tuple(int(x) for x in level_shapes_spec)]
# Already per-level
return [tuple(int(x) for x in level) for level in level_shapes_spec]
# -------------------------------------------------------------------------
# Public
# -------------------------------------------------------------------------
def convert(self) -> None:
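        """
        Convert each selected scene into its own ``.ome.zarr`` store under
        ``self.destination``, warning first if any scene names require
        filename sanitization. Raises ``FileExistsError`` if an output path
        already exists.
        """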
if len(self.scene_indices) > 1:
bad = [
nm
for i, nm in enumerate(self.scene_names)
if i in self.scene_indices and re.search(r"[<>:\"/\\|?*]", nm)
]
if bad:
warnings.warn(
(
"Scene names contain invalid characters and will be "
"sanitized in filenames: "
f"{bad}"
),
UserWarning,
)
bio = self.bioimage
for scene_index in self.scene_indices:
# (1) Discover native axes/shape from the active reader
axis_names, level0_shape = self._native_axes_and_shape_for_scene(
scene_index
)
# (2) Channels
r = bio.reader
ccount = int(getattr(r.dims, "C", 1)) if "c" in axis_names else 0
channel_models = self._resolve_channels(axis_names, ccount)
pps = self._infer_physical_pixel_sizes(axis_names)
# (3) Scale to writer
if self._writer_level_shapes is not None:
writer_level_shapes_param: MultiResolutionShapeSpec = (
self._writer_level_shapes
)
else:
derived = self._build_level_shapes_simple(axis_names, level0_shape)
writer_level_shapes_param = (
derived if derived is not None else tuple(level0_shape)
)
# (4) Chunking
if self._writer_chunk_shape is not None:
writer_chunk_shape_param: Optional[
MultiResolutionShapeSpec
] = self._writer_chunk_shape
elif self._helper_memory_target_bytes is not None:
# Normalize level shapes to per-level list for the helper
level_shapes_list = self._ensure_per_level_shapes(
writer_level_shapes_param
)
suggested = multiscale_chunk_size_from_memory_target(
level_shapes_list,
self.output_dtype,
self._helper_memory_target_bytes,
)
writer_chunk_shape_param = [tuple(map(int, s)) for s in suggested]
else:
writer_chunk_shape_param = None # writer suggests per-level ~16 MiB
# (5) Output path
scene_name = self.scene_names[scene_index]
base = (
self.output_basename
if len(self.scene_indices) == 1
else f"{self.output_basename}_{scene_name}"
)
base = re.sub(r"[<>:\"/\\|?*]", "_", base)
out_path = Path(self.destination) / f"{base}.ome.zarr"
if out_path.exists():
raise FileExistsError(f"{out_path} already exists.")
# (6) Build writer kwargs
writer_kwargs: Dict[str, Any] = {
"store": str(out_path),
"level_shapes": writer_level_shapes_param,
"dtype": self.output_dtype,
**{
k: v
for k, v in {
"chunk_shape": writer_chunk_shape_param,
"shard_shape": self._writer_shard_shape,
"compressor": self._writer_compressor,
"zarr_format": self._writer_zarr_format,
"image_name": (self._writer_image_name or base),
"channels": channel_models,
"rdefs": self._writer_rdefs,
"creator_info": self._writer_creator_info,
"root_transform": self._writer_root_transform,
"axes_names": (self._writer_axes_names or axis_names),
"axes_types": self._writer_axes_types,
"axes_units": self._writer_axes_units,
"physical_pixel_size": pps,
}.items()
if v is not None
},
}
writer = OMEZarrWriter(**writer_kwargs)
# (7) Read pixels directly from the reader in its native (unpadded) order
bio.set_scene(scene_index)
r = bio.reader
native_order = r.dims.order.upper()
data_all = r.get_image_dask_data(native_order)
# (8) Write
has_t = "t" in axis_names
T_total = int(getattr(r.dims, "T", 1)) if has_t else 1
if has_t and T_total > 1:
kwargs: Dict[str, Any] = {"data": data_all}
if self._start_T_src is not None:
kwargs["start_T_src"] = self._start_T_src
if self._start_T_dest is not None:
kwargs["start_T_dest"] = self._start_T_dest
kwargs["total_T"] = (
self._tbatch if self._tbatch is not None else T_total
)
writer.write_timepoints(**kwargs)
else:
writer.write_full_volume(data_all)