Source code for bioio_base.transforms

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections import Counter
from numbers import Integral
from typing import Any, List, Literal, Optional, Tuple, Union

import dask.array as da
import numpy as np
import xarray as xr

from . import types
from .exceptions import ConflictingArgumentsError, UnexpectedShapeError
from .image_container import ImageContainer

###############################################################################


def reduce_to_slice(L: Union[List, Tuple]) -> Union[int, List, slice, Tuple]:
    """
    Convert a sequence of integer indices into an equivalent slice when possible.

    A slice is only returned when indexing with it selects exactly the same
    elements as indexing with the original sequence. That requires a constant,
    positive step between consecutive indices and a start/stop pair that can be
    written with plain integers.

    Parameters
    ----------
    L: Union[List, Tuple]
        The indices to reduce.

    Returns
    -------
    reduced: Union[int, List, slice, Tuple]
        A slice covering the same elements as L, or L itself (unmodified) when
        no equivalent integer-bounded slice exists.
    """
    # Nothing to reduce; previously this fell through to steps[0] and raised
    # an IndexError.
    if len(L) == 0:
        return L

    first, last = L[0], L[-1]

    # if the list only has one element, then just use it
    if len(L) == 1:
        # slice(-1, 0) selects nothing, so the last element cannot be written
        # as an integer-bounded slice; keep the index list instead.
        if first == -1:
            return L
        return slice(first, first + 1)

    # if the list has at least 2 elements we can check for sliceable
    # it is convertable to a slice if the step size between each
    # consecutive pair of elements is equal and positive
    steps = [after - before for before, after in zip(L, L[1:])]
    step = steps[0]
    if step > 0 and all(delta == step for delta in steps):
        # With a constant positive step the run is sorted, so first/last are
        # its min/max. The run must sit entirely on one side of zero: a slice
        # never wraps from negative to non-negative indices, and a negative run
        # ending at -1 would need a stop of 0, which selects nothing.
        if first >= 0 or last < -1:
            return slice(first, last + 1, step)

    # if we can't convert to a slice, then just return the list unmodified
    return L
def transpose_to_dims(
    data: types.ArrayLike,
    given_dims: str,
    return_dims: str,
) -> types.ArrayLike:
    """
    Reorder the axes of data from the given_dims ordering to the return_dims
    ordering.

    Both dimension strings must contain exactly the same letters, and no letter
    may appear more than once.

    Parameters
    ----------
    data: types.ArrayLike
        Either a dask array or numpy.ndarray of arbitrary shape but with the
        dimensions specified in given_dims
    given_dims: str
        The dimension ordering of data, "CZYX", "VBTCXZY" etc
    return_dims: str
        The dimension ordering of the return data

    Returns
    -------
    data: types.ArrayLike
        The data with the specified dimension ordering.

    Raises
    ------
    ConflictingArgumentsError
        given_dims and return_dims are incompatible.
    """
    # Compare letter tallies: both strings must use the same letters, and each
    # letter at most once. The repeat check is only evaluated when the tallies
    # already agree.
    given_tally = Counter(given_dims)
    same_letters = given_tally == Counter(return_dims)
    if not (same_letters and max(given_tally.values()) <= 1):
        raise ConflictingArgumentsError(
            f"given_dims={given_dims} and return_dims={return_dims} are incompatible."
        )

    # Look up where each requested letter currently lives and transpose
    axis_of = {letter: position for position, letter in enumerate(given_dims)}
    axis_order = [axis_of[letter] for letter in return_dims]
    return data.transpose(axis_order)
def _slice_extent(selection: slice) -> int:
    """Return the largest absolute start/stop of a slice; open bounds count as 0."""
    return max(abs(selection.start or 0), abs(selection.stop or 0))


def _indices_to_selection(indices: List[int]) -> Union[List[int], slice]:
    """Collapse a non-empty index list into an equivalent slice when that is safe."""
    lowest, highest = min(indices), max(indices)

    # A slice never wraps from negative to non-negative indices, so a mixed
    # sign selection has to stay an explicit index list.
    if lowest < 0 <= highest:
        return indices

    reduced = reduce_to_slice(indices)

    # A negative run ending at -1 may come back with a stop of 0, which would
    # select nothing. Leave the stop open so the run reaches the last element.
    if isinstance(reduced, slice) and reduced.stop == 0 and lowest < 0:
        return slice(reduced.start, None, reduced.step)
    return reduced


def reshape_data(
    data: types.ArrayLike, given_dims: str, return_dims: str, **kwargs: Any
) -> types.ArrayLike:
    """
    Reshape the data into return_dims, pad missing dimensions, and prune extra
    dimensions.

    Dimensions present in given_dims but absent from return_dims are removed by
    selecting a single index along them: the integer passed for that dimension
    in kwargs, or index 0 when nothing was passed. Dimensions present in
    return_dims but absent from given_dims are added with length 1.

    Parameters
    ----------
    data: types.ArrayLike
        Either a dask array or numpy.ndarray of arbitrary shape but with the
        dimensions specified in given_dims
    given_dims: str
        The dimension ordering of data, "CZYX", "VBTCXZY" etc
    return_dims: str
        The dimension ordering of the return data
    kwargs:
        Per-dimension selections keyed by dimension letter. A value of None is
        treated the same as not passing the dimension at all.

        * C=1 => desired specific channel, if C in the input data has depth 3
          then C=1 returns the 2nd slice (0 indexed)
        * Z=10 => desired specific z-slice, if Z in the input data has depth 20
          then Z=10 returns the 11th slice
        * T=[0, 1] => desired specific timepoints, if T in the input data has
          depth 100 then T=[0, 1] returns the 1st and 2nd slice (0 indexed)
        * T=(0, 1) => desired specific timepoints, if T in the input data has
          depth 100 then T=(0, 1) returns the 1st and 2nd slice (0 indexed)
        * T=(0, -1) => desired specific timepoints, if T in the input data has
          depth 100 then T=(0, -1) returns the first and last slice
        * T=range(10) => desired specific timepoints, if T in the input data has
          depth 100 then T=range(10) returns the first ten slices
        * T=slice(0, -1, 5) => desired specific timepoints, T=slice(0, -1, 5)
          returns every fifth timepoint

    Returns
    -------
    data: types.ArrayLike
        The data with the specified dimension ordering.

    Raises
    ------
    ConflictingArgumentsError
        Missing dimension in return dims when using range, slice, or
        multi-index dimension selection for the requested dimension.
    IndexError
        Requested dimension index not present in data.
    TypeError
        A dimension that is not in return_dims was given a non-integer
        selection.
    ValueError
        An empty list, tuple, or range was given as a selection.

    Examples
    --------
    Specific index selection

    >>> data = np.random.rand(10, 100, 100)
    >>> z1 = reshape_data(data, "ZYX", "YX", Z=1)

    List of index selection

    >>> data = np.random.rand(10, 100, 100)
    >>> first_and_second = reshape_data(data, "ZYX", "ZYX", Z=[0, 1])

    Tuple of index selection

    >>> data = np.random.rand(10, 100, 100)
    >>> first_and_last = reshape_data(data, "ZYX", "ZYX", Z=(0, -1))

    Range of index selection

    >>> data = np.random.rand(10, 100, 100)
    >>> first_three = reshape_data(data, "ZYX", "ZYX", Z=range(3))

    Slice selection

    >>> data = np.random.rand(10, 100, 100)
    >>> every_other = reshape_data(data, "ZYX", "ZYX", Z=slice(0, -1, 2))

    Empty dimension expansion

    >>> data = np.random.rand(10, 100, 100)
    >>> with_time = reshape_data(data, "ZYX", "TZYX")

    Dimension order shuffle

    >>> data = np.random.rand(10, 100, 100)
    >>> as_zx_base = reshape_data(data, "ZYX", "YZX")

    Selections, empty dimension expansions, and dimension order shuffle

    >>> data = np.random.rand(10, 100, 100)
    >>> example = reshape_data(data, "CYX", "BSTCZYX", C=slice(0, -1, 3))
    """
    # Check for parameter conflicts
    for dim in given_dims:
        # A multi-index selection keeps more than one element along the
        # dimension, so that dimension has to survive into return_dims
        if (
            isinstance(kwargs.get(dim), (list, tuple, range, slice))
            and dim not in return_dims
        ):
            raise ConflictingArgumentsError(
                f"When selecting a multiple dimension indices, the specified "
                f"dimension must be provided in return_dims. "
                f"return_dims={return_dims}, dimension {dim} = {kwargs.get(dim)}"
            )

    # Process each dimension available
    new_dims = given_dims
    dim_specs = []
    for dim_index, dim in enumerate(given_dims):
        dim_size = data.shape[dim_index]
        requested = kwargs.get(dim)
        out_of_bounds = False

        # Dim is kept in the output: the selection must preserve the axis
        if dim in return_dims:
            if requested is None:
                # Nothing was requested from this dimension, take all of it
                dim_spec = slice(None, None, None)
            else:
                dim_spec = requested

                # A single index is handled as a one element list so the axis
                # is kept (plain integer indexing would drop it). Integral also
                # covers numpy integer scalars.
                if isinstance(dim_spec, Integral):
                    dim_spec = [int(dim_spec)]
                elif isinstance(dim_spec, (tuple, range)):
                    dim_spec = list(dim_spec)

                if isinstance(dim_spec, list):
                    if not dim_spec:
                        raise ValueError(
                            f"No indices were provided for dimension {dim}."
                        )
                    out_of_bounds = (
                        min(dim_spec) < -dim_size or max(dim_spec) >= dim_size
                    )
                    # dask.Array and numpy.ndarray both accept List[int] and
                    # slices in getitem; prefer a slice whenever equivalent
                    dim_spec = _indices_to_selection(dim_spec)
                elif isinstance(dim_spec, slice):
                    out_of_bounds = _slice_extent(dim_spec) > dim_size
                # Any other selection type is handed to getitem untouched

        # Not in return_dims means that it is a fixed integer selection
        else:
            if requested is None:
                dim_spec = 0
            elif not isinstance(requested, Integral):
                raise TypeError(
                    "Dimensions not in output must be integers. "
                    f"Got {type(requested).__name__} for {dim}."
                )
            else:
                dim_spec = requested
                out_of_bounds = not -dim_size <= dim_spec < dim_size

            # Remove dim from new dims as integer indexing drops the axis
            new_dims = new_dims.replace(dim, "")

        # Check that the request isn't outside of the available data
        if out_of_bounds:
            raise IndexError(
                f"Dimension specified with {dim}={requested} "
                f"but Dimension shape is {dim_size}."
            )

        # All checks and operations passed, append dim operation to getitem ops
        dim_specs.append(dim_spec)

    # Run getitems
    data = data[tuple(dim_specs)]

    # Add empty dims where dimensions were requested but data doesn't exist
    # Add dimensions to new dims where empty dims are added
    for i, dim in enumerate(return_dims):
        # This dimension wasn't processed
        if dim not in given_dims:
            new_dims = f"{new_dims[:i]}{dim}{new_dims[i:]}"
            data = data.reshape(*data.shape[:i], 1, *data.shape[i:])

    # Selections were already applied above, so only the reorder is left
    return transpose_to_dims(data, given_dims=new_dims, return_dims=return_dims)
def generate_stack(
    image_container: ImageContainer,
    mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"],
    drop_non_matching_scenes: bool = False,
    select_scenes: Optional[
        Union[list[Union[str, int]], tuple[Union[str, int], ...]]
    ] = None,
    scene_character: str = "I",
    scene_coord_values: str = "index",
) -> types.MetaArrayLike:
    """
    Stack each scene contained in the reader into a single array.
    This method handles the logic of determining which stack function to use
    (dask or numpy) and whether or not to return a labelled array (xr.DataArray).
    Users should prefer to use one of get_stack, get_dask_stack,
    get_xarray_stack, or get_xarray_dask_stack.

    Note that set_scene is called on image_container for every selected scene
    and the previously active scene is not restored afterwards.

    Parameters
    ----------
    image_container: ImageContainer
        The container to read scenes from. Its scenes, set_scene,
        current_scene, and the attribute named by mode are used.
    mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"]
        String describing the style of data to return. Should be one of:
        "data", "dask_data", "xarray_data", "xarray_dask_data".
    drop_non_matching_scenes: bool
        During the scene iteration process, if the next scene to be added to
        the stack has different shape or dtype, should it be dropped or raise
        an error.
        Default: False (raise an error)
    select_scenes: Optional[
        Union[List[Union[str, int]], Tuple[Union[str, int], ...]]]
        Which scenes to stack into a single array. Scenes can be provided as a
        list or tuple of scene indices or names. It is recommended to use the
        scene integer index instead of the scene name to avoid duplicate scene
        name lookup issues.
        Default: None (stack all scenes)
    scene_character: str
        Character to use as the name of the scene dimension on the output
        array.
        Default "I"
    scene_coord_values : str
        How to assign coordinates to the scene dimension of the final array.
        If scene_coord_values="names" use the scene name from the reader
        object. Any other value attaches no coordinates to the scene dimension.
        Default: "index"

    Returns
    -------
    stack: types.MetaArrayLike
        The fully stacked array. This can be 6+ dimensions with Scene being
        the first dimension.

    Raises
    ------
    ValueError
        Invalid mode, no scenes to stack, or scene_character already names a
        dimension of the data.
    UnexpectedShapeError
        A scene has a different shape than the first scene and
        drop_non_matching_scenes is False.
    TypeError
        A scene has a different dtype than the first scene and
        drop_non_matching_scenes is False.
    """
    mode_check = ["data", "dask_data", "xarray_data", "xarray_dask_data"]
    if mode not in mode_check:
        raise ValueError(
            f"Invalid mode kwarg. Found {mode} but should be one of: "
            f"{', '.join(mode_check)}."
        )

    if select_scenes is None:
        select_scenes = list(range(len(image_container.scenes)))

    # Fail early with a clear message instead of deep inside the stack call
    if len(select_scenes) == 0:
        raise ValueError("No scenes were selected to stack.")

    labelled = "xarray" in mode
    scene_stacks = []
    scene_names = []
    for i, s in enumerate(select_scenes):
        image_container.set_scene(s)
        data = getattr(image_container, mode)

        # Store the shape and dtype of the first scenes data
        # to check against later scenes. If returning a DataArray
        # store the coords and dims to use in the final output
        if i == 0:
            shape = data.shape
            dtype = data.dtype
            if labelled:
                coords = dict(data.coords)
                dims = data.dims
                if scene_character in dims:
                    raise ValueError(
                        f"Provided scene dimension character '{scene_character}' "
                        f"was found in the existing dimensions of the data {dims}"
                    )

        # Check other scenes against the first scene
        else:
            if data.shape != shape:
                if drop_non_matching_scenes:
                    continue
                raise UnexpectedShapeError(
                    f"All scenes must have same shape. Found shape "
                    f"{data.shape} in scene {s} but expected "
                    f"{shape} based on scene {select_scenes[0]}"
                )
            if data.dtype != dtype:
                if drop_non_matching_scenes:
                    continue
                raise TypeError(
                    f"All scenes must have the same dtype. Found data "
                    f"with dtype {data.dtype} in scene {s} but expected "
                    f"dtype {dtype} based on scene {select_scenes[0]}"
                )

        scene_stacks.append(data)
        scene_names.append(image_container.current_scene)

    stack = da.stack if "dask" in mode else np.stack

    if not labelled:
        return stack(scene_stacks)

    # Stack the raw arrays and re-attach the labels of the first scene
    all_data = stack([x.data for x in scene_stacks])
    if scene_coord_values == "names":
        coords = {scene_character: scene_names, **coords}
    return xr.DataArray(
        all_data,
        dims=(scene_character, *dims),
        coords=coords,
    )