Source code for bioio_conversion.converters.batch_converter
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union

import pandas as pd

from .ome_zarr_converter import OmeZarrConverter


class BatchConverter:
    """
    BatchConverter orchestrates bulk conversions of image files using
    a specified converter backend.

    Supports three input modes:

    - CSV-driven: each row defines a conversion job
    - Directory-driven: scan up to `max_depth` for matching files
    - List-driven: explicit list of file paths

    Default parameters for all jobs may be provided via `default_opts`.
    """

    # Map converter keys to converter classes
    _CONVERTERS: Dict[str, Type] = {
        "ome-zarr": OmeZarrConverter,
    }
    def __init__(
        self,
        *,
        converter_key: str = "ome-zarr",
        default_opts: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the BatchConverter.

        Parameters
        ----------
        converter_key : str
            Key to select the converter backend (must exist in `_CONVERTERS`).
        default_opts : dict, optional
            Shared default options applied to every job
            (e.g. destination, tbatch, overwrite).
        """
        if converter_key not in self._CONVERTERS:
            raise KeyError(f"Unknown converter: {converter_key}")
        self.converter_cls = self._CONVERTERS[converter_key]
        self.default_opts = default_opts.copy() if default_opts else {}
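
    # Example (illustrative sketch): construct a converter with shared
    # defaults. The "destination" and "overwrite" option names follow the
    # examples in the docstring above; their exact semantics depend on the
    # OmeZarrConverter backend.
    #
    #     bc = BatchConverter(
    #         converter_key="ome-zarr",
    #         default_opts={"destination": "out/", "overwrite": True},
    #     )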

    def from_csv(self, csv_path: Union[str, Path]) -> List[Dict[str, Any]]:
        """
        Parse a CSV file into a list of job option dicts.

        Each column maps to a converter parameter. Empty cells are skipped.
        Values that decode as JSON become native Python objects; everything
        else is kept as a string.
        """
        # Read every cell as a string so the JSON decoding below is uniform
        df = pd.read_csv(csv_path, dtype=str).fillna("")
        jobs: List[Dict[str, Any]] = []
        for _, row in df.iterrows():
            opts = self.default_opts.copy()
            for col, val in row.items():
                if not val:
                    continue
                try:
                    # e.g. "true" -> True, "3" -> 3, '["a", "b"]' -> list
                    parsed = json.loads(val)
                except json.JSONDecodeError:
                    parsed = val
                opts[col] = parsed
            jobs.append(opts)
        return jobs
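
    # Example (illustrative sketch; "jobs.csv" and its columns are
    # hypothetical). Given a CSV like:
    #
    #     source,destination,overwrite
    #     a.tiff,out/,true
    #
    # the following yields one job dict per row:
    #
    #     bc = BatchConverter()
    #     jobs = bc.from_csv("jobs.csv")
    #     # jobs[0] == {"source": "a.tiff", "destination": "out/",
    #     #             "overwrite": True}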

    def from_directory(
        self,
        directory: Union[str, Path],
        *,
        max_depth: int = 0,
        pattern: str = "*",
    ) -> List[Dict[str, Any]]:
        """
        Recursively find files matching `pattern` up to `max_depth` levels.

        max_depth=0 → only top-level files
        max_depth=1 → include one subdirectory level, etc.
        """
        base = Path(directory)
        if not base.is_dir():
            raise ValueError(f"Not a directory: {base}")
        jobs: List[Dict[str, Any]] = []
        for path in base.rglob(pattern):
            if not path.is_file():
                continue
            rel = path.relative_to(base)
            # depth = number of subfolders = len(parts) - 1
            if len(rel.parts) - 1 <= max_depth:
                opts = self.default_opts.copy()
                opts["source"] = str(path)
                jobs.append(opts)
        return jobs
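
    # Example (illustrative sketch; the directory layout is hypothetical):
    #
    #     bc = BatchConverter(default_opts={"destination": "out/"})
    #     jobs = bc.from_directory("raw", max_depth=1, pattern="*.tiff")
    #     # picks up raw/a.tiff and raw/plate1/b.tiff,
    #     # but not raw/plate1/scan2/c.tiff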

    def from_list(
        self,
        paths: List[Union[str, Path]],
    ) -> List[Dict[str, Any]]:
        """
        Build jobs from an explicit list of file paths.

        Each path yields one job dict; `default_opts` are merged in.
        """
        jobs: List[Dict[str, Any]] = []
        for p in paths:
            opts = self.default_opts.copy()
            opts["source"] = str(p)
            jobs.append(opts)
        return jobs
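
    # Example (illustrative sketch):
    #
    #     jobs = bc.from_list(["a.tiff", Path("b.tiff")])
    #     # -> [{"source": "a.tiff", ...}, {"source": "b.tiff", ...}]
    #     # where "..." stands for any merged default_opts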

    def run_jobs(
        self,
        jobs: List[Dict[str, Any]],
    ) -> None:
        """
        Execute each job in order.

        Every job dict must include a 'source' key. The remaining keys are
        merged over `default_opts` (job values take precedence) and passed
        to the converter backend.
        """
        for job in jobs:
            source = job.get("source")
            if not source:
                raise ValueError("Job missing 'source'")
            # Merge job params over the defaults, excluding 'source'.
            # Jobs built by the from_* helpers already contain the defaults,
            # so re-merging here is a harmless no-op for them.
            params = {k: v for k, v in job.items() if k != "source"}
            conv_opts = {**self.default_opts, **params}
            conv = self.converter_cls(source=source, **conv_opts)
            conv.convert()
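

# Example end-to-end run (illustrative sketch; option names such as
# "destination" follow the class docstring, and OmeZarrConverter is
# assumed to accept a `source` keyword plus those options):
#
#     bc = BatchConverter(default_opts={"destination": "converted/"})
#     bc.run_jobs(bc.from_directory("raw_images", pattern="*.tiff"))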