#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from collections import Counter
from numbers import Integral
from typing import Any, List, Literal, Optional, Tuple, Union
import dask.array as da
import numpy as np
import xarray as xr
from . import types
from .exceptions import ConflictingArgumentsError, UnexpectedShapeError
from .image_container import ImageContainer
def reduce_to_slice(L: Union[List, Tuple]) -> Union[int, List, slice, Tuple]:
# if the list only has one element, then just use it
if len(L) == 1:
return slice(L[0], L[0] + 1)
# if the list has at least 2 elements we can check for sliceable
# it is convertable to a slice if the step size between each
# consecutive pair of elements is equal and positive
# 1. get all the deltas in a list:
steps = [(L[i + 1] - L[i]) for i in range(len(L) - 1)]
# 2. check if all the deltas are equal and positive
if steps[0] > 0 and steps.count(steps[0]) == len(steps):
return slice(min(L), max(L) + 1, steps[0])
# if we can't convert to a slice, then just return the list unmodified
return L
def transpose_to_dims(
data: types.ArrayLike,
given_dims: str,
return_dims: str,
) -> types.ArrayLike:
This shuffles the data dimensions from given_dims to return_dims. Each dimension
must be present in given_dims must be used in return_dims
data: types.ArrayLike
Either a dask array or numpy.ndarray of arbitrary shape but with the dimensions
specified in given_dims
given_dims: str
The dimension ordering of data, "CZYX", "VBTCXZY" etc
return_dims: str
The dimension ordering of the return data
data: types.ArrayLike
The data with the specified dimension ordering.
given_dims and return_dims are incompatible.
# Use a counter to track that the contents are composed of the same letters
# and that no letter is repeated
if (
Counter(given_dims) != Counter(return_dims)
or max(Counter(given_dims).values()) > 1
raise ConflictingArgumentsError(
f"given_dims={given_dims} and return_dims={return_dims} are incompatible."
# Resort the data into return_dims order
match_map = {dim: given_dims.find(dim) for dim in given_dims}
transposer = []
for dim in return_dims:
data = data.transpose(tuple(transposer))
return data
def reshape_data(
data: types.ArrayLike, given_dims: str, return_dims: str, **kwargs: Any
) -> types.ArrayLike:
Reshape the data into return_dims, pad missing dimensions, and prune extra
dimensions. Warns the user to use the base reader if the depth of the Dimension
being removed is not 1.
data: types.ArrayLike
Either a dask array or numpy.ndarray of arbitrary shape but with the dimensions
specified in given_dims
given_dims: str
The dimension ordering of data, "CZYX", "VBTCXZY" etc
return_dims: str
The dimension ordering of the return data
* C=1 => desired specific channel, if C in the input data has depth 3 then C=1
returns the 2nd slice (0 indexed)
* Z=10 => desired specific channel, if Z in the input data has depth 20 then
Z=10 returns the 11th slice
* T=[0, 1] => desired specific timepoints, if T in the input data has depth 100
then T=[0, 1] returns the 1st and 2nd slice (0 indexed)
* T=(0, 1) => desired specific timepoints, if T in the input data has depth 100
then T=(0, 1) returns the 1st and 2nd slice (0 indexed)
* T=(0, -1) => desired specific timepoints, if T in the input data has depth 100
then T=(0, -1) returns the first and last slice
* T=range(10) => desired specific timepoints, if T in the input data has depth
100 then T=range(10) returns the first ten slices
* T=slice(0, -1, 5) => desired specific timepoints, T=slice(0, -1, 5) returns
every fifth timepoint
data: types.ArrayLike
The data with the specified dimension ordering.
Missing dimension in return dims when using range, slice, or multi-index
dimension selection for the requested dimension.
Requested dimension index not present in data.
Specific index selection
>>> data = np.random.rand((10, 100, 100))
... z1 = reshape_data(data, "ZYX", "YX", Z=1)
List of index selection
>>> data = np.random.rand((10, 100, 100))
... first_and_second = reshape_data(data, "ZYX", "YX", Z=[0, 1])
Tuple of index selection
>>> data = np.random.rand((10, 100, 100))
... first_and_last = reshape_data(data, "ZYX", "YX", Z=(0, -1))
Range of index selection
>>> data = np.random.rand((10, 100, 100))
... first_three = reshape_data(data, "ZYX", "YX", Z=range(3))
Slice selection
>>> data = np.random.rand((10, 100, 100))
... every_other = reshape_data(data, "ZYX", "YX", Z=slice(0, -1, 2))
Empty dimension expansion
>>> data = np.random.rand((10, 100, 100))
... with_time = reshape_data(data, "ZYX", "TZYX")
Dimension order shuffle
>>> data = np.random.rand((10, 100, 100))
... as_zx_base = reshape_data(data, "ZYX", "YZX")
Selections, empty dimension expansions, and dimension order shuffle
>>> data = np.random.rand((10, 100, 100))
... example = reshape_data(data, "CYX", "BSTCZYX", C=slice(0, -1, 3))
# Check for parameter conflicts
for dim in given_dims:
# return_dims='CZYX' and iterable dimensions 'T=range(10)'
# Dimension is in kwargs
# Dimension is an iterable
# Dimension is not in return dimensions
if (
isinstance(kwargs.get(dim), (list, tuple, range, slice))
and dim not in return_dims
raise ConflictingArgumentsError(
f"When selecting a multiple dimension indices, the specified "
f"dimension must be provided in return_dims. "
f"return_dims={return_dims}, dimension {dim} = {kwargs.get(dim)}"
# Process each dimension available
new_dims = given_dims
dim_specs = []
for dim in given_dims:
# Store index of the dim as it is in given data
dim_index = given_dims.index(dim)
# Handle dim in return_dims which means that it is
# an iterable or None selection
if dim in return_dims:
# Specific iterable requested
if dim in kwargs:
# Actual dim specification
# The specification provided for this dimension in the kwargs
dim_spec = kwargs.get(dim)
display_dim_spec = dim_spec
if isinstance(dim_spec, int):
dim_spec = slice(dim_spec, dim_spec + 1)
# Convert operator to standard list or slice
# dask.Array and numpy.ndarray both natively support
# List[int] and slices being passed to getitem so no need to cast them
# to anything different
if isinstance(dim_spec, (tuple, range)):
dim_spec = list(dim_spec)
# Get the largest absolute value index in the list using min and max
if isinstance(dim_spec, list):
check_selection_max = max([abs(min(dim_spec)), max(dim_spec)])
# try to convert to slice if possible
dim_spec = reduce_to_slice(dim_spec)
# Get the largest absolute value index from start and stop of slice
if isinstance(dim_spec, slice):
check_selection_max = max([abs(dim_spec.stop), abs(dim_spec.start)])
# Nothing was requested from this dimension
dim_spec = slice(None, None, None)
display_dim_spec = dim_spec
# No op means that it doesn't matter how much data is in this dimension
check_selection_max = 0
# Not in return_dims means that it is a fixed integer selection
if dim in kwargs:
# Integer requested
dim_spec = kwargs.get(dim)
display_dim_spec = dim_spec
# Check that integer
if not isinstance(dim_spec, Integral):
raise TypeError(
"Dimensions not in output must be integers. "
f"Got {type(dim_spec).__name__} for {dim}."
check_selection_max = dim_spec
dim_spec = 0
display_dim_spec = dim_spec
check_selection_max = 0
# Remove dim from new dims as it is fixed size
new_dims = new_dims.replace(dim, "")
# Check that fixed integer request isn't outside of request
if check_selection_max > data.shape[dim_index]:
raise IndexError(
f"Dimension specified with {dim}={display_dim_spec} "
f"but Dimension shape is {data.shape[dim_index]}."
# All checks and operations passed, append dim operation to getitem ops
# Run getitems
data = data[tuple(dim_specs)]
# Add empty dims where dimensions were requested but data doesn't exist
# Add dimensions to new dims where empty dims are added
for i, dim in enumerate(return_dims):
# This dimension wasn't processed
if dim not in given_dims:
new_dims = f"{new_dims[:i]}{dim}{new_dims[i:]}"
data = data.reshape(*data.shape[:i], 1, *data.shape[i:])
# Any extra dimensions have been removed, only a problem if the depth is > 1
return transpose_to_dims(
data, given_dims=new_dims, return_dims=return_dims
) # don't pass kwargs or 2 copies
def generate_stack(
image_container: ImageContainer,
mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"],
drop_non_matching_scenes: bool = False,
select_scenes: Optional[
Union[list[Union[str, int]], tuple[Union[str, int], ...]]
] = None,
scene_character: str = "I",
scene_coord_values: str = "index",
) -> types.MetaArrayLike:
Stack each scene contained in the reader into a
single array. This method handles the logic of determining which
stack function to use (dask or numpy) and whether or not to return a
labelled array (xr.DataArray). Users should prefer
to use one of get_stack, get_dask_stack, get_xarray_stack, or
mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"]
String describing the style of data to return. Should be one of:
"data", "dask_data", "xarray_data", "xarray_dask_data".
drop_non_matching_scenes: bool
During the scene iteration process, if the next scene to be added
to the stack has different shape or dtype, should it be dropped or
raise an error. Default: False (raise an error)
select_scenes: Optional[
Union[List[Union[str, int]], Tuple[Union[str, int], ...]]]
Which scenes to stack into a single array. Scenes can be provided
as a list or tuple of scene indices or names. It is recommended to
use the scene integer index instead of the scene name to avoid
duplicate scene name lookup issues.
Default: None (stack all scenes)
scene_character: str
Character to use as the name of the scene dimension on the output
array. Default "I"
scene_coord_values : str
How to assign coordinates to the scene dimension of the final
array. If scene_coord_values="names" use the scene name from
the reader object. If scene_coord_values="index" don't attach any
coordinates and fall back to integer values.
Default: "index"
stack: types.MetaArrayLike
The fully stacked array. This can be 6+ dimensions with Scene being
the first dimension.
mode_check = ["data", "dask_data", "xarray_data", "xarray_dask_data"]
if mode not in mode_check:
raise ValueError(
f"Invalid mode kwarg. Found {mode} but should be one of:"
f"{', '.join(mode_check)}."
scene_stacks = []
scene_names = []
if select_scenes is None:
select_scenes = list(range(len(image_container.scenes)))
for i, s in enumerate(select_scenes):
data = getattr(image_container, mode)
# Store the shape and dtype of the first scenes data
# to check against later scenes. If returning a DataArray
# store the coords and dims to use in the final output
if i == 0:
shape = data.shape
dtype = data.dtype
if "xarray" in mode:
coords = dict(data.coords)
dims = data.dims
if scene_character in dims:
raise ValueError(
f"Provided scene dimension character '{scene_character}' "
f"was found in the existing dimensions of the data {dims}"
# Check other scenes against the first scene
if data.shape != shape:
if not drop_non_matching_scenes:
raise UnexpectedShapeError(
f"All scenes must have same shape. Found shape"
f"{data.shape} in scene {s} but expected"
f"{shape} based on scene {select_scenes[0]}"
if data.dtype != dtype:
if not drop_non_matching_scenes:
raise TypeError(
f"All scenes must have the same dtype. Found data"
f"with dtype {data.dtype} in scene {s} but expected"
f"dtype {dtype} based on scene {select_scenes[0]}"
stack = da.stack if "dask" in mode else np.stack
if "xarray" in mode:
all_data = stack([x.data for x in scene_stacks])
if scene_coord_values == "names":
coords = {scene_character: scene_names, **coords}
return xr.DataArray(
dims=(scene_character, *dims),
return stack(scene_stacks)