"""Scan operations for groupby reductions.
This module provides scan operations (cumulative reductions) for grouped data.
"""
from __future__ import annotations
import copy
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from .aggregations import (
SCANS,
AlignedArrays,
Scan,
ScanState,
_atleast_1d,
generic_aggregate,
)
from .cohorts import find_group_cohorts
from .factorize import _factorize_multiple
from .lib import _should_auto_rechunk_blockwise
from .rechunk import rechunk_for_blockwise
from .xrutils import is_duck_array, is_duck_dask_array, module_available
if module_available("numpy", minversion="2.0.0"):
from numpy.lib.array_utils import normalize_axis_tuple
else:
from numpy.core.numeric import normalize_axis_tuple # type: ignore[no-redef]
if TYPE_CHECKING:
from typing import Literal
from .core import (
T_By,
T_EngineOpt,
T_ExpectedGroupsOpt,
T_MethodOpt,
T_Scan,
)
from .types import DaskArray
T_ScanMethod = Literal["blockwise", "blelloch"]
def _choose_scan_method(
method: T_MethodOpt, preferred_method: T_ScanMethod, nax: int, by_ndim: int
) -> T_ScanMethod:
"""Choose the scan method based on user input and preferred method.
Parameters
----------
method : T_MethodOpt
User-specified method, or None for automatic selection.
preferred_method : T_ScanMethod
The preferred method based on data layout analysis.
nax : int
Number of axes being reduced.
by_ndim : int
Number of dimensions in the `by` array.
Returns
-------
T_ScanMethod
The chosen scan method: "blockwise" or "blelloch".
"""
if method is None:
# Scans must reduce along all dimensions of by for blockwise
if nax != by_ndim:
return "blelloch"
return preferred_method
elif method == "blockwise":
return "blockwise"
else:
# For any other method (including map-reduce, cohorts), use blelloch
return "blelloch"
def _validate_expected_groups(nby, expected_groups):
"""Validate expected_groups for scan operations."""
if expected_groups is None:
return (None,) * nby
return expected_groups
def _convert_expected_groups_to_index(expected_groups):
"""Convert expected_groups to index for scan operations."""
result = []
for expect in expected_groups:
if expect is None:
result.append(None)
elif isinstance(expect, pd.Index):
result.append(expect)
else:
result.append(pd.Index(expect))
return tuple(result)
[docs]
def groupby_scan(
array: np.ndarray | DaskArray,
*by: T_By,
func: T_Scan,
expected_groups: T_ExpectedGroupsOpt = None,
axis: int | tuple[int] = -1,
dtype: np.typing.DTypeLike = None,
method: T_MethodOpt = None,
engine: T_EngineOpt = None,
) -> np.ndarray | DaskArray:
"""
GroupBy reductions using parallel scans for dask.array
Parameters
----------
array : ndarray or DaskArray
Array to be reduced, possibly nD
*by : ndarray or DaskArray
Array of labels to group over. Must be aligned with ``array`` so that
``array.shape[-by.ndim :] == by.shape`` or any disagreements in that
equality check are for dimensions of size 1 in `by`.
func : {"nancumsum", "ffill", "bfill"} or Scan
Single function name or a Scan instance
expected_groups : (optional) Sequence
Expected unique labels.
axis : None or int or Sequence[int], optional
If None, reduce across all dimensions of by
Else, reduce across corresponding axes of array
Negative integers are normalized using array.ndim.
fill_value : Any
Value to assign when a label in ``expected_groups`` is not present.
dtype : data-type , optional
DType for the output. Can be anything that is accepted by ``np.dtype``.
method : {"blockwise", "blelloch"}, optional
Strategy for scan of dask arrays only:
* ``"blockwise"``:
Only scan using blockwise and avoid aggregating blocks
together. Useful for resampling-style groupby problems where group
members are always together. If `by` is 1D, `array` is automatically
rechunked so that chunk boundaries line up with group boundaries
i.e. each block contains all members of any group present
in that block. For nD `by`, you must make sure that all members of a group
are present in a single block.
* ``"blelloch"``:
Use Blelloch's parallel prefix scan algorithm, which allows
scanning across chunk boundaries. This is the default when groups
span multiple chunks.
engine : {"flox", "numpy", "numba", "numbagg"}, optional
Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk:
* ``"numpy"``:
Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``.
This is the default choice because it works for most array types.
* ``"flox"``:
Use an internal implementation where the data is sorted so that
all members of a group occur sequentially, and then numpy.ufunc.reduceat
is to used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
* ``"numba"``:
Use the implementations in ``numpy_groupies.aggregate_numba``.
* ``"numbagg"``:
Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
Returns
-------
result
Aggregated result
See Also
--------
xarray.xarray_reduce
"""
from . import xrdtypes
axis = _atleast_1d(axis)
if len(axis) > 1:
raise NotImplementedError("Scans are only supported along a single dimension.")
bys: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by)
nby = len(by)
by_is_dask = tuple(is_duck_dask_array(b) for b in bys)
any_by_dask = any(by_is_dask)
axis_ = normalize_axis_tuple(axis, array.ndim)
if engine is not None:
raise NotImplementedError("Setting `engine` is not supported for scans yet.")
if engine is None:
engine = "flox"
assert engine == "flox"
if not is_duck_array(array):
array = np.asarray(array)
agg = SCANS[func] if isinstance(func, str) else func
assert isinstance(agg, Scan)
agg = copy.deepcopy(agg)
if (agg == SCANS["ffill"] or agg == SCANS["bfill"]) and array.dtype.kind != "f":
# nothing to do, no NaNs!
return array
if expected_groups is not None:
raise NotImplementedError("Setting `expected_groups` with scans is not supported yet.")
expected_groups = _validate_expected_groups(nby, expected_groups)
expected_groups = _convert_expected_groups_to_index(expected_groups)
# Don't factorize early only when
# grouping by dask arrays, and not having expected_groups
factorize_early = not (
# can't do it if we are grouping by dask array but don't have expected_groups
any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups))
)
if factorize_early:
bys, final_groups, grp_shape = _factorize_multiple(
bys,
expected_groups,
any_by_dask=any_by_dask,
sort=False,
)
else:
raise NotImplementedError
assert len(bys) == 1
by_: np.ndarray
(by_,) = bys
has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
nax = len(axis_)
# Method selection for dask arrays
scan_method: T_ScanMethod = "blelloch"
if has_dask:
(single_axis,) = axis_ # type: ignore[misc]
# Try rechunking for sorted 1D by when method is not specified
if _should_auto_rechunk_blockwise(method, array, any_by_dask, by_):
rechunk_method, array = rechunk_for_blockwise(array, single_axis, by_, force=False)
if rechunk_method == "blockwise":
method = "blockwise"
# Determine preferred method based on data layout
if not any_by_dask and method is None:
cohorts_method, _ = find_group_cohorts(
by_,
[array.chunks[ax] for ax in range(-by_.ndim, 0)], # type: ignore[union-attr]
expected_groups=None,
merge=False,
)
# Map groupby_reduce methods to scan methods
preferred_method: T_ScanMethod = "blockwise" if cohorts_method == "blockwise" else "blelloch"
else:
preferred_method = "blelloch"
# Choose the final method
scan_method = _choose_scan_method(method, preferred_method, nax, by_.ndim)
# Rechunk if blockwise was explicitly requested but data isn't aligned
if preferred_method != "blockwise" and scan_method == "blockwise" and by_.ndim == 1:
_, array = rechunk_for_blockwise(array, axis=-1, labels=by_)
if array.dtype.kind in "Mm":
cast_to = array.dtype
array = array.view(np.int64)
elif array.dtype.kind == "b":
array = array.view(np.int8)
cast_to = None
if agg.preserves_dtype:
cast_to = bool
else:
cast_to = None
# TODO: move to aggregate_npg.py
if agg.name in ["cumsum", "nancumsum"] and array.dtype.kind in ["i", "u"]:
# https://numpy.org/doc/stable/reference/generated/numpy.cumsum.html
# it defaults to the dtype of a, unless a
# has an integer dtype with a precision less than that of the default platform integer.
if array.dtype.kind == "i":
agg.dtype = np.result_type(array.dtype, np.int_)
elif array.dtype.kind == "u":
agg.dtype = np.result_type(array.dtype, np.uint)
else:
agg.dtype = array.dtype if dtype is None else dtype
agg.identity = xrdtypes._get_fill_value(agg.dtype, agg.identity)
(single_axis,) = axis_ # type: ignore[misc]
# avoid some roundoff error when we can.
if by_.shape[-1] == 1 or by_.shape == grp_shape:
array = array.astype(agg.dtype)
if cast_to is not None:
array = array.astype(cast_to)
return array
# Made a design choice here to have `preprocess` handle both array and group_idx
# Example: for reversing, we need to reverse the whole array, not just reverse
# each block independently
inp = AlignedArrays(array=array, group_idx=by_)
if agg.preprocess:
inp = agg.preprocess(inp)
if not has_dask:
final_state = chunk_scan(inp, axis=single_axis, agg=agg, dtype=agg.dtype)
result = _finalize_scan(final_state, dtype=agg.dtype)
else:
from .dask import dask_groupby_scan
result = dask_groupby_scan(inp.array, inp.group_idx, axes=axis_, agg=agg, method=scan_method)
# Made a design choice here to have `postprocess` handle both array and group_idx
out = AlignedArrays(array=result, group_idx=by_)
if agg.finalize:
out = agg.finalize(out)
if cast_to is not None:
return out.array.astype(cast_to)
return out.array
def chunk_scan(inp: AlignedArrays, *, axis: int, agg: Scan, dtype=None, keepdims=None) -> ScanState:
assert axis == inp.array.ndim - 1
# I don't think we need to re-factorize here unless we are grouping by a dask array
accumulated = generic_aggregate(
inp.group_idx,
inp.array,
axis=axis,
engine="flox",
func=agg.scan,
dtype=dtype,
fill_value=agg.identity,
)
result = AlignedArrays(array=accumulated, group_idx=inp.group_idx)
return ScanState(result=result, state=None)
def grouped_reduce(inp: AlignedArrays, *, agg: Scan, axis: int, keepdims=None) -> ScanState:
from .core import chunk_reduce
assert axis == inp.array.ndim - 1
reduced = chunk_reduce(
inp.array,
inp.group_idx,
func=(agg.reduction,),
axis=axis,
engine="flox",
dtype=inp.array.dtype,
fill_value=agg.identity,
expected_groups=None,
)
return ScanState(
state=AlignedArrays(array=reduced["intermediates"][0], group_idx=reduced["groups"]),
result=None,
)
def _zip(group_idx: np.ndarray, array: np.ndarray) -> AlignedArrays:
return AlignedArrays(group_idx=group_idx, array=array)
def _finalize_scan(block: ScanState, dtype) -> np.ndarray:
assert block.result is not None
return block.result.array.astype(dtype, copy=False)
__all__ = [
"_finalize_scan",
"_zip",
"chunk_scan",
"grouped_reduce",
"groupby_scan",
]